In [36]:
import numpy as np
import pygame
import gym

### Метод итераций по стратегиям

In [37]:
def policy_improvement(V, policy):
    '''
    Принимает функцию ценности, стратегию и перебирает все состояния,
    обновляя стратегию на основе новой функции ценности.
    Если стратегия изменилась - False, т.е. ещё не стабилизировалась.
    '''
    policy_stable = True
    for s in range(nS):
        old_a = policy[s]
        # Обновление политики с действием, которое приводит к максимальной ценности состояния
        policy[s] = np.argmax([eval_state_action(V, s, a) for a in range(nA)])
        if old_a != policy[s]: 
            policy_stable = False

    return policy_stable

In [38]:
def eval_state_action(V, s, a, gamma=0.99):
    '''
    Функция для вычисления ожидаемой ценности действия (используется в функции 'policy_improvement').
    
    env.P – словарь, содержащий всю информацию о динамике окружающей среды;
    gamma – дисконтирующий множитель.
    '''
    return np.sum([p * (rew + gamma*V[next_s]) for p, next_s, rew, _ in env.P[s][a]])

In [39]:
def policy_evaluation(V, policy, eps=0.0001):
    '''
    Функция для уравнения Беллмана, которая производит вычисления для каждого состояния,
    следуя текущей стратегии, до стабилизации результатов (delta становиться меньше eps).
    '''
    while True:
        delta = 0
        # Цикл для всех состояний
        for s in range(nS):
            old_v = V[s]
            # Обновление функции ценности с помощью уравнения Беллмана
            V[s] = eval_state_action(V, s, policy[s])
            delta = max(delta, np.abs(old_v - V[s]))

        if delta < eps:
            break

In [40]:
def run_episodes(env, policy, num_games=100):
    '''
    Функция запуска несольких эпох
    '''
    tot_rew = 0
    state = env.reset()
    for _ in range(num_games):
        done = False
        while not done:
            # Выбор действия в соотвествии с полититкой
            next_state, reward, done, _ = env.step(policy[state])
            state = next_state
            tot_rew += reward
            if done:
                state = env.reset()
    print('Победа в {} из {} игр!'.format(tot_rew, num_games))

In [52]:
# Создаем окружение
# FrozenLake-v1 - дискретный конечный МПР
env = gym.make('FrozenLake-v1')
env = env.unwrapped

# Получаем пространства действий и состояний
nA = env.action_space.n
nS = env.observation_space.n

# Инициализируем функцию ценности и стратегии
V = np.zeros(nS)
policy = np.zeros(nS)

# Основной цикл, в котором на каждой итерации выполняется один шаг оценивания стратегии и шаг улучшения стратегии. 
# Выход из цикла после стабилизации стратегии
policy_stable = False
iteration = 0

while not policy_stable:
    policy_evaluation(V, policy)
    policy_stable = policy_improvement(V, policy)
    iteration += 1

print('Сошёлся после {} итераций'.format(iteration))
run_episodes(env, policy)

#Функция ценности, стратегия
print(V.reshape((4,4)).round(2))
print(policy.reshape((4,4)))

Сошёлся после 7 итераций
Победа в 78.0 из 100 игр!
[[0.54 0.5  0.47 0.45]
 [0.56 0.   0.36 0.  ]
 [0.59 0.64 0.61 0.  ]
 [0.   0.74 0.86 0.  ]]
[[0. 3. 3. 3.]
 [0. 0. 0. 0.]
 [3. 1. 0. 0.]
 [0. 2. 1. 0.]]


### Метод итераций по ценности

In [53]:
def eval_state_action(V, s, a, gamma=0.99):
    '''
    Функция для оценивания пары состояние-действие
    '''
    return np.sum([p * (rew + gamma*V[next_s]) for p, next_s, rew, _ in env.P[s][a]])

In [71]:
def value_iteration(eps=0.0001):
    '''
    Алгоритм итераций по ценности
    '''
    V = np.zeros(nS)
    it = 0
    global iteration
    while True:
        delta = 0
        # Обнолвение ценности каждого состояния
        for s in range(nS):
            old_v = V[s]
            V[s] = np.max([eval_state_action(V, s, a) for a in range(nA)])
            delta = max(delta, np.abs(old_v - V[s]))
        # Выход из цикла при стабилизации
        if delta < eps:
            break
        it += 1
        iteration = it

    return V

In [72]:
def run_episodes(env, V, num_games=100):
    '''
    Функция запуска несольких эпох
    '''
    tot_rew = 0
    state = env.reset()

    for _ in range(num_games):
        done = False
        while not done:
            #Выбор наилучшего действия по функции ценности
            action = np.argmax([eval_state_action(V, state, a) for a in range(nA)])
            next_state, reward, done, _ = env.step(action)

            state = next_state
            tot_rew += reward 
            if done:
                state = env.reset()
    print('Победа в {} из {} игр!'.format(tot_rew, num_games))

In [74]:
# create the environment
env = gym.make('FrozenLake-v1')
env = env.unwrapped

# Получаем пространства действий и состояний
nA = env.action_space.n
nS = env.observation_space.n

# Value iteration
iteration = 0
V = value_iteration(eps=0.0001)

run_episodes(env, V, 100)

print('Сошёлся после {} итераций'.format(iteration))
print(V.reshape((4,4)).round(2))

Победа в 79.0 из 100 игр!
Сошёлся после 131 итераций
[[0.54 0.5  0.47 0.45]
 [0.56 0.   0.36 0.  ]
 [0.59 0.64 0.61 0.  ]
 [0.   0.74 0.86 0.  ]]
