In [1]:
import gym
import numpy as np
import random
import time
import matplotlib.pyplot as plt

from tqdm import tqdm

import warnings
warnings.filterwarnings("ignore")

In [2]:
env = gym.make("Taxi-v3")
n_states = env.observation_space.n
n_actions = env.action_space.n

##  Описание среды Taxi-v3

###  States

Состояние среды кодируется целым числом от `0` до `499` и включает:

- **Позицию такси**: 25 возможных клеток (сеткой 5×5)
- **Положение пассажира**:
  - `0`: у точки R (Red)
  - `1`: у точки G (Green)
  - `2`: у точки Y (Yellow)
  - `3`: у точки B (Blue)
  - `4`: внутри такси
- **Целевая точка назначения**:
  - `0`: R
  - `1`: G
  - `2`: Y
  - `3`: B

**Всего состояний:**  
`25 (такси) × 5 (пассажир) × 4 (цель) = 500`

**Формула кодирования состояния:**  
`state = ((taxi_row * 5 + taxi_col) * 5 + passenger_pos) * 4 + destination`

---

### Actions

| Код | Действие              |
|-----|-----------------------|
| `0` | Поехать вверх         |
| `1` | Поехать вниз          |
| `2` | Поехать влево         |
| `3` | Поехать вправо        |
| `4` | Забрать пассажира     |
| `5` | Высадить пассажира    |

---

### Rewards

| Событие                                         | Награда |
|-------------------------------------------------|---------|
| Любой обычный шаг                               | `-1`    |
| Неверный `pickup` или `dropoff`                 | `-10`   |
| Успешная высадка пассажира в точке назначения   | `+20`   |
| Завершение эпизода                              | —       |


In [3]:
def eval_policy(env, policy, episodes=10000):
    total_reward = 0
    successes = 0

    for _ in range(episodes):
        state, _ = env.reset()
        done = False

        while not done:
            action = int(policy[state])
            state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            total_reward += reward

        if reward == 20:  # Успешная доставка пассажира
            successes += 1

    avg_reward = total_reward / episodes
    return avg_reward, successes

In [4]:
def render_one_episode(env, policy, delay=0.5):
    state, _ = env.reset()
    done = False
    total_reward = 0

    while not done:
        env.render()  # показываем текущее состояние
        time.sleep(delay)

        action = int(policy[state])
        state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        total_reward += reward

    env.render()  # показать финальное состояние
    print("Завершено. Общая награда за эпизод:", total_reward)

In [5]:
def value_iteration(env, gamma=0.99, theta=1e-6):
    V = np.zeros(n_states)
    policy = np.zeros(n_states)

    while True:
        delta = 0.
        for s in range(n_states):
            v = V[s]
            q_sa = np.zeros(n_actions)
            for a in range(n_actions):
                for prob, next_s, reward, done in env.P[s][a]:
                    q_sa[a] += prob * (reward + gamma * V[next_s])

            V[s] = np.max(q_sa)
            delta = max(delta, abs(v - V[s]))
        
        if delta < theta:
            break
    
    # извлечение политики
    for s in range(n_states):
        q_sa = np.zeros(n_actions)
        for a in range(n_actions):
            for prob, next_s, reward, done in env.P[s][a]:
                q_sa[a] += prob * (reward + gamma * V[next_s])
        policy[s] = np.argmax(q_sa)

    return policy

In [6]:
def policy_iteration(env, gamma=0.99, theta=1e-3):
    V = np.zeros(n_states)
    policy = np.random.choice(n_actions, size=n_states)

    while True:
        delta = 0.
        for s in range(n_states):
            v = V[s]
            a = policy[s]
            V[s] = np.sum([prob * (reward + gamma * V[next_s]) for prob, next_s, reward, done in env.P[s][a]])
            delta = max(delta, abs(v - V[s]))
        if delta < theta:
            break
        
        policy_stable = True
        for s in range(n_states):
            old_policy = policy[s]
            q_sa = np.zeros(n_actions)
            for a in range(n_actions):
                q_sa[a] = np.sum([prob * (reward + gamma * V[next_s]) for prob, next_s, reward, done in env.P[s][a]])
            policy[s] = np.argmax(q_sa)

            if old_policy != policy[s]:
                policy_stable = False
        if policy_stable:
            break

    return policy

