<a href="https://colab.research.google.com/github/sujithh1110/reinforcement-learning/blob/main/lab04.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# frozenlake_td0_sarsa.py
# Gymnasium + NumPy only. TD(0) policy evaluation and SARSA control on FrozenLake-v1.
# Works on CPU; no extras required.

import gymnasium as gym
import numpy as np
from typing import Tuple

# ----------------------------
# Utilities
# ----------------------------
def make_random_policy(nS: int, nA: int) -> np.ndarray:
    """Equiprobable random policy: π(a|s) = 1/nA."""
    policy = np.ones((nS, nA), dtype=np.float64) / nA
    return policy

def epsilon_greedy_probs(Q: np.ndarray, state: int, epsilon: float) -> np.ndarray:
    """Return action probabilities for ε-greedy over Q(state, ·)."""
    nA = Q.shape[1]
    probs = np.ones(nA, dtype=np.float64) * (epsilon / nA)
    best_a = np.argmax(Q[state])
    probs[best_a] += 1.0 - epsilon
    return probs

def run_episode(env, policy_probs: np.ndarray, max_steps: int, rng: np.random.Generator) -> Tuple[float, int]:
    """Run one episode following a (stochastic) policy, return total reward and steps."""
    obs, _ = env.reset()
    total_r = 0.0
    steps = 0
    for _ in range(max_steps):
        a = rng.choice(env.action_space.n, p=policy_probs[obs])
        next_obs, r, terminated, truncated, _ = env.step(a)
        total_r += r
        steps += 1
        obs = next_obs
        if terminated or truncated:
            break
    return total_r, steps

# ----------------------------
# TD(0) Policy Evaluation
# ----------------------------
def td0_policy_evaluation(
    env,
    policy_probs: np.ndarray,
    alpha: float = 0.1,
    gamma: float = 0.99,
    num_episodes: int = 20_000,
    max_steps: int = 200,
    seed: int = 0,
) -> np.ndarray:
    """
    On-policy TD(0) state-value estimation under the given (stochastic) policy.
    V(s) <- V(s) + α [ R + γ V(s') - V(s) ]
    """
    rng = np.random.default_rng(seed)
    nS = env.observation_space.n
    V = np.zeros(nS, dtype=np.float64)

    for ep in range(num_episodes):
        s, _ = env.reset(seed=None)
        for t in range(max_steps):
            a = rng.choice(env.action_space.n, p=policy_probs[s])
            s_next, r, terminated, truncated, _ = env.step(a)
            target = r + (0.0 if (terminated or truncated) else gamma * V[s_next])
            V[s] += alpha * (target - V[s])
            s = s_next
            if terminated or truncated:
                break
    return V

