<a href="https://colab.research.google.com/github/manikanta-eng/Reinforcement-learning/blob/main/project_traffic_signal.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [12]:
# ============================================================
# TRAFFIC SIGNAL OPTIMIZATION USING DEEP Q-LEARNING (DQN)
# FULL PROJECT SOURCE CODE IN ONE SINGLE FILE
# ============================================================

import random
import numpy as np
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim

# Run all networks and tensors on the GPU when CUDA is available, else on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ============================================================
# 1) TRAFFIC ENVIRONMENT (SIMULATION)
# ============================================================

class TrafficEnv:
    """Stochastic four-approach intersection controlled by a two-phase signal.

    State (``n_approaches + 2`` float32 values): the queue length of every
    approach, the current phase (0 = NS green, 1 = EW green) and the time
    spent in the current phase divided by ``max_time_in_phase``.

    Actions: 1 switches the phase; any other value extends the current phase.
    """

    def __init__(self):
        self.max_queue = 50
        self.arrival_rate = [0.25, 0.15, 0.35, 0.20]     # arrival prob per lane
        self.depart_rate_green = 0.70                    # departure prob if green
        self.max_time_in_phase = 12
        self.n_approaches = 4
        self.current_phase = 0                          # 0 = NS green, 1 = EW green
        self.time_in_phase = 0
        # Start from an empty intersection so that step() and _get_state()
        # work even when the caller has not called reset() yet. No random
        # numbers are drawn here, so seeded runs are unaffected.
        self.queues = [0] * self.n_approaches
        self.t = 0

    def reset(self):
        """Start a new episode with 0-3 vehicles per approach and return the state."""
        self.queues = [random.randint(0, 3) for _ in range(self.n_approaches)]
        self.current_phase = 0
        self.time_in_phase = 0
        self.t = 0
        return self._get_state()

    def _get_state(self):
        """Return queues + [phase, scaled time in phase] as a float32 array."""
        scaled_time = self.time_in_phase / self.max_time_in_phase
        return np.array(self.queues + [self.current_phase, scaled_time], dtype=np.float32)

    def _switch_phase(self):
        """Flip the green phase and restart the phase timer."""
        self.current_phase = 1 - self.current_phase
        self.time_in_phase = 0

    def step(self, action):
        """Advance the simulation by one tick.

        Args:
            action: 1 to switch phase; anything else extends the current one.

        Returns:
            Tuple ``(state, reward, done, info)``. ``reward`` is the negative
            total queue length, minus 1 if the phase changed on this tick.
            ``done`` is always ``False``. ``info`` holds a copy of the queues
            under ``"queues"`` and the active phase under ``"phase"``.
        """
        switched = False

        # Action 0 = extend, Action 1 = switch phase
        if action == 1:
            self._switch_phase()
            switched = True
        else:
            self.time_in_phase += 1
            # A phase held for max_time_in_phase ticks is switched by force.
            if self.time_in_phase >= self.max_time_in_phase:
                self._switch_phase()
                switched = True

        # Arrivals: at most one new vehicle per approach, capped at max_queue
        for i in range(self.n_approaches):
            if random.random() < self.arrival_rate[i]:
                self.queues[i] = min(self.queues[i] + 1, self.max_queue)

        # Departures: only the approaches that currently have green can discharge
        green = [0, 1] if self.current_phase == 0 else [2, 3]
        for lane in green:
            if self.queues[lane] > 0 and random.random() < self.depart_rate_green:
                self.queues[lane] -= 1

        # Reward: negative of total queue length
        reward = -sum(self.queues)
        if switched:
            reward -= 1  # small penalty for switching

        self.t += 1
        done = False
        return self._get_state(), reward, done, {"queues": self.queues.copy(), "phase": self.current_phase}


# ============================================================
# 2) DQN NETWORK
# ============================================================

