# On-Demand Model Monitoring Job with Batch Prediction Job
<table align="left">
  <td>
    <a href="https://colab.research.google.com/drive/1ZjdtQ-VsaZ9TKrm5pAYzUzDfG0IIyLq_#scrollTo=ndfGn8c4bI_G">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://storage.googleapis.com/cmm-public-data/notebooks/On_Demand_Model_Monitoring_Job_with_Batch_Prediction_Job.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Vertex AI logo">Open in Vertex AI Workbench
    </a>
  </td>
</table>

## Step 1: Authentication

Depending on your Jupyter environment, you may have to manually authenticate. Follow the relevant instructions below.

**1. Vertex AI Workbench**
* No action is needed; you are already authenticated.

**2. Colab, run:**

In [None]:
from google.colab import auth
auth.authenticate_user()


**3. Local JupyterLab instance, uncomment and run:**

In [None]:
# ! gcloud auth login

**4. Service account or other**
* See how to grant Cloud Storage permissions to your service account at https://cloud.google.com/storage/docs/gsutil/commands/iam#ch-examples.

## Step 2: Installations & Setup
### Install the following packages to execute this notebook.

In [None]:
! pip3 install --upgrade --quiet \
    google-cloud-aiplatform \
    google-cloud-bigquery \
    pandas-gbq \
    'tensorflow_data_validation[visualization]<2'

# Model Monitoring Experimental SDK
! gsutil cp gs://cmm-public-data/sdk/google_cloud_aiplatform-1.36.dev20231025+centralized.model.monitoring-py2.py3-none-any.whl .
! pip install --quiet google_cloud_aiplatform-1.36.dev20231025+centralized.model.monitoring-py2.py3-none-any.whl

### Restart the kernel (only for Colab)

In [None]:
# Automatically restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)
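
After the kernel restarts, it can help to confirm that the packages from the install cell are visible to the new kernel. The following is a minimal sketch that uses only the standard library; the helper name `installed_version` is hypothetical, and the distribution names are taken from the `pip` commands above. The experimental wheel's filename suggests that `google-cloud-aiplatform` should report a `1.36.dev20231025` version, but treat that as an expectation to check rather than a guarantee.

```python
from importlib import metadata


def installed_version(distribution):
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


# Distribution names taken from the pip install cell in this notebook.
for name in (
    "google-cloud-aiplatform",
    "google-cloud-bigquery",
    "pandas-gbq",
    "tensorflow-data-validation",
):
    print(f"{name}: {installed_version(name) or 'NOT INSTALLED'}")
```

If any line prints `NOT INSTALLED`, rerun the install cell before continuing.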

### Setup GCP Project ID and Initialize Vertex AI SDK for Python

In [None]:
import os

PROJECT_ID = "[your-project-id]" # @param {type:"string"}
# set the project id
! gcloud config set project $PROJECT_ID
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID

REGION = "us-central1"
! gcloud config set ai/region $REGION

In [None]:
# Initialize Vertex AI SDK for Python
import google.cloud.aiplatform as aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

### Create a Cloud Storage bucket

Create a storage bucket to store intermediate artifacts such as datasets.

In [None]:
# URI of the Cloud Storage bucket that stores intermediate artifacts.
BUCKET_URI = f"gs://your-bucket-name-{PROJECT_ID}-unique"  # @param {type:"string"}

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [None]:
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}
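
A common failure at this step is leaving a placeholder such as `[your-project-id]` in `BUCKET_URI`, which yields an invalid bucket name. The sketch below checks a URI against a simplified version of the Cloud Storage naming rules (3 to 63 characters; lowercase letters, digits, dashes, underscores, and dots; starting and ending with a letter or digit). The helper `check_bucket_uri` is hypothetical and does not cover every rule, for example the restrictions on names containing "google".

```python
import re

# Simplified bucket-name pattern: 3-63 chars, starts and ends with [a-z0-9].
_BUCKET_NAME = re.compile(r"[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]")


def check_bucket_uri(uri):
    """Return the bucket name if `uri` looks like a valid gs:// bucket URI.

    Raises ValueError otherwise.
    """
    if not uri.startswith("gs://"):
        raise ValueError(f"Expected a gs:// URI, got {uri!r}")
    name = uri[len("gs://"):].rstrip("/")
    if "/" in name:
        raise ValueError(f"Expected a bucket URI without an object path: {uri!r}")
    if not _BUCKET_NAME.fullmatch(name):
        raise ValueError(f"Invalid bucket name: {name!r}")
    return name


# Hypothetical bucket URI, for illustration only.
print(check_bucket_uri("gs://my-project-123-bp-mm"))
```

Calling `check_bucket_uri(BUCKET_URI)` before running `gsutil mb` surfaces a leftover placeholder as a `ValueError` instead of a less obvious `gsutil` error.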

## Step 3: Prepare a Model in Vertex AI Model Registry

In [None]:
import google.cloud.aiplatform as aiplatform

MODEL_PATH = "gs://mco-mm/churn"
MODEL_NAME = "churn"
IMAGE = "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-cpu.2-5:latest"

model = aiplatform.Model.upload(
    display_name=MODEL_NAME,
    artifact_uri=MODEL_PATH,
    serving_container_image_uri=IMAGE,
    sync=True
)

MODEL_ID = model.resource_name.split("/")[-1]
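
The last line works because a Vertex AI model resource name has the form `projects/{project}/locations/{location}/models/{model}`, so the final path segment is the model ID. As a sketch, the hypothetical helper below splits such a name into its parts and rejects strings that do not follow that layout; the example resource name is made up.

```python
def parse_model_resource_name(resource_name):
    """Split a Vertex AI model resource name into project, location, and model ID."""
    parts = resource_name.split("/")
    if (
        len(parts) != 6
        or parts[0] != "projects"
        or parts[2] != "locations"
        or parts[4] != "models"
    ):
        raise ValueError(f"Unexpected model resource name: {resource_name!r}")
    return {"project": parts[1], "location": parts[3], "model_id": parts[5]}


# Hypothetical resource name, for illustration only.
example = "projects/123456789/locations/us-central1/models/987654321"
print(parse_model_resource_name(example)["model_id"])
```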

## Step 4: Create a batch prediction job

This step submits the batch prediction request through the SDK. Because `sync=False`, the call returns a `BatchPredictionJob` object without waiting for the job to finish, and the job keeps running in the background.

In [None]:
batch_prediction_job = model.batch_predict(
  job_display_name="bp_mm_demo",
  instances_format="jsonl",
  machine_type="n1-standard-4",
  gcs_source=["gs://bp_mm_public_data/churn/churn_bp_outsample.jsonl"],
  gcs_destination_prefix=f"{BUCKET_URI}/bp_mm_output",
  sync=False
)
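
With `instances_format="jsonl"`, the input file contains one JSON instance per line. The sketch below shows how such a file body could be produced from Python dictionaries. The helper `to_jsonl` is hypothetical, the two feature names are borrowed from the skew thresholds used later in this notebook, and the values are made up; the real churn dataset very likely contains more features.

```python
import json


def to_jsonl(instances):
    """Serialize an iterable of dicts as JSON Lines: one JSON object per line."""
    return "".join(json.dumps(instance, sort_keys=True) + "\n" for instance in instances)


# Hypothetical instances, for illustration only.
instances = [
    {"country": "United States", "cnt_user_engagement": 12},
    {"country": "India", "cnt_user_engagement": 3},
]

jsonl_text = to_jsonl(instances)
print(jsonl_text, end="")
```

Writing `jsonl_text` to a file and uploading it to Cloud Storage would give you a custom `gcs_source`, assuming the instances match the model's input signature.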

## Step 5: Create a Model Monitor

In [None]:
from google.cloud.aiplatform.private_preview.centralized_model_monitoring import model_monitor

my_model_monitor = model_monitor.ModelMonitor.create(
    project=PROJECT_ID,
    location=REGION,
    display_name="churn_model_monitor",
    model_name=model.resource_name,
    model_version_id="1")
MODEL_MONITOR_ID = my_model_monitor.name
print(f"MODEL MONITOR {MODEL_MONITOR_ID} created.")

## Step 6: Create a Model Monitoring Job

In [None]:
EMAIL = "[your-email-address]"  # @param {type:"string"}

In [None]:
import pandas as pd

TIMESTAMP = pd.Timestamp.utcnow().strftime('%Y%m%d%H%M%S')
JOB_DISPLAY_NAME = f"churn_model_monitoring_job_{TIMESTAMP}"


# Skew and drift thresholds.
DEFAULT_THRESHOLD_VALUE = 0.001

SKEW_THRESHOLDS = {
    "country": DEFAULT_THRESHOLD_VALUE,
    "cnt_user_engagement": 0.002,
}
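
To make the role of these thresholds concrete, the sketch below resolves the threshold for a feature (falling back to the default when the feature is not listed) and compares it with a distance score. It uses the L-infinity distance, which the Vertex AI Model Monitoring documentation describes for categorical features. Whether this preview SDK uses the same metric, and whether it alerts on `>` or `>=`, are assumptions here. The helper names and the distributions are hypothetical.

```python
DEFAULT_THRESHOLD_VALUE = 0.001
SKEW_THRESHOLDS = {
    "country": DEFAULT_THRESHOLD_VALUE,
    "cnt_user_engagement": 0.002,
}


def effective_threshold(feature, feature_thresholds, default_threshold):
    """Return the per-feature threshold, or the default if none is set."""
    return feature_thresholds.get(feature, default_threshold)


def l_infinity_distance(p, q):
    """Largest absolute difference between two categorical distributions."""
    categories = set(p) | set(q)
    return max(abs(p.get(c, 0.0) - q.get(c, 0.0)) for c in categories)


# Hypothetical category frequencies for the "country" feature.
baseline = {"US": 0.5, "IN": 0.3, "BR": 0.2}
target = {"US": 0.4, "IN": 0.35, "BR": 0.25}

score = l_infinity_distance(baseline, target)
threshold = effective_threshold("country", SKEW_THRESHOLDS, DEFAULT_THRESHOLD_VALUE)
# Assumption: an anomaly is flagged when the score exceeds the threshold.
print(f"score={score:.3f} threshold={threshold} anomaly={score > threshold}")
```

With thresholds this small, even modest differences between the training data and the batch prediction input are flagged, which is convenient for a demo but likely too sensitive for production use.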

In [None]:
# Copy the training dataset to your project's Cloud Storage bucket to avoid permission issues.
# Ignore any "bucket already exists" errors.
PUBLIC_TRAINING_DATASET = "gs://bp_mm_public_data/churn/churn_bp_insample.csv"
TRAINING_DATASET = f"{BUCKET_URI}/bp_mm_input/churn_bp_insample.csv"

! gsutil cp $PUBLIC_TRAINING_DATASET $TRAINING_DATASET

In [None]:
model_monitoring_job=my_model_monitor.run(
    display_name=JOB_DISPLAY_NAME,
    objective_config=model_monitor.spec.ObjectiveSpec(
        baseline=model_monitor.spec.MonitoringInput(
            gcs_uri=TRAINING_DATASET,
            data_format="csv",
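            # NOTE: "species" looks like a carry-over from another sample; verify
            # that it matches the label column of the churn training CSV.
            ground_truth_field="species"),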
        target=model_monitor.spec.MonitoringInput(
            batch_prediction_job=batch_prediction_job.resource_name),
        feature_distribution_skew=model_monitor.spec.SkewSpec(
            default_threshold=DEFAULT_THRESHOLD_VALUE,
            feature_thresholds=SKEW_THRESHOLDS)
    ),
    notification_config=model_monitor.spec.NotificationSpec(
        user_emails=[EMAIL],
    ),
    output_config=model_monitor.spec.OutputSpec(
        gcs_base_dir=BUCKET_URI
    )
)

CMM_JOB_RESOURCE_NAME = model_monitoring_job.name
CMM_JOB_ID = CMM_JOB_RESOURCE_NAME.split("/")[-1]
print(f"Model Monitoring Job {CMM_JOB_ID} created.")

In [None]:
my_model_monitor.list_jobs()

## Step 7: Wait for the Model Monitoring Job to run and verify the result

### Check email

#### Sample job-creation email

<img src="https://services.google.com/fh/files/misc/batch_job_create.png" />

#### If an anomaly is detected, you receive an email like this

<img src="https://services.google.com/fh/files/misc/alert_email_2.png" />

### Check GCP Console

#### Check the "Monitor" tab under "Vertex AI"

<img src="https://storage.googleapis.com/cmm-public-data/images/batch_prediction_job.gif" />

### Check Output GCS bucket

In [None]:
my_model_monitor.show_skew_stats(model_monitoring_job_name=CMM_JOB_RESOURCE_NAME)

## Step 8: Clean Up (after the job has finished)

In [None]:
# When no jobs are running, delete all model monitoring jobs, then the model monitor.
my_model_monitor.delete_all_model_monitoring_jobs()
my_model_monitor.delete()

# Delete the model
model.delete()