# COMP579 Assignment 2

**Coding: Tabular RL [70 points]**


In [1]:
"""
This module implements and compares SARSA and Expected SARSA algorithms on the FrozenLake environment.
It includes implementations of both algorithms, experiment running functionality, and visualization tools
to analyze the performance under different hyperparameter settings.

The FrozenLake environment presents a challenging reinforcement learning problem due to its
stochastic nature (slippery ice) and sparse rewards (only received upon reaching the goal).
"""


import gym
import numpy as np
import matplotlib.pyplot as plt
from gym.wrappers import StepAPICompatibility
from tqdm import tqdm

In [None]:
def softmax(x, temp):
    exps = np.exp((x - np.max(x)) / temp)
    return exps / np.sum(exps)

In [3]:
def softmax(x, temp):
    x_adj = x - np.max(x)
    exps = np.exp(x_adj / temp)
    return exps / np.sum(exps)

In [4]:
class SARSA:
    """
    Implementation of the SARSA (State-Action-Reward-State-Action) algorithm.
    
    SARSA is an on-policy temporal difference learning algorithm that learns
    Q-values for state-action pairs through direct experience.
    
    Attributes:
        env: OpenAI Gym environment
        alpha (float): Learning rate
        gamma (float): Discount factor for future rewards
        temp (float): Temperature parameter for softmax exploration
        Q (np.array): Q-value table for state-action pairs
    """
    
    def __init__(self, env, alpha, gamma, temp):
        """
        Initialize SARSA agent with given parameters.
        
        Args:
            env: OpenAI Gym environment
            alpha (float): Learning rate for Q-value updates
            gamma (float): Discount factor for future rewards
            temp (float): Temperature parameter for softmax exploration
        """
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.temp = temp
        # Initialize Q-table with zeros for all state-action pairs
        self.Q = np.zeros((env.observation_space.n, env.action_space.n))

    def select_action(self, s, greedy=False):
        """
        Select an action using either greedy or softmax policy.
        
        Args:
            s (int): Current state
            greedy (bool): If True, select best action; if False, use softmax exploration
        
        Returns:
            int: Selected action
        """
        if greedy:
            # During evaluation, always choose the best action
            return np.argmax(self.Q[s])
        else:
            # During training, use softmax exploration
            action_probs = softmax(self.Q[s], self.temp)
            return np.random.choice(np.arange(self.env.action_space.n), p=action_probs)

    def update(self, s, a, r, s_prime, a_prime, done):
        """
        Update Q-values using the SARSA update rule.
        
        Q(s,a) = Q(s,a) + α[r + γQ(s',a') - Q(s,a)]
        
        Args:
            s (int): Current state
            a (int): Current action
            r (float): Reward received
            s_prime (int): Next state
            a_prime (int): Next action
            done (bool): Whether episode has terminated
        """
        # Calculate TD target (next state value is zero if episode is done)
        target = r + self.gamma * self.Q[s_prime, a_prime] * (not done)
        # Calculate TD error
        td_error = target - self.Q[s, a]
        # Update Q-value
        self.Q[s, a] += self.alpha * td_error

