In [None]:
!pip install tensorflow numpy matplotlib gym



In [None]:
import gym
import numpy as np
import random
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten

In [None]:
# Define custom traffic signal environment
class TrafficSignalEnv(gym.Env):
    """Toy traffic-signal environment on a 3x3 grid of uint8 counters.

    Each of the four discrete actions increments one counter in the top-left
    2x2 corner of the grid; the reward is the sum of all counters after the
    update.

    The classic Gym API is used: ``reset()`` returns the observation and
    ``step()`` returns ``(observation, reward, done, info)``.

    Args:
        max_steps: Number of steps after which ``step`` reports ``done=True``.
            Pass ``None`` to never terminate (callers must then bound their
            own loops).
    """

    # (row, col) of the counter incremented by each action index.
    _ACTION_CELLS = ((0, 0), (0, 1), (1, 0), (1, 1))
    # Largest value a counter may hold; matches the observation space bound.
    _MAX_COUNT = 255

    def __init__(self, max_steps=100):
        super(TrafficSignalEnv, self).__init__()
        # Define state space
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(3, 3), dtype=np.uint8)
        # Define action space
        self.action_space = gym.spaces.Discrete(4)
        self.max_steps = max_steps
        self.steps_taken = 0
        # Initialize state
        self.state = np.zeros((3, 3), dtype=np.uint8)

    def reset(self):
        """Start a new episode and return a copy of the all-zero initial grid."""
        self.state = np.zeros((3, 3), dtype=np.uint8)
        self.steps_taken = 0
        # Return a copy so callers can keep the observation without it being
        # changed by later in-place updates in step().
        return self.state.copy()

    def step(self, action):
        """Apply ``action`` and return ``(observation, reward, done, info)``.

        Actions outside ``0..3`` leave the grid unchanged. Counters saturate
        at 255 instead of wrapping around to 0.
        """
        # Update state based on action (simplified for demonstration purposes)
        if 0 <= action < len(self._ACTION_CELLS):
            row, col = self._ACTION_CELLS[action]
            if self.state[row, col] < self._MAX_COUNT:
                self.state[row, col] += 1

        self.steps_taken += 1
        reward = int(np.sum(self.state))
        # Without a step limit the episode would never end and any
        # ``while not done`` loop around this environment would spin forever.
        done = self.max_steps is not None and self.steps_taken >= self.max_steps
        info = {}
        return self.state.copy(), reward, done, info

In [None]:

