# Pipeline

This notebook demonstrates how to:
* Create a Kubeflow pipeline
* Use container images from a container registry to run each pipeline step (data validation, data transformation, model training, model analysis and model deployment)
* Submit a job for execution

## Imports

In [1]:
#%%capture

# Install the SDK (uncomment the lines below if the SDK is not already installed)
#!pip3 install --upgrade pip -q
#!pip3 install kfp --upgrade -q
#!pip3 install pandas --upgrade -q

In [2]:
import json
import re
import subprocess
from datetime import datetime, timezone

#import pandas as pd
import kfp
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
import kfp.gcp as gcp

In [3]:
pip list | grep kfp

kfp (0.1.34)
kfp-server-api (0.1.18.3)
Note: you may need to restart the kernel to use updated packages.


## Pipeline

### Constants

In [4]:
# ---------------------------------------------------------------------------
# Global parameters (used as default values of the pipeline parameters)
# ---------------------------------------------------------------------------
PROJECT = 'irn-70656-dev-1307100302'  # project id
REGION = 'europe-west1'  # region handed to every pipeline step
RAW_DATA_PATH = 'gs://bike-sharing-data/'  # data collection repo
BUCKET = 'bike-sharing-pipeline-metadata'  # ML workflow bucket
BUCKET_STAGING = 'bike-sharing-pipeline-staging'  # staging bucket (only the training step receives it)
PIPELINE_VERSION = 'v0_1'

# Runner name handed to each step; validation, transform and analysis share
# the same value, training uses its own.
RUNNER_VALIDATION = RUNNER_TRANSFORM = RUNNER_ANALYSIS = 'DirectRunner'
RUNNER_TRAINING = 'AIplatformRunner'

# ---------------------------------------------------------------------------
# GCR docker images: sha256 digests appended to the image reference of each step
# ---------------------------------------------------------------------------
SIN_VALIDATION = '7dda19c36262c082e9d58e53af19122949ff937c3ce61ddf5034c224026564e2'  # data validation image
SIN_TRANSFORM = '5862b073f520d5de4966f4614e37de58210eb1a3857d3838b7b2461bb27fcec6'  # data transform image
SIN_TRAINING = '83e77710ff8ee176166d59db25c19f100612171fa9b9a13275b951f80a6d7740'  # model training image
SIN_ANALYSIS = 'bdd30638ea1327addec245fbcb995973f8da2f3783134b434d0940c8f9ba1210'  # model analysis image
SIN_DEPLOYMENT = '84de4a40f6aecde5d9315576a44007fd4ecbd315c3626c97b551fe0c09563387'  # model deployment image

# ---------------------------------------------------------------------------
# Pipeline metadata
# ---------------------------------------------------------------------------
PIPELINE_NAME = 'Bike Sharing Demand Prediction'
PIPELINE_FILENAME_SUFFIX = 'bikesharing_demand'
PIPELINE_DESCRIPTION = 'Pipeline that runs the full ML cycle for Bike Sharing Demand Predictions.'
# Experiment name is upper-cased as a whole, version included.
EXPERIMENT_NAME = '__'.join(('Bike_Sharing_Demand_Prediction', PIPELINE_VERSION)).upper()

### Define pipeline

In [5]:
class ObjectDict(dict):
    """Dictionary whose keys can also be read as attributes.

    ``d.key`` returns ``d['key']``; reading a name that is not a key raises
    ``AttributeError``. Only attribute *reads* are affected.
    """

    def __getattr__(self, name):
        # __getattr__ is only consulted after normal attribute lookup fails,
        # so regular dict methods keep working.
        if name not in self:
            raise AttributeError("No such attribute: " + name)
        return self[name]

