# Train and deploy on Kubeflow from Notebooks

This notebook introduces you to using Kubeflow Fairing to train and deploy a model to Kubeflow on Google Kubernetes Engine (GKE) and Google Cloud ML Engine. It demonstrates how to:
 
* Train an XGBoost model in a local notebook,
* Use Kubeflow Fairing to train an XGBoost model remotely on Kubeflow,
  * Data is read from a PVC
  * The append builder is used to rapidly build a docker image
* Use Kubeflow Fairing to deploy a trained model to Kubeflow, and
* Call the deployed endpoint for predictions.

To learn more about how to run this notebook locally, see the guide to [training and deploying on GCP from a local notebook][gcp-local-notebook].

[gcp-local-notebook]: https://kubeflow.org/docs/fairing/gcp-local-notebook/

## Set up your notebook for training an XGBoost model

Import the libraries required to train this model.

In [1]:
import demo_util

demo_util.notebook_setup()

In [2]:
# fairing:include-cell
import ames
import fire
import joblib
import logging
import nbconvert
import os
import pathlib
import sys
from pathlib import Path
import pandas as pd
import pprint
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor
from importlib import reload

In [22]:
# Imports not to be included in the built docker image
import kfp
import kfp.components as comp
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler
from kubernetes import client as k8s_client
import fairing   
from fairing.builders import append
from fairing.deployers import job
import fairing_util

Define the constants used throughout the notebook.

In [87]:
nfs_path = "/mnt/kubeflow-gcfs/data/ames_dataset"
model_dir = "/mnt/kubeflow-gcfs/models"
train_data = os.path.join(nfs_path, "train.csv")
model_file = os.path.join(model_dir, "trained_ames_model.dat")

# Base image is built from the Dockerfile in the repo
# Can be the same image as your notebook
base_image = "gcr.io/code-search-demo/kubecon-demo/notebook:v20190518-2d04328-dirty-a8c2a5"

In [5]:
# Copy data to nfs
demo_util.copy_data_to_nfs(nfs_path, model_dir)

## Define Train and Predict functions

In [83]:
# fairing:include-cell
class HousingServe(object):    
    def __init__(self, model_file=None):
        self.n_estimators = 50
        self.learning_rate = 0.1
        if not model_file:
            print("model_file not supplied; checking environment variable")
            model_file = os.getenv("MODEL_FILE")
        
        self.model_file = model_file
        print("model_file={0}".format(self.model_file))
        
        self.model = None
                

    def train(self, train_input, model_file):
        (train_X, train_y), (test_X, test_y) = ames.read_input(train_input)
        model = ames.train_model(train_X,
                                 train_y,
                                 test_X,
                                 test_y,
                                 self.n_estimators,
                                 self.learning_rate)

        ames.eval_model(model, test_X, test_y)
        ames.save_model(model, model_file)

    def predict(self, X, feature_names):
        """Predict using the model for given ndarray."""
        if not self.model:
            print("Loading model {0}".format(self.model_file))
            self.model = joblib.load(self.model_file)
        # Do any preprocessing
        # Pass X positionally: the keyword name is not the same in every XGBoost release.
        prediction = self.model.predict(X)
        # Do any postprocessing
        return [[prediction.item(0), prediction.item(1)]]
    
    def create_pr_to_update_model(self, job_spec_file, new_model):
        ames.create_pr_to_update_model(job_spec_file, new_model)
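
`HousingServe` defers loading the model until the first `predict` call and, when no path is passed to the constructor, reads it from the `MODEL_FILE` environment variable. The sketch below isolates that pattern using only the standard library. `LazyModelServer`, the pickled lookup table, and the temporary file are hypothetical stand-ins, and `pickle` takes the place of the `joblib.load` call used in the notebook.

```python
import os
import pickle
import tempfile


class LazyModelServer:
    """Hypothetical stand-in for HousingServe showing only the loading logic."""

    def __init__(self, model_file=None):
        # Fall back to the MODEL_FILE environment variable, as HousingServe does.
        self.model_file = model_file or os.getenv("MODEL_FILE")
        self.model = None  # nothing is read from disk yet

    def predict(self, X):
        if self.model is None:
            if not self.model_file:
                raise ValueError("no model file given and MODEL_FILE is not set")
            with open(self.model_file, "rb") as f:
                self.model = pickle.load(f)  # the notebook uses joblib.load here
        return [self.model[x] for x in X]


# Hypothetical "model": a lookup table pickled to a temporary file.
with tempfile.NamedTemporaryFile(suffix=".pkl", delete=False) as f:
    pickle.dump({"a": 1.0, "b": 2.0}, f)
    demo_path = f.name

os.environ["MODEL_FILE"] = demo_path
server = LazyModelServer()  # no argument: the path comes from MODEL_FILE
predictions = server.predict(["b", "a"])
os.remove(demo_path)
```

With this pattern, a serving container only needs `MODEL_FILE` set in its environment, which is how the deployment cell later in the notebook passes the model path.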

## Train your Model Locally

* Train your model locally inside your notebook

In [7]:
local_model_file = "/tmp/trained_model.dat"
housing = HousingServe(local_model_file)

housing.train(train_data, local_model_file)

model_file=/tmp/trained_model.dat
[0]	validation_0-rmse:177514
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:161858
[2]	validation_0-rmse:147237
[3]	validation_0-rmse:134132
[4]	validation_0-rmse:122224
[5]	validation_0-rmse:111538
[6]	validation_0-rmse:102142
[7]	validation_0-rmse:93392.3
[8]	validation_0-rmse:85824.6
[9]	validation_0-rmse:79667.6
[10]	validation_0-rmse:73463.4
[11]	validation_0-rmse:68059.4
[12]	validation_0-rmse:63350.5
[13]	validation_0-rmse:59732.1
[14]	validation_0-rmse:56260.7
[15]	validation_0-rmse:53392.6
[16]	validation_0-rmse:50770.8
[17]	validation_0-rmse:48107.8
[18]	validation_0-rmse:45923.9
[19]	validation_0-rmse:44154.2
[20]	validation_0-rmse:42488.1
[21]	validation_0-rmse:41263.3
[22]	validation_0-rmse:40212.8
[23]	validation_0-rmse:39089.1
[24]	validation_0-rmse:37691.1
[25]	validation_0-rmse:36875.2
[26]	validation_0-rmse:36276.2
[27]	validation_0-rmse:35444.1
[28]	validation_0-rmse:34831.5
[29]	validation_0-r

INFO:root:Best RMSE on eval: 28787.72 with 50 rounds
INFO:root:mean_absolute_error=18173.15
INFO:root:Model export success: /tmp/trained_model.dat
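
The log line "Will train until validation_0-rmse hasn't improved in 40 rounds" indicates that training uses early stopping with a patience of 40 rounds. The following is a minimal sketch of a patience-based stopping rule, not XGBoost's implementation; the function name `early_stopping_round` and the scores are made up for illustration.

```python
def early_stopping_round(eval_scores, patience):
    """Return (best_round, best_score) under a patience-based stopping rule.

    eval_scores: per-round validation errors, where lower is better.
    patience: stop once this many rounds pass without a new best score.
    """
    best_round, best_score = 0, float("inf")
    for round_idx, score in enumerate(eval_scores):
        if score < best_score:
            best_round, best_score = round_idx, score
        elif round_idx - best_round >= patience:
            break  # no improvement for `patience` rounds
    return best_round, best_score


# Made-up scores: improvement stalls after round 2.
scores = [10.0, 8.0, 7.0, 7.5, 7.2, 7.1, 6.0]
best = early_stopping_round(scores, patience=3)
```

In the run above, the model is configured with 50 estimators and the log reports the best RMSE "with 50 rounds", so a patience of 40 rounds would not be expected to cut training short.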


## Predict locally

* Run prediction inside the notebook using the newly trained model

In [8]:
(train_X, train_y), (test_X, test_y) = ames.read_input("ames_dataset/train.csv")

housing.predict(test_X, None)

Loading model /tmp/trained_model.dat


[[165164.875, 111924.984375]]

## Use Fairing to Launch a K8s Job to train your model

### Set up Kubeflow Fairing for training and predictions

Import the `fairing` library and configure the environment that your training or prediction job will run in.

In [9]:
# Set up Google Container Registry (GCR) for storing output containers
# You can use any Docker container registry instead of GCR
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
PY_VERSION = ".".join([str(x) for x in sys.version_info[0:3]])
BASE_IMAGE = 'python:{}'.format(PY_VERSION)

## Use fairing to build the docker image

* This uses the append builder to rapidly build docker images

In [88]:
preprocessor = fairing_util.ConvertNotebookPreprocessorWithFire("HousingServe")

if not preprocessor.input_files:
    preprocessor.input_files = set()
input_files=["ames.py", "deployment/update_model_job.yaml", "update_model.py"]
preprocessor.input_files =  set([os.path.normpath(f) for f in input_files])
preprocessor.preprocess()
builder = append.append.AppendBuilder(registry=DOCKER_REGISTRY,
                                      base_image=base_image, preprocessor=preprocessor)
builder.build()


INFO:root:Creating docker context: /tmp/fairing.context.tar.gz
INFO:root:Adding files to context: [PosixPath('ames-xgboost-build-train-deploy.py'), 'deployment/update_model_job.yaml', 'update_model.py', 'ames.py']
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding /home/jovyan/git_jlewi-kubecon-demo/fairing/fairing/__init__.py at /app/fairing/__init__.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding /home/jovyan/git_jlewi-kubecon-demo/fairing/fairing/runtime_config.py at /app/fairing/runtime_config.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding ames-xgboost-build-train-deploy.py at /app/ames-xgboost-build-train-deploy.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding deployment/update_model_job.yaml at /app/deployment/update_model_job.yaml
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding update_model.py at /app/update_model.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding ames.py at /app/ames.py
INFO:root:Loading Docker credentials for rep
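
The source of `fairing_util.ConvertNotebookPreprocessorWithFire` is not shown in this notebook. Its name, together with the `fire` import, suggests that the generated script exposes `HousingServe` through python-fire, so that arguments such as `train <train_data> <model_file>` select a method and supply its parameters. The sketch below is a simplified standard-library stand-in for that dispatch, not fire's implementation; `Demo` and `dispatch` are hypothetical names.

```python
class Demo:
    """Hypothetical class standing in for HousingServe."""

    def __init__(self, model_file=None):
        self.model_file = model_file

    def train(self, train_input, model_file):
        return "train {} -> {}".format(train_input, model_file)

    def create_pr_to_update_model(self, job_spec_file, new_model):
        return "pr {} {}".format(job_spec_file, new_model)


def dispatch(cls, argv):
    """Call cls().<argv[0]>(*argv[1:]), accepting hyphens in the method name."""
    method_name = argv[0].replace("-", "_")
    instance = cls()  # constructed with no arguments
    method = getattr(instance, method_name)
    return method(*argv[1:])


train_result = dispatch(Demo, ["train", "train.csv", "model.dat"])
pr_result = dispatch(Demo, ["create-pr-to-update-model", "job.yaml", "gs://bucket/model"])
```

If the real script behaves this way, the class is built without constructor arguments before the method runs, which would explain why the training job log prints `model_file=None`.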

## Launch the K8s Job

* Use pod mutators to attach a PVC and credentials to the pod
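
A pod mutator is a callable that edits the pod spec in place before the job is submitted. The notebook calls one as `pvc_mutator(None, pod_spec, deployer.namespace)`, so the sketch below assumes the signature `(kube_manager, pod_spec, namespace)`. The implementation of `fairing_util.add_pvc_mutator` is not shown in the notebook; `make_pvc_mutator`, the volume name `data`, and the `SimpleNamespace` pod spec are hypothetical stand-ins, and a real mutator would operate on the Kubernetes client's pod spec object.

```python
from types import SimpleNamespace


def make_pvc_mutator(pvc_name, mount_path):
    """Return a mutator that mounts the named PVC into every container."""
    volume_name = "data"  # hypothetical volume name

    def mutator(kube_manager, pod_spec, namespace):
        # Declare the volume on the pod, using Kubernetes' JSON field names.
        pod_spec.volumes = (pod_spec.volumes or []) + [{
            "name": volume_name,
            "persistentVolumeClaim": {"claimName": pvc_name},
        }]
        # Mount the volume at the same path in each container.
        for container in pod_spec.containers:
            container.volume_mounts = (container.volume_mounts or []) + [{
                "name": volume_name,
                "mountPath": mount_path,
            }]

    return mutator


# Stand-in for a pod spec with a single container.
pod_spec = SimpleNamespace(
    volumes=None,
    containers=[SimpleNamespace(name="model", volume_mounts=None)],
)
make_pvc_mutator("kubeflow-gcfs", "/mnt/kubeflow-gcfs")(None, pod_spec, "kubeflow")
```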

In [11]:
pod_spec = builder.generate_pod_spec()
pvc_mutator = fairing_util.add_pvc_mutator("kubeflow-gcfs", "/mnt/kubeflow-gcfs")
train_deployer = job.job.Job(namespace="kubeflow", 
                             cleanup=False,
                             pod_spec_mutators=[
                             fairing.cloud.gcp.add_gcp_credentials_if_exists, pvc_mutator])

# Add command line arguments
pod_spec.containers[0].command.extend(["train", train_data, model_file])
result = train_deployer.deploy(pod_spec)

INFO:fairing.kubernetes.manager:Pod started running True


model_file not supplied; checking environment variable
model_file=None
[0]	validation_0-rmse:177514
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:161858
[2]	validation_0-rmse:147237
[3]	validation_0-rmse:134132
[4]	validation_0-rmse:122224
[5]	validation_0-rmse:111538
[6]	validation_0-rmse:102142
[7]	validation_0-rmse:93392.3
[8]	validation_0-rmse:85824.6
[9]	validation_0-rmse:79667.6
[10]	validation_0-rmse:73463.4
[11]	validation_0-rmse:68059.4
[12]	validation_0-rmse:63350.5
[13]	validation_0-rmse:59732.1
[14]	validation_0-rmse:56260.7
[15]	validation_0-rmse:53392.6
[16]	validation_0-rmse:50770.8
[17]	validation_0-rmse:48107.8
[18]	validation_0-rmse:45923.9
[19]	validation_0-rmse:44154.2
[20]	validation_0-rmse:42488.1
[21]	validation_0-rmse:41263.3
[22]	validation_0-rmse:40212.8
[23]	validation_0-rmse:39089.1
[24]	validation_0-rmse:37691.1
[25]	validation_0-rmse:36875.2
[26]	validation_0-rmse:36276.2
[27]	validation_0-rmse:35444.1
[28]	validati

In [12]:
!kubectl get jobs -l fairing-id={train_deployer.job_id} -o yaml

apiVersion: v1
items:
- apiVersion: batch/v1
  kind: Job
  metadata:
    creationTimestamp: "2019-05-18T18:12:59Z"
    generateName: fairing-job-
    labels:
      fairing-deployer: job
      fairing-id: 916908e6-7998-11e9-ae63-0a580a000143
    name: fairing-job-tgghf
    namespace: kubeflow
    resourceVersion: "13699285"
    selfLink: /apis/batch/v1/namespaces/kubeflow/jobs/fairing-job-tgghf
    uid: 91861760-7998-11e9-8964-42010a8e00ff
  spec:
    backoffLimit: 6
    completions: 1
    parallelism: 1
    selector:
      matchLabels:
        controller-uid: 91861760-7998-11e9-8964-42010a8e00ff
    template:
      metadata:
        creationTimestamp: null
        labels:
          controller-uid: 91861760-7998-11e9-8964-42010a8e00ff
          fairing-deployer: job
          fairing-id: 916908e6-7998-11e9-ae63-0a580a000143
          job-name: fairing-job-tgghf
        name: fairing-deployer
      spec:
        containers:
        - command:
          -

## Deploy the trained model to Kubeflow for predictions

In [13]:
from fairing.deployers import serving
import fairing_util
pod_spec = builder.generate_pod_spec()
pvc_mutator = fairing_util.add_pvc_mutator("kubeflow-gcfs", "/mnt/kubeflow-gcfs")

module_name = os.path.splitext(preprocessor.executable.name)[0]
deployer = serving.serving.Serving(module_name + ".HousingServe",
                                   service_type="ClusterIP",
                                   labels={"app": "ames"})
    
pvc_mutator(None, pod_spec, deployer.namespace)
pod_spec.containers[0].env.append({"name": "MODEL_FILE", "value": model_file})
url = deployer.deploy(pod_spec)

INFO:root:Cluster endpoint: http://fairing-service-skp97.kubeflow.svc.cluster.local


In [14]:
!kubectl get deploy -o yaml {deployer.deployment.metadata.name}

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
  creationTimestamp: "2019-05-18T18:13:12Z"
  generateName: fairing-deployer-
  generation: 1
  labels:
    app: ames
    fairing-deployer: serving
    fairing-id: 99599854-7998-11e9-ae63-0a580a000143
  name: fairing-deployer-7cn2p
  namespace: kubeflow
  resourceVersion: "13699318"
  selfLink: /apis/extensions/v1beta1/namespaces/kubeflow/deployments/fairing-deployer-7cn2p
  uid: 995b7035-7998-11e9-8964-42010a8e00ff
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: ames
      fairing-deployer: serving
      fairing-id: 99599854-7998-11e9-ae63-0a580a000143
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: ames
        fairing-deployer: ser

## Call the prediction endpoint

Create a test dataset, then call the endpoint on Kubeflow for predictions.

In [15]:
(train_X, train_y), (test_X, test_y) = ames.read_input("ames_dataset/train.csv")

In [16]:
full_url = url + ":5000/predict"
result = fairing_util.predict_nparray(full_url, test_X)
pprint.pprint(result.content)

(b'{"data":{"names":["t:0","t:1"],"tensor":{"shape":[1,2],"values":[165164.875,'
 b'111924.984375]}},"meta":{}}\n')
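
The response body is JSON with the predictions stored as a flat `values` list plus a `shape`. As a minimal sketch, assuming a two-dimensional tensor as in the output above, the values can be regrouped into rows with the standard library alone; `tensor_to_rows` is a hypothetical helper, not part of `fairing_util`.

```python
import json


def tensor_to_rows(response_body):
    """Decode a {"data": {"tensor": {"shape", "values"}}} reply into rows."""
    payload = json.loads(response_body)
    tensor = payload["data"]["tensor"]
    n_rows, n_cols = tensor["shape"]  # assumes exactly two dimensions
    values = tensor["values"]
    if len(values) != n_rows * n_cols:
        raise ValueError("shape does not match the number of values")
    return [values[r * n_cols:(r + 1) * n_cols] for r in range(n_rows)]


# The response body printed by the prediction cell.
body = (b'{"data":{"names":["t:0","t:1"],"tensor":{"shape":[1,2],"values":[165164.875,'
        b'111924.984375]}},"meta":{}}\n')
rows = tensor_to_rows(body)
```

The resulting rows match the list returned by the local `housing.predict` call earlier in the notebook.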


## Clean up the prediction endpoint

Delete the prediction endpoint created by this notebook.

In [17]:
# !kubectl delete service -l app=ames
# !kubectl delete deploy -l app=ames

## Build a pipeline

In [18]:
EXPERIMENT_NAME = 'Ames'

#### Define the pipeline
The pipeline function must be decorated with the `@dsl.pipeline` decorator.

In [19]:
@dsl.pipeline(
   name='Training pipeline',
   description='A pipeline that trains an xgboost model for the Ames dataset.'
)
def train_pipeline(
   train_data="gs://code-search-demo_ames/data/ames_dataset/train.csv",
   model_file="gs://code-search-demo_ames/output/hello-world1.txt",
):      
    command=["python", preprocessor.executable.name, "train", train_data, model_file]
    train_op = dsl.ContainerOp(
            name="train", 
            image=builder.image_tag,        
            command=command,
            ).apply(
                gcp.use_gcp_secret('user-gcp-sa'),
            )
    train_op.container.working_dir = "/app"

#### Compile the pipeline

In [20]:
pipeline_func = train_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

#### Submit the pipeline for execution

In [23]:
# Specify pipeline argument values
arguments = {"train_data": "gs://code-search-demo_ames/data/ames_dataset/train.csv",
             "model_file": "gs://code-search-demo_ames/output/hello-world1.txt"}

# Get or create an experiment and submit a pipeline run
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)

# The cell output links to the run information page. (Note: a bug in JupyterLab modifies the URL, which makes the link stop working.)

## Define a pipeline for CI/CD

* Define a pipeline that trains the model
* Then deploy the model and verify that it is working
* If the model is good, create a PR updating the model in the deployment
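
The notebook does not define what makes a model "good". The sketch below shows one way the gating step could look, assuming the criterion is a lower mean absolute error than the currently deployed model. `should_update_model`, `run_cicd`, the lambdas, and the deployed metric are all hypothetical and are not part of `ames` or the pipeline defined below.

```python
def should_update_model(new_mae, deployed_mae, min_improvement=0.0):
    """Return True when the new model beats the deployed one by the margin."""
    return deployed_mae - new_mae > min_improvement


def run_cicd(train, evaluate, deployed_mae, create_pr):
    """Train, evaluate, and open a PR only when the new model is better."""
    model = train()
    new_mae = evaluate(model)
    if should_update_model(new_mae, deployed_mae):
        create_pr(model)
        return True
    return False


# Hypothetical stand-ins for the real training, evaluation, and PR steps.
opened = []
promoted = run_cicd(
    train=lambda: "model-v2",
    evaluate=lambda model: 18173.15,  # MAE reported by the local training run
    deployed_mae=20000.0,             # made-up metric for the deployed model
    create_pr=opened.append,
)
```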

In [85]:
reload(ames)

<module 'ames' from '/home/jovyan/git_jlewi-kubecon-demo/ames.py'>

In [102]:
from kubernetes import client
kclient = client.ApiClient()
help(client.ApiClient)

Help on class ApiClient in module kubernetes.client.api_client:

class ApiClient(builtins.object)
 |  Generic API client for Swagger client library builds.
 |  
 |  Swagger generic API client. This client handles the client-
 |  server communication, and is invariant across implementations. Specifics of
 |  the methods and models for each application are generated from the Swagger
 |  templates.
 |  
 |  NOTE: This class is auto generated by the swagger code generator program.
 |  Ref: https://github.com/swagger-api/swagger-codegen
 |  Do not edit the class manually.
 |  
 |  :param host: The base path for the server to call.
 |  :param header_name: a header to pass when making calls to the API.
 |  :param header_value: a header value to pass when making calls to the API.
 |  
 |  Methods defined here:
 |  
 |  __del__(self)
 |  
 |  __init__(self, configuration=None, header_name=None, header_value=None, cookie=None, pool_threads=None)
 |      Initialize self.  See help(type(self)) for

In [105]:
import kubernetes
from kubernetes import config

In [107]:
config.incluster_config.load_incluster_config()

In [109]:
kclient = client.ApiClient()
kclient.configuration.host

'https://10.3.240.1:443'

In [86]:


h = HousingServe()
h.create_pr_to_update_model("deployment/update_model_job.yaml", model_file)

  
INFO:root:Created job kubeflow.model-updater-p97fv
INFO:root:Last condition None


model_file not supplied; checking environment variable
model_file=None


INFO:root:Last condition None
INFO:root:Last condition {'last_probe_time': datetime.datetime(2019, 5, 18, 23, 24, 19, tzinfo=tzlocal()),
 'last_transition_time': datetime.datetime(2019, 5, 18, 23, 24, 19, tzinfo=tzlocal()),
 'message': None,
 'reason': None,
 'status': 'True',
 'type': 'Complete'}
INFO:root:Job kubeflow.model-updater-p97fv is done
INFO:root:Final job:
{'api_version': 'batch/v1',
 'kind': 'Job',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': datetime.datetime(2019, 5, 18, 23, 24, 4, tzinfo=tzlocal()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': 'model-updater-',
              'generation': None,
              'initializers': None,
              'labels': {'app': 'model-updater'},
              'name': 'model-updater-p97fv',
              'namespace': 'kubeflow',
              'owner_references': N

In [117]:
builder.build()

INFO:root:Creating docker context: /tmp/fairing.context.tar.gz
INFO:root:Adding files to context: [PosixPath('ames-xgboost-build-train-deploy.py'), PosixPath('ames-xgboost-build-train-deploy.py'), PosixPath('ames-xgboost-build-train-deploy.py'), PosixPath('ames-xgboost-build-train-deploy.py'), PosixPath('ames-xgboost-build-train-deploy.py'), PosixPath('ames-xgboost-build-train-deploy.py'), 'deployment/update_model_job.yaml', 'update_model.py', 'ames.py']
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding /home/jovyan/git_jlewi-kubecon-demo/fairing/fairing/__init__.py at /app/fairing/__init__.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding /home/jovyan/git_jlewi-kubecon-demo/fairing/fairing/runtime_config.py at /app/fairing/runtime_config.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding ames-xgboost-build-train-deploy.py at /app/ames-xgboost-build-train-deploy.py
INFO:root:Context: /tmp/fairing.context.tar.gz, Adding deployment/update_model_job.yaml at /app/deploy

In [118]:
CICD_EXPERIMENT_NAME = 'Ames CICD'
@dsl.pipeline(
   name='Ames CICD pipeline',
   description='A pipeline that trains an xgboost model for the Ames dataset and updates it.'
)
def cicd_pipeline(
   train_data="gs://code-search-demo_ames/data/ames_dataset/train.csv",
   model_file="gs://code-search-demo_ames/output/default.txt",
):      
    command=["python3", preprocessor.executable.name, "create-pr-to-update-model", 
             "deployment/update_model_job.yaml", model_file]    
    pr_op = dsl.ContainerOp(
            name="create-pr", 
            image=builder.image_tag,        
            command=command,
            )
    pr_op.container.working_dir = "/app"
#     command=["python3", "update_model.py", "all", "--model-file=" + model_file,
#              "--src-dir=/src", 
#              "--remote-fork=git@github.com:kubeflow-bot/kubecon-demo.git",
#              "--add-github-host=true"]
#     pr_op = dsl.ContainerOp(
#             name="train", 
#             image=builder.image_tag,
#             command=command,
#             )
#     pr_op.container.working_dir = "/src/jlewi/kubecon-demo"

pipeline_func = cicd_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

import datetime
model_file = "gs://new/model" + datetime.datetime.now().strftime("%y%m%d_%H%M%S")

# Specify pipeline argument values
arguments = {"train_data": "gs://code-search-demo_ames/data/ames_dataset/train.csv",
             "model_file": model_file}

# Get or create an experiment and submit a pipeline run
client = kfp.Client()
experiment = client.create_experiment(CICD_EXPERIMENT_NAME)

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
