# Guide to Spark Partitioning: Dataset Joins 1


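This notebook was run with the following settings:

- spark.sql.shuffle.partitions set to 50
- spark.sql.autoBroadcastJoinThreshold set to 0, so that a broadcast join is used only when it is requested explicitly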
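The settings listed above could be applied as in the following minimal sketch. It assumes a local session; the master URL and the app name are placeholders chosen for illustration, and in most notebooks a `spark` session already exists, in which case only the two `conf.set` lines are needed.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session. getOrCreate() returns the existing session if there is one.
val spark = SparkSession.builder()
  .master("local[*]")                 // placeholder master URL
  .appName("partitioning-joins")      // hypothetical app name
  .getOrCreate()

// Number of partitions used when a join or aggregation has to shuffle its input.
spark.conf.set("spark.sql.shuffle.partitions", "50")
// Size threshold (in bytes) below which Spark broadcasts a join side automatically.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
```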




In [1]:
case class MyRecord(key: Int, value: String)

In [2]:
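import org.apache.spark.sql.Dataset
import spark.implicits._ // provides toDS and the $"..." column syntax

// Builds numRecords records and spreads them over numPartitions partitions
// without regard to the key. localCheckpoint() materializes the result, so
// later plans start from "Scan ExistingRDD".
def createPartionedDataset(name: String,
                           numRecords: Int,
                           numPartitions: Int)
        : Dataset[MyRecord] = {
    Range.inclusive(1, numRecords).map { value =>
        MyRecord(value, s"$name-value")
    }.toDS.repartition(numPartitions).localCheckpoint()
}

// Same as above, but hash partitions the records on the "key" column.
def createPartionedDatasetOnKey(name: String,
                                numRecords: Int,
                                numPartitions: Int)
        : Dataset[MyRecord] = {
    Range.inclusive(1, numRecords).map { value =>
        MyRecord(value, s"$name-value")
    }.toDS.repartition(numPartitions, $"key").localCheckpoint()
}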
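
The two helpers above differ only in how records are assigned to partitions. The sketch below is a plain-Scala model of that difference, not Spark code: `roundRobin`, `hashOnKey` and `partitionOf` are hypothetical names, and `floorMod` on the key stands in for the Murmur3-based hash that Spark uses. It illustrates why partitioning on the key matters for joins: under hash partitioning a given key always lands in the same partition index, whatever the order of the input.

```scala
// Model: a partitioned dataset is a map from partition index to the keys it holds.
def roundRobin(keys: Seq[Int], numPartitions: Int): Map[Int, Seq[Int]] =
  keys.zipWithIndex
    .groupBy { case (_, position) => position % numPartitions }
    .map { case (partition, pairs) => partition -> pairs.map(_._1) }

// Illustrative stand-in for hash partitioning on the key.
def hashOnKey(keys: Seq[Int], numPartitions: Int): Map[Int, Seq[Int]] =
  keys.groupBy(key => Math.floorMod(key, numPartitions))

// Index of the partition that holds the given key.
def partitionOf(key: Int, partitions: Map[Int, Seq[Int]]): Int =
  partitions.collectFirst { case (p, ks) if ks.contains(key) => p }.get

val ascending = (1 to 10).toSeq
val descending = ascending.reverse

// Hash partitioning: key 1 lands in the same partition for both input orders.
println(partitionOf(1, hashOnKey(ascending, 4)) == partitionOf(1, hashOnKey(descending, 4))) // true
// Round robin: the partition depends on the record's position, not on its key.
println(partitionOf(1, roundRobin(ascending, 4)) == partitionOf(1, roundRobin(descending, 4))) // false
```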

In the Broadcast Hash Join mechanism, one of the two input Datasets participating in the join is broadcast to all the executors. A hash table is built on each executor from the broadcast Dataset, after which each partition of the non-broadcast input Dataset is joined independently against that local hash table.

It follows from how Broadcast Hash Join works that the number of partitions of the resulting Dataset is always equal to the number of partitions of the non-broadcast input Dataset.
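
The mechanism described above can be modelled in plain Scala, with each inner `Seq` standing for one partition. This is only an illustrative sketch, not Spark's implementation; `Rec` and `broadcastHashJoin` are hypothetical names introduced here.

```scala
case class Rec(key: Int, value: String) // hypothetical stand-in for MyRecord

def broadcastHashJoin(streamed: Seq[Seq[Rec]],
                      broadcasted: Seq[Seq[Rec]]): Seq[Seq[(Int, String, String)]] = {
  // "Broadcast": gather every partition of the small side into one hash table.
  val hashTable: Map[Int, Seq[String]] =
    broadcasted.flatten.groupBy(_.key).map { case (k, recs) => k -> recs.map(_.value) }

  // Each streamed partition probes the table independently,
  // so the partition boundaries of the streamed side are preserved.
  streamed.map { partition =>
    for {
      rec   <- partition
      other <- hashTable.getOrElse(rec.key, Seq.empty)
    } yield (rec.key, rec.value, other)
  }
}

val left: Seq[Seq[Rec]]  = (1 to 1000).map(i => Rec(i, "a-value")).grouped(250).toSeq // 4 partitions
val right: Seq[Seq[Rec]] = (1 to 500).map(i => Rec(i, "b-value")).grouped(84).toSeq   // 6 partitions

val joined = broadcastHashJoin(left, right)
println(joined.size) // 4
```

The output has as many partitions as the streamed (non-broadcast) side, regardless of how the broadcast side was partitioned.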



In [4]:
val a = createPartionedDataset("a", 1000, 4)
val b = createPartionedDataset("b", 500, 6)
val c = a.join(broadcast(b), Seq("key"))

c.explain()

c.rdd.getNumPartitions

== Physical Plan ==
*(1) Project [key#2, value#3, value#12]
+- *(1) BroadcastHashJoin [key#2], [key#11], Inner, BuildRight
   :- Scan ExistingRDD[key#2,value#3]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
      +- Scan ExistingRDD[key#11,value#12]


4

Let us now look at the rules that decide the number of output partitions of the joined Dataset for Shuffle Hash Join and Sort Merge Join:

**(1)** If neither of the two input Datasets participating in the join is already hash partitioned on the respective join key(s), then the configured value of ‘spark.sql.shuffle.partitions’ is chosen as the number of partitions in the output joined Dataset.




In [6]:
val a = createPartionedDataset("a", 1000, 4)
val b = createPartionedDataset("b", 1000, 6)
val c = a.join(b, Seq("key"))

c.explain()

c.rdd.getNumPartitions

== Physical Plan ==
*(3) Project [key#30, value#31, value#40]
+- *(3) SortMergeJoin [key#30], [key#39], Inner
   :- *(1) Sort [key#30 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#30, 50)
   :     +- Scan ExistingRDD[key#30,value#31]
   +- *(2) Sort [key#39 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#39, 50)
         +- Scan ExistingRDD[key#39,value#40]


50

**(2)** If one or both of the input Datasets are already hash partitioned on the respective join key(s), then the maximum
number of partitions among these hash-partitioned input Datasets is compared against the configured value of
‘spark.sql.shuffle.partitions’ in the following ways to decide on the resultant number of partitions:


**(2a)** If the maximum value is above the configured value of ‘spark.sql.shuffle.partitions’, then the maximum value is chosen as
the number of partitions in the output joined Dataset.




In [8]:
val a = createPartionedDatasetOnKey("a", 1000, 123)
val b = createPartionedDataset("b", 1000, 6)
val c = a.join(b, Seq("key"))

println(a.queryExecution.executedPlan.outputPartitioning)

c.explain()

c.rdd.getNumPartitions

hashpartitioning(key#57, 123)
== Physical Plan ==
*(3) Project [key#57, value#58, value#67]
+- *(3) SortMergeJoin [key#57], [key#66], Inner
   :- *(1) Sort [key#57 ASC NULLS FIRST], false, 0
   :  +- Scan ExistingRDD[key#57,value#58]
   +- *(2) Sort [key#66 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#66, 123)
         +- Scan ExistingRDD[key#66,value#67]


123

**(2b)** If the maximum value is below the configured value of ‘spark.sql.shuffle.partitions’, then the configured value is chosen as the number of partitions in the output joined Dataset.

In [10]:
val a = createPartionedDatasetOnKey("a", 1000, 4)
val b = createPartionedDataset("b", 1000, 6)
val c = a.join(b, Seq("key"))

println(a.queryExecution.executedPlan.outputPartitioning)

c.explain()

c.rdd.getNumPartitions

hashpartitioning(key#132, 4)
== Physical Plan ==
*(3) Project [key#132, value#133, value#142]
+- *(3) SortMergeJoin [key#132], [key#141], Inner
   :- *(1) Sort [key#132 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#132, 50)
   :     +- Scan ExistingRDD[key#132,value#133]
   +- *(2) Sort [key#141 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#141, 50)
         +- Scan ExistingRDD[key#141,value#142]


50
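
Rules (1), (2a) and (2b) can be summarized as a small pure function. This is a sketch of the rules as stated in this notebook, not of Spark's planner code; `expectedJoinPartitions` and its parameters are hypothetical names. Note that the cells above only exercise the case where one side is hash partitioned on the key, so the both-sides case is encoded here as written in rule (2) without being verified.

```scala
// leftHashPartitions / rightHashPartitions: Some(n) if that input is already
// hash partitioned on the join key into n partitions, None otherwise.
def expectedJoinPartitions(leftHashPartitions: Option[Int],
                           rightHashPartitions: Option[Int],
                           shufflePartitions: Int): Int = {
  val preHashed = Seq(leftHashPartitions, rightHashPartitions).flatten
  if (preHashed.isEmpty) shufflePartitions         // rule (1)
  else math.max(preHashed.max, shufflePartitions)  // rules (2a) and (2b)
}

println(expectedJoinPartitions(None, None, 50))      // 50, as in In [6]
println(expectedJoinPartitions(Some(123), None, 50)) // 123, as in In [8]
println(expectedJoinPartitions(Some(4), None, 50))   // 50, as in In [10]
```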

In a Cartesian join, the number of partitions in the output joined Dataset is always equal to the product of the numbers of
partitions of the input Datasets.

In [13]:
val a = createPartionedDataset("a", 1000, 5)
val b = a.crossJoin(a)

b.explain()

b.rdd.getNumPartitions

== Physical Plan ==
CartesianProduct
:- Scan ExistingRDD[key#111,value#112]
+- Scan ExistingRDD[key#125,value#126]


25
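
The product rule can be seen in a plain-Scala model in which every pair of input partitions yields one output partition. This is an illustrative sketch, not Spark code; `cartesianPartitions` is a hypothetical name and the input is kept small so the example stays cheap.

```scala
// Each inner Seq models one partition.
def cartesianPartitions[A, B](left: Seq[Seq[A]], right: Seq[Seq[B]]): Seq[Seq[(A, B)]] =
  for {
    lp <- left   // one left partition ...
    rp <- right  // ... paired with every right partition
  } yield for { l <- lp; r <- rp } yield (l, r)

val parts: Seq[Seq[Int]] = (1 to 100).grouped(20).toSeq // 5 partitions
val crossed = cartesianPartitions(parts, parts)
println(crossed.size) // 25
```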