In [7]:
def monte_carlo(env, episodes=10000, gamma=0.99, epsilon=0.1):
    Q = np.zeros((n_states, n_actions))
    returns = {(s, a): [] for s in range(n_states) for a in range(n_actions)}

    for _ in tqdm(range(episodes), desc='Monte Carlo'):
        episode = []
        s, _ = env.reset()
        done = False
        while not done:
            if random.random() < epsilon:
                a = env.action_space.sample()
            else:
                a = np.argmax(Q[s])
            next_s, r, terminated, truncated, info = env.step(a)
            done = terminated or truncated
            episode.append((s, a, r))
            s = next_s
        
        G = 0
        visited = set()
        for s, a, r in reversed(episode):
            G = gamma * G + r
            if (s, a) not in visited:
                visited.add((s, a))
                returns[(s, a)].append(G)
                Q[s, a] = np.mean(returns[(s, a)])

    return np.argmax(Q, axis=1)

In [8]:
def epsilon_greedy(Q, state, epsilon):
    if random.random() < epsilon:
        return random.randint(0, n_actions - 1)
    else:
        return np.argmax(Q[state])

In [9]:
def q_learning(env, episodes=10000, alpha=0.1, gamma=0.99, epsilon=0.1):
    Q = np.zeros((n_states, n_actions))

    for _ in tqdm(range(episodes), desc='Q-learning'):
        s, _ = env.reset()
        done = False
        while not done:
            a = epsilon_greedy(Q, s, epsilon)
            next_s, r, terminated, truncated, info = env.step(a)
            done = r == 20
            Q[s, a] += alpha * (r + gamma * np.max(Q[next_s]) - Q[s, a])
            s = next_s

    return np.argmax(Q, axis=1)

In [10]:
def sarsa(env, episodes=10000, alpha=0.1, gamma=0.99, epsilon=0.1):
    Q = np.zeros((n_states, n_actions))

    for _ in tqdm(range(episodes), desc='SARSA'):
        s, _ = env.reset()
        a = epsilon_greedy(Q, s, epsilon)
        done = False

        while not done:
            next_s, r, terminated, truncated, info = env.step(a)
            next_a = epsilon_greedy(Q, next_s, epsilon)
            done = r == 20
            Q[s, a] += alpha * (r + gamma * Q[next_s, next_a] - Q[s, a])
            s, a = next_s, next_a

    return np.argmax(Q, axis=1)

In [11]:
policies = {
    "Value Iteration": value_iteration(env),
    "Policy Iteration": policy_iteration(env),
    "Monte Carlo Policy Iteration": monte_carlo(env),
    "Q-learning": q_learning(env),
    "SARSA": sarsa(env)
}

results = {}
for name, policy in policies.items():
    avg_reward, wins = eval_policy(env, policy)
    results[name] = (avg_reward, wins)
    print(f"{name}: Средняя награда = {avg_reward:.2f}, Побед = {wins}/10000")

Monte Carlo: 100%|██████████| 10000/10000 [00:23<00:00, 425.29it/s]
Q-learning: 100%|██████████| 10000/10000 [00:04<00:00, 2476.53it/s]
SARSA: 100%|██████████| 10000/10000 [00:03<00:00, 3179.87it/s]


Value Iteration: Средняя награда = 7.92, Побед = 10000/10000
Policy Iteration: Средняя награда = 7.95, Побед = 10000/10000
Monte Carlo Policy Iteration: Средняя награда = -598.17, Побед = 498/10000
Q-learning: Средняя награда = 7.90, Побед = 10000/10000
SARSA: Средняя награда = 7.90, Побед = 10000/10000


In [12]:
optimal_policy = sarsa(env)
avg_reward, success_count = eval_policy(env, optimal_policy)

print(f"Средняя награда за эпизод: {avg_reward:.2f}")
print(f"Успешных высадок пассажиров: {success_count} / 10000")

SARSA: 100%|██████████| 10000/10000 [00:02<00:00, 3620.44it/s]


Средняя награда за эпизод: 7.93
Успешных высадок пассажиров: 10000 / 10000


In [13]:
render_env = gym.make("Taxi-v3", render_mode="human")
render_one_episode(render_env, optimal_policy)

Завершено. Общая награда за эпизод: 7


## Вывод
В целом все алгоритмы кроме Monte Carlo справляются с задачей отлично