In [None]:
!pip install ale_py
!pip install gymnasium



In [None]:
import ale_py, cv2, os, random, torch
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict, deque
from gymnasium.wrappers import RecordVideo
import torch.nn as nn
import torch.optim as optim

In [None]:
class DQNetwork(nn.Module):
    """Convolutional Q-network: three conv layers followed by a two-layer dense head.

    Args:
        inputShape: shape of a single observation; its first entry is used as
            the number of input channels.
        numActions: width of the output layer (one Q-value per action).
    """

    def __init__(self, inputShape, numActions):
        super().__init__()
        inChannels = inputShape[0]

        # Convolutional feature extractor
        convLayers = [
            nn.Conv2d(inChannels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
        ]
        self.conv = nn.Sequential(*convLayers)

        # Dense head; its input width is measured by probing the conv stack
        flatFeatures = self.featureSize(inputShape)
        denseLayers = [
            nn.Linear(flatFeatures, 512),
            nn.ReLU(),
            nn.Linear(512, numActions),
        ]
        self.fc = nn.Sequential(*denseLayers)

    def forward(self, x):
        """Run a batch of observations through the conv stack and the dense head."""
        features = self.conv(x)
        batch = features.shape[0]
        flattened = features.view(batch, -1)  # One row of features per sample
        return self.fc(flattened)

    def featureSize(self, inputShape):
        """Return how many values the conv stack emits for one input of `inputShape`."""
        probe = torch.zeros(1, *inputShape)
        # The probe batch has a single sample, so the element count is the feature width
        return self.conv(probe).numel()

In [None]:
class DQNAgent:
    """Epsilon-greedy DQN agent with a replay buffer and a separate target network.

    Args:
        stateShape: shape of one observation, forwarded to `DQNetwork`.
        numActions: number of discrete actions.
        gamma: discount factor used in the bootstrapped target.
        lr: learning rate of the Adam optimizer.
        epsilonMax: initial exploration rate.
        epsilonDecay: accepted for interface compatibility but currently unused;
            the per-update decrement is fixed to `epsilonMax / 50000`.
        epsilonMin: lower bound for the exploration rate.
    """

    def __init__(self, stateShape, numActions, gamma = 0.95, lr=0.01, epsilonMax =1.0, epsilonDecay=0.999, epsilonMin=0.0):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")
        self.numActions = numActions

        # Networks
        self.qNetwork = DQNetwork(stateShape, numActions).to(self.device)
        # The target network is a periodically refreshed copy of the online network,
        # which keeps the bootstrapped targets fixed between refreshes.
        self.targetNetwork = DQNetwork(stateShape, numActions).to(self.device)
        self.targetNetwork.load_state_dict(self.qNetwork.state_dict())
        self.targetNetwork.eval()

        # Hyperparameters
        self.gamma = gamma  # Discount factor
        self.epsilon = epsilonMax
        # NOTE(review): the `epsilonDecay` argument is ignored here; epsilon is
        # decreased linearly by this fixed step. Kept as-is to preserve the schedule.
        self.epsilonDecay = epsilonMax / 50000
        self.epsilonMin = epsilonMin
        self.lr = lr
        self.batchSize = 2048
        self.memory = deque(maxlen=100000)
        self.optimizer = optim.Adam(self.qNetwork.parameters(), lr=self.lr)
        self.errors = []

    def action(self, state):
        """Pick an action for `state`: random with probability epsilon, otherwise greedy."""
        if random.random() < self.epsilon:
            return random.randint(0, self.numActions - 1)  # Explore

        stateTensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(self.device)
        # Action selection needs no gradients; skip building the autograd graph.
        with torch.no_grad():
            qValues = self.qNetwork(stateTensor)
        return torch.argmax(qValues).item()  # Exploit

    # Captures agent experience and stores it into the replay buffer
    def storeExperience(self, state, action, reward, nextState, done):
        """Append one (state, action, reward, nextState, done) transition to the replay buffer."""
        self.memory.append((state, action, reward, nextState, done))

    def update(self, done=True):
        """Perform one optimization step on a random mini-batch from the replay buffer.

        Returns immediately (without training or decaying epsilon) while the
        buffer holds fewer than `batchSize` transitions.

        Args:
            done: when true, epsilon is decreased by `epsilonDecay` (bounded below
                by `epsilonMin`) after the optimization step. This used to be read
                from a module-level `done` variable, which raised `NameError`
                whenever the agent was used outside the training cell.
        """
        if len(self.memory) < self.batchSize:
            return

        # Sample mini-batch from memory
        batch = random.sample(self.memory, self.batchSize)
        states, actions, rewards, nextStates, dones = zip(*batch)

        # Stack into arrays first: building a tensor from a tuple of arrays is slow.
        states = torch.tensor(np.array(states), dtype=torch.float32).to(self.device)
        actions = torch.tensor(np.array(actions), dtype=torch.int64).to(self.device)
        rewards = torch.tensor(np.array(rewards), dtype=torch.float32).to(self.device)
        nextStates = torch.tensor(np.array(nextStates), dtype=torch.float32).to(self.device)
        dones = torch.tensor(np.array(dones), dtype=torch.float32).to(self.device)

        # Compute target Q-values; (1 - dones) removes the bootstrap term for
        # transitions flagged as done.
        with torch.no_grad():
            maxNextQValues = self.targetNetwork(nextStates).max(1)[0]
            targetQValues = rewards + self.gamma * maxNextQValues * (1 - dones)

        # Compute current Q-values of the actions that were actually taken
        qValues = self.qNetwork(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # Compute and log TD error (one absolute error per sampled transition)
        self.errors.append(torch.abs(targetQValues - qValues).detach().cpu().numpy())

        # Compute loss and optimize
        loss = nn.MSELoss()(qValues, targetQValues)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay epsilon
        if done:
            self.epsilon = max(self.epsilonMin, self.epsilon - self.epsilonDecay)

    def updateTargetNetwork(self):
        """Copy the online network's weights into the target network."""
        self.targetNetwork.load_state_dict(self.qNetwork.state_dict())

In [None]:
def preprocessState(state):
    """Turn an RGB frame into a grayscale 84x84 array with a leading channel axis.

    The frame is converted to grayscale, resized to 84x84 with area
    interpolation, divided by 255 and returned with shape (1, 84, 84).
    Assumes `state` is an 8-bit RGB image, so the result lies in [0, 1] — TODO confirm.
    """
    import cv2
    targetSize = (84, 84)
    gray = cv2.cvtColor(state, cv2.COLOR_RGB2GRAY)
    shrunk = cv2.resize(gray, targetSize, interpolation=cv2.INTER_AREA)
    scaled = shrunk / 255.0
    return scaled[np.newaxis, ...]  # Add channel dimension

In [None]:
# Train the DQN agent on Frogger, recording a video every `recordingPeriod` episodes.
env = gym.make("ALE/Frogger-v5", render_mode="rgb_array")
stateShape = (1, 84, 84)  # Grayscale, resized to 84x84
numActions = env.action_space.n

agent = DQNAgent(stateShape, numActions)
episodes = 50000
targetUpdateFreq = 500  # How frequently (in episodes) the target network is updated
recordingPeriod = 10000
logPeriod = 2500  # How frequently (in episodes) progress is printed
episodeRewards = []
trainingErrors = []

env = RecordVideo(env, video_folder="frogger-deep-agent", name_prefix=f"eval_{episodes}",
                  episode_trigger=lambda x: x % recordingPeriod == 0)

# try/finally guarantees the environment (and any open video recording) is closed
# even when the long-running loop is interrupted or raises.
try:
    for episode in range(episodes):
        state, _ = env.reset()
        state = preprocessState(state)
        totalReward = 0
        done = False
        agent.errors = []

        while not done:
            action = agent.action(state)
            nextState, reward, terminated, truncated, _ = env.step(action)
            nextState = preprocessState(nextState)

            # Store experience. Only true termination is stored as the done flag,
            # so a truncated (time-limited) transition still bootstraps from nextState.
            agent.storeExperience(state, action, reward, nextState, terminated)

            totalReward += reward
            state = nextState

            # The episode is over on termination OR truncation; stepping an
            # environment after truncation is not allowed.
            done = terminated or truncated

        # One optimization step per episode
        agent.update()

        # Update target network
        if episode % targetUpdateFreq == 0:
            agent.updateTargetNetwork()

        episodeRewards.append(totalReward)  # Total Rewards

        # agent.errors is empty while the replay buffer is still warming up;
        # taking np.mean of it would yield NaN and a RuntimeWarning.
        if len(agent.errors) > 0:
            trainingErrors.append(np.mean(agent.errors))  # Training Error
        else:
            print(f"Warning: training_errors is empty in episode {episode}")
            trainingErrors.append(0)

        if episode % logPeriod == 0:
            print(f"Episode {episode}/{episodes}, Total Reward: {totalReward}, Epsilon: {agent.epsilon:.2f}")
finally:
    env.close()

Using device: cuda


  logger.warn(


Episode 0/50000, Total Reward: 15.0, Epsilon: 1.00


  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


Episode 2500/50000, Total Reward: 10.0, Epsilon: 0.95
Episode 5000/50000, Total Reward: 9.0, Epsilon: 0.90
Episode 7500/50000, Total Reward: 8.0, Epsilon: 0.85
Episode 10000/50000, Total Reward: 9.0, Epsilon: 0.80
Episode 12500/50000, Total Reward: 8.0, Epsilon: 0.75
Episode 15000/50000, Total Reward: 11.0, Epsilon: 0.70
Episode 17500/50000, Total Reward: 23.0, Epsilon: 0.65


In [None]:
def rollingMean(values, window):
    """Return the rolling mean of `values` over `window` consecutive entries.

    The window is clamped to the number of available values, because
    `np.convolve(..., mode='valid')` silently swaps its operands when the
    kernel is longer than the data. Empty input yields an empty array.
    """
    values = np.asarray(values, dtype=float)
    if values.size == 0:
        return values
    window = max(1, min(window, values.size))
    # Dividing the box kernel by its length turns the rolling sum into a mean.
    return np.convolve(values, np.ones(window) / window, mode='valid')


fig, axs = plt.subplots(1, 2, figsize=(20, 8))

# Compute rolling averages
rollingWindow = 2500
rewards = rollingMean(episodeRewards, rollingWindow)
errors = rollingMean(trainingErrors, rollingWindow)

# Episode Rewards
axs[0].plot(rewards)
axs[0].set_title("Episode Rewards")
axs[0].set_xlabel("Episode")
axs[0].set_ylabel("Reward")

# Training Errors
axs[1].plot(errors)
axs[1].set_title("Training Error")
axs[1].set_xlabel("Episode")
axs[1].set_ylabel("Temporal Difference Error")

plt.tight_layout()
plt.show()

# Closing again is kept as a safety net in case the training cell was interrupted.
env.close()