# 练习二 理解马尔科夫过程中的收获与价值

In [2]:
import numpy as np

In [3]:
#num_states = 7
#{"0": "C1", "1":"C2", "2":"C3", "3":"Pass", "4":"Pub", "5":"FB", "6":"Sleep"}
i_to_n = {}
i_to_n["0"] = "C1"
i_to_n["1"] = "C2"
i_to_n["2"] = "C3"
i_to_n["3"] = "Pass"
i_to_n["4"] = "Pub"
i_to_n["5"] = "FB"
i_to_n["6"] = "Sleep"

n_to_i = {}
for i, name in zip(i_to_n.keys(), i_to_n.values()):
    n_to_i[name] = int(i)
    
#   C1   C2   C3  Pass Pub   FB  Sleep
Pss = [
   [ 0.0, 0.5, 0.0, 0.0, 0.0, 0.5, 0.0 ],
   [ 0.0, 0.0, 0.8, 0.0, 0.0, 0.0, 0.2 ],
   [ 0.0, 0.0, 0.0, 0.6, 0.4, 0.0, 0.0 ],
   [ 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0 ],
   [ 0.2, 0.4, 0.4, 0.0, 0.0, 0.0, 0.0 ],
   [ 0.1, 0.0, 0.0, 0.0, 0.0, 0.9, 0.0 ],
   [ 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0 ]
]
Pss = np.array(Pss)
rewards = [-2, -2, -2, 10, 1, -1, 0]
gamma = 0.5

In [4]:
chains =[
    ["C1", "C2", "C3", "Pass", "Sleep"],
    ["C1", "FB", "FB", "C1", "C2", "Sleep"],
    ["C1", "C2", "C3", "Pub", "C2", "C3", "Pass", "Sleep"],
    ["C1", "FB", "FB", "C1", "C2", "C3", "Pub", "C1", "FB",\
     "FB", "FB", "C1", "C2", "C3", "Pub", "C2", "Sleep"]
]

In [5]:
def compute_return(start_index = 0, 
                   chain = None, 
                   gamma = 0.5) -> float:
    '''计算一个马尔科夫奖励过程中某状态的收获值
    Args:
        start_index 要计算的状态在链中的位置
        chain 要计算的马尔科夫过程
        gamma 衰减系数
    Returns：
        retrn 收获值
    '''
    retrn, power, gamma = 0.0, 0, gamma
    for i in range(start_index, len(chain)):
        retrn += np.power(gamma, power) * rewards[n_to_i[chain[i]]]
        power += 1
    return retrn

In [6]:
compute_return(0, chains[3], gamma = 0.5)

-3.196044921875

In [7]:
compute_return(3,chains[3])

-3.568359375

**注意：收获与价值是不同的**

In [8]:
def compute_value(Pss, rewards, gamma = 0.05):
    '''通过求解矩阵方程的形式直接计算状态的价值
    Args：
        P 状态转移概率矩阵 shape(7, 7)
        rewards 即时奖励 list 
        gamma 衰减系数
    Return
        values 各状态的价值
    '''
    #assert(gamma >= 0 and gamma < 1.0) 
    #assert(len(P.shape) == 2 and P.shape[0] == P.shape[1])
    rewards = np.array(rewards).reshape((-1,1))
    values = np.dot(np.linalg.inv(np.eye(7,7) - gamma * Pss), rewards)
    return values

In [9]:
values = compute_value(Pss, rewards, gamma = 0.999999)
print(values)

[[-12.54296219]
 [  1.4568013 ]
 [  4.32100594]
 [ 10.        ]
 [  0.80253065]
 [-22.54274676]
 [  0.        ]]


### 读者可以自己验证下图中某状态价值

![马尔科夫奖励过程价值计算](../pics/c2_3_mrp.png)

- - -
## 马尔科夫决策过程

![马尔科夫决策过程](../pics/c2_4_mdp.png)

