# Week 3: Lab Solutions - REINFORCE for CartPole


This notebook contains complete solutions with detailed explanations.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import gymnasium as gym
from matplotlib import animation
from IPython.display import HTML

np.random.seed(42)
torch.manual_seed(42)

---

## Task 1 Solution: Policy Network

**Solution:**
- Hidden dimension: 32 neurons (a good default for small problems)
- Activation: ReLU (most common choice)

In [None]:
class PolicyNetwork(nn.Module):
    """Neural network policy for CartPole."""
    
    def __init__(self, state_dim=4, action_dim=2):
        super().__init__()
        
        # SOLUTION: Set hidden dimension to 32
        hidden_dim = 32
        
        # SOLUTION: Use ReLU activation
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),  # SOLUTION
            nn.Linear(hidden_dim, action_dim)
        )
    
    def forward(self, state):
        """Forward pass: state ‚Üí action logits."""
        return self.network(state)


# Test the network
policy = PolicyNetwork()
test_state = torch.FloatTensor([0.0, 0.0, 0.1, 0.0])
logits = policy(test_state)

print(f"Input state shape: {test_state.shape}")
print(f"Output logits shape: {logits.shape}")
print(f"Output logits: {logits}")
print("\n‚úì Network architecture correct!")

---

## Task 2 Solution: Action Selection

**Key concepts:**
1. Use `softmax` to convert logits ‚Üí probabilities
2. Use `torch.multinomial` to sample from probability distribution
3. Return both action and log probability (needed for gradient)

In [None]:
class PolicyNetwork(nn.Module):
    """Neural network policy for CartPole."""
    
    def __init__(self, state_dim=4, action_dim=2):
        super().__init__()
        hidden_dim = 32
        
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )
    
    def forward(self, state):
        """Forward pass: state ‚Üí action logits."""
        return self.network(state)
    
    def select_action(self, state):
        """
        Sample an action from the policy.
        """
        state = torch.FloatTensor(state)
        logits = self.forward(state)
        
        # SOLUTION: Convert logits to probabilities using softmax
        probs = torch.softmax(logits, dim=-1)
        
        # SOLUTION: Sample action from categorical distribution
        action = torch.multinomial(probs, 1).item()
        
        # Compute log probability
        log_prob = torch.log(probs[action])
        
        return action, log_prob


# Test action selection
policy = PolicyNetwork()
test_state = np.array([0.0, 0.0, 0.1, 0.0])

print("Testing action selection (5 trials):")
for i in range(5):
    action, log_prob = policy.select_action(test_state)
    prob = torch.exp(log_prob).item()
    print(f"  Trial {i+1}: action={action}, log_prob={log_prob.item():.3f}, prob={prob:.3f}")

print("\n‚úì Action selection working correctly!")

---

## Task 3 Solution: Computing Discounted Returns

**The core of REINFORCE!**

**Algorithm:**
```
R = 0
For each reward r (from end to start):
    R = r + Œ≥ * R
    Insert R at beginning of returns
```

**Why backwards?** Because G_t depends on G_{t+1}, so we need to compute from the end.

**Example:**
- Rewards: [1, 1, 1, 1, 1]
- Œ≥ = 0.9
- Returns:
  - G_4 = 1
  - G_3 = 1 + 0.9 * 1 = 1.9
  - G_2 = 1 + 0.9 * 1.9 = 2.71
  - G_1 = 1 + 0.9 * 2.71 = 3.439
  - G_0 = 1 + 0.9 * 3.439 = 4.0951

In [None]:
def compute_returns(rewards, gamma=0.99):
    """
    Compute discounted returns for each timestep.
    
    G_t = r_t + Œ≥*r_{t+1} + Œ≥¬≤*r_{t+2} + ...
    """
    returns = []
    R = 0  # Running return
    
    # SOLUTION: Iterate backwards through rewards
    for r in reversed(rewards):
        # SOLUTION: Bellman equation for return
        R = r + gamma * R
        # SOLUTION: Insert at beginning to maintain order
        returns.insert(0, R)
    
    return torch.FloatTensor(returns)


# Test the implementation
test_rewards = [1, 1, 1, 1, 1]
test_gamma = 0.9

returns = compute_returns(test_rewards, test_gamma)

