In [9]:
import contextlib
import logging
import os
import random
import sys
from collections import deque

# Suppress TensorFlow's native (C++) logging.
# This variable is only reliably honoured when it is exported BEFORE
# TensorFlow is imported, so it must sit above the `import tensorflow` line.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam

# Suppress TensorFlow's Python-side logging
tf.get_logger().setLevel('ERROR')

# Configure logging.
# basicConfig() silently does nothing when the root logger already has
# handlers, which can be the case once TensorFlow has been imported or when
# this cell is re-run. force=True (Python 3.8+) replaces any existing handlers
# so the INFO level and the format below actually take effect.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    force=True,
)

# Test logging output (the debug line is expected to stay hidden at INFO level)
logging.debug('Debug logging is working.')
logging.info('Info logging is working.')
print('Print statements are working.')


Print statements are working.


In [10]:
from gym import spaces

class TrafficEnv:
    """Toy two-direction intersection with a Gym-style ``reset``/``step`` API.

    The observation is a length-5 float32 vector::

        [cars_NS, cars_EW, signal_NS, signal_EW, time_since_last_switch]

    The action is a pair ``(action_NS, action_EW)`` where each entry is the
    signal state to apply this step (1 = green, 0 = red).

    NOTE(review): nothing prevents both directions from being green at the
    same time and no penalty is modelled for it, so "always green" maximises
    the reward as written. Confirm whether that is intended.
    """

    def __init__(self, max_steps=100):
        """Create the environment.

        Args:
            max_steps: Number of ``step`` calls after which an episode is
                reported as done. Defaults to the original hard-coded 100.
        """
        self.max_steps = max_steps
        self.steps_taken = 0

        # Traffic light states: 1 for green, 0 for red
        self.signal_NS = 1  # Initially green for NS
        self.signal_EW = 0  # Initially red for EW
        self.cars_NS = np.random.randint(0, 10)  # Cars waiting in NS direction
        self.cars_EW = np.random.randint(0, 10)  # Cars waiting in EW direction

        # Observation space includes the number of cars and traffic light states
        self.observation_space = spaces.Box(
            low=np.array([0, 0, 0, 0, 0], dtype=np.float32),
            high=np.array([np.inf, np.inf, 1, 1, np.inf], dtype=np.float32),
            dtype=np.float32
        )

        # Action space: one binary signal state per direction (NS, EW)
        self.action_space = spaces.MultiDiscrete([2, 2])

        self.time_since_last_switch = 0
        self.state = self.get_state()

    def get_state(self):
        """Return the current observation as a float32 array of length 5."""
        # float32 matches the dtype declared by ``observation_space``.
        return np.array([
            self.cars_NS,
            self.cars_EW,
            self.signal_NS,
            self.signal_EW,
            self.time_since_last_switch
        ], dtype=np.float32)

    def reset(self):
        """Start a new episode and return the initial observation."""
        self.cars_NS = np.random.randint(0, 10)
        self.cars_EW = np.random.randint(0, 10)
        self.signal_NS = 1
        self.signal_EW = 0
        self.time_since_last_switch = 0
        self.steps_taken = 0
        self.state = self.get_state()
        return self.state

    def step(self, action):
        """Apply one action pair and advance the simulation by one tick.

        Args:
            action: Iterable of two values ``(action_NS, action_EW)``, each
                0 (red) or 1 (green).

        Returns:
            ``(state, reward, done, info)`` where ``reward`` is the number of
            cars that passed this tick, ``done`` becomes True once
            ``max_steps`` steps have been taken, and ``info`` is an empty dict.

        Raises:
            ValueError: If ``action`` does not contain exactly two entries or
                an entry is not 0 or 1.
        """
        reward = 0

        # Unpack the action for both directions (plain ints, not NumPy scalars)
        action_NS, action_EW = (int(a) for a in action)
        if action_NS not in (0, 1) or action_EW not in (0, 1):
            raise ValueError(
                f"Each signal action must be 0 or 1, got {(action_NS, action_EW)}"
            )

        # The switch timer restarts whenever either signal changes state;
        # episode length is tracked separately so switching cannot extend it.
        switched = action_NS != self.signal_NS or action_EW != self.signal_EW
        if switched:
            self.time_since_last_switch = 0
        else:
            self.time_since_last_switch += 1
        self.steps_taken += 1

        # Update traffic lights based on actions
        self.signal_NS = action_NS
        self.signal_EW = action_EW

        # Update car movements based on current signals: 1 or 2 cars may pass
        # per green direction, limited by the queue length.
        if self.signal_NS == 1:
            cars_passed_NS = min(self.cars_NS, np.random.randint(1, 3))
            self.cars_NS -= cars_passed_NS
            reward += cars_passed_NS
        if self.signal_EW == 1:
            cars_passed_EW = min(self.cars_EW, np.random.randint(1, 3))
            self.cars_EW -= cars_passed_EW
            reward += cars_passed_EW

        # New cars arrive at the intersection (0-2 per direction)
        self.cars_NS += np.random.randint(0, 3)
        self.cars_EW += np.random.randint(0, 3)

        # Update state
        self.state = self.get_state()
        done = self.steps_taken >= self.max_steps

        return self.state, reward, done, {}

    def update_traffic_lights(self, action, direction):
        """Deprecated no-op kept for compatibility; ``step`` applies actions."""
        pass