In [10]:
# 辅助函数
def str_key(*args):
    '''将参数用"_"连接起来作为字典的键，需注意参数本身可能会是tuple或者list型，
    比如类似((a,b,c),d)的形式。
    '''
    new_arg = []
    for arg in args:
        if type(arg) in [tuple, list]:
            new_arg += [str(i) for i in arg]
        else:
            new_arg.append(str(arg))
    return "_".join(new_arg)

def set_dict(target_dict, value, *args):
    target_dict[str_key(*args)] = value
    
def set_prob(P, s, a, s1, p = 1.0): # 设置概率字典
    set_dict(P, p, s, a, s1)

def get_prob(P, s, a, s1): # 获取概率值
    return P.get(str_key(s,a,s1), 0)
    
def set_reward(R, s, a, r): # 设置奖励字典
    set_dict(R, r, s, a)

def get_reward(R, s, a): # 获取奖励值
    return R.get(str_key(s,a), 0)

def display_dict(target_dict): # 显示字典内容
    for key in target_dict.keys():
        print("{}:　{:.2f}".format(key, target_dict[key]))
    print("")
    
# 辅助方法
def set_value(V, s, v): # 设置价值字典
    set_dict(V, v, s)
    
def get_value(V, s): # 获取价值值
    return V.get(str_key(s), 0)

def set_pi(Pi, s, a, p = 0.5): # 设置策略字典
    set_dict(Pi, p, s, a)
    
def get_pi(Pi, s, a): # 获取策略（概率）值
    return Pi.get(str_key(s,a), 0)


In [11]:
#from utils import str_key, display_dict
#from utils import set_prob, set_reward, get_prob, get_reward
#from utils import set_value, set_pi, get_value, get_pi

# 构建学生马尔科夫决策过程
S = ['浏览手机中','第一节课','第二节课','第三节课','休息中']
A = ['浏览手机','学习','离开浏览','泡吧','退出学习']
R = {} # 奖励Rsa
P = {} # 状态转移概率Pss'a
gamma = 1.0 # 衰减因子

set_prob(P, S[0], A[0], S[0]) # 浏览手机中 - 浏览手机 -> 浏览手机中
set_prob(P, S[0], A[2], S[1]) # 浏览手机中 - 离开浏览 -> 第一节课
set_prob(P, S[1], A[0], S[0]) # 第一节课 - 浏览手机 -> 浏览手机中
set_prob(P, S[1], A[1], S[2]) # 第一节课 - 学习 -> 第二节课
set_prob(P, S[2], A[1], S[3]) # 第二节课 - 学习 -> 第三节课
set_prob(P, S[2], A[4], S[4]) # 第二节课 - 退出学习 -> 退出休息
set_prob(P, S[3], A[1], S[4]) # 第三节课 - 学习 -> 退出休息
set_prob(P, S[3], A[3], S[1], p = 0.2) # 第三节课 - 泡吧 -> 第一节课
set_prob(P, S[3], A[3], S[2], p = 0.4) # 第三节课 - 泡吧 -> 第一节课
set_prob(P, S[3], A[3], S[3], p = 0.4) # 第三节课 - 泡吧 -> 第一节课

set_reward(R, S[0], A[0], -1) # 浏览手机中 - 浏览手机 -> -1
set_reward(R, S[0], A[2],  0) # 浏览手机中 - 离开浏览 -> 0
set_reward(R, S[1], A[0], -1) # 第一节课 - 浏览手机 -> -1
set_reward(R, S[1], A[1], -2) # 第一节课 - 学习 -> -2
set_reward(R, S[2], A[1], -2) # 第二节课 - 学习 -> -2
set_reward(R, S[2], A[4],  0) # 第二节课 - 退出学习 -> 0
set_reward(R, S[3], A[1], 10) # 第三节课 - 学习 -> 10
set_reward(R, S[3], A[3], +1) # 第三节课 - 泡吧 -> -1

MDP = (S, A, R, P, gamma)

In [12]:
print("----状态转移概率字典（矩阵）信息:----")
display_dict(P)
print("----奖励字典（函数）信息:----")
display_dict(R)

