In [1]:
import math, time
from typing import Iterable, List, Tuple, Optional
import numpy as np

import gymnasium as gym
from gymnasium import spaces

import matplotlib.pyplot as plt

In [2]:
# Action indices
UP, RIGHT, DOWN, LEFT = 0, 1, 2, 3

class GridworldEnv(gym.Env):
    """
    4-action gridworld for tabular RL
    - Observation space: Discrete(n_rows * n_cols), state index = r * n_cols + c
    - Action space: Discrete(4) with actions {UP, RIGHT, DOWN, LEFT}
    - Reward: +1.0 at terminal; -0.01 per step otherwise (time penalty).
    - Episode ends when the agent reaches the terminal cell (goal).
    - The agent always starts in the top-left cell (state 0).
    """

    # (row delta, column delta) applied by each action index.
    # Built once for the class instead of on every call to step().
    _ACTION_DELTAS = {
        UP:    (-1, 0),  # move up decreases the row index
        RIGHT: (0, 1),   # move right increases the column index
        DOWN:  (1, 0),   # move down increases the row index
        LEFT:  (0, -1),  # move left decreases the column index
    }

    def __init__(self, n_rows=4, n_cols=4, terminal=(None, None), render_mode=None):
        """
        Build the grid and its observation/action spaces.

        Parameters
        - n_rows, n_cols: grid shape.
        - terminal: (row, col) of the goal cell. If either entry is None the
          goal defaults to the bottom-right cell.
        - render_mode: stored on the instance for Gymnasium API compatibility;
          this class does not implement render(), use draw() for a text view.

        Raises
        - ValueError: if an explicit terminal cell lies outside the grid.
        """
        super().__init__()
        self.n_rows, self.n_cols = n_rows, n_cols # grid shape
        self.render_mode = render_mode

        # Creating spaces
        self.observation_space = spaces.Discrete(n_rows * n_cols)
        self.action_space = spaces.Discrete(4)

        # Terminal (goal) state: bottom right unless given explicitly
        tr, tc = terminal
        if tr is None or tc is None:
            tr, tc = n_rows - 1, n_cols - 1
        elif not (0 <= tr < n_rows and 0 <= tc < n_cols):
            # An out-of-grid goal can never be reached, so episodes would never end.
            raise ValueError(
                f"terminal {terminal!r} is outside the {n_rows}x{n_cols} grid"
            )
        self.terminal_s = tr * n_cols + tc

        # Define the agent position up front so draw()/step() do not fail with
        # AttributeError when called before the first reset().
        self._state = 0

    def reset(self, *, seed=None, options=None):
        """Start a new episode: put the agent back on state 0 and return (state, info)."""

        super().reset(seed=seed)
        self._state = 0 # starting state (top left)
        return self._state, {}

    def step(self, action):
        """
        Executes one time-step in the Gridworld based on the given action.

        Returns a 5-tuple: (next_state, reward, terminated, truncated, info)
        - The agent moves one cell in the chosen direction (up, right, down, left).
        - If the move would hit a wall, the agent stays in the same cell.
        - Each step gives a small negative reward (-0.01) to encourage faster solutions.
        - Reaching the goal (terminal cell) gives +1.0 and ends the episode.
        - truncated is always False and info is always an empty dict.

        Raises KeyError if int(action) is not one of the four action indices.
        """

        # Decode the current state index into (row, col) coordinates
        r, c = divmod(self._state, self.n_cols)

        # Look up how the chosen action changes the agent's position
        dr, dc = self._ACTION_DELTAS[int(action)]

        # Compute tentative next position
        nr, nc = r + dr, c + dc

        # Boundary check: if move goes out of bounds, stay put
        if not (0 <= nr < self.n_rows and 0 <= nc < self.n_cols):
            nr, nc = r, c

        self._state = nr * self.n_cols + nc # converting new state to integer

        # Check for termination condition (episode ends if true)
        terminated = (self._state == self.terminal_s)
        reward = 1.0 if terminated else -0.01
        return self._state, reward, terminated, False, {}

    def draw(self):
        """
        Print the current grid: agent (A), goal (T), empty cells (.).

        The agent marker takes precedence, so the goal shows as "A" once reached.
        """

        grid = []
        for r in range(self.n_rows):
            row = ""
            for c in range(self.n_cols):
                s = r * self.n_cols + c
                if s == self._state:
                    row += "A "
                elif s == self.terminal_s:
                    row += "T "
                else:
                    row += ". "
            grid.append(row)
        print("\n".join(grid), "\n")

    def close(self):
        """Nothing to release; present to satisfy the Env interface."""
        pass

