# Introduction to OpenAI Gymnasium with CartPole

Before diving into policy gradient algorithms, let's get familiar with OpenAI Gymnasium using a simple environment: **CartPole**.

## Goals
* Understand the Gymnasium API
* Connect MDP theory to a concrete environment
* Practice implementing simple agents
* Understand the environment-agent interaction loop


## What is CartPole?

CartPole is a classic reinforcement learning problem. A pole is attached to a cart that moves along a frictionless track. The goal is to keep the pole balanced upright by moving the cart left or right.

Documentation: https://gymnasium.farama.org/environments/classic_control/cart_pole/

# 
# ![CartPole Animation](https://gymnasium.farama.org/_images/cart_pole.gif)
# 


In [None]:
!pip install "gymnasium[classic-control]" numpy matplotlib ipython -q

[0m

In [6]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML
from typing import Callable

## CartPole as a Markov Decision Process (MDP)

Recall from the [Policy Gradient notebook](policygradient.ipynb) that an MDP is defined as a tuple $(S, A, P, R, \gamma)$:

* **$S$**: The set of possible states
* **$A$**: The set of possible actions
* **$P$**: Transition probabilities $P_a(s, s') = \Pr(s_{t+1} = s' | s_t = s, a_t = a)$
* **$R$**: Reward function $R_a(s, s')$
* **$\gamma$**: Discount factor

Let's see how CartPole maps to this framework:


In [None]:
# Create the CartPole environment
env = gym.make("CartPole-v1")

print("Observation Space:", env.observation_space)
print("Action Space:", env.action_space)
print("")
print("Number of actions:", env.action_space.n)

Observation Space: Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
Action Space: Discrete(2)

Number of actions: 2


### Understanding the MDP Components

**State Space ($S$)**: A 4-dimensional continuous vector:
1. Cart position
2. Cart velocity
3. Pole angle (in radians)
4. Pole angular velocity

**Action Space ($A$)**: Discrete with 2 choices:
* `0`: Push cart to the left
* `1`: Push cart to the right

**Transition Function ($P$)**: The physics of the cart-pole system (handled by the environment)

**Reward Function ($R$)**: +1 reward for every timestep the pole remains upright

**Discount Factor ($\gamma$)**: We'll set this ourselves when training agents (typically 0.95-0.99)


## The Environment-Agent Loop

The fundamental interaction pattern in RL:

```
1. Environment provides initial observation
2. Agent selects an action based on observation
3. Environment executes the action
4. Environment returns: next observation, reward, done flags
5. Repeat until episode ends
```

Let's see this in action:


In [8]:
# Reset the environment to get initial observation
observation, info = env.reset(seed=42)
print(f"Initial observation: {observation}")
print("")

# Take a single step
action = 1  # Push right
observation, reward, terminated, truncated, info = env.step(action)

print(f"After taking action {action}:")
print(f"  New observation: {observation}")
print(f"  Reward: {reward}")
print(f"  Terminated: {terminated}")
print(f"  Truncated: {truncated}")

env.close()

Initial observation: [ 0.0273956  -0.00611216  0.03585979  0.0197368 ]

After taking action 1:
  New observation: [ 0.02727336  0.18847767  0.03625453 -0.26141977]
  Reward: 1.0
  Terminated: False
  Truncated: False


### Key Gymnasium API Functions

1. **`reset()`**: Resets environment to initial state, returns `(observation, info)`

2. **`step(action)`**: Executes an action, returns `(observation, reward, terminated, truncated, info)`
   - `terminated`: Episode ended due to failure conditions
   - `truncated`: Episode ended due to time limit

3. **`action_space`**: Describes valid actions

4. **`observation_space`**: Describes the observation format


## Helper Functions

(We can just run them) - they render and plot stats about the running trajectories, respectively. :)

In [10]:
def render_episode(agent: Callable[[np.ndarray], int], agent_name: str = "Agent"):
    """
    Render an episode as an animation in the notebook.
    """
    env = gym.make("CartPole-v1", render_mode="rgb_array")
    
    frames = []
    observation, info = env.reset()
    frames.append(env.render())
    
    total_reward = 0
    for step in range(500):
        action = agent(observation)
        observation, reward, terminated, truncated, info = env.step(action)
        frames.append(env.render())
        total_reward += reward
        
        if terminated or truncated:
            break
    
    env.close()
    
    # Create animation
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.set_title(f"{agent_name} (Total Reward: {total_reward})")
    ax.axis('off')
    
    img = ax.imshow(frames[0])
    
    def animate(i):
        img.set_array(frames[i])
        return [img]
    
    anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50, blit=True)
    plt.close()
    
    return HTML(anim.to_jshtml())


We can also plot statistics about the episode:


In [11]:
def plot_episode_stats(agent: Callable, agent_name: str):
    """
    Plot the pole angle and cart position over an episode.
    """
    env = gym.make("CartPole-v1")
    observations, actions, rewards, total_reward = run_episode(env, agent)
    env.close()
    
    observations = np.array(observations)
    cart_positions = observations[:, 0]
    pole_angles = observations[:, 2]
    
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6))
    
    ax1.plot(cart_positions, linewidth=2)
    ax1.set_ylabel('Cart Position', fontsize=11)
    ax1.set_title(f'{agent_name} - Episode Performance (Total Reward: {total_reward})', fontsize=13)
    ax1.grid(True, alpha=0.3)
    
    ax2.plot(pole_angles, linewidth=2, color='orange')
    ax2.axhline(y=0, color='gray', linestyle='-', alpha=0.3)
    ax2.set_xlabel('Timestep', fontsize=11)
    ax2.set_ylabel('Pole Angle (radians)', fontsize=11)
    ax2.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()


