# Running the Parquez pipeline

In [None]:
#!pip install mlrun==0.6.0rc7
!pip show mlrun

### Create the MLRun project

In [None]:
from os import path, getenv
from mlrun import new_project, mlconf

# The project is rooted in the notebook's working directory.
project_path = path.abspath('./')

# Fixed project name. A per-user name could be derived from V3IO_USERNAME instead, e.g.:
#   '-'.join(filter(None, ['parquez', getenv('V3IO_USERNAME', None)]))
project_name = "parquez"

# Create the project object, save it, and report where it lives.
project = new_project(project_name, project_path)
project.save()
print(f'Project path: {project_path}\nProject name: {project_name}')

In [None]:
out = mlconf.artifact_path or path.abspath('./data')
# {{run.uid}} will be substituted with the run id, so output will be written to different directoried per run
artifact_path = path.join(out, '{{run.uid}}')
%env PYTHONPATH=./

### Set the project functions

In [21]:
from mlrun.run import new_function
from mlrun import mount_v3io

# Container image shared by all of the job functions registered below.
image_name = 'aviaigz/parquez:0.6.0rc7'

# Register each step's source file as an MLRun job function, under the name used to reference it later.
project.set_function("functions/validate_input.py", 'validate', kind='job', image=image_name)
project.set_function("functions/get_table_schema.py", 'get_schema', kind='job', image=image_name)
project.set_function("functions/create_parquet_table.py", 'create_parquet', kind='job', image=image_name)
project.set_function("functions/create_kv_view.py", 'create_kv_view', kind='job', image=image_name)
project.set_function("functions/create_unified_view.py", 'create_unified_view', kind='job', image=image_name)
project.set_function("functions/parquet_add_partition.py", 'parquet_add_partition', kind='job', image=image_name)
project.set_function("functions/delete_kv_partition.py", 'delete_kv_partition', kind='job', image=image_name)

# The KV-to-parquet conversion is registered as a Spark function instead of a plain job.
spark_job = new_function(kind='spark', command='/User/parquez/functions/kv_to_parquet.py', name='kv_to_parquet') # /User isn't supported at this stage
project.set_function(spark_job)

# 'run_scheduler' must map to run_scheduler.py. A second registration of
# parquet_add_partition.py under the same 'run_scheduler' name used to follow this
# line and replaced it; parquet_add_partition is already registered above under its own name.
project.set_function("functions/run_scheduler.py", 'run_scheduler', kind='job', image=image_name)
project.set_function("functions/parquetinizer.py", 'parquetinizer', kind='job', image=image_name)




<mlrun.runtimes.kubejob.KubejobRuntime at 0x7fc6719a9d10>

In [22]:

# Look the Spark function up once and configure its driver/executor resources.
spark_fn = project.func('kv_to_parquet')
spark_fn.with_driver_limits(cpu="1300m")
spark_fn.with_driver_requests(cpu=1, mem="512m")  # gpu_type & gpus=<number_of_gpus> are supported too
spark_fn.with_executor_limits(cpu="1400m")
spark_fn.with_executor_requests(cpu=1, mem="512m")
spark_fn.with_igz_spark()  # Adds fuse, daemon & iguazio's jars support

# Rebuilds the image with MLRun - this is needed in order to support artifact logging etc.
# This step is long (~3 minutes).
spark_fn.deploy()

# Deploy the job functions, in the same order as before.
for job_name in ('delete_kv_partition', 'parquet_add_partition', 'create_kv_view'):
    project.func(job_name).deploy()

# Kept as the cell's last expression so its return value is still displayed as the cell output.
project.func('parquetinizer').deploy()



> 2021-01-11 09:37:47,232 [info] running build to add mlrun package, set with_mlrun=False to skip if its already in the image
> 2021-01-11 09:37:47,234 [info] starting remote build, image: .mlrun/func-parquez-kv_to_parquet-latest
E0111 09:37:50.495449       1 aws_credentials.go:77] while getting AWS credentials NoCredentialProviders: no valid providers in chain. Deprecated.
	For verbose messaging see aws.Config.CredentialsChainVerboseErrors
