Q-learning implementation

In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt  # Import matplotlib for plotting

# Initialize Taxi environment
env = gym.make('Taxi-v3')

# Initialize Q-table: one row per discrete state, one column per action
Q = np.zeros([env.observation_space.n, env.action_space.n])

# Hyperparameters
alpha = 0.1  # Learning rate
gamma = 0.95  # Discount factor
epsilon = 1.0  # Exploration rate (probability of taking a random action)
epsilon_min = 0.01  # Lower bound for the exploration rate
decay_rate = 0.995  # Multiplicative epsilon decay applied after each episode
episodes = 10000

# List to store total rewards for each episode
rewards = []

# Q-learning training loop
for episode in range(episodes):
    state = env.reset()[0]  # reset() returns (observation, info)
    done = False
    total_rewards = 0

    while not done:
        # Epsilon-greedy action selection
        action = np.random.choice(env.action_space.n) if np.random.rand() < epsilon else np.argmax(Q[state])

        # Take the action and observe the outcome
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

        # Update Q-table using the Q-learning formula. A terminated episode has
        # no future return, so the bootstrap term is dropped in that case; a
        # merely truncated (time-limited) episode still bootstraps.
        future_value = 0.0 if terminated else gamma * np.max(Q[next_state])
        Q[state, action] += alpha * (reward + future_value - Q[state, action])

        # Move to the next state
        state = next_state
        total_rewards += reward

    # Append total rewards for this episode to the rewards list
    rewards.append(total_rewards)

    # Decay epsilon, never letting it drop below epsilon_min
    epsilon = max(epsilon_min, epsilon * decay_rate)

    # Print progress every 100 episodes
    if episode % 100 == 0:
        print(f"Episode {episode}: Total Rewards = {total_rewards}")

# Save Q-table to disk (optional)
np.save("Q_table.npy", Q)

# Plot the total rewards over episodes
plt.plot(rewards)
plt.xlabel('Episodes')
plt.ylabel('Total Rewards')
plt.title('Q-learning Agent Performance Over Time')
plt.show()

print("Training complete!")

Evaluation of Q-learning agent compared to heuristic and random policy

In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import os

# Ensure the results folder exists. exist_ok=True makes this a single call
# that succeeds whether or not the folder is already there.
os.makedirs('results', exist_ok=True)

# Initialize the Taxi environment
env = gym.make('Taxi-v3')

# Load the Q-learning agent (pre-trained Q-table)
Q = np.load("Q_table.npy")  # Make sure you saved this after training the agent

# Define the heuristic-based policy
def heuristic_policy(env, state):
    """Return a greedy action that steers the taxi straight at its current target.

    The target is the passenger's location while the passenger is waiting, and
    the destination once the passenger is in the taxi. Rows are aligned first,
    then columns; when the taxi is on the target cell the pick-up or drop-off
    action is returned.

    Args:
        env: Taxi environment (or a wrapper around it) exposing ``decode``.
        state: Encoded Taxi state.

    Returns:
        Action id: 0 = south, 1 = north, 2 = east, 3 = west, 4 = pick-up,
        5 = drop-off.
    """
    # decode() lives on the base environment; go through `unwrapped` when it
    # exists so the call does not rely on wrappers forwarding attributes.
    taxi_row, taxi_col, passenger, destination = getattr(env, "unwrapped", env).decode(state)

    # Locations of pickup and dropoff points in the 5x5 grid
    locations = [(0, 0), (0, 4), (4, 0), (4, 3)]

    if passenger < 4:
        # Passenger is still waiting at one of the four locations
        target_row, target_col = locations[passenger]
        on_target_action = 4  # Pick-up
    else:
        # Passenger is in the taxi (passenger == 4)
        target_row, target_col = locations[destination]
        on_target_action = 5  # Drop-off

    # NOTE(review): interior walls are not considered, so an east/west move can
    # be blocked and repeated until the episode is cut off.
    if taxi_row < target_row:
        return 0  # South: row index increases
    if taxi_row > target_row:
        return 1  # North: row index decreases
    if taxi_col < target_col:
        return 2  # East
    if taxi_col > target_col:
        return 3  # West
    return on_target_action

