# Markov Reward Process

![image.png](./fig_1.png)

这里假设从 $s_1$ 出发，terminal state 是 $s_6$

In [8]:
import numpy as np

P = [
    [0.9, 0.1, 0.0, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.6, 0.0, 0.4],
    [0.0, 0.0, 0.0, 0.0, 0.3, 0.7],
    [0.0, 0.2, 0.3, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.0, 0.0, 1.0],
]

P = np.array(P)
rewards = [-1, -2, -2, 10, 1, 0]
gamma = 0.5

def get_return(start_index, chain: list, gamma):
    G = 0
    for i in reversed(range(start_index, len(chain))):
        G = gamma * G + rewards[chain[i]-1]
    return G

chain = [1,2,3,6]
start_index = 0
G = get_return(start_index, chain, gamma)
print("The return is %.2f"%G)


The return is -2.50


根据 Markov Reward Process，可以得到
$$
V = R + \gamma P V
$$
进而
$$
V = (I-\gamma P)^{-1} R
$$

In [9]:
def compute_V(P, rewards, gamma):
    rewards = np.array(rewards).reshape((-1,1))
    state_num = P.shape[0]
    V = np.dot(np.linalg.inv(np.eye(state_num) - gamma * P), rewards)
    return V

V = compute_V(P, rewards, gamma)
print("The state value is\n", V)


The state value is
 [[-2.01950168]
 [-2.21451846]
 [ 1.16142785]
 [10.53809283]
 [ 3.58728554]
 [ 0.        ]]


![image.png](./fig_2.png)

根据上图所示，编写 MDP

In [10]:
S = ["s1", "s2", "s3", "s4", "s5"]  # 状态集合
A = ["保持s1", "前往s1", "前往s2", "前往s3", "前往s4", "前往s5", "概率前往"]  # 动作集合
# 状态转移函数
P = {
    "s1-保持s1-s1": 1.0,
    "s1-前往s2-s2": 1.0,
    "s2-前往s1-s1": 1.0,
    "s2-前往s3-s3": 1.0,
    "s3-前往s4-s4": 1.0,
    "s3-前往s5-s5": 1.0,
    "s4-前往s5-s5": 1.0,
    "s4-概率前往-s2": 0.2,
    "s4-概率前往-s3": 0.4,
    "s4-概率前往-s4": 0.4,
}
# 奖励函数
R = {
    "s1-保持s1": -1,
    "s1-前往s2": 0,
    "s2-前往s1": -1,
    "s2-前往s3": -2,
    "s3-前往s4": -2,
    "s3-前往s5": 0,
    "s4-前往s5": 10,
    "s4-概率前往": 1,
}
gamma = 0.5  # 折扣因子
MDP = (S, A, P, R, gamma)

# 策略1,随机策略
Pi_1 = {
    "s1-保持s1": 0.5,
    "s1-前往s2": 0.5,
    "s2-前往s1": 0.5,
    "s2-前往s3": 0.5,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.5,
    "s4-概率前往": 0.5,
}
# 策略2
Pi_2 = {
    "s1-保持s1": 0.6,
    "s1-前往s2": 0.4,
    "s2-前往s1": 0.3,
    "s2-前往s3": 0.7,
    "s3-前往s4": 0.5,
    "s3-前往s5": 0.5,
    "s4-前往s5": 0.1,
    "s4-概率前往": 0.9,
}


# 把输入的两个字符串通过“-”连接,便于使用上述定义的P、R变量
def join(str1, str2):
    return str1 + '-' + str2

通过 $r'(s) = \sum_{a \in A} \pi(a|s)r(s,a)$ 和 $P'(s'|s) = \sum_{a \in A} \pi(a|s)P(s'|s,a)$，我们可以把一个 MDP 转化为 MRP

In [11]:
gamma = 0.5
# 转化后的MRP的状态转移矩阵
P_from_mdp_to_mrp = [
    [0.5, 0.5, 0.0, 0.0, 0.0],
    [0.5, 0.0, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.0, 0.5, 0.5],
    [0.0, 0.1, 0.2, 0.2, 0.5],
    [0.0, 0.0, 0.0, 0.0, 1.0],
]
P_from_mdp_to_mrp = np.array(P_from_mdp_to_mrp)
R_from_mdp_to_mrp = [-0.5, -1.5, -1.0, 5.5, 0]

V = compute_V(P_from_mdp_to_mrp, R_from_mdp_to_mrp, gamma)
print("MDP中每个状态价值分别为\n", V)

MDP中每个状态价值分别为
 [[-1.22555411]
 [-1.67666232]
 [ 0.51890482]
 [ 6.0756193 ]
 [ 0.        ]]


# MC 方法
构造采样函数

