# Train and deploy on Kubeflow from Notebooks

This notebook introduces you to using Kubeflow Fairing to train and deploy a model to Kubeflow on Google Kubernetes Engine (GKE) and to Google Cloud ML Engine. This notebook demonstrates how to:

* Train an XGBoost model in a local notebook,
* Use Kubeflow Fairing to train an XGBoost model remotely on Kubeflow,
* Use Kubeflow Fairing to train an XGBoost model remotely on Cloud ML Engine,
* Use Kubeflow Fairing to deploy a trained model to Kubeflow, and
* Call the deployed endpoint for predictions.

To learn more about how to run this notebook locally, see the guide to [training and deploying on GCP from a local notebook][gcp-local-notebook].

[gcp-local-notebook]: https://kubeflow.org/docs/fairing/gcp-local-notebook/

## Set up your notebook for training an XGBoost model

Import the libraries required to train this model.

In [1]:
import sys
# Use the local Fairing source checkout in /home/jovyan/fairing ahead of any installed copy.
sys.path.insert(0, "/home/jovyan/fairing")
print(sys.path)

['/home/jovyan/fairing', '/opt/conda/lib/python36.zip', '/opt/conda/lib/python3.6', '/opt/conda/lib/python3.6/lib-dynload', '', '/opt/conda/lib/python3.6/site-packages', '/opt/conda/lib/python3.6/site-packages/IPython/extensions', '/home/jovyan/.ipython']


In [2]:
# 'sklearn' on PyPI is a deprecated alias; the package name is scikit-learn.
!pip3 install pandas joblib scikit-learn xgboost



In [3]:
import argparse
import logging
import joblib
import sys
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor

In [4]:
logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.DEBUG)

Define a function to split the input file into training and testing datasets.

In [5]:
def read_input(file_name, test_size=0.25):
    """Read input data and split it into train and test."""
    data = pd.read_csv(file_name)
    data.dropna(axis=0, subset=['SalePrice'], inplace=True)

    y = data.SalePrice
    X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])

    train_X, test_X, train_y, test_y = train_test_split(X.values,
                                                      y.values,
                                                      test_size=test_size,
                                                      shuffle=False)

    imputer = SimpleImputer()
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y)
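`read_input` relies on two scikit-learn behaviors worth spelling out: with `shuffle=False` and a fractional `test_size`, `train_test_split` keeps the row order and puts `ceil(test_size * n)` rows in the test set, and `SimpleImputer` learns its per-column means from the training rows only before filling both sets. The plain-Python sketch below reimplements both steps for illustration only; `ordered_split_sizes` and `mean_impute` are hypothetical helpers, the sketch assumes missing values are `None` and that every column has at least one observed training value, and it is not how scikit-learn is implemented.

```python
import math


def ordered_split_sizes(n_samples, test_size=0.25):
    """(n_train, n_test) for an ordered split with a fractional test_size.

    Follows the rounding scikit-learn applies for a float test_size:
    the test set gets ceil(test_size * n_samples) rows, the training set the rest.
    """
    n_test = math.ceil(test_size * n_samples)
    return n_samples - n_test, n_test


def mean_impute(train_rows, test_rows):
    """Fill None with per-column means learned from train_rows only."""
    n_cols = len(train_rows[0])
    means = []
    for j in range(n_cols):
        observed = [row[j] for row in train_rows if row[j] is not None]
        means.append(sum(observed) / len(observed))  # assumes >= 1 observed value

    def fill(rows):
        return [[means[j] if v is None else v for j, v in enumerate(row)] for row in rows]

    # Test rows are filled with training statistics, never with their own.
    return fill(train_rows), fill(test_rows)


print(ordered_split_sizes(1460))  # (1095, 365)
```

The printed sizes match the train and test shapes shown later in this notebook (1,095 + 365 = 1,460 rows).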

Define functions to train, evaluate, and save the trained model.

In [6]:
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Train the model using XGBRegressor."""
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

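    # Stop adding trees once the eval-set RMSE has not improved for 40 rounds.
    # (Recent XGBoost releases expect early_stopping_rounds in the XGBRegressor
    # constructor rather than in fit().)
    model.fit(train_X,
              train_y,
              early_stopping_rounds=40,
              eval_set=[(test_X, test_y)])

    # print() does not apply %-formatting; logging does.
    logging.info("Best RMSE on eval: %.2f with %d rounds",
                 model.best_score,
                 model.best_iteration + 1)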
    return model

def eval_model(model, test_X, test_y):
    """Evaluate the model performance."""
    predictions = model.predict(test_X)
    logging.info("mean_absolute_error=%.2f", mean_absolute_error(predictions, test_y))

def save_model(model, model_file):
    """Save XGBoost model for serving."""
    joblib.dump(model, model_file)
    logging.info("Model export success: %s", model_file)
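`early_stopping_rounds=40` asks XGBoost to stop adding trees once the validation RMSE on `eval_set` has gone 40 rounds without improving, and to record the best round in `best_score` and `best_iteration`. The sketch below shows that patience rule on a plain list of per-round scores; `early_stopping` is a hypothetical helper, not XGBoost code, and XGBoost's own bookkeeping may differ in details.

```python
def early_stopping(eval_scores, patience):
    """Return (best_score, best_iteration) under patience-based early stopping.

    eval_scores[i] is the validation metric after round i (lower is better, like RMSE).
    Scanning stops once `patience` rounds have passed without beating the best score.
    """
    best_score, best_iter = float("inf"), -1
    for i, score in enumerate(eval_scores):
        if score < best_score:
            best_score, best_iter = score, i
        elif i - best_iter >= patience:
            break  # `patience` rounds without improvement: stop training
    return best_score, best_iter


# The 3.0 in the last round is never reached: training stopped after round 4.
print(early_stopping([5.0, 4.0, 4.5, 4.2, 4.1, 3.0], patience=3))  # (4.0, 1)
```

In the run recorded below, the best score comes at the last of the 50 rounds, so early stopping never triggers and `best_iteration + 1` equals `n_estimators`.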

Define a class for your model, with methods for training and prediction.

In [7]:
class HousingServe(object):
    
    def __init__(self):

        self.train_input = "ames_dataset/train.csv"
        self.n_estimators = 50
        self.learning_rate = 0.1
        self.model_file = "trained_ames_model.dat"
        self.model = None
        

    def train(self):
        (train_X, train_y), (test_X, test_y) = read_input(self.train_input)
        model = train_model(train_X,
                          train_y,
                          test_X,
                          test_y,
                          self.n_estimators,
                          self.learning_rate)

        eval_model(model, test_X, test_y)
        save_model(model, self.model_file)

    def predict(self, X, feature_names):
        """Predict using the model for given ndarray."""
        # feature_names is part of the serving interface but is not used here.
        # Load the saved model lazily, on the first prediction request.
        if self.model is None:
            self.model = joblib.load(self.model_file)
        # Do any preprocessing
        # Pass X positionally so the call does not depend on the argument's keyword name.
        prediction = self.model.predict(X)
        # Do any postprocessing
        return prediction.tolist()
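`HousingServe` decouples training from serving through the model file: `train()` writes `trained_ames_model.dat`, and `predict()` loads it on first use, so a fresh instance (as in `HousingServe().predict(test_X, None)` later in this notebook) can answer requests without retraining. A minimal standard-library sketch of that pattern follows; `MeanModel` and `LazyServe` are hypothetical stand-ins, and `pickle` replaces `joblib` only to keep the example dependency-free.

```python
import os
import pickle
import tempfile


class MeanModel:
    """Hypothetical stand-in for XGBRegressor: predicts the training mean for every row."""

    def fit(self, X, y):
        self.mean_ = sum(y) / len(y)
        return self

    def predict(self, X):
        return [self.mean_ for _ in X]


class LazyServe:
    """Mirrors HousingServe: train() saves a model file, predict() loads it on first use."""

    def __init__(self, model_file):
        self.model_file = model_file
        self.model = None

    def train(self, X, y):
        with open(self.model_file, "wb") as f:
            pickle.dump(MeanModel().fit(X, y), f)

    def predict(self, X, feature_names):
        # feature_names is accepted to match the serving signature but unused here.
        if self.model is None:  # load once, then reuse for later requests
            with open(self.model_file, "rb") as f:
                self.model = pickle.load(f)
        return self.model.predict(X)


# Train with one instance, then predict with a fresh one that only sees the file.
model_path = os.path.join(tempfile.mkdtemp(), "mean_model.pkl")
LazyServe(model_path).train([[0], [1]], [2.0, 4.0])
print(LazyServe(model_path).predict([[5], [6]], None))  # [3.0, 3.0]
```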

## Train an XGBoost model in a notebook

Call `HousingServe().train()` to train the model; `train()` also evaluates the model and saves it to `trained_ames_model.dat`.

In [8]:
HousingServe().train()

[0]	validation_0-rmse:177514
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:161858
[2]	validation_0-rmse:147237
[3]	validation_0-rmse:134132
[4]	validation_0-rmse:122224
[5]	validation_0-rmse:111538
[6]	validation_0-rmse:102142
[7]	validation_0-rmse:93392.3
[8]	validation_0-rmse:85824.6
[9]	validation_0-rmse:79667.6
[10]	validation_0-rmse:73463.4
[11]	validation_0-rmse:68059.4
[12]	validation_0-rmse:63350.5
[13]	validation_0-rmse:59732.1
[14]	validation_0-rmse:56260.7
[15]	validation_0-rmse:53392.6
[16]	validation_0-rmse:50770.8
[17]	validation_0-rmse:48107.8
[18]	validation_0-rmse:45923.9
[19]	validation_0-rmse:44154.2
[20]	validation_0-rmse:42488.1
[21]	validation_0-rmse:41263.3
[22]	validation_0-rmse:40212.8
[23]	validation_0-rmse:39089.1
[24]	validation_0-rmse:37691.1
[25]	validation_0-rmse:36875.2
[26]	validation_0-rmse:36276.2
[27]	validation_0-rmse:35444.1
[28]	validation_0-rmse:34831.5
[29]	validation_0-rmse:34205.4
[30]	validation_0-rmse

mean_absolute_error=18173.15
Model export success: trained_ames_model.dat


Best RMSE on eval: %.2f with %d rounds 28787.720703 50
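The output reports two different errors on the same held-out rows: the early-stopping metric is RMSE (28787.72 at the best round), while `eval_model` logs mean absolute error (18173.15). RMSE squares each error before averaging, so it weights large misses more heavily and is never smaller than MAE. The short sketch below computes both metrics in plain Python; `mae` and `rmse` are illustrative names, chosen so they do not shadow the `mean_absolute_error` imported from scikit-learn.

```python
import math


def mae(y_true, y_pred):
    """Mean absolute error: the average of |error| over all rows."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def rmse(y_true, y_pred):
    """Root mean squared error: large misses dominate because errors are squared."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


y_true = [100.0, 200.0, 300.0]
y_pred = [110.0, 190.0, 340.0]
print(mae(y_true, y_pred))   # 20.0
print(rmse(y_true, y_pred))  # about 24.49: the single 40-unit miss pulls RMSE up
```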


## Set up Kubeflow Fairing for training and predictions on GCP

Import the `fairing` library and choose the Docker registry and base image that your training and prediction jobs will use.

In [9]:
import os
import sys
import fairing

# Docker registry that stores the images Fairing builds. This notebook uses
# Docker Hub; any registry you can push to works instead, for example
# Google Container Registry (GCR). On GCP, you can look up the project with:
#GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'index.docker.io/m1st3rb3an'

# Base image: the official Python image matching this notebook's Python version...
PY_VERSION = ".".join([str(x) for x in sys.version_info[0:3]])
BASE_IMAGE = 'library/python:{}'.format(PY_VERSION)
# ...overridden here by the custom base image used for the recorded runs.
BASE_IMAGE = 'm1st3rb3an/rainers-python:latest'

## Train an XGBoost model remotely on Kubeflow

Import the `TrainJob` and `KubeflowBackend` classes. Kubeflow Fairing packages the `HousingServe` class, the training data, and the training job's software prerequisites as a Docker image and pushes it to `DOCKER_REGISTRY`. Then Kubeflow Fairing deploys and runs the training job on Kubeflow.

In [10]:
from fairing import TrainJob
from fairing.backends import KubeflowBackend
#from fairing.backends import KubeflowGKEBackend

# Build an image containing HousingServe, the training data, and requirements.txt,
# push it to DOCKER_REGISTRY, and run the training job on Kubeflow.
train_job = TrainJob(HousingServe, BASE_IMAGE,
                     input_files=['ames_dataset/train.csv', 'requirements.txt'],
                     docker_registry=DOCKER_REGISTRY,
                     backend=KubeflowBackend())
train_job.submit()

Using preprocessor: <class 'fairing.preprocessors.function.FunctionPreProcessor'>
Using docker registry: index.docker.io/m1st3rb3an
Using builder: <class 'fairing.builders.append.append.AppendBuilder'>
Building the docker image.
Building image using Append builder...
Creating docker context: /tmp/fairing_context_kvzubq_l
/home/jovyan/fairing/fairing/__init__.py already exists in Fairing context, skipping...
Loading Docker credentials for repository 'm1st3rb3an/rainers-python:latest'
Image successfully built in 1.6732040740007506s.
Pushing image index.docker.io/m1st3rb3an/fairing-job:772FBC5A...
Loading Docker credentials for repository 'index.docker.io/m1st3rb3an/fairing-job:772FBC5A'
Uploading index.docker.io/m1st3rb3an/fairing-job:772FBC5A
Layer sha256:7dc3a6a0e509ba4468dafa767116859fcfe1bfd8ad9101ec73691fbd6e1d314a exists, skipping
Layer sha256:4bc370953db1f559d31afa0043cd304a675ee7d1770fa1583c9b05a25fc9c793 exists, skipping
Layer sha256:d54db43011fd116b8cb6d9e49e268cee1fa6212f152b3

[0]	validation_0-rmse:177514
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:161858
[2]	validation_0-rmse:147237
[3]	validation_0-rmse:134132
[4]	validation_0-rmse:122224
[5]	validation_0-rmse:111538
[6]	validation_0-rmse:102142
[7]	validation_0-rmse:93392.3
[8]	validation_0-rmse:85824.6
[9]	validation_0-rmse:79667.6
[10]	validation_0-rmse:73463.4
[11]	validation_0-rmse:68059.4
[12]	validation_0-rmse:63350.5
[13]	validation_0-rmse:59732.1
[14]	validation_0-rmse:56260.7
[15]	validation_0-rmse:53392.6
[16]	validation_0-rmse:50770.8
[17]	validation_0-rmse:48107.8
[18]	validation_0-rmse:45923.9
[19]	validation_0-rmse:44154.2
[20]	validation_0-rmse:42488.1
[21]	validation_0-rmse:41263.3
[22]	validation_0-rmse:40212.8
[23]	validation_0-rmse:39089.1
[24]	validation_0-rmse:37691.1
[25]	validation_0-rmse:36875.2
[26]	validation_0-rmse:36276.2
[27]	validation_0-rmse:35444.1
[28]	validation_0-rmse:34831.5
[29]	validation_0-rmse:34205.4
[30]	validation_0-rmse

'fairing-job-x4mnk'

## Train an XGBoost model remotely on Cloud ML Engine

Import the `TrainJob` and `GCPManagedBackend` classes. Kubeflow Fairing packages the `HousingServe` class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Cloud ML Engine.

## Deploy the trained model to Kubeflow for predictions

Import the `PredictionEndpoint` and `KubeflowBackend` classes. Kubeflow Fairing packages the `HousingServe` class, the trained model, and the prediction endpoint's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the prediction endpoint on Kubeflow.

In [11]:
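from fairing import PredictionEndpoint
from fairing.backends import KubeflowBackend

# Build an image containing HousingServe, the trained model file, and requirements.txt,
# push it to DOCKER_REGISTRY, and deploy it as a prediction endpoint on Kubeflow.
endpoint = PredictionEndpoint(HousingServe, BASE_IMAGE,
                              input_files=['trained_ames_model.dat', 'requirements.txt'],
                              docker_registry=DOCKER_REGISTRY,
                              backend=KubeflowBackend())
endpoint.create()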

Using preprocessor: <class 'fairing.preprocessors.function.FunctionPreProcessor'>
Using docker registry: index.docker.io/m1st3rb3an
Using builder: <class 'fairing.builders.append.append.AppendBuilder'>
Building the docker image.
Building image using Append builder...
Creating docker context: /tmp/fairing_context_8poq10dc
/home/jovyan/fairing/fairing/__init__.py already exists in Fairing context, skipping...
Loading Docker credentials for repository 'm1st3rb3an/rainers-python:latest'
Image successfully built in 1.6089806579984725s.
Pushing image index.docker.io/m1st3rb3an/fairing-job:F889148A...
Loading Docker credentials for repository 'index.docker.io/m1st3rb3an/fairing-job:F889148A'
Uploading index.docker.io/m1st3rb3an/fairing-job:F889148A
Layer sha256:7dc3a6a0e509ba4468dafa767116859fcfe1bfd8ad9101ec73691fbd6e1d314a exists, skipping
Layer sha256:4bc370953db1f559d31afa0043cd304a675ee7d1770fa1583c9b05a25fc9c793 exists, skipping
Layer sha256:d4b7902036fe0cefdfe9ccf0404fe13322ecbd552f132

## Call the prediction endpoint

Create a test dataset and check a few predictions locally with `HousingServe`, then call the endpoint on Kubeflow for predictions.

In [12]:
import requests
import json

In [13]:
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
print("train_X.shape: {}".format(train_X.shape))
print("train_y.shape: {}".format(train_y.shape))
print("test_X.shape: {}".format(test_X.shape))
print("test_y.shape: {}".format(test_y.shape))


train_X.shape: (1095, 37)
train_y.shape: (1095,)
test_X.shape: (365, 37)
test_y.shape: (365,)


In [14]:
HousingServe().predict(test_X,None)[:5]

[165164.875, 111924.984375, 191548.140625, 104526.6171875, 179303.25]

## Send requests to the prediction endpoint and clean up

Build the endpoint URL and the request payload, send the test dataset to the endpoint on Kubeflow, and then delete the prediction endpoint created by this notebook.

In [16]:
url_prediction = endpoint.url+"/predict"
print(url_prediction)

http://10.111.235.107:5000/predict


In [17]:

# Example of the request payload format: a 2x2 tensor with column names and
# its values flattened row by row.
example_payload = json.loads('{"data":{"names":["a","b"],"tensor":{"shape":[2,2],"values":[0,0,1,1]}}}')
headers = {'content-type': 'application/json'}

In [18]:
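import numpy as np

# Build the request payload: feature names (none here), the array shape,
# and the array values flattened in row-major order.
feature_names = None
data = test_X
pdata = {
    "data": {
        "names": feature_names,
        "tensor": {
            "shape": np.asarray(data.shape).tolist(),
            "values": data.flatten().tolist(),
        },
    }
}
serialized_data = json.dumps(pdata)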
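This payload describes an array by its `shape` plus its values flattened row by row, and the endpoint's response uses the same `tensor` layout with a one-dimensional shape. The sketch below encodes a 2-D list the same way and rebuilds it from `shape` and `values`, which is a quick way to sanity-check a payload before posting it; `encode_tensor` and `decode_tensor` are hypothetical helpers that handle only 1-D and 2-D shapes.

```python
import json


def encode_tensor(rows, names=None):
    """Encode a 2-D list as {"data": {"names": ..., "tensor": {"shape", "values"}}} JSON."""
    shape = [len(rows), len(rows[0]) if rows else 0]
    values = [v for row in rows for v in row]  # row-major, like ndarray.flatten()
    return json.dumps({"data": {"names": names,
                                "tensor": {"shape": shape, "values": values}}})


def decode_tensor(payload):
    """Rebuild nested lists from the tensor's shape and flat values (1-D or 2-D only)."""
    tensor = json.loads(payload)["data"]["tensor"]
    shape, values = tensor["shape"], tensor["values"]
    if len(shape) == 1:
        return values
    n_rows, n_cols = shape
    return [values[i * n_cols:(i + 1) * n_cols] for i in range(n_rows)]


request = encode_tensor([[0, 0], [1, 1]], names=["a", "b"])
print(decode_tensor(request))  # [[0, 0], [1, 1]]
```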

In [19]:
endpoint.predict_nparray(test_X)

PredictionEndpoint.predict_nparray: Start
self.url: http://10.111.235.107:5000/predict


HTTPConnectionPool(host='10.111.235.107', port=5000): Max retries exceeded with url: /predict (Caused by ConnectTimeoutError(<urllib3.connection.HTTPConnection object at 0x7f26d62449b0>, 'Connection to 10.111.235.107 timed out. (connect timeout=0.1)'))


PredictionEndpoint.predict_nparray: End


{'data': {'tensor': {'shape': [365],
   'values': [165164.875,
    111924.984375,
    191548.140625,
    104526.6171875,
    179303.25,
    84702.0546875,
    118877.984375,
    132294.71875,
    133606.21875,
    107332.8515625,
    321764.0,
    182190.046875,
    230963.40625,
    174228.75,
    290951.1875,
    177461.453125,
    200190.75,
    122958.15625,
    133376.265625,
    123258.2734375,
    285932.1875,
    201018.390625,
    133680.75,
    130342.6328125,
    127097.6015625,
    118401.671875,
    215022.828125,
    84957.3671875,
    102413.1953125,
    176742.28125,
    118672.8125,
    209591.78125,
    236567.546875,
    217784.34375,
    122548.890625,
    160624.375,
    131057.703125,
    134156.1875,
    249800.1875,
    174228.75,
    114823.9296875,
    129791.625,
    98600.8046875,
    180586.625,
    126388.6328125,
    124807.3125,
    166880.390625,
    355064.84375,
    109526.2578125,
    94474.359375,
    126749.578125,
    170985.75,
    174361.765625,

In [20]:
# Send the payload as a form field named 'json'; `headers` (JSON content type)
# is not used for this form-encoded request.
r = requests.post(url_prediction, data={'json': serialized_data})
r.text

'{"data":{"tensor":{"shape":[365],"values":[165164.875,111924.984375,191548.140625,104526.6171875,179303.25,84702.0546875,118877.984375,132294.71875,133606.21875,107332.8515625,321764.0,182190.046875,230963.40625,174228.75,290951.1875,177461.453125,200190.75,122958.15625,133376.265625,123258.2734375,285932.1875,201018.390625,133680.75,130342.6328125,127097.6015625,118401.671875,215022.828125,84957.3671875,102413.1953125,176742.28125,118672.8125,209591.78125,236567.546875,217784.34375,122548.890625,160624.375,131057.703125,134156.1875,249800.1875,174228.75,114823.9296875,129791.625,98600.8046875,180586.625,126388.6328125,124807.3125,166880.390625,355064.84375,109526.2578125,94474.359375,126749.578125,170985.75,174361.765625,138643.71875,157280.078125,119259.4453125,154374.9375,187144.859375,122540.9609375,200533.078125,181294.203125,159886.875,206930.671875,253135.796875,181625.515625,143236.828125,191254.375,130490.75,137577.46875,170611.625,207779.203125,272294.34375,190662.78125,2051
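The HTTP response should carry the same predictions that `HousingServe().predict(test_X, None)` returned locally; its first five values match the local output above. A small programmatic check can replace reading `r.text` by eye. `predictions_match` is a hypothetical helper, and the relative tolerance is an assumption meant to absorb float formatting differences.

```python
import json
import math


def predictions_match(response_text, expected, rel_tol=1e-6):
    """Return True if the response's leading predictions agree with `expected`."""
    values = json.loads(response_text)["data"]["tensor"]["values"]
    if len(values) < len(expected):
        return False
    return all(math.isclose(got, want, rel_tol=rel_tol)
               for got, want in zip(values, expected))
```

In the notebook, this would be called as `predictions_match(r.text, HousingServe().predict(test_X, None))`.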

In [21]:
endpoint.delete()

Deleted service: kubeflow/fairing-service-b8b42
Deleted deployment: kubeflow/fairing-deployer-s8dlp
