In [1]:
import gym
from gym import wrappers
import random
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
import matplotlib.pyplot as plt
from collections import deque

In [2]:
# Create a CartPole environment.
# NOTE(review): `env` is reassigned to gym.make('CartPole-v0') in a later cell before
# the training loop runs, so this 'CartPole-v1' instance is not the one trained on.
env = gym.make('CartPole-v1')

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [3]:
# Hyper-parameters, grouped by what they control.

# -- training length --
EPISODES = 100  # number of episodes played by the training loop

# -- epsilon-greedy exploration schedule (see select_action) --
EPS_START = 0.9  # threshold when no action has been selected yet
EPS_END = 0.05  # value the threshold decays towards
EPS_DECAY = 200  # decay constant, measured in select_action calls

# -- Q-learning update --
GAMMA = 0.8  # discount factor applied to the next-state value
BATCH_SIZE = 64  # transitions sampled from replay memory per update
LR = 1e-3  # learning rate handed to the optimizer

# -- network --
HIDDEN_LAYER = 256  # width of the single hidden layer

In [4]:
# if gpu is to be used
# NOTE(review): this flag is not read by any code in the visible part of the notebook;
# nothing below moves the model or tensors to a GPU based on it.
use_cuda = False

In [5]:
# Q-network: a one-hidden-layer MLP taking 4 input features and producing 2 outputs.
# select_action / learn treat the 2 outputs as one Q-value estimate per action.
model = nn.Sequential(
    nn.Linear(in_features=4, out_features=HIDDEN_LAYER),
    nn.ReLU(),
    nn.Linear(in_features=HIDDEN_LAYER, out_features=2),
)

In [6]:
# Environment the training loop runs on.
env = gym.make('CartPole-v0')

# Mutable training state shared by select_action / run_episode / learn.
steps_done = 0  # number of select_action calls so far; drives the epsilon decay
episode_durations = []  # step count of every finished episode, appended by run_episode
memory = deque(maxlen=10000)  # replay buffer; oldest transitions are dropped when full
optimizer = optim.Adam(model.parameters(), lr=LR)

[33mWARN: gym.spaces.Box autodetected dtype as <class 'numpy.float32'>. Please provide explicit dtype.[0m


In [7]:
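# NOTE: unfinished sketch of an agent class, kept fully commented out -- `DQNAgent` is
# therefore not defined anywhere in the visible notebook.
# class DQNAgent: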
#     def __init__(self):
#         self.net = nn.Sequential(
#             nn.Linear(4, HIDDEN_LAYER),
#             nn.ReLU(),
#             nn.Linear(HIDDEN_LAYER, 2)
#         )
    
#     def act(state):
#         state = torch.FloatTensor(state)
#         eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
#         steps_done += 1
#         if np.random.uniform() > eps_threshold:
#             return model(Variable(state, volatile=True)).data.max(1)[1].view(1, 1)
#         else:
#             return torch.LongTensor([[random.randrange(2)]])
        
#     def memorize():
#         pass
        
#     def learn():
#         pass

In [8]:
# DQNAgent exists only as a commented-out sketch, so calling DQNAgent() here raised
# NameError on a fresh kernel (Restart & Run All). Nothing in the visible notebook reads
# `agent` -- training goes through the module-level model / memory / select_action /
# learn -- so the name is bound to a placeholder instead of failing.
agent = None

In [9]:
def select_action(state):
    """Choose an action for `state` with an epsilon-greedy policy.

    The exploration threshold decays exponentially from EPS_START towards EPS_END
    as the global `steps_done` counter grows; the counter is incremented on every
    call, whether the action ends up greedy or random.

    Args:
        state: input tensor for `model`. The caller in this notebook passes a
            float tensor built from a single observation, i.e. shape (1, 4).

    Returns:
        A tensor of shape (1, 1) holding the chosen action index: the argmax of
        the model output on the greedy branch, or a uniformly random index in
        {0, 1} (as a LongTensor) on the exploration branch.
    """
    global steps_done
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if random.random() > eps_threshold:
        # `Variable(..., volatile=True)` no longer disables autograd (the flag was
        # removed in PyTorch 0.4 and only triggers a warning); no_grad() is the
        # supported way to run an inference-only forward pass.
        with torch.no_grad():
            return model(state).max(1)[1].view(1, 1)
    return torch.LongTensor([[random.randrange(2)]])


In [10]:
def run_episode(e, environment, render=True):
    """Play one episode, storing every transition and training after every step.

    Args:
        e: episode index, used only in the printed summary line.
        environment: object exposing reset() / step(action) / render(). step() is
            unpacked as (next_state, reward, done, info) and reset() is used as the
            initial observation, i.e. the classic gym API.
        render: when True (the default, matching the previous behaviour) the
            environment is rendered before every step. Pass False to train
            without a display or to speed training up.

    Side effects:
        Appends (state, action, next_state, reward) tensor tuples to the global
        `memory`, calls learn() once per step, prints a summary line and appends
        the episode length to the global `episode_durations`. Returns None.
    """
    state = environment.reset()
    steps = 0
    while True:
        if render:
            environment.render()

        # Build the state tensor once and reuse it for acting and for the replay entry.
        state_tensor = torch.FloatTensor([state])
        action = select_action(state_tensor)
        next_state, reward, done, _ = environment.step(action.item())

        # negative reward when attempt ends
        if done:
            reward = -1

        memory.append((state_tensor,
                       action,  # action is already a tensor
                       torch.FloatTensor([next_state]),
                       torch.FloatTensor([reward])))

        learn()

        state = next_state
        steps += 1

        if done:
            # Highlight episodes of at least 195 steps.
            # NOTE(review): '\033[99m' is not a standard SGR colour code and the colour
            # is never reset -- confirm this is the intended output.
            print("{2} Episode {0} finished after {1} steps"
                  .format(e, steps, '\033[92m' if steps >= 195 else '\033[99m'))
            episode_durations.append(steps)
            break

In [11]:
def learn():
    """Run one optimisation step of the Q-network on a random replay batch.

    Returns immediately while the global `memory` holds fewer than BATCH_SIZE
    transitions. Otherwise it samples BATCH_SIZE transitions, builds one-step
    Q-learning targets and takes a single optimizer step on the smooth-L1 loss
    between predicted and target Q-values.

    Reads the module-level `memory`, `model`, `optimizer`, `GAMMA` and
    `BATCH_SIZE`; updates the parameters of `model` in place. Returns None.
    """
    if len(memory) < BATCH_SIZE:
        return

    # random transition batch is taken from experience replay memory
    transitions = random.sample(memory, BATCH_SIZE)
    batch_state, batch_action, batch_next_state, batch_reward = zip(*transitions)

    batch_state = torch.cat(batch_state)
    batch_action = torch.cat(batch_action)
    batch_reward = torch.cat(batch_reward)
    batch_next_state = torch.cat(batch_next_state)

    # Q(s, a) of the action actually taken. squeeze(1) drops only the gather
    # column, so a batch of size 1 keeps its batch dimension.
    current_q_values = model(batch_state).gather(1, batch_action).squeeze(1)

    # max_a' Q(s', a'), computed without tracking gradients.
    with torch.no_grad():
        max_next_q_values = model(batch_next_state).max(1)[0]

    # A terminal transition has no future return, so its target must not bootstrap
    # from the next state. The replay tuple carries no done flag, but run_episode
    # overwrites the reward with -1 on the terminating step, so a negative reward
    # marks a terminal transition.
    # NOTE(review): assumes the environment's own step reward is never negative
    # (CartPole returns +1 per step) -- confirm before using another environment.
    not_terminal = (batch_reward >= 0).float()
    expected_q_values = batch_reward + GAMMA * max_next_q_values * not_terminal

    # loss is measured from error between current and newly expected Q values
    loss = F.smooth_l1_loss(current_q_values, expected_q_values)

    # backpropagation of loss to NN
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [12]:
# Main training loop: play EPISODES episodes on `env`. run_episode handles acting,
# storing transitions, calling learn() after every step and printing the result.
for e in range(EPISODES):
    run_episode(e, env)

  


[99m Episode 0 finished after 37 steps
[99m Episode 1 finished after 30 steps
[99m Episode 2 finished after 13 steps
[99m Episode 3 finished after 11 steps
[99m Episode 4 finished after 13 steps
[99m Episode 5 finished after 13 steps
[99m Episode 6 finished after 11 steps
[99m Episode 7 finished after 13 steps
[99m Episode 8 finished after 11 steps
[99m Episode 9 finished after 10 steps
[99m Episode 10 finished after 11 steps
[99m Episode 11 finished after 9 steps
[99m Episode 12 finished after 10 steps
[99m Episode 13 finished after 12 steps
[99m Episode 14 finished after 10 steps
[99m Episode 15 finished after 10 steps
[99m Episode 16 finished after 12 steps
[99m Episode 17 finished after 8 steps
[99m Episode 18 finished after 10 steps
[99m Episode 19 finished after 10 steps
[99m Episode 20 finished after 11 steps
[99m Episode 21 finished after 9 steps
[99m Episode 22 finished after 11 steps
[99m Episode 23 finished after 9 steps
[99m Episode 24 finished afte