# Evaluation function for Q-learning agent
def test_q_learning_agent(env, Q, episodes=100):
    """Roll out the greedy policy read from a Q-table and collect statistics.

    Args:
        env: Environment following the ``reset()`` / ``step()`` API used here.
        Q: Q-table indexed as ``Q[state]`` giving one value per action.
        episodes: Number of evaluation episodes to run.

    Returns:
        Tuple ``(rewards, steps)`` of two lists holding, per episode, the
        summed reward and the number of steps taken.
    """
    episode_returns, episode_lengths = [], []

    for _ in range(episodes):
        observation = env.reset()[0]
        episode_return = 0
        n_steps = 0

        while True:
            # Always exploit: take the highest-valued action for this state
            greedy_action = np.argmax(Q[observation])
            observation, step_reward, terminated, truncated, _ = env.step(greedy_action)
            episode_return += step_reward
            n_steps += 1
            if terminated or truncated:
                break

        episode_returns.append(episode_return)
        episode_lengths.append(n_steps)

    return episode_returns, episode_lengths

# Evaluation function for random policy
def test_random_policy(env, episodes=100):
    """Evaluate a policy that samples every action uniformly from the action space.

    Args:
        env: Environment following the ``reset()`` / ``step()`` API used here.
        episodes: Number of evaluation episodes to run.

    Returns:
        Tuple ``(rewards, steps)`` of two lists holding, per episode, the
        summed reward and the number of steps taken.
    """
    all_returns = []
    all_lengths = []

    for _ in range(episodes):
        current_state = env.reset()[0]
        finished = False
        collected = 0
        length = 0

        while not finished:
            sampled_action = env.action_space.sample()
            outcome = env.step(sampled_action)
            current_state, step_reward, terminated, truncated = outcome[:4]
            finished = terminated or truncated
            collected += step_reward
            length += 1

        all_returns.append(collected)
        all_lengths.append(length)

    return all_returns, all_lengths

# Evaluation function for heuristic policy
def test_heuristic_policy(env, episodes=100):
    """Evaluate the hand-written heuristic policy.

    Args:
        env: Environment following the ``reset()`` / ``step()`` API used here;
            it is also handed to ``heuristic_policy`` for every decision.
        episodes: Number of evaluation episodes to run.

    Returns:
        Tuple ``(rewards, steps)`` of two lists holding, per episode, the
        summed reward and the number of steps taken.
    """
    reward_log, step_log = [], []

    for _ in range(episodes):
        obs = env.reset()[0]
        terminated = truncated = False
        reward_sum = 0
        moves = 0

        while not (terminated or truncated):
            chosen = heuristic_policy(env, obs)
            obs, gained, terminated, truncated, _ = env.step(chosen)
            reward_sum += gained
            moves += 1

        reward_log.append(reward_sum)
        step_log.append(moves)

    return reward_log, step_log

# Number of episodes for evaluation (shared by all three policies)
episodes = 100

# Test Q-learning agent; pass `episodes` explicitly so all three policies are
# always evaluated over the same number of episodes.
q_learning_rewards, q_learning_steps = test_q_learning_agent(env, Q, episodes)

# Test random policy
random_rewards, random_steps = test_random_policy(env, episodes)

# Test heuristic policy
heuristic_rewards, heuristic_steps = test_heuristic_policy(env, episodes)

# Plot and compare cumulative rewards
plt.figure()  # start from a fresh figure instead of drawing on a current one
plt.plot(q_learning_rewards, label='Q-learning Agent')
plt.plot(random_rewards, label='Random Policy')
plt.plot(heuristic_rewards, label='Heuristic Policy')
plt.xlabel('Episodes')
plt.ylabel('Cumulative Rewards')
plt.title('Comparison of Agent, Random, and Heuristic Policies - Cumulative Rewards')
plt.legend()
plt.savefig('results/rewards_comparison.png')  # Save plot to results folder
plt.show()

# Plot and compare steps taken
plt.figure()  # separate figure so the two comparisons never share axes
plt.plot(q_learning_steps, label='Q-learning Agent')
plt.plot(random_steps, label='Random Policy')
plt.plot(heuristic_steps, label='Heuristic Policy')
plt.xlabel('Episodes')
plt.ylabel('Steps Taken')
plt.title('Comparison of Agent, Random, and Heuristic Policies - Steps Taken')
plt.legend()
plt.savefig('results/steps_comparison.png')  # Save plot to results folder
plt.show()

Optimization and comparison with other algorithms (SARSA & REINFORCE)

In [None]:
import gym
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim


# Suppress the deprecated warning from Gym. The filter is installed before the
# environment is created so that warnings raised by gym.make() are covered too.
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Load Taxi environment
env = gym.make('Taxi-v3')

