In [None]:
!pip install gym




In [None]:
import numpy as np
import matplotlib.pyplot as plt
import gym
from gym import spaces
from google.colab import widgets
import time
import math
import pandas as pd
from statistics import mean
from itertools import chain
import random
import torch as T
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import os
import collections

# **Grid Environment Definition**

In [None]:
class GridEnvironment(gym.Env):
    """5x5 grid world containing a goal, a diamond and a monster.

    Positions are ``[row, col]`` pairs. The agent starts at ``[0, 0]``, the
    goal is at ``[4, 4]``, the diamond at ``[4, 2]`` and the monster at
    ``[2, 3]``. Observations are the agent position itself (a two-element
    sequence), not an index into ``observation_space``.

    The reward of one step is the sum of:
      * +500 when the agent stands on the goal,
      * -5 the first time the agent lands on the monster (which kills it),
      * +10 the first time the agent lands on the diamond,
      * +3 if the Manhattan distance to the goal decreased, otherwise -5.

    An episode ends when the goal is reached or after ``max_timesteps`` steps.

    Args:
        stochasticy: When True, every step has a 10% chance of replacing the
            requested action with a random one and an 80% chance that the
            (still living) monster takes one step towards the agent.
    """

    # Largest valid row/column index of the 5x5 grid.
    _GRID_MAX = 4

    def __init__(self, stochasticy=False):
        self.observation_space = spaces.Discrete(25)
        self.action_space = spaces.Discrete(4)
        self.max_timesteps = 25
        self.stochasticy = stochasticy

    def _draw_state(self):
        """Rebuild the 5x5 grid image from the current entity positions."""
        self.state = np.zeros((5, 5))
        # Later assignments overwrite earlier ones when entities overlap.
        self.state[tuple(self.agent_pos)] = 1
        self.state[tuple(self.goal_pos)] = 0.5
        self.state[tuple(self.diamond_pos)] = 0.25
        self.state[tuple(self.monster_pos)] = 0.15

    def reset(self):
        """Start a new episode and return the initial agent position."""
        self.timestep = 0
        self.totalreward = 0
        self.agent_pos = [0, 0]
        self.goal_pos = [4, 4]
        self.monster_pos = [2, 3]
        self.diamond_pos = [4, 2]
        self.diamond_taken = False
        self.monster_killed = False

        self._draw_state()
        # Return a copy so callers that keep the observation (e.g. a replay
        # buffer) never share storage with the environment's internal state.
        return list(self.agent_pos)

    def step(self, action):
        """Apply ``action`` and advance the environment by one timestep.

        Args:
            action: 0 = down (row + 1), 1 = up (row - 1),
                2 = right (col + 1), 3 = left (col - 1). Any other value
                leaves the agent where it is.

        Returns:
            Tuple ``(observation, reward, done, info)`` where ``observation``
            is a fresh array holding the agent's ``[row, col]`` position.
        """
        # Snapshot of the position before moving, as an independent array.
        agent_old_pos = np.array(self.agent_pos)

        monster_move = False

        # If stochasticy is enabled there is a 10% chance that a random action
        # replaces the requested one and an 80% chance the monster moves.
        if self.stochasticy:
            if np.random.choice((0, 1), p=[0.10, 0.90]) == 0:
                action = np.random.choice(self.action_space.n)
            if np.random.choice((0, 1), p=[0.80, 0.20]) == 0:
                monster_move = not self.monster_killed

        move = np.zeros(2, dtype=int)
        if action == 0:      # down
            move[0] = 1
        elif action == 1:    # up
            move[0] = -1
        elif action == 2:    # right
            move[1] = 1
        elif action == 3:    # left
            move[1] = -1

        # Build a new array instead of mutating in place: the previous
        # position object may still be referenced by an earlier observation.
        self.agent_pos = np.clip(agent_old_pos + move, 0, self._GRID_MAX)

        # Move the monster one cell towards the agent when the agent is
        # further down or further right than the monster. The agent position
        # is already clipped here, so the monster cannot leave the grid.
        if monster_move:
            dis = [int(a - m) for a, m in zip(self.agent_pos, self.monster_pos)]
            if dis[0] > 0 or dis[1] > 0:
                # chase along the axis with the larger gap
                greater = dis.index(max(dis))
                self.monster_pos[greater] += 1

        self._draw_state()

        reward = 0

        reached = bool(np.array_equal(self.agent_pos, self.goal_pos))
        if reached:
            reward += 500

        if np.array_equal(self.agent_pos, self.monster_pos) and not self.monster_killed:
            # negatively reward running into the monster
            reward -= 5
            # the agent kills the monster when it runs into it
            self.monster_killed = True

        if np.array_equal(self.agent_pos, self.diamond_pos) and not self.diamond_taken:
            # positively reward getting the diamond
            reward += 10
            # the diamond can only be collected once
            self.diamond_taken = True

        # Manhattan distance to the goal: |x1 - x2| + |y1 - y2|.
        goal = np.asarray(self.goal_pos)
        olddistance = np.abs(agent_old_pos - goal).sum()
        newdistance = np.abs(self.agent_pos - goal).sum()

        if newdistance < olddistance:
            # reward the agent for getting closer to the goal
            reward += 3
        else:
            # penalise staying in place or moving further away
            reward -= 5

        self.totalreward += reward
        self.timestep += 1

        done = bool(self.timestep >= self.max_timesteps or reached)
        info = {}

        # Hand out a copy so stored transitions cannot be altered later.
        observation = self.agent_pos.copy()
        return observation, reward, done, info

    def render(self):
        """Show the current grid as an image."""
        plt.imshow(self.state)

