# Building, training, and deploying an XGBoost model using Kubeflow Fairing

## What is Fairing?

[Kubeflow Fairing](https://github.com/kubeflow/fairing) provides a high-level Python API for machine learning operations such as training, hyperparameter tuning, deploying models, and running online or offline predictions against deployed models. It lets data scientists develop their ML training code in notebooks or Python files, and makes it easy to launch remote training and prediction jobs on different execution platforms (e.g. Kubeflow, Google Cloud ML Engine) without significant changes to the training code itself.

The three major pain points that Kubeflow Fairing tackles are:
1. Packaging source code into a container 
1. Interacting with different remote backends like Kubeflow
1. Performing ML workflow tasks like going from training to a deployed model


## Ames housing value prediction using XGBoost on Kubeflow

In this example we demonstrate how to use [Kubeflow Fairing](https://github.com/kubeflow/fairing) with XGBoost on the [Kaggle Ames Housing Prices prediction](https://www.kaggle.com/c/house-prices-advanced-regression-techniques/) dataset. We walk through how to implement, train, and deploy/serve the model. You will be able to run the exact same workload on-prem or on any cloud provider.

## Data preparation
You can download the dataset from the [Kaggle competition](https://www.kaggle.com/c/house-prices-advanced-regression-techniques/data). For convenience, we have also uploaded the dataset to GCS:

```
gs://kubeflow-examples-data/ames_dataset/
```
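
The GCS helper functions later in this walkthrough take a bucket name and an object path as separate arguments, while the location above is written as a single `gs://` URI. A minimal sketch of splitting one into the other with the standard library; `split_gcs_uri` is a hypothetical helper introduced here for illustration, not part of Fairing or the GCS client:

```python
from urllib.parse import urlparse


def split_gcs_uri(uri):
    """Split 'gs://bucket/path/to/object' into (bucket, object_path).

    Hypothetical helper for illustration only.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError("not a GCS URI: {!r}".format(uri))
    # The path keeps its leading slash, which GCS object names do not use.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, prefix = split_gcs_uri("gs://kubeflow-examples-data/ames_dataset/")
print(bucket, prefix)
```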

## Local python setup
Let's install the Python libraries required for this demo.

In [None]:
deps = \
"""
pandas
joblib
numpy
xgboost
scikit-learn
seldon-core
google-cloud-storage
"""
with open("requirements.txt", 'w') as f:
    f.write(deps)
!pip install -r requirements.txt
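
After installing, it can help to confirm that each dependency is importable, because the import name does not always match the pip name (for example, `seldon-core` is imported as `seldon_core`). The sketch below is one way to check, using only the standard library; `IMPORT_NAMES` and `missing_packages` are hypothetical names chosen for this example:

```python
import importlib.util

# pip package name -> import name (assumed mapping for this demo's dependencies)
IMPORT_NAMES = {
    "pandas": "pandas",
    "joblib": "joblib",
    "numpy": "numpy",
    "xgboost": "xgboost",
    "scikit-learn": "sklearn",
    "seldon-core": "seldon_core",
    "google-cloud-storage": "google.cloud.storage",
}


def missing_packages(import_names):
    """Return the pip names whose import name cannot be located."""
    missing = []
    for package, module in import_names.items():
        try:
            found = importlib.util.find_spec(module) is not None
        except (ImportError, ValueError):
            # Raised when a parent package of a dotted name is absent.
            found = False
        if not found:
            missing.append(package)
    return missing


print("Missing packages:", missing_packages(IMPORT_NAMES))
```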

## Building a model and training it locally

In [None]:
import argparse
import logging
import joblib
import sys
import os
import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor

In [None]:
logging.basicConfig(format='%(message)s')
logging.getLogger().setLevel(logging.INFO)

In [None]:
# Check whether gsutil (part of the gcloud SDK) is installed
res = !which gsutil
if len(res)==0:
    print("Please install gcloud/gsutil by following instructions here " +
          "https://cloud.google.com/sdk/docs/downloads-interactive")

In [None]:
# Copy the dataset from GCS to local storage
!gsutil cp -r gs://kubeflow-examples-data/ames_dataset/ .
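
The `!which` check used earlier relies on notebook shell magics. If the same code needs to run as a plain Python file, `shutil.which` from the standard library does a comparable PATH lookup. A small sketch, where `tool_available` is a hypothetical helper name:

```python
import shutil


def tool_available(name):
    """Return True if an executable called `name` is found on PATH."""
    return shutil.which(name) is not None


if not tool_available("gsutil"):
    print("gsutil not found; install the Google Cloud SDK first")
```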

In [None]:
def read_input(file_name, test_size=0.25):
    """Read input data and split it into train and test."""
    data = pd.read_csv(file_name)
    data.dropna(axis=0, subset=['SalePrice'], inplace=True)

    y = data.SalePrice
    X = data.drop(['SalePrice'], axis=1).select_dtypes(exclude=['object'])

    train_X, test_X, train_y, test_y = train_test_split(X.values,
                                                      y.values,
                                                      test_size=test_size,
                                                      shuffle=False)
    imputer = SimpleImputer(strategy="median")
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y), imputer
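
`read_input` fits the imputer on the training split only and then reuses the learned medians on the test split, so no information from the test rows leaks into preprocessing. The pure-Python sketch below illustrates that idea on toy data. It is a simplified stand-in for what `SimpleImputer(strategy="median")` does, not its implementation, and `fit_medians` and `fill_missing` are hypothetical names:

```python
import math
from statistics import median


def fit_medians(rows):
    """Per-column median of the non-missing values, learned from training rows only.

    Assumes every column has at least one non-missing value.
    """
    return [median(v for v in col if not math.isnan(v)) for col in zip(*rows)]


def fill_missing(rows, medians):
    """Replace each NaN with the median learned for its column."""
    return [[m if math.isnan(v) else v for v, m in zip(row, medians)]
            for row in rows]


nan = float("nan")
train = [[1.0, 10.0], [nan, 30.0], [3.0, nan]]
test = [[nan, 5.0]]

medians = fit_medians(train)                 # learned from train only
train_filled = fill_missing(train, medians)
test_filled = fill_missing(test, medians)    # test reuses the train medians
print(medians, test_filled)
```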

In [None]:
(train_X, train_y), (test_X, test_y), imputer = read_input("ames_dataset/train.csv")
print("Imputer statistics: {} of each column".format(imputer.strategy))
pd.DataFrame(imputer.statistics_).T

In [None]:
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Train the model using XGBRegressor."""
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    model.fit(train_X,
            train_y,
            early_stopping_rounds=40,
            eval_set=[(test_X, test_y)])

    # Use logging so the %-style arguments are actually formatted
    logging.info("Best RMSE on eval: %.2f with %d rounds",
                 model.best_score,
                 model.best_iteration+1)
    return model

def eval_model(model, test_X, test_y):
    """Evaluate the model performance."""
    predictions = model.predict(test_X)
    logging.info("mean_absolute_error=%.2f", mean_absolute_error(test_y, predictions))

def save_model(model, model_file):
    """Save XGBoost model for serving."""
    joblib.dump(model, model_file)
    logging.info("Model export success: %s", model_file)
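
`train_model` relies on early stopping: training halts once the evaluation metric has not improved for `early_stopping_rounds` consecutive rounds, and the best round is reported. The sketch below shows that rule in plain Python for a lower-is-better metric such as RMSE. It is a simplified illustration rather than XGBoost's implementation, and `early_stop` is a hypothetical name:

```python
def early_stop(eval_scores, patience):
    """Return (best_index, best_score, rounds_run) for a lower-is-better metric.

    Stops once `patience` consecutive rounds pass without a new best score.
    """
    best_index, best_score = 0, float("inf")
    for i, score in enumerate(eval_scores):
        if score < best_score:
            best_index, best_score = i, score
        elif i - best_index >= patience:
            return best_index, best_score, i + 1
    return best_index, best_score, len(eval_scores)


# The score at index 5 would have been better, but training stops before reaching it.
scores = [5.0, 4.0, 4.5, 4.2, 4.1, 3.0]
result = early_stop(scores, patience=3)
print(result)
```

Depending on the installed XGBoost version, `early_stopping_rounds` may need to be passed to the `XGBRegressor` constructor rather than to `fit`; it is worth checking the documentation for the version in use.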

Let's create a GCS bucket for storing the trained model and training artifacts.

In [None]:
import fairing
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
GCS_BUCKET_ID = "{}-fairing-xgboost-demo".format(GCP_PROJECT)
GCS_BUCKET = "gs://{}".format(GCS_BUCKET_ID)
!gsutil mb {GCS_BUCKET}

In [None]:
# Basic utility functions to copy files to and from GCS
from google.cloud import storage  # required by storage.Client() below

def upload_file_to_gcs(bucket_name, source_file_name, destination_file_name):
    """Uploads a file to the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(destination_file_name)
    blob.upload_from_filename(source_file_name)
    print('File {} uploaded to gs://{}/{}'.format(
        source_file_name,
        bucket_name,
        destination_file_name)) 

def download_file_from_gcs(bucket_name, source_file_name, destination_file_name):
    """Downloads a blob from the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.get_bucket(bucket_name)
    blob = bucket.blob(source_file_name)
    blob.download_to_filename(destination_file_name)
    print('File gs://{}/{} downloaded to {}'.format(
        bucket_name,
        source_file_name,
        destination_file_name))  

## Creating a model class with train and predict methods

In [None]:
class HousingServe(object):
    
    def __init__(self):
        self.train_input = "ames_dataset/train.csv"
        self.n_estimators = 50
        self.learning_rate = 0.1
        self.model_file = "trained_ames_model.dat"
        self.trained_model = None

    def train(self):
        (train_X, train_y), (test_X, test_y), _ = read_input(self.train_input)
        model = train_model(train_X,
                          train_y,
                          test_X,
                          test_y,
                          self.n_estimators,
                          self.learning_rate)

        eval_model(model, test_X, test_y)
        save_model(model, self.model_file)
        upload_file_to_gcs(GCS_BUCKET_ID, self.model_file, self.model_file)

    def predict(self, X, feature_names):
        """Predict using the model for given ndarray."""
        if not self.trained_model:
            self.trained_model = joblib.load(self.model_file)
        # Pass X positionally: the keyword name differs across XGBoost versions
        prediction = self.trained_model.predict(X)
        return [[prediction.item(0), prediction.item(0)]]
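
The class above follows a simple contract: `train` writes a model file, and `predict` loads that file lazily on the first call, so a fresh instance in a serving container can answer requests without retraining. The sketch below isolates that pattern with a toy "model" (the mean of the targets, stored as JSON). `LazyServe` and its file format are hypothetical and stand in for `HousingServe` and joblib only for illustration:

```python
import json
import os
import tempfile


class LazyServe:
    """Toy illustration of the train/predict contract with lazy model loading."""

    def __init__(self, model_file):
        self.model_file = model_file
        self.trained_model = None  # loaded on the first predict() call

    def train(self, targets):
        # "Training" here just stores the mean target value.
        with open(self.model_file, "w") as f:
            json.dump({"mean": sum(targets) / len(targets)}, f)

    def predict(self, X, feature_names=None):
        if self.trained_model is None:
            with open(self.model_file) as f:
                self.trained_model = json.load(f)
        return [self.trained_model["mean"] for _ in X]


model_path = os.path.join(tempfile.mkdtemp(), "model.json")
LazyServe(model_path).train([100.0, 200.0, 300.0])

# A separate instance, as in a serving container, only needs the model file.
server = LazyServe(model_path)
predictions = server.predict([[1, 2], [3, 4]])
print(predictions)
```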

## Training Locally

In [None]:
HousingServe().train()

# Training and Deploying in Fairing

### Setting up base container and builder for fairing

Set up Google Container Registry (GCR) for storing output containers. You can use any Docker container registry instead of GCR.

In [None]:
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)

In [None]:
py_version = ".".join([str(x) for x in sys.version_info[0:3]])
base_image = "python:{}".format(py_version)
fairing.config.set_builder('docker', registry=DOCKER_REGISTRY, base_image=base_image)
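
The base image tag above is built from the full interpreter version (major.minor.micro). Whether an image with that exact tag exists in your registry is an assumption worth checking; a less specific tag may be a workable fallback. The sketch below shows both forms, where `python_image_tag` is a hypothetical helper:

```python
import sys


def python_image_tag(version_info, parts=3):
    """Build a 'python:<version>' image tag from the first `parts` version fields."""
    return "python:" + ".".join(str(x) for x in version_info[:parts])


exact_tag = python_image_tag(sys.version_info)           # major.minor.micro
minor_tag = python_image_tag(sys.version_info, parts=2)  # major.minor
print(exact_tag, minor_tag)
```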

## Kubeflow Setup
Please follow the instructions at https://www.kubeflow.org/docs/started/getting-started/ to create a Kubeflow cluster if you don't already have one.

## Training in KF

Here you take the model class used for local training and pass it to the Fairing library, along with dependencies such as requirements.txt and the training dataset.

In [None]:
fairing.config.set_deployer('job')
fairing.config.set_preprocessor("function", function_obj=HousingServe,
                                input_files=["requirements.txt", "ames_dataset/train.csv"])
fairing.config.run()

## Training in Google Cloud ML Engine

Porting training from Kubeflow to Google Cloud ML Engine is just a matter of changing the deployer to `gcp`.

In [None]:
fairing.config.set_deployer('gcp')
fairing.config.set_preprocessor("function", function_obj=HousingServe,
                                input_files=["requirements.txt", "ames_dataset/train.csv"])
fairing.config.run()
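
Because the two training cells differ only in the deployer name, the call sequence can be wrapped in a small function. The sketch below uses only the three configuration calls shown in this walkthrough and exercises them against a recording stub so it runs without a cluster. `run_training` and `RecordingConfig` are hypothetical names; in the notebook you would pass `fairing.config` and `HousingServe` instead of the stub and `object`:

```python
def run_training(config, deployer, model_class, input_files):
    """Apply the same three configuration calls for any deployer name."""
    config.set_deployer(deployer)
    config.set_preprocessor("function", function_obj=model_class,
                            input_files=input_files)
    return config.run()


class RecordingConfig:
    """Stand-in for fairing.config that records calls instead of launching jobs."""

    def __init__(self):
        self.calls = []

    def set_deployer(self, name, **kwargs):
        self.calls.append(("set_deployer", name))

    def set_preprocessor(self, name, **kwargs):
        self.calls.append(("set_preprocessor", name, kwargs["input_files"]))

    def run(self):
        self.calls.append(("run",))


files = ["requirements.txt", "ames_dataset/train.csv"]
stub = RecordingConfig()
for deployer in ("job", "gcp"):
    run_training(stub, deployer, object, files)
print(stub.calls)
```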

## Deploying model and creating an endpoint in KF

Now that we have trained a model, we want to deploy it to an online prediction endpoint. This is achieved with the serving deployer, which creates a Kubernetes service for your model. The same model class used for training is passed here, since it also provides a predict function.

In [None]:
fairing.config.set_preprocessor("function", function_obj=HousingServe,
                                input_files=["requirements.txt", "trained_ames_model.dat"])
fairing.config.set_deployer('serving', serving_class="HousingServe")
fairing.config.run()

### Making prediction calls against the endpoint

In [None]:
# Copy the prediction endpoint from the previous step
!curl http://<ip-address>:5000/predict -H "Content-Type: application/x-www-form-urlencoded" -d 'json={"data":{"tensor":{"shape":[1,37],"values":[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37]}}}'
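
The request body in the curl call is a form field named `json` holding a tensor payload with a shape and a flat list of values. The sketch below builds an equivalent body in Python with the standard library and decodes it again as a sanity check. `build_predict_body` is a hypothetical helper, and the 37 placeholder values simply mirror the curl example:

```python
import json
from urllib.parse import parse_qs, urlencode


def build_predict_body(values):
    """Encode one row of feature values as a form body with a 'json' field."""
    values = list(values)
    payload = {"data": {"tensor": {"shape": [1, len(values)], "values": values}}}
    return urlencode({"json": json.dumps(payload)}).encode("utf-8")


body = build_predict_body(range(1, 38))

# Round-trip check: decode the form body back into the payload dictionary.
decoded = json.loads(parse_qs(body.decode("utf-8"))["json"][0])
print(decoded["data"]["tensor"]["shape"])
```

The resulting bytes could then be sent with an HTTP client of your choice, using the `application/x-www-form-urlencoded` content type and the endpoint address printed by the deployment step.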
        