In [15]:
def sample(MDP, Pi, timestep_max, number):
    S, A, P, R, gamma = MDP
    episodes = []
    for _ in range(number):
        episode = []
        timestep = 0
        s = S[np.random.randint(4)]
        while s != "s5" and timestep < timestep_max:
            timestep += 1
            rand, temp = np.random.rand(), 0
            for a_opt in A:
                temp += Pi.get(join(s, a_opt), 0)
                if rand <= temp:
                    a = a_opt
                    r = R.get(join(s, a), 0)
                    break
            
            rand, temp = np.random.rand(), 0
            for s_opt in S:
                key = join(join(s, a), s_opt)
                temp += P.get(key, 0)
                if rand <= temp:
                    s_next = s_opt
                    break
            episode.append((s, a, r, s_next))
            s = s_next
        episodes.append(episode)
    return episodes

episodes = sample(MDP, Pi_1, 20, 5)

print("采样得到的第1条序列为:\n", episodes[0])
print("采样得到的第2条序列为:\n", episodes[1])
print("采样得到的第3条序列为:\n", episodes[2])


采样得到的第1条序列为:
 [('s4', '概率前往', 1, 's3'), ('s3', '前往s4', -2, 's4'), ('s4', '概率前往', 1, 's4'), ('s4', '前往s5', 10, 's5')]
采样得到的第2条序列为:
 [('s2', '前往s3', -2, 's3'), ('s3', '前往s5', 0, 's5')]
采样得到的第3条序列为:
 [('s2', '前往s3', -2, 's3'), ('s3', '前往s5', 0, 's5')]


构造 MC 价值估计函数

In [16]:
def MC(episodes, V, N, gamma):
    for episode in episodes:
        G = 0
        for t in reversed(range(len(episode))):
            s, a, r, s_next = episode[t]
            G = gamma * G + r
            N[s] += 1
            V[s] += (1 / N[s]) * (G - V[s])


timestep_max = 20

episodes = sample(MDP, Pi_1, timestep_max, 10000)
gamma = 0.5
V = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
N = {"s1": 0, "s2": 0, "s3": 0, "s4": 0, "s5": 0}
MC(episodes, V, N, gamma)
print("使用蒙特卡洛方法计算MDP的状态价值为\n", V)

使用蒙特卡洛方法计算MDP的状态价值为
 {'s1': -1.226748214257894, 's2': -1.683054341778986, 's3': 0.5023301987924838, 's4': 6.10155246364003, 's5': 0}


# 占用度量

这里我们先关注 **折扣状态分布** (Discounted State Distribution)
公式如下：
$$v^\pi(s) = (1 - \gamma) \sum_{t=0}^{\infty} \gamma^t P_t^\pi(s)$$

我们可以得到状态访问概率的性质：
$$\nu^{\pi}(s') = (1 - \gamma)\nu_0(s') + \gamma \int P(s'|s, a)\pi(a|s)\nu^{\pi}(s)dsda$$

还可以定义策略的占用度量：
$$\rho^{\pi}(s, a) = (1 - \gamma) \sum_{t=0}^{\infty} \gamma^t P_t^{\pi}(s)\pi(a|s)$$

二者之间有如下关系：
$$\rho^{\pi}(s, a) = \nu^{\pi}(s)\pi(a|s)$$


**定理1:**

一个策略唯一决定了它的占用度量；反过来，一个占用度量也唯一决定了是哪个策略产生了它。
$\rho^{\pi_1} = \rho^{\pi_2} \iff \pi_1 = \pi_2$

**定理2:**

如果你手里有一个占用度量 $\rho$，你该如何把它变回唯一的那个策略 $\pi$。
$$\pi(a|s) = \frac{\rho(s, a)}{\sum_{a'} \rho(s, a')}$$

接下来定义近似估计占用度量的函数

In [18]:
def occupancy(episodes, s, a, timestep_max, gamma):
    rho = 0
    total_times = np.zeros(timestep_max)
    occur_times = np.zeros(timestep_max)
    for episode in episodes:
        for i in range(len(episode)):
            (s_opt, a_opt, r, s_next) = episode[i]
            total_times[i] += 1
            if s_opt == s and a_opt == a:
                occur_times[i] += 1

    for i in reversed(range(timestep_max)):
        if total_times[i] != 0:
            rho += gamma ** i * occur_times[i] / total_times[i]
    
    return (1 - gamma) * rho

gamma = 0.5
timestep_max = 1000

episodes_1 = sample(MDP, Pi_1, timestep_max, 1000)
episodes_2 = sample(MDP, Pi_2, timestep_max, 1000)
rho_1 = occupancy(episodes_1, "s4", "概率前往", timestep_max, gamma)
rho_2 = occupancy(episodes_2, "s4", "概率前往", timestep_max, gamma)
print(rho_1, rho_2)

0.12092215264228745 0.23766528282698035


a2
