In [1]:
import time
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TimingDemo").getOrCreate()

rdd = spark.sparkContext.parallelize(range(1, 10_000_000))

# Expensive operation: square and sum
def timed_run(label, rdd_action):
    start = time.time()
    result = rdd_action()
    duration = time.time() - start
    print(f"{label}: {duration:.3f} sec")
    return result, duration

# No cache
timed_run("No cache", lambda: rdd.map(lambda x: x * x).sum())

# Cache it and re-run
squared = rdd.map(lambda x: x * x).cache()
squared.count()  # trigger caching

timed_run("Cached run", lambda: squared.sum())

No cache: 2.275 sec
Cached run: 0.489 sec


(333333283333335000000, 0.489229679107666)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
import time

spark = SparkSession.builder.appName("DataFrameTiming").getOrCreate()

# Large dataset (simulated)
df_large = spark.range(0, 5_000_000).withColumnRenamed("id", "num")

# Small lookup table
df_small = spark.createDataFrame([(i, i * 10) for i in range(5_000)], ["num", "value"])

#Normal join (no broadcast)
start = time.time()
df_join = df_large.join(df_small, "num", "inner").count()
print(f"Normal join: {time.time() - start:.3f} sec")

# Broadcast join
start = time.time()
df_broadcast = df_large.join(broadcast(df_small), "num", "inner").count()
print(f"Broadcast join: {time.time() - start:.3f} sec")

Normal join: 4.653 sec
Broadcast join: 0.847 sec
