In [1]:
import os
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
from torch.distributions import Categorical
import matplotlib.pyplot as plt
from collections import namedtuple
import seaborn as sns
%matplotlib inline
from gym import wrappers

# CartPole environment instance shared by the cells below.
env = gym.make("CartPole-v0")

# Record of a single environment transition; fields are listed in the
# order they are filled in when a step is taken.
Experience = namedtuple(
    "Experience",
    ("state", "action", "reward", "next_state", "done"),
)

In [2]:
class REINFORCEBaseline(nn.Module):
    """
    Two-headed network for REINFORCE with a learned baseline.

    A shared hidden layer (Linear + ReLU) feeds two linear heads: a policy
    head that outputs unnormalized action preferences and a value head that
    outputs a single state-value estimate.

    Args:
        state_dim: number of input state features. Defaults to 4, the size
            that was previously hard-coded.
        hidden_dim: width of the shared hidden layer. Defaults to 128.
        n_actions: number of outputs of the policy head. Defaults to 2.
    """
    def __init__(self, state_dim=4, hidden_dim=128, n_actions=2):
        super(REINFORCEBaseline, self).__init__()
        # Attribute names are kept unchanged so previously saved models and
        # state dicts still line up with this class.
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU()
        )
        self.policy_head = nn.Linear(hidden_dim, n_actions)
        self.value_head = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """
        Args:
            x: float tensor whose last dimension equals `state_dim`
        Returns:
            (x_policy, x_value): unnormalized policy values with last
            dimension `n_actions`, and state values with last dimension 1
        """
        x = self.net(x)
        x_policy = self.policy_head(x)
        x_value = self.value_head(x)
        return x_policy, x_value

In [3]:
def get_policy_values(model, state):
    """
    Evaluate the model on a single state.

    Args:
        model: callable (e.g. an nn.Module) that takes a float tensor and
            returns a `(policy_values, state_value)` pair
        state: a numpy array (or other array-like) containing state features
    Returns:
        (policy_values, state_value): the unnormalized policy values and the
        state value, exactly as returned by `model`
    """
    # torch.autograd.Variable is a deprecated wrapper around plain tensors;
    # build the float32 tensor directly and prepend a batch dimension of 1.
    state = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
    policy_values, state_value = model(state)
    return policy_values, state_value

def generate_episode(env, policy):
    """
    Roll out one episode, sampling every action from the policy's softmax.

    Args:
        env: environment exposing reset() and step(action)
        policy: model handed to get_policy_values to score each state
    Returns:
        a list of Experience tuples, one per step, in the order visited
    """
    trajectory = []
    state = env.reset()
    finished = False

    while not finished:
        logits, _ = get_policy_values(policy, state)
        # Flatten the softmax output into a 1-D numpy probability vector.
        probs = F.softmax(logits, dim=-1).detach().numpy().reshape(-1)
        candidates = np.arange(len(probs))
        action = np.random.choice(candidates, p=probs)

        next_state, reward, finished, _ = env.step(action)
        trajectory.append(Experience(state, action, reward, next_state, finished))
        state = next_state

    return trajectory

def update_model(episode, policy, optimizer, gamma=None):
    """
    Take one REINFORCE-with-baseline gradient step from a finished episode.

    For each step t the Monte-Carlo return G_t is accumulated from the
    rewards stored in the episode. The value head is regressed onto G_t
    (summed squared error) and the policy head is trained with the loss
    -gamma**t * (G_t - V(s_t)) * log pi(a_t | s_t), where V(s_t) is treated
    as a constant.

    Args:
        episode: sequence of Experience-like records exposing `.state`,
            `.action` and `.reward`, in time order
        policy: model mapping a (T, n_features) float tensor to a
            `(policy_values, state_value)` pair with T rows each
        optimizer: torch optimizer holding the parameters of `policy`
        gamma: discount factor; when None the module-level GAMMA is used
    Returns:
        None. Parameters are updated in place through `optimizer`.
    """
    n_steps = len(episode)
    if n_steps == 0:
        # Nothing to learn from; without this guard the loss would be the
        # plain integer 0, which has no .backward().
        return
    if gamma is None:
        gamma = GAMMA

    # Discounted returns, accumulated back-to-front in a single pass and
    # built from the recorded rewards rather than an assumed reward of 1.
    returns = []
    running_return = 0.0
    for exp in reversed(episode):
        running_return = float(exp.reward) + gamma * running_return
        returns.append(running_return)
    returns.reverse()

    states = torch.as_tensor(np.asarray([exp.state for exp in episode]), dtype=torch.float32)
    actions = torch.as_tensor([int(exp.action) for exp in episode], dtype=torch.long)
    returns = torch.as_tensor(returns, dtype=torch.float32)
    discounts = torch.as_tensor([gamma ** t for t in range(n_steps)], dtype=torch.float32)

    # One batched forward pass over every visited state.
    policy_values, state_values = policy(states)
    state_values = state_values.reshape(-1)

    #============UPDATE VALUE FUNCTION STEP============
    # Sum (not mean) keeps the same scale as adding one squared error per step.
    value_loss = F.mse_loss(state_values, returns, reduction="sum")

    #============UPDATE POLICY STEP=================
    # The baseline must be detached: otherwise the policy loss would push
    # gradients into the value head through (G_t - V(s_t)).
    advantages = returns - state_values.detach()
    log_probs = F.log_softmax(policy_values, dim=-1)
    chosen_log_probs = log_probs.gather(1, actions.unsqueeze(1)).squeeze(1)
    policy_loss = -(discounts * advantages * chosen_log_probs).sum()

    optimizer.zero_grad()
    total_loss = value_loss + policy_loss
    total_loss.backward()
    optimizer.step()

