In [1]:
import numpy as np
import time
from Frozen_Lake import FrozenLakeEnv

env = FrozenLakeEnv()



Если награда не зависит от $s'$:
$$
q(s,a) = R(s, a) + \gamma \sum_{s'} P(s'|s,a) v(s')
$$
Если награда зависит от $s'$:
$$
q(s,a) = \sum_{s'} P(s'|s,a) \Big( R(s,a,s') + \gamma v(s')\Big)
$$
Если награда не зависит от $s'$ - это частный случай того, когда награда зависит от $s'$:
$$
q(s,a) = \sum_{s'} P(s'|s,a) \Big( R(s,a) + \gamma v(s')\Big) = R(s,a) \sum_{s'} P(s'|s,a) + \gamma \sum_{s'} P(s'|s,a) v(s')
 = R(s, a) + \gamma \sum_{s'} P(s'|s,a) v(s')
$$



In [2]:
def get_q_values(v_values, gamma):
    q_values = {}
    for state in env.get_all_states():
        q_values[state] = {}
        for action in env.get_possible_actions(state):
            q_values[state][action] = 0
            for next_state in env.get_next_states(state, action):
                q_values[state][action] += env.get_transition_prob(state, action, next_state) * env.get_reward(state, action, next_state)
                q_values[state][action] += gamma * env.get_transition_prob(state, action, next_state) * v_values[next_state]
    return q_values

In [3]:
def init_policy():
    policy = {}
    for state in env.get_all_states():
        policy[state] = {}
        for action in env.get_possible_actions(state):
            policy[state][action] = 1 / len(env.get_possible_actions(state))
    return policy

In [4]:
def init_v_values():
    v_values = {}
    for state in env.get_all_states():
        v_values[state] = 0
    return v_values

In [5]:
def policy_evaluation_step(v_values, policy, gamma):
    q_values = get_q_values(v_values, gamma)
    new_v_values = init_v_values()
    for state in env.get_all_states():
        new_v_values[state] = 0
        for action in env.get_possible_actions(state):
            new_v_values[state] += policy[state][action] * q_values[state][action]
    return new_v_values

In [14]:
def policy_evaluation(policy, gamma, eval_iter_n):
    v_values = init_v_values()
    for _ in range(eval_iter_n):
        v_values = policy_evaluation_step(v_values, policy, gamma)
    q_values = get_q_values(v_values, gamma)
    print(v_values)
    return q_values

In [15]:
def policy_improvement(q_values):
    policy = {}
    for state in env.get_all_states():
        policy[state] = {}
        argmax_action = None
        max_q_value = float('-inf')
        for action in env.get_possible_actions(state): 
            policy[state][action] = 0
            if q_values[state][action] > max_q_value:
                argmax_action = action
                max_q_value = q_values[state][action]
        policy[state][argmax_action] = 1
    return policy

In [16]:
iter_n = 100
eval_iter_n = 100
gamma = 0.9999

policy = init_policy()
for _ in range(iter_n):
    q_values = policy_evaluation(policy, gamma, eval_iter_n)
    policy = policy_improvement(q_values)

{(0, 0): 0.01392284759471333, (0, 1): 0.011618058009183906, (0, 2): 0.020935974260894136, (0, 3): 0.010465893694560765, (1, 0): 0.016233207073914853, (1, 1): 0, (1, 2): 0.040732346429984155, (1, 3): 0, (2, 0): 0.034783267675136245, (2, 1): 0.08813051071831293, (2, 2): 0.14200970607552987, (2, 3): 0, (3, 0): 0, (3, 1): 0.17576432489326094, (3, 2): 0.439232776753058, (3, 3): 0}
{(0, 0): 0.7339138616001625, (0, 1): 0.612623474892364, (0, 2): 0.6892779948468303, (0, 3): 0.612623474892364, (1, 0): 0.749166908346178, (1, 1): 0, (1, 2): 0.7085277932016393, (1, 3): 0, (2, 0): 0.8429064271185163, (2, 1): 0.9600925441956469, (2, 2): 0.8857483163336823, (2, 3): 0, (3, 0): 0, (3, 1): 0.984153860883415, (3, 2): 0.9872845570051499, (3, 3): 0}
{(0, 0): 0.812022483276867, (0, 1): 0.7197553715812856, (0, 2): 0.7605313728146351, (0, 3): 0.7553742001387258, (1, 0): 0.8319440209294895, (1, 1): 0, (1, 2): 0.7666849900733356, (1, 3): 0, (2, 0): 0.8652246277810456, (2, 1): 0.9778453273111514, (2, 2): 0.95848

In [13]:
total_rewards = []

for _ in range(1000):
    total_reward = 0
    state = env.reset()
    for _ in range(1000):
        action = np.random.choice(env.get_possible_actions(state), p=list(policy[state].values()))
        state, reward, done, _ = env.step(action)
        total_reward += reward
        
        if done:
            break
    
    total_rewards.append(total_reward)

np.mean(total_rewards)

0.986

In [40]:
policy

{(0, 0): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (0, 1): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (0, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (0, 3): {'left': 1, 'down': 0, 'right': 0, 'up': 0},
 (1, 0): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (1, 1): {None: 1},
 (1, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (1, 3): {None: 1},
 (2, 0): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (2, 1): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (2, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (2, 3): {None: 1},
 (3, 0): {None: 1},
 (3, 1): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 2): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 3): {None: 1}}

In [12]:
state = env.reset()
for _ in range(1000):
    action = np.random.choice(env.get_possible_actions(state), p=list(policy[state].values()))
    state, reward, done, _ = env.step(action)
    total_reward += reward

    env.render()
    time.sleep(0.5)
    
    if done:
        break

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

*FFF
FHFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

*FFF
FHFH
FFFH
HFFG

S*FF
FHFH
FFFH
HFFG

S*FF
FHFH
FFFH
HFFG

S*FF
FHFH
FFFH
HFFG

S*FF
FHFH
FFFH
HFFG

S*FF
FHFH
FFFH
HFFG

*FFF
FHFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
FHFH
F*FH
HFFG

SFFF
FHFH
FFFH
H*FG

SFFF
FHFH
FFFH
HF*G

SFFF
FHFH
FFFH
HFF*

