# Лабораторная работа 1

In [None]:
import torch#для работы с тензорами, создания нейронных сетей и обучения моделей.
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
import os
import pickle
from torch.distributions import Categorical #для выбора действий на основе вероятностей, возвращаемых моделью



## Модель

In [None]:

# Определяем архитектуру нейронной сети
def create_model():
    model1 = nn.Sequential(
        nn.Linear(18, 64),  # 3 строки * 6 колонок = 18
        nn.ReLU(),
        nn.Linear(64, 32),
        nn.ReLU(),
        nn.Linear(32, 6),  # 6 возможных ходов (колонки)
        nn.Softmax()
    )
    return model1




    # Функция: Инициализирует модель (загружает существующую или создает новую)
def initialize_model(should_load_existing):
    if not should_load_existing:
        print("Инициализирована новая модель.")
        return create_model()

    try:
        trained_model = torch.load('uriupkina.pt',weights_only=False)  # Пытаемся загрузить модель
        print("Загружена существующая модель.")
        return trained_model
    except Exception as load_error:
        print(f"Ошибка загрузки модели: {load_error}. Инициализирована новая модель.")
        return create_model()




def model_change(model, optimizer, prog_probs, awards):
    # Преобразование данных
    discounted = check_rewards(awards)
    rewards = torch.tensor(discounted, dtype=torch.float32)
    probs = torch.stack(prog_probs)

    # Расчет градиентов
    loss = -torch.sum(probs * rewards.detach())

    # Логирование перед обновлением для проверки работоспособности
   # print(f"Loss before update: {loss.item():.4f}")

    # Процесс обновления
    optimizer.zero_grad()
    loss.backward()

    # Дополнительная проверка градиентов
    total_norm = 0
    for p in model.parameters():
        if p.grad is not None:
            param_norm = p.grad.data.norm(2)
            total_norm += param_norm.item() ** 2
    total_norm = total_norm ** 0.5
    #print(f"Gradient norm: {total_norm:.4f}")
    #проверка градиента

    optimizer.step()

# Функция: Сохраняет модель и некорректные ходы
def save_model_and_incorrect_moves(model, incorrect_moves_memory):
    torch.save(model, 'uriupkina.pt')  # Сохраняем модель
    with open("errors.pkl", "wb") as f:
        pickle.dump(incorrect_moves_memory, f)  # Сохраняем некорректные ходы


## Функции связаные с доской

In [None]:

# Функция: Обновляет доску после хода игрока
def new_board(board, column, player):
    ROWS = 3
    COLUMNS = 6
    EMPTY_CELL = 0
    row = 2
    while row >= 0:
        i = row * COLUMNS + column
        if board[i] == EMPTY_CELL:
            copy_board = board.copy()
            copy_board[i] = player
            return copy_board
        row -= 1
    return board

# Функция: Проверяет, заполнена ли доска
def check_full(board):
    return len(list(filter(lambda x: x == 0, board))) == 0


## Функции ходов

In [None]:

def is_valid_move(board_tensor):
    empty_columns = []  # Создаем пустой список для хранения колонок с пустыми клетками

    for col in range(6):  # Проходим по каждой колонке (всего 6 колонок)
        if board_tensor[col] == 0:  # Проверяем только верхнюю строку (первую строку) в текущей колонке
            empty_columns.append(col)  # Если клетка пуста, добавляем колонку в список

    return empty_columns  # Возвращаем список колонок с пустыми клетками


# Функция: Считает количество троек для игрока
def count_combinations(board, player):
    count = 0
    grid = [board[i*6:(i+1)*6] for i in range(3)]

    # Все возможные тройки: (начальная строка, начальная колонка, шаг по строке, шаг по колонке)
    patterns = [
        # Горизонтальные
        *[(row, col, 0, 1) for row in range(3) for col in range(4)],
        # Вертикальные
        *[(0, col, 1, 0) for col in range(6)],
        # Диагонали (↘)
        *[(2, col, -1, 1) for col in range(4)],
        # Диагонали (↙)
        *[(0, col, 1, 1) for col in range(4)]
    ]

    for row, col, dr, dc in patterns:
        if all(grid[row + i*dr][col + i*dc] == player for i in range(3)):
            count += 1

    return count



# Функция: Выбирает действие на основе вероятностей
def choose_action(probs):
    action_distribution = Categorical(probs)  # Создаем категориальное распределение
    action = action_distribution.sample()     # Берем случайную выборку из распределения
    action_log_prob = action_distribution.log_prob(action)  # Считаем логарифм вероятности
    return action, action_log_prob




