# Лабораторная работа №4: Алгоритм Policy Iteration
**Выполнил:** Чжан Цзэнпэн, ИУ5И-21М

## Цель лабораторной работы:
Ознакомление с базовыми методами обучения с подкреплением.

## Задание:
1. На основе рассмотренного на лекции примера реализуйте алгоритм Policy Iteration для любой среды обучения с подкреплением (кроме рассмотренной на лекции среды Toy Text / Frozen Lake) из библиотеки Gym (или аналогичной библиотеки)

In [1]:
import gymnasium as gym # Обновленная библиотека Gym
import numpy as np
from pprint import pprint # Для красивого вывода словарей и матриц

### 2. Описание среды: CliffWalking-v0

Среда "CliffWalking-v0" представляет собой сеточное поле размером 4x12.
- **Состояния**: 48 состояний (от 0 до 47), представляющих каждую ячейку сетки.
- **Действия**: 4 дискретных действия:
    - 0: Двигаться вверх
    - 1: Двигаться вправо
    - 2: Двигаться вниз
    - 3: Двигаться влево
- **Начальное состояние**: Агент всегда начинает в ячейке [3, 0] (левый нижний угол, состояние 36).
- **Целевое состояние**: Ячейка [3, 11] (правый нижний угол, состояние 47).
- **"Обрыв"**: Ячейки [3, 1]...[3, 10] (состояния 37-46). Попадание на обрыв возвращает агента в начальное состояние (36).
- **Награды**:
    - -1 за каждый шаг, не ведущий к обрыву.
    - -100 за шаг, приводящий к попаданию на обрыв.
- **Завершение эпизода**: Эпизод завершается, когда агент достигает целевого состояния.

In [2]:
# Инициализация среды
env_raw = gym.make("CliffWalking-v0")
env = env_raw.unwrapped # <<<--- ДОБАВЛЕНО: получаем доступ к базовой среде

# Пространство состояний (количество состояний)
n_states = env.observation_space.n # Можно использовать env_raw или env, это свойство обычно доступно
print(f"Пространство состояний (количество): {n_states}")

# Пространство действий (количество действий)
n_actions = env.action_space.n # Аналогично
print(f"Пространство действий (количество): {n_actions}")

# Матрица переходов P
# Теперь обращаемся к env.P, где env - это "развернутая" среда
print("\nПример матрицы переходов для состояния 36 (старт), действие 0 (вверх):")
pprint(env.P[36][0])

print("\nПример матрицы переходов для состояния 25, действие 2 (вниз):")
pprint(env.P[25][2])

print("\nПример матрицы переходов для состояния 46, действие 1 (вправо):")
pprint(env.P[46][1])

# Также, для метода print_policy в классе агента нам понадобится env.shape
# Убедимся, что он доступен (обычно для сеточных сред он есть в unwrapped версии)
try:
    print(f"\nРазмерность сетки (env.shape): {env.shape}")
except AttributeError:
    print("\nАтрибут env.shape не найден. Для CliffWalking это обычно (4,12). Установим вручную, если нужно.")
    # Если env.shape не будет доступен, нам придется его задать вручную в классе агента
    # для reshape. Для CliffWalking это (4, 12).
    # В классе PolicyIterationAgent можно будет передавать shape или использовать значения по умолчанию.

Пространство состояний (количество): 48
Пространство действий (количество): 4

Пример матрицы переходов для состояния 36 (старт), действие 0 (вверх):
[(1.0, np.int64(24), -1, False)]

Пример матрицы переходов для состояния 25, действие 2 (вниз):
[(1.0, np.int64(36), -100, False)]

Пример матрицы переходов для состояния 46, действие 1 (вправо):
[(1.0, np.int64(47), -1, True)]

Размерность сетки (env.shape): (4, 12)


### 3. Реализация алгоритма Policy Iteration

