# Kubeflow Pipeline: Loan-Delinquency Model

This notebook assumes that you have already set up a GKE cluster with Kubeflow installed, as described in this codelab: [g.co/codelabs/kubecon18](https://g.co/codelabs/kubecon18). Currently, this notebook must be run from the Kubeflow JupyterHub installation described in the codelab.

In this notebook, we will show how to:

* Interactively define a Kubeflow Pipeline using the Pipelines Python SDK
* Submit and run the pipeline
* Add a step in the pipeline

This example pipeline is composed of four steps:
1. Create a train and eval split from a dataset
2. Perform hyperparameter tuning for a basic TensorFlow model on Cloud ML Engine (CMLE)
3. Train this model with the best hyperparameters
4. Deploy this model on CMLE
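The four steps form a linear chain: each step starts only after the previous one finishes. The pipeline definition below expresses this with `.after()` calls and by passing one step's output into the next. As a stdlib-only illustration (this is not how Kubeflow schedules steps internally), Python's `graphlib` module (Python 3.9+) derives the same execution order from a dependency map keyed by the `ContainerOp` names used in the pipeline:

```python
from graphlib import TopologicalSorter

# Each key is a step name (the ContainerOp names used in the pipeline);
# each value is the set of steps that must finish before it can start.
dependencies = {
    "trainevalsplit": set(),
    "hypertrain": {"trainevalsplit"},
    "trainTFtuned": {"hypertrain"},
    "deploytfmodel": {"trainTFtuned"},
}

# static_order() yields each step only after all of its predecessors.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
# → ['trainevalsplit', 'hypertrain', 'trainTFtuned', 'deploytfmodel']
```

Because the graph is a simple chain, there is exactly one valid order; a step with two independent predecessors would be free to wait on both in parallel.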


## Setup

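Import the Pipelines SDK and set the pipeline input variables. Set `BUCKET_NAME` to the Cloud Storage bucket you created earlier (without the `gs://` prefix), `WORKING_DIR` to a path under that bucket, and `PROJECT_ID` to your GCP project ID.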

In [1]:
import kfp  # the Pipelines SDK.  This library is included with the notebook image.
from kfp import compiler
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook

In [2]:
# Define some pipeline input variables.
BUCKET_NAME = 'test-gcp-demo'  # No gs:// here.

WORKING_DIR = 'gs://' + BUCKET_NAME + '/loan-delinq/notebooks'  # Such as gs://bucket/object/path
PROJECT_ID = 'test-gcp-02-224508'
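Building `gs://` URIs by string concatenation makes it easy to drop the `/` between the bucket and the object path, in which case the bucket name and the path run together. A small helper can join the pieces with exactly one separator. `gcs_path` is a hypothetical name used only for illustration; it is not part of the Pipelines SDK:

```python
def gcs_path(bucket_name: str, *parts: str) -> str:
    """Build a gs:// URI from a bare bucket name and one or more path segments."""
    if bucket_name.startswith("gs://"):
        raise ValueError("bucket_name must not include the gs:// prefix")
    # Strip stray slashes from each piece so they are joined by exactly one '/'.
    segments = [bucket_name.strip("/")] + [p.strip("/") for p in parts if p.strip("/")]
    return "gs://" + "/".join(segments)


print(gcs_path("test-gcp-demo", "loan-delinq/notebooks"))
# → gs://test-gcp-demo/loan-delinq/notebooks
```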

## Create an *Experiment* in the Kubeflow Pipeline System

The Kubeflow Pipeline system requires an "Experiment" to group pipeline runs. You can create a new experiment, or call `client.list_experiments()` to get existing ones.

### Note that this notebook should be running in JupyterHub in the same cluster as the pipeline system; otherwise, `kfp.Client()` will not be able to reach the pipeline system.

In [7]:
client = kfp.Client()
client.list_experiments()

{'experiments': [{'created_at': datetime.datetime(2019, 3, 2, 2, 21, 46, tzinfo=tzlocal()),
                  'description': None,
                  'id': 'b7fe0a9d-78bb-4b15-819f-0dc1560cf3dd',
                  'name': 'ex'},
                 {'created_at': datetime.datetime(2019, 3, 2, 2, 39, 18, tzinfo=tzlocal()),
                  'description': None,
                  'id': '1a6fbdd0-8556-42d6-8107-0ce6b136d791',
                  'name': 'ex1'},
                 {'created_at': datetime.datetime(2019, 3, 2, 5, 5, 28, tzinfo=tzlocal()),
                  'description': None,
                  'id': '19d416c5-d570-402e-a8a8-1e0b4f6bc838',
                  'name': 'ex1-1'},
                 {'created_at': datetime.datetime(2019, 3, 5, 20, 3, 53, tzinfo=tzlocal()),
                  'description': None,
                  'id': 'a1ccb473-f60a-4afc-8235-0ed499a0111b',
                  'name': 'exp_gtc'},
                  ...

In [8]:
exp = client.create_experiment(name='datagen_notebook-2')
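To reuse an existing experiment rather than always creating a new one, you can look it up by name in the `list_experiments()` result. A minimal sketch, assuming the listing has first been converted to a plain dict with the shape printed above (for example via the response object's `to_dict()` method, which is an assumption about the generated client model in your SDK version). `find_experiment_id` is a hypothetical helper, and it searches only the page of results you pass in:

```python
from typing import Optional


def find_experiment_id(listing: dict, name: str) -> Optional[str]:
    """Return the id of the experiment called `name`, or None if it is absent."""
    # `listing` is assumed to look like the output printed above:
    # {'experiments': [{'id': ..., 'name': ...}, ...]}.
    # `or []` guards against the list being missing or None.
    for experiment in listing.get("experiments") or []:
        if experiment.get("name") == name:
            return experiment.get("id")
    return None


# Sample entries copied from the listing output above.
sample = {"experiments": [
    {"id": "b7fe0a9d-78bb-4b15-819f-0dc1560cf3dd", "name": "ex"},
    {"id": "1a6fbdd0-8556-42d6-8107-0ce6b136d791", "name": "ex1"},
]}
print(find_experiment_id(sample, "ex1"))      # → 1a6fbdd0-8556-42d6-8107-0ce6b136d791
print(find_experiment_id(sample, "missing"))  # → None
```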

## Define a Pipeline

Authoring a pipeline is like authoring a normal Python function. The pipeline function describes the topology of the pipeline.

Each step in the pipeline is typically a `ContainerOp`: a class that describes how to run a Docker container image as a pipeline step. All of the container images referenced in this pipeline have already been built.
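The `file_outputs` argument of a `ContainerOp` maps an output name to a file path inside the container: when the step finishes, the contents of that file become the step's output (for example `hparam_train.outputs['jobname']`, which the next step receives as `--hyperjob`). The container source code is not shown in this notebook, so the following is only a hypothetical sketch of an entry point that accepts the same flags as the `trainevalsplit` step and honors that contract. `run_component` and the output value it writes are placeholders:

```python
import argparse
from pathlib import Path


def run_component(argv, output_path="/output.txt"):
    """Hypothetical entry point for a step such as `trainevalsplit`."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--project")
    parser.add_argument("--bucket", required=True)
    parser.add_argument("--kfp", action="store_true")  # a flag that takes no value
    args = parser.parse_args(argv)

    # A real step would read the dataset and write the train/eval splits here.
    result = f"gs://{args.bucket}"  # placeholder output value

    # Whatever is written to this file becomes the step's output, e.g.
    # preprocess.outputs['bucket'] for file_outputs={'bucket': '/output.txt'}.
    Path(output_path).write_text(result)
    return result
```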

In [4]:

@dsl.pipeline(
    name='Loan Delinquency',
    description='Demonstrate a tensorflow pipeline for Loan-Delinquency Classification'
)
def loan_delinq_pipeline(
    project='test-gcp-02-224508',
    bucketName='test-gcp-demo'
):
    # Step 1: split the dataset into train and eval sets.
    preprocess = dsl.ContainerOp(
        name='trainevalsplit',
        # Each image needs to be a compile-time string.
        image='gcr.io/test-gcp-02-224508/loan-pipeline-trainevalsplit:latest',
        arguments=[
            '--project', project,
            '--bucket', bucketName,
        ],
        file_outputs={'bucket': '/output.txt'},
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))

    # Step 2: hyperparameter tuning on CMLE; the tuning job name is the step output.
    hparam_train = dsl.ContainerOp(
        name='hypertrain',
        image='gcr.io/test-gcp-02-224508/loan-pipeline-hptune:latest',
        arguments=[
            '--bucket', bucketName,
            '--kfp',
        ],
        file_outputs={'jobname': '/output.txt'},
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))

    # Step 3: train the model with the best hyperparameters from the tuning job.
    train_tuned = dsl.ContainerOp(
        name='trainTFtuned',
        image='gcr.io/test-gcp-02-224508/loan-pipeline-trainmodel:latest',
        arguments=[
            '--bucket', bucketName,
            '--hyperjob', hparam_train.outputs['jobname'],
            '--kfp',
        ],
        file_outputs={'train': '/output.txt'},
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
    train_tuned.set_memory_request('2G')
    train_tuned.set_cpu_request('1')
    # train_tuned.set_gpu_limit(4)  # uncomment to request GPUs for training

    # Step 4: deploy the trained model on CMLE.
    deploy_cmle = dsl.ContainerOp(
        name='deploytfmodel',
        image='gcr.io/test-gcp-02-224508/loan-pipeline-deploytfmodel:latest',
        arguments=[
            '--modeldir', train_tuned.outputs['train'],
            '--modelname', 'loandelinquency',
            '--modelversion', 'v1',
            '--kfp',
        ],
        file_outputs={
            'model': '/model.txt',
            'version': '/version.txt',
        },
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))  # deploying to CMLE also needs GCP credentials

    # Explicit ordering. Passing a step's output as an argument (as steps 3 and 4 do)
    # already implies a dependency, so only the first .after() is strictly required.
    hparam_train.after(preprocess)
    train_tuned.after(hparam_train)
    deploy_cmle.after(train_tuned)
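`set_memory_request('2G')` and `set_cpu_request('1')` take Kubernetes resource quantities. In that notation `G` is a decimal suffix (10^9 bytes) while `Gi` is binary (2^30 bytes), and a CPU value of `1` means one core (`500m` would mean half a core). To make the units concrete, here is a hypothetical, simplified parser that covers only the common suffix forms; it is not the parser Kubernetes itself uses:

```python
from fractions import Fraction

# Binary suffixes (powers of 1024) and decimal suffixes (powers of 1000, plus milli).
_BINARY = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40, "Pi": 2**50, "Ei": 2**60}
_DECIMAL = {"m": Fraction(1, 1000), "k": 10**3, "M": 10**6, "G": 10**9,
            "T": 10**12, "P": 10**15, "E": 10**18}


def parse_quantity(quantity: str) -> Fraction:
    """Convert a quantity such as '2G', '2Gi', '1' or '500m' to an exact number."""
    # Check the two-character binary suffixes first so '2Gi' is not read as '2G' + 'i'.
    for suffix, factor in _BINARY.items():
        if quantity.endswith(suffix):
            return Fraction(quantity[: -len(suffix)]) * factor
    if quantity and quantity[-1] in _DECIMAL:
        return Fraction(quantity[:-1]) * _DECIMAL[quantity[-1]]
    return Fraction(quantity)


print(parse_quantity("2G"))    # → 2000000000
print(parse_quantity("2Gi"))   # → 2147483648
print(parse_quantity("500m"))  # → 1/2
```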

## Submit an experiment *run*

In [5]:
compiler.Compiler().compile(loan_delinq_pipeline, 'loan_delinq_pipeline.tar.gz')
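The compiler writes the pipeline definition (an Argo workflow spec in YAML) into the `.tar.gz` archive, and that archive is the file passed to `run_pipeline` below. To check what was generated before submitting, you can open the archive with the standard-library `tarfile` module. A minimal sketch; `read_compiled_pipeline` is a hypothetical helper, and it makes no assumption about the file name inside the archive:

```python
import tarfile


def read_compiled_pipeline(package_path: str) -> dict:
    """Return {member name: text} for every regular file in a compiled pipeline archive."""
    contents = {}
    with tarfile.open(package_path, "r:gz") as archive:
        for member in archive.getmembers():
            if member.isfile():
                extracted = archive.extractfile(member)
                contents[member.name] = extracted.read().decode("utf-8")
    return contents


# Usage: print the start of each file in the package built by the cell above.
# for name, text in read_compiled_pipeline("loan_delinq_pipeline.tar.gz").items():
#     print(name)
#     print(text[:500])
```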

The call below runs the compiled pipeline as a new run in the experiment created above, passing values for the pipeline parameters.

In [9]:
# Submit the compiled pipeline as a run in the experiment created above.
run = client.run_pipeline(exp.id, 'loan_delinq_pipeline', 'loan_delinq_pipeline.tar.gz',
                          params={'bucketName': BUCKET_NAME,
                                  'project': PROJECT_ID})
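The keys in `params` must match the argument names of the pipeline function (`project` and `bucketName` here); a misspelled key such as `bucket_name` does not correspond to any pipeline parameter, so the intended value would not reach the pipeline. A minimal pre-submission check, assuming `dsl.pipeline` returns the decorated function unchanged so its signature can be inspected. `check_params` is a hypothetical helper:

```python
import inspect


def check_params(pipeline_func, params: dict) -> None:
    """Raise ValueError if `params` names an argument the pipeline function does not declare."""
    declared = set(inspect.signature(pipeline_func).parameters)
    unknown = set(params) - declared
    if unknown:
        raise ValueError(f"unknown pipeline parameters: {sorted(unknown)}; "
                         f"declared: {sorted(declared)}")
```

In this notebook you would call `check_params(loan_delinq_pipeline, {'bucketName': BUCKET_NAME, 'project': PROJECT_ID})` just before `client.run_pipeline(...)`.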

Once the pipeline run finishes, you should see something like the following in the Pipelines UI:

![pipeline GUI](./pipeline.png)

-----------------------------
Copyright 2018 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.