# Demonstrate Local Or Remote Functions And Full Pipelines
  --------------------------------------------------------------------

This notebook shows how to create a local function, run predefined library functions, and create and run a full ML pipeline that combines local and library functions.

#### **Notebook how-tos**
* Create and test a simple function
* Examine data using the serverless (containerized) `describe` function
* Create an automated ML pipeline from various library functions
* Run and track the pipeline results and artifacts

## Create and Test a Local Function (Iris Data Generator)
Import the nuclio SDK and magics. <b>Do not remove this cell or its comment!</b>

In [1]:
# nuclio: ignore
import nuclio

<b>Specify function dependencies and configuration</b>

In [2]:
%%nuclio cmd -c
pip install scikit-learn
pip install pyarrow

In [3]:
%nuclio config spec.build.baseImage = "mlrun/mlrun"

%nuclio: setting spec.build.baseImage to 'mlrun/mlrun'


#### Function code
Generate the iris dataset and log the dataframe as a CSV or Parquet file.

In [4]:
import pandas as pd
from sklearn.datasets import load_iris


def iris_generator(context, format='csv'):
    """Generate the iris dataset and log it as an MLRun dataset artifact."""
    iris = load_iris()
    # feature columns plus a 'label' column holding the class index
    iris_dataset = pd.DataFrame(data=iris.data, columns=iris.feature_names)
    iris_labels = pd.DataFrame(data=iris.target, columns=['label'])
    iris_dataset = pd.concat([iris_dataset, iris_labels], axis=1)

    context.logger.info('saving iris dataframe to {}'.format(context.artifact_path))
    # format is 'csv' by default; the pipeline below passes 'pq' (Parquet)
    context.log_dataset('iris_dataset', df=iris_dataset, format=format, index=False)


The following end-code annotation tells `nuclio` to stop parsing the notebook from this cell. _**Please do not remove this cell**_:

In [5]:
# nuclio: end-code
# marks the end of a code section
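
To make the role of the `# nuclio: ignore` and `# nuclio: end-code` annotations concrete, here is a simplified sketch of how a notebook-to-function converter could select code cells. It is an illustration under our own assumptions (cells given as plain strings, annotations checked on the first non-blank line, and a hypothetical `collect_function_code` helper), not nuclio-jupyter's actual parser, which also handles magics such as `%nuclio` and `%%nuclio`.

```python
from typing import List

IGNORE_MARK = "# nuclio: ignore"
END_MARK = "# nuclio: end-code"


def collect_function_code(cells: List[str]) -> str:
    """Hypothetical helper: join the code cells that would become the function.

    Simplified rules assumed for illustration:
    - a cell whose first non-blank line is '# nuclio: ignore' is skipped;
    - a cell whose first non-blank line is '# nuclio: end-code' stops the scan,
      so it and every later cell are excluded.
    """
    kept = []
    for source in cells:
        stripped = source.strip()
        first_line = stripped.splitlines()[0] if stripped else ""
        if first_line == END_MARK:
            break  # nothing after the end-code cell is part of the function
        if first_line == IGNORE_MARK:
            continue  # notebook-only cell, e.g. the nuclio import
        kept.append(source)
    return "\n".join(kept)


cells = [
    "# nuclio: ignore\nimport nuclio",
    "def iris_generator(context, format='csv'):\n    ...",
    "# nuclio: end-code\n# marks the end of a code section",
    "gen = run_local(...)",
]
print(collect_function_code(cells))
```

This is why removing the end-code cell would pull the later experiment code (such as the `run_local` call) into the function.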

## Create a project to host our functions, jobs and artifacts

Projects are used to package multiple functions, workflows, and artifacts. Project code and definitions are usually stored in a Git repository.

The following code creates a new project in a local directory and initializes Git tracking in it.

In [6]:
from os import path
from mlrun import run_local, NewTask, mlconf, import_function, mount_v3io
mlconf.dbpath = mlconf.dbpath or 'http://mlrun-api:8080'

# specify artifacts target location
artifact_path = mlconf.artifact_path or path.abspath('./')
project_name = 'sk-project'

In [7]:
from mlrun import new_project, code_to_function
project_dir = './project'
skproj = new_project(project_name, project_dir, init_git=True)

<a id='test-locally'></a>
### Run the data generator function locally

The functions above can be tested locally. Parameters, inputs, and outputs can be specified in the API or in the `Task` object.<br>
When using `run_local()`, the function inputs and outputs are automatically recorded in the MLRun experiment and data tracking database.

In each run we can specify the function, inputs, parameters/hyper-parameters, etc. For more details, see the [mlrun_basics notebook](mlrun_basics.ipynb).
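
Because a handler receives the run context as its first argument, its logic can also be exercised without MLRun, for example in a unit test. The sketch below assumes a hypothetical `StubContext` class that mimics only the members `iris_generator` uses (`logger`, `artifact_path`, and `log_dataset`); it is not part of the MLRun API. A hypothetical `toy_generator` with a tiny inline table of toy values stands in for the scikit-learn iris data so the example runs with the standard library alone.

```python
import logging
from typing import Dict, List


class StubContext:
    """Hypothetical stand-in for the MLRun run context (not an MLRun class)."""

    def __init__(self, artifact_path: str = "./data"):
        self.artifact_path = artifact_path
        self.logger = logging.getLogger("stub-context")
        self.datasets: Dict[str, dict] = {}  # records what the handler logged

    def log_dataset(self, key, df, format="csv", **kwargs):
        # Keep the dataset in memory instead of writing an artifact
        self.datasets[key] = {"df": df, "format": format, **kwargs}


def toy_generator(context, format="csv"):
    """Hypothetical handler with the same shape as iris_generator."""
    rows: List[dict] = [
        {"sepal length (cm)": 5.1, "label": 0},
        {"sepal length (cm)": 7.0, "label": 1},
    ]
    context.logger.info("saving dataframe to {}".format(context.artifact_path))
    context.log_dataset("toy_dataset", df=rows, format=format, index=False)


ctx = StubContext()
toy_generator(ctx, format="parquet")
print(ctx.datasets["toy_dataset"]["format"])  # parquet
```

In a real run, `run_local()` supplies MLRun's own context object instead, which also records the run's inputs, outputs, and artifacts.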

In [9]:
# run the function locally
gen = run_local(name='iris_gen', handler=iris_generator, 
                project=project_name, artifact_path=path.join(artifact_path, 'data')) 

> 2020-07-28 18:58:15,444 [info] starting run iris_gen uid=077d5a908ed94d41b546a3afba6e13f5  -> http://mlrun-api:8080
> 2020-07-28 18:58:15,532 [info] saving iris dataframe to /User/workshop/mlrun/sklearn-pipe/data


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| sk-project | ...ba6e13f5 | 0 | Jul 28 18:58:15 | completed | iris_gen | v3io_user=admin, kind=handler, owner=admin, host=jupyter-7cd6869d6d-kqrtd | | | | iris_dataset |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 077d5a908ed94d41b546a3afba6e13f5 --project sk-project , !mlrun logs 077d5a908ed94d41b546a3afba6e13f5 --project sk-project
> 2020-07-28 18:58:15,664 [info] run executed, status=completed


#### Convert our local code to a distributed serverless function object 

In [10]:
gen_func = code_to_function(name='gen_iris', kind='job')
skproj.set_function(gen_func)

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f5d2fdb0e48>

## Load and run a library function (visualize dataset features and stats)

<b>Step 1:</b> Load the function object from the function hub (marketplace).<br>
> Note: The function marketplace location is configurable; by default it points to the `mlrun/functions` Git repository.
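
As a rough illustration of how a `hub://` URI can be mapped to a function definition in that repository, here is a sketch with hypothetical helpers. The URL template, the optional `:tag` suffix, and the default tag `master` are assumptions modeled on the `mlrun/functions` repository layout, not MLRun's documented resolution logic; since the marketplace location is configurable, the real URL may differ.

```python
from typing import Tuple

# Assumed URL template (illustration only); the real marketplace location is configurable.
HUB_TEMPLATE = "https://raw.githubusercontent.com/mlrun/functions/{tag}/{name}/function.yaml"


def parse_hub_uri(uri: str, default_tag: str = "master") -> Tuple[str, str]:
    """Hypothetical helper: split 'hub://name[:tag]' into (name, tag)."""
    prefix = "hub://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a hub URI: {uri!r}")
    name, _, tag = uri[len(prefix):].partition(":")
    return name, tag or default_tag


def hub_uri_to_url(uri: str) -> str:
    """Hypothetical helper: build the assumed raw-file URL for a hub function."""
    name, tag = parse_hub_uri(uri)
    return HUB_TEMPLATE.format(name=name, tag=tag)


print(hub_uri_to_url("hub://describe"))
# https://raw.githubusercontent.com/mlrun/functions/master/describe/function.yaml
```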


In [11]:
skproj.set_function('hub://describe', 'describe')

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f5d3806e780>

In [12]:
# read the remote function doc, params, usage
skproj.func('describe').doc()
#skproj.func('describe').spec.image_pull_policy = 'Always'

function: describe
describe and visualizes dataset stats
default handler: summarize
entry points:
  summarize: Summarize a table
    context(MLClientCtx)  - the function context, default=
    table(DataItem)  - MLRun input pointing to pandas dataframe (csv/parquet file path), default=
    label_column(str)  - ground truth column label, default=None
    class_labels(List[str])  - label for each class in tables and plots, default=[]
    plot_hist(bool)  - (True) set this to False for large tables, default=True
    plots_dest(str)  - destination folder of summary plots (relative to artifact_path), default=plots
    update_dataset  - when the table is a registered dataset update the charts in-place, default=False
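
When `label_column` is set, the `describe` run also logs `imbalance` and `imbalance-weights-vec` artifacts. As a rough illustration of what a class-imbalance summary computes, here is a standard-library sketch that counts labels and derives inverse-frequency class weights, `n_samples / (n_classes * count)`, the formula scikit-learn uses for `class_weight='balanced'`. Whether the hub function uses exactly this weighting is an assumption, and `class_balance` is a hypothetical helper.

```python
from collections import Counter
from typing import Dict, Iterable


def class_balance(labels: Iterable) -> Dict[str, dict]:
    """Hypothetical helper: per-class counts and inverse-frequency weights."""
    counts = Counter(labels)
    n_samples = sum(counts.values())
    n_classes = len(counts)
    # rarer classes get larger weights; a perfectly balanced set gives 1.0 everywhere
    weights = {c: n_samples / (n_classes * k) for c, k in counts.items()}
    return {"counts": dict(counts), "weights": weights}


# Toy labels: class 0 is over-represented, so its weight falls below 1
summary = class_balance([0, 0, 0, 1, 1, 2])
print(summary["weights"])
```

The iris dataset has 50 samples per class, so every weight would be 1.0 there.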


<b>Step 2:</b> Run the describe function as a Kubernetes job with specified parameters.

> `mount_v3io()` connects our function to the V3IO shared file system, which lets us pass the data to the function and get the results (plots) back directly in our notebook. We can choose other mount options to use NFS or object storage.

In [13]:
skproj.func('describe').apply(mount_v3io()).run(params={'label_column': 'label'}, 
                                                inputs={"table": gen.outputs['iris_dataset']}, 
                                                artifact_path=artifact_path)

> 2020-07-28 18:58:26,152 [info] starting run describe-summarize uid=8f1e2c52b3af48a5af34e8cc368d84ba  -> http://mlrun-api:8080
> 2020-07-28 18:58:26,648 [info] Job is running in the background, pod: describe-summarize-p8vgd
> 2020-07-28 18:59:15,125 [info] run executed, status=completed
final state: succeeded


| project | uid | iter | start | state | name | labels | inputs | parameters | results | artifacts |
|---|---|---|---|---|---|---|---|---|---|---|
| sk-project | ...368d84ba | 0 | Jul 28 18:59:10 | completed | describe-summarize | v3io_user=admin, kind=job, owner=admin, host=describe-summarize-p8vgd | table | label_column=label | | histograms, violin, imbalance, imbalance-weights-vec, correlation-matrix, correlation |


to track results use .show() or .logs() or in CLI: 
!mlrun get run 8f1e2c52b3af48a5af34e8cc368d84ba --project sk-project , !mlrun logs 8f1e2c52b3af48a5af34e8cc368d84ba --project sk-project
> 2020-07-28 18:59:19,017 [info] run executed, status=completed


<mlrun.model.RunObject at 0x7f5d3a8e4f98>

## Create a Fully Automated ML Pipeline

#### Add more functions to our project to be used in our pipeline (from the functions hub/marketplace)

These are AutoML training (`sklearn_classifier`), model validation (`test_classifier`), a real-time model server (`model_server`), and a model REST API tester (`model_server_tester`).

In [14]:
skproj.set_function('hub://sklearn_classifier', 'train')
skproj.set_function('hub://test_classifier', 'test')
skproj.set_function('hub://model_server', 'serving')
skproj.set_function('hub://model_server_tester', 'live_tester')
#print(skproj.to_yaml())

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f5d2fd2e4a8>

#### Define and save a pipeline 

The following workflow definition is written to a file. It describes a Kubeflow execution graph (DAG) and how functions and data are connected to form an end-to-end pipeline:

* Build the iris generator (ingest) function container 
* Ingest the iris data
* Analyze the dataset (describe)
* Train and test the model
* Deploy the model as a real-time serverless function
* Test the serverless function REST API with test dataset
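
Kubeflow derives the execution order from the data connections between steps: a step runs once the steps whose outputs it consumes have finished. The sketch below models the pipeline in the `workflow.py` code below as a dependency graph and prints one valid order using Python's `graphlib` (standard library, Python 3.9+). It illustrates DAG ordering only, not how Kubeflow schedules steps; the labels `build` and `deploy` are our own names for the two `deploy_step` calls, which the workflow does not name.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each step maps to the steps whose outputs it consumes, mirroring workflow.py
# ("build" and "deploy" are hypothetical labels for the two deploy_step calls).
pipeline = {
    "build": set(),
    "get-data": {"build"},                     # uses builder.outputs['image']
    "summary": {"get-data"},                   # uses ingest.outputs[DATASET]
    "train-skrf": {"get-data"},                # uses ingest.outputs[DATASET]
    "test": {"train-skrf"},                    # uses the model and test_set
    "deploy": {"train-skrf"},                  # serves train.outputs['model']
    "model-tester": {"deploy", "train-skrf"},  # endpoint + test_set
}

order = list(TopologicalSorter(pipeline).static_order())
print(order)
```

Any order that respects the edges is valid; `summary` and `train-skrf` do not depend on each other, which is what allows them to run in parallel.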

Check the code below to see how function objects are initialized and used (by name) inside the workflow.<br>
The `workflow.py` file has two parts: initializing the function objects (`init_functions`) and defining the pipeline DSL that connects the function inputs and outputs (`kfpipeline`).

> Note: The pipeline can include CI steps, such as building container images and deploying models, as illustrated in the following example.


In [15]:
%%writefile project/workflow.py
from kfp import dsl
from mlrun import mount_v3io

funcs = {}
DATASET = 'iris_dataset'
LABELS  = "label"


# init functions is used to configure function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    for f in functions.values():
        f.apply(mount_v3io())
     
    # uncomment this line to collect the inference results into a stream
    # and specify a path in V3IO (<datacontainer>/<subpath>)
    #functions['serving'].set_env('INFERENCE_STREAM', 'users/admin/model_stream')

    
@dsl.pipeline(
    name="Demo training pipeline",
    description="Shows how to use mlrun."
)
def kfpipeline():
    
    # build our ingestion function (container image)
    builder = funcs['gen-iris'].deploy_step(skip_deployed=True)
    
    # run the ingestion function with the new image and params
    ingest = funcs['gen-iris'].as_step(
        name="get-data",
        handler='iris_generator',
        image=builder.outputs['image'],
        params={'format': 'pq'},
        outputs=[DATASET])

    # analyze our dataset
    describe = funcs["describe"].as_step(
        name="summary",
        params={"label_column": LABELS},
        inputs={"table": ingest.outputs[DATASET]})
    
    # train with hyper-parameters
    train = funcs["train"].as_step(
        name="train-skrf",
        params={"sample"          : -1, 
                "label_column"    : LABELS,
                "test_size"       : 0.10},
        hyperparams={'model_pkg_class': ["sklearn.ensemble.RandomForestClassifier", 
                                         "sklearn.linear_model.LogisticRegression",
                                         "sklearn.ensemble.AdaBoostClassifier"]},
        selector='max.accuracy',
        inputs={"dataset"         : ingest.outputs[DATASET]},
        outputs=['model', 'test_set'])

    # test and visualize our model
    test = funcs["test"].as_step(
        name="test",
        params={"label_column": LABELS},
        inputs={"models_path" : train.outputs['model'],
                "test_set"    : train.outputs['test_set']})

    # deploy our model as a serverless function
    deploy = funcs["serving"].deploy_step(models={f"{DATASET}_v1": train.outputs['model']}, tag='v2')
    
    # test our new model server (via REST API calls)
    tester = funcs["live_tester"].as_step(name='model-tester',
        params={'addr': deploy.outputs['endpoint'], 'model': f"{DATASET}_v1"},
        inputs={'table': train.outputs['test_set']})


Overwriting project/workflow.py
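
The `train` step passes a list of model classes in `hyperparams` together with `selector='max.accuracy'`, so each model class is tried in its own child iteration and the iteration with the highest `accuracy` result is reported as the best. The following standard-library sketch illustrates that grid-expansion-and-selection idea; `expand_grid` and `select_best` are hypothetical helpers, numbering iterations from 1 is an assumption, and the accuracy values passed to the selector are placeholders, not real results.

```python
from itertools import product
from typing import Dict, List


def expand_grid(hyperparams: Dict[str, list]) -> List[dict]:
    """Hypothetical helper: every combination of the hyper-parameter lists."""
    keys = list(hyperparams)
    return [dict(zip(keys, values)) for values in product(*hyperparams.values())]


def select_best(results: List[dict], selector: str) -> int:
    """Hypothetical helper: pick an iteration with a 'max.<key>' or 'min.<key>' selector.

    Iterations are numbered from 1 here (an assumption for illustration).
    """
    op, _, key = selector.partition(".")
    if op not in ("max", "min"):
        raise ValueError(f"unsupported selector: {selector!r}")
    pick = max if op == "max" else min
    best = pick(range(len(results)), key=lambda i: results[i][key])
    return best + 1


grid = expand_grid({"model_pkg_class": [
    "sklearn.ensemble.RandomForestClassifier",
    "sklearn.linear_model.LogisticRegression",
    "sklearn.ensemble.AdaBoostClassifier",
]})
print(len(grid))  # 3, one iteration per model class

# Placeholder scores, only to exercise the selector
print(select_best([{"accuracy": 0.90}, {"accuracy": 0.95}, {"accuracy": 0.93}], "max.accuracy"))  # 2
```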


In [16]:
# register the workflow file as "main", embed the workflow code into the project YAML
skproj.set_workflow('main', 'workflow.py', embed=True)

Save the project definitions to a file (project.yaml). We recommend committing all changes to a Git repository.

In [17]:
skproj.save()

<a id='run-pipeline'></a>
## Run a pipeline workflow
Use the `run` method to execute a workflow. You can provide alternative arguments and specify the default target for workflow artifacts.<br>
The method returns the workflow ID, which can be used to track the progress; alternatively, use the hyperlinks in the run output.

> Note: The same command can be issued through CLI commands:<br>
    `mlrun project my-proj/ -r main -p "v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/"`
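
The `{{workflow.uid}}` fragment in the artifact path is a placeholder that the workflow engine fills in at run time, so each pipeline run writes its artifacts to a folder of its own. The sketch below only illustrates the substitution, using a hypothetical `resolve_artifact_path` helper and a made-up ID; it does not reproduce how Kubeflow actually resolves templates.

```python
import re

# Matches '{{workflow.uid}}', tolerating spaces inside the braces
PLACEHOLDER = re.compile(r"\{\{\s*workflow\.uid\s*\}\}")


def resolve_artifact_path(template: str, workflow_uid: str) -> str:
    """Hypothetical helper: substitute a run's UID into a path template."""
    # a function replacement avoids treating backslashes in the UID as escapes
    return PLACEHOLDER.sub(lambda _match: workflow_uid, template)


template = "v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/"
print(resolve_artifact_path(template, "1234-abcd"))
# v3io:///users/admin/mlrun/kfp/1234-abcd/
```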

The `dirty` flag allows us to run a project with uncommitted changes (when the notebook is in the same Git directory, the project is always dirty).<br>
The `watch` flag waits for the pipeline to complete and prints the results.
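
A Git working tree is "dirty" when it has uncommitted changes: `git status --porcelain` prints one line per modified or untracked file and nothing when the tree is clean, so a notebook saved inside the project repository keeps it dirty. The sketch below shows one way to perform that check; `is_dirty` and `repo_is_dirty` are hypothetical helpers and not how MLRun itself implements the `dirty` flag, and the file names in the example output are made up.

```python
import subprocess


def is_dirty(porcelain_output: str) -> bool:
    """Any non-empty line from `git status --porcelain` means uncommitted changes."""
    return any(line.strip() for line in porcelain_output.splitlines())


def repo_is_dirty(repo_dir: str = ".") -> bool:
    """Run git in repo_dir and report whether its working tree is dirty (requires git)."""
    out = subprocess.run(
        ["git", "status", "--porcelain"],
        cwd=repo_dir, capture_output=True, text=True, check=True,
    ).stdout
    return is_dirty(out)


# Example porcelain output: a modified notebook and an untracked file
print(is_dirty(" M demo.ipynb\n?? project/workflow.py\n"))  # True
print(is_dirty(""))  # False
```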

In [18]:
artifact_path = path.abspath('./pipe/{{workflow.uid}}')
run_id = skproj.run(
    'main',
    arguments={}, 
    artifact_path=artifact_path, 
    dirty=True, watch=True)

> 2020-07-28 18:59:36,423 [info] using in-cluster config.


> 2020-07-28 18:59:37,043 [info] Pipeline run id=417e8dc4-3794-4597-863a-df42c080d2e5, check UI or DB for progress
> 2020-07-28 18:59:37,044 [info] waiting for pipeline run completion


| uid | start | state | name | results | artifacts |
|---|---|---|---|---|---|
| ...798bfee3 | Jul 28 19:11:40 | completed | model-tester | total_tests=15, errors=0, match=14, avg_latency=4610, min_latency=4212, max_latency=6914 | latency |
| ...d35b3c1a | Jul 28 19:11:15 | completed | test | accuracy=0.9333333333333333, test-error=0.06666666666666667, auc-micro=0.9977777777777778, auc-weighted=1.0, f1-score=0.9137254901960784, precision_score=0.8888888888888888, recall_score=0.9629629629629629 | confusion-matrix, precision-recall-multiclass, roc-multiclass, test_set_preds |
| ...3ff5d26d | Jul 28 19:10:58 | completed | train-skrf | best_iteration=2, accuracy=0.9705882352941176, test-error=0.029411764705882353, auc-micro=0.9961072664359862, auc-weighted=0.9949732620320856, f1-score=0.9679633867276888, precision_score=0.9666666666666667, recall_score=0.9722222222222222 | test_set, confusion-matrix, precision-recall-multiclass, roc-multiclass, model, iteration_results |
| ...15634845 | Jul 28 19:11:01 | completed | summary | | histograms, violin, imbalance, imbalance-weights-vec, correlation-matrix, correlation |
| ...d231b97b | Jul 28 19:09:22 | completed | get-data | | iris_dataset |

