In [18]:
# 状态空间
S = [i for i in range(16)]
# 行为空间
A = ["n", "e", "s", "w"]
# 行为对状态的改变
ds_actions = {"n":-4, "e":1, "s":4, "w":-1}

In [19]:
def dynamics(s,a):
    """
    模拟小型方格世界的环境动力学特征
    :param s: 表示当前状态0-15
    :param a: 表示行为 str in ['n'.'e','s','w']分别表示北、东、南、西
    :return: tuple(s_prime, reward, is_end)
    s_prime 后续状态
    reward 奖励值
    is_end 是否进入终止状态
    """
    s_prime = s
    # 过滤边界状态
    if (s % 4 == 0 and a == "w") or (s < 4 and a == "n") or ((s+1)%4 == 0 and a == "e") or (s > 11 and a == "s") or s in [0,15]:
        pass
    else:
        ds = ds_actions[a]
        s_prime = s + ds
    reward = 0 if s in [0,15] else -1
    is_end = True if s in [0,15] else False
    return s_prime, reward, is_end

def P(s,a,s1):
    """
    状态转移概率函数
    :param s:
    :param a:
    :param s1:
    :return:
    """
    s_prime,_,_ = dynamics(s,a)
    return s1 == s_prime

def R(s,a):
    _, r, _ = dynamics(s,a)
    return r

gamma = 1.0
MDP = S, A, R, P, gamma

In [20]:
# 均一随机策略（只需要知道行为空间）和贪婪策略（需要知道状态价值）
def uniform_random_pi(MDP=None, V=None, s=None, a= None):
    _, A, _, _, _ = MDP
    n = len(A)
    return 0 if n == 0 else 1.0/n

def greedy_pi(MDP, V, s, a):
    S, A, R, P, gamma = MDP
    max_v, a_max_v = -float('inf'), []
    for a_opt in A:
        # 统计后续状态的最大价值以及能达到该状态的行为（可能不止一个）
        s_prime, reward, _ = dynamics(s, a_opt)
        v_s_prime = get_value(V, s_prime)
        if v_s_prime > max_v:
            max_v = v_s_prime
            a_max_v = [a_opt]
        elif v_s_prime == max_v:
            a_max_v.append(a_opt)
    n = len(a_max_v)
    if n == 0: return 0.0
    return 1.0/n if a in a_max_v else 0.0

def get_pi(Pi, s, a, MDP=None, V=None):
    return Pi(MDP, V, s, a)

def get_prob(P, s, a, s1):
    return P(s, a, s1)

def get_reward(R, s, a):
    return R(s, a)

def set_value(V, s, v):
    V[s] = v

def get_value(V, s):
    return V[s]

def display_V(V):
    for i in range(16):
        print('{0:>6.2f}'.format(V[i]), end='')
        if (i + 1) % 4 == 0:
            print("")
    print()

In [21]:
def compute_q(MDP, V, s, a):
    """
    根据给定的MDP， 价值函数V，计算状态行为对s，a的价值qsa
    :param MDP:
    :param V:
    :param s:
    :param a:
    :return:
    """
    S, A, R, P, gamma = MDP
    q_sa = get_reward(R, s, a)
    for s_prime in S:
        q_sa += gamma * get_prob(P, s, a, s_prime) * get_value(V, s_prime)
    return q_sa

def compute_v(MDP, V, Pi, s):
    """
    给定MDP下依据某一策略Pi和当前状态价值函数V计算某状态s的价值
    :param MDP:
    :param V:
    :param Pi:
    :param s:
    :return:
    """
    S, A, R, P, gamma = MDP
    v_s = 0
    for a in A:
        v_s += get_pi(Pi, s, a, MDP, V) * compute_q(MDP, V, s, a)
    return v_s

def update_V(MDP, V, Pi):
    """
    给定策略和MDP，更新该策略下的价值函数V
    :param MDP:
    :param V:
    :param Pi:
    :return:
    """
    S, _, _, _, _ = MDP
    V_prime = V.copy()
    for s in S:
        set_value(V_prime, s, compute_v(MDP, V_prime, Pi, s))
    return V_prime

