# Обучение агента с использованием генетических алгоритмов в нескольких средах OpenAI Gym

| Среда | Наблюдение (obs) | Действия (act) | Размер весов | Задача |
| --- | :---: | --- | --- | --- |
| CartPole-v1 |4|2 (Discrete)|(4, 2)| Балансир|
| LunarLander-v3|8|4 (Discrete)|(8, 4)| Посадка ракеты|
| BipedalWalker-v3|24|4 (Box)|(24, 4)| Шагание|

Рассмотрим следующую стратегию обучения для этих сред:


### 1.      CartPole и LunarLander

Обе среды используют дискретные действия, что делает их схожими с точки зрения структуры агента. Применим следующий подход:

-   **Предобучение на CartPole (10 эпох)** – быстрая начальная настройка политики.
-   **Перенос весов (transfer learning) в LunarLander** – веса из 1-го и 2-го столбцов CartPole (отвечающих за действия "влево/вправо") копируются во 2-й и 4-й столбцы LunarLander, так как в обеих средах эти действия имеют схожую семантику.
-   **Дообучение на LunarLander (90 эпох)** – с 20% вероятностью после каждой эпохи веса частично обновляются из CartPole, что помогает сохранить устойчивость обучения.

### 2.   BipedalWalker

BipedalWalker требует непрерывного управления (регулирование усилий на суставы), что делает его принципиально другим типом задачи. Применим следующий подход:

-   **Отдельный цикл обучения** – перенос весов из дискретных сред не применяется. Это позволит получить лучшую сходимость, поскольку стратегии для ходьбы не имеют аналогов в простых дискретных средах
-   **Обучение на 100 эпохах с ранней остановкой** - при получении общей награды в 300 и более очков (критерий взят из задачи среды Gym)

### Генетический алгоритм



1.   Создаётся начальная популяция из `POP_SIZE` случайных особей

2.   Для каждой особи вычисляется функция приспособленности, определяющая эффективность особи

3.   Выбираются `NSURV` наиболее приспособленных особей для размножения

4.   Формируется новое поколение особей:
    - `NSURV` особей переходят в новую популяцию без изменений
    - Популяция дополняется до `POP_SIZE` особями потомками (скрещивание и мутирование с шансом `MUTATION_RATE`)

5.  Пункты 2-4 повторяются пока не настанет условие завершения (по кол-ву эпох либо по достижению необходимого результата)


## install

In [None]:
!pip install swig
!pip install gymnasium[box2d]

Collecting swig
  Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl.metadata (3.5 kB)
