Continuous training pipeline with Kubeflow Pipelines and AI Platform
--

The workflow implemented by the pipeline is defined using a Python-based domain-specific language (DSL). The pipeline's DSL is in the `pipeline/covertype_training_pipeline.py` file.

The pipeline's DSL is designed to avoid hardcoding any environment-specific settings, such as file paths or connection strings. These settings are provided to the pipeline code through a set of environment variables.

In [1]:
!grep 'BASE_IMAGE =' -A 5 pipeline/covertype_training_pipeline.py

BASE_IMAGE = os.getenv('BASE_IMAGE')
TRAINER_IMAGE = os.getenv('TRAINER_IMAGE')
RUNTIME_VERSION = os.getenv('RUNTIME_VERSION')
PYTHON_VERSION = os.getenv('PYTHON_VERSION')
COMPONENT_URL_SEARCH_PREFIX = os.getenv('COMPONENT_URL_SEARCH_PREFIX')
USE_KFP_SA = os.getenv('USE_KFP_SA')
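
Because `os.getenv` always returns a string (or `None` when the variable is unset), a value such as `'False'` is truthy in Python, and a missing variable only surfaces later as a confusing error. The sketch below shows one way such settings could be read more defensively. The helper names `read_setting` and `read_flag` are hypothetical and are not part of `covertype_training_pipeline.py`.

```python
import os


def read_setting(name, environ=os.environ):
    """Return a required setting, failing early with a clear message if unset."""
    value = environ.get(name)
    if not value:
        raise KeyError(f"Environment variable {name} is not set")
    return value


def read_flag(name, environ=os.environ):
    """Interpret a string setting such as 'True' or 'False' as a boolean."""
    return environ.get(name, "False").strip().lower() in ("true", "1", "yes")


# A plain truthiness check on the raw string would treat 'False' as enabled:
# bool("False") is True, whereas read_flag returns False for that value.
```

With these helpers, `USE_KFP_SA` could be read with `read_flag('USE_KFP_SA')`, while image names and versions would go through `read_setting`.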


__The custom components execute in a container image defined in base_image/Dockerfile__

In [2]:
!cat base_image/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cpu
RUN pip install -U fire scikit-learn==0.20.4 pandas==0.24.2 kfp==0.2.5

__The training step in the pipeline employs the AI Platform Training component to schedule an AI Platform Training job in a custom training container. The custom training image is defined in trainer_image/Dockerfile__

In [3]:
!cat trainer_image/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cpu
RUN pip install -U fire cloudml-hypertune scikit-learn==0.20.4 pandas==0.24.2
WORKDIR /app
COPY train.py .

ENTRYPOINT ["python", "train.py"]

Building and deploying the pipeline
--

Before deploying to AI Platform Pipelines, the pipeline DSL has to be compiled into a pipeline runtime format, also referred to as a pipeline package. The runtime format is based on Argo Workflows and is expressed in YAML.

__Configure environment settings__

In [5]:
!gsutil ls

gs://artifacts.zeta-rush-341516.appspot.com/
gs://cloud-ai-platform-4aa74d0a-5386-461c-8135-3f0feac88a35/
gs://cloud-ai-platform-fffcccf5-f8f6-480b-9bb1-7a0a20be6be1/
gs://mlops-youness/
gs://storage_bucket_speech/
gs://zeta-rush-341516_cloudbuild/


For `ENDPOINT`, we use the value of the host variable in the `Connect to this Kubeflow Pipelines instance from a Python client via Kubeflow Pipelines SDK` section of the SETTINGS window.

For `ARTIFACT_STORE_URI`, we copy the name of the bucket used as the artifact store from the `gsutil ls` output above (here, `gs://mlops-youness`).

In [6]:
REGION = 'us-central1'
ENDPOINT = 'https://163ec11eb127b446-dot-us-central1.pipelines.googleusercontent.com' 
ARTIFACT_STORE_URI = 'gs://mlops-youness'
PROJECT_ID = !(gcloud config get-value core/project)
PROJECT_ID = PROJECT_ID[0]

__Build the trainer image__

In [7]:
IMAGE_NAME='trainer_image'
TAG='latest'
TRAINER_IMAGE='gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, TAG)

In [9]:
!gcloud builds submit --timeout 15m --tag $TRAINER_IMAGE trainer_image

Creating temporary tarball archive of 4 file(s) totalling 5.5 KiB before compression.
Uploading tarball of [trainer_image] to [gs://zeta-rush-341516_cloudbuild/source/1647974953.128742-e9760e2e00ef43f6922264d02a27a93c.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/zeta-rush-341516/locations/global/builds/f3509351-be20-451a-91f4-85e9c0e16ae7].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/f3509351-be20-451a-91f4-85e9c0e16ae7?project=156920671469].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "f3509351-be20-451a-91f4-85e9c0e16ae7"

FETCHSOURCE
Fetching storage object: gs://zeta-rush-341516_cloudbuild/source/1647974953.128742-e9760e2e00ef43f6922264d02a27a93c.tgz#1647974953458121
Copying gs://zeta-rush-341516_cloudbuild/source/1647974953.128742-e9760e2e00ef43f6922264d02a27a93c.tgz#1647974953458121...
/ [1 files][  1.5 KiB/  1.5 KiB]                                                
Operation complete

__Build the base image for custom components__

In [10]:
IMAGE_NAME='base_image'
TAG='latest'
BASE_IMAGE='gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, TAG)
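
The trainer and base image names are built with the same `gcr.io/<project>/<image>:<tag>` pattern. A small helper, sketched below, keeps that pattern in one place. The function name `image_uri` and its `registry` parameter are illustrative assumptions, not something the notebook defines.

```python
def image_uri(project_id, image_name, tag="latest", registry="gcr.io"):
    """Build a Container Registry image URI from its parts."""
    if not project_id or not image_name:
        raise ValueError("project_id and image_name must be non-empty")
    return f"{registry}/{project_id}/{image_name}:{tag}"


# Example with the project ID that appears in this notebook's output.
print(image_uri("zeta-rush-341516", "trainer_image"))
# → gcr.io/zeta-rush-341516/trainer_image:latest
```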

In [11]:
!gcloud builds submit --timeout 15m --tag $BASE_IMAGE base_image

Creating temporary tarball archive of 2 file(s) totalling 242 bytes before compression.
Uploading tarball of [base_image] to [gs://zeta-rush-341516_cloudbuild/source/1647975205.79693-b113f66d2d2b42fdb2deadbfce021e61.tgz]
Created [https://cloudbuild.googleapis.com/v1/projects/zeta-rush-341516/locations/global/builds/0a2b90f6-fc53-45ca-b673-93b854c00db9].
Logs are available at [https://console.cloud.google.com/cloud-build/builds/0a2b90f6-fc53-45ca-b673-93b854c00db9?project=156920671469].
----------------------------- REMOTE BUILD OUTPUT ------------------------------
starting build "0a2b90f6-fc53-45ca-b673-93b854c00db9"

FETCHSOURCE
Fetching storage object: gs://zeta-rush-341516_cloudbuild/source/1647975205.79693-b113f66d2d2b42fdb2deadbfce021e61.tgz#1647975206122142
Copying gs://zeta-rush-341516_cloudbuild/source/1647975205.79693-b113f66d2d2b42fdb2deadbfce021e61.tgz#1647975206122142...
/ [1 files][  368.0 B/  368.0 B]                                                
Operation completed ov

__Compile the pipeline__

We can compile the DSL either with the compiler API from the KFP SDK or with the `dsl-compile` command-line tool. Here we use `dsl-compile`.

In [12]:
USE_KFP_SA = False

COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/'
RUNTIME_VERSION = '1.15'
PYTHON_VERSION = '3.7'

%env USE_KFP_SA={USE_KFP_SA}
%env BASE_IMAGE={BASE_IMAGE}
%env TRAINER_IMAGE={TRAINER_IMAGE}
%env COMPONENT_URL_SEARCH_PREFIX={COMPONENT_URL_SEARCH_PREFIX}
%env RUNTIME_VERSION={RUNTIME_VERSION}
%env PYTHON_VERSION={PYTHON_VERSION}

env: USE_KFP_SA=False
env: BASE_IMAGE=gcr.io/zeta-rush-341516/base_image:latest
env: TRAINER_IMAGE=gcr.io/zeta-rush-341516/trainer_image:latest
env: COMPONENT_URL_SEARCH_PREFIX=https://raw.githubusercontent.com/kubeflow/pipelines/0.2.5/components/gcp/
env: RUNTIME_VERSION=1.15
env: PYTHON_VERSION=3.7


In [14]:
!dsl-compile --py pipeline/covertype_training_pipeline.py --output covertype_training_pipeline.yaml



In [15]:
!head covertype_training_pipeline.yaml

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: covertype-classifier-training-
  annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.8.11, pipelines.kubeflow.org/pipeline_compilation_time: '2022-03-22T19:23:32.606049',
    pipelines.kubeflow.org/pipeline_spec: '{"description": "The pipeline training
      and deploying the Covertype classifierpipeline_yaml", "inputs": [{"name": "project_id"},
      {"name": "region"}, {"name": "source_table_name"}, {"name": "gcs_root"}, {"name":
      "dataset_id"}, {"name": "evaluation_metric_name"}, {"name": "evaluation_metric_threshold"},
      {"name": "model_id"}, {"name": "version_id"}, {"name": "replace_existing_version"},
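
The `head` output above shows the two top-level keys that identify the package as an Argo workflow: `apiVersion: argoproj.io/...` and `kind: Workflow`. As a rough sanity check before uploading, those keys could be verified with the standard library alone. The function name `looks_like_argo_workflow` is hypothetical, and the check is a heuristic on the first few lines, not a YAML parse.

```python
def looks_like_argo_workflow(path, max_lines=20):
    """Heuristically check the top-level header of a compiled pipeline package."""
    found = set()
    with open(path, encoding="utf-8") as handle:
        for _, line in zip(range(max_lines), handle):
            text = line.rstrip()
            # Only unindented lines count, so nested keys are ignored.
            if text.startswith("apiVersion: argoproj.io/"):
                found.add("apiVersion")
            elif text == "kind: Workflow":
                found.add("kind")
    return found == {"apiVersion", "kind"}
```

Calling `looks_like_argo_workflow('covertype_training_pipeline.yaml')` after compilation should return `True` for a file that starts like the one shown above.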


__Deploy the pipeline package__

In [16]:
PIPELINE_NAME='covertype_continuous_training'

!kfp --endpoint $ENDPOINT pipeline upload \
-p $PIPELINE_NAME \
covertype_training_pipeline.yaml

Pipeline Details
------------------
Pipeline ID  36237948-da8c-4cfb-b1f6-f5a45a586c9a
Name         covertype_continuous_training
Description
Uploaded at  2022-03-22T19:24:40+00:00
Version ID   36237948-da8c-4cfb-b1f6-f5a45a586c9a
+-----------------------------+--------------------------------------------------+
| Parameter Name              | Default Value                                    |
| project_id                  |                                                  |
+-----------------------------+--------------------------------------------------+
| region                      |                                                  |
+-----------------------------+--------------------------------------------------+
| source_table_name           |                                                  |
+-----------------------------+--------------------------------------------------+
| gcs_root                    |                                                  |
+----------------------

Submitting pipeline runs
--

We list the pipelines in AI Platform Pipelines.

In [17]:
!kfp --endpoint $ENDPOINT pipeline list

+--------------------------------------+------------------------------------------------+---------------------------+
| Pipeline ID                          | Name                                           | Uploaded at               |
| 36237948-da8c-4cfb-b1f6-f5a45a586c9a | covertype_continuous_training                  | 2022-03-22T19:24:40+00:00 |
+--------------------------------------+------------------------------------------------+---------------------------+
| 4db0f015-4ba2-4f2d-85b4-668674c5f0be | [Tutorial] V2 lightweight Python components    | 2022-03-22T18:40:06+00:00 |
+--------------------------------------+------------------------------------------------+---------------------------+
| 730bf3ee-112b-4235-a90a-d456543dd1ed | [Tutorial] DSL - Control structures            | 2022-03-22T18:40:05+00:00 |
+--------------------------------------+------------------------------------------------+---------------------------+
| bace33ba-f5bb-4b11-a9e6-37017031063c | [Tutorial] Data

__Submit a run__

We set `PIPELINE_ID` to the ID of the `covertype_continuous_training` pipeline we uploaded, as shown in the pipeline listing above.

In [18]:
PIPELINE_ID='36237948-da8c-4cfb-b1f6-f5a45a586c9a'

In [19]:
EXPERIMENT_NAME = 'Covertype_Classifier_Training'
RUN_ID = 'Run_001'
SOURCE_TABLE = 'covertype_dataset.covertype'
DATASET_ID = 'splits'
EVALUATION_METRIC = 'accuracy'
EVALUATION_METRIC_THRESHOLD = '0.69'
MODEL_ID = 'covertype_classifier'
VERSION_ID = 'v01'
REPLACE_EXISTING_VERSION = 'True'

GCS_STAGING_PATH = '{}/staging'.format(ARTIFACT_STORE_URI)

In [20]:
!kfp --endpoint $ENDPOINT run submit \
-e $EXPERIMENT_NAME \
-r $RUN_ID \
-p $PIPELINE_ID \
project_id=$PROJECT_ID \
gcs_root=$GCS_STAGING_PATH \
region=$REGION \
source_table_name=$SOURCE_TABLE \
dataset_id=$DATASET_ID \
evaluation_metric_name=$EVALUATION_METRIC \
evaluation_metric_threshold=$EVALUATION_METRIC_THRESHOLD \
model_id=$MODEL_ID \
version_id=$VERSION_ID \
replace_existing_version=$REPLACE_EXISTING_VERSION

Creating experiment Covertype_Classifier_Training.
+--------------------------------------+---------+----------+---------------------------+--------------------------------------+
| run id                               | name    | status   | created at                | experiment id                        |
| b0270dbe-6132-40f1-a6bf-3dd274169bc8 | Run_001 |          | 2022-03-22T19:28:05+00:00 | acdce9ad-ec41-4003-b843-a8e6e82c7b37 |
+--------------------------------------+---------+----------+---------------------------+--------------------------------------+
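
The `kfp run submit` call above passes each pipeline parameter as a `name=value` argument after the `-e`, `-r`, and `-p` options. When the parameters live in a Python dictionary, the same command line could be assembled programmatically, as in the sketch below. The helper name `build_run_submit_command` is hypothetical, and the endpoint in the example is a placeholder.

```python
import shlex


def build_run_submit_command(endpoint, experiment, run_name, pipeline_id, params):
    """Assemble the argument list for `kfp run submit` from a parameter dict."""
    cmd = [
        "kfp", "--endpoint", endpoint, "run", "submit",
        "-e", experiment,
        "-r", run_name,
        "-p", pipeline_id,
    ]
    # Each pipeline parameter becomes one name=value argument.
    cmd += [f"{name}={value}" for name, value in params.items()]
    return cmd


example = build_run_submit_command(
    endpoint="https://example-endpoint.invalid",  # placeholder, not a real host
    experiment="Covertype_Classifier_Training",
    run_name="Run_001",
    pipeline_id="36237948-da8c-4cfb-b1f6-f5a45a586c9a",
    params={"region": "us-central1", "evaluation_metric_threshold": "0.69"},
)
# Print a shell-safe rendering of the command for inspection.
print(" ".join(shlex.quote(arg) for arg in example))
```

The returned list can be handed to `subprocess.run(cmd, check=True)`, which avoids shell quoting problems when a parameter value contains spaces.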


__Monitoring the run__

We can monitor the run using the KFP UI.
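
For unattended monitoring, a polling loop is one alternative to watching the UI. The sketch below is deliberately independent of the KFP SDK: `get_status` is a hypothetical callable that returns the current run status as a string, and the set of terminal states is an assumption that should be checked against the KFP version in use. The status is empty right after submission, as in the table above, so empty values are treated as "not finished yet".

```python
import time

# Assumed terminal states; verify against the installed KFP version.
TERMINAL_STATES = {"succeeded", "failed", "skipped", "error"}


def wait_for_run(get_status, timeout_s=3600, poll_interval_s=30,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll get_status() until it reports a terminal state or the timeout expires."""
    deadline = clock() + timeout_s
    while True:
        status = get_status()
        # An empty or missing status means the run has not started reporting yet.
        if status and status.lower() in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"Run still in state {status!r} after {timeout_s}s")
        sleep(poll_interval_s)
```

In practice `get_status` would wrap whatever client call returns the run's status. The KFP SDK client also provides its own waiting helper, `wait_for_run_completion`, so this loop is mainly useful when custom logging or timeout handling is needed.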