# 增强学习：Q-Learning

In [1]:
import numpy as np

## 游戏棋盘

首先，我们创造一个游戏棋盘（`gym`上有许多类似的游戏棋盘可以使用，这里我们自己实现一个简单的游戏棋盘）。  
游戏棋盘上的元素主要包括:  
- 0: 可以走的道路
- X: 陷阱，走入陷阱则算游戏失败
- P: 玩家当前状态(位置)
- G: 终点，当走到终点时，则算游戏胜利

这里，设定初始奖励值为0，每走一步奖励值减0.1，掉入陷阱则奖励值减10，走到终点奖励值加10。

In [2]:
class PlayEnvironment:
    def __init__(self, table_size=(5, 5), traps=[5, 7, 11, 12], 
                 step_cost=-0.1, trap_cost=-10, goal_cost=10, start_state=0, goal_state=None):
        """
        
        :param table_size: 棋盘大小 (length, width)
        :param traps: 陷阱位置
        :param step_cost: 移动损失
        :param trap_cost: 进入陷阱损失
        :param goal_cost: 到底终点奖励
        :param start_state: 起始状态(位置)
        :param gold_state: 终点状态(位置)
        """
        self.table_size = table_size
        self.length = table_size[0]
        self.width = table_size[1]
        
        self.start_state = start_state
        self.current_state = start_state
        self.current_step = 0
        # 如果未传入终点状态，则将最后个状态作为终点状态
        if goal_state is None:
            self.goal_state = self.length * self.width - 1
        else:
            self.goal_state = goal_state
        self.traps = traps
        
        self.step_cost = step_cost
        self.trap_cost = trap_cost
        self.goal_cost = goal_cost
        self.cumulative_reward = 0
        
        self.play_status = 'START'
        
    def reset_evn(self):
        """
        重置游戏环境
        """
        self.current_state = self.start_state
        self.current_step = 0
        self.cumulative_reward = 0
        self.play_status = 'START'
        return self.current_state

    def get_state_size(self):
        return self.length * self.width
    
    def get_action_size(self):
        return 4
    
    def get_random_walk(self):
        return np.random.choice(np.arange(4))
    
    def render_chessboard(self, return_chessboard=False):
        """
        游戏棋盘
        X: 陷阱
        P: 玩家当前位置
        S: 起始位置
        G: 终点位置
        :param return_chessboard: 是否返回游戏棋盘，如果不返回，则打印游戏棋盘
        """
        chessboard = np.full(self.length * self.width, '0')
        chessboard[self.traps] = 'X'
        chessboard[self.start_state] = 'S'
        chessboard[self.goal_state] = 'G'
        chessboard[self.current_state] = 'P'
        
        chessboard = chessboard.reshape(self.width, self.length)
        if return_chessboard:
            return chessboard
        else:
            print("=====step: {}=====".format(self.current_step))
            print("state: {}\tstatus: {}\treawrd: {}".format(self.current_state, 
                                                              self.play_status, 
                                                              self.cumulative_reward))
            for r in chessboard:
                print(r)
            
    def step(self, action):
        """
        
        :param action:
            0 - up
            1 - right
            2 - down
            3 - left
        """
        done = False
        reward = 0
        if action == 0:
            new_state = self.current_state - self.length
            # 检测是否超出了上边界，如果超出了，则设置为原地
            if new_state < 0:
                new_state = self.current_state
        if action == 1:
            # 检测是否超出了右边界，如果超出了，则设置为原地
            if self.current_state % self.length == self.length - 1:
                new_state = self.current_state
            else:
                new_state = self.current_state + 1
        if action == 2:
            new_state = self.current_state + self.length
            # 检测是否超出了下边界，如果超出了，则设置为原地
            if new_state >= self.length * self.width:
                new_state = self.current_state
        if action == 3:
            # 检测是否超出了左边界，如果超出了，则设置为原地
            if self.current_state % self.length == 0:
                new_state = self.current_state
            else:
                new_state = self.current_state - 1
        
        done, reward = self.cal_state_reward(new_state)
        self.cumulative_reward += reward
        self.current_state = new_state
        self.current_step += 1
        if done:
            if new_state == self.goal_state:
                self.play_status = 'GOAL'
            else:
                self.play_status = 'FAIL'
        else:
            self.play_status = 'PALYING'
            
        return done, reward, new_state
    
    def cal_state_reward(self, new_state):
        """
        计算新状态得分
        """
        if new_state in self.traps:
            return True, self.trap_cost
        if new_state == self.goal_state:
            return True, self.goal_cost
        return False, self.step_cost