class ExpectedSARSA:
    """
    Implementation of the Expected SARSA algorithm.
    
    Expected SARSA is a variant of SARSA that uses the expected value over next
    actions instead of a single sampled action, potentially reducing variance
    in the updates.
    
    Attributes:
        env: OpenAI Gym environment
        alpha (float): Learning rate
        gamma (float): Discount factor for future rewards
        temp (float): Temperature parameter for softmax exploration
        Q (np.array): Q-value table for state-action pairs
    """
    
    def __init__(self, env, alpha, gamma, temp):
        """Initialize Expected SARSA agent with given parameters."""
        self.env = env
        self.alpha = alpha
        self.gamma = gamma
        self.temp = temp
        self.Q = np.zeros((env.observation_space.n, env.action_space.n))

    def select_action(self, s, greedy=False):
        """Select action using either greedy or softmax policy."""
        if greedy:
            return np.argmax(self.Q[s])
        else:
            action_probs = softmax(self.Q[s], self.temp)
            return np.random.choice(np.arange(self.env.action_space.n), p=action_probs)

    def update(self, s, a, r, s_prime, a_prime, done):
        """
        Update Q-values using the Expected SARSA update rule.
        
        Q(s,a) = Q(s,a) + α[r + γΣπ(a'|s')Q(s',a') - Q(s,a)]
        
        The key difference from SARSA is using the expected value over all next actions
        weighted by their policy probabilities, rather than just the value of the
        selected next action.
        """
        # Calculate action probabilities for next state
        action_probs = softmax(self.Q[s_prime], self.temp)
        # Calculate expected value over all actions
        expected_q = np.sum(action_probs * self.Q[s_prime])
        # Calculate TD target using expected value
        target = r + self.gamma * expected_q * (not done)
        td_error = target - self.Q[s, a]
        self.Q[s, a] += self.alpha * td_error

# Write your experiment code below


In [5]:
env_name = 'FrozenLake-v1'
env = gym.make(env_name)
env = StepAPICompatibility(env)
print("Action space:", env.action_space)
print("State space:", env.observation_space)

Action space: Discrete(4)
State space: Discrete(16)


## Sarsa performance


In [6]:
# Parameters
alpha_values = [0.1,0.2,0.5]
temp_values = [0.005,0.01,0.02]
trials = 10
segments = 500
episodes_per_segment = 10  # Training episodes
testing_episode = 1  # Testing episodes per segment

# Initialize the environment
env = gym.make(env_name)

In [None]:
# # function that runs each episode
# def run_experiment(agent_class, env, alpha, temp, segments=500, episodes_per_segment=10, trials=10):
#     training_results = np.zeros((trials, segments))
#     testing_results = np.zeros((trials, segments))
    
#     for trial in tqdm(range(trials)):
#         agent = agent_class(env, alpha, gamma=0.99, temp=temp)
        
#         for segment in range(segments):
#             segment_rewards = []
            
#             # Training episodes
#             for _ in range(episodes_per_segment):
#                 state, _ = env.reset()
#                 done = False
#                 total_reward = 0
                
#                 while not done:
#                     action = agent.select_action(state)
#                     next_state, reward, terminated, truncated, _ = env.step(action)
#                     done = terminated or truncated
                    
#                     next_action = agent.select_action(next_state)
#                     agent.update(state, action, reward, next_state, next_action, done)
#                     action = next_action
                    
#                     state = next_state
#                     total_reward += reward
                
#                 segment_rewards.append(total_reward)
            
#             training_results[trial, segment] = np.mean(segment_rewards)
            
#             # Testing episode
#             state, _ = env.reset()
#             done = False
#             total_reward = 0
            
#             while not done:
#                 action = agent.select_action(state, greedy=True)
#                 next_state, reward, terminated, truncated, _ = env.step(action)
#                 done = terminated or truncated
#                 state = next_state
#                 total_reward += reward
            
#             testing_results[trial, segment] = total_reward
    
#     return training_results, testing_results

