In [1]:
from mlrun import new_function, NewTask, get_run_db, mlconf, mount_v3io, new_model_server, builder
import kfp
from kfp import dsl
import os
from os.path import isfile, join

In [22]:
from mlrun import mlconf
from os import path
# Keep an MLRun DB path that is already configured; otherwise fall back to the
# 'http://mlrun-api:8080' endpoint.
mlconf.dbpath = mlconf.dbpath or 'http://mlrun-api:8080'

# specify artifacts target location
# Falls back to the absolute path of the current working directory when
# mlconf.artifact_path is empty.
artifact_path = mlconf.artifact_path or path.abspath('./')


In [None]:
# Display the artifact location resolved in the configuration cell.
artifact_path

In [23]:
# NOTE(review): this assignment is unconditional, so any MLRun DB path that was
# configured before this cell runs is overwritten — confirm that is intended.
mlconf.dbpath = 'http://mlrun-api:8080'

In [24]:
# Environment vars to be set by Nuclio
# PYTHON_SCRIPT is the script file name (with its leading '/') that a later
# cell appends to V3IO_SCRIPT_PATH. '/kv-to-parquet.py' is an alternative script.
PYTHON_SCRIPT = os.getenv('PYTHON_SCRIPT', '/pi.py')

# Directory that holds the script. An explicit V3IO_SCRIPT_PATH always wins.
# The default is derived from the current working directory by replacing
# '/User' with '/v3io/<V3IO_HOME>', and is only computed when it is needed, so
# a missing V3IO_HOME no longer breaks runs that set V3IO_SCRIPT_PATH.
V3IO_SCRIPT_PATH = os.getenv('V3IO_SCRIPT_PATH')
if V3IO_SCRIPT_PATH is None:
    _v3io_home = os.getenv('V3IO_HOME')
    if _v3io_home is None:
        # Previously this surfaced as "TypeError: must be str, not NoneType".
        raise RuntimeError(
            'Cannot derive the script location: set either the '
            'V3IO_SCRIPT_PATH or the V3IO_HOME environment variable'
        )
    V3IO_SCRIPT_PATH = os.getcwd().replace('/User', '/v3io/' + _v3io_home)

# Spark job name and resource settings, each overridable through the environment.
SPARK_JOB_NAME = os.getenv('SPARK_JOB_NAME', 'my-spark-job')
SPARK_SPEC_MEM = os.getenv('SPARK_SPEC_MEM', '2g')
SPARK_SPEC_CPU = int(os.getenv('SPARK_SPEC_CPU', 1))
SPARK_SPEC_REPLICAS = int(os.getenv('SPARK_SPEC_REPLICAS', 1))

In [25]:
#Set the pyspark script path
# Append the script name only when it is not already the path's suffix, so
# re-running this cell does not produce '<dir>/pi.py/pi.py'.
if not V3IO_SCRIPT_PATH.endswith(PYTHON_SCRIPT):
    V3IO_SCRIPT_PATH = V3IO_SCRIPT_PATH + PYTHON_SCRIPT

In [26]:
# Collect the full paths of the dependency jars: regular files directly under
# V3IO_JARS_PATH whose names look like 'v3io-*.jar'.
V3IO_JARS_PATH = '/igz/java/libs/'
DEPS_JARS_LIST = [
    join(V3IO_JARS_PATH, jar_name)
    for jar_name in os.listdir(V3IO_JARS_PATH)
    if jar_name.startswith('v3io-')
    and jar_name.endswith('.jar')
    and isfile(join(V3IO_JARS_PATH, jar_name))
]


In [32]:
#Create MLRun function to run the spark-job on the kubernetes cluster
# Build the MLRun function of kind 'spark' that runs the script at
# V3IO_SCRIPT_PATH, then apply the v3io mount (home directory at /User) using
# the credentials found in the environment.
serverless_spark_fn = (
    new_function(
        kind='spark',
        image='urihoenig/spark-app:2.4.4-2.9.0-0.0.3',
        command=f'local://{V3IO_SCRIPT_PATH}',
        name=SPARK_JOB_NAME,
    )
    .apply(
        mount_v3io(
            name='v3io',
            remote='~/',
            mount_path='/User',
            access_key=os.getenv('V3IO_ACCESS_KEY'),
            user=os.getenv('V3IO_USERNAME'),
        )
    )
)

In [33]:
# Copy V3IO_HOME_URL from this notebook's environment into the function's env spec.
serverless_spark_fn.spec.env.append(
    {
        'name': 'V3IO_HOME_URL',
        'value': os.getenv("V3IO_HOME_URL"),
    }
)

In [34]:
# Resource settings for the Spark function, taken from the SPARK_SPEC_* values.
serverless_spark_fn.with_limits(mem=SPARK_SPEC_MEM)
serverless_spark_fn.with_requests(cpu=SPARK_SPEC_CPU)
# NOTE(review): igz_version is hard-coded; presumably it has to match the
# platform build this notebook runs on — confirm before reuse.
serverless_spark_fn.with_igz_spark(igz_version='2.8_b3506_20191217042239')
#Set number of executors
serverless_spark_fn.spec.replicas = SPARK_SPEC_REPLICAS

## Run spark script

In [35]:
# Submit the Spark function as an MLRun run; the returned run object is the cell output.
serverless_spark_fn.run()

