# Практика

In [22]:
import numpy as np
import time
from Frozen_Lake import FrozenLakeEnv

env = FrozenLakeEnv()

Если награда не зависит от $s'$:
$$
q(s,a) = R(s, a) + \gamma \sum_{s'} P(s'|s,a) v(s')
$$
Если награда зависит от $s'$:
$$
q(s,a) = \sum_{s'} P(s'|s,a) \Big( R(s,a,s') + \gamma v(s')\Big)
$$
Если награда не зависит от $s'$ - это частный случай того, когда награда зависит от $s'$:
$$
q(s,a) = \sum_{s'} P(s'|s,a) \Big( R(s,a) + \gamma v(s')\Big) = R(s,a) \sum_{s'} P(s'|s,a) + \gamma \sum_{s'} P(s'|s,a) v(s')
 = R(s, a) + \gamma \sum_{s'} P(s'|s,a) v(s')
$$



In [23]:
def get_q_values(v_values, gamma):
    q_values = {}
    for state in env.get_all_states():
        q_values[state] = {}
        for action in env.get_possible_actions(state):
            q_values[state][action] = 0
            for next_state in env.get_next_states(state, action):
                q_values[state][action] += env.get_transition_prob(state, action, next_state) * env.get_reward(state, action, next_state)
                q_values[state][action] += gamma * env.get_transition_prob(state, action, next_state) * v_values[next_state]
    return q_values

In [24]:
def init_policy():
    policy = {}
    for state in env.get_all_states():
        policy[state] = {}
        for action in env.get_possible_actions(state):
            policy[state][action] = 1 / len(env.get_possible_actions(state))
    return policy

In [25]:
def init_v_values():
    v_values = {}
    for state in env.get_all_states():
        v_values[state] = 0
    return v_values

In [26]:
def policy_evaluation_step(v_values, policy, gamma):
    q_values = get_q_values(v_values, gamma)
    new_v_values = init_v_values()
    for state in env.get_all_states():
        new_v_values[state] = 0
        for action in env.get_possible_actions(state):
            new_v_values[state] += policy[state][action] * q_values[state][action]
    return new_v_values

In [27]:
def policy_evaluation(policy, gamma, eval_iter_n, v_values=None):
    if v_values is None:
        v_values = init_v_values()
    for _ in range(eval_iter_n):
        v_values = policy_evaluation_step(v_values, policy, gamma)
    q_values = get_q_values(v_values, gamma)
    return q_values

In [28]:
def policy_improvement(q_values):
    policy = {}
    for state in env.get_all_states():
        policy[state] = {}
        argmax_action = None
        max_q_value = float('-inf')
        for action in env.get_possible_actions(state): 
            policy[state][action] = 0
            if q_values[state][action] > max_q_value:
                argmax_action = action
                max_q_value = q_values[state][action]
        policy[state][argmax_action] = 1
    return policy

In [45]:
iter_n = 100
eval_iter_n = 100
gamma = 1.0

policy = init_policy()
v_values = init_v_values()
for _ in range(iter_n):
    q_values = policy_evaluation(policy, gamma, eval_iter_n)
    policy = policy_improvement(q_values)

In [46]:
total_rewards = []

for _ in range(1000):
    total_reward = 0
    state = env.reset()
    for _ in range(1000):
        action = np.random.choice(env.get_possible_actions(state), p=list(policy[state].values()))
        state, reward, done, _ = env.step(action)
        total_reward += reward
        
        if done:
            break
    
    total_rewards.append(total_reward)

np.mean(total_rewards)

0.988

In [17]:
policy

{(0, 0): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (0, 1): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (0, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (0, 3): {'left': 1, 'down': 0, 'right': 0, 'up': 0},
 (1, 0): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (1, 1): {None: 1},
 (1, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (1, 3): {None: 1},
 (2, 0): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (2, 1): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (2, 2): {'left': 0, 'down': 1, 'right': 0, 'up': 0},
 (2, 3): {None: 1},
 (3, 0): {None: 1},
 (3, 1): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 2): {'left': 0, 'down': 0, 'right': 1, 'up': 0},
 (3, 3): {None: 1}}

In [20]:
state = env.reset()
for _ in range(1000):
    action = np.random.choice(env.get_possible_actions(state), p=list(policy[state].values()))
    state, reward, done, _ = env.step(action)
    total_reward += reward

    env.render()
    time.sleep(0.5)
    
    if done:
        break

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
FHFH
F*FH
HFFG

SFFF
FHFH
FFFH
H*FG

SFFF
FHFH
FFFH
HF*G

SFFF
FHFH
FFFH
HFF*



# Домашнее задание

## Задание №3

### Value iteration

1) Вычисление $v_*$ в случае, когда награда зависит от следующего состояния:
$$ 
v_* = max_{a \in \mathcal{A}} \big( R(a) + \gamma P(a) v_* \big)
$$
где
$R(a)$ - вектор, состоящий из элементов $R(s_i, a) = \sum_{s' \in \mathcal{S}} P(s' | s_i, a)R(s_i, a, s')$, $1 \leq i \leq |\mathcal{S}|$;
$P(a)$ - матрица, состоящая из элементов $P(s_j | s_i, a)$, $1 \leq i \leq |\mathcal{S}|; 1 \leq j \leq |\mathcal{S}|$.

2) Вычисление $q_*$:
$$
q_*(s, a) = \sum_{s'} P(s' | s, a) \big( R(s, a, s') + \gamma v_*(s') \big)
$$
или в векторном виде
$$
q_*(a) = \sum_{s'} P(s' | a) \big( R(a, s') + \gamma v_*(s') \big)
$$
3) Итоговый шаг - вычисление policy через теорему greedy policy improvement.

In [1]:
import time

import numpy as np

from Frozen_Lake import FrozenLakeEnv



In [2]:
env = FrozenLakeEnv()
POSSIBLE_STATES = tuple(env.get_all_states())
POSSIBLE_ACTIONS = ("left", "right", "down", "up")
N_STATES = len(POSSIBLE_STATES)
N_ACTIONS = len(POSSIBLE_ACTIONS)

In [3]:
def calc_r_vector():
    r = []
    for state in POSSIBLE_STATES:
        for action in POSSIBLE_ACTIONS:
            r_i = 0
            # the reward of actions that aren't in possible_actions is considered to be 0
            # so we don't need FOR cycle for them
            if action in env.get_possible_actions(state):
                for next_state, transition_proba in env.get_next_states(
                    state, action
                ).items():
                    r_i += transition_proba * env.get_reward(
                        state, action, next_state
                    )
            r.append(r_i)
    # r = [n_states * n_actions]
    assert len(r) == N_STATES * N_ACTIONS
    return np.array(r)

In [4]:
calc_r_vector()

array([0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ,
       0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ,
       0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ,
       0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ,
       0. , 0. , 0. , 0. , 0. , 0.8, 0.1, 0.1, 0. , 0. , 0. , 0. ])

In [5]:
def calc_p_matrix():
    p = []
    for i_state, state in enumerate(POSSIBLE_STATES):
        for action in POSSIBLE_ACTIONS:
            if action in env.get_possible_actions(state):
                p_row = []
                for next_state in POSSIBLE_STATES:
                    transition_proba = env.get_next_states(
                        state, action
                    ).get(next_state, 0.0)
                    p_row.append(transition_proba)
            else:
                p_row = [0.0] * N_STATES
                p_row[i_state] = 1.0 
            p.append(p_row)
    p = np.array(p)
    # p = [n_states * n_actions, n_states]
    assert p.shape == (N_STATES * N_ACTIONS, N_STATES)
    return p

In [6]:
calc_p_matrix().reshape((N_STATES, N_ACTIONS, N_STATES)).sum(-1)

array([[1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.],
       [1., 1., 1., 1.]])

In [7]:
def calc_v_values_step(v_values, r_vector, p_matrix, gamma):
    # v_values = [n_states]
    # r_vector = [n_states * n_actions]
    # p_matrix = [n_states * n_actions, n_states]
    raw_v_values = r_vector + gamma * p_matrix.dot(v_values)
    # raw_v_values = [n_states * n_actions]
    raw_v_values = raw_v_values.reshape((N_STATES, N_ACTIONS))
    v_values = raw_v_values.max(axis=-1)
    # v_values = [n_states]
    return v_values

In [8]:
def calc_v_values(n_iterations, gamma):
    v_values = np.zeros((N_STATES,))
    r_vector, p_matrix = calc_r_vector(), calc_p_matrix()
    # r_vector = [n_states * n_action]
    # p_matrix = [n_states * n_actions, n_states]
    for i_iteration in range(n_iterations):
        v_values = calc_v_values_step(v_values, r_vector, p_matrix, gamma)
    # v_values = [n_states]
    return v_values

In [9]:
v_values = calc_v_values(n_iterations=1000, gamma=1.0)
v_values

array([0.9962074 , 0.99604792, 0.9959365 , 0.99587922, 0.99623226,
       0.        , 0.79674484, 0.        , 0.9962951 , 0.99683244,
       0.97710783, 0.        , 0.        , 0.99936583, 0.99968264,
       0.        ])

In [10]:
def calc_rewards_matrix():
    rewards = []
    for state in POSSIBLE_STATES:
        for action in POSSIBLE_ACTIONS:
            rewards_row = []
            for next_state in POSSIBLE_STATES:
                if (
                    action in env.get_possible_actions(state)
                    and next_state in env.get_next_states(state, action)
                ):
                    reward = env.get_reward(state, action, next_state)
                else:
                    reward = 0
                rewards_row.append(reward)
            rewards.append(rewards_row)
    rewards = np.array(rewards)
    # rewards = [n_states * n_actions, n_states]
    assert rewards.shape == (N_STATES * N_ACTIONS, N_STATES)
    return rewards

In [11]:
calc_rewards_matrix()

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]])

