# Continuous Training with Kubeflow Pipeline and Vertex AI

**Learning Objectives:**
1. Learn how to use KF pre-built components
1. Learn how to build a KF pipeline with these components
1. Learn how to compile, upload, and run a KF pipeline


In this lab, you will build, deploy, and run a KFP pipeline that orchestrates the **Vertex AI** services to train, tune, and deploy a **scikit-learn** model using the Google pre-built components.

## Setup

In [4]:
from datetime import datetime

from google.cloud import aiplatform

In [5]:
REGION = "us-central1"
PROJECT_ID = !(gcloud config get-value project)
PROJECT_ID = PROJECT_ID[0]

In [6]:
# Set `PATH` to include the directory containing KFP CLI
PATH = %env PATH
%env PATH=/home/jupyter/.local/bin:{PATH}

env: PATH=/home/jupyter/.local/bin:/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games


## Understanding the pipeline design


The workflow implemented by the pipeline is defined using a Python based Domain Specific Language (DSL). The pipeline's DSL is in the `pipeline_vertex/pipeline_prebuilt.py` file that we will generate below.

The pipeline's DSL has been designed to avoid hardcoding any environment specific settings like file paths or connection strings. These settings are provided to the pipeline code through a set of environment variables.


### Build the trainer image

The training step in the pipeline will require a custom training container. The custom training image is defined in `trainer_image_vertex/Dockerfile`.

In [7]:
!cat trainer_image_vertex/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cpu
RUN pip install -U fire cloudml-hypertune scikit-learn==0.20.4 pandas==0.24.2
WORKDIR /app
COPY train.py .

ENTRYPOINT ["python", "train.py"]


Let's now build and push this trainer container to the container registry:

In [8]:
IMAGE_NAME = "trainer_image_covertype_vertex"
TAG = "latest"
TRAINING_CONTAINER_IMAGE_URI = f"gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{TAG}"
TRAINING_CONTAINER_IMAGE_URI

'gcr.io/qwiklabs-asl-00-e7f845a58208/trainer_image_covertype_vertex:latest'

In [6]:
!gcloud builds submit --timeout 15m --tag $TRAINING_CONTAINER_IMAGE_URI trainer_image_vertex

