# Using broadcasting on Spark joins

- Remember that table joins in Spark are split between the cluster workers. If the data a join needs is not local to a worker, Spark has to shuffle it between nodes, which can hurt performance. Instead, we're going to use Spark's broadcast operations to give each node a copy of the specified data.

- A couple of tips:

> - Broadcast the smaller DataFrame. The larger the DataFrame, the more time is required to transfer it to the worker nodes.
> - On small DataFrames, it may be better to skip broadcasting and let Spark figure out any optimization on its own.
> - If you look at the query execution plan, a `BroadcastHashJoin` operator indicates that broadcasting was applied to the join.

- The DataFrames `flights_df` and `airports_df` are available to you.
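
To make the idea concrete, here is a minimal plain-Python sketch of what a broadcast hash join does conceptually. It is not Spark's implementation: the function name `broadcast_hash_join`, the partition lists, and the sample rows are all hypothetical, made up for illustration. The small table is turned into a lookup dictionary once (the part that would be copied to each worker), and each partition of the large table is then joined locally against that copy, so no rows of the large table move between partitions.

```python
from collections import defaultdict


def broadcast_hash_join(large_partitions, small_rows, large_key, small_key):
    """Inner-join each partition of the large side against a shared lookup table.

    Illustrative only: a conceptual model, not Spark's implementation.
    """
    # Build the hash table once from the small side. In Spark terms this is
    # the data that would be shipped (broadcast) to every worker.
    lookup = defaultdict(list)
    for row in small_rows:
        key = row[small_key]
        if key is not None:  # null keys never match in an inner equi-join
            lookup[key].append(row)

    # Each partition is joined independently against the same lookup table.
    joined_partitions = []
    for partition in large_partitions:
        joined = []
        for row in partition:
            for match in lookup.get(row[large_key], []):
                joined.append({**row, **match})
        joined_partitions.append(joined)
    return joined_partitions


# Hypothetical sample data, loosely modelled on the column names in this exercise.
airports = [
    {"IATA": "AUS", "AIRPORTNAME": "Austin"},
    {"IATA": "ORD", "AIRPORTNAME": "Chicago O'Hare"},
]
flight_partitions = [
    [{"Flight Number": "5", "Destination Airport": "AUS"}],
    [
        {"Flight Number": "7", "Destination Airport": "ORD"},
        {"Flight Number": "9", "Destination Airport": "XXX"},  # no matching airport
    ],
]

result = broadcast_hash_join(
    flight_partitions, airports, "Destination Airport", "IATA"
)
for index, part in enumerate(result):
    print(index, part)
```

The cost of this approach is the copy of the small table held by every worker, which is why the tips above suggest broadcasting the smaller DataFrame.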

## Instructions

- Import the `broadcast()` function from `pyspark.sql.functions`.
- Create a new DataFrame `broadcast_df` by joining `flights_df` with `airports_df`, using broadcasting.
- Show the query plan and consider differences from the original.
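
When comparing plans, the operator name on the join line is the part that matters. The helper below is a small sketch that scans plan text, such as the output printed by `explain()`, for a few physical join operator names that Spark uses. The function name `join_strategies` and the shortened sample plan are hypothetical, for illustration only, and the list of operator names is not exhaustive.

```python
import re

# Physical join operator names that can appear in Spark plans (not exhaustive).
JOIN_OPERATORS = (
    "BroadcastHashJoin",
    "BroadcastNestedLoopJoin",
    "ShuffledHashJoin",
    "SortMergeJoin",
)


def join_strategies(plan_text):
    """Return the join operator names found in plan_text, in order of appearance."""
    pattern = r"\b(" + "|".join(JOIN_OPERATORS) + r")\b"
    return re.findall(pattern, plan_text)


# Shortened, hypothetical plan text in the style printed by explain().
sample_plan = """== Physical Plan ==
*(2) BroadcastHashJoin [Destination Airport#26], [IATA#11], Inner, BuildRight
:- *(2) Project [Destination Airport#26]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
"""

found = join_strategies(sample_plan)
print(found)
print("broadcast join used:", "BroadcastHashJoin" in found)
```

Running the same check on the plan of the original, non-broadcast join gives a quick way to see whether the join strategy actually changed.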

In [3]:
# Initialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In the two lines below, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: List whichever packages you want to load here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [4]:
# Entry point (Spark 2.x)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [1]:
!pwd

/home/talentum/test-jupyter/P3/M3/SM4/4_Performanceimprovements


In [5]:
# Import the broadcast function from pyspark.sql.functions
from pyspark.sql.functions import broadcast

airports_df = spark.read.format("csv").option("header", "true").load("file:///home/talentum/test-jupyter/P3/M3/SM4/4_Performanceimprovements/Dataset/airportnames.txt.gz")
flights_df = spark.read.format("csv").option("header", "true").load("file:///home/talentum/test-jupyter/P3/M3/SM4/4_Performanceimprovements/Dataset/AA_DFW_2018_Departures_Short.csv.gz")

# Join the flights_df and airports_df DataFrames using broadcasting
broadcast_df = flights_df.join(
    broadcast(airports_df),
    flights_df["Destination Airport"] == airports_df["IATA"],
)

# Show the query plan and compare against the original
broadcast_df.explain()

== Physical Plan ==
*(2) BroadcastHashJoin [Destination Airport#26], [IATA#11], Inner, BuildRight
:- *(2) Project [Date (MM/DD/YYYY)#24, Flight Number#25, Destination Airport#26, Actual elapsed time (Minutes)#27]
:  +- *(2) Filter isnotnull(Destination Airport#26)
:     +- *(2) FileScan csv [Date (MM/DD/YYYY)#24,Flight Number#25,Destination Airport#26,Actual elapsed time (Minutes)#27] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/talentum/test-jupyter/P3/M3/SM4/4_Performanceimprovements/Dataset/AA..., PartitionFilters: [], PushedFilters: [IsNotNull(Destination Airport)], ReadSchema: struct<Date (MM/DD/YYYY):string,Flight Number:string,Destination Airport:string,Actual elapsed ti...
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, true]))
   +- *(1) Project [AIRPORTNAME#10, IATA#11]
      +- *(1) Filter isnotnull(IATA#11)
         +- *(1) FileScan csv [AIRPORTNAME#10,IATA#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/