In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Large DataFrame (Fact Table)
large_data = [
    (1, "Alice", "HR"),
    (2, "Bob", "IT"),
    (3, "Charlie", "FIN"),
    (4, "David", "IT"),
    (5, "Eve", "HR"),
    (6, "Frank", "FIN"),
    (7, "George", "HR"),
    (8, "Henry", "FIN"),
    (9, "Isla", "IT"),
    (10, "Julia", "HR")
]
large_df = spark.createDataFrame(large_data, ["emp_id", "name", "dept_id"])

# Small DataFrame (Lookup Table)
small_data = [
    ("HR", "Human Resources"),
    ("IT", "Information Technology"),
    ("FIN", "Finance & Accounts")
]
small_df = spark.createDataFrame(small_data, ["dept_id", "dept_name"])


In [0]:
base = "/Volumes/exp/join/"

large_path = base + "j"
small_path = base + "v"

large_df.write.mode("overwrite").option("header", True).csv(large_path)
small_df.write.mode("overwrite").option("header", True).csv(small_path)

print("Stored at:", large_path, "and", small_path)


Stored at: /Volumes/exp/join/j and /Volumes/exp/join/v


In [0]:
large_input = spark.read.option("header", True).option("inferSchema", True).csv(large_path)
small_input = spark.read.option("header", True).option("inferSchema", True).csv(small_path)

large_input.show()
small_input.show()


+------+-------+-------+
|emp_id|   name|dept_id|
+------+-------+-------+
|     9|   Isla|     IT|
|    10|  Julia|     HR|
|     4|  David|     IT|
|     5|    Eve|     HR|
|     3|Charlie|    FIN|
|     6|  Frank|    FIN|
|     7| George|     HR|
|     8|  Henry|    FIN|
|     1|  Alice|     HR|
|     2|    Bob|     IT|
+------+-------+-------+

+-------+--------------------+
|dept_id|           dept_name|
+-------+--------------------+
|     IT|Information Techn...|
|    FIN|  Finance & Accounts|
|     HR|     Human Resources|
+-------+--------------------+



In [0]:
default_join = large_input.join(small_input, "dept_id")
default_join.show()
default_join.explain(True)


+-------+------+-------+--------------------+
|dept_id|emp_id|   name|           dept_name|
+-------+------+-------+--------------------+
|     IT|     9|   Isla|Information Techn...|
|     HR|    10|  Julia|     Human Resources|
|     IT|     4|  David|Information Techn...|
|     HR|     5|    Eve|     Human Resources|
|    FIN|     3|Charlie|  Finance & Accounts|
|    FIN|     6|  Frank|  Finance & Accounts|
|     HR|     7| George|     Human Resources|
|    FIN|     8|  Henry|  Finance & Accounts|
|     HR|     1|  Alice|     Human Resources|
|     IT|     2|    Bob|Information Techn...|
+-------+------+-------+--------------------+

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [dept_id])
:- Relation [emp_id#12235,name#12236,dept_id#12237] csv
+- Relation [dept_id#12271,dept_name#12272] csv

== Analyzed Logical Plan ==
dept_id: string, emp_id: int, name: string, dept_name: string
Project [dept_id#12237, emp_id#12235, name#12236, dept_name#12272]
+- Join Inner, (dept_id#12237 = d

In [0]:
from pyspark.sql.functions import broadcast

broadcast_join = large_input.join(broadcast(small_input), "dept_id")
broadcast_join.show()
broadcast_join.explain(True)


+-------+------+-------+--------------------+
|dept_id|emp_id|   name|           dept_name|
+-------+------+-------+--------------------+
|     IT|     9|   Isla|Information Techn...|
|     HR|    10|  Julia|     Human Resources|
|     IT|     4|  David|Information Techn...|
|     HR|     5|    Eve|     Human Resources|
|    FIN|     3|Charlie|  Finance & Accounts|
|    FIN|     6|  Frank|  Finance & Accounts|
|     HR|     7| George|     Human Resources|
|    FIN|     8|  Henry|  Finance & Accounts|
|     HR|     1|  Alice|     Human Resources|
|     IT|     2|    Bob|Information Techn...|
+-------+------+-------+--------------------+

== Parsed Logical Plan ==
'Join UsingJoin(Inner, [dept_id])
:- Relation [emp_id#12235,name#12236,dept_id#12237] csv
+- UnresolvedHint broadcast
   +- Relation [dept_id#12271,dept_name#12272] csv

== Analyzed Logical Plan ==
dept_id: string, emp_id: int, name: string, dept_name: string
Project [dept_id#12237, emp_id#12235, name#12236, dept_name#12272]
+-

In [0]:
import time

start = time.time()
default_join.collect()
end = time.time()
print("Default Join Time:", round(end - start, 5), "sec")

start = time.time()
broadcast_join.collect()
end = time.time()
print("Broadcast Join Time:", round(end - start, 5), "sec")


Default Join Time: 0.78814 sec
Broadcast Join Time: 0.456 sec
