# Demonstrate Git Based ML Pipeline Automation
  --------------------------------------------------------------------

Creating a local function, running predefined functions, creating and running a full ML pipeline with local and library functions.

#### **notebook how-to's**
* Create and test a simple function
* Examine data using serverless (containarized) `describe` function
* Create an automated ML pipeline from various library functions
* Running and tracking the pipeline results and artifacts

## Create and Test a Local Ingestion/Data-prep Function (e.g. Iris Data Generator)
Import nuclio SDK and magics, <b>do not remove the cell and comment !!!</b>

In [1]:
# nuclio: ignore
import nuclio

<b>Specify function dependencies and configuration<b>

In [2]:
%nuclio config spec.image = "mlrun/ml-models"

%nuclio: setting spec.image to 'mlrun/ml-models'


#### Function code
Generate the iris dataset and log the dataframe (as csv or parquet file)

In [3]:
import os
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn.metrics import accuracy_score
from mlrun.artifacts import TableArtifact, PlotArtifact
import pandas as pd

def iris_generator(context, format='csv'):
    iris = load_iris()
    iris_dataset = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    iris_labels = pd.DataFrame(data=iris.target, columns=['label'])
    iris_dataset = pd.concat([iris_dataset, iris_labels], axis=1)
    
    context.logger.info('saving iris dataframe to {}'.format(context.artifact_path))
    context.log_dataset('iris_dataset', df=iris_dataset, format=format, index=False)


The following end-code annotation tells ```nuclio``` to stop parsing the notebook from this cell. _**Please do not remove this cell**_:

In [4]:
# nuclio: end-code
# marks the end of a code section

## Create a project to host our functions, jobs and artifacts

Projects are used to package multiple functions, workflows, and artifacts. We usually store project code and definitions in a Git archive.

The following code creates a new project in a local dir and initialize git tracking on that

In [5]:
from os import path
from mlrun import run_local, NewTask, mlconf, import_function, mount_v3io
mlconf.dbpath = mlconf.dbpath or 'http://mlrun-api:8080'

# specify artifacts target location
artifact_path = mlconf.artifact_path or path.abspath('./')
project_name = 'gitops-project'

In [6]:
from mlrun import new_project, code_to_function
project_dir = './'
skproj = new_project(project_name, project_dir)

<a id='test-locally'></a>
### Run/test the data generator function locally

The functions above can be tested locally. Parameters, inputs, and outputs can be specified in the API or the `Task` object.<br>
when using `run_local()` the function inputs and outputs are automatically recorded by MLRun experiment and data tracking DB.

In each run we can specify the function, inputs, parameters/hyper-parameters, etc... For more details, see the [mlrun_basics notebook](mlrun_basics.ipynb).

In [7]:
# run the function locally
gen = run_local(name='iris_gen', handler=iris_generator, 
                project=project_name, artifact_path=path.join(artifact_path, 'data')) 

> 2020-07-29 10:38:35,433 [info] starting run iris_gen uid=3e340d3561ca402c91e9bb09b1631dd4  -> http://mlrun-api:8080
> 2020-07-29 10:38:35,518 [info] saving iris dataframe to /User/demo-github-actions/data


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
gitops-project,...b1631dd4,0,Jul 29 10:38:35,completed,iris_gen,v3io_user=adminkind=handlerowner=adminhost=jupyter-58d8fdb6fc-nmqbq,,,,iris_dataset


to track results use .show() or .logs() or in CLI: 
!mlrun get run 3e340d3561ca402c91e9bb09b1631dd4 --project gitops-project , !mlrun logs 3e340d3561ca402c91e9bb09b1631dd4 --project gitops-project
> 2020-07-29 10:38:35,641 [info] run executed, status=completed


#### Convert our local code to a distributed serverless function object 

In [8]:
gen_func = code_to_function(name='gen_iris', kind='job')
skproj.set_function(gen_func)

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f5d751dbe10>

## Analyze the dataset features (useing marketplace function)
load dataset analysis function (`describe`) from the function hub (marketplace), and print its doc.

In [15]:
skproj.set_function('hub://describe', 'describe')
skproj.func('describe').doc()