# ----------------------------
# SARSA Control (on-policy)
# ----------------------------
def sarsa_control(
    env,
    alpha: float = 0.1,
    gamma: float = 0.99,
    epsilon: float = 1.0,
    epsilon_min: float = 0.05,
    epsilon_decay: float = 0.9995,
    num_episodes: int = 50_000,
    max_steps: int = 200,
    seed: int = 0,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    On-policy SARSA to learn Q and the ε-greedy policy derived from Q.
    Update: Q[s,a] <- Q[s,a] + α [ r + γ Q[s', a'] - Q[s,a] ]
    """
    rng = np.random.default_rng(seed)
    nS = env.observation_space.n
    nA = env.action_space.n
    Q = np.zeros((nS, nA), dtype=np.float64)

    for ep in range(num_episodes):
        # (Optional) seed only first reset for determinism of runs, not every episode.
        s, _ = env.reset()
        # ε-greedy action
        a = rng.choice(nA, p=epsilon_greedy_probs(Q, s, epsilon))

        for t in range(max_steps):
            s_next, r, terminated, truncated, _ = env.step(a)
            if terminated or truncated:
                td_target = r  # terminal next state's value = 0
                Q[s, a] += alpha * (td_target - Q[s, a])
                break

            # Choose next action a' using ε-greedy on next state
            a_next = rng.choice(nA, p=epsilon_greedy_probs(Q, s_next, epsilon))

            td_target = r + gamma * Q[s_next, a_next]
            Q[s, a] += alpha * (td_target - Q[s, a])

            s, a = s_next, a_next

        # Decay epsilon after each episode (on-policy)
        epsilon = max(epsilon_min, epsilon * epsilon_decay)

    # Derive greedy policy from Q
    greedy_policy = np.zeros_like(Q)
    best_actions = np.argmax(Q, axis=1)
    greedy_policy[np.arange(nS), best_actions] = 1.0
    return Q, greedy_policy

# ----------------------------
# Evaluation helpers
# ----------------------------
def evaluate_policy(
    env,
    policy_probs: np.ndarray,
    episodes: int = 1_000,
    max_steps: int = 200,
    seed: int = 123,
) -> float:
    """Average episodic return following a (stochastic) policy."""
    rng = np.random.default_rng(seed)
    total = 0.0
    for _ in range(episodes):
        r, _ = run_episode(env, policy_probs, max_steps, rng)
        total += r
    return total / episodes

def value_from_Q(Q: np.ndarray, policy_probs: np.ndarray, gamma: float = 0.99, tol: float = 1e-10) -> np.ndarray:
    """
    (Optional) Compute V^π from Q by V(s) = Σ_a π(a|s) Q(s,a). For deterministic policies, this equals max_a Q(s,a).
    """
    return (policy_probs * Q).sum(axis=1)

# ----------------------------
# Main demo
# ----------------------------
if __name__ == "__main__":
    # You can flip is_slippery to False for an easier, deterministic environment.
    env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=True)

    nS = env.observation_space.n
    nA = env.action_space.n
    print(f"Observation space (states): {nS}, Action space: {nA}")

    # ----- TD(0) policy evaluation on a random policy -----
    random_policy = make_random_policy(nS, nA)
    V_td0 = td0_policy_evaluation(
        env,
        random_policy,
        alpha=0.05,
        gamma=0.99,
        num_episodes=25_000,
        max_steps=200,
        seed=42,
    )
    print("\nTD(0) Value Function under equiprobable-random policy (first 10 states):")
    print(np.round(V_td0[:10], 4))

    # Evaluate how good that random policy is (it’s usually poor on FrozenLake)
    rand_return = evaluate_policy(env, random_policy, episodes=1_000, max_steps=200)
    print(f"Average return of random policy over 1000 episodes: {rand_return:.3f}")

    # ----- SARSA control to learn a policy -----
    Q, greedy_policy = sarsa_control(
        env,
        alpha=0.1,
        gamma=0.99,
        epsilon=1.0,
        epsilon_min=0.05,
        epsilon_decay=0.999,  # slower decay for more exploration on slippery variant
        num_episodes=60_000,
        max_steps=200,
        seed=123,
    )

    # Evaluate learned (greedy) policy
    learned_return = evaluate_policy(env, greedy_policy, episodes=2_000, max_steps=200)
    print(f"\nAverage return of SARSA-learned greedy policy over 2000 episodes: {learned_return:.3f}")

    # Show first 10 states' greedy actions and their Q-values
    actions_map = {0: "←", 1: "↓", 2: "→", 3: "↑"}
    best_actions = np.argmax(Q, axis=1)
    print("\nGreedy action (first 10 states):", [actions_map[a] for a in best_actions[:10]])
    print("\nSample Q-values (first 3 states):")
    for s in range(min(3, nS)):
        print(f"State {s}: {np.round(Q[s], 3)}")

Observation space (states): 16, Action space: 4

TD(0) Value Function under equiprobable-random policy (first 10 states):
[0.0089 0.007  0.0121 0.0082 0.0111 0.     0.0192 0.     0.0214 0.0634]
Average return of random policy over 1000 episodes: 0.013

Average return of SARSA-learned greedy policy over 2000 episodes: 0.728

Greedy action (first 10 states): ['←', '↑', '←', '↑', '←', '←', '←', '←', '↑', '↓']

Sample Q-values (first 3 states):
State 0: [0.406 0.368 0.37  0.38 ]
State 1: [0.259 0.164 0.133 0.344]
State 2: [0.295 0.263 0.244 0.258]