print(f"Rewards: {test_rewards}")
print(f"Gamma: {test_gamma}")
print(f"\nComputed returns: {returns}")

# Manual calculation for verification
print("\nManual verification:")
print(f"  G_4 = 1 = {1}")
print(f"  G_3 = 1 + 0.9*1 = {1 + 0.9*1}")
print(f"  G_2 = 1 + 0.9*1.9 = {1 + 0.9*1.9}")
print(f"  G_1 = 1 + 0.9*2.71 = {1 + 0.9*2.71}")
print(f"  G_0 = 1 + 0.9*3.439 = {1 + 0.9*3.439}")

expected = torch.FloatTensor([4.0951, 3.439, 2.71, 1.9, 1.0])
if torch.allclose(returns, expected, atol=0.01):
    print("\n‚úì Correct! Returns match expected values.")
else:
    print(f"\n‚ö†Ô∏è  Expected: {expected}")

---

## Task 4 Solution: Policy Gradient Loss

**The REINFORCE loss:**
$$L = -\sum_{t=0}^T \log \pi_\theta(a_t|s_t) \cdot G_t$$

**Intuition:**
- High return G_t ‚Üí increase log probability ‚Üí increase P(action)
- Low return G_t ‚Üí decrease log probability ‚Üí decrease P(action)
- Negative sign: minimize loss = maximize return

**Two implementation approaches:**
1. Loop version (explicit)
2. Vectorized version (efficient)

In [None]:
def compute_policy_loss(log_probs, returns):
    """
    Compute REINFORCE policy gradient loss.
    """
    # SOLUTION (Method 1): Loop version
    loss = 0
    for log_prob, G in zip(log_probs, returns):
        loss += -log_prob * G
    
    return loss


def compute_policy_loss_vectorized(log_probs, returns):
    """
    SOLUTION (Method 2): Vectorized version - more efficient.
    """
    # Stack log probs into a single tensor
    log_probs_tensor = torch.stack(log_probs)
    # Element-wise multiply and sum
    loss = -(log_probs_tensor * returns).sum()
    return loss


# Test both implementations
test_log_probs = [
    torch.tensor(-0.5),
    torch.tensor(-0.7),
    torch.tensor(-0.6)
]
test_returns = torch.FloatTensor([3.0, 2.0, 1.0])

loss1 = compute_policy_loss(test_log_probs, test_returns)
loss2 = compute_policy_loss_vectorized(test_log_probs, test_returns)

print(f"Log probs: {[lp.item() for lp in test_log_probs]}")
print(f"Returns: {test_returns.tolist()}")
print(f"\nLoop version loss: {loss1.item():.3f}")
print(f"Vectorized loss: {loss2.item():.3f}")

# Manual calculation:
# L = -(-0.5*3.0 + -0.7*2.0 + -0.6*1.0) = -(-1.5 - 1.4 - 0.6) = 3.5
expected_loss = 3.5
print(f"\nExpected loss: {expected_loss}")

if abs(loss1.item() - expected_loss) < 0.01:
    print("\n‚úì Correct! Both methods produce the expected loss.")
else:
    print("\n‚ö†Ô∏è  Check calculation")

---

## Complete Training Loop

Now we put everything together and train the agent!

In [None]:
def train_reinforce(env, policy, optimizer, n_episodes=1000, gamma=0.99):
    """
    Train policy using REINFORCE algorithm.
    Uses all the functions we implemented above.
    """
    episode_rewards = []
    
    for episode in range(n_episodes):
        # Sample an episode
        log_probs = []
        rewards = []
        
        state, _ = env.reset()
        done = False
        
        while not done:
            # Select action
            action, log_prob = policy.select_action(state)
            
            # Take action
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            # Store
            log_probs.append(log_prob)
            rewards.append(reward)
            state = next_state
        
        # Compute returns
        returns = compute_returns(rewards, gamma)
        
        # Normalize returns (variance reduction)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        
        # Compute loss
        loss = compute_policy_loss(log_probs, returns)
        
        # Update policy
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Track performance
        episode_rewards.append(sum(rewards))
        
        # Print progress
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {episode + 1}: Avg Reward = {avg_reward:.2f}")
    
    return episode_rewards


# Create environment and policy
env = gym.make('CartPole-v1')
policy = PolicyNetwork()
optimizer = optim.Adam(policy.parameters(), lr=0.01)

