# Listing 1
## Introduction to multiprocessing

In [None]:
import multiprocessing as mp
import numpy as np
import workers

def square(x):
    """Return the element-wise square of *x* (used as the Pool worker function)."""
    return np.multiply(x, x)

# Sample data: the integers 0 through 63.
x = np.arange(0, 64, 1)
x

In [None]:
# Number of CPUs reported by multiprocessing; the cells below start one process per CPU.
n_cpus = mp.cpu_count()
n_cpus

In [None]:
n_procs = mp.cpu_count()
# Split x into one chunk per process. np.array_split yields integer-bounded chunks
# that cover every element. The previous `64 / cpu_count()` is a float under
# Python 3, which raises TypeError when used as a slice index. An exact split
# would also have dropped the remainder whenever 64 is not a multiple of the CPU count.
chunks = np.array_split(x, n_procs)
# Leaving the `with` block shuts down the pool's worker processes once map() has returned.
with mp.Pool(n_procs) as pool:
    # map() applies square to each chunk in parallel and returns the results in chunk order.
    squared = pool.map(square, chunks)
squared

# Listing 2
## Manually starting individual processes

In [None]:
def square(i, x, queue):
    """Announce worker *i*, then put the element-wise square of *x* onto *queue*."""
    message = "In process{}".format(i)
    print(message)
    queue.put(np.square(x))
    
processes = []  # a reference to each started process
queue = mp.Queue()  # multiprocessing queue: a data structure that can be shared across processes
x = np.arange(64)  # sample data, a sequence of numbers
n_procs = mp.cpu_count()

# One process per CPU, each squaring its own chunk of x. np.array_split gives
# integer-bounded chunks that cover all of x. The previous `64 / cpu_count()` is
# a float under Python 3 and cannot be used as a slice index.
for i, chunk in enumerate(np.array_split(x, n_procs)):
    proc = mp.Process(target=square, args=(i, chunk, queue))
    proc.start()
    processes.append(proc)

# Drain the queue BEFORE joining. A child that has put data on a Queue does not
# exit until that data has been flushed to the pipe, so join-then-get can deadlock.
# Taking exactly one result per process also avoids relying on queue.empty(),
# which is not reliable for multiprocessing queues.
results = [queue.get() for _ in processes]

# Wait for every process to exit; once join() returns, no terminate() is needed.
for proc in processes:
    proc.join()

results

# Listing 3
## Pseudocode for online advantage actor-critic

# Listing 4
## CartPole actor-critic model

In [None]:
import torch
from torch import nn
from torch import optim
from torch.nn import functional as F
import gym
import torch.multiprocessing as mp # PyTorch wraps Python's built-in multiprocessing library, and the API is the same

class ActorCritic(nn.Module):
    """Single network holding both the actor and the critic for CartPole.

    Layers l1 and l2 are shared trunk layers (4 -> 25 -> 50). The actor head
    maps the trunk output to log-probabilities over 2 actions. The critic head
    (l3, critic_lin1) maps the *detached* trunk output to one value squashed
    into [-1, 1] by tanh.

    forward() accepts a single state of shape (4,) or a batch shaped (..., 4).
    Normalization and softmax act on the last (feature) dimension, so a batch
    row gives the same result as passing that state alone.
    """

    def __init__(self):
        super().__init__()
        self.l1 = nn.Linear(4, 25)
        self.l2 = nn.Linear(25, 50)
        self.actor_lin1 = nn.Linear(50, 2)
        self.l3 = nn.Linear(50, 25)
        self.critic_lin1 = nn.Linear(25, 1)

    def forward(self, x):
        """Return ``(actor_log_probs, critic_value)`` for the state(s) *x*."""
        # L2-normalize each state vector so its components lie in a comparable range.
        # dim=-1 (not dim=0) keeps this per-state when a batch dimension is present.
        x = F.normalize(x, dim=-1)
        y = F.relu(self.l1(x))
        y = F.relu(self.l2(y))
        # Actor head: log-probabilities over the 2 actions.
        actor = F.log_softmax(self.actor_lin1(y), dim=-1)
        # Detach the trunk output so the critic's loss does not backpropagate into l1/l2.
        c = F.relu(self.l3(y.detach()))
        # Critic head: a single value bounded to [-1, 1].
        critic = torch.tanh(self.critic_lin1(c))
        return actor, critic

