# Lab 8: Joins with Spark and DataFrames

We will now look at Joins with Spark DataFrames. 
First, as always we create a SparkContext and from it a HiveContext, using the "demo" database.

In [1]:
# Set up Spark Context
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *

SparkContext.setSystemProperty('spark.executor.memory', '2g')
conf = SparkConf()
conf.set('spark.executor.instances', 15)
conf.set('spark.sql.autoBroadcastJoinThreshold', 200*1024*1024)  # 200MB for broadcast join
sc = SparkContext('yarn-client', 'Spark-lab8', conf=conf)

from pyspark.sql import HiveContext
hc = HiveContext(sc)
hc.sql("use demo")

DataFrame[result: string]

Filter the weather dataset to include only data from 2013 and the San Francisco weather station (USW00023272).
Then, create three dataframes:
1. Daily percipitation (PRCP)
2. Daily lowest temparature (TMIN)
3. Daily highest temparature (TMAX)

You might want to consider renaming the "value" column in each subset to reflect the type of value it represents. For example, in the prcp table you may rename value to prcp_val

In [2]:
weather = hc.sql("select * from weather WHERE year==2013 and station == 'USW00023272'").cache()
prcp = weather.filter(weather.metric=='PRCP').withColumnRenamed('value', 'prcp_val')
tmin = weather.filter(weather.metric=='TMIN').withColumnRenamed('value', 'tmin_val')
tmax = weather.filter(weather.metric=='TMAX').withColumnRenamed('value', 'tmax_val')

Now join all the metric-specific dataframes into a new dataframe with the following fields: year, month, day, prcp, tmin, tmax. 

Print the first 5 rows of this merged dataset.

In [3]:
wdata = prcp.join(tmin, 'date_str').join(tmax, 'date_str') \
            .select(prcp.year.alias('year'), prcp.month.alias('month'), prcp.day.alias('day'), \
                    prcp.prcp_val.alias('prcp'), tmin.tmin_val.alias('tmin'), tmax.tmax_val.alias('tmax'))
wdata.cache()
wdata.limit(5).toPandas()

Unnamed: 0,year,month,day,prcp,tmin,tmax
0,2013,2,12,0,72,156
1,2013,8,25,0,156,206
2,2013,11,3,0,100,161
3,2013,6,8,0,111,167
4,2013,2,26,0,78,167


join the resulting dataframe (wdata) with the crimes dataframe, using the join key: month and day. Then print the explain() and notice how Spark uses a broadcast join.

In [4]:
crimes = hc.sql("""
SELECT *, 
       cast(substr(date_str,1,2) as int) as month, 
       cast(substr(date_str,4,2) as int) as day, 
       cast(substr(date_str,7,4) as int) as year 
FROM crimes 
WHERE substr(date_str,7,4)=='2013'
""").cache()

jdata = wdata.join(crimes, (wdata.month==crimes.month) & (wdata.day==crimes.day))  # Parens are necessary in expr
print jdata.explain()

