# Spark & Databricks - Practice Notebook


# In [None]:
# Complete hands-on examples for interview preparation

# =============================================================================
# SECTION 1: ENVIRONMENT SETUP
# =============================================================================

# Install required packages (run once)
# !pip install pyspark findspark delta-spark

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
import random

# Create Spark Session with Delta Lake support.
# configure_spark_with_delta_pip adds the Delta Lake jars that match the
# pip-installed `delta-spark` package; without it a local (pip) PySpark cannot
# load DeltaSparkSessionExtension / DeltaCatalog configured below.
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder
    .appName("Spark Practice")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.adaptive.enabled", "true")  # Adaptive Query Execution
    .master("local[*]")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

print(f"Spark version: {spark.version}")
# Spark moves the UI to 4041, 4042, ... when 4040 is already taken, so ask the
# running context for the real address instead of hard-coding it.
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

# In [None]:
# =============================================================================
# SECTION 2: SPARK FUNDAMENTALS
# =============================================================================

# Create sample data

data = [
    (1, "Alice", 28, "US", 50000),
    (2, "Bob", 35, "UK", 60000),
    (3, "Carol", 32, "US", 55000),
    (4, "David", 45, "DE", 70000),
    (5, "Eve", 29, "UK", 52000),
]

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False),
    StructField("country", StringType(), False),
    StructField("salary", IntegerType(), False),
])

df = spark.createDataFrame(data, schema)
df.show()

# Basic operations

print("=== Basic Operations ===")
df.select("name", "salary").show()
df.filter(col("age") > 30).show()
df.groupBy("country").agg(avg("salary").alias("avg_salary")).show()

# Check partitions

print(f"Number of partitions: {df.rdd.getNumPartitions()}")

# Repartition vs Coalesce:
# repartition(n) does a full shuffle and can raise or lower the partition count;
# coalesce(n) merges existing partitions without a shuffle and can only lower it.

df_repartitioned = df.repartition(10)
print(f"After repartition: {df_repartitioned.rdd.getNumPartitions()}")

df_coalesced = df_repartitioned.coalesce(2)
print(f"After coalesce: {df_coalesced.rdd.getNumPartitions()}")

# In [None]:
# =============================================================================
# SECTION 3: DELTA LAKE OPERATIONS
# =============================================================================

print("\n=== Delta Lake Operations ===")

# Create Delta table (on a fresh path this first write is version 0)

delta_path = "/tmp/delta/users"
df.write.format("delta").mode("overwrite").save(delta_path)

# Read Delta table

delta_df = spark.read.format("delta").load(delta_path)
delta_df.show()

# Time Travel - Version based.
# Appending the same 5 rows creates a new version holding 10 rows (every id
# twice); versionAsOf=0 still returns the original snapshot.

df.write.format("delta").mode("append").save(delta_path)
version_0 = spark.read.format("delta").option("versionAsOf", 0).load(delta_path)
print("Version 0 (original 5 rows):")
version_0.show()

# MERGE (Upsert)

updates = spark.createDataFrame([
    (1, "Alice", 29, "US", 52000),  # Update
    (6, "Frank", 40, "FR", 65000),  # Insert
], schema)

