# Set up a project

In [None]:
import os

import digitalhub as dh

# The project name is derived from the current user: "<USER>-ml-service".
# `os` must be imported here because the name is built from os.environ.
project = dh.get_or_create_project(f"{os.environ['USER']}-ml-service")

In [None]:
%%writefile src/monitor.py

import boto3
import io
import os
import requests
import json
import pandas as pd

# S3 bucket that holds the monitoring dataset.
BUCKET = "datalake"
# Object key of the monitoring dataset, namespaced by the PROJECT_NAME
# environment variable (a KeyError is raised at import time if it is unset).
KEY = f"{os.environ['PROJECT_NAME']}/monitordata.parquet"

def write_records(inputs, output):
    """Append one inference record to the monitoring dataset stored on S3.

    The record is built from the request inputs plus the model output, appended
    to the parquet object at ``BUCKET``/``KEY`` and uploaded back. Only the most
    recent 30000 rows are kept.

    Args:
        inputs: Iterable of dicts, each with a ``name`` key (used as the column
            name) and a ``data`` key (used as the column values).
        output: Prediction stored in the ``predict`` column.

    Raises:
        Exception: Any S3 or parquet error other than "object does not exist"
            is propagated, so that a transient failure never causes the stored
            history to be replaced by a single record.
    """
    s3 = boto3.client('s3')
    record = {item['name']: item['data'] for item in inputs}
    record['predict'] = output
    new_rows = pd.DataFrame(record)

    try:
        obj = s3.get_object(Bucket=BUCKET, Key=KEY)
    except s3.exceptions.NoSuchKey:
        # First write: there is no history yet, so start from the new rows.
        df = new_rows
    else:
        history = pd.read_parquet(io.BytesIO(obj['Body'].read()))
        # ignore_index avoids a stored index full of duplicate labels, since
        # every appended record would otherwise carry its own 0-based index.
        df = pd.concat([history, new_rows], ignore_index=True)

    # Bound the size of the monitoring dataset to the latest 30000 rows.
    df = df.tail(30000)

    # The frame is staged in a local file before being uploaded.
    df.to_parquet("monitoringdata.parquet")
    s3.upload_file("monitoringdata.parquet", BUCKET, KEY)

def init(context):
    """Store the predictor address from SERVICE_URL on the function context.

    Args:
        context: Function context; receives a ``service`` attribute holding the
            value of the SERVICE_URL environment variable.

    Raises:
        Exception: If SERVICE_URL is unset or empty.
    """
    service_url = os.environ.get("SERVICE_URL")
    if service_url:
        context.service = service_url
        return

    raise Exception("Missing SERVICE_URL env variable")

def serve(context, event):
    """Forward an inference request to the predictor and record the result.

    The request body (decoded from JSON when it arrives as bytes) is posted to
    the address stored in ``context.service``. The request inputs and the first
    value of the first output are then passed to ``write_records``.

    Args:
        context: Function context providing ``logger`` and ``service``.
        event: Incoming event; ``event.body`` holds the inference request.

    Returns:
        The predictor's JSON response, parsed into Python objects.
    """
    context.logger.info(f"Received event: {event}")

    payload = json.loads(event.body) if isinstance(event.body, bytes) else event.body
    model_inputs = payload["inputs"]

    # The scheme is added here, so context.service must not include it.
    reply = requests.post(f"http://{context.service}", json=payload)
    output_json = json.loads(reply.text)

    prediction = output_json['outputs'][0]['data'][0]
    write_records(model_inputs, prediction)

    return output_json

In [None]:
# Register src/monitor.py as a Python function: `serve` is the request handler
# and `init` is registered as the init function.
monitor = project.new_function(
    name="monitor",
    kind="python",
    python_version="PYTHON3_10",
    code_src="src/monitor.py",
    handler="serve",
    init_function="init",
)

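Note the value of the `SERVICE_URL` environment variable: it is the address of the predictor service (host and port, without the `http://` scheme, which the monitor function adds itself) followed by the `/v2/models/taxi-predictor/infer` endpoint path. Replace the value below with the address of your own predictor service.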

In [None]:
# SERVICE_URL is passed without a scheme: serve() in src/monitor.py prepends
# "http://" itself.
monitor_run = monitor.run(
    action="serve",
    envs=[
        {
            "name": "SERVICE_URL",
            "value": "s-mlflowserveserve-9ea298144e8e4aeba0ad18b5e4aff70c.dslab-platform:8080/v2/models/taxi-predictor/infer",
        }
    ],
)

## Fill in the production data

In [None]:
import pandas as pd

# Load the February 2022 green-taxi trips from the local parquet file.
feb_data = pd.read_parquet('data/green_tripdata_2022-02.parquet')
# Summary statistics; as the last expression they are shown as the cell output.
feb_data.describe()

In [None]:
# Keep the row slice [1001:2000] and only the columns the model consumes.
feb_data = feb_data[1001:2000]
num_features = ["passenger_count", "trip_distance", "fare_amount", "total_amount"]
cat_features = ["PULocationID", "DOLocationID"]
feb_data = feb_data[num_features + cat_features]
recs = feb_data.to_dict(orient='records')

# Numerical features are sent as FP32 and categorical ones as UINT32, in the
# same order as num_features + cat_features.
feature_types = (
    [(name, 'FP32') for name in num_features]
    + [(name, 'UINT32') for name in cat_features]
)

# One inference payload (a list of single-value tensors) per record.
inputs = [
    [
        {'name': name, 'shape': [1], 'datatype': datatype, 'data': [rec[name]]}
        for name, datatype in feature_types
    ]
    for rec in recs
]

In [None]:
import requests

# Resolve the monitor endpoint once: it does not change between requests, so
# there is no need to refresh the run on every iteration. Single quotes are
# used inside the f-string because reusing the outer double quotes is a syntax
# error before Python 3.12.
endpoint = f"http://{monitor_run.refresh().status.service['url']}/v2/models/taxi-predictor/infer"

# Send every prepared payload through the monitor so it gets recorded.
for i in inputs:
    inference_request = {"inputs": i}
    response = requests.post(endpoint, json=inference_request)

# Evaluate data drift

In [None]:
%pip install evidently

### Define report model

In [None]:
from evidently import DataDefinition
from evidently import Dataset
from evidently import Report
from evidently.metrics import ValueDrift, DriftedColumnsCount, MissingValueCount
from evidently.presets import DataDriftPreset

num_features = ["passenger_count", "trip_distance", "fare_amount", "total_amount"]
cat_features = ["PULocationID", "DOLocationID"]

# The model output is declared as an additional numerical column.
data_definition = DataDefinition(
    numerical_columns=num_features + ['prediction'],
    categorical_columns=cat_features,
)

# Drift and missing-value metrics on the prediction, plus dataset-wide drift.
report = Report(
    metrics=[
        ValueDrift(column='prediction'),
        DriftedColumnsCount(),
        DataDriftPreset(),
        MissingValueCount(column='prediction'),
    ],
    include_tests=True,
)

### Recover reference dataset

In [None]:
import mlflow

# Download the registered model, replacing any previously downloaded copy.
project.get_model("taxi-predictor").download(overwrite=True)
# NOTE(review): assumes download() places the MLflow model under
# ./model/model/ — confirm against the actual download location.
model = mlflow.pyfunc.load_model('./model/model/')
# Score the training data so the reference set carries a 'prediction' column.
df = project.get_dataitem('train-data').as_df()
prediction = model.predict(df[num_features + cat_features])
df['prediction'] = prediction

In [None]:
# Reference dataset: the training data together with its predictions.
train_dataset = Dataset.from_pandas(df, data_definition)


### Recover current dataset

In [None]:
import io
import os

import boto3
import pandas as pd

s3 = boto3.client('s3')
# Read back the object written by src/monitor.py, which keys it by PROJECT_NAME.
# NOTE(review): assumes PROJECT_NAME in the function runtime equals
# "<USER>-ml-service" — confirm.
obj = s3.get_object(Bucket='datalake', Key=f"{os.environ['USER']}-ml-service/monitordata.parquet")
current_df = pd.read_parquet(io.BytesIO(obj['Body'].read()))
# The monitor stores the model output as 'predict'; the data definition
# expects 'prediction'.
current_df = current_df.rename(columns={'predict': 'prediction'})

val_dataset = Dataset.from_pandas(
    current_df,
    data_definition
)

In [None]:
# Compare the recorded production data against the training reference.
snapshot = report.run(reference_data=train_dataset, current_data=val_dataset)

In [None]:
# As the last expression of the cell, the snapshot is shown as the cell output.
snapshot

In [None]:
# Show the result of snapshot.dict() as the cell output.
snapshot.dict()