In [14]:
@contextlib.contextmanager
def suppress_stdout():
    """Context manager that discards everything written to ``sys.stdout``.

    While the ``with`` body runs, ``sys.stdout`` points at the OS null device;
    the previous stream is put back on exit, including when the body raises.
    """
    with open(os.devnull, "w") as sink, contextlib.redirect_stdout(sink):
        yield
class DQNAgent:
    """Epsilon-greedy DQN agent backed by a small Keras MLP and a replay buffer."""

    def __init__(self, state_size, action_size):
        """Set hyper-parameters and build the Q-network.

        Args:
            state_size: Length of the observation vector (e.g. 5).
            action_size: Number of discrete actions (e.g. 2).
        """
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # Replay buffer; oldest entries are dropped
        self.gamma = 0.95    # Discount rate
        self.epsilon = 1.0   # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Build and compile the Q-network: two ReLU layers of 24 units, linear output."""
        model = Sequential()
        model.add(Input(shape=(self.state_size,)))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def act(self, state):
        """Pick an action for ``state`` using an epsilon-greedy policy.

        Returns:
            A Python ``int`` in ``range(action_size)``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        # verbose=0 already keeps Keras silent, so no stdout redirection is needed.
        act_values = self.model.predict(np.array([state]), verbose=0)
        # Cast so both branches return the same type (plain int).
        return int(np.argmax(act_values[0]))

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def train(self, batch_size=32):
        """Run one Q-learning update on a random minibatch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size`` transitions.
        After a successful update, epsilon is decayed while it is still above
        ``epsilon_min``.

        Args:
            batch_size: Number of transitions sampled for the update.
        """
        if len(self.memory) < batch_size:
            logging.debug('Not enough memory to train.')
            return
        minibatch = random.sample(self.memory, batch_size)

        states = np.array([sample[0] for sample in minibatch])
        actions = np.array([sample[1] for sample in minibatch], dtype=int)
        rewards = np.array([sample[2] for sample in minibatch], dtype=np.float64)
        next_states = np.array([sample[3] for sample in minibatch])
        dones = np.array([sample[4] for sample in minibatch], dtype=bool)

        # Two batched forward passes replace the per-sample predict() calls;
        # the targets are the same, at a fraction of the call overhead.
        q_current = self.model.predict(states, verbose=0)
        q_next = self.model.predict(next_states, verbose=0)

        # Terminal transitions use the raw reward; others bootstrap from the
        # best next-state value.
        bootstrapped = rewards + self.gamma * np.amax(q_next, axis=1)
        updated = np.where(dones, rewards, bootstrapped)

        # Only the Q-value of the action actually taken is changed, so the
        # loss is zero for every other output.
        targets = np.array(q_current)
        targets[np.arange(len(minibatch)), actions] = updated

        self.model.fit(states, targets, epochs=1, verbose=0)

        # Reduce exploration rate
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        # Log training progress
        logging.debug(f'Agent trained on batch of size {batch_size}. Epsilon: {self.epsilon:.4f}')

    def load(self, name):
        """Load network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to the file ``name``."""
        self.model.save_weights(name)


In [12]:
def train_multi_agent(env, agent1, agent2, episodes, batch_size=32,
                      agent1_path="agent1_model.weights.h5",
                      agent2_path="agent2_model.weights.h5"):
    """Train two agents jointly on one environment and save their weights.

    On every step both agents choose an action from the same observation, the
    pair is applied with ``env.step([action1, action2])``, and both agents
    store the transition with the same shared reward before training once.

    Args:
        env: Environment exposing ``reset()`` and
            ``step(action) -> (state, reward, done, info)``.
        agent1: Agent whose action is the first element of the action pair.
        agent2: Agent whose action is the second element of the action pair.
        episodes: Number of episodes to run.
        batch_size: Minibatch size forwarded to each agent's ``train``.
        agent1_path: File the first agent's weights are saved to.
        agent2_path: File the second agent's weights are saved to.
            The defaults end in ``.weights.h5`` because Keras 3 rejects
            ``save_weights`` targets with any other suffix.

    Returns:
        List with the total reward of each episode, in order.
    """
    episode_rewards = []

    for episode in range(episodes):
        state = env.reset()
        done = False
        total_reward = 0
        step_count = 0

        logging.info(f'Starting Episode {episode+1}/{episodes}')

        while not done:
            # Agent 1 (NS direction) takes an action
            action1 = agent1.act(state)
            # Agent 2 (EW direction) takes an action
            action2 = agent2.act(state)

            # Both agents' actions are applied to the environment
            next_state, reward, done, _ = env.step([action1, action2])

            # Each agent gets the same reward and next state (you might want to customize this)
            agent1.remember(state, action1, reward, next_state, done)
            agent2.remember(state, action2, reward, next_state, done)

            state = next_state
            total_reward += reward
            step_count += 1

            # Train both agents
            agent1.train(batch_size)
            agent2.train(batch_size)

            # Log step information every 10 steps
            if step_count % 10 == 0:
                logging.debug(f'Episode {episode+1}, Step {step_count}, Total Reward: {total_reward}')

        episode_rewards.append(total_reward)
        logging.info(f'Episode {episode+1} completed. Steps: {step_count}, Total Reward: {total_reward}')
        logging.info(f'Agent1 Epsilon: {agent1.epsilon:.4f}, Agent2 Epsilon: {agent2.epsilon:.4f}')

    # Save trained models
    agent1.save(agent1_path)
    agent2.save(agent2_path)
    logging.info('Training completed and models saved.')

    return episode_rewards


In [None]:
if __name__ == "__main__":
    # Number of training episodes; lower this for a quick smoke run.
    EPISODES = 100

    env = TrafficEnv()
    # Cast to plain Python ints: ``nvec[0]`` is a NumPy integer, and these
    # values are used as layer sizes and as the bound for random.randrange.
    state_size = int(env.observation_space.shape[0])
    action_size = int(env.action_space.nvec[0])  # Since action_space is MultiDiscrete

    # Initialize two DQN agents for the two directions
    agent1 = DQNAgent(state_size, action_size)
    agent2 = DQNAgent(state_size, action_size)

    # Start training
    logging.info('Starting training process...')
    train_multi_agent(env, agent1, agent2, episodes=EPISODES)