# Reinforcement Learning with Dynamic Programming
## Reinforcement learning
In reinforcement learning, each agent aims to **maximize the cumulative reward**:
$$\max \sum_{t=1}^T \mathbb{E}_{a_t \sim \pi(s_t), s_{t+1} \sim p(s_{t+1} \mid s_t, a_t), s_t \sim p(s)}[ \gamma^{t-1} r(s_t,a_t)]
$$
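
As a minimal sketch of the quantity inside the expectation, the discounted return of a single sampled trajectory can be computed as below. The function name `discounted_return` and the reward sequence are illustrative assumptions, not part of the original notebook.

```python
def discounted_return(rewards, gamma):
    """Sum of gamma**(t-1) * r_t for t = 1..T over one sampled trajectory."""
    total = 0.0
    # Accumulate backwards so each step needs one multiply and one add.
    for r in reversed(rewards):
        total = r + gamma * total
    return total


# Hypothetical trajectory: no reward until the goal is reached on step 3.
example_rewards = [0.0, 0.0, 1.0]
example_return = discounted_return(example_rewards, gamma=0.9)
```

With these assumed rewards the only non-zero term is $\gamma^{2} \cdot 1$, so the return is about 0.81.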

By the Bellman equation, the cumulative reward can be written recursively as a state-value function:

$$
V(s_t) = \mathbb{E}_{a_t \sim \pi(s_t),\, s_{t+1} \sim p(s_{t+1} \mid s_t, a_t)}[r(s_t,a_t) + \gamma V(s_{t+1})].
$$
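
For a fixed policy on a finite state set, the Bellman equation is a linear system, $v = r_\pi + \gamma P_\pi v$, so it can be solved directly instead of iteratively. The sketch below assumes a hypothetical 3-state chain; the matrix `P` and vector `r` are made up for illustration.

```python
import numpy as np

gamma = 0.9

# Hypothetical chain under a fixed policy: 0 -> 1 -> 2, and state 2 loops to itself.
# P[s, s'] is the transition probability under that policy.
P = np.array([
    [0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0],
    [0.0, 0.0, 1.0],
])
# r[s] is the immediate reward when acting from state s;
# reward 1 is paid on every transition that lands in state 2.
r = np.array([0.0, 1.0, 1.0])

# Bellman equation in matrix form: v = r + gamma * P v  <=>  (I - gamma * P) v = r
v = np.linalg.solve(np.eye(3) - gamma * P, r)
```

The self-looping state satisfies $V = 1 + 0.9V$, giving $V = 10$; the other two states follow by one-step backups.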

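## Policy iteration
Once a policy $\pi$ has been improved using $v_{\pi}$ to yield a better policy $\pi'$, we can compute $v_{\pi'}$ and improve it again to yield an even better $\pi''$. We thus obtain a sequence of monotonically improving policies and value functions: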


$$
\pi_0 \stackrel{\text{E}}{\longrightarrow} v_{\pi_0} \stackrel{\text{I}}{\longrightarrow} \pi_1 \stackrel{\text{E}}{\longrightarrow} \dots \stackrel{\text{I}}{\longrightarrow}\pi_{\star}\stackrel{\text{E}}{\longrightarrow}v_{\star},
$$

Here $\stackrel{\text{E}}{\longrightarrow}$ denotes *policy evaluation* and $\stackrel{\text{I}}{\longrightarrow}$ denotes *policy improvement*. This way of finding an optimal policy is called *policy iteration*.
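
The alternation of E and I steps can be sketched on a tiny deterministic MDP. This is an illustrative sketch under stated assumptions, not the notebook's implementation: the function `policy_iteration`, the array layout (`P[s, a]` is the next state, `R[s, a]` the reward) and the 3-state chain are all hypothetical.

```python
import numpy as np


def policy_iteration(P, R, gamma, tol=1e-8):
    """Tabular policy iteration for a deterministic MDP.

    P[s, a] is the next state and R[s, a] the reward for action a in state s.
    """
    n_states = P.shape[0]
    idx = np.arange(n_states)
    policy = np.zeros(n_states, dtype=int)  # pi_0: always take action 0
    v = np.zeros(n_states)
    while True:
        # E step: repeat the Bellman expectation backup until v stops changing.
        while True:
            v_new = R[idx, policy] + gamma * v[P[idx, policy]]
            delta = np.max(np.abs(v_new - v))
            v = v_new
            if delta < tol:
                break
        # I step: act greedily with respect to the one-step lookahead values.
        new_policy = np.argmax(R + gamma * v[P], axis=1)
        if np.array_equal(new_policy, policy):
            return policy, v
        policy = new_policy


# Hypothetical chain: action 0 moves left, action 1 moves right, state 2 is
# absorbing, and every transition that lands in state 2 pays reward 1.
P = np.array([[0, 1], [0, 2], [2, 2]])
R = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
policy, v = policy_iteration(P, R, gamma=0.9)
```

On this chain the loop should settle on "move right" in states 0 and 1, with values close to 9, 10 and 10.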

### The policy iteration algorithm
![](https://gitee.com/wubmu/image/raw/master/img/20210822175954.png)

**Implementing policy evaluation**

In [1]:
import numpy as np

from copy import copy

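def policy_eval(env, values, policies, upper_bound):
    """Sweep all states until the largest value change falls below `upper_bound`.

    Each backup uses one action sampled from the current policy, so for a
    stochastic policy this is a sampled estimate rather than an exact expectation.
    """
    print('\n===== Policy Evalution =====')
    delta = upper_bound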
    iteration = 0

    while delta >= upper_bound:
        delta = 0.

        for s in env.states:
            v = values.get(s)
            env.set_state(s)

            action_index = policies.sample(s)
            action = env.actions[action_index]
            _, _, rewards, next_states = env.step(action)

            next_values = values.get(list(next_states))
            td_values = list(map(lambda x, y: x + env.gamma * y, rewards, next_values))

            exp_value = np.mean(td_values)
            values.update(s, exp_value)

            # update delta
            delta = max(delta, abs(v - exp_value))
            
        iteration += 1
        print('\r> iteration: {} delta: {}'.format(iteration, delta), flush=True, end="")

**Policy improvement**

In [2]:
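def policy_improve(env, values, policies):
    """Make the policy greedy with respect to `values`.

    Returns True when, in every state, the greedy action matches the action
    sampled from the previous policy.
    """
    print('\n===== Policy Improve =====')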
    policy_stable = True

    for state in env.states:
        old_act = policies.sample(state)

        # calculate new policy execution
        actions = env.actions
        value = [0] * len(env.actions)

        for i, action in enumerate(actions):
            env.set_state(state)
            _, _, rewards, next_states = env.step(action)
            next_values = values.get(list(next_states))
            td_values = list(map(lambda x, y: x + env.gamma * y, rewards, next_values))
            prob = [1 / len(next_states)] * len(next_states)

            value[i] = sum(map(lambda x, y: x * y, prob, td_values))

        # action selection
        new_act = actions[np.argmax(value)]

        # greedy update policy
        new_policy = [0.] * env.action_space
        new_policy[new_act] = 1.
        policies.update(state, new_policy)

        if old_act != new_act:
            policy_stable = False

    return policy_stable

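## Value iteration
One drawback of policy iteration is that **every iteration involves policy evaluation**, which may itself be a **protracted iterative computation** requiring multiple sweeps over the state set. Must we wait for exact convergence, or can we stop short of it? Value iteration stops policy evaluation after a single sweep and folds the improvement step into that sweep by taking the maximum over actions.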
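
A minimal sketch of value iteration on a tiny deterministic MDP follows. The function `value_iteration`, the array layout (`P[s, a]` is the next state, `R[s, a]` the reward) and the 3-state chain are hypothetical and chosen only for illustration.

```python
import numpy as np


def value_iteration(P, R, gamma, tol=1e-8):
    """Tabular value iteration for a deterministic MDP (P[s, a] is the next state)."""
    v = np.zeros(P.shape[0])
    while True:
        # One sweep: a single backup per state with a max over actions.
        v_new = np.max(R + gamma * v[P], axis=1)
        if np.max(np.abs(v_new - v)) < tol:
            # Extract the greedy policy once, after the values have converged.
            return np.argmax(R + gamma * v_new[P], axis=1), v_new
        v = v_new


# Hypothetical chain: action 0 moves left, action 1 moves right, state 2 is
# absorbing, and every transition that lands in state 2 pays reward 1.
P = np.array([[0, 1], [0, 2], [2, 2]])
R = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 1.0]])
policy, v = value_iteration(P, R, gamma=0.9)
```

No explicit policy is kept during the sweeps; it is only read off from the final values.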

### The value iteration algorithm
![](https://gitee.com/wubmu/image/raw/master/img/20210822203713.png)

**Implementing value iteration**

In [3]:
def value_iter(env, values, upper_bound):
    print('===== Value Iteration =====')
    delta = upper_bound + 1.
    states = copy(env.states)
    
    iteration = 0
    
    while delta >= upper_bound:
        delta = 0 
        for s in states:
            v = values.get(s)
            
            # compute the new value
            actions = env.actions
            vs = [0] * len(actions)
            
            for i, action in enumerate(actions):
                env.set_state(s)
                _, _, rewards, next_states = env.step(action)
                td_values = list(map(lambda x, y: x + env.gamma * y, rewards, values.get(next_states)))
                
                vs[i] = np.mean(td_values)
            
            values.update(s, max(vs))
            delta = max(delta, abs(v - values.get(s)))
            
        iteration += 1 
        print('\r> iteration: {} delta: {}'.format(iteration, delta), end="", flush=True)
    
    return

## A simple environment
### Matrix environment
A simple maze game in which the agent must learn to get from the start point to the goal.
![](https://gitee.com/wubmu/image/raw/master/img/20210823160728.png)
```python
env = MatrixEnv()

# ...

# before starting a new episode, be sure to call `env.reset()`
env.reset()

# act_index: an action index sampled from the policy
# look up the action in the action space with `act_index`
act = env.actions[act_index]

# `step` takes an action, updates the internal state, and returns a tuple:
# (None, done, rewards: list, next_states: list)
_, done, rewards, next_states = env.step(act)

# ...
```

In [4]:
class Env:
    def __init__(self):
        self._states = set()
        self._state = None
        self._state_shape = None  # set by subclasses; read by `state_space`
        self._actions = []
        self._gamma = None
        
    @property
    def states(self):
        return self._states
    
    @property
    def state_space(self):
        return self._state_shape
    
    @property
    def actions(self):
        return self._actions
    
    @property
    def action_space(self):
        return len(self._actions)
    
    @property
    def gamma(self):
        return self._gamma
    
    def _world_init(self):
        raise NotImplementedError

    def reset(self):
        raise NotImplementedError

    def step(self, action):
        """Apply `action` to the current state; return (None, done, rewards, next_states)."""
        raise NotImplementedError

    def set_state(self, state):
        self._state = state


class MatrixEnv(Env):
    def __init__(self, height=4, width=4):
        super().__init__()

        self._actions = list(range(4))  # 0: up, 1: right, 2: down, 3: left

        self._state_shape = (height, width)
        self._states = [(i, j) for i in range(height) for j in range(width)]

        self._gamma = 0.9
        self._height = height
        self._width = width

        self._world_init()

    @property
    def state(self):
        return self._state

    @property
    def gamma(self):
        return self._gamma

    def set_gamma(self, value):
        self._gamma = value

    def reset(self):
        self._state = self._start_point

    def _world_init(self):
        # start_point
        self._start_point = (0, 0)
        self._end_point = (self._height - 1, self._width - 1)

    def _state_switch(self, act):
        # 0: h - 1, 1: w + 1, 2: h + 1, 3: w - 1
        if act == 0:  # up
            self._state = (max(0, self._state[0] - 1), self._state[1])
        elif act == 1:  # right
            self._state = (self._state[0], min(self._width - 1, self._state[1] + 1))
        elif act == 2:  # down
            self._state = (min(self._height - 1, self._state[0] + 1), self._state[1])
        elif act == 3:  # left
            self._state = (self._state[0], max(0, self._state[1] - 1))

    def step(self, act):
        assert 0 <= act <= 3

        done = False
        reward = 0.

        self._state_switch(act)

        if self._state == self._end_point:
            reward = 1.
            done = True

        return None, done, [reward], [self._state]

## Basic data structures for training
### ValueTable

`ValueTable` holds the state-value function: a mapping from the state space $\mathcal{S} \subset \mathbb{R}^2$ to a real number in $\mathbb{R}$.

**Methods**:

- `update(state, value)`: update the value of a state
- `get(state)`: return the value of the given state, or a list of values for a list of states



In [5]:
class ValueTable:
    def __init__(self, env):
        self._values = np.zeros(env.state_space)
        
    def update(self, s, value):
        self._values[s] = value
        
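    def get(self, state):
        if isinstance(state, list):
            # a list of states: look each one up
            return [self._values[s] for s in state]
        elif isinstance(state, tuple):
            # a single (row, col) state: index directly
            return self._values[state]
        raise TypeError('state must be a tuple or a list of tuples')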
        

### Policies
`Policies` maintains the policy for each state.

**Methods:**
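- `sample(state)`: sample an action and return its index
- `retrieve(state)`: retrieve the policy (action probabilities) for the given state
- `update(state, policy)`: replace the policy of a state with the given one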
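
The two kinds of per-state policy used here, a uniform default and a greedy one-hot vector, can be sketched with plain NumPy. This is only an illustration of the idea; the seeded generator and the choice of action index 2 are assumptions.

```python
import numpy as np

rng = np.random.default_rng(0)  # seeded generator so the sketch is reproducible

n_actions = 4
# Default policy: every action is equally likely.
uniform = np.full(n_actions, 1.0 / n_actions)

# Greedy update: put all probability mass on one action
# (index 2 here, chosen arbitrarily for the example).
greedy = np.zeros(n_actions)
greedy[2] = 1.0

# Sampling returns action indices drawn according to the given probabilities.
uniform_draws = rng.choice(n_actions, size=1000, p=uniform)
greedy_draws = rng.choice(n_actions, size=10, p=greedy)
```

Sampling from the one-hot vector always returns the same index, which is why a greedy policy behaves deterministically even though it is stored as a distribution.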

In [6]:
from collections import namedtuple


Pi = namedtuple('Pi', 'act, prob')


class Policies:
    def __init__(self, env: Env):
        self._actions = env.actions
        self._default_policy = [1 / env.action_space] * env.action_space
        self._policies = dict.fromkeys(env.states, Pi(self._actions, self._default_policy))
    
    def sample(self, state):
        if self._policies.get(state, None) is None:
            self._policies[state] = Pi(self._actions, self._default_policy)

        policy = self._policies[state]
        return np.random.choice(policy.act, p=policy.prob)
    
    def retrieve(self, state):
        return self._policies[state].prob
    
    def update(self, state, policy):
        self._policies[state] = self._policies[state]._replace(prob=policy)
        

## Solving the maze with policy iteration

In [7]:
import time

env = MatrixEnv(width=8, height=8)  # TODO(ming): try different world size
policies = Policies(env)
values = ValueTable(env)
upper_bound = 1e-4

stable = False

start = time.time()
while not stable:
    policy_eval(env, values, policies, upper_bound)
    stable = policy_improve(env, values, policies)
end = time.time()

print('\n[time consumption]: {} s'.format(end - start))

done = False
rewards = 0
env.reset()
step = 0

while not done:
    act_index = policies.sample(env.state)
    _, done, reward, next_state = env.step(env.actions[act_index])
    rewards += sum(reward)
    step += 1

print('Evaluation: [reward] {} [step] {}'.format(rewards, step))


===== Policy Evalution =====
> iteration: 171 delta: 2.2318227519167337e-09
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 89 delta: 9.404610868202212e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 7.617734803133658e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 6.17036519070524e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 4.997995804423283e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 4.0483766016841116e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 3.279185047411204e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 2.6561398884794585e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 2.151473309641716e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 1.7426933808906142e-05
===== Poli

## Solving the maze with value iteration


In [8]:
env = MatrixEnv(width=8, height=8)
policies = Policies(env)
values = ValueTable(env)
upper_bound = 1e-4 

start = time.time()
value_iter(env, values, upper_bound)
_ = policy_improve(env, values, policies) # improve the policy from the converged values
end = time.time()

print('\n[time consumption] {}s'.format(end - start))

env.reset()
done = False
rewards = 0
step = 0
while not done:
    act_index = policies.sample(env.state)
    _, done, reward, next_state = env.step(env.actions[act_index])
    rewards += sum(reward)
    step += 1

print('Evaluation: [reward] {} [step] {}'.format(rewards, step))


===== Value Iteration =====
> iteration: 89 delta: 9.404610870067387e-05
===== Policy Improve =====

[time consumption] 0.28806543350219727s
Evaluation: [reward] 1.0 [step] 14


In [9]:
values._values

array([[2.54101941, 2.82344895, 3.13725955, 3.48593799, 3.87335848,
        4.30382569, 4.78212259, 5.31356359],
       [2.82344895, 3.13725955, 3.48593799, 3.87335848, 4.30382569,
        4.78212259, 5.31356359, 5.90405359],
       [3.13725955, 3.48593799, 3.87335848, 4.30382569, 4.78212259,
        5.31356359, 5.90405359, 6.56015359],
       [3.48593799, 3.87335848, 4.30382569, 4.78212259, 5.31356359,
        5.90405359, 6.56015359, 7.28915359],
       [3.87335848, 4.30382569, 4.78212259, 5.31356359, 5.90405359,
        6.56015359, 7.28915359, 8.09915359],
       [4.30382569, 4.78212259, 5.31356359, 5.90405359, 6.56015359,
        7.28915359, 8.09915359, 8.99915359],
       [4.78212259, 5.31356359, 5.90405359, 6.56015359, 7.28915359,
        8.09915359, 8.99915359, 9.99915359],
       [5.31356359, 5.90405359, 6.56015359, 7.28915359, 8.09915359,
        8.99915359, 9.99915359, 9.99915359]])
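
The printed table follows a simple pattern that can be checked by hand. Because the sweeps call `set_state` on every cell, including the goal, the goal cell can bump into the wall, stay in place and collect reward 1 again, so its value is the fixed point of $V = 1 + \gamma V$, which is $10$ for $\gamma = 0.9$. A cell at Manhattan distance $d \ge 1$ from the goal then has value $\gamma^{d-1} \cdot 10$. The sketch below recomputes that closed form; the variable names are illustrative.

```python
import numpy as np

gamma, size = 0.9, 8
goal = (size - 1, size - 1)
v_goal = 1.0 / (1.0 - gamma)  # fixed point of V = 1 + gamma * V

expected = np.empty((size, size))
for i in range(size):
    for j in range(size):
        d = (goal[0] - i) + (goal[1] - j)  # Manhattan distance to the goal
        # The reward of 1 arrives on the d-th step, followed by the goal's own value.
        expected[i, j] = v_goal if d == 0 else gamma ** (d - 1) * v_goal
```

The closed form agrees with the printed values to within about 1e-3; the remaining gap comes from stopping the sweeps once `delta` drops below `1e-4`.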