In [None]:
!pip install gym==0.23.1

# BASIC TASKS

## Building Environment

In [None]:
import gym
from gym import spaces
import numpy as np
import matplotlib.pyplot as plt
import time

In [None]:
# Creating Maze Environment
class MazeEnv(gym.Env):
    """Grid maze in which the agent must collect a key before opening the treasure.

    Tile codes stored in ``self.grid``: 0 = empty, 1 = key, 2 and 3 = traps,
    4 = treasure (goal). Actions: 0 = up, 1 = right, 2 = down, 3 = left.
    Observations are the agent's flattened cell index ``row * grid_size + col``.

    NOTE(review): the observation encodes only the agent's position, not
    ``has_key``, so the same observation is returned before and after the key
    has been collected.
    """
    metadata = {'render.modes': ['human']}   # Only text rendering to stdout is provided

    # Character printed by render() for each tile code in self.grid
    _TILE_SYMBOLS = {0: '.', 1: 'K', 2: 'X', 3: 'X', 4: '$'}

    def __init__(self, grid_size=5):
        """Create the action/observation spaces and build the initial maze.

        Args:
            grid_size: Side length of the square grid. The key and trap cells are
                hard-coded in ``_create_maze`` (the key sits in column 4), so
                values below 5 raise ``IndexError``.
        """
        super(MazeEnv, self).__init__()
        self.grid_size = grid_size
        self.action_space = spaces.Discrete(4)   # up, right, down, left
        self.observation_space = spaces.Discrete(grid_size * grid_size)   # one observation per cell
        self.has_key = False   # Tracking key collection
        self._create_maze()

    def _create_maze(self):
        """Rebuild the grid with its key, traps and treasure and put the agent at the start."""
        self.grid = np.zeros((self.grid_size, self.grid_size), dtype=np.uint8)
        self.agent_pos = [0,0]   # Agent starts in the top-left corner
        self.goal_pos = [self.grid_size - 1, self.grid_size - 1]   # Treasure in the bottom-right corner
        self.grid[tuple(self.goal_pos)] = 4   # Goal - Treasure
        self.grid[(3,2)] = 3   # Trap 2
        self.grid[(1,3)] = 2   # Trap 1
        self.grid[(2,4)] = 1   # Key

    def reset(self):
        """Restore the maze (including the key), clear ``has_key`` and return the initial observation."""
        self._create_maze()   # Also puts the agent back at [0, 0]
        self.has_key = False
        return self._get_state()

    def _get_state(self):
        """Return the agent's position as a flattened cell index."""
        y,x = self.agent_pos
        return y * self.grid_size + x

    def step(self, action):
        """Apply one action and return ``(observation, reward, done, info)``.

        A move that would leave the grid keeps the agent in place (and still
        incurs the reward of the tile it is standing on). Rewards: -1.0 for an
        empty tile, +15 for picking up the key, -10 for a trap, +30 for reaching
        the treasure with the key (episode ends) and -3 for reaching it without.
        """
        y, x = self.agent_pos
        if action == 0 and y>0:   # Moving Up
            y -= 1
        elif action == 1 and x<self.grid_size - 1:   # Moving Right
            x += 1
        elif action == 2 and y<self.grid_size - 1:   # Moving Down
            y += 1
        elif action == 3 and x>0:   # Moving Left
            x -= 1

        self.agent_pos = [y,x]
        reward = -1.0   # Penalty for every step taken to incentivise quicker paths
        done = False

        tile = self.grid[y,x]
        if tile == 1:
            reward = 15
            self.has_key = True   # Agent collected key
            self.grid[y,x] = 0   # Removing key after collection
        elif tile == 2:
            reward = -10   # Agent encountered a trap
        elif tile == 3:
            reward = -10   # Agent encountered a trap
        elif tile == 4:
            if self.has_key:
                reward = 30   # Agent reached treasure with key
                done = True   # Terminating episode
            else:
                reward = -3   # Treasure reached without the key: penalise and keep the episode running

        return self._get_state(), reward, done, {}

    # The render function is adapted from the video in [1].
    def render(self, mode='human'):
        """Print the maze to stdout: A = agent, $ = treasure, X = trap, K = key, . = empty.

        Symbols are read from ``self.grid`` so the key disappears from the
        display once it has been collected.
        """
        print("\nCurrent Maze State:")
        for r in range(self.grid_size):
            for c in range(self.grid_size):
                if [r, c] == self.agent_pos:
                    symbol = 'A'   # The agent is drawn on top of whatever tile it occupies
                else:
                    symbol = self._TILE_SYMBOLS[int(self.grid[r, c])]
                print(symbol, end=' ')
            print()
        print()

    def close(self):
        """Nothing to release; present to satisfy the gym.Env interface."""
        pass

## Setting up Q-Learning [2]

The following code was mainly taken from lab tutorial 4 part 2 [2].

In [None]:
# The following code was mainly taken from lab tutorial 4 part 2 [2].

# Q-table initialisation helper
def initialize_q_table(state_space, action_space):
    """Return an all-zero Q-table with one row per state and one column per action."""
    table_shape = (state_space, action_space)
    return np.zeros(table_shape)

# Epsilon-greedy action selection
def epsilon_greedy_policy(Qtable, state, epsilon):
    """Pick a uniformly random action with probability ``epsilon``, else the greedy one.

    Uses NumPy's global random state; ties in the greedy branch resolve to the
    lowest action index (``np.argmax`` behaviour).
    """
    explore = np.random.uniform(0,1) < epsilon
    if not explore:
        return np.argmax(Qtable[state])
    n_actions = Qtable.shape[1]
    return np.random.choice(n_actions)

# Tabular Q-learning training loop
def train(n_episodes, min_epsilon, max_epsilon, decay_rate, env, max_steps, Qtable, alpha, gamma, verbose=True):
    """Train ``Qtable`` in place with epsilon-greedy Q-learning.

    Args:
        n_episodes: Number of training episodes.
        min_epsilon, max_epsilon, decay_rate: Exploration schedule;
            ``epsilon = min + (max - min) * exp(-decay_rate * episode)``.
        env: Environment exposing ``reset() -> state`` and
            ``step(action) -> (state, reward, done, info)``.
        max_steps: Step cap per episode.
        Qtable: Array indexed as ``Qtable[state, action]``; updated in place.
        alpha: Learning rate.
        gamma: Discount factor.
        verbose: When True (default), print the total reward after every episode.

    Returns:
        ``(Qtable, episode_rewards, episode_steps)`` where the two lists hold the
        total reward and the number of steps of each episode.

    NOTE(review): a later cell in this notebook defines another function that
    is also named ``train``; re-running earlier cells after that one will call
    the wrong function.
    """
    episode_rewards = []
    episode_steps = []

    for episode in range(n_episodes):
        # Exploration decays exponentially with the episode index
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode)
        state = env.reset()
        total_reward = 0
        steps = 0

        for _ in range(max_steps):
            action = epsilon_greedy_policy(Qtable, state, epsilon)
            new_state, reward, done, _info = env.step(action)

            # Q(s,a) := Q(s,a) + alpha * [R + gamma * max_a' Q(s',a') - Q(s,a)]
            # A terminal transition has no future return, so it must not bootstrap
            # from Q(s', .). Hitting the max_steps cap is not terminal.
            future_value = 0.0 if done else np.max(Qtable[new_state])
            td_target = reward + gamma * future_value
            Qtable[state, action] += alpha * (td_target - Qtable[state, action])

            state = new_state
            total_reward += reward
            steps += 1

            if done:
                break

        if verbose:
            print(f"Episode {episode + 1}: Reward = {total_reward}")

        episode_rewards.append(total_reward)
        episode_steps.append(steps)

    return Qtable, episode_rewards, episode_steps

