# Asynchronous Advantage Actor-Critic (A3C) with data parallelism

In [1]:
import sys
sys.path.append("../Chapter13/")

In [2]:
import gym
import os
import ptan
import numpy as np
import argparse
import collections
from tensorboardX import SummaryWriter

import torch
import torch.nn.utils as nn_utils
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp

from lib import common

Among the hyperparameters, there are three new values:
- PROCESSES_COUNT specifies the number of child processes that will gather
training data for us. This activity is mostly CPU-bound, as the heaviest
operation here is the preprocessing of Atari frames, so this value is set
equal to the number of CPU cores on my machine.
- MICRO_BATCH_SIZE sets the number of training samples that every child
process needs to obtain before transferring those samples to the main
process.
- NUM_ENVS is the number of environments every child process will use to
gather data. This number multiplied by the number of processes is the total
amount of parallel environments that we will get our training data from.

In [3]:
# --- Optimisation ---------------------------------------------------------
LEARNING_RATE = 0.001   # Adam step size
BATCH_SIZE = 128        # samples per gradient step in the trainer
CLIP_GRAD = 0.1         # max global gradient norm
ENTROPY_BETA = 0.01     # weight of the entropy bonus

# --- Returns --------------------------------------------------------------
GAMMA = 0.99            # per-step discount factor
REWARD_STEPS = 4        # length of the n-step reward unrolling

# --- Data parallelism -----------------------------------------------------
PROCESSES_COUNT = 4     # number of data-gathering child processes
NUM_ENVS = 8            # environments run by each child process
MICRO_BATCH_SIZE = 32   # samples a child collects before sending them

In [4]:
# Game selection: True trains on Pong, False on Breakout. REWARD_BOUND is
# the mean reward at which training is considered solved.
USE_PONG = True

if USE_PONG:
    ENV_NAME, REWARD_BOUND = "PongNoFrameskip-v4", 18
else:
    ENV_NAME, REWARD_BOUND = "BreakoutNoFrameskip-v4", 400

Before we get to the child process function, we need the environment construction
function and a tiny wrapper that we will use to send the total episode reward into
the main training process.

The data_func function is very simple, but it is special, as it will be executed in the
child process. (We will use the mp.Process class to launch those processes in the
main code block.) 

We pass it three arguments: our NN, the device to be used to
perform computation (cpu or cuda string), and the queue we will use to send data
from the child process to our master process, which will perform training. The
queue is used in the many-producers and one-consumer mode, and can contain
two different types of objects:
- TotalReward: the namedtuple defined below, with a single field, reward. It is
a float holding the total undiscounted reward of a completed episode.
- A tuple with tensors returned by the function common.unpack_batch().

Thanks to torch.multiprocessing, those tensors are passed to the main process
through shared memory instead of having their physical memory copied. This
matters because such a copy could be costly, as an Atari observation is large.
Once we have collected the required number of experience samples for a
microbatch, we convert them into training data using the unpack_batch function
and clear the batch. Note that our experience samples represent four-step
subsequences (REWARD_STEPS is 4). The bootstrapped V(s) term for the last state
must therefore be discounted by γ^4 rather than γ.

In [5]:
def make_env():
    """Build one ENV_NAME environment wrapped with ptan's DQN wrappers."""
    raw_env = gym.make(ENV_NAME)
    wrapped_env = ptan.common.wrappers.wrap_dqn(raw_env)
    return wrapped_env


# Queue message from a worker to the trainer carrying a finished-episode
# reward in its single `reward` field.
TotalReward = collections.namedtuple('TotalReward', ['reward'])


def data_func(net, device, train_queue):
    """Worker loop for a child process: play games and ship training data.

    Builds NUM_ENVS environments and a policy agent driven by the shared
    ``net``, then iterates an n-step (REWARD_STEPS) experience source.
    Two kinds of objects are put on ``train_queue``:

    * ``TotalReward`` -- one per finished episode, holding that episode's
      total undiscounted reward;
    * the tuple returned by ``common.unpack_batch`` for every
      MICRO_BATCH_SIZE collected experience items.

    Args:
        net: shared actor-critic network; ``net(x)[0]`` is used as the
            policy logits (softmax is applied by the agent).
        device: device string passed to the agent and to ``unpack_batch``.
        train_queue: multiprocessing queue read by the training process.

    The loop runs for as long as the experience source yields items
    (presumably forever); the parent process terminates the worker.
    """
    envs = [make_env() for _ in range(NUM_ENVS)]
    agent = ptan.agent.PolicyAgent(
        lambda x: net(x)[0], device=device, apply_softmax=True)
    exp_source = ptan.experience.ExperienceSourceFirstLast(
        envs, agent, gamma=GAMMA, steps_count=REWARD_STEPS)
    micro_batch = []

    for exp in exp_source:
        # Report every finished episode separately. Averaging several
        # episodes that end on the same step into one entry would make the
        # trainer's reward tracking count them as a single episode.
        for episode_reward in exp_source.pop_total_rewards():
            train_queue.put(TotalReward(reward=episode_reward))

        micro_batch.append(exp)
        if len(micro_batch) < MICRO_BATCH_SIZE:
            continue

        # Each experience item spans REWARD_STEPS steps, so the value of the
        # last state has to be discounted by GAMMA ** REWARD_STEPS.
        data = common.unpack_batch(
            micro_batch, net, device=device,
            last_val_gamma=GAMMA ** REWARD_STEPS)
        train_queue.put(data)
        micro_batch.clear()