print("Training REINFORCE on CartPole-v1...")
print("This will take a few minutes.\n")

rewards = train_reinforce(env, policy, optimizer, n_episodes=1000)

final_avg = np.mean(rewards[-100:])
print(f"\n{'='*50}")
print(f"Training complete!")
print(f"Final 100-episode average: {final_avg:.2f}")

if final_avg >= 475:
    print(f"\nüéâ SUCCESS! Agent solved CartPole!")
else:
    print(f"\nAgent is learning but needs more training.")
print(f"{'='*50}")

---

## Visualizing Training Progress

In [None]:
# Plot learning curves
fig = plt.figure(figsize=(15, 5))

# Subplot 1: Raw rewards + Moving average
ax1 = plt.subplot(1, 3, 1)
plt.plot(rewards, alpha=0.3, color='blue', label='Episode reward')

window = 50
if len(rewards) >= window:
    moving_avg = np.convolve(rewards, np.ones(window)/window, mode='valid')
    plt.plot(range(window-1, len(rewards)), moving_avg, 
             color='red', linewidth=2, label=f'{window}-episode MA')

plt.axhline(y=475, color='g', linestyle='--', alpha=0.7, label='Success threshold')
plt.xlabel('Episode')
plt.ylabel('Episode Reward')
plt.title('Training Progress')
plt.legend()
plt.grid(True, alpha=0.3)

