## Installation (CAN SKIP ONLY NEED TO INSTALL ONCE INTO THIS NOTEBOOK INSTANCE)

Install the latest version of Vertex AI SDK for Python.

In [2]:
import os

# # Google Cloud Notebook
# if os.path.exists("/opt/deeplearning/metadata/env_version"):
#     USER_FLAG = "--user"
# else:
#     USER_FLAG = ""

# ! pip3 install --upgrade google-cloud-aiplatform $USER_FLAG

Install the latest GA version of *google-cloud-storage* library as well.

In [4]:
# ! pip3 install -U google-cloud-storage $USER_FLAG

Install the latest GA version of *google-cloud-pipeline-components* library as well.

In [6]:
# ! pip3 install $USER kfp google-cloud-pipeline-components --upgrade

### Restart the kernel

Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [1]:
# # Automatically restart kernel after installs
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

Check the versions of the packages you installed.  The KFP SDK version should be >=1.6.

In [2]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 1.8.12
google_cloud_pipeline_components version: 1.0.10


## START RUNNING HERE

## Before you begin

### GPU runtime

This tutorial does not require a GPU runtime.

### Set up your Google Cloud project

**The following steps are required, regardless of your notebook environment.**

1. [Select or create a Google Cloud project](https://console.cloud.google.com/cloud-resource-manager). When you first create an account, you get a $300 free credit towards your compute/storage costs.

2. [Make sure that billing is enabled for your project.](https://cloud.google.com/billing/docs/how-to/modify-project)

3. [Enable the Vertex AI APIs, Compute Engine APIs, and Cloud Storage.](https://console.cloud.google.com/flows/enableapi?apiid=ml.googleapis.com,compute_component,storage-component.googleapis.com)

4. [The Google Cloud SDK](https://cloud.google.com/sdk) is already installed in Google Cloud Notebook.

5. Enter your project ID in the cell below. Then run the  cell to make sure the
Cloud SDK uses the right project for all the commands in this notebook.

**Note**: Jupyter runs lines prefixed with `!` as shell commands, and it interpolates Python variables prefixed with `$`.

In [2]:
PROJECT_ID = "daring-hash-348101"  # @param {type:"string"}

In [4]:
if PROJECT_ID == "" or PROJECT_ID is None or PROJECT_ID == "[your-project-id]":
    # Get your GCP project id from gcloud
    shell_output = ! gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID:", PROJECT_ID)

In [4]:
# this step causes bash to ask for reply, which cannot be done in notebook. So use cloud shell instead
# but no need to do it if project is already set to the right one

# ! gcloud config set project $PROJECT_ID

#### Region

You can also change the `REGION` variable, which is used for operations
throughout the rest of this notebook.  Below are regions supported for Vertex AI. We recommend that you choose the region closest to you.

- Americas: `us-central1`
- Europe: `europe-west4`
- Asia Pacific: `asia-east1`

You may not use a multi-regional bucket for training with Vertex AI. Not all regions provide support for all Vertex AI services.

Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations)

In [3]:
REGION = "us-east1"  # @param {type: "string"}

#### Timestamp

If you are in a live tutorial session, you might be using a shared test account or project. To avoid name collisions between users on resources created, you create a timestamp for each instance session, and append the timestamp onto the name of resources you create in this tutorial.

In [4]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you initialize the Vertex AI SDK for Python, you specify a Cloud Storage staging bucket. The staging bucket is where all the data associated with your dataset and model resources are retained across sessions.

Set the name of your Cloud Storage bucket below. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.

In [5]:
BUCKET_NAME = "seangoh-smu-mle-usa"
BUCKET_URI = f"gs://{BUCKET_NAME}"

In [6]:
if BUCKET_NAME == "" or BUCKET_NAME is None or BUCKET_NAME == "gs://[your-bucket-name]":
    BUCKET_NAME = "gs://" + PROJECT_ID + "aip-" + TIMESTAMP

**Only if your bucket doesn't already exist**: Run the following cell to create your Cloud Storage bucket.

In [9]:
! gsutil mb -l $REGION $BUCKET_URI

Creating gs://seangoh-smu-mle-usa/...
ServiceException: 409 A Cloud Storage bucket named 'seangoh-smu-mle-usa' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


Finally, validate access to your Cloud Storage bucket by examining its contents:

In [10]:
! gsutil ls -al $BUCKET_URI

                                 gs://seangoh-smu-mle-usa/DailyQC/
                                 gs://seangoh-smu-mle-usa/FaceMask/
                                 gs://seangoh-smu-mle-usa/Models/
                                 gs://seangoh-smu-mle-usa/ProjectPipeline/
                                 gs://seangoh-smu-mle-usa/assignment2/
                                 gs://seangoh-smu-mle-usa/logs/
                                 gs://seangoh-smu-mle-usa/pipeline_root/
                                 gs://seangoh-smu-mle-usa/testupload/


#### Service Account

**If you don't know your service account**, try to get your service account using `gcloud` command by executing the second cell below.

**LEAVE THIS AS `[your-service-account]` AND LET GCLOUD COMMAND FIGURE IT OUT**

In [7]:
SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}

In [8]:
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your GCP project id from gcloud
    shell_output = !gcloud auth list 2>/dev/null
    SERVICE_ACCOUNT = shell_output[2].replace('*', '').strip()
    print("Service Account:", SERVICE_ACCOUNT)

Service Account: 591661299323-compute@developer.gserviceaccount.com


#### Set service account access for Vertex AI Pipelines

Run the following commands to grant your service account access to read and write pipeline artifacts in the bucket that you created in the previous step -- you only need to run these once per service account.

In [9]:
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

No changes made to gs://seangoh-smu-mle-usa/
No changes made to gs://seangoh-smu-mle-usa/


### Set up variables

Next, set up some variables used throughout the tutorial.
### Import libraries and define constants

In [10]:
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.experimental.custom_job import utils
# from google_cloud_pipeline_components.v1.model import \
#     ModelUploadOp as model_upload_op
from google_cloud_pipeline_components.aiplatform import ModelUploadOp, ModelDeployOp, EndpointCreateOp
from google_cloud_pipeline_components.experimental.vertex_notification_email import VertexNotificationEmailOp

import kfp
from kfp.v2 import dsl, compiler
from kfp.v2.components import importer_node
from kfp.v2.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
from typing import NamedTuple

import tensorflow as tf

#### Vertex AI Pipelines constants

Setup up the following constants for Vertex AI Pipelines:

`PIPELINE_ROOT`: The artifact repository where KFP stores a pipeline’s artifacts. <br>
`CONTAINER_IMG_URI`: Container image for training

In [11]:
PIPELINE_ROOT = "{}/pipeline_root/face-mask".format(BUCKET_URI)
DATA_DIR = 'gs://seangoh-smu-mle-usa/FaceMask/'
DIVERGENCE_THRESHOLD = 0.01
# CONTAINER_IMG_URI = 'us-east1-docker.pkg.dev/daring-hash-348101/smu-mle-usa/efficientnettrain:latest'
CONTAINER_IMG_URI = 'us-east1-docker.pkg.dev/daring-hash-348101/smu-mle-usa/mobilenettrain:latest'
MODEL_URI = "gs://seangoh-smu-mle-usa/Models/FaceMaskMobileNetModel/" # change to cloud storage stored model location
DISPLAY_NAME = "face-mask-classification-pipeline_" + TIMESTAMP
TEST_URI = "" # change to cloud storage test dataset location
MODEL_DISPLAY_NAME = f"face-mask-classification-{TIMESTAMP}"
MODEL_PERFORMANCE_FILENAME = "MobileNetPerformanceComparison.json"
THRESHOLDS_DICT = {'f1': 0.9}

## Initialize Vertex AI SDK for Python

Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [12]:
aip.init(
    project=PROJECT_ID, 
    staging_bucket=BUCKET_URI,
    location=REGION,
)

## Define image classification model pipeline that uses components from `google_cloud_pipeline_components` and custom components

Next, you define the pipeline.

<img alt="alt_text" width="1000px" src="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/community-content/pytorch_text_classification_using_vertex_sdk_and_gcloud/images/pipelines-high-level-flow.png?raw=true" />

## Create the pipeline

Next, create the pipeline.

<img alt="alt_text" width="1000px" src="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/community-content/pytorch_text_classification_using_vertex_sdk_and_gcloud/images/concepts-of-a-pipeline.png?raw=true" />


### Data Validation Check

Create custom component for checking that train and test set brightness distribution are not too dissimilar

In [14]:
@component(
    packages_to_install=["google-cloud-storage", "numpy", "matplotlib", "scipy"],
    output_component_file="data_validation.yaml"
)
def DataValidationOp(
    bucket_name: str,
    divergence_threshold: float,
) -> str:
    from google.cloud import storage
    import numpy as np
    from matplotlib import image
    from scipy.stats import wasserstein_distance
    
    def list_blobs(bucket_name):
        """Lists all the blobs in the bucket."""
        # bucket_name = "your-bucket-name"

        storage_client = storage.Client()

        # Note: Client.list_blobs requires at least package version 1.17.0.
        blobs = storage_client.list_blobs(bucket_name)

        files = []

        for blob in blobs:
            files.append(blob.name)

        return files

    def get_image_brightness(bucket_name, source_blob_name, destination_file_name):
        """Downloads an image from the bucket and get its mean brightness."""

        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        blob.download_to_filename(destination_file_name)

        img = image.imread(destination_file_name)

        return np.mean(img)

    files = list_blobs(bucket_name)
    train_files = [file for file in files if 'FaceMask/Train/' in file and 'Augmented' not in file]
    val_files = [file for file in files if 'FaceMask/Validation/' in file and 'Augmented' not in file]
    train_brightness = []
    val_brightness = []

    for file in train_files:
        brightness = get_image_brightness(bucket_name, file, './image.png')
        train_brightness.append(brightness)
    for file in val_files:
        brightness = get_image_brightness(bucket_name, file, './image.png')
        val_brightness.append(brightness)    

    # https://datascience.stackexchange.com/a/54385/73827
    # wasserstein-1 distance is better than kl-divergence
    divergence = wasserstein_distance(train_brightness, val_brightness)
    if divergence > divergence_threshold:
        return 'fail'
    else:
        return 'pass'

### Model Performance Threshold Check

Create custom component for checking that model performance on validation set passes a threshold

In [13]:
@component(
    packages_to_install=["google-cloud-storage", "pandas", "fsspec", "gcsfs"],
    output_component_file="PerformanceThresholdOp.yaml"
)
def PerformanceThresholdOp(
    json_url: str,
    f1_threshold: float,
) -> str:
    import pandas as pd
    
    results = pd.read_json(json_url, typ='series')
    if results['FullModel'] >= f1_threshold:
        return 'pass'
    else:
        return 'fail'
    

### Create Pipeline

In [58]:
@kfp.dsl.pipeline(
    name="face-mask-import-model-v1",
    pipeline_root=PIPELINE_ROOT,
)
def pipeline(
    display_name:str,
    project: str, 
    region: str,
    bucket: str,
    bucket_name: str,
    divergence_threshold: float,
    container_img_uri: str,
    model_perf_uri: str,
    eval_threshold: float,
    artifact_uri: str,
):
    
    notify_email_task = VertexNotificationEmailOp(
        recipients=['hy.lim.2021@mitb.smu.edu.sg',
                   'teyang.lau.2021@mitb.smu.edu.sg',
                   'sean.goh.2020@mitb.smu.edu.sg']
    )
    
    with dsl.ExitHandler(notify_email_task):
    
        # data validation op
        data_validation_op = DataValidationOp(bucket_name, divergence_threshold)

        with dsl.Condition(
            data_validation_op.output=='pass',
            name='train-model'
        ): 
            # custom model training for custom container
            training_op = gcc_aip.CustomContainerTrainingJobRunOp(
                display_name=display_name,
                container_uri=container_img_uri,
                project=project,
                location=region,
                staging_bucket=bucket,
                machine_type="n1-standard-4",
                replica_count=1,
                accelerator_type='NVIDIA_TESLA_K80',
                accelerator_count=1,  
            )

        # evaluate performance passes threshold
        model_evaluate_op = PerformanceThresholdOp(
            model_perf_uri, 
            eval_threshold,
        ).after(training_op)

        # if evaluation passes threshold, import, upload, deploy to endpoint
        with dsl.Condition(
            model_evaluate_op.output=="pass",
            name="deploy-model",
        ):
            # import the model from GCS
            import_unmanaged_model_task = importer_node.importer(
                    artifact_uri=artifact_uri,
                    artifact_class=artifact_types.UnmanagedContainerModel,
                    metadata={
                        "containerSpec": {
                            "imageUri": "us-docker.pkg.dev/cloud-aiplatform/prediction/tf2-gpu.2-8:latest",
                        },
                    },
                )
            # import_unmanaged_model_task.after(training_op)

            # upload the model to VertexAI Model Registry
            model_upload_op = ModelUploadOp(
                project=project,
                location=region,
                display_name=display_name,
                unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
            ).after(import_unmanaged_model_task)

            # create Endpoint. This is run in parallel as it is not dependent on previous nodes
            endpoint_create_op = EndpointCreateOp(
                project=project,
                location=region,
                display_name="pipelines-created-endpoint",
            )

            # deploy the model from Model Registry
            ModelDeployOp(
                endpoint=endpoint_create_op.outputs["endpoint"],
                model=model_upload_op.outputs["model"],
                deployed_model_display_name=display_name,
                traffic_split = {'0':100},
                dedicated_resources_machine_type="n1-standard-4",
                dedicated_resources_accelerator_type="NVIDIA_TESLA_K80",
                dedicated_resources_accelerator_count=1,
                dedicated_resources_min_replica_count=1,
                dedicated_resources_max_replica_count=1,
            )

## Compile the pipeline

Next, compile the pipeline.

In [59]:
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="facemask classification pipeline.json".replace(" ", "_"),
)

## Run the pipeline

Next, run the pipeline.

In [None]:
pipeline_params = {
    "display_name": DISPLAY_NAME,
    "project": PROJECT_ID,
    "region": REGION,
    "bucket": BUCKET_URI,
    "bucket_name": BUCKET_URI.replace('gs://', ''),
    "divergence_threshold": DIVERGENCE_THRESHOLD,
    "container_img_uri": CONTAINER_IMG_URI,
    "model_perf_uri": BUCKET_URI + "/Models/{}".format(MODEL_PERFORMANCE_FILENAME),
    "eval_threshold": THRESHOLDS_DICT['f1'],
    "artifact_uri": MODEL_URI,
}

job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="facemask classification pipeline.json".replace(" ", "_"),
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
    parameter_values=pipeline_params,
)

