In [1]:
'''
（状態遷移確率、報酬関数、方策）-> ベルマン方程式 -> 連立方程式 -> 状態価値関数
連立方程sきを明示的に出して直接解くのは現実的には不可能
よって動的計画法（DP)

強化学習の２つのタスク
方策評価　方策が与えられた時に価値関数を求めること
方策制御　方策を最適方策へ調整すること

評価-> 制御　の流れ、DPで方策評価を行う

反復方策評価（DPの具体的なアルゴリズム）
ベルマン方程式を更新式（ブートストラッピング、推定値を使って推定値を改善する）に変えたもの

状態遷移が決定的：ある状態sである行動aを行えば次の状態s'は１つに決まる


'''

"\n（状態遷移確率、報酬関数、方策）-> ベルマン方程式 -> 連立方程式 -> 状態価値関数\n連立方程sきを明示的に出して直接解くのは現実的には不可能\nよって動的計画法（DP)\n\n強化学習の２つのタスク\n方策評価\u3000方策が与えられた時に価値関数を求めること\n方策制御\u3000方策を最適方策へ調整すること\n\n評価-> 制御\u3000の流れ、DPで方策評価を行う\n\n反復方策評価（DPの具体的なアルゴリズム）\nベルマン方程式を更新式（ブートストラッピング、推定値を使って推定値を改善する）に変えたもの\n\n状態遷移が決定的：ある状態sである行動aを行えば次の状態s'は１つに決まる\n\n\n"

In [2]:
# 反復方策評価

In [3]:
import numpy as np

In [7]:
class GridWorld:
    def __init__(self):
        '''
                          | goal
                          v
        +---+------+---+------+
        | 0 |  0   | 0 |  1.0 |
        +---+------+---+------+
        | 0 | None | 0 | -1.0 |   
        +---+------+---+------+
        | 0 |  0   | 0 |   0  |
        +---+------+---+------+
          ^
          | start
        '''
        self.action_space = [0,1,2,3] 
        self.action_meaning = {
            0: "UP",
            1: "DOWN",
            2: "LEFT",
            3: "RIGHT",
        }
        self.reward_map = np.array(
            [
                [0,0,0,1.0],
                [0,None,0,-1.0],
                [0,0,0,0]
            ]
        )
        self.goal_state = (0, 3)
        self.wall_state = (1, 1)
        self.start_state = (2, 0)
        self.agent_state = self.start_state

    @property
    def height(self):
        return len(self.reward_map)

    @property
    def width(self):
        return len(self.reward_map[0])

    @property
    def shape(self):
        return len(self.reward_map.shape)

    def actions(self):
        return self.action_space

    def states(self):
        for h in range(self.height):
            for w in range(self.width):
                yield (h, w)

    def next_state(self, state, action):
        action_move_map = [(-1,0),(1,0),(0,1),(0,-1)]
        move = action_move_map[action]
        next_state = (state[0]+move[0], state[1]+move[1])
        ny, nx = next_state

        if nx<0 or nx>=self.width or ny<0 or ny>=self.height:
            next_state = state
        elif next_state==self.wall_state:
            next_state = state

        return next_state

    def reward(self, state, action, next_state):
        return self.reward_map[next_state]

In [10]:
env = GridWorld()
V = {}
for state in env.states():
    V[state] = np.random.randn() # 状態価値関数

print(V)

{(0, 0): 1.2159087003296114, (0, 1): -0.5806169180349051, (0, 2): -1.0381794008048761, (0, 3): 0.8253526350196313, (1, 0): 0.2067356038063959, (1, 1): -0.7160572913410463, (1, 2): -0.6053089184072472, (1, 3): -0.5527016421412936, (2, 0): -0.40616378455141067, (2, 1): -0.8156943789802026, (2, 2): -0.7535806204959032, (2, 3): 0.6389817441476344}


In [11]:
from collections import defaultdict

