This example demonstrates Adaptive Query Execution (AQE) in Apache Spark on Databricks. It shows how AQE can improve a Spark job by re-optimizing the execution plan at runtime.

Setup
Ensure that AQE is enabled in your Databricks environment. It is on by default since Apache Spark 3.2.0, but you can set the relevant Spark configuration options explicitly:

In [0]:
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
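To confirm which values are actually in effect for the session, the settings can be read back. The following is a minimal sketch: `AQE_KEYS` and `aqe_settings` are hypothetical helper names, not part of Spark, and the function assumes it is given the `spark` session that Databricks notebooks provide.

```python
# Hypothetical helper: the three AQE options set in the cell above.
AQE_KEYS = (
    "spark.sql.adaptive.enabled",
    "spark.sql.adaptive.coalescePartitions.enabled",
    "spark.sql.adaptive.skewJoin.enabled",
)

def aqe_settings(spark):
    """Return the current value of each AQE option as a dict of strings."""
    return {key: spark.conf.get(key) for key in AQE_KEYS}
```

In the notebook, `aqe_settings(spark)` would return a dictionary mapping each key to "true" or "false".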


Example Scenario
Consider a Spark job that joins two large datasets. Without AQE, Spark fixes its optimization decisions up front, based on statistics available before execution. With AQE, Spark re-optimizes the plan at shuffle boundaries using statistics collected at runtime, for example by coalescing small shuffle partitions, switching the join strategy, or splitting skewed partitions.
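As one illustration of a runtime decision, Spark's documentation describes a shuffle partition as skewed when it is larger than both `spark.sql.adaptive.skewJoin.skewedPartitionFactor` times the median partition size and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`. The pure-Python sketch below mimics that rule on a list of partition sizes. It is a simplified illustration, not Spark's implementation: the function name and the sample sizes are made up, and the defaults used here (factor 5, threshold 256 MB) are assumed from the documentation and may differ across versions.

```python
from statistics import median

MB = 1024 * 1024

def find_skewed_partitions(sizes_in_bytes, factor=5, threshold_bytes=256 * MB):
    """Return the indexes of partitions that the simplified rule flags as skewed."""
    if not sizes_in_bytes:
        return []
    # A partition must exceed BOTH limits, i.e. the larger of the two.
    cutoff = max(factor * median(sizes_in_bytes), threshold_bytes)
    return [i for i, size in enumerate(sizes_in_bytes) if size > cutoff]

# Hypothetical shuffle partition sizes: four ordinary partitions and one large one.
sizes = [64 * MB, 70 * MB, 66 * MB, 900 * MB, 68 * MB]
skewed = find_skewed_partitions(sizes)
print(skewed)  # prints [3]
```

Here the median is 68 MB, so the cutoff is 340 MB and only the 900 MB partition is flagged. A 100 MB partition among 10 MB neighbors would not be flagged, because it stays under the 256 MB threshold.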

Sample Data
Let's create two DataFrames to simulate the join operation.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import random

# Initialize Spark session
spark = SparkSession.builder.appName("AQEExample").getOrCreate()

# Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# Create the first DataFrame: unique ids 1..100000 with random values
# (the join key is not skewed here, since every id appears once)
data1 = [(i, random.randint(1, 100)) for i in range(1, 100001)]
df1 = spark.createDataFrame(data1, ["id", "value1"])

# Create the second DataFrame: ids 50000..149999, overlapping df1 on 50000..100000
data2 = [(i, random.randint(1, 100)) for i in range(50000, 150000)]
df2 = spark.createDataFrame(data2, ["id", "value2"])

# Display the first 10 rows of each DataFrame
# (display() is a Databricks notebook built-in)
display(df1.limit(10))
display(df2.limit(10))


id,value1
1,66
2,100
3,43
4,14
5,13
6,50
7,10
8,58
9,35
10,25


id,value2
50000,50
50001,97
50002,73
50003,18
50004,82
50005,40
50006,6
50007,74
50008,64
50009,18


Join Operation Without AQE
First, let's run the join with AQE disabled, so that Spark plans it statically from the statistics available before execution.

In [0]:
# Disable AQE for this example
spark.conf.set("spark.sql.adaptive.enabled", "false")

# Perform join operation
result_without_aqe = df1.join(df2, "id")

# Action to trigger the join
result_without_aqe.count()


Out[3]: 50001
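To see what Spark actually planned for a join, the physical plan can be captured as text and checked for the `AdaptiveSparkPlan` node that AQE puts at the root of an adaptive plan. This is a minimal sketch: `plan_text` and `uses_aqe` are hypothetical helper names, and the capture assumes that `DataFrame.explain()` writes through Python's `print`, as PySpark does.

```python
import contextlib
import io

def plan_text(df):
    """Capture the output of df.explain() as a string instead of printing it."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()

def uses_aqe(df):
    """Return True if the physical plan is wrapped in an AdaptiveSparkPlan node."""
    return "AdaptiveSparkPlan" in plan_text(df)
```

In the notebook, `uses_aqe(result_without_aqe)` should be False for the join above, since AQE was disabled when it ran. For a query planned with AQE enabled, the plan text should also show `isFinalPlan=false` before an action has run and `isFinalPlan=true` afterwards.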

Join Operation With AQE
Now, let's run the same join with AQE enabled. The result is identical; only the execution plan and the run time can differ.

In [0]:
# Enable AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Perform join operation
result_with_aqe = df1.join(df2, "id")

# Action to trigger the join
result_with_aqe.count()

Out[4]: 50001
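Both runs return the same count, so comparing performance needs an explicit measurement. The sketch below times the join under each setting; `time_action` and `compare_aqe` are hypothetical helper names, and the function assumes it is passed the notebook's `spark` session and two DataFrames that share the join key.

```python
import time

def time_action(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    return result, time.perf_counter() - start

def compare_aqe(spark, left, right, key="id"):
    """Time the same join with AQE off and on; return {setting: (count, seconds)}."""
    original = spark.conf.get("spark.sql.adaptive.enabled")
    timings = {}
    try:
        for setting in ("false", "true"):
            spark.conf.set("spark.sql.adaptive.enabled", setting)
            # Rebuild the join under each setting so every run is planned fresh.
            timings[setting] = time_action(lambda: left.join(right, key).count())
    finally:
        # Restore whatever value the session had before the comparison.
        spark.conf.set("spark.sql.adaptive.enabled", original)
    return timings
```

In the notebook this could be called as `compare_aqe(spark, df1, df2)`. On a dataset this small the timings are likely to be noisy and the first run includes warm-up cost, so it is worth repeating the measurement several times before drawing conclusions.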