In [1]:
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import time
import math
from collections import deque 

In [2]:
# Create the CartPole environment used by every later cell.
env = gym.make('CartPole-v0')
# Author's notes on the environment (TODO confirm against the installed gym version):
#   observation vector: 4 components
#   action space: 2 discrete actions
#   reward range: -inf to inf

In [3]:
class Actor(nn.Module):
    """Policy network: turns an observation into one raw score (logit) per action."""

    def __init__(self, obs_space, act_space):
        """Build the MLP obs_space -> 128 -> 256 -> act_space.

        Args:
            obs_space: number of features in one observation.
            act_space: number of discrete actions (width of the output).
        """
        super().__init__()

        # Layers are instantiated in input-to-output order.
        first_hidden = nn.Linear(obs_space, 128)
        second_hidden = nn.Linear(128, 256)
        self.base_layers = nn.Sequential(first_hidden, nn.ReLU(), second_hidden)
        self.action_layer = nn.Linear(256, act_space)

    def forward(self, x):
        """Return the action logits for ``x``; no softmax is applied here."""
        hidden = self.base_layers(x)
        activated = F.relu(hidden)
        return self.action_layer(activated)

In [4]:
class Critic(nn.Module):
    """State-value network: outputs a single value V(s) for each observation."""

    def __init__(self, obs_space):
        """Build the MLP obs_space -> 128 -> 256 -> 1.

        Args:
            obs_space: number of features in one observation.
        """
        super().__init__()

        # Layers are instantiated in input-to-output order.
        first_hidden = nn.Linear(obs_space, 128)
        second_hidden = nn.Linear(128, 256)
        self.base_layers = nn.Sequential(first_hidden, nn.ReLU(), second_hidden)
        self.value_layer = nn.Linear(256, 1)

    def forward(self, x):
        """Return the value estimate for ``x``; the last dimension has size 1."""
        hidden = self.base_layers(x)
        activated = F.relu(hidden)
        return self.value_layer(activated)

In [5]:
def normalize(x):
    """Standardise ``x`` to zero mean and unit (population) standard deviation.

    Args:
        x: array-like of numbers (list, tuple or ``np.ndarray``).

    Returns:
        ``np.ndarray`` with the same shape as ``x``. If all elements are equal
        (which includes the single-element case) the standard deviation is
        zero; the centred values (all zeros) are returned instead of dividing
        0 by 0 and producing NaNs. Empty input yields an empty float array.
    """
    x_arr = np.asarray(x)
    if x_arr.size == 0:
        # Nothing to standardise; avoid the mean/std-of-empty warnings and NaNs.
        return x_arr.astype(np.float64)

    centred = x_arr - np.mean(x_arr)
    spread = np.std(x_arr)
    if spread == 0:
        # Constant input: dividing would turn every entry into NaN.
        return centred
    return centred / spread

def discount_rewards(rewards, gamma):
    """Compute normalised discounted returns for one reward sequence.

    Walks the sequence from the last step to the first, accumulating
    ``G[t] = rewards[t] + gamma * G[t + 1]``, then passes the result through
    ``normalize``.

    Args:
        rewards: sequence of per-step rewards, in time order.
        gamma: discount factor applied to the running return.

    Returns:
        ``torch.float64`` tensor with one entry per element of ``rewards``.
    """
    discounted = np.zeros(np.shape(rewards), dtype=np.float64)
    running_return = 0

    step = len(rewards)
    while step > 0:
        step -= 1
        running_return = rewards[step] + gamma * running_return
        discounted[step] = running_return

    normalised = normalize(discounted)
    return torch.tensor(normalised, dtype=torch.float64)
    
    