In [12]:
def calc_q_values(v_values, gamma):
    p_matrix = calc_p_matrix()
    # p_matrix = [n_states * n_actions, n_states]
    rewards = calc_rewards_matrix()
    # rewards = [n_states * n_actions, n_states]
    raw_q_values = gamma * p_matrix.dot(v_values) + (p_matrix * rewards).sum(-1)
    # raw_q_values = [n_states * n_actions]
    q_values = raw_q_values.reshape((N_STATES, N_ACTIONS,))
    return q_values

In [13]:
q_values = calc_q_values(v_values, gamma=1.0)
q_values

array([[0.99620989, 0.9960823 , 0.99621134, 0.99619145],
       [0.89657071, 0.89635399, 0.19921439, 0.99605272],
       [0.97610647, 0.97597151, 0.83658859, 0.99594191],
       [0.89633712, 0.8962913 , 0.19918157, 0.99588495],
       [0.99623606, 0.19925025, 0.89665931, 0.89658915],
       [0.        , 0.        , 0.        , 0.        ],
       [0.19730443, 0.19730443, 0.78168627, 0.7967492 ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.89665931, 0.89708917, 0.19931275, 0.99629856],
       [0.89697267, 0.88162285, 0.99683296, 0.19734029],
       [0.9771087 , 0.17964275, 0.89942935, 0.73707912],
       [0.        , 0.        , 0.        , 0.        ],
       [0.        , 0.        , 0.        , 0.        ],
       [0.19961983, 0.99936594, 0.89946093, 0.89743421],
       [0.99717171, 0.99767905, 0.99968269, 0.98162285],
       [0.        , 0.        , 0.        , 0.        ]])

