In [0]:
from pyspark.sql.functions import *

**Turn off AQE**

In [0]:
spark.conf.set('spark.sql.adaptive.enabled', False)

In [0]:
#Cross validate if it is deisabled
spark.conf.get('spark.sql.adaptive.enabled')

'false'

**Create Smaple dataframes**

In [0]:
# Big dataframe
df_transactions = spark.createDataFrame([
    (1, 'US',100),
    (2, 'IN', 200),
    (3, 'UK', 150),
    (4, 'US', 50),
    (5, 'IN', 50)
], ['id','country_code','amount'])

# Small_table
df_country = spark.createDataFrame([
    ('US', 'United States'),
    ('IN', 'India'),
    ('UK', 'United Kingdom')
], ['country_code','country_name'])

In [0]:
df_transactions.display()
df_country.display()

id,country_code,amount
1,US,100
2,IN,200
3,UK,150
4,US,50
5,IN,50


country_code,country_name
US,United States
IN,India
UK,United Kingdom


**Applying join before optimization**

In [0]:
df_enriched = df_transactions.join(df_country, on='country_code', how='left')
df_enriched.display()

country_code,id,amount,country_name
US,1,100,United States
US,4,50,United States
IN,2,200,India
IN,5,50,India
UK,3,150,United Kingdom


![](/dbfs/FileStore/images/sortMergeJoin.png)

Even if both the dataframes are small Sort merge join is opted by default.

**Performing broadcast join**

In [0]:
df_join_opt = df_transactions.join(broadcast(df_country), on = 'country_code', how='left')
df_join_opt.display()

country_code,id,amount,country_name
US,1,100,United States
IN,2,200,India
UK,3,150,United Kingdom
US,4,50,United States
IN,5,50,India


**SQL hints**

In [0]:
df_transactions.createOrReplaceTempView('transactions')
df_country.createOrReplaceTempView('country')

In [0]:
df = spark.sql('''
               SELECT *
                FROM transactions t
                JOIN country c
                ON t.country_code = c.country_code
            ''')

In [0]:
df.display()

id,country_code,amount,country_code.1,country_name
1,US,100,US,United States
4,US,50,US,United States
2,IN,200,IN,India
5,IN,50,IN,India
3,UK,150,UK,United Kingdom


This again performed sort Merge join

SQL hint

In [0]:
df = spark.sql('''
               SELECT * /* BROADCAST(c) */
                FROM transactions t
                JOIN country c
                ON t.country_code = c.country_code
            ''')

In [0]:
display(df)

id,country_code,amount,country_code.1,country_name
1,US,100,US,United States
4,US,50,US,United States
2,IN,200,IN,India
5,IN,50,IN,India
3,UK,150,UK,United Kingdom
