# Tutorial for High-Level Spark API using PySpark

## INTRODUCTION

### Spark Architecture

**Spark** is a `distributed computing framework` that runs on a cluster of machines. Each application is coordinated by a driver program, which splits the work into tasks, distributes them to the worker nodes, and collects the results. The data itself is not held by the driver; it is partitioned across the worker nodes. Spark's foundational data structure is the RDD (Resilient Distributed Dataset), a collection of elements that can be operated on in parallel. RDDs are immutable: once created they cannot be changed, so the only way to "change" an RDD is to derive a new RDD from an existing one. RDDs are fault-tolerant: if a node in the cluster fails, Spark recomputes the lost partitions from the RDD's lineage, the recorded chain of transformations that produced it. RDDs are also lazily evaluated: transformations are only recorded, and nothing is computed until an action is performed. This allows Spark to optimize the execution as a whole.



![](./images/spark-architecture.png)

`The Driver Program`
This is a single process that creates work for the cluster.

`Spark Context`
This is the main entry point for Spark functionality. It is created by the driver program, represents a connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. It communicates with the cluster manager to allocate resources for the application. It also keeps track of the cluster state: which executors are alive, which are dead, which are running which tasks, etc.

`Executors`
These are multiple processes throughout the cluster that do the work in parallel. They execute the tasks that the driver program assigns to them and hold the data partitions those tasks operate on. The number of executors is requested through the application's configuration and granted by the cluster manager.

The Driver creates jobs from the user's code. `Jobs` are computations that can be executed in parallel. The Spark Context (SC) divides jobs into tasks to be executed in the cluster. Tasks from a given job operate on different data subsets called partitions and can be executed in parallel.

A `worker` is a cluster node that can launch executor processes to run tasks. Each executor is allotted a set number of cores, each of which can run one task at a time. Increasing executors and cores increases cluster parallelism, but also increases the amount of memory used by the cluster.

![](./images/spark-stages.png)

A `stage` is a set of tasks within a job that can be completed on the data already in each task's partition, without moving data between executors.

A `shuffle` marks the boundary between stages. Tasks in a later stage must wait for the preceding stage to complete, which creates a dependency graph. A shuffle is an expensive operation that involves moving data between executors, so it is important to minimize the number of shuffles in a job.

A shuffle is:
* *Costly*, requiring data serialization, disk and network I/O.
* *Necessary* when an operation requires data outside the current partition of the task, e.g. a Group By on a given key, which needs rows from all partitions.

When Spark performs a shuffle, it redistributes data across the cluster. I encourage you to look at the techniques Spark provides for reducing or avoiding shuffles.
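
The idea behind a shuffle can be sketched without Spark at all. The plain-Python simulation below is only a conceptual model: the function name `shuffle_by_key`, the modulo partitioner and the sample values are assumptions for illustration, not Spark's implementation. Records that share a key start out scattered across input partitions, and a group-by needs them moved into the same output partition.

```python
from collections import defaultdict

def shuffle_by_key(partitions, num_output_partitions):
    """Redistribute (key, value) records so equal keys share an output partition."""
    output = [defaultdict(list) for _ in range(num_output_partitions)]
    for partition in partitions:                      # each input partition sits on some executor
        for key, value in partition:
            target = key % num_output_partitions      # toy partitioner; keys are ints here
            output[target][key].append(value)         # the record "moves" to its target partition
    return [dict(p) for p in output]

# Class labels (1, 2, 3) scattered over two input partitions, with made-up values.
input_partitions = [
    [(1, 14.2), (2, 12.4), (3, 13.1)],
    [(2, 11.8), (1, 13.2), (3, 12.8)],
]
shuffled = shuffle_by_key(input_partitions, num_output_partitions=2)
print(shuffled)
# [{2: [12.4, 11.8]}, {1: [14.2, 13.2], 3: [13.1, 12.8]}]
```

Every record may have to cross from one partition to another, which is why a real shuffle pays for serialization, disk and network I/O.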

![](./images/spark-dag-rdd.jpeg)

In [12]:
# Importing Relevant Libraries
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

There are a number of options that you can configure when setting up a SparkSession. Let's go over a few of the more common ones:
* **master**: The URL of the cluster for the SparkContext to connect to (for example, `local[*]` runs Spark locally using all available cores)
* **appName**: The name that will be displayed in the Spark cluster UI
* **config**: Configuration for SparkSession. Any key-value pairs in the config will be applied to the session's SparkConf. For example, you can set the spark.sql.shuffle.partitions configuration property to change the number of partitions in joins and aggregations. Or you can set spark.executor.memory to change the amount of memory used per executor process.

In [13]:
# Creating a Spark Session
# Wrapping the expression in parentheses allows us to chain commands without using a "\" in Python
spark = (SparkSession.builder
        .master("local[*]")
        .appName("Catch-Up Session")
        .getOrCreate())

In [14]:
# Viewing the Spark Session
spark

The SparkSession is the entry point to Spark SQL. It wraps the underlying SparkContext and provides a way to create DataFrames. Spark SQL is the Spark module for working with structured data. It allows you to use SQL or the DataFrame API to express Spark operations on structured data (the typed Dataset API is only available in Scala and Java).

For this exercise, we will be going over how we can read, access and manipulate structured data in Spark using Spark SQL. The data we will be using is stored in the data directory contained in this repository.

## Read Files

There are a lot of similarities between Pandas and Spark SQL when it comes to reading files. The main difference is that Spark SQL reads files in parallel, typically from a distributed file system such as HDFS, and spreads the data across the cluster, whereas Pandas loads the whole file into the memory of a single machine. Spark SQL is also able to read files from a local file system, but this is not recommended in a production cluster, because every node would need access to the same local path.

On top of that, there are also similar functionalities when handling the dataframes produced. Let's explore some of those while also listing the differences.

In [15]:
# Column names to use in the wine dataset
colNames = [
    'target', 'alcohol', 'malic_acid', 'ash', 'alcalinity_of_ash', 'magnesium', 'total_phenols',
    'flavanoids', 'nonflavanoid_phenols', 'proanthocyanins', 'color_intensity', 'hue',
    'od280_od315_of_diluted_wines', 'proline'
]

In [16]:
# Read the data into a variable called wine
wine = spark.read.csv('./data/wine.data', header=False, inferSchema=True).toDF(*colNames)

While reading the csv, we could have gone about it in different manners. For example:
1. `spark.read.option("header", "true").option("inferSchema", "true").csv("data/airports.csv")`

    In this instance, we are chaining the options we want to set on the reader before calling `csv`, which loads the file and returns the dataframe. This is a very common way of doing things in Spark SQL.
2. `spark.read.options(header="true", inferSchema="true").csv("data/airports.csv")`

    In this instance, we are passing the options as keyword arguments to the options function. This is also a very common way of doing things in Spark SQL.

Pick what you prefer and stick with it. The important thing is to be **consistent**.

In [17]:
# View the first 5 rows of the wine dataset
wine.show(5)

+------+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+
|target|alcohol|malic_acid| ash|alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity| hue|od280_od315_of_diluted_wines|proline|
+------+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+
|     1|  14.23|      1.71|2.43|             15.6|      127|          2.8|      3.06|                0.28|           2.29|           5.64|1.04|                        3.92|   1065|
|     1|   13.2|      1.78|2.14|             11.2|      100|         2.65|      2.76|                0.26|           1.28|           4.38|1.05|                         3.4|   1050|
|     1|  13.16|      2.36|2.67|             18.6|      101|          2.8|      3.24|          

A schema is a description of the structure of your data. It is a list of fields (columns), their data types, and whether each field may contain nulls, nothing more. It does not contain any data itself. A schema can be applied to a DataFrame, which allows Spark to understand the data in that DataFrame. This allows Spark to run certain optimizations on the data, and allows Spark to compress the data when it is serialized and sent over the network.

In [18]:
# Printing the schema of the wine dataset
wine.printSchema()

root
 |-- target: integer (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- malic_acid: double (nullable = true)
 |-- ash: double (nullable = true)
 |-- alcalinity_of_ash: double (nullable = true)
 |-- magnesium: integer (nullable = true)
 |-- total_phenols: double (nullable = true)
 |-- flavanoids: double (nullable = true)
 |-- nonflavanoid_phenols: double (nullable = true)
 |-- proanthocyanins: double (nullable = true)
 |-- color_intensity: double (nullable = true)
 |-- hue: double (nullable = true)
 |-- od280_od315_of_diluted_wines: double (nullable = true)
 |-- proline: integer (nullable = true)



## Basic Data Cleaning

In [19]:
# Checking for duplicates
assert wine.count() == wine.dropDuplicates().count()

In [20]:
wine.select('target', 'alcohol', 'malic_acid').distinct().show()

+------+-------+----------+
|target|alcohol|malic_acid|
+------+-------+----------+
|     1|  14.02|      1.68|
|     2|  11.79|      2.13|
|     3|  12.81|      2.31|
|     3|  12.82|      3.37|
|     1|   13.2|      1.78|
|     2|  12.37|      1.07|
|     1|  14.23|      1.71|
|     3|  12.36|      3.83|
|     2|  11.66|      1.88|
|     1|  13.05|      1.73|
|     2|  12.69|      1.53|
|     2|  12.47|      1.52|
|     2|  12.51|      1.73|
|     1|  13.16|      2.36|
|     2|  13.67|      1.25|
|     2|  12.29|      2.83|
|     2|  12.29|      1.41|
|     1|  13.63|      1.81|
|     2|  12.29|      1.61|
|     1|  13.72|      1.43|
+------+-------+----------+
only showing top 20 rows



In [21]:
# Checking for nulls


wine.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in wine.columns]).show() 

+------+-------+----------+---+-----------------+---------+-------------+----------+--------------------+---------------+---------------+---+----------------------------+-------+
|target|alcohol|malic_acid|ash|alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity|hue|od280_od315_of_diluted_wines|proline|
+------+-------+----------+---+-----------------+---------+-------------+----------+--------------------+---------------+---------------+---+----------------------------+-------+
|     0|      0|         0|  0|                0|        0|            0|         0|                   0|              0|              0|  0|                           0|      0|
+------+-------+----------+---+-----------------+---------+-------------+----------+--------------------+---------------+---------------+---+----------------------------+-------+



> Derivation for the above code can be found [here](https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe)

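**Explanation**: The list comprehension builds one aggregate expression per column. For each column, `F.when(F.isnull(c), c)` yields a non-null value only for the rows where the column is null (the second argument `c` is a plain string, so it is used as a literal rather than as the column's value), and `F.count` tallies those non-null values. Each expression is aliased with its column name, and all of them are passed to a single `select`. The final result is a one-row dataframe with the column names and the number of null entries for each column.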

**Conclusion**: None of the columns have null values.

In [22]:
# It's important to break down what happened in the above line of code
# Here is the output of the list comprehension used.
[F.count(F.when(F.isnull(c), c)).alias(c) for c in wine.columns]

[Column<'count(CASE WHEN (target IS NULL) THEN target END) AS target'>,
 Column<'count(CASE WHEN (alcohol IS NULL) THEN alcohol END) AS alcohol'>,
 Column<'count(CASE WHEN (malic_acid IS NULL) THEN malic_acid END) AS malic_acid'>,
 Column<'count(CASE WHEN (ash IS NULL) THEN ash END) AS ash'>,
 Column<'count(CASE WHEN (alcalinity_of_ash IS NULL) THEN alcalinity_of_ash END) AS alcalinity_of_ash'>,
 Column<'count(CASE WHEN (magnesium IS NULL) THEN magnesium END) AS magnesium'>,
 Column<'count(CASE WHEN (total_phenols IS NULL) THEN total_phenols END) AS total_phenols'>,
 Column<'count(CASE WHEN (flavanoids IS NULL) THEN flavanoids END) AS flavanoids'>,
 Column<'count(CASE WHEN (nonflavanoid_phenols IS NULL) THEN nonflavanoid_phenols END) AS nonflavanoid_phenols'>,
 Column<'count(CASE WHEN (proanthocyanins IS NULL) THEN proanthocyanins END) AS proanthocyanins'>,
 Column<'count(CASE WHEN (color_intensity IS NULL) THEN color_intensity END) AS color_intensity'>,
 Column<'count(CASE WHEN (hue I

Great! Since we have no null or duplicate values, the work we need to do is minimal. Let's move on to the next step.
Let's rename a column in the dataframe. Let's focus on the od280_od315_of_diluted_wines column. This column is a bit hard to read, so let's rename it to od280.

In [23]:
# Great example of withColumnRenamed in action
wine = wine.withColumnRenamed('od280_od315_of_diluted_wines', 'od280')

I encourage you to look up how to feature engineer using the `withColumn` function. It is a very useful function that you will be using a lot.

#### Query Data Using SQL

It's possible to create a temporary view of a DataFrame by calling the createOrReplaceTempView method on that DataFrame. This will register the DataFrame as a table in the catalog, which will allow you to run SQL queries on its data. This is a temporary view, so it will only exist for the duration of your SparkSession.

In [24]:
# Example of using views in Spark
wine.createOrReplaceTempView('wine')

# Query for unique values and their distribution in the target column
spark.sql("""SELECT target, COUNT(*) AS valueCounts FROM wine
          GROUP BY target
          ORDER BY target""").show()

+------+-----------+
|target|valueCounts|
+------+-----------+
|     1|         59|
|     2|         71|
|     3|         48|
+------+-----------+



> The classes in our target variable are mildly imbalanced: class 2 is the largest (71 rows) and class 3 the smallest (48 rows).

In [25]:
# Average alcohol content by target rounded off to 2 decimal places
spark.sql( """
          SELECT target, ROUND(AVG(alcohol), 2) AS avgAlcohol FROM wine
          GROUP BY target
          ORDER BY target
          """).show()

+------+----------+
|target|avgAlcohol|
+------+----------+
|     1|     13.74|
|     2|     12.28|
|     3|     13.15|
+------+----------+



SQL querying will feel familiar to data scientists who have worked with SQL databases. It is still recommended to use the DataFrame API for most of your data manipulation tasks, as it is **more flexible and less error-prone** than SQL queries: transformations are ordinary Python expressions that can be composed and reused, rather than query strings assembled by hand.

In [26]:
# It's possible to do the same using the DataFrame API
# Remember the parentheses just allow us to chain commands without using a "\"
(wine.select(['target', 'alcohol']).groupBy('target').
 agg(F.round(F.avg('alcohol'), 2).alias('avgAlcohol'))
 .orderBy('target')).show()

# the same as 
# wine.select(['target', 'alcohol']).groupBy('target').agg(F.round(F.avg('alcohol'), 2).alias('avgAlcohol')).orderBy('target').show()

+------+----------+
|target|avgAlcohol|
+------+----------+
|     1|     13.74|
|     2|     12.28|
|     3|     13.15|
+------+----------+



## Build a Machine Learning Pipeline

Since very little preprocessing is needed, all we need to do is create a pipeline that will take in the data and output a model. We will be using the `VectorAssembler` to combine all the features into a single vector column, and then the `RandomForestClassifier` to train a model on that column.

It would also be a good idea to cross validate our model to ensure that we are not overfitting our data.

In [27]:
# Split the data into training and test sets
train, test = wine.randomSplit([0.7, 0.3], seed=23)

In [28]:
assembler = VectorAssembler(inputCols=wine.columns[1:], outputCol='features',
                            handleInvalid='error')

# Instantiate the RandomForestClassifier with the following parameters
rfModel = RandomForestClassifier(featuresCol='features',
                       labelCol='target', predictionCol='prediction',
                       numTrees=50, maxDepth=5, seed=23
                       )

# Instantiate the Evaluation metric
evaluator = MulticlassClassificationEvaluator(labelCol='target', predictionCol='prediction',
                                              metricName='f1') # Using F1 score as the metric

pipe = Pipeline(stages=[assembler, rfModel])

In [36]:
# Fit the pipeline to the training data
pipeModel = pipe.fit(train)

# Transform the training and test sets
trainResults = pipeModel.transform(train)
testResults = pipeModel.transform(test)

# Use the evaluator to get the F1 score for the training and test sets
trainF1 = evaluator.evaluate(trainResults)
print("The F1 score on the training data is {:.2%}.".format(trainF1))
testF1 = evaluator.evaluate(testResults)
print("The F1 score on the test data is {:.2%}.".format(testF1))

The F1 score on the training data is 100.00%.
The F1 score on the test data is 94.40%.


These metrics are high, which is not unusual for a small, clean dataset like this one. The gap between the training score (100.00%) and the test score (94.40%) does hint at some overfitting, so it would be a good idea to cross validate our model.

### Parameter Tuning

In [37]:
params = (ParamGridBuilder()
          .addGrid(rfModel.numTrees, [50, 100, 150])
          .addGrid(rfModel.maxDepth, [5, 10, 15])
          .build())

# Instantiate the CrossValidator with 5 folds/iterations
cv = CrossValidator(estimator=pipe, estimatorParamMaps=params, evaluator=evaluator, numFolds=5)

In [31]:
# Fitting the cross validator to the training data
cvModel = cv.fit(train)

In [32]:
# Get the average F1 metric from the cross validator models.
cvModel.avgMetrics

[0.9835267240329415,
 0.9835267240329415,
 0.9835267240329415,
 0.9835267240329415,
 0.9835267240329415,
 0.9835267240329415,
 0.9738328464819211,
 0.9738328464819211,
 0.9738328464819211]
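
`avgMetrics` is a plain list, in the same order as the parameter maps passed to `CrossValidator`, so on its own it does not say which settings produced which score. A small helper can pair them up. The name `rank_param_maps` is hypothetical, and the sketch only assumes that each key in a parameter map has a `.name` attribute, as PySpark `Param` objects do; a stand-in class and made-up scores are used so that it runs without Spark.

```python
from collections import namedtuple

def rank_param_maps(param_maps, avg_metrics):
    """Pair each parameter map with its average CV metric, best score first."""
    rows = []
    for param_map, metric in zip(param_maps, avg_metrics):
        settings = {param.name: value for param, value in param_map.items()}
        rows.append((metric, settings))
    return sorted(rows, key=lambda row: row[0], reverse=True)

# Stand-in for pyspark.ml.param.Param so the sketch runs without a Spark session.
FakeParam = namedtuple("FakeParam", ["name"])
num_trees, max_depth = FakeParam("numTrees"), FakeParam("maxDepth")

demo_maps = [{num_trees: 50, max_depth: 5}, {num_trees: 150, max_depth: 5}]
demo_metrics = [0.98, 0.97]  # made-up scores
ranked = rank_param_maps(demo_maps, demo_metrics)
print(ranked[0])  # (0.98, {'numTrees': 50, 'maxDepth': 5})
```

In this notebook the equivalent call would be `rank_param_maps(params, cvModel.avgMetrics)`.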

This is a pretty decent performance. After fitting, `cvModel.bestModel` holds the pipeline refit on the full training set with the best-performing parameters, and `cvModel.transform` uses that best model. We can now use it to make predictions on our test set.

In [33]:
# Let's examine the best model selected by cross validation (stage 1 of the pipeline is the random forest)
bestRfModel = cvModel.bestModel.stages[1]

In [34]:
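# For the parameters we focused on, what are the best values?
# Note: on the fitted model, getNumTrees is a property (no parentheses), while getMaxDepth() is a method
bestRfModel.getNumTrees, bestRfModel.getMaxDepth()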

(50, 5)

## Close Spark Session

In [35]:
# spark.stop()