#### Dataframe Joins and Optimizations
When joining dataframes, a common rule of thumb is to put the smaller dataframe on the right side, since joins are evaluated from left to right.
However, SparkSQL applies several optimizations of its own when performing joins and other operations on dataframes, so the order written in the code does not always decide how the join is executed.
We can inspect the execution plan SparkSQL chooses for any operation using the function "explain()".
Examples are given below.

One other point: when we need to join a dataframe that is small enough to fit into the memory of a worker node, it is better to "broadcast" that dataframe. When a dataframe is broadcast, a copy of it is sent to every worker node. This avoids shuffling the larger dataframe for the join, because the smaller dataframe is completely available locally for each partition of the larger dataframe.
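To make the idea concrete, here is a minimal pure-Python sketch of what a broadcast join does conceptually. It is not Spark's implementation; the function name `broadcast_hash_join` and the sample rows are made up for illustration, and only the column names are borrowed from this notebook.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join each partition of the large table against a local copy of the small table."""
    # Build the lookup table once; conceptually this is the copy sent to every worker.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    joined_partitions = []
    for partition in large_partitions:
        # Each partition probes its local lookup table; no rows move between partitions.
        joined = [
            {**small, **big}
            for big in partition
            for small in lookup.get(big[key], [])
        ]
        joined_partitions.append(joined)
    return joined_partitions

# Hypothetical sample data: a small customer table and a "large" sales table in two partitions.
customers = [
    {"customer_id": 1, "customer_name": "Ann"},
    {"customer_id": 2, "customer_name": "Bob"},
]
sales_partitions = [
    [{"salestxn_id": 10, "customer_id": 1}, {"salestxn_id": 11, "customer_id": 2}],
    [{"salestxn_id": 12, "customer_id": 1}, {"salestxn_id": 13, "customer_id": 3}],
]

result = broadcast_hash_join(sales_partitions, customers, "customer_id")
for partition in result:
    print(partition)
```

The sales rows stay in their original partitions; only the small customer table is copied. The transaction for customer 3 has no match and is dropped, as in an inner join.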

SparkSQL's built-in optimizer usually applies this optimization automatically. When it does not, the broadcast can be requested manually, as shown in the examples further below.

In [None]:
import findspark
findspark.init("/usr/local/spark")
import pyspark

Import and initialize findspark first, then import pyspark.

Now start a SparkSession.

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL example").getOrCreate()

In [None]:
# from pyspark.sql import functions

Create DataFrames from the data sources (tab-separated files read with the csv format)

In [None]:
customerDF = spark.read.load("customers.tsv", format="csv", sep="\t", inferSchema="true")
salestxnDF = spark.read.load("salestxns.tsv", format="csv", sep="\t", inferSchema="true")

Different types of operations on DataFrames

In [None]:
customerDF.printSchema()

In [None]:
salestxnDF.printSchema()

In [None]:
new_names_cDF = ['customer_id','customer_name','customer_city','customer_state','customer_zipcode']

In [None]:
new_names_sDF = ['salestxn_id','category_id','category_name','product_id','product_name','product_price',
                 'product_quantity','customer_id']

In [None]:
cDF2=customerDF.toDF(*new_names_cDF)

In [None]:
sDF2=salestxnDF.toDF(*new_names_sDF)

In [None]:
cDF2.printSchema()

In [None]:
cDF2.show(5)

In [None]:
sDF2.printSchema()

In [None]:
sDF2.show(5)

Join the dataframes with the smaller one on the left side to check the execution plan chosen by SparkSQL.
 - explain() shows only the physical plan, while explain(True) also shows the parsed, analyzed, and optimized logical plans along with the physical execution plan.

In [None]:
jDF1=cDF2.join(sDF2,cDF2.customer_id==sDF2.customer_id)

In [None]:
# explain() prints the plan and returns None, so call it on jDF1 instead of reassigning jDF1
jDF1.explain()

In [None]:
jDF1.explain(True)

Notice that the execution plan is optimized: the customer dataframe (cDF2) is broadcast.
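Whether SparkSQL broadcasts a dataframe on its own depends on the setting `spark.sql.autoBroadcastJoinThreshold` (10 MB by default; setting it to -1 disables automatic broadcasting). The sketch below is a small helper, not part of PySpark, that changes this setting temporarily so the plans with and without automatic broadcast can be compared. The name `auto_broadcast_threshold` is made up here; the helper only assumes a session object with `conf.get` and `conf.set`, as a SparkSession has.

```python
from contextlib import contextmanager

THRESHOLD_KEY = "spark.sql.autoBroadcastJoinThreshold"

@contextmanager
def auto_broadcast_threshold(spark, value):
    """Temporarily set the automatic broadcast threshold, then restore the old value."""
    previous = spark.conf.get(THRESHOLD_KEY)
    # Pass the value as a string so it works regardless of how conf.set handles numbers.
    spark.conf.set(THRESHOLD_KEY, str(value))
    try:
        yield
    finally:
        spark.conf.set(THRESHOLD_KEY, previous)

# Possible usage with the dataframes from this notebook (assumes `spark`, `cDF2`, `sDF2` exist):
# with auto_broadcast_threshold(spark, -1):
#     cDF2.join(sDF2, cDF2.customer_id == sDF2.customer_id).explain()
```

With the threshold set to -1, the plan printed inside the `with` block will most likely show a join strategy other than a broadcast join, which makes the effect of the optimization visible.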

Let us check with the smaller dataframe on the right side as well.

In [None]:
jDF2=sDF2.join(cDF2,sDF2.customer_id==cDF2.customer_id)

In [None]:
type(jDF2)

In [None]:
jDF2.explain()

In [None]:
jDF2.explain(True)

We can see that the execution plan is the same optimized one.
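Reading two long plans side by side is error-prone, so the comparison can also be done programmatically. The sketch below captures the text that explain() prints and looks for a broadcast join operator in it. The helper names `plan_text` and `uses_broadcast_join` are made up for this example, and it assumes that explain() writes its output through Python's standard output, which is how PySpark behaves as far as I know.

```python
import contextlib
import io

def plan_text(df, extended=False):
    """Return the text that df.explain(extended) prints, as a string."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain(extended)
    return buffer.getvalue()

def uses_broadcast_join(df):
    """True if the printed plan mentions a broadcast join operator."""
    text = plan_text(df)
    return "BroadcastHashJoin" in text or "BroadcastNestedLoopJoin" in text

# Possible usage with the joins from this notebook (assumes jDF1 and jDF2 are DataFrames):
# print(uses_broadcast_join(jDF1), uses_broadcast_join(jDF2))
```

If both calls report the same result, the join order written in the code did not change the join strategy the optimizer chose.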

The cells below show how to mark a dataframe for broadcast manually, in case that is required.

In [None]:
# explain() returns None, so print the plan without assigning the result to jDF3
sDF2.join(cDF2.hint("broadcast"),sDF2.customer_id==cDF2.customer_id).explain(True)

In [None]:
jDF3=sDF2.join(cDF2.hint("broadcast"),sDF2.customer_id==cDF2.customer_id)

In [None]:
jDF3.printSchema()

In [None]:
jDF3.show(5)

In [None]:
import pyspark.sql.functions  # explicit import so that pyspark.sql.functions.broadcast is available

In [None]:
pyspark.sql.functions.broadcast(cDF2)

In [None]:
type(pyspark.sql.functions.broadcast(cDF2))

In [None]:
jDF4=sDF2.join(pyspark.sql.functions.broadcast(cDF2),sDF2.customer_id==cDF2.customer_id)
type(jDF4)

In [None]:
jDF4.explain(True)

In [None]:
jDF4.show(5)