In [None]:
import os
from collections import deque

import gym
import matplotlib.pyplot as plt
import numpy as np
plt.rcParams['figure.figsize'] = (16, 10)

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
# Seed PyTorch's global RNG so torch-side randomness is repeatable across runs.
torch.manual_seed(0)

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
class ActorCritic(nn.Module):
    """Two-headed network sharing one hidden layer.

    The actor head produces a categorical distribution over actions and the
    critic head produces a scalar state-value estimate. Both heads read the
    same ReLU-activated hidden representation.

    Args:
        state_size: Number of features in one observation.
        action_size: Number of discrete actions.
        hidden_size: Width of the shared hidden layer.
    """

    def __init__(self, state_size=4, action_size=2, hidden_size=32):
        super().__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.actor = nn.Linear(hidden_size, action_size)  # policy logits
        self.critic = nn.Linear(hidden_size, 1)  # state value

    def forward(self, state):
        """Run a batch of states through the network.

        Args:
            state: Float tensor of shape ``(batch, state_size)``.

        Returns:
            A tuple ``(probs, value)`` where ``probs`` has shape
            ``(batch, action_size)`` with rows summing to 1, and ``value`` has
            shape ``(batch, 1)``.
        """
        hidden = F.relu(self.fc1(state))
        # Both heads must consume the shared hidden features; feeding the
        # actor logits into the critic would be a shape (and logic) error.
        logits = self.actor(hidden)
        value = self.critic(hidden)
        return F.softmax(logits, dim=1), value

    def select_action(self, state):
        """Sample one action for a single (un-batched) observation.

        Args:
            state: 1-D observation (NumPy array or anything
                ``torch.as_tensor`` accepts) of length ``state_size``.

        Returns:
            A tuple ``(action, log_prob)``: the sampled action as a Python
            ``int`` and its log-probability as a CPU tensor of shape ``(1,)``
            that is still attached to the autograd graph.
        """
        # Follow the device the module currently lives on instead of relying
        # on a module-level global.
        module_device = next(self.parameters()).device
        batch = torch.as_tensor(
            state, dtype=torch.float32, device=module_device
        ).unsqueeze(0)
        # forward() returns a tuple, so unpack first and move only the
        # probabilities to the CPU.
        probs, _ = self.forward(batch)
        dist = Categorical(probs.cpu())
        action = dist.sample()
        return action.item(), dist.log_prob(action)


In [None]:
# Create the environment
env = gym.make("LunarLander-v2")
# env = gym.make("CartPole-v1")
states_length = env.observation_space.shape[0]
n_actions = env.action_space.n

# The loop below is a REINFORCE-style update on the sampled log-probabilities
# only; the value head of the network is not part of the loss.
policy_nn = ActorCritic(state_size=states_length,
                        action_size=n_actions,
                        hidden_size=32).to(device)
optimizer = optim.Adam(policy_nn.parameters(), lr=1e-2)
gamma = 0.99  # discount factor
eps = np.finfo(np.float32).eps.item()  # keeps the normalisation denominator non-zero

max_episode = 10000
max_steps_per_episode = 1000
scores = deque(maxlen=max_episode)
running_reward = 0

for e in range(1, max_episode + 1):  # iterate until solved (see termination check below)
    state, _ = env.reset()
    episode_reward = 0
    saved_log_probs = []
    rewards = []

    # Collect one trajectory with the current policy
    for t in range(1, max_steps_per_episode + 1):
        action, log_prob = policy_nn.select_action(state)
        state, reward, terminated, truncated, _ = env.step(action)
        saved_log_probs.append(log_prob)
        rewards.append(reward)
        episode_reward += reward
        # Stop on a real terminal state *or* a time-limit truncation; stepping
        # an environment after either flag is set is not valid.
        if terminated or truncated:
            break

    scores.append(episode_reward)
    # Exponential moving average of the episode reward
    running_reward = 0.05 * episode_reward + (1 - 0.05) * running_reward

    # Discounted return G_t for every step, computed back to front
    G = 0.0
    returns = deque()
    for r in reversed(rewards):
        G = r + gamma * G
        returns.appendleft(G)

    returns = torch.tensor(list(returns), dtype=torch.float32)
    # Standardise the returns; the std of a single element is NaN, so a
    # one-step episode is left un-normalised rather than poisoning the weights.
    if returns.numel() > 1:
        returns = (returns - returns.mean()) / (returns.std() + eps)

    policy_loss = torch.cat(
        [-log_prob * ret for log_prob, ret in zip(saved_log_probs, returns)]
    ).sum()

    # Backpropagation
    optimizer.zero_grad()
    policy_loss.backward()
    optimizer.step()

    if e % 10 == 0:
        print(f"Reward at {e} episode: {running_reward:.2f}")
    if running_reward >= 250:  # termination condition
        print(f"Solved at {e} episode: score={running_reward}")
        break

# Make sure the target directory exists before writing the checkpoint
checkpoint_path = "checkpoints/moon_lander.pt"
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
torch.save(policy_nn.state_dict(), checkpoint_path)