In [14]:
def calc_policy(q_values):
    policy = dict()
    for state, state_q_values in zip(POSSIBLE_STATES, q_values):
        policy[state] = dict()
        argmax_action = POSSIBLE_ACTIONS[state_q_values.argmax()]
        for action in env.get_possible_actions(state):
            if action == argmax_action:
                policy[state][action] = 1.0
            else:
                policy[state][action] = 0.0
    return policy

In [15]:
calc_policy(q_values)

{(0, 0): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (0, 1): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (0, 2): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (0, 3): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (1, 0): {'left': 1.0, 'down': 0.0, 'right': 0.0, 'up': 0.0},
 (1, 1): {},
 (1, 2): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (1, 3): {},
 (2, 0): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (2, 1): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (2, 2): {'left': 1.0, 'down': 0.0, 'right': 0.0, 'up': 0.0},
 (2, 3): {},
 (3, 0): {},
 (3, 1): {'left': 0.0, 'down': 0.0, 'right': 1.0, 'up': 0.0},
 (3, 2): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (3, 3): {}}

In [16]:
def value_iteration(n_iterations, gamma):
    v_values = calc_v_values(n_iterations, gamma)
    q_values = calc_q_values(v_values, gamma)
    return calc_policy(q_values)

In [17]:
policy = value_iteration(n_iterations=1000, gamma=1.0)
policy

