## Vertex AI Blog Post - Vertex AI Model monitoring capabilities 

In [None]:
import os
import sys
import pandas as pd
import json

import time

from google.cloud.aiplatform import gapic as aip
from google.protobuf import json_format
from google.protobuf.json_format import MessageToJson, ParseDict
from google.protobuf.struct_pb2 import Struct, Value


*Reference:*

Training code source: [AutoML tabular binary classification sample notebook](https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/master/notebooks/community/gapic/automl/showcase_automl_tabular_binary_classification_batch.ipynb)

## Install Python dependencies

In [None]:

#! pip3 install -U google-cloud-aiplatform $USER_FLAG
#! pip3 install -U google-cloud-storage $USER_FLAG

In [None]:
###  Restart the Kernel

## GCP Configurations

In [None]:
REGION = "us-central1" 
PROJECT_ID = "vertex-ai-blog"  #Replace with your GCP Project
print("Project ID:", PROJECT_ID)


In [None]:
! gcloud config set project $PROJECT_ID

In [None]:
# Accelerator type requested for each serving node, and how many of them.
DEPLOY_GPU = aip.AcceleratorType.NVIDIA_TESLA_K80
DEPLOY_NGPU = 1

# Serving machine type, assembled as "<machine family>-<vCPU count>".
MACHINE_TYPE = "n1-standard"
VCPU = "4"
DEPLOY_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"

In [None]:
# Regional API host handed to every service client via client_options.
API_ENDPOINT = f"{REGION}-aiplatform.googleapis.com"

# Resource-name prefix (project + location) used as the parent of the
# dataset, model and endpoint resources created below.
PARENT = f"projects/{PROJECT_ID}/locations/{REGION}"

In [None]:
from datetime import datetime

# Second-resolution stamp (YYYYmmddHHMMSS) appended to resource display names.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

### Create GCS Bucket

In [None]:
BUCKET_NAME = "gs://vertex-ai-blog" 

In [None]:
!gsutil mb -l $REGION $BUCKET_NAME

In [None]:
#Test access to the bucket
!gsutil ls -al $BUCKET_NAME

In [None]:
# client options - same for all services
client_options = {"api_endpoint": API_ENDPOINT}

# Create client instances for key tasks to be performed
def create_dataset_client():
    """Return a DatasetServiceClient configured with the shared client options."""
    return aip.DatasetServiceClient(client_options=client_options)


def create_model_client():
    """Return a ModelServiceClient configured with the shared client options."""
    return aip.ModelServiceClient(client_options=client_options)


def create_pipeline_client():
    """Return a PipelineServiceClient configured with the shared client options."""
    return aip.PipelineServiceClient(client_options=client_options)

# Needed for batch prediction
def create_job_client():
    """Return a JobServiceClient configured with the shared client options."""
    return aip.JobServiceClient(client_options=client_options)

# Endpoint creation
def create_endpoint_client():
    """Return an EndpointServiceClient configured with the shared client options."""
    return aip.EndpointServiceClient(client_options=client_options)

# Needed for Prediction call
def create_prediction_client():
    """Return a PredictionServiceClient configured with the shared client options."""
    return aip.PredictionServiceClient(client_options=client_options)


# One client per service, keyed by the task it is used for in this notebook.
clients = {
    "dataset": create_dataset_client(),
    "model": create_model_client(),
    "pipeline": create_pipeline_client(),
    "job": create_job_client(),
    "endpoint": create_endpoint_client(),
    "prediction": create_prediction_client(),
}

### Create Dataset

In [None]:
# Bank marketing Dataset
# Dataset Source: [Moro et al., 2014] S. Moro, P. Cortez and P. Rita. A Data-Driven Approach to Predict 
# the Success of Bank Telemarketing. Decision Support Systems, Elsevier, 62:22-31, June 2014

IMPORT_FILE = "gs://cloud-ml-tables-data/bank-marketing.csv"

In [None]:
#Alternate way of creating dataset in Vertex
from typing import List, Union

