### Learning curves for different gamma values


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import gymnasium as gym
from stable_baselines3 import DQN
from stable_baselines3.common.callbacks import BaseCallback

# Callback to log rewards
class RewardLoggerCallback(BaseCallback):
    """Callback that records the total reward of each finished training episode.

    Only index 0 of the ``rewards`` / ``dones`` entries in ``self.locals`` is
    read, so a single environment is tracked (presumably these hold one entry
    per environment -- confirm against the algorithm in use).

    ``episode_rewards`` contains completed episodes only; the episode still in
    progress when training stops is not appended.
    """

    def __init__(self, verbose=0):
        super().__init__(verbose)
        # One entry per completed episode: the sum of that episode's rewards.
        self.episode_rewards = []
        # Running reward total of the episode currently being played.
        self._current_episode_reward = 0.0

    def _on_step(self) -> bool:
        """Accumulate the latest step reward and close the episode when done.

        Returns:
            Always True, so this callback never interrupts training.
        """
        if 'rewards' in self.locals:
            # The reward of the terminating step belongs to the episode that
            # just ended, so add it BEFORE checking the done flag.
            self._current_episode_reward += float(self.locals['rewards'][0])
            if self.locals['dones'][0]:
                self.episode_rewards.append(self._current_episode_reward)
                self._current_episode_reward = 0.0
        return True

def train_and_log_rewards(gamma, total_timesteps=250000): # can alter timesteps
    """
    Trains a DQN agent with a specified discount factor and logs rewards.

    Args:
        gamma: Discount factor passed to the DQN constructor.
        total_timesteps: Number of timesteps passed to ``learn``.

    Returns:
        The ``episode_rewards`` list gathered by a RewardLoggerCallback
        during training.
    """
    env = gym.make('LunarLander-v3')
    try:
        # Initialise the reward logger
        reward_logger = RewardLoggerCallback()

        # Create DQN model with the specified gamma
        gamma_model = DQN(
            policy="MlpPolicy",
            env=env,
            learning_rate=0.00063,         # Learning rate held fixed while gamma varies
            gamma=gamma,                  # Discount factor
            buffer_size=100000,
            learning_starts=1000,
            train_freq=4,
            batch_size=128,
            policy_kwargs=dict(net_arch=[256, 256]),
            target_update_interval=250,
            exploration_fraction=0.12,
            exploration_final_eps=0.1,
            verbose=1,
        )

        # Train the model
        gamma_model.learn(total_timesteps=total_timesteps, callback=reward_logger)
    finally:
        # Release the environment even if model creation or training fails;
        # each call creates a fresh environment, so leaving it open leaks one
        # per experiment.
        env.close()
    return reward_logger.episode_rewards

def plot_gamma_learning_curves(all_rewards, gamma_values, window=20):
    """
    Plot one moving-average learning curve per discount factor.

    Args:
        all_rewards: Sequence of per-episode reward lists, one list per run.
        gamma_values: Discount factors, paired with ``all_rewards`` by position.
        window: Number of consecutive episodes averaged for each plotted point.
            It is clamped to the run length so short runs still yield a point.
    """
    plt.figure(figsize=(10, 6))
    for rewards, gamma in zip(all_rewards, gamma_values):
        rewards = np.asarray(rewards, dtype=float)
        if rewards.size == 0:
            # Nothing to average; plot an empty series so the legend entry remains.
            smoothed_rewards = rewards
        else:
            span = max(1, min(window, rewards.size))
            # mode="valid" keeps every full window, including the one that
            # ends on the final episode.
            smoothed_rewards = np.convolve(rewards, np.ones(span) / span, mode="valid")
        plt.plot(smoothed_rewards, label=f"Gamma = {gamma}")
    plt.xlabel("Episodes")
    plt.ylabel("Smoothed Reward")
    plt.title("Learning Curves for Different Gamma Values")
    plt.legend()
    plt.show()

# Discount factors to compare; one DQN agent is trained from scratch for each.
gamma_values = [0.99, 0.9]

# Train the model for each gamma value and log rewards.
# NOTE: a later cell redefines train_and_log_rewards with a `learning_rate`
# parameter, so this cell must be run before that one.
all_rewards = []  # all_rewards[i] holds the episode rewards for gamma_values[i]
for gamma in gamma_values:
    print(f"Training with gamma: {gamma}")
    rewards = train_and_log_rewards(gamma=gamma, total_timesteps=250000)
    all_rewards.append(rewards)

# Plot the learning curves, smoothing each over a 50-episode window
plot_gamma_learning_curves(all_rewards, gamma_values, window=50)


### Learning curves for different learning rates