In [None]:
# Environment: the 5x5 maze defined above
env = MazeEnv(grid_size=5)

# Q-table dimensions come straight from the environment's discrete spaces
state_space, action_space = env.observation_space.n, env.action_space.n
Qtable = initialize_q_table(state_space, action_space)

# --- Hyperparameters ---
# Training budget
n_training_episodes = 1500   # Total training episodes
max_steps = 100              # Maximum number of steps per episode

# Q-learning update
alpha = 0.7    # Learning rate
gamma = 0.95   # Discounting rate

# Epsilon-greedy exploration schedule
max_epsilon = 1.0     # Start fully exploratory
min_epsilon = 0.05    # Floor on the exploration probability
decay_rate = 0.0005   # Exponential decay rate of epsilon per episode

In [None]:
# Fix NumPy's global RNG so the exploration sequence is repeatable
np.random.seed(42)

# Run tabular Q-learning with the baseline hyperparameters
Qtable, episode_rewards, episode_steps = train(
    n_episodes=n_training_episodes,
    min_epsilon=min_epsilon,
    max_epsilon=max_epsilon,
    decay_rate=decay_rate,
    env=env,
    max_steps=max_steps,
    Qtable=Qtable,
    alpha=alpha,
    gamma=gamma,
)

# Dump the learned action values, one row per state
print("Final Q-table (state x action):")
for state, q_row in enumerate(Qtable):
    print(f"State {state}: {q_row}")

## Model Evaluation [2]

The following code was mainly taken from lab tutorial 4 part 2 [2].

In [None]:
# The following code was mainly taken from lab tutorial 4 part 2 [2].

# Greedy evaluation: mean and standard deviation of the episode returns
def evaluate_agent(env, Qtable, max_steps, n_eval_episodes=150):
    """Roll out the greedy policy of ``Qtable`` and summarise the returns.

    Args:
        env: Evaluation environment (``reset()`` / ``step(action)``).
        Qtable: Q-table indexed as ``Qtable[state]`` -> action values.
        max_steps: Step cap per evaluation episode.
        n_eval_episodes: Number of episodes to evaluate.

    Returns:
        ``(mean_return, std_return)`` over the evaluation episodes.
    """
    episode_returns = []
    for _episode in range(n_eval_episodes):
        state = env.reset()
        episode_return = 0
        steps_taken = 0
        finished = False

        # Act greedily until the episode ends or the step cap is reached
        while steps_taken < max_steps and not finished:
            greedy_action = np.argmax(Qtable[state])
            state, reward, finished, _info = env.step(greedy_action)
            episode_return += reward
            steps_taken += 1

        episode_returns.append(episode_return)
    return np.mean(episode_returns), np.std(episode_returns)

# Score the greedy policy derived from the trained Q-table
mean_reward, std_reward = evaluate_agent(env=env, Qtable=Qtable, max_steps=max_steps)
print(f"Evaluation results -> Mean reward = {mean_reward:.2f} +/- {std_reward:.2f}")

## Plotting Performance

In [None]:
# Moving-average smoother used for the learning curves
def moving_average(data, window_size=50):   # Default window: average over 50 episodes
    """Return the running mean of ``data`` over ``window_size`` consecutive points.

    Only fully-overlapping windows are kept (``mode='valid'``), so the result
    has ``len(data) - window_size + 1`` entries when ``len(data) >= window_size``.
    """
    uniform_kernel = np.ones(window_size) / window_size
    return np.convolve(data, uniform_kernel, mode='valid')
# Smooth both learning curves with a 50-episode moving average
avg_rewards = moving_average(episode_rewards, window_size=50)
avg_steps = moving_average(episode_steps, window_size=50)

fig, axs = plt.subplots(4, 1, figsize=(10,8), sharex=True)

# (series, title, y-label, extra plot kwargs) for each stacked panel, top to bottom
panels = [
    (episode_rewards, 'Total Reward vs Episode', 'Total Reward', {}),
    (episode_steps, 'Steps per Episode', 'Steps', {}),
    (avg_rewards, 'Average Reward per Episode', 'Average Reward', {'label': 'Average Reward'}),
    (avg_steps, 'Average Steps per Episode', 'Average Steps', {'label': 'Average Steps'}),
]
for ax, (series, title, ylabel, plot_kwargs) in zip(axs, panels):
    ax.plot(range(len(series)), series, **plot_kwargs)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.grid(True)
axs[3].set_xlabel('Episode')   # Shared x-axis: label only the bottom panel

plt.tight_layout()
plt.show()

## Experiment with Different Parameter Values & Policies

## Varying Parameters

Leaving gamma and decay rate fixed while changing alpha parameter

In [None]:
# Hyperparameters held fixed during the alpha sweep
n_training_episodes = 1500   # Total training episodes
max_steps = 100   # Maximum number of steps per episode
gamma = 0.95   # Discounting rate
max_epsilon = 1.0   # Exploration probability at the start = 1.0 for maximum exploration
min_epsilon = 0.05   # Minimum possible exploration probability
decay_rate = 0.0005   # Exponential decay rate for exploration probability

alphas = [0.5, 0.9]   # Learning rates to compare
colors = ['#4D4D4D', '#DD8452']
labels = [f'alpha={a}' for a in alphas]   # Derived from the swept values so they cannot drift apart

all_rewards = []
all_steps = []
all_avg_rewards = []
all_avg_steps = []

# Setting random seed for reproducibility (seeded once: the runs share one random stream)
np.random.seed(42)

for alpha in alphas:
    # Fresh Q-table for every configuration
    Qtable = initialize_q_table(state_space, action_space)
    Qtable, episode_rewards, episode_steps = train(
        n_training_episodes,
        min_epsilon, max_epsilon,
        decay_rate,
        env,
        max_steps,
        Qtable,
        alpha,
        gamma
    )

    # Evaluate agent
    mean_reward, std_reward = evaluate_agent(env, Qtable, max_steps)
    print(f"Alpha={alpha} -> Mean reward = {mean_reward:.2f} +/- {std_reward:.2f}")

    all_rewards.append(episode_rewards)
    all_steps.append(episode_steps)

    # Smoothed curves: reuse the notebook-level moving_average helper instead of
    # redefining it on every loop iteration
    avg_rewards = moving_average(episode_rewards, window_size=50)
    avg_steps = moving_average(episode_steps, window_size=50)
    all_avg_rewards.append(avg_rewards)
    all_avg_steps.append(avg_steps)

In [None]:
# Plotting performance: one stacked panel per metric, one line per swept value
fig, axs = plt.subplots(4, 1, figsize=(10,8), sharex=True)

