# Broadcast Joins

**Technical Accomplishments:**
* Introduce the concept of Broadcast Joins

In this exercise you will:<br>
Learn about the concepts of Broadcast Joins





## The Data Source

This exercise uses the **Pageviews By Seconds** data set.

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# I've already gone through the exercise to determine
# how many partitions I want and in this case it is...
partitions = 8

# Make sure wide operations don't repartition to 200
spark.conf.set("spark.sql.shuffle.partitions", str(partitions))

In [None]:
# The directory containing our parquet files.
parquetFile = "data/pageviews_by_second.parquet/"

In [None]:
# Create our initial DataFrame. Parquet files carry their own
# schema, so there is no costly inference step when reading them.
pageviewsDF = (spark.read
  .option("inferSchema", "true")                # Not needed: Parquet stores its schema
  .parquet(parquetFile)                         # Read the data in
  .repartition(partitions)                      # From 7 >>> 8 partitions
  .withColumnRenamed("timestamp", "capturedAt") # rename and convert to timestamp datatype
  .withColumn("capturedAt", unix_timestamp( col("capturedAt"), "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp") )
  .orderBy( col("capturedAt"), col("site") )    # sort our records
  .cache()                                      # Cache the expensive operation
)
# materialize the cache
pageviewsDF.count()

7200000

## Broadcast Joins

If you recall the section on UDFs, we **aggregated by the Day-Of-Week**.

We **first used a UDF**, only to discover that a built-in function could do the exact same thing.

We also saw that **Monday had more data** than any other day of the week.

We then forked the `DataFrame` so that we could compare **Mobile Requests to Desktop Requests**.

Next, we **joined those two `DataFrames`** into one so that we could easily compare the two sets of data.

We concluded that the problem with the data **has nothing to do with Mobile vs Desktop**.

So we don't need that type of join (two ~large `DataFrames`).

However, what if we wanted to **reproduce our first exercise** (counts per day-of-week)
* without a UDF
* with a lookup table for the day-of-week
* with a join between the pageviews and the lookup table

What's different about this example is that we are **joining a big `DataFrame` to a small `DataFrame`**.

In this scenario, Spark can optimize the join and **avoid the expensive shuffle** with a **Broadcast Join**.

Let's start with two `DataFrames`
* The first we will derive from our original `DataFrame`. In this case, we will use a simple number for the day-of-week.
* The second `DataFrame` will map that number (1-7) to the labels **Monday**, **Tue**, **W**, or whatever...

Let's take a look at our first `DataFrame`.

In [None]:
columnTrans = date_format(col("capturedAt"), "u").alias("dow")

pageviewsWithDowDF = (pageviewsDF
    .withColumn("dow", columnTrans)  # Add the column dow
)
(pageviewsWithDowDF
  .cache()                           # mark the data as cached
  .count()                           # materialize the cache
)
pageviewsWithDowDF.show()

+-------------------+-------+--------+---+
|         capturedAt|   site|requests|dow|
+-------------------+-------+--------+---+
|2015-03-16 00:00:00|desktop|    2343|  1|
|2015-03-16 00:00:00| mobile|    1628|  1|
|2015-03-16 00:00:01|desktop|    2382|  1|
|2015-03-16 00:00:01| mobile|    1636|  1|
|2015-03-16 00:00:02|desktop|    2546|  1|
|2015-03-16 00:00:02| mobile|    1619|  1|
|2015-03-16 00:00:03|desktop|    2402|  1|
|2015-03-16 00:00:03| mobile|    1776|  1|
|2015-03-16 00:00:04|desktop|    2370|  1|
|2015-03-16 00:00:04| mobile|    1716|  1|
|2015-03-16 00:00:05|desktop|    2417|  1|
|2015-03-16 00:00:05| mobile|    1721|  1|
|2015-03-16 00:00:06|desktop|    2318|  1|
|2015-03-16 00:00:06| mobile|    1695|  1|
|2015-03-16 00:00:07|desktop|    2580|  1|
|2015-03-16 00:00:07| mobile|    1630|  1|
|2015-03-16 00:00:08|desktop|    2545|  1|
|2015-03-16 00:00:08| mobile|    1731|  1|
|2015-03-16 00:00:09|desktop|    2366|  1|
|2015-03-16 00:00:09| mobile|    1664|  1|
+----------

All we did here was add one column, **dow**, which has the value **1** for **Monday**, **2** for **Tuesday**, etc.
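
This numbering (Monday = 1 through Sunday = 7) matches the ISO convention. As a quick sanity check outside Spark, Python's standard `datetime.isoweekday()` uses the same numbering. The helper name `iso_day_of_week` below is only for illustration and is not part of the exercise.

```python
from datetime import datetime

def iso_day_of_week(captured_at):
    """Return 1 for Monday through 7 for Sunday."""
    return datetime.strptime(captured_at, "%Y-%m-%d %H:%M:%S").isoweekday()

print(iso_day_of_week("2015-03-16 00:00:00"))  # 1 (Monday), as in the table above
print(iso_day_of_week("2015-03-22 00:00:00"))  # 7 (Sunday)
```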

Next, we are going to load a mapping of 1, 2, 3, etc. to Mon, Tue, Wed, etc. from a **REALLY** small `DataFrame`.

In [None]:
labelsDF = spark.read.parquet("data/day-of-week")

display(labelsDF) # view our labels

DataFrame[dow: int, longName: string, abbreviated: string, shortName: string]

Now that we have the two `DataFrames`, we can finally execute a join between them.

In [None]:
joinedDowDF = (pageviewsWithDowDF
  .join(labelsDF, pageviewsWithDowDF["dow"] == labelsDF["dow"])
  .drop( pageviewsWithDowDF["dow"] )
)
joinedDowDF.show()

+-------------------+-------+--------+---+--------+-----------+---------+
|         capturedAt|   site|requests|dow|longName|abbreviated|shortName|
+-------------------+-------+--------+---+--------+-----------+---------+
|2015-03-16 00:00:00|desktop|    2343|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:00| mobile|    1628|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:01|desktop|    2382|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:01| mobile|    1636|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:02|desktop|    2546|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:02| mobile|    1619|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:03|desktop|    2402|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:03| mobile|    1776|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:04|desktop|    2370|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:04| mobile|    1716|  1|  Monday|        Mon|        M|
|2015-03-16 00:00:05|desktop|    2417|

Now that the data is joined, we can easily aggregate by any (or all) of the various labels that represent the day-of-week.

Notice that we keep the numerical **dow** column, which we can use to sort.

And when we group the data, we can group by any one of the labels.

In [None]:
aggregatedDowDF = (joinedDowDF
  .groupBy(col("dow"), col("longName"), col("abbreviated"), col("shortName"))  
  .sum("requests")                                             
  .withColumnRenamed("sum(requests)", "Requests")
  .orderBy(col("dow"))
)
# Display and then graph...
aggregatedDowDF.show()

+---+---------+-----------+---------+----------+
|dow| longName|abbreviated|shortName|  Requests|
+---+---------+-----------+---------+----------+
|  1|   Monday|        Mon|        M|2356818845|
|  2|  Tuesday|        Tue|       Tu|1995034884|
|  3|Wednesday|        Wed|        W|1977615396|
|  4| Thursday|        Thr|       Th|1931508977|
|  5|   Friday|        Fri|        F|1842512718|
|  6| Saturday|        Sat|       Sa|1662762048|
|  7|   Sunday|        Sun|       Su|1576726066|
+---+---------+-----------+---------+----------+



## Already Broadcasted

Believe it or not, that was a broadcast join.

The proof can be seen in the physical plan.

Run `explain()` and then look for **BroadcastHashJoin** and/or **BroadcastExchange**.

In [None]:
aggregatedDowDF.explain()

== Physical Plan ==
*(4) Sort [dow#122 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(dow#122 ASC NULLS FIRST, 8)
   +- *(3) HashAggregate(keys=[dow#122, longName#123, abbreviated#124, shortName#125], functions=[sum(cast(requests#2 as bigint))])
      +- Exchange hashpartitioning(dow#122, longName#123, abbreviated#124, shortName#125, 8)
         +- *(2) HashAggregate(keys=[dow#122, longName#123, abbreviated#124, shortName#125], functions=[partial_sum(cast(requests#2 as bigint))])
            +- *(2) Project [requests#2, dow#122, longName#123, abbreviated#124, shortName#125]
               +- *(2) BroadcastHashJoin [cast(dow#53 as int)], [dow#122], Inner, BuildRight
                  :- *(2) Filter isnotnull(dow#53)
                  :  +- InMemoryTableScan [requests#2, dow#53], [isnotnull(dow#53)]
                  :        +- InMemoryRelation [capturedAt#10, site#1, requests#2, dow#53], StorageLevel(disk, memory, deserialized, 1 replicas)
                  :              +- *

From the code's perspective, it looks just like any other join.

So what's the difference between a regular and a broadcast-join?

## Standard Join

>* In a standard join, **ALL** of the data is shuffled
>* This can be really expensive
<br/><br/>
<p>Here we can see how all the records keyed by "green" are moved to the same partition.<br/>The process would be repeated for "red" and "blue" records.</p>
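
To make the shuffle concrete, here is a minimal pure-Python sketch of re-bucketing rows by key. It is a toy model rather than Spark's implementation: the function name `shuffle_by_key` is hypothetical, plain lists stand in for partitions, and Python's built-in `hash()` stands in for Spark's partitioner.

```python
from collections import defaultdict

def shuffle_by_key(partitions, num_partitions):
    """Re-bucket every (key, value) row so that equal keys share a partition."""
    shuffled = defaultdict(list)
    for partition in partitions:
        for key, value in partition:
            # Every row is routed by its key, so any row may have to move.
            shuffled[hash(key) % num_partitions].append((key, value))
    return [shuffled[i] for i in range(num_partitions)]

before = [
    [("green", 1), ("red", 2)],
    [("blue", 3), ("green", 4)],
    [("red", 5), ("green", 6)],
]
after = shuffle_by_key(before, num_partitions=3)

# All "green" rows now sit in exactly one partition (which one depends on the hash).
for index, partition in enumerate(after):
    print(index, partition)
```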

## Broadcast Join
>* In the Broadcast Join, only the "small" data is moved.
>* It duplicates the "small" data across all executors.
>* But the "big" data is left untouched.
>* If the "small" data is small enough, this can be **VERY** efficient.
<br/><br/>
<p>Here we see the records keyed by "red" being replicated into the first partition.<br/>
   The process would be repeated for each executor.<br/>
   The entire process would be repeated again for "green" and "blue" records.</p>
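
The same idea can be sketched in pure Python. This is a simplified model under stated assumptions, not Spark's code: `broadcast_hash_join` is a hypothetical name, lists stand in for partitions, and one dictionary copy per partition stands in for the copy each executor receives.

```python
def broadcast_hash_join(big_partitions, small_rows):
    """Inner-join each big partition against a full local copy of the small table."""
    joined_partitions = []
    for partition in big_partitions:      # big rows never leave their partition
        local_lookup = dict(small_rows)   # stands in for the broadcast copy
        joined_partitions.append(
            [(key, value, local_lookup[key])
             for key, value in partition if key in local_lookup]
        )
    return joined_partitions

labels = [(1, "Monday"), (2, "Tuesday"), (3, "Wednesday")]
pageviews = [
    [(1, 2343), (1, 1628)],   # partition 0
    [(2, 2382), (3, 1636)],   # partition 1
]
result = broadcast_hash_join(pageviews, labels)
print(result[0])  # [(1, 2343, 'Monday'), (1, 1628, 'Monday')]
print(result[1])  # [(2, 2382, 'Tuesday'), (3, 1636, 'Wednesday')]
```

Only the three label rows are copied around; the pageview rows stay in the partitions where they started.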

## Broadcasted, How?

Behind the scenes, Spark is analyzing our two `DataFrames`.

It attempts to estimate whether either or both are smaller than 10MB.

We can see/change this threshold value with the config **spark.sql.autoBroadcastJoinThreshold**. 

The documentation reads as follows:
> Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled...

In [None]:
threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
print("Threshold: {0:,}".format( int(threshold) ))

Threshold: 10,485,760


In cases like this, Spark takes the small `DataFrame` (`labelsDF` in our case) and:
>* Sends the entire `DataFrame` to every **Executor**
>* Then does the join against the local copy of `labelsDF`
>* This avoids taking our big `DataFrame`, `pageviewsWithDowDF`, and shuffling it across all executors.
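
The size check can be pictured with a small sketch. This is a deliberate simplification and an assumption on our part: `should_broadcast` is a hypothetical function, and Spark's planner also weighs other factors such as the join type and explicit hints.

```python
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024  # 10,485,760 bytes, the default shown above

def should_broadcast(estimated_size_bytes, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Simplified rule: broadcast only if the estimate fits under the threshold."""
    return 0 <= estimated_size_bytes <= threshold_bytes

print(should_broadcast(2_000))                      # True: a tiny lookup table
print(should_broadcast(2_000, threshold_bytes=0))   # False: nothing non-empty fits under 0
print(should_broadcast(2_000, threshold_bytes=-1))  # False: -1 disables broadcasting
print(should_broadcast(500 * 1024 * 1024))          # False: far over the default
```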

We can see proof of this by dropping the threshold to 0, so that our lookup table no longer qualifies for broadcasting:

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 0)

Run `explain()` and look for the **ABSENCE** of **BroadcastHashJoin** and/or **BroadcastExchange**.

In [None]:
(joinedDowDF
  .groupBy(col("dow"), col("longName"), col("abbreviated"), col("shortName"))  
  .sum("requests")                                             
  .withColumnRenamed("sum(requests)", "Requests")
  .orderBy(col("dow"))
  .explain()
)

== Physical Plan ==
*(6) Sort [dow#122 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(dow#122 ASC NULLS FIRST, 8)
   +- *(5) HashAggregate(keys=[dow#122, longName#123, abbreviated#124, shortName#125], functions=[sum(cast(requests#2 as bigint))])
      +- *(5) HashAggregate(keys=[dow#122, longName#123, abbreviated#124, shortName#125], functions=[partial_sum(cast(requests#2 as bigint))])
         +- *(5) Project [requests#2, dow#122, longName#123, abbreviated#124, shortName#125]
            +- *(5) SortMergeJoin [cast(dow#53 as int)], [dow#122], Inner
               :- *(2) Sort [cast(dow#53 as int) ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(cast(dow#53 as int), 8)
               :     +- *(1) Filter isnotnull(dow#53)
               :        +- InMemoryTableScan [requests#2, dow#53], [isnotnull(dow#53)]
               :              +- InMemoryRelation [capturedAt#10, site#1, requests#2, dow#53], StorageLevel(disk, memory, deserialized, 1 replicas)
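
Rather than scanning the output by eye, the two plans above can also be told apart with a simple string check. The sketch below works on plan text that has already been captured as a string; `join_strategy` is a hypothetical helper, and the two sample lines are copied from the plans shown in this notebook.

```python
def join_strategy(plan_text):
    """Report which of the two join operators discussed here appears in a plan."""
    for operator in ("BroadcastHashJoin", "SortMergeJoin"):
        if operator in plan_text:
            return operator
    return None

broadcast_plan = "+- *(2) BroadcastHashJoin [cast(dow#53 as int)], [dow#122], Inner, BuildRight"
shuffled_plan = "+- *(5) SortMergeJoin [cast(dow#53 as int)], [dow#122], Inner"

print(join_strategy(broadcast_plan))  # BroadcastHashJoin
print(join_strategy(shuffled_plan))   # SortMergeJoin
```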

And now that we are done, let's restore the original threshold:

In [None]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", threshold)
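
Forgetting to restore a session-wide setting is an easy mistake. One possible way to make the restore automatic is a small context manager. Note that `temporary_conf` is a hypothetical helper, not part of PySpark; it relies only on the `conf.get` and `conf.set` calls already used above.

```python
from contextlib import contextmanager

@contextmanager
def temporary_conf(session, key, value):
    """Set a session conf value, then restore the previous value on exit."""
    previous = session.conf.get(key)
    session.conf.set(key, value)
    try:
        yield
    finally:
        # Runs even if the body raises, so the setting is always put back.
        session.conf.set(key, previous)

# Usage with this notebook's session (assumes `spark` and `joinedDowDF` exist):
# with temporary_conf(spark, "spark.sql.autoBroadcastJoinThreshold", 0):
#     joinedDowDF.groupBy("dow").sum("requests").explain()
```

As in the cells above, the query is rebuilt inside the `with` block so that it is planned while the temporary setting is in effect.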

## broadcast()

What if we wanted to broadcast a `DataFrame` that is over the [default] 10MB threshold?

We can specify that a `DataFrame` is to be broadcast by using the `broadcast()` function from the `pyspark.sql.functions` package.

However, **it is only a hint**. Spark is allowed to ignore it.

In [None]:
pageviewsWithDowDF.join(broadcast(labelsDF), pageviewsWithDowDF["dow"] == labelsDF["dow"])

DataFrame[capturedAt: timestamp, site: string, requests: int, dow: string, dow: int, longName: string, abbreviated: string, shortName: string]

## End of Exercise