In [7]:
# Create and initialize environment
env = GridworldEnv(n_rows=4, n_cols=4)  # 4x4 Gridworld (simple tabular environment)

# Define hyperparameters
n_states = env.observation_space.n   # total number of discrete states in the grid
n_actions = env.action_space.n       # total number of possible actions (UP, RIGHT, DOWN, LEFT)
alpha = 0.1                          # learning rate (how fast Q-values are updated)
gamma = 0.99                         # discount factor (how much future rewards are valued)
epsilon = 0.1                        # exploration rate (probability of taking a random action)
episodes = 500                       # number of training episodes
SEED = 0                             # fixed seed so Restart & Run All reproduces the same Q-table

# Dedicated random generator: all exploration randomness flows through it,
# so training does not depend on (or disturb) NumPy's global random state.
rng = np.random.default_rng(SEED)

# Initialize Q-table
# Q[state, action] represents the expected value of taking a given action in a given state.
# Start with all zeros; the agent will fill this table through experience.
Q = np.zeros((n_states, n_actions))


def epsilon_greedy(Q, state, epsilon, rng):
    """
    Pick an action for `state` with the ε-greedy rule.

    With probability `epsilon` return a uniformly random action index (exploration);
    otherwise return the action with the highest Q-value for `state` (exploitation;
    ties resolve to the lowest action index, as np.argmax does).
    """
    if rng.random() < epsilon:
        return int(rng.integers(Q.shape[1]))
    return int(np.argmax(Q[state]))


# SARSA training loop
for episode in range(episodes):
    state, _ = env.reset()

    # Choose the initial action with the same ε-greedy policy that is being learned
    action = epsilon_greedy(Q, state, epsilon, rng)

    done = False

    # Loop until the agent reaches the terminal state or is truncated
    while not done:
        # Take the chosen action and observe the outcome
        # The environment returns:
        #   next_state: the new state index
        #   reward: numerical reward for this step
        #   terminated: True if goal reached (episode success)
        #   truncated: True if ended for another reason (like step limit)
        #   _: optional info dictionary (not used here)
        next_state, reward, terminated, truncated, _ = env.step(action)

        done = terminated or truncated

        # Choose the next action using ε-greedy policy (on-policy)
        # SARSA is "on-policy" because it learns the value of the policy that is being followed
        next_action = epsilon_greedy(Q, next_state, epsilon, rng)

        # === SARSA update rule ===
        # Q(s,a) ← Q(s,a) + α * [r + γ * Q(s',a') − Q(s,a)]
        # Where:
        #   s,a = current state and action
        #   r = reward observed
        #   s',a' = next state and next action
        # A terminal state has no future, so its value is defined as 0: do not
        # bootstrap from Q(s',a') when the episode terminated. (On truncation the
        # episode was merely cut short, so bootstrapping is still correct.)
        future_value = 0.0 if terminated else gamma * Q[next_state, next_action]
        Q[state, action] += alpha * (reward + future_value - Q[state, action])

        # Move to the next state-action pair
        state, action = next_state, next_action

print("Learned Q-table:")
print(Q)

Learned Q-table:
[[ 0.48784275  0.47215995  0.77359399  0.21020667]
 [-0.00867544 -0.00847028  0.75873139  0.05229227]
 [-0.00479541 -0.00492178  0.05576012 -0.00553757]
 [-0.003726   -0.00379639  0.00231957 -0.00319894]
 [ 0.3409409   0.77750761  0.24003209  0.47110311]
 [ 0.39121299  0.39851479  0.84707998  0.50754995]
 [-0.00404992  0.06336632  0.69064499 -0.0029872 ]
 [-0.001999   -0.002809    0.70087636 -0.00210672]
 [ 0.09801819  0.75892846 -0.00490024 -0.00390587]
 [ 0.55039614  0.8845723   0.20606779  0.53378964]
 [ 0.29564707  0.96245591  0.58266552  0.55344371]
 [ 0.30613976  0.66931733  1.          0.68587067]
 [-0.002187    0.01267388 -0.003896   -0.0029079 ]
 [-0.001999    0.58510697 -0.0019     -0.001     ]
 [ 0.09350466  0.96566316  0.1468521   0.03428106]
 [ 0.          0.          0.          0.        ]]


In [8]:
# Show the greedy policy as a grid of arrows; the goal cell is marked "T"

policy = np.argmax(Q, axis=1)   # best action index for every state
arrows = {0:"↑", 1:"→", 2:"↓", 3:"←"}