# Subplot 2: Reward distribution
ax2 = plt.subplot(1, 3, 2)
early_rewards = rewards[:min(200, len(rewards)//2)]
late_rewards = rewards[max(len(rewards)//2, len(rewards)-200):]

plt.hist(early_rewards, bins=20, alpha=0.5, label='Early episodes', color='blue')
plt.hist(late_rewards, bins=20, alpha=0.5, label='Late episodes', color='red')
plt.xlabel('Episode Reward')
plt.ylabel('Frequency')
plt.title('Reward Distribution')
plt.legend()
plt.grid(True, alpha=0.3)

# Subplot 3: Rolling statistics
ax3 = plt.subplot(1, 3, 3)
window = 100
if len(rewards) >= window:
    rolling_mean = np.convolve(rewards, np.ones(window)/window, mode='valid')
    rolling_std = [np.std(rewards[max(0, i-window):i]) for i in range(window, len(rewards)+1)]
    
    x = range(window-1, len(rewards))
    plt.plot(x, rolling_mean, label='Mean', color='blue', linewidth=2)
    plt.fill_between(x, 
                     rolling_mean - np.array(rolling_std), 
                     rolling_mean + np.array(rolling_std),
                     alpha=0.3, color='blue', label='¬±1 std')
    
    plt.xlabel('Episode')
    plt.ylabel('Reward')
    plt.title(f'{window}-Episode Rolling Statistics')
    plt.legend()
    plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Print statistics
print("\nTraining Statistics:")
print(f"  First 100 episodes avg: {np.mean(rewards[:100]):.2f}")
print(f"  Last 100 episodes avg:  {np.mean(rewards[-100:]):.2f}")
print(f"  Best episode:           {max(rewards):.0f}")
print(f"  Worst episode:          {min(rewards):.0f}")
print(f"  Overall average:        {np.mean(rewards):.2f}")

---

## Visualizing the Trained Agent

In [None]:
def display_episode(env, policy, max_steps=500):
    """Record and display an episode as animation."""
    frames = []
    state, _ = env.reset()
    total_reward = 0
    
    for step in range(max_steps):
        # Render
        frame = env.render()
        frames.append(frame)
        
        # Select action (greedy - no exploration)
        state_tensor = torch.FloatTensor(state)
        with torch.no_grad():
            logits = policy(state_tensor)
            probs = torch.softmax(logits, dim=-1)
            action = torch.argmax(probs).item()
        
        # Step
        state, reward, terminated, truncated, _ = env.step(action)
        total_reward += reward
        
        if terminated or truncated:
            break
    
    env.close()
    
    print(f"Episode lasted {step + 1} steps, total reward: {total_reward:.0f}")
    
    # Create animation
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.axis('off')
    ax.set_title(f'Trained Policy (Reward: {total_reward:.0f})', fontsize=14)
    img = ax.imshow(frames[0])
    
    def animate(i):
        img.set_data(frames[i])
        return [img]
    
    anim = animation.FuncAnimation(fig, animate, frames=len(frames), 
                                   interval=20, blit=True, repeat=True)
    plt.close()
    
    return HTML(anim.to_html5_video())


# Display trained policy
print("Watching trained agent...\n")
env = gym.make('CartPole-v1', render_mode='rgb_array')
display_episode(env, policy)

---

## Evaluating the Agent

In [None]:
def evaluate_policy(env, policy, n_episodes=100):
    """Evaluate trained policy over multiple episodes."""
    episode_rewards = []
    
    for episode in range(n_episodes):
        state, _ = env.reset()
        done = False
        total_reward = 0
        
        while not done:
            # Greedy action selection
            state_tensor = torch.FloatTensor(state)
            with torch.no_grad():
                logits = policy(state_tensor)
                probs = torch.softmax(logits, dim=-1)
                action = torch.argmax(probs).item()
            
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward
        
        episode_rewards.append(total_reward)
    
    return episode_rewards


# Evaluate
print("Evaluating agent over 100 episodes...\n")
env = gym.make('CartPole-v1')
eval_rewards = evaluate_policy(env, policy, n_episodes=100)

print(f"Evaluation Results:")
print(f"  Mean reward:   {np.mean(eval_rewards):.2f}")
print(f"  Std deviation: {np.std(eval_rewards):.2f}")
print(f"  Min reward:    {np.min(eval_rewards):.0f}")
print(f"  Max reward:    {np.max(eval_rewards):.0f}")
print(f"  Median reward: {np.median(eval_rewards):.0f}")

if np.mean(eval_rewards) >= 475:
    print(f"\n‚úì Agent successfully solved CartPole!")
else:
    print(f"\n‚ö† Agent needs more training to consistently solve CartPole.")

# Plot evaluation distribution
plt.figure(figsize=(10, 4))

plt.subplot(1, 2, 1)
plt.hist(eval_rewards, bins=20, edgecolor='black')
plt.axvline(np.mean(eval_rewards), color='r', linestyle='--', linewidth=2, label=f'Mean: {np.mean(eval_rewards):.1f}')
plt.axvline(475, color='g', linestyle='--', linewidth=2, label='Solved: 475')
plt.xlabel('Episode Reward')
plt.ylabel('Frequency')
plt.title('Evaluation Reward Distribution')
plt.legend()
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.plot(eval_rewards, marker='o', linestyle='-', alpha=0.6)
plt.axhline(np.mean(eval_rewards), color='r', linestyle='--', label=f'Mean: {np.mean(eval_rewards):.1f}')
plt.axhline(475, color='g', linestyle='--', label='Solved: 475')
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.title('Evaluation Episodes')
plt.legend()
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

---

## Task 5 Solution (Optional): Value Baseline

This is a preview of **Actor-Critic** methods (next week)!

**Idea:** Use a value network V(s) to estimate expected return, then compute **advantages**:
$$A_t = G_t - V(s_t)$$

The advantage tells us: "How much better was this action than average?"

**Benefits:**
- Reduces variance ‚Üí more stable learning
- Faster convergence
- Better final performance

In [None]:
class ValueNetwork(nn.Module):
    """Value network for estimating V(s)."""
    
    def __init__(self, state_dim=4, hidden_dim=32):
        super().__init__()
        
        # SOLUTION: Network outputs a single value (not a vector!)
        self.network = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)  # Single output
        )
    
    def forward(self, state):
        """Estimate value of state."""
        return self.network(state).squeeze()


def train_with_value_baseline(env, policy, value_net, policy_opt, value_opt, 
                               n_episodes=1000, gamma=0.99):
    """Train with learned value baseline (Actor-Critic style)."""
    episode_rewards = []
    
    for episode in range(n_episodes):
        states = []
        log_probs = []
        rewards = []
        values = []
        
        state, _ = env.reset()
        done = False
        
        while not done:
            state_tensor = torch.FloatTensor(state)
            
            # SOLUTION: Get value estimate
            value = value_net(state_tensor)
            
            # Get action
            action, log_prob = policy.select_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            states.append(state_tensor)
            log_probs.append(log_prob)
            rewards.append(reward)
            values.append(value)
            state = next_state
        
        # Compute returns
        returns = compute_returns(rewards, gamma)
        
        # SOLUTION: Compute advantages
        values_tensor = torch.stack(values)
        advantages = returns - values_tensor.detach()  # detach to not backprop through value
        
        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        
        # SOLUTION: Update policy using advantages
        policy_loss = compute_policy_loss(log_probs, advantages)
        
        policy_opt.zero_grad()
        policy_loss.backward()
        policy_opt.step()
        
        # SOLUTION: Update value network to predict returns better
        value_loss = ((values_tensor - returns) ** 2).mean()
        
        value_opt.zero_grad()
        value_loss.backward()
        value_opt.step()
        
        episode_rewards.append(sum(rewards))
        
        if (episode + 1) % 100 == 0:
            avg_reward = np.mean(episode_rewards[-100:])
            print(f"Episode {episode + 1}: Avg Reward = {avg_reward:.2f}")
    
    return episode_rewards


# Train with value baseline
print("Training with value baseline (Actor-Critic style)...\n")

env = gym.make('CartPole-v1')
policy_baseline = PolicyNetwork()
value_net = ValueNetwork()
policy_opt = optim.Adam(policy_baseline.parameters(), lr=0.01)
value_opt = optim.Adam(value_net.parameters(), lr=0.01)

rewards_baseline = train_with_value_baseline(
    env, policy_baseline, value_net, policy_opt, value_opt, 
    n_episodes=1000
)

print(f"\nFinal avg with baseline: {np.mean(rewards_baseline[-100:]):.2f}")

In [None]:
# Compare REINFORCE vs Actor-Critic style
plt.figure(figsize=(12, 5))

window = 50

# REINFORCE
if len(rewards) >= window:
    ma_reinforce = np.convolve(rewards, np.ones(window)/window, mode='valid')
    plt.plot(range(window-1, len(rewards)), ma_reinforce, 
             label='REINFORCE', linewidth=2)

# With value baseline
if len(rewards_baseline) >= window:
    ma_baseline = np.convolve(rewards_baseline, np.ones(window)/window, mode='valid')
    plt.plot(range(window-1, len(rewards_baseline)), ma_baseline, 
             label='With Value Baseline (Actor-Critic)', linewidth=2)

plt.axhline(y=475, color='g', linestyle='--', alpha=0.7, label='Solved')
plt.xlabel('Episode')
plt.ylabel(f'Reward ({window}-episode MA)')
plt.title('Comparison: REINFORCE vs Actor-Critic Style')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

print("\nComparison:")
print(f"  REINFORCE final avg:           {np.mean(rewards[-100:]):.2f}")
print(f"  Actor-Critic style final avg:  {np.mean(rewards_baseline[-100:]):.2f}")
print("\nThe value baseline typically provides:")
print("  - Faster initial learning")
print("  - More stable training")
print("  - Lower variance")

---

## Summary of Solutions

### Task 1: Policy Network
```python
hidden_dim = 32
activation = nn.ReLU()
```

### Task 2: Action Selection
```python
probs = torch.softmax(logits, dim=-1)
action = torch.multinomial(probs, 1).item()
```

### Task 3: Compute Returns
```python
for r in reversed(rewards):
    R = r + gamma * R
    returns.insert(0, R)
```

### Task 4: Policy Loss
```python
loss = 0
for log_prob, G in zip(log_probs, returns):
    loss += -log_prob * G
```

### Task 5 (Optional): Value Baseline
```python
# Value network
nn.Linear(state_dim, hidden_dim) ‚Üí nn.ReLU() ‚Üí nn.Linear(hidden_dim, 1)

# Advantages
advantages = returns - values_tensor.detach()

# Value loss
value_loss = ((values_tensor - returns) ** 2).mean()
```

---

## üéâ Congratulations!

You've successfully implemented REINFORCE from scratch and trained an agent to solve CartPole!

### What You Learned
1. **Policy networks** map states to action probabilities
2. **Stochastic sampling** enables exploration
3. **Discounted returns** assign credit to actions
4. **Policy gradients** directly optimize policy parameters
5. **Baselines** reduce variance and improve learning

### Next Week: Actor-Critic
We'll combine policy gradients with value functions for even better performance!