[Текст ссылки](https://)### Выполнил: <font color='red'>Тижин Матвей Романович</font>

### Тема: Web-сервер для обучения и использования ML-моделей

#### Преподаватели: Роман Ищенко (roman.ischenko@gmail.com) и Илья Склонин

**Дедлайн**: 18.01.2026

**Среда выполнения**: Jupyter Notebook (Python 3.9+)

#### Правила:

Результаты выполнения задания:

- архив со скриптами и файлами Dockerfile, который 1-2 команды позволяет развернуть сервер, решающий поставленные в задании задачи
- Jupyter Notebook, где __весь код__ из скриптов дублируется (1 ячейка - 1 скрипт) с комментарием, содержащим информацию о том, из какого файла взят код и что верхнеуровнево этот код делает

__Максимальное число баллов за задание - 35__.

Готовое задание отправляется на почту преподавателя.

Задание выполняется самостоятельно. Если какие-то студенты будут уличены в списывании, все они автоматически получат за эту работу 0 баллов. Если вы нашли в Интернете какой-то специфичный код, который собираетесь заимствовать, обязательно укажите это в задании - наверняка вы не единственный, кто найдёт и использует эту информацию.

Удалять фрагменты формулировок заданий запрещается.

### Постановка задачи:

**Серверная часть (22 балла):**

- __В данной работе нужно написать многозадачный веб-сервер для обучения и инференса ML моделей. На старте сервер получает на вход (через .env) конфиг, в котором должны быть указаны 3 параметра: путь к директории для сохранения моделей внутри контейнера сервера, число ядер, доступных для обучения и максимальное число моделей, которые могут быть одновременно загружены для инференса.__


- Сервер должен реализовывать следующие методы:
    - `fit(X, y, config)` - обучить модель и сохранить на диск по указанным именем
    - `predict(y, config)` - предсказать с помощью обученной и загруженной модели по её имени
    - `load(config)` - загрузить обученную модель по её имени в режим инференса
    - `unload(config)` - выгрузить загруженную модель по её имени
    - `remove(config)` - удалить обученную модель с диска по её имени
    - `remove_all()` - удалить все обученные модели с диска__


- __Содержимое конфигов и форматы данных предлагается продумать и реализовать самостоятельно__
- __Сервер должен иметь счётчик активных процессов. Максимальное число активных процессов соответствует числу ядер, переданному в конфиге при старте сервиса. Каждое обучение модели запускается в отдельном процессе и до своего завершения потребляет этот процесс. Один процесс всегда остаётся для сервера, в нём же загружаются и работают на инференс обученные модели__
- __Сервер должен корректно обрабатывать все граничные случаи (запуск обучения без свободных ядер, запуск инфренса свыше лимита, запросы с несуществующими именами моделей, запросы с дублирующимися именами моделей)__
- __В реализации должны поддерживаться не менее трёх дискриминативных моделей (т.е. принимающих на вход объекты и метки при обучении и предсказывающих метки для новых объектов)__
- __Сервер должен быть реализован на FastAPI__
- Проект разворачивается с помощью выбранной библиотеки управления виртуальными окружениями и технологии контейнеризации Docker

**Клиентская часть (13 баллов):**

- __Клиентская часть должна демонстрировать работу с реализованным сервером с помощью библиотек requests и aiohttp. Она может быть реализована непосредственно в Jupyter Notebook, с описанием ожидаемого действия, или в отдельном(-ых) скрипте(-ах), с дублированием в Jupyter Notebook (тогда работоспособность в ноутбуке не требуется). Далее описываются отдельные функции:
- Код вызова последовательного вызова обучения как минимум двух (N) различных моделей с таким набором данных и параметрами, чтобы обучение одной модели длилось не менее 60 секунд.
- Код вызова асинхронного вызова обучения как минимум двух различных моделей с демонстрацией, что работа выполняется в два (в N) раза быстрее
- Асинхронный вызов нескольких предсказаний
- Код демонстрации остальных функций сервера (загрузка, выгрузка, удаление)
- Должны обрабатываться ошибки и исключения, возвращаемые сервером


In [1]:
# Файл: server/config.py
# Описание: Этот скрипт отвечает за инициализацию настроек сервера. 
# Он считывает переменные из файла .env (путь к моделям, лимит ядер, лимит моделей в RAM) 
# с помощью библиотеки pydantic-settings. Эти настройки используются во всем приложении 
# для контроля потребления ресурсов.

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Путь для сохранения моделей внутри контейнера
    MODELS_PATH: str = "saved_models"
    
    # Число ядер, доступных для обучения (используется для семафора)
    TRAINING_CORES: int = 4
    
    # Максимальное число моделей, которые могут быть одновременно загружены для инференса
    MAX_INFERENCE_MODELS: int = 1

    model_config = SettingsConfigDict(env_file=".env", env_file_encoding='utf-8')

settings = Settings()

In [2]:
# Файл: server/config_models.py
# Описание: Скрипт содержит описание схем данных (Pydantic-моделей) для валидации входящих запросов.
# Здесь реализована сложная логика валидации гиперпараметров для различных типов моделей:
# LogisticRegression, RandomForest, XGBoost, LightGBM и CatBoost.
# Используется Union и discriminator для автоматического определения типа модели в запросе /fit.

from pydantic import BaseModel, Field
from typing import List, Literal, Union, Optional

class ModelNameConfig(BaseModel):
    model_name: str = Field(..., max_length=50, description="Уникальное имя модели")

class LogisticRegressionConfig(ModelNameConfig):
    model_type: Literal['LogisticRegression']
    penalty: Literal['l1', 'l2', 'elasticnet', 'none'] = 'l2'
    C: float = Field(1.0, gt=0)
    solver: Literal['newton-cg', 'lbfgs', 'liblinear', 'sag', 'saga'] = 'lbfgs'

class RandomForestConfig(ModelNameConfig):
    model_type: Literal['RandomForestClassifier']
    n_estimators: int = Field(100, gt=0)
    max_depth: Optional[int] = Field(None, ge=1)
    min_samples_split: int = Field(2, ge=2)
    criterion: Literal['gini', 'entropy', 'log_loss'] = 'gini'

class XGBoostConfig(ModelNameConfig):
    model_type: Literal['XGBClassifier']
    n_estimators: int = Field(100, gt=0)
    learning_rate: float = Field(0.1, gt=0)
    max_depth: int = Field(3, ge=0)

class LightGBMConfig(ModelNameConfig):
    model_type: Literal['LGBMClassifier']
    n_estimators: int = Field(100, gt=0)
    learning_rate: float = Field(0.1, gt=0)
    num_leaves: int = Field(31, gt=1)

class CatBoostConfig(ModelNameConfig):
    model_type: Literal['CatBoostClassifier']
    iterations: int = Field(1000, gt=0)
    learning_rate: float = Field(0.03, gt=0)
    depth: int = Field(6, ge=1, le=16)
    verbose: bool = False

AnyModelConfig = Union[
    LogisticRegressionConfig,
    RandomForestConfig,
    XGBoostConfig,
    LightGBMConfig,
    CatBoostConfig,
]

class FitRequest(BaseModel):
    X: List[List[float]] = Field(..., description="Матрица признаков")
    y: List[int] = Field(..., description="Вектор меток")
    config: AnyModelConfig = Field(..., discriminator='model_type')

class PredictRequest(BaseModel):
    X: List[List[float]] = Field(...)
    config: ModelNameConfig = Field(...)

In [4]:
# Файл: server/main.py
# Описание: Основной скрипт сервера на FastAPI. 
# Реализует логику:
# 1. Lifespan: создание ProcessPoolExecutor для выноса обучения в отдельные процессы.
# 2. Метод /fit: использует asyncio.Semaphore для контроля занятых ядер. Обучение происходит синхронно в пуле.
# 3. Метод /load: контролирует лимит одновременно загруженных моделей в RAM (реестр LOADED_MODELS).
# 4. Метод /predict: выполняет инференс на загруженных моделях.
# 5. Методы управления диском: /remove и /remove_all для очистки сохраненных файлов.

import time
import os
import shutil
import asyncio
import joblib
from contextlib import asynccontextmanager
from typing import List, Dict
from concurrent.futures import ProcessPoolExecutor
from fastapi import FastAPI, HTTPException

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier

from .config_models import FitRequest, PredictRequest, ModelNameConfig
from .config import settings

SUPPORTED_MODELS = {
    'LogisticRegression': LogisticRegression,
    'RandomForestClassifier': RandomForestClassifier,
    'XGBClassifier': XGBClassifier,
    'LGBMClassifier': LGBMClassifier,
    'CatBoostClassifier': CatBoostClassifier,
}

training_semaphore: asyncio.Semaphore = None
LOADED_MODELS = dict()

def _train_model_sync(model_name, model_path, model_type, hyperparameters, X, y):
    try:
        if os.path.exists(model_path):
            return f"Модель по пути '{model_path}' уже существует."
        if model_type not in SUPPORTED_MODELS:
            return f"Тип модели '{model_type}' не поддерживается."
        
        model_class = SUPPORTED_MODELS[model_type]   
        model = model_class(**hyperparameters)
        model.fit(X, y)
        joblib.dump(model, model_path)
        return f"Модель {model_name} успешно сохранена в {model_path}"
    except Exception as e:
        return f"Ошибка при обучении: {e}"

@asynccontextmanager
async def lifespan(app: FastAPI):
    global training_semaphore
    # Лимит активных процессов обучения задается через TRAINING_CORES
    training_semaphore = asyncio.Semaphore(settings.TRAINING_CORES)
    # Используем пул процессов для CPU-задач
    app.state.process_pool = ProcessPoolExecutor(max_workers=settings.TRAINING_CORES)
    print(f"Сервер готов. Пул процессов: {settings.TRAINING_CORES}")
    yield
    app.state.process_pool.shutdown(wait=True)

app = FastAPI(title="ML Model Server", lifespan=lifespan)

@app.get('/')
def read_root():
    return {"message": "Добро пожаловать на сервер для ML моделей!"}

@app.post('/fit')
async def fit_model(request: FitRequest):
    if training_semaphore.locked():
        raise HTTPException(status_code=503, detail="Нет свободных ядер для обучения.")
    
    os.makedirs(settings.MODELS_PATH, exist_ok=True)
    model_name = request.config.model_name
    model_path = os.path.join(settings.MODELS_PATH, f'{model_name}.joblib')
    
    hyperparams = request.config.model_dump(exclude={'model_name', 'model_type'})
    
    async with training_semaphore:
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            app.state.process_pool, _train_model_sync,
            model_name, model_path, request.config.model_type, hyperparams, request.X, request.y
        )

    if result and "Ошибка" in result:
        raise HTTPException(status_code=400, detail=result)
    return {"message": "Обучение завершено успешно.", "model_name": model_name}

