In [6]:
!pip install gymnasium[atari,accept-rom-license]
!pip install ale-py







In [7]:
!pip install gymnasium numpy tensorflow matplotlib opencv-python tensorflow



In [8]:
import gymnasium as gym
import numpy as np
import random
import tensorflow as tf
import matplotlib.pyplot as plt
import cv2
import ale_py
from collections import deque
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, Flatten
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import load_model

In [9]:
# Hyperparameters
GAMMA = 0.99  # Discount factor
EPSILON = 1.0  # Initial exploration rate
EPSILON_MIN = 0.1  # Minimum exploration rate
EPSILON_DECAY = 0.995  # Decay rate
LEARNING_RATE = 0.00025  # Learning rate
MEMORY_SIZE = 5000  # Experience replay buffer size
BATCH_SIZE = 64  # Batch size
TARGET_UPDATE_FREQ = 10  # Target model update frequency
EPISODES = 50  # Total training episodes
RECORD = False
LOAD_MODEL = False

In [10]:
# Create Pac-Man environment
env = gym.make("ALE/MsPacman-v5", render_mode="rgb_array")
state_shape = (88, 80, 1)  # Resized grayscale shape
# action_size = env.action_space.n
action_size = 5 # nope, up, right, down, left

In [11]:
# Function to preprocess frames
def preprocess_state(state):
    """Convert RGB to grayscale and resize."""
    state = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)  # Convert to grayscale
    state = cv2.resize(state, (80, 88))  # Resize
    return np.expand_dims(state, axis=-1) / 255.0  # Normalize

In [12]:
# Build the DQN model with GPU optimization
def build_model():
    model = Sequential([
        Conv2D(32, (8, 8), strides=(4, 4), activation="relu", input_shape=state_shape),
        Conv2D(64, (4, 4), strides=(2, 2), activation="relu"),
        Conv2D(64, (3, 3), strides=(1, 1), activation="relu"),
        Flatten(),
        Dense(512, activation="relu"),
        Dense(action_size, activation="linear")  # Q-values output
    ])
    model.compile(optimizer=Adam(learning_rate=LEARNING_RATE), loss="mse")
    return model

In [13]:
# DQN Agent
class DQNAgent:
    def __init__(self):
        self.model = build_model()
        self.target_model = build_model()
        self.target_model.set_weights(self.model.get_weights())  # Sync target model
        self.memory = deque(maxlen=MEMORY_SIZE)
        self.epsilon = EPSILON

    def act(self, state):
        """Choose action using ε-greedy strategy."""
        if np.random.rand() <= self.epsilon:
            return random.randrange(action_size)  # Random action (exploration)
        q_values = self.model.predict(np.expand_dims(state, axis=0), verbose=0)
        return np.argmax(q_values[0])  # Best action (exploitation)

    def remember(self, state, action, reward, next_state, done):
        """Store experience in memory."""
        self.memory.append((state, action, reward, next_state, done))

    def replay(self):
        """Train the model using experience replay."""
        if len(self.memory) < BATCH_SIZE:
            return
        batch = random.sample(self.memory, BATCH_SIZE)
        states, targets = [], []
        for state, action, reward, next_state, done in batch:
            target = self.model.predict(np.expand_dims(state, axis=0), verbose=0)[0]
            if done:
                target[action] = reward
            else:
                next_q_values = self.target_model.predict(np.expand_dims(next_state, axis=0), verbose=0)[0]
                target[action] = reward + GAMMA * np.max(next_q_values)
            states.append(state)
            targets.append(target)

        # Train model in batches
        self.model.fit(np.array(states), np.array(targets), epochs=1, verbose=0, batch_size=BATCH_SIZE)

        if self.epsilon > EPSILON_MIN:
            self.epsilon *= EPSILON_DECAY  # Decay exploration rate

    def update_target_model(self):
        """Update target model weights."""
        self.target_model.set_weights(self.model.get_weights())

In [14]:
# Train the agent
agent = DQNAgent()

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


In [15]:
# Visualization function for Colab
def show_frame(frame):
    plt.imshow(frame)
    plt.axis("off")
    plt.show()

In [16]:
# Training loop
def train():
    for episode in range(EPISODES):
        state = preprocess_state(env.reset()[0])
        done = False
        total_reward = 0

        while not done:
            action = agent.act(state)
            next_state, reward, done, _, _ = env.step(action)
            next_state = preprocess_state(next_state)
            agent.remember(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
            agent.replay()

        # Update target network periodically
        if episode % TARGET_UPDATE_FREQ == 0:
            agent.update_target_model()

        print(f"Episode {episode + 1}/{EPISODES}, Score: {total_reward}, Epsilon: {agent.epsilon:.4f}")

    env.close()

In [None]:
if LOAD_MODEL:
    agent.model = load_model("pacman_dqn.keras")
else:
    train()
    agent.model.save("pacman_dqn.keras")