In [1]:
#Set up the environment
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt

In [2]:
def softmax(x, temp):
    """Compute softmax (Boltzmann) action probabilities at a given temperature.

    Args:
        x: 1-D array-like of action preferences (for example the Q-values of
            a single state).
        temp: Strictly positive temperature. Values close to zero concentrate
            the probability mass on the largest entry of ``x``; large values
            flatten the distribution towards uniform.

    Returns:
        numpy.ndarray of non-negative probabilities that sum to 1.

    Raises:
        ValueError: If ``temp`` is zero, negative or NaN. Such a temperature
            would otherwise produce NaN or numerically unstable
            probabilities that only fail later, inside the sampling call.
    """
    # `not temp > 0` (rather than `temp <= 0`) also rejects NaN.
    if not temp > 0:
        raise ValueError(f"temp must be a positive number, got {temp!r}")

    x = np.asarray(x, dtype=float)
    # Shifting by the maximum does not change the result but keeps every
    # exponent <= 0, so np.exp cannot overflow.
    exp_values = np.exp((x - np.max(x)) / temp)
    return exp_values / np.sum(exp_values)

In [3]:
class Sarsa:
    """Tabular SARSA agent that explores with a softmax (Boltzmann) policy."""

    def __init__(self, env, alpha, gamma, temp):
        """Store the hyperparameters and allocate an all-zero Q-table.

        The table has one row per discrete state and one column per discrete
        action, sized from ``env.observation_space.n`` and
        ``env.action_space.n``.
        """
        n_states = env.observation_space.n
        n_actions = env.action_space.n

        self.env = env
        self.alpha = alpha  # Step size of each Q-value update
        self.gamma = gamma  # Discount applied to the bootstrapped value
        self.temp = temp    # Softmax temperature used while exploring
        self.Q = np.zeros((n_states, n_actions))

    def select_action(self, s, greedy=False):
        """Return an action index for state ``s``.

        With ``greedy=True`` the action with the largest Q-value is returned;
        otherwise an action is sampled from the softmax distribution over the
        Q-values of ``s``.
        """
        q_row = self.Q[s]
        if not greedy:
            probs = softmax(q_row, self.temp)
            return np.random.choice(len(probs), p=probs)
        return np.argmax(q_row)

    def update(self, s, a, r, s_prime, a_prime, done):
        """Apply one SARSA update for the transition (s, a, r, s_prime, a_prime).

        When ``done`` is true the target is the reward alone, so nothing is
        bootstrapped from ``s_prime``.
        """
        target = r if done else r + self.gamma * self.Q[s_prime, a_prime]
        td_error = target - self.Q[s, a]
        self.Q[s, a] += self.alpha * td_error


class ExpectedSarsa:
    """Tabular Expected SARSA agent that explores with a softmax policy."""

    def __init__(self, env, alpha, gamma, temp):
        """Store the hyperparameters and allocate an all-zero Q-table.

        The table has one row per discrete state and one column per discrete
        action, sized from ``env.observation_space.n`` and
        ``env.action_space.n``.
        """
        n_states = env.observation_space.n
        n_actions = env.action_space.n

        self.env = env
        self.alpha = alpha  # Step size of each Q-value update
        self.gamma = gamma  # Discount applied to the bootstrapped value
        self.temp = temp    # Softmax temperature (exploration and expectation)
        self.Q = np.zeros((n_states, n_actions))

    def select_action(self, s, greedy=False):
        """Return an action index for state ``s``.

        With ``greedy=True`` the action with the largest Q-value is returned;
        otherwise an action is sampled from the softmax distribution over the
        Q-values of ``s``.
        """
        q_row = self.Q[s]
        if not greedy:
            probs = softmax(q_row, self.temp)
            return np.random.choice(len(probs), p=probs)
        return np.argmax(q_row)

    def update(self, s, a, r, s_prime, done):
        """Apply one Expected SARSA update for the transition (s, a, r, s_prime).

        The bootstrapped value is the expectation of ``Q[s_prime]`` under the
        softmax policy at the agent's temperature. When ``done`` is true the
        target is the reward alone.
        """
        target = r
        if not done:
            next_q = self.Q[s_prime]
            policy = softmax(next_q, self.temp)
            expected_value = np.sum(policy * next_q)
            target = r + self.gamma * expected_value
        self.Q[s, a] += self.alpha * (target - self.Q[s, a])

