**动态规划**（dynamic programming）是程序设计算法中非常重要的内容，能够高效解决一些经典问题，例如背包问题和最短路径规划。动态规划的基本思想是将待求解问题分解成若干个子问题，先求解子问题，然后从这些子问题的解得到目标问题的解。动态规划会保存已解决的子问题的答案，在求解目标问题的过程中，需要这些子问题答案时就可以直接利用，避免重复计算。本章介绍如何用动态规划的思想来求解在马尔可夫决策过程中的最优策略。

基于动态规划的强化学习算法主要有两种：**一是策略迭代**（policy iteration），**二是价值迭代**（value iteration）。其中，策略迭代由两部分组成：**策略评估**（policy evaluation）**策略提升**（policy improvement）。具体来说，策略迭代中的策略评估使用贝尔曼期望方程来得到一个策略的状态价值函数，这是一个动态规划的过程；而价值迭代直接使用贝尔曼最优方程来进行动态规划，得到最终的最优状态价值。

基于动态规划的这两种强化学习算法要求事先知道环境的状态转移函数和奖励函数，也就是需要知道整个马尔可夫决策过程。在这样一个**白盒环境**中，不需要通过智能体和环境的大量交互来学习，可以直接用动态规划求解状态价值函数。但是，现实中的白盒环境很少，这也是动态规划算法的局限之处，我们无法将其运用到很多实际场景中。另外，策略迭代和价值迭代通常只适用于有限马尔可夫决策过程，即**状态空间和动作空间是离散且有限**的。

## 悬崖漫步 
有一个 4×12 的网格世界，每一个网格表示一个状态。智能体的起点是左下角的状态，目标是右下角的状态，智能体在每一个状态都可以采取 4 种动作：上、下、左、右。如果智能体采取动作后触碰到边界墙壁则状态不发生改变，否则就会相应到达下一个状态。环境中有一段悬崖，智能体掉入悬崖或到达目标状态都会结束动作并回到起点，也就是说掉入悬崖或者达到目标状态是终止状态。智能体每走一步的奖励是 −1，掉入悬崖的奖励是 −100。

In [6]:
import copy

class CliffWalkingEnv:
    """悬崖漫步环境"""
    def __init__(self, ncol=12 , nrow=4):
        self.ncol = ncol
        self.nrow = nrow
        # 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励
        self.P = self.createP()

    def createP(self):
        #初始化
        P = [[[] for _ in range(4)] for _ in range(self.ncol * self.nrow)]

        # 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
        # 定义在左上角
        change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
        for i in range(self.nrow):
            for j in range(self.ncol):
                for a in range(4):
                    # 位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
                    if i == self.nrow - 1 and j > 0:
                        P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0, True)] #一个元素内有4种信息
                        continue

                    #其他位置
                    next_x = min(self.ncol -1, max(0, j + change[a][0]))
                    next_y = min(self.nrow -1, max(0, i + change[a][1]))
                    next_state = next_y * self.ncol + next_x
                    reward = -1
                    done = False

                    # 下一个位置在悬崖或者终点
                    '''
                    if next_y == self.nrow - 1 and next_x != self.ncol - 1:
                        done = True
                        reward = -100
                    elif next_y == self.nrow - 1 and next_x == self.ncol - 1:
                        done = True
                        reward = 20
                    '''
                    if next_y == self.nrow - 1 and next_x > 0:
                        done = True
                        if next_x != self.ncol - 1:  # 下一个位置在悬崖
                            reward = -100
                    P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
        return P

![image](image/屏幕截图%202025-02-13%20165712.png)

In [None]:
class PolicyIteration:
    """ 策略迭代算法 """
    def __init__(self, env, theta, gamma):
        self.env = env
        self.theta = theta #收敛阈值
        self.gamma = gamma #折扣因子
        self.state_number = self.env.ncol * self.env.nrow
        self.v = [0] * self.state_number
        self.pi = [[0.25, 0.25, 0.25, 0.25] for _ in range(self.state_number)]

    def policy_evalution(self): #策略评估
        cnt = 1 #计数器
        while True:
            max_diff = 0
            new_v = [0] * self.state_number
            for s in range(self.state_number):
                qsa_list = [] #计算状态s下所有Q(s,a)价值
                for a in range(4):
                    qsa = 0 #标量
                    for res in self.env.P[s][a]:
                        prob, next_state, r, done = res
                        qsa += prob * (r + self.gamma * self.v[next_state] * (1 - done))
                    qsa_list.append(qsa * self.pi[s][a])
                new_v[s] = sum(qsa_list) #状态价值函数是状态各个动作的状态动作函数的总和
                max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
            self.v = new_v
            if max_diff < self.theta: break
            cnt += 1
        print("策略评估进行%d轮后完成" % cnt)

    def policy_improvement(self): 
        for s in range(self.state_number):
            qsa_list = []
            for a in range(4):
                qsa = 0
                for res in self.env.P[s][a]:
                    prob, next_state, r, done = res
                    qsa += prob * (r + self.gamma * self.v[next_state] * (1 - done))
                qsa_list.append(qsa)
            maxq = max(qsa_list)
            cntq = qsa_list.count(maxq) #计算得到最大Q值的动作有几个
            self.pi[s] = [1 / cntq if q==maxq else 0 for q in qsa_list]
        print("策略提升完成")
        return self.pi

    def policy_iteration(self):
        while True:
            self.policy_evalution()
            old_pi = copy.deepcopy(self.pi) #
            new_pi = self.policy_improvement()
            if old_pi == new_pi: break


对于打印出来的动作，我们用^o<o 表示等概率采取向左和向上两种动作，ooo>表示在当前状态只采取向右动作

In [10]:
def print_agent(agent, action_meaning, disaster=[], end=[]):
    print("状态价值: ")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')
        print()

    print("策略: ")
    for i in range(agent.env.nrow):
        for j in range(agent.env.ncol):
            # 一些特殊的状态,例如悬崖漫步中的悬崖
            if (i * agent.env.ncol + j) in disaster:
                print('****', end=' ')
            elif (i * agent.env.ncol + j) in end:  # 目标状态
                print('EEEE', end=' ')
            else:
                a = agent.pi[i * agent.env.ncol + j]
                pi_str = ''
                for k in range(len(action_meaning)):
                    pi_str += action_meaning[k] if a[k] > 0 else 'o'
                print(pi_str, end=' ')
        print()

env = CliffWalkingEnv()
action_meaning = ['^','v','<','>']
theta = 0.001
gamma = 0.9
agent = PolicyIteration(env,theta,gamma)
agent.policy_iteration()
print_agent(agent,action_meaning, list(range(37,47)), [47])

策略评估进行60轮后完成
策略评估进行72轮后完成
策略评估进行44轮后完成
策略评估进行12轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成
策略评估进行1轮后完成


KeyboardInterrupt: 