In [None]:
import os
import sys
import torch
import random
import pickle
import time
import argparse
import numpy as np
import torch.nn as nn
from tqdm import tqdm
import torch.optim as optim
from collections import deque
import matplotlib.pyplot as plt
import torch.nn.functional as F

In [None]:
# Colab only: mount Google Drive under /content/drive/ so the project
# folder used by the following cell is reachable.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
cd drive/MyDrive/gridworld_trap

/content/drive/MyDrive/gridworld_trap


In [None]:
# Environment
class Env:
    """Grid world with a goal in the bottom-right corner and fixed trap cells.

    The observation is a ``grid[0] x grid[1]`` array of zeros with a single 1
    marking the agent.  Every method that returns an observation returns a
    copy, so callers may keep it without it being mutated by later calls.

    Attributes:
        gridsize: ``(rows, cols)`` of the grid.
        goal: ``[row, col]`` of the goal cell (bottom-right corner).
        position: current ``[row, col]`` of the agent.
        traps: list of ``[row, col]`` trap cells.  The coordinates are
            hard-coded; entries that fall outside a smaller grid can never be
            reached, and a trap that coincides with the goal is shadowed
            because the goal check runs first.
        grid: the live observation array.
        done: initialised to 0 here and in ``reset``; ``step`` does not
            update it.
    """

    # (row, col) offset for each action id: up, down, right, left
    _MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, 1), 3: (0, -1)}

    def __init__(self, grid = (5, 5)):
        self.gridsize = grid
        # goal position, initial position and trap positions
        self.goal = [grid[0] - 1, grid[1] - 1]
        self.position = [0, 0]
        self.traps = [[0, 19], [19, 0], [4, 4], [15, 15], [2, 8], [17, 11], [6, 10], [13, 9], [8, 6], [11, 13]]
        # initialize grid
        self.grid = np.zeros((grid[0], grid[1]))
        self.done = 0

    def reset(self):
        """Put the agent back at (0, 0) and return a copy of the observation."""
        self.grid = np.zeros(self.grid.shape)
        self.position = [0, 0]
        self.grid[0, 0] = 1
        self.done = 0
        # Return a copy, like step() and set_agent_loc(), so the caller's
        # observation is not changed by the next step().
        return self.grid.copy()

    def step(self, action, test=False):
        """Apply one action and return ``(observation, reward, done)``.

        Args:
            action: 0 = up, 1 = down, 2 = right, 3 = left.
            test: unused; kept so existing callers keep working.

        Returns:
            A copy of the grid, the reward and the done flag:
            * reaching the goal  -> reward 1,  done 1, agent stays where it was
            * reaching a trap    -> reward -1, done 1, agent stays where it was
            * leaving the grid   -> reward 0,  done 1, agent stays where it was
            * any other move     -> reward 0,  done 0, agent moves

        Raises:
            ValueError: if ``action`` is not one of the four action ids.
        """
        # Compare with == (not a dict lookup) so NumPy scalars and other
        # int-like values are accepted exactly as before.
        delta = next((d for a, d in self._MOVES.items() if action == a), None)
        if delta is None:
            # Previously an unknown action fell through to the "moved" branch
            # and wiped the agent marker off the grid.
            raise ValueError(
                "action must be 0 (up), 1 (down), 2 (right) or 3 (left); got {!r}".format(action)
            )

        origin = self.position.copy()
        row, col = origin[0] + delta[0], origin[1] + delta[1]
        out_of_boundary = not (0 <= row < self.gridsize[0] and 0 <= col < self.gridsize[1])
        if out_of_boundary:
            # a blocked move leaves the agent in place
            row, col = origin

        # Order matters: goal, then trap, then boundary, then a plain move.
        if row == self.gridsize[0] - 1 and col == self.gridsize[1] - 1:
            reward, done = 1, 1
        elif [row, col] in self.traps:
            reward, done = -1, 1
        elif out_of_boundary:
            reward, done = 0, 1
        else:
            reward, done = 0, 0
            self.grid[origin[0], origin[1]] = 0
            self.grid[row, col] = 1
            self.position = [row, col]

        return self.grid.copy(), reward, done

    def set_agent_loc(self, row, col):
        """Teleport the agent to ``(row, col)`` and return a copy of the grid."""
        self.grid[self.position[0], self.position[1]] = 0
        self.position = [row, col]
        self.grid[row, col] = 1
        return self.grid.copy()