# (curves, title, y-label) for each panel, top to bottom
panels = [
    (all_rewards, 'Total Reward per Episode', 'Total Reward'),
    (all_steps, 'Steps per Episode', 'Steps'),
    (all_avg_rewards, 'Average Reward per Episode', 'Average Reward'),
    (all_avg_steps, 'Average Steps per Episode', 'Average Steps'),
]
for ax, (curves, title, ylabel) in zip(axs, panels):
    for label, color, curve in zip(labels, colors, curves):
        ax.plot(range(len(curve)), curve, label=label, color=color)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.grid(True)
    ax.legend(loc='upper left')
axs[3].set_xlabel('Episode')   # Shared x-axis: label only the bottom panel

plt.tight_layout()
plt.show()

Leaving alpha and decay rate fixed while changing gamma parameter

In [None]:
# Hyperparameters held fixed during the gamma sweep
n_training_episodes = 1500   # Total training episodes
max_steps = 100   # Maximum number of steps per episode
alpha = 0.7   # Learning rate
max_epsilon = 1.0   # Exploration probability at the start = 1.0 for maximum exploration
min_epsilon = 0.05   # Minimum possible exploration probability
decay_rate = 0.0005   # Exponential decay rate for exploration probability

gammas = [0.9, 0.99]   # Discounting rates to compare
colors = ['#4D4D4D', '#DD8452']
labels = [f'gamma={g}' for g in gammas]   # Derived from the swept values so they cannot drift apart

all_rewards = []
all_steps = []
all_avg_rewards = []
all_avg_steps = []

# Setting random seed for reproducibility (seeded once: the runs share one random stream)
np.random.seed(42)

for gamma in gammas:
    # Fresh Q-table for every configuration
    Qtable = initialize_q_table(state_space, action_space)
    Qtable, episode_rewards, episode_steps = train(
        n_training_episodes,
        min_epsilon, max_epsilon,
        decay_rate,
        env,
        max_steps,
        Qtable,
        alpha,
        gamma
    )

    # Evaluate agent
    mean_reward, std_reward = evaluate_agent(env, Qtable, max_steps)
    print(f"Gamma={gamma} -> Mean reward = {mean_reward:.2f} +/- {std_reward:.2f}")

    all_rewards.append(episode_rewards)
    all_steps.append(episode_steps)

    # Smoothed curves: reuse the notebook-level moving_average helper instead of
    # redefining it on every loop iteration
    avg_rewards = moving_average(episode_rewards, window_size=50)
    avg_steps = moving_average(episode_steps, window_size=50)
    all_avg_rewards.append(avg_rewards)
    all_avg_steps.append(avg_steps)

In [None]:
# Plotting performance: one stacked panel per metric, one line per swept value
fig, axs = plt.subplots(4, 1, figsize=(10,8), sharex=True)

# (curves, title, y-label) for each panel, top to bottom
panels = [
    (all_rewards, 'Total Reward per Episode', 'Total Reward'),
    (all_steps, 'Steps per Episode', 'Steps'),
    (all_avg_rewards, 'Average Reward per Episode', 'Average Reward'),
    (all_avg_steps, 'Average Steps per Episode', 'Average Steps'),
]
for ax, (curves, title, ylabel) in zip(axs, panels):
    for label, color, curve in zip(labels, colors, curves):
        ax.plot(range(len(curve)), curve, label=label, color=color)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.grid(True)
    ax.legend(loc='upper left')
axs[3].set_xlabel('Episode')   # Shared x-axis: label only the bottom panel

plt.tight_layout()
plt.show()

Leaving alpha and gamma fixed while changing decay rate parameter

In [None]:
# Hyperparameters held fixed during the decay-rate sweep
n_training_episodes = 1500   # Total training episodes
max_steps = 100   # Maximum number of steps per episode
alpha = 0.7   # Learning rate
max_epsilon = 1.0   # Exploration probability at the start = 1.0 for maximum exploration
min_epsilon = 0.05   # Minimum possible exploration probability
gamma = 0.95   # Discounting rate

decay_rates = [0.001, 0.005]   # Exponential decay rates of epsilon to compare
colors = ['#4D4D4D', '#DD8452']
labels = [f'decay_rate={d}' for d in decay_rates]   # Derived from the swept values so they cannot drift apart

all_rewards = []
all_steps = []
all_avg_rewards = []
all_avg_steps = []

# Setting random seed for reproducibility (seeded once: the runs share one random stream)
np.random.seed(42)

for decay_rate in decay_rates:
    # Fresh Q-table for every configuration
    Qtable = initialize_q_table(state_space, action_space)
    Qtable, episode_rewards, episode_steps = train(
        n_training_episodes,
        min_epsilon, max_epsilon,
        decay_rate,
        env,
        max_steps,
        Qtable,
        alpha,
        gamma
    )

    # Evaluate agent
    mean_reward, std_reward = evaluate_agent(env, Qtable, max_steps)
    print(f"Decay rate={decay_rate} -> Mean reward = {mean_reward:.2f} +/- {std_reward:.2f}")

    all_rewards.append(episode_rewards)
    all_steps.append(episode_steps)

    # Smoothed curves: reuse the notebook-level moving_average helper. The copy
    # previously redefined here had a default window of 10 that contradicted
    # the 50 passed at every call.
    avg_rewards = moving_average(episode_rewards, window_size=50)
    avg_steps = moving_average(episode_steps, window_size=50)
    all_avg_rewards.append(avg_rewards)
    all_avg_steps.append(avg_steps)

In [None]:
# Plotting performance: one stacked panel per metric, one line per swept value
fig, axs = plt.subplots(4, 1, figsize=(10,8), sharex=True)

# (curves, title, y-label) for each panel, top to bottom
panels = [
    (all_rewards, 'Total Reward per Episode', 'Total Reward'),
    (all_steps, 'Steps per Episode', 'Steps'),
    (all_avg_rewards, 'Average Reward per Episode', 'Average Reward'),
    (all_avg_steps, 'Average Steps per Episode', 'Average Steps'),
]
for ax, (curves, title, ylabel) in zip(axs, panels):
    for label, color, curve in zip(labels, colors, curves):
        ax.plot(range(len(curve)), curve, label=label, color=color)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.grid(True)
    ax.legend(loc='upper left')
axs[3].set_xlabel('Episode')   # Shared x-axis: label only the bottom panel

plt.tight_layout()
plt.show()

*Best parameter combination - which combination is the fastest to reach an average positive reward:*

ALPHA=0.7

GAMMA=0.95

DECAY_RATE=0.005

## Varying Policy

Experimenting with softmax policy instead of epsilon-greedy policy

In [None]:
# Softmax (Boltzmann) action selection
def softmax_policy(Qtable, state, temperature=1.0):
    """Sample an action with probability proportional to ``exp(Q(state, a) / temperature)``.

    Args:
        Qtable: Q-table indexed as ``Qtable[state]`` -> action values.
        state: Index of the current state.
        temperature: Scales the action values before exponentiation; larger
            values flatten the distribution.

    Returns:
        The sampled action index (drawn from NumPy's global random state).
    """
    scaled_q = Qtable[state] / temperature
    # Softmax is unchanged by subtracting a constant from every logit; shifting
    # by the maximum keeps np.exp from overflowing to inf (which would turn the
    # probabilities into NaN) when the Q-values are large.
    exp_q = np.exp(scaled_q - np.max(scaled_q))
    probs = exp_q / np.sum(exp_q)
    return np.random.choice(len(probs), p=probs)