In [13]:
# ランダムな方策
pi = defaultdict(lambda: {
    0: 0.25,
    1: 0.25,
    2: 0.25,
    3: 0.25
})
state = (0, 1)
print(pi[state])

{0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25}


In [15]:
def eval_onestep(pi, V, env, gamma=0.9):
    for state in env.states():  # ①各状態へアクセス
        if state == env.goal_state:  # ②ゴールの価値関数は常に0
            V[state] = 0
            continue

        action_probs = pi[state]  
        new_V = 0

        # ③各行動へアクセス
        for action, action_prob in action_probs.items():
            next_state = env.next_state(state, action)
            r = env.reward(state, action, next_state)
            # ④新しい価値関数
            new_V += action_prob * (r + gamma * V[next_state])
        V[state] = new_V
    return V

In [16]:
def policy_eval(pi, V, env, gamma, threshold=0.001):
    while True:
        old_V = V.copy()  # 更新前の価値関数
        V = eval_onestep(pi, V, env, gamma)

        # 更新された量の最大値を求める
        delta = 0
        for state in V.keys():
            t = abs(V[state] - old_V[state])
            if delta < t:
                delta = t

        # 閾値との比較
        if delta < threshold:
            break
    return V

In [17]:
env = GridWorld()
gamma = 0.9

pi = defaultdict(lambda: {0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25})
V = defaultdict(lambda: 0)

V = policy_eval(pi, V, env, gamma)

In [18]:
print(V)

defaultdict(<function <lambda> at 0x10709dbc0>, {(0, 0): 0.03028806099856353, (1, 0): -0.026936362355998215, (0, 1): 0.0977824241089171, (0, 2): 0.2071980803453905, (1, 2): -0.495978151235659, (0, 3): 0, (2, 0): -0.09886155432366785, (2, 1): -0.21729033457922944, (1, 1): -0.14427715375541272, (2, 2): -0.43435951415465596, (1, 3): -0.37134421613448265, (2, 3): -0.7838088155423875})


In [19]:
# 方策を改善する

In [20]:
'''
最適方策は最大値を取る行動aによって決まる

最適方策ではなく「なんらかの決定論的方策」に対しても最大値を取る行動aによって方策を更新する（＝greedy化と呼ぶことにする）と
常に改善されるか、改善が止まればそれが最適方策になる

こうして得た方策（その時は最適）を評価して価値関数を得る、その価値関数でgreedy化をして方策を得る（この時点では最適）
これを方策が変更されなくなるまで繰り返す（方策反復法）
'''

'\n最適方策は最大値を取る行動aによって決まる\n\n最適方策ではなく「なんらかの決定論的方策」に対しても最大値を取る行動aによって方策を更新する（＝greedy化と呼ぶことにする）と\n常に改善されるか、改善が止まればそれが最適方策になる\n\nこうして得た方策（その時は最適）を評価して価値関数を得る、その価値関数でgreedy化をして方策を得る（この時点では最適）\nこれを方策が変更されなくなるまで繰り返す（方策反復法）\n'

In [21]:
def argmax(d):
    max_value = max(d.values())
    max_key = 0
    for key, value in d.items():
        if value == max_value:
            max_key = key
    return max_key
    
def greedy_policy(V, env, gamma):
    pi = {}

    for state in env.states():
        action_values = {}

        for action in env.actions():
            next_state = env.next_state(state, action)
            r = env.reward(state, action, next_state)
            value = r + gamma * V[next_state]  # ①
            action_values[action] = value
            max_action = argmax(action_values)  # ②
            action_probs = {0: 0, 1: 0, 2: 0, 3: 0}
            action_probs[max_action] = 1.0
            pi[state] = action_probs  # ③
    return pi

