# 🧩 Lab 8: TD Methods on CartPole 

In previous labs, we focused on **❄️ FrozenLake**, a discrete and low-dimensional environment that helped us understand fundamental ideas such as **Markov decision processes (MDPs)**, **state transitions**, and **tabular value functions**.  

Now, we move beyond grid worlds and explore a **real control problem** — the **🏗️ CartPole environment**, a continuous and dynamic system widely used in reinforcement learning research.

### Algorithms to Implement

1. **Monte Carlo (MC)** – Learn from complete episodes using returns to update value estimates.  
2. **SARSA (On-Policy TD Control)** – Learn from the agent’s actual actions while following an ε-greedy policy.  
3. **Q-Learning (Off-Policy TD Control)** – Learn an optimal policy independent of the behavior policy.


##  Part 1: Understanding the CartPole Environment

We now move from static grid worlds to a **dynamic control system** — the classic `CartPole-v1` environment from **Gymnasium**.

Let’s begin by creating the environment and running a simple hand-written balancing rule for 100 steps to see how it behaves.


In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import HTML
from matplotlib import animation

# Use rgb_array so env.render() returns frames
env = gym.make("CartPole-v1", render_mode="rgb_array")
obs, info = env.reset(seed=0)

# Simple rule: push in the direction that reduces pole angle.
# If theta > 0 (pole tilts to the right), push right; else push left.
def greedy_policy(observation):
    _, _, theta, theta_dot = observation
    return 1 if theta > 0 else 0

frames = []
rewards = []
num_steps = 100

# capture initial frame
frames.append(env.render())

for t in range(num_steps):
    action = greedy_policy(obs)
    obs, r, terminated, truncated, info = env.step(action)
    rewards.append(r)
    frames.append(env.render())
    if terminated or truncated:
        # If the episode ends early, reset and continue for the remaining steps
        obs, info = env.reset()
        frames.append(env.render())

env.close()

# Stack frames into a numpy array (T, H, W, C), dtype=uint8
frames = np.asarray(frames, dtype=np.uint8)
print(f"Frames shape: {frames.shape}, dtype: {frames.dtype}")

fig, ax = plt.subplots()
ax.set_axis_off()
img = ax.imshow(frames[0])

def animate(i):
    img.set_data(frames[i])
    return [img]

ani = animation.FuncAnimation(fig, animate, frames=len(frames), interval=30, blit=True)
plt.close(fig)  # prevent duplicate static display
HTML(ani.to_jshtml())

---

###  Environment Overview

The **CartPole** environment simulates a classic control problem:  
A pole is attached by an unactuated joint to a cart that moves along a frictionless track.  
The agent must apply forces to the cart (left or right) to keep the pole balanced upright.

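**Episode End Conditions:**
- The pole angle exceeds **±12°** (≈ ±0.209 radians): termination. The ±0.418 radians (±24°) figure is the bound of the observation space, not the failure threshold.
- The cart position exceeds **±2.4 meters**: termination.
- The episode length reaches **500 time steps**: truncation (a time limit, not a failure).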

At each step, the agent receives a **reward of +1** for keeping the pole balanced.
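
As a rough illustration of the two failure checks, the sketch below tests a cart position and pole angle against the 12° and 2.4 m thresholds. The function name `pole_has_fallen` and the constant names are hypothetical; Gymnasium performs the real check internally inside `env.step`.

```python
import math

# Thresholds from the list above; these names are illustrative, not Gymnasium's.
THETA_LIMIT = math.radians(12)   # 12 degrees expressed in radians
X_LIMIT = 2.4                    # metres

def pole_has_fallen(x, theta):
    """Return True if the cart left the track or the pole tilted too far."""
    return abs(x) > X_LIMIT or abs(theta) > THETA_LIMIT

print(round(THETA_LIMIT, 3))        # 0.209
print(pole_has_fallen(0.0, 0.05))   # False
print(pole_has_fallen(0.0, 0.25))   # True
print(pole_has_fallen(2.5, 0.0))    # True
```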


###  Observation (State) Space

The observation returned by the environment is a 4-dimensional continuous vector:

| Index | Variable | Description | Typical Range |
|:------:|-----------|--------------|---------------|
| 0 | `x` | Cart position (m) | −2.4 ~ 2.4 |
| 1 | `x_dot` | Cart velocity (m/s) | −∞ ~ ∞ |
| 2 | `theta` | Pole angle (radians) | −0.418 ~ 0.418 |
| 3 | `theta_dot` | Pole angular velocity (radians/s) | −∞ ~ ∞ |

These continuous values must often be **discretized** into bins for use with tabular RL algorithms such as **Monte Carlo**, **SARSA**, and **Q-Learning**.


###  Action Space

| Action | Meaning |
|:-------:|---------|
| `0` | Push the cart to the **left** |
| `1` | Push the cart to the **right** |

Each step:
1. The agent observes the current state $s_t$.  
2. Chooses an action $a_t \in \{0, 1\}$.  
3. Receives a reward $r_t = +1$ if the pole remains upright.  

The objective is to **maximize the total cumulative reward**, i.e., keep the pole balanced for as long as possible.

## Part 2: Discretizing the State Space

Unlike the discrete grid world in **FrozenLake**, the **CartPole** environment has a **continuous state space** — each state is represented by four real-valued variables:
$$
s = [x, \dot{x}, \theta, \dot{\theta}]
$$
To use **tabular methods** such as Monte Carlo, SARSA, or Q-learning, we must convert this continuous state into a **discrete representation**.

###  Why Discretize?
- Tabular RL algorithms index the Q-table by discrete states.
- The continuous variables (like pole angle and velocity) take infinitely many values — impossible to store directly.
- We therefore divide each dimension into a finite number of **bins** and map each observation to the corresponding **bin index**.
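
The binning step can be sketched for a single variable in plain Python. This is a minimal illustration, not the lab's implementation: the helper name `to_bin` is hypothetical, and the range and bin count in the example are assumed values chosen for easy arithmetic.

```python
def to_bin(value, low, high, num_bins):
    """Map a real value to an integer bin index in [0, num_bins - 1]."""
    clipped = min(max(value, low), high)             # out-of-range values fall into the edge bins
    ratio = (clipped - low) / (high - low)           # position within the range, from 0.0 to 1.0
    return min(int(ratio * num_bins), num_bins - 1)  # ratio == 1.0 would overflow, so cap it

# Assumed example: a range of [-2.4, 2.4] split into 6 bins of width 0.8
print(to_bin(-2.4, -2.4, 2.4, 6))  # 0
print(to_bin(0.1, -2.4, 2.4, 6))   # 3
print(to_bin(2.4, -2.4, 2.4, 6))   # 5
print(to_bin(9.9, -2.4, 2.4, 6))   # 5
```

Applying the same mapping to each of the four state variables yields a tuple of indices that can address a Q-table directly.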

In [None]:
import numpy as np

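# Fresh environment for training; the earlier one was closed after rendering
env = gym.make("CartPole-v1")
n_actions = env.action_space.n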
np.set_printoptions(precision=3, suppress=True)

# ----- Discretization -----
NUM_BINS = np.array([6, 6, 12, 12])
STATE_BOUNDS = np.array([
    [-2.4,   2.4],
    [-3.0,   3.0],
    [-0.418, 0.418],
    [-2.0,   2.0]
])

def discretize_state(obs):
    lo, hi = STATE_BOUNDS[:,0], STATE_BOUNDS[:,1]
    ratios = (np.clip(obs, lo, hi) - lo) / (hi - lo)
    bins = (ratios * NUM_BINS).astype(int)
    return tuple(np.clip(bins, 0, NUM_BINS - 1))

In [None]:
# Print one raw observation and the discrete state it maps to
obs, _ = env.reset(seed=0)
discrete_state = discretize_state(obs)
print("Observation:", obs)
print("Discretized State:", discrete_state)

##  Part 3: Monte Carlo Control on CartPole

Now that we have a **discretized state space**, we can apply **Monte Carlo (MC)** learning to estimate the optimal control policy.

Monte Carlo methods learn directly from **complete episodes** of experience — they wait until an episode finishes, then update the Q-values based on the **total return** from each visited state–action pair.
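
A minimal sketch of how the return from every time step of a finished episode can be computed in one backward pass. The helper name `discounted_returns` is hypothetical, and `rewards[t]` is assumed to hold the reward received after the action at step `t`.

```python
def discounted_returns(rewards, gamma):
    """Return [G_0, ..., G_{T-1}] for one finished episode, computed backwards."""
    returns = [0.0] * len(rewards)
    G = 0.0
    for t in reversed(range(len(rewards))):
        G = rewards[t] + gamma * G   # G_t = r_{t+1} + gamma * G_{t+1}
        returns[t] = G
    return returns

print(discounted_returns([1.0, 1.0, 1.0], gamma=0.5))  # [1.75, 1.5, 1.0]
```

Working backwards reuses the return of the next step, so the whole episode costs one pass instead of one sum per time step.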

---

###  Key Idea

For each episode:
1. Generate a trajectory of states, actions, and rewards  
   $$
   (s_0, a_0, r_1, s_1, a_1, r_2, \ldots, s_T)
   $$
2. Compute the **return**
   $$
   G_t = r_{t+1} + \gamma r_{t+2} + \cdots + \gamma^{T-t-1} r_T
   $$
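3. Update the Q-value. With a visit counter $N(s_t, a_t)$, choosing the step size $\alpha = 1 / N(s_t, a_t)$ makes $Q$ the running average of the observed returns:
   $$
   Q(s_t, a_t) \leftarrow Q(s_t, a_t) + \alpha \big(G_t - Q(s_t, a_t)\big)
   $$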
4. Derive the **policy** using ε-greedy exploration:
   $$
   \pi(s) = 
   \begin{cases}
   \arg\max_a Q(s, a) & \text{with probability } 1-\varepsilon \text{ (exploit)} \\
   \text{random action} & \text{with probability } \varepsilon \text{ (explore)}
   \end{cases}
   $$
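
The ε-greedy rule in step 4 can be sketched with the standard library alone. The function name `epsilon_greedy` and the list-based Q-values are assumptions made for illustration; the lab itself works with NumPy arrays.

```python
import random

def epsilon_greedy(q_values, epsilon, rng=random):
    """Pick a random action with probability epsilon, else the highest-valued one."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))                    # explore
    return max(range(len(q_values)), key=lambda a: q_values[a])  # exploit (first max on ties)

rng = random.Random(0)
q = [0.2, 0.7]
print(epsilon_greedy(q, epsilon=0.0, rng=rng))   # 1, always greedy when epsilon is 0
picks = [epsilon_greedy(q, epsilon=1.0, rng=rng) for _ in range(1000)]
print(set(picks))                                # both actions appear under pure exploration
```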

---


In [None]:
# ----- ε-greedy policy -----
def greedy_action(Q, s, epsilon):
    if np.random.rand() < epsilon:
        return np.random.randint(n_actions)
    return int(np.argmax(Q[s]))

In [None]:
# ----- Initialize tables -----
Q = np.zeros((*NUM_BINS, n_actions))
N = np.zeros((*NUM_BINS, n_actions), dtype=int)


num_episodes = 5000
gamma = 0.99
epsilon = 1.0
eps_min, eps_decay = 0.05, 0.995
returns_history = []


for ep in range(1, num_episodes + 1):
    obs, _ = env.reset()
    episode = []

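    # Exercise: choose the action. One possible completion is filled in below.
    done = False
    while not done:
        s = discretize_state(obs)
        a = greedy_action(Q, s, epsilon)  # ε-greedy w.r.t. the current Q-table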
        obs_next, r, term, trunc, _ = env.step(a)
        episode.append((s, a, r))
        obs = obs_next
        done = term or trunc

    T = len(episode)
    G = 0.0
    returns = np.zeros(T)
    for t in range(T - 1, -1, -1):
        _, _, r = episode[t]
        G = gamma * G + r
        returns[t] = G

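    for t, (s, a, _) in enumerate(episode):
        # Every-visit MC with a sample-average step size (one possible completion)
        N[s][a] += 1                               # count this visit
        n = N[s][a]
        Q[s][a] += (returns[t] - Q[s][a]) / n      # running mean of observed returns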


    epsilon = max(eps_min, epsilon * eps_decay)
    ep_return = sum(r for _, _, r in episode)
    returns_history.append(ep_return)

    if ep % 500 == 0:
        avg = np.mean(returns_history[-100:])
        print(f"Episode {ep:5d} | ε={epsilon:.3f} | AvgReturn(100)={avg:.1f}")

In [None]:
def evaluate_greedy(Q, episodes=20):
    total = 0.0
    for _ in range(episodes):
        obs, _ = env.reset()
        done = False
        ep_ret = 0.0
        while not done:
            s = discretize_state(obs)
            a = np.argmax(Q[s])
            obs, r, term, trunc, _ = env.step(a)
            ep_ret += r
            done = term or trunc
        total += ep_ret
    return total / episodes

avg_eval = evaluate_greedy(Q)
print(f"\nGreedy policy average return: {avg_eval:.1f}")

## Part 4: n-Step SARSA (On-Policy TD Control)

**Goal.** Extend SARSA from 1-step targets to **n-step returns**, trading off bias/variance and enabling smoother learning than pure Monte Carlo.

###  Key Idea
For each time index $ \tau $, form the **n-step target** by accumulating up to $n$ future rewards and (if the episode continues) **bootstrapping** from $Q$ at $S_{\tau+n}, A_{\tau+n}$:
$$
G_{\tau:\tau+n} \;=\; \sum_{i=\tau+1}^{\min(\tau+n,\,T)} \gamma^{\,i-(\tau+1)}\,R_i \;+\; 
\begin{cases}
\gamma^{\,n}\, Q(S_{\tau+n}, A_{\tau+n}) & \text{if } \tau+n < T,\\
0 & \text{otherwise.}
\end{cases}
$$
Update:
$$
Q(S_\tau, A_\tau) \leftarrow Q(S_\tau, A_\tau)\;+\;\alpha\left[\,G_{\tau:\tau+n} - Q(S_\tau, A_\tau)\,\right].
$$
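
A small worked sketch of the n-step target, assuming rewards are stored so that `R[i]` holds $R_i$ (with `R[0]` unused). The helper name `n_step_target` and the `q_bootstrap` argument, which stands in for $Q(S_{\tau+n}, A_{\tau+n})$, are hypothetical.

```python
def n_step_target(R, tau, n, T, gamma, q_bootstrap=0.0):
    """n-step return G_{tau:tau+n}.

    R[i] holds reward R_i (R[0] is an unused placeholder).
    q_bootstrap is only used when tau + n < T, i.e. the episode is still running.
    """
    upper = min(tau + n, T)
    G = sum(gamma ** (i - tau - 1) * R[i] for i in range(tau + 1, upper + 1))
    if tau + n < T:
        G += gamma ** n * q_bootstrap
    return G

R = [0.0, 1.0, 1.0, 1.0, 1.0, 1.0]   # rewards R_1..R_5, so T = 5
print(n_step_target(R, tau=0, n=2, T=5, gamma=0.5, q_bootstrap=4.0))  # 1 + 0.5 + 0.25 * 4 = 2.5
print(n_step_target(R, tau=4, n=2, T=5, gamma=0.5, q_bootstrap=4.0))  # 1.0, no bootstrap near the end
```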

###  Mechanics (buffers & indices)
- Maintain rolling buffers: **states** $S_0,S_1,\dots$, **actions** $A_0,A_1,\dots$, **rewards** $R_1,R_2,\dots$.
- Let $T$ be the time step when the episode terminates (first terminal or truncation).
- At each step $t$, once $ \tau = t-n+1 \ge 0 $, compute the target for $(S_\tau,A_\tau)$.
- Stop when $ \tau = T-1 $.
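
To make the index bookkeeping concrete, the sketch below lists which $(t, \tau)$ pairs trigger an update. It assumes $T$ is known in advance, which is a simplification: in the real loop $T$ is only discovered when the episode ends. The helper name `update_schedule` is hypothetical.

```python
def update_schedule(T, n):
    """List the (t, tau) pairs at which an n-step update happens for an episode of length T."""
    schedule = []
    t = 0
    while True:
        tau = t - n + 1
        if tau >= 0:
            schedule.append((t, tau))
        if tau == T - 1:        # last state-action pair has been updated
            break
        t += 1
    return schedule

print(update_schedule(T=5, n=3))
# [(2, 0), (3, 1), (4, 2), (5, 3), (6, 4)]
```

The first update is delayed until $n$ rewards are available, and the updates at $t \ge T$ happen after the episode has ended, without any further environment steps.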


### 📏 Pseudocode (high level)
1. Reset env; pick $A_0$ by ε-greedy.
2. For $t=0,1,\dots$:
   - Step with $A_t$ → observe $R_{t+1}, S_{t+1}$; pick $A_{t+1}$ ε-greedy (unless terminal).
   - Set $ \tau = t - n + 1 $.  
     If $ \tau \ge 0 $:  
     &nbsp;&nbsp;• Compute $G_{\tau:\tau+n}$ (sum of up to $n$ rewards + bootstrap if $\tau+n<T$).  
     &nbsp;&nbsp;• Update $Q(S_\tau,A_\tau)$ toward $G_{\tau:\tau+n}$.
   - Stop when $ \tau = T-1 $.

In [None]:
# ----- Tables & hyperparams -----
Q = np.zeros((*NUM_BINS, n_actions))
num_episodes = 4000
max_steps = 1000
gamma = 0.99
alpha = 0.1
n = 4                           # n-step horizon (tune me)
epsilon = 1.0
eps_min, eps_decay = 0.05, 0.995

returns_hist = []

for ep in range(1, num_episodes + 1):
    obs, _ = env.reset()
    s0 = discretize_state(obs)
    a0 = greedy_action(Q, s0, epsilon)

    # Buffers (indexed from 0); R[t] stores R_t, and R[0] is a placeholder so indices match the algorithm
    S = [s0]            # states S_0, S_1, ...
    A = [a0]            # actions A_0, A_1, ...
    R = [0.0]           # R[0] unused; will append R_1, R_2, ...
    T = np.inf

    t = 0
    ep_return = 0.0

    while True:
        if t < T:
            # Step in env using A_t
            obs_next, r, term, trunc, _ = env.step(A[t])
            ep_return += r
            done = bool(term or trunc)
            R.append(r)                       # this is R_{t+1}
            if done:
                T = t + 1
            else:
                s_next = discretize_state(obs_next)
                S.append(s_next)              # S_{t+1}
                a_next = greedy_action(Q, s_next, epsilon)
                A.append(a_next)              # A_{t+1}

        tau = t - n + 1                       # state to update
        if tau >= 0:
            # Compute G (n-step return starting at tau)
            # G = sum_{i=tau+1}^{min(tau+n, T)} gamma^{i-(tau+1)} R_i

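            # Exercise: accumulate the discounted rewards, then bootstrap.
            # One possible completion is filled in below.
            G = 0.0
            upper = int(min(tau + n, T))
            power = 0
            for i in range(tau + 1, upper + 1):
                G = G + (gamma ** power) * R[i]
                power = power + 1
            if tau + n < T:                   # bootstrap if within episode
                G = G + (gamma ** n) * Q[S[tau + n]][A[tau + n]]

            s_tau = S[tau]
            a_tau = A[tau]
            Q[s_tau][a_tau] = Q[s_tau][a_tau] + alpha * (G - Q[s_tau][a_tau])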

        if tau == T - 1:
            break
        t += 1

    # ε schedule & logging
    epsilon = max(eps_min, epsilon * eps_decay)
    returns_hist.append(ep_return)
    if ep % 500 == 0:
        avg = np.mean(returns_hist[-100:])
        print(f"Episode {ep:5d} | ε={epsilon:.3f} | AvgReturn(100)={avg:.1f}")

In [None]:
avg_eval = evaluate_greedy(Q, episodes=20)
print(f"\nGreedy policy average return (n={n} SARSA): {avg_eval:.1f}")

## Part 5: One-Step Q-Learning (Off-Policy TD Control)


###  Core Update Rule

For each transition $ (s_t, a_t, r_{t+1}, s_{t+1}) $:

$$
Q(s_t, a_t) \;\leftarrow\; Q(s_t, a_t) \;+\;
\alpha \,\Big[\, r_{t+1} \;+\; \gamma \max_{a'} Q(s_{t+1}, a') \;-\; Q(s_t, a_t) \,\Big].
$$

- The **target** term  $ r_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') $  uses the **best possible next action** under the current estimate $Q$.
- Thus, Q-Learning learns an **optimal policy** even while following an **exploratory** (ε-greedy) one.
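
One update can be traced by hand with a tiny table. This is a sketch under assumptions: the function name `q_learning_update`, the dict-of-lists table, and the numbers are all illustrative, not part of the lab code.

```python
def q_learning_update(Q, s, a, r, s_next, done, alpha, gamma):
    """Apply one tabular Q-learning update in place and return the TD target."""
    target = r if done else r + gamma * max(Q[s_next])  # greedy value of the next state
    Q[s][a] += alpha * (target - Q[s][a])
    return target

# Hypothetical two-state, two-action table
Q = {"s0": [0.0, 0.0], "s1": [1.0, 3.0]}
target = q_learning_update(Q, "s0", 1, r=1.0, s_next="s1", done=False, alpha=0.5, gamma=0.5)
print(target)      # 2.5  (1.0 + 0.5 * 3.0)
print(Q["s0"][1])  # 1.25 (0.0 + 0.5 * (2.5 - 0.0))
```

The target uses the maximum over next actions regardless of which action the behavior policy takes next, which is what makes the method off-policy.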


In [None]:
Q = np.zeros((*NUM_BINS, n_actions), dtype=float)
num_episodes = 4000
gamma = 0.99
alpha = 0.1
epsilon = 1.0
eps_min, eps_decay = 0.05, 0.995
returns_hist = []

for ep in range(1, num_episodes + 1):
    obs, _ = env.reset()
    done = False
    ep_return = 0.0

    while not done:
        s = discretize_state(obs)
        a = greedy_action(Q, s, epsilon)  # ε-greedy behavior policy (epsilon=1 would be fully random)

        obs_next, r, term, trunc, _ = env.step(a)
        done = bool(term or trunc)
        ep_return += r

        s_next = discretize_state(obs_next)
        
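        # Exercise: build the TD target and update Q. One possible completion:
        if done:
            td_target = r                                  # no bootstrap at episode end
        else:
            td_target = r + gamma * np.max(Q[s_next])      # greedy value of the next state

        Q[s][a] = Q[s][a] + alpha * (td_target - Q[s][a])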

        obs = obs_next

    # ε schedule & logging
    epsilon = max(eps_min, epsilon * eps_decay)
    returns_hist.append(ep_return)
    if ep % 500 == 0:
        avg = np.mean(returns_hist[-100:])
        print(f"Episode {ep:5d} | ε={epsilon:.3f} | AvgReturn(100)={avg:.1f}")

In [None]:
avg_eval = evaluate_greedy(Q, episodes=20)
print(f"\nGreedy policy average return (1-step Q-learning): {avg_eval:.1f}")

## 🧭 Part 6: Create our own environment

We implement a tiny **3×3 GridWorld** to learn how to build a custom Gymnasium environment.

- **Start:** $(0,0)$, **Goal:** $(2,2)$  
- **Actions:** $0=\text{Up},\,1=\text{Down},\,2=\text{Left},\,3=\text{Right}$  
- **Rewards:** $+1$ on goal, $-0.01$ per step, $-0.05$ for bumping into a wall  
- **Termination:** reaching the goal or hitting a step limit
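
Before wrapping these rules in an environment class, the transition logic can be sketched as a plain function with no Gymnasium dependency. The names `grid_step`, `GRID_SIZE`, `GOAL`, and `MOVES` are hypothetical, and the step limit is left out for brevity.

```python
GRID_SIZE = 3
GOAL = (2, 2)
MOVES = [(-1, 0), (1, 0), (0, -1), (0, 1)]  # Up, Down, Left, Right as (d_row, d_col)

def grid_step(pos, action):
    """Return (new_pos, reward, reached_goal) for one move on the 3x3 grid."""
    r, c = pos
    dr, dc = MOVES[action]
    nr, nc = r + dr, c + dc
    reward = -0.01                       # per-step cost
    if 0 <= nr < GRID_SIZE and 0 <= nc < GRID_SIZE:
        pos = (nr, nc)
    else:
        reward -= 0.05                   # wall bump: stay in place, extra penalty
    if pos == GOAL:
        return pos, 1.0, True
    return pos, reward, False

print(grid_step((0, 0), 0))  # bumping the top wall keeps the agent at (0, 0)
print(grid_step((0, 0), 3))  # ((0, 1), -0.01, False)
print(grid_step((2, 1), 3))  # ((2, 2), 1.0, True)
```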

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np

# ----------------------------
# 1) Custom 3x3 Grid Environment
# ----------------------------
class Grid3x3Env(gym.Env):
    """
    A tiny 3x3 GridWorld.
      - Start at (0,0), goal at (2,2)
      - Actions: 0=Up, 1=Down, 2=Left, 3=Right
      - Rewards: +1 on reaching goal, -0.01 per step, -0.05 for bumping into a wall (stay in place)
      - Episode ends on goal or step limit
    Observation: Discrete(9) index r*3 + c
    """
    metadata = {"render_modes": ["ansi"]}

    def __init__(self, render_mode=None, max_steps=30):
        super().__init__()
        self.N = 3
        self.action_space = spaces.Discrete(4)
        self.observation_space = spaces.Discrete(self.N * self.N)

        self.start = (0, 0)
        self.goal  = (2, 2)
        self.max_steps = max_steps
        self.render_mode = render_mode

        self._pos = None
        self._steps = 0

        # (dr, dc) for Up, Down, Left, Right
        self._moves = [(-1,0), (1,0), (0,-1), (0,1)]

    def _rc_to_obs(self, r, c): return r * self.N + c
    def _obs_to_rc(self, obs):  return divmod(obs, self.N)

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self._pos = self.start
        self._steps = 0
        return self._rc_to_obs(*self._pos), {}

    def step(self, action):
        self._steps += 1
        r, c = self._pos
        dr, dc = self._moves[int(action)]
        nr, nc = r + dr, c + dc

        reward = -0.01
        terminated = False
        truncated = self._steps >= self.max_steps

        # check bounds
        if 0 <= nr < self.N and 0 <= nc < self.N:
            self._pos = (nr, nc)
        else:
            # bump wall: stay & extra penalty
            reward -= 0.05

        if self._pos == self.goal:
            reward = 1.0
            terminated = True

        return self._rc_to_obs(*self._pos), reward, terminated, truncated, {}

    def render(self):
        board = [[" . "]*self.N for _ in range(self.N)]
        gr, gc = self.goal
        board[gr][gc] = "[G]"
        r, c = self._pos
        board[r][c] = " A "
        return "\n".join("".join(row) for row in board)