# Understand the DAG in the Spark web UI

In this section, we will learn how to read the DAG in the Spark web UI.

As you know, Spark uses a lazy execution model: when you run a transformation, no job actually executes it. A **Spark job** is triggered only when you execute an action (e.g. show, collect, take, count).


When a Spark job is executed, the Spark web UI (Jobs tab) lists it grouped by status (e.g. running, succeeded, failed). Click on the job description to land on the **Details for Job** page. On this page, you can view the execution DAG of all stages. **A DAG represents a chain of RDD transformations.**

In [1]:
from pyspark.sql import SparkSession
import os
from pyspark.sql import functions as f

In [2]:
local = True

if local:
    spark = SparkSession.builder \
        .master("local[4]") \
        .appName("UnderstandSparkUIDAG") \
        .getOrCreate()
else:
    spark = SparkSession.builder \
        .master("k8s://https://kubernetes.default.svc:443") \
        .appName("UnderstandSparkUIDAG") \
        .config("spark.kubernetes.container.image", "inseefrlab/jupyter-datascience:py3.9.7-spark3.2.0")\
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", os.environ['KUBERNETES_SERVICE_ACCOUNT'])\
        .config("spark.executor.instances", "4")\
        .config("spark.executor.memory", "8g")\
        .config("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE'])\
        .getOrCreate()

22/03/02 15:54:23 WARN Utils: Your hostname, pliu-SATELLITE-P850 resolves to a loopback address: 127.0.1.1; using 172.22.0.33 instead (on interface wlp3s0)
22/03/02 15:54:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/03/02 15:54:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
df=spark.range(1,100000)
df.show(5)

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+
only showing top 5 rows



In [5]:
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