In [4]:
def run_trial(env, agent_class, alpha, gamma, temp, num_segments=500, num_training_episodes=10):
    """Train one freshly constructed agent and record its return per segment.

    A trial is made of ``num_segments`` segments. Every segment runs
    ``num_training_episodes`` episodes with softmax exploration and learning
    updates, followed by a single greedy episode in which the agent is not
    updated.

    Returns:
        A ``(training_returns, testing_returns)`` pair of lists with one entry
        per segment: the mean undiscounted return of that segment's training
        episodes, and the undiscounted return of its greedy episode.
    """
    agent = agent_class(env, alpha, gamma, temp)

    def train_episode():
        # One exploratory episode with an update after every step.
        state, _ = env.reset()
        action = agent.select_action(state, greedy=False)
        episode_return = 0
        finished = False
        while not finished:
            next_state, reward, terminated, truncated, _ = env.step(action)
            next_action = agent.select_action(next_state, greedy=False)
            # Only `terminated` is handed to the agent, so a truncated episode
            # still bootstraps from the next state.
            if isinstance(agent, ExpectedSarsa):
                agent.update(state, action, reward, next_state, terminated)
            else:
                agent.update(state, action, reward, next_state, next_action, terminated)
            episode_return += reward
            state, action = next_state, next_action
            finished = terminated or truncated
        return episode_return

    def greedy_episode():
        # One evaluation episode following the greedy policy, without updates.
        state, _ = env.reset()
        episode_return = 0
        finished = False
        while not finished:
            action = agent.select_action(state, greedy=True)
            state, reward, terminated, truncated, _ = env.step(action)
            episode_return += reward
            finished = terminated or truncated
        return episode_return

    training_returns = []  # Mean training return of each segment
    testing_returns = []   # Greedy-episode return of each segment
    for _ in range(num_segments):
        segment_returns = [train_episode() for _ in range(num_training_episodes)]
        test_return = greedy_episode()
        training_returns.append(np.mean(segment_returns))
        testing_returns.append(test_return)

    return training_returns, testing_returns


def run_experiment(env, agent_class, alphas, gammas, temps, num_trials=10,
                   num_final_segments=10, **trial_kwargs):
    """Evaluate every (alpha, gamma, temp) combination over several trials.

    For each combination, ``num_trials`` independent trials are run with
    ``run_trial``. The final performance of a trial is the mean of its last
    ``num_final_segments`` per-segment training returns; the mean, minimum and
    maximum of that quantity across trials are stored.

    Args:
        env: Environment passed through to ``run_trial``.
        agent_class: Agent class to instantiate; its ``__name__`` is printed
            in the progress message.
        alphas: Learning rates to try.
        gammas: Discount factors to try.
        temps: Softmax temperatures to try.
        num_trials: Number of independent trials per combination (>= 1).
        num_final_segments: Number of trailing segments averaged to obtain a
            trial's final performance (>= 1).
        **trial_kwargs: Extra keyword arguments forwarded unchanged to
            ``run_trial`` (for example ``num_segments`` or
            ``num_training_episodes``), so shorter runs can be requested
            without editing this function.

    Returns:
        dict mapping ``"alpha=..., gamma=..., temp=..."`` to a dict with the
        keys ``"alpha"``, ``"gamma"``, ``"temp"``, ``"mean_return"``,
        ``"min_return"`` and ``"max_return"``.

    Raises:
        ValueError: If ``num_trials`` or ``num_final_segments`` is below 1.
    """
    if num_trials < 1:
        raise ValueError(f"num_trials must be at least 1, got {num_trials}")
    if num_final_segments < 1:
        # A window of 0 would turn `[-0:]` into "the whole list".
        raise ValueError(
            f"num_final_segments must be at least 1, got {num_final_segments}"
        )

    results = {}

    for alpha in alphas:
        for gamma in gammas:
            for temp in temps:
                key = f"alpha={alpha}, gamma={gamma}, temp={temp}"
                print(f"Running {agent_class.__name__} with {key}")

                final_returns = []
                for _ in range(num_trials):
                    training_returns, _testing_returns = run_trial(
                        env, agent_class, alpha, gamma, temp, **trial_kwargs
                    )
                    final_returns.append(
                        np.mean(training_returns[-num_final_segments:])
                    )

                # gamma is recorded alongside alpha and temp so entries that
                # differ only in the discount factor remain distinguishable.
                results[key] = {
                    "alpha": alpha,
                    "gamma": gamma,
                    "temp": temp,
                    "mean_return": np.mean(final_returns),
                    "min_return": np.min(final_returns),
                    "max_return": np.max(final_returns),
                }

    return results

