In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, broadcast
import time

# Initialize the Spark session.
# All tunables live in SPARK_CONF. `getOrCreate()` returns an existing session
# if one is already running (e.g. this cell is re-run, or the kernel started
# Spark beforehand); in that case settings fixed at SparkContext start-up
# (executor/driver memory, serializer) are not applied. The check at the end
# reads the effective values back and reports any that differ, instead of
# unconditionally claiming the configuration took effect.
APP_NAME = "Spark Optimization"
SPARK_CONF = {
    "spark.sql.shuffle.partitions": "200",  # same as Spark's documented default; explicit for tuning
    "spark.executor.memory": "8g",
    "spark.driver.memory": "4g",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.adaptive.enabled": "true",   # Adaptive Query Execution
}

builder = SparkSession.builder.appName(APP_NAME)
for conf_key, conf_value in SPARK_CONF.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()


def effective_conf(session, key):
    """Return the value `session` is actually using for `key`, or None if unset.

    `spark.sql.*` keys are read from the session's runtime conf. Other keys are
    read from the SparkContext's conf, because those are fixed when the
    SparkContext is created and setting them on a reused session has no effect.
    """
    if key.startswith("spark.sql."):
        return session.conf.get(key, None)
    return session.sparkContext.getConf().get(key, None)


effective = {key: effective_conf(spark, key) for key in SPARK_CONF}
not_applied = {key: value for key, value in effective.items() if value != SPARK_CONF[key]}
if not_applied:
    print(f"⚠️ Spark Session reused; these settings were not applied (effective values): {not_applied}")
else:
    print("✅ Spark Session Initialized with Optimized Configurations")

In [None]:
# Load the datasets. The row-level scan is deferred until an action runs.
LARGE_TABLE_PATH = "hdfs:///datasets/large_table.parquet"
SMALL_LOOKUP_PATH = "hdfs:///datasets/small_lookup.parquet"

df_large = spark.read.parquet(LARGE_TABLE_PATH)
df_small = spark.read.parquet(SMALL_LOOKUP_PATH)

# Broadcast join: the broadcast() hint ships df_small to every executor so the
# large table can be joined on `user_id` without being shuffled.
# The measured time covers the whole count() action, including the Parquet
# scans, since the data is only read when that action executes.
start_time = time.perf_counter()  # monotonic clock, suited to measuring intervals
df_joined = df_large.join(broadcast(df_small), "user_id")
joined_row_count = df_joined.count()
elapsed_seconds = time.perf_counter() - start_time
print(f"✅ Broadcast Join Execution Time: {elapsed_seconds:.2f} seconds ({joined_row_count:,} rows)")

In [None]:
# Compare two ways of changing the partition count of df_large.
REPARTITION_COUNT = 100  # target for repartition()
COALESCE_COUNT = 10      # upper bound for coalesce()

# repartition() performs a full shuffle into exactly REPARTITION_COUNT
# partitions; it can raise OR lower the count relative to df_large.
df_repartitioned = df_large.repartition(REPARTITION_COUNT)

# coalesce() merges existing partitions without a full shuffle, so it can only
# lower the count (it is unchanged if df_large already has <= COALESCE_COUNT
# partitions). The trade-off is fewer, potentially larger/uneven tasks.
df_coalesced = df_large.coalesce(COALESCE_COUNT)

print(f"✅ Repartitioned: {df_repartitioned.rdd.getNumPartitions()} partitions")
print(f"✅ Coalesced: {df_coalesced.rdd.getNumPartitions()} partitions")

In [None]:
# Cache df_large for reuse by later actions. cache() is lazy, so an action
# (count) is run to materialize it.
# NOTE(review): df_large was already scanned by the broadcast join above, so
# that join did not benefit from this cache; move this cell before the join if
# the join timing is meant to use cached data.
df_large.cache()
cached_row_count = df_large.count()  # action that populates the cache
# DataFrame.cache() uses Spark's default storage level, which can spill to
# disk, so report the actual level instead of claiming "in memory".
print(f"✅ Data Cached ({cached_row_count:,} rows, storage level: {df_large.storageLevel})")

In [None]:
# Inspect how Spark will execute df_large: extended mode prints the parsed,
# analyzed and optimized logical plans plus the physical plan.
df_large.explain(extended=True)

In [None]:
# Shut down the Spark session and its underlying SparkContext, releasing the
# resources it holds. Keep this as the final cell: anything using `spark`
# after this point needs a fresh session.
spark.stop()