<a href="https://colab.research.google.com/github/mohantyk/deep-rl/blob/master/Chap13_A3C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Warning
torch.multiprocessing does not seem to work with notebooks, see pytorch [issue](https://github.com/pytorch/pytorch/issues/17680). Need to run it as a script.

## Imports

In [1]:
!pip install ptan tensorboardX



In [2]:
import gym
import ptan
import numpy as np
from tensorboardX import SummaryWriter

In [3]:
import torch
import torch.nn as nn
import torch.nn.utils as nn_utils
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp

In [4]:
import time
import os
import sys
import collections

## Helper Functions

In [5]:
class RewardTracker:
  def __init__(self, writer, stop_reward):
    self.writer = writer
    self.stop_reward = stop_reward

  def __enter__(self):
    self.ts = time.time()
    self.ts_frame = 0
    self.total_rewards = []
    return self

  def __exit__(self):
    self.writer.close()

  def reward(self, reward, frame, epsilon=None):
    self.total_rewards.append(reward)
    speed = (frame - self.ts_frame)/(time.time() - self.ts)
    self.ts_frame = frame
    self.ts = time.time()
    mean_reward = np.mean(self.total_rewards[-100:])
    epsilon_str = '' if epsilon is None else f', eps {epsilon:.2f}'
    print(f'{frame} done {len(self.total_rewards)} games, mean reward {mean_reward:.3f}'
          f', speed {speed:.2f} f/s{epsilon_str}')
    sys.stdout.flush()
    if epsilon is not None:
      self.writer.add_scalar('epsilon', epsilon, frame)
    self.writer.add_scalar('speed', speed, frame)
    self.writer.add_scalar('reward_100', mean_reward, frame)
    self.writer.add_scalar('reward', reward, frame)
    if mean_reward > self.stop_reward:
      print(f'Solved in {frame} frames!')
      return True
    return False

## HyperParameters

In [6]:
GAMMA = 0.99
LEARNING_RATE = 0.001
ENTROPY_BETA = 0.01
BATCH_SIZE = 128

REWARD_STEPS = 4
CLIP_GRAD = 0.1

PROCESSES_COUNT = 2
NUM_ENVS = 8
MICRO_BATCH_SIZE = 32

In [7]:
DEVICE = 'cuda'
device = torch.device(DEVICE)

# A3C

In [8]:
class AtariA2C(nn.Module):
  def __init__(self, input_shape, n_actions):
    super().__init__()
    self.conv = nn.Sequential(
        nn.Conv2d(input_shape[0], 32, kernel_size=8, stride=4),
        nn.ReLU(),
        nn.Conv2d(32, 64, kernel_size=4, stride=2),
        nn.ReLU(),
        nn.Conv2d(64, 64, kernel_size=3, stride=1),
        nn.ReLU()
    )
    conv_out_size = self._get_conv_out(input_shape)
    self.policy = nn.Sequential(
        nn.Linear(conv_out_size, 512),
        nn.ReLU(),
        nn.Linear(512, n_actions)
    )  
    self.value = nn.Sequential(
        nn.Linear(conv_out_size, 512),
        nn.ReLU(),
        nn.Linear(512, 1)
    )


  def _get_conv_out(self, shape):
    o = self.conv(torch.zeros(1, *shape))
    return int(np.prod(o.size()))

  def forward(self, x):
    fx = x.float()/256
    conv_out = self.conv(fx).view(fx.size()[0], -1)
    return self.policy(conv_out), self.value(conv_out)

In [9]:
def unpack_batch(batch, net, device):
  '''
  Converts batch into training tensors
  returns :
    training states, actions tensor, reference values
  '''
  states = []
  actions = []
  rewards = []
  not_done_idx = []
  last_states = []
  for idx, exp in enumerate(batch):
    states.append(np.array(exp.state, copy=False))
    actions.append(int(exp.action))
    rewards.append(exp.reward)
    if exp.last_state is not None:
      not_done_idx.append(idx)
      last_states.append(np.array(exp.last_state, copy=False))

  states_v = torch.FloatTensor(np.array(states, copy=False)).to(device)
  actions_t = torch.LongTensor(actions).to(device)
  
  rewards_np = np.array(rewards, dtype=np.float32)
  if not_done_idx:
    last_states_v = torch.FloatTensor(np.array(last_states, copy=False)).to(device)
    last_vals_v = net(last_states_v)[1]
    last_vals_np = last_vals_v.data.cpu().numpy()[:, 0]
    last_vals_np *= GAMMA ** REWARD_STEPS
    rewards_np[not_done_idx] += last_vals_np
  ref_vals_v = torch.FloatTensor(rewards_np).to(device)
  return states_v, actions_t, ref_vals_v

### Training

In [10]:
def make_env():
  return ptan.common.wrappers.wrap_dqn(gym.make('PongNoFrameskip-v4'))

TotalReward = collections.namedtuple('TotalReward', ['reward'])

In [11]:
def data_func(net, device, train_queue):
  '''
  Will run in child processes
  '''
  envs = [make_env() for _ in range(NUM_ENVS)]
  agent = ptan.agent.PolicyAgent(lambda x: net(x)[0], apply_softmax=True, 
                                 device=device)
  exp_source = ptan.experience.ExperienceSourceFirstLast(envs, agent, gamma=GAMMA, 
                                                       steps_count=REWARD_STEPS)
  
  micro_batch = []
  for exp in exp_source:
    new_rewards = exp_source.pop_total_rewards()
    if new_rewards:
      data = TotalReward(reward=np.mean(new_rewards))
      train_queue.put(data)
    
    micro_batch.append(exp)
    if len(micro_batch) < MICRO_BATCH_SIZE:
      continue

    data = unpack_batch(micro_batch, net, device=device)
    train_queue.put(data)
    micro_batch.clear()

In [12]:
mp.set_start_method('spawn')
os.environ['OMP_NUM_THREADS'] = "1" # Since we are implementing our own parallelism
                                    # stop OpenMP from generating its own threads

In [13]:
writer = SummaryWriter(comment='-pong-a3c-data')
env = make_env()
net = AtariA2C(env.observation_space.shape, env.action_space.n).to(device)
net.share_memory() # Only required for CPUs, GPU tensors are shared by default
optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE, eps=1e-3)