# Listing 6
## The main training loop

In [None]:
def worker(t, worker_model, counter, params):
    """Train *worker_model* for ``params['epochs']`` full episodes inside one process.

    Each call creates its own CartPole environment and its own Adam optimizer.
    The parameters it updates are those of *worker_model*, which is meant to be
    a model shared across processes.

    Args:
        t: index of this worker (not used by the body; kept for the Process args).
        worker_model: the model whose parameters are trained.
        counter: multiprocessing shared Value, incremented once per update.
        params: dict; only ``params['epochs']`` is read.
    """
    worker_env = gym.make("CartPole-v1")
    worker_env.reset()
    worker_opt = optim.Adam(lr=1e-4, params=worker_model.parameters())
    for _ in range(params['epochs']):
        worker_opt.zero_grad()  # clear gradients accumulated by the previous update
        # Play one episode, then use its data for one parameter-update step.
        values, logprobs, rewards = run_episode(worker_env, worker_model)
        update_params(worker_opt, values, logprobs, rewards)
        # `counter.value += 1` is a read-modify-write. Without the Value's lock,
        # concurrent workers can overwrite each other's increments and undercount.
        with counter.get_lock():
            counter.value += 1

# Listing 7
## Running an episode

In [None]:
def run_episode(worker_env, worker_model):
    """Play one complete episode with *worker_model* and collect training data.

    The episode continues until the environment reports ``done``, at which point
    the environment is reset.

    Returns:
        (values, logprobs, rewards): per step, the critic output, the actor's
        log-probability of the sampled action, and the reward (1.0 for a
        non-final step, -10 for the step that ended the episode).
    """
    # np.array(...) accepts the stored state whether it is an ndarray or a plain
    # sequence such as a tuple; torch.from_numpy alone only accepts an ndarray.
    state = torch.from_numpy(np.array(worker_env.env.state)).float()
    values, logprobs, rewards = [], [], []
    done = False
    while not done:
        policy, value = worker_model(state)  # actor log-probs and critic value
        values.append(value)
        logits = policy.view(-1)
        # Sample an action from the categorical distribution defined by the actor output.
        action = torch.distributions.Categorical(logits=logits).sample()
        logprobs.append(logits[action])
        state_, _, done, _ = worker_env.step(action.detach().numpy())
        state = torch.from_numpy(np.asarray(state_)).float()
        if done:
            # Penalize the step that ended the episode, then start a fresh episode.
            reward = -10
            worker_env.reset()
        else:
            reward = 1.0
        rewards.append(reward)
    return values, logprobs, rewards

# Listing 8
## Computing and minimizing the loss

In [None]:
def update_params(worker_opt, values, logprobs, rewards, clc=0.1, gamma=0.95):
    """Perform one actor-critic optimization step from a finished episode.

    The per-step sequences are reversed so discounted returns can be built
    back-to-front. The returns are then L2-normalized, and
    ``actor_loss.sum() + clc * critic_loss.sum()`` is backpropagated before
    ``worker_opt.step()``. Gradients are not zeroed here (the worker loop calls
    ``zero_grad`` before each episode).

    Args:
        worker_opt: optimizer over the parameters that produced values/logprobs.
        values: per-step critic outputs, in time order.
        logprobs: per-step log-probabilities of the taken actions, in time order.
        rewards: per-step rewards, in time order.
        clc: weight applied to the summed critic loss.
        gamma: discount factor.

    Returns:
        (actor_loss, critic_loss, n_steps): per-step loss tensors (reversed time
        order) and the number of steps.
    """
    # Reverse time so index 0 is the final step; flatten each sequence.
    rev_rewards = torch.Tensor(rewards).flip(dims=(0,)).view(-1)
    rev_logprobs = torch.stack(logprobs).flip(dims=(0,)).view(-1)
    rev_values = torch.stack(values).flip(dims=(0,)).view(-1)

    # G_t = r_t + gamma * G_{t+1}, accumulated from the last step backwards.
    running = torch.Tensor([0])
    discounted = []
    for reward in rev_rewards:
        running = reward + gamma * running
        discounted.append(running)
    returns = F.normalize(torch.stack(discounted).view(-1), dim=0)

    # The advantage uses detached values so the actor term does not train the critic head.
    advantage = returns - rev_values.detach()
    actor_loss = -rev_logprobs * advantage
    # The critic regresses toward the normalized returns.
    critic_loss = torch.pow(rev_values - returns, 2)
    total = actor_loss.sum() + clc * critic_loss.sum()
    total.backward()
    worker_opt.step()
    return actor_loss, critic_loss, len(rev_rewards)

