In [13]:
%matplotlib inline
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T  

In [14]:
# True when matplotlib reports an "inline" backend; only in that case is
# IPython's display module imported.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

In [15]:
class DQN(nn.Module):
    """Small fully connected Q-network.

    Maps a batch of flattened observations to one Q-value per action through
    two hidden ReLU layers (24 and 32 units).

    Args:
        in_features: Size of one flattened observation. Defaults to 4, the
            value that was previously hard-coded.
        num_actions: Number of output Q-values. Defaults to 2, the value that
            was previously hard-coded.
    """

    def __init__(self, in_features=4, num_actions=2):
        super().__init__()

        self.fc1 = nn.Linear(in_features=in_features, out_features=24)
        self.fc2 = nn.Linear(in_features=24, out_features=32)
        self.out = nn.Linear(in_features=32, out_features=num_actions)

    def forward(self, t):
        """Return Q-values of shape (batch, num_actions).

        Args:
            t: Tensor of shape (batch, ...); everything after the first
                dimension is flattened. A 1-D tensor is treated as a single
                observation and promoted to a batch of one, because
                ``flatten(start_dim=1)`` raises on 1-D input.
        """
        if t.dim() == 1:
            t = t.unsqueeze(0)
        t = t.flatten(start_dim=1)
        t = F.relu(self.fc1(t))
        t = F.relu(self.fc2(t))
        t = self.out(t)
        return t

In [17]:
# Immutable record of a single environment transition. Field order is
# (state, action, next_state, reward).
Experience = namedtuple('Experience', ['state', 'action', 'next_state', 'reward'])

