# Adaptive Query Execution

Adaptive query execution (AQE) re-optimizes the query plan while the query is running, using runtime statistics. This notebook looks at:
- Performance with adaptive query execution, based on:
    - Reducing Post Shuffle Partitions
    - Dynamically Switching Join Strategies
    - Optimizing Skew Joins

In [None]:
#Starting the session
print("Welcome to my EMR Notebook!")

In [None]:
#building spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark").getOrCreate()

In [None]:
#import a file as dataframe to perform analysis
spark_df = spark.read.options(inferSchema='True',header='True').csv("s3://myawsbucketbuckettt/report.csv")
spark_df

In [None]:
#let us now create a tempview to run sql queries on top of the dataframe 
spark_df.createOrReplaceTempView("industry_table")

# Checking performance with adaptive query execution off vs. on

To enable AQE, set the spark.sql.adaptive.enabled configuration property to 'true'. Let us compare query performance in Spark with AQE disabled and enabled.
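To put numbers on the comparison, a small timing helper can wrap any call. This is a minimal sketch: the helper name `time_call` is hypothetical and not part of Spark, and the built-in `sum` stands in for a Spark action so the cell runs on its own.

```python
import time

def time_call(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed

# Stand-in workload; in the notebook this would be a Spark action instead.
result, seconds = time_call(sum, range(1_000_000))
print(f"call finished in {seconds:.4f} s")
```

In this notebook the same helper could wrap an action such as temp_df.show, once with AQE off and once with it on. A single run is noisy, so treat the numbers as a rough indication only.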

In [None]:
#turn adaptive query execution to 'false' to analyse the difference
spark.conf.set("spark.sql.adaptive.enabled","false")

In [None]:
#running the sql query with adaptive query execution set to 'false'
temp_df = spark.sql("select * FROM industry_table ORDER BY Year DESC")

In [None]:
temp_df.show()

In [None]:
#turn adaptive query execution to 'true' to analyse the difference
spark.conf.set("spark.sql.adaptive.enabled","true")

In [None]:
#running the sql query with adaptive query execution set to 'true'; compare the execution steps, time, and number of partitions with the previous run
temp_df = spark.sql("select * FROM industry_table ORDER BY Year DESC")

In [None]:
temp_df.show()

# Property 1 : Reducing Post Shuffle Partitions

With Spark 3.0 and AQE, after every stage of the job Spark dynamically determines a suitable number of partitions by looking at the metrics of the completed stage, and coalesces small shuffle partitions into fewer, larger ones. Run the cells below and explain the difference.
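The coalescing step can be pictured with a small pure-Python model. This is a simplified sketch, not Spark's implementation: the function name `coalesce_partitions`, the partition sizes, and the greedy merge rule are assumptions made for illustration. The 64 MB target mirrors the documented default of spark.sql.adaptive.advisoryPartitionSizeInBytes.

```python
from typing import List

MB = 1024 * 1024

def coalesce_partitions(sizes_bytes: List[int], target_bytes: int) -> List[List[int]]:
    """Greedily group contiguous partition indices up to roughly target_bytes each.

    Simplified model for illustration only; Spark's real rule has more cases.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(sizes_bytes):
        # Close the current group if adding this partition would exceed the target.
        if current and current_size + size > target_bytes:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups

# Hypothetical post-shuffle partition sizes.
shuffle_sizes = [10 * MB, 20 * MB, 30 * MB, 70 * MB, 5 * MB, 5 * MB]
groups = coalesce_partitions(shuffle_sizes, target_bytes=64 * MB)
print(len(shuffle_sizes), "partitions coalesced into", len(groups), ":", groups)
```

Only neighbouring partitions are merged in this model, and a partition that is already larger than the target stays in a group of its own.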


In [None]:
#Setting adaptive.coalescePartitions to 'false'
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled","false")

In [None]:
#running a query and getting the number of partitions when adaptive.coalescePartitions is 'false' (adaptive query execution is still 'true')
temp_df = spark.sql("select Year,first(Industry_aggregation_NZSIOC) FROM industry_table group by Year")
print(temp_df.rdd.getNumPartitions())

In [None]:
#Setting adaptive.coalescePartitions to 'true'
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled","true")

In [None]:
#running the same query and getting the number of partitions when adaptive.coalescePartitions is 'true'
#Compare the number of partitions with the previous run and explain the DAG difference.

temp_df = spark.sql("select Year,first(Industry_aggregation_NZSIOC) FROM industry_table group by Year")
print(temp_df.rdd.getNumPartitions())

# Property 2 : Dynamically Switching Join Strategies

Spark supports several join strategies, and broadcast hash join is usually the most performant when one side of the join is small enough to fit in memory. With AQE, the size of each join input is known accurately at runtime, so Spark can replan the join strategy. We will import two tables and perform a join.
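The replanning decision can be sketched as a comparison of input sizes against the broadcast threshold. This is a simplified model, not Spark's planner: the function name `choose_join_strategy` and the sizes are hypothetical, and only broadcast hash join and sort-merge join are considered. The 10 MB value mirrors the documented default of spark.sql.autoBroadcastJoinThreshold, and a negative threshold disables broadcasting.

```python
MB = 1024 * 1024

def choose_join_strategy(left_bytes: int, right_bytes: int, broadcast_threshold_bytes: int) -> str:
    """Pick a join strategy from input sizes (illustrative model only)."""
    # A negative threshold means broadcast joins are turned off.
    if broadcast_threshold_bytes < 0:
        return "sort_merge_join"
    # Broadcast when the smaller side fits under the threshold.
    if min(left_bytes, right_bytes) <= broadcast_threshold_bytes:
        return "broadcast_hash_join"
    return "sort_merge_join"

threshold = 10 * MB

# Hypothetical sizes: the plan-time estimate is too large to broadcast,
# but the size measured at runtime (for example after a filter) is small.
planned = choose_join_strategy(2048 * MB, 200 * MB, threshold)
replanned = choose_join_strategy(2048 * MB, 8 * MB, threshold)
disabled = choose_join_strategy(2048 * MB, 8 * MB, -1)
print("plan-time:", planned, "| runtime:", replanned, "| threshold -1:", disabled)
```

The point of the model is that the same rule gives a different answer once the estimated size is replaced by the size observed at runtime.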

In [None]:
#importing ranking table as dataframe
ranking_df = spark.read.options(inferSchema='True',header='True').csv("s3://myawsbucketbuckettt/ranking.csv")
ranking_df

In [None]:
#importing events table as dataframe
events_df = spark.read.options(inferSchema='True',header='True').csv("s3://myawsbucketbuckettt/events.csv")
events_df

In [None]:
#let us now create a tempview to run sql queries on top of the ranking and events dataframe 
ranking_df.createOrReplaceTempView("ranking_table")
events_df.createOrReplaceTempView("events_table")

In [None]:
# enabling the adaptive query
spark.conf.set("spark.sql.adaptive.enabled","true")

In [None]:
#performing the joins with adaptive query handling broadcast joins
joined_df = spark.sql("select ranking_table.person_Id, ranking_table.event_Id,events_table.name from ranking_table JOIN events_table ON ranking_table.event_Id = events_table.event_Id")
joined_df.show()

In the Spark UI, one can see the DAG and how the broadcast join sends the smaller dataset to the executors to make the join efficient. Let us now turn off the broadcast join and analyze the join again.

In [None]:
#turning off adaptive query execution and broadcast join (a threshold of -1 disables broadcasting)
spark.conf.set("spark.sql.adaptive.enabled","false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold","-1")

In [None]:
#performing the joins
joined_df = spark.sql("select ranking_table.person_Id, ranking_table.event_Id,events_table.name from ranking_table JOIN events_table ON ranking_table.event_Id = events_table.event_Id")
joined_df.show()

# Property 3 : Optimizing Skew Join

Skew joins come into play when the data is unevenly distributed and one of the partitions takes much longer to process than the others. Let us now analyze the performance on the skewed dataset we imported above. Some event_Id values occur far more often than others in the rankings dataset.
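How a partition gets flagged as skewed can be sketched in plain Python. This is a simplified model, not Spark's code: the function name `find_skewed_partitions` and the sizes are hypothetical. The factor of 5 and the 256 MB threshold mirror the documented defaults of spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes.

```python
from statistics import median
from typing import List

MB = 1024 * 1024

def find_skewed_partitions(sizes_bytes: List[int], factor: float = 5,
                           threshold_bytes: int = 256 * MB) -> List[int]:
    """Return indices of partitions that are both much larger than the median
    and larger than an absolute threshold (illustrative model only)."""
    if not sizes_bytes:
        return []
    med = median(sizes_bytes)
    return [
        index
        for index, size in enumerate(sizes_bytes)
        if size > factor * med and size > threshold_bytes
    ]

# Hypothetical shuffle partition sizes with one hot key.
partition_sizes = [40 * MB, 50 * MB, 45 * MB, 900 * MB, 55 * MB]
skewed = find_skewed_partitions(partition_sizes)
print("skewed partition indices:", skewed)
```

With skew join optimization enabled, Spark splits partitions flagged this way into smaller pieces so that no single task dominates the stage.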

In [None]:
spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled","false")

In [None]:
#performing the join with skewJoin disabled.
opt_joined_df = spark.sql("select ranking_table.person_Id, ranking_table.event_Id,events_table.name from ranking_table JOIN events_table ON ranking_table.event_Id = events_table.event_Id")
opt_joined_df.show()

In [None]:
spark.conf.set("spark.sql.adaptive.enabled","true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled","true")

In [None]:
#performing the join with skewJoin enabled.
opt_joined_df = spark.sql("select ranking_table.person_Id, ranking_table.event_Id,events_table.name from ranking_table JOIN events_table ON ranking_table.event_Id = events_table.event_Id")
opt_joined_df.show()