In [None]:
# Copyright 2023 ROI Training, Inc. 
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: Automating your AutoML Tabular workflow

## Overview



### Objective

In this activity you will learn to use `Vertex AI Pipelines` and `Google Cloud Pipeline Components` to build the same `AutoML` classification model you created in the AutoML activity.

### Main Tasks

- Create a pipeline that will:
    - Create a dataset
    - Train an AutoML classification model
    - Create an endpoint
    - Deploy your model
- Compile the pipeline
- Execute the KFP pipeline using **Vertex AI Pipelines**

Documentation for the Google Cloud Pipeline Components can be found [here](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-2.4.1/index.html).

## 1. Prepare your notebook instance



In [1]:
# Install required Python packages

! pip3 install --upgrade --quiet google-cloud-aiplatform \
                                 google-cloud-storage \
                                 kfp \
                                 google-cloud-pipeline-components

In [2]:
# Set required Python and environment variables

import random
import string
import sys
import uuid

PROJECT_ID = "sb-challenge-labs" #[your-project-id]"
REGION = "us-central1"
UUID = uuid.uuid4().hex

shell_output = !gcloud auth list 2>/dev/null
SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
print(SERVICE_ACCOUNT)

760420608781-compute@developer.gserviceaccount.com


In [4]:
# Configure Google Cloud SDK on Workbench instance

! gcloud config set project {PROJECT_ID}
! gcloud config set compute/region {REGION}

Updated property [core/project].
Updated property [compute/region].


## 2. Prepare your project



In [5]:
# create a bucket in which to store intermediate artifacts

BUCKET_URI = f"gs://{PROJECT_ID}-pipelines"
! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}

Creating gs://sb-challenge-labs-pipelines/...


In [6]:
# give your workbench instance's service account required permissions

! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

## 3. Define the pipeline



In [7]:
# import libraries for pipelines

from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, component)

In [8]:
# Name of the pipeline and the bucket path under which its artifacts are stored

pipeline_root = BUCKET_URI + "/pipeline_root/"
pipeline_name = "sb-challenge-labs-pipelines-training-" + UUID

In [58]:
# define the pipeline

import textwrap

@kfp.dsl.pipeline(name=pipeline_name, pipeline_root=pipeline_root)
def pipeline(
    gcs_source: str,
    dataset_name: str,
    job_name: str,
    model_name: str,
    endpoint_name: str,
    machine_type: str,
    project: str,
    gcp_region: str,
    thresholds_dict_str: str,
    predict_name: str,
):
    """Train an AutoML tabular classification model and deploy it to an endpoint.

    The pipeline creates (or replaces) a BigQuery view over the public census
    income table without the functional_weight column, builds a Vertex AI
    tabular dataset from that view, trains an AutoML classification model on
    the income_bracket column, creates an endpoint and deploys the model to it.

    Args:
        gcs_source: Not used by any step; kept so existing parameter sets stay valid.
        dataset_name: Display name of the tabular dataset.
        job_name: Display name of the AutoML training job.
        model_name: Display name of the trained model.
        endpoint_name: Display name of the endpoint.
        machine_type: Machine type of the dedicated resources serving the model.
        project: Google Cloud project that hosts the view, dataset, model and endpoint.
        gcp_region: Vertex AI location for the dataset, training job and endpoint.
        thresholds_dict_str: Not used by any step; kept so existing parameter sets stay valid.
        predict_name: Not used by any step yet (reserved for a batch prediction step).
    """
    from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
    from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
    from google_cloud_pipeline_components.v1.dataset.create_tabular_dataset.component import tabular_dataset_create as TabularDatasetCreateOp
    from google_cloud_pipeline_components.v1.endpoint.create_endpoint.component import endpoint_create as EndpointCreateOp
    # The original line was missing the `import` keyword, which made the whole
    # cell a SyntaxError.
    from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp

    # Build the training view: every column of the public table except
    # functional_weight.
    bigquery_query_job_op = BigqueryQueryJobOp(
        project=project,
        location="US",
        query = textwrap.dedent(f"""
            create or replace view `{project}.ml_data.income-census-training-no-fw` as
            select
                * except(functional_weight)
            from
                `bigquery-public-data.ml_datasets.census_adult_income`
        """)
    )

    # No output of the query step is consumed here, so the ordering has to be
    # declared explicitly: the view must exist before the dataset is created.
    dataset_create_op = TabularDatasetCreateOp(
        project=project,
        location=gcp_region,
        display_name=dataset_name,
        bq_source=f"bq://{project}.ml_data.income-census-training-no-fw",
    ).after(bigquery_query_job_op)

    training_op = AutoMLTabularTrainingJobRunOp(
        project=project,
        location=gcp_region,
        display_name=job_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,  # 1000 milli node hours = 1 node hour
        model_display_name=model_name,
        dataset=dataset_create_op.outputs["dataset"],
        target_column="income_bracket"
    )

    endpoint_op = EndpointCreateOp(
        project=project,
        location=gcp_region,
        display_name=endpoint_name,
    )

    # Deploy on exactly one replica of the requested machine type.
    deploy_op = ModelDeployOp(
        model=training_op.outputs["model"],
        endpoint=endpoint_op.outputs["endpoint"],
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
        dedicated_resources_machine_type=machine_type,
    )

    # TODO: add a batch prediction step (ModelBatchPredictOp from
    # google_cloud_pipeline_components.v1.batch_predict_job) named by
    # `predict_name`; the unfinished draft targeted the ml_data.kp_predictions
    # table in BigQuery.