[mlrun] 2020-06-01 14:41:21,274 artifact path is not defined or is local, artifacts will not be visible in the UI
[mlrun] 2020-06-01 14:41:21,285 starting run my-spark-job uid=11cd0a9eb250441ebf600e3eddce8ed5  -> http://mlrun-api:8080
++ id -u
+ myuid=1000
++ id -g
+ mygid=1000
+ set +e
++ getent passwd 1000
+ uidentry=iguazio:x:1000:1000::/igz:/bin/bash
+ set -e
+ '[' -z iguazio:x:1000:1000::/igz:/bin/bash ']'
+ SPARK_K8S_CMD=driver-py
+ case "$SPARK_K8S_CMD" in
+ shift 1
+ SPARK_CLASSPATH=':/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sort -t_ -k4 -n
+ sed 's/[^=]*=\(.*\)/\1/g'
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' -n file:///igz/java/libs/v3io-py.zip ']'
+ PYTHONPATH=:file:///igz/java/libs/v3io-py.zip
+ PYSPARK_ARGS=
+ '[' -n '' ']'
+ R_ARGS=
+ '[' -n '' ']'
+ '[' 2 == 2 ']'
++ python -V
+ pyv='Python 3.6.8'
+ export PYTHON_VERSION=3.6.8
+ PYTHON_VERSION=3.6.8
+ export PYSPARK_PYTHON=python
+ PYSPARK_PYTHON=python
+ export PYSPARK_DRIVER_PYTHON=python
+ PYSP

project,uid,iter,start,state,name,labels,inputs,parameters,results,artifacts
default,...ddce8ed5,0,Jun 01 14:41:21,completed,my-spark-job,kind=sparkowner=adminv3io_user=admin,,,,


to track results use .show() or .logs() or in CLI: 
!mlrun get run 11cd0a9eb250441ebf600e3eddce8ed5 --project default , !mlrun logs 11cd0a9eb250441ebf600e3eddce8ed5 --project default
[mlrun] 2020-06-01 14:42:28,953 run executed, status=completed


<mlrun.model.RunObject at 0x7f1606461da0>

## Run Spark script as part of a pipeline

In [36]:
LABELS = "label"


# Kubeflow pipeline with a single step that runs serverless_spark_fn.
# Documented with comments rather than a docstring so the function object
# handed to the pipeline decorator stays exactly as before.
@dsl.pipeline(
    name='Kubeflow pipeline with Spark jobs',
    description='Run SparkK8s as par tof pipeline',
)
def example_pipeline(
    p1=[1, 2, 3, 4, 5, 6],
    p2=[9, 8, 6, 5, 4, 3],
):
    # p1 and p2 are declared as pipeline parameters; the body does not read them.
    # Turn the shared Spark function into a pipeline step named 'Sparkstep1'
    # that declares a single output, 'bankxact'.
    spark_step = serverless_spark_fn.as_step(
        NewTask(),
        name='Sparkstep1',
        outputs=['bankxact'],
    )
    # Mount the v3io home directory at /User on the step, using the
    # credentials found in the environment.
    spark_step = spark_step.apply(
        mount_v3io(
            name='v3io',
            remote='~/',
            mount_path='/User',
            access_key=os.getenv('V3IO_ACCESS_KEY'),
            user=os.getenv('V3IO_USERNAME'),
        )
    )


In [37]:
# Kubeflow Pipelines client created for the 'default-tenant' namespace.
client = kfp.Client(namespace='default-tenant')
# Sample values matching the pipeline's p1/p2 parameters; they are only
# referenced by the commented-out arguments line below.
p1 = [1,2,3,4,5,6]
p2 = [9,8,6,5,4,3]
#arguments = {'p1': p1 ,'p2': p2}
# No arguments are passed to the run; presumably the pipeline's declared
# defaults apply — confirm against the kfp client behaviour.
arguments={}

# Record pipeline deployment in KV

In [38]:
import uuid
import v3io.dataplane

In [39]:
# v3io data-plane client; record_pipeline_id uses it to write run records.
v3io_client = v3io.dataplane.Client(max_connections=1)

In [40]:
def record_pipeline_id(run_id):
    """Write a 'started' record for a pipeline run through the v3io client.

    The target container comes from MONITOR_CONTAINER (default 'bigdata') and
    the table from MONITOR_TABLE (default 'kubeflow_runs'). The item is keyed
    by the stringified run id and stores that id together with its status.

    Args:
        run_id: identifier of the pipeline run; converted with str().
    """
    key = str(run_id)
    container = os.getenv('MONITOR_CONTAINER', 'bigdata')
    table = os.getenv('MONITOR_TABLE', 'kubeflow_runs')
    item_path = os.path.join(table, key)
    item_attributes = {
        'id': key,
        'status': 'started',
    }
    v3io_client.put_item(
        container=container,
        path=item_path,
        attributes=item_attributes,
    )

# get the Kubeflow run_id
Note: This notebook was written to invoke the deployment as an mlrun function.

In [41]:
def handler(context,event):
    """Start a run of example_pipeline, record it, and return its run id.

    A fresh UUID makes the run name ('SparkPipe-<uuid>') unique. The id that
    is recorded and returned is the one reported by the Kubeflow client, not
    that UUID.

    Args:
        context: not used by this handler.
        event: not used by this handler.

    Returns:
        The run_id attribute of the result returned by the Kubeflow client.
    """
    name_suffix = str(uuid.uuid4())
    submitted = client.create_run_from_pipeline_func(
        example_pipeline,
        arguments,
        run_name='SparkPipe-' + name_suffix,
        experiment_name='SparkPipeline',
    )
    kubeflow_run_id = submitted.run_id
    record_pipeline_id(kubeflow_run_id)
    return kubeflow_run_id

In [42]:
# Invoke the handler directly from the notebook; the two arguments are
# placeholders because handler does not read context or event.
handler('1','2')



'7fe97143-15fe-4486-8d12-d94177807c20'