# 0. Import Dependencies

In [104]:
import numpy as np
import pandas as pd
from collections import deque
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import gymnasium as gym

# 1. Define the Environment and the Model

In [105]:
# Run on the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [106]:
# Build the Gymnasium environment the agent is trained and evaluated on.
env_id = "LunarLander-v3"
env = gym.make(env_id)

# Length of the observation vector and number of discrete actions,
# used to size the input/output layers of the networks.
s_size, a_size = env.observation_space.shape[0], env.action_space.n

In [107]:
class PolicyNetwork(nn.Module):
    """
    The Policy Network (Actor).

    A single-hidden-layer MLP that maps a state to a probability
    distribution over the discrete actions.
    """
    def __init__(self, s_size, a_size, h_size):
        '''
        Initialise the policy network.

        Args:
            s_size: Number of features in a state vector.
            a_size: Number of discrete actions.
            h_size: Width of the hidden layer.
        '''
        super().__init__()
        self.layer1 = nn.Linear(s_size, h_size)
        self.layer2 = nn.Linear(h_size, a_size)

    def forward(self, state):
        '''
        Perform a forward pass through the network.

        Args:
            state: Tensor whose last dimension has size ``s_size``; it may
                be batched ``(batch, s_size)`` or a single ``(s_size,)``
                vector.

        Returns:
            Tensor of action probabilities with the same leading dimensions
            as ``state`` and a last dimension of size ``a_size``.
        '''
        hidden = F.relu(self.layer1(state))
        # Normalise over the action dimension. Using the last axis (rather
        # than a hard-coded axis 1) gives the same result for batched input
        # and also works for unbatched 1-D states.
        action_probs = F.softmax(self.layer2(hidden), dim=-1)
        return action_probs

    def act(self, state):
        '''
        Sample an action from the policy's distribution.

        Args:
            state: Tensor describing a single state (see ``forward``).

        Returns:
            Tuple ``(action, log_prob)``: the sampled action as a Python int
            and the log-probability tensor of that action, which stays
            attached to the autograd graph.
        '''
        probabilities = self.forward(state)
        distribution = Categorical(probabilities)
        action = distribution.sample()
        # .item() requires exactly one sampled action, i.e. a single state.
        return action.item(), distribution.log_prob(action)

In [108]:
class ValueNetwork(nn.Module):
    '''
    The Value Network (Critic).

    A single-hidden-layer MLP that maps a state to one scalar value estimate.
    '''

    def __init__(self, s_size, h_size):
        '''
        Initialize the value network.

        Args:
            s_size: Number of features in a state vector.
            h_size: Width of the hidden layer.
        '''
        super().__init__()
        self.input_layer = nn.Linear(s_size, h_size)
        self.output_layer = nn.Linear(h_size, 1)

    def forward(self, state):
        '''
        Compute the state value.

        Args:
            state: Tensor whose last dimension has size ``s_size``.

        Returns:
            Tensor with a trailing dimension of size 1 holding the value
            estimate for each input state.
        '''
        return self.output_layer(F.relu(self.input_layer(state)))

In [109]:
def generate_trajectory(policy, value_function, max_steps):
    '''
    Function used to generate a trajectory (sequence of log-probabilities,
    rewards, and state values) using the current policy.

    Uses the module-level ``env`` to interact with the environment and the
    module-level ``device`` to place the state tensors.

    Args:
        policy: Object exposing ``act(state)`` that returns
            ``(action, log_prob)``.
        value_function: Callable mapping a state tensor to its value estimate.
        max_steps: Maximum number of environment steps to take.

    Returns:
        Tuple ``(log_probs, rewards, state_values)`` of equal-length lists,
        one entry per step taken.
    '''
    log_probs = []
    rewards = []
    state_values = []
    state, _ = env.reset()

    for _ in range(max_steps):
        # Add a leading batch dimension of 1 before feeding the networks.
        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)
        action, log_prob = policy.act(state_tensor)
        value = value_function(state_tensor)
        state, reward, terminated, truncated, _ = env.step(action)

        rewards.append(reward)
        log_probs.append(log_prob)
        state_values.append(value)

        # The episode is over on either signal; stepping an environment
        # after truncation without a reset is not valid.
        if terminated or truncated:
            break
    return log_probs, rewards, state_values

