# RL basics

Термины и понятия:

- агент/среда
- наблюдение $o$ / состояние $s$
- действие $a$, стратегия $\pi: \pi(s) \rightarrow a$ функция перехода $T: T(s, a) \rightarrow s'$
- вознаграждение $r$, ф-я вознаграждений $R: R(s, a) \rightarrow r$
- цикл взаимодействия, траектория $\tau: (s_0, a_0, r_0, s_1, a_1, r_1, ..., s_T, a_T, r_T)$, эпизод
- отдача $G$, подсчет отдачи, средняя[/ожидаемая] отдача $\mathbb{E}[G]$

In [220]:
try:
    import google.colab
    COLAB = True
except ModuleNotFoundError:
    COLAB = False
    pass

if COLAB:
    !pip -q install "gymnasium[classic-control, atari, accept-rom-license]"
    !pip -q install piglet
    !pip -q install imageio_ffmpeg
    !pip -q install moviepy==1.0.3

In [221]:
import glob
import io
import base64
import gymnasium as gym
import numpy as np
from IPython import display as ipythondisplay
from IPython.display import HTML
import matplotlib.pyplot as plt
%matplotlib inline

## Agent, environment

<img src=https://gymnasium.farama.org/_images/lunar_lander.gif caption="lunar lander" width="150" height="50"><img src=https://gymnasium.farama.org/_images/mountain_car.gif caption="mountain car" width="150" height="50">
<img src=https://gymnasium.farama.org/_images/cliff_walking.gif caption="cliff walking" width="300" height="50">
<img src=https://ale.farama.org/_images/montezuma_revenge.gif caption="montezuma revenge" width="150" height="100">
<img src=https://github.com/danijar/crafter/raw/main/media/video.gif caption="crafter" width="150" height="100">
<img src=https://camo.githubusercontent.com/6df2ca438d8fe8aa7a132b859315147818c54af608f8609320c3c20e938acf48/68747470733a2f2f6d656469612e67697068792e636f6d2f6d656469612f344e78376759694d394e44724d724d616f372f67697068792e676966 caption="malmo minecraft" width="150" height="100">
<img src=https://images.ctfassets.net/kftzwdyauwt9/e0c0947f-1a44-4528-4a41450a9f0a/2d0e85871d58d02dbe01b2469d693d4a/table-03.gif caption="roboschool" width="150" height="100">
<img src=https://raw.githubusercontent.com/Tviskaron/mipt/master/2019/RL/02/mdp.png caption="Марковский процесс принятия решений" width="150" height="100">
<img src=https://minigrid.farama.org/_images/DoorKeyEnv.gif caption="minigrid" width="120" height="120">

## Observation, state

TODO:
- добавить примеры наблюдений/состояний (числа, векторы, картинки)
- интуитивное объяснение различия, положить пока, что наблюдение = состояние
- пространство состояний


В каждый момент времени среда имеет некоторое внутреннее состояние. Здесь слово "состояние" я употребил скорее в интуитивном понимании, чтобы обозначить, что среда изменчива (иначе какой смысл с ней взаимодействовать, если ничего не меняется). В обучении с подкреплением под термином состояние $s$ (или $s_t$, где $t$ — текущее время) подразумевают либо абстрактно информацию о "состоянии" среды, либо ее явное представление в виде данных, достаточные для полного описания "состояния". *NB: Здесь можно провести аналогию с компьютерными играми — файл сохранения игры как раз содержит информацию о "состоянии" мира игры, чтобы в будущем можно было продолжить с текущей точки, так что данные этого файла в целом можно с некоторой натяжкой считать состоянием (с натяжкой, потому что редко когда в сложных играх файлы сохранения содержат прямо вот всю информацию, так что после перезагрузки вы получите не совсем точную копию). При этом обычно подразумевается, что состояние не содержит в себе ничего лишнего, то есть это **минимальный** набор информации.*

