## Notebook to Train DQN Model


### Imports

In [23]:
import cv2
import numpy as np
import math
import os

# Import ML Libraries
from collections import namedtuple, deque
import random

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import torch.distributions as distributions

In [24]:
# Presumably the directory for saved models; nothing in the visible cells
# reads this value yet -- TODO confirm against the saving code.
model_path = "./models/"

# One replay-buffer record. The fields are positional, so callers must pass
# them in exactly this order.
Transition = namedtuple(
    'Transition',
    ['state', 'action', 'reward', 'next_state'],
)

class ReplayMemory:
    """Bounded FIFO store of transitions with uniform random sampling.

    Attributes:
        memory: ``deque`` holding the stored entries (oldest first).
        capacity: maximum number of entries kept at once.
        batch_size: number of entries returned by :meth:`sample`.
    """

    def __init__(self, capacity, batch_size):
        self.batch_size = batch_size
        self.capacity = capacity
        # A deque with ``maxlen`` discards its oldest element when a new one
        # is appended to a full buffer.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Wrap ``args`` in a ``Transition`` and append it to the buffer."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self):
        """Return ``batch_size`` distinct stored entries chosen at random.

        ``random.sample`` raises ``ValueError`` when fewer than
        ``batch_size`` entries are stored.
        """
        how_many = self.batch_size
        return random.sample(self.memory, how_many)

    def __len__(self):
        """Return the number of entries currently stored."""
        stored = self.memory
        return len(stored)

### Set Up Reward System

In [25]:
class Balloon:
    """Circular target defined by a centre point and a radius.

    ``center`` must support element-wise subtraction with the coordinate
    arguments passed to the methods (NumPy arrays in this notebook).
    """

    def __init__(self, center, radius):
        self.radius = radius
        self.center = center

    def determine_hit(self, coords):
        """Return ``True`` when ``coords`` lies on or inside the balloon."""
        offset = self.center - coords
        inside = np.linalg.norm(offset) <= self.radius
        return True if inside else False

    def determine_prize(self, coords, prev_coords, shape):
        """Score a shot at ``coords``.

        The base prize is ``176 - 2 * radius`` (smaller balloons pay more).
        A flick bonus of up to 50 per axis is added, proportional to the
        distance travelled from ``prev_coords`` relative to half of
        ``shape`` along that axis. The total is then scaled linearly from 1
        at the centre down to 0 at the rim.

        Args:
            coords: position of the current shot (indexable, two entries).
            prev_coords: position of the previous shot.
            shape: extent of the playing field along axis 0 and axis 1.

        Returns:
            The scaled prize as a number.
        """
        distance = np.linalg.norm(self.center - coords)
        base_prize = 176 - (2 * self.radius)
        centre_factor = 1 - (distance / self.radius)

        # Per-axis flick bonus, each capped at 50.
        axis_bonus = [
            min(abs(prev_coords[axis] - coords[axis]) / (0.5 * shape[axis]) * 50, 50)
            for axis in (0, 1)
        ]
        total = base_prize + (axis_bonus[0] + axis_bonus[1])

        return total * centre_factor