# Tabular Q-learning training loop with softmax exploration
def softmax_train(n_episodes, temperature, env, max_steps, Qtable, alpha, gamma, verbose=True):
    """Train ``Qtable`` in place with Q-learning, choosing actions via ``softmax_policy``.

    Args:
        n_episodes: Number of training episodes.
        temperature: Softmax temperature passed to ``softmax_policy``.
        env: Environment exposing ``reset() -> state`` and
            ``step(action) -> (state, reward, done, info)``.
        max_steps: Step cap per episode.
        Qtable: Array indexed as ``Qtable[state, action]``; updated in place.
        alpha: Learning rate.
        gamma: Discount factor.
        verbose: When True (default), print the total reward after every episode.

    Returns:
        ``(Qtable, episode_rewards, episode_steps)`` where the two lists hold the
        total reward and the number of steps of each episode.
    """
    episode_rewards = []
    episode_steps = []

    for episode in range(n_episodes):
        state = env.reset()
        total_reward = 0
        steps = 0

        for _ in range(max_steps):
            action = softmax_policy(Qtable, state, temperature)
            new_state, reward, done, _info = env.step(action)

            # Q(s,a) := Q(s,a) + alpha * [R + gamma * max_a' Q(s',a') - Q(s,a)]
            # A terminal transition has no future return, so it must not bootstrap
            # from Q(s', .). Hitting the max_steps cap is not terminal.
            future_value = 0.0 if done else np.max(Qtable[new_state])
            td_target = reward + gamma * future_value
            Qtable[state, action] += alpha * (td_target - Qtable[state, action])

            state = new_state
            total_reward += reward
            steps += 1

            if done:
                break

        if verbose:
            print(f"Episode {episode + 1}: Reward = {total_reward}")

        episode_rewards.append(total_reward)
        episode_steps.append(steps)

    return Qtable, episode_rewards, episode_steps

## Evaluating and Plotting Performance with Different Parameter Values

Leaving alpha and gamma fixed while changing temperature parameter

In [None]:
# Hyperparameters held fixed during the temperature sweep
n_training_episodes = 1500   # Total training episodes
max_steps = 100   # Maximum number of steps per episode
alpha = 0.5   # Learning rate
gamma = 0.95   # Discounting rate

temperatures = [2.5, 5.0, 7.5]   # Softmax temperatures to compare
colors = ['#55A868', '#C44E52', '#8172B2']
labels = [f'T={t}' for t in temperatures]   # Derived from the swept values so they cannot drift apart

all_rewards = []
all_steps = []
all_avg_rewards = []
all_avg_steps = []

np.random.seed(42) # Setting random seed for reproducibility (seeded once: the runs share one random stream)

for temp in temperatures:
    # Fresh Q-table for every configuration
    Qtable_softmax = initialize_q_table(state_space, action_space)
    # Training with softmax
    Qtable_softmax, softmax_rewards, softmax_steps = softmax_train(
        n_training_episodes,
        temp,
        env,
        max_steps,
        Qtable_softmax,
        alpha,
        gamma
    )

    # Evaluation
    mean_reward_softmax, std_reward_softmax = evaluate_agent(env, Qtable_softmax, max_steps)
    print(f"Temperature={temp} -> Mean reward = {mean_reward_softmax:.2f} +/- {std_reward_softmax:.2f}")

    all_rewards.append(softmax_rewards)
    all_steps.append(softmax_steps)

    # Smoothed curves: reuse the notebook-level moving_average helper. The copy
    # previously redefined here had a default window of 10 that contradicted
    # the 50 passed at every call.
    avg_rewards = moving_average(softmax_rewards, window_size=50)
    avg_steps = moving_average(softmax_steps, window_size=50)
    all_avg_rewards.append(avg_rewards)
    all_avg_steps.append(avg_steps)

In [None]:
# Plotting performance: one stacked panel per metric, one line per swept value
fig, axs = plt.subplots(4, 1, figsize=(10,8), sharex=True)

# (curves, title, y-label) for each panel, top to bottom
panels = [
    (all_rewards, 'Total Reward per Episode', 'Total Reward'),
    (all_steps, 'Steps per Episode', 'Steps'),
    (all_avg_rewards, 'Average Reward per Episode', 'Average Reward'),
    (all_avg_steps, 'Average Steps per Episode', 'Average Steps'),
]
for ax, (curves, title, ylabel) in zip(axs, panels):
    for label, color, curve in zip(labels, colors, curves):
        ax.plot(range(len(curve)), curve, label=label, color=color)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.grid(True)
    ax.legend(loc='upper left')
axs[3].set_xlabel('Episode')   # Shared x-axis: label only the bottom panel

plt.tight_layout()
plt.show()

Leaving temperature and gamma fixed while changing alpha parameter

In [None]:
# Hyperparameters held fixed during the softmax alpha sweep
n_training_episodes = 1500   # Total training episodes
max_steps = 100   # Maximum number of steps per episode
temperature = 2.5   # Temperature
gamma = 0.95   # Discounting rate

alphas = [0.2, 0.5, 0.8]   # Learning rates to compare
colors = ['#55A868', '#C44E52', '#8172B2']
labels = [f'alpha={a}' for a in alphas]   # Derived from the swept values so they cannot drift apart

all_rewards = []
all_steps = []
all_avg_rewards = []
all_avg_steps = []

np.random.seed(42) # For reproducibility (seeded once: the runs share one random stream)

for alpha in alphas:
    # Fresh Q-table for every configuration
    Qtable_softmax = initialize_q_table(state_space, action_space)
    # Training with softmax
    Qtable_softmax, softmax_rewards, softmax_steps = softmax_train(
        n_training_episodes,
        temperature,   # The fixed temperature declared above, not the `temp` left over from an earlier loop
        env,
        max_steps,
        Qtable_softmax,
        alpha,
        gamma
    )

    # Evaluation
    mean_reward_softmax, std_reward_softmax = evaluate_agent(env, Qtable_softmax, max_steps)
    print(f"Alphas={alpha} -> Mean reward = {mean_reward_softmax:.2f} +/- {std_reward_softmax:.2f}")

    all_rewards.append(softmax_rewards)
    all_steps.append(softmax_steps)

    # Smoothed curves: reuse the notebook-level moving_average helper instead of
    # redefining it on every loop iteration
    avg_rewards = moving_average(softmax_rewards, window_size=50)
    avg_steps = moving_average(softmax_steps, window_size=50)
    all_avg_rewards.append(avg_rewards)
    all_avg_steps.append(avg_steps)

In [None]:
# Plotting performance: one stacked panel per metric, one line per swept value
fig, axs = plt.subplots(4, 1, figsize=(10,8), sharex=True)

# (curves, title, y-label) for each panel, top to bottom
panels = [
    (all_rewards, 'Total Reward per Episode', 'Total Reward'),
    (all_steps, 'Steps per Episode', 'Steps'),
    (all_avg_rewards, 'Average Reward per Episode', 'Average Reward'),
    (all_avg_steps, 'Average Steps per Episode', 'Average Steps'),
]
for ax, (curves, title, ylabel) in zip(axs, panels):
    for label, color, curve in zip(labels, colors, curves):
        ax.plot(range(len(curve)), curve, label=label, color=color)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.grid(True)
    ax.legend(loc='upper left')
