# **HANDS-ON**

To run these Spark examples from Python you need PySpark.

To install PySpark, run "pip install pyspark -v" (the "-v" flag only makes pip's output verbose).

In [None]:
import pyspark
import pyspark.sql
from pyspark.sql import SparkSession

**BROADCAST HASH JOIN**

In this first example we observe how the Catalyst optimizer chooses a Broadcast Hash Join strategy.
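To illustrate what this strategy does, here is a plain-Python sketch of a broadcast hash left join. No Spark is involved. The helper name and toy rows are hypothetical; only the column names come from this notebook. The small side becomes an in-memory hash table, which Spark would ship to every executor. The large side then streams through that table with one lookup per row, so neither input has to be shuffled or sorted.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional, Tuple

Row = Dict[str, Any]


def broadcast_hash_left_join(
    big: Iterable[Row],
    small: Iterable[Row],
    big_key: str,
    small_key: str,
) -> List[Tuple[Row, Optional[Row]]]:
    """Hypothetical helper: left join `big` with `small` in the style of a
    broadcast hash join (build a hash table on the small side, probe with the big side)."""
    # Build phase: in Spark this table would be broadcast to every executor.
    table: Dict[Any, List[Row]] = defaultdict(list)
    for row in small:
        table[row[small_key]].append(row)

    # Probe phase: one hash lookup per big-side row, no shuffle or sort needed.
    result: List[Tuple[Row, Optional[Row]]] = []
    for row in big:
        matches = table.get(row[big_key])  # .get() does not insert into the defaultdict
        if matches:
            result.extend((row, m) for m in matches)
        else:
            # Left join: keep unmatched big-side rows with a null right side.
            result.append((row, None))
    return result


# Toy data (hypothetical values).
big = [{"LAVORO_A": "A", "v": 1}, {"LAVORO_A": "B", "v": 2}, {"LAVORO_A": "Z", "v": 3}]
keys = [{"LAVORO_KEYS": "A"}, {"LAVORO_KEYS": "B"}]
print(broadcast_hash_left_join(big, keys, "LAVORO_A", "LAVORO_KEYS"))
```

This only works well while the hash table fits in memory on every executor. That is why Spark limits broadcasting to tables below a size threshold.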

In [None]:
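# 10 MiB: a join side whose estimated size is at most this value may be broadcast
threshold = 1024 * 1024 * 10

spark = (SparkSession
    .builder
    .appName("AQEOff")
    .master("local[*]")
    # Disable Adaptive Query Execution so only the planning-time (static) plan is used
    .config("spark.sql.adaptive.enabled", "false")
    .config("spark.eventLog.enabled", "false")
    .config("spark.sql.autoBroadcastJoinThreshold", threshold)
    .getOrCreate())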
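The threshold above is 1024 * 1024 * 10 = 10,485,760 bytes (10 MiB), which is also Spark's default for `spark.sql.autoBroadcastJoinThreshold`. The sketch below is a simplified model of the size check, not Spark's planner code. A side is broadcast only if its estimated size is non-negative and at most the threshold, so a threshold of -1 disables broadcasting. The helper name is hypothetical. The real planner also considers join hints and the join type. For a left outer join, such as the ones in this notebook, only the right-hand side (here `dfK`) can be the broadcast side.

```python
THRESHOLD = 1024 * 1024 * 10  # same value as the `threshold` used in the cell above


def would_broadcast(estimated_size_bytes: int, threshold: int = THRESHOLD) -> bool:
    """Hypothetical, simplified size check: broadcast only if the estimate is
    non-negative and does not exceed the threshold. With threshold == -1 the
    range below is empty, so broadcasting is effectively disabled."""
    return 0 <= estimated_size_bytes <= threshold


print(THRESHOLD)                        # bytes in 10 MiB
print(would_broadcast(5 * 1024 * 1024))  # a 5 MiB estimate fits under the threshold
```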

Let's take a look at the Spark UI:

http://localhost:4040 (or the next free port, such as 4041, if 4040 is already in use)

Spark evaluates transformations lazily and no action has been run yet, so there is nothing to see yet.

Let's now perform a join between two tables.

In [None]:
dfA=spark.read.parquet("./Data/bigTableA")
dfK=spark.read.parquet("./Data/keys")

In [None]:
dfResult_ = dfA.join(dfK, dfA.LAVORO_A == dfK.LAVORO_KEYS, "left")
# explain() prints the physical plan itself and returns None, so it is not wrapped in print()
dfResult_.explain()

In [None]:
dfResult_.count()

In [None]:
dfResult_=dfResult_.filter(dfResult_.LAVORO_A == "A")

In [None]:
dfResult_.count()

**SORT MERGE JOIN**
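Unlike a broadcast join, a Sort Merge Join does not need either side to fit in memory. Both inputs are shuffled by the join key, sorted, and then merged with two cursors. The plain-Python sketch below shows only the sort and merge steps for a left join on a single machine. The helper name and toy rows are hypothetical, and the sketch assumes non-null keys of a single comparable type.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def sort_merge_left_join(
    left: List[Row], right: List[Row], left_key: str, right_key: str
) -> List[Tuple[Row, Optional[Row]]]:
    """Hypothetical helper: left join via sort + merge (assumes non-null, comparable keys)."""
    # Sort phase: in Spark each side is shuffled by key, then sorted within each partition.
    left_sorted = sorted(left, key=lambda r: r[left_key])
    right_sorted = sorted(right, key=lambda r: r[right_key])

    result: List[Tuple[Row, Optional[Row]]] = []
    j, n = 0, len(right_sorted)
    for row in left_sorted:
        k = row[left_key]
        # Skip right rows whose key is smaller; later left keys are >= k, so they are never needed again.
        while j < n and right_sorted[j][right_key] < k:
            j += 1
        # Emit the run of equal right keys without moving j (the next left row may share the key).
        m, matched = j, False
        while m < n and right_sorted[m][right_key] == k:
            result.append((row, right_sorted[m]))
            matched = True
            m += 1
        if not matched:
            result.append((row, None))  # left join: keep the unmatched left row
    return result


left = [{"k": 3}, {"k": 1}, {"k": 2}, {"k": 2}]
right = [{"id": 2, "x": "b"}, {"id": 1, "x": "a"}, {"id": 2, "x": "c"}]
print(sort_merge_left_join(left, right, "k", "id"))
```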

In [None]:
dfB=spark.read.parquet("./Data/bigTableB")
dfAux=dfB\
    .join(dfK, dfB.LAVORO_B==dfK.LAVORO_KEYS, "left")\
    .filter("FILLER1==FILLER2 and FILLER2==FILLER3")\
    .repartition(200)
dfResult_ =dfA.join(dfAux, dfA.LAVORO_A==dfAux.LAVORO_B, "left")

dfResult_.explain()  # explain() already prints the plan; wrapping it in print() would also print "None"
dfResult_.count()
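Before a Sort Merge Join, Spark typically hash-partitions both inputs on the join key. This appears as an `Exchange hashpartitioning(...)` node in the plan, and it ensures that equal keys end up in the same partition. Note that `.repartition(200)` with no column arguments distributes rows round-robin, so it does not by itself co-partition `dfAux` by `LAVORO_B`. The sketch below illustrates hash partitioning. The helper names are hypothetical, and `zlib.crc32` stands in for Spark's Murmur3-based hash because Python's built-in `hash()` for strings changes between runs.

```python
import zlib
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def partition_id(key: Any, num_partitions: int) -> int:
    # Deterministic stand-in for Spark's hash function (an assumption, not Spark's Murmur3).
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def hash_partition(rows: List[Row], key: str, num_partitions: int) -> Dict[int, List[Row]]:
    """Hypothetical helper: group rows into partitions by hashing the join key."""
    parts: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        parts[partition_id(row[key], num_partitions)].append(row)
    return dict(parts)


# Both sides are partitioned with the same function and partition count,
# so rows with equal keys land in the same partition index and can be joined locally.
left_parts = hash_partition([{"LAVORO_A": k} for k in "ABCDEFG"], "LAVORO_A", 4)
right_parts = hash_partition([{"LAVORO_B": k} for k in "GFEDCBA"], "LAVORO_B", 4)
print(sorted(left_parts), sorted(right_parts))
```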

In [None]:
spark.stop()  # stopping the session also stops its underlying SparkContext

# **Dynamic Optimization**

Let's see how, with Adaptive Query Execution (AQE) enabled, Spark can switch join strategies at runtime.
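The sketch below is a simplified model of the idea. Without AQE, the join strategy is fixed from size estimates before anything runs. With AQE, Spark first materializes the shuffle stages and can then re-plan the join using the sizes it actually observed, for example turning a Sort Merge Join into a Broadcast Hash Join. The class, helper names and byte counts are hypothetical. Recent Spark versions also expose a separate `spark.sql.adaptive.autoBroadcastJoinThreshold` setting for this runtime decision, which the sketch ignores.

```python
from dataclasses import dataclass


@dataclass
class JoinSide:
    """Hypothetical description of the candidate build side of a join."""
    name: str
    estimated_bytes: int  # the optimizer's guess before execution
    actual_bytes: int     # size reported once the shuffle stage has run


def choose_strategy(size_bytes: int, threshold: int) -> str:
    # Same simplified size check as before: broadcast only small, known sizes.
    return "BroadcastHashJoin" if 0 <= size_bytes <= threshold else "SortMergeJoin"


def plan_join(build_side: JoinSide, threshold: int, adaptive: bool) -> str:
    if not adaptive:
        # Static planning: only the estimate is available.
        return choose_strategy(build_side.estimated_bytes, threshold)
    # AQE (simplified): re-plan after the stage has materialized, using the real size.
    return choose_strategy(build_side.actual_bytes, threshold)


threshold = 1024 * 1024 * 10
aux = JoinSide("dfAux", estimated_bytes=50 * 1024 * 1024, actual_bytes=2 * 1024 * 1024)
print(plan_join(aux, threshold, adaptive=False))  # decided from the estimate
print(plan_join(aux, threshold, adaptive=True))   # decided from the observed size
```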

In [None]:
threshold = 1024 * 1024 * 10

spark = (SparkSession
    .builder
    .appName("AQEOn")
    .master("local[*]")
    # Enable AQE but disable partition coalescing and skew-join handling,
    # so that the runtime join-strategy switch is not mixed with other AQE effects
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
    .config("spark.sql.adaptive.skewJoin.enabled", "false")
    .config("spark.sql.autoBroadcastJoinThreshold", threshold)
    .config("spark.eventLog.enabled", "false")
    .getOrCreate())

dfA=spark.read.parquet("./Data/bigTableA")
dfB =spark.read.parquet("./Data/bigTableB")
dfK=spark.read.parquet("./Data/keys")
dfAux=dfB\
    .join(dfK, dfB.LAVORO_B==dfK.LAVORO_KEYS, "left")\
    .filter("FILLER1==FILLER2 and FILLER2==FILLER3")\
    .repartition(200)
dfResult_ =dfA.join(dfAux, dfA.LAVORO_A==dfAux.LAVORO_B, "left")

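# With AQE on, the plan's root is AdaptiveSparkPlan; the final join strategy is chosen at runtime
# and can be inspected in the SQL tab of the Spark UI after count()
dfResult_.explain()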
dfResult_.count()


What happens if the repartition is removed?

In [None]:
dfAux=dfB\
    .join(dfK, dfB.LAVORO_B==dfK.LAVORO_KEYS, "left")\
    .filter("FILLER1==FILLER2 and FILLER2==FILLER3")
dfResult_ =dfA.join(dfAux, dfA.LAVORO_A==dfAux.LAVORO_B, "left")

dfResult_.explain()  # explain() prints the plan itself and returns None
dfResult_.count()

In [None]:
spark.stop()  # stopping the session also stops its underlying SparkContext

**NOTE**

Remember to shut down the Spark session with spark.stop() when you are done; this also stops the underlying SparkContext.
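In PySpark, a `SparkSession` can also be used as a context manager (`with SparkSession.builder.getOrCreate() as spark:`), which calls `stop()` on exit. The same try/finally idea is sketched below as a small generic helper. `stopping` is a hypothetical name, and the helper works with any object that has a `stop()` method.

```python
from contextlib import contextmanager
from typing import Iterator, TypeVar

T = TypeVar("T")


@contextmanager
def stopping(session: T) -> Iterator[T]:
    """Hypothetical helper: yield `session` and always call its stop() on exit,
    even if the body of the with block raises."""
    try:
        yield session
    finally:
        session.stop()  # type: ignore[attr-defined]


# Usage with Spark (not executed here):
# with stopping(SparkSession.builder.master("local[*]").getOrCreate()) as spark:
#     spark.read.parquet("./Data/keys").count()
```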