# Policy Gradient on Racetrack Environment (Optimized)

This notebook implements a Policy Gradient method for a custom racetrack environment using PyTorch. It includes:
- Dynamic input size handling
- Entropy-based regularization
- Advantage normalization
- Gradient clipping
- **Performance optimization** by disabling rendering

We use REINFORCE, a Monte Carlo policy gradient method, together with a learned value-function baseline (the critic) that reduces the variance of the gradient estimate.
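
For reference, the gradient estimate behind REINFORCE with a baseline can be written as below. The notation is introduced here for illustration and does not come from the notebook's code: $\pi_\theta$ is the actor, $V_\phi$ the critic, $G_t$ the discounted return from step $t$, and $T$ the episode length.

$$
G_t = \sum_{k=t}^{T-1} \gamma^{\,k-t}\, r_k,
\qquad
\nabla_\theta J(\theta) \approx \frac{1}{T} \sum_{t=0}^{T-1} \nabla_\theta \log \pi_\theta(a_t \mid s_t)\,\bigl(G_t - V_\phi(s_t)\bigr)
$$

Subtracting $V_\phi(s_t)$ does not change the expected gradient, because the baseline does not depend on the action, but it typically lowers the variance of the estimate.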

### 1. Imports and Device Configuration

We import the required libraries and set the device to CUDA if available.

In [11]:
import gymnasium as gym
import highway_env
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
from torch.distributions import Normal
import time

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### 2. Training Mode and Render Settings

We define a simple flag to control whether the environment should render for visualization or run headlessly for training.

In [3]:
training = True
render_mode = None if training else "human"

### 3. Environment Configuration

The configuration dictionary defines both the observation and action spaces, along with various simulation and reward parameters. Key features include:
- OccupancyGrid observation type with vehicle presence and road occupancy
- Continuous action space with lateral-only control (no acceleration)
- Reward structure penalizing collisions, drifting from the lane center, and overactive control
- Rendering is disabled when training to improve performance

In [4]:
config = {
    "observation": {
        "type": "OccupancyGrid",
        "features": ['presence', 'on_road'],
        "grid_size": [[-18, 18], [-18, 18]],
        "grid_step": [3, 3],
        "as_image": False,
        "align_to_vehicle_axes": True
    },
    "action": {
        "type": "ContinuousAction",
        "longitudinal": False,
        "lateral": True
    },
    "simulation_frequency": 15,
    "policy_frequency": 5,
    "duration": 100,
    "collision_reward": -1,
    "lane_centering_cost": 4,
    "action_reward": -0.3,
    "controlled_vehicles": 1,
    "other_vehicles": 0,
    "screen_width": 600,
    "screen_height": 600,
    "centering_position": [0.5, 0.5],
    "scaling": 7,
    "show_trajectories": False,
    "offroad_terminal": True,
    "render_agent": False if training else True,
    "offscreen_rendering": True if training else False
}
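
As a quick check on the grid settings above, the sketch below works out the observation shape they imply. It assumes one channel per feature and `floor(extent / step)` cells per axis; that layout is an assumption about the OccupancyGrid observation, and the helper name `occupancy_grid_shape` is hypothetical, so the result is best compared against the shape of an actual sample observation.

```python
import math

def occupancy_grid_shape(features, grid_size, grid_step):
    """Return (channels, cells_x, cells_y) for an occupancy-grid observation.

    Assumption: one channel per feature and floor(extent / step) cells per axis.
    """
    cells = [
        math.floor((high - low) / step)
        for (low, high), step in zip(grid_size, grid_step)
    ]
    return (len(features), *cells)

shape = occupancy_grid_shape(
    features=["presence", "on_road"],
    grid_size=[[-18, 18], [-18, 18]],
    grid_step=[3, 3],
)
# Width of the first Linear layer once the observation is flattened
n_flatten = math.prod(shape)
print(shape, n_flatten)  # (2, 12, 12) 288
```

Under these assumptions each network sees a flattened input of 288 values.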

### 4. Environment Initialization

We instantiate the racetrack environment and inject the configuration directly into the unwrapped environment; the new settings take effect on the next `reset()`. A sample observation gives the observation shape, and the action space gives the action dimensionality needed by the policy network.

In [5]:
env = gym.make("racetrack-v0", render_mode=render_mode)
env.unwrapped.configure(config)
obs_sample, _ = env.reset()

obs_shape = obs_sample.shape
act_dim = env.action_space.shape[0] if len(env.action_space.shape) > 0 else 1

### Policy and Value Networks

We use a classic actor-critic architecture in which:
- The Actor learns a stochastic policy modeled by a Normal distribution, allowing for continuous actions.
- The Critic estimates the value function V(s), used to reduce the variance of the policy gradient via advantage estimation.
- Both models use a fully connected feedforward network with ReLU activations and are designed to support dynamic input shapes.

### Actor Network

The actor outputs the mean of a Normal distribution from which actions are sampled. The log standard deviation, `log_std`, is a separate learnable parameter that does not depend on the state, so the amount of exploration is learned globally rather than per observation.


In [6]:
class Actor(nn.Module):
    def __init__(self, obs_shape, act_dim, hidden_size=64):
        super().__init__()
        self.flatten = nn.Flatten()

        # Dynamically compute input size after flattening
        with torch.no_grad():
            dummy = torch.zeros(1, *obs_shape)
            n_flatten = self.flatten(dummy).shape[1]

        # Feature extractor for the policy head (not shared with the critic)
        self.net = nn.Sequential(
            nn.Linear(n_flatten, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU()
        )

        # Output layers: mean and log standard deviation
        self.mean = nn.Linear(hidden_size, act_dim)
        self.log_std = nn.Parameter(torch.zeros(act_dim))

    def forward(self, obs):
        x = self.flatten(obs)
        x = self.net(x)
        mu = self.mean(x)
        std = self.log_std.exp().expand_as(mu)
        return Normal(mu, std)  # Return a distribution object
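
To make the role of `log_std` concrete, here is a minimal pure-Python sketch of the two quantities the training loop takes from the distribution: the log-probability summed over action dimensions and the entropy. The function names are hypothetical; the formulas are the standard ones for a diagonal Gaussian.

```python
import math

def gaussian_log_prob(action, mean, log_std):
    """Log-density of a diagonal Gaussian, summed over action dimensions."""
    total = 0.0
    for a, mu, ls in zip(action, mean, log_std):
        std = math.exp(ls)
        total += -0.5 * ((a - mu) / std) ** 2 - ls - 0.5 * math.log(2 * math.pi)
    return total

def gaussian_entropy(log_std):
    """Entropy of a diagonal Gaussian; it depends on log_std only, not on the mean."""
    return sum(0.5 + 0.5 * math.log(2 * math.pi) + ls for ls in log_std)

lp_at_mean = gaussian_log_prob([0.3], [0.3], [0.0])   # action equal to the mean
lp_off_mean = gaussian_log_prob([1.3], [0.3], [0.0])  # one standard deviation away
wide = gaussian_entropy([0.0])     # std = 1
narrow = gaussian_entropy([-1.0])  # std = exp(-1), less exploration
```

Because the entropy depends only on `log_std`, an entropy bonus in the loss acts directly on that parameter and is identical for every state.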

### Critic Network

The critic estimates the value of each state using a separate feedforward network. Its output is a scalar value for each state input.

In [7]:
class Critic(nn.Module):
    def __init__(self, obs_shape, hidden_size=64):
        super().__init__()
        self.flatten = nn.Flatten()

        with torch.no_grad():
            dummy = torch.zeros(1, *obs_shape)
            n_flatten = self.flatten(dummy).shape[1]

        self.v = nn.Sequential(
            nn.Linear(n_flatten, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )

    def forward(self, obs):
        x = self.flatten(obs)
        return self.v(x).squeeze(-1)  # Output shape: (batch,)

### Model Initialization and Optimizers

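We instantiate the networks and their optimizers. AdamW applies weight decay directly to the weights, decoupled from the adaptive gradient step, whereas Adam folds the decay into the gradient as an L2 penalty. Since no `weight_decay` is passed here, PyTorch's AdamW default of 0.01 applies.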

In [8]:
# Model instantiation
actor = Actor(obs_shape, act_dim).to(device)
critic = Critic(obs_shape).to(device)

# Optimizers
opt_actor = optim.AdamW(actor.parameters(), lr=1e-3)
opt_critic = optim.AdamW(critic.parameters(), lr=1e-3)
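
The difference between decoupled weight decay and a plain gradient step can be seen in a single-parameter sketch. This is a simplified scalar version of the AdamW update rule written for illustration, not PyTorch's implementation; the function name `adamw_step` is hypothetical, and the default hyperparameters mirror the ones assumed above.

```python
import math

def adamw_step(p, g, m, v, t, lr=1e-3, betas=(0.9, 0.999), eps=1e-8, weight_decay=1e-2):
    """One AdamW update for a single scalar parameter p with gradient g."""
    b1, b2 = betas
    p = p * (1.0 - lr * weight_decay)   # decoupled decay, applied to the weight itself
    m = b1 * m + (1.0 - b1) * g         # first-moment estimate
    v = b2 * v + (1.0 - b2) * g * g     # second-moment estimate
    m_hat = m / (1.0 - b1 ** t)         # bias correction for step t (t starts at 1)
    v_hat = v / (1.0 - b2 ** t)
    p = p - lr * m_hat / (math.sqrt(v_hat) + eps)
    return p, m, v

# With a zero gradient, only the decay term moves the weight.
p_decay, _, _ = adamw_step(p=1.0, g=0.0, m=0.0, v=0.0, t=1)
# With a unit gradient, the first step is the decay plus a move of about lr.
p_grad, _, _ = adamw_step(p=1.0, g=1.0, m=0.0, v=0.0, t=1)
```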

## REINFORCE with Baseline: Training Loop

This section implements the training procedure for a policy gradient method (REINFORCE) using the actor-critic architecture defined earlier.

Key elements of the implementation:
- The actor samples actions from a Normal distribution.
- The critic estimates the value function V(s) to compute the advantage.
- Rewards are accumulated using Monte Carlo returns.
- The policy is optimized using the advantage-weighted log-probabilities.
- We include entropy regularization to encourage exploration.
- Gradients are clipped to stabilize training.
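
The return and advantage steps listed above can be sketched in plain Python on a toy reward sequence. The helper names are hypothetical, and `gamma=0.5` is chosen only to keep the numbers easy to check by hand.

```python
def discounted_returns(rewards, gamma):
    """Return G_t = r_t + gamma * G_{t+1} for every step, computed backwards."""
    returns = []
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
        returns.append(g)
    returns.reverse()  # append then reverse avoids repeated insert(0, ...)
    return returns

def normalize(xs, eps=1e-8):
    """Shift to zero mean and scale to unit sample standard deviation."""
    n = len(xs)
    mean = sum(xs) / n
    var = sum((x - mean) ** 2 for x in xs) / (n - 1)  # unbiased, like torch.std by default
    return [(x - mean) / (var ** 0.5 + eps) for x in xs]

rewards = [1.0, 1.0, 1.0]
returns = discounted_returns(rewards, gamma=0.5)  # [1.75, 1.5, 1.0]
episode_return = sum(rewards)                     # 3.0, the undiscounted episode return
sum_returns_to_go = sum(returns)                  # 4.25, a different quantity
normalized = normalize(returns)
mean_sq = sum(x * x for x in normalized) / len(normalized)
```

Two things are worth noticing. The sum of returns-to-go is not the episode return, so the two should not be confused when logging. And `mean_sq` is close to (n - 1) / n for any input, so a squared-error loss for the critic only carries information when it is computed on advantages that have not been normalized.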


In [9]:
gamma = 0.99     # Discount factor for return calculation
epochs = 200     # Number of training episodes (one policy update per episode)

for epoch in range(epochs):
    # Reset environment
    obs, _ = env.reset()
    obs = torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)

    # Buffers for trajectory data
    log_probs, values, rewards = [], [], []

    done = False
    step = 0

    while not done and step < 100:
        # Forward pass: policy and value
        dist = actor(obs)                      # Returns Normal(mu, std)
        value = critic(obs)                    # Scalar value prediction
        action = dist.sample()                 # Sample from policy
        log_prob = dist.log_prob(action).sum(-1)

        # Environment transition (an episode ends on termination or truncation)
        obs_next, reward, terminated, truncated, _ = env.step(action.cpu().numpy()[0])
        done = terminated or truncated

        # Save transition data
        log_probs.append(log_prob)
        values.append(value)
        rewards.append(reward)

        # Move to next state
        obs = torch.tensor(obs_next, dtype=torch.float32, device=device).unsqueeze(0)
        step += 1

    # Compute Monte Carlo returns
    returns, G = [], 0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.insert(0, G)

    returns = torch.tensor(returns, dtype=torch.float32, device=device)
    values = torch.cat(values)

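    # Raw advantages: Monte Carlo return minus the critic's estimate
    advantages = returns - values

    # Fit the critic on the raw error. Squared normalized advantages average
    # to a constant, which would leave the critic without a learning signal.
    value_loss = advantages.pow(2).mean()

    # Normalize a detached copy for the policy update only
    adv_norm = advantages.detach()
    if adv_norm.numel() > 1:  # std() is NaN for a single-step episode
        adv_norm = (adv_norm - adv_norm.mean()) / (adv_norm.std() + 1e-8)

    log_probs = torch.cat(log_probs)
    policy_loss = -(log_probs * adv_norm).mean()
    # std is state-independent, so the last step's entropy equals every step's
    entropy = dist.entropy().mean()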

    # Total loss includes entropy bonus and value loss with coefficient
    loss = policy_loss + 0.5 * value_loss - 0.01 * entropy

    # Gradient step
    opt_actor.zero_grad()
    opt_critic.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(actor.parameters(), 0.5)
    torch.nn.utils.clip_grad_norm_(critic.parameters(), 0.5)
    opt_actor.step()
    opt_critic.step()

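    if epoch % 10 == 0:
        # Note: this logs the sum of discounted returns-to-go over all steps,
        # not the undiscounted episode return, which would be sum(rewards)
        print(f"Epoch {epoch} | Return: {returns.sum().item():.1f}")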


Epoch 0 | Return: 70.8
Epoch 10 | Return: 119.5
Epoch 20 | Return: 76.2
Epoch 30 | Return: 92.8
Epoch 40 | Return: 345.6
Epoch 50 | Return: 552.3
Epoch 60 | Return: 302.9
Epoch 70 | Return: 696.2
Epoch 80 | Return: 969.6
Epoch 90 | Return: 955.0
Epoch 100 | Return: 382.1
Epoch 110 | Return: 725.8
Epoch 120 | Return: 947.6
Epoch 130 | Return: 982.5
Epoch 140 | Return: 686.5
Epoch 150 | Return: 922.7
Epoch 160 | Return: 1034.3
Epoch 170 | Return: 804.4
Epoch 180 | Return: 848.9
Epoch 190 | Return: 970.6
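
The training loop clips actor and critic gradients to a norm of 0.5. The sketch below shows the idea behind clipping by global norm on a plain list of numbers. It is written in the spirit of `torch.nn.utils.clip_grad_norm_` rather than copied from it; the helper name and the `eps` value are assumptions.

```python
import math

def clip_by_global_norm(grads, max_norm, eps=1e-6):
    """Scale all gradients by one factor so their joint L2 norm is at most max_norm."""
    total_norm = math.sqrt(sum(g * g for g in grads))
    scale = min(1.0, max_norm / (total_norm + eps))
    return [g * scale for g in grads], total_norm

# A large gradient (norm 5) is shrunk to norm 0.5; its direction is unchanged.
clipped, norm_before = clip_by_global_norm([3.0, 4.0], max_norm=0.5)
norm_after = math.sqrt(sum(g * g for g in clipped))

# A small gradient (norm 0.05) passes through untouched.
small, _ = clip_by_global_norm([0.03, 0.04], max_norm=0.5)
```

Because every component is scaled by the same factor, clipping limits the step size without changing the update direction.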


### Rendering the Trained Agent

After training the policy, we can visualize its performance by running the agent in the environment with rendering enabled. This helps qualitatively assess how well the agent learned the task, such as staying on track or making smooth lateral movements.

#### Key Points
- The environment is re-initialized with "human" render mode to display a GUI window.
- The trained policy is used in deterministic mode by taking the mean of the action distribution.
- Rendering is done for a limited number of steps or until the episode terminates.

In [10]:
env_render = gym.make("racetrack-v0", render_mode="human")
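# Reuse the training config, but switch the rendering flags that were set for headless training
env_render.unwrapped.configure({**config, "render_agent": True, "offscreen_rendering": False})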

obs, _ = env_render.reset()
obs = torch.tensor(obs, dtype=torch.float32, device=device).unsqueeze(0)

done = False
step = 0

while not done and step < 300:
    dist = actor(obs)                                          # Forward pass
    action = dist.mean                                          # Deterministic policy (mean of Normal)
    obs_next, reward, terminated, truncated, _ = env_render.step(action.detach().cpu().numpy()[0])
    done = terminated or truncated

    obs = torch.tensor(obs_next, dtype=torch.float32, device=device).unsqueeze(0)
    step += 1

env_render.close()
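
The difference between the stochastic policy used in training and the deterministic one used for rendering can be summarized in a small standalone sketch. The helper `select_action` is hypothetical, and the `[-1, 1]` action bounds are an assumption about the continuous action space, not something stated in the notebook.

```python
import random

def select_action(mean, std, deterministic, rng=random, low=-1.0, high=1.0):
    """Pick the mean (evaluation) or a Gaussian sample (training), clipped to [low, high]."""
    raw = mean if deterministic else rng.gauss(mean, std)
    return max(low, min(high, raw))

rng = random.Random(0)  # fixed seed so the sampled actions are reproducible
eval_action = select_action(0.25, 1.0, deterministic=True)
train_actions = [select_action(0.25, 1.0, deterministic=False, rng=rng) for _ in range(1000)]
```

Taking the mean removes exploration noise, which usually gives smoother steering at evaluation time, while sampling keeps the variety of actions that training relies on.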

