Запуск сервера:
```bash
uv run --active uvicorn server.main:app --host 0.0.0.0 --port 8000
```

Проверка статуса сервера:
```bash
curl http://localhost:8000/status
```

# Демонстрация работы с ML-сервером

Посмотрим на работу клиентской части для взаимодействия с ML-сервером. Не забудьте установить все необходимые зависимости проекта.
Демонстрируются следующие функции:
- Синхронное обучение двух моделей (длительность ≥60 секунд для каждой).
- Асинхронное обучение двух моделей с ускорением.
- Асинхронный вызов нескольких предсказаний.
- Загрузка, выгрузка, удаление моделей и удаление всех моделей.

Есть обработка ошибок сервера (404, 429, 500) и сетевых исключений.

In [19]:
import requests
import aiohttp
import asyncio
import numpy as np
from sklearn.datasets import make_classification
import time
import json
from pathlib import Path

# Базовый URL сервера
BASE_URL = "http://localhost:8000"
MODEL_DIR = f"{Path.cwd().parent}/server/storage"

# Генерация синтетического датасета для обучения (длительность ≥60 секунд)
X, y = make_classification(
    n_samples=1000,  # Большое число объектов
    n_features=100,   # Большое число признаков
    n_classes=2,
    n_informative=5,
    random_state=42
)
X = X.tolist()
y = y.tolist()

# Данные для предсказания
X_pred = X[:10]  # Первые 10 объектов

# Конфигурации моделей
models = [
    {"name": "logreg_model", "type": "logistic_regression", "params": {}},
    {"name": "gb_model", "type": "gradient_boosting", "params": {"n_estimators": 2000}}
]

## 1. Синхронное обучение двух моделей

**Ожидаемое действие**: Обучаем две модели (`logistic_regression` и `gradient_boosting`) последовательно с помощью `requests`. Для `gradient_boosting` задаём `n_estimators=2000`, чтобы обучение длилось ≥60 секунд. Измеряем общее время. Обрабатываем возможные ошибки (например, отсутствие свободных ядер или превышение лимита моделей).

In [27]:
# Сначала очистим сервер от моделей
try:
    requests.post(f"{BASE_URL}/remove_all", timeout=10)
except requests.exceptions.RequestException as e:
    print(f"Error clearing models: {str(e)}")

print("Server status before training:")
try:
    response = requests.get(f"{BASE_URL}/status", timeout=10)
    print(json.dumps(response.json(), indent=2))
except requests.exceptions.RequestException as e:
    print(f"Error getting status: {str(e)}")

Server status before training:
{
  "cpu_cores": 4,
  "max_processes": 3,
  "active_processes": -1,
  "active_requests": 0,
  "max_requests": 8,
  "model_dir": "./server/storage",
  "max_models": 4
}


In [28]:
def train_sync(model: dict):
    """Синхронный вызов обучения модели."""
    payload = {
        "X": X,
        "y": y,
        "config": {
            "model_name": model["name"],
            "model_type": model["type"],
            "model_params": model["params"]
        }
    }
    
    try:
        start = time.time()
        response = requests.post(f"{BASE_URL}/fit", json=payload, timeout=300)
        
        timeout = 600
        while True:
            model_path = Path(MODEL_DIR) / f"{model['name']}.pkl"
            if model_path.exists():
                break
            if time.time() - start > timeout:
                raise TimeoutError(f"Model {model['name']} not saved after {timeout}s.")
            time.sleep(2)
        time_exec = time.time() - start

        response.raise_for_status()
            
        print(f"{model['name']}: Обучение завершено за {time_exec:.1f}с")
        
    except requests.exceptions.HTTPError as e:
        if response.status_code == 429:
            print(f"Error for {model['name']}: No available cores or max models reached: {response.json()['detail']}")
        elif response.status_code == 500:
            print(f"Error for {model['name']}: Server error: {response.json()['detail']}")
        else:
            print(f"Error for {model['name']}: {str(e)}")
    except requests.exceptions.RequestException as e:
        print(f"Network error for {model['name']}: {str(e)}")

# Синхронное обучение
print("Начало синхронного обучения...")
sync_start = time.time()
for model in models:
    train_sync(model)
sync_duration = time.time() - sync_start
print(f"Total synchronous training time: {sync_duration:.2f}s")

Начало синхронного обучения...
logreg_model: Обучение завершено за 2.2с
gb_model: Обучение завершено за 64.2с
Total synchronous training time: 66.41s


## 2. Асинхронное обучение двух моделей

Теперь обучаем те же две модели **асинхронно**. Видим, что обучение выполняется быстрее, чем синхронное.

In [29]:
# Сначала очистим сервер от моделей
try:
    requests.post(f"{BASE_URL}/remove_all", timeout=10)
except requests.exceptions.RequestException as e:
    print(f"Error clearing models: {str(e)}")

print("Server status before training:")
try:
    response = requests.get(f"{BASE_URL}/status", timeout=10)
    print(json.dumps(response.json(), indent=2))
except requests.exceptions.RequestException as e:
    print(f"Error getting status: {str(e)}")

Server status before training:
{
  "cpu_cores": 4,
  "max_processes": 3,
  "active_processes": -1,
  "active_requests": 0,
  "max_requests": 8,
  "model_dir": "./server/storage",
  "max_models": 4
}


In [30]:
import time
import requests
from pathlib import Path
# import asyncio
import aiohttp

# убираем ошибку "RuntimeError: This event loop is already running"
import nest_asyncio