# SARSA Algorithm (on-policy learning)
def sarsa(env, alpha=0.1, gamma=0.95, epsilon=1.0, episodes=10000, epsilon_min=0.01, decay_rate=0.99):
    """Train a tabular SARSA agent with an epsilon-greedy behaviour policy.

    Args:
        env: Environment with discrete ``observation_space.n`` and
            ``action_space.n`` and the ``reset()`` / ``step()`` API used here.
        alpha: Learning rate.
        gamma: Discount factor.
        epsilon: Initial probability of taking a random action.
        episodes: Number of training episodes.
        epsilon_min: Lower bound for epsilon.
        decay_rate: Factor epsilon is multiplied by after every episode.

    Returns:
        Tuple ``(Q, rewards, steps)``: the learned Q-table plus per-episode
        lists of summed reward and step count.
    """
    Q = np.zeros([env.observation_space.n, env.action_space.n])
    rewards = []
    steps = []

    for episode in range(episodes):
        state = env.reset()[0]
        action = np.random.choice(env.action_space.n) if np.random.rand() < epsilon else np.argmax(Q[state])
        done = False
        total_reward = 0
        step_count = 0

        while not done:
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # On-policy: the action actually taken next is chosen before the update
            next_action = np.random.choice(env.action_space.n) if np.random.rand() < epsilon else np.argmax(Q[next_state])

            # SARSA update. After a terminal transition there is no future
            # return, so the bootstrap term is dropped; a truncated episode
            # still bootstraps.
            future_value = 0.0 if terminated else gamma * Q[next_state, next_action]
            Q[state, action] += alpha * (reward + future_value - Q[state, action])

            state = next_state
            action = next_action
            total_reward += reward
            step_count += 1

        # Decay epsilon
        epsilon = max(epsilon_min, epsilon * decay_rate)

        # Store total reward and steps for this episode
        rewards.append(total_reward)
        steps.append(step_count)

        # Print progress every 100 episodes
        if episode % 100 == 0:
            print(f"SARSA - Episode {episode + 1}/{episodes}, Total Reward: {total_reward}, Steps: {step_count}")

    return Q, rewards, steps

# Q-Learning Algorithm (off-policy learning)
def q_learning(env, alpha=0.1, gamma=0.99, epsilon=1.0, episodes=10000, epsilon_min=0.01, decay_rate=0.995):
    """Train a tabular Q-learning agent with an epsilon-greedy behaviour policy.

    Args:
        env: Environment with discrete ``observation_space.n`` and
            ``action_space.n`` and the ``reset()`` / ``step()`` API used here.
        alpha: Learning rate.
        gamma: Discount factor.
        epsilon: Initial probability of taking a random action.
        episodes: Number of training episodes.
        epsilon_min: Lower bound for epsilon.
        decay_rate: Factor epsilon is multiplied by after every episode.

    Returns:
        Tuple ``(Q, rewards, steps)``: the learned Q-table plus per-episode
        lists of summed reward and step count.
    """
    Q = np.zeros([env.observation_space.n, env.action_space.n])
    rewards = []
    steps = []

    for episode in range(episodes):
        state = env.reset()[0]
        done = False
        total_rewards = 0
        step_count = 0

        while not done:
            action = np.random.choice(env.action_space.n) if np.random.rand() < epsilon else np.argmax(Q[state])
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Off-policy target: best value available in the next state. After
            # a terminal transition there is no future return, so the bootstrap
            # term is dropped; a truncated episode still bootstraps.
            future_value = 0.0 if terminated else gamma * np.max(Q[next_state])
            Q[state, action] += alpha * (reward + future_value - Q[state, action])
            state = next_state
            total_rewards += reward
            step_count += 1

        rewards.append(total_rewards)
        steps.append(step_count)
        epsilon = max(epsilon_min, epsilon * decay_rate)

        if episode % 100 == 0:
            print(f"Q-learning - Episode {episode}: Total Rewards = {total_rewards}, Steps = {step_count}")

    return Q, rewards, steps

# Define the Policy Network for REINFORCE
class PolicyNetwork(nn.Module):
    """Three-layer fully connected network that outputs action probabilities.

    Layers: ``input_size -> 128 -> 64 -> output_size`` with ReLU after the two
    hidden layers and a softmax over the last dimension of the output.
    """

    def __init__(self, input_size, output_size):
        super().__init__()
        # Layers are created in this order so parameter registration (and thus
        # random initialisation order) stays fc1, fc2, output.
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.output = nn.Linear(64, output_size)

    def forward(self, x):
        """Map an input tensor to a probability distribution over actions."""
        hidden = self.fc1(x).relu()
        hidden = self.fc2(hidden).relu()
        logits = self.output(hidden)
        return logits.softmax(dim=-1)