## Job details
After running the above code, open your Spark UI (if you run in local mode, the default URL is http://localhost:4040/jobs/). You should see a job; click on it and you should see a page like the figure below.
![Spark_UI_JOB_details](../img/spark_ui_job_details.png)

In the **Event Timeline**, you can check the lifecycle of each executor (e.g. added, removed) and the stages of this job. Executors belong to one application: when an application terminates, all executors that belong to it are removed. In cluster mode, each executor is an individual JVM process.
The above figure shows the timeline in local mode; notice that there is only one executor, the driver. If we run it in cluster mode, we will have 4 executors (as configured when creating the Spark session). The figure below is an example.
 ![Spark_UI_JOB_timeline](../img/spark_ui_job_timeline.png)


In the **DAG Visualization**, the **blue shaded boxes represent the Spark operations** that the user calls in the code. The **dots in these boxes represent the RDDs created by the corresponding operations**.


In our case, we have two boxes. The first box is named **WholeStageCodegen**, because creating the dataframe involves Java code generation to build the underlying RDDs. The second box is **mapPartitionsInternal**, because the action show() collects data over each of the RDD's partitions.



## Stage details

If you want more details about a stage, just click on that stage. You will see the following stage details page:

![spark_ui_stage_details](../img/spark_ui_stage_details.png)

The event timeline tells you how much time each step of the stage took. The figure below is an example:

![spark_ui_stage_details_et](../img/spark_ui_stage_details_et.png)




## Spark DAG stage operation names
- **WholeStageCodegen**: appears when you run computations on DataFrames; Spark generates Java code to build the underlying RDDs
- **mapPartitions**: appears when you run a computation over each of the RDD's partitions (parallelization friendly, high performance)

## Shuffle operations

The code below first creates a dataframe (with the default number of partitions), then changes the number of partitions to 8. To trigger the transformation, we use take(2), which returns two rows of the dataframe (which rows come first after a repartition is not guaranteed).


In [23]:
df1=spark.range(1,1000000,2)
df1.show()
df1_split8=df1.repartition(8)
df1_split8.take(2)

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
| 11|
| 13|
| 15|
| 17|
| 19|
| 21|
| 23|
| 25|
| 27|
| 29|
| 31|
| 33|
| 35|
| 37|
| 39|
+---+
only showing top 20 rows



[Row(id=221717), Row(id=153517)]

Now, you should see the following DAG:
![spark_ui_job_shuffle](../img/spark_ui_job_shuffle.png)

Notice that this time we have two stages inside the job, and a new type of operation called **Exchange**. This operation is caused by a **shuffle**, an operation in which data is exchanged (hence the name) between the executors in the cluster. The larger your data and your cluster, the more expensive the shuffle, because sending data over the network takes time. When you optimize a Spark job, you should try to eliminate **shuffles** as much as possible.

Spark decomposes a job into stages at shuffle boundaries: each time Spark encounters a shuffle, it creates a new stage.


## A complete example

In the example below, we create two dataframes and change their number of partitions. Then we join the two dataframes. Finally, we run an aggregation function on the joined dataframe.

In [22]:
ds1 = spark.range(1, 10000000)
ds2 = spark.range(1, 10000000, 2)

ds3 = ds1.repartition(7)

ds4 = ds2.repartition(9)

ds5 = ds3.selectExpr("id * 5 as id")

joined = ds5.join(ds4, "id")

# named `total` to avoid shadowing Python's built-in sum()
total = joined.selectExpr("sum(id)")

total.show()



+-------------+
|      sum(id)|
+-------------+
|5000000000000|
+-------------+



                                                                                

The above queries will generate the following DAG:
![spark_ui_job_complexe](../img/spark_ui_job_complexe.png)

- Stage 21: represents ds1 = spark.range(1, 10000000).
- Stage 22: represents ds2 = spark.range(1, 10000000, 2).
- Stage 23: represents ds4 = ds2.repartition(9). We get a new stage because repartition requires a shuffle; since we only reorganize the data, this stage contains only Exchange operations.
- Stage 24: represents ds3 = ds1.repartition(7) and ds5 = ds3.selectExpr("id * 5 as id"). Notice the WholeStageCodegen operation between two Exchange operations: "id * 5 as id" requires Java code generation to run the calculation over the RDDs.
- Stage 25: represents joined = ds5.join(ds4, "id"). A join also requires a shuffle, because rows with the same key must be placed on the same executor to be joined.
- Stage 26: represents the aggregation joined.selectExpr("sum(id)"). The shuffle is caused by sum(): Spark computes a partial sum on each partition, then brings these partial results to a single executor to perform the final sum. So be careful with aggregation functions; they usually involve some form of data movement between executors (i.e. a shuffle).


## Cache data

Spark does not save intermediate RDDs/DataFrames/Datasets, so each time you reuse an intermediate dataframe, Spark loads the data from the source and repeats the calculation. To avoid this, you can use cache() or persist() to store an intermediate RDD/DataFrame/Dataset.

The difference is that cache() always uses the default storage level (MEMORY_ONLY for an RDD, MEMORY_AND_DISK for a DataFrame/Dataset), whereas persist() lets you choose the storage level.

In [4]:
df=spark.range(1,100000)
df.cache()
df_persist=df.select((f.col("id")*5).alias("id"))

22/03/02 16:09:17 WARN CacheManager: Asked to cache already cached data.


The cached RDD is represented by a green dot/rectangle. The figure below shows the cached dataframe above in the DAG.
![spark_ui_cache](../img/spark_ui_cache.png)

You can also find the cached dataframe in the Storage tab. The figure below is an example.
![spark_ui_storage](../img/spark_ui_storage.png)




## Persist
persist() is similar to cache(); you only need to add a storage level.

A storage level can take the following values (the `_SER` levels belong to the Scala/Java API; recent PySpark versions do not define them):
- MEMORY_ONLY
- MEMORY_AND_DISK
- MEMORY_ONLY_SER
- MEMORY_AND_DISK_SER
- DISK_ONLY
- MEMORY_ONLY_2
- MEMORY_AND_DISK_2

In [6]:
from pyspark import StorageLevel
df_persist.persist(StorageLevel.MEMORY_AND_DISK)
df_persist.show()

+---+
| id|
+---+
|  5|
| 10|
| 15|
| 20|
| 25|
| 30|
| 35|
| 40|
| 45|
| 50|
| 55|
| 60|
| 65|
| 70|
| 75|
| 80|
| 85|
| 90|
| 95|
|100|
+---+
only showing top 20 rows



In [7]:
# you can unpersist a cached RDD/DF/DS with the following command
df_persist.unpersist()

DataFrame[id: bigint]

In [8]:
# unpersist() also releases a dataframe that was stored with cache()
# cache() is simply persist() with the default storage level
df.unpersist()

DataFrame[id: bigint]