from google.cloud import aiplatform

def create_and_import_dataset_tabular_gcs_sample(
    display_name: str, project: str, location: str, gcs_source: Union[str, List[str]],):
    """Create a tabular dataset from GCS and return its resource name.

    Args:
        display_name: Display name given to the new dataset.
        project: Project passed to ``aiplatform.init``.
        location: Location passed to ``aiplatform.init``.
        gcs_source: One GCS URI, or a list of them, to import from.

    Returns:
        The ``resource_name`` of the created dataset.
    """
    aiplatform.init(project=project, location=location)

    tabular_ds = aiplatform.TabularDataset.create(
        display_name=display_name,
        gcs_source=gcs_source,
    )
    tabular_ds.wait()

    print(f'\tDataset: "{tabular_ds.display_name}"')
    resource_name = tabular_ds.resource_name
    print(f'\tname: "{resource_name}"')

    return resource_name

In [None]:
dataset_id = create_and_import_dataset_tabular_gcs_sample("bank-" + TIMESTAMP, PROJECT_ID, REGION, IMPORT_FILE)

### Create training pipeline


In [None]:
def create_pipeline(pipeline_name, model_name, dataset, schema, task):
    """Submit a training pipeline request and return the created pipeline.

    Args:
        pipeline_name: Display name for the training pipeline.
        model_name: Display name for the model the pipeline uploads.
        dataset: Dataset resource name; only its last "/" segment is sent.
        schema: Value used as the training task definition.
        task: Value used as the training task inputs.

    Returns:
        The pipeline object returned by the service, or None if the request
        raised (the exception is printed, not re-raised).
    """
    # Only the trailing path segment of the resource name is used as dataset_id.
    short_dataset_id = dataset.rsplit("/", 1)[-1]

    split_fractions = {
        "training_fraction": 0.8,
        "validation_fraction": 0.1,
        "test_fraction": 0.1,
    }
    pipeline_spec = {
        "display_name": pipeline_name,
        "training_task_definition": schema,
        "training_task_inputs": task,
        "input_data_config": {
            "dataset_id": short_dataset_id,
            "fraction_split": split_fractions,
        },
        "model_to_upload": {"display_name": model_name},
    }

    try:
        created = clients["pipeline"].create_training_pipeline(
            parent=PARENT, training_pipeline=pipeline_spec
        )
        print(created)
    except Exception as e:
        print("exception:", e)
        return None
    else:
        return created

In [None]:
label_column = 'Deposit'

In [None]:
# One "auto" transformation entry per input column handed to the training task.
TRANSFORMATIONS = [
    {"auto": {"column_name": column}}
    for column in (
        "Age",
        "Job",
        "MaritalStatus",
        "Education",
        "Default",
        "Balance",
        "Housing",
        "Loan",
        "Contact",
        "Day",
        "Month",
        "Duration",
        "Campaign",
        "PDays",
        "POutcome",
    )
]

In [None]:
# Display names for the pipeline and the model it uploads; the timestamp suffix
# keeps them distinct between notebook runs.
PIPE_NAME = "bank_pipe-" + TIMESTAMP
MODEL_NAME = "bank_model-" + TIMESTAMP

# Schema URI passed to create_pipeline() as the training task definition.
TRAINING_SCHEMA = "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_tables_1.0.0.yaml"

# Training task inputs, built as a protobuf Value wrapping a Struct so it can be
# sent as the pipeline's training_task_inputs.
task = Value(
    struct_value=Struct(
        fields={
            # Column the model learns to predict.
            "target_column": Value(string_value=label_column),
            "prediction_type": Value(string_value="classification"),
            # 1000 milli node hours, i.e. one node hour.
            "train_budget_milli_node_hours": Value(number_value=1000),
            "disable_early_stopping": Value(bool_value=False),
            # The per-column transformation list converted into a protobuf Value.
            "transformations": json_format.ParseDict(TRANSFORMATIONS, Value()),
        }
    )
)