job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/591661299323/locations/us-east1/pipelineJobs/face-mask-import-model-v1-20220624040326
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/591661299323/locations/us-east1/pipelineJobs/face-mask-import-model-v1-20220624040326')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-east1/pipelines/runs/face-mask-import-model-v1-20220624040326?project=591661299323
PipelineJob projects/591661299323/locations/us-east1/pipelineJobs/face-mask-import-model-v1-20220624040326 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/591661299323/locations/us-east1/pipelineJobs/face-mask-import-model-v1-20220624040326 current state:
PipelineState.PIPELINE_STATE_RUNNING


# Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial -- *Note:* this is auto-generated and not all resources may be applicable for this tutorial:

- Dataset
- Pipeline
- Model
- Endpoint
- Batch Job
- Custom Job
- Hyperparameter Tuning Job
- Cloud Storage Bucket

In [47]:
delete_pipeline = True
delete_model = True
delete_endpoint = True
delete_bucket = False

try:
    if delete_endpoint and "DISPLAY_NAME" in globals():
        endpoints = aip.Endpoint.list(
            order_by="create_time"
        )
        endpoint = endpoints[0]
        endpoint.undeploy_all()
        aip.Endpoint.delete(endpoint, force=True)
        print("Deleted endpoint:", endpoint)