## Running Episodes and Collecting Trajectories

Here's a function to run a complete episode and collect the trajectory:

As an exercise, replace the '???' with comments denoting what each step does.

In [None]:
def run_episode(env: gym.Env, agent: Callable[[np.ndarray], int], max_steps: int = 500) -> tuple[list, list, list, float]:
    """
    Run a single episode using the given agent.
    
    Args:
        env: Gymnasium environment
        agent: Function that takes observation and returns action
        max_steps: Maximum steps per episode
        
    Returns:
        observations: List of observations
        actions: List of actions taken
        rewards: List of rewards received
        total_reward: Sum of all rewards
    """
    observations = []
    actions = []
    rewards = []
    
    # ???
    observation, info = env.reset()
    
    for step in range(max_steps):
        observations.append(observation)
        
        # ???
        action = agent(observation)
        actions.append(action)
        
        # ???
        observation, reward, terminated, truncated, info = env.step(action)
        rewards.append(reward)
        
        # ???
        if terminated or truncated:
            break
    
    total_reward = sum(rewards)
    return observations, actions, rewards, total_reward


# Exercises: Simple Agents

Now it's your turn! To validate that our environment works as a MDP, try a few simple agent strategies and see how they perform.


## Exercise 1: Random Agent

Implement an agent that randomly selects actions from the action space.


In [None]:
def random_agent(observation: np.ndarray) -> int:
    """
    Agent that randomly samples from the action space.
    
    Args:
        observation: Current observation from environment (not used by this agent)
        
    Returns:
        action: Random action (0 or 1)
    """
    # TODO: Implement random action selection
    # Hint: Understand the action space and generate a random value within it.
    
    raise NotImplementedError()


In [None]:
# Render the random agent
render_episode(random_agent, "Random Agent")


In [None]:
# Plot statistics
plot_episode_stats(random_agent, "Random Agent")

## Exercise 2: Constant Action Agent

Implement an agent that always takes the same action (e.g., always pushes left).

In [None]:
def constant_agent(observation: np.ndarray) -> int:
    """
    Agent that always takes the same action.
    
    Args:
        observation: Current observation from environment
        
    Returns:
        action: Always returns the same action (choose 0 or 1)
    """
    # TODO: Implement constant action strategy
    raise NotImplementedError()


In [None]:
# Render the constant agent
render_episode(constant_agent, "Constant Agent")


In [None]:
# Plot statistics
plot_episode_stats(constant_agent, "Constant Agent")


## Exercise 3: Design Your Own Heuristic Agent

Now try to design a smarter agent that uses the observation to make decisions!

**Observation components** (reminder):
- `observation[0]`: Cart position
- `observation[1]`: Cart velocity
- `observation[2]`: Pole angle (in radians)
- `observation[3]`: Pole angular velocity

**Some ideas to explore**:
- Push in the direction the pole is leaning
- Consider the pole's angular velocity
- Try to keep the cart centered
- Use a combination of multiple observation features

**Challenge**: Can you create a heuristic that achieves 200+ reward consistently?


In [None]:
def heuristic_agent(observation: np.ndarray) -> int:
    """
    Design your own heuristic agent!
    
    Args:
        observation: [cart_pos, cart_vel, pole_angle, pole_angular_vel]
        
    Returns:
        action: 0 (left) or 1 (right)
    """
    # TODO: Implement your heuristic strategy
    # Experiment with different approaches!
    raise NotImplementedError()


In [None]:
# Render your heuristic agent
render_episode(heuristic_agent, "Heuristic Agent")

# May take some time - if your agent is good. : )


In [None]:
# Plot statistics
plot_episode_stats(heuristic_agent, "Heuristic Agent")

# Pytorch Neural Networks

An optional exercise if you're new to Pytorch.

Before we can learn policies with neural networks, we need to understand how PyTorch enables us to compute gradients automatically.

## Neural Networks as Parameterized Functions

A neural network is simply a function $f_\theta(x)$ where:
- $x$ is the input (e.g., an observation)
- $\theta$ are the parameters (weights and biases)
- $f_\theta(x)$ is the output (e.g., action probabilities)

The key insight: we can compute $\nabla_\theta f_\theta(x)$ (the gradient with respect to parameters) automatically using **backpropagation**.


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


## Defining a Simple Network

Let's create a simple feedforward network:


