In [0]:
# Switch off Spark's automatic optimizations for this session:
#   * Adaptive Query Execution (AQE) is disabled.
#   * A threshold of -1 disables automatic broadcast joins.
for _conf_key, _conf_value in (
    ("spark.sql.adaptive.enabled", "false"),
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),
):
    spark.conf.set(_conf_key, _conf_value)

In [0]:
# Read back the current AQE setting to confirm the session configuration.
spark.conf.get("spark.sql.adaptive.enabled")

In [0]:
# Turn Adaptive Query Execution back on and allow automatic broadcast joins
# again. Queries planned after this cell run with these settings.
spark.conf.set("spark.sql.adaptive.enabled", "true")
# The threshold is given in bytes: 10 * 1024 * 1024 = 10485760 (10 MB).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))

In [0]:
# Join Optimizations 

In [0]:
from pyspark import StorageLevel
from pyspark.sql import functions as F

In [0]:
# --- Employees -------------------------------------------------------------
# Note: emp_dept_id is stored as a string, whereas dept_id below is an integer.
# Department "50" has no counterpart in the department table.
empColumns = [
    "emp_id", "name", "superior_emp_id", "year_joined",
    "emp_dept_id", "gender", "salary",
]
emp = [
    (1, "Smith", -1, "2018", "10", "M", 3000),
    (2, "Rose", 1, "2010", "20", "M", 4000),
    (3, "Williams", 1, "2010", "10", "M", 1000),
    (4, "Jones", 2, "2005", "10", "F", 2000),
    (5, "Brown", 2, "2010", "40", "", -1),
    (6, "Brown", 2, "2010", "50", "", -1),
]
empDF = spark.createDataFrame(emp, empColumns)

# --- Departments -----------------------------------------------------------
# Department 30 (Sales) has no employees in the sample above.
deptColumns = ["dept_name", "dept_id"]
dept = [
    ("Finance", 10),
    ("Marketing", 20),
    ("Sales", 30),
    ("IT", 40),
]
deptDF = spark.createDataFrame(dept, deptColumns)

In [0]:
# Render the employee DataFrame as a table.
empDF.display()

In [0]:
# Render the department DataFrame as a table.
deptDF.display()

In [0]:
# Plain left join: every employee row is kept, and department columns are
# filled in where emp_dept_id matches dept_id. No join hint is given, so Spark
# picks the join strategy from the current session configuration.
result_df = empDF.join(
    deptDF,
    empDF["emp_dept_id"] == deptDF["dept_id"],
    "left",
)
display(result_df)

In [0]:
# Same left join, but the department side is explicitly marked with a
# broadcast hint via F.broadcast().
broadcasted_df = empDF.join(
    F.broadcast(deptDF),
    empDF["emp_dept_id"] == deptDF["dept_id"],
    "left",
)
display(broadcasted_df)

In [0]:
# Left join without any explicit hint. Whether Spark broadcasts the department
# side here is decided by spark.sql.autoBroadcastJoinThreshold.
auto_broadcast_df = empDF.join(
    deptDF,
    empDF["emp_dept_id"] == deptDF["dept_id"],
    "left",
)
display(auto_broadcast_df)

In [0]:
# cache | persist

In [0]:
# cache() only marks the DataFrame for caching (it returns the same DataFrame);
# the following first() runs an action so the cache starts being populated.
auto_broadcast_df.cache()
auto_broadcast_df.first()

In [0]:
# Downstream transformations on the cached DataFrame. Each one ends with an
# action (display) so it actually executes.
# The original cell called filter() and withColumn() without their required
# arguments, which raises TypeError; concrete examples are used instead.

# Keep only employees with a real (positive) salary; -1 marks a missing value
# in the sample data.
auto_broadcast_df.filter(F.col("salary") > 0).display()

# Head-count per department name (null for employees without a matching dept).
auto_broadcast_df.groupBy("dept_name").count().display()

# Derive a new column from an existing one.
auto_broadcast_df.withColumn("salary_with_bonus", F.col("salary") * 1.1).display()

In [0]:
# persist() expects a pyspark.StorageLevel object, not a string; passing
# "MEMORY_AND_DISK" raises a TypeError.
# The DataFrame was already cached earlier in the notebook, so drop that entry
# first. NOTE(review): as far as I know Spark keeps the existing storage level
# (logging a warning) when persist() is called on already-cached data — confirm.
auto_broadcast_df.unpersist()
auto_broadcast_df.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)