except Exception as e:
    print(e)

try:
    if delete_model and "DISPLAY_NAME" in globals():
        models = aip.Model.list(
            filter=f"display_name={DISPLAY_NAME}", order_by="create_time"
        )
        model = models[0]
        aip.Model.delete(model)
        print("Deleted model:", model)
except Exception as e:
    print(e)

try:
    if delete_pipeline and "DISPLAY_NAME" in globals():
        pipelines = aip.PipelineJob.list(
            filter=f"display_name={DISPLAY_NAME}", order_by="create_time"
        )
        pipeline = pipelines[0]
        aip.PipelineJob.delete(pipeline)
        print("Deleted pipeline:", pipeline)
except Exception as e:
    print(e)

####### BE CAREFUL OF THIS! DON'T RUN!!!!!!!!!!!
####### BE CAREFUL OF THIS! DON'T RUN!!!!!!!!!!!
####### BE CAREFUL OF THIS! DON'T RUN!!!!!!!!!!!
######## if delete_bucket and "GCS_BUCKET" in globals():
########     ! gsutil rm -r $GCS_BUCKET

Deleting Endpoint : projects/591661299323/locations/us-east1/endpoints/6095446173785522176
400 There are other operations running on the Endpoint "projects/591661299323/locations/us-east1/endpoints/6095446173785522176". Operation(s) are: projects/591661299323/locations/us-east1/operations/4509212634230292480.
Deleting Model : projects/591661299323/locations/us-east1/models/185509601838366720
400 The Model "projects/591661299323/locations/us-east1/models/185509601838366720" is deployed or being deployed at the following Endpoint(s): projects/591661299323/locations/us-east1/endpoints/6095446173785522176.
Deleting PipelineJob : projects/591661299323/locations/us-east1/pipelineJobs/face-mask-import-model-v1-20220623024310
Delete PipelineJob  backing LRO: projects/591661299323/locations/us-east1/operations/2905931166886395904
PipelineJob deleted. . Resource name: projects/591661299323/locations/us-east1/pipelineJobs/face-mask-import-model-v1-20220623024310
Deleted pipeline: <google.cloud.ai