In [None]:

import time
import os,json,sys, boto3
import requests, operator
from dkube.sdk import *
from dkube.sdk.api import DkubeApi
from dkube.sdk.rsrcs import DkubeModelmonitor
from dkube.sdk.rsrcs.modelmonitor import DatasetClass,ModelType,DriftAlgo, DataType
from dkube.sdk.rsrcs.modelmonitor import DatasetFormat,DkubeModelmonitorAlert, TimeZone

#### Configuration

In [None]:
# Tune the following parameters in this cell.

# IP of the DKube setup without https://
# and without port number, e.g. 1.2.3.4
DKUBE_IP = ""

DEPLOYMENT_NAME = "insurance-demo"
# Name of the model monitor that will be created (same as the deployment).
MONITOR_NAME = DEPLOYMENT_NAME

# The frequency with which monitoring will run; the value is in minutes.
RUN_FREQUENCY = 5

# DKube username, read from the environment (empty string when unset).
user = DKUBEUSERNAME = os.getenv("DKUBE_USER_LOGIN_NAME", "")
# DKube access token and URL, read from the environment (empty when unset).
TOKEN = os.getenv("DKUBE_USER_ACCESS_TOKEN", "")
DKUBE_URL = os.getenv("DKUBE_URL", "")

# Every one of these settings is needed further down, so stop as soon as ANY
# of them is empty (the check used to fire only when ALL of them were empty).
_required_settings = {
    "TOKEN": TOKEN,
    "DKUBE_URL": DKUBE_URL,
    "DKUBEUSERNAME": DKUBEUSERNAME,
    "DKUBE_IP": DKUBE_IP,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    print("Please fill the Dkube details first (TOKEN, DKUBE_URL, DKUBEUSERNAME, DKUBE_IP)")
    # Same exception type as before, now naming the settings that are missing.
    raise TypeError(f"missing DKube settings: {', '.join(_missing_settings)}")



### Monitor Inputs

In [None]:
# Load the local settings file; the keys read below are the S3 endpoint,
# the S3 credentials, the bucket name and the key of the artifacts JSON.
with open("inputs.json") as f:
    teamb_inputs = json.load(f)

# S3 client built from the endpoint and credentials found in inputs.json.
s3_client = boto3.client(
    "s3",
    endpoint_url=teamb_inputs["s3_endpoint"],
    aws_access_key_id=teamb_inputs["s3_accesskeyid"],
    aws_secret_access_key=teamb_inputs["s3_accesskey"],
)

# Fetch the artifacts JSON object and decode its body as UTF-8 text.
artifacts_object = s3_client.get_object(
    Bucket=teamb_inputs["s3_bucket"],
    Key=teamb_inputs["artifacts_json"],
)
artifacts_data = artifacts_object.get("Body").read().decode("utf-8")

# Only the "monitor_inputs" section of the artifacts is used by this notebook.
artifacts = json.loads(artifacts_data)
monitor_exports = artifacts["monitor_inputs"]

# Name under which the training data is registered as a DKube dataset.
DKUBE_TRAIN_DATASET = f"{MONITOR_NAME}-train"

# DKube API client used by the rest of the notebook.
api = DkubeApi(URL=DKUBE_URL, token=TOKEN)

# 1. Create train dataset, pointing at the exported training data in S3.
dataset = DkubeDataset(user, name=DKUBE_TRAIN_DATASET)
train_s3_details = {
    "endpoint": teamb_inputs["s3_endpoint"],
    "bucket": teamb_inputs["s3_bucket"],
    "prefix": monitor_exports["traindata"],
    "key": teamb_inputs["s3_accesskeyid"],
    "secret": teamb_inputs["s3_accesskey"],
}
dataset.update_s3_details(**train_s3_details)

api.create_dataset(dataset)

# 2. Download train data transformer
s3_client.download_file(teamb_inputs["s3_bucket"],
                        monitor_exports["traindata_transformer"],
                        "transform-data.py")

# 3. Download thresholds
s3_client.download_file(teamb_inputs["s3_bucket"],
                        monitor_exports["thresholds"],
                        "thresholds.json")

# 4. Get model specific parameters (defaults apply when the export omits them)
MODEL_TYPE = monitor_exports.get("model_type", "regression")
MODEL_INPUT = monitor_exports.get("model_input", "tabular")
DATA_TZ = monitor_exports.get("data_tz", "UTC")

# Read the whole transformer script into a string; the context manager closes
# the file even if the read raises.
with open("transform-data.py", "r") as text_file:
    script = text_file.read()

# Thresholds are handed to the monitor as the parsed JSON document.
with open('thresholds.json') as f:
    thresholds = json.load(f)

### Utility Functions

In [None]:
def get_dataset_version(username, dataset_name, version, client=None):
    """Return the ``"<version>:<uuid>"`` reference for a dataset version.

    Args:
        username: Owner of the dataset.
        dataset_name: Name of the dataset to look up.
        version: Version name to search for (for example ``"v1"``).
        client: Object exposing ``get_dataset_versions(username, dataset_name)``.
            Defaults to the notebook-level ``api`` client.

    Returns:
        The string ``f"{version}:{uuid}"`` for the first entry whose
        ``["version"]["name"]`` equals ``version``.

    Raises:
        ValueError: If no entry matches. Previously the error sentence was
            returned as if it were a version reference, which let callers pass
            it on as a dataset version without noticing.
    """
    if client is None:
        client = api

    available = []
    for entry in client.get_dataset_versions(username, dataset_name):
        info = entry["version"]
        if info["name"] == version:
            uuid = info["uuid"]
            return f"{version}:{uuid}"
        available.append(info["name"])

    raise ValueError(
        f"dataset version {version} not found, available version are {available}"
    )

In [None]:
#### Datasets for Monitoring

In [None]:
# Train dataset: "<dataset>:<user>" reference plus the resolved "v1" version.
training_data = f"{DKUBE_TRAIN_DATASET}:{DKUBEUSERNAME}"
train_data_version = get_dataset_version(
    DKUBEUSERNAME, DKUBE_TRAIN_DATASET, "v1"
)

# Predict dataset is derived internally; only its format is declared here.
predict_data_format = "cloudeventlogs"

# Label dataset - a placeholder created in Minio by the code that follows.
LIVE_DATASET = f"{MONITOR_NAME}-labels"
labelled_data = f"{LIVE_DATASET}:{DKUBEUSERNAME}"

# Credentials and bucket of the cloudevents log store.
response = api.get_cloudevents_logstore_creds()
MINIO_KEY = response["access_key_id"]
MINIO_SECRET_KEY = response["access_key"]
MINIO_BUCKET = response["bucket"]

# Minio endpoint on the DKube host, port 32221.
MINIO_ENDPOINT = "http://" + DKUBE_IP + ":32221"

# Register the label dataset backed by the cloudevents Minio bucket.
# A "conflict" answer is tolerated so the cell can be re-run.
try:
    dataset = DkubeDataset(DKUBEUSERNAME, name=LIVE_DATASET, remote=True)
    dataset.update_dataset_source('s3')
    dataset.update_s3_details(
        endpoint=MINIO_ENDPOINT,
        bucket=MINIO_BUCKET,
        prefix='',
        key=MINIO_KEY,
        secret=MINIO_SECRET_KEY)
    api.create_dataset(dataset)

except Exception as e:
    # Not every exception has a `reason` attribute; reading it directly raised
    # AttributeError and hid the real error. Exceptions without a reason are
    # re-raised unchanged.
    reason = getattr(e, "reason", None)
    if not reason:
        raise
    if reason.lower() != "conflict":
        # Report the failure and carry on, as before. The body may be an
        # object with `code`/`message` or something plainer, so read both
        # defensively instead of crashing inside the error handler.
        response = getattr(e, "body", None)
        code = getattr(response, "code", getattr(e, "status", None))
        message = getattr(response, "message", response)
        print(f"Failed[{code}]: {message}")

### Model Monitor

In [None]:
# Resolve the deployment id from its name; the monitor object is built from it.
DEPLOYMENT_ID = api.get_deployment_id(name=DEPLOYMENT_NAME)
# NOTE(review): the keyword is spelled `deployemnt_id` - presumably this matches
# the SDK constructor's parameter name; confirm against the SDK before renaming.
mm=DkubeModelmonitor(deployemnt_id = DEPLOYMENT_ID)

In [None]:
# Basic monitor settings, taken from the values exported with the model.
monitor_basics = {
    "model_type": MODEL_TYPE,
    "input_data_type": MODEL_INPUT,
    "data_timezone": DATA_TZ,
}
mm.update_modelmonitor_basics(**monitor_basics)

In [None]:
# Attach the thresholds parsed from thresholds.json to the monitor.
mm.add_thresholds(thresholds=thresholds)

### Health Monitoring

In [None]:
# Enable deployment health monitoring.
# NOTE(review): frequency=1 is presumably in minutes, like RUN_FREQUENCY - confirm.
mm.update_deployment_monitoring_details(enabled=True, frequency=1)

### Drift Monitoring

In [None]:
# Enable drift monitoring. The run frequency comes from RUN_FREQUENCY in the
# configuration cell, so tuning that constant actually takes effect (it was a
# hard-coded 5 here before, equal to the constant's default).
mm.update_drift_monitoring_details(
    enabled=True,
    frequency=RUN_FREQUENCY,
    algorithm='auto',
)

### Add Train, Prediction, and Labelled Datasets

In [None]:
# Train data: the registered training dataset, its resolved version and the
# transformer script downloaded earlier.
mm.add_datasources(
    data_class=str(DatasetClass.Train),
    name=training_data,
    data_format=MODEL_INPUT,
    version=train_data_version,
    transformer_script=script,
)

# Predict data: only the format is given.
mm.add_datasources(
    data_class=str(DatasetClass.Predict),
    data_format=predict_data_format,
)

# Labelled data: read from "<deployment id>/livedata" inside the label dataset,
# with the prediction, ground-truth and timestamp columns named explicitly.
labelled_subpath = DEPLOYMENT_ID + "/livedata"
mm.add_datasources(
    data_class=str(DatasetClass.Labelled),
    name=labelled_data,
    data_format=MODEL_INPUT,
    s3_subpath=labelled_subpath,
    predict_col="charges",
    groundtruth_col="GT_target",
    timestamp_col="timestamp",
)

### Performance Monitoring

In [None]:
# Enable performance monitoring against the labelled data. The run frequency
# comes from RUN_FREQUENCY in the configuration cell, so tuning that constant
# actually takes effect (it was a hard-coded 5 here before, equal to the
# constant's default).
mm.update_performance_monitoring_details(
    enabled=True,
    source_type="labelled_data",
    frequency=RUN_FREQUENCY,
)

### Create Model monitor

In [None]:
# Create the monitor from everything configured on `mm` above.
# NOTE(review): wait_for_completion=True presumably blocks until the monitor
# has been created - confirm against the SDK.
api.modelmonitor_create(mm,wait_for_completion=True)

### Extracting id of the Model Monitor

In [None]:
# The function below can be used to fetch model monitor ID by name.
# The monitor id will be same as deployment id.
# id = api.modelmonitor_get_id(MONITOR_NAME)
# NOTE: `id` shadows the Python builtin of the same name for the rest of the
# notebook; the cells that follow rely on this name, so it is kept as is.
id = DEPLOYMENT_ID
print(id)

### Schema update

In [None]:
# One row per schema column, applied in order:
# (label, schema_class, schema_type, selected)
schema_updates = [
    # prediction output, row id and timestamp columns - not selected
    ("charges", "continuous", "prediction_output", False),
    ("unique_id", "continuous", "row_id", False),
    ("timestamp", "continuous", "timestamp", False),
    # age and bmi to continuous
    ("age", "continuous", "input_feature", True),
    ("bmi", "continuous", "input_feature", True),
    # select these categorical features
    ("sex", "categorical", "input_feature", True),
    ("children", "categorical", "input_feature", True),
    ("smoker", "categorical", "input_feature", True),
    ("region", "categorical", "input_feature", True),
]

for label, schema_class, schema_type, selected in schema_updates:
    api.modelmonitor_update_schema(
        id,
        label=label,
        schema_class=schema_class,
        schema_type=schema_type,
        selected=selected,
    )

### Add alerts

#### Deployment Health Alert

In [None]:
# Deployment-health alert with a single condition: latency_avg, operator.gt, 300.
alert = DkubeModelmonitorAlert(
    name='latency_alert',
    alert_class='deployment_health',
)
alert.add_alert_condition(
    metric='latency_avg',
    threshold=300,
    op=operator.gt,
)
api.modelmonitor_add_alert(id, alert)

#### Feature Alert

In [None]:
# Feature-drift alert with a single condition: feature 'age', operator.lt, 0.02.
alert = DkubeModelmonitorAlert(
    name='age_alert',
    alert_class='feature_drift',
)
alert.add_alert_condition(
    feature='age',
    threshold=0.02,
    op=operator.lt,
)
api.modelmonitor_add_alert(id, alert)

#### Performance Alert

In [None]:
# Performance-decay alert with a single condition: metric 'mae', operator.gt, 2000.
alert = DkubeModelmonitorAlert(
    name='mae_alert',
    alert_class='performance_decay',
)
alert.add_alert_condition(
    metric='mae',
    threshold=2000,
    op=operator.gt,
)
api.modelmonitor_add_alert(id, alert)

### Start the model monitor

In [None]:
# Start the monitor identified by `id` (set to the deployment id above).
api.modelmonitor_start(id)

### Retraining / Rebaselining Model Monitor

### Cleanup

In [None]:
# Set CLEANUP to True to stop and delete the model monitor.
CLEANUP = False
if CLEANUP:
    # Number of status checks before giving up; each check is followed by a
    # 5 second pause.
    RETRIES = 4
    for _ in range(RETRIES):
        # Kept in its own name so the `mm` monitor object built earlier is not
        # replaced by this status dictionary.
        monitor_info = api.modelmonitor_get(id)
        status = monitor_info["status"]
        if status:
            if status["state"].lower() != "active":
                # No longer active: safe to delete.
                break
            # Still active: request a stop and check again after the pause.
            api.modelmonitor_stop(id)
        # Uses the `time` module imported at the top of the notebook instead
        # of a second, mid-notebook import.
        time.sleep(5)
    else:
        # Every check found the monitor active or without a status.
        raise TimeoutError("modelmonitor failed to stopped")
    api.modelmonitor_delete(id)
    