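# Creating a Pipeline Using MLRun

This notebook builds a container image, defines a Kubeflow Pipelines step that launches a Spark Operator job through MLRun, runs the pipeline, and queries the MLRun run database for the results.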

In [32]:
import kfp
from kfp import dsl
from mlrun import run_start, mlrun_op, get_run_db
from mlrun.platforms import mount_v3io

In [33]:
from mlrun.builder import upload_tarball, build_image

### Build the image used by the Kubeflow step
This is a one-time task; skip this cell once the image has been built.

In [None]:
# One-time image build: starts from python:3.6-jessie, pip-installs MLRun from a
# git fork, then adds the Kubernetes apt repo and installs kubectl and wget.
# with_mlrun=False is passed because MLRun is installed explicitly by the first
# command (presumably this disables build_image's own MLRun install - confirm).
# NOTE(review): Debian jessie is end-of-life and apt.kubernetes.io /
# kubernetes-xenial has been deprecated upstream, so a fresh build may fail at
# the apt steps; the fork install is also unpinned. Verify before rebuilding.
img_name = build_image(
    'mlrun/sparkk8s',
    base_image='python:3.6-jessie',
    commands=[
        'pip install git+https://github.com/marcelonyc/mlrun',
        'apt-get update && apt-get install -y apt-transport-https',
        'curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg > /tmp/apt-key',
        'apt-key add /tmp/apt-key',
        'echo "deb https://apt.kubernetes.io/ kubernetes-xenial main" >> /etc/apt/sources.list.d/kubernetes.list',
        'apt-get update',
        'apt-get install -y kubectl wget',
    ],
    with_mlrun=False,
)

### Define artifact locations

In [35]:
this_path = '/User/spark_operator'
db_path = this_path
#artifacts_path = this_path + '/data/{{workflow.uid}}/'
artifacts_path = 'v3io:///bigdata/mlrun/{{workflow.uid}}/'

### Functions for Kubeflow steps
This example uses the image created above and runs mlrun_spark-k8s.py

#### Run specs for spark operator execution
One per function

In [36]:
run_spec =  {'metadata':
                 {"AUTH_SECRET" : "shell-um81v5npue-qh8oc-v3io-auth",
"FUSE_SECRET" : "shell-um81v5npue-qh8oc-v3io-fuse",
'mainApplicationFile' : 'local:///v3io/users/admin/spark_operator/spark-jdbc.py',
                  'ADDITIONAL_JARS' : 'local:///v3io/users/admin/mysql-connector-java-5.1.45.jar',
                  'APP_NAME' : 'frommysql',
                  'WAIT' : True,
                  'VERBOSE' : False
                 },
             'rundb' : '/v3io/users/admin/mlrundb'
            }

In [47]:
# /run/exec/mlrun_spark-k8s.py is a built in command
# the run_spec determines what gets executed (mainApplicationFile) 
def mlrun_train(p1, p2):
    return mlrun_op('jdbctokv', 
                    command = '/User/spark_operator/exec/mlrun_spark-k8s.py', 
                    out_path = artifacts_path,
                    params = { 'sparkk8sspec' : run_spec },
                    image = 'docker-registry.default-tenant.app.mdl0911.iguazio-cd2.com:80/mlrun/sparkk8s:latest',
                    rundb = db_path)

<b> Create a Kubeflow Pipelines DSL (execution graph/DAG)</b>

In [48]:
@dsl.pipeline(
    name='Pipeline with Spark Operator Steps',
    description='Run Spark Operator as pipelnen step'
)
def mlrun_pipeline(
   p1 = 5 , p2 = '"text"'
):
    # create a train step, apply v3io mount to it (will add the /User mount to the container)
    train = mlrun_train(p1, p2).apply(mount_v3io())

<b> Create the pipeline spec </b><br>
compile the pipeline and create a YAML file from it 

In [49]:
kfp.compiler.Compiler().compile(mlrun_pipeline, 'mlrunpipe.yaml')

<b> Create a KFP client, Experiment and run the pipeline with custom parameter </b>

In [53]:
client = kfp.Client(namespace='default-tenant')
arguments = {'p1': 4}
experiment = client.create_experiment('Spark Operator Demo')
run_result = client.run_pipeline(experiment.id, 'Spark Operator Demo', 'mlrunpipe.yaml', arguments)

<b> See the run status and results in the run database </b>

In [51]:
# connect to the run db 
db = get_run_db(db_path).connect()

In [56]:
# query the DB with filter on workflow ID (only show this workflow) 
db.list_runs('', labels=f'workflow={run_result.id}').show()

uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
...26b09e,0,Sep 18 13:50:44,running,jdbctokv,workflow=4c98a4b7-da1b-11e9-a4e6-02bd29b4ad78kind=localowner=roothost=pipeline-with-spark-operator-steps-xnlcf-1404802076,,"sparkk8sspec={'metadata': {'AUTH_SECRET': 'shell-um81v5npue-qh8oc-v3io-auth', 'FUSE_SECRET': 'shell-um81v5npue-qh8oc-v3io-fuse', 'mainApplicationFile': 'local:///v3io/users/admin/spark_operator/spark-jdbc.py', 'ADDITIONAL_JARS': 'local:///v3io/users/admin/mysql-connector-java-5.1.45.jar', 'APP_NAME': 'frommysql', 'WAIT': True, 'VERBOSE': False}, 'rundb': '/v3io/users/admin/mlrundb'}",,