for r in range(env.n_rows):
    row_start = r * env.n_cols  # state index of the first cell in this row
    print(" ".join(
        "T" if s == env.terminal_s else arrows[policy[s]]
        for s in range(row_start, row_start + env.n_cols)
    ))

↓ ↓ ↓ ↓
→ ↓ ↓ ↓
→ → → ↓
→ → → T


In [9]:
def _run_episodes(env, choose_action, n_episodes, max_steps):
    """
    Roll out `n_episodes` episodes on `env` and summarise them.

    `choose_action(state)` returns the action to take in `state`. Each episode
    stops when the environment reports terminated/truncated or after
    `max_steps` steps, whichever comes first. An episode counts as a success
    only if its last step reported `terminated`.

    Returns a dict with keys: episodes, success_rate, avg_steps, avg_reward,
    min_steps, max_steps.

    Raises ValueError if `n_episodes` is not positive (there would be nothing
    to average over).
    """
    if n_episodes <= 0:
        raise ValueError("n_episodes must be a positive integer")

    wins, steps_list, rewards = 0, [], []
    for _episode in range(n_episodes):
        state, _info = env.reset()
        total_reward, steps = 0.0, 0
        terminated = truncated = False

        while not (terminated or truncated) and steps < max_steps:
            state, reward, terminated, truncated, _info = env.step(choose_action(state))
            total_reward += reward
            steps += 1

        # Hitting the step cap or being truncated is not a success.
        wins += int(terminated)
        steps_list.append(steps)
        rewards.append(total_reward)

    return {
        "episodes": int(n_episodes),
        "success_rate": wins / n_episodes,
        "avg_steps": float(np.mean(steps_list)),
        "avg_reward": float(np.mean(rewards)),
        "min_steps": int(np.min(steps_list)),
        "max_steps": int(np.max(steps_list)),
    }


def evaluate_policy(env, Q, n_episodes=200, max_steps=200):
    """
    Evaluate the greedy policy induced by Q (argmax over actions, no exploration).

    Runs `n_episodes` episodes of at most `max_steps` steps each.
    Returns metrics in a dict (episodes, success_rate, avg_steps, avg_reward,
    min_steps, max_steps). Raises ValueError if `n_episodes` is not positive.
    """
    return _run_episodes(env, lambda s: int(np.argmax(Q[s])), n_episodes, max_steps)


def random_baseline(env, n_episodes=200, max_steps=200):
    """
    Baseline that samples an action from env.action_space at every step.

    Runs `n_episodes` episodes of at most `max_steps` steps each.
    Returns metrics in a dict (episodes, success_rate, avg_steps, avg_reward,
    min_steps, max_steps). Raises ValueError if `n_episodes` is not positive.
    """
    return _run_episodes(env, lambda s: env.action_space.sample(), n_episodes, max_steps)


# Score the learned greedy policy against uniformly random actions.
# Each evaluation gets its own fresh 4x4 environment and the same episode budget.
eval_budget = dict(n_episodes=200, max_steps=200)
sarsa_metrics  = evaluate_policy(GridworldEnv(4, 4), Q, **eval_budget)
random_metrics = random_baseline(GridworldEnv(4, 4), **eval_budget)

for label, metrics in (
    ("SARSA policy   :", sarsa_metrics),
    ("Random baseline:", random_metrics),
):
    print(label, metrics)

SARSA policy   : {'episodes': 200, 'success_rate': 1.0, 'avg_steps': 6.0, 'avg_reward': 0.9499999999999998, 'min_steps': 6, 'max_steps': 6}
Random baseline: {'episodes': 200, 'success_rate': 0.985, 'avg_steps': 54.35, 'avg_reward': 0.4513499999999997, 'min_steps': 7, 'max_steps': 200}


In [10]:
## Replay one greedy episode and print the grid after every move
steps = 0
done = False
state, _ = env.reset()

# Frame 0: agent on the start cell
env.draw()

# Follow argmax(Q) with no exploration; give up after 50 moves as a safety cap
while not (done or steps >= 50):
    # Greedy action for the cell the agent is standing on
    action = np.argmax(Q[state])

    # Apply it and note whether the episode is over
    next_state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated

    # Advance the bookkeeping, then show the grid after the move
    state = next_state
    steps += 1
    env.draw()

A . . . 
. . . . 
. . . . 
. . . T  

. . . . 
A . . . 
. . . . 
. . . T  

. . . . 
. A . . 
. . . . 
. . . T  

. . . . 
. . . . 
. A . . 
. . . T  

. . . . 
. . . . 
. . A . 
. . . T  

. . . . 
. . . . 
. . . A 
. . . T  

. . . . 
. . . . 
. . . . 
. . . A  

