# Gridworld

`通过这个实验，了解MDP的Dynamic Programming解法`

所有的实验源代码都在`lib`目录下，来自[dennybritz](https://github.com/dennybritz/reinforcement-learning)，这里只做解读和归总。

## 实验目录

- [Gridworld](https://applenob.github.io/gridworld.html)：对应MDP的Dynamic Programming
- [Blackjack](https://applenob.github.io/black_jack.html)：对应Model Free的Monte Carlo的Planning和Controlling
- [Windy Gridworld](https://applenob.github.io/windy_gridworld.html)：对应Model Free的Temporal Difference的On-Policy Controlling，SARSA。
- [Cliff Walking](https://applenob.github.io/cliff_walking.html)：对应Model Free的Temporal Difference的Off-Policy Controlling，Q-learning。
- [Mountain Car](https://applenob.github.io/mountain_car.html)：对应Q-Learning with Linear Function Approximation。
- [Atari](https://applenob.github.io/atari.html)：对应Deep-Q Learning。


## 本文目录

- [问题介绍](#问题介绍)
- [Policy Evaluation](#Policy-Evaluation)
- [Policy Iteration](#Policy-Iteration)
- [Value Iteration](#Value-Iteration)


## 问题介绍

![](https://github.com/applenob/rl_learn/raw/master/res/grid_world.png)

你的agent在一个M×N的格子世界里，想办法走到左上角或者右下角的终点。

这个实验用来解释如何在MDP的环境里，使用动态规划（DP），来寻找最佳策略$\pi_*$。方法是：`Policy Iteration`和`Value Iteration`。

为什么说这个实验是MDP呢？回顾下MDP的五元组：$<\textbf{S}, \textbf{A}, \textbf{P}, \textbf{R}, \gamma>$，分别对应：

- $\textbf{S}$：状态集合，上图一共有16种状态，对应16个格子。
- $\textbf{A}$：动作集合，上下左右四种。
- $\textbf{P}$：状态转移概率矩阵，即当前状态是$s$且动作是$a$时，下一个状态是$s+1$的概率。这里如果动作选定，下一个状态是唯一的。
- $\textbf{R}$：奖励函数，即在某一个状态$s$，下一时刻能收获的即时奖励的期望。这里假设除了最终状态，每到另一个状态，即时奖励都是-1。
- $\gamma$：折扣系数，题目没有明确给出。

来总结一下：五元组中，$\textbf{S}$和$\textbf{R}$用来描述环境，即环境有什么样的状态，不同的状态下有什么回报。$\textbf{A}$描述agent的可操作动作范围。$\textbf{P}$最重要，它描述**agent和环境如何交互**，即，在某状态下采取某个动作下一步会进到什么状态。

```python
import numpy as np
import sys
from gym.envs.toy_text import discrete

# 定义动作
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3

class GridworldEnv(discrete.DiscreteEnv):
    """
    Grid World environment from Sutton's Reinforcement Learning book chapter 4.
    You are an agent on an MxN grid and your goal is to reach the terminal
    state at the top left or the bottom right corner.

    For example, a 4x4 grid looks as follows:

    T  o  o  o
    o  x  o  o
    o  o  o  o
    o  o  o  T

    x is your position and T are the two terminal states.

    You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
    Actions going off the edge leave you in your current state.
    You receive a reward of -1 at each step until you reach a terminal state.
    """

    metadata = {'render.modes': ['human', 'ansi']}

    def __init__(self, shape=[4,4]):
        if not isinstance(shape, (list, tuple)) or not len(shape) == 2:
            raise ValueError('shape argument must be a list/tuple of length 2')

        self.shape = shape

        nS = np.prod(shape)
        nA = 4

        MAX_Y = shape[0]
        MAX_X = shape[1]

        # 用一个字典表示状态转移概率
        P = {}
        grid = np.arange(nS).reshape(shape)
        it = np.nditer(grid, flags=['multi_index'])

        # 初始化P
        while not it.finished:
            s = it.iterindex
            y, x = it.multi_index

            P[s] = {a : [] for a in range(nA)}

            is_done = lambda s: s == 0 or s == (nS - 1)
            reward = 0.0 if is_done(s) else -1.0

            # We're stuck in a terminal state
            if is_done(s):
                P[s][UP] = [(1.0, s, reward, True)]
                P[s][RIGHT] = [(1.0, s, reward, True)]
                P[s][DOWN] = [(1.0, s, reward, True)]
                P[s][LEFT] = [(1.0, s, reward, True)]
            # Not a terminal state
            else:
                ns_up = s if y == 0 else s - MAX_X
                ns_right = s if x == (MAX_X - 1) else s + 1
                ns_down = s if y == (MAX_Y - 1) else s + MAX_X
                ns_left = s if x == 0 else s - 1
                P[s][UP] = [(1.0, ns_up, reward, is_done(ns_up))]
                P[s][RIGHT] = [(1.0, ns_right, reward, is_done(ns_right))]
                P[s][DOWN] = [(1.0, ns_down, reward, is_done(ns_down))]
                P[s][LEFT] = [(1.0, ns_left, reward, is_done(ns_left))]

            it.iternext()

        # Initial state distribution is uniform
        isd = np.ones(nS) / nS

        # We expose the model of the environment for educational purposes
        # This should not be used in any model-free learning algorithm
        self.P = P

        super(GridworldEnv, self).__init__(nS, nA, P, isd)

    def _render(self, mode='human', close=False):
        if close:
            return

        outfile = StringIO() if mode == 'ansi' else sys.stdout

        grid = np.arange(self.nS).reshape(self.shape)
        it = np.nditer(grid, flags=['multi_index'])
        while not it.finished:
            s = it.iterindex
            y, x = it.multi_index

            if self.s == s:
                output = " x "
            elif s == 0 or s == self.nS - 1:
                output = " T "
            else:
                output = " o "

            if x == 0:
                output = output.lstrip() 
            if x == self.shape[1] - 1:
                output = output.rstrip()

            outfile.write(output)

            if x == self.shape[1] - 1:
                outfile.write("\n")

            it.iternext()
```

- P : [state_num, action_num, 4]，其中最后一维的4位分别表示： `prob`, `next_state`, `reward`, `is_done`。

In [1]:
import numpy as np
from lib.envs.gridworld import GridworldEnv

In [2]:
env = GridworldEnv()

## Policy Evaluation

![](https://github.com/applenob/rl_learn/raw/master/res/iterative_policy_evaluation.png)

In [3]:
def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
    """
    策略评估
    Evaluate a policy given an environment and a full description of the environment's dynamics.
    
    Args:
        policy: [S, A] shaped matrix representing the policy.
        env: OpenAI env. env.P represents the transition probabilities of the environment.
            env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
        theta: We stop evaluation once our value function change is less than theta for all states.
        discount_factor: gamma discount factor.
    
    Returns:
        Vector of length env.nS representing the value function.
    """
    # Start with a random (all 0) value function
    V = np.zeros(env.nS)
    while True:
        delta = 0
        # For each state, perform a "full backup"
        for s in range(env.nS):
            v = 0
            # Look at the possible next actions
            for a, action_prob in enumerate(policy[s]):
                # For each action, look at the possible next states...
                for  prob, next_state, reward, done in env.P[s][a]:
                    # Calculate the expected value
                    v += action_prob * prob * (reward + discount_factor * V[next_state])
            # How much our value function changed (across any states)
            delta = max(delta, np.abs(v - V[s]))
            V[s] = v
        # Stop evaluating once our value function change is below a threshold
        if delta < theta:
            break
    return np.array(V)

- **Bellman Equation**:
- 公式：$v_{\pi}(s) = \sum_a\pi(a|s)\sum_{s',r}p(s',r|s,a)[r+\gamma v_{\pi}(s')]\;\;\forall s \in S$
- 代码：
```python
for a, action_prob in enumerate(policy[s]):
                # For each action, look at the possible next states...
                for  prob, next_state, reward, done in env.P[s][a]:
                    # Calculate the expected value
                    v += action_prob * prob * (reward + discount_factor * V[next_state])
```
- 公式和代码正好一一对应。
- 测试期望的value和评估的value误差小于一定的范围：
- `abs(desired-actual) < 1.5 * 10**(-decimal)`

In [4]:
random_policy = np.ones([env.nS, env.nA]) / env.nA

均匀分布的随机策略

In [5]:
v = policy_eval(random_policy, env)
expected_v = np.array([0, -14, -20, -22, -14, -18, -20, -20, -20, -20, -18, -14, -22, -20, -14, 0])
np.testing.assert_array_almost_equal(v, expected_v, decimal=2)

实际的价值函数和评估的价值函数的误差通过测试。

## Policy Iteration

![](https://github.com/applenob/rl_learn/raw/master/res/policy_iteration.png)

In [6]:
def policy_iteration(env, policy_eval_fn=policy_eval, discount_factor=1.0):
    """
    Policy Iteration Algorithm. Iteratively evaluates and improves a policy
    until an optimal policy is found.
    
    Args:
        env: The OpenAI envrionment.
        policy_eval_fn: Policy Evaluation function that takes 3 arguments:
            policy, env, discount_factor.
        discount_factor: Lambda discount factor.
        
    Returns:
        A tuple (policy, V). 
        policy is the optimal policy, a matrix of shape [S, A] where each state s
        contains a valid probability distribution over actions.
        V is the value function for the optimal policy.
        
    """
    # Start with a random policy
    policy = np.ones([env.nS, env.nA]) / env.nA
    
    while True:
        # Evaluate the current policy
        # 策略评估
        V = policy_eval_fn(policy, env, discount_factor)
        
        # Will be set to false if we make any changes to the policy
        policy_stable = True
        
        # 策略改善
        # For each state...
        for s in range(env.nS):
            # The best action we would take under the currect policy
            chosen_a = np.argmax(policy[s])
            
            # Find the best action by one-step lookahead
            # Ties are resolved arbitarily
            action_values = np.zeros(env.nA)
            for a in range(env.nA):
                for prob, next_state, reward, done in env.P[s][a]:
                    action_values[a] += prob * (reward + discount_factor * V[next_state])
            # 取value最大的action
            best_a = np.argmax(action_values)
            
            # Greedily update the policy
            if chosen_a != best_a:
                policy_stable = False
            policy[s] = np.eye(env.nA)[best_a]
        
        # If the policy is stable we've found an optimal policy. Return it
        if policy_stable:
            return policy, V

这里的核心代码是：

```python
# Find the best action by one-step lookahead
# Ties are resolved arbitarily
action_values = np.zeros(env.nA)
for a in range(env.nA):
    for prob, next_state, reward, done in env.P[s][a]:
        action_values[a] += prob * (reward + discount_factor * V[next_state])
# 取value最大的action
best_a = np.argmax(action_values)
```

In [7]:
policy, v = policy_iteration(env)
actions = ["UP", "RIGHT", "DOWN", "LEFT"]
print([[action for num, action in zip(one, actions) if num == 1.0] for one in policy])

[['UP'], ['LEFT'], ['LEFT'], ['DOWN'], ['UP'], ['UP'], ['UP'], ['DOWN'], ['UP'], ['UP'], ['RIGHT'], ['DOWN'], ['UP'], ['RIGHT'], ['RIGHT'], ['UP']]


![](https://github.com/applenob/rl_learn/raw/master/res/policy_evaluation_gw1.png)

![](https://github.com/applenob/rl_learn/raw/master/res/policy_evaluation_gw2.png)

可以看到输出的policy和图上的policy基本一致。

## Value Iteration

![](https://github.com/applenob/rl_learn/raw/master/res/value_iteration.png)

In [8]:
def value_iteration(env, theta=0.0001, discount_factor=1.0):
    """
    Value Iteration Algorithm.
    
    Args:
        env: OpenAI environment. env.P represents the transition probabilities of the environment.
        theta: Stopping threshold. If the value of all states changes less than theta
            in one iteration we are done.
        discount_factor: lambda time discount factor.
        
    Returns:
        A tuple (policy, V) of the optimal policy and the optimal value function.
    """
    
    def one_step_lookahead(state, V):
        """
        Helper function to calculate the value for all action in a given state.
        
        Args:
            state: The state to consider (int)
            V: The value to use as an estimator, Vector of length env.nS
        
        Returns:
            A vector of length env.nA containing the expected value of each action.
        """
        A = np.zeros(env.nA)
        for a in range(env.nA):
            for prob, next_state, reward, done in env.P[state][a]:
                A[a] += prob * (reward + discount_factor * V[next_state])
        return A
    
    V = np.zeros(env.nS)
    while True:
        # Stopping condition
        delta = 0
        # Update each state...
        for s in range(env.nS):
            # Do a one-step lookahead to find the best action
            A = one_step_lookahead(s, V)
            best_action_value = np.max(A)
            # Calculate delta across all states seen so far
            delta = max(delta, np.abs(best_action_value - V[s]))
            # Update the value function
            V[s] = best_action_value        
        # Check if we can stop 
        if delta < theta:
            break
    
    # Create a deterministic policy using the optimal value function
    policy = np.zeros([env.nS, env.nA])
    for s in range(env.nS):
        # One step lookahead to find the best action for this state
        A = one_step_lookahead(s, V)
        best_action = np.argmax(A)
        # Always take the best action
        policy[s, best_action] = 1.0
    
    return policy, V

整体思路：每次通过one-step lookahead更新v(s)，找到让v(s)最大的action。直到找到最终的$v_*(s)$，每次选取的动作可以构成一个deterministic policy。

更新公式：$v_{k+1}(s) = \underset{a}{max}\sum_{s',r}p(s',r|s,a)[r+\gamma v_k(s')]$，one-step lookahead对应：$\sum_{s',r}p(s',r|s,a)[r+\gamma v_k(s')]$

In [9]:
policy, v = value_iteration(env)
print([[action for num, action in zip(one, actions) if num == 1.0] for one in policy])

[['UP'], ['LEFT'], ['LEFT'], ['DOWN'], ['UP'], ['UP'], ['UP'], ['DOWN'], ['UP'], ['UP'], ['RIGHT'], ['DOWN'], ['UP'], ['RIGHT'], ['RIGHT'], ['UP']]
