In [23]:
"""
    Description:
        A pole is attached by an un-actuated joint to a cart, which moves along
        a frictionless track. The pendulum starts upright, and the goal is to
        prevent it from falling over by increasing and reducing the cart's
        velocity.
    Source:
        This environment corresponds to the version of the cart-pole problem
        described by Barto, Sutton, and Anderson
    Observation:
        Type: Box(4)
        Num	Observation               Min             Max
        0	Cart Position             -4.8            4.8
        1	Cart Velocity             -Inf            Inf
        2	Pole Angle                -24 deg         24 deg
        3	Pole Velocity At Tip      -Inf            Inf
    Actions:
        Type: Discrete(2)
        Num	Action
        0	Push cart to the left
        1	Push cart to the right
        Note: The amount the velocity that is reduced or increased is not
        fixed; it depends on the angle the pole is pointing. This is because
        the center of gravity of the pole increases the amount of energy needed
        to move the cart underneath it
    Reward:
        Original reward is 1 for every step taken, including the termination step
    Starting State:
        All observations are assigned a uniform random value in [-0.05..0.05]
    Episode Termination:
        Pole Angle is more than 12 degrees.
        Cart Position is more than 2.4 (center of the cart reaches the edge of
        the display).
        Episode length is greater than 200.
        Solved Requirements:
        Considered solved when the average reward is greater than or equal to
        195.0 over 100 consecutive trials.
"""

import random
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
from collections import namedtuple

# if gpu is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# hypterparameters
BATCH_SIZE = 32
LR = 0.01                   # learning rate
EPSILON = 0.9               # 最优选择动作百分比
# GAMMA = 0.9                 
EPS_START = 0.9            # Determine epsilon adaptively
EPS_END = 0.05
EPS_DECAY = 200
TARGET_REPLACE_ITER = 100   # Q 现实网络的更新频率
MEMORY_CAPACITY = 2000      # 记忆库大小
env = gym.make('CartPole-v0')   # 立杆子游戏
env = env.unwrapped
N_ACTIONS = env.action_space.n  # 杆子能做的动作
N_STATES = env.observation_space.shape[0]   # 杆子能获取的环境信息数

Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state'))

class ReplayMemory(object):
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, *args):
        """Saves a transition."""
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

# Q-network
class QNetwork(nn.Module):
    def __init__(self, ):
        super().__init__()
        self.fc1 = nn.Linear(N_STATES, 10)
        self.fc1.weight.data.normal_(0, 0.1)   # initialization
        self.out = nn.Linear(10, N_ACTIONS)
        self.out.weight.data.normal_(0, 0.1)   # initialization

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        actions_value = self.out(x)
        return actions_value
        
# DQN framework
class DQN(object):
    # 建立 target net 和 eval net 还有 memory
    def __init__(self):
        self.eval_net, self.target_net = QNetwork().to(device), QNetwork().to(device)

        self.learn_step_counter = 0     # 用于 target 更新计时
        self.memory = ReplayMemory(MEMORY_CAPACITY)
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)    # torch 的优化器
        self.loss_func = nn.MSELoss()   # 误差公式
    
    # 根据环境观测值选择动作的机制
    def choose_action(self, x):
        global steps_done
        x = torch.unsqueeze(torch.FloatTensor(x), 0)
        eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
        steps_done += 1
        # 这里只输入一个 sample
        if np.random.uniform() > eps_threshold:   # 选最优动作
            actions_value = self.eval_net.forward(x)
            action = torch.max(actions_value, 1)[1].numpy()[0]     # return the argmax
        else:   # 选随机动作
            action = np.random.randint(0, N_ACTIONS)
        return action
    
    # target 网络更新
    # 学习记忆库中的记忆
    def learn(self):
        if len(self.memory) < BATCH_SIZE:
            return
        
        # target net 参数更新
        if self.learn_step_counter % TARGET_REPLACE_ITER == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
        self.learn_step_counter += 1

        # 抽取记忆库中的批数据
        batch = self.memory.sample(BATCH_SIZE)
        # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
        # detailed explanation). This converts batch-array of Transitions
        # to Transition of batch-arrays.
        b_memory = Transition(*zip(*batch))
        b_s = torch.FloatTensor(b_memory.state)
        b_a = torch.LongTensor(b_memory.action).unsqueeze(1)
        b_r = torch.FloatTensor(b_memory.reward).unsqueeze(1)
        b_s_ = torch.FloatTensor(b_memory.next_state)

        # 针对做过的动作b_a, 来选 q_eval 的值, (q_eval 原本有所有动作的值)
        q_eval = self.eval_net(b_s).gather(1, b_a)  # shape (batch, 1)
        q_next = self.target_net(b_s_).detach()     # q_next 不进行反向传递误差, 所以 detach
        q_target = b_r + GAMMA * q_next.max(1)[0]   # shape (batch, 1)
        loss = self.loss_func(q_eval, q_target)

        # 计算, 更新 eval net
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()




In [24]:
# training
dqn = DQN() # 定义 DQN 系统

for i_episode in range(1000):
    s = env.reset()
    time_count = 1
    while True:
        # env.render()    # 显示实验动画
        a = dqn.choose_action(s)

        # 选动作, 得到环境反馈
        s_, r, done, info = env.step(a)
        
        x, x_dot, theta, theta_dot = s_   # 细分开, 为了修改原配的 reward

        # x 是车的水平位移, 所以 r1 是车越偏离中心, 分越少
        # theta 是棒子离垂直的角度, 角度越大, 越不垂直. 所以 r2 是棒越垂直, 分越高
        r1 = (env.x_threshold - abs(x))/env.x_threshold - 0.8
        r2 = (env.theta_threshold_radians - abs(theta))/env.theta_threshold_radians - 0.5
        reward = r1 + r2   # 总 reward 是 r1 和 r2 的结合, 既考虑位置, 也考虑角度, 这样 DQN 学习更有效率

        # 存记忆
        dqn.memory.push(s, a, r, s_)
        # learning
        dqn.learn()

        if done:    # 如果回合结束, 进入下回合
            break

        s = s_
        time_count += 1
    if i_episode % 50 == 0:
        print("Episode finished after {} timesteps".format(time_count+1))
env.close()

Episode finished after 16 timesteps


  return F.mse_loss(input, target, reduction=self.reduction)


Episode finished after 11 timesteps
Episode finished after 10 timesteps
Episode finished after 12 timesteps
Episode finished after 11 timesteps
Episode finished after 13 timesteps
Episode finished after 21 timesteps
Episode finished after 22 timesteps
Episode finished after 21 timesteps
Episode finished after 23 timesteps
Episode finished after 24 timesteps
Episode finished after 22 timesteps
Episode finished after 26 timesteps
Episode finished after 37 timesteps
Episode finished after 23 timesteps
Episode finished after 22 timesteps
Episode finished after 30 timesteps
Episode finished after 12 timesteps
Episode finished after 37 timesteps
Episode finished after 30 timesteps