In [None]:
class SimpleNetwork(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        super().__init__()
        # Create two linear layers (nn.Linear) and store them as class attributes
        self.fc1 = 
        self.fc2 =
    
    def forward(self, x):
        x =  # First layer + activation
        x =         # Output layer
        return x

# Create a network: 4 inputs -> 16 hidden -> 2 outputs
net = SimpleNetwork(input_size=4, hidden_size=16, output_size=2)
print(net)


## Forward Pass: Computing $f_\theta(x)$

The forward pass computes the output for a given input:


In [None]:
# Create a dummy input (batch of 1, with 4 features)
x = torch.tensor([[0.1, 0.2, 0.3, 0.4]])

# Forward pass: compute f_θ(x)
output = net(x)
print(f"Input shape: {x.shape}")
print(f"Output shape: {output.shape}")
print(f"Output: {output}")


## Backward Pass: Computing $\nabla_\theta f_\theta(x)$

PyTorch's `.backward()` computes gradients automatically using the chain rule.

Here's the key connection to math:
- When we call `loss.backward()`, PyTorch computes $\nabla_\theta L$ where $L$ is our loss function
- These gradients tell us how to adjust $\theta$ to decrease the loss

Note that the gradient with respect to $\theta$ is always the same shape as $\theta$; if $\theta$ is a vector, $\nabla_\theta f$ is a vector of the same size, and if it's a tensor, $\nabla_\theta f$ is a tensor of the same shape.

In [None]:
# Create a simple loss (e.g., mean of outputs)
loss = output.mean()
print(f"Loss: {loss.item()}")

# Before backward: gradients are None
print(f"\nGradient of fc1.weight before backward: {net.fc1.weight.grad}")

# Compute gradients: ∇_θ loss
loss.backward()

# After backward: gradients are computed!
print(f"Gradient of fc1.weight after backward:")
print(net.fc1.weight.grad)
print(f"Shape: {net.fc1.weight.grad.shape}")


## Optimizers: Using Gradients to Update Parameters

Once we have gradients, we use an optimizer to update parameters:

$$\theta_{new} = \theta_{old} - \alpha \nabla_\theta L$$

where $\alpha$ is the learning rate.


In [None]:
# Create an optimizer
optimizer = torch.optim.Adam(net.parameters(), lr=0.01)

# Training loop pattern:
# 1. Zero out old gradients
optimizer.zero_grad()

# 2. Forward pass
output = net(x)
loss = output.mean()

# 3. Backward pass: compute ∇_θ loss
loss.backward()

# 4. Update parameters: θ_new = θ_old - α * ∇_θ loss
optimizer.step()

print("Parameters updated!")


Consider: 

1. Why do we *subtract* the gradient of the loss?
2. What's $\alpha$ used for?

# Putting It All Together: Neural Network Agent

Now let's combine everything: we'll create a policy network for CartPole and see how to train it!


## Step 1: Define a Policy Network

For CartPole:
- **Input**: 4-dimensional observation (cart position, cart velocity, pole angle, pole angular velocity)
- **Output**: 2-dimensional action probabilities (left, right)


In [None]:
class PolicyNetwork(nn.Module):
    def __init__(self, obs_size, hidden_size, action_size):
        super().__init__()
        self.fc1 = nn.Linear(???, hidden_size) # What should the input and output weight shapes be?
        self.fc2 = nn.Linear(hidden_size, ???)
    
    def forward(self, x):
        """
        Forward pass through the network.
        
        Args:
            x: observation tensor of shape (batch_size, obs_size)
            
        Returns:
            action_probs: probability distribution over actions
        """
        x = F.relu(self.fc1(x))
        logits = self.fc2(x)
        # Convert logits to probabilities using softmax
        action_probs = F.softmax(logits, dim=-1)
        return action_probs

# Create the policy network for CartPole
env = gym.make("CartPole-v1")
obs_size = env.observation_space.shape[0]  # 4
action_size = env.action_space.n  # 2

policy_net = PolicyNetwork(obs_size=obs_size, hidden_size=32, action_size=action_size)
print(f"Policy network created: {obs_size} -> 32 -> {action_size}")

## Step 2: Create an Agent Using the Network

We'll create an agent that samples actions from the network's output distribution:


In [None]:
class NeuralAgent:
    def __init__(self, policy_network):
        self.policy_net = policy_network
    
    def __call__(self, observation: np.ndarray) -> int:
        """
        Select an action based on the current observation.
        
        Args:
            observation: numpy array of shape (obs_size,)
            
        Returns:
            action: integer action (0 or 1 for CartPole)
        """
        # Convert observation to tensor
        obs_tensor = torch.tensor(observation, dtype=torch.float32).unsqueeze(0)  # Add batch dimension
        
        # Get action probabilities from network (no gradient needed for sampling)
        with torch.no_grad():
            action_probs = self.policy_net(obs_tensor)
        
        # Sample an action from the probability distribution
        action = torch.multinomial(action_probs, num_samples=1).item()
        
        return action

# Create an agent with our untrained network
neural_agent = NeuralAgent(policy_net)


Let's see how the untrained agent performs:


In [None]:
# Test the untrained agent
obs, actions, rewards, total_reward = run_episode(env, neural_agent)
print(f"Untrained agent reward: {total_reward}")

# Render it
render_episode(neural_agent, "Untrained Neural Agent")


## Step 3: Collect Trajectories

Before training, we need to collect experience from the environment:


In [None]:
# Collect one episode
observations, actions, rewards, total_reward = run_episode(env, neural_agent)
print(f"Collected {len(observations)} transitions")
print(f"Total reward: {total_reward}")

## Step 4: Implement a Training Step (Exercise)

Now it's your turn! We'll first implement a reward to go computation, which calculates the sum of all rewards from the present timestep until the trajectory's termination. For why we calculate this, refer to the derivation in `policygradient.ipynb` (week 2 reading).

In [None]:
def compute_rewards_to_go(rewards: list[float], gamma: float = 0.99) -> list[float]:
    """
    Compute the discounted reward-to-go for each timestep.
    
    Args:
        rewards: list of rewards [r_0, r_1, ..., r_{T-1}]
        gamma: discount factor
        
    Returns:
        rewards_to_go: list where rewards_to_go[t] = sum_{t'=t}^{T-1} gamma^{t'-t} * r_{t'}
    """
    # TODO: Implement reward-to-go computation
    # Hint: Work backwards from the end of the episode
    # rewards_to_go[T-1] = rewards[T-1]
    # rewards_to_go[t] = rewards[t] + gamma * rewards_to_go[t+1]
    
    raise NotImplementedError()

# Test with simple example
# test_rewards = [1, 1, 1, 10]
# rtg = compute_rewards_to_go(test_rewards, gamma=0.9)
# print(f"Rewards: {test_rewards}")
# print(f"Rewards-to-go: {rtg}")


## Step 4: Implement REINFORCE Training Step

Now we can implement the REINFORCE policy gradient. :) The loss function is:

