In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np

torch.__version__

'2.1.1'

In [2]:
# Device-agnostic setup: prefer an NVIDIA GPU (CUDA), then Apple's Metal
# Performance Shaders backend (MPS), and fall back to the CPU otherwise.
if torch.cuda.is_available():
    device = 'cuda'
else:
    # MPS availability is only queried when CUDA is not present.
    device = 'mps' if torch.backends.mps.is_available() else 'cpu'

print(f'Device set to: {device}')

Device set to: mps


In [None]:
# Constants

# ID of the traffic light of interest (matches the 'id' reported by get_traffic_lights)
TLS_ID = '209'

# Network width and optimizer step size (same values as the DQNAgent defaults)
HIDDEN_SIZE = 64
LEARNING_RATE = 0.001

# Epsilon-greedy exploration settings: initial value, decay factor, lower bound
STARTING_EPSILON = 1.0
EPSILON_DECAY = 0.995
MIN_EPSILON = 0.01

In [8]:
import api_endpoints

# Start a simulation from the Bologna "acosta" SUMO configuration and keep the
# identifier returned by the API; echoing it makes it the cell output.
config_path = 'scenarios/bologna/acosta/run.sumocfg'
simulation_id = api_endpoints.start_simulation(config_path)
simulation_id

'b99f154e-304a-4fa2-8071-8ea5172f115a'

In [10]:
# Inspect the traffic lights of the running simulation: the response lists, per
# light, its id, current phase and the phases of each signal program ('logics').
api_endpoints.get_traffic_lights(simulation_id)

[{'current_phase': 0,
  'current_phase_duration': 86400.0,
  'current_phase_duration_max': 86400.0,
  'id': '209',
  'logics': [{'phases': [{'duration': 42.0,
      'maxDur': 42.0,
      'minDur': 42.0,
      'state': 'GrGGGGg'},
     {'duration': 3.0, 'maxDur': 3.0, 'minDur': 3.0, 'state': 'yryyyyy'},
     {'duration': 42.0, 'maxDur': 42.0, 'minDur': 42.0, 'state': 'GGrGGrr'},
     {'duration': 3.0, 'maxDur': 3.0, 'minDur': 3.0, 'state': 'yyryyrr'}],
    'program_id': '0',
    'type': 0},
   {'phases': [{'duration': 86400.0,
      'maxDur': 117.0,
      'minDur': 45.0,
      'state': 'GrGGGGg'},
     {'duration': 86400.0, 'maxDur': 3.0, 'minDur': 3.0, 'state': 'yrGGGyy'},
     {'duration': 86400.0, 'maxDur': 7.0, 'minDur': 7.0, 'state': 'rrGGGrr'},
     {'duration': 86400.0, 'maxDur': 3.0, 'minDur': 3.0, 'state': 'rryyyrr'},
     {'duration': 86400.0, 'maxDur': 3.0, 'minDur': 3.0, 'state': 'rrrrrrr'},
     {'duration': 86400.0, 'maxDur': 26.0, 'minDur': 26.0, 'state': 'rGrrrrr'},
    

In [4]:
# Neural Network Architecture
class DQNNetwork(nn.Module):
    """Fully connected network mapping a state vector to one value per action.

    The stack is Linear -> ReLU -> Linear -> ReLU -> Linear, with both hidden
    layers sharing the same width.

    Args:
        state_size: Number of input features.
        action_size: Number of outputs (one per action).
        hidden_size: Width of the two hidden layers.
    """

    def __init__(self, state_size: int, action_size: int, hidden_size: int = 64):
        super().__init__()
        # Layers are created in input -> output order and wrapped in a single
        # Sequential, so parameters live under layers.0, layers.2 and layers.4.
        input_layer = nn.Linear(state_size, hidden_size)
        hidden_layer = nn.Linear(hidden_size, hidden_size)
        output_layer = nn.Linear(hidden_size, action_size)
        self.layers = nn.Sequential(
            input_layer,
            nn.ReLU(),
            hidden_layer,
            nn.ReLU(),
            output_layer,
        )

    def forward(self, state: torch.Tensor) -> torch.Tensor:
        """Return the network output for ``state`` (last dimension: ``action_size``)."""
        action_values = self.layers(state)
        return action_values

In [5]:
import api_endpoints
# Deep Q-Learning Agent
class DQNAgent:
    def __init__(
        self,
        state_size: int,
        action_size: int,
        config_path: str,
        tls_id: str,
        hidden_size: int = 64,
        learning_rate: float = 0.001,
        starting_epsilon: float = 1.0,
        epsilon_decay: float = 0.995,
        min_epsilon: float = 0.01,
    ) -> None:
        """Build the Q-network and its optimizer, then start a simulation session.

        Args:
            state_size: Number of input features fed to the network.
            action_size: Number of network outputs, one per action.
            config_path: Configuration path passed to ``api_endpoints.start_simulation``.
            tls_id: Traffic light ID kept on the agent as ``self.tls_id``.
            hidden_size: Width of the network's hidden layers.
            learning_rate: Learning rate of the Adam optimizer.
            starting_epsilon: Initial exploration rate used by ``act``.
            epsilon_decay: Decay factor for the exploration rate, kept as
                ``self.epsilon_decay``.
            min_epsilon: Lower bound for the exploration rate, kept as
                ``self.min_epsilon``.
        """
        self.state_size = state_size
        self.action_size = action_size  # number of actions (Make step, Change phase, Change program)
        self.hidden_size = hidden_size

        # The model is moved to the notebook-level `device` chosen earlier.
        self.model = DQNNetwork(state_size, action_size, hidden_size).to(device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.loss_fn = nn.MSELoss()

        # Exploration schedule. The decay factor and floor used to be accepted
        # and then dropped, leaving no way to decay epsilon after construction.
        self.epsilon = starting_epsilon  # Exploration rate
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon

        # Simulation related variables
        self.config_path = config_path
        self.session_id = api_endpoints.start_simulation(config_path)
        self.tls_id = tls_id
        
    def act(self, state: np.ndarray) -> int:
        """Choose an action for ``state`` with an epsilon-greedy policy.

        With probability ``self.epsilon`` a uniformly random action index is
        returned; otherwise the index of the largest model output is returned.
        """
        should_explore = random.random() < self.epsilon
        if should_explore:
            return random.randrange(self.action_size)

        # Greedy branch: add a batch dimension, cast to float32 and evaluate
        # the model on the selected device without tracking gradients.
        state_batch = torch.from_numpy(state).float().unsqueeze(0).to(device)
        with torch.inference_mode():
            q_values = self.model(state_batch)
        best_action = torch.argmax(q_values).item()
        return best_action
        
    def learn(self, state: np.ndarray, action: int, reward: float, next_state: np.ndarray, done: bool) -> None:
        

In [None]:
from tqdm import tqdm
EPISODES = 1000
for episode in tqdm(range(EPISODES)):