@app.post('/predict')
def predict(request: PredictRequest):
    model_name = request.config.model_name
    if model_name not in LOADED_MODELS:
        raise HTTPException(status_code=404, detail=f"Модель '{model_name}' не загружена в память.")
        
    model = LOADED_MODELS[model_name]
    try:
        predictions = model.predict(request.X).tolist()
        return {"model_name": model_name, "predictions": predictions}
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Ошибка инференса: {e}")

@app.post('/load')
def load_model(config: ModelNameConfig):
    model_name = config.model_name
    model_path = os.path.join(settings.MODELS_PATH, f'{model_name}.joblib')
    
    if model_name in LOADED_MODELS:
        return {"message": f"Модель {model_name} уже загружена"}
        
    if len(LOADED_MODELS) >= settings.MAX_INFERENCE_MODELS:
        raise HTTPException(status_code=409, detail=f"Лимит RAM моделей достигнут ({settings.MAX_INFERENCE_MODELS}).")

    if not os.path.exists(model_path):
        raise HTTPException(status_code=404, detail="Файл модели не найден на диске.")

    try:
        LOADED_MODELS[model_name] = joblib.load(model_path)
        return {"message": f"Модель '{model_name}' успешно загружена."}
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Ошибка загрузки: {e}")

@app.post('/unload')
def unload_model(config: ModelNameConfig):
    model_name = config.model_name
    if model_name in LOADED_MODELS:
        del LOADED_MODELS[model_name]
        return {"message": f"Модель '{model_name}' выгружена."}
    return {"message": "Модель не была загружена."}