function: describe
describe and visualizes dataset stats
default handler: summarize
entry points:
  summarize: Summarize a table
    context(MLClientCtx)  - the function context, default=
    table(DataItem)  - MLRun input pointing to pandas dataframe (csv/parquet file path), default=
    label_column(str)  - ground truth column label, default=None
    class_labels(List[str])  - label for each class in tables and plots, default=[]
    plot_hist(bool)  - (True) set this to False for large tables, default=True
    plots_dest(str)  - destination folder of summary plots (relative to artifact_path), default=plots
    update_dataset  - when the table is a registered dataset update the charts in-place, default=False


### Run the describe function on our dataset (as a Kubernetes job)
<b> using shared file system mount (`mount_v3io`) with our notebook.</b>

In [16]:
skproj.func('describe').apply(mount_v3io()).run(params={'label_column': 'label'}, 
                                                inputs={"table": gen.outputs['iris_dataset']}, 
                                                artifact_path=artifact_path)

> 2020-07-29 12:46:52,341 [info] starting run describe-summarize uid=301ab10adbf34adb898f0751c7f0f0b4  -> http://mlrun-api:8080
> 2020-07-29 12:46:52,497 [info] Job is running in the background, pod: describe-summarize-r9tvz
> 2020-07-29 12:47:01,761 [info] run executed, status=completed
final state: succeeded


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
gitops-project,...c7f0f0b4,0,Jul 29 12:46:57,completed,describe-summarize,v3io_user=adminkind=jobowner=adminhost=describe-summarize-r9tvz,table,label_column=label,,histogramsviolinimbalanceimbalance-weights-veccorrelation-matrixcorrelation


to track results use .show() or .logs() or in CLI: 
!mlrun get run 301ab10adbf34adb898f0751c7f0f0b4 --project gitops-project , !mlrun logs 301ab10adbf34adb898f0751c7f0f0b4 --project gitops-project
> 2020-07-29 12:47:11,671 [info] run executed, status=completed


<mlrun.model.RunObject at 0x7f5d74f87d10>

## Create a Fully Automated ML Pipeline

#### Add more functions to our project to be used in our pipeline (from the functions hub/marketplace)

AutoML training (classifier), Model validation (test_classifier), Real-time model server, and Model REST API Tester

In [9]:
skproj.set_function('hub://sklearn_classifier', 'train')
skproj.set_function('hub://test_classifier', 'test')
skproj.set_function('hub://model_server', 'serving')
skproj.set_function('hub://model_server_tester', 'live_tester')
#print(skproj.to_yaml())

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f5d74fd8d50>

#### Define and save a pipeline 

The following workflow definition will be written into a file, it describes a Kubeflow execution graph (DAG)<br>
and how functions and data are connected  to form an end to end pipeline. 

* Build the iris generator (ingest) function container 
* Ingest the iris data
* Analyze the dataset (describe)
* Train and test the model
* Deploy the model as a real-time serverless function
* Test the serverless function REST API with test dataset

Check the code below to see how functions objects are initialized and used (by name) inside the workflow.<br>
The `workflow.py` file has two parts, initialize the function objects and define pipeline dsl (connect the function inputs and outputs).

> Note: the pipeline can include CI steps like building container images and deploying models as illustrated  in the following example.


In [17]:
%%writefile ./workflow.py
from kfp import dsl
from mlrun import mount_v3io, NewTask


funcs = {}
this_project = None
DATASET = 'iris_dataset'
LABELS  = "label"

# init functions is used to configure function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    for f in functions.values():
        f.apply(mount_v3io())
     
    # uncomment this line to collect the inference results into a stream
    # and specify a path in V3IO (<datacontainer>/<subpath>)
    #functions['serving'].set_env('INFERENCE_STREAM', 'users/admin/model_stream')

    
