In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

With our container pushed to Container Registry, we're now ready to kick off a custom model training job.

### Import packages

In [None]:
# Import packages

import os
import json
import logging
import pandas as pd
import numpy as np
from datetime import datetime
from pytz import timezone
from googleapiclient import discovery
from google.cloud import aiplatform

### Configure Global Variables

Get the ID of your current GCP project from the gcloud configuration

In [None]:
# IPython shell capture: runs gcloud and stores its stdout lines in a list-like
# SList (stderr is discarded). Presumably empty when no default project is
# configured — in that case `project_id[0]` in the next cell will fail.
project_id = !gcloud config list --format 'value(core.project)' 2>/dev/null

Configure your system variables

In [None]:
# Configure your global variables.
# Values marked "Replace" should be adapted to your own environment.
PROJECT = project_id[0]                  # Replace with your project ID
USER = 'test_user'                       # Replace with your user name
BUCKET_NAME = f"{PROJECT}-vertex-ai"     # Replace with your gcs bucket name

FOLDER_NAME = 'sklearn_models'
ALGORITHM = 'isolation_forest'
TIMEZONE = 'US/Pacific'
REGION = 'us-central1'                   # bucket should be in same region as Vertex AI

# Cloud Storage locations of the trainer package and the train/test features.
PACKAGE_URIS = f"gs://{BUCKET_NAME}/trainer/{FOLDER_NAME}/{ALGORITHM}/trainer-0.1.tar.gz"
TRAIN_FEATURE_PATH = f"gs://{BUCKET_NAME}/{FOLDER_NAME}_data/{ALGORITHM}/train/train.csv"
TEST_FEATURE_PATH = f"gs://{BUCKET_NAME}/{FOLDER_NAME}_data/{ALGORITHM}/test/test.csv"

# Custom training image, previously pushed to Container Registry.
IMAGE_URI = f"gcr.io/{PROJECT}/sklearn_isolation_forest:v1"
print(f"Container URI: {IMAGE_URI}")

In [None]:
# Echo the resolved configuration so it can be checked before launching jobs.
print(f"Project:      {PROJECT}")
print(f"Bucket Name: {BUCKET_NAME}")
print(f"Python Package URI: {PACKAGE_URIS}")
print(f"Training Data URI: {TRAIN_FEATURE_PATH}")
print(f"Test Data URI: {TEST_FEATURE_PATH}")

In [None]:
# Local directory for scratch data; a plain string, nothing to interpolate.
DATA_DIR = "./data"

print("Data Directory: " + DATA_DIR)

------
### 1. Kick off the training job
Vertex AI gives you two options for training models:

* **AutoML**: Train high-quality models with minimal effort and ML expertise.
* **Custom training**: Run your custom training applications in the cloud using one of Google Cloud's pre-built containers or use your own.

In this lab, we're using custom training with our own container image stored in Google Container Registry.



Configure your system variables

In [None]:
# Job names must be unique, so a fixed prefix is suffixed with the current
# time (in TIMEZONE) formatted as MMDDYY_HHMM.
JOB_NAME = "custom_container_isolation_forest_" + datetime.now(
    timezone(TIMEZONE)
).strftime("%m%d%y_%H%M")

# Each job writes its outputs to a folder named after the job.
JOB_DIR = f"gs://{BUCKET_NAME}/{FOLDER_NAME}/{JOB_NAME}"

print("JOB_NAME = ", JOB_NAME)
print("JOB_DIR = ", JOB_DIR)

# Hyperparameters are kept as strings because they are passed to the
# training container as command-line arguments.
MAX_SAMPLES = '100'        # No of samples (--max-samples)
RANDOM_STATE_SEED = '42'   # --random-state-seed

print("MAX_SAMPLES = ", MAX_SAMPLES)
print("RANDOM_STATE_SEED = ", RANDOM_STATE_SEED)

In [None]:
CUSTOM_CONTAINER_IMAGE_URI = IMAGE_URI
# Vertex AI requires regional API endpoints. Derive it from REGION so the
# client and the `parent` resource path below always target the same region.
api_endpoint = f"{REGION}-aiplatform.googleapis.com"
MACHINE_TYPE = "n1-standard-4"
REPLICA_COUNT = 1

client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.gapic.JobServiceClient(client_options=client_options)

