# **Broadcast Joins in PySpark**
###### Broadcast joins (also known as map-side joins) are an optimization for joining a large dataset with a small one. Spark sends a full copy of the smaller dataset to every executor, so each executor can join its partitions of the large dataset locally instead of shuffling them across the network.
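
To make the mechanism concrete, here is a minimal pure-Python sketch of what a broadcast hash join does conceptually. It is not Spark's implementation: the function name `broadcast_hash_join` and the list-of-lists "partitions" are illustrative assumptions. The small side becomes one in-memory lookup table that every partition of the large side probes locally, so no rows of the large side have to move.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows):
    """Inner-join (key, value) rows, probing a lookup table built from the small side.

    large_partitions: list of partitions, each a list of (key, value) tuples.
    small_rows: list of (key, value) tuples, assumed small enough to fit in memory.
    """
    # Build phase: hash the small side once. In Spark, this table is what gets
    # shipped ("broadcast") to every executor.
    lookup = defaultdict(list)
    for key, value in small_rows:
        lookup[key].append(value)

    # Probe phase: each partition is joined independently against the same table.
    joined_partitions = []
    for partition in large_partitions:
        joined = [
            (key, left_value, right_value)
            for key, left_value in partition
            for right_value in lookup.get(key, [])
        ]
        joined_partitions.append(joined)
    return joined_partitions

# Hypothetical sample data: products split into two partitions, plus a small category table
products = [[(1, "Product A"), (2, "Product B")], [(3, "Product C"), (4, "Product D")]]
categories = [(1, "Category 1"), (2, "Category 2")]
result = broadcast_hash_join(products, categories)
print(result)
```

Because the second partition has no matching keys, it contributes no rows to the inner join, and neither partition needed to exchange data with the other.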

##### **When to Use Broadcast Joins:**
###### 1. One DataFrame is much smaller than the other
###### 2. The smaller DataFrame can fit in memory
###### 3. You want to avoid shuffle operations

###### Here's an example of how to implement broadcast joins:

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

# Initialize Spark Session
spark = SparkSession.builder.appName("Broadcast Join Example").getOrCreate()

# Create sample DataFrames
# Large DataFrame
large_df = spark.createDataFrame([
    (1, "Product A"),
    (2, "Product B"),
    (3, "Product C"),
    (4, "Product D")
], ["product_id", "product_name"])

# Small DataFrame (good candidate for broadcasting)
small_df = spark.createDataFrame([
    (1, "Category 1"),
    (2, "Category 2")
], ["product_id", "category"])

# Method 1: Using broadcast hint
broadcast_join_df = large_df.join(broadcast(small_df), "product_id")
broadcast_join_df.explain()  # Shows the execution plan with broadcast
broadcast_join_df.show()

# Method 2: Using configuration
# Set broadcast threshold (default is 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)  # 10MB in bytes

# Spark broadcasts automatically only when its own size estimate for one side
# is below the threshold; check explain() to see which strategy was chosen
auto_broadcast_join_df = large_df.join(small_df, "product_id")
auto_broadcast_join_df.explain()
auto_broadcast_join_df.show()

# Disable broadcasting if needed
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [product_id#816L, product_name#817, category#821]
   +- BroadcastHashJoin [product_id#816L], [product_id#820L], Inner, BuildRight, false
      :- Filter isnotnull(product_id#816L)
      :  +- Scan ExistingRDD[product_id#816L,product_name#817]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=1109]
         +- Filter isnotnull(product_id#820L)
            +- Scan ExistingRDD[product_id#820L,category#821]


+----------+------------+----------+
|product_id|product_name|  category|
+----------+------------+----------+
|         1|   Product A|Category 1|
|         2|   Product B|Category 2|
+----------+------------+----------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [product_id#816L, product_name#817, category#821]
   +- SortMergeJoin [product_id#816L], [product_id#820L], Inner
      :- Sort [product_id#816L ASC NULLS FIRST], false, 0
      :  +

### Performance Comparison:

In [11]:
import time

# Large dataset simulation
large_data = [(i, f"Product_{i}") for i in range(100000)]
small_data = [(i, f"Category_{i%5}") for i in range(100)]

large_df = spark.createDataFrame(large_data, ["id", "product"])
small_df = spark.createDataFrame(small_data, ["id", "category"])

# Regular join (auto-broadcast was disabled above with threshold -1,
# so Spark plans a shuffle-based join here)
start_time = time.time()
regular_join = large_df.join(small_df, "id")
regular_join.count()
regular_time = time.time() - start_time

# Broadcast join (the explicit hint applies even though auto-broadcast is disabled)
start_time = time.time()
broadcast_join = large_df.join(broadcast(small_df), "id")
broadcast_join.count()
broadcast_time = time.time() - start_time

print(f"Regular Join Time: {regular_time:.2f} seconds")
print(f"Broadcast Join Time: {broadcast_time:.2f} seconds")

Regular Join Time: 1.04 seconds
Broadcast Join Time: 0.57 seconds
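
A single wall-clock measurement like the one above can be noisy, since the first action also pays for warm-up work. The following is a minimal sketch of a repeat-and-take-the-median timer; the helper name `time_action` and its parameters are hypothetical, and the workload is a plain Python stand-in so the sketch runs without a Spark session.

```python
import statistics
import time

def time_action(action, repeats=5, warmup=1):
    """Return the median wall-clock time in seconds of calling `action()`."""
    for _ in range(warmup):
        action()  # warm-up runs are executed but not timed
    samples = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)

# Stand-in workload so the sketch runs on its own.
median_seconds = time_action(lambda: sum(range(100_000)), repeats=3)
print(f"Median time: {median_seconds:.4f} seconds")
```

In the notebook, the stand-in could be swapped for `lambda: regular_join.count()` and `lambda: broadcast_join.count()` to compare the two joins on more even footing.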


## PySpark Broadcast Join Best Practices

### Size Consideration
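- **Optimal Size**: Broadcast the smaller DataFrame. Spark broadcasts automatically only below `spark.sql.autoBroadcastJoinThreshold` (10MB by default); an explicit `broadcast()` hint is not limited by that threshold, so check the size yourself before using it.
- **Data Distribution**: Keep the large DataFrame's partitions evenly sized, since each task joins its own partition against the full broadcast copy.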

In [13]:
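# Example of size checking before broadcast
def should_broadcast(df, threshold_mb=10):
    # Catalyst's size estimate in bytes. This goes through the internal _jdf
    # handle, which is not a public API and may change between Spark versions.
    # sizeInBytes() returns a Scala BigInt, so convert it via its string form.
    stats = df._jdf.queryExecution().optimizedPlan().stats()
    size_bytes = int(stats.sizeInBytes().toString())
    size_mb = size_bytes / (1024 * 1024)
    return size_mb < threshold_mb  # Default threshold is 10MB

small_df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
# Fall back to a regular join when the estimate is too large
if should_broadcast(small_df):
    result = large_df.join(broadcast(small_df), "id")
else:
    result = large_df.join(small_df, "id")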

### Configuration Options:

In [14]:
# Key broadcast configurations
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)  # Set threshold to 10MB
spark.conf.set("spark.sql.shuffle.partitions", 200)  # 200 is the default; applies to shuffle-based joins, not broadcasts
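
The literal `10485760` in the cell above is 10 × 1024 × 1024 bytes. As a small sketch, a hypothetical helper `mb_to_bytes` (not part of PySpark) makes that arithmetic explicit and keeps other threshold values readable:

```python
def mb_to_bytes(megabytes):
    """Convert megabytes (taken as 1024 * 1024 bytes each) to a whole number of bytes."""
    return int(megabytes * 1024 * 1024)

threshold_bytes = mb_to_bytes(10)
print(threshold_bytes)  # 10485760, the value used in the cell above

# In the notebook this could be passed as:
# spark.conf.set("spark.sql.autoBroadcastJoinThreshold", mb_to_bytes(50))
```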

### Effectiveness
- **Smaller DataFrame**: Broadcast joins are most effective when one DataFrame is significantly smaller than the other.
- **Avoid Expensive Shuffling**: Use broadcast joins to avoid expensive shuffling operations.
- **Sufficient Memory**: Ensure you have enough memory on every executor to hold a full copy of the broadcast DataFrame, and on the driver, which collects the data before broadcasting it.
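
The memory point can be sketched as simple arithmetic. The function below is a rough illustration, not a Spark API: the name `total_broadcast_footprint_bytes` and the executor count are assumptions, and it assumes one full copy per executor plus one on the driver.

```python
def total_broadcast_footprint_bytes(broadcast_size_bytes, num_executors):
    """Rough lower bound on cluster-wide memory used by one broadcast table.

    Assumes one full copy per executor plus one on the driver; the real
    in-memory hash table can be larger than the estimated data size.
    """
    copies = num_executors + 1  # executors plus the driver
    return broadcast_size_bytes * copies

# Hypothetical cluster: a 10MB table broadcast to 20 executors
footprint = total_broadcast_footprint_bytes(10 * 1024 * 1024, num_executors=20)
print(f"{footprint / (1024 * 1024):.0f} MB across the cluster")
```

The per-copy cost stays fixed while the total grows with the cluster, which is why a table that looks small on its own can still add up on a large deployment.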