In [1]:
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
import gym
from collections import namedtuple
import random
import math

In [2]:
# Choose the tensor constructors once, up front: the CUDA variants when a GPU
# is available, the CPU variants otherwise. Later cells can then build tensors
# through these aliases without repeating the device check.
use_cuda = torch.cuda.is_available()
if use_cuda:
    FloatTensor, LongTensor, ByteTensor = (
        torch.cuda.FloatTensor,
        torch.cuda.LongTensor,
        torch.cuda.ByteTensor,
    )
else:
    FloatTensor, LongTensor, ByteTensor = (
        torch.FloatTensor,
        torch.LongTensor,
        torch.ByteTensor,
    )
# Default floating-point tensor type.
Tensor = FloatTensor

In [3]:
class DNN(nn.Module):
    """Single linear layer that maps an observation to one Q-value per action.

    Args:
        n_inputs: length of the observation vector. Defaults to 4, the layer
            width this notebook originally hard-coded.
        n_actions: number of discrete actions, i.e. the output width.
            Defaults to 2, matching the original layer.
    """

    def __init__(self, n_inputs=4, n_actions=2):
        super(DNN, self).__init__()
        self.fc1 = nn.Linear(n_inputs, n_actions)

    def forward(self, x):
        """Return the Q-value estimates for a batch of observations.

        Args:
            x: float tensor whose last dimension has ``n_inputs`` entries.

        Returns:
            Tensor with the same leading dimensions as ``x`` and ``n_actions``
            entries in the last dimension.
        """
        # The output is deliberately left linear. A ReLU here would clamp every
        # Q-value at zero and, whenever both pre-activations are negative,
        # produce a zero gradient so the layer could never recover.
        return self.fc1(x)

In [4]:
# Build the gym environment registered as 'CartPole-v0'. `env` is the object
# that the simulation cell resets and steps.
env = gym.make('CartPole-v0')

In [5]:
# One replay-buffer record. ReplayMemory.push(*args) fills the fields
# positionally, in the order: state, action, next_state, reward.
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

In [6]:
class ReplayMemory(object):
    """Fixed-capacity ring buffer of ``Transition`` records.

    New transitions are appended until ``capacity`` is reached; after that
    each push overwrites the oldest stored transition.

    Args:
        capacity: maximum number of transitions kept; must be positive.

    Raises:
        ValueError: if ``capacity`` is not positive.
    """

    def __init__(self, capacity):
        if capacity <= 0:
            raise ValueError("capacity must be a positive integer")
        self.capacity = capacity
        self.memory = []
        # Index of the slot the next push will write to.
        self.position = 0

    def push(self, *args):
        """Store one transition; ``args`` are the ``Transition`` fields in order."""
        t = Transition(*args)
        if len(self.memory) < self.capacity:
            # Still filling up: position always equals len(memory) here, so
            # appending writes exactly the slot that position points at.
            self.memory.append(t)
        else:
            # Full: overwrite the oldest entry.
            self.memory[self.position] = t
        self.position = (self.position + 1) % self.capacity

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored transitions, chosen at random.

        Raises:
            ValueError: if ``batch_size`` exceeds the number of stored
                transitions (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

In [12]:
# Training configuration and the objects shared by the cells below.

# Number of transitions drawn per optimisation step; optimize() also returns
# early until the replay memory holds at least this many.
BATCH_SIZE = 64
# Replay buffer holding at most 10000 transitions.
memory = ReplayMemory(10000)
# Discount factor: optimize() multiplies the next-state value by it.
gamma = 0.999
# Bounds of the exploration rate computed in select_action.
eps_start = 0.9
eps_end = 0.05
# NOTE(review): presumably the decay constant of the exploration schedule;
# confirm where it is consumed.
eps_decay = 200

# Q-network, moved to the GPU when one is available.
model = DNN()
if use_cuda:
    model = model.cuda()
# RMSprop over all model parameters, with the optimizer's default settings.
optimizer = optim.RMSprop(model.parameters())

In [8]:
# Module-level episode counter; select_action reads it when computing its
# exploration rate.
episode = 0

In [9]:
def select_action(state):
    """Choose an action for ``state`` with an epsilon-greedy policy.

    The exploration rate decays exponentially from ``eps_start`` towards
    ``eps_end`` as the global ``episode`` counter grows, with ``eps_decay``
    as the decay constant. With probability ``eps`` a uniformly random action
    is returned; otherwise the action with the largest model output is chosen.

    Args:
        state: the current observation, either a numpy array (what the gym
            environment returns) or a torch tensor. It is flattened to a
            single row before being passed to the model.

    Returns:
        int: index of the selected action (0 or 1).
    """
    eps = eps_end + (eps_start - eps_end) * math.exp(-1.0 * episode / eps_decay)
    # Compare against the decayed rate, not eps_start, so exploration
    # actually shrinks over time.
    if random.random() < eps:
        return random.randrange(2)

    # Observations from the environment are numpy arrays; convert them to a
    # float tensor of the configured (CPU or CUDA) type before the forward pass.
    if not torch.is_tensor(state):
        state = torch.from_numpy(np.asarray(state, dtype=np.float32))
    inputs = Variable(state.type(Tensor).view(1, -1))
    outputs = model(inputs)
    # max(1) yields (values, indices); the index of the best action is
    # returned as a plain int so it can be handed straight to env.step().
    return int(outputs.data.max(1)[1].view(-1)[0])

In [10]:
def _stack_states(states):
    """Stack observations (numpy arrays or tensors) into a (len(states), -1) float tensor."""
    rows = []
    for s in states:
        if not torch.is_tensor(s):
            s = torch.from_numpy(np.asarray(s, dtype=np.float32))
        rows.append(s.type(Tensor).view(1, -1))
    return torch.cat(rows, 0)


def optimize():
    """Run one Q-learning update on a random minibatch from the replay memory.

    Does nothing until the memory holds at least ``BATCH_SIZE`` transitions.
    Otherwise it samples ``BATCH_SIZE`` transitions, regresses
    ``Q(state, action)`` towards ``reward + gamma * max_a Q(next_state, a)``
    with a smooth-L1 loss, and applies one optimizer step.

    A transition whose ``next_state`` is ``None`` is treated as terminal: its
    next-state value is zero, so its target is just the reward.

    Returns:
        None
    """
    if len(memory) < BATCH_SIZE:
        return

    transitions = memory.sample(BATCH_SIZE)
    # Turn a list of Transition records into one Transition of field tuples.
    batch = Transition(*zip(*transitions))

    state_batch = Variable(_stack_states(batch.state))
    # gather() needs an int64 index with one column per row.
    action_batch = Variable(LongTensor([int(a) for a in batch.action]).view(-1, 1))
    reward_batch = Variable(Tensor([float(r) for r in batch.reward]))

    # Q(s, a) for the actions that were actually taken, flattened to (batch,).
    state_action_values = model(state_batch).gather(1, action_batch).view(-1)

    # max_a Q(s', a) for non-terminal next states; zero for terminal ones.
    next_state_values = torch.zeros(len(transitions)).type(Tensor)
    non_final = [i for i, s in enumerate(batch.next_state) if s is not None]
    if non_final:
        non_final_next = Variable(_stack_states([batch.next_state[i] for i in non_final]))
        # .data detaches the target so no gradient flows through it.
        best = model(non_final_next).data.max(1)[0].view(-1)
        next_state_values.index_copy_(0, LongTensor(non_final), best)

    expected_state_action_values = Variable(next_state_values) * gamma + reward_batch
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)

    optimizer.zero_grad()
    loss.backward()
    # Without this step the computed gradients are never applied.
    optimizer.step()
    
    

