In [55]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
spark.conf.set("spark.sql.adaptive.enabled", "true") # Enable Adaptive Query Execution

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### spark.sql.adaptive.coalescePartitions.enabled

In [39]:
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##### When enabled, Spark dynamically reduces the number of shuffle partitions at runtime based on the actual size of data. Normally, spark.sql.shuffle.partitions is fixed (default = 200). If your data is much smaller or larger than expected, the number of partitions may be inefficient (too many tiny files or too few huge partitions).

##### df.repartition(N) is a pre-shuffle hint: it tells Spark how many partitions to use before a shuffle.
##### With AQE + coalescePartitions, Spark reoptimizes the shuffle stage after execution stats are collected.
##### If Spark finds that some partitions are too small, it will coalesce them (merge into fewer partitions).
##### So the final partition count after shuffle may be different from what you requested.

In [41]:
df = spark.range(0, 1000).repartition(50)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
result = df.groupBy("id").count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
result.rdd.getNumPartitions()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

200

In [44]:
df.rdd.getNumPartitions()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

50

## spark.sql.adaptive.skewJoin.enabled

##### This handles data skew during joins. If one partition is much larger than others, Spark splits that skewed partition into smaller chunks and redistributes them, so no single task becomes a bottleneck.

In [67]:
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [68]:
df1 = spark.range(0, 1000).withColumnRenamed("id", "key")
df2 = spark.range(0, 10000).withColumn("key", (col("id") % 2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [69]:
joined = df1.join(df2, on="key")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [70]:
df1.rdd.getNumPartitions()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1

In [71]:
df2.rdd.getNumPartitions()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1

In [72]:
joined.rdd.getNumPartitions()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1

In [61]:
joined.explain(True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Parsed Logical Plan ==
'Join UsingJoin(Inner,Buffer(key))
:- Project [id#117L AS key#119L]
:  +- Range (0, 1000, step=1, splits=Some(1))
+- Project [id#121L, (id#121L % cast(2 as bigint)) AS key#123L]
   +- Range (0, 10000, step=1, splits=Some(1))

== Analyzed Logical Plan ==
key: bigint, id: bigint
Project [key#119L, id#121L]
+- Join Inner, (key#119L = key#123L)
   :- Project [id#117L AS key#119L]
   :  +- Range (0, 1000, step=1, splits=Some(1))
   +- Project [id#121L, (id#121L % cast(2 as bigint)) AS key#123L]
      +- Range (0, 10000, step=1, splits=Some(1))

== Optimized Logical Plan ==
Project [key#119L, id#121L]
+- Join Inner, (key#119L = key#123L)
   :- Project [id#117L AS key#119L]
   :  +- Range (0, 1000, step=1, splits=Some(1))
   +- Project [id#121L, (id#121L % 2) AS key#123L]
      +- Filter isnotnull((id#121L % 2))
         +- Range (0, 10000, step=1, splits=Some(1))

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [key#119L, id#121L]
   +- BroadcastH

## spark.sql.adaptive.join.enabled

##### This lets Spark change the join strategy at runtime (e.g., switch from Sort-Merge Join → Broadcast Hash Join) if actual data sizes are smaller/larger than expected.

In [79]:
spark.conf.set("spark.sql.adaptive.join.enabled", "true")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [80]:
small_df = spark.range(0, 100).withColumnRenamed("id", "key")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [81]:
large_df = spark.range(0, 10000000).withColumn("key", (col("id") % 2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [82]:
joined = small_df.join(large_df, "key")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [83]:
joined.explain(False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [key#161L, id#163L]
   +- BroadcastHashJoin [key#161L], [key#165L], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#789]
      :  +- Project [id#159L AS key#161L]
      :     +- Range (0, 100, step=1, splits=1)
      +- Project [id#163L, (id#163L % 2) AS key#165L]
         +- Filter isnotnull((id#163L % 2))
            +- Range (0, 10000000, step=1, splits=1)

In [87]:
joined.explain(mode="cost") # mode = formatted, codegen, cost

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Optimized Logical Plan ==
Project [key#161L, id#163L], Statistics(sizeInBytes=67.1 GiB)
+- Join Inner, (key#161L = key#165L), Statistics(sizeInBytes=89.4 GiB)
   :- Project [id#159L AS key#161L], Statistics(sizeInBytes=800.0 B)
   :  +- Range (0, 100, step=1, splits=Some(1)), Statistics(sizeInBytes=800.0 B, rowCount=100)
   +- Project [id#163L, (id#163L % 2) AS key#165L], Statistics(sizeInBytes=114.4 MiB)
      +- Filter isnotnull((id#163L % 2)), Statistics(sizeInBytes=76.3 MiB)
         +- Range (0, 10000000, step=1, splits=Some(1)), Statistics(sizeInBytes=76.3 MiB, rowCount=1.00E+7)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [key#161L, id#163L]
   +- BroadcastHashJoin [key#161L], [key#165L], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#789]
      :  +- Project [id#159L AS key#161L]
      :     +- Range (0, 100, step=1, splits=1)
      +- Project [id#163L, (id#163L % 2) AS key#165L