***DDPG STRUCTURE***

Train for a large number of epochs; even so, this model still needs to be fine-tuned and is less accurate than the NN model in this same repo.

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

class Actor(nn.Module):
    """Policy network: maps a state vector to an action vector.

    Two hidden ReLU layers of width ``hidden_dim`` are followed by a linear
    output layer and a ``Tanh``, so every action component lies in [-1, 1].
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        # Layers are created in input-to-output order and stored under the
        # attribute name ``network`` so state-dict keys stay ``network.<i>.*``.
        layers = [
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
            nn.Tanh(),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, state):
        """Return the tanh-bounded action for ``state``."""
        bounded_action = self.network(state)
        return bounded_action

class Critic(nn.Module):
    """Q-network: scores a (state, action) pair with a single value.

    The state and action are concatenated along dimension 1, so both inputs
    must be 2-D with matching first dimension.
    """

    def __init__(self, state_dim, action_dim, hidden_dim=256):
        super().__init__()
        input_dim = state_dim + action_dim
        layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, state, action):
        """Return the estimated Q-value, one column per input row."""
        state_action = torch.cat((state, action), dim=1)
        return self.network(state_action)



class ReplayBuffer:
    """Fixed-capacity store of ``(state, action, reward, next_state, done)`` tuples.

    Backed by a ``deque`` with ``maxlen=capacity``: once full, pushing a new
    transition discards the oldest one.
    """

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Append one transition, evicting the oldest if the buffer is full."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            A 5-tuple of float32 tensors ``(state, action, reward, next_state,
            done)``; each tensor stacks the corresponding field of the sampled
            transitions along a new leading dimension.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        transitions = random.sample(self.buffer, batch_size)
        # Stack each field with NumPy first: building a tensor directly from a
        # tuple of ndarrays converts element by element and is very slow.
        return tuple(
            torch.as_tensor(np.asarray(column, dtype=np.float32))
            for column in zip(*transitions)
        )

    def __len__(self):
        return len(self.buffer)


class DDPG:
    """Deep Deterministic Policy Gradient agent with target networks.

    Action conventions used throughout the class:

    * The actor emits values in [-1, 1] (its last layer is ``Tanh``).
    * ``select_action`` / ``select_action1`` rescale that output to integers
      in ``[MIN_ACTION, MAX_ACTION]`` for the environment.
    * ``train`` assumes the replay buffer holds actions in that environment
      scale and maps them back to [-1, 1] before they reach the critic, so the
      critic always sees actions on the same scale as the actor's raw output.

    NOTE(review): the defaults ``hidden_dim=28``, ``tau=0.5`` and
    ``noise_std=50`` are unusual for DDPG and were kept only for backward
    compatibility — confirm they are intentional.
    """

    # Environment action range (inclusive) that actor outputs are mapped onto.
    MIN_ACTION = 0
    MAX_ACTION = 30

    def __init__(self, state_dim, action_dim, hidden_dim=28, buffer_size=1000000,
                 batch_size=64, gamma=0.99, tau=0.5, actor_lr=1e-4, critic_lr=1e-3):
        """Build actor/critic networks, their target copies, optimizers and buffer.

        Parameters:
            state_dim: size of the state vector.
            action_dim: size of the action vector.
            hidden_dim: width of the hidden layers of both networks.
            buffer_size: maximum number of transitions kept for replay.
            batch_size: number of transitions sampled per training step.
            gamma: discount factor for the bootstrapped target.
            tau: interpolation factor for the soft target update.
            actor_lr: Adam learning rate of the actor.
            critic_lr: Adam learning rate of the critic.
        """
        self.actor = Actor(state_dim, action_dim, hidden_dim)
        self.actor_target = Actor(state_dim, action_dim, hidden_dim)
        self.actor_target.load_state_dict(self.actor.state_dict())

        self.critic = Critic(state_dim, action_dim, hidden_dim)
        self.critic_target = Critic(state_dim, action_dim, hidden_dim)
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)

        self.replay_buffer = ReplayBuffer(buffer_size)

        self.batch_size = batch_size
        self.gamma = gamma
        self.tau = tau

    def _scaled_policy_output(self, state):
        """Return the actor's output for one state as floats in the env range."""
        with torch.no_grad():
            state = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)
            action = self.actor(state).squeeze(0).numpy()  # in [-1, 1]
        span = self.MAX_ACTION - self.MIN_ACTION
        return (action + 1) / 2 * span + self.MIN_ACTION

    def _to_env_action(self, action):
        """Clip to the env range and round to the nearest integer."""
        clipped = np.clip(action, self.MIN_ACTION, self.MAX_ACTION)
        return np.round(clipped).astype(int)

    def _normalize_action(self, action):
        """Map env-scale actions back to the actor's [-1, 1] output scale."""
        span = self.MAX_ACTION - self.MIN_ACTION
        return 2.0 * (action - self.MIN_ACTION) / span - 1.0

    def select_action(self, state, noise_std=50):
        """Pick an exploratory action for a single state.

        Gaussian noise with standard deviation ``noise_std`` (in env action
        units) is added to the rescaled policy output before clipping and
        rounding.

        Returns:
            Integer NumPy array of shape ``(action_dim,)`` with values in
            ``[MIN_ACTION, MAX_ACTION]``.
        """
        action = self._scaled_policy_output(state)
        action = action + np.random.normal(0, noise_std, size=action.shape)
        return self._to_env_action(action)

    def select_action1(self, state, noise_std=0.1):
        """Pick the deterministic (noise-free) action for a single state.

        ``noise_std`` is accepted only for signature compatibility and is
        ignored.

        Returns:
            Integer NumPy array of shape ``(action_dim,)`` with values in
            ``[MIN_ACTION, MAX_ACTION]``.
        """
        return self._to_env_action(self._scaled_policy_output(state))

    def train(self):
        """Run one critic update, one actor update and a soft target update.

        Does nothing until the replay buffer holds at least ``batch_size``
        transitions.
        """
        if len(self.replay_buffer) < self.batch_size:
            return

        state, action, reward, next_state, done = self.replay_buffer.sample(self.batch_size)

        # Buffered actions are the env-scale integers returned by
        # select_action; the critic is otherwise only ever fed raw actor
        # outputs in [-1, 1], so bring both onto the same scale.
        action = self._normalize_action(action)
        reward = reward.unsqueeze(1)
        done = done.unsqueeze(1)

        with torch.no_grad():
            next_action = self.actor_target(next_state)
            next_q = self.critic_target(next_state, next_action)
            # No bootstrapping past terminal transitions.
            target_q = reward + (1 - done) * self.gamma * next_q

        current_q = self.critic(state, action)
        critic_loss = nn.MSELoss()(current_q, target_q)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor ascends the critic's estimate of its own actions. Gradients
        # this leaves on the critic are cleared by the next zero_grad().
        actor_loss = -self.critic(state, self.actor(state)).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        self._soft_update(self.actor_target, self.actor)
        self._soft_update(self.critic_target, self.critic)

    def _soft_update(self, target, source):
        """Blend ``source`` into ``target``: ``target = tau*source + (1-tau)*target``."""
        with torch.no_grad():
            for target_param, param in zip(target.parameters(), source.parameters()):
                target_param.copy_(self.tau * param + (1.0 - self.tau) * target_param)

    def save_model(self, filename="ddpg_agent.pth"):
        """Write actor/critic weights and both optimizer states to ``filename``.

        The target networks are not included in the checkpoint.
        """
        torch.save({
            'actor_state_dict': self.actor.state_dict(),
            'critic_state_dict': self.critic.state_dict(),
            'actor_optimizer': self.actor_optimizer.state_dict(),
            'critic_optimizer': self.critic_optimizer.state_dict(),
        }, filename)
        print(f"Model saved to {filename}")