async def train_async(session: aiohttp.ClientSession, model: dict):
    """Асинхронный вызов обучения модели."""
    payload = {
        "X": X,
        "y": y,
        "config": {
            "model_name": model["name"],
            "model_type": model["type"],
            "model_params": model["params"]
        }
    }
    timeout = 600
    try:
        start = time.time()
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{BASE_URL}/fit", json=payload, timeout=300) as response:
                resp_json = await response.json()
                await asyncio.sleep(0)  # yield control
        # Ожидание сохранения модели как и в sync-версии
        while True:
            model_path = Path(MODEL_DIR) / f"{model['name']}.pkl"
            if model_path.exists():
                break
            if time.time() - start > timeout:
                raise TimeoutError(f"Model {model['name']} not saved after {timeout}s.")
            await asyncio.sleep(2)
        time_exec = time.time() - start
        if response.status != 200:
            if response.status == 429:
                print(f"Error for {model['name']}: No available cores or max models reached: {resp_json.get('detail')}")
            elif response.status == 500:
                print(f"Error for {model['name']}: Server error: {resp_json.get('detail')}")
            else:
                print(f"Error for {model['name']}: {response.reason}")
        else:
            print(f"{model['name']}: Обучение завершено за {time_exec:.1f}с")
    except asyncio.TimeoutError:
        print(f"Timeout error for {model['name']}: Model not saved in time.")
    except aiohttp.ClientError as e:
        print(f"Network error for {model['name']}: {str(e)}")

async def main():
    print("Начало асинхронного обучения...")
    async_start = time.time()
    tasks = [train_async(aiohttp.ClientSession(), model) for model in models]
    await asyncio.gather(*tasks)
    async_duration = time.time() - async_start
    print(f"Total asynchronous training time: {async_duration:.2f}s")
    print(f"Speedup: {sync_duration / async_duration:.2f}x")
    

nest_asyncio.apply()
await main()


Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7d8d47e8a950>


Начало асинхронного обучения...


Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7d8d47e8b910>


logreg_model: Обучение завершено за 2.3с
gb_model: Обучение завершено за 64.3с
Total asynchronous training time: 64.42s
Speedup: 1.03x


## 3. Асинхронный вызов нескольких предсказаний

Сделаем асинхронные предсказания для двух моделей одновременно. Используем небольшой набор данных (`X_pred`).

In [42]:
async def predict_async(session: aiohttp.ClientSession, model_name: str):
    """Асинхронный вызов предсказания."""
    payload = {
        "y": X_pred,
        "config": {"model_name": model_name}
    }
    try:
        async with session.post(f"{BASE_URL}/predict", json=payload, timeout=10) as response:
            if response.status == 200:
                data = await response.json()
                print(f"{model_name}: Predictions received: {data['predictions'][:5]}...")
            else:
                try:
                    error_data = await response.json()
                    error_detail = error_data.get('detail', 'No detail provided')
                except (aiohttp.ContentTypeError, ValueError):
                    error_detail = f"HTTP {response.status}"
 
                if response.status == 404:
                    print(f"Error for {model_name}: Model not found: {error_detail}")
                elif response.status == 429:
                    print(f"Error for {model_name}: Too many requests: {error_detail}")
                elif response.status == 500:
                    print(f"Error for {model_name}: Server error: {error_detail}")
                else:
                    print(f"Error for {model_name}: {error_detail}")
    except aiohttp.ClientError as e:
        print(f"Network error for {model_name}: {str(e)}")

async def async_predictions():
    async with aiohttp.ClientSession() as session:
        tasks = [
            predict_async(session, model["name"])
            for model in models
        ]
        await asyncio.gather(*tasks)

# Асинхронные предсказания
print("Starting asynchronous predictions...")
await async_predictions()

Starting asynchronous predictions...
gb_model: Predictions received: [0, 1, 1, 1, 1]...
logreg_model: Predictions received: [0, 1, 0, 1, 0]...


## 4. Демонстрация остальных функций сервера

Поделаем разные операции с моделями: загрузку, выгрузку, удаление одной модели и удаление всех моделей. Используем синхронные вызовы через `requests` для простоты.

In [43]:
def manage_model(endpoint: str, model_name: str = None):
    """Универсальная функция для вызова управляющих эндпоинтов."""
    url = f"{BASE_URL}/{endpoint}"
    payload = {"config": {"model_name": model_name}} if model_name else {}
    try:
        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
        print(f"{endpoint}: {response.json()['message']}")
    except requests.exceptions.HTTPError as e:
        if response.status_code == 404:
            print(f"Error for {endpoint}: Model not found: {response.json()['detail']}")
        elif response.status_code == 500:
            print(f"Error for {endpoint}: Server error: {response.json()['detail']}")
        else:
            print(f"Error for {endpoint}: {str(e)}")
    except requests.exceptions.RequestException as e:
        print(f"Network error for {endpoint}: {str(e)}")

# Загрузка модели
manage_model("load", "logreg_model")

# Выгрузка модели
manage_model("unload", "logreg_model")

# Удаление модели
manage_model("remove", "logreg_model")

# Удаление всех моделей
manage_model("remove_all")

load: Model 'logreg_model' loaded successfully.
unload: Model 'logreg_model' unloaded successfully.
remove: Model 'logreg_model' removed successfully.
remove_all: All models removed successfully.


## 5. Проверка статуса сервера

In [44]:
try:
    response = requests.get(f"{BASE_URL}/status", timeout=10)
    response.raise_for_status()
    print("Server status:", json.dumps(response.json(), indent=2))
except requests.exceptions.RequestException as e:
    print(f"Network error: {str(e)}")

Server status: {
  "cpu_cores": 4,
  "max_processes": 3,
  "active_processes": 0,
  "active_requests": 0,
  "max_requests": 8,
  "model_dir": "./server/storage",
  "max_models": 4
}