In [3]:
class PolicyIterationAgent:
    def __init__(self, env, gamma=0.99, theta=1e-8):
        self.env = env
        self.n_states = env.observation_space.n
        self.n_actions = env.action_space.n
        self.gamma = gamma  # Коэффициент дисконтирования
        self.theta = theta  # Малое число для проверки сходимости

        # Инициализация случайной политики (равновероятный выбор действий)
        self.policy = np.ones([self.n_states, self.n_actions]) / self.n_actions
        # Инициализация функции ценности состояний нулями
        self.V = np.zeros(self.n_states)

    def policy_evaluation(self):
        """
        Оценивает текущую политику.
        Итеративно вычисляет функцию ценности V для текущей политики.
        """
        while True:
            delta = 0
            for s in range(self.n_states):
                v_s_old = self.V[s]
                new_v_s = 0
                # Для каждого действия, возможного по текущей политике
                for a in range(self.n_actions):
                    action_prob = self.policy[s, a]
                    # Для каждого возможного исхода действия (в CliffWalking один исход для каждого s,a)
                    for prob, next_state, reward, done in self.env.P[s][a]:
                        new_v_s += action_prob * prob * (reward + self.gamma * self.V[next_state])
                self.V[s] = new_v_s
                delta = max(delta, np.abs(v_s_old - self.V[s]))
            if delta < self.theta:
                break # Сходимость достигнута

    def policy_improvement(self):
        """
        Улучшает политику на основе текущей функции ценности V.
        Возвращает True, если политика изменилась (стала стабильной), иначе False.
        """
        policy_stable = True
        for s in range(self.n_states):
            old_action_probs = self.policy[s].copy() # Сохраняем старую политику для состояния s
            
            # Вычисляем Q-значения для всех действий в состоянии s
            q_s_a = np.zeros(self.n_actions)
            for a in range(self.n_actions):
                for prob, next_state, reward, done in self.env.P[s][a]:
                    q_s_a[a] += prob * (reward + self.gamma * self.V[next_state])
            
            # Находим лучшее(ие) действие(я) - жадный выбор
            best_actions = np.argwhere(q_s_a == np.max(q_s_a)).flatten()
            
            # Обновляем политику для состояния s
            # Делаем ее детерминированной, выбирая одно из лучших действий (или распределяя вероятность между ними)
            self.policy[s] = np.zeros(self.n_actions)
            self.policy[s, best_actions] = 1.0 / len(best_actions) # Равные вероятности для всех лучших действий

            # Проверяем, изменилась ли политика
            if not np.array_equal(old_action_probs, self.policy[s]):
                policy_stable = False
        return policy_stable

    def policy_iteration(self, max_iterations=1000):
        """
        Основной цикл алгоритма Policy Iteration.
        """
        for i in range(max_iterations):
            print(f"Итерация {i+1}")
            self.policy_evaluation()
            policy_stable = self.policy_improvement()
            if policy_stable:
                print(f"Политика стабилизировалась на итерации {i+1}.")
                break
        if i == max_iterations - 1:
             print(f"Достигнуто максимальное количество итераций ({max_iterations}).")

    def print_policy(self):
        print("Текущая политика (вероятности действий для каждого состояния):")
        # Для наглядности можно вывести только выбранное действие, если политика детерминирована
        optimal_actions = np.argmax(self.policy, axis=1)
        policy_grid = optimal_actions.reshape(self.env.shape) # env.shape для CliffWalking (4,12)
        
        action_symbols = {0: '^ (вверх)', 1: '> (вправо)', 2: 'v (вниз)', 3: '< (влево)'}
        
        print("Оптимальные действия на сетке:")
        for r in range(policy_grid.shape[0]):
            row_str = []
            for c in range(policy_grid.shape[1]):
                state = r * policy_grid.shape[1] + c
                if state in [37,38,39,40,41,42,43,44,45,46]: # Обрыв
                     row_str.append(" C ") # Cliff
                elif state == 47: # Цель
                     row_str.append(" G ") # Goal
                else:
                    row_str.append(f"{action_symbols[policy_grid[r,c]][0]:^3}") # Используем только символ
            print(" ".join(row_str))
        # pprint(self.policy) # Можно раскомментировать для вывода полной матрицы

    def print_value_function(self):
        print("\nФункция ценности состояний (V):")
        pprint(self.V.reshape(self.env.shape)) # env.shape для CliffWalking (4,12)

In [4]:
# Создание и обучение агента
# Передаем РАЗВЕРНУТУЮ среду в агент!
agent = PolicyIterationAgent(env_raw.unwrapped, gamma=0.9) # env_raw.unwrapped здесь
print("Начальная (случайная) политика:")
agent.print_policy() # Этот метод внутри агента должен использовать self.env.shape

agent.policy_iteration(max_iterations=100)

print("\nОптимальная политика:")
agent.print_policy()
agent.print_value_function()

Начальная (случайная) политика:
Текущая политика (вероятности действий для каждого состояния):
Оптимальные действия на сетке:
 ^   ^   ^   ^   ^   ^   ^   ^   ^   ^   ^   ^ 
 ^   ^   ^   ^   ^   ^   ^   ^   ^   ^   ^   ^ 
 ^   ^   ^   ^   ^   ^   ^   ^   ^   ^   ^   ^ 
 ^   C   C   C   C   C   C   C   C   C   C   G 
