In [1]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

SHUFFLE_PARTITIONS = 5

In [2]:
spark = (SparkSession
         .builder
         .master("local[*]")
         .appName("salaries")
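         # Disable AQE so Spark does not adjust partition counts at runtime (keeps the demo predictable)
         .config("spark.sql.adaptive.enabled", "false")
         # Number of partitions produced by shuffles (joins, repartition)
         .config("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)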
         .getOrCreate()
         )



Loading employee and salary data.

In [3]:
df_employees = (spark
                .read
                .option("header", True)
                .option("delimiter", ";")
                .option("inferSchema", True)
                .csv("inputs/employees_10000.csv")
                )

df_salaries = (spark
               .read
               .option("header", True)
               .option("delimiter", ";")
               .option("inferSchema", True)
               .csv("inputs/salaries_10000.csv")
               )

### Displaying imbalanced data distribution across partitions

Joining and displaying employee and salary data.

In [4]:
df_unbalanced = (df_employees
                 .join(df_salaries, on="salary_id", how="inner")
                 .select("employee_id", "salary_id", "name", "department", "salary")
                 )

df_unbalanced.show()

+-----------+---------+--------------------+-----------+--------+
|employee_id|salary_id|                name| department|  salary|
+-----------+---------+--------------------+-----------+--------+
|          1|        1|    Frederico Santos|  Mercearia|23708.05|
|          2|        2|Alessandra Noguei...|     Livros|21482.94|
|          3|        3|  Dalila Xavier Neto|     Música|15480.98|
|          4|        4|           Davi Reis|     Jardim|11900.67|
|          5|        5|     Warley Reis Jr.|     Música| 6097.59|
|          6|        6|Srta. Maria Luiza...|Eletrônicos| 6501.28|
|          7|        7|     Ricardo Batista|     Beleza| 5151.14|
|          8|        8|Srta. Melissa Xavier|   Crianças| 2360.42|
|          9|        9|         Kléber Reis|   Crianças| 7841.75|
|         10|       10|         Silas Silva|    Sapatos|17572.83|
|         11|       11|       Melissa Souza|     Roupas|25391.04|
|         12|       12|        João Batista|      Jogos|  7907.0|

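This shows a partition imbalance: all 10,000 rows end up in partition 0, even though `spark.sql.shuffle.partitions` is set to 5. A likely cause is that both inputs are small enough for Spark to choose a broadcast join, which needs no shuffle, so the result keeps the single partition of the scanned CSV file; `df_unbalanced.explain()` shows which join strategy was picked.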

In [5]:
(df_unbalanced
 .withColumn("partition_id", F.spark_partition_id())
 .groupBy("partition_id")
 .count()
 .show()
 )

+------------+-----+
|partition_id|count|
+------------+-----+
|           0|10000|
+------------+-----+



### Applying salt hash join technique for improved partitioning

We create a new DataFrame from `df_employees` with an added `salt_id` column, built by joining each employee's `salary_id` and a random integer from 0 to 4 with an underscore (for example, `13_0`).

The DataFrame is then repartitioned by `salt_id`. Because the salt is random, rows that share a `salary_id` are spread over up to 5 distinct keys, so hash partitioning can send them to different partitions instead of a single one.

In [6]:
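# Random integer salt in [0, SHUFFLE_PARTITIONS); the salaries side must be replicated over the same range
salt_col = F.concat_ws("_", F.col("salary_id"), (F.rand() * SHUFFLE_PARTITIONS).cast("integer"))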

df_employees_balanced = (df_employees
                         .withColumn("salt_id", salt_col)
                         .repartition("salt_id")
                         )
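The effect of the random salt can be illustrated without Spark. The sketch below is a simplification under stated assumptions: a hypothetical `toy_partition` helper stands in for Spark's Murmur3-based hash partitioning, the salted key is encoded as the integer `key * N_SALTS + salt` rather than the `"key_salt"` string, and all 10,000 rows share one hypothetical hot key, the worst case that salting is meant for.

```python
import random
from collections import Counter
from typing import Optional

N_PARTITIONS = 5  # mirrors SHUFFLE_PARTITIONS in the notebook
N_SALTS = 5       # salt values 0..4, like (F.rand() * 5).cast("integer")


def toy_partition(key: int, salt: Optional[int] = None) -> int:
    """Hypothetical stand-in for Spark's hash partitioning (not Murmur3)."""
    # Encode (key, salt) as one integer instead of Spark's "key_salt" string.
    encoded = key if salt is None else key * N_SALTS + salt
    # In CPython, hash() of a small non-negative int is the int itself.
    return hash(encoded) % N_PARTITIONS


rng = random.Random(42)  # fixed seed so the sketch is repeatable
hot_key = 7              # hypothetical key shared by every row
rows = [hot_key] * 10_000

unsalted = Counter(toy_partition(k) for k in rows)
salted = Counter(toy_partition(k, rng.randrange(N_SALTS)) for k in rows)

print(sorted(unsalted.items()))  # [(2, 10000)]: every row in one partition
print(sorted(salted.items()))    # roughly 2,000 rows in each of the 5 partitions
```

With this integer encoding each salt value maps to its own partition, so the spread is exact. With Spark's real hash, different salted keys can collide in the same partition, and the balance is only probabilistic.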

Next, we create a new DataFrame from `df_salaries`, also adding a `salt_id` column. Each salary row is replicated once for every salt value from 0 to 4 (a cross join with `spark.range`), and `salt_id` combines `salary_id` with that value, so every `salt_id` an employee can receive has a matching salary row.

As before, the DataFrame is repartitioned by `salt_id`.

In [7]:
df_range = spark.range(0, SHUFFLE_PARTITIONS)  # one row per salt value; schema -> id: bigint

df_salaries_balanced = (df_salaries
                        .join(df_range, how="cross")
                        .withColumn("salt_id", F.concat_ws("_", F.col("salary_id"), F.col("id")))
                        .repartition("salt_id")
                        .drop(F.col("id"))
                        )
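Replicating one side once per salt value is what keeps the salted join correct: whatever salt an employee row drew, exactly one copy of its salary row carries the same `salt_id`. The pure-Python sketch below checks this on hypothetical toy rows (the data and variable names are illustrative only) by comparing a plain inner join on `salary_id` with an inner join on the salted key.

```python
import random

N_SALTS = 5  # must equal the salt range used on the skewed (employee) side

# Hypothetical toy rows: (employee_id, salary_id) and (salary_id, salary).
employees = [(emp_id, emp_id % 3) for emp_id in range(1, 10)]  # salary_ids repeat
salaries = [(0, 1000.0), (1, 2000.0), (2, 3000.0)]

rng = random.Random(0)

# Skewed side: one random salt per row, like concat_ws("_", salary_id, rand_int).
employees_salted = [
    (emp_id, f"{salary_id}_{rng.randrange(N_SALTS)}")
    for emp_id, salary_id in employees
]

# Other side: one copy per salt value, like the cross join with spark.range(N_SALTS).
salaries_salted = {
    f"{salary_id}_{salt}": amount
    for salary_id, amount in salaries
    for salt in range(N_SALTS)
}

# Inner join on the original key.
plain_join = sorted(
    (emp_id, amount)
    for emp_id, emp_salary_id in employees
    for salary_id, amount in salaries
    if emp_salary_id == salary_id
)

# Inner join on the salted key (dict lookup works because salary_id is unique here).
salted_join = sorted(
    (emp_id, salaries_salted[salt_id])
    for emp_id, salt_id in employees_salted
    if salt_id in salaries_salted
)

print(plain_join == salted_join)  # True
print(len(salaries), "->", len(salaries_salted), "salary rows after replication")  # 3 -> 15
```

If the employee side drew salts from a wider range than the salary side was replicated over, some salted keys would find no match and rows would silently drop out of the inner join, which is why both ranges should come from the same constant.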

Joining on `salt_id` and displaying employee and salary data.

In [8]:
df_balanced = (df_employees_balanced.alias("e")
               .join(df_salaries_balanced.alias("s"), on="salt_id", how="inner")
               .select("employee_id", "s.salary_id", "name", "department", "salary", "salt_id")
               )

df_balanced.show()

+-----------+---------+-------------------+------------+--------+-------+
|employee_id|salary_id|               name|  department|  salary|salt_id|
+-----------+---------+-------------------+------------+--------+-------+
|          3|        3| Dalila Xavier Neto|      Música|15480.98|    3_1|
|         13|       13|      Roberto Souza|      Jardim|21442.78|   13_0|
|         15|       15|    Heloísa Martins|        Bebê| 5598.62|   15_3|
|         17|       17|   Isabela Carvalho|  Automotivo|25050.54|   17_0|
|         23|       23|  Marina Costa Neto|      Jardim| 17041.9|   23_4|
|         24|       24|          Liz Silva|        Bebê|18754.61|   24_2|
|         39|       39|    Carla Braga Jr.| Eletrônicos| 22664.0|   39_4|
|         57|       57|    Henrique Moraes|       Saúde|17021.79|   57_0|
|         60|       60|       Yasmin Souza|   Mercearia|20018.08|   60_4|
|         61|       61|       Bruna Santos|   Mercearia|23166.84|   61_3|
|         63|       63|       Heloísa 

Counting rows per partition again now shows roughly 2,000 rows in each of the 5 partitions defined by `spark.sql.shuffle.partitions`.

In [9]:
(df_balanced
 .withColumn("partition_id", F.spark_partition_id())
 .groupBy("partition_id")
 .agg(F.count("*").alias("count"))
 .orderBy("partition_id")
 .show()
 )

+------------+-----+
|partition_id|count|
+------------+-----+
|           0| 1961|
|           1| 2007|
|           2| 1997|
|           3| 1998|
|           4| 2037|
+------------+-----+
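As a rough sanity check, assume each row's `salt_id` is hashed to one of the 5 partitions independently and uniformly at random (an idealization of Spark's hash partitioning). The per-partition count is then binomial with n = 10,000 and p = 1/5, giving a mean of 2,000 and a standard deviation of 40. The sketch below computes how far each observed count, copied from the `df_balanced` output, lies from that mean.

```python
import math

# Rows per partition, copied from the df_balanced output.
observed = {0: 1961, 1: 2007, 2: 1997, 3: 1998, 4: 2037}

n_rows = sum(observed.values())        # 10000
k = len(observed)                      # 5 partitions
mean = n_rows / k                      # n * p with p = 1/k -> 2000.0
std = math.sqrt(n_rows * (k - 1)) / k  # sqrt(n * p * (1 - p)) -> 40.0

# Standardized distance of each observed count from the expected mean.
z_scores = {pid: (count - mean) / std for pid, count in observed.items()}
for pid, z in z_scores.items():
    print(f"partition {pid}: z = {z:+.3f}")
```

All five counts fall within one standard deviation of 2,000 (the largest deviation is 39 rows, about 0.98σ), which is consistent with uniform hashing.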



For small datasets like this one, the performance difference is negligible and may even be slightly negative, because salting replicates the salary table once per salt value and adds extra shuffles. On large, skewed datasets in a cluster, however, spreading hot keys across partitions keeps a few oversized tasks from becoming stragglers and helps avoid out-of-memory errors and heavy disk spills in those partitions.
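The replication overhead can be estimated with simple arithmetic. Assuming each input file holds 10,000 rows (as the file names and the join count suggest), a plain shuffle (sort-merge) join moves every row of both tables once, while the salted join moves the employee rows once and the salary rows once per salt value. This is a rough model that ignores row widths and any broadcast optimization.

```python
N_EMPLOYEES = 10_000  # assumed row count of inputs/employees_10000.csv
N_SALARIES = 10_000   # assumed row count of inputs/salaries_10000.csv
N_SALTS = 5           # SHUFFLE_PARTITIONS in the notebook

# Rows entering the shuffle for a plain join on salary_id.
plain_rows = N_EMPLOYEES + N_SALARIES

# Rows entering the shuffle for the salted join: the salary side is replicated.
salted_rows = N_EMPLOYEES + N_SALARIES * N_SALTS

print(plain_rows, salted_rows, salted_rows / plain_rows)  # 20000 60000 3.0
```

The trade-off pays off only when the time lost to a few oversized straggler tasks outweighs the cost of shuffling the extra replicated rows.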

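With [Adaptive Query Execution (AQE)](https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution), enabled by default since Spark 3.2.0 (and deliberately disabled in this notebook), Spark can detect skewed partitions in sort-merge joins at runtime and split them into smaller tasks, so manual salting is often unnecessary. Still, it is useful to know alternative ways of solving such problems.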
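AQE's skew-join handling is controlled by `spark.sql.adaptive.skewJoin.enabled`. According to the Spark SQL performance tuning guide, a partition counts as skewed when its size exceeds both `spark.sql.adaptive.skewJoin.skewedPartitionFactor` (default 5) times the median partition size and `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` (default 256MB); check the defaults for your Spark version. The pure-Python sketch below restates that documented rule with a hypothetical helper `aqe_skewed_partitions`; it is not Spark's internal implementation.

```python
from statistics import median

MB = 1024 * 1024


def aqe_skewed_partitions(sizes_bytes, factor=5.0, threshold_bytes=256 * MB):
    """Return indices of partitions the documented AQE skew rule would flag.

    Hypothetical helper: a simplified restatement of the rule in the Spark SQL
    tuning guide, not Spark's internal code.
    """
    med = median(sizes_bytes)
    return [
        i
        for i, size in enumerate(sizes_bytes)
        # Skewed only if BOTH conditions hold.
        if size > factor * med and size > threshold_bytes
    ]


# One 2048 MB partition among 64 MB ones: flagged (> 5 * 64 MB and > 256 MB).
print(aqe_skewed_partitions([64 * MB] * 4 + [2048 * MB]))  # [4]
# A 100 MB outlier is 100x the median but below 256 MB: not flagged.
print(aqe_skewed_partitions([1 * MB] * 4 + [100 * MB]))    # []
```

Requiring both conditions keeps AQE from splitting partitions that are large relative to their neighbors but still small in absolute terms.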