# E2E Serverless ML pipeline  - Ingest, Train, Auto Deploy Model
  --------------------------------------------------------------------

Using the classic Iris dataset to demonstrate definition and automation of an end to end ML pipeline.

#### **notebook how-to's**
* Write and test ML pipeline in a notebook.
* Use hyper parameter tests
* Convert the code to serverless functions and run in the cluster
* Define an ML pipeline DAG (using KubeFlow Pipelines)
  * with 4 steps: data prep, training, model deployment, model report
* Check our pipeline results from the notebook

<a id='top'></a>
#### **steps**
**[define a new function and its dependencies](#define-function)**<br>
**[run the data collection and training locally](#test-locally)**<br>
**[running a task with Hyper parameters (GridSearch)](#hyper-param)**<br>
**[define cluster jobs, build images and run](#build)**<br>
**[Create a multi-stage KubeFlow Pipeline from our functions](#pipeline)**<br>

In [1]:
# nuclio: ignore
import nuclio

<a id='define-function'></a>
### **define a new function and its dependencies**

In [None]:
%%nuclio cmd 
pip install sklearn
pip install xgboost
pip install matplotlib

In [2]:
%nuclio config spec.build.baseImage = "mlrun/mlrun"

%nuclio: setting spec.build.baseImage to 'mlrun/mlrun'


In [2]:
# use this to supress XGB FutureWarning
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [3]:
import xgboost as xgb
import os
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.metrics import accuracy_score
from mlrun.artifacts import TableArtifact, PlotArtifact
import pandas as pd


def iris_generator(context):
    iris = load_iris()
    iris_dataset = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    iris_labels = pd.DataFrame(data=iris.target, columns=['label'])
    iris_dataset = pd.concat([iris_dataset, iris_labels], axis=1)
    context.logger.info('saving iris dataframe to {}'.format(context.out_path))
    context.log_artifact(TableArtifact('iris_dataset', df=iris_dataset))
    

def xgb_train(context, 
              dataset='',
              model_name='model.bst',
              max_depth=6,
              num_class=10,
              eta=0.2,
              gamma=0.1,
              steps=20):

    df = pd.read_csv(dataset)
    X = df.drop(['label'], axis=1)
    y = df['label']
    
    X_train, X_test, Y_train, Y_test = train_test_split(X, y, test_size=0.2)
    dtrain = xgb.DMatrix(X_train, label=Y_train)
    dtest = xgb.DMatrix(X_test, label=Y_test)

    # Get params from event
    param = {"max_depth": max_depth,
             "eta": eta, "nthread": 4,
             "num_class": num_class,
             "gamma": gamma,
             "objective": "multi:softprob"}

    # Train model
    xgb_model = xgb.train(param, dtrain, steps)

    preds = xgb_model.predict(dtest)
    best_preds = np.asarray([np.argmax(line) for line in preds])

    # log results and artifacts
    context.log_result('accuracy', float(accuracy_score(Y_test, best_preds)))
    context.log_artifact('model', body=bytes(xgb_model.save_raw()), 
                         local_path=model_name, labels={'framework': 'xgboost'})
    
    
import matplotlib
import matplotlib.pyplot as plt
from io import BytesIO

def plot_iter(context, iterations, col='accuracy', num_bins=10):
    df = pd.read_csv(BytesIO(iterations.get()))
    x = df['output.{}'.format(col)]
    fig, ax = plt.subplots(figsize=(6,6))
    n, bins, patches = ax.hist(x, num_bins, density=1)
    ax.set_xlabel('Accuraccy')
    ax.set_ylabel('Count')
    context.log_artifact(PlotArtifact('myfig', body=fig))

The following end-code annotation tells ```nuclio``` to stop parsing the notebook from this cell. _**Please do not remove this cell**_:

In [4]:
# nuclio: end-code
# marks the end of a code section

<a id='test-locally'></a>
### run the data collection and training locally

The functions above can be tested locally. Parameters, inputs, and outputs can be specified in the API or the `Task` object.

We use the ```local``` runtime by default, later on we will use a ```job``` runtime for running containers.

In each run we can specify the function, inputs, parameters/hyper-parameters, etc... For more details, see the [mlrun_basics notebook](mlrun_basics.ipynb).

In [5]:
from mlrun import new_function, code_to_function, NewTask, v3io_cred, new_model_server, mlconf, get_run_db, mount_v3io
# for local DB path use 'User/mlrun' instead 
mlconf.dbpath = 'http://mlrun-api:8080'

#### Generate the iris dataset and store in a CSV

In [6]:
out_path='/User/data/xgb'

In [7]:
gen = new_function().run(name='iris_gen', handler=iris_generator, out_path=out_path) 

[mlrun] 2020-02-23 23:14:12,964 starting run iris_gen uid=f47ed7459eb44d90a6f5b3c9db344da0  -> http://mlrun-api:8080
[mlrun] 2020-02-23 23:14:12,994 saving iris dataframe to /User/data/xgb
[mlrun] 2020-02-23 23:14:13,022 log artifact iris_dataset at /User/data/xgb/iris_dataset.csv, size: 2776, db: Y



uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...344da0,0,Feb 23 23:14:12,completed,iris_gen,kind=handlerowner=adminhost=jupyter-68bdf65845-6rc8r,,,,iris_dataset


to track results use .show() or .logs() or in CLI: 
!mlrun get run f47ed7459eb44d90a6f5b3c9db344da0 --project default , !mlrun logs f47ed7459eb44d90a6f5b3c9db344da0 --project default
[mlrun] 2020-02-23 23:14:13,077 run executed, status=completed


#### define a training task and run locally

In [8]:
task = NewTask(handler=xgb_train, out_path=out_path, inputs={'dataset': gen.outputs['iris_dataset']})
task.with_params(eta=0.1, max_depth=6, gamma=0.1)

<mlrun.model.RunTemplate at 0x7fa0a00073c8>

In [9]:
run = new_function().run(task)

[mlrun] 2020-02-23 23:15:03,995 starting run mlrun-b4984c-xgb_train uid=943a42af83c8416e92370d4e398e6a54  -> http://mlrun-api:8080
[mlrun] 2020-02-23 23:15:04,075 log artifact model at /User/data/xgb/model.bst, size: 49268, db: Y



uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...8e6a54,0,Feb 23 23:15:04,completed,mlrun-b4984c-xgb_train,kind=handlerowner=adminhost=jupyter-68bdf65845-6rc8r,dataset,eta=0.1max_depth=6gamma=0.1,accuracy=0.9666666666666667,model


to track results use .show() or .logs() or in CLI: 
!mlrun get run 943a42af83c8416e92370d4e398e6a54 --project default , !mlrun logs 943a42af83c8416e92370d4e398e6a54 --project default
[mlrun] 2020-02-23 23:15:04,118 run executed, status=completed


<a id="hyper-param" ></a>
### running a task with Hyper parameters (GridSearch)

In many cases we want to run our jobs with multiple parameter combination options, we can simply create a task with hyper params (list of possible values per parameter) and MLRun will run all the combinations.

MLRun will store all the results (see the `iteration_results` artifact), you can specify which result is the best (will be treated as the overall task output) using the selection criteria (`max.accuracy` i.e. the one with maximum value as the `accuracy` result).

In [10]:
# test our function locally with multiple parameters
parameters = {
     "eta":       [0.10, 0.20],
     "max_depth": [3, 6, 10],
     "gamma":     [0.1, 0.3],
     }

hyper_task = NewTask(handler=xgb_train, out_path=out_path, inputs={'dataset': gen.outputs['iris_dataset']})
hyper_task.with_hyper_params(parameters, 'max.accuracy')
run = new_function().run(hyper_task)

[mlrun] 2020-02-23 23:15:29,339 starting run mlrun-65ebc7-xgb_train uid=89b7a0010c9e4852af1b93bfb4cc4c0f  -> http://mlrun-api:8080
> --------------- Iteration: (1) ---------------
[mlrun] 2020-02-23 23:15:29,395 log artifact model at /User/data/xgb/1/model.bst, size: 49196, db: Y

> --------------- Iteration: (2) ---------------
[mlrun] 2020-02-23 23:15:29,458 log artifact model at /User/data/xgb/2/model.bst, size: 49340, db: Y

> --------------- Iteration: (3) ---------------
[mlrun] 2020-02-23 23:15:29,571 log artifact model at /User/data/xgb/3/model.bst, size: 45524, db: Y

> --------------- Iteration: (4) ---------------
[mlrun] 2020-02-23 23:15:29,689 log artifact model at /User/data/xgb/4/model.bst, size: 48332, db: Y

> --------------- Iteration: (5) ---------------
[mlrun] 2020-02-23 23:15:29,762 log artifact model at /User/data/xgb/5/model.bst, size: 51284, db: Y

> --------------- Iteration: (6) ---------------
[mlrun] 2020-02-23 23:15:29,826 log artifact model at /User/data/

uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...cc4c0f,0,Feb 23 23:15:29,completed,mlrun-65ebc7-xgb_train,kind=handlerowner=admin,dataset,,best_iteration=1accuracy=1.0,modeliteration_results


to track results use .show() or .logs() or in CLI: 
!mlrun get run 89b7a0010c9e4852af1b93bfb4cc4c0f --project default , !mlrun logs 89b7a0010c9e4852af1b93bfb4cc4c0f --project default
[mlrun] 2020-02-23 23:15:30,319 run executed, status=completed


<a id="build"></a>
______________________________________________
### **define cluster jobs and build images**

In order to use our function in a cluster we need to package our code and dependencies.

The ```code_to_function``` call will automatically generate a ```function``` object from the current notebook (or a specified file) with its list of dependencies and runtime configuration.

The `.deploy()` command will build the dependencies and image required for running our function.

We use `.apply(mount_v3io())` to attach a v3io (iguazio data fabric) volume to our function. By default v3io will mount the current user home into the `\User` function path.

Alternatively we can use S3 as a data source or target, for that you need to add AWS credentials to the task and specify paths starting with `s3://` e.g.:

    task.with_secrets('file', 'secrets.txt')
    out_path='s3://my-bucket/data'

In [13]:
# create the function from the notebook code + annotations, add volumes and parallel HTTP trigger
# the "code_output" option will generate a py file from our notebook which can be used for src control and local runs
xgbfn = code_to_function('xgb', runtime='job', code_output='../src/iris.py').apply(mount_v3io())
xgbfn.export('../src/iris.yaml')

In [14]:
xgbfn.export('../src/iris.yaml')

[mlrun] 2020-02-23 23:43:09,949 function spec saved to path: ../src/iris.yaml


In [15]:
xgbfn.to_dict()

{'kind': 'job',
 'metadata': {'name': 'xgb',
  'tag': '',
  'hash': 'c6a621f7b8ace5f3faef9e3539aa6a46455c3acb',
  'project': ''},
 'spec': {'command': '../src/iris.py',
  'args': [],
  'volumes': [{'flexVolume': {'driver': 'v3io/fuse',
     'options': {'accessKey': 'a5532257-5e22-4436-82ef-1d3ac0c53f7b',
      'container': 'users',
      'subPath': '/admin'}},
    'name': 'v3io'}],
  'volume_mounts': [{'mountPath': '/User', 'name': 'v3io'}],
  'env': [{'name': 'V3IO_API', 'value': ''},
   {'name': 'V3IO_USERNAME', 'value': ''},
   {'name': 'V3IO_ACCESS_KEY', 'value': ''}],
  'description': '',
  'build': {'base_image': 'mlrun/mlrun',
   'commands': ['pip install sklearn',
    'pip install xgboost',
    'pip install matplotlib'],
   'code_origin': 'https://github.com/yaronha/demo-xgb-project.git#e80ff575491173ca4247250ec96ebe952749f65b:xgb.ipynb'}}}

In [None]:
xgbfn.deploy()

**run our task using the cluster job**

In [14]:
task.with_input('dataset', os.path.join(out_path, df_path))
nrun = xgbfn.run(task, handler='xgb_train', out_path=out_path, watch=True)

[mlrun] 2019-12-25 22:01:17,954 starting run xgb_train uid=6a8ccf6491364d75b3b7d373fcc6a608  -> http://mlrun-api:8080
[mlrun] 2019-12-25 22:01:24,930 run executed, status=completed
  % (item.__module__, item.__name__)
final state: succeeded


uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...c6a608,0,Dec 25 22:01:24,completed,xgb,host=xgb-train-xhbwhkind=jobowner=admin,dataset,eta=0.1gamma=0.1max_depth=6,accuracy=0.9666666666666667,model


to track results use .show() or .logs() or in CLI: 
!mlrun get run 6a8ccf6491364d75b3b7d373fcc6a608  , !mlrun logs 6a8ccf6491364d75b3b7d373fcc6a608 
[mlrun] 2019-12-25 22:01:27,152 run executed, status=completed


<a id="pipeline"></a>
______________________________________________
## Create a multi-stage KubeFlow Pipeline from our functions
* Load Iris dataset into a CSV
* Train a model using XGBoost with Hyper-parameter
* Deploy the model using Nuclio-serving
* Generate a plot of the training results

In [17]:
import kfp
from kfp import dsl

**define the artifacts output path**
the pipeline outputs will be writtento the artifacts path directory, the path can be a file path (require volume mounts) or an object path (v3io://, s3://, ..).

if we specify `{{workflow.uid}}` in the path it will be replaced with the actual workflow ID, this way every workflow run will store artifacts in a unique location for reproducability.

In [18]:
artifacts_path = 'v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/'

**create a model serving function from the [model-serving notebook](nuclio-serving.ipynb)** 

This function will be used in our workflow

In [22]:
# define a nuclio-serving functions, generated from a notebook file
srvfn = new_model_server('iris-serving', 
                         model_class='XGBoostModel', 
                         filename='nuclio-serving.ipynb')

# attach to the fabric (to read the model file)
srvfn.apply(mount_v3io())

<mlrun.runtimes.function.RemoteRuntime at 0x7fc52d41e358>

**define a 4 step workflow with hyper-params**

In [23]:
@dsl.pipeline(
    name='My XGBoost training pipeline',
    description='Shows how to use mlrun.'
)
def xgb_pipeline(
   eta = [0.1, 0.2, 0.3], gamma = [0.1, 0.2, 0.3]
):

    ingest = xgbfn.as_step(name='ingest_iris', handler='iris_generator',
                          params = {'target': df_path},
                          outputs=['iris_dataset'], out_path=artifacts_path).apply(mount_v3io())

    
    train = xgbfn.as_step(name='xgb_train', handler='xgb_train',
                          hyperparams = {'eta': eta, 'gamma': gamma},
                          selector='max.accuracy',
                          inputs = {'dataset': ingest.outputs['iris_dataset']}, 
                          outputs=['model'], out_path=artifacts_path).apply(mount_v3io())

    
    plot = xgbfn.as_step(name='plot', handler='plot_iter',
                         inputs={'iterations': train.outputs['iteration_results']},
                         outputs=['iris_dataset'], out_path=artifacts_path).apply(mount_v3io())

    # deploy the model serving function with inputs from the training stage
    deploy = srvfn.deploy_step(project = 'iris', models={'iris_v1': train.outputs['model']})

#### Create a KubeFlow client and submit the pipeline with parameters

In [24]:
# for debug generate the pipeline dsl
kfp.compiler.Compiler().compile(xgb_pipeline, 'mlrunpipe.yaml')

In [25]:
client = kfp.Client(namespace='default-tenant')

In [26]:
arguments = {'eta': [0.05, 0.10, 0.40, 0.5], 'gamma': [0.1, 0.3, 0.6]}
run_result = client.create_run_from_pipeline_func(xgb_pipeline, arguments, experiment_name='xgb')

### check the resilts of our pipeline

In [27]:
# connect to the run db 
db = get_run_db().connect()

In [30]:
# query the DB with filter on workflow ID (only show this workflow) 
db.list_runs('', labels=f'workflow={run_result.run_id}').show()

uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...a2b459,12,Dec 25 22:13:37,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.5gamma=0.6,accuracy=0.9666666666666667,model
...a2b459,11,Dec 25 22:13:37,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.4gamma=0.6,accuracy=0.9333333333333333,model
...a2b459,10,Dec 25 22:13:37,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.1gamma=0.6,accuracy=0.9666666666666667,model
...a2b459,9,Dec 25 22:13:36,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.05gamma=0.6,accuracy=0.9666666666666667,model
...a2b459,8,Dec 25 22:13:36,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.5gamma=0.3,accuracy=0.9333333333333333,model
...a2b459,7,Dec 25 22:13:36,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.4gamma=0.3,accuracy=0.9666666666666667,model
...a2b459,6,Dec 25 22:13:36,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.1gamma=0.3,accuracy=0.9666666666666667,model
...a2b459,5,Dec 25 22:13:36,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.05gamma=0.3,accuracy=1.0,model
...a2b459,4,Dec 25 22:13:36,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.5gamma=0.1,accuracy=0.9666666666666667,model
...a2b459,3,Dec 25 22:13:36,completed,xgb,host=xgb-train-6mznmkind=jobowner=adminworkflow=1bd6756e-3dcd-40a4-a8fd-9e9d50dac0f9,dataset,eta=0.4gamma=0.1,accuracy=0.9666666666666667,model


**[back to top](#top)**