创建的游戏棋盘，设置陷阱位置，然后查看游戏棋盘的样子

In [3]:
play_env = PlayEnvironment(traps=[5, 6, 7, 14, 17, 18, 20])
play_env.render_chessboard()

=====step: 0=====
state: 0	status: START	reawrd: 0
['P' '0' '0' '0' '0']
['X' 'X' 'X' '0' '0']
['0' '0' '0' '0' 'X']
['0' '0' 'X' 'X' '0']
['X' '0' '0' '0' 'G']


我们往右移动一步，看下效果（`action`输入为1时，表示右移）

In [4]:
play_env.step(1)
play_env.render_chessboard()

=====step: 1=====
state: 1	status: PALYING	reawrd: -0.1
['S' 'P' '0' '0' '0']
['X' 'X' 'X' '0' '0']
['0' '0' '0' '0' 'X']
['0' '0' 'X' 'X' '0']
['X' '0' '0' '0' 'G']


## Q-Learning算法

有了游戏棋盘后，可以开始实现Q-learning算法来玩游戏了。  
Q-learning的详细介绍可以查看[这里](https://medium.freecodecamp.org/diving-deeper-into-reinforcement-learning-with-q-learning-c18d0db58efe)，下面将简单介绍一下

首先，创建一个Q-table用来存储状态和行动的奖励。初始化Q-table后，按照下列公式对Q-table进行更新

\begin{equation}  
Q(s, a) = Q(s, a) + \alpha(r + \gamma maxQ(s', a') - Q(s, a))
\end{equation} 

其中，$Q$表示Q-function，用以计算输入状态和行动的奖励值，s为状态，s'为下一个状态，a为行动，a'为下一个行动，$\alpha$为学习率，$\gamma$为折扣率(discounting rate，用于衰减远期的奖励值影响)。  
我们将采用[epsilon贪婪算法](https://imaddabbura.github.io/blog/data%20science/2018/03/31/epsilon-Greedy-Algorithm.html)来权衡探索(Exploration)与利用(Exploitation)。以$\epsilon$的概率选择探索，如果进行探索，随机选择下一步的行动，如果进行利用，则利用现有的Q-table选择行动。  
一开始将$\epsilon$设置为1，主要以探索为主，随后逐渐衰减$\epsilon$，增大利用的概率，按照下列公式进行$\epsilon$更新

\begin{equation}  
\epsilon = \epsilon\mathrm{e}^{-r}
\end{equation} 

其中r为衰减率

首先我们获取`state`和`action`的大小

In [5]:
state_size = play_env.get_state_size()
action_size = play_env.get_action_size()
state_size, action_size

(25, 4)

初始化Q-table

In [6]:
qtable = np.zeros((state_size, action_size))
qtable

array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]])

设定训练超参数

In [7]:
# 迭代次数
epochs = 5000
# 学习率
learning_rate = 0.75
# 最大步数
max_steps = 99
# 折扣率
gamma = 0.95

epsilon = 1.0
# epsilon衰减率
decay_rate = 0.005

通过Q-learning算法计算Q-table

In [8]:
from collections import defaultdict

# 统计游戏结果
status_count = defaultdict(int)
# 统计最终奖励
final_rewards = np.zeros(epochs)

