# E2E Serverless ML Pipeline - Ingest, Train, Auto Deploy Model
  --------------------------------------------------------------------

This notebook uses the classic Iris dataset to demonstrate how to define and automate an end-to-end ML pipeline.

#### **notebook how-to's**
* Write and test ML pipeline in a notebook.
* Run hyperparameter tests
* Convert the code to serverless functions and run in the cluster
* Define an ML pipeline DAG (using KubeFlow Pipelines)
  * with 4 steps: data prep, training, model deployment, model report
* Check our pipeline results from the notebook

<a id='top'></a>
#### **steps**
**[define a new function and its dependencies](#define-function)**<br>
**[run the data collection and training locally](#test-locally)**<br>
**[running a task with Hyper parameters (GridSearch)](#hyper-param)**<br>
**[define cluster jobs, build images and run](#build)**<br>
**[Create a multi-stage KubeFlow Pipeline from our functions](#pipeline)**<br>

In [16]:
# nuclio: ignore
import nuclio

<a id='define-function'></a>
### **define a new function and its dependencies**

In [None]:
%%nuclio cmd 
pip install scikit-learn
pip install xgboost
pip install matplotlib

In [17]:
%nuclio config spec.build.baseImage = "mlrun/mlrun"

%nuclio: setting spec.build.baseImage to 'mlrun/mlrun'


In [18]:
# use this to suppress XGB FutureWarning
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [19]:
import xgboost as xgb
import os
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.metrics import accuracy_score
from mlrun.artifacts import TableArtifact, PlotArtifact
import pandas as pd


def iris_generator(context):
    iris = load_iris()
    iris_dataset = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    iris_labels = pd.DataFrame(data=iris.target, columns=['label'])
    iris_dataset = pd.concat([iris_dataset, iris_labels], axis=1)
    context.logger.info('saving iris dataframe to {}'.format(context.out_path))
    context.log_artifact(TableArtifact('iris_dataset', df=iris_dataset))
    

def xgb_train(context, 
              dataset='',
              model_name='model.bst',
              max_depth=6,
              num_class=10,
              eta=0.2,
              gamma=0.1,
              steps=20):

    df = pd.read_csv(dataset)
    X = df.drop(['label'], axis=1)
    y = df['label']
    
    X_train, X_test, Y_train, Y_test = train_test_split(X, y, test_size=0.2)
    dtrain = xgb.DMatrix(X_train, label=Y_train)
    dtest = xgb.DMatrix(X_test, label=Y_test)

    # Get params from event
    param = {"max_depth": max_depth,
             "eta": eta, "nthread": 4,
             "num_class": num_class,
             "gamma": gamma,
             "objective": "multi:softprob"}

    # Train model
    xgb_model = xgb.train(param, dtrain, steps)

    preds = xgb_model.predict(dtest)
    best_preds = np.asarray([np.argmax(line) for line in preds])

    # log results and artifacts
    context.log_result('accuracy', float(accuracy_score(Y_test, best_preds)))
    context.log_artifact('model', body=bytes(xgb_model.save_raw()), 
                         local_path=model_name, labels={'framework': 'xgboost'})
    
    
import matplotlib
import matplotlib.pyplot as plt
from io import BytesIO

def plot_iter(context, iterations, col='accuracy', num_bins=10):
    df = pd.read_csv(BytesIO(iterations.get()))
    x = df['output.{}'.format(col)]
    fig, ax = plt.subplots(figsize=(6,6))
    n, bins, patches = ax.hist(x, num_bins, density=1)
    ax.set_xlabel('Accuracy')
    ax.set_ylabel('Density')  # the histogram is normalized (density=1), so bars are not raw counts
    context.log_artifact(PlotArtifact('myfig', body=fig))
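
With the `multi:softprob` objective, the prediction for each row is a list of per-class probabilities, which is why `xgb_train` takes the `argmax` of every row before computing accuracy. A minimal pure-Python sketch of that step follows. The probabilities, labels, and helper names are made up for illustration and are not output from the model above.

```python
def argmax(row):
    """Return the index of the largest value in a sequence."""
    return max(range(len(row)), key=lambda i: row[i])


def accuracy(y_true, y_pred):
    """Fraction of positions where the predicted label equals the true label."""
    matches = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return matches / len(y_true)


# Hypothetical per-class probabilities for four samples and three classes.
probs = [
    [0.8, 0.1, 0.1],
    [0.2, 0.7, 0.1],
    [0.1, 0.3, 0.6],
    [0.5, 0.4, 0.1],
]
labels = [0, 1, 2, 1]

best_preds = [argmax(row) for row in probs]
print(best_preds, accuracy(labels, best_preds))
```

The last sample is misclassified on purpose, so three of the four predictions match.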

The following end-code annotation tells ```nuclio``` to stop parsing the notebook from this cell. _**Please do not remove this cell**_:

In [20]:
# nuclio: end-code
# marks the end of a code section
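
To make the effect of the annotations concrete, here is a rough sketch of how a converter could treat them: cells that start with `# nuclio: ignore` are skipped, and everything from the `# nuclio: end-code` cell onward is dropped. This is an illustration only, not the actual nuclio-jupyter implementation; `exportable_cells` and the sample cells are hypothetical.

```python
def exportable_cells(cells):
    """Collect cell sources up to the end-code marker, skipping ignored cells."""
    kept = []
    for source in cells:
        lines = source.strip().splitlines()
        first_line = lines[0].strip() if lines else ""
        if first_line == "# nuclio: end-code":
            break  # nothing from this cell onward is exported
        if first_line == "# nuclio: ignore":
            continue  # notebook-only cell, e.g. `import nuclio`
        kept.append(source)
    return kept


cells = [
    "# nuclio: ignore\nimport nuclio",
    "def handler(context):\n    return 'ok'",
    "# nuclio: end-code\n# marks the end of a code section",
    "run = handler(None)  # local test, not exported",
]
print(exportable_cells(cells))
```

Only the handler definition survives, which is why the local test cells later in the notebook do not end up in the function code.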

<a id='test-locally'></a>
### run the data collection and training locally

The functions above can be tested locally. Parameters, inputs, and outputs can be specified in the API or the `Task` object.

We use the ```local``` runtime by default; later on we will use a ```job``` runtime to run the code in containers.

In each run we can specify the function, inputs, parameters or hyperparameters, and so on. For more details, see the [mlrun_basics notebook](mlrun_basics.ipynb).
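
Because every handler receives a `context` as its first argument, a handler's logic can also be exercised with a small stand-in object before involving MLRun at all. The sketch below assumes only the attributes the handlers above touch (`logger`, `out_path`, `log_result`, `log_artifact`). `StubContext` and the toy `score` handler are hypothetical names, not part of MLRun.

```python
import logging


class StubContext:
    """Hypothetical stand-in for the run context passed to a handler."""

    def __init__(self, out_path=""):
        self.out_path = out_path
        self.logger = logging.getLogger("stub-context")
        self.results = {}
        self.artifacts = []

    def log_result(self, key, value):
        self.results[key] = value

    def log_artifact(self, item, **kwargs):
        # Only record that an artifact was logged; nothing is written to disk.
        self.artifacts.append(item)


def score(context, correct=0, total=1):
    """Toy handler with the same shape as the handlers above."""
    context.logger.info("scoring %d samples", total)
    context.log_result("accuracy", correct / total)


ctx = StubContext(out_path="/tmp/out")
score(ctx, correct=9, total=10)
print(ctx.results)
```

A stub like this checks only the handler's own logic; tracking, artifact storage, and the run database still require a real run.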

In [21]:
from mlrun import new_function, code_to_function, NewTask, v3io_cred, new_model_server, mlconf, get_run_db, mount_v3io
# for local DB path use 'User/mlrun' instead 
mlconf.dbpath = 'http://mlrun-api:8080'

#### Generate the iris dataset and store in a CSV

In [22]:
out_path='/User/data/xgb'

In [23]:
gen = new_function().run(name='iris_gen', handler=iris_generator, out_path=out_path) 

[mlrun] 2020-02-24 11:16:33,320 starting run iris_gen uid=e1abe57e5a0d4a99adb202f824309a0e  -> http://mlrun-api:8080
[mlrun] 2020-02-24 11:16:33,351 saving iris dataframe to /User/data/xgb
[mlrun] 2020-02-24 11:16:33,368 log artifact iris_dataset at /User/data/xgb/iris_dataset.csv, size: 2776, db: Y



| uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|
| ...309a0e | 0 | Feb 24 11:16:33 | completed | iris_gen | kind=handler, owner=admin, host=jupyter-68bdf65845-6rc8r | | | | iris_dataset |


to track results use .show() or .logs() or in CLI: 
!mlrun get run e1abe57e5a0d4a99adb202f824309a0e --project default , !mlrun logs e1abe57e5a0d4a99adb202f824309a0e --project default
[mlrun] 2020-02-24 11:16:33,411 run executed, status=completed


#### define a training task and run locally

In [24]:
task = NewTask(handler=xgb_train, out_path=out_path, inputs={'dataset': gen.outputs['iris_dataset']})
task.with_params(eta=0.1, max_depth=6, gamma=0.1)

<mlrun.model.RunTemplate at 0x7fa074e00470>

In [25]:
run = new_function().run(task)

[mlrun] 2020-02-24 11:16:37,611 starting run mlrun-56e268-xgb_train uid=a4e220acbebe49d7874e1d5fcc281767  -> http://mlrun-api:8080
[mlrun] 2020-02-24 11:16:37,975 log artifact model at /User/data/xgb/model.bst, size: 48404, db: Y



| uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|
| ...281767 | 0 | Feb 24 11:16:37 | completed | mlrun-56e268-xgb_train | kind=handler, owner=admin, host=jupyter-68bdf65845-6rc8r | dataset | eta=0.1, max_depth=6, gamma=0.1 | accuracy=0.9333333333333333 | model |


to track results use .show() or .logs() or in CLI: 
!mlrun get run a4e220acbebe49d7874e1d5fcc281767 --project default , !mlrun logs a4e220acbebe49d7874e1d5fcc281767 --project default
[mlrun] 2020-02-24 11:16:38,058 run executed, status=completed


<a id="hyper-param" ></a>
### running a task with Hyper parameters (GridSearch)

In many cases we want to run a job with multiple parameter combinations. To do so, create a task with hyperparameters (a list of possible values per parameter) and MLRun will run all the combinations.

MLRun stores the results of every iteration (see the `iteration_results` artifact). A selection criterion specifies which iteration is the best one, and that iteration is treated as the overall task output. For example, `max.accuracy` selects the iteration with the highest `accuracy` result.
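
As a rough illustration of what "all the combinations" means, the sketch below expands a parameter grid with `itertools.product` and applies a `max.<key>` or `min.<key>` style selector to per-iteration results. It is not MLRun's implementation: `expand_grid`, `select_best`, the iteration order, and the accuracy values are all assumptions made for the example.

```python
from itertools import product

parameters = {
    "eta": [0.10, 0.20],
    "max_depth": [3, 6, 10],
    "gamma": [0.1, 0.3],
}


def expand_grid(grid):
    """Return one dict per combination of the listed values."""
    keys = list(grid)
    return [dict(zip(keys, values)) for values in product(*(grid[k] for k in keys))]


def select_best(results, selector="max.accuracy"):
    """Return the 1-based iteration that wins under a 'max.<key>' or 'min.<key>' selector."""
    op, key = selector.split(".", 1)
    pick = max if op == "max" else min
    return pick(range(1, len(results) + 1), key=lambda i: results[i - 1][key])


combos = expand_grid(parameters)
print(len(combos))  # 2 * 3 * 2 combinations

# Made-up accuracies, one per iteration, only to exercise the selector.
fake_results = [{"accuracy": 0.90 + 0.005 * i} for i in range(len(combos))]
print(select_best(fake_results))
```

The grid size grows multiplicatively with every added parameter list, so it is worth checking `len(combos)` before launching a large search.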

In [26]:
# test our function locally with multiple parameters
parameters = {
     "eta":       [0.10, 0.20],
     "max_depth": [3, 6, 10],
     "gamma":     [0.1, 0.3],
     }

hyper_task = NewTask(handler=xgb_train, out_path=out_path, inputs={'dataset': gen.outputs['iris_dataset']})
hyper_task.with_hyper_params(parameters, 'max.accuracy')
run = new_function().run(hyper_task)

[mlrun] 2020-02-24 11:16:42,403 starting run mlrun-fe986c-xgb_train uid=9fc669ee53734c47ab7e292763e87315  -> http://mlrun-api:8080
> --------------- Iteration: (1) ---------------
[mlrun] 2020-02-24 11:16:42,462 log artifact model at /User/data/xgb/1/model.bst, size: 49268, db: Y

> --------------- Iteration: (2) ---------------
[mlrun] 2020-02-24 11:16:42,525 log artifact model at /User/data/xgb/2/model.bst, size: 48836, db: Y

> --------------- Iteration: (3) ---------------
[mlrun] 2020-02-24 11:16:42,589 log artifact model at /User/data/xgb/3/model.bst, size: 50708, db: Y

> --------------- Iteration: (4) ---------------
[mlrun] 2020-02-24 11:16:42,657 log artifact model at /User/data/xgb/4/model.bst, size: 47468, db: Y

> --------------- Iteration: (5) ---------------
[mlrun] 2020-02-24 11:16:42,724 log artifact model at /User/data/xgb/5/model.bst, size: 49052, db: Y

> --------------- Iteration: (6) ---------------
[mlrun] 2020-02-24 11:16:42,808 log artifact model at /User/data/

| uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|
| ...e87315 | 0 | Feb 24 11:16:42 | completed | mlrun-fe986c-xgb_train | kind=handler, owner=admin | dataset | | best_iteration=8, accuracy=1.0 | model, iteration_results |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 9fc669ee53734c47ab7e292763e87315 --project default , !mlrun logs 9fc669ee53734c47ab7e292763e87315 --project default
[mlrun] 2020-02-24 11:16:43,290 run executed, status=completed


<a id="build"></a>
______________________________________________
### **define cluster jobs and build images**

In order to use our function in a cluster we need to package our code and dependencies.

The ```code_to_function``` call will automatically generate a ```function``` object from the current notebook (or a specified file) with its list of dependencies and runtime configuration.

The `.deploy()` command will build the dependencies and image required for running our function.

We use `.apply(mount_v3io())` to attach a v3io (iguazio data fabric) volume to our function. By default v3io will mount the current user home into the `/User` function path.

Alternatively we can use S3 as a data source or target. To do that, add AWS credentials to the task and specify paths starting with `s3://`, e.g.:

    task.with_secrets('file', 'secrets.txt')
    out_path='s3://my-bucket/data'
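
The difference between a mounted file path and an object path comes down to the URL scheme. The sketch below uses the standard library's `urlparse` to show how the path styles used in this notebook break down. `describe_target` is a hypothetical helper for illustration and says nothing about how MLRun resolves paths internally.

```python
from urllib.parse import urlparse


def describe_target(path):
    """Split a data path into (scheme, bucket_or_host, key), using 'file' for plain paths."""
    parsed = urlparse(path)
    scheme = parsed.scheme or "file"
    return scheme, parsed.netloc, parsed.path


for target in ("/User/data/xgb", "s3://my-bucket/data", "v3io:///users/admin/mlrun/kfp"):
    print(describe_target(target))
```

A plain path has no scheme and therefore depends on a volume mount such as the one added by `mount_v3io()`, while `s3://` paths carry the bucket name in the host part.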

In [27]:
# create the function from the notebook code + annotations
xgbfn = code_to_function('xgb', runtime='job').apply(mount_v3io())

In [28]:
xgbfn.deploy()

[mlrun] 2020-02-24 11:17:34,230 starting remote build, image: .mlrun/func-default-xgb-latest
[36mINFO[0m[0000] Resolved base name mlrun/mlrun:0.4.4 to mlrun/mlrun:0.4.4 
[36mINFO[0m[0000] Resolved base name mlrun/mlrun:0.4.4 to mlrun/mlrun:0.4.4 
[36mINFO[0m[0000] Downloading base image mlrun/mlrun:0.4.4     
[36mINFO[0m[0000] Error while retrieving image from cache: getting file info: stat /cache/sha256:0b0e383fd8f9f7660906f952e6e7329d4a0a88da19f8631c45b1e37c6e0e5549: no such file or directory 
[36mINFO[0m[0000] Downloading base image mlrun/mlrun:0.4.4     
[36mINFO[0m[0000] Built cross stage deps: map[]                
[36mINFO[0m[0000] Downloading base image mlrun/mlrun:0.4.4     
[36mINFO[0m[0001] Error while retrieving image from cache: getting file info: stat /cache/sha256:0b0e383fd8f9f7660906f952e6e7329d4a0a88da19f8631c45b1e37c6e0e5549: no such file or directory 
[36mINFO[0m[0001] Downloading base image mlrun/mlrun:0.4.4     
[36mINFO[0m[0001] Unpacking rootf

True

**run our task using the cluster job**

In [30]:
task.with_input('dataset', gen.outputs['iris_dataset'])
nrun = xgbfn.run(task, handler='xgb_train', out_path=out_path, watch=True)

[mlrun] 2020-02-24 11:20:33,396 starting run xgb-xgb_train uid=e2a2c3c9cec143d6b39c790c0298b943  -> http://mlrun-api:8080
[mlrun] 2020-02-24 11:20:33,471 Job is running in the background, pod: xgb-xgb-train-88877
[mlrun] 2020-02-24 11:20:47,186 log artifact model at /User/data/xgb/model.bst, size: 49268, db: Y

[mlrun] 2020-02-24 11:20:47,196 run executed, status=completed
final state: succeeded


| uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|
| ...98b943 | 0 | Feb 24 11:20:47 | completed | xgb-xgb_train | host=xgb-xgb-train-88877, kind=job, owner=admin | dataset | eta=0.1, gamma=0.1, max_depth=6 | accuracy=0.9333333333333333 | model |


to track results use .show() or .logs() or in CLI: 
!mlrun get run e2a2c3c9cec143d6b39c790c0298b943  , !mlrun logs e2a2c3c9cec143d6b39c790c0298b943 
[mlrun] 2020-02-24 11:20:52,719 run executed, status=completed


<a id="pipeline"></a>
______________________________________________
## Create a multi-stage KubeFlow Pipeline from our functions
* Load Iris dataset into a CSV
* Train a model using XGBoost with hyperparameters
* Deploy the model using Nuclio-serving
* Generate a plot of the training results

In [31]:
import kfp
from kfp import dsl

**create a model serving function from the [model-serving notebook](nuclio-serving.ipynb)** 

This function will be used in our workflow

In [32]:
# define a nuclio-serving function, generated from a remote notebook file
srvfn = new_model_server('iris-serving', 
                         model_class='XGBoostModel', 
                         filename='https://raw.githubusercontent.com/mlrun/functions/master/serving/xgboost/xgb_serving.ipynb')

# attach to the fabric (to read the model file)
srvfn.apply(mount_v3io())

<mlrun.runtimes.function.RemoteRuntime at 0x7fa058de9160>

**define a 4 step workflow with hyper-params**

In [37]:
@dsl.pipeline(
    name='My XGBoost training pipeline',
    description='Shows how to use mlrun.'
)
def xgb_pipeline(
   eta = [0.1, 0.2, 0.3], gamma = [0.1, 0.2, 0.3]
):

    ingest = xgbfn.as_step(name='ingest_iris', handler='iris_generator',
                          outputs=['iris_dataset'])

    
    train = xgbfn.as_step(name='xgb_train', handler='xgb_train',
                          hyperparams = {'eta': eta, 'gamma': gamma},
                          selector='max.accuracy',
                          inputs = {'dataset': ingest.outputs['iris_dataset']}, 
                          outputs=['model'])

    
    plot = xgbfn.as_step(name='plot', handler='plot_iter',
                         inputs={'iterations': train.outputs['iteration_results']},
                         outputs=['iris_dataset'])

    # deploy the model serving function with inputs from the training stage
    deploy = srvfn.deploy_step(project = 'iris', models={'iris_v1': train.outputs['model']})
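
The dependencies in this pipeline come from the way each step consumes another step's outputs: training reads the dataset from ingestion, while both the plot and the deployment read from training. The sketch below models that graph with the standard library's `graphlib` (Python 3.9 or later) to show a valid execution order. The dictionary is a hand-written model of the pipeline above, not something produced by KubeFlow Pipelines.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "ingest_iris": set(),
    "xgb_train": {"ingest_iris"},
    "plot": {"xgb_train"},
    "deploy": {"xgb_train"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

`plot` and `deploy` do not depend on each other, so their relative order is not fixed and a workflow engine is free to run them in parallel.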

#### Create a KubeFlow client and submit the pipeline with parameters

**define the artifacts output path**
the pipeline outputs will be written to the artifacts path directory. The path can be a file path (requires volume mounts) or an object path (v3io://, s3://, ..).

if we specify `{{workflow.uid}}` in the path it will be replaced with the actual workflow ID, so every workflow run stores its artifacts in a unique location for reproducibility.
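
The effect of the placeholder can be mimicked with a plain string replacement. In a real run the substitution is done by the pipeline engine rather than by user code, so `resolve_artifact_path` below is a hypothetical helper and the random UUIDs merely stand in for workflow IDs.

```python
import uuid

artifact_path = "v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/"


def resolve_artifact_path(template, workflow_uid):
    """Replace the {{workflow.uid}} placeholder with a concrete workflow ID."""
    return template.replace("{{workflow.uid}}", workflow_uid)


# Two simulated runs end up with two distinct artifact directories.
run_a = resolve_artifact_path(artifact_path, str(uuid.uuid4()))
run_b = resolve_artifact_path(artifact_path, str(uuid.uuid4()))
print(run_a)
print(run_b)
```

Keeping the placeholder in the path means a rerun never overwrites the artifacts of an earlier workflow.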

In [38]:
from mlrun import run_pipeline
artifact_path = 'v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/'
arguments = {'eta': [0.05, 0.10, 0.40, 0.5], 'gamma': [0.1, 0.3, 0.6]}

In [39]:
id = run_pipeline(xgb_pipeline, arguments, experiment='xgb', artifact_path=artifact_path)

[mlrun] 2020-02-24 11:23:12,190 Pipeline run id=b2bcedb9-0a0c-48cf-a36b-28fdcacfb19b, check UI or DB for progress


### check the results of our pipeline

In [40]:
# connect to the run db 
db = get_run_db().connect()

In [41]:
# query the DB with filter on workflow ID (only show this workflow) 
db.list_runs('', labels=f'workflow={id}').show()

| uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|
| ...fb8d1e | 0 | Feb 24 11:23:34 | completed | xgb_train | kind=job, owner=root, workflow=b2bcedb9-0a0c-48cf-a36b-28fdcacfb19b | dataset | | accuracy=1.0, best_iteration=1 | model, iteration_results |
| ...f099a2 | 0 | Feb 24 11:23:22 | completed | ingest_iris | host=ingest-iris-swg9b, kind=job, owner=root, workflow=b2bcedb9-0a0c-48cf-a36b-28fdcacfb19b | | | | iris_dataset |
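
The label filter keeps only the runs tagged with this workflow's ID. The idea can be sketched on plain dictionaries as shown below. `filter_by_label` and the sample records are hypothetical and only mirror the labels visible in the output above; the real query is executed by the run database.

```python
def filter_by_label(runs, selector):
    """Keep runs whose labels contain the 'key=value' pair named by selector."""
    key, _, value = selector.partition("=")
    return [run for run in runs if run.get("labels", {}).get(key) == value]


workflow_id = "b2bcedb9-0a0c-48cf-a36b-28fdcacfb19b"
runs = [
    {"name": "xgb_train", "labels": {"kind": "job", "workflow": workflow_id}},
    {"name": "ingest_iris", "labels": {"kind": "job", "workflow": workflow_id}},
    {"name": "iris_gen", "labels": {"kind": "handler"}},  # earlier local run, no workflow label
]

selected = filter_by_label(runs, f"workflow={workflow_id}")
print([run["name"] for run in selected])
```

Runs started outside the pipeline, such as the earlier local ones, carry no `workflow` label and are therefore excluded.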


**[back to top](#top)**