In [29]:
!pip install "gymnasium[classic-control]"

Collecting pygame>=2.1.3 (from gymnasium[classic-control])
  Downloading pygame-2.6.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Downloading pygame-2.6.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (14.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.0/14.0 MB[0m [31m1.9 MB/s[0m  [33m0:00:07[0mm0:00:01[0m00:01[0m
[?25hInstalling collected packages: pygame
Successfully installed pygame-2.6.1


In [22]:
import random
import gymnasium as gym
from collections import deque
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

In [23]:
class QNetwork(nn.Module):
    """Fully connected network mapping a state vector to one Q-value per action.

    The architecture is two hidden ``Linear`` + ``ReLU`` layers of equal width
    followed by a linear output layer.

    Args:
        state_size: Number of input features per state.
        action_size: Number of outputs (one Q-value per action).
        hidden_size: Width of both hidden layers. Defaults to 24, the value
            that used to be hard-coded, so existing callers are unaffected.
    """

    def __init__(self, state_size: int, action_size: int, hidden_size: int = 24):
        super().__init__()
        # Layer positions inside the Sequential are unchanged, so parameter
        # names ("net.0.weight", "net.2.weight", ...) stay the same.
        self.net = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size),
        )

    def forward(self, x):
        """Return the Q-values for ``x``; the last dimension of ``x`` must be ``state_size``."""
        return self.net(x)


In [26]:
class DQNAgent:
    """Epsilon-greedy deep Q-learning agent with a replay memory.

    A single ``QNetwork`` is used both to choose actions and to compute the
    bootstrapped targets (there is no separate target network).

    Args:
        state_size: Number of features in one observation.
        action_size: Number of discrete actions.
    """

    def __init__(self, state_size: int, action_size: int):
        self.state_size  = state_size
        self.action_size = action_size

        # Replay memory; the oldest transitions are dropped once it is full.
        self.memory = deque(maxlen=10_000)

        self.gamma         = 0.90   # discount rate
        self.epsilon       = 1.0    # exploration rate
        self.epsilon_min   = 0.01   # lower bound for the exploration rate
        self.epsilon_decay = 0.98   # multiplicative decay applied once per train() call
        self.learning_rate = 0.001

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.model     = QNetwork(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
        self.criterion = nn.MSELoss()

    def _to_tensor(self, x):
        """Copy ``x`` into a float32 tensor on the agent's device."""
        return torch.tensor(x, dtype=torch.float32, device=self.device)

    def memorize(self, memory: tuple):
        """Append one transition to the replay memory.

        ``train`` reads the tuple positionally as
        ``(state, action, reward, next_state, done)``.
        """
        self.memory.append(memory)

    def get_action(self, state) -> int:
        """Pick an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action is returned,
        otherwise the action with the highest predicted Q-value.

        Args:
            state: A single observation, either flat ``(state_size,)`` or
                already batched as ``(1, state_size)``.

        Returns:
            The index of the chosen action.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        # exploit
        self.model.eval()
        with torch.no_grad():
            # reshape so a flat observation works as well as a (1, state_size) one
            state_t  = self._to_tensor(state).reshape(1, -1)
            q_values = self.model(state_t)               # shape: (1, action_size)
        return int(q_values.argmax(dim=1).item())

    def train(self, batch_size: int = 32):
        """Run one gradient step on a random minibatch and decay epsilon.

        Args:
            batch_size: Number of transitions to sample from the replay memory.

        Returns:
            The minibatch loss as a float, or ``None`` when the memory holds
            fewer than ``batch_size`` transitions (nothing is updated then).
        """
        if len(self.memory) < batch_size:
            # random.sample would raise ValueError; skip the update instead.
            return None

        minibatch = random.sample(self.memory, batch_size)

        # np.vstack stacks the stored states row-wise; each stored state is
        # expected to be (1, state_size) or (state_size,).
        states      = np.vstack([m[0] for m in minibatch])          # (B, state_size)
        actions     = np.array( [m[1] for m in minibatch])          # (B,)
        rewards     = np.array( [m[2] for m in minibatch], dtype=np.float32)
        next_states = np.vstack([m[3] for m in minibatch])          # (B, state_size)
        dones       = np.array( [m[4] for m in minibatch], dtype=np.float32)

        states_t      = self._to_tensor(states)
        next_states_t = self._to_tensor(next_states)
        rewards_t     = self._to_tensor(rewards)
        dones_t       = self._to_tensor(dones)
        actions_t     = torch.tensor(actions, dtype=torch.long, device=self.device)

        self.model.train()

        q_current = self.model(states_t)                            # (B, action_size)

        with torch.no_grad():
            q_next_max = self.model(next_states_t).max(dim=1).values  # (B,)
            # Bellman equation; (1 - done) removes the bootstrap term for
            # transitions flagged as done.
            td_target = rewards_t + self.gamma * q_next_max * (1.0 - dones_t)

        # The target equals the current prediction everywhere except at the
        # action that was taken, so only that entry contributes to the loss.
        # It is detached so the target is a constant and no gradient flows
        # through it.
        q_target = q_current.detach().clone()
        batch_indices = torch.arange(batch_size, device=self.device)
        q_target[batch_indices, actions_t] = td_target

        loss = self.criterion(q_current, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # decay exploration rate, never dropping below epsilon_min
        if self.epsilon > self.epsilon_min:
            self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

        return float(loss.item())



In [None]:
n_episodes = 300
render     = False
batch_size = 128
max_steps  = 500   # upper bound on steps taken in one episode

# Only request the "human" render mode when rendering is wanted: in that mode the
# environment draws a window on every step, which slows training considerably.
env          = gym.make("CartPole-v1", render_mode="human" if render else None)
state_size   = env.observation_space.shape[0]
action_size  = env.action_space.n
agent        = DQNAgent(state_size, action_size)

try:
    for episode in range(n_episodes):
        state, _ = env.reset()
        state = np.reshape(state, [1, state_size])
        total_reward = 0.0

        for t in range(max_steps):
            action = agent.get_action(state)

            state_next, reward, terminated, truncated, _ = env.step(action)
            state_next = np.reshape(state_next, [1, state_size])
            total_reward += reward

            # Store only `terminated` as the done flag. `truncated` means the
            # episode was cut short (e.g. by a time limit), not that the state
            # is terminal, so the target should still bootstrap from state_next.
            agent.memorize((state, action, reward, state_next, terminated))

            if terminated or truncated:
                print(
                    f"[episode {episode}/{n_episodes}] "
                    f"total reward: {total_reward:.0f}, epsilon: {agent.epsilon:.2f}"
                )
                break

            state = state_next

        # one gradient step per episode, once enough transitions are stored
        if len(agent.memory) > batch_size:
            agent.train(batch_size=batch_size)
finally:
    # release the environment (and its window, if any) even if training is interrupted
    env.close()

  from pkg_resources import resource_stream, resource_exists


[episode 0/300] total reward: 10, epsilon: 1.00
[episode 1/300] total reward: 14, epsilon: 1.00
[episode 2/300] total reward: 43, epsilon: 1.00
[episode 3/300] total reward: 13, epsilon: 1.00
[episode 4/300] total reward: 14, epsilon: 1.00
[episode 5/300] total reward: 13, epsilon: 1.00
[episode 6/300] total reward: 28, epsilon: 1.00
[episode 7/300] total reward: 10, epsilon: 0.98
[episode 8/300] total reward: 21, epsilon: 0.96
[episode 9/300] total reward: 19, epsilon: 0.94
[episode 10/300] total reward: 15, epsilon: 0.92
[episode 11/300] total reward: 17, epsilon: 0.90
[episode 12/300] total reward: 18, epsilon: 0.89
[episode 13/300] total reward: 9, epsilon: 0.87
[episode 14/300] total reward: 34, epsilon: 0.85
[episode 15/300] total reward: 24, epsilon: 0.83
[episode 16/300] total reward: 11, epsilon: 0.82
[episode 17/300] total reward: 40, epsilon: 0.80
[episode 18/300] total reward: 22, epsilon: 0.78
[episode 19/300] total reward: 31, epsilon: 0.77
[episode 20/300] total reward: 

: 