# RL for Autonomous Vehicle Tasks: Safety and Traffic Optimization

## Agents

In [None]:
# TODO: define agents here

## Scenarios

In [None]:
import gymnasium
import highway_env
from matplotlib import pyplot as plt
%matplotlib inline
from stable_baselines3 import DQN
import logging
import json
import os
import numpy as np

### Lane Changing

In [None]:
# Lane-changing scenario: a 'highway-v0' environment rendered as RGB arrays.
lane_changing_env = gymnasium.make('highway-v0', render_mode='rgb_array')
# Scenario configuration applied to the unwrapped environment below.
# NOTE(review): "reverse_penalty", "drifting_penalty" and "off_road_penalty"
# look like custom keys — confirm the environment's reward function actually
# reads them; otherwise they have no effect.
config = {
    "vehicles_count": 50,  # Number of vehicles in the environment
    "controlled_vehicles": 1,  # Number of vehicles controlled by the agent
    "duration": 40,  # Duration of each episode
    "reverse_penalty": -10,  # Large penalty for reversing
    "drifting_penalty": -5,  # Penalty for drifting or sharp turns
    "collision_reward": -2,  # Negative reward (penalty) for collisions
    "off_road_penalty": -10,  # Penalty for going off-road
    "lane_change_reward": 0.1,  # Positive value keyed on lane changes — TODO confirm it should encourage (not discourage) changing lanes
    "reward_speed_range": [10, 30],  # Speed range used by the speed reward — TODO confirm units
    "simulation_frequency": 15,  # Lower simulation speed to avoid erratic behavior
    "policy_frequency": 1,  # Fewer policy updates per second
    "screen_width": 600,  # Render size, presumably in pixels
    "screen_height": 400,
    "offscreen_rendering": False,
    "show_trajectories": True,
    "action": {
        "type": "DiscreteAction"  # Discrete action space — see the highway-env docs for the exact action set
    },
}

# Apply the config to the underlying (unwrapped) environment, then reset —
# presumably so the new settings take effect; verify for the installed
# highway-env version.
lane_changing_env.unwrapped.configure(config)
lane_changing_env.reset()

### Roundabout

In [None]:
# Roundabout scenario: a 'roundabout-v0' environment rendered as RGB arrays.
roundabout_env = gymnasium.make('roundabout-v0', render_mode='rgb_array')
# Scenario configuration applied to the unwrapped environment below.
# NOTE(review): "reverse_penalty", "drifting_penalty" and "off_road_penalty"
# look like custom keys — confirm the environment's reward function actually
# reads them; otherwise they have no effect.
config = {
    "vehicles_count": 50,  # Number of vehicles in the environment
    "controlled_vehicles": 1,  # Number of vehicles controlled by the agent
    "duration": 40,  # Duration of each episode
    "reverse_penalty": -10,  # Large penalty for reversing
    "drifting_penalty": -5,  # Penalty for drifting or sharp turns
    "collision_reward": -2,  # Negative reward (penalty) for collisions
    "off_road_penalty": -10,  # Penalty for going off-road
    "lane_change_reward": 0.1,  # Positive value keyed on lane changes — TODO confirm it should encourage (not discourage) changing lanes
    "reward_speed_range": [10, 30],  # Speed range used by the speed reward — TODO confirm units
    "simulation_frequency": 15,  # Lower simulation speed to avoid erratic behavior
    "policy_frequency": 1,  # Fewer policy updates per second
    "screen_width": 600,  # Render size, presumably in pixels
    "screen_height": 400,
    "offscreen_rendering": False,
    "show_trajectories": True,
    "action": {
        "type": "DiscreteAction"  # Discrete action space — see the highway-env docs for the exact action set
    },
}
# Apply the config to the underlying (unwrapped) environment, then reset —
# presumably so the new settings take effect; verify for the installed
# highway-env version.
roundabout_env.unwrapped.configure(config)
roundabout_env.reset()

### Overtaking

