In [None]:
import gym

import torch
from torch import nn, optim
from torch.autograd import Variable

from collections import deque
import numpy as np
import random
from operator import itemgetter


In [None]:
# Build the CartPole-v0 environment. Later cells read its observation/action
# spaces to size the network and step it in the training loop.
env = gym.make('CartPole-v0')

In [None]:
# --- Hyper-parameters -------------------------------------------------------
BATCH_SIZE = 64              # transitions sampled from replay memory per learn() call
EPOCHES = 100                # number of episodes run by the training loop
EPSILON = 0.9                # probability of taking the GREEDY action (not the exploration rate)
GAMMA = 0.9                  # discount factor applied to the bootstrapped next-state value
MEMORY_CAPACITY = 2000       # maximum number of transitions kept in the replay deque
TARGET_REPLACE_ITER = 100    # interval (in steps) between target-network refreshes

# --- Problem dimensions, read from the environment --------------------------
N_STATES = env.observation_space.shape[0]   # length of one observation vector
N_ACTIONS = env.action_space.n              # number of discrete actions

In [None]:
class Net(nn.Module):
    """Fully connected network mapping a state vector to one value per action.

    Architecture: Linear(n_states, 512) -> ReLU -> Linear(512, 256) -> ReLU
    -> Linear(256, n_actions). The layers live in ``self.fc1`` so that
    ``state_dict`` keys (``fc1.0.weight`` ...) are unchanged.

    Args:
        n_states: Size of the input vector. Defaults to the module-level
            ``N_STATES`` when omitted, which matches the original behaviour
            of ``Net()``.
        n_actions: Size of the output vector. Defaults to the module-level
            ``N_ACTIONS`` when omitted.
    """

    def __init__(self, n_states=None, n_actions=None):
        super(Net, self).__init__()
        # Fall back to the notebook-wide dimensions only when the caller did
        # not supply explicit sizes; this keeps `Net()` working as before
        # while letting the class be built (and tested) on its own.
        if n_states is None:
            n_states = N_STATES
        if n_actions is None:
            n_actions = N_ACTIONS
        self.fc1 = nn.Sequential(
            nn.Linear(n_states, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, n_actions),
        )

    def forward(self, x):
        """Return the network output for ``x`` (last dimension = n_states)."""
        out = self.fc1(x)
        return out