$$L = -\frac{1}{N}\sum_{i=1}^{N} \sum_{t=0}^{T_i-1} \log \pi_\theta(a_{i,t} | s_{i,t}) \hat{R}_{i,t}$$

where:
- $N$ is the number of trajectories (in our case, we flatten all transitions)
- $\pi_\theta(a_t | s_t)$ is the probability the policy assigns to action $a_t$ given state $s_t$
- $\hat{R}_t$ is the reward-to-go from timestep $t$

When we minimize this loss with gradient descent, we're doing gradient *ascent* on the expected return! Note that we're doing this kind of hacky trick because our optimizer expects a metric to minimize instead of maximize.


In [None]:
def train_step(policy_net, optimizer, observations, actions, rewards, gamma=0.99):
    """
    Perform one REINFORCE training step on the policy network.
    
    Args:
        policy_net: the policy network
        optimizer: PyTorch optimizer
        observations: list of observation arrays
        actions: list of actions taken
        rewards: list of rewards received
        gamma: discount factor for reward-to-go
        
    Returns:
        loss: the computed loss value
    """
    # TODO: Implement the REINFORCE training step
    # 
    # Steps:
    # 1. Compute rewards-to-go using the function above
    # 2. Convert observations to tensor: torch.tensor(np.array(observations), dtype=torch.float32)
    # 3. Convert actions to tensor: torch.tensor(actions, dtype=torch.long)
    # 4. Convert rewards_to_go to tensor: torch.tensor(rewards_to_go, dtype=torch.float32)
    # 5. Get action probabilities from network: action_probs = policy_net(obs_tensor)
    # 6. Get probabilities of actions actually taken:
    #    taken_action_probs = action_probs.gather(1, action_tensor.unsqueeze(1)).squeeze()
    # 7. Compute log probabilities: log_probs = torch.log(taken_action_probs + 1e-10)  # add small epsilon for numerical stability
    # 8. Compute REINFORCE loss: loss = -(log_probs * rtg_tensor).mean()
    # 9. Backprop: optimizer.zero_grad(), loss.backward(), optimizer.step()
    # 10. Return loss.item()
    
    raise NotImplementedError()

# Test your implementation (uncomment after implementing)
# optimizer = torch.optim.Adam(policy_net.parameters(), lr=0.01)
# loss = train_step(policy_net, optimizer, observations, actions, rewards)
# print(f"Loss after training step: {loss:.4f}")


## Full Training Loop

Once you've implemented `train_step`, try running a full training loop:


In [None]:
# Full training loop 

num_episodes = 200

policy_net = PolicyNetwork(obs_size=obs_size, hidden_size=32, action_size=action_size)
optimizer = torch.optim.Adam(policy_net.parameters(), lr=0.01)
neural_agent = NeuralAgent(policy_net)

episode_rewards = []

for episode in range(num_episodes):
    # Collect one episode
    observations, actions, rewards, total_reward = run_episode(env, neural_agent)
    
    # Train on this episode
    loss = train_step(policy_net, optimizer, observations, actions, rewards)
    
    # Track progress
    episode_rewards.append(total_reward)
    
    if episode % 20 == 0:
        print(f"Episode {episode}: Reward = {total_reward:.2f}, Loss = {loss:.4f}")

# Plot training progress
plt.figure(figsize=(10, 5))
plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Episode Reward')
plt.title('Training Progress')
plt.grid(True, alpha=0.3)
plt.show()

# Render the trained agent
render_episode(neural_agent, "Trained Neural Agent")


**Congratulations!** You've just implemented REINFORCE, one of the foundational policy gradient algorithms!

This implementation includes the core concepts, but there are additional improvements you'll learn about later:
- **Baselines**: Reducing variance in gradient estimates by subtracting a baseline from rewards
- **Entropy bonuses**: Encouraging exploration by adding an entropy term to the loss
- **Generalized Advantage Estimation (GAE)**: A more sophisticated way to estimate advantages

These refinements can significantly improve training stability and performance!


In [None]:
# Clean up
env.close()

## Chapter 2: Actor-Critic (A2C)


In [None]:
# run all from part 1 :)

We've successfully trained a policy network using REINFORCE! But let's think carefully about what REINFORCE is actually doing, and where it might struggle.

### What's wrong with REINFORCE? (review from our session)

Recall our REINFORCE gradient:

$$\nabla_\theta J(\theta) \approx \frac{1}{N}\sum_{i=1}^{N} \sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t | s_t) \cdot \hat{R}_t$$

This looks reasonable, but there are **two subtle problems** that can hurt learning in practice:

---

**Problem 1: What if all returns are positive?**