# Listing 5
## Distributing the training

In [None]:
MasterNode = ActorCritic()  # a single, global actor-critic model
# Move the parameters into shared memory so every process updates the same
# tensors instead of a private copy.
MasterNode.share_memory()

processes = []  # the started worker processes
params = {
    'epochs': 1000,
    # Leave one CPU for the main process, but always start at least one worker.
    # On a single-CPU machine cpu_count() - 1 == 0 would start nothing, and
    # processes[0] below would raise IndexError.
    'n_workers': max(1, mp.cpu_count() - 1),
}
# Shared integer ('i' type code) counting updates across all workers.
counter = mp.Value('i', 0)

for i in range(params['n_workers']):
    # The target comes from the importable `workers` module rather than this
    # notebook. Presumably this lets child processes import it by name;
    # TODO confirm it matches the worker defined in the cells above.
    p = mp.Process(target=workers.worker, args=(i, MasterNode, counter, params))
    p.start()
    processes.append(p)

# Block until every worker has finished; after join() no terminate() is needed.
for p in processes:
    p.join()

# Global counter value and the first worker's exit code (0 on success).
print(counter.value, processes[0].exitcode)

In [None]:
env = gym.make("CartPole-v1")
env.reset()
eplen = 0
steps = []  # length, in steps, of each finished evaluation episode

# Play 1000 steps with the trained MasterNode and record how long each episode lasts.
# This is inference only, so no_grad() avoids building an autograd graph on every step.
with torch.no_grad():
    for i in range(1000):
        # Read the current state from the underlying environment.
        state = torch.from_numpy(np.array(env.env.state)).float()
        # The actor head returns log-probabilities, which are valid logits.
        logits, value = MasterNode(state)
        action = torch.distributions.Categorical(logits=logits).sample()
        _, _, done, _ = env.step(action.detach().numpy())
        # Count the step just taken before checking `done`, so the terminal step is
        # included in the episode length (otherwise every length is one short).
        eplen += 1
        if done:
            print("Lost")
            env.reset()
            steps.append(eplen)
            eplen = 0

In [None]:
def running_mean(x, N=50):
    """Return the simple moving average of *x* over windows of N consecutive items.

    Element i of the result is the mean of ``x[i:i+N]``. There are
    ``len(x) - N + 1`` such full windows. When x has fewer than N items, an
    empty array is returned instead of raising (e.g. an evaluation run that
    finished only a few episodes).

    Args:
        x: 1-D sequence of numbers.
        N: window length; must be at least 1.

    Returns:
        1-D float ndarray of length ``max(len(x) - N + 1, 0)``.

    Raises:
        ValueError: if N < 1.
    """
    if N < 1:
        raise ValueError("N must be at least 1")
    x = np.asarray(x, dtype=float)
    if x.shape[0] < N:
        return np.zeros(0)
    # A 'valid' convolution with a kernel of ones sums each full window exactly once.
    return np.convolve(x, np.ones(N), mode="valid") / N

In [None]:
from matplotlib import pylab as plt

steps = np.array(steps)  # lengths of the evaluation episodes recorded in `steps`
avg_steps = running_mean(steps, 50)  # 50-episode moving average