# create_pipeline() returns None (after printing the error) if the request fails.
response = create_pipeline(PIPE_NAME, MODEL_NAME, dataset_id, TRAINING_SCHEMA, task)

Now save the unique identifier of the training pipeline you created.

In [None]:
# create_pipeline() returns None when the create request raised; stop here with
# a clear message instead of failing on `None.name`.
if response is None:
    raise RuntimeError(
        "Training pipeline was not created; see the exception printed by create_pipeline()."
    )

# The full unique ID for the pipeline
pipeline_id = response.name
# The short numeric ID for the pipeline
pipeline_short_id = pipeline_id.split("/")[-1]

print(pipeline_id)

In [None]:
def get_training_pipeline(name, silent=False):
    """Fetch a training pipeline by resource name, optionally printing a summary.

    Args:
        name: Resource name of the training pipeline.
        silent: When true, skip the printed summary.

    Returns:
        The pipeline object returned by the pipeline service client.
    """
    pipeline = clients["pipeline"].get_training_pipeline(name=name)
    if not silent:
        print("pipeline")
        # Fields printed in this order; the two map-like ones are shown as dicts.
        summary_fields = (
            "name",
            "display_name",
            "state",
            "training_task_definition",
            "training_task_inputs",
            "create_time",
            "start_time",
            "end_time",
            "update_time",
            "labels",
        )
        for field in summary_fields:
            value = getattr(pipeline, field)
            if field in ("training_task_inputs", "labels"):
                value = dict(value)
            print(f" {field}:", value)
    return pipeline


response = get_training_pipeline(pipeline_id)

In [None]:
# Poll the training pipeline once a minute until it reaches a terminal state.
# Success records the uploaded model; FAILED and CANCELLED both abort, so a
# cancelled pipeline can no longer leave this loop spinning forever.
model_to_deploy_id = None
while True:
    response = get_training_pipeline(pipeline_id, True)
    if response.state == aip.PipelineState.PIPELINE_STATE_SUCCEEDED:
        model_to_deploy = response.model_to_upload
        model_to_deploy_id = model_to_deploy.name
        print("Training Time:", response.end_time - response.start_time)
        break

    print("Training job has not completed:", response.state)
    if response.state == aip.PipelineState.PIPELINE_STATE_FAILED:
        raise Exception("Training Job Failed")
    if response.state == aip.PipelineState.PIPELINE_STATE_CANCELLED:
        raise Exception("Training Job Cancelled")
    time.sleep(60)

print("model to deploy:", model_to_deploy_id)

### Evaluation

In [None]:
def list_model_evaluations(name):
    """Print every evaluation of a model and return the last evaluation's name.

    Args:
        name: Resource name of the model whose evaluations are listed.

    Returns:
        The ``name`` of the last evaluation iterated, or None when the model
        has no evaluations (previously this case raised UnboundLocalError).
    """
    last_evaluation_name = None
    for evaluation in clients["model"].list_model_evaluations(parent=name):
        print("model_evaluation")
        print(" name:", evaluation.name)
        print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
        metrics = json_format.MessageToDict(evaluation._pb.metrics)
        for metric in metrics:
            print(metric)
        # .get() so an evaluation without these metrics prints None instead
        # of aborting the listing with KeyError.
        print("logloss", metrics.get("logLoss"))
        print("auPrc", metrics.get("auPrc"))
        last_evaluation_name = evaluation.name

    return last_evaluation_name

last_evaluation = list_model_evaluations(model_to_deploy_id)

In [1]:
# Debug-only override, disabled so the notebook deploys the model trained above
# instead of a hard-coded model from another project.
# To skip training and reuse an existing model, uncomment the line below and
# set it to that model's full resource name in your own project.
# model_to_deploy_id = "projects/<PROJECT_NUMBER>/locations/<REGION>/models/<MODEL_ID>"


### Deploy Model for online predictions and monitoring demo


In [None]:
MIN_NODES = 1
MAX_NODES = 1

