# Racecar Gym PPO Training - Google Colab

This notebook provides a complete setup and training pipeline for a PPO agent on the Racecar Gym environment.

## Setup Instructions

1. Make sure GPU is enabled: Runtime -> Change runtime type -> GPU
2. Run all cells in order
3. Training progress will be displayed in real-time

In [None]:
# Install dependencies
!pip install -q gymnasium>=0.29.0
!pip install -q stable-baselines3[extra]>=2.0.0
!pip install -q numpy>=1.24.0
!pip install -q pybullet>=3.2.0
!pip install -q matplotlib>=3.7.0
!pip install -q opencv-python>=4.8.0
!pip install -q tensorboard>=2.13.0
!pip install -q pyyaml>=6.0
!pip install -q git+https://github.com/axelbr/racecar_gym.git

print("All dependencies installed successfully!")

In [None]:
# Check GPU availability
import torch

# Pick the compute device first, then report what was detected
gpu_found = torch.cuda.is_available()
device = "cuda" if gpu_found else "cpu"

if not gpu_found:
    print("GPU not available, using CPU")
else:
    print(f"GPU available: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")

## Clone Repository and Setup

If you're using this notebook with a GitHub repository, clone it here. Otherwise, upload the necessary files manually.

In [None]:
# Option 1: Clone from GitHub (uncomment and modify if needed)
# !git clone https://github.com/yourusername/Racing_Gym_RL.git
# %cd Racing_Gym_RL

# Option 2: Upload files manually using the file browser
# Create necessary directories
import os
from pathlib import Path

# Make sure every working directory exists (no error if already present)
for dir_name in ('config', 'models', 'logs', 'results'):
    os.makedirs(dir_name, exist_ok=True)

print("Directories created!")

In [None]:
# Default training configuration as YAML text.
# It is written to config/circle_config.yaml below only when that file does not
# exist yet, so an uploaded or previously edited config is never overwritten.
# Sections: environment, agent, ppo, policy, training, paths, plus a top-level
# `device` key.
config_content = """# Racecar Gym PPO Training Configuration

# Environment settings
environment:
  track: circle
  scenario: null
  render_mode: null
  render_options:
    width: 320
    height: 240

# Agent configuration
agent:
  sensors: [lidar, pose, velocity, acceleration]
  actuators: [motor, steering]
  color: blue

# PPO hyperparameters
ppo:
  learning_rate: 3.0e-4
  n_steps: 2048
  batch_size: 64
  n_epochs: 10
  gamma: 0.99
  gae_lambda: 0.95
  clip_range: 0.2
  ent_coef: 0.01
  vf_coef: 0.5
  max_grad_norm: 0.5
  use_sde: false
  sde_sample_freq: -1

# Policy network architecture
policy:
  net_arch: [256, 256]
  activation_fn: tanh

# Training settings
training:
  total_timesteps: 500000  # Reduced for Colab demo
  eval_freq: 10000
  n_eval_episodes: 5
  save_freq: 50000
  log_interval: 10

# Paths
paths:
  model_dir: ./models
  log_dir: ./logs
  results_dir: ./results

# Device
device: auto
"""

# Keep an existing config untouched; only write the default when none is present
config_path = 'config/circle_config.yaml'
if os.path.exists(config_path):
    print("Configuration file already exists")
else:
    with open(config_path, 'w') as config_file:
        config_file.write(config_content)
    print("Configuration file created!")

## Training Setup

Import necessary libraries and set up the training environment.

In [None]:
import os
import yaml
from pathlib import Path
import gymnasium as gym
import racecar_gym.envs.gym_api
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback, CheckpointCallback, CallbackList
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.utils import set_random_seed
import torch
import numpy as np

print("Libraries imported successfully!")

In [None]:
# Parse the YAML configuration file into a dictionary
with open('config/circle_config.yaml', 'r') as config_file:
    config = yaml.safe_load(config_file)

# Seed the random number generators for reproducibility
seed = 42
set_random_seed(seed)

# Resolve the output directories named in the config and create any that are missing
path_settings = config['paths']
model_dir = Path(path_settings['model_dir'])
log_dir = Path(path_settings['log_dir'])
results_dir = Path(path_settings['results_dir'])
for output_dir in (model_dir, log_dir, results_dir):
    output_dir.mkdir(parents=True, exist_ok=True)

# An explicit device in the config is used as-is; 'auto' selects CUDA when available
device_config = config.get('device', 'auto')
device = device_config
if device_config == 'auto':
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

print(f"Using device: {device}")
print(f"Model directory: {model_dir}")
print(f"Log directory: {log_dir}")
print(f"Results directory: {results_dir}")

## Create Environment

Create the Racecar Gym environment. On first run, tracks will be downloaded automatically.

