# Train and deploy on Kubeflow from Notebooks

This notebook introduces you to using Kubeflow Fairing to train and deploy a model to Kubeflow on multiple backends, such as Google Kubernetes Engine (GKE), Google Cloud ML Engine, and Azure Kubernetes Service (AKS). This notebook demonstrates how to:

* Train an XGBoost model in a local notebook,
* Use Kubeflow Fairing to train an XGBoost model remotely on Kubeflow using some of the supported backends,
* Use Kubeflow Fairing to deploy a trained model to Kubeflow, and
* Call the deployed endpoint for predictions.

To learn more about how to run this notebook locally, see the guide to [training and deploying on GCP from a local notebook][gcp-local-notebook].

[gcp-local-notebook]: https://kubeflow.org/docs/fairing/gcp-local-notebook/

## Set up your notebook for training an XGBoost model

Install the required packages, then import the libraries needed to train this model.

In [1]:
!pip install -r requirements.txt

In [2]:
import argparse
import logging
import joblib
import sys
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor

In [3]:
logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.INFO)

Define a function to split the input file into training and testing datasets.

In [4]:
def read_input(file_name, test_size=0.25):
    """Read input data and split it into train and test."""
    data = pd.read_csv(file_name)
    data.dropna(axis=0, subset=['SalePrice'], inplace=True)

    y = data.SalePrice
    X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])

    train_X, test_X, train_y, test_y = train_test_split(X.values,
                                                      y.values,
                                                      test_size=test_size,
                                                      shuffle=False)

    imputer = SimpleImputer()
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y)
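
With `shuffle=False`, `train_test_split` keeps the row order, so the last 25% of rows become the test set. `SimpleImputer`, whose default strategy is `'mean'`, learns column means from the training rows only and reuses those means on the test rows. The standard-library sketch below mimics both steps on plain lists so the behavior is easy to inspect. It is an illustration, not scikit-learn's code: `None` stands in for a missing value, and the helper names are hypothetical.

```python
import math


def ordered_split(rows, test_size=0.25):
    """Split like train_test_split(..., shuffle=False): the tail goes to test."""
    n_test = math.ceil(test_size * len(rows))  # scikit-learn rounds the test share up
    n_train = len(rows) - n_test
    return rows[:n_train], rows[n_train:]


def fit_column_means(rows):
    """Per-column means over observed (non-None) values.

    Assumes every column has at least one observed value in `rows`.
    """
    means = []
    for j in range(len(rows[0])):
        observed = [row[j] for row in rows if row[j] is not None]
        means.append(sum(observed) / len(observed))
    return means


def impute(rows, means):
    """Replace each None with the corresponding training-set column mean."""
    return [[means[j] if v is None else v for j, v in enumerate(row)] for row in rows]


rows = [[1.0, 10.0], [None, 20.0], [3.0, None], [4.0, None]]
train_rows, test_rows = ordered_split(rows)
means = fit_column_means(train_rows)  # fitted on training rows only
print(impute(train_rows, means))  # [[1.0, 10.0], [2.0, 20.0], [3.0, 15.0]]
print(impute(test_rows, means))   # [[4.0, 15.0]]
```

Fitting the means on the training rows alone keeps information from the test rows out of training, which is why `read_input` calls `fit_transform` on `train_X` but only `transform` on `test_X`.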

Define functions to train, evaluate, and save the trained model.

In [5]:
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Train the model using XGBRegressor."""
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    model.fit(train_X,
              train_y,
              early_stopping_rounds=40,
              eval_set=[(test_X, test_y)])

    # best_iteration is zero-based, so add 1 to report the number of rounds.
    print("Best RMSE on eval: %.2f with %d rounds" %
          (model.best_score,
           model.best_iteration + 1))
    return model

def eval_model(model, test_X, test_y):
    """Evaluate the model performance."""
    predictions = model.predict(test_X)
    # scikit-learn's argument order is (y_true, y_pred).
    logging.info("mean_absolute_error=%.2f", mean_absolute_error(test_y, predictions))

def save_model(model, model_file):
    """Save XGBoost model for serving."""
    joblib.dump(model, model_file)
    logging.info("Model export success: %s", model_file)
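
`early_stopping_rounds=40` tells XGBoost to stop once the evaluation RMSE has gone 40 rounds without improving, while it keeps track of the best round seen so far. The sketch below is a simplified, hypothetical re-implementation of that bookkeeping on a plain list of scores, where lower is better. It is not XGBoost's code, and the sample scores are made up. It also shows why `train_model` prints `best_iteration + 1`.

```python
def early_stop(scores, rounds=40):
    """Return (best_score, best_iteration, rounds_run) for a lower-is-better metric.

    Simplified model of early stopping: training halts once `rounds`
    iterations have passed without a new best score.
    """
    best_score, best_iteration = float("inf"), -1
    rounds_run = 0
    for i, score in enumerate(scores):
        rounds_run = i + 1
        if score < best_score:
            best_score, best_iteration = score, i
        elif i - best_iteration >= rounds:
            break
    return best_score, best_iteration, rounds_run


scores = [9.0, 7.0, 6.0, 6.5, 6.4, 6.8, 5.9]  # made-up eval RMSE per round
best, best_it, ran = early_stop(scores, rounds=3)
# best_it is zero-based, so best_it + 1 is the number of rounds up to the best one.
print("Best RMSE on eval: %.2f with %d rounds" % (best, best_it + 1))  # Best RMSE on eval: 6.00 with 3 rounds
print(ran)  # 6
```

With `rounds=3`, training halts at round 6 and never sees the later score of 5.9. A larger value such as the notebook's 40 trades extra rounds for less risk of stopping too early.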

Define a class for your model, with methods for training and prediction.

In [6]:
class HousingServe(object):
    
    def __init__(self):
        self.train_input = "ames_dataset/train.csv"
        self.n_estimators = 50
        self.learning_rate = 0.1
        self.model_file = "trained_ames_model.dat"
        self.model = None

    def train(self):
        (train_X, train_y), (test_X, test_y) = read_input(self.train_input)
        model = train_model(train_X,
                          train_y,
                          test_X,
                          test_y,
                          self.n_estimators,
                          self.learning_rate)

        eval_model(model, test_X, test_y)
        save_model(model, self.model_file)

    def predict(self, X, feature_names=None):
        """Predict using the model for given ndarray."""
        if self.model is None:
            # Load the trained model on the first call and cache it for later calls.
            self.model = joblib.load(self.model_file)
        # Add any preprocessing here.
        prediction = self.model.predict(data=X)
        # Add any postprocessing here.
        return prediction
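
`predict` loads the saved model from `model_file` on its first call and caches it on the instance. Repeated prediction calls on the same `HousingServe` object therefore do not re-read the file. The standard-library sketch below shows the same lazy-loading pattern. It makes three substitutions, all of them assumptions for illustration: `pickle` replaces `joblib`, a pickled dict replaces the XGBoost model, and the hypothetical `loads` counter exists only to make the caching visible.

```python
import os
import pickle
import tempfile


class LazyServe:
    """Same load-on-first-use pattern as HousingServe.predict."""

    def __init__(self, model_file):
        self.model_file = model_file
        self.model = None
        self.loads = 0  # counts disk reads, for illustration only

    def predict(self, X, feature_names=None):
        if self.model is None:  # first call: read the model from disk
            with open(self.model_file, "rb") as f:
                self.model = pickle.load(f)
            self.loads += 1
        # Stand-in for self.model.predict(data=X): predict the stored mean.
        return [self.model["mean"] for _ in X]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "model.pkl")
    with open(path, "wb") as f:
        pickle.dump({"mean": 2.5}, f)
    serve = LazyServe(path)
    print(serve.predict([[1.0], [2.0]]))  # [2.5, 2.5]
    serve.predict([[3.0]])
    print(serve.loads)  # 1
```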

## Train an XGBoost model in a notebook

Call `HousingServe().train()` to train, evaluate, and save your model.

In [7]:
HousingServe().train()

[0]	validation_0-rmse:177514
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:161858
[2]	validation_0-rmse:147237
[3]	validation_0-rmse:134132
[4]	validation_0-rmse:122224
[5]	validation_0-rmse:111538
[6]	validation_0-rmse:102142
[7]	validation_0-rmse:93392.3
[8]	validation_0-rmse:85824.6
[9]	validation_0-rmse:79667.6
[10]	validation_0-rmse:73463.4
[11]	validation_0-rmse:68059.4
[12]	validation_0-rmse:63350.5
[13]	validation_0-rmse:59732.1
[14]	validation_0-rmse:56260.7
[15]	validation_0-rmse:53392.6
[16]	validation_0-rmse:50770.8
[17]	validation_0-rmse:48107.8
[18]	validation_0-rmse:45923.9
[19]	validation_0-rmse:44154.2
[20]	validation_0-rmse:42488.1
[21]	validation_0-rmse:41263.3
[22]	validation_0-rmse:40212.8
[23]	validation_0-rmse:39089.1
[24]	validation_0-rmse:37691.1
[25]	validation_0-rmse:36875.2
[26]	validation_0-rmse:36276.2
[27]	validation_0-rmse:35444.1
[28]	validation_0-rmse:34831.5
[29]	validation_0-rmse:34205.4
[30]	validation_0-rmse

mean_absolute_error=18173.15
Model export success: trained_ames_model.dat


## Set up Kubeflow Fairing for training and predictions

Import the `fairing` library and configure the backend environment that your training or prediction job will run in.

In [8]:
import fairing

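# Choose the backend. This run uses KubernetesBackend; the other backends used in
# this notebook are 'KubeflowGKEBackend' (GKE), 'GCPManagedBackend' (Cloud ML Engine),
# and 'KubeflowAzureBackend' (AKS).
FAIRING_BACKEND = 'KubernetesBackend'

# If using KubeflowAzureBackend, and unless these values are passed to the notebook
# as parameters, replace the placeholders below with your configuration
# (remove the < and > characters).
DOCKER_REGISTRY = '<your_registry>.azurecr.io'
AZURE_REGION = '<your_region>'
AZURE_RESOURCE_GROUP = '<your_resource_group>'
AZURE_STORAGE_ACCOUNT = '<your_storage_account>'

# Optional pod spec mutators passed to TrainJob and PredictionEndpoint (None for no mutators).
POD_SPEC_MUTATORS = None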
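
The backend is chosen by class name, and the notebook's comments note that settings can also be passed in as parameters. The sketch below shows one way to do that. It reads an environment variable, whose name `FAIRING_BACKEND` is a hypothetical choice, and rejects any backend name the notebook does not configure. The set of names covers only the backends this notebook mentions, not everything `fairing.backends` may provide.

```python
import os

# Backends this notebook configures; fairing.backends may offer others.
KNOWN_BACKENDS = {
    "KubeflowGKEBackend",
    "GCPManagedBackend",
    "KubeflowAzureBackend",
    "KubernetesBackend",
}


def choose_backend(default="KubernetesBackend", env=None):
    """Return the backend name from the (hypothetical) FAIRING_BACKEND variable."""
    env = os.environ if env is None else env
    name = env.get("FAIRING_BACKEND", default)
    if name not in KNOWN_BACKENDS:
        raise ValueError(
            "unknown backend %r; expected one of %s" % (name, sorted(KNOWN_BACKENDS))
        )
    return name


print(choose_backend(env={}))                                        # KubernetesBackend
print(choose_backend(env={"FAIRING_BACKEND": "KubeflowGKEBackend"}))  # KubeflowGKEBackend
```

Validating the name here fails faster and more clearly than the `AttributeError` that `getattr` would raise later for a misspelled backend.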

In [9]:
import importlib

# This cell shows what to change to use a different backend, such as
# Cloud ML Engine (GCPManagedBackend) or AKS (KubeflowAzureBackend).
# It is also parameterized, so the notebook can be run programmatically
# with different backends.

if FAIRING_BACKEND in ['KubeflowGKEBackend', 'GCPManagedBackend']:
    # Set up Google Container Registry (GCR) for storing output containers.
    GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
    DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)
    BuildContext = None
if FAIRING_BACKEND == 'KubeflowAzureBackend':
    from fairing.builders.cluster.azurestorage_context import StorageContextSource
    BuildContext = StorageContextSource(
        region=AZURE_REGION, resource_group_name=AZURE_RESOURCE_GROUP,
        storage_account_name=AZURE_STORAGE_ACCOUNT
    )
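if FAIRING_BACKEND == 'KubernetesBackend':
    # Replace the placeholder with your private registry (remove < and >),
    # or, on GCP, push to GCR instead:
    #   DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(fairing.cloud.gcp.guess_project_name())
    DOCKER_REGISTRY = '<your_private_registry>'
    # Mount Docker registry credentials into the job's pods if they exist.
    POD_SPEC_MUTATORS = [fairing.cloud.docker.add_docker_credentials_if_exists]
    BuildContext = None
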
BackendClass = getattr(importlib.import_module('fairing.backends'), FAIRING_BACKEND)
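
The last line resolves the backend class from its name at run time. `importlib.import_module` returns the `fairing.backends` module, and `getattr` looks the class up on it. The sketch below applies the same pattern to a standard-library module so that it runs without Fairing installed. It also adds a clearer error for a misspelled name. `resolve_class` is a hypothetical helper.

```python
import importlib


def resolve_class(module_name, class_name):
    """Look up `class_name` in `module_name`, like a backend in fairing.backends."""
    module = importlib.import_module(module_name)
    try:
        return getattr(module, class_name)
    except AttributeError:
        raise ValueError("%s has no attribute %r" % (module_name, class_name)) from None


OrderedDict = resolve_class("collections", "OrderedDict")
print(OrderedDict.__name__)  # OrderedDict
```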

## Train an XGBoost model remotely on Kubeflow

Import `TrainJob` and pass it an instance of the configured backend class. Kubeflow Fairing packages the `HousingServe` class, the training data, and the training job's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the training job on Kubeflow.
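
Packaging starts from a build context, the set of files that end up in the image; the `Creating docker context` line in the training output shows Fairing staging one in a temporary directory. The sketch below is a rough illustration only, not Fairing's actual builder. The hypothetical `pack_context` checks that every entry in `input_files` exists, then packs the files into a tar archive (gzipped here), the archive format Docker uses for build contexts.

```python
import os
import tarfile
import tempfile


def pack_context(input_files, archive_path, root="."):
    """Bundle input files (paths relative to root) into a gzipped tar archive."""
    missing = [rel for rel in input_files
               if not os.path.isfile(os.path.join(root, rel))]
    if missing:
        # Fail fast instead of discovering the problem during the image build.
        raise FileNotFoundError("missing input files: %s" % missing)
    with tarfile.open(archive_path, "w:gz") as tar:
        for rel in input_files:
            # Keep the relative path so files land where the training code expects them.
            tar.add(os.path.join(root, rel), arcname=rel)
    return archive_path


with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "ames_dataset"))
    for rel in ["ames_dataset/train.csv", "requirements.txt"]:
        with open(os.path.join(tmp, rel), "w") as f:
            f.write("placeholder\n")  # dummy contents for the demo
    archive = pack_context(["ames_dataset/train.csv", "requirements.txt"],
                           os.path.join(tmp, "context.tar.gz"), root=tmp)
    with tarfile.open(archive) as tar:
        names = sorted(tar.getnames())
print(names)  # ['ames_dataset/train.csv', 'requirements.txt']
```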

In [10]:
from fairing import TrainJob
train_job = TrainJob(HousingServe, input_files=['ames_dataset/train.csv', "requirements.txt"],
                     docker_registry=DOCKER_REGISTRY,
                     backend=BackendClass(build_context_source=BuildContext),
                     pod_spec_mutators=POD_SPEC_MUTATORS)
train_job.submit()

Can't determine namespace automatically. Using 'default' namespace but recomend to provide namespace explicitly. Using 'default' namespace might result in unable to mount some required secrets in cloud backends.
Using default base docker image: registry.hub.docker.com/library/python:3.6.2
Using builder: <class 'fairing.builders.docker.docker.DockerBuilder'>
Building the docker image.
Building image using docker
Docker command: ['python', '/app/function_shim.py', '--serialized_fn_file', '/app/pickled_fn.p', '--python_version', '3.6.2']
/home/coursera/anaconda3/envs/ksenv/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Creating docker context: /tmp/fairing_context_5ya5fgje
/home/coursera/anaconda3/envs/ksenv/lib/python3.6/site-packages/fairing/__init__.py already exists in Fairing context, skipping...
Building docker image <your_private_registry>/fairing-job:5E171BC1...


APIError: 500 Server Error: Internal Server Error ("invalid reference format")
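
The build fails because the registry placeholder was never replaced. The log shows the image name `<your_private_registry>/fairing-job:...`, and `<` and `>` are not valid in a Docker image reference, so Docker rejects it with `invalid reference format`. A small pre-flight check run before `train_job.submit()` would catch this earlier. The hypothetical `check_no_placeholders` below only looks for `<...>` placeholders and does not validate the full Docker reference grammar.

```python
import re

PLACEHOLDER = re.compile(r"<[^<>]*>")


def check_no_placeholders(**settings):
    """Raise if any string setting still contains an unreplaced <...> placeholder."""
    leftovers = {k: v for k, v in settings.items()
                 if isinstance(v, str) and PLACEHOLDER.search(v)}
    if leftovers:
        raise ValueError("replace placeholder values before submitting: %s" % leftovers)


# "my-project" is a hypothetical GCP project ID.
check_no_placeholders(DOCKER_REGISTRY="gcr.io/my-project/fairing-job")  # passes
try:
    check_no_placeholders(DOCKER_REGISTRY="<your_private_registry>")
except ValueError as err:
    print(err)  # replace placeholder values before submitting: {'DOCKER_REGISTRY': '<your_private_registry>'}
```

In the notebook this could be called as `check_no_placeholders(DOCKER_REGISTRY=DOCKER_REGISTRY)`. With the Azure backend, the `AZURE_*` settings can be passed as well.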

## Deploy the trained model to Kubeflow for predictions

Import `PredictionEndpoint` and pass it an instance of the configured backend class. Kubeflow Fairing packages the `HousingServe` class, the trained model, and the prediction endpoint's software prerequisites as a Docker image. Then Kubeflow Fairing deploys and runs the prediction endpoint on Kubeflow.

In [None]:
from fairing import PredictionEndpoint
endpoint = PredictionEndpoint(HousingServe, input_files=['trained_ames_model.dat', "requirements.txt"],
                              docker_registry=DOCKER_REGISTRY,
                              backend=BackendClass(build_context_source=BuildContext),
                              pod_spec_mutators=POD_SPEC_MUTATORS)
endpoint.create()

## Call the prediction endpoint

Re-create the test split from the training data, then call the endpoint on Kubeflow to get predictions for it.

In [None]:
(train_X, train_y), (test_X, test_y) = read_input("ames_dataset/train.csv")
endpoint.predict_nparray(test_X)
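
A quick sanity check is to compare the endpoint's predictions with a locally loaded model's predictions on the same `test_X`, for example `HousingServe().predict(test_X)`, and to compute the same mean absolute error that `eval_model` logs. The sketch below does this on plain lists. It assumes both results can be converted to lists of numbers, for instance with `.tolist()`. The sample values and the `1e-6` relative tolerance are made up for illustration.

```python
import math


def mae(y_true, y_pred):
    """Mean absolute error; same value as sklearn's mean_absolute_error for 1-D inputs."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def predictions_match(remote, local, rel_tol=1e-6):
    """True if endpoint and local predictions agree element-wise within rel_tol."""
    return len(remote) == len(local) and all(
        math.isclose(r, l, rel_tol=rel_tol) for r, l in zip(remote, local)
    )


# Made-up values for illustration only.
sample_y = [200000.0, 150000.0, 310000.0]
local_preds = [190000.0, 155000.0, 300000.0]
remote_preds = [190000.0, 155000.0, 300000.0]
print(predictions_match(remote_preds, local_preds))  # True
print("%.2f" % mae(sample_y, remote_preds))          # 8333.33
```

The helper is named `mae` so that it does not shadow the `mean_absolute_error` imported from scikit-learn earlier in the notebook.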


## Clean up the prediction endpoint

Delete the prediction endpoint created by this notebook.

In [None]:
endpoint.delete()