In [2]:
# Sets the METAFLOW_PROFILE environment variable for this kernel.
# NOTE(review): `oleg2` looks like a user-specific profile name — replace it with your own.
%env METAFLOW_PROFILE=oleg2

env: METAFLOW_PROFILE=oleg2


# Run the server

Start the server in a separate terminal before running the cells below:

```
serve run server:batch_preds
```

In [3]:
import requests
from base import TabularBatchPrediction as TBP
import pandas as pd
import json

2023-10-28 13:36:38,913	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2023-10-28 13:36:39,332	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


In [4]:
# Smoke-test the root endpoint before exercising `/predict`.
# A server that is not running makes `requests.post` raise before any assert
# can run, so the connection error is re-raised with the same actionable hint.
try:
    hello_world_output = requests.post("http://localhost:8000").json()
except requests.exceptions.ConnectionError as err:
    raise requests.exceptions.ConnectionError(
        "Are you sure you're running the server?"
    ) from err
assert hello_world_output['response'] == "Hello World!", "Are you sure you're running the server?"

In [5]:
def get_data() -> tuple:
    """Load the test features and the matching target values.

    Returns:
        A 2-tuple ``(df, targets)``:

        * ``df`` -- the test dataset as a pandas DataFrame, with an extra
          ``"id"`` column copied from the DataFrame index.
        * ``targets`` -- the values of the ``"target"`` column of the
          validation dataset.
    """
    tbp = TBP()
    # The first element returned by load_dataset() is not needed here.
    _, valid_dataset, test_dataset = tbp.load_dataset()
    df = test_dataset.to_pandas()
    # The row index doubles as the record id sent to the server.
    df["id"] = df.index
    # NOTE(review): targets are read from the validation split while features
    # come from the test split; presumably both hold the same rows in the same
    # order (test = validation without "target") — confirm in `base`.
    return df, valid_dataset.to_pandas()["target"].values


def prepare_post_body(batch: list) -> dict:
    """Key a batch of records by their ``"id"`` field.

    Args:
        batch: Iterable of record dicts (e.g. ``df.to_dict("records")``), each
            containing an ``"id"`` key. The records are not modified.

    Returns:
        Dict mapping each record's id to a copy of the record without the
        ``"id"`` key. If ids repeat, the last record wins.

    Raises:
        KeyError: If a record has no ``"id"`` key.
    """
    # Build fresh dicts instead of pop()-ing so the caller's records stay intact.
    return {
        record["id"]: {key: value for key, value in record.items() if key != "id"}
        for record in batch
    }


def query_predict_endpoint(
    batch_size=5,
    df=None,
    url="http://localhost:8000/predict/",
    timeout=None,
):
    """POST a random batch of rows to the prediction endpoint.

    Args:
        batch_size: Number of rows to sample from ``df``.
        df: DataFrame of features including an ``"id"`` column. When ``None``,
            the data is loaded with ``get_data()``.
        url: Address of the prediction endpoint.
        timeout: Seconds to wait for the server, forwarded to
            ``requests.post``. ``None`` waits indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    if df is None:
        # Only the features are needed; the targets are discarded.
        df, _ = get_data()
    batch = df.sample(batch_size).to_dict("records")
    id_to_batch_features = prepare_post_body(batch)
    response = requests.post(
        url, data=json.dumps(id_to_batch_features), timeout=timeout
    )
    # Fail loudly on an error status rather than returning an error body
    # as if it were predictions.
    response.raise_for_status()
    return response.json()

In [9]:
# Load the data once: `df` holds the test features (plus an "id" column) and
# `true_targets` the target values; both are reused by the cells below.
df, true_targets = get_data()

No dataset name or loader provided. Using default breast_cancer.csv dataset.


2023-10-28 13:37:31,584	INFO read_api.py:374 -- To satisfy the requested parallelism of 20, each read task output will be split into 20 smaller blocks.
2023-10-28 13:37:31,997	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2023-10-28 13:37:31,998	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-10-28 13:37:31,998	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


In [10]:
# Request predictions for five randomly sampled rows of the loaded features.
output = query_predict_endpoint(df=df, batch_size=5)
# Template used to report each prediction: sample id, probability, true target.
msg = "Prediction proba for sample input {} is {}. True target is {}."

In [11]:
# Report each prediction next to its true target. The loop variable is named
# `sample_id` so the builtin `id` is not shadowed in the notebook namespace.
# The ids are converted with int() before indexing `true_targets`.
for sample_id, proba in output.items():
    print(msg.format(sample_id, proba, true_targets[int(sample_id)]))

Prediction proba for sample input 64 is 0.8402084112167358. True target is 1.
Prediction proba for sample input 158 is 0.8899078965187073. True target is 1.
Prediction proba for sample input 15 is 0.47248977422714233. True target is 1.
Prediction proba for sample input 25 is 0.8899078965187073. True target is 1.
Prediction proba for sample input 35 is 0.12808369100093842. True target is 0.


# What is running on the `/predict` endpoint on the server?

In [10]:
from ray.train.xgboost import XGBoostPredictor
import ray
from ray.train.batch_predictor import BatchPredictor
from metaflow import Run, Flow


def select_from_checkpoint_registry(flow_name="Train"):
    """Fetch the checkpoint stored by the latest successful run of a flow.

    Args:
        flow_name: Name of the Metaflow flow to look up.

    Returns:
        The ``checkpoint`` attribute of the run's ``result`` artifact.

    Raises:
        ValueError: If the flow has no successful run yet.
    """
    run = Flow(flow_name).latest_successful_run
    if run is None:
        # Without this guard the failure is an opaque AttributeError on None.
        raise ValueError(
            f"Flow {flow_name!r} has no successful run to load a checkpoint from."
        )
    return run.data.result.checkpoint


# Local reproduction of the prediction steps, without going through HTTP.
# NOTE(review): presumably mirrors the server's `/predict` handler — confirm
# against the server module.

# Build a small batch the same way `query_predict_endpoint` does above.
batch_size = 2
if df is None:
    # get_data() returns (features, targets); unpack so `df` is the DataFrame
    # rather than the whole tuple.
    df, _unused_targets = get_data()
batch = df.sample(batch_size).to_dict("records")
id_to_batch_features = prepare_post_body(batch)

# Score the feature dicts with the checkpoint from the latest successful run.
features = ray.data.from_items(list(id_to_batch_features.values()))
checkpoint = select_from_checkpoint_registry()
predictor = BatchPredictor.from_checkpoint(checkpoint, XGBoostPredictor)
preds = predictor.predict(features).to_pandas()["predictions"].values
# Pair each prediction with its record id; dict order keeps keys and values aligned.
preds_payload = dict(zip(id_to_batch_features.keys(), preds))

2023-09-14 17:47:14,732	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(StandardScaler._transform_pandas)->MapBatches(ScoringWrapper)]
2023-09-14 17:47:14,734	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-09-14 17:47:14,735	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-09-14 17:47:14,766	INFO actor_pool_map_operator.py:117 -- MapBatches(StandardScaler._transform_pandas)->MapBatches(ScoringWrapper): Waiting for 1 pool actors to start...


In [11]:
# Display the id -> prediction mapping computed in the previous cell.
preds_payload

{54: 0.89597195, 155: 0.89597195}