In [8]:
import requests
r = requests.get("http://localhost:8991/")
print("Состояние сервера:", r.status_code)
print(r.json())

Состояние сервера: 200
{'message': 'ML server is running'}


Реализуем функции для работы с клинетской частью:

In [9]:
MY_URL = "http://localhost:8991"

def client_fit(X, y, model_type="logreg", config={}):
    try:
        response = requests.post(f"{MY_URL}/fit", json={
            "model_type": model_type,
            "config": config,
            "X": X,
            "y": y
        })
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        print(f"[client_fit] Ошибка HTTP: {response.text}")
    except Exception as e:
        print(f"[client_fit] Ошибка: {e}")

def client_load(name: str):
    try:
        response = requests.post(f"{MY_URL}/load", params={"name": name})
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        print(f"[client_load] Ошибка HTTP: {response.text}")
    except Exception as e:
        print(f"[client_load] Ошибка: {e}")

def client_unload(name: str):
    try:
        response = requests.post(f"{MY_URL}/unload", params={"name": name})
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        print(f"[client_unload] Ошибка HTTP: {response.text}")
    except Exception as e:
        print(f"[client_unload] Ошибка: {e}")

def client_predict(name: str, X):
    try:
        response = requests.post(f"{MY_URL}/predict", json={
            "name": name,
            "X": X
        })
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        print(f"[client_predict] Ошибка HTTP: {response.text}")
    except Exception as e:
        print(f"[client_predict] Ошибка: {e}")

def client_remove(name: str):
    try:
        response = requests.post(f"{MY_URL}/remove", params={"name": name})
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        print(f"[client_remove] Ошибка HTTP: {response.text}")
    except Exception as e:
        print(f"[client_remove] Ошибка: {e}")

def client_remove_all():
    try:
        response = requests.post(f"{MY_URL}/remove_all")
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        print(f"[client_remove_all] Ошибка HTTP: {response.text}")
    except Exception as e:
        print(f"[client_remove_all] Ошибка: {e}")

Проверим функционал сервера - обучение, загрузка, выгрузка, удаление, а также обработка исключений:

In [6]:
X = [[1, 2], [3, 4], [5, 6]]
y = [0, 1, 0]
resp = client_fit(X, y, model_type="logreg")
print(resp) 

{'message': "Обучение модели 'logreg' запущено", 'pid': 42}


In [7]:
model_name = resp["message"].split("'")[1]
resp = client_load(model_name)
print(resp)

{'message': "Модель 'logreg' загружена"}


In [8]:
X_new = [[7, 8], [9, 10]]
resp = client_predict(model_name, X_new)
print("predict:", resp)

predict: {'predictions': [0, 0]}


In [9]:
resp = client_load(model_name)
print(resp)

[client_load] Ошибка HTTP: {"detail":"Модель 'logreg' уже загружена"}
None


Видим "хорошую" ошибку

In [10]:
resp = client_unload(model_name)
print("unload:", resp)

unload: {'message': "Модель 'logreg' выгружена"}


In [11]:
resp = client_remove(model_name)
print("unload:", resp)

unload: {'message': "Модель 'logreg' удалена"}


In [12]:
resp = client_remove(model_name)
print("unload:", resp)

[client_remove] Ошибка HTTP: {"detail":"400: Файл модели не найден"}
unload: None


Опять "хорошая" ошибка

Теперь посмотрим на перегрузку ядер: одно ядро всегда зарезервировано для сервера, то есть так как в .env "N_CORES=4 MAX_LOADED=2" - 3 модель должны обучаться нормально, а на 4 выдавать ошибку. 

In [13]:
# logreg
X1 = [[1, 2], [3, 4], [5, 6]]
y1 = [0, 1, 0]
resp1 = client_fit(X1, y1, model_type="logreg")
print("LogReg:", resp1)

# linreg
X2 = [[10, 20], [30, 40], [50, 60]]
y2 = [100, 200, 300]
resp2 = client_fit(X2, y2, model_type="linreg")
print("LinReg:", resp2)

# random forest
X3 = [[0, 1], [1, 0], [0.5, 0.5]]
y3 = [1.0, 0.0, 0.5]
resp3 = client_fit(X3, y3, model_type="RandomForest")
print("RandomForest:", resp3)