for epoch in range(epochs):
    state = play_env.reset_evn()
    
    for step in range(max_steps):
        # 确定这一步使用探索还是利用
        ee_num = np.random.random(1)[0]
        if ee_num > epsilon:
            # 使用现有Q-table中最好的action
            action = np.argmax(qtable[state, :])
        else:
            # 使用随机action
            action = play_env.get_random_walk()
            
        done, reward, new_state = play_env.step(action)
        # 按照Q-table更新公式更行qtable
        qtable[state, action] = qtable[state, action] + learning_rate * (
            reward + gamma * np.max(qtable[new_state, :]) - qtable[state, action])
        state = new_state
        
        if done:
            status_count[play_env.play_status] += 1
            final_rewards[epoch] = play_env.cumulative_reward
            break
    
    if step == max_steps - 1:
        status_count[play_env.play_statu] += 1
        final_rewards[epoch] = play_env.cumulative_reward
    # 更新epsilon
    epsilon = epsilon * np.exp(-decay_rate)

查看训练结果

In [9]:
status_count

defaultdict(int, {'FAIL': 394, 'GOAL': 4606})

训练中最大的奖励值

In [10]:
final_rewards.max()

8.9

查看Q-table

In [11]:
qtable

array([[  4.48432105,   4.82560111, -10.        ,   4.4843204 ],
       [  4.8256011 ,   5.18484327, -10.        ,   4.48432087],
       [  5.18484327,   5.56299292, -10.        ,   4.82560111],
       [  5.56299292,   5.18484251,   5.96104518,   5.18484228],
       [  4.73914015,  -0.7199772 ,   3.9966546 ,   5.56299289],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [  5.56298554,   5.5629929 ,   6.38004755, -10.        ],
       [ -0.66926995,   3.78100487,  -9.99999762,   5.96104517],
       [ -9.9609375 ,   7.28537125,  -0.38406818,  -0.37838299],
       [-10.        ,   6.82110269,   7.774075  ,   6.82110257],
       [-10.        ,   6.38004755,  -9.99999996,   7.28537125],
       [  5.96104512, -10.        , -10.        ,   6.82110269],
       [  0.        ,   0.        ,   0.        ,   0.        ],
       [  4.66263849,   7

## Play game!

使用训练好的Q-learning算法进行游戏

In [12]:
state = play_env.reset_evn()
play_env.render_chessboard()
for step in range(max_steps):
    action = np.argmax(qtable[state, :])
    done, _, new_state = play_env.step(action)
    play_env.render_chessboard()
    if done:
        if play_env.play_status == 'GOAL':
            print('游戏结束，顺利到达终点！')
        else:
            print('游戏结束，失败！')
        break
    state = new_state
else:
    print('已经到达最大步数，游戏还在进行中，似乎迷路了...')

=====step: 0=====
state: 0	status: START	reawrd: 0
['P' '0' '0' '0' '0']
['X' 'X' 'X' '0' '0']
['0' '0' '0' '0' 'X']
['0' '0' 'X' 'X' '0']
['X' '0' '0' '0' 'G']
=====step: 1=====
state: 1	status: PALYING	reawrd: -0.1
['S' 'P' '0' '0' '0']
['X' 'X' 'X' '0' '0']
['0' '0' '0' '0' 'X']
['0' '0' 'X' 'X' '0']
['X' '0' '0' '0' 'G']
=====step: 2=====
state: 2	status: PALYING	reawrd: -0.2
['S' '0' 'P' '0' '0']
['X' 'X' 'X' '0' '0']
['0' '0' '0' '0' 'X']
['0' '0' 'X' 'X' '0']
['X' '0' '0' '0' 'G']
=====step: 3=====
state: 3	status: PALYING	reawrd: -0.30000000000000004
['S' '0' '0' 'P' '0']
['X' 'X' 'X' '0' '0']
['0' '0' '0' '0' 'X']
['0' '0' 'X' 'X' '0']
['X' '0' '0' '0' 'G']
=====step: 4=====
state: 8	status: PALYING	reawrd: -0.4
['S' '0' '0' '0' '0']
['X' 'X' 'X' 'P' '0']
['0' '0' '0' '0' 'X']
['0' '0' 'X' 'X' '0']
['X' '0' '0' '0' 'G']
=====step: 5=====
state: 13	status: PALYING	reawrd: -0.5
['S' '0' '0' '0' '0']
['X' 'X' 'X' '0' '0']
['0' '0' '0' 'P' 'X']
['0' '0' 'X' 'X' '0']
['X' '0' '0' '