# PySpark Complete Roadmap - Training Companion

A comprehensive guide covering all essential PySpark concepts with hands-on examples.

## 1. RDD vs DataFrame vs Dataset

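Understanding the core abstractions in Spark: the low-level RDD API, the schema-based DataFrame API, and the type-safe Dataset API (Scala/Java only).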

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Build (or reuse) the notebook's Spark session with Adaptive Query Execution
# and its partition coalescing switched on.
spark = (
    SparkSession.builder
    .appName("PySpark_Roadmap")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .getOrCreate()
)

# Low-level entry point used by the RDD examples.
sc = spark.sparkContext

In [None]:
# RDD: the low-level API. Distribute a local list and square each element.
rdd = sc.parallelize([1, 2, 3, 4, 5])
squared_rdd = rdd.map(lambda value: value ** 2)
print("RDD Result:", squared_rdd.collect())

# DataFrame: the structured API. Rows plus named columns.
columns = ["id", "name", "age"]
data = [
    (1, "Alice", 25),
    (2, "Bob", 30),
    (3, "Charlie", 35),
]
df = spark.createDataFrame(data, columns)
df.show()

# Dataset: the type-safe structured API exists only in Scala/Java, not in PySpark.

## 2. Core Transformations

In [None]:
from pyspark.sql.functions import col, when, lit, concat, upper, lower

# Employee sample rows reused by later sections; hire_date is None for one row.
employees = spark.createDataFrame(
    [
        (1, "John", "Doe", "IT", 75000, "2020-01-15"),
        (2, "Jane", "Smith", "HR", 65000, "2019-03-22"),
        (3, "Bob", "Johnson", "IT", 80000, "2021-07-01"),
        (4, "Alice", "Williams", "Finance", 70000, None),
        (5, "Charlie", "Brown", "IT", 72000, "2020-11-30"),
    ],
    ["id", "first_name", "last_name", "department", "salary", "hire_date"],
)

# select: project a subset of the columns.
print("Select columns:")
employees.select("first_name", "department", "salary").show()

# withColumn: derive a bonus of 15% of salary for IT and 10% for everyone else.
print("\nAdd calculated column:")
is_it_department = col("department") == "IT"
bonus_expr = when(is_it_department, col("salary") * 0.15).otherwise(col("salary") * 0.10)
employees_with_bonus = employees.withColumn("bonus", bonus_expr)
employees_with_bonus.show()

# Chaining: build a full name, upper-case the department, keep salaries above
# 70000 and project the three result columns.
print("\nChained transformations:")
result = (
    employees
    .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
    .withColumn("department_upper", upper(col("department")))
    .filter(col("salary") > 70000)
    .select("full_name", "department_upper", "salary")
)
result.show()

## 3. SQL Joins - All Types

In [None]:
# Department lookup table; "Marketing" has no matching row in employees.
departments = spark.createDataFrame(
    [
        ("IT", "Information Technology", "Building A"),
        ("HR", "Human Resources", "Building B"),
        ("Finance", "Financial Services", "Building C"),
        ("Marketing", "Marketing & Sales", "Building D"),
    ],
    ["dept_code", "dept_name", "location"],
)

# Every keyed join below matches employees.department to departments.dept_code.
dept_match = employees.department == departments.dept_code

# Inner join: only rows with a match on both sides.
print("Inner Join:")
inner_join = employees.join(departments, dept_match, "inner").select(
    "first_name", "last_name", "dept_name", "location"
)
inner_join.show()

# Left outer join: keep every employee row.
print("\nLeft Outer Join:")
left_join = employees.join(departments, dept_match, "left").select(
    "first_name", "department", "dept_name"
)
left_join.show()

# Right outer join: keep every department row.
print("\nRight Outer Join:")
right_join = employees.join(departments, dept_match, "right").select(
    "first_name", "dept_code", "dept_name"
)
right_join.show()

# Full outer join: keep unmatched rows from both sides.
print("\nFull Outer Join:")
full_join = employees.join(departments, dept_match, "outer").select(
    "first_name", "department", "dept_code", "dept_name"
)
full_join.show()

# Cross join: pair every row of one frame with every row of the other.
print("\nCross Join (Cartesian Product):")
small_df1 = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
small_df2 = spark.createDataFrame([("X",), ("Y",)], ["letter"])
cross_join = small_df1.crossJoin(small_df2)
cross_join.show()

## 4. Handling Missing Data - fillna, dropna, cast

In [None]:
from pyspark.sql.types import IntegerType, DoubleType, StringType
from datetime import datetime

# Sample rows with None scattered across name, age, salary and join_date.
null_data = spark.createDataFrame(
    [
        (1, "John", 25, 50000.0, "2021-01-15"),
        (2, None, 30, 60000.0, "2021-02-20"),
        (3, "Jane", None, 55000.0, None),
        (4, "Bob", 35, None, "2021-04-10"),
        (5, "Alice", 28, 52000.0, None),
    ],
    ["id", "name", "age", "salary", "join_date"],
)

print("Original DataFrame with nulls:")
null_data.show()

# dropna: discard rows containing nulls, anywhere or in chosen columns only.
print("\nDrop rows with any null:")
null_data.dropna().show()

print("\nDrop rows with null in specific columns:")
null_data.dropna(subset=["name", "age"]).show()

# fillna: replace nulls with a per-column default.
print("\nFill all nulls with defaults:")
fill_defaults = {
    "name": "Unknown",
    "age": 0,
    "salary": 50000.0,
    "join_date": "2021-01-01",
}
filled_df = null_data.fillna(fill_defaults)
filled_df.show()

# cast: add converted copies of age, salary and id under new column names.
print("\nType casting examples:")
casted_df = (
    null_data
    .withColumn("age_double", col("age").cast(DoubleType()))
    .withColumn("salary_int", col("salary").cast(IntegerType()))
    .withColumn("id_string", col("id").cast(StringType()))
)

casted_df.select("id", "id_string", "age", "age_double", "salary", "salary_int").show()
casted_df.printSchema()

## 5. UDF & Pandas UDF (Vectorized UDF)

In [None]:
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StringType, IntegerType, FloatType
import pandas as pd

# Basic UDF (Row-by-row processing)
def categorize_age(age):
    """Bucket an age into "Young", "Middle" or "Senior".

    Args:
        age: Numeric age, or None for a missing value.

    Returns:
        "Young" for ages below 25, "Middle" for ages from 25 up to (but not
        including) 35, and "Senior" otherwise. None when ``age`` is None.
    """
    # A SQL NULL reaches a Python UDF as None, and comparing None with a
    # number raises TypeError, so missing values are passed through.
    if age is None:
        return None
    if age < 25:
        return "Young"
    if age < 35:
        return "Middle"
    return "Senior"

# Wrap the plain Python function as a Spark UDF that returns a string.
categorize_age_udf = udf(categorize_age, returnType=StringType())

# Run the UDF over a stand-in "age" derived from salary.
print("Basic UDF Example:")
dummy_age = col("salary") / 2500  # Dummy age calculation
employees_categorized = employees.withColumn(
    "age_category", categorize_age_udf(dummy_age)
)
employees_categorized.select("first_name", "salary", "age_category").show()

# Pandas UDF (Vectorized - Much faster!)
@pandas_udf(returnType=StringType())
def categorize_salary_pandas(salary: pd.Series) -> pd.Series:
    """Label every salary in a batch as "High", "Medium" or "Low".

    Args:
        salary: One batch of salary values as a pandas Series.

    Returns:
        A Series with the same index holding "High" for values above 75000,
        "Medium" for values above 65000, and "Low" for everything else
        (including missing values, whose comparisons are False).
    """
    # Whole-column comparisons keep the work vectorized; Series.apply with a
    # lambda would fall back to a per-element Python loop.
    labels = pd.Series("Low", index=salary.index, dtype=object)
    labels[salary > 65000] = "Medium"
    labels[salary > 75000] = "High"
    return labels

# Run the pandas UDF over the salary column and show the resulting labels.
print("\nPandas UDF Example:")
salary_label = categorize_salary_pandas(col("salary"))
employees_salary_cat = employees.withColumn("salary_category", salary_label)
employees_salary_cat.select("first_name", "salary", "salary_category").show()

# Grouped map: run a pandas function once per department group.
# pandas_udf takes its evaluation type as a PandasUDFType constant (or through
# functionType=). A positional string such as "grouped_map" is not treated as
# a function type, so groupby().apply() would reject the resulting UDF.
# applyInPandas is the Spark 3.0+ API for this pattern: it takes a plain
# Python function plus the output schema.
def normalize_salaries(pdf):
    """Attach the group's mean salary to every row of one department.

    Args:
        pdf: pandas DataFrame holding all rows of a single group. It must
            contain the ``id``, ``first_name`` and ``salary`` columns.

    Returns:
        A new pandas DataFrame with columns ``id``, ``first_name``,
        ``salary`` (as float64) and ``avg_dept_salary``. The input frame is
        not modified.
    """
    out = pdf[['id', 'first_name', 'salary']].copy()
    # Match the double type declared for salary in the output schema.
    out['salary'] = out['salary'].astype('float64')
    out['avg_dept_salary'] = out['salary'].mean()
    return out

print("\nGrouped Map Pandas UDF:")
normalized = employees.groupby("department").applyInPandas(
    normalize_salaries,
    schema="id long, first_name string, salary double, avg_dept_salary double",
)
normalized.show()

## 6. Repartition vs Coalesce

In [None]:
# One million ids plus a derived column, used to observe partition counts.
large_data = spark.range(0, 1000000).withColumn("value", col("id") * 2)

initial_partitions = large_data.rdd.getNumPartitions()
print(f"Initial partitions: {initial_partitions}")

# repartition(n): full shuffle; the partition count can go up or down.
repartitioned = large_data.repartition(10)
repartitioned_count = repartitioned.rdd.getNumPartitions()
print(f"After repartition(10): {repartitioned_count}")

# repartition(column): distribute rows by the department column.
employees_repartitioned = employees.repartition("department")
department_partitions = employees_repartitioned.rdd.getNumPartitions()
print(f"\nEmployees partitioned by department: {department_partitions}")

# coalesce(n): no shuffle; can only reduce the partition count.
coalesced = repartitioned.coalesce(5)
coalesced_count = coalesced.rdd.getNumPartitions()
print(f"After coalesce(5): {coalesced_count}")

# Performance comparison
import time

# time.perf_counter() is the clock meant for measuring short intervals;
# time.time() follows the wall clock, which can be adjusted mid-measurement.
# NOTE: these are single-run timings, so treat them as illustrative only.

# Repartition timing
start = time.perf_counter()
large_data.repartition(20).count()
repartition_time = time.perf_counter() - start

# Coalesce timing
start = time.perf_counter()
large_data.coalesce(2).count()
coalesce_time = time.perf_counter() - start

print(f"\nRepartition time: {repartition_time:.2f}s")
print(f"Coalesce time: {coalesce_time:.2f}s")

# Only report a speed-up that was actually measured, and never divide by a
# zero-length interval.
if coalesce_time > 0 and repartition_time >= coalesce_time:
    print(f"Coalesce is {repartition_time/coalesce_time:.2f}x faster (no shuffle!)")
else:
    print("Coalesce was not measurably faster in this run")

## 7. Explain Plans & Optimization

In [None]:
# Inputs for the join comparison: a million-row table and a 100-row table.
large_table = spark.range(0, 1000000).withColumn("value", col("id") * 10)
small_table = spark.range(0, 100).withColumn("multiplier", col("id") * 2)

plan_separator = "\n" + "="*50 + "\n"
id_match = large_table.id == small_table.id

# Regular join: the planner picks the join strategy.
regular_join = large_table.join(small_table, id_match)

print("Regular Join Explain Plan:")
regular_join.explain(True)

print(plan_separator)

# Broadcast join: hint that the small side should be broadcast.
from pyspark.sql.functions import broadcast

broadcast_join = large_table.join(broadcast(small_table), id_match)

print("Broadcast Join Explain Plan:")
broadcast_join.explain(True)

# A query combining aggregation, join and sort, to inspect its plan.
avg_salary_by_department = employees.groupBy("department").agg({"salary": "avg"})
complex_query = (
    avg_salary_by_department
    .join(departments, col("department") == col("dept_code"))
    .orderBy("avg(salary)", ascending=False)
)

print(plan_separator)
print("Complex Query with Multiple Stages:")
complex_query.explain(True)

## 8. Broadcast Join Optimization

In [None]:
# Set the size threshold used for automatic broadcast joins.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

# Small lookup table: one bonus multiplier per department code.
lookup_rows = [
    ("IT", 1.15),
    ("HR", 1.10),
    ("Finance", 1.12),
    ("Marketing", 1.08),
]
lookup_table = spark.createDataFrame(lookup_rows, ["dept", "bonus_multiplier"])

# Both joins below match employees.department to lookup_table.dept.
lookup_match = employees.department == lookup_table.dept

# Method 1: no hint; the planner may broadcast if the table is small enough.
auto_broadcast = employees.join(lookup_table, lookup_match)

print("Automatic Broadcast Join:")
auto_broadcast.explain()

# Method 2: explicit broadcast hint, followed by a derived bonus column.
forced_broadcast = (
    employees
    .join(broadcast(lookup_table), lookup_match)
    .withColumn("bonus_amount", col("salary") * col("bonus_multiplier"))
)

print("\nForced Broadcast Join with Calculation:")
forced_broadcast.select("first_name", "salary", "bonus_multiplier", "bonus_amount").show()

# Partition counts of the two join results. getNumPartitions() reports how
# many partitions the result RDD has, not how many stages the job runs, so
# the labels say "partitions".
print("\nPerformance Metrics:")
print("Regular join partitions:", regular_join.rdd.getNumPartitions())
print("Broadcast join partitions:", broadcast_join.rdd.getNumPartitions())

## 9. File I/O - Parquet, ORC, Avro, JSON, CSV

In [None]:
import os
import shutil

# Start from an empty scratch directory: remove whatever an earlier run left
# at this path, then create it again.
output_dir = "./spark_output"
leftover_output = os.path.exists(output_dir)
if leftover_output:
    shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=False)

# Columns written out by every format below.
file_data = employees_with_bonus.select(
    "id", "first_name", "last_name", "department", "salary", "bonus"
)

# 1. Parquet (columnar, compressed, efficient): write, then read back.
parquet_path = f"{output_dir}/employees.parquet"
parquet_writer = file_data.write.mode("overwrite")
parquet_writer.parquet(parquet_path)
print("Parquet file written")

parquet_df = spark.read.parquet(parquet_path)
print("\nParquet Schema:")
parquet_df.printSchema()

# 2. ORC (Optimized Row Columnar)
orc_path = f"{output_dir}/employees.orc"
orc_writer = file_data.write.mode("overwrite")
orc_writer.orc(orc_path)
orc_df = spark.read.orc(orc_path)
print("\nORC file written and read")

# 3. JSON
json_path = f"{output_dir}/employees.json"
json_writer = file_data.write.mode("overwrite")
json_writer.json(json_path)
json_df = spark.read.json(json_path)
print("\nJSON file written and read")

# 4. CSV: header row and comma delimiter on write; header plus schema
#    inference on read.
csv_path = f"{output_dir}/employees.csv"
csv_writer = file_data.write.mode("overwrite").options(header="true", delimiter=",")
csv_writer.csv(csv_path)

csv_reader = spark.read.options(header="true", inferSchema="true")
csv_df = csv_reader.csv(csv_path)
print("\nCSV file written and read")

# 5. Avro (requires spark-avro package)
# avro_path = f"{output_dir}/employees.avro"
# file_data.write.mode("overwrite").format("avro").save(avro_path)

# Add up the on-disk size per format. Each output path is listed as a
# directory, and only entries whose name ends with the lower-cased format
# name are counted.
print("\nFile Size Comparison:")
written_outputs = [
    ("Parquet", parquet_path),
    ("ORC", orc_path),
    ("JSON", json_path),
    ("CSV", csv_path),
]
for format_name, path in written_outputs:
    suffix = format_name.lower()
    size = 0
    for entry in os.listdir(path):
        if entry.endswith(suffix):
            size += os.path.getsize(os.path.join(path, entry))
    print(f"{format_name}: {size:,} bytes")

## 10. JDBC Connector - Database Integration

In [None]:
# JDBC connection properties
# NOTE: placeholder credentials for the example only; do not hard-code real
# credentials in a notebook.
jdbc_url = "jdbc:postgresql://localhost:5432/testdb"
connection_properties = {
    "user": "username",
    "password": "password",
    "driver": "org.postgresql.Driver"
}

# Example read from database (commented out - requires actual DB)
"""
# Read entire table
df_from_db = spark.read \
    .jdbc(url=jdbc_url, 
          table="employees",
          properties=connection_properties)

# Read with custom query
query = "(SELECT * FROM employees WHERE salary > 50000) AS high_earners"
df_custom = spark.read \
    .jdbc(url=jdbc_url,
          table=query,
          properties=connection_properties)

# Parallel read with partitioning
df_parallel = spark.read \
    .jdbc(url=jdbc_url,
          table="large_table",
          column="id",
          lowerBound=1,
          upperBound=1000000,
          numPartitions=10,
          properties=connection_properties)

# Write to database
employees_with_bonus.write \
    .mode("append") \
    .jdbc(url=jdbc_url,
          table="employee_bonuses",
          properties=connection_properties)
"""

# Display copy of the properties with the password masked, so the secret
# never ends up in notebook output or logs.
redacted_properties = {
    key: ("****" if key == "password" else value)
    for key, value in connection_properties.items()
}

# Simulated JDBC operations example
print("JDBC Configuration Example:")
print(f"URL: {jdbc_url}")
print(f"Properties: {redacted_properties}")
print("\nFor parallel reads, use:")
print("- column: partition column (should be numeric)")
print("- lowerBound/upperBound: range of partition column")
print("- numPartitions: number of parallel connections")

## 11. Advanced Optimization Techniques

In [None]:
# Cache and persist
from pyspark import StorageLevel

# cache() marks the DataFrame for caching at the default storage level and
# returns the same DataFrame.
cached_df = employees_with_bonus.cache()
print("DataFrame cached in memory")

# Asking Spark to persist data that is already cached leaves the existing
# storage level in place, so release the cache before picking an explicit
# level. Otherwise the persist() below would demonstrate nothing.
cached_df.unpersist()

# Persist with an explicitly chosen storage level
persisted_df = employees_with_bonus.persist(StorageLevel.MEMORY_AND_DISK)
print("DataFrame persisted to memory and disk")

# Unpersist when done
persisted_df.unpersist()

# Adaptive Query Execution (AQE): switch on the feature itself, partition
# coalescing and skew-join handling.
for aqe_setting in (
    "spark.sql.adaptive.enabled",
    "spark.sql.adaptive.coalescePartitions.enabled",
    "spark.sql.adaptive.skewJoin.enabled",
):
    spark.conf.set(aqe_setting, "true")

# Skewed sample: 1000 rows under key "A" against 10 rows under key "B".
skewed_rows = [("A", i) for i in range(1000)] + [("B", i) for i in range(10)]
skewed_data = spark.createDataFrame(skewed_rows, ["key", "value"])

# Self-join on the skewed key; no action is run on it in this cell.
left_side = skewed_data.alias("left")
right_side = skewed_data.alias("right")
skewed_join = left_side.join(right_side, col("left.key") == col("right.key"))

print("\nAdaptive Query Execution will handle skew automatically")
aqe_enabled = spark.conf.get('spark.sql.adaptive.enabled')
print(f"Current AQE setting: {aqe_enabled}")

# Write the data partitioned by department under the scratch directory.
partitioned_path = f"{output_dir}/partitioned_employees"
partitioned_writer = employees_with_bonus.write.mode("overwrite").partitionBy("department")
partitioned_writer.parquet(partitioned_path)

# Read it back and filter on the partition column.
partitioned_employees = spark.read.parquet(partitioned_path)
it_employees = partitioned_employees.filter(col("department") == "IT")
print("\nPartition pruning will only read IT department files")

## 12. Window Functions

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, rank, dense_rank, lead, lag, sum as spark_sum

# Window specifications, both partitioned by department:
#   windowSpec      - ordered by salary, highest first.
#   windowSpecRange - ordered by salary, framed to salaries within 10000 of
#                     the current row's salary.
by_department = Window.partitionBy("department")
windowSpec = by_department.orderBy(col("salary").desc())
windowSpecRange = by_department.orderBy("salary").rangeBetween(-10000, 10000)

# Each window column is added in turn, in the order listed.
window_columns = [
    ("row_num", row_number().over(windowSpec)),
    ("rank", rank().over(windowSpec)),
    ("dense_rank", dense_rank().over(windowSpec)),
    ("next_salary", lead("salary", 1).over(windowSpec)),
    ("prev_salary", lag("salary", 1).over(windowSpec)),
    ("salary_range_sum", spark_sum("salary").over(windowSpecRange)),
]
windowed_df = employees
for window_column_name, window_column_expr in window_columns:
    windowed_df = windowed_df.withColumn(window_column_name, window_column_expr)

print("Window Functions Example:")
displayed_columns = [
    "first_name", "department", "salary",
    "row_num", "rank", "dense_rank",
    "prev_salary", "next_salary",
]
windowed_df.select(*displayed_columns).show()

# Running total: per department, ordered by id, framed from the first row of
# the partition up to the current row.
ordered_by_id = Window.partitionBy("department").orderBy("id")
runningTotalWindow = ordered_by_id.rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)

running_total_expr = spark_sum("salary").over(runningTotalWindow)
running_totals = employees.withColumn("running_total", running_total_expr)

print("\nRunning Totals:")
running_totals.select("first_name", "department", "salary", "running_total").show()

## Cleanup

In [None]:
# Shut the Spark session down first.
spark.stop()

# Then remove the scratch directory if it is still present.
output_dir_present = os.path.exists(output_dir)
if output_dir_present:
    shutil.rmtree(output_dir)
    print("Cleaned up output directory")