# Setup the environment to submit PySpark jobs on YARN

The basis is to submit Spark jobs to a Spark cluster. 

* [How to submit a PyDeequ job from Jupyter Notebook to a Spark/YARN
](https://stackoverflow.com/questions/68796543/how-to-use-a-pydeequ-job-from-jupyter-notebook-to-a-spark-yarn/68796544#68796544)
* [Submitting pyspark script to a remote Spark server?
](https://stackoverflow.com/questions/54641574/submitting-pyspark-script-to-a-remote-spark-server)

In [None]:
import os
import sys
import gc

In [None]:
dir = os.path.realpath(os.getcwd())

---
# HDFS permission

For a non-spark user to be able to submit a job, login to the HDFS node as the hadoop user to run:

```
hadoop fs -mkdir /user/${USERNAME}
hadoop fs -chown ${USERNAME} /user/${USERNAME}
hadoop fs -chmod g+w /user/${USERNAME}
```

Otherwise an error:
```
21/08/15 21:15:28 ERROR SparkContext: Error initializing SparkContext.
org.apache.hadoop.security.AccessControlException: Permission denied: user=${USERNAME}, access=WRITE, inode="/user":hadoop:hadoop:drwxrwxr-x
```

---
# Environment variables

## HADOOP_CONF_DIR

Copy the **HADOOP_CONF_DIR** from the Hadoop/YARN master node and set the ```HADOOP_CONF_DIR``` environment variable locally to point to the directory.

* [Launching Spark on YARN
](http://spark.apache.org/docs/latest/running-on-yarn.html#launching-spark-on-yarn)

> Ensure that **HADOOP_CONF_DIR** or **YARN_CONF_DIR** points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration. 

In [None]:
os.environ['HADOOP_CONF_DIR'] = "/opt/hadoop/hadoop-3.2.2/etc/hadoop"

## PYTHONPATH

Refer to the **pyspark** modules to load from the ```$SPARK_HOME/python/lib``` in the Spark installation.

* [PySpark Getting Started](https://spark.apache.org/docs/latest/api/python/getting_started/install.html)

> Ensure the SPARK_HOME environment variable points to the directory where the tar file has been extracted. Update PYTHONPATH environment variable such that it can find the PySpark and Py4J under SPARK_HOME/python/lib. One example of doing this is shown below:

```
export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
```

Alternatively install **pyspark** with pip or conda locally which installs the Spark runtime libararies (for standalone).

* [Can PySpark work without Spark?](https://stackoverflow.com/questions/51728177/can-pyspark-work-without-spark)

> As of v2.2, executing pip install pyspark will install Spark. If you're going to use Pyspark it's clearly the simplest way to get started. On my system Spark is installed inside my virtual environment (miniconda) at lib/python3.6/site-packages/pyspark/jars  
> PySpark has a Spark installation installed. If installed through pip3, you can find it with pip3 show pyspark. Ex. for me it is at ~/.local/lib/python3.8/site-packages/pyspark. This is a standalone configuration so it can't be used for managing clusters like a full Spark installation.

In [None]:
# os.environ['PYTHONPATH'] = "/opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip:/opt/spark/spark-3.1.2/python/lib/pyspark.zip"
sys.path.extend([
    "/opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip",
    "/opt/spark/spark-3.1.2/python/lib/pyspark.zip"
])

## PYSPARK_SUBMIT_ARGS

Specify the [spark-submit](https://spark.apache.org/docs/3.1.2/submitting-applications.html#launching-applications-with-spark-submit) parameters.

```
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
```

The ```conf``` paramters are [Spark properties](https://spark.apache.org/docs/latest/configuration.html#available-properties) e.g. ```spark.executor.memory```

Alternatively, use [SparkSession.builder](https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession).

```
spark = SparkSession.builder\
    .master('yarn') \
    .config('spark.submit.deployMode', 'client') \
    .config('spark.executor.memory', '2g') \
    .getOrCreate()
```

### Example

```
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode client \
  --supervise \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000
```

### Environment variable

```
export PYSPARK_SUBMIT_ARGS='--master yarn --executor-memory 20G --total-executor-cores 100 --num-executors 5 --driver-memory 2g --executor-memory 2g pyspark-submit'
```

---
# PyDeequ

* [Deequ - Unit Tests for Data](https://github.com/awslabs/deequ)

>Deequ is a library built on top of Apache Spark for defining "unit tests for data", which measure data quality in large datasets. We are happy to receive feedback and contributions.  
>Python users may also be interested in PyDeequ, a Python interface for Deequ. You can find PyDeequ on GitHub, readthedocs, and PyPI.

* [Maven repository com.amazon.deequ](https://mvnrepository.com/artifact/com.amazon.deequ/deequ)

* [Testing data quality at scale with PyDeequ](https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/)

## Deequ jar to classpath
To use the PyDeequ, need the deequ jar file. Download the one for the Spark/Deequ version from the [Maven repository com.amazon.deequ](https://mvnrepository.com/artifact/com.amazon.deequ/deequ).

Specify them to the Spark jar properties as specified in:

* [Deequ Analyzers Basic Tutorial](https://github.com/awslabs/python-deequ/blob/master/tutorials/analyzers.ipynb)
* [TypeError: 'JavaPackage' object is not callable when running pydeequ](https://github.com/awslabs/python-deequ/issues/1)

Note that need all of them. Missing any will cause errors.
```
spark = SparkSession.builder\
    .master('yarn') \
    .config('spark.submit.deployMode', 'client') \
    .config("spark.driver.extraClassPath", classpath) \         <-----
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \   <-----
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \     <-----
    .getOrCreate()
```

In [None]:
deequ_jar = "https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.0-spark-3.1/deequ-2.0.0-spark-3.1.jar"
classpath = f"{dir}/jar/deequ-2.0.0-spark-3.1.jar"

!wget -q -O $classpath $deequ_jar
#!ls $classpath

In [None]:
!pip install -q pydeequ
os.environ['SPARK_VERSION'] = '3.1.2'
import pydeequ

---
# Spark Session

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder\
    .master('yarn') \
    .config('spark.submit.deployMode', 'client') \
    .config("spark.driver.extraClassPath", classpath) \
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
    .config('spark.debug.maxToStringFields', 100) \
    .config('spark.executor.memory', '2g') \
    .getOrCreate()

---
# AWS Product Review data

In [None]:
df = spark.read.csv(
    path=f"file:///{dir}/data/amazon_product_reviews.csv.gz",
    header=True,
)
df.printSchema()

In [None]:
from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .addAnalyzer(ApproxCountDistinct("review_id")) \
    .addAnalyzer(Mean("star_rating")) \
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
    .addAnalyzer(Correlation("total_votes", "star_rating")) \
    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()

# Cleanup

In [None]:
del spark
gc.collect()