# Deep Q-learning

In [1]:
import gymnasium as gym
import math
import random
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

## Environment and device

In [2]:
# Breakout from the Arcade Learning Environment, created without on-screen
# rendering (pass render_mode="human" to gym.make to watch the game).
env = gym.make("ALE/Breakout-v5")
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## Replay buffer

In [3]:
# One stored environment step: (state, action, next_state, reward).
Transition = namedtuple('Transition', ['state', 'action', 'next_state', 'reward'])


class ReplayMemory:
    """Fixed-capacity FIFO store of ``Transition`` records.

    Once ``capacity`` records are held, each new push silently discards
    the oldest one (behaviour of ``deque(maxlen=...)``).
    """

    def __init__(self, capacity):
        """Create an empty buffer that keeps at most ``capacity`` records."""
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Wrap the positional arguments in a ``Transition`` and store it."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored records chosen at random.

        Raises ``ValueError`` (from ``random.sample``) when fewer than
        ``batch_size`` records are stored.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of records currently stored."""
        return len(self.memory)

## Neural network

In [4]:
class DQN(nn.Module):
    """Small convolutional Q-network: conv -> ReLU -> max-pool -> 2 dense layers.

    Args:
        dim_x: size of the first spatial dimension of the input image.
        dim_y: size of the second spatial dimension of the input image.
        n_actions: number of outputs (one Q-value per action).

    ``forward`` expects a float tensor shaped ``(batch, 3, dim_x, dim_y)`` and
    returns a tensor shaped ``(batch, n_actions)``.
    """

    def __init__(self, dim_x, dim_y, n_actions):
        super().__init__()
        # 3x3 convolution with padding 1 and stride 1 keeps the spatial size.
        self.conv1 = nn.Conv2d(3, 18, kernel_size=3, stride=1, padding=1)
        # 2x2 pooling with stride 2 halves each spatial dimension, rounding down.
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        # Floor each dimension separately so the flattened size also matches
        # the pooled feature map when dim_x or dim_y is odd.
        pooled_features = 18 * (dim_x // 2) * (dim_y // 2)
        self.fc1 = nn.Linear(pooled_features, 64)
        self.fc2 = nn.Linear(64, n_actions)

    def forward(self, x):
        """Map a batch of images to one Q-value per action."""
        x = F.relu(self.conv1(x))
        x = self.pool(x)
        # Keep dim 0 (the batch) and flatten everything else for the dense layers.
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

## Parameters for training

In [5]:
# Number of transitions sampled from replay memory per optimization step.
BATCH_SIZE = 128
# Discount factor multiplied into the bootstrapped next-state value.
GAMMA = 0.99
# Exploration schedule: epsilon decays exponentially from EPS_START towards
# EPS_END as exp(-steps_done / EPS_DECAY).
EPS_START, EPS_END = 0.9, 0.05
EPS_DECAY = 1_000
# Learning rate passed to the AdamW optimizer.
LR = 0.0001

## Variables initialization

In [6]:
# Probe the environment once to size the network's input and output layers.
n_actions = env.action_space.n
state, info = env.reset()
obs_x, obs_y = state.shape[:2]

# Online network, plus a target network that starts as an exact copy of it.
policy_net = DQN(obs_x, obs_y, n_actions).to(device)
target_net = DQN(obs_x, obs_y, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

# Only the online network's parameters are handed to the optimizer.
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)

# Incremented by select_action; drives the epsilon decay in update_epsilon.
steps_done = 0

210


## Select action with epsilon-greedy policy

In [7]:
def update_epsilon():
    """Return the exploration rate for the current value of ``steps_done``.

    The rate starts at ``EPS_START`` and decays exponentially towards
    ``EPS_END`` with time constant ``EPS_DECAY``. Despite the name, nothing
    is mutated; the value is only computed and returned.
    """
    decay = math.exp(-1. * steps_done / EPS_DECAY)
    return EPS_END + (EPS_START - EPS_END) * decay

def select_action(state, eps_threshold):
    """Pick an action for ``state`` with an epsilon-greedy rule.

    Args:
        state: observation tensor whose last axis is moved to position 1
            before being fed to ``policy_net``.
        eps_threshold: probability-like threshold; a uniform draw at or below
            it selects a random action, otherwise the greedy one.

    Returns:
        A ``(1, 1)`` tensor holding the chosen action index.

    Side effects:
        Increments the global ``steps_done`` on every call.
    """
    global steps_done
    draw = random.random()
    steps_done += 1

    # Explore: sample uniformly from the environment's action space.
    if draw <= eps_threshold:
        random_action = env.action_space.sample()
        return torch.tensor([[random_action]], device=device, dtype=torch.long)

    # Exploit: take the action with the highest predicted Q-value.
    with torch.no_grad():
        q_values = policy_net(torch.moveaxis(state, -1, 1))
        return torch.argmax(q_values).view(1, 1)

## One step optimization

In [8]:
def optimize_model():
    """Run one gradient step on ``policy_net`` from a sampled replay batch.

    Returns immediately while ``memory`` holds fewer than ``BATCH_SIZE``
    transitions. Otherwise it samples ``BATCH_SIZE`` transitions, evaluates
    Q(s, a) with ``policy_net``, builds the target
    ``reward + GAMMA * max_a' Q_target(s', a')`` with ``target_net`` (the
    bootstrap term is 0 for transitions whose ``next_state`` is ``None``),
    and takes one optimizer step on the Smooth L1 loss between the two.

    Returns:
        None
    """
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose: list of Transitions -> one Transition whose fields are tuples.
    batch = Transition(*zip(*transitions))

    # Transitions stored with next_state=None have no bootstrap term.
    non_final_mask = torch.tensor(
        [s is not None for s in batch.next_state], device=device, dtype=torch.bool
    )
    non_final_next_states = [s for s in batch.next_state if s is not None]

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # moveaxis puts the last axis at position 1, where Conv2d expects channels.
    # gather(1, action_batch) picks the Q-value of the action actually taken.
    state_action_values = policy_net(torch.moveaxis(state_batch, -1, 1)).gather(1, action_batch)

    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    # V(s') = max_a' Q(s', a') comes from target_net rather than policy_net to
    # improve stability. Skip the forward pass when every sampled transition
    # is terminal, because torch.cat raises on an empty list.
    if non_final_next_states:
        with torch.no_grad():
            next_states = torch.cat(non_final_next_states)
            next_state_values[non_final_mask] = target_net(torch.moveaxis(next_states, -1, 1)).max(1)[0]
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    criterion = nn.SmoothL1Loss()
    # unsqueeze(1) matches the (BATCH_SIZE, 1) shape produced by gather above.
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    optimizer.zero_grad()
    loss.backward()
    # Clamp each gradient element to [-100, 100] before the update.
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

In [9]:
# Train for many episodes on a GPU; keep CPU-only runs short.
num_episodes = 600 if torch.cuda.is_available() else 10

## Training

In [10]:
for i_episode in range(num_episodes):
    # Start a fresh episode; add a leading batch axis to the first frame.
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

    for t in count():
        eps_threshold = update_epsilon()
        action = select_action(state, eps_threshold)
        observation, reward, terminated, truncated, _ = env.step(action.item())

        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        # A terminated episode is stored with next_state=None so that
        # optimize_model adds no bootstrap value; truncation keeps the frame.
        next_state = (
            None
            if terminated
            else torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
        )

        memory.push(state, action, next_state, reward)
        state = next_state
        optimize_model()

        if done:
            # Zero-based index of the last step of this episode.
            print(t)
            break

    # Hard-sync the target network with the policy network every 3rd episode
    # (episode 0 included).
    if i_episode % 3 == 0:
        print("#")
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')

165
#
140
201
216
#
245
185
125
#
131
172
136
#
Complete


## Test one game

In [1]:
# Play a single episode with on-screen rendering, using the trained policy.
# NOTE: this cell relies on names created by earlier cells (the imports,
# `device`, `policy_net` and `select_action`); running it on a fresh kernel
# fails with a NameError, so run the notebook from the top first.
env = gym.make("ALE/Breakout-v5", render_mode="human")
try:
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

    for t in count():
        # eps_threshold=0 makes select_action take the greedy action.
        # Each call also increments the global steps_done counter.
        action = select_action(state, 0)
        # With render_mode="human" the frame is drawn as part of step(), so no
        # explicit env.render() call is needed.
        observation, reward, terminated, truncated, _ = env.step(action.item())

        done = terminated or truncated
        if done:
            break

        state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
finally:
    # Always release the emulator and its window, even if the episode errors.
    env.close()

NameError: name 'gym' is not defined