In [None]:
# ER Buffer
class ReplayMemory:
    """Fixed-capacity experience-replay buffer backed by a ``deque``.

    Once ``capacity`` transitions are stored, appending a new one drops the
    oldest.
    """
    __slots__ = ['buffer']

    def __init__(self, capacity):
        self.buffer = deque(maxlen = capacity)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.buffer)

    def append(self, state, action, reward, next_state, done):
        """Store one transition; the scalars are wrapped in one-element lists
        so that a sampled batch stacks into column tensors."""
        transition = (state, [action], [reward], next_state, [done])
        self.buffer.append(transition)

    def sample(self, batch_size, device):
        """Draw ``batch_size`` transitions without replacement.

        Returns:
            ``(state, action, reward, next_state, done)`` tensors on
            ``device``; ``action`` is a long tensor, the rest are float.
        """
        batch = random.sample(self.buffer, batch_size)
        # transpose the list of transitions into per-field columns
        columns = list(zip(*batch))

        def as_float(values):
            return torch.FloatTensor(values).to(device)

        states = as_float(np.float32(columns[0]))
        actions = torch.LongTensor(columns[1]).to(device)
        next_states = as_float(np.float32(columns[3]))
        rewards = as_float(columns[2])
        dones = as_float(columns[4])
        return states, actions, rewards, next_states, dones

In [None]:
# Network
class Net(nn.Module):
    """Q-network: flattens the observation and maps it to one value per action.

    Args:
        input_shape: number of input features after flattening
            (callers in this file pass rows * cols of the grid).
        num_actions: size of the output layer.
    """

    def __init__(self, input_shape, num_actions):
        super().__init__()
        # Honour the constructor argument (it used to be ignored in favour of
        # a hard-coded 4).
        self.num_actions = num_actions

        # NOTE(review): there is no activation between the two Linear layers,
        # so the whole network is an affine map of the flattened input.
        # Left unchanged because it alters training results -- confirm intent.
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(input_shape, 80),
            nn.Linear(80, self.num_actions),
        )

    def forward(self, x):
        """Return Q-values with shape ``(batch, num_actions)``."""
        return self.fc(x)

# DQN
class DQN:
    """DQN agent with optional Double-DQN targets and Averaged-DQN targets.

    ``args.k`` selects the target scheme: with ``k == 1`` there is a single
    target network; with ``k > 1`` ``_target_net`` is a list of ``k`` networks
    whose outputs are averaged, and each target update overwrites one of them
    in round-robin order.  ``args.ddqn`` switches to Double-DQN action
    selection (behavior network picks the action, target network(s) score it).

    Reads from ``args``: device, k, lr, capacity, batch_size, gamma, freq,
    target_freq, ddqn.  Reads ``env.gridsize`` to size the networks.
    """

    def __init__(self, args, env):
        n_inputs = env.gridsize[0] * env.gridsize[1]

        # config
        self.device = args.device
        self.batch_size = args.batch_size
        self.action_space_n = 4
        self.gamma = args.gamma
        self.freq = args.freq                # behavior update every `freq` steps
        self.target_freq = args.target_freq  # target update every `target_freq` steps
        self.idx = 0                         # number of target updates done so far
        self.k = args.k
        self.ddqn = args.ddqn
        self.criterion = torch.nn.MSELoss()
        self.test_q_val = []                 # filled by the evaluation routine
        self.epoch = 0
        self.total_steps = 0

        # behavior network
        self._behavior_net = Net(n_inputs, self.action_space_n).to(args.device)
        if self.k > 1:
            # averaged-DQN: a list of k target networks
            self._target_net = [Net(n_inputs, self.action_space_n).to(args.device) for _ in range(self.k)]
        else:
            # vanilla DQN: a single target network
            self._target_net = Net(n_inputs, self.action_space_n).to(args.device)

        # every target network starts as a copy of the behavior network
        for target in self._iter_target_nets():
            target.load_state_dict(self._behavior_net.state_dict())

        self._optimizer = torch.optim.Adam(self._behavior_net.parameters(), lr=args.lr)
        # memory
        self._memory = ReplayMemory(capacity=args.capacity)

    def _iter_target_nets(self):
        """Return the target network(s) as a list regardless of ``k``."""
        return self._target_net if self.k > 1 else [self._target_net]

    def select_action(self, state, epsilon, action_space):
        """Epsilon-greedy action from the behavior network.

        Args:
            state: observation array for a single state (no batch dimension).
            epsilon: probability of picking a uniformly random action.
            action_space: number of actions to draw from when exploring.

        Returns:
            The chosen action id as an int.
        """
        if epsilon > random.random():
            # explore: use the caller-supplied action count (was hard-coded 4)
            return np.random.randint(action_space)
        with torch.no_grad():
            q_value = self._behavior_net(torch.Tensor(state).unsqueeze(0).to(self.device))
            return int(torch.argmax(q_value))

    def append(self, state, action, reward, next_state, done):
        """Store one transition in replay memory."""
        self._memory.append(state, action, reward, next_state, done)

    def update(self, total_steps):
        """Run the periodic updates that are due at ``total_steps``."""
        # behavior network
        if total_steps % self.freq == 0:
            self._update_behavior_network(self.gamma)
        # target network
        if total_steps % self.target_freq == 0:
            self._update_target_network()
            self.idx += 1

    def _target_q_values(self, next_state):
        """Q-values of ``next_state`` from the target network(s).

        With ``k > 1`` this is the mean over the ``k`` target networks.
        """
        if self.k > 1:
            total = self._target_net[0](next_state)
            for i in range(1, self.k):
                total = total + self._target_net[i](next_state)
            return total / self.k
        return self._target_net(next_state)

    def _update_behavior_network(self, gamma):
        """One gradient step on the behavior network from a sampled minibatch.

        Does nothing while replay memory holds fewer than ``batch_size``
        transitions (sampling would otherwise raise).
        """
        if len(self._memory) < self.batch_size:
            return

        # sample a minibatch of transitions
        state, action, reward, next_state, done = self._memory.sample(self.batch_size, self.device)

        # Q(s, a) for the actions that were actually taken
        q_value = self._behavior_net(state)
        q_value = torch.gather(q_value, dim = 1, index=action.long())

        with torch.no_grad():
            target_q = self._target_q_values(next_state)
            if self.ddqn:
                # double DQN: behavior net chooses, target net(s) evaluate
                action_next = torch.argmax(self._behavior_net(next_state), dim=1).unsqueeze(-1)
                q_next = torch.gather(target_q, dim=1, index=action_next)
            else:
                # vanilla / averaged: max over the target Q-values
                q_next = torch.max(target_q, dim=1).values.unsqueeze(-1)
            # (1 - done) removes the bootstrap term on terminal transitions
            q_target = reward + gamma * q_next * (1 - done)

        # loss
        loss = self.criterion(q_value, q_target)
        # optimize
        self._optimizer.zero_grad()
        loss.backward()
        self._optimizer.step()

    def _update_target_network(self):
        """Copy the behavior weights into the (next) target network.

        With ``k > 1`` the slot ``idx % k`` is overwritten, so the list holds
        the ``k`` most recent snapshots once it has been cycled through.
        """
        if self.k > 1:
            self._target_net[self.idx % self.k].load_state_dict(self._behavior_net.state_dict())
        else:
            self._target_net.load_state_dict(self._behavior_net.state_dict())