def policy_evaluate(MDP, V, Pi, n):
    """
    使用n次迭代来评估一个MDP在给定策略Pi下的状态价值，初始值为V
    :param MDP:
    :param V:
    :param Pi:均一随机策略或贪心策略
    :param n:
    :return:
    """
    for i in range(n):
        V = update_V(MDP, V, Pi)
    return V

def policy_iterate(MDP, V, Pi, n, m):
    for i in range(m):
        V = policy_evaluate(MDP, V, Pi, n)
        Pi = greedy_pi # 第 一 次 迭 代 产 生 新 的 价 值 函 数 后 随 机 使 用 贪 婪 策 略
    return V

def compute_v_for_max_q(MDP, V, s):
    """
    价值迭代得到最优状态价值过程
    根据一个状态下的所有可能的行为价值的最大一个来确定当前状态价值
    :param MDP:
    :param V:
    :param s:
    :return:
    """
    S, A, R, P, gamma = MDP
    v_s = -float('inf')
    for a in A:
        q_sa = compute_q(MDP, V, s, a)
        if q_sa >= v_s:
            v_s = q_sa
    return v_s

def update_V_without_pi(MDP, V):
    """
    在不依赖策略的情况下直接通过后续状态的价值来更新状态价值
    :param MDP:
    :param V:
    :return:
    """
    S, _, _, _, _ = MDP
    V_prime = V.copy()
    for s in S:
        set_value(V_prime, s, compute_v_for_max_q(MDP, V_prime, s))
    return V_prime

def value_iterate(MDP, V, n):
    """
    价值迭代
    :param MDP:
    :param V:
    :param n:
    :return:
    """
    for i in range(n):
        V = update_V_without_pi(MDP, V)
    return V

In [22]:
# 策略评估
V = [0 for i in range(16)]
V_pi = policy_evaluate(MDP, V, uniform_random_pi, 100)
display_V(V_pi)

V = [0 for i in range(16)]
V_pi = policy_evaluate(MDP, V, greedy_pi, 100)
display_V(V_pi)

  0.00-14.00-20.00-22.00
-14.00-18.00-20.00-20.00
-20.00-20.00-18.00-14.00
-22.00-20.00-14.00  0.00

  0.00 -1.00 -2.00 -3.00
 -1.00 -2.00 -3.00 -2.00
 -2.00 -3.00 -2.00 -1.00
 -3.00 -2.00 -1.00  0.00



In [23]:
# 策略迭代
V = [0 for _ in range(16)]
V_pi = policy_iterate(MDP, V, greedy_pi, 1, 100)
display_V(V_pi)

# 单纯的价值迭代
V_star = [0 for _ in range(16)]
V_star = value_iterate(MDP, V, 4)
display_V(V_star)

  0.00 -1.00 -2.00 -3.00
 -1.00 -2.00 -3.00 -2.00
 -2.00 -3.00 -2.00 -1.00
 -3.00 -2.00 -1.00  0.00

  0.00 -1.00 -2.00 -3.00
 -1.00 -2.00 -3.00 -2.00
 -2.00 -3.00 -2.00 -1.00
 -3.00 -2.00 -1.00  0.00



In [24]:
# 观察最优状态下的最优策略
def greedy_policy(MDP, V, s):
    S, A, R, P, gamma = MDP
    max_v, a_max_v = -float('inf'), []
    for a_opt in A:
        s_prime, reward, _ = dynamics(s, a_opt)
        v_s_prime = get_value(V, s_prime)
        if v_s_prime > max_v:
            max_v = v_s_prime
            a_max_v = a_opt
        elif v_s_prime == max_v:
            a_max_v += a_opt
    return str(a_max_v)

def display_policy(policy, MDP, V):
    S, A, R, P, gamma = MDP
    for i in range(16):
        print('{0:^6}'.format(policy(MDP, V, S[i])), end=" ")
        if(i+1)%4 == 0:
            print("")
    print()

display_policy(greedy_policy, MDP, V_star)

 nesw    w      w      sw   
  n      nw    nesw    s    
  n     nesw    es     s    
  ne     e      e     nesw  