# random forest2
X4 = [[0, 2], [1, 3], [0.5, 0.8]]
y4 = [1.5, 0.0, 1.0]
resp4 = client_fit(X4, y4, model_type="RandomForest")
print("RandomForest2:", resp4)

LogReg: {'message': "Обучение модели 'logreg' запущено", 'pid': 54}
LinReg: {'message': "Обучение модели 'linreg' запущено", 'pid': 66}
RandomForest: {'message': "Обучение модели 'RandomForest' запущено", 'pid': 67}
[client_fit] Ошибка HTTP: {"detail":"Нет доступных ядер для обучения модели"}
RandomForest2: None


А отдельно дополнительно обучить 4-ую модель - можно:

In [14]:
# random forest2
X4 = [[0, 2], [1, 3], [0.5, 0.8]]
y4 = [1.5, 0.0, 1.0]
resp4 = client_fit(X4, y4, model_type="RandomForest")
print("RandomForest2:", resp4)

RandomForest2: {'message': "Обучение модели 'RandomForest_1' запущено", 'pid': 68}


Так и происходит, причем в директории models приcутствуют только 3 созраненные модели. Аналогично с загрузкой - 2 можно, а 3 - уже нельзя

In [15]:
print("Загрузка logreg:", client_load("logreg"))
print("Загрузка linreg:", client_load("linreg"))
print("Загрузка RandomForest:", client_load("RandomForest"))

Загрузка logreg: {'message': "Модель 'logreg' загружена"}
Загрузка linreg: {'message': "Модель 'linreg' загружена"}
[client_load] Ошибка HTTP: {"detail":"Превышен лимит одновременно загруженных моделей"}
Загрузка RandomForest: None


In [10]:
print(client_remove_all())

{'message': 'ВСе модели удалены'}


Теперь посмотрим на работу на относительно больших данных, вначале напишем функции для их генерации:

In [14]:
import numpy as np

def generate_classification_dataset(n_samples=400000, n_features=100):
    X = np.random.rand(n_samples, n_features).tolist()
    y = np.random.randint(0, 2, size=n_samples).tolist()
    return X, y

def generate_regression_dataset(n_samples=400000, n_features=100):
    X = np.random.rand(n_samples, n_features).tolist()
    y = np.random.rand(n_samples).tolist()
    return X, y

Сгенерируем данные

In [6]:
X_log, y_log = generate_classification_dataset()
X_lin, y_lin = generate_regression_dataset()

А теперь проведем последовательный вызов двух различных моделей и измерим время:

In [7]:
import time
results = {}

print("\n[Обучение] Модель: logreg")
start = time.time()
resp_log = client_fit(X_log, y_log, model_type="logreg")
duration = time.time() - start
print(f"logreg обучена за {duration:.2f} секунд")
results["logreg"] = {"X": X_log[:10]}

print("\n[Обучение] Модель: linreg")
start = time.time()
resp_lin = client_fit(X_lin, y_lin, model_type="linreg")
duration = time.time() - start
print(f"linreg обучена за {duration:.2f} секунд")
results["linreg"] = {"X": X_lin[:10]}


[Обучение] Модель: logreg
logreg обучена за 72.02 секунд

[Обучение] Модель: linreg
linreg обучена за 79.02 секунд


Ради интереса загрузим их и посмотрим на предсказание:

In [10]:
print("Загрузка logreg:", client_load("logreg"))
print("Загрузка linreg:", client_load("linreg"))

Загрузка logreg: {'message': "Модель 'logreg' загружена"}
Загрузка linreg: {'message': "Модель 'linreg' загружена"}


Для предсказания сгенерируем новые данные:

In [15]:
X_log_new, _ = generate_classification_dataset(n_samples=10)
X_lin_new, _ = generate_regression_dataset(n_samples=10)

Наконец выведем предсказание:

In [11]:
print("\n[Предсказание] logreg:")
start = time.time()
response_log = client_predict("logreg", X_log_new)
duration = time.time() - start
print("Предсказания logreg:", response_log)
print(f"logreg предсказала за {duration:.4f} секунд")

print("\n[Предсказание] linreg:")
start = time.time()
response_lin = client_predict("linreg", X_lin_new)
duration = time.time() - start
print("Предсказания linreg:", response_lin)
print(f"linreg предсказала за {duration:.4f} секунд")