Imagine an environment where all trajectories are given positive total returns (e.g you can't lose points). In REINFORCE, the gradient update *increases* the probability of actions proportionally to the return. So even bad trajectories get up-weighted!

In the limit (infinite samples), this averages out correctly—worse trajectories get smaller positive updates than better ones. But in practice, with limited samples, this can significantly slow down learning or cause instability.

**Partial Solution**: Subtract a **baseline** from the returns:
$$\nabla_\theta J(\theta) \approx \mathbb{E}\left[\sum_t \nabla_\theta \log \pi_\theta(a_t | s_t) \cdot (\hat{R}_t - b)\right]$$

A common choice is $b = \bar{R}$ (the batch mean return). This centers our updates: trajectories better than average get pushed up, worse ones get pushed down.

Mathematically, subtracting any baseline $b$ that doesn't depend on actions keeps the gradient unbiased (since $\mathbb{E}[\nabla_\theta \log \pi_\theta(a|s)] = 0$).

---

**Problem 2: The Credit Assignment Problem**

Look at the REINFORCE update again. Notice that **every action in a trajectory gets weighted by the same return** $J$.

But what if we mostly made good decisions, with a few mistakes sprinkled in? The overall trajectory might still have a good return, so REINFORCE will *reinforce all the actions*, including the mistakes.

This is the **credit assignment problem**: which specific actions actually contributed to the reward we received?

REINFORCE doesn't distinguish well. A single unlucky action at $t=50$ that causes failure gets the same treatment as the good actions at $t=0,1,2,...$.

---

### How Actor-Critic Solves These Problems

**Actor-Critic** (specifically, **Advantage Actor-Critic** or **A2C**) addresses both issues by learning a **value function** alongside the policy:

1. **Against Problem 1**: Instead of using raw returns, we use **advantages** $A_t = Q(s_t, a_t) - V(s_t)$. The advantage measures "how much better was this action compared to the average action in this state?" This naturally centers our updates—good actions have positive advantage, bad actions have negative advantage.

2. **Against Problem 2**: We estimate value at each timestep using a learned **critic** $V_\phi(s)$. This lets us compute **per-step advantages** using the TD error:
$$\delta_t = r_t + \gamma V_\phi(s_{t+1}) - V_\phi(s_t)$$

The TD error asks: "Given what we expected from state $s_t$, did this action lead to a better or worse outcome?" This provides much finer-grained credit assignment—each action is evaluated based on its immediate consequence plus the value of where we ended up.

---

### The Actor-Critic Architecture

We now have **two networks**:

| Network | Name | What it learns | Output |
|---------|------|----------------|--------|
| $\pi_\theta(a\|s)$ | **Actor** (Policy) | Which actions to take | Probability distribution over actions |
| $V_\phi(s)$ | **Critic** (Value) | How good is each state | Single scalar value |

The critic helps the actor learn more efficiently by providing a learned baseline and enabling per-step credit assignment.


In [None]:
class ValueNetwork(nn.Module):
    """
    The Critic: estimates V(s), the expected return from state s.
    """
    def __init__(self, obs_size, hidden_size):
        super().__init__()
        self.fc1 = nn.Linear(obs_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, 1)  # Single value output
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x).squeeze(-1)  # Shape: (batch_size,)

# Quick test
test_value_net = ValueNetwork(obs_size=4, hidden_size=32)
test_obs = torch.randn(1, 4)
print(f"Value network output shape: {test_value_net(test_obs).shape}")
print(f"Estimated value: {test_value_net(test_obs).item():.3f}")


### The Temporal Difference (TD) Error

The heart of actor-critic is the **TD error** $\delta_t$:

$$\delta_t = r_t + \gamma V_\phi(s_{t+1}) - V_\phi(s_t)$$

Let's break this down:
- $V_\phi(s_t)$: What we *expected* to get from state $s_t$
- $r_t + \gamma V_\phi(s_{t+1})$: What we *actually* got (immediate reward + discounted value of next state)
- $\delta_t$: The **surprise**—how much better (or worse) things turned out than expected

**Interpreting the TD error:**
- $\delta_t > 0$: "This action led to a better outcome than I expected!" → Increase probability of this action
- $\delta_t < 0$: "This action led to a worse outcome than I expected!" → Decrease probability of this action
- $\delta_t \approx 0$: "Things went about as expected" → Small or no update

**Why is the TD error a good advantage estimate?**

Recall that the advantage is $A(s, a) = Q(s, a) - V(s)$, measuring how much better action $a$ is compared to the average. 

The proof is a bit more involved, but it turns out that $\mathbb{E}[\delta_t | s_t, a_t] = A(s_t, a_t)$ - e.g, the expected value of our defined $\delta$ is equivalent to our advantage, which means that we're right on average.

<details>
<summary><b>Consider: Why does bootstrapping help with credit assignment?</b></summary>

<br>

In REINFORCE, if we take a bad action at timestep $t=50$ that eventually leads to failure at $t=100$, the return-to-go at $t=50$ will be low. But so will the returns at $t=48, 49, 51, 52$, etc.—even if those were perfectly good actions! The "blame" gets spread across many timesteps.

With TD learning, the critic learns $V(s)$ for each state. When we take that bad action at $t=50$:
- $V(s_{50})$ reflects the expected value *before* the bad action
- $V(s_{51})$ reflects the expected value *after* the bad action
- If the action was truly bad, $r_{50} + \gamma V(s_{51}) < V(s_{50})$, giving $\delta_{50} < 0$

The TD error *isolates* the effect of each action by comparing "before" and "after" states. This is much more precise credit assignment!