@app.post('/remove')
def remove_model(config: ModelNameConfig):
    model_name = config.model_name
    if model_name in LOADED_MODELS:
        raise HTTPException(status_code=409, detail="Нельзя удалить загруженную модель.")
    
    model_path = os.path.join(settings.MODELS_PATH, f'{model_name}.joblib')
    if os.path.exists(model_path):
        os.remove(model_path)
        return {"message": f"Файл {model_name} удален."}
    raise HTTPException(status_code=404, detail="Файл не найден.")

@app.post('/remove_all')
def remove_all_models():
    if LOADED_MODELS:
        raise HTTPException(status_code=409, detail="Выгрузите все модели перед очисткой диска.")
    if os.path.isdir(settings.MODELS_PATH):
        shutil.rmtree(settings.MODELS_PATH)
        os.makedirs(settings.MODELS_PATH, exist_ok=True)
    return {"message": "Все модели удалены с диска."}

## Работа на стороне клиента

In [71]:
import requests
import aiohttp
import asyncio
import json
from sklearn.datasets import make_classification
import time 

BASE_URL = "http://127.0.0.1:8000"

In [72]:
def generate_heavy_dataset(n_samples=15000, n_features=50, n_informative=30):
    X, y = make_classification(
        n_samples=n_samples,
        n_features=n_features,
        n_redundant=10,
        n_classes=2,
        random_state=42
    )
    print("Генерация завершена.")
    return X.tolist(), y.tolist()