In [None]:
# Reference: https://pykitml.readthedocs.io/en/stable/DQN/
# https://colab.research.google.com/github/eat-toast/RL/blob/main/GridWorld_apple.ipynb?authuser=0

In [None]:
# Use the first CUDA GPU when one is available; otherwise fall back to the CPU.
device = T.device("cuda:0") if T.cuda.is_available() else T.device("cpu")

# **Deep Q Network**

In [None]:
class DeepQNetwork(nn.Module):
    """Fully connected Q-network mapping a state vector to 4 action values.

    Three hidden layers of 64 ReLU units are followed by a linear output
    layer with one value per action. The network owns its RMSprop optimizer
    and a smooth-L1 loss module, and moves itself to the GPU when available.

    Args:
        dimensions: Number of input features per state (default 2, the
            ``[row, col]`` position used by the grid environment).
    """

    def __init__(self, dimensions=2):
        # `dimensions` is a constructor argument; super() takes no keywords.
        super().__init__()

        self.fc1 = nn.Linear(dimensions, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 64)
        self.fc4 = nn.Linear(64, 4)

        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')
        self.to(self.device)

        self.optimizer = optim.RMSprop(self.parameters())
        # Loss module exposed as an attribute for callers that use `net.loss`.
        self.loss = nn.SmoothL1Loss()

    def forward(self, state):
        """Return the action values for a batch of states."""
        f_state = F.relu(self.fc1(state))
        f_state = F.relu(self.fc2(f_state))
        f_state = F.relu(self.fc3(f_state))
        actions = self.fc4(f_state)
        return actions
        

# **Replay Buffer**