</details>


### Loss Functions for Actor-Critic

We have two networks, so we need two loss functions:

---

**Critic Loss (Training the Value Network)**

We want $V_\phi(s)$ to accurately predict the expected return. Using TD learning, our target is $r_t + \gamma V_\phi(s_{t+1})$, and we minimize the squared error:

$$L_{critic} = \frac{1}{N}\sum_t \left(V_\phi(s_t) - (r_t + \gamma V_\phi(s_{t+1}))\right)^2 = \frac{1}{N}\sum_t \delta_t^2$$

This is just mean squared error (MSE) between our prediction and the TD target.

**Important**: When computing the TD target $r_t + \gamma V_\phi(s_{t+1})$, we typically **stop gradients** through $V_\phi(s_{t+1})$. Otherwise, the network could "cheat" by making both $V(s_t)$ and $V(s_{t+1})$ wrong in a consistent way.

---

**Actor Loss (Training the Policy Network)**

Same as REINFORCE, but we use the TD error $\delta_t$ as our advantage estimate:

$$L_{actor} = -\frac{1}{N}\sum_t \log \pi_\theta(a_t | s_t) \cdot \delta_t$$

**Important**: Call $\delta_t$.detach() when computing this loss. We do not want policy gradients flowing back through the value network!


<details>
<summary><b>Consider: Why do we stop gradients for the TD target?</b></summary>

<br>

Suppose we did not stop gradients. The critic loss is:
$$L = (V_\phi(s_t) - r_t - \gamma V_\phi(s_{t+1}))^2$$

Gradients would flow through both $V_\phi(s_t)$ and $V_\phi(s_{t+1})$. The network could minimize this loss by making both predictions wrong in a way that cancels out—for example, always predicting 0 for everything!

By stopping gradients through $V_\phi(s_{t+1})$, we are saying: "treat $r_t + \gamma V_\phi(s_{t+1})$ as a fixed target, and update $V_\phi(s_t)$ to match it." This is the standard approach in TD learning.

</details>


### Exercise: Implement the Actor-Critic Training Step

Now it's your turn! Implement a training step that:

1. Computes the TD target: $y_t = r_t + \gamma V_\phi(s_{t+1})$ (with no gradient through $s_{t+1}$)
2. Computes the critic loss: MSE between $V_\phi(s_t)$ and $y_t$
3. Updates the critic
4. Computes TD errors (advantages): $\delta_t = y_t - V_\phi(s_t)$ (detached)
5. Computes the actor loss: $-\mathbb{E}[\log \pi_\theta(a_t|s_t) \cdot \delta_t]$
6. Updates the actor

First, we need a modified episode collection function that returns transitions (including next states and done flags):


In [None]:
def run_episode_ac(env, agent, max_steps=1000):
    """
    Run an episode and collect transitions for actor-critic training.
    
    Unlike our previous run_episode, we also need:
    - next_observations: the state we transitioned TO after each action
    - dones: whether each transition was terminal
    
    Returns:
        observations, actions, rewards, next_observations, dones, total_reward
    """
    observations = []
    actions = []
    rewards = []
    next_observations = []
    dones = []
    
    observation, info = env.reset()
    
    for step in range(max_steps):
        observations.append(observation)
        
        action = agent(observation)
        actions.append(action)
        
        next_observation, reward, terminated, truncated, info = env.step(action)
        rewards.append(reward)
        next_observations.append(next_observation)
        dones.append(float(terminated))  # 1.0 if truly terminal, 0.0 otherwise
        
        observation = next_observation
        
        if terminated or truncated:
            break
    
    total_reward = sum(rewards)
    return observations, actions, rewards, next_observations, dones, total_reward


Now implement the training step. Remember:
- Use `torch.no_grad()` or `.detach()` to stop gradients where needed
- Handle terminal states: if `done=True`, then $V(s_{t+1}) = 0$ (no future rewards)


In [None]:
def actor_critic_train_step(policy_net, value_net, policy_optimizer, value_optimizer,
                            observations, actions, rewards, next_observations, dones, 
                            gamma=0.99):
    """
    Perform one Actor-Critic training step.
    
    Args:
        policy_net: the actor network pi_theta
        value_net: the critic network V_phi
        policy_optimizer: optimizer for actor
        value_optimizer: optimizer for critic
        observations: list/array of states s_t
        actions: list/array of actions taken a_t
        rewards: list/array of rewards r_t
        next_observations: list/array of next states s_{t+1}
        dones: list/array of done flags (1.0 if terminal, 0.0 otherwise)
        gamma: discount factor
        
    Returns:
        actor_loss, critic_loss: the loss values
    """
    # TODO: Convert to tensors
    # obs_tensor = torch.tensor(np.array(observations), dtype=torch.float32)
    # next_obs_tensor = ...
    # action_tensor = ...
    # reward_tensor = ...
    # done_tensor = ...
    
    # ===== Step 1: Compute TD Target =====
    # TODO: y_t = r_t + gamma * V(s_{t+1}) * (1 - done)
    # Use torch.no_grad() to prevent gradients through the target!
    # If terminal (done=1), next value should be 0
    
    # ===== Step 2: Critic Loss =====
    # TODO: MSE between V(s_t) and the TD target
    
    # ===== Step 3: Update Critic =====
    # TODO: zero_grad, backward, step
    
    # ===== Step 4: Compute Advantages (TD Errors) =====
    # TODO: delta_t = y_t - V(s_t), detached so no gradients flow to critic
    
    # ===== Step 5: Actor Loss =====
    # TODO: -E[log pi(a|s) * advantage]
    
    # ===== Step 6: Update Actor =====
    # TODO: zero_grad, backward, step
    
    raise NotImplementedError()