In [None]:
# Derive the environment id from the configured track name.
# env_config is also read by make_train_env/make_eval_env for the render settings.
env_config = config['environment']
track = env_config['track']
# str.capitalize() upper-cases the first character and lower-cases the rest,
# so the default track "circle" yields "SingleAgentCircle-v0".
env_id = f'SingleAgent{track.capitalize()}-v0'

print(f"Creating environment: {env_id}")

# Helper function to create training environment
def make_train_env():
    """Create the training environment.

    Builds ``env_id`` with the render settings from ``env_config``, wraps it in
    a ``Monitor`` that logs to ``log_dir / 'monitor_0'``, and performs one
    reset with ``seed``.

    Returns:
        The Monitor-wrapped environment.
    """
    base_env = gym.make(
        env_id,
        render_mode=env_config.get('render_mode'),
        render_options=env_config.get('render_options'),
    )
    monitored_env = Monitor(base_env, filename=str(log_dir / 'monitor_0'))
    monitored_env.reset(seed=seed)
    return monitored_env

# Helper function to create evaluation environment
def make_eval_env():
    """Create the evaluation environment.

    Same construction as the training environment, except that the render mode
    is always ``'rgb_array'`` (the configured ``render_mode`` is not used), the
    ``Monitor`` logs to ``log_dir / 'monitor_eval'``, and the initial reset
    uses ``seed + 1000``.

    Returns:
        The Monitor-wrapped environment.
    """
    base_env = gym.make(
        env_id,
        render_mode='rgb_array',
        render_options=env_config.get('render_options'),
    )
    monitored_env = Monitor(base_env, filename=str(log_dir / 'monitor_eval'))
    monitored_env.reset(seed=seed + 1000)
    return monitored_env

# Wrap the training environment in a vectorized environment.
# DummyVecEnv receives a list of factory callables; a single factory is passed,
# so the wrapper holds exactly one environment.
env = DummyVecEnv([make_train_env])

# Create the evaluation environment the same way (it is handed to EvalCallback later)
eval_env = DummyVecEnv([make_eval_env])

# Report the spaces of the single wrapped training environment
print(f"Observation space: {env.envs[0].observation_space}")
print(f"Action space: {env.envs[0].action_space}")
print("Environment created successfully!")

## Create PPO Model

Initialize the PPO model with the policy class that matches the observation space: `MultiInputPolicy` for Dict observations, `MlpPolicy` otherwise.

In [None]:
# Choose the policy class name from the type of the observation space
uses_dict_obs = isinstance(env.envs[0].observation_space, gym.spaces.Dict)
policy = 'MultiInputPolicy' if uses_dict_obs else 'MlpPolicy'

if not uses_dict_obs:
    print("Using MlpPolicy for Box observation space")
else:
    print("Using MultiInputPolicy for Dict observation space")

# Map the configured activation name to the torch module class
activation_fn_map = {
    'tanh': torch.nn.Tanh,
    'relu': torch.nn.ReLU,
    'elu': torch.nn.ELU
}
# Normalise the configured value so that spellings such as "ReLU" or " relu "
# resolve to the intended class instead of silently falling through to Tanh.
activation_name = str(config['policy'].get('activation_fn', 'tanh')).strip().lower()
if activation_name not in activation_fn_map:
    # Keep the Tanh fallback, but make it visible so a config typo is not
    # mistaken for a deliberate choice.
    print(
        f"Warning: unknown activation_fn '{activation_name}', falling back to tanh "
        f"(supported: {sorted(activation_fn_map)})"
    )
activation_fn = activation_fn_map.get(activation_name, torch.nn.Tanh)

# Gather the PPO settings from the config and build the model
ppo_config = config['ppo']
policy_config = config['policy']

# Hyperparameters that must be present in the config (a missing key raises KeyError)
required_hyperparams = (
    'learning_rate', 'n_steps', 'batch_size', 'n_epochs', 'gamma',
    'gae_lambda', 'clip_range', 'ent_coef', 'vf_coef', 'max_grad_norm',
)
ppo_kwargs = {name: ppo_config[name] for name in required_hyperparams}
# Optional settings fall back to use_sde=False and sde_sample_freq=-1 when omitted
ppo_kwargs['use_sde'] = ppo_config.get('use_sde', False)
ppo_kwargs['sde_sample_freq'] = ppo_config.get('sde_sample_freq', -1)

# Network architecture and activation forwarded to the policy
network_kwargs = dict(
    net_arch=policy_config['net_arch'],
    activation_fn=activation_fn
)

model = PPO(
    policy=policy,
    env=env,
    policy_kwargs=network_kwargs,
    device=device,
    verbose=1,
    tensorboard_log=str(log_dir),
    **ppo_kwargs
)

print("PPO model created successfully!")
print(f"Policy: {model.policy}")

## Training

Start training the PPO agent. This may take a while depending on the number of timesteps.