In [None]:
ENDPOINT_NAME = "bank_endpoint-" + TIMESTAMP


def create_endpoint(display_name):
    """Create an endpoint under PARENT and return it once the operation finishes.

    Args:
        display_name: Display name for the new endpoint.

    Returns:
        The result of the create-endpoint long-running operation.
    """
    operation = clients["endpoint"].create_endpoint(
        parent=PARENT, endpoint={"display_name": display_name}
    )
    print("Long running operation:", operation.operation.name)

    # Wait for the operation, giving up after 300 seconds.
    created = operation.result(timeout=300)
    print("result")
    for field in (
        "name",
        "display_name",
        "description",
        "labels",
        "create_time",
        "update_time",
    ):
        print(f" {field}:", getattr(created, field))
    return created


result = create_endpoint(ENDPOINT_NAME)

In [None]:
# The full unique ID for the endpoint
endpoint_id = result.name
# The short numeric ID for the endpoint
endpoint_short_id = endpoint_id.split("/")[-1]

print(endpoint_id)

In [None]:
DEPLOYED_NAME = "bank_deployed-" + TIMESTAMP


def deploy_model(
    model, deployed_model_display_name, endpoint, traffic_split=None
):
    """Deploy a model to an endpoint and return the deployed model's id.

    Args:
        model: Resource name of the model to deploy.
        deployed_model_display_name: Display name for the deployed model.
        endpoint: Resource name of the target endpoint.
        traffic_split: Mapping of deployed-model id to traffic percentage.
            Defaults to ``{"0": 100}``. NOTE(review): "0" presumably denotes
            the model being deployed by this request - confirm against the
            DeployModel API reference.

    Returns:
        The ``id`` of the deployed model reported by the finished operation.
    """
    # Build the default per call: a dict literal in the signature would be one
    # shared object reused (and possibly mutated) across calls.
    if traffic_split is None:
        traffic_split = {"0": 100}

    if DEPLOY_GPU:
        machine_spec = {
            "machine_type": DEPLOY_COMPUTE,
            "accelerator_type": DEPLOY_GPU,
            "accelerator_count": DEPLOY_NGPU,
        }
    else:
        machine_spec = {
            "machine_type": DEPLOY_COMPUTE,
            "accelerator_count": 0,
        }

    deployed_model = {
        "model": model,
        "display_name": deployed_model_display_name,
        "dedicated_resources": {
            "min_replica_count": MIN_NODES,
            "max_replica_count": MAX_NODES,
            "machine_spec": machine_spec,
        },
        "disable_container_logging": False,
    }

    response = clients["endpoint"].deploy_model(
        endpoint=endpoint, deployed_model=deployed_model, traffic_split=traffic_split
    )

    print("Long running operation:", response.operation.name)
    # No timeout here: this blocks until the deployment operation finishes.
    result = response.result()
    print("result")
    deployed_model = result.deployed_model
    print(" deployed_model")
    print("  id:", deployed_model.id)
    print("  model:", deployed_model.model)
    print("  display_name:", deployed_model.display_name)
    print("  create_time:", deployed_model.create_time)

    return deployed_model.id


deployed_model_id = deploy_model(model_to_deploy_id, DEPLOYED_NAME, endpoint_id)

In [None]:
endpoint_id

## Online Prediction

In [None]:
# Single example record used for test predictions. Category values must match
# the spelling used in the training data, otherwise the model sees them as
# unknown categories ("managment"/"teritary" were misspelled).
INSTANCE = {
    "Age": "58",
    "Job": "management",
    "MaritalStatus": "married",
    "Education": "tertiary",
    "Default": "no",
    "Balance": "2143",
    "Housing": "yes",
    "Loan": "no",
    "Contact": "unknown",
    "Day": "5",
    "Month": "may",
    "Duration": "261",
    "Campaign": "1",
    "PDays": "-1",
    "Previous": 0,
    "POutcome": "unknown",
}

