# CatBoost incremental
## 🧬 Serving

Author: https://github.com/deburky

### API

In [None]:
!uv pip install -e ../.

In [None]:
import ray
from ray import serve

from catboost_incremental.serve_ray import CatBoostModelDeployment

# Initialize Ray. ignore_reinit_error=True suppresses the error Ray would
# otherwise raise when ray.init() is called again (e.g. re-running this cell).
ray.init(ignore_reinit_error=True)

# Build the Serve application for the CatBoost model.
# NOTE(review): the model path is relative to the notebook's working
# directory — confirm the .cbm file exists there before running.
model_path = "../models/cb_model.cbm"
app = CatBoostModelDeployment.bind(model_path=model_path)

# Start Ray Serve's own HTTP server on all interfaces, port 8000 (no uvicorn
# needed; Serve manages the server itself).
# NOTE(review): `detached` may be deprecated/ignored in recent Ray releases —
# confirm against the installed Ray version.
serve.start(detached=True, http_options={"host": "0.0.0.0", "port": 8000})

# Deploy the app under /predict; this must agree with API_URL in the client
# cell (http://127.0.0.1:8000/predict).
serve.run(app, route_prefix="/predict")

In [None]:
import asyncio

import httpx
import nest_asyncio
import pyarrow.dataset as ds

# Patch asyncio so the already-running Jupyter event loop can be re-entered
# by the async client code below.
nest_asyncio.apply()

API_URL = "http://127.0.0.1:8000/predict"  # Serve endpoint (route_prefix "/predict" on port 8000)
DATA_PATH = "../data"  # Parquet dataset passed to load_rows_in_chunks
CHUNK_SIZE = 50_000  # max rows per record batch, i.e. per POST request
MAX_CONCURRENT_REQUESTS = 20  # semaphore limit on simultaneous in-flight POSTs


def load_rows_in_chunks(path: str, chunk_size: int):
    """Stream a Parquet dataset as lists of row dictionaries.

    Each yielded list corresponds to one scanned record batch of the dataset
    at ``path`` and holds at most ``chunk_size`` rows. The ``partition_id``
    and ``target`` columns are dropped when present, so only feature columns
    are sent onward.
    """
    parquet_dataset = ds.dataset(path, format="parquet")
    excluded_columns = ["partition_id", "target"]
    for record_batch in parquet_dataset.to_batches(batch_size=chunk_size):
        features = record_batch.to_pandas().drop(
            columns=excluded_columns, errors="ignore"
        )
        yield features.to_dict(orient="records")


async def run_chunk(rows, chunk_idx, client, sem):
    """POST one chunk of rows to the prediction API and print the outcome.

    Args:
        rows: List of row dicts sent as the JSON request body.
        chunk_idx: Chunk index, used only in the printed messages.
        client: Shared httpx async client used to send the request.
        sem: Semaphore bounding how many requests are in flight at once.

    Failures are printed and swallowed, so one bad chunk does not abort the
    whole run. Failures include transport errors, timeouts, non-2xx
    responses and undecodable bodies.
    """
    async with sem:
        try:
            resp = await client.post(API_URL, json=rows)
            # Without this, a 4xx/5xx response whose body is JSON would be
            # printed as if the chunk had been scored successfully.
            resp.raise_for_status()
            result = resp.json()
        except Exception as e:
            # Include the exception type: some httpx errors (e.g. timeouts)
            # have an empty message, which would otherwise print nothing.
            print(f"Chunk {chunk_idx} failed: {type(e).__name__}: {e}")
            return
        print(f"Chunk {chunk_idx} => {result}")


async def main(timeout: float = 300.0):
    """Send every chunk of the dataset to the prediction API concurrently.

    Chunks come from ``load_rows_in_chunks(DATA_PATH, CHUNK_SIZE)``. At most
    ``MAX_CONCURRENT_REQUESTS`` requests are in flight at once; the semaphore
    is acquired inside ``run_chunk``.

    Args:
        timeout: Per-request HTTP timeout in seconds. httpx defaults to 5 s,
            which can be too short for a ``CHUNK_SIZE``-row prediction
            request and shows up as spurious timeout failures.

    Note: the list comprehension consumes the whole chunk generator before
    any request is awaited. All chunks are therefore held in memory as row
    dicts at once, so peak memory grows with dataset size.
    """
    sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)
    async with httpx.AsyncClient(timeout=timeout) as client:
        tasks = [
            run_chunk(rows, chunk_idx, client, sem)
            for chunk_idx, rows in enumerate(load_rows_in_chunks(DATA_PATH, CHUNK_SIZE))
        ]
        await asyncio.gather(*tasks)


# Top-level `await` works because IPython/Jupyter runs cells inside an event
# loop; outside a notebook use `asyncio.run(main())` instead.
await main()

In [None]:
import ray
from ray import serve

# Tear down in dependency order: shut down Serve first, which removes its
# applications and system actors. Then shut down the Ray runtime it runs on.
serve.shutdown()
ray.shutdown()

print("Ray and Ray Serve have been shut down.")