E0111 09:37:50.505138       1 metadata.go:154] while reading 'google-dockercfg' metadata: http status code: 404 while fetching url http://metadata.google.internal./computeMetadata/v1/instance/attributes/google-dockercfg
E0111 09:37:50.508153       1 metadata.go:166] while reading 'google-dockercfg-url' metadata: http status code: 404 while fetching url http://metadata.google.internal./computeMetadata/v1/instance/attributes/google-dockercfg-url
[36mINFO[0m[0000] Retrieving image manifest gcr.io/iguazio/spark-app:3.0_b5984_20210102145103 
[36mINFO

False

<a id="gs-step-create-n-run-ml-pipeline"></a>
## Create and Run a Fully Automated ML Pipeline

You're now ready to create a full ML pipeline.
This is done by using [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/), which is integrated into the Iguazio Data Science Platform.
Kubeflow Pipelines is an open-source framework for building and deploying portable, scalable machine-learning workflows based on Docker containers.
MLRun leverages this framework to take your existing code and deploy it as steps in the pipeline.

In [16]:
%%writefile {path.join(project_path, 'workflow.py')}

from kfp import dsl
from mlrun import mount_v3io
from os import path
import os

# Credentials are read from the environment of the process that loads this file.
# V3IO_ACCESS_KEY is mandatory (KeyError when unset); V3IO_USERNAME is None when unset.
V3IO_ACCESS_KEY = os.environ['V3IO_ACCESS_KEY']
V3IO_USERNAME = os.getenv('V3IO_USERNAME')

# Function registry looked up by name in kfpipeline().
# presumably filled in by MLRun when it loads the workflow - TODO confirm
funcs = {}
project_path = path.abspath('./')

# Parameters handed unchanged to every pipeline step.
# NOTE(review): 'access_key' is passed as a plain step parameter - confirm it is not
# exposed in pipeline/run metadata, or move it to a secret.
parquez_params = dict(
    view_name='view_name',
    partition_by='h',
    partition_interval='1h',
    real_time_window='3h',
    historical_retention='24h',
    real_time_table_name='faker',
    config_path='/User/parquez/config/parquez.ini',
    user_name=V3IO_USERNAME,
    access_key=V3IO_ACCESS_KEY,
    project_path=project_path,
)


# Configure function resources and local settings
def init_functions(functions: dict, project=None, secrets=None):
    """Apply the shared runtime settings to every function in ``functions``.

    For each function object this applies the v3io mount, sets the PYTHONPATH
    environment variable to the absolute current directory, and sets the
    artifact path and service account on the function spec.

    Args:
        functions: mapping whose values are the function objects to configure.
        project: accepted for interface compatibility; not used here.
        secrets: accepted for interface compatibility; not used here.
    """
    python_path = path.abspath('./')
    for fn in functions.values():
        fn.apply(mount_v3io())
        fn.set_env('PYTHONPATH', python_path)
        # NOTE(review): relative path (no leading '/'), unlike the '/User/...' paths
        # used elsewhere in this file - confirm this is intended.
        fn.spec.artifact_path = 'User/artifacts'
        fn.spec.service_account = 'mlrun-api'
        
        
# Create a Kubeflow Pipelines pipeline
@dsl.pipeline(
    name="parquez-pipeline",
    description="parquez description"
)
def kfpipeline():
    """Chain the parquez functions into one sequential pipeline.

    Step order: validate -> get_schema -> create_parquet -> create_kv_view ->
    create_unified_view -> run_scheduler. Every step receives ``parquez_params``;
    every step after the first receives its predecessor's outputs as the
    ``table`` input.
    """

    def add_step(func_name, output_name, upstream=None):
        # Build one step from funcs[func_name]; the step name equals the function key.
        step_kwargs = dict(name=func_name, params=parquez_params, outputs=[output_name])
        if upstream is not None:
            step_kwargs['inputs'] = {'table': upstream}
        return funcs[func_name].as_step(**step_kwargs)

    # An optional leading "clean" step (cleaning the tables) is currently disabled,
    # so the pipeline starts at validation with no inputs.
    validate_step = add_step('validate', 'validate')

    # Receives the whole outputs mapping of the validate step.
    schema_step = add_step('get_schema', 'schema', validate_step.outputs)

    # Unlike the other steps, this one consumes the single 'schema' output of its predecessor.
    parquet_step = add_step('create_parquet', 'create_parquet', schema_step.outputs['schema'])

    kv_view_step = add_step('create_kv_view', 'kv_view', parquet_step.outputs)

    unified_view_step = add_step('create_unified_view', 'unified_view', kv_view_step.outputs)

    # Final step.
    add_step('run_scheduler', 'run_scheduler', unified_view_step.outputs)

Overwriting /User/parquez/workflow.py


<a id="gs-register-workflow"></a>
#### Register the Workflow

Use the `set_workflow` MLRun project method to register your workflow with MLRun.
The following code sets the `name` parameter to the selected workflow name ("main") and the `code` parameter to the name of the workflow file that is found in your project directory (**workflow.py**).

In [17]:
# Register the workflow file as "main"
# 'workflow.py' is the file written into project_path by the %%writefile cell above.
project.set_workflow('main', 'workflow.py')

In [18]:
# Save the project again now that the workflow has been registered.
project.save()

In [19]:
# Artifacts go under <current directory>/pipeline/{{workflow.uid}}.
workflow_artifact_path = path.abspath(path.join('pipeline', '{{workflow.uid}}'))

# Run the workflow registered as 'main' with no arguments.
# NOTE(review): dirty=True presumably lets the run proceed with uncommitted changes - confirm against the MLRun docs.
run_id = project.run(
    'main',
    arguments={},
    artifact_path=workflow_artifact_path,
    dirty=True,
)

> 2021-01-05 15:14:27,637 [info] using in-cluster config.


> 2021-01-05 15:14:30,298 [info] Pipeline run id=32d412df-4a0d-4bfe-8051-4a05b3b96b75, check UI or DB for progress


In [None]:
from mlrun import get_run_db
# List the schedules stored in the MLRun run DB for the 'parquez' project.
get_run_db().list_schedules('parquez')