# Unit 6 Launching Spark Applications

## Contents

**Creating a Spark application**

**Submitting an application to YARN**

**Simple application submission example**

**Cluster vs Client mode**

**Adding dependencies**

**Complex application submission example**

**How to install additional Python packages**

**Using additional compression codec libraries: e.g. LZO**

**Sending the application in the background**

**Dynamic resource allocation**

**Hadoop Component Versions**

**Overriding configuration directory**

**Running an interactive shell**

**Exercises**

## Creating a Spark application
An application is very similar to a notebook, but a few minor changes are required.

The interactive notebook automatically creates the SparkContext (sc) and a SparkSession (spark), but in a standalone application you must create them yourself:

    from pyspark.sql import SparkSession
    from pyspark import SparkContext

    if __name__ == '__main__':
        spark = SparkSession \
            .builder \
            .appName('My Application') \
            .getOrCreate()
        sc = spark.sparkContext
        # ...
        # Application specific code
        # ...
        spark.stop()
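
To make the skeleton more concrete, here is a minimal sketch of a complete word-count application. The file name `wordcount.py`, the `tokenize` helper and the single input-path argument are assumptions made for illustration; they are not part of the course material.

```python
import sys
from operator import add


def tokenize(line):
    """Split a line of text into lowercase words."""
    return line.lower().split()


def main(input_path):
    # Imported here so that tokenize() can be tested without a Spark installation
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('Word Count').getOrCreate()
    sc = spark.sparkContext
    try:
        counts = (sc.textFile(input_path)
                    .flatMap(tokenize)
                    .map(lambda word: (word, 1))
                    .reduceByKey(add))
        # Bring only a small sample back to the driver
        for word, count in counts.take(10):
            print('{}: {}'.format(word, count))
    finally:
        # Release the cluster resources even if the job fails
        spark.stop()


if __name__ == '__main__':
    if len(sys.argv) == 2:
        main(sys.argv[1])
    else:
        print('Usage: spark-submit wordcount.py <input_path>')
```

Wrapping the job in `try`/`finally` is a design choice of this sketch: it ensures `spark.stop()` runs even when the application-specific code raises an exception.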

## Submitting an application to YARN

To submit an application to YARN you use the **spark-submit** utility:

```
spark-submit
  --name NAME                 A name of your application.

  --master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
  --deploy-mode DEPLOY_MODE   Whether to launch the driver program locally ("client") or
                              on one of the worker machines inside the cluster ("cluster")
                              (Default: client).
  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").

  --num-executors NUM         Number of executors to launch (Default: 2).
  
  --driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
                              (Default: 1).
                              
  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
                              or all available cores on the worker in standalone mode)

  --proxy-user NAME           User to impersonate when submitting the application.
```

The main options to take into account for resource allocation are:

* The `--num-executors` (spark.executor.instances configuration property) option controls how many executors are allocated for the application on the cluster.
* The `--executor-memory` (spark.executor.memory configuration property) option controls the memory allocated per executor.
* The `--executor-cores` (spark.executor.cores configuration property) option controls the cores allocated per executor.
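
As a rough illustration of how these three options combine, the following sketch computes the total cores and executor memory an application requests. The helper names `parse_memory_mb` and `executor_totals` are hypothetical, and the calculation is deliberately simplified: it ignores the driver and the extra per-container memory overhead that YARN adds on top of `--executor-memory`.

```python
def parse_memory_mb(mem):
    """Convert a memory string such as '1000M' or '2G' to megabytes.

    Only the M and G suffixes shown in the spark-submit help are handled.
    """
    units = {'M': 1, 'G': 1024}
    return int(mem[:-1]) * units[mem[-1].upper()]


def executor_totals(num_executors, executor_cores, executor_memory):
    """Total cores and executor memory requested across all executors."""
    return {
        'total_cores': num_executors * executor_cores,
        'total_memory_mb': num_executors * parse_memory_mb(executor_memory),
    }


# Example: --num-executors 4 --executor-cores 6 --executor-memory 2G
print(executor_totals(4, 6, '2G'))
# → {'total_cores': 24, 'total_memory_mb': 8192}
```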



## Simple application submission example

    spark-submit --master yarn --name testWC test.py
    spark-submit --master yarn --deploy-mode cluster --name testWC test.py



## Cluster vs Client mode


![Client mode](https://image.slidesharecdn.com/th-1150a-hall1-feng-v2-140617142634-phpapp01/95/sparkonyarn-empower-spark-applications-on-hadoop-cluster-10-638.jpg?cb=1403015417)

![Cluster mode](https://image.slidesharecdn.com/th-1150a-hall1-feng-v2-140617142634-phpapp01/95/sparkonyarn-empower-spark-applications-on-hadoop-cluster-9-638.jpg?cb=1403015417)

Image Source: [Spark-on-YARN: Empower Spark Applications on Hadoop Cluster](https://www.slideshare.net/Hadoop_Summit/sparkonyarn-empower-spark-applications-on-hadoop-cluster)

## Adding dependencies

To add dependencies you have three options:
- The **--jars** option transfers the given jar files to the cluster.
- The **--packages** option takes Maven coordinates (`groupId:artifactId:version`) and downloads the corresponding packages, for example from Spark Packages. This approach requires an internet connection.
- The **--py-files** option adds .zip, .egg, or .py files to the PYTHONPATH.

### Adding dependencies: packages

    spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 ...

### Adding dependencies: jar files

    spark-submit --jars /jar_path/spark-streaming-kafka-assembly_2.10-1.6.1.2.4.2.0-258.jar ...

### Adding dependencies: egg files
If you have an egg file of the package you want to use, pass it with the `--py-files` option of spark-submit or call the `sc.addPyFile()` method to make it available to the application. You can then import it in your application in the standard way.

    spark-submit --py-files /egg_path/avro-1.8.1-py2.7.egg ...

    # First we add the egg file to the application environment
    sc.addPyFile('/home/cesga/jlopez/packages/ClusterShell-1.7.3-py2.7.egg')
    # Then we can import and use it in the standard way
    from ClusterShell.NodeSet import NodeSet
    nodeset = NodeSet('c[6601-6610]')
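
Since `--py-files` also accepts .zip files, your own Python package can be shipped the same way without building an egg. The sketch below zips a package directory using only the standard library; the function name `zip_package` and the package name `mypkg` in the usage comment are hypothetical.

```python
import os
import zipfile


def zip_package(package_dir, zip_path):
    """Zip the .py files of a package directory for use with --py-files."""
    parent = os.path.dirname(os.path.abspath(package_dir))
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(package_dir):
            for fname in files:
                if fname.endswith('.py'):
                    full = os.path.join(root, fname)
                    # Store paths relative to the parent directory so that
                    # 'import <package>' works once the zip is on the PYTHONPATH
                    zf.write(full, os.path.relpath(full, parent))
    return zip_path


# Hypothetical usage:
#   zip_package('mypkg', 'mypkg.zip')
#   spark-submit --py-files mypkg.zip ...
```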

## Complex application submission example

Here is a real example of how to submit an application that consumes data from Kafka in Avro format using Spark Streaming:

```
spark-submit --master yarn --deploy-mode cluster \
             --num-executors 2 \
             --conf spark.yarn.submit.waitAppCompletion=false  \
             --packages com.databricks:spark-avro_2.10:2.0.1 \
             --jars /home/cesga/jlopez/packages/spark-streaming-kafka-assembly_2.10-1.6.1.2.4.2.0-258.jar \
             --py-files /home/cesga/jlopez/packages/avro-1.8.1-py2.7.egg \
             --name 'SSH attack detector' \
             ssh_attack_detector.py
```
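
Long command lines like this one are easy to break with a stray character after a trailing backslash. One possible alternative is to assemble the command in Python as an argument list. The following is only a sketch: `build_spark_submit` is a hypothetical helper, it hard-codes `--master yarn`, and it builds the command without running it.

```python
import shlex


def build_spark_submit(script, name=None, deploy_mode='client', num_executors=None,
                       conf=None, packages=(), jars=(), py_files=()):
    """Return the spark-submit command as an argument list (nothing is executed)."""
    args = ['spark-submit', '--master', 'yarn', '--deploy-mode', deploy_mode]
    if num_executors is not None:
        args += ['--num-executors', str(num_executors)]
    for key, value in sorted((conf or {}).items()):
        args += ['--conf', '{}={}'.format(key, value)]
    # --packages, --jars and --py-files each take one comma-separated list
    for flag, items in (('--packages', packages), ('--jars', jars),
                        ('--py-files', py_files)):
        if items:
            args += [flag, ','.join(items)]
    if name:
        args += ['--name', name]
    args.append(script)
    return args


cmd = build_spark_submit(
    'ssh_attack_detector.py',
    name='SSH attack detector',
    deploy_mode='cluster',
    num_executors=2,
    conf={'spark.yarn.submit.waitAppCompletion': 'false'},
    packages=['com.databricks:spark-avro_2.10:2.0.1'],
    jars=['/home/cesga/jlopez/packages/spark-streaming-kafka-assembly_2.10-1.6.1.2.4.2.0-258.jar'],
    py_files=['/home/cesga/jlopez/packages/avro-1.8.1-py2.7.egg'],
)
# Print a shell-safe version of the command for inspection
print(' '.join(shlex.quote(arg) for arg in cmd))
```

The resulting list can be passed directly to `subprocess.run(cmd, check=True)`, which avoids shell quoting and line-continuation issues altogether.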

## How to install additional Python packages
The simplest way is to use **pip** with the `--user` option:

    pip install --user pymongo


## Using additional compression codec libraries: e.g. LZO
If you try to use a codec library without specifying where the codec resides, you will see an error.
For example, if the hadoop-lzo codec file cannot be found during spark-submit, Spark will generate the following message:

    Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.

To solve it, specify the hadoop-lzo jar file with the `--jars` option in your job submit command:

    spark-submit --driver-memory 1G --executor-memory 1G --master yarn --deploy-mode client --jars /usr/hdp/2.4.2.0-258/hadoop/lib/hadoop-lzo-0.6.0.2.4.2.0-258.jar test_read_write.py

## Sending the application in the background

By default, after you submit an application the spark-submit command stays in the foreground, waiting for the application to finish. To make it return right after submission, set `spark.yarn.submit.waitAppCompletion=false` (this property applies to YARN cluster mode):

    spark-submit --conf spark.yarn.submit.waitAppCompletion=false ...
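
Once spark-submit has returned, the YARN application ID is what you need to follow the job. YARN IDs have the form `application_<cluster timestamp>_<sequence number>`, so they can be picked out of the submission output with a regular expression. The helper name `find_application_id` and the sample log line below are made up for illustration; the exact log text may differ on your cluster.

```python
import re

# YARN application IDs look like application_<clusterTimestamp>_<sequence>
APP_ID_PATTERN = re.compile(r'application_\d+_\d+')


def find_application_id(log_text):
    """Return the first YARN application ID found in the text, or None."""
    match = APP_ID_PATTERN.search(log_text)
    return match.group(0) if match else None


# Made-up sample line, not captured from a real run
sample = 'INFO YarnClientImpl: Submitted application application_1500000000000_0042'
print(find_application_id(sample))
```

With the ID in hand you can check on the job with `yarn application -status <application_id>` or fetch its logs with `yarn logs -applicationId <application_id>`.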



## Dynamic resource allocation
Spark provides a mechanism to dynamically adjust the resources your application occupies based on the workload. This means that your application may give resources back to the cluster if they are no longer used and request them again later when there is demand. This feature is particularly useful if multiple applications share resources in your Spark cluster.

The CESGA HDP cluster has this feature enabled, so it **automatically adds new executors when they are needed** instead of fixing their number at launch time with `--num-executors`.

This allows interactive jobs to add and remove executors dynamically during execution.

When you specify the `--num-executors` option, dynamic resource allocation is disabled automatically.
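
The rule described above can be sketched as a small function that inspects a mapping of Spark properties and reports which allocation mode applies. The function `allocation_mode` is a hypothetical helper that only models the behaviour stated in this unit; the property names (`spark.dynamicAllocation.enabled`, `spark.dynamicAllocation.minExecutors`, `spark.dynamicAllocation.maxExecutors`, `spark.executor.instances`) are standard Spark configuration properties.

```python
def allocation_mode(conf):
    """Describe executor allocation for a mapping of Spark property names to values."""
    # An explicit executor count (--num-executors) turns dynamic allocation off
    if 'spark.executor.instances' in conf:
        return 'static: {} executors'.format(conf['spark.executor.instances'])
    enabled = conf.get('spark.dynamicAllocation.enabled', 'false').lower() == 'true'
    if enabled:
        lower = conf.get('spark.dynamicAllocation.minExecutors', '0')
        upper = conf.get('spark.dynamicAllocation.maxExecutors', 'unbounded')
        return 'dynamic: {} to {} executors'.format(lower, upper)
    # Neither set: spark-submit falls back to its default of 2 executors
    return 'static: 2 executors'


print(allocation_mode({'spark.dynamicAllocation.enabled': 'true',
                       'spark.dynamicAllocation.maxExecutors': '20'}))
```

Inside a running application the same function could be fed with the live configuration via `allocation_mode(dict(sc.getConf().getAll()))`.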

## Hadoop Component Versions

The cluster runs HDP 2.4.2 with Hadoop 2.7.1.

![HDP 2.4.2 component versions](http://hortonworks.com/wp-content/uploads/2016/03/asparagus-hdp25.jpg)

You can also check the versions with:

    hdp-select

## Overriding configuration directory
To use a configuration directory other than the default “SPARK_HOME/conf”, set the SPARK_CONF_DIR environment variable. Spark will then read its configuration files (spark-defaults.conf, spark-env.sh, log4j.properties, etc.) from that directory.

Example:

    export SPARK_CONF_DIR=/home/cesga/jlopez/conf/
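
A custom configuration directory needs at least the files you want to override. As a sketch, the following helper writes a `spark-defaults.conf` file, which holds one property per line as a name and a value separated by whitespace. The function name `write_spark_defaults` and the property values in the usage comment are assumptions chosen for illustration.

```python
import os


def write_spark_defaults(conf_dir, properties):
    """Write a spark-defaults.conf file into conf_dir and return its path."""
    if not os.path.isdir(conf_dir):
        os.makedirs(conf_dir)
    path = os.path.join(conf_dir, 'spark-defaults.conf')
    with open(path, 'w') as f:
        # One property per line: name, whitespace, value
        for key in sorted(properties):
            f.write('{} {}\n'.format(key, properties[key]))
    return path


# Hypothetical usage:
#   write_spark_defaults('/home/cesga/jlopez/conf',
#                        {'spark.executor.memory': '2g'})
# followed by the export of SPARK_CONF_DIR in the shell.
```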


## Running an interactive shell
In addition to Jupyter notebooks, you can also use an interactive command-line shell:

    pyspark --master yarn --num-executors 4 --executor-cores 6

The shell accepts the same resource options as spark-submit:

    --num-executors NUM         Number of executors to launch (Default: 2).
    --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode)
    --driver-cores NUM          Number of cores used by the driver, only in cluster mode (Default: 1).
    --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
    --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
    --proxy-user NAME           User to impersonate when submitting the application.


To use IPython instead of the standard Python interpreter for an interactive session, use:

    PYSPARK_DRIVER_PYTHON=ipython pyspark


## Exercises

* Exercise: Modify the "Unit 4 Working with meteorological data 2" notebook and submit it to YARN
* Exercise: Modify the "Unit 5 Working with meteorological data" notebook and submit it to YARN