# 强化学习

In [6]:
import numpy
import torch

class BotEnv(object):
    def __init__(self):
        self.n_actions = 3 # 三个动作：前进、左转、右转
        self.power = 0 # 机器人电量
        self.map = None # 当前房间网格地图
        self.pos = None # 机器人当前位置
        self.botdir = None # 机器人当前方向
        # 四个方向变换矩阵
        # 用于计算机器人当前方向对应的前、左、右、后四个方向
        self.alldir = numpy.array([[[1,0],[0,1]],
            [[0,1],[-1,0]],[[0,-1],[1,0]],[[-1,0],[0,-1]]])
        # 初始地图，0表示地面，2表示墙，1表示打扫过的地面
        # 这是一个5*6的矩形房间
        self.init_map = numpy.array([[2,2,2,2,2,2,2,2],[2,0,0,0,0,0,0,2],
            [2,0,0,0,0,0,0,2],[2,0,0,0,0,0,0,2],[2,0,0,0,0,0,0,2],
            [2,0,0,0,0,0,0,2],[2,2,2,2,2,2,2,2]])
        # 调用reset方法将状态初始化，方法定义在后面列出
        self.reset()
    def reset(self):
        # 放置机器人的初始位置
        self.pos = [2,2]
        # 机器人的初始方向
        self.botdir = [1,0]
        # 地图的初始状态
        self.map = numpy.copy(self.init_map)
        # 设置机器人的初始电量略大于可打扫网格数量
        self.power = self.map.shape[0] * self.map.shape[1]
    def get_state(self):
        # 计算机器人在当前位置和当前方向下
        # 前、左、右、后四个方向相邻网格点坐标
        allpos = numpy.matmul(self.botdir, self.alldir) + self.pos
        # 取出这四个相邻网格的地图状态数值
        allidx = numpy.ravel_multi_index(numpy.transpose(allpos), self.map.shape)
        allval = numpy.take(self.map, allidx)
        # 产生机器人当前的观察结果：是否是墙、是否打扫过
        iswall = numpy.array(allval==2, dtype=numpy.float64)
        isdone = numpy.array(allval==1, dtype=numpy.float64)
        # 输出为张量
        return torch.tensor([numpy.array([iswall, isdone]).flatten()], dtype=torch.float)
    def do_action(self, action):
        # 如果电量耗尽，给-100的惩罚，结束试验
        if self.power <= 0:
            return None, torch.tensor([-100], dtype=torch.float)
        # 消耗一格电量
        self.power -= 1
        # 如果动作是左转(1)或者右转(2)
        if action == 1 or action == 2:
            # 更新机器人方向，给-1的能耗惩罚
            self.botdir = numpy.matmul(self.botdir, self.alldir[action])
            return self.get_state(), torch.tensor([-1], dtype=torch.float)
        # 如果动作是前进(action == 0)
        # 前进一个格子，更新机器人位置
        self.pos = numpy.array(self.pos) + self.botdir
        # 提取地图中当前格子的数值
        posval = self.map[self.pos[0], self.pos[1]]
        # 如果撞到了墙，给-100的惩罚，结束试验
        if posval == 2:
            return None, torch.tensor([-100], dtype=torch.float)
        # 计算新的状态
        next_state = self.get_state()
        # 如果当前格子没有打扫过，将它标记为已经打扫过
        if posval == 0:
            self.map[self.pos[0], self.pos[1]] = 1
            # 如果房间打扫完成，给100奖励，结束试验
            if numpy.all(self.map > 0):
                return None, torch.tensor([100], dtype=torch.float)
            # 如果还没有打扫完成，给1个奖励，继续试验
            return next_state, torch.tensor([1], dtype=torch.float)
        # 如果当前格子已经打扫过，给-1的能耗惩罚，继续试验
        return next_state, torch.tensor([-1], dtype=torch.float)


In [8]:
import torch.nn as nn
import torch.nn.functional as F

# 值网络模型
class DqnModel(nn.Module):
    def __init__(self):
        super(DqnModel, self).__init__()
        self.lin1 = nn.Linear(8, 32)
        self.lin2 = nn.Linear(32, 3)
    def forward(self, x):
        x = F.relu(self.lin1(x))
        return self.lin2(x)

# %%
import random
from collections import namedtuple

# 回放记忆的内容为状态转换
# 用一个命名元组表示状态转换
# 分别是当前状态、动作、下一个状态、奖励
Transition = namedtuple('Transition',
    ('state', 'action', 'next_state', 'reward'))