In [None]:
# initialize all possible transitions into buffer
def init_buffer(env, agent):
    """Pre-fill the agent's replay memory with one transition per
    (cell, action) pair, for every cell except the bottom-right goal cell.

    For each pair the agent is placed on the cell, the action is executed,
    and the resulting ``(state, action, reward, next_state, done)`` is
    appended to the agent.
    """
    n_rows, n_cols = env.gridsize[0], env.gridsize[1]
    goal_cell = (n_rows - 1, n_cols - 1)
    # there are rows * cols - 1 possible agent locations
    for row in range(n_rows):
        for col in range(n_cols):
            if (row, col) == goal_cell:
                continue
            # action(up:0, down:1, right:2, left:3)
            for action in range(4):
                state = env.set_agent_loc(row, col)
                outcome = env.step(action)
                next_state, reward, done = outcome
                agent.append(state, action, reward, next_state, done)

In [None]:
def train(args, env, agent):
    """Train ``agent`` on ``env`` for ``args.epochs`` epochs of ``args.steps`` steps.

    Replay memory is filled once up front by ``init_buffer``.  After that,
    each step picks an epsilon-greedy action, executes it, and lets the agent
    run whatever updates are due.  The evaluation routine ``test`` runs at the
    end of every epoch.

    NOTE(review): transitions gathered in this loop are never appended to
    replay memory, so learning only ever samples the pre-filled buffer --
    presumably intentional; confirm.

    Reads from ``args``: epochs, steps, epsilon (length of the linear decay
    in steps) and eps_min (final exploration rate).
    """
    action_space = 4
    replay_initial = 0  # number of initial steps before epsilon starts decaying

    def epsilon_at(step):
        """Linear decay from 1 to ``args.eps_min`` over ``args.epsilon`` steps."""
        elapsed = max(step - replay_initial, 0)
        return 1 - (1 - args.eps_min) * min(elapsed, args.epsilon) / args.epsilon

    # initialize ER buffer
    init_buffer(env, agent)

    total_steps = 0
    for epoch in range(1, args.epochs + 1):
        print('Epoch {}/{}'.format(epoch, args.epochs))
        print('Start Training')
        state = env.reset()

        for _ in range(args.steps):
            # select and execute an action
            action = agent.select_action(state, epsilon_at(total_steps), action_space)
            next_state, _reward, done = env.step(action)

            agent.update(total_steps)
            total_steps += 1

            # a finished episode restarts from the initial cell
            state = env.reset() if done else next_state

        test(args, env, agent, epoch)

