In [1]:
import requests
from pydantic import BaseModel

# Testing BentoML Service

It demonstrates how to interact with the API endpoints using example requests and prints the responses.

Note that the server must be running locally at `http://localhost:3000` and MLflow serveice must be accessible.

In [2]:
BENTOML_URL = "http://localhost:3000"

In [3]:
class SpaceflightInput(BaseModel):
    engines: float
    passenger_capacity: int
    crew: float
    d_check_complete: bool
    moon_clearance_complete: bool
    iata_approved: bool
    company_rating: float
    review_scores_rating: float

class ModelURI(BaseModel):
    model_name: str
    model_version: int

### List registered models in MLflow

In [11]:
import mlflow

mlflow.set_tracking_uri("http://localhost:5000")
client = mlflow.tracking.MlflowClient()

# Recupera tutti i modelli registrati
for rm in client.search_registered_models():
    print("Model:", rm.name)
    for v in rm.latest_versions:
        print(f" Version: {v.version}, Stage: {v.current_stage}")


Model: spaceflights-kedro
 Version: 1, Stage: None


## POST /import_model
### Import a model from MLflow Model Registry


### Request body:
```json
{
  "model_name": <string>,
  "model_version": <int>
}
```

### Response:
```json
{
  "message": "Model <model_name>/<model_version> is being imported and loaded."
}
```

In [22]:
def import_model(model_uri: ModelURI):
    url = f"{BENTOML_URL}/import_model"
    payload = {"model_uri": model_uri.model_dump()}
    response = requests.post(url, json=payload)
    return response.json()
    
model_uri = ModelURI(model_name="spaceflights-kedro", model_version=1)

res_import = import_model(model_uri)
print("Import model response:", res_import)
# Note: heavy computation, may take a while

Import model response: {'message': 'Model spaceflights-kedro/1 is being imported and loaded.'}


# POST /predict
### Predict using the loaded model

### Request body:    
```json
{
    "engines": 2,
    "passenger_capacity": 120,
    "crew": 8,
    "d_check_complete": true,
    "moon_clearance_complete": true,
    "iata_approved": true,
    "company_rating": 4.5,
    "review_scores_rating": 95.0
}
```
    
### Response:    
```json
{
    "prediction": [<prediction>]
}
```

In [30]:
def predict(input_data: SpaceflightInput):
    url = f"{BENTOML_URL}/predict"
    payload = {"input_data": input_data.model_dump()}
    response = requests.post(url, json=payload)

    try:
        return response.json()
    except Exception:
        print("Response non JSON:")
        print(response.status_code)
        print(response.text)
        return {"error": "Response is not valid JSON"}

example_input = SpaceflightInput(
    engines=2,
    passenger_capacity=4,
    crew=1,
    d_check_complete=True,
    moon_clearance_complete=True,
    iata_approved=True,
    company_rating=4.5,
    review_scores_rating=8.0
)

res_pred = predict(example_input)
print("Prediction response:", res_pred)

Prediction response: {'prediction': [2572.5585979712696]}


# Testing production-api endpoints

In [ ]:
PRODUCTION_API = "http://localhost:8100"

## 1. GET /reference 

This endpoint loads the reference dataset used for drift detection.
MinIO must be running and the reference dataset must be available in the 'data' directory. 

In [23]:
def load_reference():
    url = f"{PRODUCTION_API}/reference"
    response = requests.get(url)
    return response.json()
    

res_import = load_reference()
print("Load reference response:", res_import)

Load reference response: {'status': 'OK', 'message': 'Predictions loaded on MinIO', 'file_uploaded': '/app/data/final_input_table.csv'}


## 2. POST /generate

This endpoint generates a new dataset and stores it in MinIO.

Setting the `drift` parameter to `True` introduces drift in the generated dataset.

In [29]:
def generate_dataset():
    url = f"{PRODUCTION_API}/generate"
    payload = {"drift": False}
    try:
        response = requests.post(url, json=payload)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        return {"error": str(e)}

generate_dataset()

{'status': 'OK',
 'message': 'Current dataset generated and uploaded to MinIO',
 'drift': False}

In [27]:
def generate_drifted_dataset():
    url = f"{PRODUCTION_API}/generate"
    payload = {"drift": True}
    try:
        response = requests.post(url, json=payload)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        return {"error": str(e)}
    
generate_drifted_dataset()

{'status': 'OK',
 'message': 'Current dataset generated and uploaded to MinIO',
 'drift': True}

## 3. GET /analyze

This endpoint performs drift analysis on the generated dataset compared to the reference dataset.
The reports and tests generated are available in the Evidently dashboard.

In case of failed tests, the kedro pipeline will be run. You can check the new model version in MLflow Model Registry.

In [30]:
def analyze():
    url = f"{PRODUCTION_API}/analyze"
    try:
        response = requests.get(url)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        return {"error": str(e)}
    
analyze()

{'status': 'OK',
 'message': 'Evidently analysis completed',
 'failed_data_tests': 0,
 'failed_model_tests': 0,
 'kedro_pipeline_triggered': False}