Итерация 1
Итерация 2
Итерация 3
Итерация 4
Итерация 5
Итерация 6
Итерация 7
Итерация 8
Итерация 9
Итерация 10
Итерация 11
Итерация 12
Итерация 13
Итерация 14
Итерация 15
Итерация 16
Итерация 17
Итерация 18
Итерация 19
Итерация 20
Итерация 21
Итерация 22
Итерация 23
Итерация 24
Итерация 25
Итерация 26
Итерация 27
Итерация 28
Итерация 29
Итерация 30
Итерация 31
Итерация 32
Итерация 33
Итерация 34
Итерация 35
Итерация 36
Итерация 37
Итерация 38
Итерация 39
Итерация 40
Итерация 41
Итерация 42
Итерация 43
Итерация 44
Итерация 45
Итерация 46
Итерация 47
Итерация 48
Итерация 49
Итерация 50
Итерация 51
Итерация 52
Итерация 53
Итерация 54
Итерация 55
Итерация 56
Итерация 57
Итераци

### 4. Проигрывание сцены с обученным агентом (опционально, но интересно)

In [5]:
# Функция для проигрывания сцены
def play_episode(env_play, agent_play, render=True, max_steps=100):
    """Проигрывает один эпизод с использованием политики агента."""
    state, _ = env_play.reset()
    total_reward = 0
    
    print("\n--- Начало нового эпизода ---")
    if render:
        env_play.render() # Для текстового вывода в консоль
        # Для графического вывода (если среда поддерживает и настроено):
        # img = plt.imshow(env_play.render(mode='rgb_array'))

    for step in range(max_steps):
        # Выбираем действие согласно политике агента
        # np.random.choice используется, если политика стохастическая (несколько лучших действий)
        action = np.random.choice(np.arange(agent_play.n_actions), p=agent_play.policy[state])
        
        next_state, reward, terminated, truncated, _ = env_play.step(action)
        total_reward += reward
        
        if render:
            print(f"Шаг: {step+1}, Состояние: {state}, Действие: {action}, Награда: {reward}, Новое состояние: {next_state}")
            # Для текстового вывода
            env_play.render() 
            # if 'human' in env_play.metadata.get('render_modes', []):
            #     env_play.render()
            # elif 'rgb_array' in env_play.metadata.get('render_modes', []):
            #      img.set_data(env_play.render(mode='rgb_array'))
            #      plt.pause(0.1) # небольшая задержка для отображения
            #      plt.draw()


        state = next_state
        if terminated or truncated:
            print(f"Эпизод завершен после {step+1} шагов.")
            break
    
    print(f"Итоговая награда за эпизод: {total_reward}")
    return total_reward

In [6]:
# Запуск проигрывания
# Создаем среду для проигрывания (можно с другим режимом рендера, если нужно)
# CliffWalking-v0 не имеет встроенного графического рендера, только текстовый (mode='ansi') или rgb_array
# Для текстового вывода в Jupyter, print(env.render()) может быть лучше чем env.render() напрямую.
# Либо использовать параметр render_mode='human' при создании среды для вывода в отдельном окне (если поддерживается).
# Но в Google Colab или простом скрипте 'human' может не работать.

# Для текстового вывода в консоли/Jupyter, env.render() внутри play_episode уже есть.
# Если используете Jupyter и хотите видеть "картинку" (rgb_array), то нужен matplotlib
# и код как закомментировано выше. Но для CliffWalking rgb_array не самый информативный.

# Давайте просто запустим с текстовым выводом
env_to_play = gym.make("CliffWalking-v0", render_mode=None) # render_mode='ansi' для цветного текста, если терминал поддерживает

# Проиграем несколько эпизодов
for _ in range(1): # Можно увеличить количество эпизодов
    play_episode(env_to_play, agent, render=True, max_steps=50) # render=True включает print(env.render())

env.close()
env_to_play.close()


--- Начало нового эпизода ---
Шаг: 1, Состояние: 36, Действие: 0, Награда: -1, Новое состояние: 24
Шаг: 2, Состояние: 24, Действие: 2, Награда: -1, Новое состояние: 36
Шаг: 3, Состояние: 36, Действие: 3, Награда: -1, Новое состояние: 36
Шаг: 4, Состояние: 36, Действие: 2, Награда: -1, Новое состояние: 36
Шаг: 5, Состояние: 36, Действие: 0, Награда: -1, Новое состояние: 24
Шаг: 6, Состояние: 24, Действие: 3, Награда: -1, Новое состояние: 24
Шаг: 7, Состояние: 24, Действие: 1, Награда: -1, Новое состояние: 25
Шаг: 8, Состояние: 25, Действие: 1, Награда: -1, Новое состояние: 26
Шаг: 9, Состояние: 26, Действие: 3, Награда: -1, Новое состояние: 25
Шаг: 10, Состояние: 25, Действие: 0, Награда: -1, Новое состояние: 13
Шаг: 11, Состояние: 13, Действие: 1, Награда: -1, Новое состояние: 14
Шаг: 12, Состояние: 14, Действие: 0, Награда: -1, Новое состояние: 2
Шаг: 13, Состояние: 2, Действие: 3, Награда: -1, Новое состояние: 1
Шаг: 14, Состояние: 1, Действие: 2, Награда: -1, Новое состояние: 13
Ша

  gym.logger.warn(
