<a href="https://colab.research.google.com/github/victorviro/Big-Data/blob/main/PySpark_RDD_example_Jobs%2C_stages_and_tasks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Example: Jobs, stages, and tasks

As we’ve discussed in the notebook [Introduction to Spark](https://nbviewer.org/github/victorviro/Big-Data/blob/main/Introduction_to_Spark.ipynb):

- RDDs support 2️⃣ **two types of operations**: [🎬⚙️ actions and transformations](https://nbviewer.org/github/victorviro/Big-Data/blob/main/Introduction_to_Spark.ipynb#3).

- The components of ⚙️ execution of a Spark application are: [**Jobs, stages, and tasks**](https://nbviewer.org/github/victorviro/Big-Data/blob/main/Introduction_to_Spark.ipynb#%E2%9A%99%EF%B8%8F-Jobs,-stages-and-tasks-)

We also discussed that, during executing, Spark **translates a ↕️ logical representation** (RDD Lineage) **into a physical execution plan** by performing several ♻️ optimizations and merging multiple operations into tasks (↔️[DAG](https://nbviewer.org/github/victorviro/Big-Data/blob/main/Introduction_to_Spark.ipynb#%E2%86%94%EF%B8%8F-DAG-(Directed-Acyclic-Graph)-)). Understanding how user code compiles ⬇ down into the 🎚 level of physical execution is an advanced concept, but it helps us in tuning and 🐞 debugging applications.

To demonstrate Spark’s phases of execution, we’ll walk through an **example** application of a **simple log analysis**. 

The table of contents of this notebook is as follow:

1. [📥 Running Pyspark in Colab](#1)
2. [🏋️ Exercise](#2)
    1. [⚡ Caching](#2.1)
    2. [📋 Spark execution summary](#2.2)
5. [📕 References](#3)

# 📥 Running Pyspark in Colab <a name="1"></a>

To run spark in Colab, we need to first install all the dependencies, i.e. Apache Spark with hadoop, Java 8 and Findspark to locate the spark in the system. Follow the steps to install the dependencies:

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz
!tar xvf "spark-3.0.3-bin-hadoop3.2.tgz"
!pip install -q findspark

Now that we installed Spark and Java, we set the environment path which enables us to run Pyspark.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop3.2"

Finally, let's run a local spark session to test our installation:

In [None]:
!pip install pyspark

In [None]:
import findspark
findspark.init('/content/spark-3.0.3-bin-hadoop3.2')
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

print(spark)

<pyspark.sql.session.SparkSession object at 0x7f2e6b83c550>


# 🏋️ Exercise <a name="2"></a>

For **input data**, we’ll use a text file that consists of log messages of varying 🎚 degrees of severity. We want to compute **how many log messages appear at each 🎚 level of severity**.


Let's 📥 download and print its first lines.

In [None]:
!wget -q https://raw.githubusercontent.com/victorviro/Big-Data/main/files/logs.log
!head -5 logs.log

2015-10-18 18:01:47,978 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1445144423722_0020_000001
2015-10-18 18:01:48,963 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Executing with tokens:
2015-10-18 18:01:48,963 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Kind: YARN_AM_RM_TOKEN, Service: , Ident: (appAttemptId { application_id { id: 20 cluster_timestamp: 1445144423722 } attemptId: 1 } keyId: -127633188)
2015-10-18 18:01:49,228 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Using mapred newApiCommitter.
2015-10-18 18:01:50,353 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: OutputCommitter set in config null


In [None]:
input = sc.textFile('logs.log')
for line in input.take(2):
    print(line)

2015-10-18 18:01:47,978 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Created MRAppMaster for application appattempt_1445144423722_0020_000001
2015-10-18 18:01:48,963 INFO [main] org.apache.hadoop.mapreduce.v2.app.MRAppMaster: Executing with tokens:


As we can see, each element of the `input` RDD is a string representing a line of the logs file, that is a log message.

Now let's ✂️ split each log message into words.

In [None]:
tokenized = input.filter(lambda x: len(x) > 0).map(lambda x: x.split(" "))

Note that the `map()` operation does not mutate the existing `input` RDD. Instead, it returns a 👉 pointer to an entirely new RDD. `input` RDD can still be reused later in the program.

Each element of the `tokenized` RDD is a list of strings representing a log message of the file.

In [None]:
for line in tokenized.take(2):
    print(line)

['2015-10-18', '18:01:47,978', 'INFO', '[main]', 'org.apache.hadoop.mapreduce.v2.app.MRAppMaster:', 'Created', 'MRAppMaster', 'for', 'application', 'appattempt_1445144423722_0020_000001']
['2015-10-18', '18:01:48,963', 'INFO', '[main]', 'org.apache.hadoop.mapreduce.v2.app.MRAppMaster:', 'Executing', 'with', 'tokens:']


Finally, we 🔢 count the number of messages for each 🎚 level of severity.

In [None]:
from operator import add
counts = tokenized.map(lambda x: (x[2], 1)).reduceByKey(add)

[('INFO', 1040), ('WARN', 808), ('FATAL', 2), ('ERROR', 150)]

To **display the lineage** of an RDD, Spark provides the `toDebugString()` method. Let's display the lineage of the `input` RDD.

In [None]:
print(input.toDebugString().decode("utf-8"))

(2) logs.log MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
 |  logs.log HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


We created the `input` RDD by calling `sc.textFile()`. The lineage gives us some clues as to what `sc.textFile()` does since it reveals which RDDs were created in the `textFile()` function. We can see that it creates a `HadoopRDD` and then performs a map on it to create the returned RDD.

Now let's display the lineage of the `counts` RDD.

In [None]:
print(counts.toDebugString().decode("utf-8"))

(2) PythonRDD[8] at collect at <ipython-input-10-e54d4d27e4f7>:3 []
 |  MapPartitionsRDD[7] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[6] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(2) PairwiseRDD[5] at reduceByKey at <ipython-input-10-e54d4d27e4f7>:2 []
    |  PythonRDD[4] at reduceByKey at <ipython-input-10-e54d4d27e4f7>:2 []
    |  logs.log MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
    |  logs.log HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


The ↕️ lineage of the `counts` RDD is more complicated. That RDD has several 👆 ancestors since there are other operations that were performed on 🔝 top of the `input` RDD, such as additional maps, filtering, and reduction. The lineage of `counts` is displayed graphically in the following figure.

<center><img src='https://i.ibb.co/hg4jHx1/rdd-lineage2.png'></center>

Before we perform an action, these RDDs simply store metadata. To 🔛 trigger computation, let’s call an 🎬 action on the `counts` RDD by `collect()`ing it to the 🚗 driver.

In [None]:
counts.collect()

[('INFO', 1040), ('WARN', 808), ('FATAL', 2), ('ERROR', 150)]

We get the number of log messages that appear at each 🎚 level of severity.

Internally, Spark’s scheduler creates a **↔️ physical execution plan** to compute the RDDs needed for performing the 🎬 action. When we call `collect()` on the RDD, every partition of the RDD must be materialized and then transferred to the 🚗 driver program. Spark’s scheduler starts at the final RDD being computed (in this case, `counts`) and works ⤴️ backward to find what it must compute.

In the simplest case, the scheduler outputs a computation stage for each RDD in this graph where the stage has tasks for each partition in that RDD. Those stages are then executed in ⬅️ reverse order to compute the final required RDD. In more complex cases, the physical set of stages will not be an exact 🖇 1:1 correspondence to the RDD graph. This can occur when the scheduler performs 🔗 **pipelining (collapsing of multiple RDDs into a single-stage)**. Pipelining occurs when RDDs can be computed from their parents without data movement (no data 🔀 shuffling over the network is required).

The ↕️ lineage output shown previously (by using the `toDebugString()` method) uses indentation levels to show where RDDs are going to be 🔗 pipelined together into physical stages. RDDs that exist at the same level of indentation as their parents will be pipelined during physical execution. For instance, when we are computing `counts`, even though there are a large number of parent RDDs, there are only two levels of indentation shown. This indicates that the physical execution will require only **two stages**. The pipelining, in this case, is because there are several filter and map operations in sequence. The right half of the following figure shows the two stages of execution that are required to compute the `counts` RDD.

<center><img src='https://i.ibb.co/M1gcY95/spark-lineage-to-dag.png'></center>

If we visit the application’s web UI, we will see that two stages occur to fulfill the `collect()` 🎬 action.

The **set of stages** produced for a particular action is termed a **job**. In each case **when we invoke 🎬 actions** such as `count()`, we are creating a job composed of one or more stages.

Once the stage graph is defined, tasks are created and dispatched to an internal scheduler, which varies depending on the 🚀 deployment mode being used. Stages in the physical plan can 🖇 depend on each other, based on the RDD ↕️ lineage, so they will be executed in a specific order. For instance, a stage that outputs 🔀 shuffle data must occur ⬅️ before one that relies on that data being present.

A physical **stage will launch tasks that each do the same thing but on specific partitions** of data. Each task internally performs the same steps:

1. ⬅️ **Fetching its input**, either from data storage (if the RDD is an input RDD), an existing RDD (if the stage is based on already cached data), or shuffle outputs.

2. **Performing the ⚙️ operation** necessary to compute RDD(s) that it represents. For instance, executing `filter()` or `map()` functions on the input data, or performing grouping or reduction.

3. ✍️ **Writing output to** a shuffle, to external storage, or back to the 🚗 driver (if it is the final RDD of an 🎬 action such as `count()`).

## ⚡ Caching <a name="2.1"></a>

In addition to 🔗 pipelining, 

- Spark’s internal scheduler may truncate the lineage of the RDD graph if an existing RDD has already been persisted in memory or on disk. Spark can “short-circuit” in this case and just begin computing based on the persisted RDD. 

- This truncation can happen also when an RDD is already materialized as a side effect of an earlier 🔀 shuffle, even if it was not explicitly `persist()`ed. This is an under-the-hood ♻️ optimization that takes advantage of the fact that Spark shuffle outputs are written to disk, and exploits the fact that many times portions of the RDD graph are recomputed.

To see the effects of caching on physical execution, let’s ⚡ cache the `counts` RDD and see how that truncates the execution graph for future 🎬 actions. If we revisit the UI, we'll see that caching ⬇ reduces the number of stages required when executing future computations. Calling `collect()` a few more times will reveal only one stage executing to perform the action.

In [None]:
# Cache the RDD
counts.cache()

PythonRDD[8] at collect at <ipython-input-10-e54d4d27e4f7>:3

In [None]:
# The first subsequent execution after caching will again require 2 stages
counts.collect()

[('INFO', 1040), ('WARN', 808), ('FATAL', 2), ('ERROR', 150)]

In [None]:
# This execution will only require a single stage
counts.collect()

## 📋 Spark execution summary <a name="2.2"></a>

To summarize, the following phases occur during Spark execution:
- 👨‍💻 **User** code **defines** the Spark **application**
- Operations on RDDs create new RDDs that refer back to their parents, thereby creating the ↔️ **lineage graph**.
- 🎬 Actions force **translation** of the logical representation **to a physical execution plan (DAG)**.
- **When we call an 🎬 action** on an RDD it must be computed. This requires computing its parent RDDs as well. Spark’s scheduler submits a **job** to compute all needed RDDs. That job will **have one or more stages**, which are parallel waves of computation **composed of tasks**. Each stage will correspond to one or more RDDs in the DAG. A single-stage can correspond to multiple RDDs due to 🔗 pipelining.
- **Tasks are ⌚️ scheduled** and executed on a cluster.
- Stages are processed in order, with individual tasks launching to compute segments of the RDD. Once the final stage is finished in a job, the action is complete 🔚.

In a given Spark application, this entire sequence of steps may occur many times in a continuous fashion as new RDDs are created.

# 📕 References <a name="3"></a>

- [Spark documentation](https://spark.apache.org/docs/latest/cluster-overview.html)

- [Book "Learning Spark: Lightning-Fast Big Data Analysis"](https://www.oreilly.com/library/view/learning-spark/9781449359034/)