## Join Strategy in Spark

In Apache Spark, joins are a critical operation for combining data from two or more datasets based on a related column. 

Spark provides several types of join strategies, and understanding these strategies can help you optimize performance and resource usage.

**Potential Interview questions:**
- What are the join strategies in Spark?
- Why is a join an expensive (wide dependency) transformation?
- Difference between shuffle hash join and shuffle sort-merge join?
- When do we need broadcast join?

**Broadcast**
- The driver collects all the data partitions of table A and broadcasts the entire table to every executor. This avoids the expensive all-to-all communication (shuffle) between executors.
- The data partitions of table B do not need to move unless explicitly told to do so.

**Shuffle**
- Shuffling in Spark refers to the process of redistributing data across the cluster, typically required when transformations like groupByKey, reduceByKey, join, or distinct are performed. 
- During shuffling, Spark moves data between partitions and worker nodes to ensure that related data ends up in the same partition for further computation. This process is essential but can be expensive in terms of performance.
- In a shuffled join, every executor exchanges data partitions of table A and table B with every other executor, so that all records with the same join key end up on the same executor.
- Because this is all-to-all communication, it is expensive: the network can become congested, and shuffle data is written to and read from disk. After the shuffle, each executor holds the partitions of table A and table B that share the same join keys.
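
The co-location idea behind a shuffle can be sketched in plain Python. This is only an illustration under simplifying assumptions: Python's built-in `hash` stands in for Spark's partitioner, and the helper name `hash_partition` is hypothetical, not a Spark API.

```python
from collections import defaultdict

def hash_partition(rows, num_partitions):
    """Assign each (key, value) row to a partition chosen from its key."""
    partitions = defaultdict(list)
    for key, value in rows:
        # Same key -> same partition number, whichever table the row came from
        partitions[hash(key) % num_partitions].append((key, value))
    return dict(partitions)

table_a = [(1, "Alice"), (2, "Bob"), (3, "Charlie"), (1, "Alicia")]
table_b = [(1, "HR"), (2, "Engineering"), (3, "Sales")]

parts_a = hash_partition(table_a, num_partitions=2)
parts_b = hash_partition(table_b, num_partitions=2)

# For every partition number, both tables hold the same set of join keys
for p in parts_a:
    keys_a = {k for k, _ in parts_a[p]}
    keys_b = {k for k, _ in parts_b[p]}
    print(p, keys_a, keys_b)
```

Because both tables are partitioned with the same function, each partition pair can then be joined independently of the others.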

### Key join strategies in Spark:
- Shuffle sort-merge join
- Shuffle hash join
- Broadcast hash join
- Cartesian join
- Broadcast nested loop join

### Shuffle sort-merge join:
- The Shuffle Sort-Merge Join is Spark’s default join strategy for large datasets in an equi-join scenario.

- **How it works:**
  - Spark shuffles data from both datasets based on the join key to ensure matching keys are colocated on the same node.
  - Each partition of the shuffled data is then sorted by the join key.
  - Finally, a merge operation is performed on the sorted data to produce the joined result.
- **When to use:**
  - Best for large datasets where broadcast join is not possible.
  - Suitable for equi-joins
- **Advantages:**
  - Scales well for very large datasets.
  - Does not require datasets to fit in memory.
- **Disadvantages:**
  - Expensive due to shuffling and sorting.
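
The sort and merge steps described above can be sketched for a single partition pair in plain Python. This is a minimal illustration, not Spark's implementation; the function name `sort_merge_join` is hypothetical and the rows are assumed to be `(key, value)` tuples that have already been shuffled to the same partition.

```python
def sort_merge_join(left, right):
    """Inner equi-join of two lists of (key, value) rows in one partition."""
    # Step 1: sort both sides by the join key
    left = sorted(left, key=lambda row: row[0])
    right = sorted(right, key=lambda row: row[0])

    # Step 2: advance two cursors through the sorted rows
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of right rows that share this key
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row with this key against that run
            while i < len(left) and left[i][0] == left_key:
                for _, right_value in right[j:j_end]:
                    out.append((left_key, left[i][1], right_value))
                i += 1
            j = j_end
    return out

smj_rows = sort_merge_join(
    [(3, "Charlie"), (1, "Alice"), (2, "Bob")],
    [(2, "Engineering"), (1, "HR"), (4, "Legal")],
)
print(smj_rows)
```

Only the current run of equal keys has to be held at once, which is why this approach does not need a whole side to fit in memory.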

**NOTE:**
- This is Spark's default join for large tables when no broadcast is applicable.
- Setting **`spark.sql.join.preferSortMergeJoin = false`** lets the planner consider shuffle hash join instead; it does not switch sort-merge join off entirely.

# Example in PySpark:

df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob"), (3, "Charlie")], ["id", "name"])
df2 = spark.createDataFrame([(1, "HR"), (2, "Engineering"), (3, "Sales")], ["id", "dept"])

# Shuffle Sort-Merge Join (default for large equi-joins)
result = df1.join(df2, on="id", how="inner")
result.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#40L, name#41, dept#45]
   +- SortMergeJoin [id#40L], [id#44L], Inner
      :- Sort [id#40L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#40L, 200), ENSURE_REQUIREMENTS, [plan_id=168]
      :     +- Filter isnotnull(id#40L)
      :        +- Scan ExistingRDD[id#40L,name#41]
      +- Sort [id#44L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#44L, 200), ENSURE_REQUIREMENTS, [plan_id=169]
            +- Filter isnotnull(id#44L)
               +- Scan ExistingRDD[id#44L,dept#45]




### Shuffle hash join:
- A join strategy that uses hashing instead of sorting.

- **How it works:**
  - Both datasets are shuffled by the join key.
  - A hash table is built in memory for one of the datasets, usually the smaller one.
  - Matching rows are found by probing the hash table.

- **When to use:**
  - Large datasets where you want to avoid the sort step and one side is small enough, per partition, for its hash table to fit in memory.

- **Advantages:**
  - Faster than sort-merge for smaller partitions.
  - Avoids the sorting overhead.
  
- **Disadvantages:**
  - Memory-intensive because it builds hash tables.
  - Requires shuffling, which can be costly.
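
The build and probe phases can be sketched in plain Python for one shuffled partition pair. This is an illustrative sketch only; `hash_join` is a hypothetical helper, and rows are assumed to be `(key, value)` tuples.

```python
from collections import defaultdict

def hash_join(build_side, probe_side):
    """Inner equi-join: hash one side, then look up each row of the other."""
    # Build phase: the whole build side is held in memory as a hash table
    table = defaultdict(list)
    for key, value in build_side:
        table[key].append(value)

    # Probe phase: one lookup per probe row, no sorting required
    out = []
    for key, value in probe_side:
        for build_value in table.get(key, []):
            out.append((key, value, build_value))
    return out

shj_rows = hash_join(
    build_side=[(1, "HR"), (2, "Engineering")],
    probe_side=[(2, "Bob"), (1, "Alice"), (3, "Charlie")],
)
print(shj_rows)
```

The memory cost noted above comes from the build phase: every row of the build side stays resident in the hash table until the probe finishes.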


**NOTE:**
- Can be more efficient than sort-merge for certain data distributions.
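- Setting `spark.sql.join.preferSortMergeJoin` to `false` allows the planner to choose it, but does not force it. In Spark 3.0 and later it can also be requested for a specific join with the `SHUFFLE_HASH` hint, e.g. `df1.join(df2.hint("shuffle_hash"), on="id")`.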

# Example in PySpark:

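spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")  # Let the planner consider shuffle hash join

# The planner may still choose sort-merge join, as the plan below shows:
# both inputs here are the same size, so neither is a clearly smaller build side.
result = df1.join(df2, on="id", how="inner")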
result.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#74L, name#75, dept#79]
   +- SortMergeJoin [id#74L], [id#78L], Inner
      :- Sort [id#74L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#74L, 200), ENSURE_REQUIREMENTS, [plan_id=408]
      :     +- Filter isnotnull(id#74L)
      :        +- Scan ExistingRDD[id#74L,name#75]
      +- Sort [id#78L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#78L, 200), ENSURE_REQUIREMENTS, [plan_id=409]
            +- Filter isnotnull(id#78L)
               +- Scan ExistingRDD[id#78L,dept#79]




### Broadcast hash join
- A highly optimized join strategy where the smaller dataset is broadcast to all nodes.

- **How it works:**
  - The smaller dataset is broadcast (replicated) to all nodes in the cluster.
  - Each node joins its local partition of the larger dataset with the broadcasted dataset.
- **When to use:**
  - When one dataset is small enough to fit in memory.
  - The default threshold for broadcast is 10 MB, but this can be configured.
- **Advantages:**
  - Eliminates shuffling of the larger dataset.
  - Extremely fast for small-large table joins.
- **Disadvantages:**
  - Limited by the size of the smaller dataset.
  - Out-of-memory errors if the broadcasted dataset is too large.

**NOTE:**
- This is the fastest join for small-large datasets.
- The broadcast size threshold is controlled by `spark.sql.autoBroadcastJoinThreshold`

# Example in PySpark:
from pyspark.sql.functions import broadcast

small_df = spark.createDataFrame([(1, "HR"), (2, "Engineering")], ["id", "dept"])
result = df1.join(broadcast(small_df), on="id", how="inner")
result.explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#74L, name#75, dept#101]
   +- BroadcastHashJoin [id#74L], [id#100L], Inner, BuildRight, false, true
      :- Filter isnotnull(id#74L)
      :  +- Scan ExistingRDD[id#74L,name#75]
      +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=489]
         +- Filter isnotnull(id#100L)
            +- Scan ExistingRDD[id#100L,dept#101]




### Cartesian Join:
- A join that produces a cross-product of two datasets.

- **How it works:**
  - Every row from the first dataset is paired with every row from the second dataset.
  - This results in a dataset of size `n × m` (where `n` and `m` are the sizes of the input datasets).
- **When to use:**
  - When a cross-product is explicitly required, such as for combinatorial computations.

- **Advantages:**
  - Can handle cases where no join keys are available.
- **Disadvantages:**
  - Extremely expensive for large datasets.
  - Easily produces massive datasets that are difficult to process.
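
The `n × m` growth can be checked with a small plain-Python sketch that uses `itertools.product` as a stand-in for the cross join; the sample rows are illustrative and nothing here calls Spark.

```python
from itertools import product

names = [(1, "Alice"), (2, "Bob")]      # n = 2 rows
depts = [("HR",), ("Engineering",)]     # m = 2 rows

# Pair every row on the left with every row on the right
cross = [left + right for left, right in product(names, depts)]

print(len(cross))  # n * m rows
for row in cross:
    print(row)
```

With inputs of one million rows each, the same pairing would yield 10^12 rows, which is why this strategy needs caution.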

**NOTE:**
- Rarely used due to its computational cost.
- Must be used with caution, especially for large datasets.


# Example in PySpark:
df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
df2 = spark.createDataFrame([("HR",), ("Engineering",)], ["dept"])

result = df1.crossJoin(df2)
result.show()
result.explain()

+---+-----+-----------+
| id| name|       dept|
+---+-----+-----------+
|  1|Alice|         HR|
|  1|Alice|Engineering|
|  2|  Bob|         HR|
|  2|  Bob|Engineering|
+---+-----+-----------+

== Physical Plan ==
CartesianProduct
:- *(1) Scan ExistingRDD[id#107L,name#108]
+- *(2) Scan ExistingRDD[dept#111]




### Broadcast Nested Loop Join
- A join strategy for non-equi joins or when no join key is specified, but one dataset is small enough to be broadcast.

- **How it works:**
  - The smaller dataset is broadcast to all nodes.
  - A nested loop iterates over each row of the larger dataset and compares it with every row of the broadcast dataset, keeping the pairs that satisfy the join condition.

- **When to use:**
  - For non-equi joins (e.g., range joins like `A.id < B.id`).
  - When no natural join key is available, but one dataset is small.

- **Advantages:**
  - Allows for flexible join conditions.
  - Works with small datasets.

- **Disadvantages:**
  - Computationally expensive due to nested iteration.
  - Not suitable for large datasets.
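
The nested iteration can be sketched in plain Python with an arbitrary predicate. This is an illustration of the idea rather than Spark's code; `nested_loop_join` is a hypothetical helper and the rows are assumed to be tuples whose first field is `id`.

```python
def nested_loop_join(large_rows, small_rows, condition):
    """Compare every large-side row with every small-side row."""
    return [
        big + small
        for big in large_rows        # outer loop: rows of the larger dataset
        for small in small_rows      # inner loop: the broadcast copy
        if condition(big, small)     # any predicate, not just equality
    ]

people = [(1, "Alice"), (2, "Bob")]
departments = [(1, "HR"), (2, "Engineering")]

bnlj_rows = nested_loop_join(people, departments, lambda p, d: p[0] != d[0])
print(bnlj_rows)
```

The work is proportional to the product of the two row counts, which is the computational cost listed under the disadvantages.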


# Example in PySpark:

small_df = spark.createDataFrame([(1, "HR"), (2, "Engineering")], ["id", "dept"])

# Non-equi join using Broadcast Nested Loop Join
result = df1.join(broadcast(small_df), df1.id != small_df.id, "inner")
result.show()
result.explain()


+---+-----+---+-----------+
| id| name| id|       dept|
+---+-----+---+-----------+
|  1|Alice|  2|Engineering|
|  2|  Bob|  1|         HR|
+---+-----+---+-----------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastNestedLoopJoin BuildRight, Inner, NOT (id#107L = id#129L), true
   :- Filter isnotnull(id#107L)
   :  +- Scan ExistingRDD[id#107L,name#108]
   +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=750]
      +- Filter isnotnull(id#129L)
         +- Scan ExistingRDD[id#129L,dept#130]




## Interview Questions:

### Why is join an expensive (wide dependency) transformation?

Joins in Spark are considered expensive because they usually involve wide dependencies, meaning that one output partition depends on data from multiple input partitions. This typically requires a data shuffle, which can significantly impact performance for the following reasons:

**Data Shuffling:**
- During a join, Spark often needs to redistribute data across the cluster to ensure that rows with matching keys are colocated on the same partition.
- This network-intensive operation involves serializing, transferring, and deserializing large volumes of data.

**Sorting:**
- In the case of a Sort-Merge Join, data in each partition needs to be sorted by the join key before merging, adding computational overhead.

**Memory Overhead:**
- For hash-based joins, a hash table is created in memory, which can cause memory pressure if the dataset is large.

**Stragglers:**
- Imbalanced data distribution can lead to straggler tasks, where some partitions take much longer to process, delaying the entire job.
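
The straggler effect can be illustrated by counting how many rows land in each partition when one key dominates. This is a plain-Python sketch under simplifying assumptions: Python's built-in `hash` stands in for Spark's partitioner, and the key distribution is made up for the example.

```python
from collections import Counter

def rows_per_partition(keys, num_partitions):
    """Count how many rows each partition receives after hash partitioning."""
    return Counter(hash(key) % num_partitions for key in keys)

# 90 rows share the key 7; the remaining 10 rows have distinct keys 0..9
skewed_keys = [7] * 90 + list(range(10))
sizes = rows_per_partition(skewed_keys, num_partitions=4)

for partition, count in sorted(sizes.items()):
    print(partition, count)
```

The task that handles the partition containing the dominant key processes most of the data, so it finishes long after the others.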

### Difference between Shuffle Hash Join and Shuffle Sort-Merge Join

| **Aspect**             | **Shuffle Hash Join**                                      | **Shuffle Sort-Merge Join**                                    |
|-------------------------|-----------------------------------------------------------|----------------------------------------------------------------|
| **Dataset Size**        | Suitable for small to medium-sized datasets.              | Suitable for large datasets that don't fit in memory.          |
| **Shuffling**           | Requires shuffling both datasets by the join key.         | Requires shuffling both datasets and sorting by the key.       |
| **Processing**          | Builds a hash table for one partition and probes it with the other. | Performs a merge operation after sorting the partitions.       |
| **Memory Requirement**  | Higher memory usage since it builds hash tables in memory. | Lower memory usage compared to hash join but CPU-intensive due to sorting. |
| **Join Type**           | Efficient for equi-joins (exact matches).                 | Equi-joins only; the join keys must also be sortable.          |
| **Performance**         | Faster for small-medium datasets, but can fail with memory pressure. | More stable and scalable for very large datasets.              |
| **Failure Handling**    | Can cause OutOfMemoryError if hash table is too large.     | Handles large datasets more gracefully.                        |
| **Configuration**       | Considered by the planner when `spark.sql.join.preferSortMergeJoin = false`, or requested with the `SHUFFLE_HASH` join hint (Spark 3.0+). | Default join strategy for large datasets in Spark.             |


### Why do we need Broadcast Hash Join?

Broadcast Hash Join is used to improve the performance of join operations when one of the datasets is small enough to fit in memory. Instead of shuffling the larger dataset, Spark broadcasts (replicates) the smaller dataset to all nodes, enabling each node to perform the join locally.

**Key Points:**
- Reduces expensive shuffling of the larger dataset.
- Ideal for joins between a large table and a table small enough to fit in each executor's memory.
- It is much faster than other join strategies when applicable.

### How does Broadcast Hash Join work?

- Spark broadcasts the smaller dataset to all nodes in the cluster.
- Each node in the cluster keeps a copy of the smaller dataset in memory.
- The join operation is performed locally on each partition of the larger dataset using the broadcasted dataset.
- A hash table is built from the broadcasted dataset to efficiently match rows from the larger dataset.

**Steps:**
- Broadcast the smaller dataset to all worker nodes.
- Build a hash table for the smaller dataset.
- For each row in the larger dataset, probe the hash table for matching rows.
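
The three steps can be sketched in plain Python, with a list of lists standing in for the partitions of the larger dataset. This is an illustration only; `broadcast_hash_join` is a hypothetical helper, and in Spark each executor would receive its own copy of the small side rather than sharing one Python object.

```python
def broadcast_hash_join(large_partitions, small_rows):
    """Join each large-side partition in place against a shared lookup table."""
    # Steps 1 and 2: "broadcast" the small side and build a hash table from it
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)

    # Step 3: probe the table from every partition; large-side rows never move
    joined = []
    for partition in large_partitions:
        local = [
            (key, value, small_value)
            for key, value in partition
            for small_value in lookup.get(key, [])
        ]
        joined.append(local)
    return joined

large_partitions = [[(1, "Alice"), (3, "Charlie")], [(2, "Bob")]]
small_rows = [(1, "HR"), (2, "Engineering")]

bhj_parts = broadcast_hash_join(large_partitions, small_rows)
print(bhj_parts)
```

The output keeps the same partition layout as the large input, which reflects the absence of a shuffle on that side.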

### Difference between Broadcast Hash Join and Shuffle Hash Join?

| **Feature**          | **Broadcast Hash Join**                       | **Shuffle Hash Join**                  |
|-----------------------|-----------------------------------------------|----------------------------------------|
| **Dataset Size**      | One dataset must be small enough to fit in memory. | Both datasets can be large.            |
| **Shuffling**         | No shuffling of the larger dataset.          | Requires shuffling of both datasets.   |
| **Performance**       | Faster for small-large table joins.          | Slower due to the shuffle overhead.    |
| **Hash Table**        | Built on the broadcasted dataset.            | Built on the shuffled partitions.      |
| **Use Case**          | Small-large joins.                           | Medium to large dataset joins.         |


### How can we change the broadcast size of a table?

The broadcast size of a table is controlled by the configuration property:
- **`spark.sql.autoBroadcastJoinThreshold`**
- Default Value: 10 MB (in Spark).

Set the configuration in your Spark session or Spark SQL:
- **`spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")`**

Use Case:
- Increase this value if you have larger datasets that can fit in memory and benefit from broadcast join.
- Decrease it to avoid broadcasting overly large datasets that may cause memory issues.
- Set it to `-1` to disable automatic broadcast joins altogether.