**Test that the model runs:**

In [None]:
def test_ddpg():
    """Smoke-test the agent on random data.

    Checks the shape of a selected action, then feeds 100 random transitions
    through the replay buffer, calling ``train`` after each push.
    """
    state_dim, action_dim = 10, 1
    agent = DDPG(state_dim, action_dim)

    # One action per action dimension, returned as a 1-D array (not a scalar).
    action = agent.select_action(np.random.randn(state_dim))
    assert action.shape == (action_dim,), f"Expected shape ({action_dim},) but got {action.shape}"

    steps_remaining = 100
    while steps_remaining > 0:
        steps_remaining -= 1

        state = np.random.randn(state_dim)
        # atleast_1d guards against a scalar slipping into the buffer.
        action = np.atleast_1d(agent.select_action(state))
        reward = np.random.randn()
        next_state = np.random.randn(state_dim)

        agent.replay_buffer.push(state, action, reward, next_state, False)
        agent.train()

    print("All tests passed!")


# Run the smoke test when this code is executed directly (not imported).
if __name__ == "__main__":
    test_ddpg()


**GENERATE RANDOM INPUT AND OTHER FUNCTIONS FOR ENV**

In [None]:
import random

def generate_input():
    """Build a random 8-element intersection state.

    Returns:
        A list ``[cars1, cars2, cars3, cars4, wt1, wt2, wt3, wt4]`` where each
        car count is a random integer in 0..30 and each wait time a random
        integer in 1..60, except the last wait time, which is always 0.
    """
    lane_count = 4

    cars_in_lane = []
    for _ in range(lane_count):
        cars_in_lane.append(random.randint(0, 30))

    # All four wait times are drawn (keeping the RNG sequence fixed) before
    # the last one is overwritten.
    wait_times = []
    for _ in range(lane_count):
        wait_times.append(random.randint(1, 60))
    wait_times[-1] = 0

    return [*cars_in_lane, *wait_times]