### Training Actor-Critic on CartPole

Let us first verify our implementation works on CartPole, then we will tackle a harder environment.

Note: A2C takes some time to "spin up" - e.g the critic takes some time to learn anything at all before it can begin to propagate a valid learning signal to the actor. This means that A2C doesn't necessarily learn faster than REINFORCE on simple environments, and will take some number of steps before its performance will improve. On harder environments, we'll see that it learns much faster than REINFORCE does.


In [None]:
# Create environment
env = gym.make("CartPole-v1")
obs_size = env.observation_space.shape[0]  # 4
action_size = env.action_space.n  # 2

# Create networks
policy_net = PolicyNetwork(obs_size=obs_size, hidden_size=32, action_size=action_size)
value_net = ValueNetwork(obs_size=obs_size, hidden_size=32)

# Create optimizers
policy_optimizer = torch.optim.Adam(policy_net.parameters(), lr=1e-2)
value_optimizer = torch.optim.Adam(value_net.parameters(), lr=1e-2)

# Create agent
ac_agent = NeuralAgent(policy_net)

# Training loop
num_episodes = 1000
episode_rewards = []
actor_losses = []
critic_losses = []

for episode in range(num_episodes):
    # Collect episode
    obs, actions, rewards, next_obs, dones, total_reward = run_episode_ac(env, ac_agent)
    
    # Train
    actor_loss, critic_loss = actor_critic_train_step(
        policy_net, value_net, policy_optimizer, value_optimizer,
        obs, actions, rewards, next_obs, dones
    )
    
    episode_rewards.append(total_reward)
    actor_losses.append(actor_loss)
    critic_losses.append(critic_loss)
    
    if episode % 50 == 0:
        avg_reward = np.mean(episode_rewards[-50:]) if len(episode_rewards) >= 50 else np.mean(episode_rewards)
        print(f"Episode {episode}: Reward = {total_reward:.1f}, Avg(50) = {avg_reward:.1f}, "
              f"Actor Loss = {actor_loss:.3f}, Critic Loss = {critic_loss:.3f}")

print(f"\nFinal average reward (last 50): {np.mean(episode_rewards[-50:]):.1f}")


In [None]:
# Plot training progress
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

# Episode rewards
axes[0].plot(episode_rewards, alpha=0.6, label='Episode reward')
if len(episode_rewards) >= 50:
    smoothed = np.convolve(episode_rewards, np.ones(50)/50, mode='valid')
    axes[0].plot(range(49, len(episode_rewards)), smoothed, color='red', linewidth=2, label='50-ep average')
axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Reward')
axes[0].set_title('Episode Rewards')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Actor loss
axes[1].plot(actor_losses, alpha=0.6)
axes[1].set_xlabel('Episode')
axes[1].set_ylabel('Loss')
axes[1].set_title('Actor Loss')
axes[1].grid(True, alpha=0.3)

# Critic loss
axes[2].plot(critic_losses, alpha=0.6)
axes[2].set_xlabel('Episode')
axes[2].set_ylabel('Loss')
axes[2].set_title('Critic Loss')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()


In [None]:
# Render the trained agent
render_episode(ac_agent, "Actor-Critic Agent (CartPole)")

In [None]:
env.close()

### Challenge: LunarLander-v3

CartPole is a great testbed, but we are saturating on it—both REINFORCE and Actor-Critic solve it quickly, making it hard to see the difference! Let us try a harder environment where Actor-Critic's advantages really shine: **LunarLander-v3**.

**About LunarLander:**
- **State space**: 8-dimensional (position, velocity, angle, angular velocity, leg contact)
- **Action space**: 4 discrete actions (do nothing, fire left engine, fire main engine, fire right engine)
- **Reward**: +100 to +140 for landing, -100 for crash, small rewards for moving toward landing pad, penalties for using fuel
- **Goal**: Land safely on the landing pad (between the flags)

This environment is significantly harder:
- Larger state and action spaces
- Sparse rewards (most signal comes at the end)
- Need to learn precise control

<details>
<summary><b>Consider: Why might Actor-Critic outperform REINFORCE on LunarLander?</b></summary>

<br>

LunarLander episodes can be long, and the final outcome (crash vs. safe landing) dominates the return. With REINFORCE, if we crash at the very end, *all* actions in the trajectory get blamed equally—even the good ones at the start!

With Actor-Critic:
1. The critic learns the value of different states, providing per-step feedback
2. Good early actions that put us in high-value states get positive TD errors
3. The fatal action that caused the crash gets a very negative TD error
4. Credit is assigned more precisely to the actions that actually mattered

</details>


In [None]:
# Install box2d for LunarLander (if needed)
%pip install "gymnasium[box2d]" -q


In [None]:
# Create LunarLander environment
ll_env = gym.make("LunarLander-v3")
ll_obs_size = ll_env.observation_space.shape[0]  # 8
ll_action_size = ll_env.action_space.n  # 4

print(f"LunarLander observation space: {ll_env.observation_space}")
print(f"LunarLander action space: {ll_env.action_space}")
print(f"Obs size: {ll_obs_size}, Action size: {ll_action_size}")