----状态转移概率字典（矩阵）信息:----
浏览手机中_浏览手机_浏览手机中:　1.00
浏览手机中_离开浏览_第一节课:　1.00
第一节课_浏览手机_浏览手机中:　1.00
第一节课_学习_第二节课:　1.00
第二节课_学习_第三节课:　1.00
第二节课_退出学习_休息中:　1.00
第三节课_学习_休息中:　1.00
第三节课_泡吧_第一节课:　0.20
第三节课_泡吧_第二节课:　0.40
第三节课_泡吧_第三节课:　0.40

----奖励字典（函数）信息:----
浏览手机中_浏览手机:　-1.00
浏览手机中_离开浏览:　0.00
第一节课_浏览手机:　-1.00
第一节课_学习:　-2.00
第二节课_学习:　-2.00
第二节课_退出学习:　0.00
第三节课_学习:　10.00
第三节课_泡吧:　1.00



## 策略评估

![策略评估](../pics/c2_5_mdp.png)

In [13]:
# S = ['浏览手机中','第一节课','第二节课','第三节课','休息中']
# A = ['继续浏览','学习','离开浏览','泡吧','退出学习']
# 设置行为策略：pi(a|.) = 0.5
Pi = {}
set_pi(Pi, S[0], A[0], 0.5) # 浏览手机中 - 浏览手机
set_pi(Pi, S[0], A[2], 0.5) # 浏览手机中 - 离开浏览
set_pi(Pi, S[1], A[0], 0.5) # 第一节课 - 浏览手机
set_pi(Pi, S[1], A[1], 0.5) # 第一节课 - 学习
set_pi(Pi, S[2], A[1], 0.5) # 第二节课 - 学习
set_pi(Pi, S[2], A[4], 0.5) # 第二节课 - 退出学习
set_pi(Pi, S[3], A[1], 0.5) # 第三节课 - 学习
set_pi(Pi, S[3], A[3], 0.5) # 第三节课 - 泡吧

print("----状态转移概率字典（矩阵）信息:----")
display_dict(Pi)
# 初始时价值为空，访问时会返回0
print("----状态转移概率字典（矩阵）信息:----")
V = {}
display_dict(V)

----状态转移概率字典（矩阵）信息:----
浏览手机中_浏览手机:　0.50
浏览手机中_离开浏览:　0.50
第一节课_浏览手机:　0.50
第一节课_学习:　0.50
第二节课_学习:　0.50
第二节课_退出学习:　0.50
第三节课_学习:　0.50
第三节课_泡吧:　0.50

----状态转移概率字典（矩阵）信息:----



In [14]:
def compute_q(MDP, V, s, a):
    '''根据给定的MDP，价值函数V，计算状态行为对s,a的价值qsa
    '''
    S, A, R, P, gamma = MDP
    q_sa = 0
    for s_prime in S:
        q_sa += get_prob(P, s,a,s_prime) * get_value(V, s_prime)
    q_sa = get_reward(R, s,a) + gamma * q_sa
    return q_sa


def compute_v(MDP, V, Pi, s):
    '''给定MDP下依据某一策略Pi和当前状态价值函数V计算某状态s的价值
    '''
    S, A, R, P, gamma = MDP
    v_s = 0
    for a in A:
        v_s += get_pi(Pi, s,a) * compute_q(MDP, V, s, a)
    return v_s
        

# 根据当前策略使用回溯法来更新状态价值，本章不做要求
def update_V(MDP, V, Pi):
    '''给定一个MDP和一个策略，更新该策略下的价值函数V
    '''
    S, _, _, _, _ = MDP
    V_prime = V.copy()
    for s in S:
        #set_value(V_prime, s, V_S(MDP, V_prime, Pi, s))
        V_prime[str_key(s)] = compute_v(MDP, V_prime, Pi, s)
    return V_prime


# 策略评估，得到该策略下最终的状态价值。本章不做要求
def policy_evaluate(MDP, V, Pi, n):
    '''使用n次迭代计算来评估一个MDP在给定策略Pi下的状态价值，初始时价值为V
    '''
    for i in range(n):
        V = update_V(MDP, V, Pi)
        #display_dict(V)
    return V