In [None]:
def train_and_log_rewards(learning_rate, total_timesteps=100000):
    """
    Trains a DQN agent with a specified learning rate and logs rewards.

    Args:
        learning_rate: Learning rate passed to the DQN constructor.
        total_timesteps: Number of timesteps passed to ``learn``.

    Returns:
        The ``episode_rewards`` list gathered by a RewardLoggerCallback
        during training.
    """
    env = gym.make('LunarLander-v3')
    try:
        # Initialise the reward logger
        reward_logger = RewardLoggerCallback()

        # Create DQN model with the specified learning rate
        learning_rate_model = DQN(
            policy="MlpPolicy",
            env=env,
            learning_rate=learning_rate,
            gamma=0.99, # discount factor held fixed while the learning rate varies
            buffer_size=100000,
            learning_starts=1000,
            train_freq=4,
            batch_size=128,
            policy_kwargs=dict(net_arch=[256, 256]),
            target_update_interval=250,
            exploration_fraction=0.12,
            exploration_final_eps=0.1,
            verbose=1,
        )

        # Train the model
        learning_rate_model.learn(total_timesteps=total_timesteps, callback=reward_logger)
    finally:
        # Release the environment even if model creation or training fails;
        # each call creates a fresh environment, so leaving it open leaks one
        # per experiment.
        env.close()
    return reward_logger.episode_rewards

def plot_learning_curves(all_rewards, learning_rates, window=20):
    """
    Plot one moving-average learning curve per learning rate.

    Args:
        all_rewards: Sequence of per-episode reward lists, one list per run.
        learning_rates: Learning rates, paired with ``all_rewards`` by position.
        window: Number of consecutive episodes averaged for each plotted point.
            It is clamped to the run length so short runs still yield a point.
    """
    plt.figure(figsize=(10, 6))
    for rewards, lr in zip(all_rewards, learning_rates):
        rewards = np.asarray(rewards, dtype=float)
        if rewards.size == 0:
            # Nothing to average; plot an empty series so the legend entry remains.
            smoothed_rewards = rewards
        else:
            span = max(1, min(window, rewards.size))
            # mode="valid" keeps every full window, including the one that
            # ends on the final episode.
            smoothed_rewards = np.convolve(rewards, np.ones(span) / span, mode="valid")
        plt.plot(smoothed_rewards, label=f"Learning Rate = {lr}")
    plt.xlabel("Episodes")
    plt.ylabel("Smoothed Reward")
    plt.title("Learning Curves for Different Learning Rates")
    plt.legend()
    plt.show()

# Learning rates to compare; one DQN agent is trained from scratch for each.
learning_rates = [0.000002, 0.00063, 0.003]

# Train the model for each learning rate and log rewards
all_rewards = []  # all_rewards[i] holds the episode rewards for learning_rates[i]
for lr in learning_rates:
    print(f"Training with learning rate: {lr}")
    rewards = train_and_log_rewards(learning_rate=lr, total_timesteps=500000) # overrides the 100000-step default; change timesteps here
    all_rewards.append(rewards)

# Plot the learning curves, smoothing each over a 100-episode window
plot_learning_curves(all_rewards, learning_rates, window=100)


### Learning curve using optimised parameters


In [None]:
# Train the final model over a large number of timesteps, e.g. 1000000
env = gym.make('LunarLander-v3')

# Initialise DQN model with the hyperparameters used in the comparison cells
trained_model = DQN(
    policy="MlpPolicy",
    env=env,
    learning_rate=0.00063,
    gamma=0.99,
    buffer_size=100000,
    learning_starts=1000,
    train_freq=4,
    batch_size=128,
    policy_kwargs=dict(net_arch=[256, 256]),
    target_update_interval=250,
    exploration_fraction=0.12,
    exploration_final_eps=0.1,
    verbose=1
)

# track rewards
reward_logger = RewardLoggerCallback()

# The logger only receives step data when it is handed to learn(); without
# callback=... its episode_rewards list stays empty and the learning curve
# plotted from it is blank.
trained_model.learn(total_timesteps=1000000, callback=reward_logger)

# save trained model
trained_model.save("dict_lunar_lander_dqn")

def plot_learning_curve(rewards, window=20):
    """
    Plot a trailing moving average of per-episode rewards.

    Args:
        rewards: Sequence of per-episode total rewards.
        window: Number of preceding episodes averaged for each plotted point.
    """
    # Each x value is the episode count at which a full window is available.
    # The +1 includes the window that ends on the final episode.
    episode_ends = range(window, len(rewards) + 1)

    # Compute moving average to smooth curve
    moving_avg = [np.mean(rewards[end - window:end]) for end in episode_ends]

    # Plot the learning curve
    plt.figure(figsize=(10, 6))
    # A label is required; otherwise plt.legend() has nothing to display.
    plt.plot(episode_ends, moving_avg, label=f"{window}-episode moving average")
    plt.xlabel('Episodes')
    plt.ylabel('Average Reward')
    plt.title('Learning Curve for DQN')
    plt.legend()
    plt.show()

# Plot the logged episode rewards with a 5-episode moving average.
# reward_logger.episode_rewards is only populated if reward_logger was passed
# as the callback to learn().
plot_learning_curve(reward_logger.episode_rewards, window=5)

