![Servian Logo](assets/servian_logo.png)
# Creating Kubeflow Pipelines - Lab #

# Building a Kubeflow Pipeline #

In this lab we will explore a dataset and turn exploratory analysis into a Kubeflow pipeline.

Some of the key concepts that will be covered include:

* Building lightweight Kubeflow components from developed code.
* Using a factory component.
* Accessing an external data source.



## Using Jupyter Notebooks as a Development and Deployment Environment ##

Jupyter notebooks can be used for the entire development and deployment lifecycle, and offer the advantages of consumable source control and a single development and execution environment.

Working exploratory code can be easily converted to Kubeflow components from within the notebook and deployed to a training environment.

## Pipelines, Components and Tasks ##

A pipeline is a collection of components chained together. At its heart, a component is simply code that takes inputs, applies logic and returns an output. A task is a single run of a pipeline component.

Unlike functions within a single program execution, a component is standalone and atomic and **does not** share state, memory or resources with any other component. A pipeline is a directed acyclic graph (DAG) of Kubeflow pipeline components.
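
To make the component, task and DAG vocabulary concrete, here is a minimal plain-Python sketch. It does not use the Kubeflow SDK: the component functions (`extract`, `check`, `train`), the `dag` mapping and the `gs://my-bucket/labs` path are hypothetical names chosen for illustration. Each component only sees the inputs it is given, and the standard library's `graphlib` (Python 3.9+) orders the tasks so that each one runs after the tasks it depends on.

```python
from graphlib import TopologicalSorter

# Hypothetical components: each takes an input, applies logic, returns an output.
def extract(source: str) -> str:
    return f"{source}/train.csv"

def check(path: str) -> str:
    return "Success" if path.endswith(".csv") else "Failure"

def train(path: str) -> str:
    return path.replace("train.csv", "model.pkl")

# The DAG: each task name maps to the set of tasks it depends on.
dag = {
    "extract": set(),
    "check": {"extract"},
    "train": {"extract"},
}

def run_pipeline(source: str) -> dict:
    """Run every task once, in dependency order, passing outputs explicitly."""
    components = {"extract": extract, "check": check, "train": train}
    results = {}
    for task in TopologicalSorter(dag).static_order():
        upstream = dag[task]
        # A task receives only its upstream task's output (or the pipeline input).
        arg = results[next(iter(upstream))] if upstream else source
        results[task] = components[task](arg)
    return results

results = run_pipeline("gs://my-bucket/labs")
```

In a real pipeline each task would run in its own container, so the explicit hand-off of outputs shown here is the only way data moves between components.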

## Preparation ##

* If the Kubeflow Pipelines Python SDK is not installed, install it using pip or conda.
* You will need to be working within a project that is running Kubeflow Pipelines.


In [1]:
#!pip install kfp
#!pip install fire

A detailed list of factory components can be viewed here: https://github.com/kubeflow/pipelines/tree/master/components

# Create a Kubeflow Pipeline in Python #

What we will do:

1. Build a Kubeflow pipeline with 2 tasks using a factory component
1. Build a lightweight Python component to test the output of the tasks and add it to the pipeline
1. Build a container based component to train a classification model and add it to the pipeline
1. Add a factory component to the pipeline to submit a training job to Google Cloud AI Platform

The execution graph looks like this:

![DAG](assets/DAG.png)


## 1. Importing Modules and Declaring Variables ##

We need to import modules to help us build and compile Kubeflow pipelines. 

These will vary by pipeline but in this case we need the following:


### Imports ###
* The Kubeflow Pipelines SDK

    ```import kfp```

* kfp.dsl provides the domain-specific language (DSL) used to define the pipeline in Python. The compiled pipeline is expressed in YAML.

    ```import kfp.dsl as dsl```

* func_to_container_op turns a Python function into a container which is then used as a KFP component to build a pipeline.

    ```from kfp.components import func_to_container_op```

* ComponentStore allows us to search for prewritten and precompiled Kubeflow components to include in KFP pipelines.

    ```from kfp.components import ComponentStore```

* We are using Kubernetes secrets for authentication against GCP services, so we require use_gcp_secret.

    ```from kfp.gcp import use_gcp_secret```

* KFP can be very sensitive to the types passed between components, so we need to explicitly type named tuples.

    ```from typing import NamedTuple```


### Variables ###

* Location of the factory components - this is where the GCP factory components are.

    ```COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.1.36/components/gcp/'```

* GCP Project ID

    ```PROJECT_ID = 'servian-gcp-training'```

* Any lightweight Python component we create will need a base container image to be built from.

    ```IMAGE_URI = 'gcr.io/deeplearning-platform-release/base-cpu'```
    

* As we are using the Python SDK to submit runs, we will need to specify the host name.

    ```KFP_HOST = '656ef6603c3ce37d-dot-us-central2.pipelines.googleusercontent.com'```




In [2]:
# Run this to view available deep learning containers

#!gcloud container images list \
#  --repository="gcr.io/deeplearning-platform-release"

In [77]:
# Kubeflow pipelines
import kfp
import kfp.dsl as dsl
from kfp.components import func_to_container_op
from kfp.components import ComponentStore
from kfp.gcp import use_gcp_secret
from typing import NamedTuple



# Location of the factory components - this is where the GCP factory components are
COMPONENT_URL_SEARCH_PREFIX = 'https://raw.githubusercontent.com/kubeflow/pipelines/0.1.36/components/gcp/'

# GCP Project ID
PROJECT_ID = 'servian-gcp-training'

# The container image we will use
IMAGE_URI = 'gcr.io/deeplearning-platform-release/base-cpu'

# Kubeflow Pipelines host URI
KFP_HOST = '5c4264ee40616903-dot-us-central2.pipelines.googleusercontent.com'

# Region
REGION = 'us-central1'

## 2. Build a Pipeline with Two Tasks Using a Factory Component ##

Using a factory component to create a task is straightforward:

1. Declare where your factory component definitions are stored.
2. Load the component you want to use.

In this example we are using the GCP BigQuery component, which is located at ```COMPONENT_URL_SEARCH_PREFIX``` declared above.

This is a third-party component and reading the documentation is advised. The specific documentation is here:

https://github.com/kubeflow/pipelines/tree/master/components/gcp/bigquery/query

### Factory Component Variables ###

As per the documentation, we can pass in variables. While many are optional, it is worth being explicit for anything where a random storage location would otherwise be created.

We need somewhere to store the CSV output

```output_gcs_name```

We need the location of a dataset to store the BQ table

```dataset_location```

```dataset_id```

```table_id``` 

We need an SQL query to pass in

```my_query```


These are passed in as pipeline variables and consumed by the pipeline components.

In [4]:
# The GCS bucket URI where you will store files
gcs_bucket_root = 'servian-gcp-training'
my_things = 'jamie'

output_gcs_name = 'gs://{}/labs/{}'.format(gcs_bucket_root,my_things)
print(output_gcs_name)

# Create component factory component (bigquery)
component_store = ComponentStore(
    local_search_paths=None, url_search_prefixes=[COMPONENT_URL_SEARCH_PREFIX])

# Create BigQuery Operator
bigquery_query_op = component_store.load_component('bigquery/query')

gs://servian-gcp-training/labs/jamie


In [78]:
# 10% sample = keeping it small for the demo
training_query = '''SELECT
                          *
                     FROM
                          `servian-gcp-training.covertype.covertype` AS cover
                     WHERE
                          MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(cover))), 10) IN (0)'''


# 20% sample
testing_query = '''SELECT
                          *
                     FROM
                          `servian-gcp-training.covertype.covertype` AS cover
                     WHERE
                          MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(cover))), 10) IN (8,9)'''
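
The two `WHERE` clauses above sample rows by hashing each row and keeping particular remainders modulo 10: remainder 0 for training (roughly 10%) and remainders 8 and 9 for testing (roughly 20%). Because the hash of a row never changes, the same row always lands in the same split and the two samples cannot overlap. The sketch below illustrates the idea in plain Python. It uses SHA-256 as a stand-in for `FARM_FINGERPRINT`, which is an assumption for illustration only, so the bucket values will not match BigQuery's; the `bucket` helper and the sample rows are hypothetical.

```python
import hashlib

def bucket(row: str, buckets: int = 10) -> int:
    """Map a serialized row to a stable bucket in [0, buckets)."""
    digest = hashlib.sha256(row.encode("utf-8")).digest()
    # Use the first 8 bytes as an unsigned integer, then take the remainder.
    return int.from_bytes(digest[:8], "big") % buckets

# Hypothetical rows, serialized as JSON strings like TO_JSON_STRING would produce.
rows = [f'{{"id": {i}}}' for i in range(1000)]

train_rows = [r for r in rows if bucket(r) in (0,)]    # about 10% of rows
test_rows = [r for r in rows if bucket(r) in (8, 9)]   # about 20% of rows
```

Rerunning the split yields the same rows each time, which is what makes this kind of sampling repeatable across pipeline runs.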

In [76]:
training_table_id = '{}_train'.format(my_things)
testing_table_id = '{}_test'.format(my_things)


training_gcs_name = output_gcs_name+'/train.csv'
testing_gcs_name = output_gcs_name+'/test.csv'

print(training_gcs_name)
print(testing_gcs_name)


dataset_location = 'US'
dataset_id = 'covertype'

gs://servian-gcp-training/labs/jamie/train.csv
gs://servian-gcp-training/labs/jamie/test.csv


In [7]:
@dsl.pipeline(
    name='{}-factory-component-pipeline'.format(my_things),
    description='A pipeline using a factory component to carry out two tasks')

def pipeline(project_id=PROJECT_ID,
            region = REGION,
            training_query=training_query,
            training_table_id=training_table_id,
            training_gcs_name=training_gcs_name,
            testing_table_id=testing_table_id,
            testing_gcs_name=testing_gcs_name, 
            dataset_location=dataset_location,
            ):
    
    
    # Get training data - save to BQ and GCS
    get_training_data = bigquery_query_op(
                  query=training_query,
                  project_id=project_id,
                  dataset_id=dataset_id,
                  table_id=training_table_id,
                  dataset_location=dataset_location,
                  output_gcs_path=training_gcs_name)
    
    # Get testing data - save to BQ and GCS
    get_testing_data = bigquery_query_op(
                  query=testing_query,
                  project_id=project_id,
                  dataset_id=dataset_id,
                  table_id=testing_table_id,
                  dataset_location=dataset_location,
                  output_gcs_path=testing_gcs_name).after(get_training_data)
    
    # Authentication to services for K8s service account
    #kfp.dsl.get_pipeline_conf().add_op_transformer(use_gcp_secret('user-gcp-sa'))

In [8]:
client = kfp.Client(host=KFP_HOST)
pipeline = client.create_run_from_pipeline_func(pipeline, experiment_name='Kubeflow Pipeline Labs', arguments={})

## 3. Add a Lightweight Python Component to the Pipeline ##


In this step we will create a lightweight component which runs as a container, takes the output of the BigQuery task and tests it.

Lightweight Python components do not require you to build a new container image for every code change. They’re intended for fast iteration in a notebook environment.

Advantages over container components:

* Faster iteration: No need to build new container image after every change (building images takes some time).
* Easier authoring: Components can be created in a local environment. Docker and Kubernetes are not required.


To build a component, we define a stand-alone Python function and then call kfp.components.func_to_container_op(func) to convert the function to a component that can be used in a pipeline.

There are several requirements for the component function:

The function must be stand-alone.

* It should not use any code declared outside the function definition.
* Any imports should be added inside the main component function.
* Any helper functions should also be defined inside the main component function.
* The function can only import packages that are available in the base image.

If you need to import a package that’s not available in the default base image you can try to find a container image that already includes the required packages. 

If the function operates on numbers, the parameters must have type hints. Supported types are int, float, bool. All other arguments are passed as strings.

To build a component with multiple output values, use Python’s ```typing.NamedTuple``` type hint syntax.
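
As an illustration of the requirements above, here is a minimal sketch of a stand-alone function with two named outputs. The function name `summarise_csv` and its output names are hypothetical and not part of this lab's pipeline. All imports sit inside the function body, and the return annotation uses the `NamedTuple` functional syntax.

```python
from typing import NamedTuple

def summarise_csv(csv_text: str) -> NamedTuple('Outputs', [('row_count', int), ('column_count', int)]):
    """Count the data rows and columns of a CSV string (header row excluded)."""
    # Imports live inside the function so that it stays stand-alone.
    import csv
    import io
    from collections import namedtuple

    rows = list(csv.reader(io.StringIO(csv_text)))
    header, data = rows[0], rows[1:]

    # Return a named tuple whose field names match the type hint.
    outputs = namedtuple('Outputs', ['row_count', 'column_count'])
    return outputs(len(data), len(header))

# Local check before converting the function to a component.
result = summarise_csv("Elevation,Aspect\n2332,90\n2637,135\n")
```

A function of this shape can be passed to `func_to_container_op` like any other lightweight component. Downstream tasks would then be expected to read each value by name from the task's `outputs`, in the same way the pipelines in this lab read `outputs['output_gcs_path']`.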

In [9]:
# Lightweight Python component function
def check_outputs(output) -> str:
    print(type(output))
    print(str(output))
    return 'Success'

check_outputs_operator = func_to_container_op(check_outputs,  base_image=IMAGE_URI)

In [10]:
@dsl.pipeline(
    name='{}-factory-component-pipeline-w-lwc'.format(my_things),
    description='A pipeline using a factory component to carry out two tasks and a lightweight component')

def pipeline(project_id=PROJECT_ID,
            region = REGION,
            training_query=training_query,
            training_table_id=training_table_id,
            training_gcs_name=training_gcs_name,
            testing_table_id=testing_table_id,
            testing_gcs_name=testing_gcs_name, 
            dataset_location=dataset_location,
            ):
    
    
    # Get training data - save to BQ and GCS
    get_training_data_task = bigquery_query_op(
                  query=training_query,
                  project_id=project_id,
                  dataset_id=dataset_id,
                  table_id=training_table_id,
                  dataset_location=dataset_location,
                  output_gcs_path=training_gcs_name)
    
    # Get testing data - save to BQ and GCS
    get_testing_data_task = bigquery_query_op(
                  query=testing_query,
                  project_id=project_id,
                  dataset_id=dataset_id,
                  table_id=testing_table_id,
                  dataset_location=dataset_location,
                  output_gcs_path=testing_gcs_name)
    
    # Check output of get training data task
    check_output_task1 =  check_outputs_operator(get_training_data_task.outputs['output_gcs_path'])
    
    # Check output of get testing data task
    check_output_task2 =  check_outputs_operator(get_testing_data_task.outputs['output_gcs_path'])
    
    # Authentication to services for K8s service account
    #kfp.dsl.get_pipeline_conf().add_op_transformer(use_gcp_secret('user-gcp-sa'))

In [11]:
client = kfp.Client(host=KFP_HOST)
pipeline = client.create_run_from_pipeline_func(pipeline, experiment_name='Kubeflow Pipeline Labs', arguments={})


Some key differences from the previous lightweight Python component we built are:

* We want to be explicit about what we return, so we use ```NamedTuple``` from the Python ```typing``` library imported above.
* We also format the return value specifically as a named tuple using ```namedtuple``` from the Python ```collections``` library.
* As the component runs in its own container rather than in the notebook, we must import the required libraries within the function. In this case ```pandas``` and ```namedtuple``` from ```collections```.

## 4. Build a Container Component and Add it to the Pipeline ##

For this step we will actually be running some training code.

The great thing about this is you can simply take working notebook code and easily convert it to a full container component.

You can still do exploration and local experimentation in the same notebook as pipelines are created.

What we will be doing:

1. Create a training function
1. Test the training function
1. Create a container from the training function
1. Deploy the container to the container registry
1. Add the container component to the pipeline as a training task


In [12]:
import pandas as pd
df_train = pd.read_csv(training_gcs_name)

In [13]:
df_train.head()

| Unnamed: 0 | Elevation | Aspect | Slope | Horizontal_Distance_To_Hydrology | Vertical_Distance_To_Hydrology | Horizontal_Distance_To_Roadways | Hillshade_9am | Hillshade_Noon | Hillshade_3pm | Horizontal_Distance_To_Fire_Points | Wilderness_Area | Soil_Type | Cover_Type |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2332 | 90 | 0 | 30 | 0 | 384 | 219 | 237 | 155 | 1328 | Cache | C2702 | 5 |
| 1 | 2637 | 135 | 0 | 390 | 211 | 2078 | 219 | 238 | 156 | 443 | Commanche | C2703 | 2 |
| 2 | 2552 | 0 | 0 | 484 | 33 | 953 | 218 | 238 | 156 | 1044 | Commanche | C2703 | 2 |
| 3 | 2563 | 45 | 0 | 417 | 21 | 1120 | 218 | 237 | 156 | 1262 | Commanche | C2703 | 2 |
| 4 | 2559 | 0 | 0 | 510 | 16 | 1113 | 218 | 238 | 156 | 1332 | Commanche | C2703 | 2 |


### 4.1 Create Your Training Function ###

Here is a training function. You can test it locally to see if it is working. It does the following:

1. Reads training data from GCS
1. Trains a model
1. Saves the model locally and uploads it to GCS

To get this function ready for deployment once we have tested it we need to do the following:

1. Write the function to file
1. Create a Dockerfile
1. Build the container and submit to the Google Container Registry

In [14]:
# Create a directory for the files

!mkdir training

mkdir: cannot create directory ‘training’: File exists


In [54]:
%%writefile training/train.py

import fire
import pickle
import numpy as np
import pandas as pd


from sklearn.compose import ColumnTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

from google.cloud import storage




def train_model(job_dir, training_gcs_name, gcs_bucket_root, alpha, max_iter, model_dir, my_things):
    
   
    df_train = pd.read_csv(training_gcs_name)
    
    numeric_feature_indexes = slice(0, 10)
    categorical_feature_indexes = slice(10, 12)

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numeric_feature_indexes),
            ('cat', OneHotEncoder(), categorical_feature_indexes)
        ])

    pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', SGDClassifier(loss='log'))
    ])
    
    num_features_type_map = {feature: 'float64' for feature in df_train.columns[numeric_feature_indexes]}
    df_train = df_train.astype(num_features_type_map)
#     df_validation = df_validation.astype(num_features_type_map) 

    print('Starting training: alpha={}, max_iter={}'.format(alpha, max_iter))
    X_train = df_train.drop('Cover_Type', axis=1)
    y_train = df_train['Cover_Type']

    pipeline.set_params(classifier__alpha=alpha, classifier__max_iter=max_iter)
    pipeline.fit(X_train, y_train)
    
    print('Model all trained....')
    
    # Upload model to GCS
    print('Uploading model to GCS')
    model_filename = 'model.pkl'
    with open(model_filename, 'wb') as model_file:
        pickle.dump(pipeline, model_file)
    
    gcs_model_path = '{}/{}'.format(model_dir, model_filename)
    
    
    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket_root)
    blob = bucket.blob('{}/{}/{}/{}'.format('labs',my_things, model_dir, model_filename))

    blob.upload_from_filename(model_filename)
    
    model_gcs_path = 'gs://{}/{}/{}/{}'.format(gcs_bucket_root,'labs',my_things, model_dir)

    print('Exported to: {}'.format(model_gcs_path))
    
    


if __name__ == "__main__":
    fire.Fire(train_model)



Overwriting training/train.py
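
The `fire.Fire(train_model)` call at the end of `train.py` exposes each parameter of `train_model` as a command-line flag, which is how the container receives its settings when it runs. To show that contract without depending on `fire`, the sketch below uses the standard library's `argparse` as a stand-in. This is not what `train.py` itself does: `parse_train_args` is a hypothetical helper, and treating `job_dir` as optional is an assumption made for the example.

```python
import argparse

def parse_train_args(argv):
    """Parse the same flags that train_model takes as parameters."""
    parser = argparse.ArgumentParser(description="Stand-in for the train.py CLI")
    parser.add_argument("--job_dir", default="")  # assumed optional in this sketch
    parser.add_argument("--training_gcs_name", required=True)
    parser.add_argument("--gcs_bucket_root", required=True)
    parser.add_argument("--alpha", type=float, required=True)   # string -> float
    parser.add_argument("--max_iter", type=int, required=True)  # string -> int
    parser.add_argument("--model_dir", required=True)
    parser.add_argument("--my_things", required=True)
    return parser.parse_args(argv)

# Every command-line value arrives as a string and is converted on parsing.
args = parse_train_args([
    "--model_dir", "model",
    "--training_gcs_name", "gs://servian-gcp-training/labs/jamie/train.csv",
    "--gcs_bucket_root", "servian-gcp-training",
    "--alpha", ".0001",
    "--max_iter", "1000",
    "--my_things", "jamie",
])
```

The point to take away is that numeric settings such as `alpha` and `max_iter` travel to the container as strings and have to be converted back to numbers before they reach the model.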


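#### In-Notebook Unit Test ####

In [None]:
# %%writefile only saves the cell to disk, so import the function before calling it
from training.train import train_model

# directory to save model to
job_dir = 'model'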
model_dir = 'model'
training_gcs_name = "gs://servian-gcp-training/labs/jamie/train.csv"
#
alpha = .0001
max_iter = 1000
my_things = 'jamie'
gcs_bucket_root = 'servian-gcp-training'


train_model(job_dir, training_gcs_name, gcs_bucket_root, alpha, max_iter, model_dir, my_things)

### 4.2 Create and build Container ###

#### Create Dockerfile ####

Create the Dockerfile and write it to the folder created above.

Note that we are installing Python packages that are not in the base image.

In [55]:
# Our base image
print(IMAGE_URI)

gcr.io/deeplearning-platform-release/base-cpu


In [56]:
%%writefile training/Dockerfile

FROM gcr.io/deeplearning-platform-release/base-cpu
RUN pip install -U fire scikit-learn==0.20.4 pandas==0.24.2
WORKDIR /training
COPY train.py .

ENTRYPOINT ["python", "train.py"]



Overwriting training/Dockerfile


In [57]:
ls training

Dockerfile  train.py


#### Build the Image and Register with Google Container Registry ####

In [58]:
IMAGE_NAME='training'
TAG=my_things
TRAIN_IMAGE_URI='gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, TAG)


In [59]:
print(TRAIN_IMAGE_URI)

gcr.io/servian-gcp-training/training:jamie


In [60]:
%%capture build_output
!gcloud builds submit --timeout 15m --tag "gcr.io/servian-gcp-training/training:jamie" training

In [61]:
#build_output.show()

### 4.3 Add The Container Component to the Pipeline ###

We will also submit it to Cloud AI Platform to do the training.




In [79]:
# Create the AI platform factory components

mlengine_train_op = component_store.load_component('ml_engine/train')

mlengine_deploy_op = component_store.load_component('ml_engine/deploy')

# Not necessary under this scenario
alpha = '.0001'
max_iter = '1000'
model_dir = 'model'

# Training image
trainer_image = TRAIN_IMAGE_URI

# This is where we will create the job_directory
print(output_gcs_name)


# Deployment variables

RUNTIME_VERSION = '1.14'
PYTHON_VERSION = '3.5'
model_id = '{}'.format(my_things)
version_id = '{}_v1'.format(my_things)
replace_existing_version = 'True'


gs://servian-gcp-training/labs/jamie


In [80]:
output_gcs_name

'gs://servian-gcp-training/labs/jamie'

In [81]:
@dsl.pipeline(
    name='{}-factory-component-pipeline-w-lwc'.format(my_things),
    description='A pipeline using factory components and a lightweight component to extract data, then train and deploy a model on AI Platform')

def pipeline(project_id=PROJECT_ID,
            region = REGION,
            training_query=training_query,
            training_table_id=training_table_id,
            training_gcs_name=training_gcs_name,
            testing_table_id=testing_table_id,
            testing_gcs_name=testing_gcs_name, 
            dataset_location=dataset_location,
            output_gcs_name=output_gcs_name,
            trainer_image=trainer_image,
            replace_existing_version=replace_existing_version
            ):
    
    
    # Get training data - save to BQ and GCS
    get_training_data_task = bigquery_query_op(
                  query=training_query,
                  project_id=project_id,
                  dataset_id=dataset_id,
                  table_id=training_table_id,
                  dataset_location=dataset_location,
                  output_gcs_path=training_gcs_name)
    
    # Get testing data - save to BQ and GCS
    get_testing_data_task = bigquery_query_op(
                  query=testing_query,
                  project_id=project_id,
                  dataset_id=dataset_id,
                  table_id=testing_table_id,
                  dataset_location=dataset_location,
                  output_gcs_path=testing_gcs_name)
    
    # Check output of get training data task
    check_output_task1 =  check_outputs_operator(get_training_data_task.outputs['output_gcs_path'])
    
    # Check output of get testing data task
    check_output_task2 =  check_outputs_operator(get_testing_data_task.outputs['output_gcs_path'])
    
    
    # Train the model in cloud AI platform
    
    job_dir = '{}/{}'.format(output_gcs_name, model_dir)
    
    #print(job_dir)

    train_args = ['--model_dir', model_dir,
        '--training_gcs_name', get_training_data_task.outputs['output_gcs_path'],
        '--gcs_bucket_root', gcs_bucket_root, 
        '--alpha', alpha, 
        '--max_iter', max_iter,
        '--my_things', my_things
    ]

    train_model_task = mlengine_train_op(
        project_id=project_id,
        region=region,
        master_image_uri=trainer_image,
        job_dir=job_dir,
        args=train_args)
    
    check_output_task3 =  check_outputs_operator(train_model_task.outputs)

    
    
    deploy_model_task = mlengine_deploy_op(
        model_uri=train_model_task.outputs['job_dir'],
        project_id=project_id,
        model_id=model_id,
        version_id=version_id,
        runtime_version=RUNTIME_VERSION,
        python_version=PYTHON_VERSION,
        replace_existing_version=replace_existing_version).after(get_testing_data_task)


In [82]:
client = kfp.Client(host=KFP_HOST)
pipeline = client.create_run_from_pipeline_func(pipeline, experiment_name='Kubeflow Pipeline Labs', arguments={})



## Get Some Predictions ##