{(0, 0): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (0, 1): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (0, 2): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (0, 3): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (1, 0): {'left': 1.0, 'down': 0.0, 'right': 0.0, 'up': 0.0},
 (1, 1): {},
 (1, 2): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (1, 3): {},
 (2, 0): {'left': 0.0, 'down': 0.0, 'right': 0.0, 'up': 1.0},
 (2, 1): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (2, 2): {'left': 1.0, 'down': 0.0, 'right': 0.0, 'up': 0.0},
 (2, 3): {},
 (3, 0): {},
 (3, 1): {'left': 0.0, 'down': 0.0, 'right': 1.0, 'up': 0.0},
 (3, 2): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (3, 3): {}}

### Оценка качества политики

In [18]:
def evaluate_policy(policy):
    total_rewards = []
    
    for _ in range(1000):
        total_reward = 0
        state = env.reset()
        
        for _ in range(1000):
            action = np.random.choice(
                POSSIBLE_ACTIONS, 
                p=[policy[state][action] for action in POSSIBLE_ACTIONS]
            )
            state, reward, done, _ = env.step(action)
            total_reward += reward
            
            if done:
                break
        
        total_rewards.append(total_reward)
    
    return np.mean(total_rewards)

In [19]:
evaluate_policy(policy)

0.996

In [22]:
def run_agent(policy):
    state = env.reset()
    for _ in range(1000):
        action = np.random.choice(
            POSSIBLE_ACTIONS, 
            p=[policy[state][action] for action in POSSIBLE_ACTIONS]
        )
        state, _, done, _ = env.step(action)
    
        env.render()
        
        if done:
            break

In [23]:
run_agent(policy)

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

*FFF
FHFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

*FFF
FHFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
FHFH
F*FH
HFFG

SFFF
FHFH
FF*H
HFFG

SFFF
FHFH
F*FH
HFFG

SFFF
FHFH
FFFH
H*FG

SFFF
FHFH
FFFH
HF*G

SFFF
FHFH
FFFH
HF*G

SFFF
FHFH
FFFH
HF*G

SFFF
FHFH
FFFH
HF*G

SFFF
FHFH
FFF

In [27]:
env.get_next_states((1, 0), "down")

{(1, 0): 0.1, (2, 0): 0.8, (1, 1): 0.1}

### Эксперименты со средой

In [28]:
class CustomFrozenLakeEnv(FrozenLakeEnv):
    def get_reward(self, state, action, next_state):
        reward = super().get_reward(state, action, next_state)
        if reward == 0:
            reward = -1
        elif reward == 1:
            reward = 10
        else:
            raise NotImplementedError
        return reward

In [29]:
env = CustomFrozenLakeEnv()

In [31]:
policy = value_iteration(n_iterations=1000, gamma=1.0)
policy

{(0, 0): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (0, 1): {'left': 0.0, 'down': 0.0, 'right': 1.0, 'up': 0.0},
 (0, 2): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (0, 3): {'left': 1.0, 'down': 0.0, 'right': 0.0, 'up': 0.0},
 (1, 0): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (1, 1): {},
 (1, 2): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (1, 3): {},
 (2, 0): {'left': 0.0, 'down': 0.0, 'right': 1.0, 'up': 0.0},
 (2, 1): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (2, 2): {'left': 0.0, 'down': 1.0, 'right': 0.0, 'up': 0.0},
 (2, 3): {},
 (3, 0): {},
 (3, 1): {'left': 0.0, 'down': 0.0, 'right': 1.0, 'up': 0.0},
 (3, 2): {'left': 0.0, 'down': 0.0, 'right': 1.0, 'up': 0.0},
 (3, 3): {}}

In [39]:
env.get_next_states((1, 2), "down")

{(1, 1): 0.1, (2, 2): 0.8, (1, 3): 0.1}

In [33]:
evaluate_policy(policy)

1.618

In [41]:
run_agent(policy)

SFFF
*HFH
FFFH
HFFG

SFFF
FHFH
*FFH
HFFG

SFFF
FHFH
F*FH
HFFG

SFFF
FHFH
FFFH
H*FG

SFFF
FHFH
FFFH
HF*G

SFFF
FHFH
FFFH
HFF*

