In [1]:
# Cell 1: Setup & S3 Config

import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum  # note: shadows Python's built-in sum() for the rest of the notebook

# Initialize Spark with S3 Support
spark = SparkSession.builder \
    .appName("Jupyter_AQE_Lesson") \
    .master("spark://spark-master:7077") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()

# Set Log Level to WARN to reduce noise
spark.sparkContext.setLogLevel("WARN")

print("Web UI: http://localhost:4040")

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-8670421e-f400-46cf-983b-25e0b1c492ff;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.0.0 in central
	found io.delta#delta-storage;3.0.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 166ms :: artifacts dl 5ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	io.delta#delta-spark_2.12;3.0.0 from central in [default]
	io.delta#delta-storage;3.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 f

Web UI: http://localhost:4040


In [2]:
# Cell 2: Prepare the DataFrames


# 1. Load the Big Table (10M rows, skewed, fragmented)
SALES_PATH = "s3a://test-bucket/sales_data_skewed"
df_sales = spark.read.format("delta").load(SALES_PATH)

# 2. Create the Small Table (Dimension)
countries = [("USA", "United States"), ("IND", "India"), ("UK", "United Kingdom"), ("Other", "Rest of World")]
df_countries = spark.createDataFrame(countries, ["country_code", "country_name"])

print("Data Loaded.")

25/11/29 21:55:56 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties

Data Loaded.


In [None]:
# Cell 3: RUN 1 - AQE DISABLED (The "Bad" Way) Run this, then immediately check the Spark UI Stages tab.

print("--- RUNNING WITH AQE DISABLED ---")

# Disable AQE
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Force defaults to simulate a 'static' plan
spark.conf.set("spark.sql.shuffle.partitions", "200") 
# Disable Broadcast to force a SortMergeJoin (which is sensitive to skew)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

start = time.time()

# The Heavy Join
result = (
    df_sales
    .join(df_countries, "country_code")
    .groupBy("country_name")
    .agg(sum("amount").alias("total_sales"))
)

# Trigger Action
result.collect()

print(f"⏱️ Duration (AQE OFF): {time.time() - start:.2f} seconds")
print("👉 Go check http://localhost:4040 -> Stages. Look for '200' tasks and skew.")

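### Observation: Data Skew
Notice that the shuffle stages run 200 tasks even though this query needs nowhere near that many.
With AQE disabled, the physical plan is fixed at planning time with `spark.sql.shuffle.partitions` = 200, so Spark launches 200 tasks per shuffle stage.
Both shuffles partition on a column with only four distinct values (`country_code` for the join, `country_name` for the aggregation), so at most four partitions receive any rows. The rest run as empty tasks, and because the sales data is skewed, the task holding the dominant key does most of the work. Open the Spark UI to check this yourself.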
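To see why most of the 200 tasks are empty, here is a small pure-Python model of hash partitioning. It is a sketch under stated assumptions: Spark's `hashpartitioning` uses Murmur3, whereas this sketch uses `zlib.crc32` as a stand-in, and the key counts are made-up numbers chosen only to be skewed. The property it shows holds for any hash function: each key maps to exactly one partition, so four distinct keys can fill at most four of 200 partitions.

```python
import zlib
from collections import Counter


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash (assumption): Spark uses Murmur3, not crc32, but either way
    # a given key always lands in the same single partition.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def rows_per_partition(keys, num_partitions=200):
    # Row count for every shuffle partition, including the empty ones.
    counts = Counter(partition_for(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


# Hypothetical skewed distribution over the four country codes (not the real data).
keys = ["USA"] * 700 + ["IND"] * 150 + ["UK"] * 100 + ["Other"] * 50
sizes = rows_per_partition(keys)

non_empty = len([s for s in sizes if s > 0])
print(f"non-empty partitions: {non_empty} of {len(sizes)}")
print(f"largest partition: {max(sizes)} of {len(keys)} rows")
```

Raising the partition count only adds more empty partitions; it cannot spread a single hot key across tasks, which is why skew needs a different remedy.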


The physical plan generated in this case was:

```
== Physical Plan ==
* HashAggregate (14)
+- Exchange (13)
   +- * HashAggregate (12)
      +- * Project (11)
         +- * SortMergeJoin Inner (10)
            :- * Sort (5)
            :  +- Exchange (4)
            :     +- * Filter (3)
            :        +- * ColumnarToRow (2)
            :           +- Scan parquet  (1)
            +- * Sort (9)
               +- Exchange (8)
                  +- * Filter (7)
                     +- * Scan ExistingRDD (6)
```


  
![Skew Stage Graph](screenshots/01_AQE_disabled_data_skew.jpg)
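The static plan uses `SortMergeJoin` because the cell sets `spark.sql.autoBroadcastJoinThreshold` to `-1`. The sketch below is a deliberately simplified model of that size check (equi-join, no join hints, ignoring other strategies such as shuffled hash join), not Spark's planner code: a side is broadcast only if its estimated size is at most the threshold, whose documented default is 10 MB, and a negative threshold disables broadcasting. AQE's runtime broadcast conversion reads `spark.sql.adaptive.autoBroadcastJoinThreshold`, which defaults to the same value, so `-1` also keeps AQE from converting this join later.

```python
# Documented default of spark.sql.autoBroadcastJoinThreshold: 10 MB.
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024


def pick_join_strategy(smaller_side_bytes: int,
                       threshold: int = DEFAULT_BROADCAST_THRESHOLD) -> str:
    """Simplified model of Spark's broadcast size check (not the real planner)."""
    # A negative threshold (e.g. -1) disables broadcasting entirely.
    if threshold >= 0 and smaller_side_bytes <= threshold:
        return "BroadcastHashJoin"
    # Fallback in this simplified model; Spark may also pick other strategies.
    return "SortMergeJoin"


# A four-row dimension table is tiny (the AQE plan reports 184 B for that side).
print(pick_join_strategy(184))                # broadcast under the default threshold
print(pick_join_strategy(184, threshold=-1))  # broadcast disabled, as in this notebook
```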

In [5]:
# Cell 4: RUN 2 - AQE ENABLED (The "Optimized" Way) Run this after the previous one finishes.

print("--- RUNNING WITH AQE ENABLED ---")

# Enable AQE Features
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# We keep the "bad" static defaults to prove AQE fixes them at runtime
spark.conf.set("spark.sql.shuffle.partitions", "200") 
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

start = time.time()

# The Exact Same Query
result = (
    df_sales
    .join(df_countries, "country_code")
    .groupBy("country_name")
    .agg(sum("amount").alias("total_sales"))
)

# Trigger Action
result.collect()

print(f"⏱️ Duration (AQE ON): {time.time() - start:.2f} seconds")
print("👉 Go check http://localhost:4040. Look for Coalesced partitions (fewer than 200 tasks).")

--- RUNNING WITH AQE ENABLED ---

⏱️ Duration (AQE ON): 7.40 seconds
👉 Go check http://localhost:4040. Look for Coalesced partitions (fewer than 200 tasks).

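### Observation: Partition Coalescing
Notice that the post-shuffle stages now run only 1 or 2 tasks, even though `spark.sql.shuffle.partitions` is still 200.
AQE re-optimizes the rest of the query after each shuffle stage completes, using the actual shuffle sizes, and merges adjacent small or empty partitions into fewer tasks (the `AQEShuffleRead` nodes in the final plan).
![Coalesced Stage Graph](screenshots/02_AQE_enabled_so_1_writing_partition.jpg)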
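The merging step can be approximated by a greedy pass over the shuffle partitions in order: keep adding adjacent partitions to the current group until the next one would push it past a target size. This is a simplified sketch, not Spark's actual implementation; the real rule also honours settings such as `spark.sql.adaptive.advisoryPartitionSizeInBytes` and `spark.sql.adaptive.coalescePartitions.minPartitionSize`, and for a join it applies the same grouping to both sides. `target_bytes` and the per-row byte counts below are assumptions for illustration.

```python
def coalesce_partitions(sizes, target_bytes):
    """Group adjacent partition indices so each group stays near target_bytes.

    Returns (start, end) pairs with end exclusive, covering every partition.
    """
    groups = []
    start, current = 0, 0
    for i, size in enumerate(sizes):
        # Close the current group if adding this partition would overflow it.
        if i > start and current + size > target_bytes:
            groups.append((start, i))
            start, current = i, 0
        current += size
    if sizes:
        groups.append((start, len(sizes)))
    return groups


# Final aggregation shuffle: 4 tiny rows spread over 200 partitions
# (the AQE plan reports 152 B in total; 38 B per row is illustrative).
agg_sizes = [0] * 200
for idx in (3, 57, 120, 181):
    agg_sizes[idx] = 38
print(coalesce_partitions(agg_sizes, target_bytes=64 * 1024 * 1024))  # [(0, 200)]
```

A single oversized partition stays whole in this model: coalescing only merges, it never splits, which is what the separate skew-join optimization is for.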

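Below is the adaptive plan. `== Final Plan ==` is the plan AQE actually executed after re-optimizing at runtime, and `== Initial Plan ==` is the static plan it started from. The `Statistics(sizeInBytes=..., rowCount=...)` on each `ShuffleQueryStage` are the runtime shuffle sizes AQE based its decisions on.

```
== Physical Plan ==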
AdaptiveSparkPlan (31)
+- == Final Plan ==
   * HashAggregate (20)
   +- AQEShuffleRead (19)
      +- ShuffleQueryStage (18), Statistics(sizeInBytes=152.0 B, rowCount=4)
         +- Exchange (17)
            +- * HashAggregate (16)
               +- * Project (15)
                  +- * SortMergeJoin Inner (14)
                     :- * Sort (7)
                     :  +- AQEShuffleRead (6)
                     :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=305.2 MiB, rowCount=1.00E+7)
                     :        +- Exchange (4)
                     :           +- * Filter (3)
                     :              +- * ColumnarToRow (2)
                     :                 +- Scan parquet  (1)
                     +- * Sort (13)
                        +- AQEShuffleRead (12)
                           +- ShuffleQueryStage (11), Statistics(sizeInBytes=184.0 B, rowCount=4)
                              +- Exchange (10)
                                 +- * Filter (9)
                                    +- * Scan ExistingRDD (8)
+- == Initial Plan ==
   HashAggregate (30)
   +- Exchange (29)
      +- HashAggregate (28)
         +- Project (27)
            +- SortMergeJoin Inner (26)
               :- Sort (23)
               :  +- Exchange (22)
               :     +- Filter (21)
               :        +- Scan parquet  (1)
               +- Sort (25)
                  +- Exchange (24)
                     +- Filter (9)
                        +- Scan ExistingRDD (8)
```
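The final plan still contains one `SortMergeJoin` fed by `AQEShuffleRead` nodes. Enabling `spark.sql.adaptive.skewJoin.enabled` does not by itself split anything: per the Spark SQL performance-tuning guide, a partition counts as skewed only if it is larger than `spark.sql.adaptive.skewJoin.skewedPartitionFactor` times the median partition size and also larger than `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` (documented defaults 5 and 256MB). The sketch below models just that rule with hypothetical partition sizes; it is not Spark's implementation. Since the whole sales shuffle here is 305.2 MiB, whether any single partition crosses 256MB depends on how the rows are distributed across the four keys.

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes, factor=5.0, threshold_bytes=256 * MB):
    """Indices a simplified version of AQE's documented skew rule would flag."""
    med = median(sizes)
    # Skewed = bigger than factor * median AND bigger than the absolute threshold.
    return [i for i, s in enumerate(sizes)
            if s > factor * med and s > threshold_bytes]


# Hypothetical join-side partition sizes: 196 empty partitions plus four keys.
sizes = [0] * 196 + [200 * MB, 60 * MB, 30 * MB, 15 * MB]
print(skewed_partitions(sizes))                            # [] (nothing over 256 MB)
print(skewed_partitions(sizes, threshold_bytes=100 * MB))  # [196]
```

With mostly empty partitions the median is 0, so in this model the absolute threshold is what decides whether a partition gets split.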