In [10]:
# function that runs each episode
def run_experiment(agent_class, env, alpha, temp, segments=500, episodes_per_segment=10, trials=10):
    """
    Run complete experiment for given agent and parameters.
    
    This function runs multiple trials of the learning process, each consisting of
    multiple segments, where each segment contains training episodes followed by
    a testing episode.
    
    Args:
        agent_class: Class of the RL agent (SARSA or Expected SARSA)
        env: OpenAI Gym environment
        alpha (float): Learning rate
        temp (float): Temperature parameter
        segments (int): Number of segments to run
        episodes_per_segment (int): Number of training episodes per segment
        trials (int): Number of independent trials to run
    
    Returns:
        tuple: (training_results, testing_results) containing performance data
    """
    training_results = np.zeros((trials, segments))
    testing_results = np.zeros((trials, segments))
    
    for trial in tqdm(range(trials)):
        # Initialize new agent for each trial
        agent = agent_class(env, alpha, gamma=0.99, temp=temp)
        
        for segment in range(segments):
            segment_rewards = []
            
            # Training episodes
            for _ in range(episodes_per_segment):
                state, _ = env.reset()
                done = False
                total_reward = 0
                
                while not done:
                    # Select and execute action
                    action = agent.select_action(state)
                    next_state, reward, terminated, truncated, _ = env.step(action)
                    done = terminated or truncated
                    
                    # Select next action and update agent
                    next_action = agent.select_action(next_state)
                    agent.update(state, action, reward, next_state, next_action, done)
                    
                    state = next_state
                    action = next_action
                    total_reward += reward
                
                segment_rewards.append(total_reward)
            
            # Store average training performance for this segment
            training_results[trial, segment] = np.mean(segment_rewards)
            
            # Testing episode (greedy policy)
            state, _ = env.reset()
            done = False
            total_reward = 0
            
            while not done:
                action = agent.select_action(state, greedy=True)
                next_state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                state = next_state
                total_reward += reward
            
            testing_results[trial, segment] = total_reward
    
    return training_results, testing_results


# Storage for results
sarsa_results = {
    'training': np.zeros((len(alpha_values), len(temp_values), 10, 500)),
    'testing': np.zeros((len(alpha_values), len(temp_values), 10, 500))
}

expected_sarsa_results = {
    'training': np.zeros((len(alpha_values), len(temp_values), 10, 500)),
    'testing': np.zeros((len(alpha_values), len(temp_values), 10, 500))
}

# Run experiments for both algorithms
for i, alpha in enumerate(alpha_values):
    for j, temp in enumerate(temp_values):
        print(f"\nRunning SARSA with alpha={alpha}, temp={temp}")
        train_results, test_results = run_experiment(SARSA, env, alpha, temp)
        sarsa_results['training'][i, j] = train_results
        sarsa_results['testing'][i, j] = test_results
        
        print(f"\nRunning Expected SARSA with alpha={alpha}, temp={temp}")
        train_results, test_results = run_experiment(ExpectedSARSA, env, alpha, temp)
        expected_sarsa_results['training'][i, j] = train_results
        expected_sarsa_results['testing'][i, j] = test_results



In [None]:
# Visualization code
def plot_learning_curves(results, alpha_values, temp_values, title):
    """
    Create detailed learning curves with error bands for all parameter combinations.
    
    Args:
        results (dict): Dictionary containing training and testing results
        alpha_values (list): List of learning rates used
        temp_values (list): List of temperature values used
        title (str): Title for the plot
    """
    plt.figure(figsize=(12, 6))
    
    # Create distinct visual styles for different parameters
    colors = ['#1f77b4', '#ff7f0e', '#2ca02c']
    line_styles = ['-', '--', ':']
    
    for i, alpha in enumerate(alpha_values):
        for j, temp in enumerate(temp_values):
            # Calculate statistics across trials
            mean_returns = np.mean(results['training'][i, j], axis=0)
            std_error = np.std(results['training'][i, j], axis=0) / np.sqrt(10)
            
            # Scale x-axis by episodes per segment
            episodes = np.arange(len(mean_returns)) * 10
            
            # Plot mean line with confidence bands
            plt.plot(episodes, mean_returns, 
                    label=f'α={alpha}, T={temp}',
                    color=colors[i],
                    linestyle=line_styles[j],
                    linewidth=2)
            
            plt.fill_between(episodes, 
                            mean_returns - std_error,
                            mean_returns + std_error,
                            color=colors[i],
                            alpha=0.1)
    
    plt.title(f'{title}\n(Averaged over 10 trials, 10 episodes per segment)')
    plt.xlabel('Training Episodes')
    plt.ylabel('Average Return per Segment')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

# Plot learning curves for both algorithms
plot_learning_curves(sarsa_results, alpha_values, temp_values, 'SARSA Learning Curves')
plot_learning_curves(expected_sarsa_results, alpha_values, temp_values, 'Expected SARSA Learning Curves')