# Project-Databricks-Skew-AQE-Solutions

## Objective
Demonstrate how Adaptive Query Execution (AQE) in Apache Spark mitigates data skew in join operations using the Online Retail Dataset in Databricks. This project covers:
- Configuring AQE and skew join optimization.
- Performing a skewed join between a large dataset and a small table.
- Analyzing AQE’s impact via the Spark UI.
- Documenting findings for professional presentation or interviews.

## Dataset
- **Online Retail Dataset**: `/FileStore/tables/online_retail.csv`
- Contains e-commerce transaction data with columns like `CustomerID`, `InvoiceNo`, `Quantity`, etc.
- Likely to have skew (e.g., some `CustomerID` values have many transactions).

## Best Practices
- Modular code blocks for readability and reusability.
- Descriptive Markdown cells for each step.
- Structured for GitHub upload or interview presentation.

---

## Step 1: Configure AQE for Skew Handling
- Enable AQE and skew join optimization to address data skew.
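- Set `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` to 256MB (also the Spark default). AQE treats a shuffle partition as skewed only when it exceeds this size and is also larger than `spark.sql.adaptive.skewJoin.skewedPartitionFactor` times the median partition size; skewed partitions are then split into smaller tasks.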
- Keep default `spark.sql.adaptive.coalescePartitions.minPartitionSize` (1MB) unless small partitions are detected.
- These settings are applied at the session level to guide AQE’s runtime optimizations (e.g., splitting skewed partitions, merging small ones).

In [0]:
# Configure AQE
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
# Optional: Uncomment if small partitions are observed
# spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "10MB")

# Verify configurations
print("AQE Enabled:", spark.conf.get("spark.sql.adaptive.enabled"))
print("Skew Join Enabled:", spark.conf.get("spark.sql.adaptive.skewJoin.enabled"))
print("Skew Partition Threshold:", spark.conf.get("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes"))
print("Min Partition Size:", spark.conf.get("spark.sql.adaptive.coalescePartitions.minPartitionSize"))

AQE Enabled: true
Skew Join Enabled: true
Skew Partition Threshold: 256MB
Min Partition Size: 1048576b
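
The skew test described above can be sketched in plain Python. This is a simplified illustration, not Spark's implementation: the function name `skewed_partitions` and the partition sizes are hypothetical, and the factor of 5 is assumed to match the default `skewedPartitionFactor`.

```python
from statistics import median

MB = 1024 * 1024


def skewed_partitions(sizes_bytes, threshold_bytes=256 * MB, factor=5.0):
    """Return the indexes of partitions that satisfy both skew conditions.

    A partition counts as skewed only if it is larger than the absolute
    threshold AND larger than `factor` times the median partition size.
    """
    med = median(sizes_bytes)
    return [
        i
        for i, size in enumerate(sizes_bytes)
        if size > threshold_bytes and size > factor * med
    ]


# Hypothetical shuffle partition sizes: three ordinary ones and one 500 MB outlier.
print(skewed_partitions([60 * MB, 64 * MB, 70 * MB, 500 * MB]))    # [3]

# Large but evenly sized partitions are not skewed: none exceeds 5x the median.
print(skewed_partitions([200 * MB, 210 * MB, 220 * MB, 300 * MB]))  # []
```

Both conditions must hold, which is why a small dataset cannot trigger skew handling regardless of how uneven its keys are.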


## Step 2: Load the Online Retail Dataset
- Load the CSV file into a DataFrame (`retail_df`).
- Infer schema and include headers for accurate column names.
- Display schema and sample data to verify loading.

In [0]:
# Load the Online Retail Dataset
retail_df = spark.read.csv("/FileStore/tables/online_retail.csv", header=True, inferSchema=True)

# Display schema and sample data
retail_df.printSchema()
retail_df.show(5, truncate=False)

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)

+---------+---------+-----------------------------------+--------+------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/10 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |12/1/10 8:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/10 8:26|2.7
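
Whether `CustomerID` is actually skewed can be checked before joining. The sketch below is one possible way to do it: `skew_summary` and `customer_counts` are hypothetical helper names, the example counts are invented for illustration, and `customer_counts` assumes the number of distinct keys is small enough to collect to the driver.

```python
from statistics import median


def skew_summary(key_counts):
    """Summarise how unevenly rows are spread across keys in a {key: count} dict."""
    top_key, top_count = max(key_counts.items(), key=lambda kv: kv[1])
    med = median(key_counts.values())
    return {
        "top_key": top_key,
        "top_count": top_count,
        "median_count": med,
        "max_to_median": top_count / med,
        "top_share": top_count / sum(key_counts.values()),
    }


def customer_counts(df, key="CustomerID"):
    """Collect per-key row counts from a Spark DataFrame (not called in this sketch)."""
    return {row[key]: row["count"] for row in df.groupBy(key).count().collect()}


# Invented counts for illustration only.
print(skew_summary({"A": 90, "B": 6, "C": 4}))
```

In the notebook this would be called as `skew_summary(customer_counts(retail_df))`; a high `max_to_median` value suggests that one key dominates the join input.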

## Step 3: Create a Small Customer Table
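- Create a small DataFrame (`customer_df`) with `CustomerID` and `Name` to serve as the lookup side of the join.
- The table is intentionally small, mimicking the common pattern of joining a large fact table to a much smaller dimension table. Its size does not cause skew; any skew comes from how `CustomerID` values are distributed in `retail_df`.
- The sample IDs (1 to 3) do not match the IDs in the sample rows of `retail_df` (e.g., 17850), so those rows get a null `Name` in Step 4.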

In [0]:
# Create a small customer table
customer_data = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
customer_df = spark.createDataFrame(customer_data, ["CustomerID", "Name"])

# Display customer table
customer_df.show()

+----------+-----+
|CustomerID| Name|
+----------+-----+
|         1|Alice|
|         2|  Bob|
|         3|Cathy|
+----------+-----+



## Step 4: Perform Skewed Join with AQE
- Join `retail_df` with `customer_df` on `CustomerID` using a left outer join.
- The Online Retail Dataset may have skewed `CustomerID` values (e.g., some customers have many transactions).
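- With skew join optimization enabled, AQE splits a shuffle partition that exceeds the 256MB threshold. This applies to shuffle-based joins (for example, sort-merge join); a broadcast join has no shuffle partitions to split.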

In [0]:
# Perform the skewed join
skewed_join_df = retail_df.join(customer_df, "CustomerID", "left_outer")

# Display sample results
skewed_join_df.show(5, truncate=False)

+----------+---------+---------+-----------------------------------+--------+------------+---------+--------------+----+
|CustomerID|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate |UnitPrice|Country       |Name|
+----------+---------+---------+-----------------------------------+--------+------------+---------+--------------+----+
|17850     |536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |12/1/10 8:26|2.55     |United Kingdom|null|
|17850     |536365   |71053    |WHITE METAL LANTERN                |6       |12/1/10 8:26|3.39     |United Kingdom|null|
|17850     |536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |12/1/10 8:26|2.75     |United Kingdom|null|
|17850     |536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |12/1/10 8:26|3.39     |United Kingdom|null|
|17850     |536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |12/1/10 8:26|3.39     |United Kingdom|null|
+----------+---------+---------+-----------------------------------+--------+------------+---------+--------------+----+
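
The join strategy can also be checked from the notebook rather than the Spark UI. The following is a rough sketch under stated assumptions: `plan_text` and `summarize_plan` are hypothetical helpers, `plan_text` assumes `DataFrame.explain()` writes to Python's standard output, and the `skew=true` marker is assumed to be how the Spark version in use labels a skew-optimized sort-merge join. AQE rewrites show up only in the final plan, so the Spark UI remains the more reliable source.

```python
import contextlib
import io


def plan_text(df, mode="formatted"):
    """Capture the text that DataFrame.explain() prints (not called in this sketch)."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain(mode=mode)
    return buffer.getvalue()


def summarize_plan(text):
    """Flag plan features of interest using simple substring checks."""
    return {
        "adaptive": "AdaptiveSparkPlan" in text,
        "broadcast_join": "BroadcastHashJoin" in text,
        "sort_merge_join": "SortMergeJoin" in text,
        "skew_handled": "skew=true" in text,
    }


# Hypothetical plan excerpt for illustration only.
sample = (
    "AdaptiveSparkPlan isFinalPlan=true\n"
    "+- BroadcastHashJoin [CustomerID], [CustomerID], LeftOuter, BuildRight\n"
)
print(summarize_plan(sample))
```

In the notebook this would be called as `summarize_plan(plan_text(skewed_join_df))`.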

## Step 5: Analyze Spark UI
- Access the Spark UI via **Clusters > Spark UI** or the job execution link in the cell output.
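- **SQL Tab**: Look for an `AdaptiveSparkPlan` node in the query plan, which shows that AQE is active. When skew handling is applied, the join itself is marked as a skew join (for example, `SortMergeJoin(skew=true)`; the exact label varies by Spark version).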
- **Stages Tab**: Check task durations and input sizes to confirm AQE balanced the workload (e.g., split a 500MB partition into 100MB tasks).
- Document findings below after reviewing the UI.
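
The Stages Tab check can be reduced to two numbers: how many tasks a split should roughly produce, and how uneven the task durations are. The helpers below are hypothetical and the arithmetic is a simplification; Spark picks the actual target size at runtime, and the 500MB and 100MB figures are only the illustrative values from the bullet above.

```python
import math
from statistics import median

MB = 1024 * 1024


def estimated_splits(partition_bytes, target_bytes):
    """Rough number of tasks if a partition is cut into target-sized pieces."""
    return max(1, math.ceil(partition_bytes / target_bytes))


def balance_ratio(task_durations):
    """Longest task duration divided by the median; values near 1 mean balanced."""
    return max(task_durations) / median(task_durations)


print(estimated_splits(500 * MB, 100 * MB))   # 5
print(balance_ratio([2.0, 2.0, 2.0, 20.0]))   # 10.0, one straggler task
print(balance_ratio([2.0, 2.0, 2.0, 2.0]))    # 1.0, evenly balanced
```

Task durations copied from the Stages Tab before and after enabling skew handling can be compared with `balance_ratio` to document the effect.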

## Step 6: Spark UI Findings
- **AQE Status**: Confirmed `AdaptiveSparkPlan (24)` in the SQL query plan, indicating AQE was active.
- **Skew Handling**: No skew join marker appeared in the plan. The data read (48 rows from `retail_df`, 3 rows from `customer_df`, about 7.0 KiB in total) was far below the 256 MB skew threshold, and the join ran as a broadcast join, to which skew splitting does not apply.
- **Performance Impact**: 
  - **Task Durations**: Stage 9 (join) completed in 0.3s with 1 task, reflecting efficient execution for the small dataset.
  - **Input Sizes**: Left side (`retail_df`) processed 7.0 KiB (48 rows), right side (`customer_df`) 96.0 B (3 rows); no splitting occurred due to size.
- **Partition Merging**: Not applicable; the single task suggests no need for merging given the limited data.
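- **Observation**: The join executed as a `BroadcastHashJoin`, which suits the very small `customer_df`, and the query completed in about 4s overall. Because `customer_df` (96.0 B) is far below Spark's default broadcast threshold of 10 MB, this strategy may have been chosen at planning time rather than by AQE at runtime. Skew handling was not triggered because the dataset is too small, which is expected for a learning-sized example.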
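
The broadcast decision noted above comes down to a size comparison. The sketch below is a simplified illustration: `can_broadcast` is a hypothetical helper, the 10 MB value is assumed to match the default `spark.sql.autoBroadcastJoinThreshold`, and Spark compares its own size estimate, which may differ from the 96.0 B shown in the UI.

```python
MB = 1024 * 1024

# Assumed default for spark.sql.autoBroadcastJoinThreshold.
DEFAULT_BROADCAST_THRESHOLD = 10 * MB


def can_broadcast(side_bytes, threshold_bytes=DEFAULT_BROADCAST_THRESHOLD):
    """True if a join side is small enough to broadcast; -1 disables broadcasting."""
    return threshold_bytes >= 0 and side_bytes <= threshold_bytes


print(can_broadcast(96))         # True: customer_df is tiny
print(can_broadcast(96, -1))     # False: broadcasting disabled
print(can_broadcast(500 * MB))   # False: too large to broadcast
```

To observe skew handling in a follow-up experiment, one option would be a much larger input together with `spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")`, so that the join falls back to a shuffle-based strategy.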