BroadcastHashJoin [month#70,day#71], [month#240,day#241], BuildLeft
 ConvertToUnsafe
  InMemoryColumnarTableScan [year#69,month#70,day#71,prcp#72,tmin#73,tmax#74], (InMemoryRelation [year#69,month#70,day#71,prcp#72,tmin#73,tmax#74], true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject [year#11 AS year#69,month#12 AS month#70,day#13 AS day#71,prcp_val#52 AS prcp#72,tmin_val#53 AS tmin#73,tmax_val#54 AS tmax#74]), None)
 ConvertToUnsafe
  InMemoryColumnarTableScan [incidentid#243,category#244,description#245,dayofweek#246,date_str#247,time#248,district#249,resolution#250,address#251,longitude#252,latitude#253,location#254,pdid#255,month#240,day#241,year#242], (InMemoryRelation [incidentid#243,category#244,description#245,dayofweek#246,date_str#247,time#248,district#249,resolution#250,address#251,longitude#252,latitude#253,location#254,pdid#255,month#240,day#241,year#242], true, 10000, StorageLevel(true, true, false, true, 1), (Project [incidentid#243,category#244,descr

Now measure the time it takes to execute the join by printing the first 5 rows in the resuling dataframe:

In [5]:
%%timeit -n1 -r1

jdata.limit(5).toPandas()

1 loops, best of 1: 19.4 s per loop


Setup a new SparkContext with Broadcast join disabled:

     conf.set('spark.sql.autoBroadcastJoinThreshold', -1)  # Disable broadcast join

Remember to use sc.stop() to close the original spark context and discard its hold on cluster resources.

In [6]:
sc.stop()

SparkContext.setSystemProperty('spark.executor.memory', '4g')
conf = SparkConf()
conf.set('spark.executor.instances', 8)
conf.set('spark.sql.autoBroadcastJoinThreshold', -1)  # Disable broadcast join
sc = SparkContext('yarn-client', 'lab8', conf=conf)
hc = HiveContext(sc)
hc.sql("use demo")

DataFrame[result: string]

Run the complete code sequence again using this new Spark Context (with broadcast join disabled), and use explain() to verify Spark is now using a ShuffledHashJoin instead of Broadcast join.

In [7]:
weather = hc.sql("select * from weather WHERE year==2013 and station == 'USW00023272'").cache()
prcp = weather.filter(weather.metric=='PRCP').withColumnRenamed('value', 'prcp_val')
tmin = weather.filter(weather.metric=='TMIN').withColumnRenamed('value', 'tmin_val')
tmax = weather.filter(weather.metric=='TMAX').withColumnRenamed('value', 'tmax_val')
wdata = prcp.join(tmin, 'date_str').join(tmax, 'date_str') \
            .select(prcp.year.alias('year'), prcp.month.alias('month'), prcp.day.alias('day'), \
                    prcp.prcp_val.alias('prcp'), tmin.tmin_val.alias('tmin'), tmax.tmax_val.alias('tmax'))
crimes = hc.sql("""
SELECT *, 
       cast(substr(date_str,1,2) as int) as month, 
       cast(substr(date_str,4,2) as int) as day, 
       cast(substr(date_str,7,4) as int) as year 
FROM crimes 
WHERE substr(date_str,7,4)=='2013'
""")

jdata = wdata.join(crimes, (wdata.month==crimes.month) & (wdata.day==crimes.day))
print jdata.explain()

SortMergeJoin [month#626,day#627], [month#631,day#632]
 TungstenSort [month#626 ASC,day#627 ASC], false, 0
  TungstenExchange hashpartitioning(month#626,day#627)
   TungstenProject [year#567 AS year#625,month#568 AS month#626,day#569 AS day#627,prcp_val#608 AS prcp#628,tmin_val#609 AS tmin#629,tmax_val#610 AS tmax#630]
    SortMergeJoin [date_str#570], [date_str#622]
     TungstenProject [year#567,month#568,tmin_val#609,prcp_val#608,day#569,date_str#570]
      SortMergeJoin [date_str#570], [date_str#615]
       TungstenSort [date_str#570 ASC], false, 0
        TungstenExchange hashpartitioning(date_str#570)
         ConvertToUnsafe
          Project [year#567,month#568,value#572 AS prcp_val#608,day#569,date_str#570]
           Filter (metric#571 = PRCP)
            InMemoryColumnarTableScan [year#567,value#572,month#568,day#569,metric#571,date_str#570], [(metric#571 = PRCP)], (InMemoryRelation [station#566,year#567,month#568,day#569,date_str#570,metric#571,value#572], true, 10000, Stor

Measure the time (using "%%timeit") it takes to execute without a broadcast join and compare to the previous result

In [8]:
%%timeit -n1 -r1

jdata.limit(5).toPandas()

1 loops, best of 1: 4min 23s per loop