In [26]:
class Game(object):
    """Balloon-popping environment rendered as a binary image.

    The screen is a ``(1, 1, h, w)`` float tensor in which pixels covered by
    a live balloon are 1 and everything else is 0. All coordinates handled
    by this class (balloon centres, shots, ``last_shot``) use ``(row, col)``
    order so they can index the screen directly.

    Attributes:
        h, w: screen height and width in pixels.
        screen: current rendering of the live balloons. A fresh tensor is
            created whenever balloons are popped, so tensors returned
            earlier are never modified afterwards.
        current_balloons: dict mapping balloon id to ``Balloon``.
        last_shot: ``(row, col)`` of the previous shot (screen centre at start).
        done: ``True`` once every balloon has been popped.
    """

    def __init__(self, h, w):
        self.h = h
        self.w = w
        self.screen = torch.zeros((1, 1, self.h, self.w))
        self.current_balloons = {}
        self.last_shot = np.array([self.h//2, self.w//2])

        for i in range(0, 3):
            rad = np.random.randint(38, 80)

            y_coord = np.random.randint(20, self.h-20)
            x_coord = np.random.randint(20, self.w-20)
            # Store the centre as (row, col): the shot produced by
            # update_shot and the screen indexing both use that order.
            pos = np.array([y_coord, x_coord])

            bloon = Balloon(pos, rad)

            print("Adding Balloon")

            self._draw(self.screen, bloon)
            self.current_balloons[i] = bloon

        self.done = False

    def _draw(self, screen, balloon):
        """Set to 1 every pixel of ``screen`` within ``balloon``'s radius."""
        center_row, center_col = int(balloon.center[0]), int(balloon.center[1])
        radius = float(balloon.radius)
        reach = int(np.ceil(radius))

        # Clip the bounding box to the screen. The +1 makes the box
        # symmetric, so the rim matches Balloon.determine_hit's `<=` test.
        top, bottom = max(center_row - reach, 0), min(center_row + reach + 1, self.h)
        left, right = max(center_col - reach, 0), min(center_col + reach + 1, self.w)
        if top >= bottom or left >= right:
            return  # balloon lies completely off-screen

        rows = torch.arange(top, bottom).view(-1, 1)
        cols = torch.arange(left, right).view(1, -1)
        inside = (rows - center_row) ** 2 + (cols - center_col) ** 2 <= radius ** 2
        # Basic slicing returns a view, so the masked write lands in `screen`.
        screen[0, 0, top:bottom, left:right][inside] = 1

    def _render(self):
        """Return a new screen tensor showing only the live balloons."""
        screen = torch.zeros((1, 1, self.h, self.w))
        for balloon in self.current_balloons.values():
            self._draw(screen, balloon)
        return screen

    def update_shot(self, action):
        """Fire at the arg-max pixel of ``action`` and advance the game.

        Args:
            action: score map tensor whose last two dimensions are
                ``(h, w)``; the first maximum in row-major order is the shot.

        Returns:
            ``(reward, next_state, done)`` where ``reward`` is a 0-dim
            tensor (sum of the prizes of every balloon hit), ``next_state``
            is the screen after the shot and ``done`` tells whether all
            balloons are gone.
        """
        scores = action.detach().cpu().numpy()
        peak = np.unravel_index(np.argmax(scores), scores.shape)
        shot = np.array([peak[-2], peak[-1]])

        reward = 0
        popped = []

        for key, balloon in self.current_balloons.items():
            if balloon.determine_hit(shot):
                print("balloon hit!")
                reward += balloon.determine_prize(shot, self.last_shot, np.array([self.h, self.w]))
                popped.append(key)

        if popped:
            for key in popped:
                self.current_balloons.pop(key)
            # Re-render instead of blanking a square: pixels shared with an
            # overlapping live balloon stay lit, and screens handed out
            # earlier (e.g. stored in a replay buffer) are left untouched.
            self.screen = self._render()

        self.last_shot = shot

        if len(self.current_balloons) == 0:
            self.done = True
            print("All balloons popped!")

        return torch.tensor(reward), self.screen, self.done

### DQN Model

In [27]:
class AimlabNetwork(nn.Module):
    """Convolutional encoder-decoder producing one score per input pixel.

    The encoder has three conv/batch-norm/ReLU stages (32, 64 and 128
    channels), each ending in a 2x2 max-pool that also returns the pooling
    indices. The decoder mirrors it with ``MaxUnpool2d`` layers that consume
    those indices in reverse order, followed by transposed convolutions.
    The last layer is a sigmoid, so every output value lies in (0, 1).

    Input:  ``(N, 1, H, W)`` tensor.
    Output: ``(N, 1, H, W)`` tensor.

    NOTE(review): the trainer uses this output as Q-values while the
    rewards can be far larger than 1; the sigmoid cannot represent such
    targets -- confirm whether that is intended.
    """

    def __init__(self):
        super().__init__()
        self.input_channels = 1
        self.output_channels = 1
        self.hidden_size_1 = 32
        self.hidden_size_2 = 64
        self.hidden_size_3 = 128

        # Layer order is kept stable so state_dict keys (encoder.N /
        # decoder.N) stay compatible with previously saved weights.
        self.encoder = nn.ModuleList([
            #--- Stage 1
            nn.Conv2d(self.input_channels, self.hidden_size_1, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_1),
            nn.ReLU(),
            nn.Conv2d(self.hidden_size_1, self.hidden_size_1, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, return_indices = True),
            #--- Stage 2
            nn.Conv2d(self.hidden_size_1, self.hidden_size_2, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_2),
            nn.ReLU(),
            nn.Conv2d(self.hidden_size_2, self.hidden_size_2, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_2),
            nn.ReLU(),
            nn.Conv2d(self.hidden_size_2, self.hidden_size_2, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, return_indices = True),
            #--- Stage 3
            nn.Conv2d(self.hidden_size_2, self.hidden_size_3, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_3),
            nn.ReLU(),
            nn.Conv2d(self.hidden_size_3, self.hidden_size_3, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_3),
            nn.ReLU(),
            nn.Conv2d(self.hidden_size_3, self.hidden_size_3, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2, return_indices = True)
        ])

        self.decoder = nn.ModuleList([
            #--- Stage 3-2
            nn.MaxUnpool2d(kernel_size=2, stride=2),
            nn.ConvTranspose2d(self.hidden_size_3, self.hidden_size_3, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_3),
            nn.ReLU(),
            nn.ConvTranspose2d(self.hidden_size_3, self.hidden_size_3, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_3),
            nn.ReLU(),
            nn.ConvTranspose2d(self.hidden_size_3, self.hidden_size_2, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_2),
            nn.ReLU(),
            #--- Stage 2-2
            nn.MaxUnpool2d(kernel_size=2, stride=2),
            nn.ConvTranspose2d(self.hidden_size_2, self.hidden_size_2, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_2),
            nn.ReLU(),
            nn.ConvTranspose2d(self.hidden_size_2, self.hidden_size_2, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_2),
            nn.ReLU(),
            nn.ConvTranspose2d(self.hidden_size_2, self.hidden_size_1, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_1),
            nn.ReLU(),
            #--- Stage 1-2
            nn.MaxUnpool2d(kernel_size=2, stride=2),
            nn.ConvTranspose2d(self.hidden_size_1, self.hidden_size_1, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.hidden_size_1),
            nn.ReLU(),
            nn.ConvTranspose2d(self.hidden_size_1, self.output_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(self.output_channels),
            nn.Sigmoid()
        ])

    def forward(self, x):
        """Run the encoder, then the decoder, and return the score map.

        Args:
            x: ``(N, 1, H, W)`` input tensor. ``H`` and ``W`` must be at
                least 8 so that three 2x2 poolings are possible.

        Returns:
            ``(N, 1, H, W)`` tensor with values in (0, 1).
        """
        pool_indices = []
        pool_input_sizes = []

        # Encoder: remember, for every pooling layer, both the indices and
        # the spatial size it was applied to.
        out = x
        for layer in self.encoder:
            if isinstance(layer, nn.MaxPool2d):
                pool_input_sizes.append(list(out.shape[-2:]))
                out, indices = layer(out)
                pool_indices.append(indices)
            else:
                out = layer(out)

        # Decoder: undo the poolings last-in-first-out. Passing output_size
        # restores the exact pre-pool size; without it an odd dimension
        # (e.g. 5 -> 2 -> 4) would no longer match the saved indices.
        for layer in self.decoder:
            if isinstance(layer, nn.MaxUnpool2d):
                out = layer(out, pool_indices.pop(), output_size=pool_input_sizes.pop())
            else:
                out = layer(out)

        return out

In [30]:
class AimlabTrainer(object):
    """Deep Q-learning loop for the balloon game.

    An action is a full ``(1, 1, h, w)`` score map; the pixel holding its
    maximum is the shot. The online network ``model`` is trained against a
    periodically synchronised ``target_model``.

    Attributes:
        h, w: screen height and width.
        batch_size: number of transitions per optimisation step.
        gamma: discount factor.
        eps_start, eps_end, eps_decay: epsilon-greedy schedule parameters.
        target_update: the target network is synchronised every this many
            episodes.
        steps_done: number of actions selected so far (drives epsilon).
        memory: replay buffer of ``(state, action, reward, next_state)``;
            ``next_state`` is ``None`` for terminal transitions.
    """

    def __init__(self, h, w):
        self.h = h
        self.w = w

        # Model / training hyper-parameters.
        self.batch_size = 1
        self.gamma = 0.999
        self.eps_start = 0.9
        self.eps_end = 0.05
        self.eps_decay = 200
        self.target_update = 10

        self.steps_done = 0

        self.model = AimlabNetwork()
        self.target_model = AimlabNetwork()
        self.target_model.load_state_dict(self.model.state_dict())
        self.target_model.eval()

        self.memory = ReplayMemory(10, self.batch_size)
        self.optimizer = optim.RMSprop(self.model.parameters())

    def select_action(self, state):
        """Pick an action map for ``state`` with an epsilon-greedy policy.

        Epsilon decays exponentially from ``eps_start`` to ``eps_end`` as
        ``steps_done`` grows.

        Args:
            state: ``(N, 1, h, w)`` screen tensor.

        Returns:
            ``(N, 1, h, w)`` float tensor without gradient history: the
            network output (greedy) or uniform noise (exploration).
        """
        sample = random.random()
        eps_threshold = self.eps_end + (self.eps_start - self.eps_end) * \
            math.exp(-1. * self.steps_done / self.eps_decay)
        self.steps_done += 1
        if sample > eps_threshold:
            # No graph is needed for acting; skipping it saves memory.
            with torch.no_grad():
                return self.model(state)
        # Same shape, dtype and device as the greedy branch.
        return torch.rand((state.shape[0], 1, self.h, self.w), device=state.device)

    def optimize(self):
        """Run one Q-learning update on a batch from the replay memory.

        The loss compares ``Q(s, a)`` for the action actually taken with
        ``r + gamma * max_a' Q_target(s', a')`` (just ``r`` for terminal
        transitions, whose ``next_state`` is ``None``).

        Returns:
            The Huber loss as a float, or ``None`` when the update was
            skipped because the replay memory is not full yet.
        """
        # Warm-up: wait until the buffer has been filled once.
        if len(self.memory) < self.memory.capacity:
            return None
        transitions = self.memory.sample()
        batch_len = len(transitions)

        state_batch = torch.cat([t.state for t in transitions], dim=0)
        action_batch = torch.cat([t.action for t in transitions], dim=0)

        # Q(s_t, a_t): value of the pixel that was actually shot at, i.e.
        # the arg-max of the stored action map.
        q_maps = self.model(state_batch).reshape(batch_len, -1)
        chosen = action_batch.reshape(batch_len, -1).argmax(dim=1, keepdim=True)
        state_action_values = q_maps.gather(1, chosen.to(q_maps.device)).squeeze(1)

        reward_batch = torch.tensor(
            [float(t.reward) for t in transitions],
            dtype=q_maps.dtype, device=q_maps.device,
        )

        # V(s_{t+1}) = max_a Q_target(s_{t+1}, a); zero for terminal states.
        next_state_values = torch.zeros(batch_len, dtype=q_maps.dtype, device=q_maps.device)
        live = [i for i, t in enumerate(transitions) if t.next_state is not None]
        if live:
            next_batch = torch.cat([transitions[i].next_state for i in live], dim=0)
            with torch.no_grad():
                next_q = self.target_model(next_batch).reshape(len(live), -1)
            next_state_values[live] = next_q.max(dim=1)[0]

        # Expected Q from the Bellman equation.
        expected_state_action_values = (next_state_values * self.gamma) + reward_batch

        # Huber loss
        criterion = nn.SmoothL1Loss()
        loss = criterion(state_action_values, expected_state_action_values)

        # Optimize the model; clip_grad_value_ skips parameters without a
        # gradient instead of failing on them.
        self.optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_value_(self.model.parameters(), 1)
        self.optimizer.step()

        return loss.item()

    def train(self, num_episodes=1, max_steps=None):
        """Play ``num_episodes`` games, learning after every shot.

        Args:
            num_episodes: number of games to play (default 1).
            max_steps: optional cap on shots per episode; ``None`` (default)
                plays until every balloon is popped.
        """
        for i_episode in range(num_episodes):
            if i_episode % 2 == 0:
                print(f"Starting Episode {i_episode}")

            game = Game(self.h, self.w)
            # Snapshot the screen so later game updates cannot alter a
            # state that is already stored in the replay memory.
            state = game.screen.clone()

            done = False
            steps = 0
            while not done and (max_steps is None or steps < max_steps):
                action = self.select_action(state)
                reward, next_state, done = game.update_shot(action)
                # Terminal transitions carry no successor state.
                next_state = None if done else next_state.clone()

                # Store Transition in Memory
                self.memory.push(state, action, reward, next_state)

                # Move to Next State
                state = next_state

                # Perform Optimization
                self.optimize()
                steps += 1

            # Periodically copy the online weights into the target network.
            if i_episode % self.target_update == 0:
                self.target_model.load_state_dict(self.model.state_dict())

    

## Train Model

In [29]:
# Build a trainer for screens of height 480 and width 640, then run its
# training loop. Note that `model` is the trainer object; the network itself
# lives in `model.model`.
model = AimlabTrainer(480, 640)
model.train()

Starting Episode 0
Adding Balloon
Adding Balloon
Adding Balloon
balloon hit!
torch.Size([1, 1, 480, 640])
torch.Size([1, 1, 480, 640])
torch.Size([1, 1, 480, 640])
torch.Size([1, 1, 480, 640])


KeyboardInterrupt: 