plt.figure(figsize=(10, 7))
plt.ylabel("Episode Duration", fontsize=22)
# Each point averages a window of evaluation episodes played by the trained model,
# so the x-axis counts test episodes rather than training epochs.
plt.xlabel("Test Episodes", fontsize=22)
plt.plot(avg_steps, color='green')

# Listing 9
## N-step training with CartPole

In [None]:
def run_episode(worker_env, worker_model, N_steps=10):
    """Play up to *N_steps* steps (N-step actor-critic) and collect training data.

    Stops after N_steps steps or when the episode ends, whichever comes first.
    When the episode ends, the environment is reset and the bootstrap return G
    stays 0. Otherwise G is the critic's (detached) value of the state reached
    after the last action, V(s_N), so the N-step return is
    ``r_0 + gamma*r_1 + ... + gamma^N * V(s_N)``.

    Returns:
        (values, logprobs, rewards, G): per-step critic outputs, log-probabilities
        of the sampled actions, rewards (1.0 for non-final steps, -10 for the step
        that ended the episode), and the bootstrap value G.
    """
    raw_state = np.array(worker_env.env.state)
    state = torch.from_numpy(raw_state).float()
    values, logprobs, rewards = [], [], []
    done = False
    j = 0
    G = torch.Tensor([0])  # bootstrap return; stays 0 if the episode terminates

    while j < N_steps and not done:
        j += 1
        policy, value = worker_model(state)
        values.append(value)
        logits = policy.view(-1)
        action = torch.distributions.Categorical(logits=logits).sample()
        logprobs.append(logits[action])
        state_, _, done, _ = worker_env.step(action.detach().numpy())
        state = torch.from_numpy(np.asarray(state_)).float()
        if done:
            reward = -10
            worker_env.reset()
        else:
            reward = 1.0
        rewards.append(reward)

    if not done:
        # Bootstrap from the state AFTER the last action. Using the value computed
        # before that action (as earlier versions did) mixes up V(s_{N-1}) and V(s_N).
        # No gradient flows through the bootstrap target.
        with torch.no_grad():
            _, G = worker_model(state)
    return values, logprobs, rewards, G

In [None]:
def update_params(worker_opt, values, logprobs, rewards, G, clc=0.1, gamma=0.95):
    """Perform one N-step actor-critic optimization step.

    Same as the full-episode update, except the backward accumulation of
    discounted returns starts from the bootstrap value *G* instead of 0. The
    returns are L2-normalized, and ``actor_loss.sum() + clc * critic_loss.sum()``
    is backpropagated before ``worker_opt.step()``. Gradients are not zeroed
    here (the worker loop calls ``zero_grad`` before each rollout).

    Args:
        worker_opt: optimizer over the parameters that produced values/logprobs.
        values: per-step critic outputs, in time order.
        logprobs: per-step log-probabilities of the taken actions, in time order.
        rewards: per-step rewards, in time order.
        G: bootstrap return used as the starting value of the accumulation.
        clc: weight applied to the summed critic loss.
        gamma: discount factor.

    Returns:
        (actor_loss, critic_loss, n_steps): per-step loss tensors (reversed time
        order) and the number of steps.
    """
    # Reverse time so index 0 is the final step; flatten each sequence.
    rev_rewards = torch.Tensor(rewards).flip(dims=(0,)).view(-1)
    rev_logprobs = torch.stack(logprobs).flip(dims=(0,)).view(-1)
    rev_values = torch.stack(values).flip(dims=(0,)).view(-1)

    # G_t = r_t + gamma * G_{t+1}, seeded with the bootstrap value G.
    running = G
    discounted = []
    for reward in rev_rewards:
        running = reward + gamma * running
        discounted.append(running)
    returns = F.normalize(torch.stack(discounted).view(-1), dim=0)

    # The advantage uses detached values so the actor term does not train the critic head.
    advantage = returns - rev_values.detach()
    actor_loss = -rev_logprobs * advantage
    # The critic regresses toward the normalized returns.
    critic_loss = torch.pow(rev_values - returns, 2)
    total = actor_loss.sum() + clc * critic_loss.sum()
    total.backward()
    worker_opt.step()
    return actor_loss, critic_loss, len(rev_rewards)