# Function to calculate discounted rewards with baseline subtraction
def compute_discounted_rewards(rewards, gamma=0.99):
    """Return mean-centred discounted returns for one episode.

    For every time step ``t`` the discounted return
    ``G_t = r_t + gamma * r_{t+1} + gamma**2 * r_{t+2} + ...`` is computed, and
    the mean over the episode is subtracted as a simple baseline.

    Args:
        rewards: Sequence of per-step rewards (ints or floats).
        gamma: Discount factor.

    Returns:
        1-D floating-point tensor with one centred return per step.
    """
    returns = []
    running_return = 0.0
    # Walk backwards so each return reuses the one after it; append and reverse
    # once instead of inserting at the front on every step.
    for reward in reversed(rewards):
        running_return = reward + gamma * running_return
        returns.append(running_return)
    returns.reverse()

    # Force a floating dtype: mean() is not defined for integer tensors, which
    # is what all-integer rewards with an integer gamma would otherwise produce.
    returns = torch.tensor(returns, dtype=torch.get_default_dtype())
    return returns - returns.mean()

# Reward shaping function
def get_shaped_reward(env, state, action):
    """Return a small bonus or penalty based on progress towards the target.

    The target is the passenger's location while the passenger is waiting and
    the destination once the passenger is in the taxi. The Manhattan distance
    to the target is measured before and after applying ``action`` to the
    taxi position.

    Args:
        env: Taxi environment (or a wrapper around it) exposing ``decode``.
        state: Encoded Taxi state the action is taken in.
        action: Action id (0 = south, 1 = north, 2 = east, 3 = west; any other
            id leaves the position unchanged).

    Returns:
        ``0.1`` if the action strictly reduces the distance, ``-0.1`` otherwise.
    """
    # decode() lives on the base environment; go through `unwrapped` when it
    # exists so the call does not rely on wrappers forwarding attributes.
    taxi_row, taxi_col, passenger, destination = getattr(env, "unwrapped", env).decode(state)
    locations = [(0, 0), (0, 4), (4, 0), (4, 3)]
    target_row, target_col = locations[passenger if passenger < 4 else destination]

    # Measure the distance BEFORE moving; otherwise old and new are identical
    # and the bonus branch can never be taken.
    old_distance = abs(taxi_row - target_row) + abs(taxi_col - target_col)

    # Apply the move, clamped to the 5x5 grid.
    # NOTE(review): interior walls are not modelled here, so a blocked
    # east/west move is scored as if it had succeeded.
    if action == 0:
        taxi_row = min(taxi_row + 1, 4)
    elif action == 1:
        taxi_row = max(taxi_row - 1, 0)
    elif action == 2:
        taxi_col = min(taxi_col + 1, 4)
    elif action == 3:
        taxi_col = max(taxi_col - 1, 0)

    new_distance = abs(taxi_row - target_row) + abs(taxi_col - target_col)
    return 0.1 if new_distance < old_distance else -0.1

# REINFORCE algorithm with reward shaping
def reinforce(env, policy, optimizer, gamma=0.99, episodes=1000):
    """Train ``policy`` with the REINFORCE policy-gradient algorithm.

    Each step's learning signal is the environment reward plus the bonus from
    ``get_shaped_reward``. The per-episode reward that is logged and returned
    is the environment reward only, so it stays comparable with learners that
    do not use shaping.

    Args:
        env: Environment with discrete ``observation_space.n`` and the
            ``reset()`` / ``step()`` API used here.
        policy: Module mapping a one-hot state vector to action probabilities.
        optimizer: Optimizer over ``policy``'s parameters.
        gamma: Discount factor.
        episodes: Number of training episodes.

    Returns:
        Tuple ``(total_rewards, steps_taken)`` of per-episode lists.
    """
    total_rewards = []
    steps_taken = []

    # One-hot encoding table, built once; row i is the encoding of state i.
    one_hot_states = torch.eye(env.observation_space.n)

    for episode in range(episodes):
        state = env.reset()[0]
        done = False
        episode_rewards = []  # shaped rewards: used for the gradient only
        episode_log_probs = []
        total_reward = 0  # unshaped environment reward: used for reporting
        step_count = 0

        while not done:
            state_tensor = one_hot_states[state]  # One-hot encoding of the state
            action_probs = policy(state_tensor)
            distribution = torch.distributions.Categorical(action_probs)
            action = distribution.sample()
            log_prob = distribution.log_prob(action)

            next_state, reward, terminated, truncated, _ = env.step(action.item())
            done = terminated or truncated

            total_reward += reward
            # Shaping is evaluated on the state the action was taken in
            episode_rewards.append(reward + get_shaped_reward(env, state, action.item()))
            episode_log_probs.append(log_prob)
            step_count += 1
            state = next_state

        # Policy-gradient loss: minus the sum of log-prob times centred return
        discounted_rewards = compute_discounted_rewards(episode_rewards, gamma)
        loss = -torch.sum(torch.stack(episode_log_probs) * discounted_rewards)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_rewards.append(total_reward)
        steps_taken.append(step_count)

        if episode % 100 == 0:
            print(f"REINFORCE - Episode {episode + 1}/{episodes}, Total Reward: {total_reward}, Steps: {step_count}")

    return total_rewards, steps_taken