axs[3].set_xlabel('Episode')   # Shared x-axis: label only the bottom panel

plt.tight_layout()
plt.show()

Leaving temperature and alpha fixed while changing gamma parameter

In [None]:
# Hyperparameters held fixed during the softmax gamma sweep
n_training_episodes = 1500   # Total training episodes
max_steps = 100   # Maximum number of steps per episode
temperature = 2.5   # Temperature
alpha = 0.5   # Learning rate

gammas = [0.9, 0.95, 0.99]   # Discounting rates to compare
colors = ['#55A868', '#C44E52', '#8172B2']
labels = [f'gamma={g}' for g in gammas]   # Derived from the swept values so they cannot drift apart

all_rewards = []
all_steps = []
all_avg_rewards = []
all_avg_steps = []

np.random.seed(42) # For reproducibility (seeded once: the runs share one random stream)

# Sweep the discount factor (the loop previously iterated over the
# temperatures, so gamma never changed while the plots were labelled by gamma)
for gamma in gammas:
    # Fresh Q-table for every configuration
    Qtable_softmax = initialize_q_table(state_space, action_space)
    # Training with softmax
    Qtable_softmax, softmax_rewards, softmax_steps = softmax_train(
        n_training_episodes,
        temperature,   # Fixed temperature for this sweep
        env,
        max_steps,
        Qtable_softmax,
        alpha,
        gamma
    )

    # Evaluation
    mean_reward_softmax, std_reward_softmax = evaluate_agent(env, Qtable_softmax, max_steps)
    print(f"Gammas={gamma} -> Mean reward = {mean_reward_softmax:.2f} +/- {std_reward_softmax:.2f}")

    all_rewards.append(softmax_rewards)
    all_steps.append(softmax_steps)

    # Smoothed curves: reuse the notebook-level moving_average helper instead of
    # redefining it on every loop iteration
    avg_rewards = moving_average(softmax_rewards, window_size=50)
    avg_steps = moving_average(softmax_steps, window_size=50)
    all_avg_rewards.append(avg_rewards)
    all_avg_steps.append(avg_steps)

In [None]:
# Plotting performance: one stacked panel per metric, one line per swept value
fig, axs = plt.subplots(4, 1, figsize=(10,8), sharex=True)

# (curves, title, y-label) for each panel, top to bottom
panels = [
    (all_rewards, 'Total Reward per Episode', 'Total Reward'),
    (all_steps, 'Steps per Episode', 'Steps'),
    (all_avg_rewards, 'Average Reward per Episode', 'Average Reward'),
    (all_avg_steps, 'Average Steps per Episode', 'Average Steps'),
]
for ax, (curves, title, ylabel) in zip(axs, panels):
    for label, color, curve in zip(labels, colors, curves):
        ax.plot(range(len(curve)), curve, label=label, color=color)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.grid(True)
    ax.legend(loc='upper left')
axs[3].set_xlabel('Episode')   # Shared x-axis: label only the bottom panel

plt.tight_layout()
plt.show()

Best parameter combination - which combination is the fastest to reach an average positive reward:

TEMPERATURE=2.5

ALPHA=0.5

GAMMA=0.9

# ADVANCED TASKS

## DQN + Double DQN Implementation

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
from torch import optim
from collections import deque
import pandas as pd

Most of the code below was inspired by lab 6 [3] and adapted for the environment defined above.