# DQNAgent
class DQNAgent:
    """Deep Q-Network agent with an epsilon-greedy policy and experience replay.

    Args:
        state_shape: Shape of a single observation (fed to the Flatten layer).
        action_space: Discrete action space exposing ``n`` and ``sample()``.
        learning_rate: Adam learning rate.
        epsilon: Initial exploration probability.
        epsilon_decay: Multiplicative decay applied by ``update_epsilon``.
        gamma: Discount factor.
        epsilon_min: Lower bound for ``epsilon`` so exploration never vanishes.
    """

    def __init__(self, state_shape, action_space, learning_rate=0.001, epsilon=1.0, epsilon_decay=0.995, gamma=0.99,
                 epsilon_min=0.01):
        self.state_shape = state_shape
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.gamma = gamma
        self.model = self.build_model()
        self.memory = deque(maxlen=1000)

    def build_model(self):
        """Build and compile the Q-network (one linear output per action)."""
        model = Sequential([
            Flatten(input_shape=self.state_shape),
            Dense(24, activation='relu'),
            Dense(24, activation='relu'),
            Dense(self.action_space.n)
        ])
        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate), loss='mse')
        return model

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer."""
        # Copy the observations: an environment may hand out the same array
        # object on every step and mutate it in place, which would otherwise
        # make every stored state identical to the latest one.
        self.memory.append((np.array(state), action, reward, np.array(next_state), done))

    def choose_action(self, state):
        """Return a random action with probability ``epsilon``, else the greedy one."""
        if np.random.rand() <= self.epsilon:
            return self.action_space.sample()  # Explore
        # verbose=0 keeps Keras from printing a progress bar on every step.
        q_values = self.model.predict(state[np.newaxis, :], verbose=0)  # Exploit
        return int(np.argmax(q_values[0]))

    def replay(self, batch_size):
        """Fit the network on one random minibatch of stored transitions.

        Does nothing until the buffer holds at least ``batch_size`` transitions.
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        states = np.array([transition[0] for transition in minibatch])
        actions = np.array([transition[1] for transition in minibatch], dtype=np.int64)
        rewards = np.array([transition[2] for transition in minibatch], dtype=np.float32)
        next_states = np.array([transition[3] for transition in minibatch])
        dones = np.array([transition[4] for transition in minibatch], dtype=bool)

        # Two batched forward passes instead of two per sample.
        q_targets = np.array(self.model.predict(states, verbose=0), dtype=np.float32)
        q_next = np.asarray(self.model.predict(next_states, verbose=0))

        # Bellman target; terminal transitions get no bootstrap term.
        bootstrapped = rewards + self.gamma * np.max(q_next, axis=1) * (~dones)
        q_targets[np.arange(batch_size), actions] = bootstrapped

        # q_targets is (batch, n_actions), matching the network output shape.
        self.model.fit(states, q_targets, epochs=1, verbose=0)

    def update_epsilon(self):
        """Decay ``epsilon`` by ``epsilon_decay`` without dropping below ``epsilon_min``."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def train(self, env, episodes=1000, batch_size=32, max_steps=200):
        """Run ``episodes`` training episodes and return the total reward of each.

        Args:
            env: Environment with classic Gym ``reset()`` / 4-tuple ``step()``.
            episodes: Number of episodes to run.
            batch_size: Minibatch size passed to ``replay``.
            max_steps: Per-episode step cap so training ends even when the
                environment never reports ``done``; ``None`` disables the cap.
        """
        total_rewards = []
        for episode in range(episodes):
            state = env.reset()
            done = False
            total_reward = 0
            steps = 0
            while not done and (max_steps is None or steps < max_steps):
                action = self.choose_action(state)
                next_state, reward, done, _ = env.step(action)
                self.remember(state, action, reward, next_state, done)
                state = next_state
                total_reward += reward
                steps += 1
                self.replay(batch_size)
                self.update_epsilon()
            total_rewards.append(total_reward)
        return total_rewards

In [None]:
# Q-Learning agent
class QLearningAgent:
    """Tabular Q-learning agent with an epsilon-greedy policy.

    Args:
        action_space: Discrete action space exposing ``n`` and ``sample()``.
        learning_rate: Step size of the Q-value update.
        epsilon: Initial exploration probability.
        epsilon_decay: Multiplicative decay applied by ``update_epsilon``.
        gamma: Discount factor.
        epsilon_min: Lower bound for ``epsilon``.
    """

    def __init__(self, action_space, learning_rate=0.1, epsilon=1.0, epsilon_decay=0.995, gamma=0.99,
                 epsilon_min=0.01):
        self.Q = {}  # Q-table: flattened-state tuple -> list of per-action values
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.epsilon_min = epsilon_min
        self.gamma = gamma

    @staticmethod
    def _state_key(state):
        """Convert a scalar or array state of any shape into a hashable tuple."""
        # tuple() on a 2-D array yields a tuple of row arrays, which cannot be
        # hashed; flattening to plain Python numbers always can.
        return tuple(np.asarray(state).ravel().tolist())

    def choose_action(self, state):
        """Return a random action with probability ``epsilon``, else the greedy one."""
        if np.random.rand() <= self.epsilon:
            return self.action_space.sample()  # Explore
        q_row = self.Q.get(self._state_key(state), [0] * self.action_space.n)
        return int(np.argmax(q_row))  # Exploit

    def update_q_table(self, state, action, reward, next_state):
        """Apply one Q-learning update for the transition.

        ``next_state`` may be ``None`` to mark a terminal transition, in which
        case no future value is bootstrapped.
        """
        state_key = self._state_key(state)
        q_row = self.Q.setdefault(state_key, [0] * self.action_space.n)

        if next_state is None:
            next_max_q = 0
        else:
            next_key = self._state_key(next_state)
            next_max_q = np.max(self.Q.setdefault(next_key, [0] * self.action_space.n))

        old_q_value = q_row[action]
        q_row[action] = (1 - self.learning_rate) * old_q_value + self.learning_rate * (reward + self.gamma * next_max_q)

    def update_epsilon(self):
        """Decay ``epsilon`` by ``epsilon_decay`` without dropping below ``epsilon_min``."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

In [None]:
# Create environment and agents
# Seed every source of randomness used below so a fresh "Restart & Run All"
# reproduces the same exploration choices and network initialisation.
SEED = 42
random.seed(SEED)         # replay-buffer sampling
np.random.seed(SEED)      # epsilon-greedy coin flips
tf.random.set_seed(SEED)  # Keras weight initialisation

env = TrafficSignalEnv()
env.action_space.seed(SEED)  # random actions drawn via action_space.sample()
dqn_agent = DQNAgent(env.observation_space.shape, env.action_space)
q_learning_agent = QLearningAgent(env.action_space)

In [None]:
# Train agents and collect rewards
num_episodes = 100
# Hard cap on the tabular agent's episode length so its loop below always ends,
# even if the environment never reports done.
max_steps_per_episode = 100

# NOTE: DQNAgent.train steps the environment itself and returns one total
# reward per episode.
dqn_rewards = dqn_agent.train(env, episodes=num_episodes)

# The tabular agent is trained on its own actions: the action it chooses is the
# one executed in the environment and the one credited with the reward.
q_learning_rewards = []
for episode in range(num_episodes):
    # flatten() returns a 1-D copy: later env.step calls cannot change it, and
    # a flat snapshot is safe to use as a Q-table key.
    state = np.asarray(env.reset()).flatten()
    total_reward = 0
    for _step in range(max_steps_per_episode):
        action = q_learning_agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        next_state = np.asarray(next_state).flatten()
        # None marks a terminal transition, so no future value is bootstrapped.
        q_learning_agent.update_q_table(state, action, reward, None if done else next_state)
        total_reward += reward
        state = next_state
        if done:
            break
    # Shift gradually from exploration to exploitation, once per episode.
    q_learning_agent.update_epsilon()
    q_learning_rewards.append(total_reward)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m


KeyboardInterrupt: 

In [None]:
# Evaluate performance
# Mean episode return of each agent: summed episode rewards over episode count.
avg_dqn_reward = sum(dqn_rewards) / len(dqn_rewards)
avg_q_learning_reward = sum(q_learning_rewards) / len(q_learning_rewards)

# One print call, one line per agent.
print(
    f"Average reward for DQN agent: {avg_dqn_reward}",
    f"Average reward for Q-Learning agent: {avg_q_learning_reward}",
    sep="\n",
)