In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from random import random

import gym
import torch
import numpy as np
from tqdm import tqdm

from torch import nn
from torch.nn import functional as F
from torch.nn import Module, Linear

### Actor-Critic Network

In [None]:
class ActorCritic(Module):
    """
    Actor-critic network with a shared fully connected trunk and two heads.

    The trunk is `num_layers` ReLU-activated linear layers of width
    `num_hidden`. On top of it sit:

    val: a linear head producing one state-value estimate.
    pi:  a linear head producing `out_size` action logits, returned as
         log-probabilities.

    in_size:    Size of the observation vector.
    out_size:   Number of discrete actions.
    num_layers: Number of hidden layers in the trunk (at least 1).
    num_hidden: Width of each hidden layer.
    """

    def __init__(self, in_size, out_size, num_layers=2, num_hidden=64):
        super(ActorCritic, self).__init__()

        # The layers must live in an nn.ModuleList: a plain Python list is
        # invisible to Module, so its weights would be missing from
        # parameters() (never optimised), state_dict() and .to(device).
        layers = [Linear(in_size, num_hidden)]
        layers.extend(
            Linear(num_hidden, num_hidden) for _ in range(num_layers - 1)
        )
        self.fc = nn.ModuleList(layers)

        self.val = Linear(num_hidden, 1)
        self.pi = Linear(num_hidden, out_size)

    def forward(self, x):
        """
        Evaluate the network on a single observation of shape (in_size,)
        or on a batch of shape (batch, in_size).

        Returns (val, log_pi) where val has a trailing dimension of 1 and
        log_pi has a trailing dimension of out_size.
        """
        for layer in self.fc:
            x = F.relu(layer(x))

        val = self.val(x)
        # Normalise over the action axis (the last one). dim=0 is only
        # correct for unbatched input; on a batch it would normalise
        # across samples instead of across actions.
        log_pi = F.log_softmax(self.pi(x), dim=-1)

        return val, log_pi

### Calculating Log-prob from Logits

$$\log \pi(a_0) = \log (\frac{e^{x_0}}{e^{x_0} + e^{x_1}})$$
$$= x_0 - \log(e^{x_0} + e^{x_1})$$
$$= (x_0 - x^*) - \log(e^{x_0 - x^*} + e^{x_1 - x^*})$$
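where $x^* = \max (x_0, x_1)$ is subtracted from both logits; this leaves the result unchanged but keeps the exponentials from overflowing.

In general, for $n$ actions:

$$\log \pi(a_i) = x_i - \text{logsumexp}(x_1, \ldots, x_n)$$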

In [None]:
def policy_loss(trajectory, advantage, model):
    """
    Advantage-weighted negative log-likelihood, averaged over the steps of
    one trajectory.

    trajectory: Sequence of (x_t, a_t, r_t) tuples, where x_t is the
                observation at time t, a_t the action taken at x_t and
                r_t the reward received for it (r_t is not used here).
    advantage:  Indexable by timestep; advantage[t] weights step t.
    model:      Callable returning (value, log_probs) for an observation.

    Returns -(1/n) * sum_t log_probs_t[a_t] * advantage[t].
    """
    steps = len(trajectory)
    total = 0

    for t, step in enumerate(trajectory):
        weight = advantage[t]
        obs, act, _reward = step
        _, log_probs = model(obs)

        # Only the log-probability of the action actually taken counts
        total -= log_probs[act] * weight

    total /= steps

    return total

In [None]:
def trajectory_return(trajectory, discount):
    """
    Discounted reward-to-go for every timestep of a trajectory.

    trajectory: Sequence of (x_t, a_t, r_t) tuples; only r_t is read.
    discount:   Factor applied to the return of the following step.

    Returns a dict mapping t to r_t + discount * return[t + 1], with the
    return past the final step taken to be 0.
    """
    returns = {}
    running = 0

    # Walk backwards so each return is built from the one after it
    for t, (_, _, reward) in reversed(list(enumerate(trajectory))):
        running = reward + discount * running
        returns[t] = running

    return returns

In [None]:
def value_loss(trajectory, discount, model):
    """
    Mean squared error between the model's value predictions and the
    discounted reward-to-go along one trajectory.

    trajectory: Sequence of (x_t, a_t, r_t) tuples with tensor observations.
    discount:   Discount factor passed on to trajectory_return.
    model:      Callable returning (values, log_probs) for a stacked batch
                of observations.
    """
    n = len(trajectory)
    returns_by_t = trajectory_return(trajectory, discount)

    # Regression targets as an (n, 1) column in the default float dtype
    targets = torch.tensor(
        [returns_by_t[t] for t in range(n)],
        dtype=torch.get_default_dtype(),
    ).unsqueeze(1)

    # Value predictions for every visited state, with gradients enabled
    observations = torch.stack([step[0] for step in trajectory])
    predictions, _ = model(observations)

    return F.mse_loss(predictions, targets)

In [None]:
def advantage_function(trajectory, discount, model):
    """
    Advantage estimate at each timestep of a trajectory.

    The advantage at time t is the discounted reward-to-go from t minus
    the model's value estimate for x_t. Values are computed without
    gradient tracking, so the result acts as a constant weight in the
    policy loss.

    trajectory: Sequence of (x_t, a_t, r_t) tuples with tensor observations.
    discount:   Discount factor for the reward-to-go.
    model:      Callable returning (values, log_probs) for a stacked batch
                of observations, with values of shape (n, 1).

    Returns a tensor of shape (n, 1); row t is the advantage at time t.
    """
    n = len(trajectory)

    # Reuse the shared reward-to-go helper instead of re-deriving it here
    rw_to_go = trajectory_return(trajectory, discount)

    # One batched forward pass over all states rather than one per step
    x_tensor = torch.stack([x_t for x_t, _, _ in trajectory])
    with torch.no_grad():
        vals, _ = model(x_tensor)

    returns = torch.tensor(
        [rw_to_go[t] for t in range(n)], dtype=vals.dtype
    ).unsqueeze(1)

    return returns - vals

### Utility functions

In [None]:
def obs_to_tensor(obs):
    """
    Convert a NumPy observation into a float32 torch tensor.

    The array is first cast with astype('float32'), which yields a new
    array, and that array is then wrapped by torch.from_numpy.
    """
    return torch.from_numpy(obs.astype('float32'))

In [None]:
def sample_act(log_probs):
    """
    Sample an action index from a categorical distribution.

    log_probs: 1-D tensor of log-probabilities, one entry per action.

    Draws u uniformly from [0, 1) and returns the first index whose
    cumulative probability reaches u.

    Returns the sampled action as a Python int in [0, len(log_probs) - 1].
    Raises ValueError if log_probs is empty.
    """
    probs = torch.exp(log_probs).tolist()

    if not probs:
        raise ValueError("log_probs must contain at least one action")

    u = random()
    cum_p = 0.0

    for i, p in enumerate(probs):
        cum_p += p

        if u <= cum_p:
            return i

    # Rounding can leave cum_p slightly below 1, so u may exceed it.
    # Fall back to the last *valid* index; len(probs) itself would be
    # out of range for the action space.
    return len(probs) - 1

### Training the Model

In [None]:
def create_optimizer(model, lr):
    """Build an Adam optimizer over all of the model's parameters."""
    params = model.parameters()
    optimizer = torch.optim.Adam(params, lr=lr)

    return optimizer

In [None]:
def train_step(env, model, optimizer, c, discount, n_episodes):
    """
    Collect n_episodes trajectories with the current policy and take one
    optimizer step on the averaged actor-critic loss.

    env:        Environment where reset() returns an observation array and
                step(action) returns (observation, reward, done, info).
    model:      Network returning (value, log_probs) for an observation.
    optimizer:  Optimizer over the model's parameters.
    c:          Weight of the value loss relative to the policy loss.
    discount:   Discount factor for the reward-to-go.
    n_episodes: Number of episodes averaged into the single update.

    Returns the averaged loss as a tensor detached from the autograd
    graph, so callers can accumulate or log it without keeping the graph
    alive.
    """
    # Initialize loss and zero out gradients on parameters
    loss = 0
    optimizer.zero_grad()

    # Collect trajectories for n_episodes
    for _ in range(n_episodes):
        trajectory = []

        x_t = env.reset()
        done = False

        while not done:
            # Acting needs no gradients; the losses below re-evaluate the
            # model on the stored observations with gradients enabled.
            with torch.no_grad():
                x_t = obs_to_tensor(x_t)
                _, log_pi_xt = model(x_t)

            a_t = sample_act(log_pi_xt)
            x_tp1, r_t, done, _ = env.step(a_t)

            trajectory.append((x_t, a_t, r_t))
            x_t = x_tp1

            # Note: the log-probabilities and values could instead be
            # collected at every step here and combined into the loss
            # once the episode is done.

        val_loss = value_loss(trajectory, discount, model)
        advantage = advantage_function(trajectory, discount, model)
        pi_loss = policy_loss(trajectory, advantage, model)

        loss += pi_loss + c*val_loss

    # Perform gradient step
    loss /= n_episodes
    loss.backward()
    optimizer.step()

    # The graph has served its purpose once backward() has run
    return loss.detach()

In [None]:
def train_cartpole(
    n_epochs, 
    n_episodes,
    print_freq=100,
    discount=.99,
    lr=.01,
    c=.01
):
    """
    Train a fresh ActorCritic model on CartPole-v0.

    n_epochs:   Number of gradient steps (calls to train_step).
    n_episodes: Episodes collected per gradient step.
    print_freq: Print the mean loss every print_freq gradient steps.
    discount:   Discount factor for the reward-to-go.
    lr:         Learning rate for the Adam optimizer.
    c:          Weight of the value loss relative to the policy loss.

    Returns the trained model.
    """
    env = gym.make('CartPole-v0')

    # Release the environment even if training is interrupted or fails
    try:
        in_size = env.observation_space.shape[0]
        out_size = env.action_space.n

        model = ActorCritic(in_size, out_size)
        optimizer = create_optimizer(model, lr)

        loss = 0
        for t in tqdm(range(n_epochs)):
            step_loss = train_step(
                env, model, optimizer, c, discount, n_episodes
            )
            # Detach so the running total used for logging does not hold
            # on to the autograd graph of every step in the window
            loss += step_loss.detach()

            if (t + 1) % print_freq == 0:
                print(loss / print_freq)
                loss = 0
    finally:
        env.close()

    return model

In [None]:
def play_cartpole(
    model,
    n_episodes,
    step_len=.02
):
    """
    Render n_episodes of CartPole-v0 played by the model's policy and
    print the length of each episode.

    model:      Network returning (value, log_probs) for an observation.
    n_episodes: Number of episodes to play.
    step_len:   Currently unused; kept so existing callers keep working.
    """
    env = gym.make('CartPole-v0')

    # Close the environment (and its render window) even when rendering
    # or stepping raises, or the run is interrupted
    try:
        for _ in range(n_episodes):
            x_t = env.reset()
            done = False
            t = 0

            while not done:
                env.render()
                with torch.no_grad():
                    x_t = obs_to_tensor(x_t)
                    _, log_pi_xt = model(x_t)

                a_t = sample_act(log_pi_xt)
                x_t, _, done, _ = env.step(a_t)
                t += 1
            print(f'ep_len: {t}')
    finally:
        env.close()

In [None]:
# Train a CartPole agent: 500 gradient steps, each averaged over 10 episodes.
model = train_cartpole(500, 10)

In [None]:
# Render 10 episodes played by the trained model; each episode's length is printed.
play_cartpole(model, 10)

### Test Algorithm Parts

In [None]:
# Constants (same values as the train_cartpole defaults)
discount=.99  # discount factor for the reward-to-go
lr=.01  # learning rate handed to create_optimizer
c=.01  # weight of the value loss in train_step's combined loss

In [None]:
# Environment and model definitions
env = gym.make('CartPole-v0')
in_size = env.observation_space.shape[0]
out_size = env.action_space.n

# NOTE: this rebinds `model`, discarding the network trained above; from
# here on `model` is a small, untrained net used only for the checks below.
model = ActorCritic(in_size, out_size, num_layers=1, num_hidden=16)
optimizer = create_optimizer(model, lr)

In [None]:
# Model output sanity check
# Start an episode and display the raw initial observation.
obs = env.reset()
obs

In [None]:
# Inspect the raw observation dtype; the next cell casts it to float32
# before feeding it to the model.
obs.dtype

In [None]:
# Same conversion as obs_to_tensor, spelled out by hand, followed by a
# gradient-free forward pass to inspect the value and log-probabilities.
obs_32 = obs.astype('float32')
x = torch.from_numpy(obs_32)
with torch.no_grad():
    val, pi = model(x)
(val, pi)

In [None]:
# `pi` holds log-probabilities; exponentiating recovers the action
# probabilities, which should sum to 1.
torch.exp(pi)

In [None]:
# Smoke test: one gradient step over 2 episodes; the cell shows the returned loss.
train_step(env, model, optimizer, c, discount, 2)