In [18]:
class ReplayMemory():
    """Fixed-capacity ring buffer of experiences with uniform random sampling.

    Once ``capacity`` items are stored, each new push overwrites the slot at
    ``push_count % capacity``, i.e. the oldest entry.
    """

    def __init__(self, capacity):
        """Create an empty buffer.

        Args:
            capacity: Maximum number of stored experiences; must be positive.

        Raises:
            ValueError: If ``capacity`` is not positive. Without this check a
                zero or negative capacity only fails later, inside ``push``.
        """
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity!r}")
        self.capacity = capacity
        self.memory = []
        self.push_count = 0

    def push(self, experience):
        """Store ``experience``, overwriting the oldest entry when full."""
        if len(self.memory) < self.capacity:
            self.memory.append(experience)
        else:
            self.memory[self.push_count % self.capacity] = experience
        self.push_count += 1

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored experiences chosen at random.

        Raises:
            ValueError: From ``random.sample`` if fewer than ``batch_size``
                experiences are stored; check ``can_provide_sample`` first.
        """
        return random.sample(self.memory, batch_size)

    def can_provide_sample(self, batch_size):
        """Return True if at least ``batch_size`` experiences are stored."""
        return len(self.memory) >= batch_size

In [22]:
class EpsilonGreedyStrategy():
    """Exponentially decaying exploration-rate schedule.

    Args:
        start: Exploration rate at step 0.
        end: Value the rate decays towards as the step count grows.
        decay: Decay constant applied per step inside the exponential.
    """

    def __init__(self, start, end, decay):
        self.start = start
        self.end = end
        self.decay = decay

    def get_exploration_rate(self, current_step):
        """Return the exploration rate for ``current_step``.

        Computes ``end + (start - end) * exp(-current_step * decay)``.
        Defined on the class so that ``strategy.get_exploration_rate(step)``
        works on instances.
        """
        return self.end + (self.start - self.end) * \
            math.exp(-1. * current_step * self.decay)

In [23]:
def get_exploration_rate(self, current_step):
    """Return the exponentially decayed exploration rate at ``current_step``.

    ``self`` is any object exposing ``start``, ``end`` and ``decay``
    attributes; the result is
    ``end + (start - end) * exp(-current_step * decay)``.
    """
    decay_factor = math.exp(-1. * current_step * self.decay)
    span = self.start - self.end
    return self.end + span * decay_factor

In [24]:
class Agent():
    """Epsilon-greedy action selector.

    Args:
        strategy: Object with a ``get_exploration_rate(step)`` method.
        num_actions: Number of discrete actions to choose from when exploring.
    """

    def __init__(self, strategy, num_actions):
        self.current_step = 0
        self.strategy = strategy
        self.num_actions = num_actions

    def select_action(self, state, policy_net):
        """Pick an action for ``state`` and advance the step counter.

        With probability equal to the current exploration rate a random
        action index is returned; otherwise the index of the largest output
        of ``policy_net(state)`` along dim 1 is returned.

        Args:
            state: Input passed unchanged to ``policy_net``; ``.item()``
                requires the argmax result to hold a single element, so this
                is expected to be a batch of one.
            policy_net: Callable returning a tensor of per-action values.

        Returns:
            int: The selected action index.
        """
        # Use the strategy owned by this agent, not a same-named global.
        rate = self.strategy.get_exploration_rate(self.current_step)
        self.current_step += 1

        if rate > random.random():
            return random.randrange(self.num_actions)  # explore
        else:
            with torch.no_grad():
                return policy_net(state).argmax(dim=1).item()  # exploit

In [None]:
# Train a DQN on CartPole with an experience-replay buffer and a periodically
# synchronised target network.
#
# NOTE: this cell uses the classic Gym API (reset() returns the observation,
# step() returns a 4-tuple), matching the calls the original cell made.
env = gym.make('CartPole-v0')

# hyperparameters
learning_rate = 0.0005
discount_rate = .99
max_steps = 500
num_episodes = 100000
num_steps_until_reset_target = 5000
batch_size = 100

# exploration rate values
eps = 1
min_eps_val = 0.01
max_eps_val = 1
eps_decay_rate = 0.001

# replay memory: a list used as a ring buffer of
# (state, action, reward, new_state, done) tuples
replay_memory = []
replay_memory_size = 5000
push_count = 0

# device on which the networks and training batches live
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# policy network (trained) and target network (frozen copy used for TD targets)
policy_net = DQN().to(device)
target_net = DQN().to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()
optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)

rewards = []          # total reward of every finished episode
avg = 0               # running sum of rewards within the current report window
report_every = 1000   # number of episodes per printed average

for episode in range(num_episodes):
    state = env.reset()
    done = False
    current_reward = 0

    for step in range(max_steps):
        # periodically copy the policy weights into the target network
        if push_count % num_steps_until_reset_target == 0:
            target_net.load_state_dict(policy_net.state_dict())

        # select an action via exploitation (greedy) or exploration (random)
        if np.random.random_sample() > eps:
            with torch.no_grad():
                # The network expects a float tensor with a batch dimension;
                # argmax over dim 1 gives the action with the largest Q-value,
                # and .item() turns it into the plain int env.step() accepts.
                state_t = torch.as_tensor(
                    np.asarray(state), dtype=torch.float32, device=device
                ).unsqueeze(0)
                action = policy_net(state_t).argmax(dim=1).item()
        else:
            action = env.action_space.sample()

        # execute the selected action
        new_state, reward, done, info = env.step(action)

        # store the transition, overwriting the oldest slot once full
        transition = (state, action, reward, new_state, done)
        if len(replay_memory) < replay_memory_size:
            replay_memory.append(transition)
        else:
            replay_memory[push_count % replay_memory_size] = transition
        push_count += 1

        # one gradient step once enough transitions have been collected
        if len(replay_memory) >= batch_size:
            batch = random.sample(replay_memory, batch_size)
            state_b, action_b, reward_b, next_state_b, done_b = zip(*batch)

            state_b = torch.as_tensor(
                np.array(state_b, dtype=np.float32), device=device)
            action_b = torch.as_tensor(
                np.array(action_b, dtype=np.int64), device=device).unsqueeze(1)
            reward_b = torch.as_tensor(
                np.array(reward_b, dtype=np.float32), device=device)
            next_state_b = torch.as_tensor(
                np.array(next_state_b, dtype=np.float32), device=device)
            done_b = torch.as_tensor(
                np.array(done_b, dtype=np.float32), device=device)

            # Q(s, a) for the actions that were actually taken
            state_action_values = policy_net(state_b).gather(1, action_b).squeeze(1)

            # max_a' Q_target(s', a'); multiplied by (1 - done) so that
            # transitions flagged done contribute only their reward
            with torch.no_grad():
                next_state_values = target_net(next_state_b).max(dim=1)[0]
            expected_values = reward_b + discount_rate * next_state_values * (1.0 - done_b)

            loss = F.mse_loss(state_action_values, expected_values)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # advance to the next state before choosing the next action
        state = new_state
        current_reward += reward

        if done:
            break

    # decay epsilon once per episode
    eps = min_eps_val + (max_eps_val - min_eps_val) * np.exp(-eps_decay_rate * episode)

    rewards.append(current_reward)
    avg += current_reward
    # report only after a full window so the printed value is a true mean
    if (episode + 1) % report_every == 0:
        print(avg / report_every)
        avg = 0