In [None]:
# imports
import sys
import logging
import os
import subprocess

from pathlib import Path

# fairing:include-cell
import fire
import joblib
import logging
import nbconvert
import pathlib
import pandas as pd
import pprint
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor
from importlib import reload
from sklearn.datasets import make_regression
from kubeflow.metadata import metadata
from datetime import datetime
import retrying
import urllib3

# Imports not to be included in the built docker image
import util
import kfp
import kfp.components as comp
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler
from kubernetes import client as k8s_client
from kubeflow import fairing   
from kubeflow.fairing.builders import append
from kubeflow.fairing.deployers import job
from kubeflow.fairing.preprocessors.converted_notebook import ConvertNotebookPreprocessorWithFire

In [1]:
# Obtain Google Application Default Credentials through oauth2client.
# NOTE(review): this cell needs the `oauth2client` package; the recorded output of
# this cell is an ImportError, so the package was missing when it was last executed.
# NOTE(review): `credentials` is not referenced by any other cell visible in this
# notebook - confirm whether this cell is still required.
from oauth2client.client import GoogleCredentials
credentials = GoogleCredentials.get_application_default()

ImportError: No module named oauth2client.client

In [2]:
# install required libs
# Kubeflow Pipelines SDK, pinned to the 0.1.32 release tarball.
KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.32/kfp.tar.gz'
# Kubeflow Fairing installed from source over HTTPS. The unauthenticated
# git:// transport ('git+git://...') is no longer served by GitHub, so pip
# cannot clone through it.
# NOTE(review): the recorded cell output shows an install pinned to a commit
# (@9b0d4ed4...); consider pinning here as well for reproducible builds.
FAIRING_PACKAGE = 'git+https://github.com/kubeflow/fairing.git'

def init_install():
  """Install the notebook's Python dependencies and configure gcloud/Docker auth.

  Steps, in order:
    1. pip-install ``requirements.txt``, the KFP SDK (``KFP_PACKAGE``) and
       Fairing (``FAIRING_PACKAGE``) with ``--user``.
    2. Run ``gcloud auth configure-docker``.
    3. If ``GOOGLE_APPLICATION_CREDENTIALS`` is set, activate that service
       account key with gcloud.
    4. Make sure the user site-packages directory is on ``sys.path`` so the
       freshly installed packages can be imported without restarting the kernel.

  Raises:
    subprocess.CalledProcessError: if any pip or gcloud command exits non-zero.
  """
  # Local import: only this function needs it.
  import site

  # Invoke pip through the interpreter that runs this kernel; a bare `pip3`
  # on PATH may belong to a different Python installation.
  pip_install = [sys.executable, "-m", "pip", "install", "--user"]

  # Install the SDK
  subprocess.check_call(pip_install + ["-r", "requirements.txt"])
  subprocess.check_call(pip_install + [KFP_PACKAGE, "--upgrade"])
  subprocess.check_call(pip_install + [FAIRING_PACKAGE])
  subprocess.check_call(["gcloud", "auth", "configure-docker", "--quiet"])

  key_file = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
  if key_file:
    logging.info("Activating service account")
    subprocess.check_call(["gcloud", "auth", "activate-service-account",
                           "--key-file=" + key_file,
                           "--quiet"])

  # `pip install --user` writes to the user site-packages directory. Ask the
  # interpreter where that is instead of hard-coding a Python version.
  local_py_path = site.getusersitepackages()

  if local_py_path not in sys.path:
    logging.info("Add %s python_path", local_py_path)
    sys.path.insert(0, local_py_path)

init_install()

pip installing requirements.txt
pip installing KFP https://storage.googleapis.com/ml-pipeline/release/0.1.32/kfp.tar.gz
pip installing fairing git+git://github.com/kubeflow/fairing.git@9b0d4ed4796ba349ac6067bbd802ff1d6454d015
Configure docker credentials


## Train the model and predict 