In [None]:
# Define prediction function
def predict_item(data, endpoint, parameters_dict, verbose=1):
    """Send one instance to an endpoint and return its first prediction.

    Args:
        data: Mapping of feature name to value for a single instance. It should
            conform to the deployed model's prediction input schema.
        endpoint: Resource name of the endpoint to call.
        parameters_dict: Prediction parameters converted to a protobuf Value
            (this notebook passes None).
        verbose: When truthy (the default), print the deployed model id and
            every prediction; pass 0 to stay quiet in bulk loops.

    Returns:
        The first prediction converted with ``dict()``, or None when the
        response carries no predictions. Other cells in this notebook index the
        result with "classes" and "scores".
    """
    parameters = json_format.ParseDict(parameters_dict, Value())

    # The service takes a list of instances; wrap the single record.
    instances = [json_format.ParseDict(data, Value())]

    response = clients["prediction"].predict(
        endpoint=endpoint, instances=instances, parameters=parameters
    )
    predictions = [dict(prediction) for prediction in response.predictions]

    if verbose:
        print("response")
        print(" deployed_model_id:", response.deployed_model_id)
        print("predictions")
        for prediction in predictions:
            print(" prediction:", prediction)

    return predictions[0] if predictions else None




In [None]:
# Run a test prediction
predict_item(INSTANCE, endpoint_id, None)

## Add monitoring to endpoint

In [None]:
# Temporary.  Delete
ENDPOINT = endpoint_id
DEFAULT_INPUT = INSTANCE

In [None]:
import pprint as pp

# Smoke-test the endpoint before attaching monitoring: show the request, send
# it, and show whatever predict_item() returns.
print(ENDPOINT)
print("request:")
pp.pprint(DEFAULT_INPUT)
try:
    # Call with the same endpoint/input that were just printed.
    resp = predict_item(DEFAULT_INPUT, ENDPOINT, None)
    print("response")
    pp.pprint(resp)
except Exception as err:
    # Report the cause instead of hiding it; the notebook keeps running.
    print("prediction request failed:", repr(err))

### Monitoring Config

In [None]:

# Address placed in the monitoring job's email alert config.
# Replace with your own address before running.
USER_EMAIL = "jasmeetbhatia@google.com"  # @param {type:"string"}
# Display name of the monitoring job.
JOB_NAME = "bank_marketing_monitor"

# Sampling rate (optional, default=.8)
LOG_SAMPLE_RATE = 0.8  # @param {type:"number"}

# Monitoring Interval in seconds (optional, default=3600).
MONITOR_INTERVAL = 3600  # @param {type:"number"}


# URI to training dataset.
# NOTE: this is a list of URIs (it is passed as GcsSource(uris=...)), although
# the form annotation says "string".
DATASET_GCS_URI = ['gs://cloud-ml-tables-data/bank-marketing.csv'] # @param {type:"string"}

# Prediction target column name in training dataset.
TARGET = "Deposit"

# Skew and drift thresholds.
# *_DEFAULT_THRESHOLDS: comma-separated feature names that get the default
# threshold value. *_CUSTOM_THRESHOLDS: comma-separated "feature:value" pairs
# that set a per-feature threshold (parsed by get_thresholds()).
SKEW_DEFAULT_THRESHOLDS = "Age,Job,Balance,Education"  # @param {type:"string"}
SKEW_CUSTOM_THRESHOLDS = "Balance:.5"  # @param {type:"string"}
DRIFT_DEFAULT_THRESHOLDS = "Age,Job,Balance,Education"  # @param {type:"string"}
DRIFT_CUSTOM_THRESHOLDS = "Balance:.5"  # @param {type:"string"}

### Define Monitoring Helper Functions

