In [None]:

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import os

# TensorBoard support
try:
    from torch.utils.tensorboard import SummaryWriter
    TENSORBOARD_AVAILABLE = True
except ImportError:
    TENSORBOARD_AVAILABLE = False
    SummaryWriter = None
    print("TensorBoard not available. Scalar logging will be disabled.")

# Create environment
env = gym.make('CartPole-v0')
# Strip all wrappers, same as your TF code.
# NOTE(review): this presumably also removes Gym's time-limit wrapper, so the
# usual 200-step cap would not end episodes here -- confirm this is intended.
env = env.unwrapped

# Seeding for reproducibility (handles both old & new gym APIs)
seed = 1
np.random.seed(seed)
torch.manual_seed(seed)
try:
    env.reset(seed=seed)
except TypeError:
    # Older gym versions:
    env.seed(seed)
# The action space draws from its own random generator, which the calls above
# do not seed; seed it too so env.action_space.sample() is repeatable.
# (hasattr guard: very old gym releases may not expose Space.seed.)
if hasattr(env.action_space, "seed"):
    env.action_space.seed(seed)

state_size = env.observation_space.shape[0]
action_size = env.action_space.n

# Hyperparameters
max_episodes_pg = 300           # vanilla policy gradient
max_episodes_pg_baseline = 300  # policy gradient with baseline
learning_rate = 0.01
gamma = 0.95  # Discount rate

# Checkpoints are written under ./models by the training cells
os.makedirs("./models", exist_ok=True)

print("State size:", state_size)
print("Action size:", action_size)

**Part 1**:
Explain why we need to treat the outputs of `reset()` and `step()` differently across Gym versions.

Coding task: Fill in the TODOs so that reset_env and step_env work for both older and newer versions of Gym.

In [None]:
def reset_env(env):
    """Reset ``env`` and return only the initial observation.

    Newer Gym versions return an ``(observation, info)`` pair from
    ``reset()``, while older versions return the observation alone. This
    helper hides that difference so callers always receive the observation.

    Args:
        env: Any object exposing a Gym-style ``reset()`` method.

    Returns:
        The initial observation, exactly as produced by the environment.
    """
    result = env.reset()
    # Only unwrap when the result really looks like (observation, info); an
    # observation that happens to be a tuple itself is passed through as-is.
    if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], dict):
        state = result[0]
    else:
        state = result
    return state

def step_env(env, action):
    """Advance ``env`` by one step and return a uniform 4-tuple.

    Older Gym versions return ``(next_state, reward, done, info)``. Newer
    versions return ``(next_state, reward, terminated, truncated, info)``;
    in that case the two end-of-episode flags are merged into one ``done``.

    Args:
        env: Any object exposing a Gym-style ``step(action)`` method.
        action: The action to apply.

    Returns:
        ``(next_state, reward, done, info)``.
    """
    result = env.step(action)
    if len(result) == 5:
        next_state, reward, terminated, truncated, info = result
        # The episode is over if it ended naturally OR was cut short.
        done = bool(terminated or truncated)
    else:
        next_state, reward, done, info = result
    return next_state, reward, done, info

**Q1.1**: (short coding)
Fill in the missing line to select a random action:

Replace the TODO with a line that samples uniformly from the environment’s action space.

In [None]:
def run_random_agent(env, num_episodes=10, render=False):
    """Play ``num_episodes`` episodes choosing every action at random.

    Args:
        env: Gym-style environment; must expose ``action_space.sample()``.
        num_episodes: Number of episodes to play.
        render: If True, call ``env.render()`` before every step.

    Returns:
        List with the total (undiscounted) reward of each episode.
    """
    rewards = []
    for episode in range(num_episodes):
        state = reset_env(env)
        done = False
        total_reward = 0

        print("****************************************************")
        print("Random agent - EPISODE", episode)

        while not done:
            if render:
                env.render()

            # Q1.1: select an action using a *random guess* -- a sample drawn
            # from the environment's action space, ignoring the current state.
            action = env.action_space.sample()

            next_state, reward, done, _ = step_env(env, action)
            total_reward += reward
            state = next_state

        rewards.append(total_reward)
        print("Score:", total_reward)

    avg_reward = np.mean(rewards)
    print("Random agent average reward over", num_episodes, "episodes:", avg_reward)
    return rewards


**Q1.2**: (conceptual: “random guess” questions)

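CartPole episodes terminate when the pole falls (or the cart leaves the track) or, for the standard `CartPole-v0` environment, when the time limit (200 steps) is reached. Note that this notebook uses `env.unwrapped`, which removes the time-limit wrapper; answer the questions below for the standard, time-limited environment.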

What is the maximum possible reward per episode?

Under a purely random policy, would you expect to hit this maximum often? Why or why not?

Suppose each episode gives you a return R_1, R_2, ..., R_n under random actions.

How would you compute the empirical average return and standard deviation of this random policy?

Why is it useful to run a random baseline before training a learning agent?

In [None]:
def discount_rewards(episode_rewards, gamma):
    """
    Computes discounted returns G_t without normalization.

    Args:
        episode_rewards: Sequence of per-step rewards for one episode.
        gamma: Discount rate applied to future rewards.

    Returns:
        float32 array of the same length, where entry t is
        r_t + gamma * r_{t+1} + gamma^2 * r_{t+2} + ...
    """
    discounted = np.zeros_like(episode_rewards, dtype=np.float32)
    cumulative = 0.0
    for i in reversed(range(len(episode_rewards))):
        # Q2.1: G_t = r_t + gamma * G_{t+1}, accumulated from the last step
        # backwards so each return reuses the one after it.
        cumulative = episode_rewards[i] + gamma * cumulative
        discounted[i] = cumulative
    return discounted

def discount_and_normalize_rewards(episode_rewards, gamma):
    """
    Computes discounted returns and standardizes them to mean 0 and std 1.

    Args:
        episode_rewards: Sequence of per-step rewards for one episode.
        gamma: Discount rate applied to future rewards.

    Returns:
        float32 array of normalized discounted returns (empty for an empty
        episode).
    """
    discounted = discount_rewards(episode_rewards, gamma)
    # Nothing to normalize; avoids taking the mean of an empty array.
    if discounted.size == 0:
        return discounted
    # Q2.2: normalize to mean 0 and std 1
    mean = np.mean(discounted)
    std = np.std(discounted)
    # Small epsilon keeps the division finite when all returns are equal.
    return (discounted - mean) / (std + 1e-8)


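**Q2.1**: (coding)
In discount_rewards, implement the recursive discounted return $G_t = r_t + \gamma G_{t+1}$, iterating backwards over the episode.

**Q2.2**: (conceptual + coding)
In discount_and_normalize_rewards we normalize the discounted returns: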

Complete the code that subtracts the mean and divides by the standard deviation (with small epsilon to avoid division by zero).

Explain why normalization of returns can help training in vanilla policy gradient (REINFORCE).

In [None]:
class PolicyNetwork(nn.Module):
    """Small MLP policy that maps a state to one raw logit per action.

    Architecture:
        fc1: state_size -> 10, ReLU
        fc2: 10 -> action_size, ReLU
        fc3: action_size -> action_size, no activation (logits)
    """

    def __init__(self, state_size, action_size):
        super(PolicyNetwork, self).__init__()
        # Q3.1: layers
        self.fc1 = nn.Linear(state_size, 10)
        self.fc2 = nn.Linear(10, action_size)
        self.fc3 = nn.Linear(action_size, action_size)

        # Q3.2: Xavier/Glorot initialization of every weight matrix
        # (biases keep PyTorch's default initialization).
        for layer in (self.fc1, self.fc2, self.fc3):
            nn.init.xavier_uniform_(layer.weight)

    def forward(self, x):
        """Return logits of shape [batch, action_size] for states ``x``."""
        # Q3.3: ReLU on the first two layers, raw logits from the last one.
        # Softmax is applied by the caller (or folded into the loss).
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        logits = self.fc3(x)
        return logits


**Q3.1** (coding)
Implement a network with the following structure:

fc1: state_size → 10 with ReLU

fc2: 10 → action_size with ReLU

fc3: action_size → action_size with no activation (logits)

**Q3.2** (conceptual + coding)
Use Xavier/Glorot initialization for each layer’s weight matrix.

Add the initialization code.

In your own words, what problem is weight initialization trying to avoid?

Why is Xavier a reasonable choice for networks with ReLU or near-ReLU activations?

**Q3.3** (conceptual)
The network outputs raw logits, which you later pass through a softmax to get action probabilities.

Why do we typically output logits rather than probabilities directly from the network?

What is the relationship between logits and the softmax output?

In [None]:
def run_random_agent(env, num_episodes=10, render=False):
    """Roll out a uniformly random policy and collect per-episode scores.

    Args:
        env: Gym-style environment providing ``action_space.sample()``.
        num_episodes: How many episodes to play.
        render: When True, ``env.render()`` is called before each step.

    Returns:
        List holding the summed reward of every episode, in order.
    """
    episode_scores = []

    for episode in range(num_episodes):
        reset_env(env)
        score = 0

        print("****************************************************")
        print("Random agent - EPISODE", episode)

        finished = False
        while not finished:
            if render:
                env.render()

            # Random guess: the observation plays no part in the choice
            random_action = env.action_space.sample()
            _, step_reward, finished, _ = step_env(env, random_action)
            score += step_reward

        episode_scores.append(score)
        print("Score:", score)

    print("Random agent average reward over", num_episodes, "episodes:", np.mean(episode_scores))
    return episode_scores

# Random baseline: per-episode scores of an untrained, random policy,
# kept for the comparison summary at the end of the notebook
random_rewards = run_random_agent(
    env,
    num_episodes=10,
    render=False,
)



**Part 4** — Vanilla Policy Gradient (REINFORCE)

This is your train_policy_gradient function.

In [None]:
# Checkpoint path for the vanilla PG policy (read back by the evaluation cell)
MODEL_PATH_PG = "./models/cartpole_pg_pytorch.pth"

policy_pg = PolicyNetwork(state_size, action_size)
# reduction='none' keeps one loss value per time step, so each step can be
# weighted by its own return before averaging.
criterion_pg = nn.CrossEntropyLoss(reduction='none')
optimizer_pg = optim.Adam(policy_pg.parameters(), lr=learning_rate)

def train_policy_gradient(env, num_episodes, render=False):
    """Train ``policy_pg`` with vanilla policy gradient (REINFORCE).

    For every episode the current policy is rolled out to the end, the
    discounted and normalized returns are computed, and one optimizer step
    is taken on ``mean(-log pi(a_t | s_t) * G_t)``. After the last episode
    the policy weights are saved to ``MODEL_PATH_PG``.

    Args:
        env: Gym-style environment (used through reset_env / step_env).
        num_episodes: Number of training episodes.
        render: If True, call ``env.render()`` before every step.

    Returns:
        List with the total (undiscounted) reward of each episode.
    """
    all_rewards = []
    for episode in range(num_episodes):
        state = reset_env(env)
        episode_states, episode_actions, episode_rewards = [], [], []
        done = False

        while not done:
            if render:
                env.render()

            state_tensor = torch.from_numpy(state).float().unsqueeze(0)
            # The rollout only needs an action; gradients are computed later
            # from a fresh forward pass over the whole episode.
            with torch.no_grad():
                logits = policy_pg(state_tensor)
                # Q4.1: softmax over the action dimension of [1, action_size]
                probs = F.softmax(logits, dim=1)

            # Q4.2: sample an action from the probability distribution
            action = torch.multinomial(probs, num_samples=1).item()

            next_state, reward, done, _ = step_env(env, action)
            episode_states.append(state)
            episode_actions.append(action)
            episode_rewards.append(reward)
            state = next_state

        all_rewards.append(np.sum(episode_rewards))

        # Q4.3: compute discounted & normalized returns
        discounted_rewards = discount_and_normalize_rewards(episode_rewards, gamma)

        # Q4.4: convert episode data to tensors. Stacking the states into one
        # ndarray first avoids building a tensor from a list of arrays.
        states_tensor = torch.as_tensor(np.asarray(episode_states), dtype=torch.float32)
        actions_tensor = torch.as_tensor(episode_actions, dtype=torch.long)
        rewards_tensor = torch.as_tensor(discounted_rewards, dtype=torch.float32)

        # Q4.5: forward pass on all states; cross-entropy against the action
        # actually taken is -log pi(a_t | s_t) for each step.
        logits = policy_pg(states_tensor)
        neg_log_prob = criterion_pg(logits, actions_tensor)

        # Q4.6: policy gradient loss
        loss = torch.mean(neg_log_prob * rewards_tensor)

        # Backprop & optimizer step
        optimizer_pg.zero_grad()
        loss.backward()
        optimizer_pg.step()

    # Persist the trained policy so it can be reloaded for evaluation
    torch.save(policy_pg.state_dict(), MODEL_PATH_PG)

    return all_rewards

# Train vanilla policy gradient
pg_rewards = train_policy_gradient(env, num_episodes=max_episodes_pg, render=False)


**Q4.1** (coding)
Given logits of shape [1, action_size], convert them to probabilities using softmax over the correct dimension.

**Q4.2** (conceptual + coding)
Sample an action according to the probabilities probs.

Implement the sampling using PyTorch.

Why do we sample an action instead of always taking the argmax during training?

**Q4.3** (coding)
Use your discount_and_normalize_rewards function to compute discounted_rewards for the episode.

**Q4.4** (coding)
Convert episode_states, episode_actions, and discounted_rewards into proper PyTorch tensors:

states_tensor: shape [T, state_size], dtype=torch.float32

actions_tensor: shape [T], dtype=torch.long

rewards_tensor: shape [T], dtype=torch.float32

**Q4.5** (math + coding)
We use criterion_pg = nn.CrossEntropyLoss(reduction='none'). For logits of shape [T, action_size] and actions_tensor of shape [T]:

Implement the call to compute the negative log probability of the actions taken at each time step.

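**Q4.6** (math + coding)
The REINFORCE loss for one episode of length $T$ is

$$L(\theta) = \frac{1}{T}\sum_{t=0}^{T-1} -\log \pi_\theta(a_t \mid s_t)\,\hat{G}_t,$$

where $\hat{G}_t$ is the normalized discounted return at step $t$.

Implement loss = torch.mean(neg_log_prob * rewards_tensor).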

Explain why multiplying negative log-probabilities by discounted returns implements the REINFORCE gradient (in expectation).



In [None]:
def evaluate_policy(env, policy, num_episodes=10, render=False, title="Policy"):
    """Run ``policy`` for several episodes without training and report scores.

    The policy is switched to eval mode and every forward pass runs with
    gradients disabled. Actions are sampled from the softmax of the policy's
    logits, not taken greedily.

    Args:
        env: Gym-style environment (used through reset_env / step_env).
        policy: Module mapping a [1, state_size] tensor to action logits.
        num_episodes: Number of evaluation episodes.
        render: If True, call ``env.render()`` before every step.
        title: Label used in the printed progress lines.

    Returns:
        List with the total reward of each episode.
    """
    policy.eval()
    episode_scores = []

    with torch.no_grad():
        for episode in range(num_episodes):
            observation = reset_env(env)
            score = 0

            print("****************************************************")
            print(f"{title} - EPISODE", episode)

            finished = False
            while not finished:
                if render:
                    env.render()

                # Batch of one state -> action probabilities as a flat array
                batch = torch.from_numpy(observation).float().unsqueeze(0)
                action_probs = torch.softmax(policy(batch), dim=1).numpy().ravel()
                chosen_action = np.random.choice(action_size, p=action_probs)

                observation, step_reward, finished, _ = step_env(env, chosen_action)
                score += step_reward

            episode_scores.append(score)
            print("Score:", score)

    print(f"{title} average score over {num_episodes} episodes:", np.mean(episode_scores))
    return episode_scores

# Rebuild the vanilla PG network and restore its saved weights on the CPU
loaded_policy_pg = PolicyNetwork(state_size, action_size)
loaded_policy_pg.load_state_dict(
    torch.load(MODEL_PATH_PG, map_location=torch.device('cpu'))
)

# Score the restored policy over 10 episodes, without rendering
pg_eval_rewards = evaluate_policy(
    env, loaded_policy_pg, num_episodes=10, render=False, title="Vanilla PG"
)


In [None]:
# Policy gradient with baseline (learned state-value function)

# NOTE(review): ValueNetwork is used below but is not defined anywhere in the
# visible notebook, so it is defined here. If a definition exists in a cell
# that is not shown, remove this one.
class ValueNetwork(nn.Module):
    """State-value baseline V(s): one scalar estimate per input state.

    Architecture:
        fc1: state_size -> hidden_size, ReLU
        fc2: hidden_size -> 1, no activation
    """

    def __init__(self, state_size, hidden_size=10):
        super(ValueNetwork, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 1)
        for layer in (self.fc1, self.fc2):
            nn.init.xavier_uniform_(layer.weight)

    def forward(self, x):
        """Return values of shape [batch] for states ``x`` of shape [batch, state_size]."""
        hidden = F.relu(self.fc1(x))
        # Drop the trailing size-1 dimension so the output lines up with [T] returns
        return self.fc2(hidden).squeeze(-1)

policy_pg_baseline = PolicyNetwork(state_size, action_size)
value_baseline = ValueNetwork(state_size)

# Single optimizer over both networks
optimizer_pg_baseline = optim.Adam(
    list(policy_pg_baseline.parameters()) + list(value_baseline.parameters()),
    lr=learning_rate
)

MODEL_PATH_PG_BASELINE = "./models/cartpole_pg_baseline_pytorch.pth"

# Scalar logging is optional: the writer stays None when TensorBoard is missing
if TENSORBOARD_AVAILABLE:
    writer_pg_baseline = SummaryWriter(log_dir="./runs/pg_baseline_cartpole_pytorch")
else:
    writer_pg_baseline = None

def train_policy_gradient_with_baseline(env,
                                        num_episodes=max_episodes_pg_baseline,
                                        render=False,
                                        value_loss_coef=0.5):
    """Train ``policy_pg_baseline`` with REINFORCE plus a learned value baseline.

    Each episode is rolled out with the current policy. The policy loss
    weights ``-log pi(a_t | s_t)`` by the normalized advantage
    ``G_t - V(s_t)``; the value network is fitted to the discounted returns
    with an MSE loss. Both losses are optimized together with
    ``optimizer_pg_baseline``. The policy weights are saved to
    ``MODEL_PATH_PG_BASELINE`` every 100 episodes and once more at the end.

    Args:
        env: Gym-style environment (used through reset_env / step_env).
        num_episodes: Number of training episodes.
        render: If True, call ``env.render()`` before every step.
        value_loss_coef: Weight of the value loss in the total loss.

    Returns:
        List with the total (undiscounted) reward of each episode.
    """
    all_rewards = []

    for episode in range(num_episodes):
        state = reset_env(env)
        episode_states, episode_actions, episode_rewards = [], [], []
        done = False

        while not done:
            if render:
                env.render()

            state_tensor = torch.from_numpy(state).float().unsqueeze(0)
            # The rollout needs no gradients; the graph used for learning is
            # built below from one forward pass over the whole episode.
            with torch.no_grad():
                logits = policy_pg_baseline(state_tensor)
                probs = F.softmax(logits, dim=1).numpy().ravel()
            action = np.random.choice(action_size, p=probs)

            next_state, reward, done, _ = step_env(env, action)

            episode_states.append(state)
            episode_actions.append(action)
            episode_rewards.append(reward)

            state = next_state

        # Episode summary
        episode_reward_sum = np.sum(episode_rewards)
        all_rewards.append(episode_reward_sum)
        mean_reward = np.mean(all_rewards)
        max_reward = np.max(all_rewards)

        print("==========================================")
        print("PG + Baseline - Episode:", episode)
        print("Reward:", episode_reward_sum)
        print("Mean reward:", mean_reward)
        print("Max reward so far:", max_reward)

        # Discounted returns (no normalization) for value network
        discounted_returns = discount_rewards(episode_rewards, gamma)

        # Tensors. Stacking the states into one ndarray first avoids the slow
        # path PyTorch takes when given a list of separate arrays.
        states_tensor = torch.as_tensor(np.asarray(episode_states), dtype=torch.float32)
        actions_tensor = torch.as_tensor(episode_actions, dtype=torch.long)
        returns_tensor = torch.as_tensor(discounted_returns, dtype=torch.float32)

        # Forward
        logits = policy_pg_baseline(states_tensor)
        # Force shape [T]: a [T, 1] output would silently broadcast against the
        # [T] returns into a [T, T] matrix in the losses below.
        values = value_baseline(states_tensor).reshape(-1)

        # criterion_pg is the per-step cross-entropy defined in the vanilla PG cell
        neg_log_prob = criterion_pg(logits, actions_tensor)  # -log π(a|s)

        # Advantage = G_t - V(s_t); treat V(s) as baseline (no gradient in policy loss)
        advantages = returns_tensor - values.detach()
        # std() of a single element is NaN, which would poison the update,
        # so one-step episodes skip the normalization.
        if advantages.numel() > 1:
            advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # Policy loss (REINFORCE with baseline)
        policy_loss = torch.mean(neg_log_prob * advantages)

        # Value loss (fit V(s) to returns)
        value_loss = F.mse_loss(values, returns_tensor)

        # Total loss
        loss = policy_loss + value_loss_coef * value_loss

        # Optimize both networks
        optimizer_pg_baseline.zero_grad()
        loss.backward()
        optimizer_pg_baseline.step()

        # TensorBoard logging
        if writer_pg_baseline is not None:
            writer_pg_baseline.add_scalar("Loss/policy", policy_loss.item(), episode)
            writer_pg_baseline.add_scalar("Loss/value", value_loss.item(), episode)
            writer_pg_baseline.add_scalar("Reward_mean", mean_reward, episode)

        # Save periodically
        if episode % 100 == 0:
            torch.save(policy_pg_baseline.state_dict(), MODEL_PATH_PG_BASELINE)
            print("Saved PG+baseline model at episode", episode)

    # Final save
    torch.save(policy_pg_baseline.state_dict(), MODEL_PATH_PG_BASELINE)
    print("Final PG+baseline model saved.")
    if writer_pg_baseline is not None:
        writer_pg_baseline.flush()

    return all_rewards

# Run the baseline variant; keeps the per-episode training rewards
pg_baseline_rewards = train_policy_gradient_with_baseline(
    env,
    num_episodes=max_episodes_pg_baseline,
    render=False,
)



In [None]:
# Load & evaluate PG + baseline model
loaded_policy_pg_baseline = PolicyNetwork(state_size, action_size)
loaded_policy_pg_baseline.load_state_dict(
    torch.load(MODEL_PATH_PG_BASELINE, map_location=torch.device('cpu'))
)

pg_baseline_eval_rewards = evaluate_policy(env,
                                           loaded_policy_pg_baseline,
                                           num_episodes=10,
                                           render=False,
                                           title="PG + Baseline")

# Optional: comparison summary (if you've run all cells)
# Results from other cells are looked up defensively so the summary still
# prints when one of the optional agents was skipped in this session.
print("\n=== Summary over 10 evaluation episodes (if all agents were run) ===")
if 'random_rewards' in globals():
    print("Random agent average reward:      ", np.mean(random_rewards))
else:
    print("Random agent average reward:      (not run in this session)")
if 'pg_eval_rewards' in globals():
    print("Vanilla PG average reward:       ", np.mean(pg_eval_rewards))
else:
    print("Vanilla PG average reward:       (not run in this session)")
print("PG with baseline average reward: ", np.mean(pg_baseline_eval_rewards))

env.close()