## 4. Compile the pipeline

In [59]:
# Compile the pipeline function into a YAML pipeline definition file

pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path="tabular_classification_pipeline.yaml",
    pipeline_func=pipeline,
)

## 5. Run the pipeline

In [60]:
# Initialize the Vertex AI SDK with this notebook's project, region and staging bucket

aiplatform.init(
    staging_bucket=BUCKET_URI,
    location=REGION,
    project=PROJECT_ID,
)

In [61]:
# Values passed to the pipeline at launch time

# Display names, all suffixed with this session's UUID
pipeline_name = f"pipeline_{UUID}"
job_name = f"job_{UUID}"
dataset_name = f"dataset_{UUID}"
model_name = f"model_{UUID}"
endpoint_name = f"endpoint_{UUID}"
predict_name = f"predict_{UUID}"

# Fixed settings
gcs_source = "gs://sb-challenge-labs/adult-income.csv"
machine_type = "n1-standard-4"

In [62]:
# Configure the pipeline run: bind the compiled template to the values set
# above. Caching is turned off for this run.

pipeline_parameters = {
    "project": PROJECT_ID,
    "gcp_region": REGION,
    "gcs_source": gcs_source,
    "thresholds_dict_str": '{"auRoc": 0.95}',
    "dataset_name": dataset_name,
    "job_name": job_name,
    "model_name": model_name,
    "endpoint_name": endpoint_name,
    "predict_name": predict_name,
    "machine_type": machine_type,
}

job = aiplatform.PipelineJob(
    display_name=pipeline_name,
    template_path="tabular_classification_pipeline.yaml",
    pipeline_root=pipeline_root,
    parameter_values=pipeline_parameters,
    enable_caching=False,
)

Run the pipeline job. Click on the generated link to see your run in the Cloud Console.

In [None]:
# Run the job
# Submits the PipelineJob configured above to Vertex AI Pipelines.
# NOTE(review): presumably this call blocks and keeps reporting the job state
# until the run finishes — confirm against the PipelineJob.run documentation.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/760420608781/locations/us-central1/pipelineJobs/pipeline-32ad5fad41cd487fbc4496b7f629d79c-20231007183223
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/760420608781/locations/us-central1/pipelineJobs/pipeline-32ad5fad41cd487fbc4496b7f629d79c-20231007183223')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/pipeline-32ad5fad41cd487fbc4496b7f629d79c-20231007183223?project=760420608781
PipelineJob projects/760420608781/locations/us-central1/pipelineJobs/pipeline-32ad5fad41cd487fbc4496b7f629d79c-20231007183223 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/760420608781/locations/us-central1/pipelineJobs/pipeline-32ad5fad41cd487fbc4496b7f629d79c-20231007183223 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/760420608781/locations/us-central1/pipelineJobs/pipeline-32ad5fad41cd487

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

(Set `delete_bucket` to **True** to delete the Cloud Storage bucket.)

In [None]:
import os

delete_bucket = False

# Delete the Vertex AI Pipeline Job
job.delete()

# Delete the Vertex AI Endpoint
endpoints = aiplatform.Endpoint.list(
    filter=f"display_name={ENDPOINT_DISPLAY_NAME}", order_by="create_time"
)

if len(endpoints) > 0:
    endpoint = endpoints[0]
    endpoint.delete(force=True)

# Delete the Vertex AI model
models = aiplatform.Model.list(
    filter=f"display_name={MODEL_DISPLAY_NAME}", order_by="create_time"
)
if len(models) > 0:
    model = models[0]
    model.delete()

# Delete the Vertex AI Dataset
datasets = aiplatform.TabularDataset.list(
    filter=f"display_name={DATASET_DISPLAY_NAME}", order_by="create_time"
)
if len(datasets) > 0:
    dataset = datasets[0]
    dataset.delete()

# Delete the Cloud Storage bucket
if delete_bucket or os.getenv("IS_TESTING"):
    ! gsutil rm -r $BUCKET_URI