In [110]:
def calculate_discounted_returns(rewards, max_steps, gamma):
    '''
    Function used to calculate discounted cumulative rewards for a trajectory.

    Args:
        rewards: Sequence of per-step rewards in chronological order.
        max_steps: Maximum length of the returned deque.
        gamma: Discount factor applied to each later step's return.

    Returns:
        A ``deque`` (``maxlen=max_steps``) whose i-th element is the
        discounted return from step i onwards.
    '''
    discounted = deque(maxlen=max_steps)
    running_return = 0

    # Walk the episode backwards so each return reuses the following one.
    for reward in reversed(rewards):
        running_return = reward + gamma * running_return
        discounted.appendleft(running_return)
    return discounted

In [111]:
def standardise_returns(returns):
    '''
    Function used to standardise returns (zero mean, unit standard deviation).

    Uses the module-level ``device`` to place the resulting tensor.

    Args:
        returns: Sequence of discounted returns for one trajectory.

    Returns:
        1-D float32 tensor on ``device`` with the standardised returns.
        Trajectories with fewer than two steps yield zeros, because the
        sample standard deviation is undefined for them.
    '''
    eps = np.finfo(np.float32).eps.item()
    returns = torch.tensor(returns, dtype=torch.float32).to(device)
    # std() of a single element is NaN; returning NaN here would propagate
    # into the losses and corrupt the network weights.
    if returns.numel() < 2:
        return torch.zeros_like(returns)
    # eps avoids division by zero when every return is identical.
    return (returns - returns.mean()) / (returns.std() + eps)

In [112]:
def optimise_policy(policy_optimizer, log_probs, returns, state_values):
    '''
    Function used to optimise the policy.

    Performs one gradient step on the loss
    ``-sum(log_prob * advantage)``, where the advantage is the return minus
    the (detached) state-value estimate.

    Args:
        policy_optimizer: Optimizer holding the policy parameters.
        log_probs: List of per-step log-probability tensors attached to the
            policy's autograd graph.
        returns: 1-D tensor of per-step returns.
        state_values: List of per-step value estimates from the critic.
    '''
    # Flatten to one value per step. reshape(-1) keeps a 1-D tensor even for
    # a one-step trajectory, where squeeze() would collapse to a scalar.
    state_values = torch.stack(state_values).reshape(-1)
    # Detach so that policy gradients do not flow into the critic.
    advantages = returns - state_values.detach()

    log_probs = torch.stack(log_probs).reshape(-1)
    policy_loss = -(log_probs * advantages).sum()

    policy_optimizer.zero_grad()
    policy_loss.backward()
    policy_optimizer.step()

In [113]:
def optimise_value_function(value_optimizer, returns, state_values):
    '''
    Function used to optimise the value function network.

    Performs one gradient step on the mean squared error between the
    predicted state values and the returns.

    Args:
        value_optimizer: Optimizer holding the value-network parameters.
        returns: 1-D tensor of per-step target returns.
        state_values: List of per-step value estimates attached to the value
            network's autograd graph.
    '''
    # Flatten to one value per step. reshape(-1) keeps a 1-D tensor even for
    # a one-step trajectory, so predictions and targets always share a shape.
    state_values = torch.stack(state_values).reshape(-1)

    value_loss = F.mse_loss(state_values, returns)

    value_optimizer.zero_grad()
    value_loss.backward()
    value_optimizer.step()