In [5]:
def plot_final_training_performance(results, title, temps):
    """Draw final training return against learning rate, one curve per temperature.

    Args:
        results: Mapping whose values are dicts with the keys ``"alpha"``,
            ``"temp"``, ``"mean_return"``, ``"min_return"`` and
            ``"max_return"``.
        title: Title placed above the axes.
        temps: Temperatures to draw. Each one gets a line for the mean return
            and a translucent band spanning the minimum and maximum return.
    """
    _, ax = plt.subplots(figsize=(10, 6))

    for temp in temps:
        # Keep only the entries recorded for this temperature.
        rows = [entry for entry in results.values() if entry["temp"] == temp]
        # Order the points by learning rate so the line runs left to right.
        order = np.argsort([entry["alpha"] for entry in rows])

        def column(field):
            return np.array([entry[field] for entry in rows])[order]

        alpha_values = column("alpha")
        ax.plot(alpha_values, column("mean_return"), label=f"Temp={temp}")
        ax.fill_between(alpha_values, column("min_return"), column("max_return"), alpha=0.2)

    ax.set_xlabel("Learning Rate (alpha)")
    ax.set_ylabel("Final Training Return")
    ax.set_title(title)
    ax.legend()
    ax.grid()
    plt.show()

In [None]:
# Main code: compare SARSA and Expected SARSA on Taxi-v3 over a small
# hyperparameter grid, then plot the final training performance of each agent.
env_name = 'Taxi-v3'
env = gym.make(env_name)
print("Action space:", env.action_space)
print("State space:", env.observation_space)

# Hyperparameters to test (3 alphas x 1 gamma x 3 temps = 9 combinations per agent)
alphas = [0.1, 0.5, 0.9]  # Learning rates
gammas = [0.9]            # Discount factor (fixed for simplicity)
temps = [0.1, 1.0, 10.0]  # Temperatures for softmax

# Run experiments for SARSA and Expected SARSA.
# NOTE: long-running. With the default arguments of run_experiment and
# run_trial, every combination runs 10 trials of 500 segments, and each
# segment is 10 training episodes plus 1 greedy test episode.
sarsa_results = run_experiment(env, Sarsa, alphas, gammas, temps)
expected_sarsa_results = run_experiment(env, ExpectedSarsa, alphas, gammas, temps)

# Plot final training performance (one figure per agent, one curve per temperature)
plot_final_training_performance(sarsa_results, "SARSA Final Training Performance", temps)
plot_final_training_performance(expected_sarsa_results, "Expected SARSA Final Training Performance", temps)

Action space: Discrete(6)
State space: Discrete(500)
Running Sarsa with alpha=0.1, gamma=0.9, temp=0.1
