In [None]:
# Copyright 2023 ROI Training, Inc. 
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Vertex AI Pipelines: Automating Your AutoML Tabular Workflow

## Overview



- Create a pipeline that will:
    - Create a dataset
    - Train an AutoML classification model
    - Create an endpoint
    - Deploy your model
    - Perform batch predictions
- Compile the pipeline
- Execute the KFP pipeline using **Vertex AI Pipelines**

## 1. Prepare your environment



In [None]:
# Install required Python packages

! pip3 install --quiet \
    google-cloud-storage \
    google-cloud-aiplatform \
    google-cloud-pipeline-components==2.4.1 \
    kfp


In [None]:
# Set useful Python and environment variables

import uuid

# get info from gcloud
proj_output     = !gcloud config get-value project
sa_output       = !gcloud auth list 2>/dev/null

# set variables for later use
UUID            = uuid.uuid4().hex
project         = proj_output[0]
location        = "us-central1"
service_account = sa_output[2].replace("*", "").strip()
bucket_uri      = f"gs://{project}-pipelines"

In [None]:
# Configure the default region
# Create buckets
# Set permissions on the buckets

! gcloud config set compute/region {location}

! gsutil mb -l {location} -p {project} {bucket_uri}
! gsutil iam ch serviceAccount:{service_account}:roles/storage.objectCreator $bucket_uri
! gsutil iam ch serviceAccount:{service_account}:roles/storage.objectViewer $bucket_uri

## 2. Define the pipeline



In [None]:
import kfp
import uuid

from google.cloud import aiplatform
from kfp import compiler, dsl
from kfp.dsl import (Artifact, ClassificationMetrics, Input, Metrics, Output, component, importer)

# Values used to define and parameterize the pipeline.

# Display names for the resources of this run; all share the UUID suffix.
pipeline_name = f"pipeline-{UUID}"
dataset_name = f"dataset_{UUID}"
train_job_name = f"train_job_{UUID}"
model_name = f"model_{UUID}"
endpoint_name = f"endpoint_{UUID}"
predict_job_name = f"predict_job_{UUID}"

# Machine type passed to the model deployment step
machine_type = "n1-standard-4"

# Input data: training CSV and batch-prediction instances (a list of URIs)
training_source = "gs://sb-challenge-labs/data_science/adult-income.csv"
prediction_source = ["gs://sb-challenge-labs/data_science/income-batch-pipelines.jsonl"]

# Locations inside the pipeline bucket
pipeline_root = f"{bucket_uri}/pipeline_root/"
prediction_target = f"{bucket_uri}/pipeline-predict"

@dsl.pipeline(name=pipeline_name, pipeline_root=pipeline_root)
def pipeline(
    project: str,
    location: str,
    dataset_name: str,
    train_job_name: str,
    model_name: str,
    endpoint_name: str,
    predict_job_name: str,
    machine_type: str,
    training_source: str,
    prediction_source: list,
    prediction_target: str,
    target_column: str = "income_bracket",
    budget_milli_node_hours: int = 1000,
):
    """AutoML tabular classification workflow.

    Creates a tabular dataset, trains an AutoML classification model on it,
    creates an endpoint, deploys the model to that endpoint, and runs a batch
    prediction job with the trained model.

    Args:
        project: Google Cloud project passed to every step.
        location: Region passed to every step.
        dataset_name: Display name of the dataset.
        train_job_name: Display name of the training job.
        model_name: Display name of the trained model.
        endpoint_name: Display name of the endpoint.
        predict_job_name: Display name of the batch prediction job.
        machine_type: Machine type for the deployed model's dedicated resources.
        training_source: Cloud Storage URI of the training data.
        prediction_source: List of Cloud Storage URIs with batch-prediction
            instances in JSONL format.
        prediction_target: Cloud Storage URI prefix for batch-prediction output.
        target_column: Name of the column the model learns to predict.
        budget_milli_node_hours: Training budget in milli node hours
            (the default of 1000 is one node hour).
    """
    # Component imports are kept local to the pipeline definition, as in the
    # original notebook.
    from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
    from google_cloud_pipeline_components.v1.dataset.create_tabular_dataset.component import tabular_dataset_create as TabularDatasetCreateOp
    from google_cloud_pipeline_components.v1.endpoint.create_endpoint.component import endpoint_create as EndpointCreateOp
    from google_cloud_pipeline_components.v1.endpoint.deploy_model.component import model_deploy as ModelDeployOp
    from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp

    # Step 1: create the dataset from the training file
    dataset_create_op = TabularDatasetCreateOp(
        project=project,
        location=location,
        display_name=dataset_name,
        gcs_source=training_source,
    )

    # Step 2: train the classification model on that dataset
    training_op = AutoMLTabularTrainingJobRunOp(
        project=project,
        location=location,
        display_name=train_job_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=budget_milli_node_hours,
        model_display_name=model_name,
        dataset=dataset_create_op.outputs["dataset"],
        target_column=target_column,
    )

    # Step 3: create the endpoint (takes no input from the training step)
    endpoint_op = EndpointCreateOp(
        project=project,
        location=location,
        display_name=endpoint_name,
    )

    # Step 4: deploy the trained model to the endpoint on a single replica.
    # The task object is not needed afterwards, so it is not bound to a name.
    ModelDeployOp(
        model=training_op.outputs["model"],
        endpoint=endpoint_op.outputs["endpoint"],
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
        dedicated_resources_machine_type=machine_type,
    )

    # Step 5: batch predictions with the trained model; takes the model from
    # the training step, not from the deployment.
    ModelBatchPredictOp(
        project=project,
        location=location,
        job_display_name=predict_job_name,
        model=training_op.outputs["model"],
        instances_format="jsonl",
        predictions_format="jsonl",
        gcs_source_uris=prediction_source,
        gcs_destination_output_uri_prefix=prediction_target,
    )


## 3. Compile the pipeline

In [None]:
# Compile the pipeline function and write the result to
# "tabular_classification_pipeline.yaml" in the working directory

pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=pipeline,
    package_path="tabular_classification_pipeline.yaml",
)

## 4. Run the pipeline

In [None]:
# Initialize the Vertex AI SDK, then build the pipeline job object from the
# compiled pipeline file. Nothing is submitted in this cell.

# Values for the pipeline's input parameters, keyed by parameter name
pipeline_parameters = dict(
    project=project,
    location=location,
    dataset_name=dataset_name,
    train_job_name=train_job_name,
    model_name=model_name,
    endpoint_name=endpoint_name,
    predict_job_name=predict_job_name,
    machine_type=machine_type,
    training_source=training_source,
    prediction_source=prediction_source,
    prediction_target=prediction_target,
)

aiplatform.init(project=project, location=location, staging_bucket=bucket_uri)

job = aiplatform.PipelineJob(
    display_name=pipeline_name,
    template_path="tabular_classification_pipeline.yaml",
    pipeline_root=pipeline_root,
    parameter_values=pipeline_parameters,
    enable_caching=False,  # caching of step results is switched off for this job
)

In [None]:
# Run the job
# Submits the pipeline job object created in the previous cell.
# NOTE(review): called without arguments, this presumably blocks until the
# pipeline run finishes — confirm against the SDK documentation.

job.run()