## 第4章 Dynamic Programming 动态规划 （上）

__动态规划__ 的核心思想是：通过价值函数来评估和寻找最佳策略。


### 4.1 Policy Evaluation (Prediction) 策略评估（预测）

- 策略评估：给定随机策略 $\pi$，计算状态值函数 $v_{\pi}$。
- 策略评估迭代计算：
$$
v_{k+1}(s) = \sum_{a} \pi(a|s) \sum_{s',r} p(s',r|s,a) [r + \gamma v_k(s')]
$$

##### 公式解释:

- $v_{k+1}(s)$ 是状态 $s$ 在第 $k+1$ 次迭代后的价值估计。
- $\pi(a|s)$ 是在状态 $s$ 下选择动作 $a$ 的概率。
- $p(s',r|s,a)$ 是在状态 $s$ 下采取动作 $a$ 后,转移到状态 $s'$ 并获得奖励 $r$ 的概率。
- $r$ 是即时奖励。
- $\gamma$ 是折扣因子。
- $v_k(s')$ 是下一个状态 $s'$ 在第 $k$ 次迭代的价值估计。


##### 公式理解:
这个公式计算了在当前策略 $\pi$ 下，状态 $s$ 的期望价值。它考虑了所有可能的动作、转移和奖励，然后对所有值进行加权平均。

##### 与Bellman方程的联系:

这个公式实际上是Bellman期望方程的迭代形式。Bellman期望方程为:

$$
v_{\pi}(s) = \sum_{a} \pi(a|s) \sum_{s',r} p(s',r|s,a) \left[ r + \gamma v_{\pi}(s') \right]
$$

区别在于:

- Bellman方程中的 $v_\pi(s)$ 是在策略 $\pi$ 下状态 $s$ 的真实价值。
- 而迭代公式中的 $v_{k+1}(s)$ 和 $v_k(s')$ 是价值的估计,随着迭代次数增加会逐渐收敛到真实价值。


##### 工作原理:

从一个初始估计开始(例如,所有状态的价值初始化为0)。
重复应用这个公式,不断更新每个状态的价值估计。
随着迭代次数增加,估计值会逐渐收敛到真实的价值函数。

书中给出的计算逻辑如下:

![policy_evaluation](img.png)

##### 用代码实现上面的策略评估迭代计算:

In [None]:
def iterative_policy_evaluation(pi, p, reward, states, theta=1e-6, gamma=0.9):
    """
    执行迭代策略评估。
    
    pi: 策略（状态到动作概率分布的字典）
    p: 转移概率函数（状态-动作对到（下一状态，概率）对的字典）
    reward: 奖励函数（状态-动作-下一状态对到奖励的字典）
    states: 所有状态的列表
    theta: 确定估计精度的小阈值
    gamma: 折扣因子
    """
    V = {s: 0 for s in states}  # 将所有状态的 V(s) 初始化为 0
    iteration = 0
    while True:
        delta = 0
        iteration += 1
        print(f"\n迭代 {iteration}:")
        for s in states:
            v = V[s]
            V[s] = sum([pi[s][a] * sum([prob * (reward[(s, a, s_prime)] + gamma * V[s_prime])
                                        for s_prime, prob in p[(s, a)]])
                        for a in pi[s]])
            delta = max(delta, abs(v - V[s]))
            print(f"  状态 {s}: {V[s]:.6f}")
        print(f"  Delta: {delta:.6f}")
        if delta < theta:
            break
    return V

In [7]:
# 模拟一些数据，用于测试上面的公式：
pi = {
    's0': {'a0': 0.5, 'a1': 0.5},
    's1': {'a0': 0.5, 'a1': 0.5},
    's2': {'a0': 1.0}  # 终止状态，没有后续动作
}
p = {
    ('s0', 'a0'): [('s0', 0.5), ('s1', 0.5)],
    ('s0', 'a1'): [('s1', 1.0)],
    ('s1', 'a0'): [('s0', 0.7), ('s1', 0.1), ('s2', 0.2)],
    ('s1', 'a1'): [('s0', 0.95), ('s1', 0.05)],
    ('s2', 'a0'): [('s2', 1.0)]  # 终止状态的转移
}
reward = {
    ('s0', 'a0', 's0'): 5,
    ('s0', 'a0', 's1'): -1,
    ('s0', 'a1', 's1'): -1,
    ('s1', 'a0', 's0'): -1,
    ('s1', 'a0', 's1'): -1,
    ('s1', 'a0', 's2'): 10,
    ('s1', 'a1', 's0'): 5,
    ('s1', 'a1', 's1'): -1,
    ('s2', 'a0', 's2'): 0  # 终止状态的奖励
}
states = ['s0', 's1', 's2']

print("开始迭代策略评估:")
V = iterative_policy_evaluation(pi, p, reward, states)

print("\n最终结果:")
for s in states:
    print(f"V({s}) = {V[s]:.6f}")

开始迭代策略评估:

迭代 1:
  状态 s0: 0.500000
  状态 s1: 3.321250
  状态 s2: 0.000000
  Delta: 3.321250

迭代 2:
  状态 s0: 2.854344
  状态 s1: 5.293535
  状态 s2: 0.000000
  Delta: 2.354344

迭代 3:
  状态 s0: 4.715363
  状态 s1: 6.808471
  状态 s2: 0.000000
  Delta: 1.861019

迭代 4:
  状态 s0: 6.156674
  状态 s1: 7.980903
  状态 s2: 0.000000
  Delta: 1.441311

迭代 5:
  状态 s0: 7.272361
  状态 s1: 8.888439
  状态 s2: 0.000000
  Delta: 1.115687

迭代 6:
  状态 s0: 8.135978
  状态 s1: 9.590933
  状态 s2: 0.000000
  Delta: 0.863617

迭代 7:
  状态 s0: 8.804475
  状态 s1: 10.134710
  状态 s2: 0.000000
  Delta: 0.668497

迭代 8:
  状态 s0: 9.321936
  状态 s1: 10.555631
  状态 s2: 0.000000
  Delta: 0.517462

迭代 9:
  状态 s0: 9.722486
  状态 s1: 10.881451
  状态 s2: 0.000000
  Delta: 0.400550

迭代 10:
  状态 s0: 10.032539
  状态 s1: 11.133658
  状态 s2: 0.000000
  Delta: 0.310053

迭代 11:
  状态 s0: 10.272541
  状态 s1: 11.328883
  状态 s2: 0.000000
  Delta: 0.240002

迭代 12:
  状态 s0: 10.458318
  状态 s1: 11.480001
  状态 s2: 0.000000
  Delta: 0.185777

迭代 13:
  状态 s0: 10.602122
  状

上面的模拟代码实现的是：在给定策略 $\pi$ 下，每个状态的预期累积折扣奖励（也称为价值函数）。所以最终输出的V(s)就是每个状态的价值函数值。

### 4.2 Policy Improvement 策略改进

- 目的：是找到更好的策略。
- 原理：对于任何一个策略 $\pi$，我们可以通过贪心地选择使下一个状态价值最大的动作来改进策略，寻找一个更好的策略 $\pi'$。
- 公式：
$$
\pi'(s) = \text{argmax}_a \sum_{s',r} p(s',r|s,a) [r + \gamma v_{\pi}(s')]
$$

##### 公式解释:
- $\pi'(s)$ 是 __新策略__ 下状态 $s$ 的动作。
- $\text{argmax}_a$ 表示能够使得表达式的值最大化的动作 ${a}$。
- $p(s',r|s,a)$ 是在状态 $s$ 下采取动作 $a$ 后,转移到状态 $s'$ 并获得奖励 $r$ 的概率。
- $r$ 是即时奖励。
- $\gamma$ 是折扣因子。
- $v_{\pi}(s')$ 是在 __当前策略__ $\pi$ 下状态 $s'$ 的价值。

##### 工作原理:
对于每个状态 $s$，贪心地去选择使下一个状态价值最大的动作。
这样就得到了一个新的策略 $\pi'$。



### 4.3 Policy Iteration 策略迭代

通过交替进行 __策略评估__ 和 __策略改进__ 来找到最优策略：

1. 初始化一个策略 $\pi$。
2. 迭代执行策略评估，计算当前策略 $\pi$ 下的状态值函数 $v_{\pi}$。
3. 通过策略改进，得到一个新的策略 $\pi'$。
4. 如果 $\pi'$ 和 $\pi$ 相等，则停止迭代，输出最优策略和最优价值函数。
5. 否则，将 $\pi'$ 赋值给 $\pi$，返回第2步。
6. 最终输出最优策略和最优价值函数。

$$
\pi_0 \xrightarrow{E} v_{\pi_0} \xrightarrow{I} \pi_1 \xrightarrow{E} v_{\pi_1} \xrightarrow{I} \pi_2 \xrightarrow{E} \cdots \xrightarrow{I} \pi_* \xrightarrow{E} v_*
$$

${E}$ 表示策略评估，${I}$ 表示策略改进。

书中给出的计算方法：

![policy_iteration](img_1.png)

结合以上学习到的内容：

> _Policy Iteration_ { Policy Evaluation + Policy Improvement } 

#### 用代码实现4.1到4.3的策略迭代计算如下:

In [52]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots


# 定义动作
actions = [(0, -1), (1, 0), (0, 1), (-1, 0)]  # 左, 下, 右, 上
action_symbols = ['←', '↓', '→', '↑']

def env_step(state, act):
    """在环境中执行一步"""
    assert state not in [(0, 0), (3, 3)], "终止状态不应被选择"
    next_row = state[0] + act[0]
    next_col = state[1] + act[1]

    if 0 <= next_row < 4 and 0 <= next_col < 4:
        return -1., (next_row, next_col)
    else:
        return -1., state

def policy_evaluation(grid_world, actions, probs, gamma, delta, terminal_states):
    """执行一步策略评估"""
    Q = np.zeros(shape=(grid_world.shape[0], grid_world.shape[1], len(actions)))

    for row in range(4):
        for col in range(4):
            if (row, col) in terminal_states:
                continue
            for i, act in enumerate(actions):
                reward, next_state = env_step((row, col), act)
                next_value = grid_world[next_state]
                Q[row, col, i] = probs[i] * (reward + gamma * next_value)

    new_grid_world = Q.sum(axis=-1)
    max_diff = np.abs(new_grid_world - grid_world).max()
    return new_grid_world, max(max_diff, delta)

def greedy_policy(grid_world, actions, probs, gamma, terminal_states):
    """从当前值获取贪婪策略"""
    Q = np.zeros(shape=(grid_world.shape[0], grid_world.shape[1], len(actions)))
    greedy_acts = []

    for row in range(4):
        acts_row = []
        for col in range(4):
            if (row, col) in terminal_states:
                acts_row.append('')
                continue

            for i, act in enumerate(actions):
                reward, next_state = env_step((row, col), act)
                next_value = grid_world[next_state]
                Q[row, col, i] = probs[i] * (reward + gamma * next_value)  # 公式部分:选择能够让Q值最大的动作来更新策略

            greedy_sels = np.where(np.abs(Q[row, col] - Q[row, col].max()) < 0.0001)[0]
            acts = "".join([action_symbols[i] for i in greedy_sels])
            acts_row.append(acts)
        greedy_acts.append(acts_row)

    return greedy_acts


from plotly.subplots import make_subplots
import plotly.graph_objects as go

def plot_gridworld(data, annots, curr_row):
    """绘制网格世界的价值函数和最优策略"""
    fig = make_subplots(rows=1, cols=2,
                        subplot_titles=(f"价值函数 (k={curr_row})", f"最优策略 (k={curr_row})"),
                        specs=[[{"type": "heatmap"}, {"type": "heatmap"}]])

    # 创建 'Blues' 色彩方案
    blues = ['#f7fbff', '#deebf7', '#c6dbef', '#9ecae1', '#6baed6', '#4292c6', '#2171b5', '#08519c', '#08306b']

    # 价值函数热图
    fig.add_trace(
        go.Heatmap(z=data,
                   text=[[f'{val:.1f}' for val in row] for row in data],
                   texttemplate="%{text}",
                   textfont={"size": 16, "color": "black"},
                   colorscale=blues,
                   showscale=False),
        row=1, col=1
    )

    # 策略热图
    fig.add_trace(
        go.Heatmap(z=data,
                   text=annots,
                   texttemplate="%{text}",
                   textfont={"size": 20, "color": "black"},
                   colorscale=blues,
                   showscale=False),
        row=1, col=2
    )

    fig.update_layout(
        title=f'最优价值函数和策略 (k={"∞" if curr_row == -1 else curr_row})',
        title_font=dict(size=16, color='black', family="Arial, sans-serif"),
        title_x=0.5,
        width=900,
        height=400,
        margin=dict(l=50, r=50, t=100, b=50)
    )

    for i in range(1, 3):
        fig.update_xaxes(showticklabels=False, showgrid=False, row=1, col=i)
        fig.update_yaxes(showticklabels=False, showgrid=False, row=1, col=i, autorange='reversed')

    fig.show()

# 初始化网格世界
grid_world = np.zeros(shape=(4, 4))
gamma = 1.0
theta = 0.001

# 随机策略，每个动作的概率相等
probs = [1. / len(actions)] * len(actions)
terminal_states = [(0, 0), (3, 3)]

# 主循环
iterations_to_plot = [0, 1, 2, 3, 10, float('inf')]
for i in range(1000):  # 使用大量迭代次数以确保收敛
    if i in iterations_to_plot:
        greedy_acts = greedy_policy(grid_world, actions, probs, gamma, terminal_states)
        plot_iteration = 'inf' if i == float('inf') else i
        plot_gridworld(grid_world, greedy_acts, plot_iteration)

    delta = 0  # 重置 delta
    grid_world, delta = policy_evaluation(grid_world, actions, probs, gamma, delta, terminal_states)

    if delta < theta:
        print(f'在迭代 {i} 时收敛')
        greedy_acts = greedy_policy(grid_world, actions, probs, gamma, terminal_states)
        plot_gridworld(grid_world, greedy_acts, 'inf')
        break


在迭代 130 时收敛