In [6]:
class ModelTrainer():
    """Performs one actor-critic policy-gradient update per collected episode.

    The actor is trained with the policy-gradient loss
    ``-log pi(a|s) * advantage`` and the critic with a mean-squared error
    against the discounted returns produced by ``discount_rewards``.
    """

    def __init__(self, actor_model, critic_model, gamma, lr=1e-3):
        """
        Args:
            actor_model: module mapping observations to action logits.
            critic_model: module mapping observations to a value with a
                trailing dimension of size 1.
            gamma: discount factor handed to ``discount_rewards``.
            lr: Adam learning rate shared by both networks.
        """
        #hyperparams
        self.lr = lr
        self.gamma = gamma

        #trainable
        self.actor_model = actor_model
        self.critic_model = critic_model
        self.xentropy_loss = nn.CrossEntropyLoss(reduction='none')
        self.msbe_loss = nn.MSELoss()

        # Optimise BOTH networks. Registering only the actor's parameters
        # would leave the critic frozen at its random initialisation and its
        # gradients would never be zeroed.
        trainable = list(actor_model.parameters()) + list(critic_model.parameters())
        self.optimizer = torch.optim.Adam(trainable, lr=self.lr)

    def train_step(self, observations, actions, rewards, next_observations, dones):
        """Run a single gradient step on one episode.

        Args:
            observations: sequence of observation vectors, in time order.
            actions: sequence of integer action indices taken at each step.
            rewards: sequence of per-step rewards.
            next_observations: accepted for interface compatibility; not used
                by this return-based update.
            dones: accepted for interface compatibility; not used.

        Returns:
            0-dim tensor holding the combined (actor + critic) loss, detached
            from the autograd graph.
        """
        # np.asarray first: building a tensor from a tuple of arrays is slow.
        obs_tensor = torch.as_tensor(np.asarray(observations), dtype=torch.float32)
        act_tensor = torch.as_tensor(np.asarray(actions), dtype=torch.long)

        # Discounted (and normalised) returns G(t), cast to the networks' dtype.
        returns = discount_rewards(rewards, self.gamma).float()

        self.optimizer.zero_grad()

        # Actor logits and critic values for every step of the episode.
        logits = self.actor_model(obs_tensor)
        # squeeze(-1) keeps the time dimension even for a one-step episode.
        value_s = self.critic_model(obs_tensor).squeeze(-1)

        # Advantage G(t) - V(s). Detached so the actor loss only weights the
        # log-probabilities and does not push gradients into the critic.
        advantage = (returns - value_s).detach()

        #1. Actor loss: cross-entropy is -log pi(a|s), weighted per step.
        act_loss = (self.xentropy_loss(logits, act_tensor) * advantage).mean()

        #2. Critic loss: regress V(s) towards the returns (input, target order).
        critic_loss = self.msbe_loss(value_s, returns)

        loss = act_loss + critic_loss
        loss.backward()

        #Gradient optimization
        self.optimizer.step()

        return loss.detach()

In [7]:
class CartPoleAgent():
    """Bundles the actor, the critic, their trainer and a per-episode transition buffer."""

    def __init__(self, obs_space, act_space, gamma=0.95):
        """
        Args:
            obs_space: number of features in one observation.
            act_space: number of discrete actions.
            gamma: discount factor forwarded to the trainer.
        """
        self.gamma = gamma
        self.actor_model = Actor(obs_space, act_space)
        self.critic_model = Critic(obs_space)
        self.trainer = ModelTrainer(
            self.actor_model,
            self.critic_model,
            self.gamma,
        )

        # Transitions of the episode currently being collected.
        self.memory = deque()

    def getAction(self, observation):
        """Sample an action from the actor's categorical policy.

        Returns the sampled action index as a NumPy value (``Tensor.numpy()``).
        """
        state = torch.tensor(observation, dtype=torch.float64).float()
        with torch.no_grad():
            scores = self.actor_model(state).squeeze()

        sampled = torch.distributions.Categorical(logits=scores).sample()
        return sampled.numpy()

    def saveToMemory(self, observation, action, reward, next_observation, done):
        """Append one transition tuple to the episode buffer; always returns True."""
        transition = (observation, action, reward, next_observation, done)
        self.memory.append(transition)
        return True

    def clearMemory(self):
        """Replace the episode buffer with a fresh, empty deque."""
        self.memory = deque()

    def trainEpisode(self):
        """Hand the buffered episode, column by column, to the trainer and return its loss."""
        columns = zip(*self.memory)
        observations, actions, rewards, next_observations, dones = columns
        return self.trainer.train_step(
            observations, actions, rewards, next_observations, dones
        )

    def getEpisodeRewards(self):
        """Return the sum of all rewards currently stored in the buffer."""
        _, _, reward_column, _, _ = zip(*self.memory)
        return np.array(reward_column).sum()

