#### RL Course by David Silver

## Lecture 3  Planning by Dynamic Programming

- Policy Evaluation
- Policy Iteration
- Value Iteration

# 强化学习

强化学习的过程中，机器在环境中通过尝试得到一个策略$\pi$,使其能够在遇到状态x时,就知道要采取的动作$a=\pi(x)$.策略通常有两种表示方法,一种是$\pi:X\to A$,常用于确定性策略,在状态x下只选择a动作;另一种是$\pi:X \times A \to \mathbb{R}$,常用于随机性策略,在状态x下选择动作a的概率为$\pi(x,a)$.

策略的优劣由该策略的长期累积奖赏决定,强化学习的目的就是寻找长期累计奖赏最大化的策略.长期累积奖赏主要有两种计算方式:一种是t步累积奖赏,表示为$E[\frac{1}{T}\sum_{t=1}^{T}r^t]$,另一种是r折扣累积奖赏,表示为$E[\sum_{t=0}^{+\infty}\gamma^t r^{t+1}]$.其中$r^t$为t步获得的奖赏.

## 探索-利用窘境

在讨论长期累积奖赏最大化之前,以K-摇臂赌博机为例讨论下探索-利用窘境. K-摇臂赌博机的目标是长期累积奖赏最大的简化版本,单步奖赏最大化. 选择某一摇臂这个动作对应的奖赏服从一个概率分布,仅通过一次尝试不能够确定该动作的平均奖赏值. 因此,总尝试次数有限的情况下,如果将所有次数都分配给第一遍尝试中奖励最大的摇臂(利用),可能会导致无法找到最优摇臂; 如果将次数平均分配给每个摇臂(探索),则可能导致最终的平均奖赏较低.这个问题称为探索-利用窘境.
在探索和利用之间如何折中有两种经典算法:e-贪心算法和softmax算法, 两者的区别主要体现在非当前最优动作被采纳的概率是否平均以及当前最优动作被采纳的概率优势大小, 两者均可以通过调参在偏向探索和偏向利用之间调整.
<table>
    <tr>
        <td><top><img src="http://cdn.hustcaid.com/Fscr-uFGqD1x_kiq5fc0Y_AsFPtN.jpg" width=100%></top></td>
        <td><top><img src="http://cdn.hustcaid.com/FozXrPm4wFB9ovfZ-LuMDVagF60m.jpg" width=100%></top></td>
    </tr>
</table>



## 1. 马尔科夫决策过程

强化学习是建立在马尔科夫决策过程之上的一个体系, 在学习强化学习之前,不可避免要先了解什么是马尔科夫决策过程(MDP).

马尔科夫决策过程是在马尔科夫过程的基础上, 增加了动作,奖励的概念, 包括了五个部分:

- 状态S:有限的状态集
- 动作A:动作的有限集
- 概率P:状态转移概率矩阵
    
    $p^a_{ss^\prime}=P[S_{t+1}=s^\prime|S_t=s,A_t=a]$
- 奖励R:奖励函数
    
    $R^a_s=E[R_{t+1}|S_t= s, A_t = a]$
- $\gamma$衰减系数:$\gamma\in[0,1]$

对于一个MDP, 可以产生多个策略作为决策过程的依据. 一个策略包括各个给定状态下的动作概率分布
$$\pi(a|s)=P[A_t=a|S_t=s]$$

下图是一个机器人从任意一个状态出发寻找金币的例子。找到金币则获得奖励 1，碰到海盗则损失 1。找到金币或者碰到海盗则机器人停止。

