In [1]:
# built-in
import random
from collections import deque
from copy import deepcopy

# third party
import gym
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

In [2]:
# Seed shared by the Python, NumPy, PyTorch and environment RNGs.
SEED = 1
# Number of transitions sampled from replay memory per learning step.
BATCH_SIZE = 256
# Discount factor applied to the bootstrapped next-state value.
GAMMA = 0.99
# Learning rate passed to the optimizer.
LR = 0.03
# `eps` term passed to the Adam optimizer.
EPS = 1e-8

In [3]:
class DuelingDQN(nn.Module):
    """Dueling Q-network: a shared trunk feeding a value and an advantage stream.

    Args:
        obs_space: Number of input features (size of a flat observation).
        action_space: Number of discrete actions (size of the Q-value output).
        n_atoms: Unused by this network. Kept (now optional) so existing
            three-argument call sites continue to work.
    """

    def __init__(self, obs_space, action_space, n_atoms=None):
        super().__init__()

        # Shared feature extractor used by both streams.
        self.head = nn.Sequential(
            nn.Linear(obs_space, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU()
        )

        # State-value stream: one scalar per sample.
        self.val = nn.Sequential(
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 1)
        )

        # Advantage stream: one value per action.
        self.adv = nn.Sequential(
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, action_space)
        )

    def forward(self, x):
        """Compute Q-values for a batch of observations.

        Args:
            x: Tensor of shape (batch, obs_space).

        Returns:
            Tensor of shape (batch, action_space) with
            Q = V + A - mean(A), the mean taken over actions.
        """
        out = self.head(x)
        val_out = self.val(out).view(out.size(0), 1)
        adv_out = self.adv(out).view(out.size(0), -1)
        # Subtracting the mean advantage makes the V/A decomposition
        # identifiable: the per-sample mean of Q then equals V.
        adv_mean = adv_out.mean(dim=1, keepdim=True)
        q = val_out + adv_out - adv_mean

        return q

In [None]:
# Scalar loss value of every learning step, in order (plotted later).
losses = []


def learn(net, tgt_net, optimizer, rep_memory):
    """Run one double-DQN gradient step on a random minibatch.

    Relies on the notebook globals ``BATCH_SIZE``, ``GAMMA``, ``device``,
    ``use_cuda`` and ``losses``.

    Args:
        net: Online Q-network; it selects the next action and is optimized.
        tgt_net: Target Q-network; it evaluates the selected next action.
        optimizer: Optimizer built over ``net``'s parameters.
        rep_memory: Sequence of ``(obs, action, reward, next_obs, done)``
            transitions holding at least ``BATCH_SIZE`` entries.

    Raises:
        ValueError: From ``random.sample`` if ``rep_memory`` holds fewer
            than ``BATCH_SIZE`` transitions.
    """
    net.train()
    # The target network is only evaluated here, never optimized.
    tgt_net.eval()

    train_data = random.sample(rep_memory, BATCH_SIZE)

    dataloader = DataLoader(train_data,
                            batch_size=BATCH_SIZE,
                            pin_memory=use_cuda)
    # double DQN
    for s, a, r, _s, d in dataloader:
        s_batch = s.to(device).float()
        a_batch = a.to(device).long()
        _s_batch = _s.to(device).float()
        r_batch = r.to(device).float()
        # 1 for non-terminal transitions, 0 for terminal ones.
        done_masks = 1. - d.to(device).float()

        with torch.no_grad():
            # Online net picks the next action (one per row) ...
            _a_batch = net(_s_batch).argmax(dim=1, keepdim=True)
            # ... and the target net supplies that action's value.
            _q_best_tgt = tgt_net(_s_batch).gather(1, _a_batch).squeeze(1)
            # No bootstrapping past a terminal state.
            td_target = r_batch + GAMMA * done_masks * _q_best_tgt

        # Q-value of the action that was actually taken.
        q_acting = net(s_batch).gather(1, a_batch.view(-1, 1)).squeeze(1)

        # loss: mean squared TD error over the batch
        loss = (td_target - q_acting).pow(2).mean()
        # Store a plain float so the autograd graph can be freed.
        losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [None]:
# set device
use_cuda = torch.cuda.is_available()
print('cuda:', use_cuda)
device = torch.device('cuda' if use_cuda else 'cpu')

# random seed
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if use_cuda:
    torch.cuda.manual_seed_all(SEED)

# make an environment
# env = gym.make('CartPole-v0')
env = gym.make('CartPole-v1')
# env = gym.make('MountainCar-v0')
# env = gym.make('LunarLander-v2')
env.seed(SEED)
obs_space = env.observation_space.shape[0]
action_space = env.action_space.n
# `env.spec.timestep_limit` is a deprecated alias of `max_episode_steps`;
# read the canonical attribute once and derive everything from it.
episode_limit = env.spec.max_episode_steps
max_steps = episode_limit*episode_limit