The beginning follows familiar steps, except for a call to mp.set_start_method(),
which tells the multiprocessing module which kind of parallelism we want to use.
The native multiprocessing library in Python supports several ways to start
subprocesses. Because of PyTorch multiprocessing limitations, however, spawn is
the only option if you want to use CUDA in the child processes.

Another new line is assignment to the OMP_NUM_THREADS, which is an environment
variable instructing the OpenMP library about the number of threads it can start.

OpenMP (https://www.openmp.org/) is heavily used by the Gym and OpenCV
libraries to provide a speed-up on multicore systems, which is a good thing most
of the time. By default, the process that uses OpenMP starts a thread for every core
in the system. But in our case, the effect from OpenMP is the opposite: as we're
implementing our own parallelism, by launching several processes, extra threads
overload the cores with frequent context switches, which negatively impacts
performance. 

To avoid this, we explicitly limit OpenMP to a single thread. If you want, you
can experiment with this parameter yourself. On my system, I saw a 3-4x
performance drop without this line.

After that, we create our NN, move it to the CUDA device, and ask it to share its
weights. CUDA tensors are shared by default, but for CPU mode, a call to share_memory() is required for multiprocessing to work.

In [8]:
# 'spawn' is required to use CUDA in child processes. force=True lets this
# cell be re-executed in the same kernel instead of raising
# "context has already been set".
# NOTE(review): with 'spawn', children locate data_func by importing
# __main__; functions defined interactively in a notebook may not be
# importable there -- consider moving data_func into a module.
mp.set_start_method('spawn', force=True)
# One OpenMP thread per process: we parallelise with processes ourselves.
# Set before any worker is started so the children inherit it.
os.environ['OMP_NUM_THREADS'] = "1"
# Fall back to the CPU instead of failing in .to() when no GPU is present.
device = "cuda" if torch.cuda.is_available() else "cpu"

writer = SummaryWriter(comment="-a3c-data_pong_test_a3c")

# A throwaway environment, used only to read the observation/action shapes.
env = make_env()
net = common.AtariA2C(env.observation_space.shape,
                      env.action_space.n).to(device)
# CUDA tensors are shared between processes automatically; CPU tensors must
# be moved to shared memory explicitly for the workers to see updates.
net.share_memory()

AtariA2C(
  (conv): Sequential(
    (0): Conv2d(4, 32, kernel_size=(8, 8), stride=(4, 4))
    (1): ReLU()
    (2): Conv2d(32, 64, kernel_size=(4, 4), stride=(2, 2))
    (3): ReLU()
    (4): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1))
    (5): ReLU()
  )
  (policy): Sequential(
    (0): Linear(in_features=3136, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=6, bias=True)
  )
  (value): Sequential(
    (0): Linear(in_features=3136, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=1, bias=True)
  )
)

Because the queue can hold only two types of objects (TotalReward and a tuple
with a microbatch), a single type check per queue entry is enough.
After the TotalReward entries are handled, we process the tuple with tensors. We
accumulate them in lists, and once the required batch size has been reached, we
concatenate the tensors using a torch.cat() call, which appends tensors along the
first dimension.
The rest of the training loop is standard actor-critic loss calculation, which is
performed in exactly the same way as in the previous chapter: we calculate the logits
of the policy and value estimation using our current network, and calculate the
policy, value, and entropy losses.

As the last step, we pass the calculated tensors to the TensorBoard tracker class,
which will perform the averaging and store the data that we want to monitor.

The finally block at the end runs both when an exception occurs (Ctrl + C, for
example) and when the game is solved. It terminates the child processes and waits
for them to exit, which guarantees that no processes are left running.

In [9]:
# queue.Empty is what multiprocessing.Queue.get() raises on timeout; the
# local import keeps this cell self-contained.
import queue