def check_response(response):
    """Проверяет HTTP-ответ и выводит результат."""
    if response.status_code == 200:
        print("   Ответ сервера:", response.json())
    else:
        print(f"Ошибка: (Статус-код: {response.status_code})")
        try:
            print("   Детали ошибки:", response.json())
        except json.JSONDecodeError:
            print("   Не удалось декодировать JSON из ответа.")

X_heavy, y_heavy = generate_heavy_dataset()

Генерация завершена.


In [80]:
model_configs = [
    {
        "config": {
            "model_name": "lgbm",
            "model_type": "LGBMClassifier",
            "n_estimators": 200, 
            "num_leaves": 40
        }
    },
    {
        "config": {
            "model_name": "rf",
            "model_type": "RandomForestClassifier",
            "n_estimators": 150, 
            "max_depth": 15
        }
    }
]

__Асихронная отправка запросов на обучение с использованием `aiohttp`__.
__Пример с эндпоинтом `/fit`__

In [107]:
print("--- Запуск последовательного (синхронного) обучения ---")
start_time_sequential = time.monotonic()

for config in model_configs:
    model_name = config['config']['model_name']
    print(f"-> Отправка запроса для модели: '{model_name}'")
    
    # 1. Собираем payload для текущей модели
    payload = {
        "X": X_heavy,
        "y": y_heavy,
        **config
    }
    response = requests.post(f"{BASE_URL}/fit", json=payload)
    print(response.json())
    end_time_sequential = time.monotonic()
print(f"\n--- Последовательное обучение завершено за {end_time_sequential - start_time_sequential:.4f} секунд ---")


--- Запуск последовательного (синхронного) обучения ---
-> Отправка запроса для модели: 'lgbm'
{'detail': "Модель по пути 'saved_models/lgbm.joblib' уже существует."}
-> Отправка запроса для модели: 'rf'
{'detail': "Модель по пути 'saved_models/rf.joblib' уже существует."}

--- Последовательное обучение завершено за 2.5685 секунд ---