In [None]:
# The below defined experience replay buffer class is used to stabilise the learning process of the double DQN algorithm
class ExperienceReplayBuffer:
    """Fixed-capacity FIFO store of ``(state, action, reward, next_state, done)`` transitions."""

    def __init__(self, capacity: int, random_state=None):
        """
        Args:
            capacity: Maximum number of transitions kept; the oldest are dropped first.
            random_state: ``np.random.RandomState`` used for sampling. When omitted,
                a freshly created (unseeded) generator is used.
        """
        self.buffer = deque(maxlen=capacity)
        # Fall back to a fresh generator so sample() also works with the default argument
        self.random_state = np.random.RandomState() if random_state is None else random_state

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

    def add(self, state, action, reward, next_state, done):
        """Append one transition, evicting the oldest when the buffer is full."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size: int):
        """Draw ``batch_size`` distinct transitions and return them as batched tensors.

        Returns:
            ``(states, actions, rewards, next_states, dones)``; actions (int64),
            rewards (float32) and dones (uint8) carry a trailing dimension of 1.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored transitions
                (sampling is without replacement).
        """
        indices = self.random_state.choice(len(self.buffer), batch_size, replace=False)
        experiences = [self.buffer[i] for i in indices]
        states, actions, rewards, next_states, dones = zip(*experiences)
        # Go through one contiguous NumPy array per field: building a tensor
        # directly from a sequence of arrays is slow
        return (
            torch.tensor(np.asarray(states), dtype=torch.float32),
            torch.tensor(np.asarray(actions), dtype=torch.int64).unsqueeze(1),
            torch.tensor(np.asarray(rewards), dtype=torch.float32).unsqueeze(1),
            torch.tensor(np.asarray(next_states), dtype=torch.float32),
            torch.tensor(np.asarray(dones), dtype=torch.uint8).unsqueeze(1)
        )

In [None]:
# Defining the Q-network
class QNetwork(nn.Module):
    """Fully-connected network mapping a state vector to one Q-value per action.

    Two hidden layers of ``number_hidden_units`` units with ReLU activations,
    followed by a linear output layer of size ``action_size``.
    """

    def __init__(self, state_size, action_size, number_hidden_units):
        super().__init__()
        width = number_hidden_units
        self.fc1 = nn.Linear(state_size, width)
        self.fc2 = nn.Linear(width, width)
        self.fc3 = nn.Linear(width, action_size)

    def forward(self, x):
        """Return the Q-values for the batch of states ``x``."""
        activation = x
        for hidden_layer in (self.fc1, self.fc2):
            activation = F.relu(hidden_layer(activation))
        return self.fc3(activation)

# Deep Q-learning Agent
class DeepQAgent:
    """DQN / Double-DQN agent with experience replay and a softly-updated target network."""

    def __init__(
        self,
        state_size,
        action_size,
        number_hidden_units,
        optimizer_fn,
        batch_size,
        buffer_size,
        epsilon_decay_schedule,
        alpha,
        gamma,
        update_frequency,
        double_dqn = False,
        seed = 0
    ):
        """
        Args:
            state_size: Length of the state vector fed to the Q-network.
            action_size: Number of discrete actions.
            number_hidden_units: Width of the Q-network's hidden layers.
            optimizer_fn: Callable that builds an optimizer from the local
                network's parameters.
            batch_size: Number of transitions sampled per learning step.
            buffer_size: Capacity of the replay buffer.
            epsilon_decay_schedule: Stored on the agent for the training loop;
                not called inside this class.
            alpha: Weight on the local network's parameters in the soft
                target-network update performed after every learning step.
            gamma: Discount factor.
            update_frequency: ``learn()`` runs on every ``update_frequency``-th
                call to ``step()`` once the buffer holds a full batch.
            double_dqn: Use the Double-DQN target instead of the standard one.
            seed: Seeds action sampling, replay sampling and network initialisation.
        """
        self.random_state = np.random.RandomState(seed)
        self.torch_generator = torch.Generator().manual_seed(seed)

        self.state_size = state_size
        self.action_size = action_size
        # Seed the weight initialisation so that `seed` makes the agent
        # reproducible; fork_rng restores the CPU RNG state afterwards.
        with torch.random.fork_rng(devices=[]):
            torch.manual_seed(seed)
            self.qnetwork_local = QNetwork(state_size, action_size, number_hidden_units)
            self.qnetwork_target = QNetwork(state_size, action_size, number_hidden_units)
        # Start the target network as an exact copy of the local one; otherwise the
        # first TD targets come from an unrelated, randomly initialised network.
        self.qnetwork_target.load_state_dict(self.qnetwork_local.state_dict())
        self.optimizer = optimizer_fn(self.qnetwork_local.parameters())
        self.batch_size = batch_size
        self.memory = ExperienceReplayBuffer(buffer_size, self.random_state)
        self.epsilon_decay_schedule = epsilon_decay_schedule
        self.alpha = alpha
        self.gamma = gamma
        self.update_frequency = update_frequency
        self.double_dqn = double_dqn

        self.steps = 0   # Number of transitions seen so far

    def act(self, state, epsilon=0):
        """Return an epsilon-greedy action (as a Python int) for ``state``."""
        if self.random_state.rand() > epsilon:
            with torch.no_grad():
                state = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
                q_values = self.qnetwork_local(state)
                return torch.argmax(q_values).item()

        # Exploration branch; cast so both branches return the same type
        return int(self.random_state.choice(np.arange(self.action_size)))

    def step(self, state, action, reward, next_state, done):
        """Store one transition and learn from a sampled batch when due."""
        self.memory.add(state, action, reward, next_state, done)
        self.steps += 1

        if len(self.memory) >= self.batch_size and self.steps % self.update_frequency == 0:
            self.learn()

    def learn(self):
        """Run one gradient step on a replay batch, then softly update the target network."""
        states, actions, rewards, next_states, dones = self.memory.sample(self.batch_size)

        update_fn = double_q_learning_update if self.double_dqn else standard_q_learning_update
        update_fn(
            self.qnetwork_local,
            self.qnetwork_target,
            self.optimizer,
            states,
            actions,
            rewards,
            next_states,
            dones,
            self.gamma
        )

        # Soft target update: target <- alpha * local + (1 - alpha) * target
        for target_param, local_param in zip(self.qnetwork_target.parameters(), self.qnetwork_local.parameters()):
            target_param.data.copy_(self.alpha * local_param.data + (1.0 - self.alpha) * target_param.data)

In [None]:
# Defining standard and double Q-learning update functions
def standard_q_learning_update(qnetwork_local, qnetwork_target, optimizer, states, actions, rewards, next_states, dones, gamma):
    """Apply one standard DQN gradient step to ``qnetwork_local``.

    The bootstrap value is the maximum (over dim 1) of the detached target
    network output for ``next_states``; it is masked by ``1 - dones`` and
    discounted by ``gamma``. The loss is the MSE between that target and the
    local network's Q-value gathered at ``actions``. Returns ``None``.
    """
    # Detached so no gradient reaches the target network.
    max_next_q = qnetwork_target(next_states).detach().max(dim=1, keepdim=True)[0]
    not_done = 1 - dones.float()
    td_targets = rewards + (gamma * max_next_q * not_done)

    predicted_q = qnetwork_local(states).gather(1, actions)
    td_loss = F.mse_loss(predicted_q, td_targets)

    optimizer.zero_grad()
    td_loss.backward()
    optimizer.step()

def double_q_learning_update(qnetwork_local, qnetwork_target, optimizer, states, actions, rewards, next_states, dones, gamma):
    """Apply one Double DQN gradient step to ``qnetwork_local``.

    The next action is selected with the local network (arg-max over dim 1)
    and evaluated with the target network. The resulting bootstrap value is
    masked by ``1 - dones`` and discounted by ``gamma``; the loss is the MSE
    between that target and the local Q-value gathered at ``actions``.

    Both network outputs used to build the target are detached, so
    ``loss.backward()`` neither back-propagates through the target network nor
    accumulates stale ``.grad`` buffers on its parameters.

    Returns:
        None. ``qnetwork_local`` is updated in place through ``optimizer``.
    """
    # Action selection: local network.
    best_actions = qnetwork_local(next_states).detach().argmax(1).unsqueeze(1)
    # Action evaluation: target network, treated as a constant.
    q_targets_next = qnetwork_target(next_states).detach().gather(1, best_actions)
    q_targets = rewards + (gamma * q_targets_next * (1 - dones.float()))

    q_expected = qnetwork_local(states).gather(1, actions)

    loss = F.mse_loss(q_expected, q_targets)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [None]:
# Defining power decay epsilon-schedule
def power_decay_schedule(episode_number: int, decay_factor: float, minimum_epsilon: float) -> float:
    """Return ``decay_factor ** episode_number``, floored at ``minimum_epsilon``."""
    decayed_epsilon = decay_factor ** episode_number
    if minimum_epsilon > decayed_epsilon:
        return minimum_epsilon
    return decayed_epsilon


# Keyword arguments bound into the schedule used by the agents below.
_epsilon_decay_schedule_kwargs = dict(
    decay_factor=0.99,
    minimum_epsilon=1e-2,
)


def epsilon_decay_schedule(n):
    """Epsilon for episode ``n`` under ``_epsilon_decay_schedule_kwargs``."""
    return power_decay_schedule(n, **_epsilon_decay_schedule_kwargs)

In [None]:
# Optimizer configuration: keyword arguments forwarded to Adam by optimizer_fn
_optimizer_kwargs = dict(
    lr=1e-3,
    betas=(0.9, 0.999),
    eps=1e-08,
    weight_decay=0,
    amsgrad=False,
)


def optimizer_fn(parameters):
    """Build an Adam optimizer over ``parameters`` using ``_optimizer_kwargs``."""
    return optim.Adam(parameters, **_optimizer_kwargs)

In [None]:
# Implementation of training function
def train(agent, env, checkpoint_filepath, target_score=195.0, number_episodes=2000):
    """Train ``agent`` on ``env`` and return the list of per-episode scores.

    Each episode runs until ``env.step`` reports ``done``. After every episode
    the mean over the most recent (at most 100) scores is printed; once that
    mean reaches ``target_score`` the local network's ``state_dict`` is saved
    to ``checkpoint_filepath`` and training stops early.

    Args:
        agent: Object providing ``epsilon_decay_schedule(episode)``,
            ``act(state, epsilon)``, ``step(...)`` and ``qnetwork_local``.
        env: Environment whose ``reset()`` returns a state and whose
            ``step(action)`` returns ``(next_state, reward, done, info)``.
        checkpoint_filepath: Destination passed to ``torch.save``; only
            written when the target score is reached.
        target_score: Rolling-mean score that counts as solved.
        number_episodes: Maximum number of episodes to run.

    Returns:
        list: Total reward of every episode that was played.

    Note:
        A later cell of this notebook defines another ``train`` with a
        different signature; run cells top to bottom.
    """
    scores = []
    scores_window = deque(maxlen=100)

    for episode in range(1, number_episodes + 1):
        # Epsilon depends only on the episode index, so evaluate the schedule
        # once per episode instead of once per environment step.
        epsilon = agent.epsilon_decay_schedule(episode)

        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            action = agent.act(state, epsilon)
            next_state, reward, done, _ = env.step(action)
            agent.step(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward

        scores.append(total_reward)
        scores_window.append(total_reward)

        # Computed once and reused for both the log line and the solve check.
        average_score = np.mean(scores_window)
        print(f"Episode {episode}\tAverage Score: {average_score:.2f}")

        if average_score >= target_score:
            print(f"\nEnvironment solved in {episode} episodes!")
            torch.save(agent.qnetwork_local.state_dict(), checkpoint_filepath)
            break

    return scores

In [None]:
# Setting up environment
env = gym.make("CartPole-v1")

# Agent configuration shared by the standard and double DQN runs;
# state/action sizes are read from the environment's spaces.
_agent_kwargs = dict(
    state_size=env.observation_space.shape[0],
    action_size=env.action_space.n,
    number_hidden_units=64,
    optimizer_fn=optimizer_fn,
    epsilon_decay_schedule=epsilon_decay_schedule,
    batch_size=64,
    buffer_size=100000,
    alpha=1e-3,
    gamma=0.99,
    update_frequency=4,
    seed=42,
)

Training the standard and Double DQN agents without early stopping (`target_score=float("inf")`), while keeping the target score of 195 in mind as a reference.

In [None]:
# Standard DQN agent
# Merge the flag into a fresh dict instead of mutating the shared `_agent_kwargs`,
# so this cell gives the same result regardless of which cells ran before it.
dqn_agent = DeepQAgent(**{**_agent_kwargs, "double_dqn": False})
# target_score=inf disables early stopping, so all 2000 episodes are played.
dqn_scores = train(dqn_agent, env, "dqn-checkpoint.pth", number_episodes=2000, target_score=float("inf"))

In [None]:
# Double DQN agent
# Merge the flag into a fresh dict instead of mutating the shared `_agent_kwargs`,
# so this cell gives the same result regardless of which cells ran before it.
double_dqn_agent = DeepQAgent(**{**_agent_kwargs, "double_dqn": True})
# target_score=inf disables early stopping, so all 2000 episodes are played.
double_dqn_scores = train(double_dqn_agent, env, "double-dqn-checkpoint.pth", number_episodes=2000, target_score=float("inf"))

## Plotting Results

In [None]:
# Standard DQN: per-episode score (blue) and its 100-episode rolling mean (red)
dqn_scores = pd.Series(dqn_scores, name="DQN Scores")

fig, ax = plt.subplots(figsize=(15, 3))

dqn_scores.plot(ax=ax, label="DQN Scores", color='blue')
dqn_scores.rolling(100).mean().rename("Rolling Avg").plot(ax=ax, color='red')

ax.legend(loc='upper left')
ax.set(
    xlabel="Episode",
    ylabel="Score",
    title="Standard DQN Score per Episode",
)

plt.show()

In [None]:
# Double DQN: per-episode score (blue) and its 100-episode rolling mean (red)
double_dqn_scores = pd.Series(double_dqn_scores, name="Double DQN Scores")

fig, ax = plt.subplots(figsize=(15, 3))

double_dqn_scores.plot(ax=ax, label="Double DQN Scores", color='blue')
double_dqn_scores.rolling(100).mean().rename("Rolling Avg").plot(ax=ax, color='red')

ax.legend(loc='upper left')
ax.set(
    ylabel="Score",
    xlabel="Episode",
    title="Double DQN Score per Episode",
)

plt.show()

## Multi-Step Learning Implementation

In the following code, multi-step learning is implemented for the Double DQN agent.

Most of the code is reused from above, with small changes to support multi-step (n-step) returns.

In [None]:
from collections import namedtuple

In [None]:
# Defining experience replay buffer for multi-step DQN
class ExperienceReplayBuffer:
    """Replay buffer that stores n-step transitions for multi-step DQN.

    Raw one-step transitions are staged in ``n_step_buffer``. Each stored
    experience is ``(state, action, n_step_return, next_state, done)``, where
    ``state``/``action`` come from the oldest staged transition,
    ``next_state``/``done`` from the newest, and ``n_step_return`` is the
    ``gamma``-discounted sum of the staged rewards.

    Args:
        capacity: Maximum number of stored n-step experiences.
        gamma: Discount factor used when accumulating rewards.
        n_step: Number of one-step transitions folded into one experience.
        seed: Seed for the private ``random.Random`` used by ``sample``.
    """

    def __init__(self, capacity, gamma, n_step, seed):
        self.buffer = deque(maxlen=capacity)
        self.n_step_buffer = deque(maxlen=n_step)
        self.n_step = n_step
        self.gamma = gamma
        self.random = random.Random(seed)
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])

    def _get_n_step_info(self):
        """Fold the currently staged transitions into one n-step experience."""
        n_step_return, discount = 0.0, 1.0
        for (_, _, r, _, d) in self.n_step_buffer:
            n_step_return += discount * r
            discount *= self.gamma
            if d:
                break
        state, action, _, _, _ = self.n_step_buffer[0]
        _, _, _, next_state, done = self.n_step_buffer[-1]
        # A namedtuple is still a tuple, so positional unpacking keeps working.
        return self.experience(state, action, n_step_return, next_state, done)

    def add(self, state, action, reward, next_state, done):
        """Stage one transition and store any n-step experiences it completes.

        While the episode is running, one experience is stored each time the
        staging window holds ``n_step`` transitions. When ``done`` is true the
        window is flushed: every staged transition becomes the start of its
        own (possibly shorter) experience ending in the terminal state, so the
        last steps of an episode are not discarded. Those flushed experiences
        all carry ``done=True``.
        """
        self.n_step_buffer.append((state, action, reward, next_state, done))
        if done:
            while self.n_step_buffer:
                self.buffer.append(self._get_n_step_info())
                self.n_step_buffer.popleft()
        elif len(self.n_step_buffer) == self.n_step:
            self.buffer.append(self._get_n_step_info())

    def sample(self, batch_size):
        """Draw ``batch_size`` stored experiences without replacement.

        Returns:
            tuple: ``(states, actions, rewards, next_states, dones)`` tensors.
            ``states``/``next_states`` are float32; ``actions`` (int64),
            ``rewards`` (float32) and ``dones`` (uint8) get a trailing
            dimension of size 1.
        """
        samples = self.random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*samples)
        # Stack into one ndarray first: building a tensor directly from a
        # sequence of ndarrays is very slow.
        return (
            torch.tensor(np.array(states), dtype=torch.float32),
            torch.tensor(actions, dtype=torch.int64).unsqueeze(1),
            torch.tensor(rewards, dtype=torch.float32).unsqueeze(1),
            torch.tensor(np.array(next_states), dtype=torch.float32),
            torch.tensor(dones, dtype=torch.uint8).unsqueeze(1),
        )

    def __len__(self):
        """Number of stored n-step experiences."""
        return len(self.buffer)

In [None]:
# Defining Q-network class
class QNetwork(nn.Module):
    """Fully connected Q-network: two ReLU hidden layers and a linear output.

    Args:
        state_size: Number of input features.
        action_size: Number of outputs (one Q-value per action).
        hidden_size: Width of both hidden layers.
    """

    def __init__(self, state_size, action_size, hidden_size):
        super().__init__()
        self.fc1 = nn.Linear(in_features=state_size, out_features=hidden_size)
        self.fc2 = nn.Linear(in_features=hidden_size, out_features=hidden_size)
        self.fc3 = nn.Linear(in_features=hidden_size, out_features=action_size)

    def forward(self, x):
        """Return the output of ``fc3`` after passing ``x`` through both hidden layers."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        return self.fc3(hidden)