#__________________________________________

#Дисконтируем награды через матрицу

#____________________________________________

def check_rewards(awards, gamma=0.91):
    n = len(awards)
    discount_matrix = np.tril(gamma ** np.subtract.outer(np.arange(n), np.arange(n)))
    return discount_matrix.dot(awards).astype(np.float32)


## Дополненительное обучение на эррорсах

In [None]:
# Функция: Загружает историю некорректных ходов из файла
def add_errors():
    try:
        with open("errors.pkl", "rb") as f:
            data = pickle.load(f)
            print(f"Загружено {len(data)} ходов")
            return data
    except Exception as e:
        match e:
            case FileNotFoundError():
                return []
            case _:
                print(f"Неожиданная ошибка: {e}")
                return []


# Функция: Обучает модель на некорректных ходах
def train_on_incorrect_moves(model, optimizer, incorrect_samples):
    if not incorrect_samples:
        return

    # Собираем все данные в батч
    states = torch.stack([s for s, _ in incorrect_samples])
    actions = torch.stack([a for _, a in incorrect_samples])

    # Пакетный forward pass
    probs = model(states)
    m = Categorical(probs)
    log_probs = m.log_prob(actions)
    loss = -(log_probs * torch.tensor(-50.0)).sum()

    # Оптимизация
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


#_________________________________________

#ОШИБКИ С БАТЧАМИ

#_________________________________________
def train_on_incorrect_moves(model, optimizer, incorrect_samples, batch_size=32):
    if not incorrect_samples:
        return

    # Разбиваем на батчи
    for i in range(0, len(incorrect_samples), batch_size):
        batch = incorrect_samples[i:i+batch_size]
        states = torch.stack([s for s, _ in batch])
        actions = torch.stack([a for _, a in batch])

        probs = model(states)
        m = Categorical(probs)
        log_probs = m.log_prob(actions)
        loss = -(log_probs * 50).mean()  # Используем mean вместо sum

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

def retrain_on_incorrect_moves(model, optimizer, incorrect_moves_memory, epochs=3):
    if not incorrect_moves_memory:
        print("Нет некорректных ходов для дообучения")
        return

    print(f"Начало дообучения на {len(incorrect_moves_memory)} ошибках...")
    for epoch in range(epochs):
        train_on_incorrect_moves(model, optimizer, incorrect_moves_memory)
        print(f"Эпоха {epoch+1}/{epochs} завершена")

## Создание умного бота

In [None]:
def generate_smart_move(board, player):
    """
    Упрощенная версия стратегического хода без check_threat
    """
    best_move = None
    best_score = -1

    for col in range(6):
        if col in is_valid_move(board):  # Правильно - проверяем вхождение колонки в список доступных
            # Создаем временную доску с предполагаемым ходом
            temp_board = new_board(board, col, player)

            # Оцениваем качество хода по нескольким критериям
            score = 0

            # 1. Количество созданных троек
            score += count_combinations(temp_board, player) * 3

            # 2. Количество созданных потенциальных троек (две в ряд)
            score += count_potential_threes(temp_board, player) * 2

            # 3. Бонус за центральные колонки
            if col in [2, 3]:  # Центральные колонки
                score += 1

            # 4. Штраф за ходы, которые дают противнику выигрышный ход
            opponent = -player
            valid_opponent_moves = is_valid_move(temp_board)  # Получаем список
            for opp_col in valid_opponent_moves:
                  opp_board = new_board(temp_board, opp_col, opponent)
                  if count_combinations(opp_board, opponent) >= 1:
                        score -= 2

            if score > best_score or (score == best_score and random.random() < 0.3):
                best_score = score
                best_move = col

    return best_move if best_move is not None else generate_valid_random_move(board)

def generate_valid_random_move(board):
    """
    Генерирует случайный допустимый ход.
    :param board: Игровое поле.
    :return: Случайный допустимый ход (колонка) или None, если нет допустимых ходов.
    """
    valid_moves = is_valid_move(board)  # Получаем список всех доступных колонок
    return random.choice(valid_moves) if valid_moves else None

def generate_combined_move(board, player, strategy_prob=0.5):
    """
    Генерирует ход противника, который может быть либо стратегически лучшим, либо случайным.
    :param board: Игровое поле.
    :param player: Игрок (1 или -1).
    :param strategy_prob: Вероятность выбора стратегического хода.
    :return: Выбранный ход (колонка).
    """
    if random.random() < strategy_prob:
        return generate_smart_move(board, player)
    else:
        return generate_valid_random_move(board)