def predict_action(model, state):
    """
    Pick the greedy action (highest policy probability) for a state.

    Args:
        model: model handed to get_policy_values
        state: a numpy array containing state features
    Returns:
        index of the most probable action, as returned by np.argmax
    """
    # Score the `state` argument; previously the notebook-global `s` was
    # read here, so the value passed by the caller was ignored.
    policy_values, _ = get_policy_values(model, state)
    action_probs = F.softmax(policy_values, dim=-1).detach().numpy().reshape(-1)
    action = np.argmax(action_probs)
    return action

In [None]:
# Hyper-parameters and schedule for the training run.
LR = 0.003
GAMMA = 0.99
EPISODES = 1000
NUM_TEST = 20
EVAL_EVERY = 50       # evaluate the agent every this many training episodes
SOLVED_REWARD = 199   # stop once the mean evaluation reward exceeds this

model = REINFORCEBaseline()
optimizer = optim.Adam(model.parameters(), lr=LR)

for ep in range(EPISODES):
    # One sampled episode, followed by one gradient step on it.
    episode = generate_episode(env, model)
    update_model(episode, model, optimizer)

    if ep % EVAL_EVERY != 0:
        continue

    # Test agent. Episode length stands in for the episode's total reward
    # (assumes a reward of 1 per step -- TODO confirm for other environments).
    with torch.no_grad():
        list_reward = [len(generate_episode(env, model)) for n_test in range(NUM_TEST)]
    mean_reward = np.mean(list_reward)
    print(f"Episode: {ep}. Mean reward: {mean_reward}")

    # The solved-check lives inside the evaluation step so it always compares
    # a freshly computed mean and never touches an undefined `mean_reward`.
    if mean_reward > SOLVED_REWARD:
        print(f"Game solved in {ep} episode(s)!")
        break

In [4]:
# The save call is left disabled, so this cell only works when
# "reinfoce_baseline.pth" already exists in the working directory.
# torch.save(model, 'reinfoce_baseline.pth')
# Rebinds `model` to whatever object is stored in the checkpoint file.
# NOTE(review): torch.load unpickles the file, so only load checkpoints you trust.
# NOTE(review): recent PyTorch releases may default to weights_only=True and reject
# a fully pickled module -- confirm against the installed version.
model = torch.load("reinfoce_baseline.pth")

In [5]:
# Set up fake display; otherwise rendering will fail
import os
os.system("Xvfb :1 -screen 0 1024x768x24 &")
os.environ['DISPLAY'] = ':1'
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv
import base64
from pathlib import Path
from IPython import display as ipythondisplay

env = gym.make("CartPole-v0")
# Use wrappers.Monitor from OpenAI Gym to record a video of one episode
# played with predict_action.
wrapped_env = wrappers.Monitor(env, directory='./videos/reinforce_baseline_cartpole_1', force=True, resume=True)
try:
    # `s` always holds the most recent observation returned by the environment.
    s = wrapped_env.reset()
    done = False
    while not done:
        action = predict_action(model, s)
        s, _, done, _ = wrapped_env.step(action)
finally:
    # Close both handles even if the rollout raises, so the monitor and the
    # underlying environment are not left open.
    wrapped_env.close()
    env.close()