In [None]:
# Create networks for LunarLander (larger hidden size for harder task)
ll_policy_net = PolicyNetwork(obs_size=ll_obs_size, hidden_size=128, action_size=ll_action_size)
ll_value_net = ValueNetwork(obs_size=ll_obs_size, hidden_size=128)

# Create optimizers
ll_policy_optimizer = torch.optim.Adam(ll_policy_net.parameters(), lr=1e-3)
ll_value_optimizer = torch.optim.Adam(ll_value_net.parameters(), lr=1e-3)

# Create agent
ll_agent = NeuralAgent(ll_policy_net)

# Training loop
num_episodes = 2000 # may take a while
ll_episode_rewards = []
ll_actor_losses = []
ll_critic_losses = []

for episode in range(num_episodes):
    # Collect episode
    obs, actions, rewards, next_obs, dones, total_reward = run_episode_ac(ll_env, ll_agent)
    
    # Train
    actor_loss, critic_loss = actor_critic_train_step(
        ll_policy_net, ll_value_net, ll_policy_optimizer, ll_value_optimizer,
        obs, actions, rewards, next_obs, dones, gamma=0.95
    )
    
    ll_episode_rewards.append(total_reward)
    ll_actor_losses.append(actor_loss)
    ll_critic_losses.append(critic_loss)
    
    if episode % 100 == 0:
        avg_reward = np.mean(ll_episode_rewards[-100:]) if len(ll_episode_rewards) >= 100 else np.mean(ll_episode_rewards)
        print(f"Episode {episode}: Reward = {total_reward:.1f}, Avg(100) = {avg_reward:.1f}, "
              f"Actor Loss = {actor_loss:.3f}, Critic Loss = {critic_loss:.3f}")

print(f"\nFinal average reward (last 100): {np.mean(ll_episode_rewards[-100:]):.1f}")


In [None]:
# Plot LunarLander training progress
fig, axes = plt.subplots(1, 3, figsize=(15, 4))

# Episode rewards
axes[0].plot(ll_episode_rewards, alpha=0.4, label='Episode reward')
if len(ll_episode_rewards) >= 100:
    smoothed = np.convolve(ll_episode_rewards, np.ones(100)/100, mode='valid')
    axes[0].plot(range(99, len(ll_episode_rewards)), smoothed, color='red', linewidth=2, label='100-ep average')
axes[0].axhline(y=200, color='green', linestyle='--', alpha=0.5, label='Solved threshold')
axes[0].set_xlabel('Episode')
axes[0].set_ylabel('Reward')
axes[0].set_title('LunarLander Episode Rewards')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Actor loss
axes[1].plot(ll_actor_losses, alpha=0.6)
axes[1].set_xlabel('Episode')
axes[1].set_ylabel('Loss')
axes[1].set_title('Actor Loss')
axes[1].grid(True, alpha=0.3)

# Critic loss
axes[2].plot(ll_critic_losses, alpha=0.6)
axes[2].set_xlabel('Episode')
axes[2].set_ylabel('Loss')
axes[2].set_title('Critic Loss')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()


In [None]:
def render_lunar_lander(agent, agent_name="Agent"):
    """Render a LunarLander episode."""
    env = gym.make("LunarLander-v3", render_mode="rgb_array")
    
    frames = []
    observation, info = env.reset()
    frames.append(env.render())
    
    total_reward = 0
    for step in range(1000):
        action = agent(observation)
        observation, reward, terminated, truncated, info = env.step(action)
        frames.append(env.render())
        total_reward += reward
        
        if terminated or truncated:
            break
    
    env.close()
    
    # Create animation
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.set_title(f"{agent_name} (Total Reward: {total_reward:.1f})")
    ax.axis('off')
    
    img = ax.imshow(frames[0])
    
    def animate(i):
        img.set_array(frames[i])
        return [img]
    
    anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=50, blit=True)
    plt.close()
    
    return HTML(anim.to_jshtml())

render_lunar_lander(ll_agent, "Actor-Critic Agent (LunarLander)")


In [None]:
ll_env.close()


### To remember: REINFORCE vs Actor-Critic

| Aspect | REINFORCE | Actor-Critic (A2C) |
|--------|-----------|-------------------|
| **Networks** | Policy only | Policy (actor) + Value (critic) |
| **Weight for updates** | Reward-to-go $\hat{R}_t$ | TD error $\delta_t$ (advantage) |
| **Baseline** | None (or batch mean) | Learned value function $V(s)$ |
| **Credit assignment** | All actions weighted by trajectory return | Per-step: each action judged by immediate outcome |
| **Variance** | High | Lower (bootstrapping from learned $V$) |
| **Bias** | Unbiased | Slightly biased (imperfect $V$) |

**Key takeaways:**

1. **Actor-Critic reduces variance** by using a learned baseline $V(s)$ instead of raw returns

2. **TD errors provide better credit assignment** by comparing "before" and "after" states for each action

3. **The critic learns to predict value**, which helps both as a baseline and for computing per-step advantages

4. **The tradeoff**: We introduce some bias (our $V$ is not perfect), but the variance reduction typically leads to faster, more stable learning

**What is next?** There are many ways to extend Actor-Critic:
- **GAE (Generalized Advantage Estimation)**: Interpolate between TD(0) and full returns
- **Entropy bonus**: Encourage exploration by adding policy entropy to the loss
- **A3C/A2C parallelism**: Run multiple environments in parallel for faster data collection
- **PPO (Proximal Policy Optimization)**: Constrain policy updates for stability