In [105]:
response = requests.post(f"{BASE_URL}/unload", json={"model_name": "rf"})
check_response(response)
response = requests.post(f"{BASE_URL}/unload", json={"model_name": "lgbm"})
check_response(response)
response = requests.post(f"{BASE_URL}/remove_all")
check_response(response )

   Ответ сервера: {'message': "Модель 'rf' уже выгружена из памяти."}
   Ответ сервера: {'message': "Модель 'lgbm' уже выгружена из памяти."}
   Ответ сервера: {'message': "Все модели из директории 'saved_models' были успешно удалены."}


In [106]:
async def fit_model_async(session, payload):
    """Асинхронная функция для отправки запроса на обучение."""
    model_name = payload['config']['model_name']
    print(f"Отправка асинхронного запроса на обучение '{model_name}'...")
    
    async with session.post(f"{BASE_URL}/fit", json=payload) as response:
        if response.status == 200:
            print(f"Сервер принял задачу на обучение '{model_name}'")
            print("Ответ:", await response.йjson())
        else:
            print(f"Ошибка при отправке задачи на обучение '{model_name}' (Статус: {response.status})")
            print("   Детали:", await response.json())

async def main_fit(model_configs):
    start_time_async = time.monotonic()
    
    async with aiohttp.ClientSession() as session:
        tasks = []
        for config in model_configs:
            payload = {
                "X": X_heavy,
                "y": y_heavy,
                **config
            }
            task = asyncio.create_task(fit_model_async(session, payload))
            tasks.append(task)
        
        await asyncio.gather(*tasks)

    end_time_async = time.monotonic()

start = time.monotonic()
await main_fit(model_configs)
end_time_sequential = time.monotonic()
print(f"\n--- Последовательное обучение завершено за {end_time_sequential - start_time_sequential:.4f} секунд ---")


Отправка асинхронного запроса на обучение 'lgbm'...
Отправка асинхронного запроса на обучение 'rf'...
Ошибка при отправке задачи на обучение 'lgbm' (Статус: 400)
   Детали: {'detail': 'Модель lgbm успешно загружена и находится в saved_models/lgbm.joblib'}
Ошибка при отправке задачи на обучение 'rf' (Статус: 400)
   Детали: {'detail': 'Модель rf успешно загружена и находится в saved_models/rf.joblib'}

--- Последовательное обучение завершено за 67.7363 секунд ---


__Пример с эндпоинтом `/load`__

In [82]:
models_to_load = ["lgbm", "rf"]
for model_name in models_to_load:
    response = requests.post(f"{BASE_URL}/load", json={"model_name": model_name})
    check_response(response )

   Ответ сервера: {'message': "Модель 'lgbm' успешно загружена в память."}
Ошибка: (Статус-код: 409)
   Детали ошибки: {'detail': "Достигнут лимит одновременно загруженных моделей (1). Текущие загруженные модели: ['lgbm']. Выгрузите одну из них перед загрузкой новой."}


__Пример с эндпоинтом `/predict`__

In [83]:
model_name = 'lgbm'

predict_config = {
    'X': X_heavy,
    'config': {
        'model_name': model_name
    }
}

response = requests.post(f"{BASE_URL}/predict", json=predict_config)
pred = response.json()['predictions']
print(pred[:100])

[1, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 0]


__Пример с эндпоинтом `/unload`__

In [89]:
response = requests.post(f"{BASE_URL}/unload", json={"model_name": "rf"})
check_response(response)
response = requests.post(f"{BASE_URL}/unload", json={"model_name": "lgbm"})
check_response(response)

   Ответ сервера: {'message': "Модель 'rf' уже выгружена из памяти."}
   Ответ сервера: {'message': "Модель 'lgbm' уже выгружена из памяти."}


__Пример с эндпоинтом `/remove`__

In [90]:
response = requests.post(f"{BASE_URL}/remove", json={"model_name": "lgbm"})
check_response(response)

   Ответ сервера: {'message': "Файл модели 'lgbm' успешно удален с диска."}


__Пример с эндпоинтом `/remove_all`__

In [91]:
response = requests.post(f"{BASE_URL}/remove_all")
check_response(response )

   Ответ сервера: {'message': "Все модели из директории 'saved_models' были успешно удалены."}