In [None]:
def create_monitoring_job(objective_configs):
    """Create a model deployment monitoring job for the notebook's endpoint.

    Sampling rate, schedule interval, alert email and job name come from the
    module-level configuration constants; the endpoint is the global
    ``endpoint_id``.

    Args:
        objective_configs: Per-deployed-model objective configs for the job.

    Returns:
        The response of ``create_model_deployment_monitoring_job``.
    """
    # Sampling: log a random fraction of requests.
    sampling = SamplingStrategy(
        random_sample_config=SamplingStrategy.RandomSampleConfig(
            sample_rate=LOG_SAMPLE_RATE
        )
    )

    # Schedule: interval between monitoring runs.
    schedule = ModelDeploymentMonitoringScheduleConfig(
        monitor_interval=Duration(seconds=MONITOR_INTERVAL)
    )

    # Alerting: email notifications.
    alerting = ModelMonitoringAlertConfig(
        email_alert_config=ModelMonitoringAlertConfig.EmailAlertConfig(
            user_emails=[USER_EMAIL]
        )
    )

    # Both schema URIs are sent as empty strings.
    monitoring_job_spec = ModelDeploymentMonitoringJob(
        display_name=JOB_NAME,
        endpoint=f"{endpoint_id}",
        model_deployment_monitoring_objective_configs=objective_configs,
        logging_sampling_strategy=sampling,
        model_deployment_monitoring_schedule_config=schedule,
        model_monitoring_alert_config=alerting,
        predict_instance_schema_uri="",
        analysis_instance_schema_uri="",
    )

    job_client = JobServiceClient(client_options={"api_endpoint": API_ENDPOINT})
    created_job = job_client.create_model_deployment_monitoring_job(
        parent=f"projects/{PROJECT_ID}/locations/{REGION}",
        model_deployment_monitoring_job=monitoring_job_spec,
    )
    print("Created monitoring job:")
    print(created_job)
    return created_job


def get_thresholds(default_thresholds, custom_thresholds):
    """Build a feature -> ThresholdConfig mapping from two config strings.

    Args:
        default_thresholds: Comma-separated feature names that receive
            DEFAULT_THRESHOLD_VALUE. Blank entries are ignored.
        custom_thresholds: Comma-separated "feature:value" pairs; each value is
            parsed with ``float``. May be empty, meaning no custom thresholds.
            Custom entries override defaults for the same feature.

    Returns:
        Dict of feature name to ThresholdConfig, or None (after printing a
        message) if a non-blank custom entry is not a "feature:value" pair.
    """
    thresholds = {}
    # One shared config object is reused for every default-threshold feature.
    default_threshold = ThresholdConfig(value=DEFAULT_THRESHOLD_VALUE)
    for feature in default_thresholds.split(","):
        feature = feature.strip()
        # "".split(",") yields [""]; skip blanks so no "" feature key is created.
        if feature:
            thresholds[feature] = default_threshold
    for custom_threshold in custom_thresholds.split(","):
        # An empty custom string (or a stray comma) means "nothing to add",
        # not a malformed pair.
        if not custom_threshold.strip():
            continue
        pair = custom_threshold.split(":")
        if len(pair) != 2:
            print(f"Invalid custom skew threshold: {custom_threshold}")
            return
        feature, value = pair
        # Strip so "Age, Balance:.5" overrides "Balance" rather than " Balance".
        thresholds[feature.strip()] = ThresholdConfig(value=float(value))
    return thresholds


def get_deployed_model_ids(endpoint_id):
    """Return the ids of all models deployed on an endpoint.

    Args:
        endpoint_id: Value passed as the ``name`` of the endpoint to fetch.

    Returns:
        List with the ``id`` of each entry in the endpoint's deployed_models.
    """
    endpoint_client = EndpointServiceClient(
        client_options={"api_endpoint": API_ENDPOINT}
    )
    endpoint = endpoint_client.get_endpoint(name=f"{endpoint_id}")
    return [deployed.id for deployed in endpoint.deployed_models]