# test part
def test(args, env, agent, epoch):
    print('Start Testing')
    action_space = 4
    n_states = env.gridsize
    total_q = 0
    # go through all grid and get the value of optimal action
    for row in range(n_states[0]):
      for col in range(n_states[1]):
        state = env.set_agent_loc(row, col)
        state = torch.Tensor(state).unsqueeze(0).to(args.device)
        q = torch.zeros(1, 4).to(args.device)
        # if averaged
        if args.k > 1:
            for i in range(args.k):
                q += agent._target_net[i](state)
            q /= args.k
        # if vanilla
        else:
            q = agent._target_net(state)
        q = torch.max(q, dim=1).values.unsqueeze(-1)
        total_q += q.item()
    # calculate mean
    mean_q_val = total_q / 400
    agent.test_q_val.append(mean_q_val)
    print('Average Q value: {:.4f}'.format(mean_q_val))

In [None]:
class Args:
    """Hyper-parameter bundle for one training run.

    Args:
        k: number of target networks (1 = single target, >1 = averaged).
        lr: Adam learning rate.
        freq: behavior-network update period in steps; also sets
            ``steps`` and ``target_freq`` to ``freq * 100``.
        batch_size: minibatch size sampled from replay memory.
        ddqn: use Double-DQN targets when True.
        result_path: directory the result files are written to.
    """

    def __init__(self, k = 1, lr = 0.0025, freq = 4, batch_size = 32, ddqn = False, result_path = 'result'):
        self.k = k
        # fall back to the CPU when no GPU is available instead of crashing
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.size = 20            # grid is size x size
        self.epochs = 500
        self.steps = freq * 100   # environment steps per epoch
        self.capacity = 400       # replay-memory capacity
        self.batch_size = batch_size
        self.lr = lr
        self.epsilon = 1000       # steps over which epsilon decays linearly
        self.eps_min = 0.1        # final exploration rate
        self.gamma = 0.9          # discount factor
        self.freq = freq
        self.ddqn = ddqn
        self.target_freq = freq * 100
        self.result_path = result_path

In [None]:
# Vanilla DQN
# 40 independent trials; each saves its per-epoch mean-Q curve to
# <dir>/result/test_q_<i>.npy.
start_time = time.time()
dir = "DQN_try"  # NOTE(review): shadows the built-in `dir`; name kept as-is
result_path = dir + "/result"
# np.save does not create missing directories: make sure the output folder
# exists before training so a finished trial is not lost at save time.
os.makedirs(result_path, exist_ok=True)
for i in range(1, 41):
  print("{}-th trial".format(i))
  args = Args(k = 1, freq = 4, lr = 0.002, ddqn = False, result_path = result_path)
  env = Env((args.size, args.size))
  # one slot per (non-goal cell, action) pair
  args.capacity = (args.size*args.size-1) * 4

  agent = DQN(args, env)
  train(args, env, agent)
  np.save(args.result_path + '/test_q_{}.npy'.format(i), agent.test_q_val)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Double DQN
# 40 independent trials; each saves its per-epoch mean-Q curve to
# <dir>/result/test_q_<i>.npy.
start_time = time.time()
dir = "DDQN_try"  # NOTE(review): shadows the built-in `dir`; name kept as-is
result_path = dir + "/result"
# np.save does not create missing directories: make sure the output folder
# exists before training so a finished trial is not lost at save time.
os.makedirs(result_path, exist_ok=True)
for i in range(1, 41):
  print("{}-th trial".format(i))
  args = Args(k = 1, freq = 4, lr = 0.002, ddqn = True, result_path = result_path)
  env = Env((args.size, args.size))
  # one slot per (non-goal cell, action) pair
  args.capacity = (args.size*args.size-1) * 4

  agent = DQN(args, env)
  train(args, env, agent)
  np.save(args.result_path + '/test_q_{}.npy'.format(i), agent.test_q_val)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Averaged-DQN with K = 5