In [114]:
def train_agent(policy, value_function, policy_optimizer, value_optimizer, num_episodes, max_steps, gamma,
                log_interval, early_stopping_threshold=None):
    """
    Function used to train an actor-critic agent.

    Each episode generates one trajectory, updates the value network and then
    the policy on the standardised discounted returns, and records the
    episode score.

    Args:
        policy: Policy network (actor).
        value_function: Value network (critic).
        policy_optimizer: Optimizer for the policy parameters.
        value_optimizer: Optimizer for the value-network parameters.
        num_episodes: Maximum number of training episodes.
        max_steps: Maximum number of steps per episode.
        gamma: Discount factor.
        log_interval: Print a progress line every this many episodes.
        early_stopping_threshold: If given, stop once the mean score over the
            last 100 episodes reaches this value.

    Returns:
        List with the undiscounted score of every episode played.
    """
    window = deque(maxlen=100)
    history = []

    def report(i_episode):
        # Print the summary line for the current sliding window of scores.
        avg_score = np.mean(window)
        best_score = np.max(window)
        worst_score = np.min(window)
        print(f'Episode {i_episode}\tAverage Score: {avg_score:.2f}\tBest Score: {best_score:.2f}'
              + f'\tWorst Score: {worst_score:.2f}')

    for i_episode in range(1, num_episodes + 1):
        log_probs, rewards, state_values = generate_trajectory(policy, value_function, max_steps)
        total = sum(rewards)
        window.append(total)
        history.append(total)

        standardised_returns = standardise_returns(
            calculate_discounted_returns(rewards, max_steps, gamma))

        # The critic is updated first, then the actor.
        optimise_value_function(value_optimizer, standardised_returns, state_values)
        optimise_policy(policy_optimizer, log_probs, standardised_returns, state_values)

        # Early stopping is only considered once the window holds 100 scores.
        window_full = len(window) == window.maxlen
        if early_stopping_threshold is not None and window_full:
            if np.mean(window) >= early_stopping_threshold:
                print(f'Early stopping triggered at episode {i_episode}')
                report(i_episode)
                break

        if i_episode % log_interval != 0:
            continue
        report(i_episode)
    return history

# 2. Train the Model

In [134]:
# Hyperparameters shared by the training, evaluation and model-loading cells.
LL_hyperparameters = {
    "h_size": 256,  # hidden-layer width of both the policy and value networks
    "n_training_episodes": 10000,  # upper bound on training episodes
    "n_evaluation_episodes": 1000,  # episodes played per evaluate() call
    "max_steps": 1000,  # cap on environment steps per episode
    "gamma": 0.99,  # discount factor for the returns
    "lr": 1e-3,  # learning rate of both Adam optimizers
    "state_space": s_size,  # network input size
    "action_space": a_size,  # policy output size
}

In [116]:
# Actor: policy network and its Adam optimizer.
LL_policy = PolicyNetwork(LL_hyperparameters["state_space"], LL_hyperparameters["action_space"],
                           LL_hyperparameters["h_size"]).to(device)
LL_policy_optimizer = optim.Adam(LL_policy.parameters(), lr=LL_hyperparameters["lr"])

In [117]:
# Critic: value network and its own Adam optimizer (same learning rate as the actor).
LL_value = ValueNetwork(LL_hyperparameters["state_space"], LL_hyperparameters["h_size"]).to(device)
LL_value_optimizer = optim.Adam(LL_value.parameters(), lr=LL_hyperparameters["lr"])

In [None]:
# Train the agent; `scores` receives the per-episode training scores.
# The last two positional arguments are log_interval (100) and
# early_stopping_threshold (230).
scores = train_agent(LL_policy,
                LL_value,
                LL_policy_optimizer,
                LL_value_optimizer,
                LL_hyperparameters["n_training_episodes"],
                LL_hyperparameters["max_steps"],
                LL_hyperparameters["gamma"],
                100,
                230)

# 3. Evaluate the Model

In [50]:
# Wrap the per-episode scores in a named Series for plotting and CSV export.
scores= pd.Series(scores, name="policy_scores")

In [None]:
# Plot the raw per-episode training scores with their 100-episode rolling mean.
fig, ax = plt.subplots(figsize=(10, 6))

scores.plot(ax=ax, label="Scores", color='lightblue', linewidth=1.5)
# Rolling mean over 100 episodes; the renamed Series supplies the legend label.
(scores.rolling(window=100)
           .mean()
           .rename("Average (last 100)")
           .plot(ax=ax, color='red', linewidth=2, linestyle='-'))

ax.grid(color='gray', linestyle='--', linewidth=0.5, alpha=0.7)
ax.set_title("Policy scores of the training process", fontsize=16, fontweight='bold')
ax.set_xlabel("Episode Number", fontsize=14)
ax.set_ylabel("Scores", fontsize=14)