def set_objectives(model_ids, objective_template):
    """Clone the objective template once per deployed model id.

    Args:
        model_ids: Iterable of deployed model ids.
        objective_template: Object deep-copied for each id; the template
            itself is left untouched.

    Returns:
        List of copies, each with ``deployed_model_id`` set to its id, in the
        order of ``model_ids``.
    """

    def _objective_for(deployed_id):
        # Deep copy so the per-model configs share no state with the template.
        clone = copy.deepcopy(objective_template)
        clone.deployed_model_id = deployed_id
        return clone

    return [_objective_for(deployed_id) for deployed_id in model_ids]


def send_predict_request(endpoint, input):
    """Send a single-instance PredictRequest and return the raw response.

    Args:
        endpoint: Resource name of the endpoint to call.
        input: Mapping describing one instance; converted to a protobuf Value.

    Returns:
        The response object returned by ``PredictionServiceClient.predict``.
    """
    # PREDICT_API_ENDPOINT is not defined anywhere in this notebook; use the
    # same API_ENDPOINT the other prediction client is created with.
    client_options = {"api_endpoint": API_ENDPOINT}
    client = PredictionServiceClient(client_options=client_options)
    # Empty parameters, sent as a protobuf Value.
    params = json_format.ParseDict({}, Value())
    request = PredictRequest(endpoint=endpoint, parameters=params)
    request.instances.extend([json_format.ParseDict(input, Value())])
    return client.predict(request)


def list_monitoring_jobs():
    """Print the model deployment monitoring jobs in the configured project/region."""
    client_options = dict(api_endpoint=API_ENDPOINT)
    # Use REGION rather than a hard-coded "us-central1" so the listing looks in
    # the same location the monitoring job is created in.
    parent = f"projects/{PROJECT_ID}/locations/{REGION}"
    client = JobServiceClient(client_options=client_options)
    response = client.list_model_deployment_monitoring_jobs(parent=parent)
    print(response)

def pause_monitoring_job(job):
    """Pause the monitoring job with resource name ``job`` and print the response."""
    job_client = JobServiceClient(client_options={"api_endpoint": API_ENDPOINT})
    print(job_client.pause_model_deployment_monitoring_job(name=job))


def delete_monitoring_job(job):
    """Delete the monitoring job with resource name ``job`` and print the response."""
    job_client = JobServiceClient(client_options={"api_endpoint": API_ENDPOINT})
    print(job_client.delete_model_deployment_monitoring_job(name=job))

### Create Monitoring job

In [None]:

# @title Utility imports and constants
import copy

from google.cloud.aiplatform_v1beta1.services.endpoint_service import \
    EndpointServiceClient
from google.cloud.aiplatform_v1beta1.services.job_service import \
    JobServiceClient
from google.cloud.aiplatform_v1beta1.services.prediction_service import \
    PredictionServiceClient
from google.cloud.aiplatform_v1beta1.types.io import BigQuerySource
from google.cloud.aiplatform_v1beta1.types.io import GcsSource
from google.cloud.aiplatform_v1beta1.types.model_deployment_monitoring_job import (
    ModelDeploymentMonitoringJob, ModelDeploymentMonitoringObjectiveConfig,
    ModelDeploymentMonitoringScheduleConfig)
from google.cloud.aiplatform_v1beta1.types.model_monitoring import (
    ModelMonitoringAlertConfig, ModelMonitoringObjectiveConfig,
    SamplingStrategy, ThresholdConfig)
from google.cloud.aiplatform_v1beta1.types.prediction_service import \
    PredictRequest
from google.protobuf import json_format
from google.protobuf.duration_pb2 import Duration
from google.protobuf.struct_pb2 import Value

# This is the default value at which you would like the monitoring function to trigger an alert.
# In other words, this value fine tunes the alerting sensitivity. This threshold can be customized
# on a per feature basis but this is the global default setting.
DEFAULT_THRESHOLD_VALUE = 0.001

In [None]:
# Set thresholds specifying alerting criteria for training/serving skew and create config object.
# NOTE(review): get_thresholds() returns None when a custom entry is malformed;
# confirm the threshold strings are valid before relying on this config.
skew_thresholds = get_thresholds(SKEW_DEFAULT_THRESHOLDS, SKEW_CUSTOM_THRESHOLDS)
skew_config = ModelMonitoringObjectiveConfig.TrainingPredictionSkewDetectionConfig(
    skew_thresholds=skew_thresholds
)