Creating temporary tarball archive of 2 file(s) totalling 3.3 KiB before compression.
Uploading tarball of [trainer_image_vertex] to [gs://qwiklabs-asl-00-e7f845a58208_cloudbuild/source/1668458834.529879-15e6b79c5d0c456aac31b9617642608f.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/qwiklabs-asl-00-e7f845a58208/locations/global/builds/21bd3d04-8058-422a-afed-3cc52c2b91a5].
Logs are available at [ https://console.cloud.google.com/cloud-build/builds/21bd3d04-8058-422a-afed-3cc52c2b91a5?project=384232171367 ].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "21bd3d04-8058-422a-afed-3cc52c2b91a5"

FETCHSOURCE
Fetching storage object: gs://qwiklabs-asl-00-e7f845a58208_cloudbuild/source/1668458834.529879-15e6b79c5d0c456aac31b9617642608f.tgz#1668458837935188
Copying gs://qwiklabs-asl-00-e7f845a58208_cloudbuild/source/1668458834.529879-15e6b79c5d0c456aac31b9617642608f.tgz#1668458837935188...
/ [1 files][  1.6 KiB/  1.6 KiB]          

To match the ml framework version we use at training time while serving the model, we will have to supply the following serving container to the pipeline:

In [39]:
SERVING_CONTAINER_IMAGE_URI = (
    "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-20:latest"
)

**Note:** If you change the version of the training ml framework you'll have to supply a serving container with matching version (see [pre-built containers for prediction](https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers)).

## Building and deploying the pipeline

Let us write the pipeline to disk:

In [245]:
%%writefile ./pipeline_vertex/pipeline_prebuilt.py
# Copyright 2021 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS"
# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.

from kfp.v2.dsl import (Input, Artifact, component)

@component(
    base_image="gcr.io/ml-pipeline/google-cloud-pipeline-components:0.2.1",
    output_component_file="bq_export.yaml",
)
def extract_bq_op(
    bq_table: Input[Artifact],
    destination_uri: str
):

    from google.cloud import bigquery
    client = bigquery.Client(bq_table.metadata["projectId"])

    dataset_ref = bigquery.DatasetReference(bq_table.metadata["projectId"], bq_table.metadata["datasetId"])
    table_ref = dataset_ref.table(bq_table.metadata["tableId"])

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        # Location must match that of the source table.
        location="US",
    )  # API request
    extract_job.result()  # Waits for job to complete.

    print(
        "Exported {}:{}.{} to {}".format(
            bq_table.metadata["projectId"],
            bq_table.metadata["datasetId"],
            bq_table.metadata["tableId"],
            destination_uri
        )
    )


"""Kubeflow Covertype Pipeline."""
import os

from google.cloud.aiplatform import hyperparameter_tuning as hpt
from google_cloud_pipeline_components.aiplatform import (
    EndpointCreateOp,
    ModelDeployOp,
    ModelUploadOp,
)

from google_cloud_pipeline_components.experimental.bigquery import (
    BigqueryQueryJobOp,
)

from google_cloud_pipeline_components.experimental import (
    hyperparameter_tuning_job,
)
from google_cloud_pipeline_components.experimental.custom_job import (
    CustomTrainingJobOp,
)

from google.cloud import bigquery

from kfp.v2 import dsl

PIPELINE_ROOT = os.getenv("PIPELINE_ROOT")
PROJECT_ID = os.getenv("PROJECT_ID")
REGION = os.getenv("REGION")

TRAINING_CONTAINER_IMAGE_URI = os.getenv("TRAINING_CONTAINER_IMAGE_URI")
SERVING_CONTAINER_IMAGE_URI = os.getenv("SERVING_CONTAINER_IMAGE_URI")
SERVING_MACHINE_TYPE = os.getenv("SERVING_MACHINE_TYPE", "n1-standard-16")

TRAINING_FILE_PATH = os.getenv("TRAINING_FILE_PATH")
VALIDATION_FILE_PATH = os.getenv("VALIDATION_FILE_PATH")

MAX_TRIAL_COUNT = int(os.getenv("MAX_TRIAL_COUNT", "5"))
PARALLEL_TRIAL_COUNT = int(os.getenv("PARALLEL_TRIAL_COUNT", "5"))
THRESHOLD = float(os.getenv("THRESHOLD", "0.6"))

PIPELINE_NAME = os.getenv("PIPELINE_NAME", "covertype")
BASE_OUTPUT_DIR = os.getenv("BASE_OUTPUT_DIR", PIPELINE_ROOT)
MODEL_DISPLAY_NAME = os.getenv("MODEL_DISPLAY_NAME", PIPELINE_NAME)

@dsl.pipeline(
    name=f"{PIPELINE_NAME}-kfp-pipeline-full",
    description="Kubeflow pipeline that tunes, trains, and deploys on Vertex",
    pipeline_root=PIPELINE_ROOT,
)
def create_pipeline():

    def construct_query(_mode, _split):        
        query = f"CREATE OR REPLACE TABLE {PROJECT_ID}.covertype_dataset.{_mode}\
        AS (SELECT * \
        FROM `covertype_dataset.covertype` AS cover \
        WHERE \
        MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(cover))), 10) IN {_split})"
        return query

    bq_train_split_task = BigqueryQueryJobOp(
        project=PROJECT_ID,
        location="US",
        query=construct_query("training", "(1, 2, 3, 4)"),
    )

    bq_valid_split_task = BigqueryQueryJobOp(
        project=PROJECT_ID,
        location="US",
        query=construct_query("validation", "(8)"),
    )

    train_extract = extract_bq_op(
        bq_table=bq_train_split_task.outputs['destination_table'],
        destination_uri=TRAINING_FILE_PATH)

    valid_extract = extract_bq_op(
        bq_table=bq_valid_split_task.outputs['destination_table'],
        destination_uri=VALIDATION_FILE_PATH)

    worker_pool_specs = [
        {
            "machine_spec": {
                "machine_type": "n1-standard-4",
                "accelerator_type": "NVIDIA_TESLA_T4",
                "accelerator_count": 1,
            },
            "replica_count": 1,
            "container_spec": {
                "image_uri": TRAINING_CONTAINER_IMAGE_URI,
                "args": [
                    f"--training_dataset_path={TRAINING_FILE_PATH}",
                    f"--validation_dataset_path={VALIDATION_FILE_PATH}",
                    "--hptune",
                ],
            },
        }
    ]

    metric_spec = hyperparameter_tuning_job.serialize_metrics(
        {"accuracy": "maximize"}
    )

    parameter_spec = hyperparameter_tuning_job.serialize_parameters(
        {
            "alpha": hpt.DoubleParameterSpec(
                min=1.0e-4, max=1.0e-1, scale="linear"
            ),
            "max_iter": hpt.DiscreteParameterSpec(
                values=[1, 2], scale="linear"
            ),
        }
    )

    hp_tuning_task = hyperparameter_tuning_job.HyperparameterTuningJobRunOp(
        display_name=f"{PIPELINE_NAME}-kfp-tuning-job",
        project=PROJECT_ID,
        location=REGION,
        worker_pool_specs=worker_pool_specs,
        study_spec_metrics=metric_spec,
        study_spec_parameters=parameter_spec,
        max_trial_count=MAX_TRIAL_COUNT,
        parallel_trial_count=PARALLEL_TRIAL_COUNT,
        base_output_directory=PIPELINE_ROOT,
    ).after(train_extract, valid_extract)

    trials_task = hyperparameter_tuning_job.GetTrialsOp(
        gcp_resources=hp_tuning_task.outputs["gcp_resources"], region=REGION
    )

    best_hyperparameters_task = (
        hyperparameter_tuning_job.GetBestHyperparametersOp(
            trials=trials_task.output, study_spec_metrics=metric_spec
        )
    )

    # Construct new worker_pool_specs and
    # train new model based on best hyperparameters
    worker_pool_specs_task = hyperparameter_tuning_job.GetWorkerPoolSpecsOp(
        best_hyperparameters=best_hyperparameters_task.output,
        worker_pool_specs=[
            {
                "machine_spec": {"machine_type": "n1-standard-4"},
                "replica_count": 1,
                "container_spec": {
                    "image_uri": TRAINING_CONTAINER_IMAGE_URI,
                    "args": [
                        f"--training_dataset_path={TRAINING_FILE_PATH}",
                        f"--validation_dataset_path={VALIDATION_FILE_PATH}",
                        "--nohptune",
                    ],
                },
            }
        ],
    )

    training_task = CustomTrainingJobOp(
        project=PROJECT_ID,
        location=REGION,
        display_name=f"{PIPELINE_NAME}-kfp-training-job",
        worker_pool_specs=worker_pool_specs_task.output,
        base_output_directory=BASE_OUTPUT_DIR,
    )

    model_upload_task = ModelUploadOp(
        project=PROJECT_ID,
        display_name=f"{PIPELINE_NAME}-kfp-model-upload-job",
        artifact_uri=f"{BASE_OUTPUT_DIR}/model",
        serving_container_image_uri=SERVING_CONTAINER_IMAGE_URI,
    )
    model_upload_task.after(training_task)

    endpoint_create_task = EndpointCreateOp(
        project=PROJECT_ID,
        display_name=f"{PIPELINE_NAME}-kfp-create-endpoint-job",
    )
    endpoint_create_task.after(model_upload_task)

    model_deploy_op = ModelDeployOp(  # pylint: disable=unused-variable
        model=model_upload_task.outputs["model"],
        endpoint=endpoint_create_task.outputs["endpoint"],
        deployed_model_display_name=MODEL_DISPLAY_NAME,
        dedicated_resources_machine_type=SERVING_MACHINE_TYPE,
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
    )


Overwriting ./pipeline_vertex/pipeline_prebuilt.py


### Compile the pipeline

Let stat by defining the environment variables that will be passed to the pipeline compiler:

Let us make sure that the `ARTIFACT_STORE` has been created, and let us create it if not:

In [246]:
!gsutil ls | grep ^{ARTIFACT_STORE}/$ || gsutil mb -l {REGION} {ARTIFACT_STORE}

gs://qwiklabs-asl-00-e7f845a58208-kfp-artifact-store/


**Note:** In case the artifact store was not created and properly set before hand, you may need
to run in **CloudShell** the following command to allow Vertex AI to access it:

```
PROJECT_ID=$(gcloud config get-value project)
PROJECT_NUMBER=$(gcloud projects list --filter="name=$PROJECT_ID" --format="value(PROJECT_NUMBER)")
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com" \
    --role="roles/storage.objectAdmin"
```

#### Use the CLI compiler to compile the pipeline

In [247]:
ARTIFACT_STORE = f"gs://{PROJECT_ID}-kfp-artifact-store"
PIPELINE_ROOT = f"{ARTIFACT_STORE}/pipeline"
DATA_ROOT = f"{ARTIFACT_STORE}/data"

TRAINING_FILE_PATH = f"{DATA_ROOT}/training/dataset.csv"
VALIDATION_FILE_PATH = f"{DATA_ROOT}/validation/dataset.csv"

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
BASE_OUTPUT_DIR = f"{ARTIFACT_STORE}/models/{TIMESTAMP}"

%env PIPELINE_ROOT={PIPELINE_ROOT}
%env PROJECT_ID={PROJECT_ID}
%env REGION={REGION}
%env SERVING_CONTAINER_IMAGE_URI={SERVING_CONTAINER_IMAGE_URI}
%env TRAINING_CONTAINER_IMAGE_URI={TRAINING_CONTAINER_IMAGE_URI}
%env TRAINING_FILE_PATH={TRAINING_FILE_PATH}
%env VALIDATION_FILE_PATH={VALIDATION_FILE_PATH}
%env BASE_OUTPUT_DIR={BASE_OUTPUT_DIR}

env: PIPELINE_ROOT=gs://qwiklabs-asl-00-e7f845a58208-kfp-artifact-store/pipeline
env: PROJECT_ID=qwiklabs-asl-00-e7f845a58208
env: REGION=us-central1
env: SERVING_CONTAINER_IMAGE_URI=us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-20:latest
env: TRAINING_CONTAINER_IMAGE_URI=gcr.io/qwiklabs-asl-00-e7f845a58208/trainer_image_covertype_vertex:latest
env: TRAINING_FILE_PATH=gs://qwiklabs-asl-00-e7f845a58208-kfp-artifact-store/data/training/dataset.csv
env: VALIDATION_FILE_PATH=gs://qwiklabs-asl-00-e7f845a58208-kfp-artifact-store/data/validation/dataset.csv
env: BASE_OUTPUT_DIR=gs://qwiklabs-asl-00-e7f845a58208-kfp-artifact-store/models/20221115002613


We compile the pipeline from the Python file we generated into a JSON description using the following command:

In [248]:
PIPELINE_JSON = "covertype_kfp_pipeline.json"

In [249]:
!dsl-compile-v2 --py pipeline_vertex/pipeline_prebuilt.py --output $PIPELINE_JSON



**Note:** You can also use the Python SDK to compile the pipeline:

```python
from kfp.v2 import compiler

compiler.Compiler().compile(
    pipeline_func=create_pipeline, 
    package_path=PIPELINE_JSON,
)

```

The result is the pipeline file. 

In [250]:
!head {PIPELINE_JSON}

{
  "pipelineSpec": {
    "components": {
      "comp-bigquery-query-job": {
        "executorLabel": "exec-bigquery-query-job",
        "inputDefinitions": {
          "parameters": {
            "job_configuration_query": {
              "type": "STRING"
            },


### Deploy the pipeline package

In [251]:
aiplatform.init(project=PROJECT_ID, location=REGION)

pipeline = aiplatform.PipelineJob(
    display_name="covertype_kfp_pipeline",
    template_path=PIPELINE_JSON,
    enable_caching=True,
)

pipeline.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/384232171367/locations/us-central1/pipelineJobs/covertype-kfp-pipeline-full-20221115002618
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/384232171367/locations/us-central1/pipelineJobs/covertype-kfp-pipeline-full-20221115002618')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/covertype-kfp-pipeline-full-20221115002618?project=384232171367
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/384232171367/locations/us-central1/pipelineJobs/covertype-kfp-pipeline-full-20221115002618 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/38423

Copyright 2021 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.