# Spark Job Example
* Requires a running Spark master
    * Install Spark locally on your node (https://spark.apache.org/docs/latest/spark-standalone.html)
    * Start a Spark master instance on your node: `$ $SPARK_HOME/sbin/start-master.sh`
    * Start at least one worker: `$ $SPARK_HOME/sbin/start-worker.sh spark://localhost:7077`
    * Verify that your master and at least one worker are up at http://localhost:8080
    * NOTE: If you are using a Spark standalone cluster, the installed Spark version (including the minor version) must match the PySpark version, or you may see odd errors.
    * NOTE: You may run into port issues if you are on a corporate network or VPN. See https://stackoverflow.com/questions/52133731/how-to-solve-cant-assign-requested-address-service-sparkdriver-failed-after
* Requires AWS credentials configured so that files can be pushed to S3
    * Install the AWS CLI: `$ pip install awscli`
    * Configure your credentials: `$ aws configure`
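
The version note above can be checked before submitting a job. The following is a minimal sketch, not part of Disdat or Spark: `major_minor` and `versions_match` are hypothetical helper names, and the cluster version is assumed to be entered by hand (for example, as reported by `spark-submit --version`).

```python
def major_minor(version: str) -> tuple:
    """Return the (major, minor) components of a version string such as '3.2.0'."""
    parts = version.strip().split(".")
    return int(parts[0]), int(parts[1])


def versions_match(cluster_version: str, pyspark_version: str) -> bool:
    """True when both versions agree on the major and minor numbers."""
    return major_minor(cluster_version) == major_minor(pyspark_version)


# Both versions are hard-coded here for illustration (hypothetical values).
print(versions_match("3.2.0", "3.2.1"))  # True: same major.minor
print(versions_match("3.2.0", "3.1.2"))  # False: minor versions differ
```

In a notebook, `pyspark.__version__` can be passed as the second argument.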

## Illustrates how to:
* Submit a Spark job from a task
* Have the Spark job read from a bundle that contains S3 paths
* Have the Spark job write to S3 paths contained in the output bundle
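
The task itself lives in `pipelines.spark_tasks` and is not reproduced in this notebook. One detail visible in the run log is that Spark reads the bundle file through an `s3a://` URL (the scheme used by Hadoop's S3A connector), while the remote context is given as an `s3://` URL. A minimal sketch of that rewrite, assuming a hypothetical helper `to_s3a` and a made-up bucket name; the real task may do this differently:

```python
def to_s3a(url: str) -> str:
    """Rewrite an s3:// URL to the s3a:// scheme; leave any other URL untouched."""
    prefix = "s3://"
    if url.startswith(prefix):
        return "s3a://" + url[len(prefix):]
    return url


# Hypothetical bucket and key, for illustration only.
print(to_s3a("s3://my-bucket/context/file_1.txt"))
# prints: s3a://my-bucket/context/file_1.txt
```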


In [1]:
%load_ext autoreload
%autoreload 1

In [2]:
import disdat.api as api
from disdatluigi.api import apply
from disdat.api import Bundle
import pandas as pd
import pickle
import time
import luigi
from pyspark.sql import SparkSession
from pyspark import SparkConf
import os

%aimport pipelines.spark_tasks

# Make a bundle with S3 paths
NOTE: Requires a remote context to push to.

In [3]:
data_context = 'example-context'
remote_context_url = 's3://disdat-cdo-prd/'  # <-- Replace with the S3 location of your Disdat contexts
api.context(data_context)
api.remote(data_context, data_context, remote_context_url)

with Bundle(data_context, name="s3_files") as b:
    f1 = b.get_file("file_1.txt")
    with open(f1, mode='w') as f:
        f.write("This is our first file!")
    b.add_data(f1)

# Commit the bundle, push it to the remote, and remove the local copy of the file
b.commit().push(delocalize=True)

Context already bound to remote at s3://disdat-cdo-prd/
Pushed committed bundle None uuid 0068f9ce-1900-4bc0-84ed-459b2b6e0246 to remote s3://disdat-cdo-prd/context


<disdat.api.Bundle at 0x7fcc43021a60>

In [4]:
spark_master = 'spark://localhost:7077'   # Fill in your Spark master URL (shown on the master web UI at http://localhost:8080)
app_name = "testapp"
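
Before submitting, it can help to confirm that the master URL is well formed and that something is listening on it. This is a sketch using only the Python standard library; `parse_master` and `master_reachable` are hypothetical helpers, not part of Spark or Disdat, and a successful TCP connection only shows that the port is open, not that a healthy Spark master is behind it.

```python
import socket
from urllib.parse import urlparse


def parse_master(url: str) -> tuple:
    """Split a spark://host:port URL into (host, port); raise ValueError if malformed."""
    parsed = urlparse(url)
    if parsed.scheme != "spark" or parsed.hostname is None or parsed.port is None:
        raise ValueError("expected spark://host:port, got %r" % (url,))
    return parsed.hostname, parsed.port


def master_reachable(url: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the master's host and port succeeds."""
    host, port = parse_master(url)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


print(parse_master("spark://localhost:7077"))  # ('localhost', 7077)
```

Calling `master_reachable(spark_master)` in the notebook returns `False` when no master is listening, which is a quicker signal than waiting for the Spark job to time out.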

In [5]:
apply("example-context",
      pipelines.spark_tasks.RunSparkJob,
      params={'spark_master': spark_master,
              'app_name': app_name},
      incremental_push=True,
      force=True)

DEBUG: Checking if RunSparkJob(spark_master=spark://intuitdepe1ea6:7077, app_name=testapp, input_bundle_name=s3_files) is complete
DEBUG: Checking if ExternalDepTask(uuid=0068f9ce-1900-4bc0-84ed-459b2b6e0246, processing_name=_d41d8cd98f_d41d8cd98f) is complete
INFO: Informed scheduler that task   RunSparkJob_testapp_s3_files_spark___intuitde_75ab6e1f73   has status   PENDING
INFO: Informed scheduler that task   ExternalDepTask__d41d8cd98f_d41d_0068f9ce_1900_4b_9ccb2167db   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 665] Worker Worker(salt=523456111, workers=1, host=intuitdepe1ea6, username=kyocum, pid=665) running   RunSparkJob(spark_master=spark://intuitdepe1ea6:7077, app_name=testapp, input_bundle_name=s3_files)


:: loading settings :: url = jar:file:/Users/kyocum/bin/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/kyocum/.ivy2/cache
The jars for the packages stored in: /Users/kyocum/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-07a5cf4c-009c-4d66-9f0e-ea59e73f7de4;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
:: resolution report :: resolve 133ms :: artifacts dl 4ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	------------------------------

Spark reading file:s3a://disdat-cdo-prd/context/example-context/objects/0068f9ce-1900-4bc0-84ed-459b2b6e0246/file_1.txt


22/01/19 17:29:42 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties

Lines with a: 0, lines with b: 0

['context/example-context/objects/43cbbed8-fee9-4283-8047-a4b74bf947e8/job_output/_SUCCESS', 'context/example-context/objects/43cbbed8-fee9-4283-8047-a4b74bf947e8/job_output/part-00000-7a88870d-4124-4c07-80bf-b4c1ee249b4c-c000.snappy.parquet', 'context/example-context/objects/43cbbed8-fee9-4283-8047-a4b74bf947e8/job_output/part-00011-7a88870d-4124-4c07-80bf-b4c1ee249b4c-c000.snappy.parquet']


INFO: [pid 665] Worker Worker(salt=523456111, workers=1, host=intuitdepe1ea6, username=kyocum, pid=665) done      RunSparkJob(spark_master=spark://intuitdepe1ea6:7077, app_name=testapp, input_bundle_name=s3_files)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   RunSparkJob_testapp_s3_files_spark___intuitde_75ab6e1f73   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=523456111, workers=1, host=intuitdepe1ea6, username=kyocum, pid=665) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 1 complete ones were encountered:
    - 1 ExternalDepTask(...)
* 1 ran successfully:
    - 1 RunSparkJob(...)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



Pushed committed bundle None uuid 43cbbed8-fee9-4283-8047-a4b74bf947e8 to remote s3://disdat-cdo-prd/context


{'success': True, 'did_work': True}
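
The dictionary returned by `apply` can be checked programmatically. The sketch below assumes that `success` means no task failed and that `did_work` means at least one task actually ran (consistent with the Luigi summary above, but an assumption about the API's semantics); `check_apply_result` is a hypothetical helper.

```python
def check_apply_result(result: dict) -> str:
    """Raise if the run failed; otherwise report whether any task actually ran."""
    if not result.get("success", False):
        raise RuntimeError("pipeline reported failure: %r" % (result,))
    if result.get("did_work", False):
        return "ran"
    return "up-to-date"


# The literal below is the value shown in the cell output above.
print(check_apply_result({'success': True, 'did_work': True}))  # ran
```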