In [15]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the session for this DAG walkthrough,
# one builder option per line so each setting is easy to scan.
# NOTE(review): master is local[*], so the spark.executor.* settings below
# presumably have no effect in local mode — confirm before relying on them.
spark = (
    SparkSession.builder
    .appName("DAG")
    .master("local[*]")
    .config("spark.ui.port", "4042")
    .config("spark.executor.instances", 4)
    .config("spark.executor.cores", 4)
    .config("spark.executor.memory", "512M")
    .getOrCreate()
)

# Print the Spark UI address, then let the notebook render the session object.
print(spark.sparkContext.uiWebUrl)
spark

http://gypsum-gpu160.unity.rc.umass.edu:4042


In [16]:
# Disable AQE (including its partition coalescing) and automatic broadcast
# joins, so the physical plans below keep every shuffle and use a
# sort-merge join instead of being rewritten at runtime.
for conf_key, conf_value in (
    ("spark.sql.adaptive.enabled", False),
    ("spark.sql.adaptive.coalescePartitions.enabled", False),
    ("spark.sql.autoBroadcastJoinThreshold", -1),
):
    spark.conf.set(conf_key, conf_value)


In [17]:
# Check the default parallelism reported by the SparkContext

spark.sparkContext.defaultParallelism

4

In [18]:
# Create two single-column (`id`) dataframes of even numbers; the end bound
# (200) is exclusive:
#   df_1 -> 4, 6, 8, ..., 198   (step 2)
#   df_2 -> 2, 6, 10, ..., 198  (step 4)
df_1 = spark.range(start=4, end=200, step=2)
df_2 = spark.range(start=2, end=200, step=4)

In [19]:
# Partition count of df_2 as produced by spark.range, before any repartition
df_2.rdd.getNumPartitions()

4

In [20]:
# Re-partition both inputs to deliberately different partition counts
# (5 and 7); each call adds a round-robin shuffle to the plan.
df_3 = df_1.repartition(numPartitions=5)
df_4 = df_2.repartition(numPartitions=7)

In [21]:
# Confirm df_4 has the 7 partitions requested by repartition(7)
df_4.rdd.getNumPartitions()

7

In [22]:
# Join the two repartitioned dataframes on their shared `id` column.
# "inner" is the join type used when `how` is omitted; it is spelled out
# here only for readability.
df_joined = df_3.join(other=df_4, on="id", how="inner")

In [23]:
# Get the sum of ids: a global aggregate (no grouping keys) over the joined
# rows, producing a single row with one column named total_sum

df_sum = df_joined.selectExpr("sum(id) as total_sum")

In [24]:
# View data: show() runs the query and prints the single aggregated row
df_sum.show()



+---------+
|total_sum|
+---------+
|     4998|
+---------+



                                                                                

In [25]:
# Explain plan: print the physical plan Spark chose for df_sum

df_sum.explain()

== Physical Plan ==
*(6) HashAggregate(keys=[], functions=[sum(id#122L)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=207]
   +- *(5) HashAggregate(keys=[], functions=[partial_sum(id#122L)])
      +- *(5) Project [id#122L]
         +- *(5) SortMergeJoin [id#122L], [id#124L], Inner
            :- *(2) Sort [id#122L ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(id#122L, 200), ENSURE_REQUIREMENTS, [plan_id=191]
            :     +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=190]
            :        +- *(1) Range (4, 200, step=2, splits=4)
            +- *(4) Sort [id#124L ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(id#124L, 200), ENSURE_REQUIREMENTS, [plan_id=198]
                  +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=197]
                     +- *(3) Range (2, 200, step=4, splits=4)




In [26]:
# Union the data again to see the skipped stages: df_4 already feeds the join
# inside df_sum, so its shuffle shows up as a ReusedExchange in the plan
# instead of being planned a second time.
# The union is positional, so df_4's `id` values land under `total_sum`.

df_union = df_sum.union(df_4)

In [27]:
# Show the first rows of the union: the aggregate row plus ids from df_4
df_union.show()



+---------+
|total_sum|
+---------+
|     4998|
|       38|
|       18|
|       66|
|       70|
|      110|
|      170|
|      154|
|       10|
|       74|
|       62|
|      118|
|      198|
|      158|
|       30|
|       98|
|       82|
|      146|
|      142|
|      150|
+---------+
only showing top 20 rows



                                                                                

In [28]:
# Explain plan: print the physical plan for the union; the second branch is
# a ReusedExchange (presumably df_4's repartition shuffle — confirm in the
# full plan output)

df_union.explain()

== Physical Plan ==
Union
:- *(6) HashAggregate(keys=[], functions=[sum(id#122L)])
:  +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=445]
:     +- *(5) HashAggregate(keys=[], functions=[partial_sum(id#122L)])
:        +- *(5) Project [id#122L]
:           +- *(5) SortMergeJoin [id#122L], [id#124L], Inner
:              :- *(2) Sort [id#122L ASC NULLS FIRST], false, 0
:              :  +- Exchange hashpartitioning(id#122L, 200), ENSURE_REQUIREMENTS, [plan_id=429]
:              :     +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=428]
:              :        +- *(1) Range (4, 200, step=2, splits=4)
:              +- *(4) Sort [id#124L ASC NULLS FIRST], false, 0
:                 +- Exchange hashpartitioning(id#124L, 200), ENSURE_REQUIREMENTS, [plan_id=436]
:                    +- Exchange RoundRobinPartitioning(7), REPARTITION_BY_NUM, [plan_id=435]
:                       +- *(3) Range (2, 200, step=4, splits=4)
+- ReusedExchange [id#143L], Exchange Roun