# RL basics

Термины и понятия:

- агент/среда
- наблюдение $o$ / состояние $s$
- действие $a$, стратегия $\pi: \pi(s) \rightarrow a$ функция перехода $T: T(s, a) \rightarrow s'$
- вознаграждение $r$, ф-я вознаграждений $R: R(s, a) \rightarrow r$
- цикл взаимодействия, траектория $\tau: (s_0, a_0, r_0, s_1, a_1, r_1, ..., s_T, a_T, r_T)$, эпизод
- отдача $G$, подсчет отдачи, средняя[/ожидаемая] отдача $\mathbb{E}[G]$

In [1]:
import gymnasium as gym
import numpy as np

## Agent, environment

<img src=https://gymnasium.farama.org/_images/lunar_lander.gif caption="lunar lander" width="150" height="50"><img src=https://gymnasium.farama.org/_images/mountain_car.gif caption="mountain car" width="150" height="50">
<img src=https://gymnasium.farama.org/_images/cliff_walking.gif caption="cliff walking" width="300" height="50">
<img src=https://ale.farama.org/_images/montezuma_revenge.gif caption="montezuma revenge" width="150" height="100">
<img src=https://github.com/danijar/crafter/raw/main/media/video.gif caption="crafter" width="150" height="100">
<img src=https://camo.githubusercontent.com/6df2ca438d8fe8aa7a132b859315147818c54af608f8609320c3c20e938acf48/68747470733a2f2f6d656469612e67697068792e636f6d2f6d656469612f344e78376759694d394e44724d724d616f372f67697068792e676966 caption="malmo minecraft" width="150" height="100">
<img src=https://images.ctfassets.net/kftzwdyauwt9/e0c0947f-1a44-4528-4a41450a9f0a/2d0e85871d58d02dbe01b2469d693d4a/table-03.gif caption="roboschool" width="150" height="100">
<img src=https://raw.githubusercontent.com/Tviskaron/mipt/master/2019/RL/02/mdp.png caption="Марковский процесс принятия решений" width="150" height="100">
<img src=https://minigrid.farama.org/_images/DoorKeyEnv.gif caption="minigrid" width="120" height="120">

## Observation, state

TODO:
- добавить примеры наблюдений/состояний (числа, векторы, картинки)
- интуитивное объяснение различия, положить пока, что наблюдение = состояние
- пространство состояний


В каждый момент времени среда имеет некоторое внутреннее состояние. Здесь слово "состояние" я употребил скорее в интуитивном понимании, чтобы обозначить, что среда изменчива (иначе какой смысл с ней взаимодействовать, если ничего не меняется). В обучении с подкреплением под термином состояние $s$ (или $s_t$, где $t$ — текущее время) подразумевают либо абстрактно информацию о "состоянии" среды, либо ее явное представление в виде данных, достаточные для полного описания "состояния". *NB: Здесь можно провести аналогию с компьютерными играми — файл сохранения игры как раз содержит информацию о "состоянии" мира игры, чтобы в будущем можно было продолжить с текущей точки, так что данные этого файла в целом можно с некоторой натяжкой считать состоянием (с натяжкой, потому что редко когда в сложных играх файлы сохранения содержат прямо вот всю информацию, так что после перезагрузки вы получите не совсем точную копию). При этом обычно подразумевается, что состояние не содержит в себе ничего лишнего, то есть это **минимальный** набор информации.*

Наблюдением $o$ называют то, что агент "видит" о текущем состоянии среды. Это не обязательно зрение, а вообще вся доступная ему информация (условно, со всех его органов чувств).

В общем случае наблюдение: кортеж/словарь многомерных векторов чисел.

In [2]:
print(gym.make("CartPole-v1").reset()[0].shape)
print(gym.make("MountainCar-v0").reset()[0].shape)

(4,)
(2,)


## Action, policy, transition function

Рассмотрим следующие MDP:

- A: <img src=https://i.ibb.co/mrCMVZLQ/mdp-a.png caption="A" width="400" height="100">
- B: <img src=https://i.ibb.co/GQ2tVtjC/mdp-b.png caption="B" width="400" height="100">

Links to all:
[A](https://i.ibb.co/mrCMVZLQ/mdp-a.png)
[B](https://i.ibb.co/GQ2tVtjC/mdp-b.png)
[C](https://i.ibb.co/Jj9LYHjP/mdp-c.png)
[D](https://i.ibb.co/Y47Mr83b/mdp-d.png)
[E](https://i.ibb.co/Kjt1Xhmf/mdp-e.png)

Давайте явно запишем пространства состояний $S$ и действий $A$, а также функцию перехода $T$ среды.

In [3]:
states_A   = set(range(3))
actions_A = set(range(1))
print(f'{states_A=} | {actions_A=}')
T_A = {
    (0, 0): 1,
    (1, 0): 2,
    (2, 0): 2
}
print(f'Transition function {T_A=}')
A_mdp = states_A, actions_A, T_A

states_A={0, 1, 2} | actions_A={0}
Transition function T_A={(0, 0): 1, (1, 0): 2, (2, 0): 2}


In [4]:
states_B   = set(range(4))
actions_B = set(range(3))
print(f'{states_B=} | {actions_B=}')
T_B = {
    (0, 0): 1,
    (0, 1): 2,
    (0, 2): 3,
    
    (1, 0): 1,
    (1, 1): 1,
    (1, 2): 1,
    
    (2, 0): 2,
    (2, 1): 2,
    (2, 2): 2,
    
    (3, 0): 3,
    (3, 1): 3,
    (3, 2): 3
}
print(f'Transition function {T_B=}')
B_mdp = states_B, actions_B, T_B

states_B={0, 1, 2, 3} | actions_B={0, 1, 2}
Transition function T_B={(0, 0): 1, (0, 1): 2, (0, 2): 3, (1, 0): 1, (1, 1): 1, (1, 2): 1, (2, 0): 2, (2, 1): 2, (2, 2): 2, (3, 0): 3, (3, 1): 3, (3, 2): 3}


Попробуйте записать функцию перехода в матричном виде:

In [5]:
T_matr_A = np.array([
#    0  1  2 - итоговые состояния
    [0, 1, 0], # 0 - исходное состояние
    [0, 0, 1], # 1 - исходное состояние
    [0, 0, 1]  # 2 - исходное состояние
]) # значения элементов - вероятности перехода

In [6]:
# не указаны вероятности перехода, возьмем 1/3
T_matr_B = np.array([
#    0  1    2    3 - итоговые состояния
    [0, 1/3, 1/3, 1/3], # 0 - исходное состояние
    [0, 1,   0,   0  ], # 1 - исходное состояние
    [0, 0,   1,   0  ], # 2 - исходное состояние
    [0, 0,   0,   1  ]  # 3 - исходное состояние
]) # значения элементов - вероятности перехода

Как получить вероятность нахождения агента в состоянии (1) через N шагов? Что происходит с вероятностями нахождения в состояниях при $N \rightarrow \infty$

In [7]:
def step_n(s, T_matr, n):
    return s @ np.linalg.matrix_power(T_matr, n)

In [8]:
init_state_A = np.array([1, 0, 0]) # возьмем состояние 0
print(f"MDP: A, init state: {init_state_A}")
for i in range(5):
    print(f"{i} steps, state: {step_n(init_state_A, T_matr_A, i)}")
print("Вероятность нахождения в состоянии 1 при старте в состоянии 0 при N = 1 равна 1, при N != 1 равна 0")
print("При N->inf попадаем в стационартное состояние")

MDP: A, init state: [1 0 0]
0 steps, state: [1 0 0]
1 steps, state: [0 1 0]
2 steps, state: [0 0 1]
3 steps, state: [0 0 1]
4 steps, state: [0 0 1]
Вероятность нахождения в состоянии 1 при старте в состоянии 0 при N = 1 равна 1, при N != 1 равна 0
При N->inf попадаем в стационартное состояние


In [9]:
init_state_B = np.array([1, 0, 0, 0]) # возьмем состояние 0
print(f"MDP: B, init state: {init_state_B}")
for i in range(5):
    print(f"{i} steps, state: {step_n(init_state_B, T_matr_B, i)}")
print("Вероятность нахождения в состоянии 1 при старте в состоянии 0 при N = 0 равна 0, при N > 1 равна 1/3")
print("При N->inf попадаем в стационартное состояние")

MDP: B, init state: [1 0 0 0]
0 steps, state: [1. 0. 0. 0.]
1 steps, state: [0.         0.33333333 0.33333333 0.33333333]
2 steps, state: [0.         0.33333333 0.33333333 0.33333333]
3 steps, state: [0.         0.33333333 0.33333333 0.33333333]
4 steps, state: [0.         0.33333333 0.33333333 0.33333333]
Вероятность нахождения в состоянии 1 при старте в состоянии 0 при N = 0 равна 0, при N > 1 равна 1/3
При N->inf попадаем в стационартное состояние


Причем, возможно, я запутался в терминологоии:
- я записал функцию перехода в матричном виде для состояний (она мне понадобилась для расчета нахождения состояния через N шагов)
- можно записать функцию перехода в матричном виде для среды (это будет матрица размера N_states x N_actions x N_states)

Задайте еще несколько MDP:

- C: <img src=https://i.ibb.co/Jj9LYHjP/mdp-c.png caption="C" width="400" height="100">

In [10]:
states_C   = set(range(4))
actions_C = set(range(2))
print(f'{states_C=} | {actions_C=}')
T_C = {
    (0, 0): 1,
    (0, 1): 2,
    
    (1, 0): 1,
    (1, 1): 3,
    
    (2, 0): 3,
    (2, 1): 2,
    
    (3, 0): 3,
    (3, 1): 3
}
print(f'Transition function {T_C=}')
C_mdp = states_C, actions_C, T_C

# не указаны вероятности перехода, возьмем 1/2
T_matr_C = np.array([
#    0  1    2    3 - итоговые состояния
    [0, 1/2, 1/2, 0  ], # 0 - исходное состояние
    [0, 1/2, 0,   1/2], # 1 - исходное состояние
    [0, 0,   1/2, 1/2], # 2 - исходное состояние
    [0, 0,   0,   1  ]  # 3 - исходное состояние
]) # значения элементов - вероятности перехода

init_state_C = np.array([1, 0, 0, 0]) # возьмем состояние 0
print(f"MDP: C, init state: {init_state_C}")
for i in range(5):
    print(f"{i} steps, state: {step_n(init_state_C, T_matr_C, i)}")
print("Вероятность нахождения в состоянии 1 при старте в состоянии 0 при N = 0 равна 0, при N > 0 равна (1/2)^N")
print("При N->inf попадаем в стационартное состояние")

states_C={0, 1, 2, 3} | actions_C={0, 1}
Transition function T_C={(0, 0): 1, (0, 1): 2, (1, 0): 1, (1, 1): 3, (2, 0): 3, (2, 1): 2, (3, 0): 3, (3, 1): 3}
MDP: C, init state: [1 0 0 0]
0 steps, state: [1. 0. 0. 0.]
1 steps, state: [0.  0.5 0.5 0. ]
2 steps, state: [0.   0.25 0.25 0.5 ]
3 steps, state: [0.    0.125 0.125 0.75 ]
4 steps, state: [0.     0.0625 0.0625 0.875 ]
Вероятность нахождения в состоянии 1 при старте в состоянии 0 при N = 0 равна 0, при N > 0 равна (1/2)^N
При N->inf попадаем в стационартное состояние


Давайте попробуем задать двух агентов: случайного и оптимального (для каждой среды свой).

In [11]:
class Agent:
    def __init__(self, actions):
        self.rng = np.random.default_rng()
        self.actions = np.array(list(actions))

    def act(self, state):
        return self.rng.integers(len(self.actions))

In [12]:
class OptimalAgentA:
    def __init__(self, actions):
        self.actions = np.array(list(actions))

    def act(self, state):
        # одно действие
        return 0

Других оптимальных агентов зададим ниже, так как для этого надо понять, что такое "оптимальный"

В качестве дополнения, запишите стратегию агента

Если я правильно понял, то под стратегией мы подразумеваем как раз правило, по которому агент выбирает действие  
Это правило в свою очередь будет как раз заложено в класс оптимального агента

## Reward, reward function

Теперь добавим произвольную функцию вознаграждения. Например, для A:

In [13]:
R_A = {
    (0, 0): -0.1,
    (1, 0): 1.0,
    (2, 0): 0.0
}
print(R_A)

A_mdp = *A_mdp, R_A
print(A_mdp)

{(0, 0): -0.1, (1, 0): 1.0, (2, 0): 0.0}
({0, 1, 2}, {0}, {(0, 0): 1, (1, 0): 2, (2, 0): 2}, {(0, 0): -0.1, (1, 0): 1.0, (2, 0): 0.0})


In [14]:
# Зададим награды за переходы 
R_B = {
    (0, 0): -1,
    (0, 1): 0,
    (0, 2): 1,
    
    (1, 0): 0,
    (1, 1): 0,
    (1, 2): 0,
    
    (2, 0): 0,
    (2, 1): 0,
    (2, 2): 0,
    
    (3, 0): 0,
    (3, 1): 0,
    (3, 2): 0
}
print(R_B)

B_mdp = *B_mdp, R_B
print(B_mdp)

{(0, 0): -1, (0, 1): 0, (0, 2): 1, (1, 0): 0, (1, 1): 0, (1, 2): 0, (2, 0): 0, (2, 1): 0, (2, 2): 0, (3, 0): 0, (3, 1): 0, (3, 2): 0}
({0, 1, 2, 3}, {0, 1, 2}, {(0, 0): 1, (0, 1): 2, (0, 2): 3, (1, 0): 1, (1, 1): 1, (1, 2): 1, (2, 0): 2, (2, 1): 2, (2, 2): 2, (3, 0): 3, (3, 1): 3, (3, 2): 3}, {(0, 0): -1, (0, 1): 0, (0, 2): 1, (1, 0): 0, (1, 1): 0, (1, 2): 0, (2, 0): 0, (2, 1): 0, (2, 2): 0, (3, 0): 0, (3, 1): 0, (3, 2): 0})


In [15]:
# Зададим награды за переходы 
# Предполагается, что если у агента много времени, то ему будет выгодно пойти сначала на убыль 1, чтобы в цикле состоянии набрать больше
R_C = {
    (0, 0): -5,
    (0, 1): 10,
    
    (1, 0): 20,
    (1, 1): 1,
    
    (2, 0): -5,
    (2, 1): 10,
    
    (3, 0): 0,
    (3, 1): 0
}
print(R_C)

C_mdp = *C_mdp, R_C
print(C_mdp)

{(0, 0): -5, (0, 1): 10, (1, 0): 20, (1, 1): 1, (2, 0): -5, (2, 1): 10, (3, 0): 0, (3, 1): 0}
({0, 1, 2, 3}, {0, 1}, {(0, 0): 1, (0, 1): 2, (1, 0): 1, (1, 1): 3, (2, 0): 3, (2, 1): 2, (3, 0): 3, (3, 1): 3}, {(0, 0): -5, (0, 1): 10, (1, 0): 20, (1, 1): 1, (2, 0): -5, (2, 1): 10, (3, 0): 0, (3, 1): 0})


In [16]:
class OptimalAgentB:
    def __init__(self, actions):
        self.actions = np.array(list(actions))

    def act(self, state):
        optimal_moves = {
            0: 2,
            1: 0,
            2: 0,
            3: 0,
        }
        return optimal_moves[state]

In [17]:
class OptimalAgentC:
    def __init__(self, actions):
        self.actions = np.array(list(actions))

    def act(self, state):
        optimal_moves = {
            0: 0, # с поправкой, что у агента много времени
            1: 0,
            2: 1,
            3: 0,
        }
        return optimal_moves[state]

## Interaction loop, trajectory, termination, truncation, episode

Общий цикл взаимодействия в рамках эпизода:
1. Инициализировать среду: $s \leftarrow \text{env.init()}$
2. Цикл:
    - выбрать действие: $a \leftarrow \pi(s)$
    - получить ответ от среды: $s, r, d \leftarrow \text{env.next(a)}$
    - если $d == \text{True}$, выйти из цикла

In [18]:
def run_episode(mdp):
    states, actions, T, R = mdp
    agent = Agent(actions)

    s = 0
    tau = []
    for _ in range(5):
        a = agent.act(s)
        s_next = T[(s, a)]
        r = R[(s, a)]

        tau.append((s, a, r))
        s = s_next

    return tau

run_episode(A_mdp)

[(0, np.int64(0), -0.1),
 (1, np.int64(0), 1.0),
 (2, np.int64(0), 0.0),
 (2, np.int64(0), 0.0),
 (2, np.int64(0), 0.0)]

In [19]:
run_episode(B_mdp)

[(0, np.int64(2), 1),
 (3, np.int64(2), 0),
 (3, np.int64(0), 0),
 (3, np.int64(1), 0),
 (3, np.int64(0), 0)]

In [20]:
run_episode(C_mdp)

[(0, np.int64(1), 10),
 (2, np.int64(1), 10),
 (2, np.int64(1), 10),
 (2, np.int64(0), -5),
 (3, np.int64(1), 0)]

In [21]:
def run_episode_with_custom_agent(mdp, agent):
    states, actions, T, R = mdp
    ag = agent(actions)

    s = 0
    tau = []
    for _ in range(5):
        a = ag.act(s)
        s_next = T[(s, a)]
        r = R[(s, a)]

        tau.append((s, a, r))
        s = s_next

    return tau

run_episode_with_custom_agent(A_mdp, OptimalAgentA)

[(0, 0, -0.1), (1, 0, 1.0), (2, 0, 0.0), (2, 0, 0.0), (2, 0, 0.0)]

In [22]:
run_episode_with_custom_agent(B_mdp, OptimalAgentB)

[(0, 2, 1), (3, 0, 0), (3, 0, 0), (3, 0, 0), (3, 0, 0)]

In [23]:
run_episode_with_custom_agent(C_mdp, OptimalAgentC)

[(0, 0, -5), (1, 0, 20), (1, 0, 20), (1, 0, 20), (1, 0, 20)]

Termination — означает окончание эпизода, когда достигнуто терминальное состояние. Является частью задания среды.

Truncation — означает окончание эпизода, когда достигнут лимит по числу шагов (=времени). Обычно является внешне заданным параметром для удобства обучения.

Пока не будем вводить truncation, но поддержим termination: расширьте определение среды информацией о терминальных состояниях для всех описанных ранее сред. Сгенерируйте по несколько случайных траекторий для каждой среды.

In [24]:
A_terms = set([2])
A_mdp = *A_mdp, A_terms

B_terms = set([1, 2, 3])
B_mdp = *B_mdp, B_terms

C_terms = set([3])
C_mdp = *C_mdp, C_terms

In [25]:
def gen_random_trajectory(mdp, truncation=10):
    states, actions, T, R, terms = mdp
    state = np.random.choice(list(states))
    trajectory = []
    for step in range(truncation):
        terminated = state in terms
        if terminated:
            trajectory.append({
                'step': step,
                'state': state,
                'terminated': terminated
            })
            break
        action = np.random.choice(list(actions))
        next_state = T[(state, action)] 
        reward = R[(state, action)]
        trajectory.append({
            'step': step,
            'state': state,
            'action': action,
            'reward': reward,
            'next_state': next_state,
            'terminated': terminated
        })
        state = next_state
    return trajectory


In [26]:
a_tr = gen_random_trajectory(A_mdp)
a_tr

[{'step': 0,
  'state': np.int64(0),
  'action': np.int64(0),
  'reward': -0.1,
  'next_state': 1,
  'terminated': False},
 {'step': 1,
  'state': 1,
  'action': np.int64(0),
  'reward': 1.0,
  'next_state': 2,
  'terminated': False},
 {'step': 2, 'state': 2, 'terminated': True}]

In [27]:
b_tr = gen_random_trajectory(B_mdp)
b_tr

[{'step': 0, 'state': np.int64(1), 'terminated': True}]

In [28]:
c_tr = gen_random_trajectory(C_mdp)
c_tr

[{'step': 0,
  'state': np.int64(2),
  'action': np.int64(1),
  'reward': 10,
  'next_state': 2,
  'terminated': False},
 {'step': 1,
  'state': 2,
  'action': np.int64(1),
  'reward': 10,
  'next_state': 2,
  'terminated': False},
 {'step': 2,
  'state': 2,
  'action': np.int64(1),
  'reward': 10,
  'next_state': 2,
  'terminated': False},
 {'step': 3,
  'state': 2,
  'action': np.int64(0),
  'reward': -5,
  'next_state': 3,
  'terminated': False},
 {'step': 4, 'state': 3, 'terminated': True}]

### Return, expected return

Наиболее важная метрика оценки качества работы агента: отдача.

Отдача: $G(s_t) = \sum_{i=t}^T r_i$

Обычно также вводят параметр $\gamma \in [0, 1]$, дисконтирующий будущие вознаграждения. А еще, тк отдача может меняться от запуска к запуску благодаря вероятностным процессам, нас интересует отдача в среднем — ожидаемая отдача:

$$\hat{G}(s_t) = \mathbb{E} [ \sum_{i=t}^T \gamma^{i-t} r_i ]$$

Именно ее и оптимизируют в RL.

Давайте научимся считать отдачу для состояний по траектории и считать среднюю отдачу.

In [29]:
def calculate_returns(trajectory, gamma=0.9):
    returns = {}

    for t, experience in enumerate(trajectory):
        state = experience['state']
        
        if 'reward' not in experience:
            continue
            
        G = 0
        for i in range(t, len(trajectory)):
            if 'reward' in trajectory[i]:
                reward = trajectory[i]['reward']
                G += (gamma ** (i - t)) * reward
        
        returns[state] = G

    return returns

def calculate_average_returns(mdp, num_episodes=1000, gamma=0.9, truncation=100):
    state_returns = {}
    
    # Генерируем много траекторий
    for _ in range(num_episodes):
        trajectory = gen_random_trajectory(mdp, truncation)
        
        # Вычисляем отдачу для этой траектории
        episode_returns = calculate_returns(trajectory, gamma)
        
        # Добавляем к общей статистике
        for state, return_value in episode_returns.items():
            if state not in state_returns:
                state_returns[state] = []
            state_returns[state].append(return_value)
    
    # Вычисляем среднюю отдачу для каждого состояния
    average_returns = {}
    for state, returns_list in state_returns.items():
        average_returns[state] = sum(returns_list) / len(returns_list)
    
    return average_returns

In [30]:
calculate_returns(a_tr)

{np.int64(0): 0.8, 1: 1.0}

In [31]:
calculate_returns(b_tr)

{}

In [32]:
calculate_returns(c_tr)

{np.int64(2): -5.0}

In [33]:
calculate_average_returns(A_mdp)

{np.int64(0): 0.8, 1: 1.0}

In [34]:
calculate_average_returns(B_mdp)

{np.int64(0): -0.06854838709677419}

In [35]:
calculate_average_returns(C_mdp)

{np.int64(1): 1.0, np.int64(0): 12.198287316963654, 2: -5.0}