class DQNNetwork(nn.Module):
    """Fully connected Q-network: state -> 64 -> 64 -> one Q-value per action."""

    def __init__(self, state_dim, action_dim):
        super().__init__()
        hidden_units = 64
        # Layers are created in input-to-output order and kept under
        # ``self.model`` so the state-dict keys (model.0, model.2, model.4)
        # stay the same.
        stack = [
            nn.Linear(state_dim, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, hidden_units),
            nn.ReLU(),
            nn.Linear(hidden_units, action_dim),
        ]
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Return the Q-values the network predicts for the state batch ``x``."""
        q_values = self.model(x)
        return q_values


# ============================================================
# 3) DQN AGENT
# ============================================================

class DQNAgent:
    """DQN agent with a replay buffer, a target network and epsilon-greedy exploration."""

    def __init__(self, state_dim, action_dim):
        """Build the policy/target networks for the given state and action sizes."""
        self.action_dim = action_dim
        self.gamma = 0.99
        self.lr = 1e-3
        self.batch_size = 64
        self.buffer_size = 20000
        self.target_update = 500      # optimisation steps between target syncs
        self.learn_steps = 0          # optimisation steps performed so far

        self.epsilon = 1.0
        self.epsilon_min = 0.05
        self.epsilon_decay = 0.995

        self.replay = deque(maxlen=self.buffer_size)

        self.policy_net = DQNNetwork(state_dim, action_dim).to(device)
        self.target_net = DQNNetwork(state_dim, action_dim).to(device)
        self.target_net.load_state_dict(self.policy_net.state_dict())

        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=self.lr)

    def select_action(self, state):
        """Return an epsilon-greedy action index for ``state``."""
        if random.random() < self.epsilon:
            # Explore over the whole action space, not a hard-coded {0, 1}.
            return random.randrange(self.action_dim)
        state_t = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
        with torch.no_grad():
            q_values = self.policy_net(state_t)
        return int(torch.argmax(q_values).item())

    def push(self, s, a, r, s2):
        """Store one transition (state, action, reward, next state) in the buffer."""
        self.replay.append((s, a, r, s2))

    def train_step(self):
        """Run one optimisation step on a random minibatch.

        Returns:
            The scalar loss, or ``None`` while the buffer holds fewer than
            ``batch_size`` transitions (no update is performed then).
        """
        if len(self.replay) < self.batch_size:
            return

        batch = random.sample(self.replay, self.batch_size)
        states, actions, rewards, next_states = zip(*batch)

        # Stack the per-transition arrays into one ndarray first: building a
        # tensor directly from a tuple of ndarrays is very slow and makes
        # PyTorch emit a warning.
        states = torch.tensor(np.array(states, dtype=np.float32), dtype=torch.float32, device=device)
        actions = torch.tensor(actions, dtype=torch.long, device=device).unsqueeze(1)
        rewards = torch.tensor(rewards, dtype=torch.float32, device=device).unsqueeze(1)
        next_states = torch.tensor(np.array(next_states, dtype=np.float32), dtype=torch.float32, device=device)

        # Q(s, a) for the actions that were actually taken
        q_values = self.policy_net(states).gather(1, actions)

        # Bootstrapped target from the (frozen) target network
        with torch.no_grad():
            next_q = self.target_net(next_states).max(1)[0].unsqueeze(1)
            expected = rewards + self.gamma * next_q

        loss = nn.MSELoss()(q_values, expected)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Honour the target_update hyperparameter: copy the policy weights
        # into the target network every `target_update` optimisation steps.
        self.learn_steps += 1
        if self.learn_steps % self.target_update == 0:
            self.target_net.load_state_dict(self.policy_net.state_dict())

        # Epsilon decays once per optimisation step, floored at epsilon_min
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        return loss.item()


# ============================================================
# 4) TRAINING LOOP
# ============================================================

def train_model(episodes=100, steps=60, save_path="traffic_model.pth"):
    """Train a DQN agent on ``TrafficEnv`` and save its policy-network weights.

    Args:
        episodes: Number of training episodes.
        steps: Environment steps per episode.
        save_path: File the policy network's state dict is written to.

    Returns:
        The trained ``DQNAgent``.
    """
    env = TrafficEnv()
    # Size the network input from the environment's actual state vector
    # instead of hard-coding it.
    state_dim = len(env.reset())
    action_dim = 2
    agent = DQNAgent(state_dim, action_dim)

    print("\n=========== TRAINING STARTED ===========\n")

    for ep in range(1, episodes + 1):
        state = env.reset()
        total_reward = 0
        losses = []

        for _ in range(steps):
            action = agent.select_action(state)
            next_state, reward, done, info = env.step(action)
            agent.push(state, action, reward, next_state)
            loss = agent.train_step()
            # train_step() returns None when no update happened; a genuine
            # loss of exactly 0.0 must still be counted, so compare to None
            # rather than relying on truthiness.
            if loss is not None:
                losses.append(loss)
            state = next_state
            total_reward += reward

        avg_loss = np.mean(losses) if losses else 0.0

        print(f"Episode {ep}/{episodes} | Reward: {total_reward:.2f} "
              f"| Avg Loss: {avg_loss:.4f} | Epsilon: {agent.epsilon:.3f}")

        # Hard-sync the target network with the policy network every 50 episodes
        if ep % 50 == 0:
            agent.target_net.load_state_dict(agent.policy_net.state_dict())

    torch.save(agent.policy_net.state_dict(), save_path)

    print("\n=========== TRAINING COMPLETED ===========")
    print(f"Model saved as {save_path}")

    return agent


# ============================================================
# 5) EVALUATION
# ============================================================

def evaluate_model(agent, episodes=20, steps=60):
    """Run the agent greedily (no exploration) and report average queue lengths.

    Args:
        agent: Object exposing a ``policy_net`` that maps a state batch to Q-values.
        episodes: Number of evaluation episodes.
        steps: Environment steps per episode.

    Returns:
        The mean, over all episodes, of the per-episode average total queue
        length (the same number that is printed at the end).
    """
    env = TrafficEnv()
    total_queues = []

    print("\n=========== EVALUATION STARTED ===========\n")

    for ep in range(episodes):
        state = env.reset()
        avg_queue = 0

        for _ in range(steps):
            state_tensor = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
            # Pure inference: skip autograd bookkeeping entirely.
            with torch.no_grad():
                q = agent.policy_net(state_tensor).cpu().numpy()[0]
            action = int(np.argmax(q))

            next_state, reward, done, info = env.step(action)
            state = next_state
            avg_queue += sum(info["queues"])

        avg_queue /= steps
        total_queues.append(avg_queue)

        print(f"Evaluation Episode {ep+1}/{episodes} | Avg Queue: {avg_queue:.2f}")

    final_avg = float(np.mean(total_queues))

    print("\n=========== EVALUATION COMPLETED ===========")
    print(f"Final Average Queue Across Episodes: {final_avg:.2f}")

    # Returned so callers can compare runs programmatically, not just read logs.
    return final_avg


# ============================================================
# 6) MAIN EXECUTION
# ============================================================

# Script entry point: train an agent, then evaluate the trained policy.
if __name__ == "__main__":
    # `agent` is bound at module level so it remains available after the run.
    agent = train_model(episodes=100, steps=60)
    evaluate_model(agent)




Episode 1/100 | Reward: -189.00 | Avg Loss: 0.0000 | Epsilon: 1.000
Episode 2/100 | Reward: -230.00 | Avg Loss: 6.4716 | Epsilon: 0.751
Episode 3/100 | Reward: -299.00 | Avg Loss: 0.9808 | Epsilon: 0.556
Episode 4/100 | Reward: -427.00 | Avg Loss: 0.8612 | Epsilon: 0.412
Episode 5/100 | Reward: -379.00 | Avg Loss: 0.8552 | Epsilon: 0.305
Episode 6/100 | Reward: -306.00 | Avg Loss: 0.8658 | Epsilon: 0.226
Episode 7/100 | Reward: -439.00 | Avg Loss: 0.8181 | Epsilon: 0.167
Episode 8/100 | Reward: -431.00 | Avg Loss: 0.8487 | Epsilon: 0.124
Episode 9/100 | Reward: -541.00 | Avg Loss: 0.8832 | Epsilon: 0.092
Episode 10/100 | Reward: -540.00 | Avg Loss: 0.8668 | Epsilon: 0.068
Episode 11/100 | Reward: -324.00 | Avg Loss: 0.8325 | Epsilon: 0.050
Episode 12/100 | Reward: -585.00 | Avg Loss: 0.8272 | Epsilon: 0.050
Episode 13/100 | Reward: -445.00 | Avg Loss: 0.8677 | Epsilon: 0.050
Episode 14/100 | Reward: -565.00 | Avg Loss: 0.9078 | Epsilon: 0.050
Episode 15/100 | Reward: -376.00 | Avg Lo