# Run a pyspark job on Dataproc

1. You should use the component ```load_spark_to_GCS``` to upload ```transform_run.py``` and ```sparkicson-0.1-dependencies.jar```
2. You should use the standard components [create_cluster](https://github.com/kubeflow/pipelines/tree/master/components/gcp/dataproc/create_cluster), [submit_pyspark_job](https://github.com/kubeflow/pipelines/tree/master/components/gcp/dataproc/submit_pyspark_job) and [delete_cluster](https://github.com/kubeflow/pipelines/tree/master/components/gcp/dataproc/delete_cluster). ```kfp.components.ComponentStore``` could help.
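3. Use a templated name for the cluster (for example one that contains `{{workflow.uid}}`), so that every run gets its own cluster
4. Use ```kfp.dsl.ExitHandler``` so that the cluster is deleted even when a step of the pipeline fails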

In [None]:
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
import kfp.gcp as gcp

from kfp import Client as KfpClient

import json
import os

### Load the component ```load_spark_to_GCS``` from the local repository

Hint: use `kfp.components.ComponentStore`.

In [None]:
# Component store that searches the local `components` directory.
component_store = comp.ComponentStore(
    local_search_paths=['components'],
)

In [None]:
# Load the local `load_spark_to_GCS` component; the returned object is called
# inside the pipeline to create the upload step.
upload_files_to_GCS_op = component_store.load_component(
    'load_spark_to_GCS',
)

### Load the GCP components from github

Hint: use `kfp.components.ComponentStore`.

In [None]:
# Component store that resolves component names against the GCP components
# folder of the kubeflow/pipelines repository (master branch).
remote_component_store = comp.ComponentStore(
    url_search_prefixes=[
        'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/gcp/',
    ],
)

In [None]:
# Component used in the pipeline to create the Dataproc cluster.
dataproc_create_cluster_op = remote_component_store.load_component(
    'dataproc/create_cluster',
)

In [None]:
# Component used in the pipeline to submit the PySpark job to the cluster.
dataproc_submit_pyspark_job_op = remote_component_store.load_component(
    'dataproc/submit_pyspark_job',
)

In [None]:
# Component used in the pipeline's exit handler to delete the cluster.
dataproc_delete_cluster_op = remote_component_store.load_component(
    'dataproc/delete_cluster',
)

In [None]:
# GCS location (bucket name plus fixed sub-folder) used to build the pipeline's
# default paths.
BUCKET_NAME = '{bucket}/kfp_primer/pyspark'.format(bucket='')  # insert your bucket name

### Define the pipeline

1. We are going to query the table `bigquery-samples:wikipedia_benchmark.Wiki10M`.
2. To pass complex types as parameters, use `json.dumps`.
3. Objects of the class `kfp.dsl.PipelineParam` should be cast to `str`.

Here is the pipeline you have to build:
![Pipeline](img/pyspark.png)

In [None]:
@dsl.pipeline(
    name='Test',
    description='Simple pipeline to exeperiment with KFP'
)
def end_to_end_pyspark(
    cluster_project_id='kfp-primer-workshop',
    cluster_region='',  # insert your region
    cluster_name='spark-{{workflow.uid}}',
    # Built by concatenation on purpose: `str.format` would collapse the
    # doubled braces into single ones and the `{{workflow.uid}}` /
    # `{{pod.name}}` placeholders would no longer be recognisable.
    gcs_pkgs_path='gs://' + BUCKET_NAME + '/output/{{workflow.uid}}/{{pod.name}}/pkgs',
    bq_project_id='bigquery-samples',
    bq_dataset='wikipedia_benchmark',
    bq_table='Wiki10M',
    output_path='gs://' + BUCKET_NAME + '/output/{{workflow.uid}}/{{pod.name}}/test.csv',
    args='',
    job='{}',
    wait_interval='30'
    ):
    """Create a Dataproc cluster, run a PySpark job on it and delete the cluster.

    The cluster creation and the upload of the job files do not depend on each
    other; the job submission consumes outputs of both. The cluster deletion
    is registered as the exit handler of the other three steps.

    Args:
        cluster_project_id: GCP project in which the cluster is created.
        cluster_region: Region of the cluster.
        cluster_name: Name of the cluster.
        gcs_pkgs_path: GCS path handed to the upload component as its
            `output_gcs_path`.
        bq_project_id: Passed to the job as `--tableProjectID`.
        bq_dataset: Passed to the job as `--dataset`.
        bq_table: Passed to the job as `--table`.
        output_path: Passed to the job as `--output`.
        args: Forwarded to the `args` input of the submit component.
        job: Forwarded to the `job` input of the submit component.
        wait_interval: Forwarded to the `wait_interval` input of the submit
            component.
    """
    # The exit task is created first so that it can be handed to ExitHandler.
    delete_cluster_task = dataproc_delete_cluster_op(
        project_id=cluster_project_id,
        region=cluster_region,
        name=cluster_name
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))

    with dsl.ExitHandler(delete_cluster_task):
        # create cluster
        create_cluster_task = dataproc_create_cluster_op(
            project_id=cluster_project_id,
            region=cluster_region,
            name=cluster_name).apply(gcp.use_gcp_secret('user-gcp-sa'))

        create_cluster_task.set_display_name('create cluster')

        # upload files to GCS
        upload_files_to_GCS_task = upload_files_to_GCS_op(
            output_gcs_path=gcs_pkgs_path).apply(gcp.use_gcp_secret('user-gcp-sa'))

        upload_files_to_GCS_task.set_display_name('upload files')

        # Pipeline parameters and task outputs are cast to `str` so that
        # `json.dumps` can serialise them.
        pyspark_job = json.dumps({
            'main_python_file_uri': str(upload_files_to_GCS_task.outputs['transform_run_path']),
            # The Dataproc PySparkJob schema documents the jar URIs as a list
            # of strings, hence the single-element list.
            'jar_file_uris': [str(upload_files_to_GCS_task.outputs['jar_path'])],
            'args': ['--tableProjectID', str(bq_project_id),
                     '--dataset', str(bq_dataset),
                     '--table', str(bq_table),
                     '--output', str(output_path)]
        })

        # submit job to dataproc cluster
        dataproc_submit_pyspark_job_task = dataproc_submit_pyspark_job_op(
            project_id=cluster_project_id,
            region=cluster_region,
            cluster_name=create_cluster_task.outputs['cluster_name'],
            main_python_file_uri=upload_files_to_GCS_task.outputs['transform_run_path'],
            args=args,
            pyspark_job=pyspark_job,
            job=job,
            wait_interval=wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))

        dataproc_submit_pyspark_job_task.set_display_name('run pyspark job')

### Compile pipeline to check for errors

In [None]:
# Compile the pipeline function into `<function name>.pipeline.zip`.
compiler.Compiler().compile(
    end_to_end_pyspark,
    end_to_end_pyspark.__name__ + '.pipeline.zip',
)

### Upload the pipeline to Kubeflow Pipeline

If running outside of the cluster with Kubeflow, set `GOOGLE_APPLICATION_CREDENTIALS` for dealing with authorisation. The service account needs to have the role `IAP-secured Web App User`.

In [None]:
# os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '' # path to the JSON key file of the service account used to log in: it needs to have the role IAP-secured Web App User
# HOST = '' # url of the cluster e.g. https://demo-kubeflow.endpoints.lf-ml-demo.cloud.goog/pipeline
# CLIENT_ID = '' # The client ID used by Identity-Aware Proxy
# NAMESPACE = '' # user namespace (a Kubernetes namespace name, not a URL), e.g. kubeflow

In [None]:
# Client used below to upload the compiled pipeline.
client = KfpClient(
# No arguments are needed when this notebook runs inside the same Kubeflow
# deployment. When running from outside, uncomment the arguments below and
# fill in the corresponding variables first.
#     host=HOST,
#     client_id=CLIENT_ID,
#     namespace=NAMESPACE
)

In [None]:
# Upload the compiled package.
client.upload_pipeline(
    pipeline_name='e2e_pyspark_run_04',  # make the name unique, e.g. add your username
    pipeline_package_path=end_to_end_pyspark.__name__ + '.pipeline.zip',
)

### Run the pipeline from the UI