# Example usage: draw one random state and show it.
random_input = generate_input()
print(random_input)  # Output: [cars1, cars2, cars3, cars4, wt1, wt2, wt3, wt4] (wt4 is always 0)


def rotate_list(lst, n):
    """Return a new sequence with the first ``n`` items of ``lst`` moved to the end.

    Implemented with plain slicing, so ``n`` follows slice semantics: a
    negative ``n`` rotates the other way, and ``n`` beyond the length leaves
    the order unchanged. ``lst`` itself is not modified.
    """
    leading = lst[:n]
    trailing = lst[n:]
    return trailing + leading



# Quick check: rotating by one moves the first element to the end -> [2, 3, 4, 1]
print(rotate_list([1,2,3,4],1))


import random

def rotate_update(wait_times, nn_output):
    """Simulate one green phase for the lane at index 0, then rotate the lanes.

    Parameters:
        wait_times: despite the name, the full 8-element state: four car
            counts followed by four per-lane waiting times. Not modified.
        nn_output: sequence whose first element is the green-light duration
            in seconds.

    Returns:
        A new 8-element list (car counts then waiting times), each half
        rotated left by one so that the lane just served ends up last and the
        next lane moves to index 0.
    """
    selected_lane = 0  # the lane at index 0 is always the one served
    green_time = nn_output[0]

    # list() copies the slices so array-like inputs (whose slices are views)
    # are never modified in place.
    carsinlane = list(wait_times[:4])
    carswaittime = list(wait_times[4:8])

    # Deterministic flow: one car leaves per second of green, capped by the
    # number of cars actually queued.
    cars_per_second = 1
    cars_passed = min(carsinlane[selected_lane], cars_per_second * green_time)
    carsinlane[selected_lane] -= cars_passed

    # The served lane starts waiting from zero again.
    carswaittime[selected_lane] = 0

    for lane, cars in enumerate(carsinlane):
        if lane == selected_lane:
            continue
        if cars > 0:
            carswaittime[lane] += green_time  # queued cars waited out the green phase
        else:
            carswaittime[lane] = 0  # an empty lane has nobody waiting

    return rotate_list(carsinlane, 1) + rotate_list(carswaittime, 1)

# 🔹 Example usage
wait_times =  generate_input()  # Initial cars & wait times
print(wait_times)
nn_output = [10]  # The first lane (index 0) gets green light for 10 sec

updated_wait_times = rotate_update(wait_times, nn_output)
print(updated_wait_times)  # Updated state: first lane served, then lanes rotated left by one


**REWARD FUNCTION**

In [None]:
import statistics