@dsl.pipeline(
    name="Demo training pipeline",
    description="Shows how to use mlrun."
)
def kfpipeline():
    
    # run the ingestion function with the new image and params
    ingest = funcs['gen-iris'].as_step(
        name="get-data",
        handler='iris_generator',
        params={'format': 'pq'},
        outputs=[DATASET])

    # analyze our dataset
    describe = funcs["describe"].as_step(
        name="summary",
        params={"label_column": LABELS},
        inputs={"table": ingest.outputs[DATASET]})
    
    # train with hyper-paremeters
    train = funcs["train"].as_step(
        name="train",
        params={"sample"          : -1,
                "label_column"    : LABELS,
                "test_size"       : 0.10},
        hyperparams={'model_pkg_class': ["sklearn.ensemble.RandomForestClassifier",
                                         "sklearn.linear_model.LogisticRegression",
                                         "sklearn.ensemble.AdaBoostClassifier"]},
        selector='max.accuracy',
        inputs={"dataset"         : ingest.outputs[DATASET]},
        labels={"commit": this_project.params.get('commit', '')},
        outputs=['model', 'test_set'])

    # test and visualize our model
    test = funcs["test"].as_step(
        name="test",
        params={"label_column": LABELS},
        inputs={"models_path" : train.outputs['model'],
                "test_set"    : train.outputs['test_set']})

    # deploy our model as a serverless function
    deploy = funcs["serving"].deploy_step(models={f"{DATASET}_v1": train.outputs['model']},
                                          tag=this_project.params.get('commit', 'v1'))

    # test out new model server (via REST API calls)
    tester = funcs["live_tester"].as_step(name='model-tester',
        params={'addr': deploy.outputs['endpoint'], 'model': f"{DATASET}_v1"},
        inputs={'table': train.outputs['test_set']})


Overwriting ./workflow.py


In [18]:
# register the workflow file as "main", embed the workflow code into the project YAML
skproj.set_workflow('main', 'workflow.py')

Save the project definitions to a file (project.yaml), it is recommended to commit all changes to a Git repo.

In [22]:
skproj.artifact_path = 'v3io:///users/{{run.user}}/pipe/{{workflow.uid}}'
skproj.save()

<a id='run-pipeline'></a>
## Run a pipeline workflow manually (not via git PR)

<b>This section is not used for the git automation, rather demo how to run the workflow from the notebook</b>

use the `run` method to execute a workflow, you can provide alternative arguments and specify the default target for workflow artifacts.<br>
The workflow ID is returned and can be used to track the progress or you can use the hyperlinks

> Note: The same command can be issued through CLI commands:<br>
    `mlrun project my-proj/ -r main -p "v3io:///users/{{run.user}}/mlrun/kfp/{{workflow.uid}}/"`

The `dirty` flag allow us to run a project with uncommited changes (when the notebook is in the same git dir it will always be dirty)<br>
The `watch` flag will wait for the pipeline to complete

In [23]:
# If you want to get slack notification after the run with result summary, set the env var below
# %env SLACK_WEBHOOK=<slack webhooh url>

In [24]:
run_id = skproj.run(
    'main', arguments={}, 
    dirty=True, watch=True)

> 2020-07-29 13:04:18,155 [info] Pipeline run id=8f462295-2154-428a-b861-4ec8be504832, check UI or DB for progress
> 2020-07-29 13:04:18,156 [info] waiting for pipeline run completion


uid,start,state,name,results,artifacts
...7a29c0aa,Jul 29 13:05:03,completed,model-tester,total_tests=15errors=0match=14avg_latency=11446min_latency=11047max_latency=12131,latency
...79bfc3ac,Jul 29 13:04:54,completed,test,accuracy=0.9333333333333333test-error=0.06666666666666667auc-micro=0.9655555555555556auc-weighted=0.9888888888888889f1-score=0.9137254901960784precision_score=0.8888888888888888recall_score=0.9629629629629629,confusion-matrixfeature-importancesprecision-recall-multiclassroc-multiclasstest_set_preds
...1ecf6eb3,Jul 29 13:04:37,completed,summary,,histogramsviolinimbalanceimbalance-weights-veccorrelation-matrixcorrelation
...60321e1e,Jul 29 13:04:36,completed,train,best_iteration=1accuracy=0.9705882352941176test-error=0.029411764705882353auc-micro=0.9969723183391004auc-weighted=0.9949732620320856f1-score=0.9679633867276888precision_score=0.9666666666666667recall_score=0.9722222222222222,test_setconfusion-matrixfeature-importancesprecision-recall-multiclassroc-multiclassmodeliteration_results
...b05fce96,Jul 29 13:04:26,completed,get-data,,iris_dataset


**[back to top](#top)**