### Train

In [9]:
#https://www.youtube.com/watch?v=Ql8QPcp8818
import matplotlib.pyplot as plt
from IPython import display

# Interactive mode so figures refresh while the training loop is running.
plt.ion()
# Matplotlib 3.6 renamed its bundled seaborn styles to 'seaborn-v0_8*' and
# later releases removed the old 'seaborn' alias, so prefer the new name when
# this installation provides it and fall back to the legacy one otherwise.
plt.style.use('seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn')

def plot(rewards, loss, fig, axs):
    """Redraw the live training curves: episode rewards on top, loss below.

    Args:
        rewards: sequence of per-episode reward totals.
        loss: sequence of per-episode loss values.
        fig: figure owning ``axs``; kept for interface compatibility and not
            used directly.
        axs: indexable pair of axes; ``axs[0]`` shows rewards, ``axs[1]`` loss.

    Empty histories are drawn as empty panels instead of raising
    ``IndexError`` on the "latest value" annotation.
    """
    display.clear_output(wait=True)

    panels = (
        (axs[0], rewards, 'tab:orange', 'Episode Rewards'),
        (axs[1], loss, 'tab:blue', 'Loss'),
    )
    for ax, series, colour, title in panels:
        ax.clear()
        ax.plot(series, colour)
        ax.set_title(title)
        if len(series) > 0:
            # Annotate the most recent point with its value.
            ax.text(len(series) - 1, series[-1], str(series[-1]))

    # Label the bottom panel explicitly rather than relying on pyplot's
    # notion of the "current" axes.
    axs[1].set_xlabel('Episode')

    plt.show(block=False)
    plt.pause(.1)
    


In [10]:
# Draw the environment once.
# NOTE(review): no env.reset() call appears before this cell in the notebook; confirm rendering here is intended.
env.render()

In [11]:
# Network sizes come from the environment: length of the observation vector
# and number of discrete actions.
obs_space, act_space = env.observation_space.shape[0], env.action_space.n

In [None]:
# Select the Qt matplotlib backend and create the two stacked panels used by plot().
%matplotlib qt
fig, axs = plt.subplots(2)

# Fresh agent; n_steps caps the number of environment steps taken per episode.
CPA_agent = CartPoleAgent(obs_space, act_space, gamma=0.95)
n_steps = 200

# Per-episode history fed to plot(): total reward and training loss.
rewards = []
losses = []

# Train indefinitely: the outer loop has no exit condition, so this cell runs
# until the kernel is interrupted.
while True:
    # Start every episode with an empty transition buffer.
    CPA_agent.clearMemory()
    observation = env.reset()
    
    for i in range(n_steps):
        env.render()
        action = CPA_agent.getAction(observation)
        next_observation, reward, done, info = env.step(action)
        
        # Store the transition (including the terminal one) before checking `done`.
        CPA_agent.saveToMemory(observation, action.item(), reward, next_observation,done)
        
        if done:
            print(f"Episode finished at time step {i}")
            break;
        
        observation = next_observation
        
    # One gradient update on the whole episode collected above.
    loss = CPA_agent.trainEpisode()
    
    # Record this episode's reward total and loss, then redraw both curves.
    rewards.append(CPA_agent.getEpisodeRewards())
    losses.append(loss.item())
    plot(rewards, losses, fig, axs)

Episode finished at time step 199
