# Using MLRun functions locally, as a Kubernetes Job, and in a Workflow
  --------------------------------------------------------------------

#### **notebook how-to's**
* Write and test code in a notebook.
* Convert it to a containerized image.
* Run it on a Kubernetes cluster with shared file or object storage.
* Run it in an automated workflow.

<a id='top'></a>
#### **steps**
**[install mlrun](#install)**<br>
**[define a new function and its dependencies](#define-function)**<br>
**[test the function code and pipeline locally](#test-locally)**<br>
**[define cluster jobs and build images](#build)**<br>
**[deploy (build) the function container](#deploy-build)**<br>
**[run the function on the cluster](#run-on-cluster)**<br>
**[create and run a KubeFlow Pipeline](#create-pipeline)**<br>

<a id="install" ></a>
______________________________________________
### **install mlrun**

In [1]:
# Uncomment the line below to install the mlrun package, then restart the kernel

# !pip install -U mlrun

In [2]:
# set the UI external URL (will generate ui hyperlinks)
# %env MLRUN_UI_URL=http://<mlrun-ui-url>:<port>

______________________________________________

<a id='define-function'></a>
### **define a new function and its dependencies**

In [3]:
# nuclio: ignore
# do not remove the comment above (it directs nuclio to ignore this cell during build)
# if the nuclio-jupyter package is not installed, run !pip install nuclio-jupyter and restart the kernel
import nuclio 

We use `%nuclio` magic commands to set package dependencies and configuration:

In [4]:
%nuclio cmd -c pip install pandas
%nuclio config spec.build.baseImage = "mlrun/mlrun"

%nuclio: setting spec.build.baseImage to 'mlrun/mlrun'


The ```DataItem```s and the ```context``` within which they are logged are described in the following ```mlrun``` modules (they are included here only for type clarity).

In [5]:
from mlrun.execution import MLClientCtx
from mlrun.datastore import DataItem

In [6]:
import time

def training(
    context: MLClientCtx,
    p1: int = 1,
    p2: int = 2
) -> None:
    """Train a model.

    :param context: The runtime context object.
    :param p1: A model parameter.
    :param p2: Another model parameter.
    """
    # access input metadata, values, and inputs
    print(f'Run: {context.name} (uid={context.uid})')
    print(f'Params: p1={p1}, p2={p2}')
    context.logger.info('started training')
    
    # <insert training code here>
    
    # log the run results (scalar values)
    context.log_result('accuracy', p1 * 2)
    context.log_result('loss', p1 * 3)
    
    # add a label/tag to this run
    context.set_label('category', 'tests')
    
    # log a simple artifact + label the artifact 
    # If you want to upload a local file to the artifact repo add src_path=<local-path>
    context.log_artifact('model', 
                          body=b'abc is 123', 
                          local_path='model.txt', 
                          labels={'framework': 'tfkeras'})

In [7]:
def validation(
    context: MLClientCtx,
    model: DataItem
) -> None:
    """Model validation.
    
    Dummy validation function.
    
    :param context: The runtime context object.
    :param model: The estimated model object.
    """
    # access input metadata, values, files, and secrets (passwords)
    print(f'Run: {context.name} (uid={context.uid})')
    print(f'file - {model.url}:\n{model.get()}\n')
    context.logger.info('started validation')    
    context.log_artifact('validation', 
                         body=b'<b> validated </b>', 
                         format='html')

The following end-code annotation tells ```nuclio``` to stop parsing the notebook from this cell. _**Please do not remove this cell**_:

In [8]:
# nuclio: end-code
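
Because the handlers only call a few methods on the context, their logic can be checked without an MLRun installation by passing in a stand-in object. The following is a minimal sketch under that assumption: `StubContext` and `toy_training` are hypothetical names used for illustration, and the stub implements only `log_result` and `set_label`, not the real `MLClientCtx` interface.

```python
from typing import Any, Dict


class StubContext:
    """Hypothetical stand-in for MLClientCtx; records what the handler logs."""

    def __init__(self, name: str = "stub-run", uid: str = "0000") -> None:
        self.name = name
        self.uid = uid
        self.results: Dict[str, Any] = {}
        self.labels: Dict[str, str] = {}

    def log_result(self, key: str, value: Any) -> None:
        self.results[key] = value

    def set_label(self, key: str, value: str) -> None:
        self.labels[key] = value


def toy_training(context: StubContext, p1: int = 1, p2: int = 2) -> None:
    """Same result-logging logic as the training handler above, minus the artifact."""
    context.log_result("accuracy", p1 * 2)
    context.log_result("loss", p1 * 3)
    context.set_label("category", "tests")


ctx = StubContext()
toy_training(ctx, p1=5)
print(ctx.results)  # {'accuracy': 10, 'loss': 15}
print(ctx.labels)   # {'category': 'tests'}
```

This kind of check only covers the handler's own logic; run tracking and artifact storage still need a real MLRun context.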

______________________________________________

<a id='test-locally'></a>
### **test the function code and pipeline locally**
The functions above can be tested locally. Parameters, inputs, and outputs can be specified in the API or the `Task` object.

We create a ```function``` which defines the runtime environment (type, code, image, ..) and ```run()``` a job or experiments using that function.

We use the ```local``` runtime by default. Later on we will use a ```job``` runtime for running containers; other distributed runtimes such as MpiJob, Spark, Dask, and Nuclio are also available.

In each run we can specify the function, inputs, parameters/hyper-parameters, and so on. For more details, see the [mlrun_basics notebook](mlrun_basics.ipynb).
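
To make the idea of hyper-parameters concrete: a hyper-parameter run amounts to executing the handler once per parameter combination and keeping the best result. The sketch below shows this in plain Python. It is not MLRun's implementation; `expand_grid` and `score` are hypothetical helpers, and `score` simply mirrors the `accuracy = p1 * 2` result logged by the `training` handler.

```python
from itertools import product
from typing import Dict, Iterator, List


def expand_grid(grid: Dict[str, List[int]]) -> Iterator[Dict[str, int]]:
    """Yield one parameter dict per combination of the listed values."""
    keys = list(grid)
    for values in product(*(grid[k] for k in keys)):
        yield dict(zip(keys, values))


def score(params: Dict[str, int]) -> int:
    """Hypothetical stand-in for a run; mirrors accuracy = p1 * 2."""
    return params["p1"] * 2


grid = {"p1": [2, 4, 1], "p2": [10, 20]}
runs = [(params, score(params)) for params in expand_grid(grid)]

# keep the combination with the highest accuracy
best_params, best_accuracy = max(runs, key=lambda item: item[1])
print(len(runs), best_params, best_accuracy)
```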

In [14]:
from mlrun import run_local, code_to_function, mlconf, NewTask, mount_v3io
from mlrun.platforms import mount_pvc
mlconf.dbpath = mlconf.dbpath or 'http://mlrun-api:8080'

<b> define the artifact location</b>

In [10]:
from os import path
out = mlconf.artifact_path or path.abspath('./data')
# {{run.uid}} will be substituted with the run id, so output will be written to a different directory per run
artifact_path = path.join(out, '{{run.uid}}')
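
MLRun resolves the `{{run.uid}}` placeholder at run time. The effect can be sketched in plain Python as follows; `render_path` is a hypothetical helper written for this illustration, and the uid values are made up.

```python
import re
from typing import Dict


def render_path(template: str, values: Dict[str, str]) -> str:
    """Replace each {{key}} placeholder with its value; leave unknown keys untouched."""

    def substitute(match: "re.Match[str]") -> str:
        key = match.group(1)
        return values.get(key, match.group(0))

    return re.sub(r"\{\{\s*([\w.]+)\s*\}\}", substitute, template)


template = "/home/jovyan/data/{{run.uid}}"
first = render_path(template, {"run.uid": "aaa111"})
second = render_path(template, {"run.uid": "bbb222"})
print(first)   # /home/jovyan/data/aaa111
print(second)  # /home/jovyan/data/bbb222
```

Each run therefore writes to its own directory, so artifacts from different runs do not overwrite each other.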

#### _running and linking multiple tasks_
In this example we run two functions, ```training``` and ```validation```, and pass the result of the first to the second.
We will see in the ```job``` example that linking works even when the tasks run in a workflow on different processes or containers.

```run_local()``` runs a task on a local function.

First, run the training function. Functions can have multiple handlers/methods; here we call the ```training``` handler:

In [11]:
train_run = run_local(NewTask(handler=training, params={'p1': 5}, artifact_path=out))

[mlrun] 2020-04-11 15:31:27,413 starting run mlrun-f62762-training uid=b2a9d682c6a14572bae689cd42be7acd  -> http://mlrun-api:8080
Run: mlrun-f62762-training (uid=b2a9d682c6a14572bae689cd42be7acd)
Params: p1=5, p2=2
[mlrun] 2020-04-11 15:31:27,441 started training
[mlrun] 2020-04-11 15:31:27,450 log artifact model at /home/jovyan/data/model.txt, size: 10, db: Y



project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
default,...be7acd,0,Apr 11 15:31:27,completed,mlrun-f62762-training,kind=handler owner=jovyan host=jupyter-notebook-65885d574c-ztzzv category=tests,,p1=5,accuracy=10 loss=15,model


to track results use .show() or .logs() or in CLI: 
!mlrun get run b2a9d682c6a14572bae689cd42be7acd --project default , !mlrun logs b2a9d682c6a14572bae689cd42be7acd --project default
[mlrun] 2020-04-11 15:31:27,502 run executed, status=completed


After the function runs, it generates the result widget. You can click the `model` artifact to see its content.

In [12]:
train_run.outputs

{'accuracy': 10, 'loss': 15, 'model': '/home/jovyan/data/model.txt'}

The output of the training function is passed to the validation function. Let's run it:

In [15]:
model_path = train_run.outputs['model']

validation_run = run_local(NewTask(handler=validation, inputs={'model': model_path}, artifact_path=out))

[mlrun] 2020-04-11 15:32:34,837 starting run mlrun-0ded6b-validation uid=700425cc7e3242c6a7f5a41b9e9d2b34  -> http://mlrun-api:8080
Run: mlrun-0ded6b-validation (uid=700425cc7e3242c6a7f5a41b9e9d2b34)
file - /home/jovyan/data/model.txt:
b'abc is 123'

[mlrun] 2020-04-11 15:32:34,863 started validation
[mlrun] 2020-04-11 15:32:34,870 log artifact validation at /home/jovyan/data/validation.html, size: 18, db: Y



project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
default,...9d2b34,0,Apr 11 15:32:34,completed,mlrun-0ded6b-validation,kind=handler owner=jovyan host=jupyter-notebook-65885d574c-ztzzv,model,,,validation


to track results use .show() or .logs() or in CLI: 
!mlrun get run 700425cc7e3242c6a7f5a41b9e9d2b34 --project default , !mlrun logs 700425cc7e3242c6a7f5a41b9e9d2b34 --project default
[mlrun] 2020-04-11 15:32:34,911 run executed, status=completed


______________________________________________

<a id="build"></a>
### **define cluster jobs and build images**

In order to use our function in a cluster we need to package our code and dependencies.

The ```code_to_function``` call will automatically generate a ```function``` object from the current notebook (or a specified file) with its list of dependencies and runtime configuration.

In [16]:
# create an ML function from the notebook; it is attached to the Iguazio data fabric (v3io) in a later cell
trainer = code_to_function(name='my-trainer', kind='job')

The functions need shared storage (file or object) media to pass and store artifacts.

You can add _**Kubernetes**_ resources like volumes, environment variables, secrets, cpu/mem/gpu, etc. to a function.

```mlrun``` uses _**KubeFlow**_ modifiers (apply) to configure resources, you can build your own or use predefined ones e.g. for [AWS resources](https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/aws.py).
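
The modifier pattern itself is simple: a modifier is a callable that receives the function object, changes its spec, and returns it, so `apply()` calls can be chained. The toy sketch below illustrates the idea only; `ToyFunction` and `mount_volume` are hypothetical names, not MLRun or KubeFlow classes.

```python
from typing import Callable, Dict, List


class ToyFunction:
    """Hypothetical function object holding a list of volume mounts."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.volume_mounts: List[Dict[str, str]] = []

    def apply(self, modifier: Callable[["ToyFunction"], "ToyFunction"]) -> "ToyFunction":
        """Run the modifier on this object and return the result for chaining."""
        return modifier(self)


def mount_volume(volume_name: str, mount_path: str) -> Callable[[ToyFunction], ToyFunction]:
    """Build a modifier that adds one volume mount (illustrative only)."""

    def _modifier(fn: ToyFunction) -> ToyFunction:
        fn.volume_mounts.append({"name": volume_name, "mountPath": mount_path})
        return fn

    return _modifier


fn = ToyFunction("my-trainer").apply(mount_volume("nfsvol", "/home/jovyan/data"))
print(fn.volume_mounts)
```

Keeping resource settings in small, reusable modifiers means the same function definition can be pointed at different storage back ends without changing its code.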


##### _**Option 1: Using file volumes for artifacts**_
If you are using the [Iguazio data science platform](https://www.iguazio.com/), use the `mount_v3io()` auto-mount modifier.<br>
If you use other k8s PVC volumes, you can use the `mlrun.platforms.mount_pvc(..)` modifier with the required params.

Applying ```mount_v3io()``` will attach the function to Iguazio's real-time data fabric (mounted by default to _**home**_ of the current user).

**Note**: if the notebook is not on the managed platform (running remotely), you need to create and use a v3io secret. Run:

`kubectl create -n <namespace> secret generic my-v3io --from-literal=accessKey=<your access key> --from-literal=username=<your user name> --type v3io/fuse`

and use: `trainer.apply(mount_v3io(user='admin', secret='my-v3io'))`.

For our current ```training``` function, when using the Iguazio data science platform, run:

In [17]:
# for use on Iguazio platform
trainer.apply(mount_v3io())

ValueError: user name/env must be specified when using "~" in path

In [None]:
# Uncomment for use with shared PVC volumes, e.g. using the NFS Share in the local k8s install
# from mlrun.platforms import mount_pvc
# trainer.apply(mount_pvc('nfsvol', 'nfsvol', '/home/jovyan/data'))

##### _**Option 2: Using AWS S3 for artifacts**_

On AWS you can use S3, which requires a `secret` holding the AWS credentials. Create one with the following command:

`kubectl create -n <namespace> secret generic my-aws --from-literal=AWS_ACCESS_KEY_ID=<access key> --from-literal=AWS_SECRET_ACCESS_KEY=<secret key>`

To use the secret:

In [18]:
# from kfp.aws import use_aws_secret

In [19]:
# trainer.apply(use_aws_secret(secret_name='my-aws'))
# out = 's3://<your-bucket-name>/jobs/{{run.uid}}'
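
For reference, `kubectl create secret generic ... --from-literal` stores each literal base64-encoded under the Secret's `data` field. The sketch below builds an equivalent manifest as a Python dict. It does not contact a cluster; `build_secret_manifest` is a hypothetical helper, and the namespace and key values are placeholders.

```python
import base64
import json
from typing import Dict


def build_secret_manifest(name: str, namespace: str, literals: Dict[str, str]) -> dict:
    """Return a Kubernetes Secret manifest with base64-encoded values."""
    encoded = {
        key: base64.b64encode(value.encode("utf-8")).decode("ascii")
        for key, value in literals.items()
    }
    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name, "namespace": namespace},
        "type": "Opaque",
        "data": encoded,
    }


manifest = build_secret_manifest(
    "my-aws",
    "my-namespace",  # placeholder namespace
    {"AWS_ACCESS_KEY_ID": "example-key", "AWS_SECRET_ACCESS_KEY": "example-secret"},
)
print(json.dumps(manifest, indent=2))
```

Note that base64 is an encoding, not encryption, so access to the secret should still be restricted in the cluster.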

______________________________________________

<a id="deploy-build"></a>
### **deploy (build) the function container**

The `deploy()` command will build a custom container image (create a cluster build job) from the outlined function dependencies.

If a pre-built container image already exists, pass the `image` name instead. _**Note that the code and params can be updated per run without building a new image**_.

The image is stored in a container repository. By default, the repository configured on the MLRun API service is used. To use your own Docker registry, first create a secret, then add the secret name to the build configuration:

`kubectl create -n <namespace> secret docker-registry my-docker --docker-server=https://index.docker.io/v1/ --docker-username=<your-user> --docker-password=<your-password> --docker-email=<your-email>`

and run this: `trainer.build_config(image='target/image:tag', secret='my-docker')`

In [20]:
trainer.deploy()

[mlrun] 2020-04-11 15:32:44,186 starting remote build, image: .mlrun/func-default-my-trainer-latest
INFO[0000] Resolved base name mlrun/mlrun:0.4.6 to mlrun/mlrun:0.4.6
INFO[0000] Resolved base name mlrun/mlrun:0.4.6 to mlrun/mlrun:0.4.6
INFO[0000] Retrieving image manifest mlrun/mlrun:0.4.6
INFO[0001] Retrieving image manifest mlrun/mlrun:0.4.6
INFO[0002] Built cross stage deps: map[]
INFO[0002] Retrieving image manifest mlrun/mlrun:0.4.6
INFO[0002] Retrieving image manifest mlrun/mlrun:0.4.6
INFO[0003] Unpacking rootfs as cmd RUN pip install pandas requires it.
INFO[0044] Taking snapshot of full filesystem...
INFO[0045] Resolving paths
INFO[0065] RUN pip install pandas
INFO[0065] cmd: /bin/sh
INFO[0065] args: [-c pip install pandas]


True

<a id="run-on-cluster"></a>
### **run the function on the cluster**


In case we made changes to the code, ```with_code``` will inject the latest code into the function (it doesn't require a new build).

In [21]:
trainer.with_code()

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f6379fb0f10>

In [25]:
trainer.apply(mount_pvc(pvc_name='pvclocal',volume_name='pvclocal',volume_mount_path="/home/jovyan/mlrun"))

<mlrun.runtimes.kubejob.KubejobRuntime at 0x7f6379fb0f10>

In [26]:
# create the base task (common to both steps), and set the output path and experiment label
base_task = NewTask(artifact_path=out).set_label('stage', 'dev')

In [27]:
# run our training task on the cluster with p1=9, based on the common base task
train_task = NewTask(name='my-training', handler='training', params={'p1': 9}, base=base_task)
train_run = trainer.run(train_task)

[mlrun] 2020-04-11 15:37:02,200 starting run my-training uid=4066a552840e4e0f9163e0fde4cb17f8  -> http://mlrun-api:8080
[mlrun] 2020-04-11 15:37:02,320 Job is running in the background, pod: my-training-rpmqb
Run: my-training (uid=4066a552840e4e0f9163e0fde4cb17f8)
Params: p1=9, p2=2
[mlrun] 2020-04-11 15:37:06,603 started training
[mlrun] 2020-04-11 15:37:06,612 log artifact model at /home/jovyan/data/model.txt, size: 10, db: Y

[mlrun] 2020-04-11 15:37:06,622 run executed, status=completed
final state: succeeded


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
default,...cb17f8,0,Apr 11 15:37:06,completed,my-training,category=tests host=my-training-rpmqb kind=job owner=jovyan stage=dev,,p1=9,accuracy=18 loss=27,model


to track results use .show() or .logs() or in CLI: 
!mlrun get run 4066a552840e4e0f9163e0fde4cb17f8  , !mlrun logs 4066a552840e4e0f9163e0fde4cb17f8 
[mlrun] 2020-04-11 15:37:08,451 run executed, status=completed


In [28]:
# run validation, using the model result from the previous step
model_path = train_run.outputs['model']
trainer.run(base_task, handler='validation', inputs={'model': model_path}, watch=True)

[mlrun] 2020-04-11 15:37:08,462 starting run my-trainer-validation uid=ba8811446d364b48b5839628b7361738  -> http://mlrun-api:8080
[mlrun] 2020-04-11 15:37:08,565 Job is running in the background, pod: my-trainer-validation-vp8ht
Run: my-trainer-validation (uid=ba8811446d364b48b5839628b7361738)
file - /home/jovyan/data/model.txt:
b'abc is 123'

[mlrun] 2020-04-11 15:37:12,481 started validation
[mlrun] 2020-04-11 15:37:12,488 log artifact validation at /home/jovyan/data/validation.html, size: 18, db: Y

[mlrun] 2020-04-11 15:37:12,498 run executed, status=completed
final state: succeeded


project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
default,...361738,0,Apr 11 15:37:12,completed,my-trainer-validation,host=my-trainer-validation-vp8ht kind=job owner=jovyan stage=dev,model,,,validation


to track results use .show() or .logs() or in CLI: 
!mlrun get run ba8811446d364b48b5839628b7361738  , !mlrun logs ba8811446d364b48b5839628b7361738 
[mlrun] 2020-04-11 15:37:14,732 run executed, status=completed


<mlrun.model.RunObject at 0x7f635e1de090>

______________________________________________

<a id="create-pipeline"></a>
### **create and run a KubeFlow pipeline**

KubeFlow pipelines are used for workflow automation: we compose a graph of functions and specify parameters, inputs, and outputs.

As illustrated below, we can chain the outputs and inputs of the pipeline steps.

In [24]:
import kfp
from kfp import dsl
from mlrun import run_pipeline

In [25]:
@dsl.pipeline(
    name = 'job test',
    description = 'demonstrating mlrun usage'
)
def job_pipeline(
    p1: int = 9
) -> None:
    """Define our pipeline.
    
    :param p1: A model parameter.
    """

    train = trainer.as_step(handler='training',
                            params={'p1': p1},
                            outputs=['model'])
    
    validate = trainer.as_step(handler='validation',
                               inputs={'model': train.outputs['model']},
                               outputs=['validation'])
    

The job pipeline can be compiled to a YAML file that can be used for debugging:

In [26]:
kfp.compiler.Compiler().compile(job_pipeline, 'jobpipe.yaml')
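
Passing `train.outputs['model']` as an input is what makes the validation step depend on the training step. The resulting execution order can be sketched with the standard-library `graphlib` module (Python 3.9 or later). This illustrates the dependency idea only, not how KubeFlow schedules steps; the step names are taken from the handlers above.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
# "validation" reads the "model" output of "training".
dependencies = {
    "training": set(),
    "validation": {"training"},
}

# static_order() yields every step after all of its predecessors
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['training', 'validation']
```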

#### running the pipeline

Pipeline results are stored at the `artifact_path` location. Adding ```/{{workflow.uid}}``` to the path makes ```mlrun``` generate a unique folder per workflow:

In [27]:
artifact_path = 'v3io:///users/admin/kfp/{{workflow.uid}}/'

In [28]:
arguments = {'p1': 8}
run_id = run_pipeline(job_pipeline, arguments, experiment='my-job', artifact_path=artifact_path)

[mlrun] 2020-03-30 17:46:45,684 Pipeline run id=a75b5805-064d-40e1-bfda-d9c20a834e0f, check UI or DB for progress


[top](#top)