# 40 independent trials; each saves its per-epoch mean-Q curve to
# <dir>/result/test_q_<i>.npy.
start_time = time.time()
dir = "AVERAGE-DQN_5_try"  # NOTE(review): shadows the built-in `dir`; name kept as-is
result_path = dir + "/result"
# np.save does not create missing directories: make sure the output folder
# exists before training so a finished trial is not lost at save time.
os.makedirs(result_path, exist_ok=True)
for i in range(1, 41):
  print("{}-th trial".format(i))
  args = Args(k = 5, freq = 4, lr = 0.002, ddqn = False, result_path = result_path)
  env = Env((args.size, args.size))
  # one slot per (non-goal cell, action) pair
  args.capacity = (args.size*args.size-1) * 4

  agent = DQN(args, env)
  train(args, env, agent)
  np.save(args.result_path + '/test_q_{}.npy'.format(i), agent.test_q_val)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Averaged-DQN with K = 10
# 40 independent trials; each saves its per-epoch mean-Q curve to
# <dir>/result/test_q_<i>.npy.
start_time = time.time()
dir = "AVERAGE-DQN_10_try"  # NOTE(review): shadows the built-in `dir`; name kept as-is
result_path = dir + "/result"
# np.save does not create missing directories: make sure the output folder
# exists before training so a finished trial is not lost at save time.
os.makedirs(result_path, exist_ok=True)
for i in range(1, 41):
  print("{}-th trial".format(i))
  args = Args(k = 10, freq = 4, lr = 0.002, ddqn = False, result_path = result_path)
  env = Env((args.size, args.size))
  # one slot per (non-goal cell, action) pair
  args.capacity = (args.size*args.size-1) * 4

  agent = DQN(args, env)
  train(args, env, agent)
  np.save(args.result_path + '/test_q_{}.npy'.format(i), agent.test_q_val)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Averaged-DQN with K = 20
# 40 independent trials; each saves its per-epoch mean-Q curve to
# <dir>/result/test_q_<i>.npy.
start_time = time.time()
dir = "AVERAGE-DQN_20_try"  # NOTE(review): shadows the built-in `dir`; name kept as-is
result_path = dir + "/result"
# np.save does not create missing directories: make sure the output folder
# exists before training so a finished trial is not lost at save time.
os.makedirs(result_path, exist_ok=True)
for i in range(1, 41):
  print("{}-th trial".format(i))
  args = Args(k = 20, freq = 4, lr = 0.002, ddqn = False, result_path = result_path)
  env = Env((args.size, args.size))
  # one slot per (non-goal cell, action) pair
  args.capacity = (args.size*args.size-1) * 4

  agent = DQN(args, env)
  train(args, env, agent)
  np.save(args.result_path + '/test_q_{}.npy'.format(i), agent.test_q_val)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Averaged-DDQN with K = 10
# 40 independent trials; each saves its per-epoch mean-Q curve to
# <dir>/result/test_q_<i>.npy.
start_time = time.time()
dir = "AVERAGE-DDQN_10_try"  # NOTE(review): shadows the built-in `dir`; name kept as-is
result_path = dir + "/result"
# np.save does not create missing directories: make sure the output folder
# exists before training so a finished trial is not lost at save time.
os.makedirs(result_path, exist_ok=True)
for i in range(1, 41):
  print("{}-th trial".format(i))
  args = Args(k = 10, freq = 4, lr = 0.002, ddqn = True, result_path = result_path)
  env = Env((args.size, args.size))
  # one slot per (non-goal cell, action) pair
  args.capacity = (args.size*args.size-1) * 4

  agent = DQN(args, env)
  train(args, env, agent)
  np.save(args.result_path + '/test_q_{}.npy'.format(i), agent.test_q_val)
print("--- %s seconds ---" % (time.time() - start_time))

In [None]:
# Averaged-DDQN with K = 20
# 40 independent trials; each saves its per-epoch mean-Q curve to
# <dir>/result/test_q_<i>.npy.
start_time = time.time()
dir = "AVERAGE-DDQN_20_try"  # NOTE(review): shadows the built-in `dir`; name kept as-is
result_path = dir + "/result"
# np.save does not create missing directories: make sure the output folder
# exists before training so a finished trial is not lost at save time.
os.makedirs(result_path, exist_ok=True)
for i in range(1, 41):
  print("{}-th trial".format(i))
  args = Args(k = 20, freq = 4, lr = 0.002, ddqn = True, result_path = result_path)
  env = Env((args.size, args.size))
  # one slot per (non-goal cell, action) pair
  args.capacity = (args.size*args.size-1) * 4

  agent = DQN(args, env)
  train(args, env, agent)
  np.save(args.result_path + '/test_q_{}.npy'.format(i), agent.test_q_val)
print("--- %s seconds ---" % (time.time() - start_time))