## DQN Algorithm
- Initialize policy network weights
- Copy policy network, and call it target network
- Initialize replay memory
- For each Episode:
- - Initialize starting state
  - for each time step:
  - - select an action via explore/exploit
    - execute the selected action via gym
    - get reward and next state
    - store experience in replay memory
    - sample random batch from replay memory
    - Preprocess states from batch
    - Pass batch of preprocessed states to policy network
    - Calculate loss between output Q values and target Q values
    - Gradient descent updates the weights to minimize the loss

In [8]:
import gymnasium as gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T

In [4]:
from collections import namedtuple
from itertools import count

In [6]:
# Check whether the active matplotlib backend name contains 'inline';
# only in that case is IPython's display module imported.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

In [9]:
class DQN(nn.Module):
    """Small fully connected Q-network over flattened image input.

    The network is three linear layers (24 -> 32 -> ``num_actions`` units)
    with ReLU after the first two. The first layer expects
    ``img_height * img_width * 3`` features per sample.
    NOTE(review): the factor 3 is presumably the number of colour
    channels -- confirm against the preprocessing code.

    Args:
        img_height: Height used to size the first layer's input.
        img_width: Width used to size the first layer's input.
        num_actions: Number of outputs (one Q-value per action). Defaults
            to 2, the value that was previously hard-coded.
    """

    def __init__(self, img_height, img_width, num_actions=2):
        super().__init__()
        self.fc1 = nn.Linear(
            in_features=img_height * img_width * 3, out_features=24
        )
        self.fc2 = nn.Linear(in_features=24, out_features=32)
        self.out = nn.Linear(in_features=32, out_features=num_actions)

    def forward(self, t):
        """Return the network output for a batch ``t``.

        Every dimension after the first is flattened, so each sample must
        contain ``img_height * img_width * 3`` elements in total.
        """
        # Keep dim 0 (the batch) and collapse everything else into one axis.
        t = t.flatten(start_dim=1)
        t = F.relu(self.fc1(t))
        t = F.relu(self.fc2(t))
        # No activation on the output layer: raw values are returned.
        return self.out(t)


In [10]:
# Record type for a single transition: the four fields are state, action,
# next_state and reward, in that order.
Experience = namedtuple(
    'Experience', ['state', 'action', 'next_state', 'reward']
)

# Quick demonstration instance; the bare name below displays it in a notebook.
e = Experience(state=1, action=2, next_state=3, reward=4)
e

Experience(state=1, action=2, next_state=3, reward=4)

In [13]:
class ReplayMemory():
    """Fixed-capacity buffer of experiences that overwrites the oldest entry.

    Attributes are plain and public:
      capacity -- maximum number of stored experiences
      memory   -- list holding the stored experiences
      count    -- total number of ``push`` calls made so far
    """

    def __init__(self, capacity):
        """Create an empty buffer.

        Args:
            capacity: Maximum number of experiences to keep; must be > 0.

        Raises:
            ValueError: If ``capacity`` is not positive.
        """
        # A non-positive capacity would otherwise only fail later, inside
        # push(), with an unrelated ZeroDivisionError / IndexError.
        if capacity <= 0:
            raise ValueError("capacity must be a positive integer")
        self.capacity = capacity
        self.memory = []
        self.count = 0

    def push(self, exp):
        """Store ``exp``, replacing the oldest entry once the buffer is full."""
        if len(self.memory) < self.capacity:
            self.memory.append(exp)
        else:
            # count == capacity on the first overwrite, so the index wraps
            # to 0 and then walks forward: oldest entries go first.
            self.memory[self.count % self.capacity] = exp
        # Count every push, not only the overwriting ones, so ``count``
        # reflects how many experiences have been seen in total.
        self.count += 1

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored experiences chosen at random.

        Raises:
            ValueError: From ``random.sample`` if ``batch_size`` exceeds the
                number of stored experiences.
        """
        return random.sample(self.memory, batch_size)

    def can_provide_sample(self, batch_size):
        """Return True if at least ``batch_size`` experiences are stored."""
        return len(self.memory) >= batch_size

In [14]:
class EpsilonGreedyStrategy():
    """Exponentially decaying exploration-rate schedule.

    Args:
        start: Rate returned at step 0.
        end: Value the rate approaches as the step grows (for positive decay).
        decay: Multiplier applied to the step inside the exponential.
    """

    def __init__(self, start, end, decay):
        self.start = start
        self.end = end
        self.decay = decay

    def get_exploration_rate(self, step):
        """Return ``end + (start - end) * exp(-step * decay)`` for ``step``."""
        return self.end + (self.start - self.end) * math.exp(-1. * step * self.decay)

    # Backward-compatible alias: the method was originally misspelled, while
    # Agent.select_action calls the correctly spelled name.
    get_explortion_rate = get_exploration_rate

In [16]:
class Agent():
    """Chooses actions by mixing random exploration with a policy network.

    Args:
        strategy: Object exposing ``get_exploration_rate(step)``.
        num_actions: Number of discrete actions; random actions are drawn
            from ``range(num_actions)``.
        device: Torch device that returned action tensors are moved to.
    """

    def __init__(self, strategy, num_actions, device):
        self.step = 0  # number of select_action calls made so far
        self.strategy = strategy
        self.num_actions = num_actions
        self.device = device

    def select_action(self, state, policy_net):
        """Return the chosen action as a tensor on ``self.device``.

        With probability given by the strategy's current rate a uniformly
        random action is returned; otherwise the argmax over dim 1 of
        ``policy_net(state)`` is returned.
        NOTE(review): the argmax over dim=1 assumes ``policy_net`` returns
        a (batch, num_actions) tensor -- confirm against the caller.
        """
        rate = self.strategy.get_exploration_rate(self.step)
        self.step += 1

        if rate > random.random():
            # Explore. Only the ``random`` module is imported in this file,
            # so randrange must be qualified (the bare name was a NameError).
            action = random.randrange(self.num_actions)
            return torch.tensor([action]).to(self.device)

        # Exploit: no gradients are needed just to pick an action.
        with torch.no_grad():
            return policy_net(state).argmax(dim=1).to(self.device)