### Histograms of rewards of an untrained and trained DQN 

In [None]:

def evaluate_agent(env, model, n_episodes=100, deterministic=True):
    """
    Run an agent in ``env`` for several episodes and collect the episode rewards.

    Args:
        env: Environment whose ``reset()`` returns ``(obs, info)`` and whose
            ``step(action)`` returns
            ``(obs, reward, terminated, truncated, info)``.
        model: Agent providing ``predict(obs, deterministic=...)`` that returns
            ``(action, state)``. Pass ``None`` to sample random actions from
            ``env.action_space`` instead.
        n_episodes: Number of episodes to run.
        deterministic: Forwarded to ``model.predict``.

    Returns:
        A list with the summed reward of each episode, in the order played.
    """
    rewards = []
    for episode in range(n_episodes):
        obs, info = env.reset()
        done = False
        total_reward = 0

        while not done:
            if model is None:
                # No agent supplied: fall back to a random-action baseline.
                action = env.action_space.sample()
            else:
                # Query the supplied agent so the result reflects that model
                # rather than random play.
                action, _ = model.predict(obs, deterministic=deterministic)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
            total_reward += reward

        rewards.append(total_reward)
        print(f"Episode {episode + 1}: Reward = {total_reward}")

    return rewards

# Create the Lunar Lander environment
env = gym.make('LunarLander-v3')

# Create an untrained DQN model
untrained_model = DQN(policy="MlpPolicy", env=env, verbose=0)  # Do NOT train the model

# save model
untrained_model.save("lunar_lander_dqn")

# Evaluate the untrained agent
untrained_rewards = evaluate_agent(env, untrained_model, n_episodes=100)

# Plot the histogram of rewards for the untrained agent
plt.hist(untrained_rewards, bins=10, edgecolor='k')
plt.xlabel("Episode Reward")
plt.ylabel("Frequency")
plt.title("Distribution of Episode Rewards - Untrained Agent")
plt.show()

# Evaluate the trained agent (trained_model is created in the training cell)
trained_rewards = evaluate_agent(env, trained_model, n_episodes=100)

# Plot the histogram of rewards for the trained agent; it must use
# trained_rewards, not untrained_rewards, to match the title.
plt.hist(trained_rewards, bins=10, edgecolor='k')
plt.xlabel("Episode Reward")
plt.ylabel("Frequency")
plt.title("Distribution of Episode Rewards - Trained Agent")
plt.show()

### Episode video sample from untrained and trained agent 

In [None]:
# record untrained agent
def record_untrained_agent(video_path, n_episodes=1):
    """
    Record episodes of an agent that picks random actions on LunarLander-v3.

    Args:
        video_path: Output folder handed to the RecordVideo wrapper.
        n_episodes: Number of episodes to play.
    """
    # The trigger always returns True, so every episode is recorded.
    recorder = gym.wrappers.RecordVideo(
        gym.make('LunarLander-v3', render_mode="rgb_array"),
        video_path,
        episode_trigger=lambda episode_id: True,
    )

    for _ in range(n_episodes):
        recorder.reset()
        while True:
            # No model involved: sample an action uniformly from the action space.
            random_action = recorder.action_space.sample()
            _, _, terminated, truncated, _ = recorder.step(random_action)
            if terminated or truncated:
                break
    recorder.close()

# Record a single random-action episode into ./untrained_agent_video
record_untrained_agent(video_path="./untrained_agent_video", n_episodes=1)

# Video of the trained agent in the environment

def record_trained_agent(model_path, video_path, n_episodes=1):
    """
    Record episodes of a saved DQN agent acting on LunarLander-v3.

    Args:
        model_path: Path given to ``DQN.load`` to restore the agent.
        video_path: Output folder handed to the RecordVideo wrapper.
        n_episodes: Number of episodes to play.
    """
    # Restore the agent first, before any environment is created.
    agent = DQN.load(model_path)

    # The trigger always returns True, so every episode is recorded.
    recorder = gym.wrappers.RecordVideo(
        gym.make('LunarLander-v3', render_mode="rgb_array"),
        video_path,
        episode_trigger=lambda episode_id: True,
    )

    for _ in range(n_episodes):
        observation, _ = recorder.reset()
        while True:
            # Ask the loaded model for its deterministic action choice.
            chosen_action, _ = agent.predict(observation, deterministic=True)
            observation, _, terminated, truncated, _ = recorder.step(chosen_action)
            if terminated or truncated:
                break
    recorder.close()

# Record a single episode of the model saved as "dict_lunar_lander_dqn"
# into ./trained_agent_video
record_trained_agent(model_path="dict_lunar_lander_dqn", video_path="./trained_agent_video", n_episodes=1)


Acknowledgement:
I acknowledge the use of ChatGPT-4o (OpenAI,
https://chat.openai.com/) to assist in debugging functions to generate working plots. I confirm that no content generated by AI has been presented as my own work.