[Предсказание] logreg:
Предсказания logreg: {'predictions': [1, 0, 1, 1, 1, 1, 0, 1, 1, 0]}
logreg предсказала за 0.0167 секунд

[Предсказание] linreg:
Предсказания linreg: {'predictions': [0.4996118240098124, 0.4987946728844853, 0.5018381878534693, 0.49282494333190524, 0.4951154400123711, 0.504658513351744, 0.5051802775714751, 0.5036603955837079, 0.505568125420329, 0.4966195842874297]}
linreg предсказала за 0.0090 секунд


Для асинхронной части сделаю следующее: Jupyter Notebook не справляется с управлением параллельными процессами из-за того, что используется multiprocessing.Process на стороне сервера, что требует запуска в отдельном процессе.

При вызове client_fit() для двух моделей в отдельных потоках из ноутбука ядро Jupyter крашится так как не справляется с отправкой больших массивов данных.

Чтобы обойти эту проблему, я вынес вызов асинхронного обучения в отдельный скрипт client_async.py, запускаемый из терминала. Он:

Загружает данные на диск (/tmp/...npy),

Отправляет запросы на /fit_from_file которую я дополнительно прописал на сторое сервера,

И позволяет протестировать параллельное обучение моделей.

Для того, чтобы измерить время исполнения, пришлось добавить в серверную часть сроку для печати текущего времени, а затем смотреть на время начала и завершения обучения в логах, иначе я могу измерить только время исполенния запроса, а не обучения.

In [12]:
!python3 client_async.py

[logreg] Запуск обучения...
[linreg] Запуск обучения...
[logreg] Запрос на обучение отправлен.
[linreg] Запрос на обучение отправлен.
Ждем завершения обучения моделей (примерно 90 секунд), а затем смотрб время в логах

[ИТОГО] Клиент завершил работу за 0.38 секунд


В логах вывод:

Начало обучения ./models/logreg_1.pkl в 2025-06-27 20:50:18.910673

Модель сохранена в ./models/logreg_1.pkl

Обучение модели ./models/logreg_1.pkl завершено в 2025-06-27 20:50:20.182342

Начало обучения ./models/linreg_1.pkl в 2025-06-27 20:50:18.919593

Модель сохранена в ./models/linreg_1.pkl

Обучение модели ./models/linreg_1.pkl завершено в 2025-06-27 20:50:23.254491

То есть асинхронное обучение значительно быстрее

Наконец, сделаем асинхронное предсказание (на тех же данных, что и последовательное)

In [19]:
import threading
import time

results = {}

print("Загрузка logreg:", client_load("logreg_1"))
print("Загрузка linreg:", client_load("linreg_1"))

def predict_async(name, X):
    start = time.time()
    try:
        res = client_predict(name, X)
        duration = time.time() - start
        results[name] = {
            "result": res,
            "duration": duration
        }
        print(f"[{name}] завершено за {duration:.2f} секунд")
    except Exception as e:
        print(f"[{name}] ошибка: {e}")

t1 = threading.Thread(target=predict_async, args=("logreg_1", X_log_new))
t2 = threading.Thread(target=predict_async, args=("linreg_1", X_lin_new))

start_total = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
end_total = time.time()

print(f"\n[ИТОГО] Асинхронное предсказание заняло {end_total - start_total:.2f} секунд")

for name, r in results.items():
    print(f"\n{name}: предсказание заняло {r['duration']:.2f} секунд")
    print("Результат:", r["result"])

Загрузка logreg: {'message': "Модель 'logreg_1' загружена"}
Загрузка linreg: {'message': "Модель 'linreg_1' загружена"}
[logreg_1] завершено за 0.02 секунд
[linreg_1] завершено за 0.02 секунд

[ИТОГО] Асинхронное предсказание заняло 0.03 секунд

logreg_1: предсказание заняло 0.02 секунд
Результат: {'predictions': [1, 0, 0, 1, 1, 1, 0, 0, 0, 1]}

linreg_1: предсказание заняло 0.02 секунд
Результат: {'predictions': [0.5035105698242556, 0.498693189052693, 0.5058725697037877, 0.49716803828232065, 0.49565610246348646, 0.49594751029734185, 0.5102173742849082, 0.5078504681797146, 0.5047366905308756, 0.4998076991499291]}