# Set thresholds specifying alerting criteria for serving drift and create config object.
drift_thresholds = get_thresholds(DRIFT_DEFAULT_THRESHOLDS, DRIFT_CUSTOM_THRESHOLDS)
drift_config = ModelMonitoringObjectiveConfig.PredictionDriftDetectionConfig(
    drift_thresholds=drift_thresholds
)

# Specify training dataset source location (used for schema generation).
# BQ or Vertex Managed datasets can also be used as source.
# Here the source is the CSV file(s) listed in DATASET_GCS_URI, with TARGET as
# the target field.
training_dataset = ModelMonitoringObjectiveConfig.TrainingDataset(target_field=TARGET)
training_dataset.data_format = 'csv'
training_dataset.gcs_source = GcsSource(uris=DATASET_GCS_URI)


# Aggregate the above settings into a ModelMonitoringObjectiveConfig object and use
# that object to adjust the ModelDeploymentMonitoringObjectiveConfig object.
objective_config = ModelMonitoringObjectiveConfig(
    training_dataset=training_dataset,
    training_prediction_skew_detection_config=skew_config,
    prediction_drift_detection_config=drift_config,
)
# Template that set_objectives() deep-copies once per deployed model.
objective_template = ModelDeploymentMonitoringObjectiveConfig(
    objective_config=objective_config
)

# Find all deployed model ids on the created endpoint and set objectives for each.
# endpoint_id is the full endpoint resource name captured when the endpoint was created.
model_ids = get_deployed_model_ids(endpoint_id)
objective_configs = set_objectives(model_ids, objective_template)

# Create the monitoring job for all deployed models on this endpoint.
monitoring_job = create_monitoring_job(objective_configs)

### Generate Skewed Data

In [None]:
data = pd.read_csv(IMPORT_FILE)

In [None]:
data.describe()

In [None]:
# To showcase skew/drift, keep only the records of people aged 25 or younger.
skewed_data = data[data['Age']<=25]

In [None]:
skewed_data.describe()

### Trigger training-serving skew by sending skewed requests

In [None]:
# Send the same record (INSTANCE) 2000 times to the endpoint.
# This can be used to skew the serving data and trigger training-serving skew.
# NOTE(review): the call passes `verbose=0`; confirm predict_item accepts that
# keyword, otherwise this raises TypeError.
for i in range(2000):
    predict_item(INSTANCE, endpoint_id, None, verbose=0)

In [None]:
# Convert to string types.
# `input_records` was never created before being assigned by column, which
# raises NameError; start from a copy of the skewed rows (so the slice of
# `data` is not modified) and cast the numeric columns in place.
# NOTE(review): "Deposit" is the training target and is still sent with each
# request - confirm the endpoint accepts it, or drop the column.
string_columns = ["Age", "Balance", "Day", "Duration", "Campaign", "PDays", "Previous", "Deposit"]
input_records = skewed_data.copy()
input_records[string_columns] = input_records[string_columns].astype(str)

In [None]:
record_count = input_records.Age.count()

In [None]:
#Convert dataframe to json format
result = input_records.to_json(orient="records")
parsed_input = json.loads(result)

In [None]:
# Send each converted record to the prediction endpoint and print the first
# class and first score of the returned prediction.
# NOTE(review): this relies on predict_item accepting `verbose` and returning a
# mapping with "classes" and "scores" - confirm against its definition.
for i in range(0,record_count):
    resp = predict_item(parsed_input[i], endpoint_id, None, verbose=0)
    print(resp['classes'][0])
    print(resp['scores'][0])

### Disable Monitoring

In [None]:
#If a monitoring job needs to be deleted, use below calls
#pause_monitoring_job(monitoring_job.name)
#delete_monitoring_job(monitoring_job.name)