# Define and run a distributed training pipeline

In this notebook we use **MLRun** to run the functions written in the [mlrun-mpijob-classify](mlrun_mpijob_classify.ipynb) and [nuclio-serving-tf-images](nuclio-serving-tf-images.ipynb) notebooks as a **Kubeflow Pipeline**.

**Kubeflow Pipelines** supplies the orchestration to run the pipeline, while **MLRun** supplies an easy interface to define the pipeline and launch the serving function at the end.

We will show how to:
* Run remote functions from notebooks using `code_to_function`
* Run saved functions from our DB using `import_function`
* Define and launch a Kubeflow Pipeline
* Access the DB from code and list the pipeline's entries

In [1]:
# nuclio: ignore
import nuclio

In [2]:
from mlrun import new_function, code_to_function, get_run_db, mount_v3io, mlconf, new_model_server, v3io_cred, import_function
import os
 
mlconf.dbpath = 'http://mlrun-api:8080'

In [3]:
base_dir = '/User/mlrun/examples'
images_path = os.path.join(base_dir, 'images')
model_name = 'cat_vs_dog_v1'

## Import and define ML functions for our pipeline (utils, training, serving)

Using `code_to_function`, we parse the given Python file and build a function from it.

In [4]:
# Build a function from the python source code
# The code contains functions for data import and labeling 
utilsfn = code_to_function(name='file_utils', filename='../src/utils.py',
                           image='mlrun/mlrun:latest', kind='job')

# Add mount access to the function
utilsfn.apply(mount_v3io())

# Build the function image, so we can later deploy it as a job via kubeflow
utilsfn.deploy()

[mlrun] 2020-02-27 17:14:11,337 running build to add mlrun package, set with_mlrun=False to skip if its already in the image
[mlrun] 2020-02-27 17:14:11,362 starting remote build, image: .mlrun/func-default-file-utils-latest


True

In [5]:
utilsfn.export('../yaml/utils.yaml')

[mlrun] 2020-02-27 17:14:11,620 function spec saved to path: ../yaml/utils.yaml


<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f59b501deb8>

Using `import_function`, we import the Horovod training function from our DB.  
As we can see, all the function deployment parameters were saved, such as replicas, GPU configuration, mounts, runtime, and the code source.

In [6]:
# read the training function object from MLRun DB
trainer_fn = import_function('db://horovod-trainer')
# trainer_fn.deploy()
trainer_fn.to_dict()

{'kind': 'mpijob',
 'metadata': {'name': 'horovod-trainer',
  'tag': 'latest',
  'hash': 'ca9b2f2453afddcb69c1cc7360687249062abb96',
  'project': 'default',
  'updated': '2020-02-27T16:24:48.358216'},
 'spec': {'command': '/User/demo-image-classification/src/horovod-training.py',
  'args': [],
  'image': 'mlrun/mpijob:latest',
  'volumes': [{'flexVolume': {'driver': 'v3io/fuse',
     'options': {'accessKey': '48bfe2e0-9870-4d11-9924-b20502a85363',
      'container': 'users',
      'subPath': '/admin'}},
    'name': 'v3io'}],
  'volume_mounts': [{'mountPath': '/User', 'name': 'v3io'}],
  'env': [{'name': 'V3IO_API', 'value': ''},
   {'name': 'V3IO_USERNAME', 'value': ''},
   {'name': 'V3IO_ACCESS_KEY', 'value': ''}],
  'description': '',
  'replicas': 4,
  'image_pull_policy': 'Always',
  'build': {'commands': []}}}
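To make the saved deployment parameters easier to scan, the dictionary can be reduced to a few fields. The following is a minimal sketch, not part of MLRun: `summarize_spec` is a hypothetical helper, and `example` is a trimmed copy of the `to_dict()` output above so that the snippet runs on its own.

```python
# Hypothetical helper (not an MLRun API): pick the main deployment
# parameters out of a function dict shaped like the to_dict() output above.
def summarize_spec(fn_dict):
    spec = fn_dict.get('spec', {})
    meta = fn_dict.get('metadata', {})
    return {
        'name': meta.get('name'),
        'kind': fn_dict.get('kind'),
        'image': spec.get('image'),
        'replicas': spec.get('replicas'),
        # Collect only the mount paths from the volume mount entries
        'mounts': [m['mountPath'] for m in spec.get('volume_mounts', [])],
    }

# Trimmed copy of the output above, so the example is self-contained
example = {
    'kind': 'mpijob',
    'metadata': {'name': 'horovod-trainer', 'tag': 'latest'},
    'spec': {'image': 'mlrun/mpijob:latest',
             'volume_mounts': [{'mountPath': '/User', 'name': 'v3io'}],
             'replicas': 4},
}

summary = summarize_spec(example)
print(summary)
```

In the notebook itself, the same helper could presumably be applied to `trainer_fn.to_dict()` directly.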

Passing `filename=<jupyter notebook file>` to `new_model_server` parses the given Jupyter notebook and builds our model server from it.

> All the annotations in the notebook are parsed and saved to the function as usual.

The model server will deploy the model given under `models={<model_name>:<model_file_path>}` as `model_class=<model_class_name>`.  
Just like any other MLRun function we can set our environment variables, workers and add mounts.

The model server will provide us with a `/<model_name>/predict` endpoint where we can query the model.
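As a rough illustration of the endpoint layout, the sketch below only builds the request URL and a JSON body, without sending anything. The host address is made up, `build_predict_request` is a hypothetical helper, and the `instances` key is an assumption about the payload: the body the server actually expects depends on the model class defined in the serving notebook.

```python
import json

# Hypothetical helper: compose the URL and body for a predict call.
# The {'instances': [...]} body shape is an assumption, not confirmed
# by this notebook; check the model class for the real format.
def build_predict_request(base_url, model_name, instances):
    url = f"{base_url.rstrip('/')}/{model_name}/predict"
    body = json.dumps({'instances': instances})
    return url, body

# 'http://nuclio-host:31000' is a placeholder address, not a real endpoint
url, body = build_predict_request('http://nuclio-host:31000/',
                                  'cat_vs_dog_v1',
                                  ['cat.jpg'])
print(url)
print(body)
```

The resulting pair could then be sent with any HTTP client once the function is deployed and its address is known.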

In [7]:
# inference function
inference_function = new_model_server('tf-images-server', 
                                      filename='./nuclio-serving-tf-images.ipynb',
                                      model_class='TFModel')
inference_function.with_http(workers=2)
inference_function.apply(mount_v3io())

<mlrun.runtimes.function.RemoteRuntime at 0x7f59b4fd4f98>

## Create and run the pipeline

In this part we define the Kubeflow Pipeline to run our process.  
MLRun makes this easy: calling `<fn>.as_step()` turns a function into a Kubeflow pipeline step. Parameters and inputs are then set as usual and are deployed as defined in the pipeline.  

The pipeline order is defined by the following:
* We can specify `<fn>.after(<previous fn>)`
* We can specify that a function takes a parameter or input from a previous function.  
  For example: `models={'cat_vs_dog_v1': train.outputs['model']}` in the inference function definition takes the model file from the training function.
  
Note that you need to call `log_artifact` in your function and list the artifact's name in the step's `outputs` parameter to expose it to the pipeline for later use.
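The two ordering rules above can be pictured with a toy model. This is only a sketch of the idea, not how Kubeflow Pipelines or MLRun implement it: the `Step` class is a hypothetical stand-in, and the step names mirror the ones used in this notebook's pipeline.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

class Step:
    """Toy stand-in for a pipeline step (not the KFP or MLRun class)."""

    def __init__(self, name, inputs=None):
        self.name = name
        self.deps = set()
        # Consuming another step's output creates an implicit dependency
        for source_step in (inputs or {}).values():
            self.deps.add(source_step.name)

    def after(self, *steps):
        # Explicit ordering, analogous to <fn>.after(<previous fn>)
        self.deps.update(s.name for s in steps)
        return self

download = Step('download')
label = Step('label').after(download)                     # explicit order
train = Step('train', inputs={'categories_map': label})   # data dependency
deploy = Step('deploy', inputs={'model': train})          # data dependency

# Map each step to the steps it depends on, then sort topologically
graph = {s.name: s.deps for s in (download, label, train, deploy)}
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['download', 'label', 'train', 'deploy']
```

Because every step here depends on exactly one predecessor, there is only one valid execution order; with independent branches, several orders would be valid and the orchestrator could run those branches in parallel.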

In [8]:
import kfp
from kfp import dsl

In [9]:
artifacts_path = 'v3io:///users/admin/mlrun/kfp/{{workflow.uid}}/'

In [10]:
@dsl.pipeline(
    name='Image classification training pipeline',
    description='Shows how to use mlrun with horovod.'
)
def hvd_pipeline(
    image_archive=      'http://iguazio-sample-data.s3.amazonaws.com/catsndogs.zip',
    images_path =       '/User/mlrun/examples/images', 
    source_dir=         '/User/mlrun/examples/images/cats_n_dogs',
    checkpoints_dir=    '/User/mlrun/examples/checkpoints',
    model_path=         '/User/mlrun/examples/models/cats_n_dogs.h5',
    model_name=         'cat_vs_dog_v1'
):
    open_archive = utilsfn.as_step(name='download',
                                   handler='open_archive',
                                   out_path=images_path,
                                   params={'target_dir': images_path},
                                   inputs={'archive_url': image_archive},
                                   outputs=['content']).apply(mount_v3io())
              
    label = utilsfn.as_step(name='label',
                            handler='categories_map_builder',
                            out_path=images_path,
                            params={'source_dir': source_dir},
                            outputs=['categories_map',
                                     'file_categories']).apply(mount_v3io()).after(open_archive)
    
    train = trainer_fn.as_step(name='train',
                               params={'epochs': 1,
                                       'checkpoints_dir': checkpoints_dir,
                                       'model_path': model_path,
                                       'data_path': source_dir},
                               inputs={
                                   'categories_map': label.outputs['categories_map'],
                                   'file_categories': label.outputs['file_categories']},
                               outputs=['model']).apply(v3io_cred())

    # deploy the model using nuclio functions
    deploy = inference_function.deploy_step(project='nuclio-serving', 
                                            models={model_name: train.outputs['model']})

In [11]:
# For debugging: compile the pipeline DSL to a YAML file
kfp.compiler.Compiler().compile(hvd_pipeline, 'hvd_pipeline.yaml')

In [12]:
client = kfp.Client(namespace='default-tenant')
arguments = {}
run_result = client.create_run_from_pipeline_func(hvd_pipeline, arguments, experiment_name='horovod1')

In [None]:
# connect to the run db 
db = get_run_db().connect()

In [None]:
# query the DB with filter on workflow ID (only show this workflow) 
db.list_runs('', labels=f'workflow={run_result.run_id}').show()
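The `labels` argument above is a `key=value` selector. The sketch below is a toy illustration of that kind of filtering on plain dictionaries. It is not the MLRun DB implementation, and the run records and workflow IDs are invented for the example.

```python
# Toy label filter (hypothetical, not the MLRun DB code): keep only the
# run records whose labels contain the requested key=value pair.
def filter_by_label(runs, selector):
    key, _, value = selector.partition('=')
    return [r for r in runs if r.get('labels', {}).get(key) == value]

# Made-up run records for illustration
runs = [
    {'name': 'download', 'labels': {'workflow': 'abc-123'}},
    {'name': 'train', 'labels': {'workflow': 'abc-123'}},
    {'name': 'other', 'labels': {'workflow': 'zzz-999'}},
]

matched = filter_by_label(runs, 'workflow=abc-123')
print([r['name'] for r in matched])
```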