def policy_iter(env, gamma, threshold=0.001, is_render=False):
    pi = defaultdict(lambda: {0: 0.25, 1: 0.25, 2: 0.25, 3: 0.25})
    V = defaultdict(lambda: 0)

    while True:
        V = policy_eval(pi, V, env, gamma, threshold)  # ①評価
        new_pi = greedy_policy(V, env, gamma)  # ②改善

        if is_render:
            env.render_v(V, pi)

        if new_pi == pi:  # ③更新チェック
            break
        pi = new_pi

    return pi

In [22]:
env = GridWorld()
gamma = 0.9
pi = policy_iter(env, gamma)

In [24]:
'''
方策反復法のアイデアは、図4-17のように「評価」と「改善」という2つの過程を交互に繰り返すこと

方策評価と方策改善の2つの過程を交互に繰り返すアルゴリズムでは、評価と改善の“粒度”——どれぐらい正確に評価（もしくは改善）するかという点——について自由度があります。
「評価」フェーズでは完全に更新されなくても、真の価値関数の方向に近づきさえすれば良いということです。同様に「改善」フェーズではgreedy化の方向へ向かう（一部だけがgreedy化される）だけで良い

価値反復法：評価と改善をそれぞれ最小限に行う、方策を使わずに、価値関数を更新

方策反復法：すべての状態における価値関数を何度も更新します。その更新作業が収束したら、続いて「改善」（greedy化）のフェーズに移ります。
価値反復法：1つの状態だけを一度更新したら、すぐに「改善」フェーズへと移行する


'''

'\n方策反復法のアイデアは、図4-17のように「評価」と「改善」という2つの過程を交互に繰り返すこと\n\n方策評価と方策改善の2つの過程を交互に繰り返すアルゴリズムでは、評価と改善の“粒度”——どれぐらい正確に評価（もしくは改善）するかという点——について自由度があります。\n「評価」フェーズでは完全に更新されなくても、真の価値関数の方向に近づきさえすれば良いということです。同様に「改善」フェーズではgreedy化の方向へ向かう（一部だけがgreedy化される）だけで良い\n\n価値反復法：評価と改善をそれぞれ最小限に行う、方策を使わずに、価値関数を更新\n\n方策反復法：すべての状態における価値関数を何度も更新します。その更新作業が収束したら、続いて「改善」（greedy化）のフェーズに移ります。\n価値反復法：1つの状態だけを一度更新したら、すぐに「改善」フェーズへと移行する\n\n\n'

In [25]:
def value_iter_onestep(V, env, gamma):
    for state in env.states():  # ①すべての状態にアクセス
        if state == env.goal_state:  # ゴールの価値関数は常に0
            V[state] = 0
            continue

        action_values = []
        for action in env.actions():  # ②すべての行動にアクセス
            next_state = env.next_state(state, action)
            r = env.reward(state, action, next_state)
            value = r + gamma * V[next_state]  # ③新しい価値観数
            action_values.append(value)

        V[state] = max(action_values)  # ④最大値を取り出す
    return V

def value_iter(V, env, gamma, threshold=0.001):
    while True:
        old_V = V.copy()  # 更新前の価値関数
        V = value_iter_onestep(V, env, gamma)

        # 更新された量の最大値を求める
        delta = 0
        for state in V.keys():
            t = abs(V[state] - old_V[state])
            if delta < t:
                delta = t
        # 閾値との比較
        if delta < threshold:
            break
    return V

In [26]:
V = defaultdict(lambda: 0)
env = GridWorld()
gamma = 0.9

V = value_iter(V, env, gamma)

pi = greedy_policy(V, env, gamma)

In [27]:
print(V)

defaultdict(<function <lambda> at 0x10709e160>, {(0, 0): 0.81, (1, 0): 0.7290000000000001, (0, 1): 0.9, (0, 2): 1.0, (1, 2): 0.9, (0, 3): 0, (2, 0): 0.6561000000000001, (2, 1): 0.7290000000000001, (1, 1): 0.81, (2, 2): 0.81, (1, 3): 1.0, (2, 3): 0.7290000000000001})