In [5]:
def train_test_split_custom(test_size=0.3, random_state=42):
    """Generate the synthetic regression data set and split it into train/test.

    Args:
        test_size: fraction of the 300 generated samples placed in the test
            split (forwarded to ``train_test_split``).
        random_state: seed forwarded to ``make_regression``. The fixed default
            makes every call return the same data, so a split created in a
            later cell matches the data the model was trained on. Pass
            ``None`` for a freshly randomised data set on each call.

    Returns:
        ``((train_X, train_y), (test_X, test_y))``; the feature arrays have
        been passed through a ``SimpleImputer`` fitted on the training split.
    """
    X, y = make_regression(n_samples=300,
                           n_features=6,
                           noise=0.15,
                           random_state=random_state)
    # shuffle=False: the split is a plain head/tail cut of the generated rows.
    train_X, test_X, train_y, test_y = train_test_split(X,
                                                        y,
                                                        test_size=test_size,
                                                        shuffle=False)

    # Fit the imputer on the training features only, then reuse it for test.
    imputer = SimpleImputer()
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y)


In [6]:
# choose the model and train the model with training data
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Fit an ``XGBRegressor`` on the training data and return it.

    The test split is supplied as the evaluation set, and fitting is asked to
    stop early after 40 rounds without improvement on it.

    Args:
        train_X, train_y: training features and targets.
        test_X, test_y: evaluation features and targets used for early stopping.
        n_estimators: forwarded to ``XGBRegressor``.
        learning_rate: forwarded to ``XGBRegressor``.

    Returns:
        The fitted regressor.
    """
    regressor = XGBRegressor(learning_rate=learning_rate,
                             n_estimators=n_estimators)

    validation_sets = [(test_X, test_y)]
    regressor.fit(train_X,
                  train_y,
                  eval_set=validation_sets,
                  early_stopping_rounds=40)

    return regressor

# evaluate the model performance
def evaluate_model(model, test_X, test_y):
    """Return the mean absolute error of ``model`` on the given test data.

    Args:
        model: fitted estimator exposing ``predict``.
        test_X: features to predict on.
        test_y: ground-truth targets for ``test_X``.

    Returns:
        The value of ``mean_absolute_error(test_y, predictions)``.
    """
    predictions = model.predict(test_X)

    # scikit-learn metrics take (y_true, y_pred). MAE is symmetric, so the
    # value is unaffected, but the conventional order keeps this correct if
    # the metric is ever replaced by an asymmetric one.
    return mean_absolute_error(test_y, predictions)

# save the model
def save_model(model, model_file):
    """Persist ``model`` to the path ``model_file`` with ``joblib.dump``."""
    joblib.dump(value=model, filename=model_file)

# create kubeflow metadata workspace
def create_workspace():
    """Build the ``xgboost-synthetic`` Kubeflow Metadata workspace.

    Returns:
        A ``metadata.Workspace`` backed by a ``metadata.Store`` that points at
        the metadata gRPC service on port 8080.
    """
    # default DNS of Kubeflow Metadata gRPC service.
    grpc_host = "metadata-grpc-service.kubeflow"
    grpc_port = 8080

    store = metadata.Store(grpc_host=grpc_host, grpc_port=grpc_port)
    return metadata.Workspace(
        store=store,
        name="xgboost-synthetic",
        description="workspace for xgboost-synthetic artifacts and executions")

In [2]:
# class created to use kubeflow fairing to launch separate training jobs and deploy model on kubernetes
class ModelServe(object):
    """Train, persist and serve the synthetic-data XGBoost regressor.

    The same class is used as the training entry point (``train``) and as the
    prediction handler (``predict``). Training inputs, metrics and the
    resulting model are recorded through the Kubeflow Metadata client.
    """

    def __init__(self, model_file=None):
        """Resolve the model file location and open a metadata execution.

        Args:
            model_file: path of the serialized model. When falsy, the
                ``MODEL_FILE`` environment variable is used if it is set,
                otherwise ``"mockup-model.dat"``.
        """
        self.n_estimators = 50
        self.learning_rate = 0.15

        if not model_file:
            if "MODEL_FILE" in os.environ:
                print("use passed model file")
                model_file = os.getenv("MODEL_FILE")
            else:
                print("use default MODEL_FILE")
                model_file = "mockup-model.dat"

        self.model_file = model_file
        print("model_file={0}".format(self.model_file))

        # Loaded lazily by predict().
        self.model = None
        # Created lazily by the `workspace` property.
        self._workspace = None
        # NOTE(review): this creates a metadata Run/Execution for every
        # instance, including instances that are only used for predict().
        self.exec = self.init_execution()

    def train(self):
        """Train the model, log metadata about the run and save the model.

        Generates the synthetic split, fits the regressor with this
        instance's hyperparameters, logs the data set, the evaluation metric
        and the model to the metadata execution, and writes the model to
        ``self.model_file``.
        """
        (train_X, train_y), (test_X, test_y) = train_test_split_custom()

        # use kubeflow metadata library to record meta data
        self.exec.log_input(metadata.DataSet(
            description="xgboost synthetic data",
            name="synthetic-data",
            owner="someone@kubeflow.org",
            uri="file://path/to/dataset",
            version="v1.0.0"))

        model = train_model(train_X,
                            train_y,
                            test_X,
                            test_y,
                            self.n_estimators,
                            self.learning_rate)

        # log meta data
        self.exec.log_output(metadata.Metrics(
            name="xgboost-synthetic-traing-eval",
            owner="someone@kubeflow.org",
            description="training evaluation for xgboost synthetic",
            uri="gcs://path/to/metrics",
            metrics_type=metadata.Metrics.VALIDATION,
            values={"mean_absolute_error": evaluate_model(model, test_X, test_y)}))

        # save model
        save_model(model, self.model_file)

        # log output
        # NOTE(review): the name/model_type labels below describe a housing-price
        # linear regression, while the model is an XGBoost regressor trained on
        # synthetic data - confirm whether these labels are intentional.
        self.exec.log_output(metadata.Model(
            name="housing-price-model",
            description="housing price prediction model using synthetic data",
            owner="someone@kubeflow.org",
            uri=self.model_file,
            model_type="linear_regression",
            training_framework={
                "name": "xgboost",
                "version": "0.9.0"
            },
            hyperparameters={
                "learning_rate": self.learning_rate,
                "n_estimators": self.n_estimators
            },
            version=datetime.utcnow().isoformat("T")))

    def predict(self, X, feature_names):
        """Predict using the model for given ndarray.

        The predict signature should match the syntax expected by Seldon Core
        https://github.com/SeldonIO/seldon-core so that we can use
        Seldon to wrap it as a model server and deploy it on Kubernetes.

        Args:
            X: feature rows to score.
            feature_names: accepted for signature compatibility; not used.

        Returns:
            A single-row nested list holding at most the first two predicted
            values, e.g. ``[[p0, p1]]`` (``[[p0]]`` when only one row is
            scored).
        """
        if self.model is None:
            # joblib.load unpickles the file: only point MODEL_FILE /
            # model_file at model files from a trusted source.
            self.model = joblib.load(self.model_file)

        # Pass X positionally: the name of predict()'s first parameter is not
        # the same across xgboost releases, so a keyword would be brittle.
        prediction = self.model.predict(X)

        # Flat slice mirrors the previous item(0)/item(1) lookup but does not
        # raise IndexError when fewer than two rows were scored.
        return [prediction.ravel()[:2].tolist()]

    @property
    def workspace(self):
        """The metadata workspace, created on first access and then cached."""
        if not self._workspace:
            self._workspace = create_workspace()
        return self._workspace

    def init_execution(self):
        """Create a metadata Run and return an Execution attached to it.

        Both names are suffixed with the current UTC timestamp in ISO format.
        """
        r = metadata.Run(
            workspace=self.workspace,
            name="xgboost-synthetic-faring-run" + datetime.utcnow().isoformat("T"),
            description="a notebook run")

        return metadata.Execution(
            name = "execution" + datetime.utcnow().isoformat("T"),
            workspace=self.workspace,
            run=r,
            description="execution for training xgboost-synthetic")

SyntaxError: invalid syntax (<ipython-input-2-a06c1a37544c>, line 19)

In [8]:
# train model locally
# The ModelServe constructor opens the metadata execution; train() then fits the
# regressor, logs data set / metric / model metadata and writes the model to
# "mockup-model.dat".
model = ModelServe(model_file="mockup-model.dat")
model.train()

MetadataStore with gRPC connection initialized


model_file=mockup-model.dat
[0]	validation_0-rmse:134.005
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:129.102
[2]	validation_0-rmse:124.39
[3]	validation_0-rmse:119.218
[4]	validation_0-rmse:114.096
[5]	validation_0-rmse:109.494
[6]	validation_0-rmse:107.101
[7]	validation_0-rmse:103.463
[8]	validation_0-rmse:100.657
[9]	validation_0-rmse:96.576
[10]	validation_0-rmse:94.8884
[11]	validation_0-rmse:91.7095
[12]	validation_0-rmse:90.7389
[13]	validation_0-rmse:88.1934
[14]	validation_0-rmse:86.1535
[15]	validation_0-rmse:84.8222
[16]	validation_0-rmse:83.5818
[17]	validation_0-rmse:81.6697
[18]	validation_0-rmse:80.2789
[19]	validation_0-rmse:79.4583
[20]	validation_0-rmse:78.4213
[21]	validation_0-rmse:77.0478
[22]	validation_0-rmse:75.3792
[23]	validation_0-rmse:73.9913
[24]	validation_0-rmse:73.2026
[25]	validation_0-rmse:72.2079
[26]	validation_0-rmse:70.9489
[27]	validation_0-rmse:70.5206
[28]	validation_0-rmse:69.8641
[29]	validation_0-rm

mean_absolute_error=47.22
Model export success: mockup-model.dat


Best RMSE on eval: %.2f with %d rounds 59.595573 50


In [9]:
# predict model
# Build a synthetic split and score its test features with a new ModelServe.
# No model_file is passed, so ModelServe falls back to $MODEL_FILE or
# "mockup-model.dat", and predict() loads the model from that file on first use.
# NOTE(review): train_test_split_custom() is called again here instead of reusing
# the split the model was trained on - confirm this is intended.
(train_X, train_y), (test_X, test_y) = train_test_split_custom()

ModelServe().predict(test_X, None)

MetadataStore with gRPC connection initialized


model_file not supplied; using the default
model_file=mockup-model.dat


[[-30.6968994140625, 45.884098052978516]]

## Train and deploy model on Kubernetes

In [10]:
# Setting up google container repositories (GCR) for storing output containers
# NOTE(review): presumably guess_project_name() infers the project from the
# environment; if it can return None the registry below would become
# "gcr.io/None/fairing-job" - confirm and guard if so.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
DOCKER_REGISTRY = 'gcr.io/{}/fairing-job'.format(GCP_PROJECT)

## Build the docker image using Kubeflow fairing 

In [11]:
# Override fairing's Kaniko executor image constant with a pinned version (v0.14.0).
# NOTE(review): presumably read by the cluster builder used in a later cell - confirm.
from kubeflow.fairing import constants
constants.constants.KANIKO_IMAGE = "gcr.io/kaniko-project/executor:v0.14.0"

In [12]:
from kubeflow.fairing.builders import cluster

# output_map is a map of extra files to add to the notebook.
output_map = {
    "Dockerfile": "Dockerfile",
    "requirements.txt": "requirements.txt",
}

# Converts this notebook to a script whose entry point is the ModelServe class.
preprocessor = ConvertNotebookPreprocessorWithFire(class_name='ModelServe',
                                                   notebook_file='build-train-deploy.ipynb',
                                                   output_map=output_map)

# Additional files to ship with the converted notebook. The assignment replaces
# whatever input_files held before, so no prior initialisation is needed.
# NOTE(review): the imports cell imports `util`, while this list names
# "xgboost_util.py" - confirm the helper module's file name.
input_files = ["xgboost_util.py", "mockup-model.dat"]
preprocessor.input_files = {os.path.normpath(f) for f in input_files}
preprocessor.preprocess()

Converting build-train-deploy.ipynb to build-train-deploy.py
Creating entry point for the class name ModelServe


[PosixPath('build-train-deploy.py'), 'xgboost_util.py', 'mockup-model.dat']

### Build base image

In [13]:
# Image the cluster build starts from.
base_image = "gcr.io/kubeflow-images-public/tensorflow-1.14.0-notebook-cpu:v0.7.0"
# Build the base image from the local "Dockerfile" with fairing's cluster builder,
# pushing to DOCKER_REGISTRY.
# NOTE(review): GCSContextSource presumably stages the build context in GCS, and
# add_gcp_credentials_if_exists presumably attaches GCP credentials to the builder
# pod when available - confirm against the fairing documentation.
cluster_builder = cluster.cluster.ClusterBuilder(registry=DOCKER_REGISTRY,
                                                 base_image=base_image,
                                                 preprocessor=preprocessor,
                                                 dockerfile_path="Dockerfile",
                                                 pod_spec_mutators=[fairing.cloud.gcp.add_gcp_credentials_if_exists],
                                                 context_source=cluster.gcs_context.GCSContextSource())
cluster_builder.build()

Building image using cluster builder.
Creating docker context: /tmp/fairing_context_n34sz0lr
Converting build-train-deploy.ipynb to build-train-deploy.py
Creating entry point for the class name ModelServe
Not able to find gcp credentials secret: user-gcp-sa
Trying workload identity service account: default-editor
Waiting for fairing-builder-dcbz2-lqzjg to start...
Waiting for fairing-builder-dcbz2-lqzjg to start...
Waiting for fairing-builder-dcbz2-lqzjg to start...
Pod started running True


ERROR: logging before flag.Parse: E0226 23:44:52.505936       1 metadata.go:241] Failed to unmarshal scopes: invalid character 'h' looking for beginning of value
[36mINFO[0m[0002] Resolved base name gcr.io/kubeflow-images-public/tensorflow-1.14.0-notebook-cpu:v0.7.0 to gcr.io/kubeflow-images-public/tensorflow-1.14.0-notebook-cpu:v0.7.0
[36mINFO[0m[0002] Resolved base name gcr.io/kubeflow-images-public/tensorflow-1.14.0-notebook-cpu:v0.7.0 to gcr.io/kubeflow-images-public/tensorflow-1.14.0-notebook-cpu:v0.7.0
[36mINFO[0m[0002] Downloading base image gcr.io/kubeflow-images-public/tensorflow-1.14.0-notebook-cpu:v0.7.0
[36mINFO[0m[0002] Error while retrieving image from cache: getting file info: stat /cache/sha256:fe174faf7c477bc3dae796b067d98ac3f0d31e8075007a1146f86d13f2c98e13: no such file or directory
[36mINFO[0m[0002] Downloading base image gcr.io/kubeflow-images-public/tensorflow-1.14.0-notebook-cpu:v0.7.0
[36mINFO[0m[0003] Built cross stage deps: map[]
[36mINFO[0m[0003]

### Build the actual image

In [14]:
# Re-run the notebook-to-script conversion so the image picks up the current code.
preprocessor.preprocess()

# Build the final image on top of the base image produced by cluster_builder.
# NOTE(review): presumably the append builder only adds the preprocessor's files
# as a layer instead of running the Dockerfile again - confirm.
builder = append.append.AppendBuilder(registry=DOCKER_REGISTRY,
                                      base_image=cluster_builder.image_tag, 
                                      preprocessor=preprocessor)
builder.build()

Converting build-train-deploy.ipynb to build-train-deploy.py
Creating entry point for the class name ModelServe
Building image using Append builder...
Creating docker context: /tmp/fairing_context_x4g0orab
Converting build-train-deploy.ipynb to build-train-deploy.py
Creating entry point for the class name ModelServe
build-train-deploy.py already exists in Fairing context, skipping...
Loading Docker credentials for repository 'gcr.io/kubeflow-ci/fairing-job/fairing-job:F47EE88D'
Invoking 'docker-credential-gcloud' to obtain Docker credentials.
Successfully obtained Docker credentials.
Image successfully built in 2.249176573008299s.
Pushing image gcr.io/kubeflow-ci/fairing-job/fairing-job:BDE79D77...
Loading Docker credentials for repository 'gcr.io/kubeflow-ci/fairing-job/fairing-job:BDE79D77'
Invoking 'docker-credential-gcloud' to obtain Docker credentials.
Successfully obtained Docker credentials.
Uploading gcr.io/kubeflow-ci/fairing-job/fairing-job:BDE79D77
Layer sha256:8832e37735788

## Use Kubeflow Fairing to launch a training Job

In [15]:
# Pod spec for the image built by `builder`.
pod_spec = builder.generate_pod_spec()
# NOTE(review): cleanup=False presumably keeps the Job around after it finishes
# so it can be inspected with kubectl - confirm.
train_deployer = job.job.Job(cleanup=False,
                             pod_spec_mutators=[
                                 fairing.cloud.gcp.add_gcp_credentials_if_exists])

# Append "train" to the container command; presumably the generated entry point
# dispatches it to ModelServe.train - confirm.
pod_spec.containers[0].command.extend(["train"])
result = train_deployer.deploy(pod_spec)

Not able to find gcp credentials secret: user-gcp-sa
Trying workload identity service account: default-editor
The job fairing-job-qwdlb launched.
Waiting for fairing-job-qwdlb-67ddb to start...
Waiting for fairing-job-qwdlb-67ddb to start...
Waiting for fairing-job-qwdlb-67ddb to start...
Pod started running True


2020-02-26 23:48:04.056153: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer.so.6'; dlerror: libnvinfer.so.6: cannot open shared object file: No such file or directory
2020-02-26 23:48:04.056318: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer_plugin.so.6'; dlerror: libnvinfer_plugin.so.6: cannot open shared object file: No such file or directory
2020-02-26 23:48:04.056332: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:30] Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
I0226 23:48:06.277673 140238089848640 metadata_store.py:80] MetadataStore with gRPC connection initialized
model_file not supplied; using the default
model_file=mockup-model.dat
[0]	validation_0-rmse:106.201
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	vali

In [16]:
!kubectl get jobs -l fairing-id={train_deployer.job_id} -o yaml

apiVersion: v1
items:
- apiVersion: batch/v1
  kind: Job
  metadata:
    creationTimestamp: "2020-02-26T23:47:21Z"
    generateName: fairing-job-
    labels:
      fairing-deployer: job
      fairing-id: 54d568cc-58f2-11ea-964d-46fd3ccc57c5
    name: fairing-job-qwdlb
    namespace: zhenghui
    resourceVersion: "11375571"
    selfLink: /apis/batch/v1/namespaces/zhenghui/jobs/fairing-job-qwdlb
    uid: 54d8d81b-58f2-11ea-a99d-42010a8000ac
  spec:
    backoffLimit: 0
    completions: 1
    parallelism: 1
    selector:
      matchLabels:
        controller-uid: 54d8d81b-58f2-11ea-a99d-42010a8000ac
    template:
      metadata:
        annotations:
          sidecar.istio.io/inject: "false"
        creationTimestamp: null
        labels:
          controller-uid: 54d8d81b-58f2-11ea-a99d-42010a8000ac
          fairing-deployer: job
          fairing-id: 54d568cc-58f2-11ea-964d-46fd3ccc57c5
          job-name: fairing-job-qwdlb
        name: fairing-deployer
      spec:
        containers:


## Deploy the trained model to Kubeflow for predictions

In [17]:
from kubeflow.fairing.deployers import serving
# Generate a fresh pod spec: the one used for the training job had "train"
# appended to its container command.
pod_spec = builder.generate_pod_spec()

# Name of the converted notebook script without its file extension; combined
# with ".ModelServe" it identifies the class to serve.
module_name = os.path.splitext(preprocessor.executable.name)[0]
deployer = serving.serving.Serving(module_name + ".ModelServe",
                                   service_type="ClusterIP",
                                   labels={"app": "mockup"})
    
# deploy() returns the prediction endpoint URL used by the inference cell.
url = deployer.deploy(pod_spec)

Cluster endpoint: http://fairing-service-kkbtm.zhenghui.svc.cluster.local:5000/predict


In [18]:
!kubectl get deploy -o yaml {deployer.deployment.metadata.name}

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
  creationTimestamp: "2020-02-26T23:48:12Z"
  generateName: fairing-deployer-
  generation: 1
  labels:
    app: mockup
    fairing-deployer: serving
    fairing-id: 73532514-58f2-11ea-964d-46fd3ccc57c5
  name: fairing-deployer-p8xc9
  namespace: zhenghui
  resourceVersion: "11375642"
  selfLink: /apis/extensions/v1beta1/namespaces/zhenghui/deployments/fairing-deployer-p8xc9
  uid: 7354b5ec-58f2-11ea-a99d-42010a8000ac
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: mockup
      fairing-deployer: serving
      fairing-id: 73532514-58f2-11ea-964d-46fd3ccc57c5
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
      creationTimestamp: nul

## Send an inference request to the prediction server

In [19]:
(train_X, train_y), (test_X, test_y) = train_test_split_custom()

In [20]:
result = util.predict_nparray(url, test_X)

(b'{"data":{"names":["t:0","t:1"],"tensor":{"shape":[1,2],"values":[-49.2782592'
 b'7734375,-54.25324630737305]}},"meta":{}}\n')


## Track Models and Artifacts

In [22]:
# Create a workspace
# Connects to the "xgboost-synthetic" metadata workspace; the recorded output of
# ws.list() shows the model entries logged by earlier training runs.
ws = create_workspace()
ws.list()

MetadataStore with gRPC connection initialized


[{'id': 3,
  'workspace': 'xgboost-synthetic',
  'run': 'xgboost-synthetic-faring-run2020-02-26T23:26:36.443396',
  'version': '2020-02-26T23:26:36.660862',
  'owner': 'someone@kubeflow.org',
  'description': 'housing price prediction model using synthetic data',
  'name': 'housing-price-model',
  'model_type': 'linear_regression',
  'create_time': '2020-02-26T23:26:36.660887Z',
  'uri': 'mockup-model.dat',
  'training_framework': {'name': 'xgboost', 'version': '0.9.0'},
  'hyperparameters': {'learning_rate': 0.1, 'n_estimators': 50},
  'labels': None,
  'kwargs': {}},
 {'id': 6,
  'workspace': 'xgboost-synthetic',
  'run': 'xgboost-synthetic-faring-run2020-02-26T23:27:11.144500',
  'create_time': '2020-02-26T23:27:11.458520Z',
  'version': '2020-02-26T23:27:11.458480',
  'owner': 'someone@kubeflow.org',
  'description': 'housing price prediction model using synthetic data',
  'name': 'housing-price-model',
  'model_type': 'linear_regression',
  'uri': 'mockup-model.dat',
  'training_f

In [23]:
# create pipeline
@dsl.pipeline(
   name='Training pipeline',
   description='A pipeline that trains an xgboost model on synthetic data.'
)
def train_pipeline():
    """Single-step pipeline that runs the ``train`` command of the built image.

    The step runs ``python <converted notebook script> train`` in the image
    produced by ``builder``, with ``/app`` as its working directory.
    """
    command = ["python", preprocessor.executable.name, "train"]
    # NOTE(review): recorded outputs of earlier cells report that the
    # 'user-gcp-sa' secret was not found in that cluster; confirm the secret
    # exists where this pipeline runs.
    train_op = dsl.ContainerOp(
        name="train",
        image=builder.image_tag,
        command=command,
    ).apply(
        gcp.use_gcp_secret('user-gcp-sa'),
    )
    # NOTE(review): /app is presumably where the image keeps the converted
    # script - confirm against the Dockerfile / fairing defaults.
    train_op.container.working_dir = "/app"

In [24]:
# compile
pipeline_func = train_pipeline
# The compiled package is named after the pipeline function's __name__.
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
# Writes the compiled pipeline package to pipeline_filename in the working directory.
compiler.Compiler().compile(pipeline_func, pipeline_filename)

In [25]:
# upload and execute
EXPERIMENT_NAME = 'MockupModel'

#Specify pipeline argument values
# train_pipeline declares no parameters, so there are no values to pass.
arguments = {}

# Get or create an experiment and submit a pipeline run
# NOTE(review): kfp.Client() is created without arguments; presumably it resolves
# the Pipelines endpoint from the in-cluster environment - confirm.
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

#Submit a pipeline run
# Uses the package file written by the compile cell.
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)

#vvvvvvvvv This link leads to the run information page. (Note: There is a bug in JupyterLab that modifies the URL and makes the link stop working)

Creating experiment MockupModel.