In [15]:
V = policy_evaluate(MDP, V, Pi, 100)
display_dict(V)
# 验证状态在某策略下的价值
v = compute_v(MDP, V, Pi, "第三节课")
print("第三节课在当前策略下的价值为:{:.2f}".format(v))

浏览手机中:　-2.31
第一节课:　-1.31
第二节课:　2.69
第三节课:　7.38
休息中:　0.00

第三节课在当前策略下的价值为:7.38


In [16]:
# 价值迭代得到最优状态价值过程
def compute_v_from_max_q(MDP, V, s):
    '''根据一个状态的下所有可能的行为价值中最大一个来确定当前状态价值
    '''
    S, A, R, P, gamma = MDP
    v_s = -float('inf')
    for a in A:
        qsa = compute_q(MDP, V, s, a)
        if qsa >= v_s:
            v_s = qsa
    return v_s

def update_V_without_pi(MDP, V):
    '''在不依赖策略的情况下直接通过后续状态的价值来更新状态价值
    '''
    S, _, _, _, _ = MDP
    V_prime = V.copy()
    for s in S:
        set_value(V_prime, s, compute_v_from_max_q(MDP, V_prime, s))
        #V_prime[str_key(s)] = compute_v_from_max_q(MDP, V_prime, s)
    return V_prime

# 价值迭代，本章不作要求
def value_iterate(MDP, V, n):
    '''价值迭代
    '''
    for i in range(n):
        V = update_V_without_pi(MDP, V)
        display_dict(V)
    return V

In [17]:
V = {}
# 通过价值迭代得到最优状态价值及
V_star = value_iterate(MDP, V, 4)
display_dict(V_star)

# 验证最优行为价值
s, a = "第三节课", "泡吧"
q = compute_q(MDP, V_star, "第三节课", "泡吧")
print("在状态{}选择行为{}的最优价值为:{:.2f}".format(s,a,q))

浏览手机中:　0.00
第一节课:　0.00
第二节课:　0.00
第三节课:　10.00
休息中:　0.00

浏览手机中:　0.00
第一节课:　0.00
第二节课:　8.00
第三节课:　10.00
休息中:　0.00

浏览手机中:　0.00
第一节课:　6.00
第二节课:　8.00
第三节课:　10.00
休息中:　0.00

浏览手机中:　6.00
第一节课:　6.00
第二节课:　8.00
第三节课:　10.00
休息中:　0.00

浏览手机中:　6.00
第一节课:　6.00
第二节课:　8.00
第三节课:　10.00
休息中:　0.00

在状态第三节课选择行为泡吧的最优价值为:9.40


In [18]:
# display q_star
def display_q_star(MDP, V_star):
    S, A, _, _, _ = MDP
    for s in S:
        for a in A:
            print("q*({},{}):{}".format(s,a, compute_q(MDP, V_star, s, a)))

In [19]:
display_q_star(MDP, V_star)

q*(浏览手机中,浏览手机):5.0
q*(浏览手机中,学习):0.0
q*(浏览手机中,离开浏览):6.0
q*(浏览手机中,泡吧):0.0
q*(浏览手机中,退出学习):0.0
q*(第一节课,浏览手机):5.0
q*(第一节课,学习):6.0
q*(第一节课,离开浏览):0.0
q*(第一节课,泡吧):0.0
q*(第一节课,退出学习):0.0
q*(第二节课,浏览手机):0.0
q*(第二节课,学习):8.0
q*(第二节课,离开浏览):0.0
q*(第二节课,泡吧):0.0
q*(第二节课,退出学习):0.0
q*(第三节课,浏览手机):0.0
q*(第三节课,学习):10.0
q*(第三节课,离开浏览):0.0
q*(第三节课,泡吧):9.4
q*(第三节课,退出学习):0.0
q*(休息中,浏览手机):0.0
q*(休息中,学习):0.0
q*(休息中,离开浏览):0.0
q*(休息中,泡吧):0.0
q*(休息中,退出学习):0.0