In [199]:
# Scratch experiment: build two dummy transitions whose `state` is a 1x3
# tensor and whose other fields are placeholder strings.
# NOTE(review): .cuda() is called unconditionally here, unlike the use_cuda
# guard used for the model, so this cell needs a GPU to run.
t1 = Transition(torch.from_numpy(np.asarray([1,2,3]).reshape(1,3)).cuda(), 'action', 'next_state', 'reward')
t2 = Transition(torch.from_numpy(np.asarray([3,4,5]).reshape(1,3)).cuda(), 'action', 'next_state', 'reward')
ts = [t1, t2]

In [198]:
# Scratch check: convert a 1x3 numpy integer array to a tensor and display it.
torch.from_numpy(np.asarray([1,2,3]).reshape(1,3))


 1  2  3
[torch.LongTensor of size 1x3]

In [200]:
# Scratch check: zip(*ts) transposes the list of transitions, giving a single
# Transition whose fields are tuples holding that field from every record.
batch = Transition(*zip(*ts))
batch

Transition(state=(
 1  2  3
[torch.cuda.LongTensor of size 1x3 (GPU 0)]
, 
 3  4  5
[torch.cuda.LongTensor of size 1x3 (GPU 0)]
), action=('action', 'action'), next_state=('next_state', 'next_state'), reward=('reward', 'reward'))

In [201]:
# Scratch check: the `state` field of the transposed batch is a tuple of the
# individual state tensors.
batch.state

(
  1  2  3
 [torch.cuda.LongTensor of size 1x3 (GPU 0)], 
  3  4  5
 [torch.cuda.LongTensor of size 1x3 (GPU 0)])

In [202]:
# Scratch check: concatenating the tuple of 1x3 state tensors along dim 0
# yields one 2x3 tensor with one row per transition.
state_batch = torch.cat(batch.state, 0)
state_batch


 1  2  3
 3  4  5
[torch.cuda.LongTensor of size 2x3 (GPU 0)]

In [None]:
def simulate():
    """Play one episode in ``env``, storing transitions and training as it goes.

    Each step selects an action for the current state, applies it, stores the
    resulting transition in ``memory`` and calls ``optimize()``. The transition
    that ends the episode is stored with ``next_state=None`` so that it can be
    told apart from non-terminal ones. When the episode finishes, the global
    ``episode`` counter is incremented by one.

    Returns:
        float: sum of the rewards collected during the episode.
    """
    global episode

    total_reward = 0.0
    done = False
    state = env.reset()
    while not done:
        action = select_action(state)
        next_state, reward, done, _ = env.step(action)
        memory.push(state, action, None if done else next_state, reward)
        optimize()
        # Advance to the new observation; without this every step would act
        # on, and record, the initial state.
        state = next_state
        total_reward += reward
    episode += 1
    return total_reward

In [175]:
# Scratch check: reset the environment, take one step with action 1, and show
# the type of the first element of the step result (the observation).
env.reset()
type(env.step(1)[0])

numpy.ndarray

In [179]:
# Scratch check: build a numpy array from a Python list and display it.
np.asarray([1,2,3])

array([1, 2, 3])