In [None]:
class ReplayBuffer():
    """Fixed-capacity FIFO store of (s, a, r, s', done) transitions.

    Args:
        max_size: Maximum number of transitions kept; once full, the oldest
            transition is dropped for every new one.
    """

    def __init__(self, max_size):
        # Total number of transitions ever stored (not capped by max_size).
        self.mem_cntr = 0
        self.max_size = max_size
        self.deque_transition = collections.deque(maxlen=max_size)
        self.device = T.device('cuda:0' if T.cuda.is_available() else 'cpu')

    def store_transition(self, states, actions, rewards, states_prime, done):
        """Append one transition to the buffer."""
        grid_word_output = (states, actions, rewards, states_prime, done)
        self.deque_transition.append(grid_word_output)
        self.mem_cntr += 1

    def sample_buffer(self, batch_size):
        """Draw ``batch_size`` distinct transitions uniformly at random.

        Returns:
            Tuple ``(state, action, reward, new_state, done)`` of tensors on
            ``self.device``. States are float tensors with one row per
            transition; action, reward and done keep one column per row.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        mini_batch = random.sample(self.deque_transition, batch_size)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []

        for s, a, r, s_prime, done_mask in mini_batch:
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])

        # Convert once, after the whole batch is collected. Stacking the
        # states with numpy first avoids the slow list-of-arrays tensor path.
        state = T.tensor(np.array(s_lst), dtype=T.float).to(self.device)
        reward = T.tensor(r_lst).to(self.device)
        done = T.tensor(done_mask_lst).to(self.device)
        action = T.tensor(a_lst).to(self.device)
        new_state = T.tensor(np.array(s_prime_lst), dtype=T.float).to(self.device)

        return state, action, reward, new_state, done

    def size(self):
        """Return the number of transitions currently held."""
        return len(self.deque_transition)


# **DQN AGENT**

In [None]:
class DQNAgent(object):
    """Double-DQN agent with an epsilon-greedy policy and a replay buffer.

    Args:
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial exploration probability.
        lr: Learning rate applied to the online network's optimizer.
        mem_size: Capacity of the replay buffer.
        eps_min: Lower bound for epsilon.
        batch_size: Number of transitions sampled per learning step.
        replace: Number of learning steps between target-network syncs.
        eps_dec: Multiplicative epsilon decay applied after each learning step.
        action_space: Number of discrete actions.
        name: Label for the agent (kept for interface compatibility; unused).
    """

    def __init__(self, gamma=0.999,
                 epsilon=1,
                 lr=0.01,
                 mem_size=5000,
                 eps_min=0.0001,  # decrease min-value
                 batch_size=128,
                 replace=20,
                 eps_dec=0.99976,  # increase decay
                 action_space=4,
                 name="grid_word"):

        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.batch_size = batch_size
        self.eps_min = eps_min
        self.eps_dec = eps_dec
        self.replace_target_cnt = replace
        self.action_space = action_space
        self.learn_step_counter = 0

        self.memory = ReplayBuffer(mem_size)

        self.q_eval = DeepQNetwork()
        self.q_next = DeepQNetwork()

        # Make the `lr` argument effective on the online network's optimizer.
        for group in self.q_eval.optimizer.param_groups:
            group['lr'] = self.lr
        # Start with the target network identical to the online network.
        self.q_next.load_state_dict(self.q_eval.state_dict())

    def choose_action(self, observation):
        """Return an epsilon-greedy action for ``observation``."""
        if np.random.random() > self.epsilon:
            state = T.as_tensor(np.asarray(observation, dtype=np.float32),
                                device=self.q_eval.device).unsqueeze(0)
            # Action selection needs no gradients.
            with T.no_grad():
                actions = self.q_eval(state)
            action = T.argmax(actions).item()
        else:
            action = np.random.choice(self.action_space)

        return action

    def store_transition(self, state, action, reward, state_, done):
        """Save one transition in the replay buffer."""
        self.memory.store_transition(state, action, reward, state_, done)

    def sample_memory(self):
        """Sample ``batch_size`` transitions from the replay buffer."""
        state, action, reward, new_state, done = self.memory.sample_buffer(self.batch_size)
        return state, action, reward, new_state, done

    def replace_target_network(self):
        """Copy the online weights into the target network on schedule.

        The sync happens every ``replace_target_cnt`` learning steps, as
        configured through the ``replace`` constructor argument.
        """
        interval = self.replace_target_cnt
        if interval and interval > 0 and (self.learn_step_counter % interval) == 0:
            self.q_next.load_state_dict(self.q_eval.state_dict())

    def decay(self):
        """Multiply epsilon by ``eps_dec`` without going below ``eps_min``."""
        self.epsilon *= self.eps_dec
        if (self.epsilon < self.eps_min):
            self.epsilon = self.eps_min

    def learn(self):
        """Run one Double-DQN update on a sampled mini-batch.

        Does nothing until the buffer holds at least ``batch_size``
        transitions.
        """
        # Use the number of transitions actually held, not the running total.
        if self.memory.size() < self.batch_size:
            return

        self.replace_target_network()

        states, actions, rewards, states_, dones = self.sample_memory()

        # The buffer returns one column per row; flatten to 1-D so that
        # indexing selects exactly one Q-value per transition.
        batch_idx = T.arange(states.shape[0], device=states.device)
        actions = actions.reshape(-1).long()
        rewards = rewards.reshape(-1).float()
        dones = dones.reshape(-1).bool()

        q_pred = self.q_eval(states)[batch_idx, actions]

        # The target is a constant for this update: no gradient flows through
        # the target network or the action-selection pass.
        with T.no_grad():
            q_next = self.q_next(states_)
            max_actions = T.argmax(self.q_eval(states_), dim=1)
            # Terminal transitions have no future value.
            q_next[dones] = 0.0
            q_target = rewards + self.gamma * q_next[batch_idx, max_actions]

        loss = F.smooth_l1_loss(q_pred, q_target)

        self.q_eval.optimizer.zero_grad()
        loss.backward()
        self.q_eval.optimizer.step()

        self.decay()
        self.learn_step_counter += 1

# **Util Functions**

In [None]:
class util():
    """Plotting helpers for training curves."""

    def graph(self, data, x_label, y_label):
        """Plot ``data`` as a line chart with the given axis labels.

        Args:
            data: Sequence of values to plot against their index.
            x_label: Text for the x axis.
            y_label: Text for the y axis; also used as the legend entry.
        """
        fontsize = 18
        plt.figure(figsize=(20, 7))
        # Label the line so that the legend has an entry to show.
        plt.plot(data, label=y_label)
        plt.legend()
        plt.ylabel(y_label, fontsize=fontsize, color='#1d5685')
        plt.xlabel(x_label, fontsize=fontsize, color='#1d5685')
        plt.show()

# **Run our DQN agent in Grid Environment**

In [None]:
# Train the DQN agent on the deterministic grid world and plot the results.
env = GridEnvironment()
best_score = -np.inf
n_games = 2000
agent = DQNAgent()
n_steps = 0
scores, eps_history, steps_array = [], [], []

for i in range(n_games):
    done = False
    observation = env.reset()
    score = 0
    action_taken = []

    while not done:
        action = agent.choose_action(observation)
        action_taken.append(action)
        observation_, reward, done, info = env.step(action)
        score += reward
        agent.store_transition(observation, action, reward, observation_, done)
        agent.learn()

        observation = observation_

    # Always record the score the agent actually achieved; reported results
    # must never be replaced by a hard-coded value.
    scores.append(score)
    avg_score = np.mean(scores[-100:])
    # Track the best single-episode score seen so far.
    best_score = max(best_score, score)

    print('ep: ', i,'score: ', score,
         ' avg score %.1f' % avg_score, 'best score %.2f' % best_score,
        'e %.2f' % agent.epsilon)

    eps_history.append(agent.epsilon)

# Instantiate the plotting helper only once so that re-running this cell
# still works after the name `util` has been rebound to an instance.
util = util() if isinstance(util, type) else util
util.graph(scores,"ep","DQN_score")
util.graph(eps_history,"ep","Epsilon")

ep:  0 score:  -34  avg score -34.0 best score -inf e 1.00
ep:  1 score:  -18  avg score -26.0 best score -inf e 1.00
ep:  2 score:  -26  avg score -26.0 best score -inf e 1.00
ep:  3 score:  -19  avg score -24.2 best score -inf e 1.00
ep:  4 score:  -29  avg score -25.2 best score -inf e 1.00




ep:  5 score:  -26  avg score -25.3 best score -inf e 0.99
ep:  6 score:  510  avg score 51.1 best score -inf e 0.99
ep:  7 score:  507  avg score 108.1 best score -inf e 0.99
ep:  8 score:  -3  avg score 95.8 best score -inf e 0.98
ep:  9 score:  -19  avg score 84.3 best score -inf e 0.97
ep:  10 score:  -29  avg score 74.0 best score -inf e 0.97
ep:  11 score:  -37  avg score 64.8 best score -inf e 0.96
ep:  12 score:  -69  avg score 54.5 best score -inf e 0.96
ep:  13 score:  -29  avg score 48.5 best score -inf e 0.95
ep:  14 score:  -37  avg score 42.8 best score -inf e 0.95
ep:  15 score:  -27  avg score 38.4 best score -inf e 0.94
ep:  16 score:  -26  avg score 34.6 best score -inf e 0.93
ep:  17 score:  -29  avg score 31.1 best score -inf e 0.93
ep:  18 score:  -11  avg score 28.9 best score -inf e 0.92
ep:  19 score:  -26  avg score 26.1 best score -inf e 0.92
ep:  20 score:  -34  avg score 23.3 best score -inf e 0.91
ep:  21 score:  -45  avg score 20.2 best score -inf e 0.91
e

TypeError: ignored

Naive DQN on CartPole

In [None]:
# Create the CartPole environment and read off its state and action sizes.
env_to_use = 'CartPole-v1'
env = gym.make(env_to_use)
# Total number of entries in one observation (product of the shape's axes).
state_dim = np.array(env.observation_space.shape).prod()
n_actions = env.action_space.n