class ReplayMemory(object):
    def __init__(self, capacity):
        # 回放记忆的容量
        self.capacity = capacity
        # 初始化空的记忆
        self.memory = []
        self.position = 0
    def push(self, *args):
        # 记录一个状态转换
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity
    def sample(self, batch_size):
        # 随机采样一批状态转换用作训练样本
        return random.sample(self.memory, batch_size)
    def __len__(self):
        # 返回记忆的长度
        return len(self.memory)

#%%
import math
from itertools import count
import torch.optim as optim

class DqnTrainer(object):
    def __init__(self):
        self.BATCH_SIZE = 8 # 训练批量大小
        self.GAMMA = 0.999 # 回报折扣因子
        self.EPS_START = 0.95 # 探索策略的初始随机度
        self.EPS_END = 0.5 # 探索策略的最终随机度
        self.EPS_DECAY = 200 # 探索策略的随机度下降因子
        self.policy_net = DqnModel() # 探索策略所采用的网络
        self.target_net = DqnModel() # 最终训练的目标网络
        # 同步目标网络和探索策略的状态
        self.target_net.load_state_dict(self.policy_net.state_dict())
        self.target_net.eval()
        # 初始化优化器
        self.optimizer = optim.RMSprop(self.policy_net.parameters())
        # 初始化回放记忆
        self.memory = ReplayMemory(10000)
        self.steps_done = 0 # 记录试验次数
        # 初始化环境
        self.botenv = BotEnv()

    def select_action(self, state):
        sample = random.random()
        # 计算当前的探索策略随机度
        eps_threshold = self.EPS_END+(self.EPS_START-self.EPS_END)*math.exp(
            -1.0*self.steps_done/self.EPS_DECAY)
        self.steps_done += 1
        # 当随机值大于随机度时，采用网络的最大输出决定采取的动作
        # 否则，采取一个随机动作
        if sample > eps_threshold:
            with torch.no_grad():
                return self.policy_net(state).max(1)[1].view(1,1)
        else:
            return torch.tensor([[random.randrange(self.botenv.n_actions)]], dtype=torch.long)

    def optimize_model(self):
        if len(self.memory) < self.BATCH_SIZE:
            return
        # 从回放记忆中进行采样
        transitions = self.memory.sample(self.BATCH_SIZE)
        batch = Transition(*zip(*transitions))
        # 用值网络预测每个转换中的状态和动作对应的期望回报值
        # 这一步计算出的是网络的预测值
        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        state_action_values = self.policy_net(state_batch).gather(1, action_batch)
        # 下面计算动作期望回报的“真实值”
        # 这个真实值是采用时间差分法的近似
        # 是基于下一个状态的值函数进行估计得到的
        # 如果没有下一个状态，那么其对应的期望回报值为0
        next_state_values = torch.zeros(self.BATCH_SIZE)
        # 如果有下一个状态，用目标值网络计算状态动作值函数
        # 选取下一个状态的最大回报动作对应的值作为状态的期望回报
        non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
        non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
            batch.next_state)), dtype=torch.bool)
        next_state_values[non_final_mask] = self.target_net(non_final_next_states).max(1)[0].detach()
        # 当前动作的预期回报的“真实值”可以用当前回报加上下一个状态的期望回报进行估计
        reward_batch = torch.cat(batch.reward)
        expected_state_action_values = (next_state_values * self.GAMMA) + reward_batch
        # 用当前动作回报的预测值和“真实值”计算网络的误差
        # 这里采用平滑的绝对误差
        loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))
        self.optimizer.zero_grad()
        loss.backward()
        for param in self.policy_net.parameters():
            param.grad.data.clamp_(-1, 1)
        self.optimizer.step()

    def train_episode(self):
        # 重置环境，开始一轮新试验
        self.botenv.reset()
        # 获取初始状态
        state = self.botenv.get_state()
        for _ in count():
            # 执行一个动作
            action = self.select_action(state)
            next_state, reward = self.botenv.do_action(action.item())
            reward = torch.tensor([reward])
            # 将状态转换记入回放记忆
            self.memory.push(state, action, next_state, reward)
            state = next_state
            # 用回放记忆中的数据训练模型
            self.optimize_model()
            # 如果试验结束，跳出循环
            if state is None:
                break
        # 将优化过的探索策略网络同步到目标网络
        self.target_net.load_state_dict(self.policy_net.state_dict())


In [9]:
dqn = DqnTrainer()
# 进行10次迭代训练
for i in range(10):
    dqn.train_episode()
