In [1]:
%matplotlib inline
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from copy import deepcopy
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision.transforms as T

# Set up matplotlib: detect whether the inline (notebook) backend is active and,
# if so, pull in IPython's display helpers.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display


# Switch matplotlib to interactive mode.
plt.ion()

# GPU: choose the CUDA tensor constructors when a CUDA device is available,
# otherwise fall back to the CPU tensor types. The rest of the notebook builds
# tensors through these aliases instead of naming a device explicitly.
use_cuda = torch.cuda.is_available()
FloatTensor = torch.cuda.FloatTensor if use_cuda else torch.FloatTensor
LongTensor = torch.cuda.LongTensor if use_cuda else torch.LongTensor
ByteTensor = torch.cuda.ByteTensor if use_cuda else torch.ByteTensor
# Default tensor type is the (possibly CUDA) float tensor.
Tensor = FloatTensor

### 经验重放

In [2]:
# One replay-buffer record: the (state, action, next_state, reward) of a single step.
Clip = namedtuple('Clip', ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):
    """Fixed-capacity ring buffer of `Clip` transitions for experience replay.

    The buffer grows until it holds `capacity` records; after that each new
    record overwrites the oldest one.
    """

    def __init__(self, capacity):
        """Create an empty buffer that will hold at most `capacity` records."""
        self.capacity = capacity
        self.memory = []    # stored Clip records, at most `capacity` of them
        self.position = 0   # index the next push() will write to

    def push(self, *args):
        """Save a transition.

        Args:
            *args: the four `Clip` fields, in order
                (state, action, next_state, reward).

        Raises:
            TypeError: if the number of arguments does not match `Clip`.
                The buffer is left untouched in that case.
        """
        # Build the record before touching the buffer: if the arguments are
        # wrong, no placeholder is left behind to poison later sample() calls.
        clip = Clip(*args)
        if len(self.memory) < self.capacity:
            self.memory.append(clip)
        else:
            self.memory[self.position] = clip
        # Modulo wraps the write index around, giving ring-buffer behaviour.
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return `batch_size` distinct stored records chosen uniformly at random.

        Raises:
            ValueError: (from random.sample) if `batch_size` exceeds the
                number of stored records.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of records currently stored."""
        return len(self.memory)

### Q网络

In [3]:
def ini_net(md):
    """Re-initialise every nn.Linear inside `md` with standard-normal values.

    Walks `md.modules()` (the module itself and all of its descendants) and
    overwrites the weight, and the bias when the layer has one, of each
    nn.Linear in place. Modules of other types are left untouched.

    Args:
        md: an nn.Module, e.g. a whole network or a single layer.
    """
    # Newer PyTorch names the in-place initialiser `normal_` and keeps `normal`
    # only as a deprecated alias; fall back to `normal` where `normal_` is absent.
    init_normal = getattr(torch.nn.init, 'normal_', None) or torch.nn.init.normal
    for m in md.modules():
        if not isinstance(m, nn.Linear):
            continue
        init_normal(m.weight.data)
        # nn.Linear(..., bias=False) has bias None; skip it rather than crash.
        if m.bias is not None:
            init_normal(m.bias.data)

class DQN(nn.Module):
    """Q-network: a three-layer fully connected MLP with ReLU activations.

    Maps a batch of state vectors of shape (batch, n_inputs) to one Q-value
    per action, shape (batch, n_actions).

    Args:
        n_inputs: size of the state vector (default 4).
        n_hidden: width of both hidden layers (default 100).
        n_actions: number of discrete actions / output Q-values (default 2).
    """

    def __init__(self, n_inputs=4, n_hidden=100, n_actions=2):
        super(DQN, self).__init__()
        # Attribute name `MLP` is kept so existing state_dict keys stay valid.
        self.MLP = nn.Sequential(
            nn.Linear(n_inputs, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_hidden),
            nn.ReLU(),
            nn.Linear(n_hidden, n_actions),
        )

    def forward(self, x):
        """Return the Q-values for every action given the state batch `x`."""
        return self.MLP(x)
    
# Smoke test: build a network, re-initialise its Linear layers with ini_net,
# and run a single forward pass on a random 1x4 input to check that it
# yields one row of two Q-values.
net = DQN()
net.apply(ini_net)
testx = Variable(torch.randn(1,4))
net(testx)

Variable containing:
 33.4336 -70.9401
[torch.FloatTensor of size 1x2]

### 动作策略(e贪婪法)

In [4]:
def select_action(state):
    """Pick an action for `state` with an epsilon-greedy policy.

    With probability `eps_threshold` a random action (0 or 1) is returned;
    otherwise the action with the largest Q-value according to `model1`.
    Increments the global `steps_done` counter on every call.

    Args:
        state: state tensor that is fed to `model1`.

    Returns:
        A 1x1 LongTensor holding the chosen action index.
    """
    global steps_done
    draw = random.random()  # uniform sample in [0, 1)
    # Exponentially decaying exploration schedule, evaluated for reference only.
    decayed_eps = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    # NOTE(review): the decayed value above is not used; exploration is pinned
    # to the constant EPS_END. Confirm this override is intentional.
    eps_threshold = EPS_END
    steps_done += 1
    if draw <= eps_threshold:
        # Explore: uniformly random action.
        return LongTensor([[random.randrange(2)]])
    # Exploit: index of the largest Q-value predicted by the online network.
    q_input = Variable(state, volatile=True).type(FloatTensor)
    return model1(q_input).data.max(1)[1].view(1, 1)

### 值迭代

In [5]:
def optimize_model():
    """Run one DQN gradient step on `model1` from a random replay mini-batch.

    Samples BATCH_SIZE transitions from `memory`, evaluates Q(s, a) with the
    online network `model1`, builds the bootstrap target
    y = r + GAMMA * max_a' Q_target(s', a') with the target network `model2`,
    and minimises the mean squared error between the two via `optimizer`.

    Returns:
        The loss of this step, or None when `memory` holds fewer than
        BATCH_SIZE transitions (no update is performed in that case).

    NOTE(review): `Clip` carries no "done" flag, so the bootstrap term is also
    applied to transitions that ended an episode.
    """
    if len(memory) < BATCH_SIZE:  # not enough experience collected yet
        return None

    # Draw a mini-batch and transpose it: list of Clips -> Clip of tuples.
    clips = memory.sample(BATCH_SIZE)
    batch = Clip(*zip(*clips))
    s2_batch = Variable(torch.cat(batch.next_state), volatile=True)
    s_batch = Variable(torch.cat(batch.state))
    a_batch = Variable(torch.cat(batch.action))
    r_batch = Variable(torch.cat(batch.reward))

    # Q(s, a) of the actions that were actually taken; shape (B, 1).
    Qsa = model1(s_batch).gather(1, a_batch)

    # max_a' Q_target(s', a'), flattened to one value per sample.
    Qs2a = model2(s2_batch).max(1)[0].view(-1)

    # Target y = r + GAMMA * max_a' Q_target(s', a').
    # detach(): the target is a constant for this step, so no gradient must
    # flow into model2 on versions where `volatile` has no effect.
    # view(-1, 1): give the target the same (B, 1) shape as Qsa; a (B,) target
    # against a (B, 1) input would broadcast to (B, B) and yield a wrong loss.
    y = ((Qs2a * GAMMA) + r_batch.view(-1)).detach().view(-1, 1)

    # Mean squared TD error (F.smooth_l1_loss / Huber loss is a drop-in alternative).
    loss = torch.nn.MSELoss()(Qsa, y)

    # Optimise the online network.
    optimizer.zero_grad()
    loss.backward()
    # Gradients could be clamped here (e.g. to [-1, 1]) if values grow too large.
    optimizer.step()
    return loss

### 超参数

In [6]:
# Training hyper-parameters.
BATCH_SIZE = 64       # transitions per gradient step
GAMMA = 0.999         # discount factor of the bootstrap target
EPS_START = 0.9       # initial exploration rate of the decay schedule
EPS_END = 0.01        # final exploration rate
EPS_DECAY = 200       # decay constant of the schedule, in action steps
num_episodes = 5000   # number of training episodes

# Online network. Move it to the GPU only when one is available, so the
# notebook also runs on CPU-only machines (the tensor aliases already follow
# `use_cuda`).
model1 = DQN()
if use_cuda:
    model1 = model1.cuda()
model1.apply(ini_net)
# Alternative optimiser: optim.RMSprop(model1.parameters())
# Target network: starts as an exact copy of the online network.
model2 = deepcopy(model1)
optimizer = optim.Adam(model1.parameters(), lr=1e-3)
memory = ReplayMemory(100000)  # replay buffer of up to 100,000 transitions

# `.unwrapped` exposes the raw environment, whose `state` attribute is read directly.
env = gym.make('CartPole-v0').unwrapped
env.reset()

steps_done = 0  # global action counter used by select_action

### 训练

In [7]:
# TensorBoard logging through tensorboardX; the training loop in this cell
# writes its per-episode scalar with `writer`.
from tensorboardX import SummaryWriter
writer = SummaryWriter()

def get_state():
    """Return the environment's current state as a float tensor of shape (1, N).

    Reads `env.state` directly rather than using the observation returned by
    `env.step` / `env.reset`.
    """
    # FloatTensor is already the CUDA type when `use_cuda` is set, so no
    # explicit .cuda() call is needed; one would crash on CPU-only machines.
    return FloatTensor(env.state).view(1, -1)

TERMINAL_PENALTY = 50    # subtracted from the reward of the step that ends an episode
TARGET_SYNC_EVERY = 20   # episodes between copies of model1 into model2

try:
    for i_episode in range(num_episodes):
        # Start a fresh episode.
        env.reset()
        s = get_state()
        for t in count():  # runs until the environment reports `done`
            #env.render()
            a = select_action(s)
            # int(): hand the environment a plain Python integer regardless of
            # whether indexing the action tensor yields an int or a 0-dim tensor.
            _, r, done, _ = env.step(int(a[0, 0]))

            if done:
                r -= TERMINAL_PENALTY
            r = Tensor([r])

            # Store the transition (s, a, s', r) in the replay memory.
            s_ = get_state()
            memory.push(s, a, s_, r)

            # Advance to the next state and take one optimisation step.
            s = s_
            loss = optimize_model()

            if done:
                # Log the index of the last step, i.e. the episode length - 1.
                writer.add_scalar('t', t, i_episode)
                break
        # Periodically refresh the target network from the online network.
        if i_episode % TARGET_SYNC_EVERY == 0:
            model2 = deepcopy(model1)

    print('Complete')
finally:
    # Close the writer even when training is interrupted (e.g. by a
    # KeyboardInterrupt), so the logged events are flushed to disk.
    writer.close()

KeyboardInterrupt: 

这是修改后效果比较理想的版本：训练后可以使杆达到很稳定的平衡状态，并几乎无限期地维持下去。