In [1]:
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import gym
import numpy as np

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [22]:
class ActorNet(nn.Module):
    def __init__(self, state_Size, action_size, hidden_size):
        super(ActorNet, self).__init__()
        self.dense_layer_1 = nn.Linear(state_size, hidden_size)
        self.dense_layer_2 = nn.Linear(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, action_size)
        
    def forward(self, x):
        x = torch.clamp(x, -1.1, 1.1)
        x = F.relu(self.dense_layer_1(x))
        x = F.relu(self.dense_layer_2(x))
        return F.softmax(self.out(x), dim=-1) + 1e-8

In [23]:
class CriticNet(nn.Module):
    def __init__(self, state_Size, hidden_size):
        super(CriticNet, self).__init__()
        self.dense_layer_1 = nn.Linear(state_size, hidden_size)
        self.dense_layer_2 = nn.Linear(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, 1)
        
    def forward(self, x):
        x = torch.clamp(x, -1.1, 1.1)
        x = F.relu(self.dense_layer_1(x))
        x = F.relu(self.dense_layer_2(x))
        return self.out(x)

In [28]:
class ActorCriticAgent():
    def __init__(self, state_size, action_size, hidden_size, actor_lr, critic_lr, discount):
        self.action_size = action_size
        self.actor_net = ActorNet(state_size, action_size, hidden_size).to(device)
        self.critic_net = CriticNet(state_size, hidden_size).to(device)
        self.actor_optimizer = optim.Adam(self.actor_net.parameters(), lr=learning_rate)
        self.critic_optimizer = optim.Adam(self.critic_net.parameters(), lr=learning_rate)
        self.discount = discount
        
    def select_action(self, state):
        with torch.no_grad():
            input_state = torch.FloatTensor(state).to(device)
            action_probs = self.actor_net(input_state)
            action_probs = action_probs.detach().cpu().numpy()
            action = np.random.choice(np.arange(self.action_size), p=action_probs)
        return action
    
    def train(self, state_list, action_list, next_state_list, reward_list, done_list):

        state_t = torch.FloatTensor(state_list).to(device)
        next_state_t = torch.FloatTensor(next_state_list).to(device)
        action_t = torch.LongTensor(action_list).to(device).view(-1, 1)
        reward_t = torch.FloatTensor(reward_list).to(device).view(-1, 1)
        done_t = torch.FloatTensor(done_list).to(device).view(-1, 1)
        
        critic_t = self.critic_net(state_t).view(-1, 1)
        with torch.no_grad():
            critic_td_t = reward_t + done_t * self.discount * self.critic_net(next_state_t).view(-1, 1)
            advantage_t = critic_td_t - critic_t
        
        selected_action_prob = self.actor_net(state_t).gather(1, action_t)
        actor_loss = torch.mean(-torch.log(selected_action_prob) * advantage_t)
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        
        critic_loss = F.smooth_l1_loss(critic_t, critic_td_t)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        
        
        return actor_loss.detach().cpu().numpy(), critic_loss.detach().cpu().numpy()

In [29]:
env = gym.make('LunarLander-v2')
action_size = env.action_space.n
state_size = env.observation_space.shape[0]
seed = 31
env.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    
episodes = 5000
hidden_size = 256
learning_rate = 0.0005
discount = 0.99
reward_scale = 0.01
actor_lr = 0.00002
critic_lr = 0.00001

agent = ActorCriticAgent(state_size, action_size, hidden_size, actor_lr, critic_lr, discount)

In [30]:
stats_rewards_list = []
stats_every = 10
total_reward = 0
timesteps = 0
episode_length = 0
stats_actor_loss, stats_critic_loss = [], []