# hyperparameter
learn_start = episode_limit*5       # transitions collected before learning
memory_size = learn_start*20        # replay memory capacity
update_frq = int(episode_limit/10)  # learning steps between target syncs
epsilon = 1.
eps_min = 0.0025
# Per-step multiplicative decay chosen so that epsilon falls from 1 to
# eps_min after max_steps/2 environment steps.
eps_decay = 1. - np.exp(np.log(eps_min)/(max_steps/2))
num_eval = 10

# global values
total_steps = 0
learn_steps = 0
rewards = []
reward_eval = deque(maxlen=num_eval)
is_learned = False
is_solved = False

# make two neural networks
# DuelingDQN is the only network defined in this notebook; it accepts
# `n_atoms` but does not use it, so None is passed explicitly.
net = DuelingDQN(obs_space, action_space, n_atoms=None).to(device)
target_net = deepcopy(net)

# make optimizer
# optimizer = optim.SGD(net.parameters(), momentum=0.9, lr=LR, weight_decay=1e-4)
optimizer = optim.Adam(net.parameters(), lr=LR, eps=EPS)

# make memory
rep_memory = deque(maxlen=memory_size)

In [None]:
# Inspect the per-episode step cap declared by the environment spec.
env.spec.max_episode_steps

In [None]:
# Inspect the reward threshold that the training loop compares the mean
# evaluation reward against to decide the task is solved.
env.spec.reward_threshold

In [None]:
# Inspect the spec's timestep limit.
# NOTE(review): presumably a legacy alias of `max_episode_steps` that newer
# gym releases no longer provide — confirm against the installed version.
env.spec.timestep_limit

In [None]:
# play: epsilon-greedy interaction with the environment, learning online
for i in range(1, env.spec.max_episode_steps+1):
    obs = env.reset()
    done = False
    ep_reward = 0
    while not done:
        env.render()
        if np.random.rand() < epsilon:
            # explore
            action = env.action_space.sample()
        else:
            # exploit: greedy action w.r.t. the target network's Q-values
            target_net.eval()
            with torch.no_grad():
                state = torch.as_tensor(
                    obs, dtype=torch.float32, device=device).unsqueeze(0)
                q = target_net(state)
                action = int(q.argmax(dim=1).item())

        _obs, reward, done, _ = env.step(action)

        rep_memory.append((obs, action, reward, _obs, done))

        obs = _obs
        total_steps += 1
        ep_reward += reward
        # multiplicative decay, floored at eps_min
        epsilon -= epsilon * eps_decay
        epsilon = max(eps_min, epsilon)

        if len(rep_memory) >= learn_start:
            if len(rep_memory) == learn_start:
                print('\n============  Start Learning  ============\n')
            learn(net, target_net, optimizer, rep_memory)
            learn_steps += 1

        # sync the target network every `update_frq` learning steps
        if learn_steps == update_frq:
            target_net.load_state_dict(net.state_dict())
            learn_steps = 0

    # The while-loop only exits once the episode is done.
    rewards.append(ep_reward)
    reward_eval.append(ep_reward)
    print('{:3} Episode in {:5} steps, reward {:.2f}'.format(
        i, total_steps, ep_reward))

    # solved once the mean of the last `num_eval` episode rewards
    # reaches the environment's reward threshold
    if len(reward_eval) >= num_eval:
        if np.mean(reward_eval) >= env.spec.reward_threshold:
            print('\n{} is sloved! {:3} Episode in {:3} steps'.format(
                env.spec.id, i, total_steps))
            break
env.close()

In [None]:
# Total reward collected in each episode.
plt.figure(figsize=(15, 5))
plt.title('reward')
plt.xlabel('episode')
plt.ylabel('total reward')
plt.plot(rewards)

# Loss of each learning step. Entries are converted with float() so the
# plot works whether `losses` holds plain numbers or zero-dim tensors that
# are still attached to an autograd graph.
plt.figure(figsize=(15, 5))
plt.title('loss')
plt.xlabel('learning step')
plt.ylabel('loss')
plt.plot([float(loss) for loss in losses])
plt.show()

In [104]:
# Scratch check: how boolean flags behave once wrapped in tensors
# (used below to build a float mask).
x = torch.tensor([False])
y = torch.tensor([True])

In [105]:
# Display the tensor built from False.
x

tensor([0], dtype=torch.uint8)

In [106]:
# Display the tensor built from True.
y

tensor([1], dtype=torch.uint8)

In [110]:
# Concatenate the two flags and cast to float so they can be used
# arithmetically as a 0/1 mask.
z = torch.cat((x, y))
z = z.float()
z

tensor([0., 1.])

In [111]:
# Multiplying by the float mask zeroes the entries whose flag was False.
m = torch.tensor([3, 7]).float()
m * z

tensor([0., 7.])

In [113]:
# Invert the mask: 1 where the flag was False, 0 where it was True.
1 - z

tensor([1., 0.])