train_queue = mp.Queue(maxsize=PROCESSES_COUNT) # To transfer data from children
data_proc_list = []
for _ in range(PROCESSES_COUNT):
  data_proc = mp.Process(target=data_func, args=(net, device, train_queue))
  data_proc.start()
  data_proc_list.append(data_proc)

In [14]:
%load_ext tensorboard
%tensorboard --logdir runs/

Reusing TensorBoard on port 6006 (pid 284), started 0:15:50 ago. (Use '!kill 284' to kill it.)

<IPython.core.display.Javascript object>

In [15]:
batch_states = []
batch_actions = []
batch_vals_ref = []
step_idx = 0
batch_size = 0

try:
  with RewardTracker(writer, stop_reward=17) as tracker:
    with ptan.common.utils.TBMeanTracker(writer, 100) as tb_tracker:
      while True:
        train_entry = train_queue.get()
        if isinstance(train_entry, TotalReward):
          if tracker.reward(train_entry.reward, step_idx): # True only if step_reward exceeded
            break
          continue 
        # Else if state/action tensors
        states_t, actions_t, vals_ref_t = train_entry
        batch_states.append(states_t)
        batch_actions.append(actions_t)
        batch_vals_ref.append(vals_ref_t)
        step_idx += states_t.size()[0]
        batch_size += states_t.size()[0]
        if batch_size < BATCH_SIZE:
          continue

        states_v = torch.cat(batch_states)
        actions_t = torch.cat(batch_actions)
        vals_ref_v = torch.cat(batch_vals_ref)
        batch_states.clear()
        batch_actions.clear()
        batch_vals_ref.clear()
        batch_size = 0

        optimizer.zero_grad()
        logits_v, value_v = net(states_v)
        loss_value_v = F.mse_loss(value_v.squeeze(-1), vals_refs_v)

        log_prob_v = F.log_softmax(logits_v, dim=1)
        adv_v = vals_refs_v - value_v.detach()
        log_p_a = log_prob_v[range(BATCH_SIZE), actions_t]
        log_prob_actions_v = adv_v * log_p_a
        loss_policy_v = -log_prob_actions_v.mean()

        prob_v = F.softmax(logits_v, dim=1)
        entropy_loss_v = ENTROPY_BETA * (prob_v * log_prob_v).sum(dim=1).mean()

        # Calculate policy gradients only
        loss_policy_v.backward(retain_graph=True)
        grads = np.concatenate([p.grad.data.cpu().numpy().flatten()
                                for p in net.parameters()
                                if p.grad is not None])
        
        # Apply entropy and value gradients
        loss_v = entropy_loss_v + loss_value_v
        loss_v.backward()
        nn_utils.clip_grad_norm_(net.parameters(), CLIP_GRAD)
        optimizer.step()

        tb_tracker.track("advantage", adv_v, step_idx)
        tb_tracker.track("values", value_v, step_idx)
        tb_tracker.track("batch_rewards", vals_ref_v,
                          step_idx)
        tb_tracker.track("loss_entropy",
                          entropy_loss_v, step_idx)
        tb_tracker.track("loss_policy",
                          loss_policy_v, step_idx)
        tb_tracker.track("loss_value",
                          loss_value_v, step_idx)
        tb_tracker.track("loss_total",
                          loss_v, step_idx)
        
finally:
  for p in data_proc_list:
    p.terminate()
    p.join()

TypeError: ignored