for ep in range(episodes):
    state = env.reset()
    stats_loss = 0.
    
    if len(stats_rewards_list) > stats_every and np.mean(stats_rewards_list[-stats_every:], axis=0)[1] > 190:
        print("Stopping at episode {} with average rewards of {} in last {} episodes".format(ep, np.mean(stats_rewards_list[-stats_every:], axis=0)[1], stats_every))
        break
    
    state_list = []
    action_list = []
    reward_list = []
    next_state_list = []
    done_list = []
    
    while True:
        timesteps += 1
        action = agent.select_action(state)
        
        next_state, reward, done, _ = env.step(action)
        total_reward += reward
        episode_length += 1
        state_list.append(state)
        action_list.append(action)
        reward_list.append(reward * reward_scale)
        next_state_list.append(next_state)
        done_list.append(1. - float(done))
        
        if total_reward < -250:
            done = 1
            
        actor_loss, critic_loss = agent.train(state_list, action_list, reward_list, next_state_list, done_list)
        stats_actor_loss.append(actor_loss)
        stats_critic_loss.append(critic_loss)
        if done:
            stats_rewards_list.append((ep, total_reward, episode_length))
            total_reward = 0
            episode_length = 0
            if ep % stats_every == 0:
                print('Episode {}'.format(ep),
                     'Timestep: {}'.format(timesteps),
                     'Total reward: {:.1f}'.format(np.mean(stats_rewards_list[-stats_every:], axis=0)[1]),
                     'Episode length: {:.1f}'.format(np.mean(stats_rewards_list[-stats_every:], axis=0)[2]),
                     'Actor Loss: {:.3f}'.format(np.mean(stats_actor_loss)),
                     'Critic Loss: {:.4f}'.format(np.mean(stats_critic_loss)))
                stats_actor_loss, stats_critic_loss = [], []
            break
            
        state = next_state

RuntimeError: mat1 dim 1 must match mat2 dim 0

In [12]:
env.close()

In [31]:
class ActorCriticAgent():
    def __init__(self, state_size, action_size, hidden_size, actor_lr, critic_lr, discount, entropy_coeff):
        self.action_size = action_size
        self.actor_net = ActorNet(state_size, action_size, hidden_size).to(device)
        self.critic_net = CriticNet(state_size, hidden_size).to(device)
        self.actor_optimizer = optim.Adam(self.actor_net.parameters(), lr=learning_rate)
        self.critic_optimizer = optim.Adam(self.critic_net.parameters(), lr=learning_rate)
        self.discount = discount
        self.entropy_coeff = entropy_coeff
        
    def select_action(self, state):
        with torch.no_grad():
            input_state = torch.FloatTensor(state).to(device)
            action_probs = self.actor_net(input_state)
            action_probs = action_probs.detach().cpu().numpy()
            action = np.random.choice(np.arange(self.action_size), p=action_probs)
        return action
    
    def train(self, state_list, action_list, next_state_list, reward_list, done_list):

        state_t = torch.FloatTensor(state_list).to(device)
        next_state_t = torch.FloatTensor(next_state_list).to(device)
        action_t = torch.LongTensor(action_list).to(device).view(-1, 1)
        reward_t = torch.FloatTensor(reward_list).to(device).view(-1, 1)
        done_t = torch.FloatTensor(done_list).to(device).view(-1, 1)
        
        critic_t = self.critic_net(state_t).view(-1, 1)
        with torch.no_grad():
            critic_td_t = reward_t + done_t * self.discount * self.critic_net(next_state_t).view(-1, 1)
            advantage_t = critic_td_t - critic_t
        
        
        action_probs = self.actor_net(state_t)
        selected_action_prob = action_probs.gather(1, action_t)
        
        entropy_loss = -torch.sum(action_probs * torch.log(action_probs), dim=1)
        
        actor_loss = torch.mean(-torch.log(selected_action_prob) * advantage_t + self.entropy_coeff * entropy_loss)
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        
        critic_loss = F.smooth_l1_loss(critic_t, critic_td_t)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        
        
        return actor_loss.detach().cpu().numpy(), critic_loss.detach().cpu().numpy()