# Train and deploy on Kubeflow from Notebooks

This notebook introduces Kubeflow Fairing. You use it to train a model on Kubeflow on Google Kubernetes Engine (GKE) and on Google Cloud ML Engine, and to deploy the trained model to Kubeflow. This notebook demonstrates how to:

* Train an XGBoost model in a local notebook,
* Use Kubeflow Fairing to train an XGBoost model remotely on Kubeflow,
* Use Kubeflow Fairing to train an XGBoost model remotely on Cloud ML Engine,
* Use Kubeflow Fairing to deploy a trained model to Kubeflow, and
* Call the deployed endpoint for predictions.

To learn more about how to run this notebook locally, see the guide to [training and deploying on GCP from a local notebook][gcp-local-notebook].

[gcp-local-notebook]: https://kubeflow.org/docs/fairing/gcp-local-notebook/

## Set up your notebook for training an XGBoost model

Import the libraries required to train this model.

In [1]:
import argparse
import logging
import joblib
import sys
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor

In [2]:
logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.INFO)

Define a function that reads the input file, keeps the numeric feature columns, splits the rows into training and testing datasets, and imputes missing values.

In [3]:
def read_input(file_name, test_size=0.25):
    """Read input data and split it into train and test."""
    data = pd.read_csv(file_name)
    data.dropna(axis=0, subset=['SalePrice'], inplace=True)

    y = data.SalePrice
    X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])

    train_X, test_X, train_y, test_y = train_test_split(X.values,
                                                      y.values,
                                                      test_size=test_size,
                                                      shuffle=False)

    imputer = SimpleImputer()
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y)
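
The split-then-impute order in `read_input` matters: the imputer learns its fill values from the training rows only and reuses them on the test rows. The following is a minimal pure-Python sketch of that idea, not the scikit-learn implementation. It assumes mean imputation (the `SimpleImputer` default), an unshuffled split that keeps the last rows for testing, and a made-up four-row dataset; `column_means` and `impute` are hypothetical helper names.

```python
import math

def column_means(rows):
    """Mean of each column, ignoring NaN entries (assumes no all-NaN column)."""
    means = []
    for col in zip(*rows):
        present = [v for v in col if not math.isnan(v)]
        means.append(sum(present) / len(present))
    return means

def impute(rows, means):
    """Replace each NaN with the mean learned for its column."""
    return [[means[j] if math.isnan(v) else v for j, v in enumerate(row)]
            for row in rows]

nan = float("nan")
rows = [[1.0, 10.0], [3.0, nan], [nan, 30.0], [7.0, nan]]  # made-up data

# Unshuffled split: the first 75% of rows train, the remaining rows test.
split = int(len(rows) * (1 - 0.25))
train_rows, test_rows = rows[:split], rows[split:]

means = column_means(train_rows)         # learned from training rows only
train_filled = impute(train_rows, means)
test_filled = impute(test_rows, means)   # test rows reuse the training means
print(means, test_filled)
```

Fitting on the training rows alone keeps information from the test rows out of the preprocessing step.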

Define functions to train, evaluate, and save the trained model.

In [4]:
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Train the model using XGBRegressor."""
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    model.fit(train_X,
            train_y,
            early_stopping_rounds=40,
            eval_set=[(test_X, test_y)])

    print("Best RMSE on eval: %.2f with %d rounds" %
          (model.best_score, model.best_iteration + 1))
    return model

def eval_model(model, test_X, test_y):
    """Evaluate the model performance."""
    predictions = model.predict(test_X)
    logging.info("mean_absolute_error=%.2f", mean_absolute_error(predictions, test_y))

def save_model(model, model_file):
    """Save XGBoost model for serving."""
    joblib.dump(model, model_file)
    logging.info("Model export success: %s", model_file)
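
`train_model` passes `early_stopping_rounds=40`, which asks XGBoost to stop once the evaluation metric has not improved for 40 rounds. The sketch below illustrates a simplified patience rule of that kind in plain Python. It is an illustration under assumptions, not XGBoost's implementation; `best_round_with_patience` is a hypothetical helper and the scores are made up.

```python
def best_round_with_patience(scores, patience):
    """Scan per-round validation scores (lower is better).

    Stops once `patience` rounds have passed without a new best score.
    Returns (best_round, best_score, last_round_evaluated).
    """
    best_score = float("inf")
    best_round = -1
    last_round = -1
    for round_idx, score in enumerate(scores):
        last_round = round_idx
        if score < best_score:
            best_score, best_round = score, round_idx
        elif round_idx - best_round >= patience:
            break  # no improvement for `patience` rounds
    return best_round, best_score, last_round

scores = [10.0, 8.0, 7.5, 7.6, 7.7, 7.8, 7.4]  # made-up validation RMSE values
best_round, best_score, last_round = best_round_with_patience(scores, patience=3)
print(best_round, best_score, last_round)
```

Under this rule, a run capped at 50 rounds with a patience of 40 can only stop early when the best score occurs within the first 10 rounds.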

Define a class for your model, with methods for training and prediction.

In [5]:
class HousingServe(object):
    
    def __init__(self):
        self.train_input = "ames_dataset/train.csv"
        self.n_estimators = 50
        self.learning_rate = 0.1
        self.model_file = "trained_ames_model.dat"
        self.model = None

    def train(self):
        (train_X, train_y), (test_X, test_y) = read_input(self.train_input)
        model = train_model(train_X,
                          train_y,
                          test_X,
                          test_y,
                          self.n_estimators,
                          self.learning_rate)

        eval_model(model, test_X, test_y)
        save_model(model, self.model_file)

    def predict(self, X, feature_names):
        """Predict using the model for given ndarray."""
        if not self.model:
            self.model = joblib.load(self.model_file)
        # Do any preprocessing
        prediction = self.model.predict(X)
        # Do any postprocessing
        return [[prediction.item(0), prediction.item(0)]]
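
`HousingServe` separates training from serving: `train()` writes the model file, and `predict()` loads that file the first time it is called on a fresh instance. The sketch below reproduces this lazy-loading pattern with the standard library only. `MeanServe`, its JSON "model", and the sample targets are hypothetical stand-ins chosen so the example runs without XGBoost or joblib.

```python
import json
import os
import tempfile

class MeanServe:
    """Hypothetical stand-in with the same train/predict shape as HousingServe."""

    def __init__(self, model_file):
        self.model_file = model_file
        self.model = None
        self.loads = 0  # counts how often the model file is read

    def train(self, targets):
        # The "model" is just the mean of the targets, saved as JSON.
        with open(self.model_file, "w") as f:
            json.dump({"mean": sum(targets) / len(targets)}, f)

    def predict(self, X, feature_names=None):
        # Load the saved model on the first call only.
        if self.model is None:
            with open(self.model_file) as f:
                self.model = json.load(f)
            self.loads += 1
        return [[self.model["mean"]] for _ in X]

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "model.json")
    MeanServe(path).train([100.0, 200.0, 300.0])  # training instance
    server = MeanServe(path)                      # separate serving instance
    first = server.predict([[1, 2]])
    second = server.predict([[3, 4], [5, 6]])
print(first, second, server.loads)
```

Because the serving instance never calls `train()`, the model file has to be shipped alongside the class, which is why the deployment step lists `trained_ames_model.dat` in `input_files`.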

## Train an XGBoost model in a notebook

Call `HousingServe().train()` to train the model, evaluate it on the test split, and save it to `trained_ames_model.dat`.

In [6]:
HousingServe().train()

[0]	validation_0-rmse:177514
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:161858
[2]	validation_0-rmse:147237
[3]	validation_0-rmse:134132
[4]	validation_0-rmse:122224
[5]	validation_0-rmse:111538
[6]	validation_0-rmse:102142
[7]	validation_0-rmse:93392.3
[8]	validation_0-rmse:85824.6
[9]	validation_0-rmse:79667.6
[10]	validation_0-rmse:73463.4
[11]	validation_0-rmse:68059.4
[12]	validation_0-rmse:63350.5
[13]	validation_0-rmse:59732.1
[14]	validation_0-rmse:56260.7
[15]	validation_0-rmse:53392.6
[16]	validation_0-rmse:50770.8
[17]	validation_0-rmse:48107.8
[18]	validation_0-rmse:45923.9
[19]	validation_0-rmse:44154.2
[20]	validation_0-rmse:42488.1
[21]	validation_0-rmse:41263.3
[22]	validation_0-rmse:40212.8
[23]	validation_0-rmse:39089.1
[24]	validation_0-rmse:37691.1
[25]	validation_0-rmse:36875.2
[26]	validation_0-rmse:36276.2
[27]	validation_0-rmse:35444.1
[28]	validation_0-rmse:34831.5
[29]	validation_0-rmse:34205.4
[30]	validation_0-rmse

mean_absolute_error=18173.15
Model export success: trained_ames_model.dat


Best RMSE on eval: 28787.72 with 50 rounds
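
The training log reports two different error metrics: `validation_0-rmse` (root mean squared error) for each boosting round and `mean_absolute_error` for the final model. As a small worked illustration of how the two relate, the sketch below computes both by hand on made-up prices; the numbers are not taken from the Ames dataset.

```python
import math

actual = [200000.0, 150000.0, 100000.0]     # made-up sale prices
predicted = [210000.0, 140000.0, 130000.0]  # made-up predictions

errors = [p - a for p, a in zip(predicted, actual)]

# Mean absolute error: average size of the errors.
mae = sum(abs(e) for e in errors) / len(errors)

# Root mean squared error: squaring weights large errors more heavily.
rmse = math.sqrt(sum(e * e for e in errors) / len(errors))

print("mae=%.2f rmse=%.2f" % (mae, rmse))
```

RMSE is never smaller than MAE on the same predictions, so the two figures in the log are not directly comparable.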


## Set up Kubeflow Fairing for training and predictions on GCP

Import the `fairing` library and configure the GCP environment that your training or prediction job will run in.

In [7]:
import os
import fairing

# Set up Google Container Registry (GCR) to store the output containers.
# You can use any Docker container registry instead of GCR.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
PY_VERSION = ".".join([str(x) for x in sys.version_info[0:3]])
BASE_IMAGE = 'python:{}'.format(PY_VERSION)
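
To make the resulting values concrete, the sketch below builds the same two strings without calling Fairing. The project ID `my-project` and the helper `fairing_image_settings` are hypothetical; in the notebook the project comes from `fairing.cloud.gcp.guess_project_name()`.

```python
import sys

def fairing_image_settings(project, version_info=sys.version_info):
    """Return (docker_registry, base_image) built the same way as the cell above."""
    registry = "gcr.io/{}/fairing-job".format(project)
    py_version = ".".join(str(x) for x in version_info[0:3])
    return registry, "python:{}".format(py_version)

# Hypothetical project ID and a fixed interpreter version for illustration.
registry, base_image = fairing_image_settings("my-project", (3, 7, 2, "final", 0))
print(registry)    # gcr.io/my-project/fairing-job
print(base_image)  # python:3.7.2
```

The base image tag follows the local interpreter version, so the container runs the same Python release as the notebook, provided an image with that exact tag exists in the registry you pull from.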

## Train an XGBoost model remotely on Kubeflow

Import the `TrainJob` and `KubeflowGKEBackend` classes. Kubeflow Fairing packages the `HousingServe` class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Kubeflow.

In [8]:
from fairing import TrainJob
from fairing.backends import KubeflowGKEBackend
train_job = TrainJob(HousingServe, BASE_IMAGE, input_files=['ames_dataset/train.csv', "requirements.txt"],
                     docker_registry=DOCKER_REGISTRY, backend=KubeflowGKEBackend())
train_job.submit()

Using builder: <class 'fairing.builders.docker.docker.DockerBuilder'>
Building the docker image.
Building image using docker
Docker command: ['python', '/app/function_shim.py', '--serialized_fn_file', '/app/pickled_fn.p']
/Users/cartick/Documents/workspace/fairing/venv/lib/python3.7/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Creating docker context: /tmp/fairing_context_89gq0t3d
/Users/cartick/Documents/workspace/fairing/venv/lib/python3.7/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Building docker image gcr.io/caip-dexter-bugbash/fairing-job/fairing-job:26A3FC51...
Build output: Step 1/7 : FROM python:3.7.2
Build output: 
Build output: ---> 2053ca75899e
Build output: Step 2/7 : WORKDIR /app/
Build output: 
Build output: ---> Using cache
Build output: ---> 2c7f8a216fa8
Build output: Step 3/7 : ENV FAIRING_RUNTIME 1
Build output: 
Build output: ---> Using cache
Build output: ---> 13640ae5453f
Build output: Step 4

Build output: Downloading https://files.pythonhosted.org/packages/0f/39/bdd75b08a6fba41f098b6cb091b9e8c7a80e1b4d679a581a0ccd17b10373/tensorboard-1.13.1-py3-none-any.whl (3.2MB)
Build output: Collecting gast>=0.2.0 (from tensorflow->seldon-core->-r requirements.txt (line 6))
Build output: Downloading https://files.pythonhosted.org/packages/4e/35/11749bf99b2d4e3cceb4d55ca22590b0d7c2c62b9de38ac4a4a7f4687421/gast-0.2.2.tar.gz
Build output: Collecting tensorflow-estimator<1.14.0rc0,>=1.13.0 (from tensorflow->seldon-core->-r requirements.txt (line 6))
Build output: Downloading https://files.pythonhosted.org/packages/bb/48/13f49fc3fa0fdf916aa1419013bb8f2ad09674c275b4046d5ee669a46873/tensorflow_estimator-1.13.0-py2.py3-none-any.whl (367kB)
Build output: Collecting termcolor>=1.1.0 (from tensorflow->seldon-core->-r requirements.txt (line 6))
Build output: Downloading https://files.pythonhosted.org/packages/8a/48/a76be51647d0eb9f10e2a4511bf3ffb8cc1e6b14e9e4fab46173aa79f981/termcolor-1.1.0.tar.gz

Build output: [91mYou are using pip version 19.0.3, however version 19.1.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
[0m
Build output: ---> cf9bd5ec2ec1
Build output: Step 6/7 : COPY /app/ /app/
Build output: 
Build output: ---> 7ce163e5499e
Build output: Step 7/7 : CMD python /app/function_shim.py --serialized_fn_file /app/pickled_fn.p
Build output: 
Build output: ---> Running in 8c4f6b884444
Build output: ---> b490ae841537
Push finished: {'ID': 'sha256:b490ae841537cef98aea1cf7ecb711d406bebb793dd4e263fb24db6876eb542d'}
Build output: Successfully built b490ae841537
Build output: Successfully tagged gcr.io/caip-dexter-bugbash/fairing-job/fairing-job:26A3FC51
Publishing image gcr.io/caip-dexter-bugbash/fairing-job/fairing-job:26A3FC51...
Push output: The push refers to repository [gcr.io/caip-dexter-bugbash/fairing-job/fairing-job] None
Push output: Preparing None
Push output: Preparing None
Push output: Preparing None
Push output: Prepari







Push output: Pushed None
Push output: 26A3FC51: digest: sha256:0707e4936f021f6eb2e9f6f65d6beff2edfb46d7a1293a8924a24b844924b63a size: 3054 None
Push finished: {'Tag': '26A3FC51', 'Digest': 'sha256:0707e4936f021f6eb2e9f6f65d6beff2edfb46d7a1293a8924a24b844924b63a', 'Size': 3054}
Training job fairing-job-x82rk launched.
Waiting for fairing-job-x82rk-8fs2c to start...
Pod started running True


[0]	validation_0-rmse:177514
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:161858
[2]	validation_0-rmse:147237
[3]	validation_0-rmse:134132
[4]	validation_0-rmse:122224
[5]	validation_0-rmse:111538
[6]	validation_0-rmse:102142
[7]	validation_0-rmse:93392.3
[8]	validation_0-rmse:85824.6
[9]	validation_0-rmse:79667.6
[10]	validation_0-rmse:73463.4
[11]	validation_0-rmse:68059.4
[12]	validation_0-rmse:63350.5
[13]	validation_0-rmse:59732.1
[14]	validation_0-rmse:56260.7
[15]	validation_0-rmse:53392.6
[16]	validation_0-rmse:50770.8
[17]	validation_0-rmse:48107.8
[18]	validation_0-rmse:45923.9
[19]	validation_0-rmse:44154.2
[20]	validation_0-rmse:42488.1
[21]	validation_0-rmse:41263.3
[22]	validation_0-rmse:40212.8
[23]	validation_0-rmse:39089.1
[24]	validation_0-rmse:37691.1
[25]	validation_0-rmse:36875.2
[26]	validation_0-rmse:36276.2
[27]	validation_0-rmse:35444.1
[28]	validation_0-rmse:34831.5
[29]	validation_0-rmse:34205.4
[30]	validation_0-rmse

Cleaning up job fairing-job-x82rk...


'fairing-job-x82rk'
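
Every path in `input_files` must exist locally so that it can be copied into the Docker build context. A quick check before submitting can save a failed build. The sketch below is one possible pre-flight check; `missing_input_files` is a hypothetical helper, not part of Fairing, and the temporary directory only simulates a workspace.

```python
import os
import tempfile

def missing_input_files(paths, base_dir="."):
    """Return the paths that do not exist as files under base_dir."""
    return [p for p in paths
            if not os.path.isfile(os.path.join(base_dir, p))]

input_files = ["ames_dataset/train.csv", "requirements.txt"]

with tempfile.TemporaryDirectory() as workspace:
    # Simulate a workspace that has requirements.txt but no dataset.
    with open(os.path.join(workspace, "requirements.txt"), "w") as f:
        f.write("")
    missing = missing_input_files(input_files, base_dir=workspace)

print(missing)  # ['ames_dataset/train.csv']
```

In the notebook, you could call the helper with the default `base_dir` and submit the job only when the returned list is empty.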

## Train an XGBoost model remotely on Cloud ML Engine

Import the `TrainJob` and `GCPManagedBackend` classes. Kubeflow Fairing packages the `HousingServe` class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Cloud ML Engine.

In [None]:
from fairing import TrainJob
from fairing.backends import GCPManagedBackend
train_job = TrainJob(HousingServe, BASE_IMAGE, input_files=['ames_dataset/train.csv', "requirements.txt"],
                     docker_registry=DOCKER_REGISTRY, backend=GCPManagedBackend())
train_job.submit()

## Deploy the trained model to Kubeflow for predictions

Import the `PredictionEndpoint` and `KubeflowGKEBackend` classes. Kubeflow Fairing packages the `HousingServe` class, the trained model, and the prediction endpoint's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the prediction endpoint on Kubeflow.

In [9]:
from fairing import PredictionEndpoint
from fairing.backends import KubeflowGKEBackend
endpoint = PredictionEndpoint(HousingServe, BASE_IMAGE, input_files=['trained_ames_model.dat', "requirements.txt"],
                              docker_registry=DOCKER_REGISTRY, backend=KubeflowGKEBackend())
endpoint.create()

Using builder: <class 'fairing.builders.docker.docker.DockerBuilder'>
Building the docker image.
Building image using docker
Docker command: ['python', '/app/function_shim.py', '--serialized_fn_file', '/app/pickled_fn.p']
/Users/cartick/Documents/workspace/fairing/venv/lib/python3.7/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Creating docker context: /tmp/fairing_context_n0my892f
/Users/cartick/Documents/workspace/fairing/venv/lib/python3.7/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Building docker image gcr.io/caip-dexter-bugbash/fairing-job/fairing-job:FBDDFE2A...
Build output: Step 1/7 : FROM python:3.7.2
Build output: 
Build output: ---> 2053ca75899e
Build output: Step 2/7 : WORKDIR /app/
Build output: 
Build output: ---> Using cache
Build output: ---> 2c7f8a216fa8
Build output: Step 3/7 : ENV FAIRING_RUNTIME 1
Build output: 
Build output: ---> Using cache
Build output: ---> 13640ae5453f
Build output: Step 4

Waiting for prediction endpoint to come up...


Cluster endpoint: http://35.226.32.152:5000/predict
Prediction endpoint: http://35.226.32.152:5000/predict


## Call the prediction endpoint

Create a test dataset, then call the endpoint on Kubeflow for predictions.

In [10]:
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
endpoint.predict_nparray(test_X)


{"data":{"names":["t:0","t:1"],"tensor":{"shape":[1,2],"values":[165164.875,165164.875]}},"meta":{}}
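
The response is a JSON document that carries the predictions as a flat list of values plus a shape. Note that `HousingServe.predict` returns `prediction.item(0)` twice, so the response holds the first prediction only, repeated in both columns. The sketch below parses the response shown above with the standard library; `tensor_rows` is a hypothetical helper name.

```python
import json

# The response text copied from the output above.
response_text = (
    '{"data":{"names":["t:0","t:1"],'
    '"tensor":{"shape":[1,2],"values":[165164.875,165164.875]}},"meta":{}}'
)

def tensor_rows(text):
    """Reshape the flat tensor values into a list of rows using the shape."""
    tensor = json.loads(text)["data"]["tensor"]
    n_rows, n_cols = tensor["shape"]
    values = tensor["values"]
    return [values[r * n_cols:(r + 1) * n_cols] for r in range(n_rows)]

names = json.loads(response_text)["data"]["names"]
rows = tensor_rows(response_text)
print(names, rows)
```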



## Clean up the prediction endpoint

Delete the prediction endpoint created by this notebook.

In [11]:
endpoint.delete()

Deleted service: kubeflow/fairing-service-r5hgd
Deleted deployment: kubeflow/fairing-deployer-2dstt
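
If a prediction call fails partway through the notebook, the endpoint keeps running until you delete it. One way to guard against that is to wrap creation and deletion in a context manager. The sketch below assumes only the `create()` and `delete()` methods used in this notebook; `deployed` and `FakeEndpoint` are hypothetical names, and the fake stands in for a real `PredictionEndpoint` so the example runs anywhere.

```python
from contextlib import contextmanager

@contextmanager
def deployed(endpoint):
    """Create the endpoint, then always delete it, even if the body raises."""
    endpoint.create()
    try:
        yield endpoint
    finally:
        endpoint.delete()

class FakeEndpoint:
    """Stand-in that records calls instead of touching a cluster."""
    def __init__(self):
        self.calls = []
    def create(self):
        self.calls.append("create")
    def delete(self):
        self.calls.append("delete")

fake = FakeEndpoint()
try:
    with deployed(fake):
        raise RuntimeError("prediction failed")  # simulated failure
except RuntimeError:
    pass

print(fake.calls)  # ['create', 'delete']
```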