In [None]:
# Agent using Double DQN
class DoubleDQNAgent:
    """Double DQN agent with an n-step replay buffer and a soft-updated target network.

    Args:
        state_size: Number of input features of the Q-networks.
        action_size: Number of discrete actions.
        hidden_size: Width of the Q-networks' hidden layers.
        gamma: Discount factor (also used by the replay buffer for n-step returns).
        alpha: Interpolation factor of the soft target update.
        batch_size: Number of experiences sampled per learning step.
        buffer_size: Capacity of the replay buffer.
        update_frequency: Learn every this many calls to ``step``.
        n_step: Number of steps folded into each stored experience.
        epsilon_schedule: Callable mapping an episode number to epsilon.
        seed: Seed for torch, the replay buffer and the exploration RNG.
    """

    def __init__(self, state_size, action_size, hidden_size, gamma, alpha,
                 batch_size, buffer_size, update_frequency, n_step, epsilon_schedule, seed):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = gamma
        self.alpha = alpha
        self.batch_size = batch_size
        self.update_frequency = update_frequency
        self.epsilon_schedule = epsilon_schedule
        self.step_count = 0

        # Seed torch BEFORE building the networks so that their initial
        # weights are reproducible for a given seed.
        torch.manual_seed(seed)

        self.q_local = QNetwork(state_size, action_size, hidden_size)
        self.q_target = QNetwork(state_size, action_size, hidden_size)
        # Start the target network as an exact copy of the local network;
        # otherwise early bootstrap targets come from unrelated random weights.
        self.q_target.load_state_dict(self.q_local.state_dict())
        self.optimizer = optim.Adam(self.q_local.parameters(), lr=1e-3)

        self.buffer = ExperienceReplayBuffer(buffer_size, gamma, n_step, seed)
        self.random = random.Random(seed)

    def act(self, state, episode):
        """Return an epsilon-greedy action, with epsilon taken from the schedule for ``episode``."""
        epsilon = self.epsilon_schedule(episode)
        if self.random.random() > epsilon:
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                return torch.argmax(self.q_local(state_tensor)).item()
        return self.random.randint(0, self.action_size - 1)

    def step(self, state, action, reward, next_state, done):
        """Store a transition and learn when enough data is buffered and an update is due."""
        self.buffer.add(state, action, reward, next_state, done)
        self.step_count += 1

        if len(self.buffer) >= self.batch_size and self.step_count % self.update_frequency == 0:
            self.learn()

    def learn(self):
        """Run one Double DQN gradient step on a sampled batch, then soft-update the target."""
        states, actions, rewards, next_states, dones = self.buffer.sample(self.batch_size)

        with torch.no_grad():
            # Select the next action with the local net, evaluate it with the target net.
            best_actions = self.q_local(next_states).argmax(1, keepdim=True)
            q_targets_next = self.q_target(next_states).gather(1, best_actions)
            # Sampled rewards are n-step returns, so the bootstrap is discounted by gamma ** n_step.
            q_targets = rewards + (self.gamma ** self.buffer.n_step) * q_targets_next * (1 - dones.float())

        q_expected = self.q_local(states).gather(1, actions)
        loss = F.mse_loss(q_expected, q_targets)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Soft update: target <- alpha * local + (1 - alpha) * target
        for t_param, l_param in zip(self.q_target.parameters(), self.q_local.parameters()):
            t_param.data.copy_(self.alpha * l_param.data + (1.0 - self.alpha) * t_param.data)