Downloading swig-4.3.0-py2.py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m17.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: swig
Successfully installed swig-4.3.0
Collecting box2d-py==2.3.5 (from gymnasium[box2d])
  Downloading box2d-py-2.3.5.tar.gz (374 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m374.4/374.4 kB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: box2d-py
  Building wheel for box2d-py (setup.py) ... [?25l[?25hdone
  Created wheel for box2d-py: filename=box2d_py-2.3.5-cp311-cp311-linux_x86_64.whl size=2379494 sha256=ac70dd75aa0b7a88da094d4d0ee2c4bdfb88980c31b417972989d0e6681425d2
  Stored in directory: /root/.cache/pip/wheels/a

In [None]:
!pip install Box2D

Collecting Box2D
  Downloading Box2D-2.3.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (573 bytes)
Downloading Box2D-2.3.10-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.7/3.7 MB[0m [31m27.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: Box2D
Successfully installed Box2D-2.3.10


## usefull functions

In [None]:
from moviepy.editor import ImageSequenceClip
from IPython.display import HTML
from base64 import b64encode

def save_frames_as_video(frames, filename="video.mp4", fps=30):
    clip = ImageSequenceClip(frames, fps=fps)
    clip.write_videofile(filename, fps=fps)


def play_video(video_path, width=500, height=500):
    """Plays a video in a Colab notebook.

    Args:
        video_path: The path to the video file.
        width: The width of the video player.
        height: The height of the video player.
    """

    # Read and encode the video data
    with open(video_path, "rb") as video_file:
        video_data = b64encode(video_file.read()).decode()

    # Create the HTML video tag
    video_html = f"""
        <video width="{width}" height="{height}" controls>
            <source src="data:video/mp4;base64,{video_data}" type="video/mp4">
        </video>
    """

    # Display the video
    display(HTML(video_html))

  if event.key is 'enter':



## BipedalWalker

In [None]:
import gymnasium as gym
import numpy as np

In [None]:
# Инициализация среды
env_bipedalwalker = gym.make('BipedalWalker-v3', render_mode='rgb_array')

In [None]:
# Параметры ГА
POP_SIZE = 100
NSURV = 20
EPOCHS = 100
MUTATION_RATE = 0.1

# Размерности наблюдений и действий
OBS_BIPEDAL = env_bipedalwalker.observation_space.shape[0]
ACT_BIPEDAL = env_bipedalwalker.action_space.shape[0]

# Явно зададим тип особи
Individual = np.ndarray

# Инициализация случайной особи
def create_individual() -> Individual:
    return np.random.rand(OBS_BIPEDAL, ACT_BIPEDAL)

# Функция приспособленности
def fitness(individual: Individual) -> float:
    obs, _ = env_bipedalwalker.reset()
    total_reward = 0
    while True:
        action = np.tanh(np.dot(obs, individual))
        obs, reward, done, truncated, _ = env_bipedalwalker.step(action)
        total_reward += reward
        if done or truncated:
            break
    return total_reward

# Селекция
def selection(population: list[Individual], fitness_scores: list[float]) -> list[Individual]:
    paired = list(zip(fitness_scores, population))
    paired_sorted = sorted(paired, key=lambda x: x[0], reverse=True)
    return [ind for _, ind in paired_sorted[:NSURV]]

# Кроссинговер
def crossover(parent1: Individual, parent2: Individual) -> Individual:
    crossover_point = np.random.randint(1, parent1.shape[0] - 1)
    return np.concatenate([parent1[:crossover_point], parent2[crossover_point:]])

# Мутация
def mutate(individual: Individual) -> Individual:
    mutated = individual.copy()
    for i in range(mutated.shape[0]):
        for j in range(mutated.shape[1]):
            if np.random.random() < MUTATION_RATE:
                mutated[i, j] = np.random.uniform(-1, 1)
    return mutated

# Основной цикл ГА
population = [create_individual() for _ in range(POP_SIZE)]
for epoch in range(EPOCHS):
    fitness_scores = [fitness(ind) for ind in population]

    # Проверка на приспособленность
    if max(fitness_scores) >= 300:
        print(f"Обучение завершено! Эпоха {epoch}, Лучшая приспособленность: {max(fitness_scores):.1f}")
        break

    survivors = selection(population, fitness_scores)
    new_population = survivors.copy()
    while len(new_population) < POP_SIZE:
        parent_indices = np.random.choice(len(survivors), 2, replace=False)
        parent1, parent2 = survivors[parent_indices[0]], survivors[parent_indices[1]]
        child = crossover(parent1, parent2)
        child = mutate(child)
        new_population.append(child)
    population = new_population
    print(f"Эпоха {epoch}, Лучшая приспособленность: {max(fitness_scores):.1f}")

# Лучший агент
best_individual = max(population, key=fitness)
print("Обучение завершено")
print(best_individual)

Эпоха 0, Лучшая приспособленность: -113.4
Эпоха 1, Лучшая приспособленность: -113.0
Эпоха 2, Лучшая приспособленность: -111.6
Эпоха 3, Лучшая приспособленность: -110.0
Эпоха 4, Лучшая приспособленность: -101.6
Эпоха 5, Лучшая приспособленность: -69.6
Эпоха 6, Лучшая приспособленность: -104.5
Эпоха 7, Лучшая приспособленность: -68.5
Эпоха 8, Лучшая приспособленность: -90.6
Эпоха 9, Лучшая приспособленность: 49.0
Эпоха 10, Лучшая приспособленность: 41.6
Эпоха 11, Лучшая приспособленность: 217.7
Эпоха 12, Лучшая приспособленность: 36.7
Эпоха 13, Лучшая приспособленность: 150.8
Эпоха 14, Лучшая приспособленность: 56.4
Эпоха 15, Лучшая приспособленность: 34.0
Эпоха 16, Лучшая приспособленность: 55.1
Эпоха 17, Лучшая приспособленность: 39.4
Эпоха 18, Лучшая приспособленность: 84.1
Эпоха 19, Лучшая приспособленность: 52.7
Эпоха 20, Лучшая приспособленность: 63.3
Эпоха 21, Лучшая приспособленность: 53.5
Эпоха 22, Лучшая приспособленность: 149.4
Эпоха 23, Лучшая приспособленность: 84.6
Эпоха 24

### demo

In [None]:
# Прогон эпизода для BipedalWalker с сохранением кадров
frames_bipedalwalker = []
rewards_bipedalwalker = []
obs = env_bipedalwalker.reset()[0]

while True:
    action = np.tanh(np.dot(obs, best_individual))
    obs, reward, done, truncated, _ = env_bipedalwalker.step(action)
    rewards_bipedalwalker.append(reward)
    frames_bipedalwalker.append(env_bipedalwalker.render())
    if done or truncated:
        break
env_bipedalwalker.close()

save_frames_as_video(frames_bipedalwalker, filename="bipedalwalker_video.mp4")
play_video("bipedalwalker_video.mp4")

Moviepy - Building video bipedalwalker_video.mp4.
Moviepy - Writing video bipedalwalker_video.mp4





Moviepy - Done !
Moviepy - video ready bipedalwalker_video.mp4


## CartPole + LunarLander

In [None]:
import gymnasium as gym
import numpy as np
import random
from typing import Dict

In [None]:
# Инициализация сред
env_cartpole = gym.make('CartPole-v1', render_mode='rgb_array')
env_lunarlander = gym.make('LunarLander-v3', render_mode='rgb_array')

In [None]:
# Параметры ГА
POP_SIZE = 100          # Размер популяции
NSURV = 20              # Число выживших
EPOCHS = 100             # Число эпох
MUTATION_RATE = 0.1     # Вероятность мутации

# Размерности наблюдений и действий
OBS_CARTPOLE = env_cartpole.observation_space.shape[0]
ACT_CARTPOLE = env_cartpole.action_space.n
OBS_LUNAR = env_lunarlander.observation_space.shape[0]
ACT_LUNAR = env_lunarlander.action_space.n

# Тип особи: словарь с весами для каждой среды
Individual = Dict[str, np.ndarray]

# Инициализация случайной особи
def create_individual() -> Individual:
    return {
        'cartpole': np.random.rand(OBS_CARTPOLE, ACT_CARTPOLE),
        'lunarlander': np.random.rand(OBS_LUNAR, ACT_LUNAR)
    }

# Функция приспособленности (оценка в одной или обеих средах в зависимости от эпохи)
def fitness(individual: Individual, epoch: int) -> float:
    total_reward_cp = 0
    total_reward_ll = 0

    # Всегда оцениваем в CartPole
    obs = env_cartpole.reset()[0]
    while True:
        action = np.argmax(np.dot(obs, individual['cartpole']))
        obs, reward, done, truncated, _ = env_cartpole.step(action)
        total_reward_cp += reward
        if done or truncated:
            break

    # Оцениваем в LunarLander только после 10 эпох
    if epoch >= 10:
        obs = env_lunarlander.reset()[0]
        while True:
            action = np.argmax(np.dot(obs, individual['lunarlander']))
            obs, reward, done, truncated, _ = env_lunarlander.step(action)
            total_reward_ll += reward
            if done or truncated:
                break

    # До 10 эпохи используем только CartPole, после - сумму
    return total_reward_cp if epoch < 10 else (total_reward_cp + total_reward_ll)

# Селекция (отбор лучших)
def selection(population: list[Individual], fitness_scores: list[float]) -> list[Individual]:
    paired = list(zip(fitness_scores, population))
    paired_sorted = sorted(paired, key=lambda x: x[0], reverse=True)
    return [ind for _, ind in paired_sorted[:NSURV]]

# Кроссинговер (одноточечный)
def crossover(parent1: Individual, parent2: Individual) -> Individual:
    child = {}
    for env in ['cartpole', 'lunarlander']:
        # Для каждой среды скрещиваем отдельно
        crossover_point = random.randint(1, parent1[env].shape[0] - 1)
        child[env] = np.concatenate([
            parent1[env][:crossover_point],
            parent2[env][crossover_point:]
        ])
    return child

# Мутация
def mutate(individual: Individual) -> Individual:
    mutated = {}
    for env in ['cartpole', 'lunarlander']:
        mutated[env] = individual[env].copy()
        for i in range(mutated[env].shape[0]):
            for j in range(mutated[env].shape[1]):
                if random.random() < MUTATION_RATE:
                    mutated[env][i, j] = random.random()
    return mutated

# Перенос весов с CartPole на LunarLander (первые 4 строки)
def transfer_weights(individual: Individual) -> Individual:
    individual['lunarlander'][:4, 1] = individual['cartpole'][:4, 0]  # Первый столбец cartpole -> второй столбец lunarlander
    individual['lunarlander'][:4, 3] = individual['cartpole'][:4, 1]  # Второй столбец cartpole -> четвертый столбец lunarlander
    return individual

# Основной цикл ГА
population = [create_individual() for _ in range(POP_SIZE)]
for epoch in range(EPOCHS):
    # Оценка приспособленности с учётом текущей эпохи
    fitness_scores = [fitness(ind, epoch) for ind in population]

    # Отбор лучших
    survivors = selection(population, fitness_scores)

    # Создание потомков
    new_population = survivors.copy()
    while len(new_population) < POP_SIZE:
        parent1, parent2 = random.sample(survivors, 2)
        child = crossover(parent1, parent2)
        child = mutate(child)

        # Перенос весов после 10 эпох для второго агента
        if epoch == 10:
            child = transfer_weights(child)

        # Перенос весов после 11 эпох с вероятностью 20%
        if epoch > 10 and random.random() < 0.2:
            child = transfer_weights(child)

        new_population.append(child)

    population = new_population
    print(f"Эпоха {epoch}, Приспособленность: лучшая: {max(fitness_scores):.1f}, средняя: {np.average(fitness_scores):.1f}")

# Вывод лучшего агента
best_individual = max(population, key=lambda ind: fitness(ind, EPOCHS))
print("Обучение завершено! Агент:")
print(best_individual)

Эпоха 0, Приспособленность: лучшая: 500.0, средняя: 50.3
Эпоха 1, Приспособленность: лучшая: 500.0, средняя: 141.1
Эпоха 2, Приспособленность: лучшая: 500.0, средняя: 252.7
Эпоха 3, Приспособленность: лучшая: 500.0, средняя: 289.2
Эпоха 4, Приспособленность: лучшая: 500.0, средняя: 335.7
Эпоха 5, Приспособленность: лучшая: 500.0, средняя: 326.7
Эпоха 6, Приспособленность: лучшая: 500.0, средняя: 303.0
Эпоха 7, Приспособленность: лучшая: 500.0, средняя: 371.3
Эпоха 8, Приспособленность: лучшая: 500.0, средняя: 335.3
Эпоха 9, Приспособленность: лучшая: 500.0, средняя: 384.5
Эпоха 10, Приспособленность: лучшая: 442.4, средняя: -173.9
Эпоха 11, Приспособленность: лучшая: 563.0, средняя: 35.9
Эпоха 12, Приспособленность: лучшая: 505.4, средняя: 80.7
Эпоха 13, Приспособленность: лучшая: 492.5, средняя: 124.3
Эпоха 14, Приспособленность: лучшая: 514.1, средняя: 215.9
Эпоха 15, Приспособленность: лучшая: 472.4, средняя: 175.1
Эпоха 16, Приспособленность: лучшая: 496.0, средняя: 166.5
Эпоха 17,

### demo

In [None]:
# Прогон эпизода для CartPole с сохранением кадров
frames_cartpole = []
rewards_cartpole = []
obs = env_cartpole.reset()[0]

while True:
    action = np.argmax(np.dot(obs, best_individual['cartpole']))
    obs, reward, done, truncated, _ = env_cartpole.step(action)
    rewards_cartpole.append(reward)
    frames_cartpole.append(env_cartpole.render())
    if done or truncated:
        break
env_cartpole.close()

save_frames_as_video(frames_cartpole, filename="cartpole_video.mp4")
play_video("cartpole_video.mp4")

print("Общая награда:", sum(rewards_cartpole))

Moviepy - Building video cartpole_video.mp4.
Moviepy - Writing video cartpole_video.mp4





Moviepy - Done !
Moviepy - video ready cartpole_video.mp4


Общая награда: 500.0


In [None]:
# Прогон эпизода для LunarLander с сохранением кадров
frames_lunarlander = []
rewards_lunarlander = []
obs = env_lunarlander.reset()[0]

while True:
    action = np.argmax(np.dot(obs, best_individual['lunarlander']))
    obs, reward, done, truncated, _ = env_lunarlander.step(action)
    rewards_lunarlander.append(reward)
    frames_lunarlander.append(env_lunarlander.render())
    if done or truncated:
        break
env_lunarlander.close()

save_frames_as_video(frames_lunarlander, filename="lunarlander_video.mp4")
play_video("lunarlander_video.mp4")

print("Общая награда: %.2f" % sum(rewards_lunarlander))

Moviepy - Building video lunarlander_video.mp4.
Moviepy - Writing video lunarlander_video.mp4





Moviepy - Done !
Moviepy - video ready lunarlander_video.mp4


Общая награда: 236.05