def count_potential_threes(board, player):
    count = 0
    grid = [list(board[i*6:(i+1)*6]) for i in range(3)]  # Преобразуем в списки

    # Проверка горизонтальных линий
    for row in range(3):
        for col in range(4):
            window = grid[row][col:col+3]
            if window.count(player) == 2 and window.count(0) == 1:
                count += 1

    # Проверка вертикальных линий
    for col in range(6):
        for row in range(1):
            if (grid[row][col] == player and
                grid[row+1][col] == player and
                grid[row+2][col] == 0):
                count += 1

    # Проверка диагоналей
    for row in range(1):
        for col in range(4):
            if (grid[row][col] == player and
                grid[row+1][col+1] == player and
                grid[row+2][col+2] == 0):
                count += 1
            if (grid[row][col+2] == player and
                grid[row+1][col+1] == player and
                grid[row+2][col] == 0):
                count += 1

    return count

## Функции тренировки и тестирования

In [None]:
# Функция: Основная функция обучения
def train_model(should_load_existing=True):
    # Инициализация
    incorrect_moves_memory = add_errors()
    model = initialize_model(should_load_existing)
    optimizer = optim.Adam(model.parameters(), lr=5e-6, weight_decay=1e-5)

    # Статистика
    statistic = {
        "wins": 0,
        "losses": 0,
        "draws": 0,
        "incorrect_moves": 0,
        "model_triplets": 0,
        "bot_triplets": 0
    }

    consecutive_no_incorrect = 0
    episodes = 500000

    for episode in range(episodes):
        board = np.zeros(18, dtype=np.float32)
        prog_probs, awards, incorrect_samples = [], [], []
        this_player = 1 if episode % 2 == 0 else -1
        stop_move = False

        while not check_full(board) and not stop_move:
            if this_player == 1:  # Ход модели
                st = torch.tensor(board, dtype=torch.float32)
                probs = model(st)
                act, log_prob = choose_action(probs)
                NBoard = new_board(board, act.item(), 1)

                if np.array_equal(NBoard, board):  # Некорректный ход
                    awards.append(-150)
                    prog_probs.append(log_prob)
                    stop_move = True
                    statistic["incorrect_moves"] += 1
                    incorrect_samples.append((st, act))
                    break

                board = NBoard
                prog_probs.append(log_prob)
                awards.append(0)
                this_player = -1
            else:  # Ход бота (умная стратегия)
              act = generate_combined_move(board, -1, strategy_prob=0.7)  # Используем умную стратегию вместо случайного выбора
              board = new_board(board, act, -1)
              this_player = 1

        # Определение победителя через подсчет троек
        if not stop_move:
            model_triplets = count_combinations(board, 1)
            bot_triplets = count_combinations(board, -1)

            statistic["model_triplets"] += model_triplets
            statistic["bot_triplets"] += bot_triplets

            if model_triplets > bot_triplets:
                result = "Model"
                statistic["wins"] += 1
            elif model_triplets < bot_triplets:
                result = "Bot"
                statistic["losses"] += 1
            else:
                result = "Draw"
                statistic["draws"] += 1

            awards[-1] = 70 if result == "Model" else (-70 if result == "Bot" else 0)

        # Обновление модели
        if prog_probs:
            model_change(model, optimizer, prog_probs, awards)

        # Обработка некорректных ходов
        if incorrect_samples:
            incorrect_moves_memory.extend(incorrect_samples)
            train_on_incorrect_moves(model, optimizer, incorrect_samples)

        # Логирование
        if (episode + 1) % 1000 == 0:
            print(f"Эпизод {episode + 1}: "
                  f"Победы {statistic['wins']}, "
                  f"Поражения {statistic['losses']}, "
                  f"Ничьи {statistic['draws']}, "
                  f"Ошибки {statistic['incorrect_moves']}, "
                  f"Троек модели/бота: {statistic['model_triplets']}/{statistic['bot_triplets']}")

            statistic = {
                "wins": 0,
                "losses": 0,
                "draws": 0,
                "incorrect_moves": 0,
                "model_triplets": 0,
                "bot_triplets": 0
            }

        # Условие остановки
        if statistic["incorrect_moves"] == 0:
            consecutive_no_incorrect += 1
            if consecutive_no_incorrect >= 1000:
                print("Обучение завершено: 1000 эпизодов подряд без ошибок")
                break
        else:
            consecutive_no_incorrect = 0

    # Финализация
    if incorrect_moves_memory:
        retrain_on_incorrect_moves(model, optimizer, incorrect_moves_memory)

    save_model_and_incorrect_moves(model, incorrect_moves_memory)