# Every target row matching a source row is updated, so both copies of id=1
# produced by the append above get the new values; id=6 is inserted.
deltaTable = DeltaTable.forPath(spark, delta_path)
(
    deltaTable.alias("target")
    .merge(updates.alias("source"), "target.id = source.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("After MERGE:")
spark.read.format("delta").load(delta_path).show()

# OPTIMIZE (compacts small files)

spark.sql(f"OPTIMIZE delta.`{delta_path}`")

# DESCRIBE HISTORY (one row per table version/operation)

spark.sql(f"DESCRIBE HISTORY delta.`{delta_path}`").show()


# In [None]:
# =============================================================================
# SECTION 4: DATA SKEW & SALTING
# =============================================================================

print("\n=== Data Skew & Salting ===")

# Create skewed dataset

# Bot user with 1000 events
skewed_data = [("bot", f"event_{i}", random.randint(1, 100)) for i in range(1000)]

# Normal users with 5 events each
skewed_data += [
    (user, f"event_{i}", random.randint(1, 100))
    for user in ["alice", "bob", "carol", "david"]
    for i in range(5)
]

skewed_df = spark.createDataFrame(skewed_data, ["user_id", "event", "value"])

# Check distribution

print("Key distribution (shows skew):")
skewed_df.groupBy("user_id").count().orderBy(desc("count")).show()

# Apply salting: spread the hot "bot" key over `salt_range` sub-keys
# (bot_0 .. bot_9). In a real join, the other side must be replicated once per
# salt value so every salted key still finds its match.

salt_range = 10
salt = floor(rand() * salt_range).cast("string")  # explicit cast for concat
salted_df = skewed_df.withColumn(
    "salted_key",
    when(col("user_id") == "bot", concat(col("user_id"), lit("_"), salt))
    .otherwise(col("user_id")),
)

print("After salting (bot split into 10 keys):")
salted_df.groupBy("salted_key").count().orderBy(desc("count")).show()


# In [None]:
# =============================================================================
# SECTION 5: BROADCAST JOIN
# =============================================================================

print("\n=== Broadcast Join ===")

# Large table

large_data = [(i, f"event_{i}", random.choice([1, 2, 3, 4, 5])) for i in range(10000)]
large_df = spark.createDataFrame(large_data, ["event_id", "event_name", "user_id"])

# Small table

small_data = [(1, "Alice"), (2, "Bob"), (3, "Carol"), (4, "David"), (5, "Eve")]
small_df = spark.createDataFrame(small_data, ["user_id", "user_name"])

# Both tables are far below spark.sql.autoBroadcastJoinThreshold, so Spark
# would broadcast small_df even without a hint and the two plans would look
# identical. Turn automatic broadcasting off while planning the "regular" join
# so it shows the shuffle (sort-merge) join, then restore the setting.
auto_broadcast_key = "spark.sql.autoBroadcastJoinThreshold"
previous_threshold = spark.conf.get(auto_broadcast_key)
spark.conf.set(auto_broadcast_key, "-1")
try:
    # Regular join (both sides shuffle)
    regular_join = large_df.join(small_df, "user_id")
    print("Check query plans to see difference:")
    print("\nRegular join plan:")
    regular_join.explain()
finally:
    spark.conf.set(auto_broadcast_key, previous_threshold)

# Broadcast join: small_df is shipped to every executor, so large_df is joined
# in place without being shuffled. The explicit hint ignores the threshold.
broadcast_join = large_df.join(broadcast(small_df), "user_id")
print("\nBroadcast join plan (BroadcastExchange on small table, no shuffle Exchange):")
broadcast_join.explain()

# In [None]:
# =============================================================================
# SECTION 6: Z-ORDER OPTIMIZATION
# =============================================================================

print("\n=== Z-ORDER Optimization ===")

# Create larger dataset for Z-ORDER demo

zorder_data = [
    (
        random.randint(1, 100),  # user_id
        random.choice(["2024-01-01", "2024-01-02", "2024-01-03"]),  # date
        random.choice(["US", "UK", "DE", "FR"]),  # country
        random.randint(1, 1000),  # amount
    )
    for _ in range(10000)
]

zorder_df = spark.createDataFrame(
    zorder_data,
    ["user_id", "date", "country", "amount"],
)

# Write as Delta

zorder_path = "/tmp/delta/events"
zorder_df.write.format("delta").mode("overwrite").save(zorder_path)

# Check file count before optimization

print("Before OPTIMIZE:")
spark.sql(f"DESCRIBE DETAIL delta.`{zorder_path}`").select("numFiles").show()

# OPTIMIZE with Z-ORDER: compacts files and co-locates rows with similar
# (user_id, date) values so per-file min/max stats can skip files on filters.

spark.sql(f"OPTIMIZE delta.`{zorder_path}` ZORDER BY (user_id, date)")

print("After OPTIMIZE + Z-ORDER:")
spark.sql(f"DESCRIBE DETAIL delta.`{zorder_path}`").select("numFiles").show()

# Query benefits from Z-ORDER (check in Spark UI)

result = (
    spark.read.format("delta").load(zorder_path)
    .filter((col("user_id") == 42) & (col("date") == "2024-01-02"))
    .count()
)
print(f"Query result (data skipping enabled): {result}")


# In [None]:

# =============================================================================
# SECTION 7: DATA QUALITY CHECKS
# =============================================================================

print("\n=== Data Quality Checks ===")

class DataQualityChecker:
    """Chainable data-quality assertions over a Spark DataFrame.

    Each ``expect_*`` method runs its check immediately (it triggers Spark
    ``count()`` actions), appends a result dict to ``self.results`` and
    returns ``self`` so checks can be chained fluently.
    ``raise_on_failure`` turns any failed check into a ``ValueError``.
    """

    def __init__(self, df):
        self.df = df
        self.results = []  # one dict per executed check, in call order

    def expect_column_values_not_null(self, column, threshold=1.0):
        """Expect at least ``threshold`` (fraction 0..1) of ``column`` to be non-null.

        ``threshold=1.0`` allows no nulls. An empty DataFrame has no nulls and
        therefore passes with ``null_percentage`` 0.0.
        """
        null_count = self.df.filter(col(column).isNull()).count()
        total = self.df.count()
        # Guard against ZeroDivisionError on an empty DataFrame.
        null_pct = (null_count / total) * 100 if total else 0.0
        passed = null_pct <= (100 - threshold * 100)

        self.results.append({
            'check': 'not_null',
            'column': column,
            'passed': passed,
            'null_percentage': null_pct,
        })
        return self

    def expect_column_values_in_set(self, column, valid_values):
        """Expect every non-null value of ``column`` to be in ``valid_values``.

        Null values are not counted as invalid: ``isin`` yields NULL for them
        and ``filter`` drops NULL. Pair with the not-null check if needed.
        """
        invalid_count = self.df.filter(~col(column).isin(valid_values)).count()
        passed = invalid_count == 0

        self.results.append({
            'check': 'in_set',
            'column': column,
            'passed': passed,
            'invalid_count': invalid_count,
        })
        return self

    def expect_column_values_between(self, column, min_val, max_val):
        """Expect every non-null value of ``column`` to lie in [min_val, max_val].

        Null values are not counted as out of range (the comparisons yield
        NULL and ``filter`` drops them).
        """
        out_of_range = self.df.filter(
            (col(column) < min_val) | (col(column) > max_val)
        ).count()
        passed = out_of_range == 0

        self.results.append({
            'check': 'between',
            'column': column,
            'passed': passed,
            'out_of_range_count': out_of_range,
        })
        return self

    def get_results(self):
        """Return the list of result dicts recorded so far."""
        return self.results

    def raise_on_failure(self):
        """Raise ``ValueError`` listing failed checks; otherwise return ``self``."""
        failed = [r for r in self.results if not r['passed']]
        if failed:
            raise ValueError(f"Quality checks failed: {failed}")
        return self

# Test data with quality issues

test_data = [
    (1, "Alice", 28, "US"),
    (2, "Bob", 35, "UK"),
    (3, None, 32, "US"),      # Null name
    (4, "David", 150, "XX"),  # Invalid age and country
    (5, "Eve", 29, "UK"),
]

test_df = spark.createDataFrame(
    test_data,
    ["id", "name", "age", "country"],
)

# Run quality checks (each expect_* call returns the checker, so they chain)

checker = DataQualityChecker(test_df)
(
    checker
    .expect_column_values_not_null("name", threshold=1.0)
    .expect_column_values_in_set("country", ["US", "UK", "DE", "FR"])
    .expect_column_values_between("age", 0, 120)
)

print("Quality check results:")
for result in checker.get_results():
    status = "✓ PASS" if result["passed"] else "✗ FAIL"
    print(f"{status}: {result['check']} on {result['column']}")

# Quarantine pattern

def validate_with_quarantine(df):
    """Split ``df`` into ``(valid_df, invalid_df)`` using the notebook's rules.

    A row is valid when ``name`` is not null, ``age`` is between 0 and 120
    (inclusive) and ``country`` is one of US/UK/DE/FR. Every other row goes to
    ``invalid_df`` with an added ``rejection_reason`` column, so each input
    row appears in exactly one output.
    """
    valid_condition = (
        col("name").isNotNull()
        & col("age").between(0, 120)
        & col("country").isin(["US", "UK", "DE", "FR"])
    )
    # A null age/country makes the condition NULL (not False), and filter()
    # drops NULL for both `cond` and `~cond`, so such rows would disappear from
    # both outputs. Treat NULL as invalid so they are quarantined instead.
    valid_condition = coalesce(valid_condition, lit(False))

    valid_df = df.filter(valid_condition)
    invalid_df = df.filter(~valid_condition).withColumn(
        "rejection_reason", lit("Quality check failed")
    )

    return valid_df, invalid_df

valid_data, invalid_data = validate_with_quarantine(test_df)

print("\nValid records:")
valid_data.show()

print("Quarantined records:")
invalid_data.show()

# In [None]:
# =============================================================================
# SECTION 8: PERFORMANCE DIAGNOSTICS
# =============================================================================

print("\n=== Performance Diagnostics ===")

# Create large dataset for performance testing

perf_data = [
    (
        random.randint(1, 1000),
        f"user_{random.randint(1, 10000)}",
        random.randint(1, 100),
    )
    for _ in range(100000)
]

perf_df = spark.createDataFrame(perf_data, ["id", "user", "value"])

# Check partition distribution (rows per partition; uneven counts = skew)

print("Partition distribution:")
perf_df.groupBy(spark_partition_id()).count().show()

# Analyze query plan. `sum` here is pyspark.sql.functions.sum, which the
# `from pyspark.sql.functions import *` at the top shadows over the builtin.

print("\nQuery plan (look for shuffles - 'Exchange'):")
perf_df.groupBy("user").agg(sum("value")).explain()

# Cache frequently accessed data. cache() is lazy; an action materializes it.

print("\nCaching demo:")
cached_df = perf_df.cache()
cached_df.count()  # Materialize cache
print("Data cached - subsequent operations will be faster")


# In [None]:
# =============================================================================
# SECTION 9: MEDALLION ARCHITECTURE EXAMPLE
# =============================================================================

print("\n=== Medallion Architecture ===")

bronze_path = "/tmp/delta/bronze_events"
silver_path = "/tmp/delta/silver_events"
gold_path = "/tmp/delta/gold_daily_metrics"

# Bronze: Raw data, kept exactly as received (every field is a string)

bronze_data = [
    {"user_id": "1", "event": "click", "timestamp": "2024-01-01T10:00:00", "value": "100"},
    {"user_id": "2", "event": "purchase", "timestamp": "2024-01-01T11:00:00", "value": "200"},
    {"user_id": None, "event": "click", "timestamp": None, "value": "50"},  # Bad data
    {"user_id": "3", "event": "view", "timestamp": "2024-01-01T12:00:00", "value": "invalid"},
]

# Explicit all-string schema: bronze stores raw values untouched instead of
# relying on PySpark's dict-based schema inference.
bronze_schema = StructType([
    StructField("user_id", StringType(), True),
    StructField("event", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("value", StringType(), True),
])

bronze_df = spark.createDataFrame(bronze_data, bronze_schema)
bronze_df.write.format("delta").mode("overwrite").save(bronze_path)
print("Bronze layer (raw data with issues):")
bronze_df.show()

# Silver: Cleaned data.
# try_cast yields NULL for unparseable strings such as "invalid" in both
# default and ANSI mode, whereas a plain cast raises an error under ANSI mode
# (the default since Spark 4.0). The NULLs are then filtered out.

silver_df = (
    spark.read.format("delta").load(bronze_path)
    .filter(col("user_id").isNotNull())
    .filter(col("timestamp").isNotNull())
    .withColumn("user_id", expr("try_cast(user_id AS INT)"))
    .withColumn("value", expr("try_cast(value AS INT)"))
    .filter(col("value").isNotNull())
    .withColumn("date", to_date(col("timestamp")))
)

silver_df.write.format("delta").mode("overwrite").save(silver_path)
print("Silver layer (cleaned and typed):")
silver_df.show()

# Gold: Business aggregations (count/sum are the Spark SQL functions)

gold_df = (
    spark.read.format("delta").load(silver_path)
    .groupBy("date", "event")
    .agg(
        count("*").alias("event_count"),
        sum("value").alias("total_value"),
    )
)

gold_df.write.format("delta").mode("overwrite").save(gold_path)
print("Gold layer (business metrics):")
gold_df.show()

# In [None]:
# =============================================================================
# SECTION 10: EXECUTOR CONFIGURATION
# =============================================================================

print("\n=== Executor Configuration ===")

# Check current configuration (only executor/driver related keys)

print("Current Spark configuration:")
conf = spark.sparkContext.getConf().getAll()
for key, value in conf:
    if "executor" in key.lower() or "driver" in key.lower():
        print(f"{key}: {value}")

# Example: Reconfigure (for demonstration)
# Executor/driver resources (memory, cores) are fixed once the SparkContext has
# started, so set those on the SparkSession builder. SQL settings such as
# spark.sql.shuffle.partitions can be changed at runtime through spark.conf.

spark.conf.set("spark.sql.shuffle.partitions", "100")  # Default is 200
print("\nShuffle partitions set to 100 (reduces overhead for small data)")


# In [None]:
# =============================================================================
# SECTION 11: STREAMING EXAMPLE (Conceptual)
# =============================================================================

print("\n=== Streaming Example (Conceptual) ===")

# Note: This is a batch simulation of streaming
# In production, use readStream with Kafka, Delta, etc.
# Raw string: in a normal string the trailing backslashes would join the
# example's lines instead of being printed.

print(r"""
Streaming pattern in production:

stream = spark.readStream \
    .format("delta") \
    .load("/bronze/events")

query = stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/stream") \
    .start("/silver/events")

Key concepts:

- readStream: Continuous data ingestion
- writeStream: Continuous output
- checkpointLocation: Fault tolerance
- outputMode: append/complete/update
""")

# In [None]:
# =============================================================================
# SECTION 12: CLEANUP
# =============================================================================

print("\n=== Cleanup ===")

# Uncomment to clean up Delta tables

# import shutil
# shutil.rmtree("/tmp/delta", ignore_errors=True)
# print("Delta tables cleaned up")

# Stop Spark session (uncomment when done; later cells need the session)

# spark.stop()
# print("Spark session stopped")

print("\n=== Practice Complete! ===")
print("Next steps:")
print("1. Review Spark UI at http://localhost:4040")
print("2. Experiment with different configurations")
print("3. Try on larger datasets")
print("4. Practice on Databricks Community Edition")
print("5. Review the README for interview questions")