In [None]:
# Training schedule settings shared by the callbacks and the training cell
training_config = config['training']

# Periodic evaluation on eval_env; the best model is stored under model_dir/best_model
eval_settings = dict(
    best_model_save_path=str(model_dir / 'best_model'),
    log_path=str(log_dir),
    eval_freq=training_config['eval_freq'],
    n_eval_episodes=training_config['n_eval_episodes'],
    deterministic=True,
    render=False
)
eval_callback = EvalCallback(eval_env, **eval_settings)

# Periodic checkpoints written to model_dir with the 'ppo_racecar' prefix
checkpoint_settings = dict(
    save_freq=training_config['save_freq'],
    save_path=str(model_dir),
    name_prefix='ppo_racecar'
)
checkpoint_callback = CheckpointCallback(**checkpoint_settings)

# Both callbacks are passed to training as a single CallbackList
callbacks = CallbackList([eval_callback, checkpoint_callback])

print("Callbacks configured")
print(f"Total timesteps: {training_config['total_timesteps']}")
print(f"Evaluation frequency: {training_config['eval_freq']}")
print(f"Save frequency: {training_config['save_freq']}")
print("\nStarting training...")

In [None]:
# Run PPO for the configured number of timesteps with both callbacks attached
learn_kwargs = dict(
    total_timesteps=training_config['total_timesteps'],
    callback=callbacks,
    log_interval=training_config.get('log_interval', 10),
    progress_bar=True
)
model.learn(**learn_kwargs)

# Persist the weights reached at the end of training
final_model_path = model_dir / 'final_model'
model.save(str(final_model_path))
print(f"\nTraining complete! Final model saved to {final_model_path}")

## Evaluation

Evaluate the trained model and visualize results.

In [None]:
# Load the best model saved by EvalCallback, falling back to the final model
model_path = model_dir / 'best_model' / 'best_model.zip'
if not model_path.exists():
    model_path = model_dir / 'final_model.zip'

print(f"Loading model from {model_path}")
trained_model = PPO.load(str(model_path))

# Evaluate the model over a fixed number of episodes
n_eval_episodes = 5
episode_rewards = []
episode_lengths = []

eval_env_test = gym.make(
    env_id,
    render_mode='rgb_array',
    render_options=env_config.get('render_options')
)

# try/finally guarantees the environment is closed even when an episode raises
# (or the cell is interrupted), so it is not left open in the notebook kernel.
try:
    for episode in range(n_eval_episodes):
        # A different seed per episode varies the reset between episodes
        obs, info = eval_env_test.reset(seed=seed + episode)
        done = False
        episode_reward = 0
        episode_length = 0

        while not done:
            action, _ = trained_model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = eval_env_test.step(action)
            # An episode ends on either termination or truncation
            done = terminated or truncated
            episode_reward += reward
            episode_length += 1

        episode_rewards.append(episode_reward)
        episode_lengths.append(episode_length)
        print(f"Episode {episode + 1}: Reward={episode_reward:.2f}, Length={episode_length}")
finally:
    eval_env_test.close()

print(f"\nMean Reward: {np.mean(episode_rewards):.2f} ± {np.std(episode_rewards):.2f}")
print(f"Mean Episode Length: {np.mean(episode_lengths):.1f} ± {np.std(episode_lengths):.1f}")

## Visualize Training Progress

Load TensorBoard logs to visualize training progress.

In [None]:
# Load the TensorBoard notebook extension, which provides the %tensorboard magic
%load_ext tensorboard

# Start TensorBoard on log_dir, the same directory that was passed to PPO as
# tensorboard_log; {log_dir} is filled in from the Python variable of that name
%tensorboard --logdir {log_dir}

## Download Results

Download trained models and results to your local machine.

In [None]:
# Create zip file of results
import shutil

import zipfile

results_zip = 'racecar_training_results.zip'

# Archive only the config and the output directories instead of
# model_dir.parent. With the default paths that parent is the working
# directory, where the archive itself is written, so zipping it would sweep the
# half-written zip (and any unrelated files) into the result.
archived_roots = set()
with zipfile.ZipFile(results_zip, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
    for directory in (Path('config'), model_dir, log_dir, results_dir):
        resolved_dir = directory.resolve()
        # Skip missing directories, and directories configured to the same location
        if not directory.is_dir() or resolved_dir in archived_roots:
            continue
        archived_roots.add(resolved_dir)
        for file_path in sorted(directory.rglob('*')):
            if file_path.is_file():
                # Store each file under its top-level directory name, e.g. models/...
                arcname = Path(resolved_dir.name) / file_path.relative_to(directory)
                archive.write(str(file_path), arcname=str(arcname))

print(f"Results zipped: {results_zip}")
print("\nTo download:")
print("1. Use the file browser on the left")
print("2. Right-click on racecar_training_results.zip")
print("3. Select 'Download'")