# One worker pool running our custom training container. "command" is left
# empty and the flags below are passed to the container as arguments.
custom_job = {
    "display_name": JOB_NAME,
    "job_spec": {
        "worker_pool_specs": [
            {
                "machine_spec": {
                    "machine_type": MACHINE_TYPE,
                },
                # Use the configured constant instead of a duplicated literal.
                "replica_count": REPLICA_COUNT,
                "container_spec": {
                    "image_uri": CUSTOM_CONTAINER_IMAGE_URI,
                    "command": [],
                    "args": [
                        '--input',
                        TRAIN_FEATURE_PATH,
                        '--job-dir',
                        JOB_DIR,
                        '--max-samples',
                        MAX_SAMPLES,
                        '--random-state-seed',
                        RANDOM_STATE_SEED,
                    ],
                },
            }
        ]
    },
}
parent = f"projects/{PROJECT}/locations/{REGION}"
response = client.create_custom_job(parent=parent, custom_job=custom_job)
print("response:", response)

**The training job will take about 10-15 minutes to complete.**

Check the training job status

In [None]:
# Look up the custom job created above and print its current state.
# The job ID is the final segment of the resource name returned at creation.
job_id_trn = response.name.rsplit('/', 1)[-1]

client_options = {"api_endpoint": api_endpoint}
client = aiplatform.gapic.JobServiceClient(client_options=client_options)

name = client.custom_job_path(project=PROJECT, location=REGION, custom_job=job_id_trn)
response = client.get_custom_job(name=name)
print(response.state)

More information on using containers for prediction:
https://cloud.google.com/vertex-ai/docs/predictions/use-custom-container