In [None]:
# Overtaking scenario: a second 'highway-v0' environment rendered as RGB arrays.
# NOTE(review): the environment id and every config value match the
# lane-changing cell — confirm the two scenarios are meant to be identical.
overtaking_env = gymnasium.make('highway-v0', render_mode='rgb_array')
# Scenario configuration applied to the unwrapped environment below.
# NOTE(review): "reverse_penalty", "drifting_penalty" and "off_road_penalty"
# look like custom keys — confirm the environment's reward function actually
# reads them; otherwise they have no effect.
config = {
    "vehicles_count": 50,  # Number of vehicles in the environment
    "controlled_vehicles": 1,  # Number of vehicles controlled by the agent
    "duration": 40,  # Duration of each episode
    "reverse_penalty": -10,  # Large penalty for reversing
    "drifting_penalty": -5,  # Penalty for drifting or sharp turns
    "collision_reward": -2,  # Negative reward (penalty) for collisions
    "off_road_penalty": -10,  # Penalty for going off-road
    "lane_change_reward": 0.1,  # Positive value keyed on lane changes — TODO confirm it should encourage (not discourage) changing lanes
    "reward_speed_range": [10, 30],  # Speed range used by the speed reward — TODO confirm units
    "simulation_frequency": 15,  # Lower simulation speed to avoid erratic behavior
    "policy_frequency": 1,  # Fewer policy updates per second
    "screen_width": 600,  # Render size, presumably in pixels
    "screen_height": 400,
    "offscreen_rendering": False,
    "show_trajectories": True,
    "action": {
        "type": "DiscreteAction"  # Discrete action space — see the highway-env docs for the exact action set
    },
}
# Apply the config to the underlying (unwrapped) environment, then reset —
# presumably so the new settings take effect; verify for the installed
# highway-env version.
overtaking_env.unwrapped.configure(config)
overtaking_env.reset()

## Training and Testing

In [None]:
# Configure root logging for the notebook: INFO level, timestamped messages.
# Note: logging.basicConfig does nothing if the root logger already has
# handlers, so re-running this cell will not change an existing setup.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the helper functions defined in this notebook.
logger = logging.getLogger(__name__)

def create_directory(path):
    """
    Ensure a directory exists, creating any missing parent directories.

    An already-existing directory is not an error. Failures are logged and
    not re-raised, so the caller continues either way.

    Args:
        path (str): Directory path to create
    """
    try:
        os.makedirs(path, exist_ok=True)
    except Exception as exc:
        logger.error(f"Error creating directory {path}: {exc}")
    else:
        # Also logged when the directory was already present.
        logger.info(f"Directory created: {path}")

def save_pipeline_data(agent_name, stage, environment_name, data):
    """
    Save pipeline stage data as JSON under ``results/<agent_name>/``.

    The file is named ``<environment_name>_<stage>_data.json``. Saving is
    best-effort: any failure is logged and not re-raised.

    Args:
        agent_name (str): Name of the RL agent
        stage (str): Current pipeline stage
        environment_name (str): Name of the environment
        data (dict): Data to be saved; must be JSON-serializable
    """
    create_directory(f"results/{agent_name}")

    # Plain replacement fields only: reusing the enclosing double quote inside
    # an f-string expression is a SyntaxError on Python versions before 3.12.
    filename = f"results/{agent_name}/{environment_name}_{stage}_data.json"

    try:
        # Serialize before opening the file so that a non-serializable value
        # cannot leave a truncated, half-written JSON file behind.
        payload = json.dumps(data, indent=4)
        with open(filename, 'w') as f:
            f.write(payload)
        logger.info(f"Data saved for {stage} in {environment_name}")
    except Exception as e:
        logger.error(f"Error saving data: {e}")

In [None]:
def train(agent, environment, agent_name, stage, environment_name, timesteps=1000):
    """
    Train an agent in one environment, then persist the model and a summary.

    The model is saved to ``models/<agent_name>/<environment_name>_model`` and
    a training summary is written through ``save_pipeline_data`` under the
    fixed stage label "train".

    Args:
        agent (sb3.BaseAlgorithm): RL agent to train
        environment (gym.Env): Environment to train in
        agent_name (str): Name of the agent
        stage (str): Current pipeline stage (recorded in the summary)
        environment_name (str): Name of the environment
        timesteps (int, optional): Number of training timesteps. Defaults to 1000.

    Returns:
        The same agent object that was passed in, after training

    Raises:
        Exception: Any error from training or saving is logged and re-raised
    """
    try:
        # Point the agent at this environment before learning.
        agent.set_env(environment)
        learn_output = agent.learn(total_timesteps=timesteps)

        # Persist the trained model.
        create_directory(f"models/{agent_name}")
        agent.save(f"models/{agent_name}/{environment_name}_model")

        # NOTE(review): 'training_results' is just str() of whatever learn()
        # returned — confirm that this is a useful thing to record.
        save_pipeline_data(
            agent_name,
            "train",
            environment_name,
            {
                'agent_name': agent_name,
                'environment_name': environment_name,
                'stage': stage,
                'total_timesteps': timesteps,
                'training_results': str(learn_output),
            },
        )

        logger.info(f"Training completed for {agent_name} in {environment_name}")
    except Exception as exc:
        logger.error(f"Training failed for {agent_name} in {environment_name}: {exc}")
        raise

    return agent