![picture](http://www.algorithmdog.com/wp-content/uploads/2016/04/mdp.png)

可以把这个问题建模成马尔科夫决策过程。图中不同位置为状态，因此 $S = \{1,\dots,8\}$。机器人采取动作是向东南西北四个方向走，因此A={‘n’,’e’,’s’,’w’}。转移概率方面，当机器人碰到墙壁，则会停在原来的位置；当机器人找到金币时获得奖励 1，当碰到海盗则损失 1, 其他情况不奖励也不惩罚。因此除了 $R_{1,s}=−1, R_{3,s}=1，R_{5,s}=−1 $之外，其他情况$R_{∗,∗}=0$。采用折扣累积奖赏的计算方式,衰减因子$\gamma$。写成代码下面所示。

In [2]:
class Mdp:
    def __init__(self):
        self.state = [1, 2, 3, 4, 5, 6, 7, 8] # set of states
        self.action = ['n','s', 'e', 'w'] # set of actions
        self.reward = dict() # set of reward among s-a pairs
        self.reward['1-s'] = -1.0
        self.reward['3-s'] = 1.0
        self.reward['5-s'] = -1.0
        self.terminate = dict() # set of terminate states
        self.terminate[6] = True
        self.terminate[7] = True
        self.terminate[8] = True

        self.t = dict() # set of state transforms
        self.t['1-e'] = 2
        self.t['1-s'] = 6
        self.t['2-w'] = 1
        self.t['2-e'] = 3
        self.t['3-w'] = 2
        self.t['3-s'] = 7
        self.t['3-e'] = 4
        self.t['4-w'] = 3
        self.t['4-e'] = 5
        self.t['5-w'] = 4
        self.t['5-s'] = 8
        self.gamma = 0.5

    def transform(self, state, action): # return is_terminate, state, reward
        if state in self.terminate: # start at terminate state
            return True, state, 0.0
        key = "%d-%s"%(state, action)
        if key in self.t : # legal state-action pair
            state = self.t[key]
            if key in self.reward:
                return True, state, self.reward[key] # return the terminate state and reward of s-a pair
            else:
                return False, state, 0.0 # not the state-action pair leading to terminal.
        else:
            return False, state, 0.0 # illegal state-action pair, stay at former state with reward of 0


疑惑:
    reward 什么时候分配在状态上 什么时候分配在动作上 对实现有影响

## 有模型学习

有模型学习是指MDP的四要素已知的学习方法, 有模型的学习主要包括策略迭代和值迭代两种方式.

### 1. 策略估值

按照slide中迭代的策略估值的公式
![pictures](http://cdn.hustcaid.com/FsDgiuWjv_I78Zw86MsoZ750i3c1.png)
迭代计算上图中的各个点.
具体算法如下:

<img src="http://cdn.hustcaid.com/FnCTO1K4honIRHcjTM1uZVwS-N_n.jpeg" width=50%></img>

需要注意的是$R^{\pi}$是从当前状态按照策略产生的奖励, 故对于1状态来说, 其$R^{\pi}=0.25$;对于6,7,8状态来说,由于是直接终止,故$R^{\pi} = 0$
以V(1)在迭代结束的时候的迭代步骤为例, 根据[dp](http://www0.cs.ucl.ac.uk/staff/D.Silver/web/Teaching_files/DP.pdf)中的描述, 当迭代结束时, $|v^{k+1}-v^{k}\leq \epsilon$,此时有
$$ 
\begin{align}
V(1)=0.25+0.5\times (0.25\times0 + 0.5\times-0.335 + 0.25\times-0.008)\\
     =-0.335
\end{align}
$$
完整的计算结果如下:
![result](http://www.algorithmdog.com/wp-content/uploads/2016/04/mdp-value.png)


In [7]:
# 基于迭代的策略估值
def policy(mdp, state, action):
    return 1.0/len(mdp.action)

def policy_evaluation(mdp, policy):
    values = [0.0]*(len(mdp.state)+1)
    error_s = float('inf')
    epsilon = 0.001
    while(error_s > epsilon):
        aux = list(values)
        values = [0.0]*len(values)
        error = 0
        for s in mdp.state:
            for a in mdp.action:
                is_terminal, state, reward = mdp.transform(s, a)
                values[s] += policy(mdp, s, a) * (reward + mdp.gamma * aux[state])
            error = max(error, abs(values[s] - aux[s]))
        error_s = error
    return values

mdp = Mdp()
values = policy_evaluation(mdp, policy)
print(values)

[0.0, -0.3344268798828125, -0.0081329345703125, 0.28360748291015625, -0.0081329345703125, -0.3344268798828125, 0.0, 0.0, 0.0]


In [5]:
# 基于迭代的策略估值 按照贝克曼方程进行向量加速
# V^{k+1} = R^{\pi}+\gamma P^{\pi}v^k
import numpy as np
# init
R = np.array([0, -0.25, 0, 0.25, 0, -0.25, 0, 0, 0], np.float32)
R = R[:,np.newaxis] # format the shape of R from (9,) to (9, 1)
V = np.zeros((9, 1), np.float32)
p = np.zeros((9,9), np.float32)
p[1][1] = 0.5
p[1][2] = 0.25
p[1][6] = 0.25
p[2][1] = 0.25
p[2][2] = 0.5
p[2][3] = 0.25
p[3][2] = 0.25
p[3][3] = 0.25
p[3][4] = 0.25
p[3][7] = 0.25
p[4][3] = 0.25
p[4][4] = 0.5
p[4][5] = 0.25
p[5][4] = 0.25
p[5][5] = 0.5
p[5][8] = 0.25
gamma = 0.5
error_s = np.inf
epsilon = 0.001
p
assert V.shape == (9, 1)
assert R.shape == (9, 1)
assert p.shape == (9, 9)

# evaluation 
while (error_s > epsilon):
    V_k = V.copy()
    V = R + gamma*p.dot(V_k)
    error_s = np.max(np.abs(V - V_k))

np.set_printoptions(precision=3) # pretty the output
print(V)

[[ 0.   ]
 [-0.334]
 [-0.008]
 [ 0.284]
 [-0.008]
 [-0.334]
 [ 0.   ]
 [ 0.   ]
 [ 0.   ]]


### 2. 策略迭代

![](http://cdn.hustcaid.com/FsU8UeoNAf5rP40h4pm6SaobaE7s.png)

策略迭代是基于策略估值的优化方法。优化步骤如下：

    - step1：利用估值函数对当前状态进行估值
    - step2：按照贪心策略选取估值最大化的动作更新策略
    - step3：如果策略能够被优化（还未稳定），返回step1，否则结束迭代

![](http://cdn.hustcaid.com/Fu6uOlyJN-SRx155mT8GmAlaODps.png)

策略迭代公式
$$\pi'(s)=argmax_{a\in A}q_{\pi}(s,a)$$
策略迭代终止条件
$$v_{\pi}(s)=max_{a \in A}q_{\pi}(s,a)$$

策略迭代的步骤
<img src="http://cdn.hustcaid.com/FtetvaDxC555PzDAujT02gSHE4BX.jpeg" width=50%>


In [17]:
# policy iteration.
# +++++++++++++++debug information++++++++++++++++++
# import pdb
# pdb.set_trace()
# +++++++++++++++++++ end ++++++++++++++++++++++++++

class Policy:
    def __init__(self, mdp):
        self.epsilon = 0.0005
        self.mdp = mdp
        self.iteration = 0
        self.values = [0.0]*(len(mdp.state)+1) # v(s)
        self.policy = [mdp.action] * (len(mdp.state)+1) # the argmax q(s,a) of s
        
    def get_policy(self, state, action): # return the prob of s-a pair
        if action in self.policy[state]:
            return 1. / len(self.policy[state])
        else: return 0.0
    
    def improve_step(self): # return if the policy has changed
        is_optimium = True
        for index in self.mdp.state:
            self.policy[index] = [] # the argmax q(s,a) of s
            max_value = float('-inf')
            
            for a in self.mdp.action:
                _, next_s, reward = self.mdp.transform(index, a)
                q_s_a = self.mdp.gamma * self.values[next_s] + reward
                
                if q_s_a > max_value:
                    self.policy[index] = [a]
                    max_value = q_s_a
                elif q_s_a == max_value:
                    self.policy[index].append(a)
                    
            if abs(self.values[index] - max_value) > self.epsilon:  # v_pi(s) != max q(s,a)
                is_optimium = False
        return is_optimium
            

    def evaluation(self):
        self.iteration += 1
        error_s = float('inf')
        epsilon = 0.001
        while(error_s > epsilon):
            aux = list(self.values)
            self.values = [0.0]*len(self.values)
            error = 0
            for s in self.mdp.state:
                for a in self.mdp.action:
                    is_terminal, state, reward = self.mdp.transform(s, a)
                    self.values[s] += self.get_policy(s, a) * (reward + self.mdp.gamma * aux[state])
                error = max(error, abs(self.values[s] - aux[s]))
            error_s = error
        print("iter %d: " % self.iteration, self.values)
    
    def optimium(self):
        n = 5
        while(True):
            self.evaluation()
            is_optimium = self.improve_step()
            
            # print the current policy
#             for s in mdp.state:
#                 for a in mdp.action:
#                     p = policy.get_policy(s, a)
#                     if p > 0:
#                         print(s, a, policy.get_policy(s, a))

            if is_optimium: 
                break
       
            
mdp = Mdp()
policy = Policy(mdp)
policy.optimium()


iter 1:  [0.0, -0.3344268798828125, -0.0081329345703125, 0.28360748291015625, -0.0081329345703125, -0.3344268798828125, 0.0, 0.0, 0.0]
iter 2:  [0.0, 0.25, 0.5, 1.0, 0.5, 0.25, 0.0, 0.0, 0.0]


### 3. 值迭代

由策略迭代的过程可以得到策略迭代过程的原则就是

> 一个策略$\pi(a|s)$在s点处达到最优,$V_{\pi}(s)=V_*(s)$,当且仅当对于每个s状态能够达到的状态s',都有$\pi$在该点达到最优值, $V_{\pi}(s') = V_*(s')$

如果已知$V_*(s')$, 则$V_*(s)$可以直接由
$$V_*(s)=max_{a\in A}(R^a_{s\to s'}+V_*(s'))$$
得到.

因此, 可以直接在迭代的每一步根据上一步的V值对这一步的每个状态的估值进行更新, 相比策略迭代方式减少了策略评价过程中得到稳定策略估值的步骤,减少了计算量.
具体算法如下:
<img src="http://cdn.hustcaid.com/FqYg7RifVhF6Nkhd-nRAEJDLZeBm.jpeg" width=60%>


In [15]:
# value iteration.
# +++++++++++++++debug information++++++++++++++++++
# import pdb
# pdb.set_trace()
# +++++++++++++++++++ end ++++++++++++++++++++++++++

class Value:
    def __init__(self, mdp):
        self.epsilon = 0.0005
        self.mdp = mdp
        self.iteration = 0
        self.values = [0.0]*(len(mdp.state)+1) # v(s)
        self.policy = ['s']*(len(mdp.state) + 1) # todo update policy inf func step
    
    """
    return true if iteration terminate
    return false otherwise.
    """
    def step(self):
        v_temp = [float('-inf')] * len(self.values)
        max_diff = 0.0
        for s in self.mdp.state:
            for a in self.mdp.action:
                _, s_t, reward = self.mdp.transform(s, a)
                v_temp[s] = max(v_temp[s], reward + self.mdp.gamma * self.values[s_t])
            max_diff = max(max_diff, abs(v_temp[s] - self.values[s]))
        if max_diff < self.epsilon:
            return True
        else:
            self.values = v_temp
        return False
    
    def optimum(self):
        iteration = 0
        while True:
            iteration += 1
            is_optimum = self.step()
            print("iter", iteration,":", self.values)
            if is_optimum: break

    def get_policy(self):
        pass
        # for each state return the maximum of Q(s,a)

mdp = Mdp()
value_iter = Value(mdp)
value_iter.optimum()

iter 1 : [-inf, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]
iter 2 : [-inf, 0.0, 0.5, 1.0, 0.5, 0.0, 0.0, 0.0, 0.0]
iter 3 : [-inf, 0.25, 0.5, 1.0, 0.5, 0.25, 0.0, 0.0, 0.0]
iter 4 : [-inf, 0.25, 0.5, 1.0, 0.5, 0.25, 0.0, 0.0, 0.0]