optimizer = optim.Adam(net.parameters(), lr=LEARNING_RATE,
                       eps=1e-3)

# Bounded queue: workers block on put() while the trainer is busy, so at
# most PROCESSES_COUNT entries wait at any time.
train_queue = mp.Queue(maxsize=PROCESSES_COUNT)
data_proc_list = []
for _ in range(PROCESSES_COUNT):
    data_proc = mp.Process(target=data_func,
                           args=(net, device, train_queue))
    data_proc.start()
    data_proc_list.append(data_proc)

# How long to wait for a queue entry before checking that workers are alive.
QUEUE_POLL_SECONDS = 10.0

# Micro-batches are accumulated here until BATCH_SIZE samples are available.
batch_states = []
batch_actions = []
batch_vals_ref = []
step_idx = 0    # total training samples received from the workers
batch_size = 0  # samples accumulated for the current training batch

try:
    with common.RewardTracker(writer, REWARD_BOUND) as tracker:
        with ptan.common.utils.TBMeanTracker(
                writer, 100) as tb_tracker:
            while True:
                try:
                    train_entry = train_queue.get(
                        timeout=QUEUE_POLL_SECONDS)
                except queue.Empty:
                    # Without this check the trainer would block forever if
                    # every worker has died (e.g. a spawned child failing to
                    # import data_func from a notebook's __main__).
                    if not any(p.is_alive() for p in data_proc_list):
                        raise RuntimeError(
                            "all data-gathering processes have exited")
                    continue

                if isinstance(train_entry, TotalReward):
                    # A truthy result from the tracker is treated as the
                    # "game solved" condition and ends training.
                    if tracker.reward(train_entry.reward,
                                      step_idx):
                        break
                    continue

                # Otherwise the entry is a micro-batch from
                # common.unpack_batch(): (states, actions, reference values).
                states_t, actions_t, vals_ref_t = train_entry
                batch_states.append(states_t)
                batch_actions.append(actions_t)
                batch_vals_ref.append(vals_ref_t)
                step_idx += states_t.size()[0]
                batch_size += states_t.size()[0]
                if batch_size < BATCH_SIZE:
                    continue

                # Join the accumulated micro-batches along the first dim.
                states_v = torch.cat(batch_states)
                actions_t = torch.cat(batch_actions)
                vals_ref_v = torch.cat(batch_vals_ref)
                batch_states.clear()
                batch_actions.clear()
                batch_vals_ref.clear()
                batch_size = 0

                optimizer.zero_grad()
                logits_v, value_v = net(states_v)
                # The value head ends in Linear(512, 1), so value_v is
                # (batch, 1); flatten it once to match the 1-D vals_ref_v.
                value_v = value_v.squeeze(-1)

                loss_value_v = F.mse_loss(value_v, vals_ref_v)

                log_prob_v = F.log_softmax(logits_v, dim=1)
                # Per-sample advantage A = R - V(s). With an unsqueezed
                # (batch, 1) value this broadcast to a (batch, batch) matrix,
                # i.e. every action used the batch-mean value as baseline.
                adv_v = vals_ref_v - value_v.detach()
                size = states_v.size()[0]
                log_p_a = log_prob_v[range(size), actions_t]
                log_prob_actions_v = adv_v * log_p_a
                loss_policy_v = -log_prob_actions_v.mean()

                # sum(p * log p) is the *negative* entropy, so adding it to
                # the loss pushes the policy towards higher entropy.
                prob_v = F.softmax(logits_v, dim=1)
                ent = (prob_v * log_prob_v).sum(dim=1).mean()
                entropy_loss_v = ENTROPY_BETA * ent

                loss_v = entropy_loss_v + loss_value_v + \
                         loss_policy_v
                loss_v.backward()
                nn_utils.clip_grad_norm_(
                    net.parameters(), CLIP_GRAD)
                optimizer.step()

                tb_tracker.track("advantage", adv_v, step_idx)
                tb_tracker.track("values", value_v, step_idx)
                tb_tracker.track("batch_rewards", vals_ref_v,
                                 step_idx)
                tb_tracker.track("loss_entropy",
                                 entropy_loss_v, step_idx)
                tb_tracker.track("loss_policy",
                                 loss_policy_v, step_idx)
                tb_tracker.track("loss_value",
                                 loss_value_v, step_idx)
                tb_tracker.track("loss_total",
                                 loss_v, step_idx)
finally:
    # Runs when solved, on errors and on Ctrl+C: make sure no worker
    # process outlives the trainer.
    for p in data_proc_list:
        p.terminate()
        p.join()

KeyboardInterrupt: 