# Reinforcement Learning-Based Traffic Light Optimization

### Group 5
| #     | Name |
|-------|---------------|
|8965985|Pradeepti Kasam|
|9027375|Khushbu Lad|
|8944328|Akshata Madhav|
|8914803|Rohit Totlani| 
|8964515|Neha Yadav|

## Introduction

This project aims to develop a reinforcement learning (RL) agent capable of optimizing traffic flow by managing a single traffic light at an intersection. The agent will learn to adjust the signal timing dynamically to reduce congestion and improve vehicle movement efficiency.

#### States
The state represents the ***current condition*** of the ***traffic intersection***:

***NS***: Total number of cars in the North-South direction.<br/>
***EW***: Total number of cars in the East-West direction.<br/>
***NS_array***: Array holding the number of cars (0 or 1) at each position in the North-South direction.<br/>
***EW_array***: Array holding the number of cars (0 or 1) at each position in the East-West direction.<br/>


#### Actions
Actions represent the choices available to the agent that controls the traffic lights:

***Action 0*** – Allow traffic to flow in the North-South (NS) direction (green light for NS, red light for EW). <br/>
***Action 1*** – Allow traffic to flow in the East-West (EW) direction (green light for EW, red light for NS).

#### Discrete State Representation

Given that the number of cars is kept between 0 and 5 for both directions (NS and EW), the state space is discrete with values ranging from 0 to 5 for both NS and EW.

Discrete state can be a tuple (NS, EW) where both NS and EW are between 0 and 5:
<br/>
<br/>
```state = (min(state["NS"], 5), min(state["EW"], 5))```

#### State space
For each intersection, there are 6 possible values (0 through 5) for both NS and EW, so the total number of possible discrete states is:

State space: 6 (NS values) × 6 (EW values) = ***36 possible states***.
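
To make the count concrete, the sketch below enumerates every discrete state and applies the same clamping rule as above. The names `MAX_CARS` and `discretize` are illustrative only and do not appear in the project code.

```python
from itertools import product

MAX_CARS = 5  # cap used by the discretization rule (illustrative constant name)

def discretize(ns_count, ew_count, cap=MAX_CARS):
    """Clamp raw car counts to the (NS, EW) tuple used as a Q-table key."""
    return (min(ns_count, cap), min(ew_count, cap))

# Every combination of NS in 0..5 and EW in 0..5.
all_states = list(product(range(MAX_CARS + 1), repeat=2))

print(len(all_states))    # 36
print(discretize(7, 2))   # (5, 2)
```

With two actions per state, a Q-table over this space holds 36 × 2 = 72 values.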

#### State Action Diagram

<img src="./StateActionDiagram.png" alt="State-action diagram for the traffic light agent" width="500" height="500">

#### Reward

- The reward function encourages clearing vehicles from the intersection, i.e., minimizing traffic.
<br/>
<br/>
```reward = cleared```
<br/>
<br/>
- Each time the agent takes an action, vehicles in the selected direction are cleared, and ***the number of vehicles cleared from the intersection becomes the reward***.
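
As a minimal sketch of how this reward arises, assume each lane is a fixed-length array of 0/1 cells whose last cell is the position at the stop line (this mirrors the environment class later in this notebook). The helper name `clear_front` is hypothetical.

```python
import numpy as np

def clear_front(lane, new_car):
    """Advance a lane by one cell on green; return (new_lane, reward)."""
    reward = int(lane[-1])      # the car in the last cell leaves the intersection
    new_lane = np.empty_like(lane)
    new_lane[1:] = lane[:-1]    # every remaining car moves one cell forward
    new_lane[0] = new_car       # a new car may enter at the back of the lane
    return new_lane, reward

lane = np.array([0, 1, 0, 1, 1])
lane, reward = clear_front(lane, new_car=1)
print(lane.tolist(), reward)    # [1, 0, 1, 0, 1] 1
```

Because each cell holds at most one car, the reward under this scheme is either 0 or 1 per step.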

#### State / Value Functions

```state = (min(NS_count, 5), min(EW_count, 5))```

##### Value Function

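The value function estimates the expected future reward from a state-action pair. It is stored as a table with one entry per state, holding one Q-value per action:
<br/>
<br/>
```q_table = {(ns, ew): [0, 0]}```

The table is updated using the Q-learning update rule: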
<br/>
<br/>
```Q(s, a) ← Q(s, a) + α * (r + γ * max(Q(s', a')) - Q(s, a))```
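
A single update can be worked through by hand. The sketch below assumes α = 0.1 and γ = 0.9 (the defaults used by the training function later in this notebook); the reward and Q-values are made-up numbers chosen for illustration.

```python
def q_update(q_sa, reward, max_next_q, alpha=0.1, gamma=0.9):
    """Apply one Q-learning update to a single Q(s, a) value."""
    target = reward + gamma * max_next_q      # r + γ * max Q(s', a')
    return q_sa + alpha * (target - q_sa)

# Q(s, a) = 0, reward = 1, best next-state value = 2:
# target = 1 + 0.9 * 2 = 2.8, so the new value is 0 + 0.1 * 2.8 = 0.28.
new_q = q_update(q_sa=0.0, reward=1, max_next_q=2.0)
print(round(new_q, 2))   # 0.28
```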

#### Policy

The agent follows an ε-greedy policy:

```select_action(q_table, state, epsilon)```

- With probability ε, choose a random action (exploration).
- With probability 1−ε, choose the best-known action (exploitation), i.e., the action with the highest Q-value for the current state.
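
The two cases above can be sketched as follows. This is an illustrative variant that takes an explicit random generator for reproducibility; the function name `epsilon_greedy` and the example Q-values are assumptions, not the project's own code.

```python
import numpy as np

def epsilon_greedy(q_values, epsilon, rng):
    """Pick a random action with probability epsilon, else the greedy one."""
    if rng.random() < epsilon:
        return int(rng.integers(len(q_values)))   # explore
    return int(np.argmax(q_values))               # exploit

rng = np.random.default_rng(0)
q_values = [0.2, 0.7]          # action 1 is currently the best-known action
picks = [epsilon_greedy(q_values, epsilon=0.1, rng=rng) for _ in range(10_000)]
greedy_share = picks.count(1) / len(picks)
print(f"share of greedy picks: {greedy_share:.3f}")
```

With two actions, a random pick also lands on the greedy action half of the time, so the expected share of greedy picks is 1 − ε/2, or about 0.95 for ε = 0.1.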


#### Markov Decision Process (MDP)

- The simulation is modeled as a classic finite MDP.
- The next state depends only on the current state and action, not on earlier steps.
- Vehicle inflow is random (modeled by ```np.random.choice```), so state transitions are stochastic.
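
The random inflow can be illustrated in isolation. The sketch below draws arrivals the same way the environment does, but through a seeded `numpy` generator so the run is repeatable; the sample size is arbitrary.

```python
import numpy as np

rng = np.random.default_rng(0)
inflow_prob = 0.5  # same default as the environment in this notebook

# Each draw is 1 (a car arrives) with probability inflow_prob, else 0.
arrivals = rng.choice([0, 1], size=10_000, p=[1 - inflow_prob, inflow_prob])
empirical_rate = arrivals.mean()
print(f"empirical inflow rate: {empirical_rate:.3f}")
```

Each draw is independent of earlier steps, which is what keeps the per-position lane contents Markov. Note that the agent only observes the (NS, EW) totals, so the 36-state table is an aggregated view of that underlying process.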

#### Gradients

- The Q-learning algorithm used here is table-based, so it does not compute gradients explicitly.
- Policy gradients or value gradients are used when the policy or value function is a neural network.<br/>
<br/>
```Q(s, a) ← Q(s, a) + α * (target - Q(s, a))```
<br/><br/>
- This temporal difference (TD) update can still be read as a form of gradient descent over Q-values, using:
    - α: learning rate (step size)
    - target: reward + discounted future value
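
The link to gradient descent can be made explicit. Assuming the target is held constant during the step (often called a semi-gradient update), define a squared-error loss for a single table entry:

$$
L = \tfrac{1}{2}\,\bigl(\text{target} - Q(s, a)\bigr)^2,
\qquad
\frac{\partial L}{\partial Q(s, a)} = -\bigl(\text{target} - Q(s, a)\bigr)
$$

A gradient-descent step with step size α then gives

$$
Q(s, a) \leftarrow Q(s, a) - \alpha\,\frac{\partial L}{\partial Q(s, a)}
= Q(s, a) + \alpha\,\bigl(\text{target} - Q(s, a)\bigr)
$$

which has the same form as the TD update, with target = r + γ · max Q(s', a').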

## Code Setup

### Install libraries

In [1]:
import sys
if sys.version_info < (3, 12, 6):
    sys.exit("This project requires Python 3.12.6 or later.")
else:
    print("Python version is compatible")

Python version is compatible


In [None]:
## Install necessary libraries
# The commands below create a virtual environment and install the required libraries.
# Note: each `!` command runs in its own subshell, so activating the environment here
# does not change the interpreter used by this kernel; running them in a terminal is more reliable.
!python -m pip install --upgrade pip
!py -3.12 -m venv venvRLTLOP
print("Virtual environment created successfully")
!.\venvRLTLOP\Scripts\Activate.ps1
print("Virtual environment activated successfully")
!pip install -r requirements.txt
print("All libraries installed successfully")




### Import Libraries

In [3]:
import numpy as np
import matplotlib.pyplot as plt
import time
import pickle
import os
import streamlit as st

## Traffic Light Optimization using Q-learning

#### Class : TrafficIntersection

```cars_ns``` : Number of cars (0 or 1) at each of the 5 positions in the north-south (NS) direction <br/>
```cars_ew``` : Number of cars (0 or 1) at each of the 5 positions in the east-west (EW) direction <br/>
```inflow_prob``` : Probability that a new car enters, at each step, the direction that currently has the green light <br/>
```total_cleared``` : Total number of cars cleared from the intersection.

In [4]:
# ---------------- Traffic Environment ---------------- #
class TrafficIntersection:

    # Initializes the intersection with default values for the number of cars and inflow probability.
    def __init__(self, inflow_prob=0.5):
        self.cars_ns = np.zeros(5, dtype=int) 
        self.cars_ew = np.zeros(5, dtype=int)
        self.inflow_prob = inflow_prob
        self.total_cleared = 0
        
    # Resets the environment (cars at the intersection) and returns the current state.
    def reset(self):
        self.cars_ns[:] = 0
        self.cars_ew[:] = 0
        self.total_cleared = 0
        return self.get_state()

    # Executes a step in the environment. 
    # It takes an action (0 for NS green light, 1 for EW green light), 
    # updates the state, car positions, and clears cars based on the action.
    def step(self, action):
        cleared = 0
        if action == 0:
            cleared = self.cars_ns[-1]
            self.cars_ns[1:] = self.cars_ns[:-1]
            self.cars_ns[0] = np.random.choice([0, 1], p=[1 - self.inflow_prob, self.inflow_prob])
            self.cars_ew += np.random.choice([0, 1], size=5, p=[0.7, 0.3])
        else:
            cleared = self.cars_ew[-1]
            self.cars_ew[1:] = self.cars_ew[:-1]
            self.cars_ew[0] = np.random.choice([0, 1], p=[1 - self.inflow_prob, self.inflow_prob])
            self.cars_ns += np.random.choice([0, 1], size=5, p=[0.7, 0.3])

        self.cars_ns = np.clip(self.cars_ns, 0, 1)
        self.cars_ew = np.clip(self.cars_ew, 0, 1)

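        self.total_cleared += cleared
        reward = cleared
        state = self.get_state()
        # The third value flags whether a car was cleared on this step (the demo uses it
        # to show "+1"); it is not an episode-termination signal.
        return state, reward, reward > 0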

    # Returns the current state of the intersection
    def get_state(self):
        return {
            "NS": self.cars_ns.sum(),
            "EW": self.cars_ew.sum(),
            "NS_array": self.cars_ns.copy(),
            "EW_array": self.cars_ew.copy()
        }




#### Q-Learning Functions

In [5]:
# Converts the state (number of cars in NS and EW directions) into a discrete representation suitable for Q-learning.
def get_discrete_state(state):
    return (min(state["NS"], 5), min(state["EW"], 5))

# Chooses an action (0 or 1) based on the epsilon-greedy strategy. 
# It either selects a random action or the best action based on the Q-table.
def select_action(q_table, state, epsilon):
    if np.random.random() < epsilon:
        return np.random.choice([0, 1])
    return np.argmax(q_table[state])

# Updates the Q-table using the Q-learning update rule, based on the current state, action taken, reward, and the next state.
def update_q_table(q_table, state, action, reward, next_state, alpha, gamma):
    old_value = q_table[state][action]
    future_max = np.max(q_table[next_state])
    new_value = old_value + alpha * (reward + gamma * future_max - old_value)
    q_table[state][action] = new_value
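
One detail of the greedy branch is worth noting: `np.argmax` returns the first maximal index, so while a state's Q-values are still tied (for example the initial `[0, 0]`), the greedy choice is always action 0. The sketch below shows random tie-breaking as one possible alternative; `argmax_random_tie` is a hypothetical helper and is not part of the project code.

```python
import numpy as np

def argmax_random_tie(q_values, rng):
    """Return the index of a maximal Q-value, choosing uniformly among ties."""
    q_values = np.asarray(q_values, dtype=float)
    best = np.flatnonzero(q_values == q_values.max())
    return int(rng.choice(best))

rng = np.random.default_rng(0)
print(int(np.argmax([0, 0])))    # 0: np.argmax always picks the first maximum

picks = [argmax_random_tie([0, 0], rng) for _ in range(1_000)]
print(sorted(set(picks)))        # [0, 1]
```

Whether this matters in practice depends on how quickly exploration breaks the ties; it is a design choice rather than a required fix.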

#### Agent Training

- Trains one agent per intersection using ***Q-learning***, for ```n_intersections``` intersections and ```episodes``` episodes each (50 steps per episode).
- Each intersection gets its own Q-table, initialized to zero, and its own inflow probability, spaced evenly between 0.3 and 0.7.
- At every step the traffic light action is chosen by the ε-greedy policy, the environment is advanced, and the Q-table is updated based on the reward.

In [6]:
def train_agents(n_intersections, episodes=500, alpha=0.1, gamma=0.9, epsilon=0.1):
    q_tables = []
    all_rewards = []
    inflow_probs = np.linspace(0.3, 0.7, n_intersections)

    for i in range(n_intersections):
        q_table = {(ns, ew): [0, 0] for ns in range(6) for ew in range(6)}
        rewards_per_episode = []
        env = TrafficIntersection(inflow_prob=inflow_probs[i])

        for ep in range(episodes):
            state = get_discrete_state(env.reset())
            total_reward = 0
            for step in range(50):
                action = select_action(q_table, state, epsilon)
                next_state, reward, _ = env.step(action)
                next_state = get_discrete_state(next_state)
                update_q_table(q_table, state, action, reward, next_state, alpha, gamma)
                state = next_state
                total_reward += reward
            rewards_per_episode.append(total_reward)

        q_tables.append(q_table)
        all_rewards.append(rewards_per_episode)

    return q_tables, all_rewards
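
Per-episode rewards from Q-learning tend to be noisy, so a smoothed curve is often easier to read. The sketch below is one simple way to smooth such a list; `moving_average` is a hypothetical helper, and the reward values are placeholder numbers, not results from this project.

```python
import numpy as np

def moving_average(values, window=20):
    """Smooth a 1-D sequence with a simple sliding-window mean."""
    values = np.asarray(values, dtype=float)
    if window < 1 or window > len(values):
        raise ValueError("window must be between 1 and len(values)")
    kernel = np.ones(window) / window
    # mode="valid" keeps only positions where the window fits entirely.
    return np.convolve(values, kernel, mode="valid")

rewards = [10, 12, 11, 15, 14, 16]   # placeholder numbers for illustration
smoothed = moving_average(rewards, window=3)
print(smoothed)
```

The result has `len(values) - window + 1` points, so with the default 500 episodes and a window of 20 the smoothed curve would contain 481 points.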

#### Demo Agent logic

In [7]:
# creates a visualization of the traffic intersection using matplotlib
def draw_intersection(state, action, step, idx, countdown, highlight_clear):
    fig, ax = plt.subplots(figsize=(4, 4))
    ax.set_xlim(0, 10)
    ax.set_ylim(0, 10)
    ax.axis('off')
    ax.set_facecolor("#2e2e2e")
    ax.add_patch(plt.Rectangle((0, 4.5), 10, 1, color="#444"))
    ax.add_patch(plt.Rectangle((4.5, 0), 1, 10, color="#444"))
    for i in range(0, 10, 1):
        ax.plot([i, i + 0.5], [5.0, 5.0], color="white", linewidth=1, linestyle="--")
        ax.plot([5.0, 5.0], [i, i + 0.5], color="white", linewidth=1, linestyle="--")
    light_ns_color = "green" if action == 0 else "red"
    light_ew_color = "green" if action == 1 else "red"
    ax.add_patch(plt.Circle((5, 9), 0.4, color=light_ns_color))
    ax.text(5, 8.3, f"{countdown}s", ha='center', va='center', fontsize=10, color='black', bbox=dict(facecolor='white', boxstyle='round,pad=0.2'))
    ax.add_patch(plt.Circle((9, 5), 0.4, color=light_ew_color))
    ax.text(8.2, 5, f"{countdown}s", ha='center', va='center', fontsize=10, color='black', bbox=dict(facecolor='white', boxstyle='round,pad=0.2'))

    for i in range(5):
        if state["NS_array"][i]:
            ax.add_patch(plt.Rectangle((4.6, 9 - i), 0.8, 0.5, color="red", alpha=0.9))
        if state["EW_array"][i]:
            ax.add_patch(plt.Rectangle((i, 4.6), 0.5, 0.8, color="blue", alpha=0.9))

    if highlight_clear:
        ax.text(5, 5.2, "+1", ha='center', va='center', fontsize=14, color='lime', fontweight='bold')
    ax.text(5, 1, "S ↓", ha='center', va='center', fontsize=9, color='white')
    ax.text(5, 9.3, "N ↑", ha='center', va='center', fontsize=9, color='white')
    ax.text(1.2, 5, "W ←", ha='center', va='center', fontsize=9, color='white')
    ax.text(8.8, 5, "E →", ha='center', va='center', fontsize=9, color='white')
    fig.text(0.5, 1.0, f"Intersection {idx+1}", ha='center', fontsize=12, fontweight='bold', color='black')
    fig.text(0.5, -0.05, "🟥 Red = Stop     🟩 Green = Go     ⏱ = Countdown     +1 = Vehicle Passed", ha='center', fontsize=9, color='black', bbox=dict(facecolor='white', edgecolor='gray', boxstyle='round,pad=0.4'))
    return fig

# Simulates traffic flow over a series of steps for all intersections and redraws
# each intersection in real time. Note: as written, the light alternates on a fixed
# 3-step timer; q_table and states are tracked but not used to choose the action.
def demo_agents(q_tables, steps=30, speed=0.4):
    inflow_probs = np.linspace(0.3, 0.7, len(q_tables))
    envs = [TrafficIntersection(inflow_prob=prob) for prob in inflow_probs]
    states = [get_discrete_state(env.reset()) for env in envs]
    placeholders = [st.empty() for _ in q_tables]
    intersection_states = [{"action": 0, "timer": 3} for _ in q_tables]

    for t in range(steps):
        for i, (env, q_table) in enumerate(zip(envs, q_tables)):
            if intersection_states[i]["timer"] == 0:
                intersection_states[i]["action"] = 1 - intersection_states[i]["action"]
                intersection_states[i]["timer"] = 3
            action = intersection_states[i]["action"]
            intersection_states[i]["timer"] -= 1
            full_state, _, cleared = env.step(action)
            fig = draw_intersection(full_state, action, t, i, intersection_states[i]["timer"] + 1, highlight_clear=cleared)
            placeholders[i].pyplot(fig)
            plt.close(fig)
            states[i] = get_discrete_state(full_state)
        time.sleep(speed)

    st.subheader("🚗 Total Vehicles Cleared per Intersection")
    for i, env in enumerate(envs):
        st.markdown(f"**Intersection {i+1}:** {env.total_cleared} vehicles")
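
In the demo loop above, the light alternates on a fixed three-step timer and the Q-table is never consulted. One possible way to let the learned values drive the demo while keeping the countdown is sketched below. The function `next_phase`, its `min_green` parameter, and the toy Q-table are all assumptions made for illustration.

```python
import numpy as np

def next_phase(q_table, state, current_action, timer, min_green=3):
    """Hold the current phase while the timer runs, then pick the greedy action.

    Returns (action, remaining_timer).
    """
    if timer > 0:
        return current_action, timer - 1
    action = int(np.argmax(q_table[state]))
    return action, min_green - 1

# Toy Q-table with a single state in which EW (action 1) looks better.
toy_q_table = {(1, 4): [0.2, 0.9]}
action, timer = 0, 1
history = []
for _ in range(4):
    action, timer = next_phase(toy_q_table, (1, 4), action, timer)
    history.append((action, timer))
print(history)   # [(0, 0), (1, 2), (1, 1), (1, 0)]
```

Holding each phase for a minimum number of steps keeps the countdown display meaningful, at the cost of reacting more slowly than a policy that re-decides at every step.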


#### Streamlit UI

In [8]:
# ---------------- Streamlit UI ---------------- #
st.title("Multi-Intersection Traffic Light Optimization (Q-learning)")

mode = st.selectbox("Choose Mode", ["Train Agent", "Demo Agent"])
n_intersections = st.slider("Number of Intersections", 1, 4, 2)
speed = st.slider("Demo Speed (sec/frame)", 0.1, 1.0, 0.4)

q_file = f"q_tables_{n_intersections}.pkl"

if mode == "Train Agent":
    st.info("Training Q-learning agents...")
    q_tables, rewards_list = train_agents(n_intersections)
    st.success("Training complete!")
    with open(q_file, "wb") as f:
        pickle.dump(q_tables, f)
    for i, rewards in enumerate(rewards_list):
        st.subheader(f"Intersection {i+1}")
        st.line_chart(rewards)

elif mode == "Demo Agent":
    if os.path.exists(q_file):
        with open(q_file, "rb") as f:
            q_tables = pickle.load(f)
        st.success("Loaded trained Q-tables.")
        demo_agents(q_tables, steps=30, speed=speed)
    else:
        st.error("No Q-tables found. Please train the agent(s) first.")
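
The save-and-load step can be checked on its own, outside Streamlit. The sketch below round-trips a freshly initialized Q-table list through `pickle` in a temporary directory; the file name follows the `q_tables_{n}.pkl` pattern used above, and the directory is a throwaway created only for this example.

```python
import os
import pickle
import tempfile

# One intersection, all 36 states initialized to zero for both actions.
q_tables = [{(ns, ew): [0.0, 0.0] for ns in range(6) for ew in range(6)}]

with tempfile.TemporaryDirectory() as tmp_dir:
    q_file = os.path.join(tmp_dir, "q_tables_1.pkl")
    with open(q_file, "wb") as f:
        pickle.dump(q_tables, f)
    with open(q_file, "rb") as f:
        restored = pickle.load(f)

print(restored == q_tables)   # True
print(len(restored[0]))       # 36
```

Since `pickle.load` can execute arbitrary code, it is safest to load only Q-table files produced by your own training runs.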




#### Simulation : Run command in terminal

```streamlit run "multi_intersection_sim.py"```

In [9]:
!streamlit run multi_intersection_sim.py

^C


In [10]:
!python --version

Python 3.12.6