ax.tick_params(axis='both', which='major', labelsize=12)
ax.legend(fontsize=12, loc='lower right', frameon=True, shadow=True, fancybox=True)

# Hide the top and right borders of the axes.
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
plt.tight_layout()
plt.show()

In [52]:
# Persist the training scores (one row per episode, no index column).
scores.to_csv('a2c_scores4.csv', index=False)

In [53]:
def evaluate(policy, env, num_episodes, max_steps):
    """
    Function to evaluate the performance of a policy.

    Plays ``num_episodes`` episodes, sampling actions from the policy, and
    prints the average reward and episode length. Uses the module-level
    ``device`` to place the state tensors.

    Args:
        policy: Object exposing ``act(state)`` that returns
            ``(action, log_prob)``.
        env: Environment exposing Gymnasium-style ``reset()`` and
            ``step(action)``.
        num_episodes: Number of episodes to play.
        max_steps: Maximum number of steps per episode.

    Returns:
        Tuple ``(all_rewards, all_steps)``: per-episode total rewards and
        per-episode step counts (lists, not averages).
    """
    all_rewards = []
    all_steps = []

    for _ in range(num_episodes):
        state, _ = env.reset()
        episode_reward = 0
        # Counted explicitly so the value is defined even if max_steps is 0.
        steps_taken = 0

        # No gradients are needed for evaluation; skip building the graph.
        with torch.no_grad():
            for _ in range(max_steps):
                state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)
                action, _ = policy.act(state_tensor)
                state, reward, terminated, truncated, _ = env.step(action)
                episode_reward += reward
                steps_taken += 1
                # The episode is over on either signal; stepping after
                # truncation without a reset is not valid.
                if terminated or truncated:
                    break

        all_rewards.append(episode_reward)
        all_steps.append(steps_taken)

    avg_reward = np.mean(all_rewards)
    avg_steps = np.mean(all_steps)
    print(f"Evaluation over {num_episodes} episodes:")
    print(f"Average Reward: {avg_reward:.2f}")
    print(f"Average Episode Length: {avg_steps:.2f}")
    return all_rewards, all_steps

In [100]:
# Evaluate the freshly trained policy.
# NOTE: despite their names, these variables receive the per-episode reward
# and step lists returned by evaluate(); the averages are only printed.
avg_reward, avg_steps = evaluate(LL_policy, env, LL_hyperparameters["n_evaluation_episodes"],
                                       LL_hyperparameters["max_steps"])

Evaluation over 10 episodes:
Average Reward: 156.46
Average Episode Length: 1000.00


# 4. Save the Model

In [55]:
# Destination file for the trained policy's weights.
path = './a2c_model4'

In [56]:
# Only the policy's state_dict is saved; the value network is not persisted here.
torch.save(LL_policy.state_dict(), path)

# 5. Load a Model

In [126]:
# Checkpoint to restore.
# NOTE(review): this differs from the './a2c_model4' path written in the
# save section - confirm this is the intended checkpoint.
path_to_load = './a2c_model2'

In [102]:
# Rebuild the policy architecture, then restore the saved weights into it.
model = PolicyNetwork(LL_hyperparameters["state_space"], LL_hyperparameters["action_space"],
                           LL_hyperparameters["h_size"]).to(device)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine;
# weights_only=True restricts unpickling to tensors and plain containers,
# which is all a state_dict needs.
model.load_state_dict(torch.load(path_to_load, map_location=device, weights_only=True))

  model.load_state_dict(torch.load(path_to_load))


<All keys matched successfully>

In [143]:
# Evaluate the restored model; `rewards` and `steps` are per-episode lists.
rewards, steps = evaluate(model, env, LL_hyperparameters["n_evaluation_episodes"],
                                       LL_hyperparameters["max_steps"])

Evaluation over 1000 episodes:
Average Reward: 239.85
Average Episode Length: 340.09


In [142]:
# Save the evaluation results, one row per episode (episode numbers start at 1).
pd.DataFrame({
       'episode': range(1, len(rewards)+1),
       'reward': rewards,
       'steps': steps
   }).to_csv('test_scores_a2c.csv', index=False)