In [0]:
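# Raw rows as a tuple of (city, country, population) tuples
citiesDF = (
  ("medellin", "colombia", 2.5),
  ("bangalore", "india", 12.3)
)

# Distribute the local tuples as an RDD, then build a DataFrame with named columns
citiesDF_a = sc.parallelize(citiesDF)
citiesDF_b = spark.createDataFrame(citiesDF_a, ("city", "country", "population"))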

citiesDF_b.show()

+---------+--------+----------+
|     city| country|population|
+---------+--------+----------+
| medellin|colombia|       2.5|
|bangalore|   india|      12.3|
+---------+--------+----------+



In [0]:
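from pyspark.sql.functions import broadcast

# Explicitly mark citiesDF_b for broadcasting with the broadcast() hint
peopleDF_b_j = peopleDF_b.join(
  broadcast(citiesDF_b),
  peopleDF_b.city == citiesDF_b.city
)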

peopleDF_b_j.explain(True)

== Parsed Logical Plan ==
Join Inner, (city#4454 = city#4466)
:- LogicalRDD [first_name#4453, city#4454], false
+- ResolvedHint (strategy=broadcast)
   +- LogicalRDD [city#4466, country#4467, population#4468], false

== Analyzed Logical Plan ==
first_name: string, city: string, city: string, country: string, population: double
Join Inner, (city#4454 = city#4466)
:- LogicalRDD [first_name#4453, city#4454], false
+- ResolvedHint (strategy=broadcast)
   +- LogicalRDD [city#4466, country#4467, population#4468], false

== Optimized Logical Plan ==
Join Inner, (city#4454 = city#4466), rightHint=(strategy=broadcast), joinId=5
:- Filter isnotnull(city#4454)
:  +- LogicalRDD [first_name#4453, city#4454], false
+- Filter isnotnull(city#4466)
   +- LogicalRDD [city#4466, country#4467, population#4468], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   BroadcastHashJoin [city#4454], [city#4466], Inner, BuildRight, false, true
   :- Filter isnotnull(city#4454)


In [0]:
peopleDF_b_j = peopleDF_b.join(
  citiesDF_b,
  peopleDF_b.city == citiesDF_b.city
)

peopleDF_b_j.explain(True)

== Parsed Logical Plan ==
Join Inner, (city#4454 = city#4466)
:- LogicalRDD [first_name#4453, city#4454], false
+- LogicalRDD [city#4466, country#4467, population#4468], false

== Analyzed Logical Plan ==
first_name: string, city: string, city: string, country: string, population: double
Join Inner, (city#4454 = city#4466)
:- LogicalRDD [first_name#4453, city#4454], false
+- LogicalRDD [city#4466, country#4467, population#4468], false

== Optimized Logical Plan ==
Join Inner, (city#4454 = city#4466)
:- Filter isnotnull(city#4454)
:  +- LogicalRDD [first_name#4453, city#4454], false
+- Filter isnotnull(city#4466)
   +- LogicalRDD [city#4466, country#4467, population#4468], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   SortMergeJoin [city#4454], [city#4466], Inner
   :- Sort [city#4454 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(city#4454, 200), ENSURE_REQUIREMENTS, [plan_id=2055]
   :     +- Filter isnotnull(city#4454)
   :    

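#### Notice how the hint shows up in the plans when the broadcast() function is used. The parsed and analyzed logical plans contain `ResolvedHint (strategy=broadcast)`, and the optimized logical plan carries it on the join as `rightHint=(strategy=broadcast)`. The physical plan then uses a BroadcastHashJoin. Without broadcast(), no plan contains the hint, and Spark falls back to a SortMergeJoin, which shuffles the data (note the `Exchange hashpartitioning` step).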
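To make the two physical strategies concrete, here is a toy, pure-Python model of what BroadcastHashJoin (BuildRight) and SortMergeJoin do. It is a conceptual sketch, not Spark's implementation:

- The partition lists, the people names, and the helper functions are all hypothetical.
- Spark's real operators run distributed over many executors.

```python
from collections import defaultdict

# Hypothetical toy data: each inner list stands in for one partition of peopleDF_b.
people_partitions = [
    [("alice", "medellin"), ("bob", "bangalore")],
    [("carol", "medellin"), ("dave", "lima")],
]
# Small side, shaped like citiesDF_b: (city, country, population)
cities = [("medellin", "colombia", 2.5), ("bangalore", "india", 12.3)]


def broadcast_hash_join(big_partitions, small_rows):
    """Conceptual BroadcastHashJoin, BuildRight: hash the small side once
    (in Spark it is shipped to every executor), then probe it from each
    big-side partition locally, so big-side rows never move."""
    table = defaultdict(list)  # build phase: city -> matching small-side rows
    for row in small_rows:
        table[row[0]].append(row)
    out = []
    for partition in big_partitions:  # probe phase: no shuffle needed
        for first_name, city in partition:
            for match in table.get(city, []):
                out.append((first_name, city) + match)
    return out


def sort_merge_join(left_rows, right_rows):
    """Conceptual SortMergeJoin: sort both sides by the join key (in Spark,
    after a hash-partitioning shuffle), then merge them in one pass."""
    left = sorted(left_rows, key=lambda r: r[1])
    right = sorted(right_rows, key=lambda r: r[0])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][1], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, pair every equal left row with it.
            j_end = j
            while j_end < len(right) and right[j_end][0] == rk:
                j_end += 1
            while i < len(left) and left[i][1] == lk:
                for match in right[j:j_end]:
                    out.append(left[i] + match)
                i += 1
            j = j_end
    return out


all_people = [row for part in people_partitions for row in part]
bhj = broadcast_hash_join(people_partitions, cities)
smj = sort_merge_join(all_people, cities)
print(sorted(bhj) == sorted(smj))  # both strategies yield the same inner-join rows
```

Both functions return rows shaped like the plans' output schema (first_name, city, city, country, population). The difference lies only in data movement. The broadcast variant avoids the shuffle that the sort-merge variant needs.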

## Automatic Detection

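- In many cases, Spark can decide on its own whether to use a broadcast join, based on the estimated size of the data. If Spark estimates that one of the joined DataFrames is below the broadcast threshold (10 MB by default), it broadcasts that DataFrame automatically. In the code below, no broadcast() hint is given, yet the physical plan uses a broadcast hash join with smallTable as the build side (BuildLeft):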

In [0]:
bigTable = spark.range(1, 100000000)
smallTable = spark.range(1, 10000) # size estimated by Spark - auto-broadcast
joinedNumbers = smallTable.join(bigTable, "id")
joinedNumbers.explain(True)

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [id])
:- Range (1, 10000, step=1, splits=Some(4))
+- Range (1, 100000000, step=1, splits=Some(4))

== Analyzed Logical Plan ==
id: bigint
Project [id#4511L]
+- Join Inner, (id#4511L = id#4509L)
   :- Range (1, 10000, step=1, splits=Some(4))
   +- Range (1, 100000000, step=1, splits=Some(4))

== Optimized Logical Plan ==
Project [id#4511L]
+- Join Inner, (id#4511L = id#4509L)
   :- Range (1, 10000, step=1, splits=Some(4))
   +- Range (1, 100000000, step=1, splits=Some(4))

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonProject [id#4511L]
         +- PhotonBroadcastHashJoin [id#4511L], [id#4509L], Inner, BuildLeft, false, true
            :- PhotonShuffleExchangeSource
            :  +- PhotonShuffleMapStage
            :     +- PhotonShuffleExchangeSink SinglePartition
            :        +- PhotonRange Range (1, 10000, step=1, splits=4)
            +-

- In the earlier citiesDF_b example, however, Spark did not detect that the small table could be broadcast, and it chose a SortMergeJoin instead.
- The reason is that citiesDF_b was built from a local collection (via sc.parallelize). Spark does not measure the size of such data up front, for two reasons:
    - the collection might be large;
    - computing its size would be an O(N) pass over the data, which defeats the purpose before any real computation starts.

- Spark can perform auto-detection when
    - the DataFrame is constructed from scratch, e.g. with spark.range
    - the data is read from files that carry schema and/or size information, e.g. Parquet
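The size-based rule above can be sketched in plain Python. This is a simplified model built on two assumptions:

- Spark estimates a spark.range DataFrame at 8 bytes per bigint row.
- A DataFrame built from a local collection has no statistics and falls back to a very large default. In open-source Spark this default is spark.sql.defaultSizeInBytes, which is Long.MaxValue unless changed.

The helper names are hypothetical, and Databricks Photon may compute estimates differently.

```python
# Hypothetical model of Spark's size estimates; not Spark's actual code.
LONG_BYTES = 8                        # one bigint "id" column: 8 bytes per row
DEFAULT_THRESHOLD = 10 * 1024 * 1024  # 10 MB default autoBroadcastJoinThreshold
UNKNOWN_SIZE = 2**63 - 1              # assumed fallback (Long.MaxValue) when no estimate exists


def range_size_estimate(start, end, step=1):
    """Estimate for spark.range(start, end, step): row count times 8 bytes."""
    return len(range(start, end, step)) * LONG_BYTES


def local_collection_size_estimate():
    """No cheap statistics: instead of scanning every row (an O(N) pass),
    assume the data is huge."""
    return UNKNOWN_SIZE


def under_default_threshold(size_in_bytes):
    """True when the estimate is small enough for automatic broadcasting."""
    return 0 <= size_in_bytes <= DEFAULT_THRESHOLD


small = range_size_estimate(1, 10000)    # 9,999 rows * 8 bytes
big = range_size_estimate(1, 100000000)  # 99,999,999 rows * 8 bytes
print(small, under_default_threshold(small))  # 79992 True
print(big, under_default_threshold(big))      # 799999992 False
print(under_default_threshold(local_collection_size_estimate()))  # False
```

Under these assumptions, smallTable (about 78 KB) is well under 10 MB and gets broadcast, while bigTable is not. A DataFrame created from a local collection is never auto-broadcast because its assumed size is enormous.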

## Configuring Broadcast Join Detection

- The threshold for automatic broadcast join detection can be tuned or disabled.
- The configuration key is spark.sql.autoBroadcastJoinThreshold, and its value is given in bytes. To use a different threshold, set it on the SparkSession:

In [0]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 104857600)  # 100 MB = 100 * 1024 * 1024 bytes

- Alternatively, set the value to -1 to disable automatic broadcast detection altogether:

In [0]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)  # -1 disables auto-broadcast
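The sketch below is a rough mental model of how this setting interacts with the broadcast() hint used earlier. It assumes that, for inner equi-joins like the ones above, Spark broadcasts a side in either of two cases:

- an explicit hint is present;
- there is no hint, and the side's size estimate lies between 0 and the threshold.

The helper functions and the 50 MB size are hypothetical. Real join selection also considers the join type, which side can be built, and other settings.

```python
def mb_to_bytes(mb):
    """autoBroadcastJoinThreshold is expressed in bytes; 1 MB = 1024 * 1024 bytes."""
    return mb * 1024 * 1024


def uses_broadcast(build_side_bytes, threshold, has_broadcast_hint=False):
    """Simplified strategy choice for an inner equi-join (hypothetical helper).

    An explicit broadcast() hint is honored regardless of the threshold.
    Without a hint, the build side's size estimate must lie in
    [0, threshold], so a threshold of -1 never matches.
    """
    if has_broadcast_hint:
        return True
    return 0 <= build_side_bytes <= threshold


fifty_mb = mb_to_bytes(50)
print(mb_to_bytes(100))                            # 104857600
print(uses_broadcast(fifty_mb, mb_to_bytes(100)))  # True: under the raised threshold
print(uses_broadcast(fifty_mb, mb_to_bytes(10)))   # False: over the 10 MB default
print(uses_broadcast(fifty_mb, -1))                # False: auto-detection disabled
print(uses_broadcast(fifty_mb, -1, has_broadcast_hint=True))  # True: the hint still applies
```

In other words, -1 only switches off the automatic, size-based detection. Joins that explicitly wrap a DataFrame in broadcast() still request a broadcast.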