In [None]:
class DQN(object):
    """Deep Q-learning agent: online network, target network and replay memory.

    ``eval_net`` is the network being trained; ``target_net`` is a copy that
    is only updated through :meth:`refresh_target_net` and supplies the
    bootstrapped next-state values in :meth:`learn`.

    Written against the post-0.4 PyTorch API (``torch.no_grad``, ``.item()``,
    ``torch.device``); the removed ``Variable(..., volatile=True)`` form is no
    longer used.
    """

    def __init__(self):
        # Pick the device once instead of repeating `.cuda()` branches.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.eval_net = Net().to(self.device)
        self.target_net = Net().to(self.device)
        # Start both networks from identical weights so the very first
        # targets are not produced by an unrelated random initialisation.
        self.refresh_target_net()
        # Bounded FIFO replay memory: the oldest transitions drop out first.
        self.memory = deque(maxlen=MEMORY_CAPACITY)
        self.criterion = nn.MSELoss()
        self.optimizer = optim.Adam(self.eval_net.parameters(), lr=0.01)

    def choose_action(self, state):
        """Pick an action for ``state``.

        With probability ``EPSILON`` the greedy action of ``eval_net`` is
        returned, otherwise a uniformly random action. Note that ``EPSILON``
        is therefore the *greedy* probability.

        Args:
            state: One observation (array-like of length N_STATES).

        Returns:
            int: Index of the chosen action, as a plain Python int.
        """
        if np.random.uniform() < EPSILON:
            x = torch.as_tensor(
                np.asarray(state, dtype=np.float32), device=self.device
            ).unsqueeze(0)
            # Inference only: no autograd graph is needed here.
            with torch.no_grad():
                action_values = self.eval_net(x)
            # argmax over the action dimension yields shape (1,); `.item()`
            # turns it into a Python scalar. The previous `.data[0, 0]`
            # indexed a 1-D result with two indices and raised IndexError.
            action = int(action_values.argmax(dim=-1).item())
        else:
            action = int(np.random.randint(0, N_ACTIONS))
        return action

    def refresh_target_net(self):
        """Copy the current ``eval_net`` weights into ``target_net``."""
        self.target_net.load_state_dict(self.eval_net.state_dict())

    def learn(self):
        """Run one optimisation step on a random mini-batch from memory.

        Raises:
            ValueError: Propagated from ``random.sample`` when fewer than
                ``BATCH_SIZE`` transitions are stored.
        """
        batch = random.sample(self.memory, BATCH_SIZE)
        states, actions, rewards, next_states = zip(*batch)

        # Stack through numpy first: building a tensor directly from a list
        # of ndarrays is very slow.
        states = torch.as_tensor(
            np.asarray(states, dtype=np.float32), device=self.device)
        next_states = torch.as_tensor(
            np.asarray(next_states, dtype=np.float32), device=self.device)
        rewards = torch.as_tensor(
            np.asarray(rewards, dtype=np.float32), device=self.device)
        actions = torch.as_tensor(
            [int(a) for a in actions], dtype=torch.int64, device=self.device)

        # Q(s, a) for the actions actually taken. gather() returns (B, 1);
        # squeeze to (B,) so it lines up element-wise with q_target. Leaving
        # it as (B, 1) makes MSELoss broadcast against the (B,) target into
        # a (B, B) matrix and silently optimise the wrong objective.
        q_eval = self.eval_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

        # The target network is held fixed, so no gradient is tracked.
        with torch.no_grad():
            q_next = self.target_net(next_states).max(dim=1)[0]
        # NOTE(review): transitions carry no `done` flag, so terminal states
        # are bootstrapped like any other state. Masking them would require
        # storing `done` in remember().
        q_target = rewards + GAMMA * q_next

        loss = self.criterion(q_eval, q_target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def remember(self, state, action, reward, next_state):
        """Append one ``(state, action, reward, next_state)`` transition."""
        self.memory.append((state, action, reward, next_state))

dqn = DQN()

In [None]:
from IPython.core.debugger import set_trace

# Work on the raw environment so that `x_threshold` and
# `theta_threshold_radians` can be read for reward shaping below.
# NOTE(review): presumably this also drops gym's TimeLimit wrapper, so an
# episode only ends when the environment itself reports `done` -- confirm;
# a well-trained agent may then keep an episode running for a long time.
env = env.unwrapped

# Environment steps taken across ALL episodes. The target network must be
# refreshed on this global count: the per-episode `steps` counter restarts at
# 0 every episode, which re-synced the target network at the start of every
# episode (every handful of steps early in training) instead of every
# TARGET_REPLACE_ITER steps.
total_steps = 0

for epoch in range(EPOCHES):
    print('*'*10)
    print('Epoch: {}'.format(epoch))
    state = env.reset()
    done = False
    steps = 0       # steps taken in this episode (reported below)
    scores = 0.0    # sum of the environment's own, unshaped rewards
    while not done:
        action = dqn.choose_action(state)
        next_state, reward, done, info = env.step(action)
        scores += reward

        # Revise reward: replace the environment reward with a shaped one
        # that grows as the cart nears the centre (r1, from 0.2 down to -0.8
        # at the position limit) and as the pole nears upright (r2, from 0.5
        # down to -0.5 at the angle limit).
        x, x_dot, theta, theta_dot = next_state
        r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
        r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
        reward = r1 + r2

        dqn.remember(state, action, reward, next_state)
        # Only train once more than one batch worth of transitions exists.
        if len(dqn.memory) > BATCH_SIZE:
            dqn.learn()
        # Sync on the global counter; this also fires on the very first step.
        if total_steps % TARGET_REPLACE_ITER == 0:
            dqn.refresh_target_net()
        state = next_state
        steps += 1
        total_steps += 1
    print('Steps: {}'.format(steps))
    print('Scores: {}'.format(scores))