<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png" alt="Databricks Learning" style="width: 600px; height: 163px">
</div>

# Joins and Lookup Tables

Apache Spark&trade; and Databricks&reg; allow you to join new records to existing tables in an ETL job.

## In this lesson you:
* Join new records to a pre-existing lookup table
* Employ table join best practices relevant to big data environments

## Audience
* Primary Audience: Data Engineers
* Additional Audiences: Data Scientists and Data Pipeline Engineers

## Prerequisites
* Web browser: Please use a <a href="https://docs.databricks.com/user-guide/supported-browsers.html#supported-browsers" target="_blank">supported browser</a>.
* Concept (optional): <a href="https://academy.databricks.com/collections/frontpage/products/etl-part-1-data-extraction" target="_blank">ETL Part 1 course from Databricks Academy</a>

<iframe  
src="//fast.wistia.net/embed/iframe/pvwi0sdvwu?videoFoam=true"
style="border:1px solid #1cb1c2;"
allowtransparency="true" scrolling="no" class="wistia_embed"
name="wistia_embed" allowfullscreen mozallowfullscreen webkitallowfullscreen
oallowfullscreen msallowfullscreen width="640" height="360" ></iframe>
<div>
<a target="_blank" href="https://fast.wistia.net/embed/iframe/pvwi0sdvwu?seo=false">
  <img alt="Opens in new tab" src="https://files.training.databricks.com/static/images/external-link-icon-16x16.png"/>&nbsp;Watch full-screen.</a>
</div>

### Shuffle and Broadcast Joins

A common use case in ETL jobs involves joining new data to either lookup tables or historical data. Distributed technologies such as Spark call for different considerations here than traditional databases that sit on a single machine.

Traditional databases join tables by pairing values on a given column. When all the data sits in a single database, it often goes unnoticed how computationally expensive row-wise comparisons are.  When data is distributed across a cluster, the expense of joins becomes even more apparent.

**A standard (or shuffle) join** redistributes the rows of both tables across the cluster so that rows with the same join key end up on the same node. This is expensive not only because of the computation needed to perform row-wise comparisons, but also because data transfer across a network is often the biggest performance bottleneck of distributed systems.

By contrast, **a broadcast join** remedies this situation when one DataFrame is sufficiently small. A broadcast join duplicates the smaller of the two DataFrames on each node of the cluster, avoiding the cost of shuffling the bigger DataFrame.

<div><img src="https://files.training.databricks.com/images/eLearning/ETL-Part-2/shuffle-and-broadcast-joins.png" style="height: 400px; margin: 20px"/></div>
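The difference between the two strategies can be pictured with a small, Spark-free simulation. This is only a sketch under simplifying assumptions: each "node" is a Python list, the functions `shuffle_join` and `broadcast_join` are hypothetical names invented for this illustration, rows are routed by `key % num_nodes`, and the data is made up. It counts how many rows would cross the network in each case.

```python
from collections import defaultdict

def shuffle_join(left_parts, right_parts):
    """Simulate a shuffle join: rows of BOTH tables are re-routed by join key."""
    num_nodes = len(left_parts)
    moved = 0
    buckets = {"left": defaultdict(list), "right": defaultdict(list)}
    for side, parts in (("left", left_parts), ("right", right_parts)):
        for node, rows in enumerate(parts):
            for key, value in rows:
                target = key % num_nodes          # integer keys keep the demo deterministic
                buckets[side][target].append((key, value))
                if target != node:
                    moved += 1                    # this row crosses the network
    joined = []
    for node in range(num_nodes):
        lookup = defaultdict(list)
        for key, value in buckets["right"][node]:
            lookup[key].append(value)
        for key, value in buckets["left"][node]:
            for match in lookup.get(key, []):
                joined.append((key, value, match))
    return joined, moved

def broadcast_join(large_parts, small_rows):
    """Simulate a broadcast join: copy the small table to every node."""
    lookup = defaultdict(list)
    for key, value in small_rows:
        lookup[key].append(value)
    moved = len(small_rows) * len(large_parts)    # counts one copy per node
    joined = []
    for rows in large_parts:                      # the large table never moves
        for key, value in rows:
            for match in lookup.get(key, []):
                joined.append((key, value, match))
    return joined, moved

# Made-up data: 4 nodes with 50 rows each, plus a 7-row lookup table on node 0
labels = [(1, "Monday"), (2, "Tuesday"), (3, "Wednesday"), (4, "Thursday"),
          (5, "Friday"), (6, "Saturday"), (7, "Sunday")]
pageviews_parts = [[(n % 7 + 1, f"node{node}-row{n}") for n in range(50)]
                   for node in range(4)]
labels_parts = [labels, [], [], []]

shuffle_rows, shuffle_moved = shuffle_join(pageviews_parts, labels_parts)
broadcast_rows, broadcast_moved = broadcast_join(pageviews_parts, labels)
print("rows moved by shuffle join:  ", shuffle_moved)
print("rows moved by broadcast join:", broadcast_moved)
```

Both functions return the same joined rows. Only the amount of data movement differs, and the gap grows with the size of the larger table.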

### Lookup Tables

Lookup tables are normally small, historical tables used to enrich new data passing through an ETL pipeline.

Run the cell below to mount the data.

In [0]:
%run "./Includes/Classroom-Setup"

Import a small table that will enrich new data coming into a pipeline.

In [0]:
labelsDF = spark.read.parquet("/mnt/training/day-of-week")

display(labelsDF)

dow,longName,abbreviated,shortName
1,Monday,Mon,M
2,Tuesday,Tue,Tu
3,Wednesday,Wed,W
4,Thursday,Thr,Th
5,Friday,Fri,F
6,Saturday,Sat,Sa
7,Sunday,Sun,Su


Import a larger DataFrame and derive a day-of-week column that can be joined to the lookup table. In this case, use Wikipedia site requests data.

In [0]:
from pyspark.sql.functions import col, date_format

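pageviewsDF = (spark.read
  .parquet("/mnt/training/wikipedia/pageviews/pageviews_by_second.parquet/")
  # "u" formats the day of the week as a number (1 = Monday ... 7 = Sunday).
  # On Spark 3.0+, this pattern may require spark.sql.legacy.timeParserPolicy=LEGACY.
  .withColumn("dow", date_format(col("timestamp"), "u"))
)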

display(pageviewsDF)


timestamp,site,requests,dow
2015-03-22T14:13:34,mobile,1425,7
2015-03-22T14:23:18,desktop,2534,7
2015-03-22T14:36:47,desktop,2444,7
2015-03-22T14:38:39,mobile,1488,7
2015-03-22T14:57:11,mobile,1519,7
2015-03-22T15:03:18,mobile,1559,7
2015-03-22T15:16:47,mobile,1510,7
2015-03-22T15:45:03,desktop,2673,7
2015-03-22T15:58:32,desktop,2463,7
2015-03-22T16:06:11,desktop,2525,7
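The `dow` values in the output appear to follow the convention used by the lookup table, where 1 is Monday and 7 is Sunday. As a quick sanity check outside Spark, the sketch below uses Python's standard library to compute the same numbering for the first timestamp in the sample. The helper name `iso_day_of_week` is an assumption made for this illustration.

```python
from datetime import datetime

def iso_day_of_week(timestamp_text):
    """Return 1 (Monday) through 7 (Sunday) for an ISO-8601 timestamp string."""
    return datetime.fromisoformat(timestamp_text).isoweekday()

# First row of the sample output: 22 March 2015 was a Sunday
print(iso_day_of_week("2015-03-22T14:13:34"))  # prints 7
```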


Join the two DataFrames together.

In [0]:
pageviewsEnhancedDF = pageviewsDF.join(labelsDF, "dow")

display(pageviewsEnhancedDF)

dow,timestamp,site,requests,longName,abbreviated,shortName
7,2015-03-22T14:13:34,mobile,1425,Sunday,Sun,Su
7,2015-03-22T14:23:18,desktop,2534,Sunday,Sun,Su
7,2015-03-22T14:36:47,desktop,2444,Sunday,Sun,Su
7,2015-03-22T14:38:39,mobile,1488,Sunday,Sun,Su
7,2015-03-22T14:57:11,mobile,1519,Sunday,Sun,Su
7,2015-03-22T15:03:18,mobile,1559,Sunday,Sun,Su
7,2015-03-22T15:16:47,mobile,1510,Sunday,Sun,Su
7,2015-03-22T15:45:03,desktop,2673,Sunday,Sun,Su
7,2015-03-22T15:58:32,desktop,2463,Sunday,Sun,Su
7,2015-03-22T16:06:11,desktop,2525,Sunday,Sun,Su


Now aggregate the results to see trends by day of the week.

**Note:** `pageviewsEnhancedDF` is a large DataFrame, so it can take a while to process depending on the size of your cluster.

In [0]:
from pyspark.sql.functions import col

aggregatedDowDF = (pageviewsEnhancedDF
  .groupBy(col("dow"), col("longName"), col("abbreviated"), col("shortName"))  
  .sum("requests")                                             
  .withColumnRenamed("sum(requests)", "Requests")
  .orderBy(col("dow"))
)

display(aggregatedDowDF)

dow,longName,abbreviated,shortName,Requests
1,Monday,Mon,M,2356818845
2,Tuesday,Tue,Tu,1995034884
3,Wednesday,Wed,W,1977615396
4,Thursday,Thr,Th,1931508977
5,Friday,Fri,F,1842512718
6,Saturday,Sat,Sa,1662762048
7,Sunday,Sun,Su,1576726066


### Exploring Broadcast Joins

When these two DataFrames were joined, no join strategy was specified. To see which strategy Spark chose, look at the physical plan it uses to execute the query. This can be done with the `.explain()` DataFrame method. Look for **BroadcastHashJoin** and/or **BroadcastExchange**.

<div><img src="https://files.training.databricks.com/images/eLearning/ETL-Part-2/broadcasthashjoin.png" style="height: 400px; margin: 20px"/></div>

In [0]:
aggregatedDowDF.explain()

By default, Spark did a broadcast join rather than a shuffle join.  In other words, it broadcast `labelsDF` to the larger `pageviewsDF`, replicating the smaller DataFrame on each node of our cluster.  This avoided having to move the larger DataFrame across the cluster.
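Rather than scanning the plan by eye, the check can be scripted. The sketch below assumes that `explain()` prints the plan to Python's standard output, which is how PySpark's DataFrame method behaves to the best of my knowledge. The helper `plan_uses_broadcast` and the stand-in class `FakePlanDF` are hypothetical names, and the plan strings are simplified illustrations rather than real Spark output.

```python
import io
from contextlib import redirect_stdout

def plan_uses_broadcast(df):
    """Return True if the printed physical plan mentions a broadcast operator."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        df.explain()                      # the plan text is captured instead of displayed
    plan = buffer.getvalue()
    return "BroadcastHashJoin" in plan or "BroadcastExchange" in plan

class FakePlanDF:
    """Hypothetical stand-in for a DataFrame; it only mimics explain()."""
    def __init__(self, plan_text):
        self._plan_text = plan_text

    def explain(self):
        print(self._plan_text)

# Simplified, made-up plan text for demonstration only
print(plan_uses_broadcast(FakePlanDF("BroadcastHashJoin [dow], [dow], Inner")))
print(plan_uses_broadcast(FakePlanDF("SortMergeJoin [dow], [dow], Inner")))
```

In the notebook, the same helper could be called as `plan_uses_broadcast(aggregatedDowDF)`.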

Take a look at the broadcast threshold by accessing the configuration settings.

In [0]:
threshold = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
print("Threshold: {0:,}".format( int(threshold) ))

This is the maximum size in bytes for a table that is broadcast to worker nodes. Setting it to `-1` disables automatic broadcasting.

In [0]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
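The threshold rule can be summarized as a simplified decision function. This is a sketch of the idea, not Spark's actual planner code: `would_auto_broadcast` is a hypothetical name, the size estimate is passed in by hand, and the 10 MB default is the value documented for open-source Spark, so your cluster may be configured differently.

```python
DEFAULT_THRESHOLD_BYTES = 10 * 1024 * 1024  # documented open-source Spark default (10 MB)

def would_auto_broadcast(estimated_size_bytes, threshold_bytes=DEFAULT_THRESHOLD_BYTES):
    """Simplified rule: broadcast when the size estimate fits under the threshold."""
    if threshold_bytes < 0:                 # -1 turns automatic broadcasting off
        return False
    return estimated_size_bytes <= threshold_bytes

print(would_auto_broadcast(1_000))                  # a tiny lookup table
print(would_auto_broadcast(50 * 1024 * 1024))       # a 50 MB table
print(would_auto_broadcast(1_000, threshold_bytes=-1))
```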

Now notice the lack of broadcast in the query physical plan.

In [0]:
pageviewsDF.join(labelsDF, "dow").explain()

Next reset the original threshold.

In [0]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", threshold)
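Changing a setting and restoring it by hand leaves the session misconfigured if a cell in between fails. One possible pattern, sketched below, wraps the change in a context manager so the original value is always restored. The names `temporary_conf` and `DictConf` are assumptions for this illustration. `DictConf` is only a stand-in exposing the `get`/`set` methods that `spark.conf` provides, so the sketch runs without a cluster.

```python
from contextlib import contextmanager

@contextmanager
def temporary_conf(conf, key, value):
    """Set a configuration value inside a with-block, then restore the original."""
    original = conf.get(key)
    conf.set(key, value)
    try:
        yield
    finally:
        conf.set(key, original)             # runs even if the block raises

class DictConf:
    """Hypothetical stand-in for spark.conf, backed by a plain dictionary."""
    def __init__(self, settings):
        self._settings = dict(settings)

    def get(self, key):
        return self._settings[key]

    def set(self, key, value):
        self._settings[key] = value

KEY = "spark.sql.autoBroadcastJoinThreshold"
conf = DictConf({KEY: "10485760"})

with temporary_conf(conf, KEY, -1):
    inside = conf.get(KEY)                  # broadcasting disabled here
after = conf.get(KEY)                       # original value is back
print(inside, after)

# In the notebook, the same pattern would look like:
# with temporary_conf(spark.conf, KEY, -1):
#     pageviewsDF.join(labelsDF, "dow").explain()
```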

### Explicitly Broadcasting Tables

There are two ways to control which tables Spark broadcasts. The first is to change the Spark configuration, which affects all operations. The second is to mark a DataFrame explicitly with the `broadcast()` function in the `pyspark.sql.functions` module.

In [0]:
from pyspark.sql.functions import broadcast

pageviewsDF.join(broadcast(labelsDF), "dow").explain()

## Exercise 1: Join a Lookup Table

Join a table that includes a two-letter country code to a lookup table containing the full country name.

### Step 1: Import the Data

Create the following DataFrames:<br><br>

- `countryLookupDF`: A lookup table with ISO country codes located at `/mnt/training/countries/ISOCountryCodes/ISOCountryLookup.parquet`
- `logWithIPDF`: A server log including the results from an IPLookup table located at `/mnt/training/EDGAR-Log-20170329/enhanced/logDFwithIP.parquet`

In [0]:
# TODO
countryLookupDF = spark.read.parquet("/mnt/training/countries/ISOCountryCodes/ISOCountryLookup.parquet")

display(countryLookupDF)
logWithIPDF = spark.read.parquet("/mnt/training/EDGAR-Log-20170329/enhanced/logDFwithIP.parquet")



EnglishShortName,alpha2Code,alpha3Code,numericCode,ISO31662SubdivisionCode,independentTerritory
Afghanistan,AF,AFG,4,ISO 3166-2:AF,Yes
Åland Islands,AX,ALA,248,ISO 3166-2:AX,No
Albania,AL,ALB,8,ISO 3166-2:AL,Yes
Algeria,DZ,DZA,12,ISO 3166-2:DZ,Yes
American Samoa,AS,ASM,16,ISO 3166-2:AS,No
Andorra,AD,AND,20,ISO 3166-2:AD,Yes
Angola,AO,AGO,24,ISO 3166-2:AO,Yes
Anguilla,AI,AIA,660,ISO 3166-2:AI,No
Antarctica,AQ,ATA,10,ISO 3166-2:AQ,No
Antigua and Barbuda,AG,ATG,28,ISO 3166-2:AG,Yes


In [0]:
display(logWithIPDF)

ip,date,time,zone,cik,accession,extention,code,size,idx,norefer,noagent,find,crawler,browser,IPLookupISO2
101.71.41.ihh,2017-03-29,00:00:00,0.0,1437491.0,0001245105-17-000052,xslF345X03/primary_doc.xml,301.0,687.0,0.0,0.0,0.0,10.0,0.0,,CN
104.196.240.dda,2017-03-29,00:00:00,0.0,1270985.0,0001188112-04-001037,.txt,200.0,7619.0,0.0,0.0,0.0,10.0,0.0,,US
107.23.85.jfd,2017-03-29,00:00:00,0.0,1059376.0,0000905148-07-006108,-index.htm,200.0,2727.0,1.0,0.0,0.0,10.0,0.0,,US
107.23.85.jfd,2017-03-29,00:00:00,0.0,1059376.0,0000905148-08-001993,-index.htm,200.0,2710.0,1.0,0.0,0.0,10.0,0.0,,US
107.23.85.jfd,2017-03-29,00:00:00,0.0,1059376.0,0001104659-09-046963,-index.htm,200.0,2715.0,1.0,0.0,0.0,10.0,0.0,,US
107.23.85.jfd,2017-03-29,00:00:00,0.0,1364986.0,0000914121-06-002243,-index.htm,200.0,2786.0,1.0,0.0,0.0,10.0,0.0,,US
107.23.85.jfd,2017-03-29,00:00:00,0.0,1364986.0,0000914121-06-002251,-index.htm,200.0,2784.0,1.0,0.0,0.0,10.0,0.0,,US
108.240.248.gha,2017-03-29,00:00:00,0.0,1540159.0,0001217160-12-000029,f332scottlease.htm,200.0,49578.0,0.0,0.0,0.0,10.0,0.0,,US
108.59.8.fef,2017-03-29,00:00:00,0.0,732834.0,0001209191-15-017349,xslF345X03/doc4.xml,301.0,673.0,0.0,0.0,0.0,10.0,0.0,,US
108.91.91.hbc,2017-03-29,00:00:00,0.0,1629769.0,0001209191-17-023204,.txt,301.0,675.0,0.0,0.0,0.0,10.0,0.0,,US


In [0]:
# TEST - Run this cell to test your solution
dbTest("ET2-P-05-01-01", 249, countryLookupDF.count())
dbTest("ET2-P-05-01-02", 5000, logWithIPDF.count())

print("Tests passed!")

### Step 2: Broadcast the Lookup Table

Complete the following:<br><br>

- Create a new DataFrame `logWithIPEnhancedDF`
- Get the full country name by performing a broadcast join that broadcasts the lookup table to the server log
- Drop all lookup table columns other than `EnglishShortName`

In [0]:
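from pyspark.sql.functions import broadcast, col

# Rename the lookup key so both DataFrames share the join column name
countryLookupDF = countryLookupDF.withColumnRenamed("alpha2Code", "IPLookupISO2")

# Explicitly broadcast the small lookup table to the larger server log
logWithIPEnhancedDFjoin = logWithIPDF.join(broadcast(countryLookupDF), "IPLookupISO2")

# Keep only the full country name and the client IP
logWithIPEnhancedDF = logWithIPEnhancedDFjoin.select(col("EnglishShortName"), col("ip"))

display(logWithIPEnhancedDF)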


EnglishShortName,ip
China,101.71.41.ihh
United States of America,104.196.240.dda
United States of America,107.23.85.jfd
United States of America,107.23.85.jfd
United States of America,107.23.85.jfd
United States of America,107.23.85.jfd
United States of America,107.23.85.jfd
United States of America,108.240.248.gha
United States of America,108.59.8.fef
United States of America,108.91.91.hbc


In [0]:
# TEST - Run this cell to test your solution
cols = set(logWithIPEnhancedDF.columns)

dbTest("ET2-P-05-02-01", True, "EnglishShortName" in cols and "ip" in cols)
dbTest("ET2-P-05-02-02", True, "alpha2Code" not in cols and "ISO31662SubdivisionCode" not in cols)
dbTest("ET2-P-05-02-03", 5000, logWithIPEnhancedDF.count())

print("Tests passed!")

## Review
**Question:** Why are joins expensive operations?  
**Answer:** Joins perform a large number of row-wise comparisons, making the cost associated with joining tables grow with the size of the data in the tables.

**Question:** What is the difference between a shuffle and broadcast join? How does Spark manage these differences?  
**Answer:** A shuffle join shuffles data between nodes in a cluster. By contrast, a broadcast join moves the smaller of two DataFrames to where the larger DataFrame sits, minimizing the overall data transfer. By default, Spark performs a broadcast join if the estimated size of one of the tables, in bytes, is below a certain threshold (`spark.sql.autoBroadcastJoinThreshold`). The threshold can be changed, or you can manually specify that a broadcast join should take place with `broadcast()`. Since the automatic decision relies on Spark's size estimate, specifying the broadcast manually can help when you know a table is small but the estimate may not reflect that.

**Question:** What is a lookup table?  
**Answer:** A lookup table is a small table often used for referencing commonly used data, such as mapping cities to countries.

## Next Steps

Start the next lesson, [Database Writes]($./06-Database-Writes).

## Additional Topics & Resources

**Q:** Where can I get more information on optimizing table joins where data skew is an issue?  
**A:** Check out the Databricks documentation on <a href="https://docs.databricks.com/spark/latest/spark-sql/skew-join.html" target="_blank">Skew Join Optimization</a>.

&copy; 2019 Databricks, Inc. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="http://www.apache.org/">Apache Software Foundation</a>.<br/>
<br/>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a> | <a href="http://help.databricks.com/">Support</a>