## Skew finding and mitigation strategies

Salting

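Broadcast join - broadcasting the small side removes the shuffle on the large side, so a skewed join key cannot overload a single shuffle partition. AQE works in the other direction at runtime: it can convert a planned sort merge join into a broadcast join when one side turns out to be small enough, and it can split skewed partitions of a sort merge join.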

Range partitioning - range partitioning can reduce the number of partitions a query has to scan, but it makes skew worse if a single range holds most of the data

Custom partitioner (high-level pseudocode)

| Strategy | When it works best | When it fails / limitations | Cost / Downside |
|----------|--------------------|-----------------------------|------------------|
| Salting | When a small set of join/aggregation keys (hot keys) causes skew; adding a salt spreads each heavy key across partitions. Example: a single userId with 90% of the rows. | Fails if skew is due to many moderately heavy keys (not a single hot key); does not help range query patterns. If not applied consistently on both sides, it breaks deterministic joins. | Increases data size (extra column), increases shuffle and I/O, complicates downstream join logic (need to un-salt), requires careful key selection and testing. |
| Range partitioning | When queries primarily filter by a numeric/date range and the distribution across ranges is reasonably even and stable. Good for time-windowed reads (time-series). | Fails when data distribution is uneven or changes over time (hot ranges), and for queries that are not range-based (point lookups or joins on other keys). Repartitioning for new ranges is costly. | Can create hotspots (heavy partitions), expensive re-partitioning/maintenance, can produce many small/imbalanced files, requires knowledge of data distribution. |
| Custom Partitioner (hash/rule-based) | When you can design a deterministic function that routes keys to balanced partitions (e.g., composite key hashing) and you control producer & consumer logic. | Fails when query patterns change or when other joins use different partition keys; harder to apply with high-level DataFrame APIs (more natural with RDDs). | Higher implementation complexity, maintenance cost, risk of creating new imbalance if partitioner is poor; may require custom serialization/coordination. |
| Broadcast Join | When one side of the join is small enough to fit comfortably in executor memory (and is stable). Fastest fix for many join skews. | Fails when the "small" side grows unexpectedly or when memory limits are restrictive; not suitable for joins where both sides are large. | Memory pressure on executors, OOM risk; adds driver-to-executor broadcast overhead; not applicable to multi-stage complex joins. |


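##### Salting:
Add salt = randInt(0, k) and join on (key, salt), where k is the number of salt buckets. Salt only the hot key (here skew_key "0"), which is fast and leaves every other key unchanged. For a join, the other side must carry every salt value 0..k-1 for the hot key so that each salted row still finds its match.

    salted = (
        inputdf.withColumn(
            "salt",
            F.when(F.col("skew_key") == "0", (F.rand() * k).cast("int")).otherwise(F.lit(0))
        )
        .withColumn("salted_key", F.concat_ws("_", F.col("skew_key"), F.col("salt")))
    )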
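
The effect of the salt on partition sizes can be checked without a cluster. The sketch below is a plain-Python simulation, not Spark code: `partition_of`, `NUM_PARTITIONS`, `K`, and the CRC32 hash are assumptions standing in for Spark's hash partitioning, and the row counts are scaled-down illustrative values rather than the notebook's data.

```python
import random
import zlib
from collections import Counter

NUM_PARTITIONS = 8   # assumed number of shuffle partitions
K = 8                # assumed number of salt buckets for the hot key

def partition_of(key: str) -> int:
    """Stand-in for hash partitioning: stable hash of the key modulo partition count."""
    return zlib.crc32(key.encode("utf-8")) % NUM_PARTITIONS

def partition_sizes(keys):
    """Count how many rows land in each partition."""
    return Counter(partition_of(k) for k in keys)

rng = random.Random(42)

# Illustrative data: one hot key "0" with 9000 rows, plus 1000 rows over keys "1".."100".
keys = ["0"] * 9000 + [str(rng.randint(1, 100)) for _ in range(1000)]

# Salt only the hot key, mirroring the F.when(...) expression above.
salted = [f"{k}_{rng.randrange(K)}" if k == "0" else f"{k}_0" for k in keys]

unsalted_max = max(partition_sizes(keys).values())
salted_max = max(partition_sizes(salted).values())
print("largest partition without salt:", unsalted_max)
print("largest partition with salt:   ", salted_max)
```

Without the salt, all 9000 hot rows share one partition; with it, they are split across several, which is the whole point of the technique.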


##### Range partitioning:

    df.write.partitionBy("year", "month")

Hotspot case: if most of the data falls in a single month of a particular year, range partitioning does not help, because that one partition still holds most of the rows.
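
The hotspot case can be made concrete with a small plain-Python sketch. The monthly row counts below are invented for illustration (they are not taken from the dataset in this notebook), and `largest_partition_share` is a hypothetical helper.

```python
from collections import Counter

def largest_partition_share(partition_keys):
    """Return (key, share) for the partition holding the largest fraction of rows."""
    counts = Counter(partition_keys)
    key, rows = counts.most_common(1)[0]
    return key, rows / len(partition_keys)

# Illustrative rows keyed by (year, month), as partitionBy("year", "month") would group them.
even = [(2024, m) for m in range(1, 13) for _ in range(100)]   # 100 rows per month
hot = [(2024, 11)] * 9000 + [(2024, m) for m in range(1, 11) for _ in range(100)]

even_key, even_share = largest_partition_share(even)
hot_key, hot_share = largest_partition_share(hot)
print(f"even data: largest partition {even_key} holds {even_share:.0%} of rows")
print(f"hot month: largest partition {hot_key} holds {hot_share:.0%} of rows")
```

A share close to 1/12 means the months are balanced; a share near 90% means one task will do almost all the work regardless of how many partitions exist.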

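##### Custom partitioner:

    rdd.partitionBy(numPartitions, customPartitioner)

In PySpark the partitioner is a plain function that maps a key to a partition index. It is shown in the cells below.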
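
In PySpark, `RDD.partitionBy` takes a partition count and a function that maps a key to an integer. The sketch below shows one possible rule-based function, not necessarily the one used later in this notebook: the hot key gets dedicated partitions and every other key is hashed into the rest. `NUM_PARTITIONS`, `HOT_PARTITIONS`, `HOT_KEY`, `stable_hash`, and `skew_aware_partition` are hypothetical names, and the composite `(skew_key, row_id)` key is an assumption made so that rows of the hot key can be told apart.

```python
import zlib

NUM_PARTITIONS = 8     # assumed total number of partitions
HOT_PARTITIONS = 4     # assumed number of partitions reserved for the hot key
HOT_KEY = "0"          # the hot key observed in this notebook's data

def stable_hash(value: str) -> int:
    """Deterministic hash (Python's built-in hash of str can vary between processes)."""
    return zlib.crc32(value.encode("utf-8"))

def skew_aware_partition(composite_key) -> int:
    """Map a (skew_key, row_id) pair to a partition index in [0, NUM_PARTITIONS)."""
    skew_key, row_id = composite_key
    if skew_key == HOT_KEY:
        # Spread hot-key rows over the reserved partitions using the row id.
        return stable_hash(str(row_id)) % HOT_PARTITIONS
    # All other keys share the remaining partitions; one key always maps to one partition.
    return HOT_PARTITIONS + stable_hash(skew_key) % (NUM_PARTITIONS - HOT_PARTITIONS)

# With a pair RDD keyed by (skew_key, row_id), this could be applied as:
#   pair_rdd.partitionBy(NUM_PARTITIONS, skew_aware_partition)
print(skew_aware_partition(("0", 17)), skew_aware_partition(("182", 5)))
```

Reserving partitions for the hot key keeps the small keys co-located per key, which a pure random salt would not.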

##### Broadcast: 
                broadcast(small_df) 


### One quick decision flow (one-liner):

If the small side fits comfortably in executor memory → broadcast. Else if a single hot key dominates → salt. Else if queries are range-driven and the distribution is stable → range partition. Else consider a custom partitioner or a schema redesign.
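
The same flow can be written as a small function, which makes the order of the checks explicit. This is only a sketch: `choose_skew_strategy` and its parameters are hypothetical, and the 10 MB budget in the example is an assumed value chosen to match Spark's default broadcast threshold.

```python
def choose_skew_strategy(small_side_bytes, memory_budget_bytes,
                         single_hot_key, range_driven_and_stable):
    """Encode the one-line decision flow; returns the name of the suggested strategy."""
    if small_side_bytes < memory_budget_bytes:
        return "broadcast"
    if single_hot_key:
        return "salt"
    if range_driven_and_stable:
        return "range partition"
    return "custom partitioner or schema redesign"

# Example: a 5 MB lookup table against an assumed 10 MB budget.
print(choose_skew_strategy(5 * 1024**2, 10 * 1024**2, True, False))  # broadcast
```

Because the broadcast check comes first, a small join side wins even when a hot key is present.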

One interview-ready metric to show: e.g., “Before: job shuffle read 800MB, runtime 12m. After salting: shuffle read 600MB, runtime 7m (but write size +100MB).” Even hypothetical numbers are better than no numbers.
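
Turning the before/after figures into percentages makes the metric easier to quote. The helper below is a hypothetical one-liner applied to the example numbers from the sentence above.

```python
def pct_change(before, after):
    """Percentage change from before to after (negative means a reduction)."""
    return (after - before) / before * 100

shuffle = pct_change(800, 600)   # shuffle read in MB
runtime = pct_change(12, 7)      # runtime in minutes
print(f"shuffle read: {shuffle:.1f}%")
print(f"runtime: {runtime:.1f}%")
```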

## Custom Partition

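The big limitation: DataFrames do not support custom partition functions. A custom partitioner has to go through the RDD API (rdd.partitionBy); on a DataFrame the available choices are the built-in hash repartition and repartitionByRange.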


## Generate data

In [41]:
from pyspark.sql.functions import col, trim, length
from pyspark.sql import SparkSession
import pyspark.sql.functions as F



In [42]:
spark = SparkSession.builder.appName("skewData").getOrCreate()


In [43]:
inputdf = spark.read.parquet("E:/projects/DataEngineeringPrep/skew_data")

In [44]:
inputdf.printSchema()


root
 |-- row_id: string (nullable = true)
 |-- skew_key: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- category: string (nullable = true)



In [45]:
inputdf = inputdf.withColumn("amount_value", F.col("amount").cast("int"))

In [46]:
inputdf.groupBy("skew_key").sum("amount_value").orderBy(F.desc("sum(amount_value)")).show()

+--------+-----------------+
|skew_key|sum(amount_value)|
+--------+-----------------+
|       0|        449680279|
|     182|            70313|
|     233|            69457|
|     163|            66097|
|     151|            65781|
|     996|            64701|
|      76|            64362|
|     581|            64079|
|     180|            64056|
|     972|            64024|
|      95|            63957|
|     876|            63434|
|     648|            63204|
|     443|            63168|
|     981|            62681|
|     864|            62572|
|     425|            62423|
|     449|            62242|
|     333|            62242|
|     819|            62198|
+--------+-----------------+
only showing top 20 rows



In [47]:
print(spark.sparkContext.uiWebUrl)

http://DESKTOP-AGBO2U2:4040


In [48]:
inputdf.explain(True)

== Parsed Logical Plan ==
'Project [row_id#320, skew_key#321, amount#322, category#323, cast('amount as int) AS amount_value#328]
+- Relation [row_id#320,skew_key#321,amount#322,category#323] parquet

== Analyzed Logical Plan ==
row_id: string, skew_key: string, amount: string, category: string, amount_value: int
Project [row_id#320, skew_key#321, amount#322, category#323, cast(amount#322 as int) AS amount_value#328]
+- Relation [row_id#320,skew_key#321,amount#322,category#323] parquet

== Optimized Logical Plan ==
Project [row_id#320, skew_key#321, amount#322, category#323, cast(amount#322 as int) AS amount_value#328]
+- Relation [row_id#320,skew_key#321,amount#322,category#323] parquet

== Physical Plan ==
*(1) Project [row_id#320, skew_key#321, amount#322, category#323, cast(amount#322 as int) AS amount_value#328]
+- *(1) ColumnarToRow
   +- FileScan parquet [row_id#320,skew_key#321,amount#322,category#323] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex

In [49]:
inputdf.explain(mode = "formatted")

== Physical Plan ==
* Project (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [4]: [row_id#320, skew_key#321, amount#322, category#323]
Batched: true
Location: InMemoryFileIndex [file:/E:/projects/DataEngineeringPrep/skew_data]
ReadSchema: struct<row_id:string,skew_key:string,amount:string,category:string>

(2) ColumnarToRow [codegen id : 1]
Input [4]: [row_id#320, skew_key#321, amount#322, category#323]

(3) Project [codegen id : 1]
Output [5]: [row_id#320, skew_key#321, amount#322, category#323, cast(amount#322 as int) AS amount_value#328]
Input [4]: [row_id#320, skew_key#321, amount#322, category#323]




In [50]:
inputdf.groupBy("skew_key").count().orderBy(F.desc("count")).show(20, truncate=False)


+--------+------+
|skew_key|count |
+--------+------+
|0       |900273|
|182     |138   |
|163     |128   |
|253     |128   |
|560     |127   |
|581     |125   |
|233     |125   |
|648     |125   |
|207     |124   |
|654     |124   |
|996     |124   |
|138     |123   |
|425     |123   |
|180     |123   |
|970     |122   |
|288     |122   |
|498     |122   |
|352     |121   |
|657     |121   |
|864     |121   |
+--------+------+
only showing top 20 rows
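
A single number helps when reporting how skewed the key is. The sketch below computes the ratio of the largest count to the median count, using the 20 counts printed above. `skew_ratio` is a hypothetical helper, and because only the top 20 keys are listed, the median here is at least as large as the median over all keys, so the true ratio is at least this big.

```python
from statistics import median

# Counts copied from the top-20 output above (one entry per skew_key).
top_counts = [900273, 138, 128, 128, 127, 125, 125, 125, 124, 124,
              124, 123, 123, 123, 122, 122, 122, 121, 121, 121]

def skew_ratio(counts):
    """Ratio of the largest count to the median count."""
    return max(counts) / median(counts)

ratio = skew_ratio(top_counts)
print(f"largest key is about {ratio:,.0f}x the median of the listed keys")
```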



### Salting

In [51]:
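# Note: despite its name, salted_key holds only a random salt in [0, 1000); it is not combined with skew_key.
inputdf = inputdf.withColumn("salted_key", F.floor(F.rand()*1000))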

In [52]:
inputdf.show()


+------+--------+------------------+--------+------------+----------+
|row_id|skew_key|            amount|category|amount_value|salted_key|
+------+--------+------------------+--------+------------+----------+
|     0|       0| 669.3620514380217|     hot|         669|       118|
|     1|       0| 776.4620725770643|     hot|         776|       910|
|     2|       0|367.75575002118535|     hot|         367|       473|
|     3|       0| 494.3132858506487|     hot|         494|       844|
|     4|       0| 591.7821226474998|     hot|         591|       309|
|     5|       0| 632.1642964005171|     hot|         632|         4|
|     6|       0|32.164495567189256|     hot|          32|       664|
|     7|       0|50.587504177523336|     hot|          50|       171|
|     8|       0| 961.6080597124873|     hot|         961|       640|
|     9|       0|182.88575075928327|     hot|         182|       344|
|    10|       0| 598.8453075088435|     hot|         598|        86|
|    11|       0|205

In [53]:
inputdf.groupBy("salted_key").sum("amount_value").show()

+----------+-----------------+
|salted_key|sum(amount_value)|
+----------+-----------------+
|       474|           506083|
|       964|           520770|
|        29|           496408|
|        26|           538848|
|       418|           505654|
|       191|           502832|
|       558|           518881|
|       541|           492404|
|        65|           498805|
|       293|           470204|
|       222|           490322|
|       270|           521946|
|       938|           463134|
|       730|           514772|
|       442|           537563|
|       705|           499682|
|       243|           525766|
|       720|           506541|
|       367|           511906|
|       278|           496530|
+----------+-----------------+
only showing top 20 rows
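
The output above groups by the random salt alone, so each row mixes amounts from every skew_key. To get per-key totals while still spreading the hot key, a salted aggregation usually runs in two stages: partial sums per (key, salt), then a final sum per key. The sketch below is a plain-Python simulation of that idea with made-up rows; in Spark the two stages would correspond to a `groupBy("skew_key", "salt")` followed by a `groupBy("skew_key")`. `NUM_SALTS` and the row values are assumptions.

```python
import random
from collections import defaultdict

rng = random.Random(0)
NUM_SALTS = 10  # assumed number of salt buckets

# Made-up (skew_key, amount_value) rows: key "0" is hot.
rows = [("0", rng.randint(1, 1000)) for _ in range(5000)]
rows += [(str(rng.randint(1, 50)), rng.randint(1, 1000)) for _ in range(500)]

# Stage 1: partial sums per (key, salt); the hot key is split into up to NUM_SALTS groups.
partial = defaultdict(int)
for key, amount in rows:
    salt = rng.randrange(NUM_SALTS)
    partial[(key, salt)] += amount

# Stage 2: drop the salt and combine the partial sums per key.
final = defaultdict(int)
for (key, _salt), subtotal in partial.items():
    final[key] += subtotal

# Reference result: a direct, unsalted aggregation.
direct = defaultdict(int)
for key, amount in rows:
    direct[key] += amount

print("totals match:", dict(final) == dict(direct))
```

This works for sums and counts because they can be combined from partial results; an average would need the sum and the count carried through both stages.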



In [54]:
inputdf.explain(mode = "formatted")

== Physical Plan ==
* Project (3)
+- * ColumnarToRow (2)
   +- Scan parquet  (1)


(1) Scan parquet 
Output [4]: [row_id#320, skew_key#321, amount#322, category#323]
Batched: true
Location: InMemoryFileIndex [file:/E:/projects/DataEngineeringPrep/skew_data]
ReadSchema: struct<row_id:string,skew_key:string,amount:string,category:string>

(2) ColumnarToRow [codegen id : 1]
Input [4]: [row_id#320, skew_key#321, amount#322, category#323]

(3) Project [codegen id : 1]
Output [6]: [row_id#320, skew_key#321, amount#322, category#323, cast(amount#322 as int) AS amount_value#328, FLOOR((rand(-3118830139136299688) * 1000.0)) AS salted_key#377L]
Input [4]: [row_id#320, skew_key#321, amount#322, category#323]




### Broadcast join

In [55]:
df = spark.read.parquet("E:/projects/DataEngineeringPrep/skew_data")

In [56]:
brodcastdf = F.broadcast(df)

In [57]:
joineddf = inputdf.join(brodcastdf, "skew_key", "inner")

In [58]:
# The broadcast join is used here: F.broadcast() requests it explicitly, and the dataset is small enough to fit under the default broadcast threshold of 10 MB
joineddf.explain(mode = "formatted")

== Physical Plan ==
AdaptiveSparkPlan (9)
+- Project (8)
   +- BroadcastHashJoin Inner BuildRight (7)
      :- Filter (3)
      :  +- Project (2)
      :     +- Scan parquet  (1)
      +- BroadcastExchange (6)
         +- Filter (5)
            +- Scan parquet  (4)


(1) Scan parquet 
Output [4]: [row_id#320, skew_key#321, amount#322, category#323]
Batched: true
Location: InMemoryFileIndex [file:/E:/projects/DataEngineeringPrep/skew_data]
ReadSchema: struct<row_id:string,skew_key:string,amount:string,category:string>

(2) Project
Output [6]: [row_id#320, skew_key#321, amount#322, category#323, cast(amount#322 as int) AS amount_value#328, FLOOR((rand(-3118830139136299688) * 1000.0)) AS salted_key#377L]
Input [4]: [row_id#320, skew_key#321, amount#322, category#323]

(3) Filter
Input [6]: [row_id#320, skew_key#321, amount#322, category#323, amount_value#328, salted_key#377L]
Condition : isnotnull(skew_key#321)

(4) Scan parquet 
Output [4]: [row_id#432, skew_key#433, amount#434, category