In [None]:
# Training loop
def train(agent, env, num_episodes=2000):
    """Run ``num_episodes`` training episodes and return the per-episode scores.

    For every environment step the agent picks an action via
    ``agent.act(state, episode)`` and is fed the resulting transition through
    ``agent.step``. After each episode the mean of the last (up to) 100 scores
    is printed.
    """
    scores = []
    for episode in range(1, num_episodes + 1):
        state, total_reward = env.reset(), 0

        # Every episode plays at least one step and stops when `done` is reported.
        while True:
            action = agent.act(state, episode)
            next_state, reward, done, _ = env.step(action)
            agent.step(state, action, reward, next_state, done)
            total_reward += reward
            state = next_state
            if done:
                break

        scores.append(total_reward)
        recent_mean = np.mean(scores[-100:])
        print(f"Episode {episode}\tAverage Score: {recent_mean:.2f}")
    return scores

In [None]:
# Setting up environment; network input/output sizes are read from its spaces
env = gym.make("CartPole-v1")
state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Shared agent configuration (everything except the multi-step horizon `n_step`)
common_agent_kwargs = dict(
    state_size=state_size,
    action_size=action_size,
    hidden_size=64,
    gamma=0.99,
    alpha=1e-3,
    batch_size=64,
    buffer_size=100000,
    update_frequency=4,
    epsilon_schedule=lambda ep: max(0.99 ** ep, 0.01),
    seed=42,
)

In [None]:
# Multi-step Double DQN (n=3): shared settings plus the n-step horizon, merged into a new dict
multi_step_kwargs = {**common_agent_kwargs, "n_step": 3}
agent_multi = DoubleDQNAgent(**multi_step_kwargs)
scores_multi = train(agent_multi, env, num_episodes=2000)

In [None]:
# Converting to pandas Series, then plotting per-episode score (blue)
# together with its 100-episode rolling mean (red)
scores_multi = pd.Series(scores_multi, name="Double DQN (3-step)")

fig, ax = plt.subplots(figsize=(15, 3))

scores_multi.plot(ax=ax, label="Double DQN (3-step)", color='blue')
scores_multi.rolling(100).mean().rename("Rolling Avg").plot(ax=ax, color='red')

ax.legend(loc='upper left')
ax.set(
    ylabel="Score",
    xlabel="Episode",
    title="3-step Double DQN Score per Episode",
)

plt.show()

## References

[1] Johnny Code, *"Build a Custom Gymnasium Reinforcement Learning Environment & Train w Q-Learning & Stable Baselines3,"* YouTube, Mar. 22 2024. [Online]. Available: https://www.youtube.com/watch?v=AoGRjPt-vms&t=1251s 

[2] A. Riaz, *Lab_04_Part2_Q_Learning_with_gym,* unpublished lab tutorial, Dept. of Science
and Technology, City St. George's, University of London, 2025.

[3] A. Riaz, *Lab_6_DoubleDQN,* unpublished lab tutorial, Dept. of Science
and Technology, City St. George's, University of London, 2025.