In [6]:
@dsl.pipeline(
    name=PIPELINE_NAME,
    description=PIPELINE_DESCRIPTION
)
def pipeline(
    project=dsl.PipelineParam(name='project', value=PROJECT),
    region=dsl.PipelineParam(name='region', value=REGION),
    raw_data_path=dsl.PipelineParam(name='raw_data_path', value=RAW_DATA_PATH),
    bucket=dsl.PipelineParam(name='bucket', value=BUCKET),
    bucket_staging=dsl.PipelineParam(name='bucket_staging', value=BUCKET_STAGING),
    pipeline_version=dsl.PipelineParam(name='pipeline_version', value=PIPELINE_VERSION),
    runner_validation=dsl.PipelineParam(name='runner_validation', value=RUNNER_VALIDATION),
    runner_transform=dsl.PipelineParam(name='runner_transform', value=RUNNER_TRANSFORM),
    runner_training=dsl.PipelineParam(name='runner_training', value=RUNNER_TRAINING),
    runner_analysis=dsl.PipelineParam(name='runner_analysis', value=RUNNER_ANALYSIS),
    sin_validation=dsl.PipelineParam(name='sin_validation', value=SIN_VALIDATION),
    sin_transform=dsl.PipelineParam(name='sin_transform', value=SIN_TRANSFORM),
    sin_training=dsl.PipelineParam(name='sin_training', value=SIN_TRAINING),
    sin_analysis=dsl.PipelineParam(name='sin_analysis', value=SIN_ANALYSIS),
    sin_deployment=dsl.PipelineParam(name='sin_deployment', value=SIN_DEPLOYMENT),
    last_data_version='200911_154731',
    last_model_version='200911_154828',
    last_trial_id='1',
    last_deployment_flag='OK'
):
    """Chain the five container steps: validation, transform, training, analysis, deployment.

    Every step is a ``dsl.ContainerOp`` whose image is referenced by sha256
    digest and which has the ``user-gcp-sa`` secret applied. Each step consumes
    file outputs (data version, model version, trial id, deployment flag) of
    the step before it.

    The local ``start_step`` selects the first step that is created as a real
    operation. Steps before it are replaced by ``ObjectDict`` stand-ins whose
    ``outputs`` expose the ``last_*`` parameters, so downstream steps can still
    read ``<step>.outputs[...]``.

    Parameters:
        project, region, bucket, pipeline_version: passed to every step.
        raw_data_path: passed to the validation and transform steps.
        bucket_staging: passed to the training step only.
        runner_validation, runner_transform, runner_training, runner_analysis:
            runner name passed to the corresponding step.
        sin_validation, sin_transform, sin_training, sin_analysis, sin_deployment:
            sha256 digest appended to the corresponding step's image reference.
        last_data_version, last_model_version, last_trial_id, last_deployment_flag:
            values substituted for the outputs of steps skipped via ``start_step``.
    """
    # First step to run as a real container op; edit to resume from a later
    # step (earlier steps then fall back to the last_* values).
    start_step = 1

    # Step 1: Validate source data
    if start_step <= 1:
        validation = dsl.ContainerOp(
            name='data-validation',
            image='eu.gcr.io/irn-70656-dev-1307100302/bikesharing-data-validation@sha256:'+str(sin_validation),
            arguments=[
                project,
                region,
                raw_data_path,
                bucket,
                pipeline_version,
                runner_validation
            ],
            file_outputs={'data_version': '/data_version.txt',
                          'mlpipeline_ui_metadata': '/mlpipeline-ui-metadata.json'}
        ).apply(gcp.use_gcp_secret('user-gcp-sa'))
    else:
        validation = ObjectDict({
            'outputs': {
                'data_version': last_data_version
            }
        })

    # Step 2: Create ML datasets
    if start_step <= 2:
        transform = dsl.ContainerOp(
            name='data-transform',
            image='eu.gcr.io/irn-70656-dev-1307100302/bikesharing-data-transform@sha256:'+str(sin_transform),
            arguments=[
                # Fixed: the project id was previously sent under a duplicated
                # '--pipeline_version' flag.
                # NOTE(review): confirm the container's CLI accepts '--project'.
                '--project', project,
                '--region', region,
                '--raw_data_path', raw_data_path,
                '--bucket', bucket,
                '--pipeline_version', pipeline_version,
                '--data_version', validation.outputs['data_version'],
                '--runner', runner_transform
            ],
            file_outputs={'data_version': '/data_version.txt'}
        ).apply(gcp.use_gcp_secret('user-gcp-sa'))
    else:
        transform = ObjectDict({
            'outputs': {
                'data_version': last_data_version
            }
        })

    # Step 3: Train model
    if start_step <= 3:
        training = dsl.ContainerOp(
            name='model-training',
            image='eu.gcr.io/irn-70656-dev-1307100302/bikesharing-model-training@sha256:'+str(sin_training),
            arguments=[
                project,
                region,
                bucket,
                bucket_staging,
                pipeline_version,
                transform.outputs['data_version'],
                runner_training
            ],
            file_outputs={
                          'data_version': '/data_version.txt',
                          'model_version': '/model_version.txt',
                          'trial_id': '/trial_id.txt',
                         }
        ).apply(gcp.use_gcp_secret('user-gcp-sa'))
    else:
        training = ObjectDict({
            'outputs': {
                'data_version': last_data_version,
                'model_version': last_model_version,
                'trial_id': last_trial_id
            }
        })

    # Step 4: Analyze model
    if start_step <= 4:
        analysis = dsl.ContainerOp(
            name='model-analysis',
            image='eu.gcr.io/irn-70656-dev-1307100302/bikesharing-model-analysis@sha256:'+str(sin_analysis),
            arguments=[
                # Fixed: the project id was previously sent under a duplicated
                # '--pipeline_version' flag.
                # NOTE(review): confirm the container's CLI accepts '--project'.
                '--project', project,
                '--region', region,
                '--bucket', bucket,
                '--pipeline_version', pipeline_version,
                '--data_version', training.outputs['data_version'],
                '--model_version', training.outputs['model_version'],
                '--trial_id', training.outputs['trial_id'],
                '--runner', runner_analysis
            ],
            file_outputs={'data_version': '/data_version.txt',
                          'model_version': '/model_version.txt',
                          'trial_id': '/trial_id.txt',
                          'deployment_flag': '/deployment_flag.txt',
                          'mlpipeline_ui_metadata': '/mlpipeline-ui-metadata.json'}
        ).apply(gcp.use_gcp_secret('user-gcp-sa'))
    else:
        analysis = ObjectDict({
            'outputs': {
                'data_version': last_data_version,
                'model_version': last_model_version,
                'trial_id': last_trial_id,
                'deployment_flag': last_deployment_flag,
            }
        })

    # Step 5: Deploy model (declares no file outputs; last step of the chain)
    if start_step <= 5:
        deployment = dsl.ContainerOp(
            name='model-deployment',
            image='eu.gcr.io/irn-70656-dev-1307100302/bikesharing-model-deployment@sha256:'+str(sin_deployment),
            arguments=[
                project,
                region,
                bucket,
                pipeline_version,
                analysis.outputs['data_version'],
                analysis.outputs['model_version'],
                analysis.outputs['trial_id'],
                analysis.outputs['deployment_flag']
            ]
        ).apply(gcp.use_gcp_secret('user-gcp-sa'))

# Reference for invocation later
pipeline_func = pipeline

### Compile pipeline

In [7]:
# Package name has the form pipeline_<suffix>_<version>.tar.gz
pipeline_filename = '_'.join(('pipeline', PIPELINE_FILENAME_SUFFIX, PIPELINE_VERSION)) + '.tar.gz'

# Compile the pipeline function into that package file
compiler.Compiler().compile(pipeline_func=pipeline_func, package_path=pipeline_filename)



### Submit the pipeline for execution

In [8]:
# Specify pipeline argument values (an empty dict passes no overrides)
arguments = {}

# Get or create an experiment and submit a pipeline run
client = kfp.Client()
try:
    experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
except Exception:
    # The lookup failed (e.g. no experiment with this name exists yet), so
    # create it. `Exception` is caught instead of using a bare `except` so
    # that KeyboardInterrupt / SystemExit still stop the cell rather than
    # silently triggering a create call.
    experiment = client.create_experiment(EXPERIMENT_NAME)

# Submit a pipeline run; the UTC timestamp suffix distinguishes successive runs
run_name = pipeline_func.__name__ + '_run_' + datetime.now(timezone.utc).strftime("%y%m%d_%H%M%S")
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)