In [None]:
def worker(t, worker_model, counter, params):
    """Train *worker_model* with N-step rollouts for ``params['epochs']`` updates in one process.

    Each call creates its own CartPole environment and its own Adam optimizer.
    The parameters it updates are those of *worker_model*, which is meant to be
    a model shared across processes.

    Args:
        t: index of this worker (not used by the body; kept for the Process args).
        worker_model: the model whose parameters are trained.
        counter: multiprocessing shared Value, incremented once per update.
        params: dict; only ``params['epochs']`` is read.
    """
    worker_env = gym.make("CartPole-v1")
    worker_env.reset()
    worker_opt = optim.Adam(lr=1e-4, params=worker_model.parameters())
    for _ in range(params['epochs']):
        worker_opt.zero_grad()  # clear gradients accumulated by the previous update
        # Collect an N-step rollout (with bootstrap value G), then update once.
        values, logprobs, rewards, G = run_episode(worker_env, worker_model)
        update_params(worker_opt, values, logprobs, rewards, G)
        # `counter.value += 1` is a read-modify-write. Without the Value's lock,
        # concurrent workers can overwrite each other's increments and undercount.
        with counter.get_lock():
            counter.value += 1

In [None]:
env = gym.make("CartPole-v1")
env.reset()
eplen = 0
steps = []  # length, in steps, of each finished evaluation episode

# Play 1000 steps with the trained MasterNode and record how long each episode lasts.
# This is inference only, so no_grad() avoids building an autograd graph on every step.
with torch.no_grad():
    for i in range(1000):
        # Read the current state from the underlying environment.
        state = torch.from_numpy(np.array(env.env.state)).float()
        # The actor head returns log-probabilities, which are valid logits.
        logits, value = MasterNode(state)
        action = torch.distributions.Categorical(logits=logits).sample()
        _, _, done, _ = env.step(action.detach().numpy())
        # Count the step just taken before checking `done`, so the terminal step is
        # included in the episode length (otherwise every length is one short).
        eplen += 1
        if done:
            print("Lost")
            env.reset()
            steps.append(eplen)
            eplen = 0

In [None]:
steps = np.array(steps)  # lengths of the evaluation episodes recorded in `steps`
avg_steps = running_mean(steps, 50)  # 50-episode moving average

plt.figure(figsize=(10, 7))
plt.ylabel("Episode Duration", fontsize=22)
# Each point averages a window of evaluation episodes played by the trained model,
# so the x-axis counts test episodes rather than training epochs.
plt.xlabel("Test Episodes", fontsize=22)
plt.plot(avg_steps, color='green')

# Listing 10
## Returns with and without bootstrapping

In [None]:
# Simulated rewards for 3 steps
r1 = [1, 1, -1]
r2 = [1, 1, 1]


def discounted_return(rewards, gamma, bootstrap=0.0):
    """Return sum_k gamma**k * rewards[k] + gamma**len(rewards) * bootstrap.

    Accumulates back-to-front (G = r + gamma * G), starting from *bootstrap*.
    Every reward, including rewards[0], contributes. The earlier
    `range(len(r) - 1, 0, -1)` loop stopped before index 0 and dropped the first reward.
    """
    ret = bootstrap
    for r in reversed(rewards):
        ret = r + gamma * ret
    return ret


# No bootstrapping: the return after the last step is taken to be 0.
R1 = discounted_return(r1, 0.99)
R2 = discounted_return(r2, 0.99)
print("No bootstrapping")
print(R1, R2)

# With bootstrapping: the return after the last step is estimated as 1.0.
R1 = discounted_return(r1, 0.99, bootstrap=1.0)
R2 = discounted_return(r2, 0.99, bootstrap=1.0)
print("With bootstrapping")
print(R1, R2)