# Project - Databricks-Skew -BroadcastJoin-Solutions

## Overview
This project demonstrates the use of the broadcast join technique to mitigate data skew in Apache Spark join operations. We will join the Online Retail Dataset with a small lookup table, optimize performance using broadcasting, and document the results. This builds on the salting technique from Project 1 by addressing skew in a join context.
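The core mechanism can be modeled in plain Python. In a broadcast hash join, every executor gets a full copy of the small table as a hash map and probes it row by row. The large table is never repartitioned by key, so a hot key cannot pile up in one partition. The sketch below is a simplified model under that assumption, not Spark's implementation; the function name `broadcast_left_join` and the sample rows are hypothetical.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Row = Tuple[Optional[int], str]  # (CustomerID, invoice id) in this toy model
JoinedRow = Tuple[Optional[int], str, Optional[str]]


def broadcast_left_join(partitions: List[List[Row]],
                        lookup: Iterable[Tuple[int, str]]) -> List[List[JoinedRow]]:
    """Left-outer join each partition against one shared hash map of the small table."""
    # "Broadcast" step: build the hash map once; every partition probes the same copy.
    table: Dict[int, str] = dict(lookup)
    result: List[List[JoinedRow]] = []
    for part in partitions:
        # Each partition is joined where it is; no row moves, so no key can pile up.
        out: List[JoinedRow] = []
        for customer_id, invoice in part:
            # A null key never equals anything, so its Region stays None.
            region = table.get(customer_id) if customer_id is not None else None
            out.append((customer_id, invoice, region))
        result.append(out)
    return result


# Hypothetical input: two partitions of (CustomerID, invoice id) rows, one with a null key.
parts = [[(17841, "inv-1"), (None, "inv-2")], [(14911, "inv-3")]]
regions = [(17841, "Europe"), (14911, "North America"), (14096, "Asia")]
joined = broadcast_left_join(parts, regions)
for i, part in enumerate(joined):
    print(i, part)
```

The partition layout of the output matches the input exactly. A broadcast join avoids skew by construction in this way, as long as the small side fits in executor memory.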

## Dataset
- **Source**: [Online Retail Dataset](https://archive.ics.uci.edu/ml/datasets/Online+Retail)
- **Details**: 541,909 records, 44.5 MB, stored at `/FileStore/tables/online_retail.csv`.
- **Lookup Table**: A manually created `customer_regions` DataFrame mapping `CustomerID` to `Region`.

## Step 1: Environment Setup
- **Platform**: Databricks Community Edition
- **Cluster**: 1 driver, 1 worker
- **Notebook Path**: `/Users/uday91@gmail.com/Project - Databricks-Skew -BroadcastJoin-Solutions`
- **Objective**: Configure the environment and load the dataset for join optimization.

```python
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast
```

```python
# Initialize Spark session (automatically managed by Databricks)
spark = SparkSession.builder.appName("Broadcast Join Example").getOrCreate()

# Load the online retail dataset
retail_df = spark.read.csv("/FileStore/tables/online_retail.csv", header=True, inferSchema=True)

# Verify loading
print("Retail DataFrame loaded successfully.")

retail_df.show(10)
```

```python
# Create a small lookup table for customer regions
customer_regions = spark.createDataFrame([
    (17841, "Europe"),
    (14911, "North America"),
    (14096, "Asia")
], ["CustomerID", "Region"])

# Verify lookup table
print("Customer Regions Lookup Table created successfully.")
```

```python
# Display the DataFrames to verify
print("Retail DataFrame (first 5 rows):")
retail_df.show(5)
print("Customer Regions Lookup Table:")
customer_regions.show()

# Check schema for reference
print("Retail DataFrame Schema:")
retail_df.printSchema()
print("Customer Regions Schema:")
customer_regions.printSchema()
```

## Step 2: Perform Initial Join
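- **Action**: Join `retail_df` and `customer_regions` on `CustomerID` without a broadcast hint to establish a baseline. Because the lookup table is tiny, Spark may still broadcast it automatically instead of shuffling both sides.
- **Outcome**: Verify the join results and check the Spark UI for signs of skew.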
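Here is why a join without a hint may not shuffle. Spark's planner broadcasts one side of an equi-join automatically when that side's estimated size is at or below `spark.sql.autoBroadcastJoinThreshold` (10 MB by default). For a left outer join, only the right side (here `customer_regions`) is eligible. Setting the property to `-1` disables automatic broadcasting. The plain-Python sketch below models only that size check. The real planner also weighs join type, hints, and statistics quality, and `would_auto_broadcast` is a hypothetical helper.

```python
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024  # default spark.sql.autoBroadcastJoinThreshold (10 MB)


def would_auto_broadcast(estimated_size_bytes: int,
                         threshold_bytes: int = DEFAULT_THRESHOLD_BYTES) -> bool:
    """Model of the size check Spark applies before broadcasting a join side.

    A negative threshold (e.g. -1) disables automatic broadcasting entirely.
    """
    if threshold_bytes < 0:
        return False
    return 0 <= estimated_size_bytes <= threshold_bytes


# Illustrative estimates only, not measured values from this notebook.
print(would_auto_broadcast(300))          # a 3-row lookup table
print(would_auto_broadcast(300, -1))      # auto-broadcast disabled
print(would_auto_broadcast(44_500_000))   # roughly the size of the full retail file
```

In the notebook, the corresponding switch for a true shuffle baseline would be `spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)` before running the join.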

```python
# Join without an explicit broadcast hint. Spark may still plan a broadcast hash join
# automatically, because customer_regions is far below spark.sql.autoBroadcastJoinThreshold.
joined_df = retail_df.join(customer_regions, "CustomerID", "left_outer")

# Display the first 5 rows of the joined DataFrame
print("Joined DataFrame (first 5 rows):")
joined_df.show(5)

# Check the number of rows to confirm the join
print(f"Total rows in joined DataFrame: {joined_df.count()}")
```

## Step 2.1: Skew Analysis from Shuffle Join
- **Action**: Analyzed the Spark UI for the shuffle join in Cell 8 (Stage 24, Job 16).
- **Observations**: Task durations ranged from 0.2 to 0.5 s (average ~0.375 s) and Shuffle Read from 19.4 to 43.4 KiB. No significant skew was detected. Spark may have broadcast `customer_regions` implicitly because of its small size (3 rows).
- **Outcome**: Established a baseline for comparison with the explicit broadcast join.
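You can condense Stages-tab numbers like these into one skew indicator: the ratio of the slowest task to the median task. Values near 1 mean balanced tasks, and large values point to a straggler. Spark's adaptive skew-join handling applies a similar median-based rule to partition sizes (`spark.sql.adaptive.skewJoin.skewedPartitionFactor`). The sketch below assumes a cutoff factor of 5, which is a judgment call, and uses hypothetical durations rather than measurements from this notebook.

```python
from statistics import median
from typing import Sequence


def skew_ratio(task_durations_s: Sequence[float]) -> float:
    """Return max(task duration) / median(task duration) for one stage."""
    if not task_durations_s:
        raise ValueError("need at least one task duration")
    mid = median(task_durations_s)
    if mid == 0:
        raise ValueError("median duration is zero; ratio undefined")
    return max(task_durations_s) / mid


def is_skewed(task_durations_s: Sequence[float], factor: float = 5.0) -> bool:
    """Flag a stage whose slowest task exceeds `factor` times the median (factor is an assumption)."""
    return skew_ratio(task_durations_s) > factor


balanced = [0.3, 0.4, 0.4, 0.5]   # hypothetical, evenly spread tasks
straggler = [0.3, 0.3, 0.4, 6.0]  # hypothetical, one hot partition
print(round(skew_ratio(balanced), 2), is_skewed(balanced))
print(round(skew_ratio(straggler), 2), is_skewed(straggler))
```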

```python
# Perform a broadcast join
broadcast_joined_df = retail_df.join(broadcast(customer_regions), "CustomerID", "left_outer")

# Display the first 5 rows of the broadcast-joined DataFrame
print("Broadcast Joined DataFrame (first 5 rows):")
broadcast_joined_df.show(5)

# Check the number of rows to confirm the join
print(f"Total rows in broadcast-joined DataFrame: {broadcast_joined_df.count()}")
```

## Step 2.2: Isolate Null CustomerID for Skew Analysis
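- **Action**: Filter the dataset to rows with a null `CustomerID`. All of these rows share the same (null) join key, so a shuffle join routes them to a single partition. Isolating them makes any skew easy to observe.
- **Outcome**: Verify the skew impact and prepare for the broadcast join optimization.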
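The plain-Python model of hash partitioning below shows why null keys concentrate. A shuffle assigns each row to partition `hash(key) mod n`, and every null key produces the same hash, so all null-key rows land together. Spark uses Murmur3 rather than Python's `hash`, and the constant `NULL_HASH`, the key values, and the partition count are hypothetical. Only the routing behavior is the point.

```python
from collections import Counter
from typing import Iterable, Optional

NULL_HASH = 42  # stand-in constant: every null key hashes to the same value


def partition_for(key: Optional[int], num_partitions: int) -> int:
    """Route a join key to a partition, as a shuffle would (hash mod n)."""
    h = NULL_HASH if key is None else hash(key)
    return h % num_partitions


def partition_sizes(keys: Iterable[Optional[int]], num_partitions: int) -> Counter:
    """Count how many rows each partition would receive."""
    return Counter(partition_for(k, num_partitions) for k in keys)


# Hypothetical key column: 1,000 distinct customer IDs plus 500 null keys.
keys = list(range(10_000, 11_000)) + [None] * 500
sizes = partition_sizes(keys, num_partitions=8)
print(sorted(sizes.items()))
```

The distinct IDs spread evenly, but one partition also receives every null row. A broadcast join skips this key-based routing for the large side, so the concentration never forms.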

```python
# Filter for rows with a null CustomerID
null_retail_df = retail_df.filter(col("CustomerID").isNull())

# Join the null subset without a broadcast hint (Spark may still auto-broadcast the small table).
# Null keys never satisfy the equality condition, so Region is null in every output row.
null_joined_df = null_retail_df.join(customer_regions, "CustomerID", "left_outer")

# Display the first 5 rows
print("Joined DataFrame for Null CustomerID (first 5 rows):")
null_joined_df.show(5)

# Check the number of rows
print(f"Total rows in null-joined DataFrame: {null_joined_df.count()}")
```

## Step 2.3: Skew Analysis for Null Subset
- **Action**: Analyze Spark UI for the shuffle join on null CustomerID data.
- **Outcome**: Document skew evidence for comparison with broadcast join.

```python
# Note: Check Spark UI manually for task durations and shuffle sizes
print("Check Spark UI (Stages tab) for shuffle join on null CustomerID data.")
print(f"Total rows in null-joined DataFrame: {null_joined_df.count()}")
```

## Performance Comparison: Shuffle Join vs. Broadcast Join

| Metric            | Shuffle Join (Job ID 34) | Broadcast Join (Job ID 39) |
|-------------------|--------------------------|----------------------------|
| Duration          | 0.2 s                    | 37 ms                      |
| Tasks             | 1                        | 1                          |
| Data Skew         | None observed            | None observed              |

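**Notes**: The shuffle join (Job ID 34) completed in a single task. Spark likely auto-broadcast the 3-row lookup table, which is far below the default 10 MB `spark.sql.autoBroadcastJoinThreshold`. The task count alone does not prove this, though: a job triggered by `show()`, for example, can scan a single partition. Calling `explain()` on the joined DataFrame shows the strategy Spark actually chose (`BroadcastHashJoin` or `SortMergeJoin`). The explicit broadcast join (Job ID 39) finished faster in this run, although single-run timings this small are noisy. No significant data skew was observed in either case.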
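The durations in the table above translate into a speedup as follows. This is unit conversion only, using the two single-run Spark UI values. It also assumes the UI rounded "0.2 s" to one decimal place. If so, the true shuffle duration could lie anywhere from about 0.15 s to 0.25 s, which widens the plausible ratio.

```python
shuffle_join_s = 0.2      # Job ID 34, as reported in the Spark UI
broadcast_join_s = 0.037  # Job ID 39: 37 ms converted to seconds

speedup = shuffle_join_s / broadcast_join_s
print(f"Broadcast join was about {speedup:.1f}x faster in this run")

# Assumption: "0.2 s" was rounded to one decimal, so the true value lies in [0.15, 0.25) s.
low = 0.15 / broadcast_join_s
high = 0.25 / broadcast_join_s
print(f"Plausible range: {low:.1f}x to {high:.1f}x")
```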

## Step 3: Optimize with Broadcast Join
- **Action**: Use the `broadcast` function to optimize the join on null CustomerID data.
- **Outcome**: Compare performance with the shuffle join and verify data integrity.

```python
# Perform a broadcast join on the null subset
broadcast_null_joined_df = null_retail_df.join(broadcast(customer_regions), "CustomerID", "left_outer")

# Display the first 5 rows
print("Broadcast Joined DataFrame for Null CustomerID (first 5 rows):")
broadcast_null_joined_df.show(5)

# Check the number of rows
print(f"Total rows in broadcast-null-joined DataFrame: {broadcast_null_joined_df.count()}")
```
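One data-integrity check for this step relies on two invariants of a left outer join against a lookup table with unique keys. The join must return exactly one output row per input row, and null keys must never find a match. The plain-Python sketch below encodes both invariants; `left_join_lookup`, `check_integrity`, and the sample keys are hypothetical. In the notebook, the analogous checks would compare `null_retail_df.count()` with `broadcast_null_joined_df.count()` and confirm that `Region` is null in every row.

```python
from typing import Dict, List, Optional, Tuple

JoinedPair = Tuple[Optional[int], Optional[str]]


def left_join_lookup(keys: List[Optional[int]], lookup: Dict[int, str]) -> List[JoinedPair]:
    """Left-outer join a key column against a lookup table with unique keys."""
    # In SQL, NULL = NULL is not true, so a null key never matches a lookup row.
    return [(k, lookup.get(k) if k is not None else None) for k in keys]


def check_integrity(keys: List[Optional[int]], joined: List[JoinedPair]) -> None:
    """Raise if the join changed the row count or matched a null key."""
    if len(joined) != len(keys):
        raise AssertionError(f"row count changed: {len(keys)} -> {len(joined)}")
    for k, region in joined:
        if k is None and region is not None:
            raise AssertionError("a null key matched a lookup row")


regions = {17841: "Europe", 14911: "North America", 14096: "Asia"}
null_keys: List[Optional[int]] = [None] * 4  # hypothetical null-CustomerID subset
joined = left_join_lookup(null_keys, regions)
check_integrity(null_keys, joined)
print(joined)
```

If the lookup table could contain duplicate keys, the row-count invariant would no longer hold, because one input row could match several lookup rows.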

## Step 3.1: Broadcast Join Performance Analysis
- **Action**: Analyzed the Spark UI for the broadcast join on null CustomerID data.
- **Outcome**: Document the performance improvement compared to the shuffle join.

```python
# Note: Update with Spark UI findings
print("Check Spark UI (Stages tab) for broadcast join on null CustomerID data.")
print(f"Total rows in broadcast-null-joined DataFrame: {broadcast_null_joined_df.count()}")
```

## Conclusion

This project analyzed the effectiveness of broadcast joins in mitigating data skew compared to shuffle joins, using the Online Retail Dataset and a small `customer_regions` lookup table. Key findings include:

- **Broadcast Join Efficiency**: The explicit broadcast join (e.g., Job ID 39, 37 ms) completed faster than the shuffle join (e.g., Job ID 34, 0.2 s), and both ran as a single task. Broadcasting avoids shuffling the large side. However, the shuffle join was itself likely auto-broadcast (see below), so part of this gap may be run-to-run variation.
- **Optimization Impact**: Spark likely executed the shuffle join as a broadcast join automatically, because the 3-row lookup table is far below the default 10 MB `spark.sql.autoBroadcastJoinThreshold`. The single task and skipped stages in Job ID 34 are consistent with this. Skipped stages, however, only show that Spark reused shuffle output from an earlier job; the physical plan from `explain()` is the direct evidence.
- **Data Skew**: No significant data skew was observed with either join type, attributed to the small dataset size and Spark's automatic broadcast. As a result, no manual skew handling was required.
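A rough way to see the shuffle-overhead point above is to count the rows moved between machines. In a shuffle join, both inputs are repartitioned by key; as an upper bound, every row of both sides moves. In a broadcast join, only the small table is copied, once per executor. The model below is a simplification that ignores serialization, compression, the driver's role in collecting the broadcast, and rows that happen to stay local. The executor count of 1 is an assumption based on the single-worker cluster described above.

```python
def rows_moved_shuffle(large_rows: int, small_rows: int) -> int:
    """Upper bound: every row of both sides is sent to its key's partition."""
    return large_rows + small_rows


def rows_moved_broadcast(small_rows: int, num_executors: int) -> int:
    """The small table is copied to each executor; the large side stays put."""
    return small_rows * num_executors


LARGE = 541_909   # Online Retail records (from the Dataset section)
SMALL = 3         # rows in customer_regions
EXECUTORS = 1     # assumption: one worker, as in the cluster described above

print("shuffle join, rows moved (upper bound):", rows_moved_shuffle(LARGE, SMALL))
print("broadcast join, rows moved:", rows_moved_broadcast(SMALL, EXECUTORS))
```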