# Функция: Выводит финальные результаты тестирования
def print_final_results(statistic, num_games):
    print(f"\nФинальные результаты после {num_games} игр:")
    print(f"Побед: {statistic['wins']}, Поражений: {statistic['losses']}, Ничьих: {statistic['draws']}")
    print(f"Процент побед: {statistic['wins'] / num_games:.2%}")
    print(f"Некорректных ходов: {statistic['incorrect_moves']}")


In [None]:
# Функция: Тестирование модели
def test_model(num_games=1000):
    try:
        model = torch.load('uriupkina.pt', weights_only=False)  # Загрузка модели
        print("Загружена модель для теста.")
    except FileNotFoundError:
        print("Модель не найдена. Тестирование прервано.")
        return

    statistic = {"wins": 0, "losses": 0, "draws": 0, "incorrect_moves": 0}  # Статистика

    for game_num in range(num_games):
        board = np.zeros(18, dtype=np.float32)  # Инициализация доски
        this_player = 1 if (game_num % 2) == 0 else -1  # Текущий игрок
        incorrect_move_lose = False  # Флаг ошибки

        while not check_full(board):
            if this_player == 1:
                st = torch.tensor(board, dtype=torch.float32)  # Текущее состояние
                act_probs = model(st)  # Вероятности действий
                act = torch.argmax(act_probs).item()  # Выбор действия
                NBoard = new_board(board, act, 1)  # Обновление доски

                if np.array_equal(NBoard, board):  # Если ход некорректен
                    print(f"Игра {game_num + 1}: Модель сделала некорректный ход. Автоматическое поражение.")
                    statistic["incorrect_moves"] += 1
                    incorrect_move_lose = True
                    break

                board = NBoard
            else:
                available_moves = is_valid_move(board)
                if not available_moves:
                    break
                act = generate_combined_move(board, -1, strategy_prob=0.7)
                board = new_board(board, act, -1)

            this_player *= -1

        if incorrect_move_lose:
            continue

        model_win = count_combinations(board, 1)  # Тройки модели
        bot_win = count_combinations(board, -1)   # Тройки бота
        update_test_stats(statistic, model_win, bot_win)  # Обновление статистики

        print_game_result(game_num + 1, model_win, bot_win)  # Логирование результата

    print_final_results(statistic, num_games)  # Вывод итогов

# Функция: Обновляет статистику тестирования
def update_test_stats(statistic, model_win, bot_win):
    if model_win > bot_win:
        statistic["wins"] += 1  # Увеличиваем счетчик побед
    elif model_win < bot_win:
        statistic["losses"] += 1  # Увеличиваем счетчик поражений
    else:
        statistic["draws"] += 1  # Увеличиваем счетчик ничьих

# Функция: Логирует результат игры
def print_game_result(game_num, model_win, bot_win):
    result = "Победа" if model_win > bot_win else "Поражение" if model_win < bot_win else "Ничья"
    print(f"Игра {game_num}: Тройки модели = {model_win}, Тройки бота = {bot_win}, Результат: {result}")


In [None]:
# Вывод
def main():
    train_model()  # Обучение модели
    test_model()   # Тестирование модели

if __name__ == "__main__":
    main()  # Запуск программы

Ошибка загрузки модели: [Errno 2] No such file or directory: 'uriupkina.pt'. Инициализирована новая модель.


IndexError: Dimension out of range (expected to be in range of [-1, 0], but got 1)

In [None]:
#____________________________________________

#ОШИБКИ БЕЗ БАТЧЕЙ

#____________________________________________

# def train_on_incorrect_moves(model, optimizer, incorrect_samples):
#     for st, act in incorrect_samples:
#         probs = model(st)  # Получаем вероятности действий
#         m = Categorical(probs)  # Создаем распределение
#         log_prob = m.log_prob(act)  # Логарифм вероятности
#         loss = -log_prob * torch.tensor(-50.0)  # Вычисляем потери
#         optimizer.zero_grad()  # Обнуляем градиенты
#         loss.backward()  # Вычисляем градиенты
#         optimizer.step()  # Обновляем параметры модели

# # Функция: Повторно обучает модель на некорректных ходах
# def retrain_on_incorrect_moves(model, optimizer, incorrect_moves_memory, epochs=3):
#     for _ in range(epochs):
#         train_on_incorrect_moves(model, optimizer, incorrect_moves_memory)  # Дообучение
#     print(f"Обучение на некорректных ходах ({len(incorrect_moves_memory)} ошибок)")