In [None]:
def test(agent, environment, agent_name, stage, environment_name, num_episodes=3):
    """
    Evaluate an agent over several episodes and persist the results.

    Each episode is rolled out with deterministic actions until the
    environment reports termination or truncation. After every step the
    speeds of all road vehicles except the controlled one are sampled. An
    episode that raises is logged and left out of the results; the remaining
    episodes still run.

    The environment is deliberately NOT closed here: it belongs to the
    caller, and rl_pipeline keeps using the same instance after testing.

    Args:
        agent: Agent exposing ``predict(obs, deterministic=True)``
        environment (gym.Env): Environment to evaluate in; its ``unwrapped``
            object must expose ``road.vehicles`` and ``vehicle``
        agent_name (str): Name of the agent (selects the results directory)
        stage (str): Pipeline stage label stored with the results
        environment_name (str): Name of the environment
        num_episodes (int, optional): Number of episodes to run. Defaults to 3.

    Returns:
        dict: ``{'test_results': [...], 'kpis': {...}}`` where 'test_results'
        holds one entry per completed episode and 'kpis' holds
        'average_reward', 'total_collisions', 'average_speed' and
        'speed_variance'. Averages are plain floats and are 0.0 when no
        episode completed.
    """
    test_results = []
    total_rewards = []
    total_collisions = 0
    traffic_speeds = []

    # The unwrapped environment object is the same for every episode.
    base_env = environment.unwrapped

    for episode in range(num_episodes):
        try:
            obs, info = environment.reset()
            done = truncated = False
            episode_reward = 0.0
            episode_collisions = 0
            episode_speeds = []

            while not (done or truncated):
                # Predict action using the agent
                action, _states = agent.predict(obs, deterministic=True)
                obs, reward, done, truncated, info = environment.step(action)

                # Plain float keeps the saved payload JSON-serializable even
                # if the environment hands back a NumPy scalar.
                episode_reward += float(reward)

                # Check for collisions
                if info.get("crashed", False):
                    episode_collisions += 1

                # Gather the speed of every vehicle except the agent's own
                ego_vehicle = base_env.vehicle
                episode_speeds.extend(
                    float(vehicle.speed)
                    for vehicle in base_env.road.vehicles
                    if vehicle != ego_vehicle
                )

                # Optional rendering
                environment.render()
        except Exception as e:
            logger.error(f"Test episode {episode + 1} failed in {environment_name}: {e}")
        else:
            # Only fully completed episodes contribute to the statistics.
            total_rewards.append(episode_reward)
            total_collisions += episode_collisions
            traffic_speeds.extend(episode_speeds)

            episode_avg_speed, episode_speed_variance = _mean_and_variance(episode_speeds)
            test_results.append({
                'episode': episode,
                'total_reward': episode_reward,
                'collisions': episode_collisions,
                'avg_speed': episode_avg_speed,
                'speed_variance': episode_speed_variance
            })

    # Calculate overall KPIs. Guard the empty case: np.mean([]) would yield
    # NaN (with a RuntimeWarning), which json then writes as invalid JSON.
    avg_reward = float(np.mean(total_rewards)) if total_rewards else 0.0
    avg_speed, speed_variance = _mean_and_variance(traffic_speeds)

    logger.info(f"Test completed in {environment_name} - "
                f"Avg Reward: {avg_reward}, Total Collisions: {total_collisions}, "
                f"Avg Traffic Speed: {avg_speed}, Speed Variance: {speed_variance}")

    kpis = {
        'average_reward': avg_reward,
        'total_collisions': total_collisions,
        'average_speed': avg_speed,
        'speed_variance': speed_variance
    }

    # Save test results and KPIs
    save_pipeline_data(agent_name, stage, environment_name, {
        'test_results': test_results,
        'kpis': kpis,
        'environment': environment_name,
        'stage': stage
    })

    return {
        'test_results': test_results,
        'kpis': kpis
    }


def _mean_and_variance(values):
    """Return (mean, variance) of values as plain floats; (0.0, 0.0) if empty."""
    if not values:
        return 0.0, 0.0
    return float(np.mean(values)), float(np.var(values))

