<a href="https://colab.research.google.com/github/tijazz/Big-Data/blob/main/SparkNotebooks/spark-in-parallel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Parallel Processing with Executors

### Introduction

As we know, Spark allows us to store data in memory in a distributed cluster of nodes.  As we'll see in this lesson, that cluster is organized with a **driver node**, which is our entry point into the cluster, as well as **worker nodes**, where tasks are carried out on the various partitions of data.

<img src="https://github.com/jigsawlabs-student/pyspark-rdds/blob/main/spark_cluster_disk.jpg?raw=1" width="80%">

The real work happens on the worker nodes.  The software running on a worker node is called an executor, and it is there that the partitions of our data are held and queried simultaneously.  In this lesson, we'll take a closer look at these worker nodes and see how they allow us to process data in parallel.

### Creating the Spark Context

Before we can get to our executors, we first need to create our Spark context.  The Spark context is how we interact with the driver, which is our entry point into the Spark cluster.

In [1]:
!pip install pyspark --quiet



In [2]:
from pyspark import SparkContext, SparkConf

In [14]:
conf = SparkConf().setAppName("films").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)

> In the code above, we set the name of our Spark application to `films`, specified that the cluster runs on our local computer (as opposed to something like AWS), and gave it two cores with `local[2]` (strictly speaking, two local worker threads).

That last setting is quite important.  If one main feature of Spark is storing data in memory, the other is that Spark performs our queries in parallel.  By setting the number of cores in our cluster, we determine how much parallelization is available when we run our queries.

We'll focus on this feature of Spark in this lesson, but to do so we'll need to take a deeper look at the worker nodes -- where all of this parallelization occurs.

### Looking at an Executor: CPU Cores 

As we know, when we read our data into Spark, we can store that data distributed through worker nodes located in the cluster.  

> Now in Spark, *the software* operating on each worker node is referred to as an **executor**. The computer is the worker node.  Each orange box above represents an executor.  

So below we can see that when we read in our movies dataset from the `s3` bucket, these records are distributed across the executors.

> <img src="https://github.com/jigsawlabs-student/pyspark-rdds/blob/main/cluster_executor.jpg?raw=1" width='80%'>

Above, we see our dataset split into three partitions, one for each node.  When we, say, look for a specific record, each executor gets to work searching its share of the data.  In practice, though, each executor often holds multiple partitions, and what limits how many partitions we can work on at once is the total number of cores across all of our executors.

If we think about it, it makes sense that the default number of partitions follows the number of cores.  Even if an executor has only one CPU, a CPU with multiple cores can run multiple tasks simultaneously.  And because our data is in memory, we can partition the dataset so that each core processes its own subdivision of the data.

So when we look at an executor, one of the main things to consider is the number of cores available to work on our partitions.
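To make the idea of "one subdivision per core" concrete, here is a plain-Python sketch of splitting a list into contiguous, near-equal chunks.  It is only an illustration of the concept and not Spark's own implementation; the helper name `slice_into_partitions` is hypothetical.

```python
def slice_into_partitions(records, num_partitions):
    """Split records into contiguous, near-equal chunks (illustrative only)."""
    n = len(records)
    return [
        records[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]

movies = ['Shukran', 'Captain Hook', 'Escapade', 'Shazam!', 'Captain Marvel']

# With two "cores", each one receives its own chunk of the list.
for index, chunk in enumerate(slice_into_partitions(movies, 2)):
    print(index, chunk)
```

Each chunk can then be handed to a separate core, which is the intuition behind Spark's partitions.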

<img src="https://github.com/jigsawlabs-student/pyspark-rdds/blob/main/executor_closer.jpg?raw=1" width="40%">

### Seeing Parallelization in action

We can get a sense of this parallelization if we pass our data into Spark.  In fact, one way to feed our data into Spark is with a method called `parallelize`.

> For example, we can start with a list of movies.

In [18]:
movies = ['Shukran', 'Captain Hook', 'Escapade', 'Shazam!', 'Captain Marvel', 'Escape Room', 'How to Train Your Dragon: The Hidden World']

From here, let's distribute our data across the executors of our cluster.  We can do so by passing our list into the `parallelize` method:

In [19]:
movies_rdd = sc.parallelize(movies)
movies_rdd

ParallelCollectionRDD[4] at readRDDFromFile at PythonRDD.scala:274

> And then we can look at the number of partitions in that data.

In [20]:
movies_rdd.getNumPartitions()

2

So we can see that Spark automatically partitioned our data, with one partition for each core in the cluster.

> Remember that we specified the number of cores when we created our Spark context.

```python
conf = SparkConf().setAppName("films").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)
```

The importance of this is that we can now perform operations across two different partitions of the dataset simultaneously.  So if we want to find a matching film record, we can search both partitions at the same time.

In [7]:
movies_rdd.filter(lambda movie: movie == 'Captain Marvel').collect()

['Captain Marvel']

> So in the line above, our data was first split up two ways, and then we looked for Captain Marvel on each slice of the data.

### Looking at an Executor: Memory

So we just saw that one piece of hardware that shapes the processing of our data is the number of cores per executor.  Another way the hardware of an executor can constrain how we process data is through the available memory.

<img src="https://github.com/jigsawlabs-student/pyspark-rdds/blob/main/executor_closer.jpg?raw=1" width="40%">

The reason memory can impact our *processing* of data is that the memory in an executor is divided into *working memory* (which Spark's documentation calls execution memory) and *storage*.

> By default, Spark sets aside a shared region of each executor's memory for these two uses, and half of that region is protected for storage.  The boundary is soft, so either side can borrow space the other is not using.

And this is important to consider because if we decide to persist our data in memory, we may consume some of the working memory needed to, say, filter through our data.
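To put rough numbers on this split, here is a back-of-the-envelope sketch in plain Python.  It assumes the defaults described in Spark's tuning documentation for recent releases (`spark.memory.fraction` of 0.6, `spark.memory.storageFraction` of 0.5, and about 300 MB reserved by Spark); check the documentation for the version you run.  The function name `memory_regions` is hypothetical.

```python
RESERVED_MB = 300        # assumed amount Spark reserves for itself
MEMORY_FRACTION = 0.6    # assumed default for spark.memory.fraction
STORAGE_FRACTION = 0.5   # assumed default for spark.memory.storageFraction

def memory_regions(heap_mb):
    """Estimate the shared, storage, and execution regions in MB."""
    unified = (heap_mb - RESERVED_MB) * MEMORY_FRACTION
    storage = unified * STORAGE_FRACTION
    execution = unified - storage
    return {"unified": unified, "storage": storage, "execution": execution}

# Estimate for an executor with a 1 GB (1024 MB) heap.
print(memory_regions(1024))
```

Under these assumptions, a 1 GB executor has only a few hundred megabytes to share between persisted data and query work, which is why persisting a large dataset can squeeze the memory left for processing.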

### Summary

In this lesson, we saw how Spark can allow us to process our data in parallel through our executors.  In Spark we have an executor operating on each worker node, and those executors have one or more CPUs, which in turn have one or more cores.  Spark partitions our data and allocates a partition to each core in the cluster.  Then when we run a query, like looking for a record, each core can query its partition of the data.

So we saw that the parallel processing capabilities of our Spark cluster are constrained by the number of cores across our cluster's executors.  We also saw that this processing is constrained by the amount of available memory in each executor.  This is because in Spark, memory on each executor is divided into memory for storage of data and working memory, which is needed when performing queries.  And when too much memory is consumed by storage, it can leave less memory available for processing that data.

Finally, in this lesson, we saw how to create a cluster, parallelize our data, and query a cluster.  We created our cluster with the following:

In [30]:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("films").setMaster("local[2]")
sc = SparkContext.getOrCreate(conf=conf)

> By specifying `setMaster("local[2]")` above, we told Spark to distribute our tasks across two different cores.

And now with our Spark context set up, we could turn a Python list into a dataset distributed across our Spark cluster with the `parallelize` method.

In [31]:
movies = ['Shazam!', 'Captain Marvel', 'Escape Room', 'How to Train Your Dragon: The Hidden World']

In [32]:
movies_rdd = sc.parallelize(movies)

And we can see the number of partitions with the following.

In [33]:
movies_rdd.getNumPartitions()

2

Finally, we used filter to search through our data.

In [34]:
movies_rdd.filter(lambda movie: movie == 'Captain Marvel').collect()

['Captain Marvel']

### Resources

* [Spark Internals Gitbook](https://books.japila.pl/apache-spark-internals/overview/)

* [Drivers and Executors Knoldus Blog](https://blog.knoldus.com/understanding-the-working-of-spark-driver-and-executor/)

* [Drivers and Executors StackOverflow](https://stackoverflow.com/questions/32621990/what-are-workers-executors-cores-in-spark-standalone-cluster)

* [Presenting RDDs Berkeley Paper](https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf)

* [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

* [RDDs Simplified](https://vishnuviswanath.com/spark_rdd)

* [Databricks RDDs](https://databricks.com/glossary/what-is-rdd)

* [Databricks best practices gitbook](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/index.html)