# Build the REINFORCE policy network (one input per state, one output per
# action) together with its optimizer, then train it
n_states = env.observation_space.n
n_actions = env.action_space.n
policy_net = PolicyNetwork(input_size=n_states, output_size=n_actions)
optimizer = optim.Adam(params=policy_net.parameters(), lr=0.001)
reinforce_history = reinforce(env, policy_net, optimizer, episodes=1000)
rewards_reinforce, steps_reinforce = reinforce_history

# Train the two tabular learners: SARSA first, then Q-learning
sarsa_history = sarsa(env)
Q_sarsa, rewards_sarsa, steps_sarsa = sarsa_history
q_learning_history = q_learning(env, episodes=10000)
Q_q_learning, rewards_q_learning, steps_q_learning = q_learning_history

import os


def _save_line_plot(curves, ylabel, title, path):
    """Draw one or more per-episode curves on a new figure, save it and show it.

    Args:
        curves: Iterable of ``(values, label, style)`` where ``style`` is a
            dict of extra keyword arguments for ``plt.plot``.
        ylabel: Label of the y axis.
        title: Figure title.
        path: File the figure is saved to.
    """
    plt.figure(figsize=(10, 6))
    for values, label, style in curves:
        plt.plot(range(len(values)), values, label=label, **style)
    plt.xlabel('Episodes')
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.savefig(path)  # Save plot to results folder
    plt.show()


# savefig() does not create missing folders, so make sure the output folder
# exists before the first plot is written.
os.makedirs('results', exist_ok=True)

# Line styles used when the three learners share one figure
sarsa_style = {'color': 'b', 'linestyle': '-', 'linewidth': 1.5}
q_learning_style = {'color': 'r', 'linestyle': '--', 'linewidth': 1.5}
reinforce_style = {'color': 'g', 'linestyle': '-.', 'linewidth': 1.5}

# Individual Plot for SARSA
_save_line_plot(
    [(rewards_sarsa, 'SARSA - Rewards', {'color': 'b'})],
    'Total Reward',
    'SARSA - Total Rewards Over Episodes',
    'results/SARSA_rewards.png',
)
_save_line_plot(
    [(steps_sarsa, 'SARSA - Steps', {'color': 'b'})],
    'Steps Taken',
    'SARSA - Steps Taken Over Episodes',
    'results/SARSA_steps.png',
)

# Individual Plot for REINFORCE
_save_line_plot(
    [(rewards_reinforce, 'REINFORCE - Rewards', {'color': 'g'})],
    'Total Reward',
    'REINFORCE - Total Rewards Over Episodes',
    'results/REINFORCE_rewards.png',
)
# NOTE(review): the file name below misspells REINFORCE; it is kept unchanged
# because other files may refer to it - confirm before renaming.
_save_line_plot(
    [(steps_reinforce, 'REINFORCE - Steps', {'color': 'g'})],
    'Steps Taken',
    'REINFORCE - Steps Taken Over Episodes',
    'results/REINFROCE_steps.png',
)

# Plot comparison of rewards for SARSA, Q-learning, and REINFORCE
# NOTE(review): the file name below misspells REINFORCE; it is kept unchanged
# because other files may refer to it - confirm before renaming.
_save_line_plot(
    [
        (rewards_sarsa, 'SARSA - Rewards', sarsa_style),
        (rewards_q_learning, 'Q-learning - Rewards', q_learning_style),
        (rewards_reinforce, 'REINFORCE - Rewards', reinforce_style),
    ],
    'Total Reward',
    'SARSA vs Q-learning vs REINFORCE - Total Rewards',
    'results/Q_SARSA_REINFROCE_rewards.png',
)

# Plot comparison of steps taken for SARSA, Q-learning, and REINFORCE
_save_line_plot(
    [
        (steps_sarsa, 'SARSA - Steps', sarsa_style),
        (steps_q_learning, 'Q-learning - Steps', q_learning_style),
        (steps_reinforce, 'REINFORCE - Steps', reinforce_style),
    ],
    'Steps Taken',
    'SARSA vs Q-learning vs REINFORCE - Steps Taken',
    'results/Q_SARSA_REINFORCE_steps.png',
)