------
### 2. Deploy a model endpoint
When we set up our training job, we passed `--job-dir` to tell our training code where in Cloud Storage to export the model assets. A custom job does not create a Vertex AI model resource by itself, so below we create one from this asset path. The model resource itself isn't a deployed model, but once you have a model you're ready to deploy it to an endpoint. To learn more about Models and Endpoints in Vertex AI, check out the [documentation](https://cloud.google.com/vertex-ai/docs/start).

In this step we'll create an endpoint for our trained model. We can use this to get predictions on our model via the Vertex AI API.

#### Step 1: Import model artifacts to Vertex AI 

When you import a model, you associate it with a container for Vertex AI to run prediction requests. You can use pre-built containers provided by Vertex AI, or use your own custom containers that you build and push to Container Registry or Artifact Registry.

You can use a pre-built container if your model meets the following requirements:

- Trained in Python 3.7 or later
- Trained using TensorFlow, scikit-learn, or XGBoost
- Exported to meet framework-specific requirements for one of the pre-built prediction containers

The link to the list of pre-built predict container images:

https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers

#### Create Model

When we set up our training job, we could also have set up a training pipeline and specified where Vertex AI should look for our exported model assets. As part of such a pipeline, Vertex would create a model resource based on this asset path. The model resource itself isn't a deployed model, but once you have a model you're ready to deploy it to an endpoint. To learn more about Models and Endpoints in Vertex AI, check out the [documentation](https://cloud.google.com/vertex-ai/docs/start).

In this step we'll create the model, specifying where Vertex AI should look for our exported model assets.

In [None]:
MODEL_NAME = "pre_built_container_isolation_forest"

# Initialise the SDK explicitly so the upload targets the configured
# PROJECT/REGION rather than whatever defaults the environment provides.
aiplatform.init(project=PROJECT, location=REGION)

# Register the artifacts the training job exported to JOB_DIR as a Vertex AI
# model, served by Google's pre-built scikit-learn prediction container.
# NOTE(review): the scikit-learn version of this serving image (0.23) should
# match the version used inside the training container — confirm.
response = aiplatform.Model.upload(
    display_name=MODEL_NAME,
    serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-23:latest',
    artifact_uri=JOB_DIR,
)

# Keep only the trailing ID segment; the deploy step looks the model up by it.
model_id = response.name.split('/')[-1]

#### Step 2: Create Endpoint

You need the endpoint ID to deploy the model.

In [None]:
MODEL_ENDPOINT_DISPLAY_NAME = "pre_built_container_isolation_forest_endpoint"

# Create an endpoint with no model attached yet; the next step deploys to it.
aiplatform.init(project=PROJECT, location=REGION)
endpoint = aiplatform.Endpoint.create(
    display_name=MODEL_ENDPOINT_DISPLAY_NAME,
    project=PROJECT,
    location=REGION,
)

# Keep only the trailing ID segment of the full resource name.
endpoint_id = endpoint.resource_name.rsplit('/', 1)[-1]

#### Step 3: Deploy Model to the endpoint

You must deploy a model to an endpoint before that model can be used to serve online predictions; deploying a model associates physical resources with the model so it can serve online predictions with low latency. An undeployed model can serve batch predictions, which do not have the same low latency requirements.

Deploying the model to the endpoint takes about 10-15 minutes.

In [None]:
DEPLOYED_MODEL_DISPLAY_NAME = "pre_built_container_isolation_forest_deployed"
aiplatform.init(project=PROJECT, location=REGION)

# Fetch the uploaded model by the ID captured in the upload step.
model = aiplatform.Model(model_name=model_id)

# Attach serving resources for this model to the endpoint created above.
# sync=True runs the deployment synchronously instead of in the background.
model.deploy(
    endpoint=endpoint,
    deployed_model_display_name=DEPLOYED_MODEL_DISPLAY_NAME,
    machine_type="n1-standard-4",
    sync=True,
)

------
### 3. Send inference requests to your model

Vertex AI provides the services you need to request predictions from your model in the cloud.

There are two ways to get predictions from trained models: online prediction (sometimes called HTTP prediction) and batch prediction. In both cases, you pass input data to a cloud-hosted machine-learning model and get inferences for each data instance.

Vertex AI online prediction is a service optimized to run your data through hosted models with as little latency as possible. You send small batches of data to the service and it returns your predictions in the response.

#### Load testing data

#### Call Google API for online inference

We'll get predictions on our trained model using the Vertex Python API.

In [None]:
from googleapiclient import errors
import pandas as pd

# Load test features
x_test = pd.read_csv(TEST_FEATURE_PATH)

# Fill nan value with zeros (Prediction lacks the ability to handle nan values for now)
x_test = x_test.fillna(0)

pprobas = []
batch_size = 10
n_samples = min(160, x_test.shape[0])  # score at most the first 160 rows
print("batch_size=", batch_size)
print("n_samples=", n_samples)

aiplatform.init(project=PROJECT, location=REGION)
# Build the endpoint handle once instead of re-fetching it for every batch.
prediction_endpoint = aiplatform.Endpoint(endpoint_id)

for i in range(0, n_samples, batch_size):
    j = min(i + batch_size, n_samples)
    print("Processing samples", i, j)
    try:
        # The request is what can fail, so it must be inside the try block.
        response = prediction_endpoint.predict(instances=x_test.iloc[i:j].values.tolist())
    except errors.HttpError as err:
        # NOTE(review): the Vertex AI SDK may surface failures as
        # google.api_core exceptions rather than googleapiclient's HttpError —
        # confirm and widen this handler if needed.
        print('There was an error getting predictions, check the details:')
        print(err._get_reason())
        break
    pprobas.extend(response.predictions)

In [None]:
# Display the predictions collected above (presumably one per submitted test row).
pprobas

🎉 Congratulations! 🎉

You've learned how to use Vertex AI to:

* Train a model by providing the training code in a custom container. You used a scikit-learn model in this example, but you can train a model built with any framework using custom containers.
* Deploy a scikit-learn model using a pre-built container as part of the same workflow you used for training.
* Create a model endpoint and generate a prediction.

To learn more about different parts of Vertex AI, check out the documentation:
https://cloud.google.com/vertex-ai/docs

------
# Extra examples

In [None]:
# List uploaded models with this display name; `model_name` ends up holding
# the full resource name of the last match.
aiplatform.init(project=PROJECT, location=REGION)
model_instances = aiplatform.Model.list(
    filter='display_name="pre_built_container_isolation_forest"'
)
for model_resource in model_instances:
    print(model_resource.display_name)
    print(model_resource.resource_name)
    model_name = model_resource.resource_name

In [None]:
# List endpoints with this display name; `endpoint_name` ends up holding the
# full resource name of the last match and is consumed by predict_json below.
aiplatform.init(project=PROJECT, location=REGION)
endpoint_instances = aiplatform.Endpoint.list(
    filter='display_name="pre_built_container_isolation_forest_endpoint"'
)
# Fail loudly on no match: otherwise `endpoint_name` would be undefined, or
# would silently keep a stale value from an earlier run.
if not endpoint_instances:
    raise ValueError(
        'No endpoint found with display_name '
        '"pre_built_container_isolation_forest_endpoint" '
        f'in project {PROJECT}, region {REGION}.'
    )
for resource in endpoint_instances:
    print(resource.display_name)
    print(resource.resource_name)
    endpoint_name = resource.resource_name

In [None]:
def predict_json(endpoint_name, csv_path, version=None):
    """Send the rows of a CSV file to a deployed Vertex AI endpoint for online prediction.

    Missing values are replaced with 0, then at most the first 160 rows are
    sent in batches of 10. The module-level ``aiplatform`` SDK is used; when
    only an endpoint ID is given, project/location presumably come from a
    prior ``aiplatform.init(...)`` call — confirm in the calling notebook.

    Args:
        endpoint_name (str): Required. A fully-qualified endpoint resource name or endpoint ID.
            Example: "projects/123/locations/us-central1/endpoints/456"
        csv_path (str): Path or gs:// URI of the CSV file, as accepted by
            ``pandas.read_csv``; each row becomes one prediction instance.
        version (str): Unused; kept only for backward compatibility.
    Returns:
        list: The predictions returned by the endpoint, in row order. If a
            request fails, the error is logged and the predictions collected
            before the failure are returned.
    """
    import logging

    from googleapiclient import errors
    import pandas as pd

    # Load test features
    x_test = pd.read_csv(csv_path)

    # Fill nan value with zeros (Prediction lacks the ability to handle nan values for now)
    x_test = x_test.fillna(0)

    pprobas = []
    batch_size = 10
    n_samples = min(160, x_test.shape[0])  # score at most the first 160 rows
    print("batch_size=", batch_size)
    print("n_samples=", n_samples)

    # Build the endpoint handle once instead of re-fetching it for every batch.
    endpoint = aiplatform.Endpoint(endpoint_name)

    for i in range(0, n_samples, batch_size):
        j = min(i + batch_size, n_samples)
        print("Processing samples", i, j)
        try:
            # The request is what can fail, so it must be inside the try block.
            response = endpoint.predict(instances=x_test.iloc[i:j].values.tolist())
        except errors.HttpError as err:
            # Use stdlib logging: `tf` is not imported before this helper runs.
            # NOTE(review): the Vertex AI SDK may raise google.api_core
            # exceptions rather than HttpError — confirm and widen if needed.
            logging.error('There was an error getting predictions, check the details:')
            logging.error(err._get_reason())
            break
        pprobas.extend(response.predictions)
    return pprobas

In [None]:
# Score the test CSV against the endpoint located in the previous cell.
predictions = predict_json(endpoint_name=endpoint_name, csv_path=TEST_FEATURE_PATH)

In [None]:
# Extra: load the trained model artifact directly from Cloud Storage, score the
# train/test data plus synthetic outliers, and plot the decision function.
import json
import os
import numpy as np
import matplotlib.pyplot as plt
# TensorFlow is used here only for tf.io.gfile.GFile, which reads the gs:// model path.
import tensorflow as tf
import joblib
# Imported but not used in this cell.
from sklearn.ensemble import IsolationForest
# If the model is serialized using pickle
# then use 'model.pkl' for the model name
MODEL_FILE_NAME = 'model.joblib'

# Fall back to a placeholder when the training cells (which define JOB_DIR) were not run.
try:
    MODEL_DIR = JOB_DIR
except NameError:
    MODEL_DIR = "gs://path/to/model/directory"

model_output_path = os.path.join(MODEL_DIR,
                                     MODEL_FILE_NAME)

print(model_output_path)

# Load test feature and labels
# NOTE(review): everything below assumes the CSVs have exactly two feature
# columns (x_outliers is 20x2 and the decision-function grid is 2-D) — confirm.
x_train_pd = pd.read_csv(TRAIN_FEATURE_PATH)
x_test_pd = pd.read_csv(TEST_FEATURE_PATH)
x_train = x_train_pd.to_numpy()
x_test = x_test_pd.to_numpy()

# Fixed seed so the synthetic outliers are reproducible.
rng = np.random.RandomState(42)

# Generate some abnormal novel observations
x_outliers = rng.uniform(low=-4, high=4, size=(20, 2))

# joblib.load unpickles the file, which can execute arbitrary code: only load
# model artifacts from a bucket you trust.
with tf.io.gfile.GFile(model_output_path, 'rb') as rf:
    clf = joblib.load(rf)
#clf = joblib.load(tf.io.gfile.GFile(model_output_path, 'rb')) alternative method

# Predictions are kept for interactive inspection; the plot below does not use them.
y_pred_train = clf.predict(x_train)
y_pred_test = clf.predict(x_test)
y_pred_outliers = clf.predict(x_outliers)

# plot the line, the samples, and the nearest vectors to the plane
# Evaluate the decision function on a 50x50 grid covering [-5, 5] x [-5, 5].
xx, yy = np.meshgrid(np.linspace(-5, 5, 50), np.linspace(-5, 5, 50))
Z = clf.decision_function(np.c_[xx.ravel(), yy.ravel()])
Z = Z.reshape(xx.shape)

plt.title("IsolationForest")
plt.contourf(xx, yy, Z, cmap=plt.cm.Blues_r)

b1 = plt.scatter(x_train[:, 0], x_train[:, 1], c='white',
                 s=20, edgecolor='k')
b2 = plt.scatter(x_test[:, 0], x_test[:, 1], c='green',
                 s=20, edgecolor='k')
c = plt.scatter(x_outliers[:, 0], x_outliers[:, 1], c='red',
                s=20, edgecolor='k')
# 'tight' is applied first, then the limits are pinned to the grid range.
plt.axis('tight')
plt.xlim((-5, 5))
plt.ylim((-5, 5))
plt.legend([b1, b2, c],
           ["training observations",
            "new regular observations", "new abnormal observations"],
           loc="upper left")
plt.show()