Наблюдением $o$ называют то, что агент "видит" о текущем состоянии среды. Это не обязательно зрение, а вообще вся доступная ему информация (условно, со всех его органов чувств).

В общем случае наблюдение: кортеж/словарь многомерных векторов чисел.

In [222]:
print(gym.make("CartPole-v0").reset()[0].shape)
print(gym.make("MountainCar-v0").reset()[0].shape)

(4,)
(2,)


## Action, policy, transition function

Рассмотрим следующие MDP:

- A: <img src=https://i.ibb.co/mrCMVZLQ/mdp-a.png caption="A" width="400" height="100">
- B: <img src=https://i.ibb.co/GQ2tVtjC/mdp-b.png caption="B" width="400" height="100">
- C: <img src=https://i.ibb.co/Jj9LYHjP/mdp-c.png caption="C" width="400" height="100">

Links to all:
[A](https://i.ibb.co/mrCMVZLQ/mdp-a.png)
[B](https://i.ibb.co/GQ2tVtjC/mdp-b.png)
[C](https://i.ibb.co/Jj9LYHjP/mdp-c.png)
[D](https://i.ibb.co/Y47Mr83b/mdp-d.png)
[E](https://i.ibb.co/Kjt1Xhmf/mdp-e.png)

Давайте явно запишем пространства состояний $S$ и действий $A$, а также функцию перехода $T$ среды.

In [223]:
# Custom rewards for A, B, C

R_a = {
    (0, 0): 1.0,
    (1, 0): 1.0,
    (2, -1): 0.0
}

R_b = {
    (0, 0): 1.0,
    (0, 1): 2.0,
    (0, 2): 1.0,
    (1, -1): 0.0,
    (2, -1): 0.0,
    (3, -1): 0.0
}

R_c = {
    (0, 0): 1.0,
    (0, 1): 0.5,
    (1, 0): -0.1,
    (1, 1): 2.0,
    (0, 1): 0.1,
    (2, 0): 0.5,
    (2, 1): 0.0,
    (3, -1): 0.0
}

In [224]:
# A
states = set(range(3))
actions = set(range(-1, 1))

print(f'{states=} | {actions=}')

T_a = {
    (0, 0): 1,
    (1, 0): 2,
    (2, -1): 2
}
print(f'Transition function {T_a=}')

A_mdp = states, actions, T_a, [2], R_a

# B
states = set(range(4))
actions = set(range(-1, 3))

print(f'{states=} | {actions=}')

T_b = {
    (0, 0): 1,
    (0, 1): 2,
    (0, 2): 3,
    (1, -1): 1,
    (2, -1): 2,
    (3, -1): 3
}
print(f'Transition function {T_b=}')

B_mdp = states, actions, T_b, [1, 2, 3], R_b

# C
states = set(range(4))
actions = set(range(-1, 2))

print(f'{states=} | {actions=}')

T_c = {
    (0, 0): 1,
    (1, 0): 1,
    (1, 1): 3,
    (0, 1): 2,
    (2, 1): 2,
    (2, 0): 3,
    (3, -1): 3
}
print(f'Transition function {T_c=}')

C_mdp = states, actions, T_c, [3], R_c

states={0, 1, 2} | actions={0, -1}
Transition function T_a={(0, 0): 1, (1, 0): 2, (2, -1): 2}
states={0, 1, 2, 3} | actions={0, 1, 2, -1}
Transition function T_b={(0, 0): 1, (0, 1): 2, (0, 2): 3, (1, -1): 1, (2, -1): 2, (3, -1): 3}
states={0, 1, 2, 3} | actions={0, 1, -1}
Transition function T_c={(0, 0): 1, (1, 0): 1, (1, 1): 3, (0, 1): 2, (2, 1): 2, (2, 0): 3, (3, -1): 3}


Попробуйте записать функцию перехода в матричном виде:

In [225]:
# A
m_a = np.zeros((3, 3))
for (state, action), next_state in T_a.items():
    m_a[state][next_state] = 1.0
for row in m_a:
    row /= row.sum() if row.sum() != 0 else 1

print(m_a)
print()

# B
m_b = np.zeros((4, 4))
for (state, action), next_state in T_b.items():
    m_b[state][next_state] = 1.0
for row in m_b:
    row /= row.sum() if row.sum() != 0 else 1

print(m_b)
print()

# C
m_c = np.zeros((4, 4))
for (state, action), next_state in T_c.items():
    m_c[state][next_state] = 1.0
for row in m_c:
    row /= row.sum() if row.sum() != 0 else 1

print(m_c)
print()

[[0. 1. 0.]
 [0. 0. 1.]
 [0. 0. 1.]]

[[0.         0.33333333 0.33333333 0.33333333]
 [0.         1.         0.         0.        ]
 [0.         0.         1.         0.        ]
 [0.         0.         0.         1.        ]]

[[0.  0.5 0.5 0. ]
 [0.  0.5 0.  0.5]
 [0.  0.  0.5 0.5]
 [0.  0.  0.  1. ]]



Как получить вероятность нахождения агента в состоянии (1) через N шагов? Что происходит с вероятностями нахождения в состояниях при $N \rightarrow \infty$

In [226]:
def f(v0, m_t, n):
    m_t = np.linalg.matrix_power(m_t, n)
    return np.round(v0 @ m_t, 3)

print(f([1, 0, 0], m_a, 1000)) # A
print(f([1, 0, 0, 0], m_b, 1000)) # B
print(f([1, 0, 0, 0], m_c, 1000)) # C

[0. 0. 1.]
[0.    0.333 0.333 0.333]
[0. 0. 0. 1.]


Давайте попробуем задать двух агентов: случайного и оптимального (для каждой среды свой).

In [227]:
class Agent:
    def __init__(self, actions, R_, terminal_states):
        self.rng = np.random.default_rng()
        self.R = R_
        self.s_acts = {}
        for (s, a), sn in self.R.items():
            if s not in self.s_acts:
                self.s_acts[s] = [a]
            else:
                self.s_acts[s].append(a)
        self.terminal_states = terminal_states

    def act(self, state):
        if state in self.terminal_states:
            return -1
        return self.rng.choice(self.s_acts[state])
        

В качестве дополнения, запишите стратегию агента

In [228]:
class Agent_: # Greedy policy
    def __init__(self, R_, terminal_states):
        self.R = R_
        self.terminal_states = terminal_states

    def act(self, state, terminal_states):
        if state in self.terminal_states:
            return -1
        rs = []
        actions = []
        for item in self.R.items():
            if item[0][0] == state:
                rs.append(item[1])
                actions.append(item[0][1])
        return actions[np.argmax(rs)]

## Reward, reward function

Теперь добавим произвольную функцию вознаграждения. Например, для A:

In [229]:
R = {
    (0, 0): -0.1,
    (1, 0): 1.0,
    (2, -1): 0.0
}
print(R)

# A_mdp = *A_mdp, R
print(A_mdp)

{(0, 0): -0.1, (1, 0): 1.0, (2, -1): 0.0}
({0, 1, 2}, {0, -1}, {(0, 0): 1, (1, 0): 2, (2, -1): 2}, [2], {(0, 0): 1.0, (1, 0): 1.0, (2, -1): 0.0})


## Interaction loop, trajectory, termination, truncation, episode

Общий цикл взаимодействия в рамках эпизода:
1. Инициализировать среду: $s \leftarrow \text{env.init()}$
2. Цикл:
    - выбрать действие: $a \leftarrow \pi(s)$
    - получить ответ от среды: $s, r, d \leftarrow \text{env.next(a)}$
    - если $d == \text{True}$, выйти из цикла

In [230]:
def run_episode(mdp, f=True, s = 0):
    states, actions, T, terminal_nodes, R = mdp
    agent = Agent(actions, R, terminal_nodes)

    tau = []
    while True:
        a = agent.act(s)
        if f: print(s)
        s_next = T[(s, a)]
        r = R[(s, a)]

        tau.append((s, a, r))
        s = s_next

        if s_next in terminal_nodes:
            if f: print(f"{s_next} Finish")
            break

    return tau

print("------------")
run_episode(A_mdp)
print()
run_episode(B_mdp)
print()
run_episode(C_mdp)
print()

def run_episode_(mdp, f=True, s = 0):
    states, actions, T, terminal_nodes, R = mdp
    agent = Agent_(R, terminal_nodes)

    tau = []
    while True:
        a = agent.act(s)
        if f: print(s)
        s_next = T[(s, a)]
        r = R[(s, a)]

        tau.append((s, a, r))
        s = s_next

        if s_next in terminal_nodes:
            if f: print(f"{s_next} Finish")
            break

    return tau

print("------------")
run_episode(A_mdp)
print()
run_episode(B_mdp)
print()
run_episode(C_mdp)
print()

------------
0
1
2 Finish

0
1 Finish

0
2
3 Finish

------------
0
1
2 Finish

0
3 Finish

0
2
2
2
3 Finish



Termination — означает окончание эпизода, когда достигнуто терминальное состояние. Является частью задания среды.

Truncation — означает окончание эпизода, когда достигнут лимит по числу шагов (=времени). Обычно является внешне заданным параметром для удобства обучения.

Пока не будем вводить truncation, но поддержим termination: расширьте определение среды информацией о терминальных состояниях для всех описанных ранее сред. Сгенерируйте по несколько случайных траекторий для каждой среды.

### Return, expected return

Наиболее важная метрика оценки качества работы агента: отдача.

Отдача: $G(s_t) = \sum_{i=t}^T r_i$

Обычно также вводят параметр $\gamma \in [0, 1]$, дисконтирующий будущие вознаграждения. А еще, тк отдача может меняться от запуска к запуску благодаря вероятностным процессам, нас интересует отдача в среднем — ожидаемая отдача:

$$\hat{G}(s_t) = \mathbb{E} [ \sum_{i=t}^T \gamma^{i-t} r_i ]$$

Именно ее и оптимизируют в RL.

Давайте научимся считать отдачу для состояний по траектории и считать среднюю отдачу.

In [231]:
def G(tau, gamma):
    r = 0.
    discount = 1.0
    for el in tau:
        r += discount * el[2]
        discount *= gamma

    return r

def mean_G(mdp):
    states, _, _, _, _ = mdp
    means = []
    for s in states:
        r = 0.0
        for _ in range(100):
            tau = run_episode(mdp, False, s)
            r += G(tau, 0.99) / 100
        means.append(r)
    return means

# Agent with optimal policy

print(R_a)
print("meanG for all states: ")
print(mean_G(A_mdp))

print()

print(B_mdp)
print("meanG for all states: ")
print(mean_G(B_mdp))

print()

print(R_c)
print("meanG for all states: ")
print(mean_G(C_mdp))

{(0, 0): 1.0, (1, 0): 1.0, (2, -1): 0.0}
meanG for all states: 
[1.9900000000000024, 1.0000000000000007, 0.0]

({0, 1, 2, 3}, {0, 1, 2, -1}, {(0, 0): 1, (0, 1): 2, (0, 2): 3, (1, -1): 1, (2, -1): 2, (3, -1): 3}, [1, 2, 3], {(0, 0): 1.0, (0, 1): 2.0, (0, 2): 1.0, (1, -1): 0.0, (2, -1): 0.0, (3, -1): 0.0})
meanG for all states: 
[1.2300000000000009, 0.0, 0.0, 0.0]

{(0, 0): 1.0, (0, 1): 0.1, (1, 0): -0.1, (1, 1): 2.0, (2, 0): 0.5, (2, 1): 0.0, (3, -1): 0.0}
meanG for all states: 
[1.8571937929941829, 1.9001623717193499, 0.4945595128400151, 0.0]