In [None]:
def rl_pipeline(initial_agent, agent_name, lane_changing_env, roundabout_env, overtaking_env):
    """
    Train and evaluate one agent across three environments in sequence.

    Stage order is lane changing, roundabout, overtaking. From the second
    stage on, the agent carried over from the previous stage is first tested
    in the new environment ('pre_training_test'), then trained there, then
    tested again ('pos_training_test'). The first stage has no pre-training
    test.

    Args:
        initial_agent (sb3.BaseAlgorithm): Initial RL agent
        agent_name (str): Name of the agent
        lane_changing_env (gym.Env): Lane changing environment
        roundabout_env (gym.Env): Roundabout environment
        overtaking_env (gym.Env): Overtaking environment

    Returns:
        tuple: ``(agent, performance)`` — the agent after the last stage and
        a dict of test outputs keyed by "lane_changing",
        "lane_changing_roundabout", "roundabout", "roundabout_overtaking"
        and "overtaking"
    """
    # (log banner, environment name, environment, key for the pre-training
    # test result or None when the stage has no pre-training test)
    stages = (
        ("Stage 1: Lane Changing Environment", 'lane_changing', lane_changing_env, None),
        ("Stage 2: Roundabout Environment", 'roundabout', roundabout_env, "lane_changing_roundabout"),
        ("Stage 3: Overtaking Environment", 'overtaking', overtaking_env, "roundabout_overtaking"),
    )

    logger.info("Sequential Environment Pipeline Started")

    performance = {}
    current_agent = initial_agent

    for banner, env_name, env, carry_over_key in stages:
        logger.info(banner)

        # How does the agent from the previous stage do before training here?
        if carry_over_key is not None:
            performance[carry_over_key] = test(
                current_agent, env, agent_name, 'pre_training_test', env_name
            )

        current_agent = train(current_agent, env, agent_name, env_name, env_name)

        performance[env_name] = test(
            current_agent, env, agent_name, 'pos_training_test', env_name
        )

    logger.info("Sequential Environment Pipeline Completed")

    # Save the final agent.
    # NOTE(review): the "final" directory is created but the model is written
    # next to it, not inside it — confirm which location is intended.
    create_directory(f"models/{agent_name}/final")
    current_agent.save(f"models/{agent_name}/{agent_name}_final_model")

    return current_agent, performance

### Agent 1

In [None]:
# Agent 1: a DQN agent with an MlpPolicy (net_arch [256, 256]), created on the
# lane-changing environment.
agent_DQN = DQN(
    'MlpPolicy',
    lane_changing_env,
    policy_kwargs={"net_arch": [256, 256]},
    learning_rate=1e-4,
    buffer_size=15000,
    learning_starts=200,
    batch_size=64,
    gamma=0.8,
    train_freq=1,
    gradient_steps=1,
    target_update_interval=50,
    verbose=1,
)

# Run the sequential train/test pipeline for agent 1. `performance` holds the
# test output of every stage and is read by the cells in the Performance section.
final_agent_DQN, performance = rl_pipeline(
    agent_DQN,
    'agent_DQN',
    lane_changing_env,
    roundabout_env,
    overtaking_env,
)

### Agent N

## Performance

In [None]:
# Dump the raw dict returned by rl_pipeline: per-episode test results and KPIs for every stage.
print(performance)

In [None]:
# KPIs for the lane-changing environment, tested after training there.
lane_changing_performance = performance["lane_changing"]

print("Lane Changing Performance")
for label, kpi_name in (
    ("Average Reward: ", "average_reward"),
    ("Total Collisions: ", "total_collisions"),
    ("Average Speed: ", "average_speed"),
    ("Speed Variance: ", "speed_variance"),
):
    print(label, lane_changing_performance["kpis"][kpi_name])
print("\n")

In [None]:
# KPIs for the lane-changing agent tested in the roundabout environment
# before any training there.
lane_changing_roundabout_performance = performance["lane_changing_roundabout"]

print("Lane Changing Roundabout Performance")
for label, kpi_name in (
    ("Average Reward: ", "average_reward"),
    ("Total Collisions: ", "total_collisions"),
    ("Average Speed: ", "average_speed"),
    ("Speed Variance: ", "speed_variance"),
):
    print(label, lane_changing_roundabout_performance["kpis"][kpi_name])
print("\n")

In [None]:
# KPIs for the roundabout environment, tested after training there.
roundabout_performance = performance["roundabout"]

print("Roundabout Performance")
for label, kpi_name in (
    ("Average Reward: ", "average_reward"),
    ("Total Collisions: ", "total_collisions"),
    ("Average Speed: ", "average_speed"),
    ("Speed Variance: ", "speed_variance"),
):
    print(label, roundabout_performance["kpis"][kpi_name])
print("\n")

In [None]:
# KPIs for the roundabout agent tested in the overtaking environment
# before any training there.
roundabout_overtaking_performance = performance["roundabout_overtaking"]

print("Roundabout Overtaking Performance")
for label, kpi_name in (
    ("Average Reward: ", "average_reward"),
    ("Total Collisions: ", "total_collisions"),
    ("Average Speed: ", "average_speed"),
    ("Speed Variance: ", "speed_variance"),
):
    print(label, roundabout_overtaking_performance["kpis"][kpi_name])
print("\n")

In [None]:
# KPIs for the overtaking environment, tested after training there.
overtaking_performance = performance["overtaking"]

print("Overtaking Performance")
for label, kpi_name in (
    ("Average Reward: ", "average_reward"),
    ("Total Collisions: ", "total_collisions"),
    ("Average Speed: ", "average_speed"),
    ("Speed Variance: ", "speed_variance"),
):
    print(label, overtaking_performance["kpis"][kpi_name])
print("\n")

## Analysis

## Conclusion