def calculate_reward(input_list, output_list, nn_output, max_wait=120):
    """
    Reward function for reinforcement learning in traffic management.

    The served lane is index 0 of ``input_list``; it is read back from the
    LAST lane slot of ``output_list``, i.e. the output is expected to have
    been rotated left by one after the signal change.

    Parameters:
        input_list: Traffic state before the signal change
            [4 car counts, 4 wait times].
        output_list: Traffic state after the signal change, same layout.
        nn_output: Output from the neural network [allocated green time].
        max_wait: Waiting time (seconds) at or above which a lane counts as
            exceeding the limit. Defaults to 120.

    Returns:
        reward: A numerical value guiding reinforcement learning optimization.
    """
    selected_lane = 0  # Always the first lane

    cars_in_lane = input_list[:4]          # Cars per lane before the signal change
    new_cars_in_lane = output_list[:4]     # Cars per lane after the signal change
    new_cars_wait_time = output_list[4:8]  # Waiting times after the signal change
    given_green = nn_output[0]             # Green time assigned

    # Penalize every second beyond the limit, amplified by the number of
    # lanes at or over it.
    overshoots = [wait - max_wait for wait in new_cars_wait_time if wait >= max_wait]
    reward = -sum(overshoots)
    if overshoots:
        reward *= len(overshoots)

    # Reward each car that left the served lane during this cycle.
    cars_passed = cars_in_lane[selected_lane] - new_cars_in_lane[-1]
    reward += cars_passed

    # Discourage a long green (> 10 s) for a lane holding less than half the
    # average queue. A plain arithmetic mean is used so list and array
    # inputs are treated the same way.
    avg_cars = sum(cars_in_lane) / len(cars_in_lane)
    if cars_in_lane[selected_lane] < avg_cars * 0.5 and given_green > 10:
        reward -= 5

    # Small bonus when the served lane has been fully cleared.
    if new_cars_in_lane[-1] == 0:
        reward += 5

    return reward


**TRAINING MODEL AND TESTING**

In [None]:
def test_ddpg(episodes=1, max_steps=1000):
    """Train a DDPG agent on the simulated intersection, save it, then run it.

    Parameters:
        episodes: number of training episodes; each starts from a fresh
            random state and ends once every lane is empty.
        max_steps: upper bound on the steps of any single episode (training
            or testing), so a policy that never clears the lanes (e.g. one
            that keeps choosing a green time of 0) cannot loop forever.
    """
    state_dim = 8
    action_dim = 1
    agent = DDPG(state_dim, action_dim)

    # 🔹 Training Phase
    print("Training the model...")
    for _episode in range(episodes):
        state = np.array(generate_input()).reshape(state_dim)  # Fresh initial state

        for _step in range(max_steps):
            action = np.atleast_1d(agent.select_action(state))  # Exploratory action

            nstate = state.tolist()
            naction = action.tolist()

            temp_next_state = rotate_update(nstate, naction)  # Simulate environment
            next_state = np.array(temp_next_state).flatten()

            # Argument order matters: state BEFORE the signal change first,
            # state AFTER it second. Plain lists keep the reward a Python number.
            reward = calculate_reward(nstate, temp_next_state, naction)
            done = all(x == 0 for x in temp_next_state[:4])  # Stop when all cars clear

            agent.replay_buffer.push(state, action, reward, next_state, done)
            agent.train()

            state = next_state

            if done:
                break

    print("Training completed!")

    # Save model
    agent.save_model()
    print("model saved!!!")

    # 🔹 Testing Phase (Using Trained Model)
    print("Testing the trained model...")
    state = np.array(generate_input()).reshape(state_dim)  # Initial test state

    for _step in range(max_steps):
        # Evaluate the learned policy itself: no exploration noise.
        action = np.atleast_1d(agent.select_action1(state))

        nstate = state.tolist()
        naction = action.tolist()

        temp_next_state = rotate_update(nstate, naction)  # Simulate environment
        next_state = np.array(temp_next_state).flatten()

        print(f"State: {state.tolist()}, Action: {action.tolist()}, Next State: {next_state.tolist()}")

        done = all(x == 0 for x in temp_next_state[:4])  # Stop when all cars are cleared

        state = next_state

        if done:
            print("Testing completed! All cars cleared.")
            break
    else:
        print(f"Testing stopped after {max_steps} steps without clearing all lanes.")

# Train, save and evaluate the agent when this code is executed directly.
if __name__ == "__main__":
    test_ddpg()
