In [0]:
spark.sql("USE CATALOG hive_metastore")
spark.sql("USE default")

print("="*70)
print("ENVIRONMENT SETUP")
print("="*70)
print(f"Current Catalog: {spark.sql('SELECT current_catalog()').collect()[0][0]}")
print(f"Current Database: {spark.sql('SELECT current_database()').collect()[0][0]}")
print(f"Spark version: {spark.version}")
print("="*70)

ENVIRONMENT SETUP
Current Catalog: hive_metastore
Current Database: default
Spark version: 4.0.0


In [0]:
# ============================================
# CELL 1: DS FEATURE 1 - DATA PARTITIONING
# ============================================

print("="*70)
print("DS FEATURE 1: DATA PARTITIONING")
print("="*70)

print("\nPartitioning Strategy:")
print("  ├─ Partition Key: year, month")
print("  ├─ Number of Partitions: 12-24")
print("  └─ Benefit: Query optimization via partition pruning")

print("\nDemonstration:")

# Show partition structure
partitions = spark.sql("SHOW PARTITIONS taxi_trips")
print(f"\nTotal Partitions: {partitions.count()}")
print("\nSample Partitions:")
partitions.show(10, truncate=False)

# Query with partition pruning
print("\nPerformance Comparison:")
print("─" * 50)

import time

# Query WITHOUT partition filter (slow)
print("\n[1] Query without partition filter (scans all partitions):")
start = time.time()
result1 = spark.sql("""
    SELECT COUNT(*) 
    FROM taxi_trips 
    WHERE trip_distance > 10
""").collect()[0][0]
time1 = time.time() - start
print(f"  Result: {result1:,} rows")
print(f"  Time: {time1:.2f}s")
print(f"  Scanned: ALL partitions")

# Query WITH partition filter (fast)
print("\n[2] Query WITH partition filter (scans 1 partition):")
start = time.time()
result2 = spark.sql("""
    SELECT COUNT(*) 
    FROM taxi_trips 
    WHERE year=2024 AND month=1 AND trip_distance > 10
""").collect()[0][0]
time2 = time.time() - start
print(f"  Result: {result2:,} rows")
print(f"  Time: {time2:.2f}s")
print(f"  Scanned: 1 partition only")

if time1 > 0 and time2 > 0:
    speedup = time1 / time2
    print(f"\nSpeedup with partition pruning: {speedup:.2f}x faster")
    print(f"I/O reduction: {((time1 - time2) / time1 * 100):.1f}%")

print("\n" + "="*70)
print("KEY INSIGHT:")
print("Partitioning enables the system to skip irrelevant data,")
print("reading only the necessary partitions for query execution.")
print("="*70)

DS FEATURE 1: DATA PARTITIONING

Partitioning Strategy:
  ├─ Partition Key: year, month
  ├─ Number of Partitions: 12-24
  └─ Benefit: Query optimization via partition pruning

Demonstration:

Total Partitions: 24

Sample Partitions:
+----+-----+
|year|month|
+----+-----+
|2024|7    |
|2024|12   |
|2002|12   |
|2025|5    |
|2024|3    |
|2025|6    |
|2024|5    |
|2009|1    |
|2008|12   |
|2024|10   |
+----+-----+
only showing top 10 rows

Performance Comparison:
──────────────────────────────────────────────────

[1] Query without partition filter (scans all partitions):
  Result: 4,499,505 rows
  Time: 2.41s
  Scanned: ALL partitions

[2] Query WITH partition filter (scans 1 partition):
  Result: 216,277 rows
  Time: 0.47s
  Scanned: 1 partition only

Speedup with partition pruning: 5.11x faster
I/O reduction: 80.4%

KEY INSIGHT:
Partitioning enables the system to skip irrelevant data,
reading only the necessary partitions for query execution.


In [0]:
# ============================================
# DS FEATURE 2 - ACID TRANSACTIONS
# ============================================

print("\n" + "="*70)
print("DS FEATURE 2: ACID TRANSACTIONS (Delta Lake)")
print("="*70)

print("\nACID Properties:")
print("  ├─ Atomicity: All-or-nothing writes")
print("  ├─ Consistency: Schema enforcement")
print("  ├─ Isolation: Concurrent readers and writers")
print("  └─ Durability: Transaction log persistence")

print("\n[Demonstration 1: Time Travel]")

# Show history
history = spark.sql("DESCRIBE HISTORY taxi_trips LIMIT 5")
print("\nTable History (Recent 5 versions):")
display(history.select("version", "timestamp", "operation", "operationMetrics"))

print("\n[Demonstration 2: Read from Previous Version]")
# Query version 0 (original load)
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/taxi-data/delta/taxi_trips_clean")
print(f"Version 0 row count: {df_v0.count():,}")

# Query latest version
df_latest = spark.table("taxi_trips")
print(f"Latest version row count: {df_latest.count():,}")

print("\n[Demonstration 3: Schema Enforcement]")
print("\nAttempting to insert data with wrong schema:")

try:
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    
    # Create data with incompatible schema
    bad_schema = StructType([
        StructField("wrong_column", StringType(), True),
        StructField("another_wrong", IntegerType(), True)
    ])
    
    bad_data = spark.createDataFrame([("test", 123)], schema=bad_schema)
    
    # Try to write (this will fail)
    bad_data.write.format("delta").mode("append").save("/mnt/taxi-data/delta/taxi_trips_clean")
    
    print("This should not print!")
    
except Exception as e:
    print(f"EXPECTED ERROR: Schema mismatch prevented")
    print(f"Delta Lake enforced schema consistency")
    print(f"Error type: {type(e).__name__}")

print("\n[Demonstration 4: Transaction Log]")
print("\nDelta Lake maintains a transaction log for all operations:")

# Show transaction log files
transaction_log = dbutils.fs.ls("/mnt/taxi-data/delta/taxi_trips_clean/_delta_log/")
print(f"\nTransaction log files: {len(transaction_log)}")
for log_file in transaction_log[:5]:
    print(f"  {log_file.name}")

print("\n" + "="*70)
print("KEY INSIGHT:")
print("ACID transactions ensure data reliability and consistency,")
print("enabling safe concurrent operations and time travel queries.")
print("="*70)


DS FEATURE 2: ACID TRANSACTIONS (Delta Lake)

ACID Properties:
  ├─ Atomicity: All-or-nothing writes
  ├─ Consistency: Schema enforcement
  ├─ Isolation: Concurrent readers and writers
  └─ Durability: Transaction log persistence

[Demonstration 1: Time Travel]

Table History (Recent 5 versions):


version,timestamp,operation,operationMetrics
8,2025-12-02T21:30:24Z,OPTIMIZE,"Map(numRemovedFiles -> 61, numRemovedBytes -> 1509425486, p25FileSize -> 76466832, numDeletionVectorsRemoved -> 0, minFileSize -> 4349, numAddedFiles -> 20, maxFileSize -> 93564600, p75FileSize -> 88865064, p50FileSize -> 85191063, numAddedBytes -> 1509390317)"
7,2025-12-02T21:29:38Z,WRITE,"Map(numFiles -> 65, numRemovedFiles -> 24, numRemovedBytes -> 1509406534, numDeletionVectorsRemoved -> 0, numOutputRows -> 53106919, numOutputBytes -> 1509441703)"
6,2025-12-01T04:02:05Z,OPTIMIZE,"Map(numRemovedFiles -> 61, numRemovedBytes -> 1509425486, p25FileSize -> 76466832, numDeletionVectorsRemoved -> 0, minFileSize -> 4349, numAddedFiles -> 20, maxFileSize -> 93564600, p75FileSize -> 88865064, p50FileSize -> 85191063, numAddedBytes -> 1509390317)"
5,2025-12-01T04:01:13Z,WRITE,"Map(numFiles -> 65, numRemovedFiles -> 65, numRemovedBytes -> 1509631590, numDeletionVectorsRemoved -> 0, numOutputRows -> 53106919, numOutputBytes -> 1509441703)"
4,2025-12-01T03:46:28Z,WRITE,"Map(numFiles -> 65, numRemovedFiles -> 24, numRemovedBytes -> 1509048741, numDeletionVectorsRemoved -> 0, numOutputRows -> 53106919, numOutputBytes -> 1509631590)"



[Demonstration 2: Read from Previous Version]
Version 0 row count: 53,106,919
Latest version row count: 53,106,919

[Demonstration 3: Schema Enforcement]

Attempting to insert data with wrong schema:
EXPECTED ERROR: Schema mismatch prevented
Delta Lake enforced schema consistency
Error type: AnalysisException

[Demonstration 4: Transaction Log]

Delta Lake maintains a transaction log for all operations:

Transaction log files: 21
  00000000000000000000.crc
  00000000000000000000.json
  00000000000000000001.00000000000000000006.compacted.json
  00000000000000000001.crc
  00000000000000000001.json

KEY INSIGHT:
ACID transactions ensure data reliability and consistency,
enabling safe concurrent operations and time travel queries.


In [0]:
# ============================================
# DS FEATURE 3 - DISTRIBUTED ML TRAINING
# ============================================

print("\n" + "="*70)
print("DS FEATURE 3: DISTRIBUTED ML TRAINING")
print("="*70)

print("\nRandom Forest: Data Parallelism Strategy")
print("─" * 50)

print("""
Distributed Training Architecture:

┌─────────────────────────────────────────────┐
│              DRIVER NODE                     │
│  - Coordinates training                      │
│  - Aggregates results from workers           │
│  - Manages model assembly                    │
└──────────────────┬──────────────────────────┘
                   │
        ┌──────────┼──────────┐
        ▼          ▼          ▼
   ┌────────┐ ┌────────┐ ┌────────┐
   │Worker 1│ │Worker 2│ │Worker 3│
   │        │ │        │ │        │
   │Trees   │ │Trees   │ │Trees   │
   │ 1-33   │ │34-66   │ │67-100  │
   │        │ │        │ │        │
   │Data    │ │Data    │ │Data    │
   │Shard 1 │ │Shard 2 │ │Shard 3 │
   └────────┘ └────────┘ └────────┘

Key Features:
1. Each worker trains a subset of trees independently
2. Data is partitioned across workers (data parallelism)
3. No communication needed between workers during training
4. Linear scalability: 2x workers ≈ 2x speedup
""")

print("\n[Demonstration: Training Metrics from MLFlow]")

import mlflow
username = spark.sql("SELECT current_user()").collect()[0][0]
experiment_name = f"/Users/{username}/nyc-taxi-prediction"

# Get Random Forest run
runs = mlflow.search_runs(
    experiment_ids=[mlflow.get_experiment_by_name(experiment_name).experiment_id],
    filter_string="tags.mlflow.runName = '02_random_forest_v1'",
    max_results=1
)

if len(runs) > 0:
    rf_run = runs.iloc[0]
    
    print(f"\nRandom Forest Training Details:")
    print(f"  Trees: 50")
    print(f"  Max Depth: 7")
    print(f"  Training Time: {rf_run['metrics.training_time_seconds']:.2f}s")
    print(f"  RMSE: {rf_run['metrics.rmse']:.2f} minutes")
    print(f"  Distributed: Yes (Spark MLlib)")
    
    # Calculate per-tree training time
    num_trees = 50
    training_time = rf_run['metrics.training_time_seconds']
    time_per_tree = training_time / num_trees
    
    print(f"\n  Time per tree: {time_per_tree:.2f}s")

print("\n" + "="*70)
print("KEY INSIGHT:")
print("Distributed ML training leverages multiple workers to train")
print("model components (trees) in parallel, dramatically reducing")
print("training time while maintaining model quality.")
print("="*70)


DS FEATURE 3: DISTRIBUTED ML TRAINING

Random Forest: Data Parallelism Strategy
──────────────────────────────────────────────────

Distributed Training Architecture:

┌─────────────────────────────────────────────┐
│              DRIVER NODE                     │
│  - Coordinates training                      │
│  - Aggregates results from workers           │
│  - Manages model assembly                    │
└──────────────────┬──────────────────────────┘
                   │
        ┌──────────┼──────────┐
        ▼          ▼          ▼
   ┌────────┐ ┌────────┐ ┌────────┐
   │Worker 1│ │Worker 2│ │Worker 3│
   │        │ │        │ │        │
   │Trees   │ │Trees   │ │Trees   │
   │ 1-33   │ │34-66   │ │67-100  │
   │        │ │        │ │        │
   │Data    │ │Data    │ │Data    │
   │Shard 1 │ │Shard 2 │ │Shard 3 │
   └────────┘ └────────┘ └────────┘

Key Features:
1. Each worker trains a subset of trees independently
2. Data is partitioned across workers (data parallelism)
3. N

In [0]:
# ============================================
# DS FEATURE 4 - FAULT TOLERANCE
# ============================================

from pyspark.sql.functions import col, avg, count

print("\n" + "="*70)
print("DS FEATURE 4: FAULT TOLERANCE (RDD Lineage)")
print("="*70)

print("\nSpark Fault Tolerance Mechanism:")
print("─" * 50)

print("""
RDD Lineage: Automatic Recovery from Failures

Example Pipeline:
─────────────────────────────────────────────────
Step 1: Read Parquet     → RDD_1
Step 2: Filter           → RDD_2 (lineage: from RDD_1)
Step 3: Transform        → RDD_3 (lineage: from RDD_2)
Step 4: Aggregate        → RDD_4 (lineage: from RDD_3)

If Worker fails at Step 3:
├─ Spark detects partition loss
├─ Traces lineage back: RDD_3 ← RDD_2 ← RDD_1
├─ Recomputes lost partition from source
└─ Continues processing automatically

User sees: No error, seamless execution
System handles: Automatic recovery
""")

print("\n[Demonstration: Query Plan with Lineage]")

# Create a complex query to show lineage
df = spark.table("taxi_trips") \
    .filter(col("trip_distance") > 5) \
    .groupBy("hour_of_day") \
    .agg(
        avg("trip_duration_minutes").alias("avg_duration"),
        count("*").alias("trip_count")
    )

print("\nQuery Execution Plan (shows lineage):")
print("─" * 50)
df.explain(extended=False)

print("\n[Checkpointing Strategy]")
print("""
For long-running jobs, checkpointing truncates lineage:

Without checkpoint:
  RDD_100 ← RDD_99 ← ... ← RDD_1 (very long chain)
  
With checkpoint:
  RDD_100 ← ... ← RDD_50 (checkpoint) [truncated]
  
Benefit: Faster recovery, reduced recomputation
""")

print("\n" + "="*70)
print("KEY INSIGHT:")
print("Spark's RDD lineage enables automatic fault recovery")
print("without user intervention, ensuring reliable processing")
print("even when worker nodes fail during execution.")
print("="*70)


DS FEATURE 4: FAULT TOLERANCE (RDD Lineage)

Spark Fault Tolerance Mechanism:
──────────────────────────────────────────────────

RDD Lineage: Automatic Recovery from Failures

Example Pipeline:
─────────────────────────────────────────────────
Step 1: Read Parquet     → RDD_1
Step 2: Filter           → RDD_2 (lineage: from RDD_1)
Step 3: Transform        → RDD_3 (lineage: from RDD_2)
Step 4: Aggregate        → RDD_4 (lineage: from RDD_3)

If Worker fails at Step 3:
├─ Spark detects partition loss
├─ Traces lineage back: RDD_3 ← RDD_2 ← RDD_1
├─ Recomputes lost partition from source
└─ Continues processing automatically

User sees: No error, seamless execution
System handles: Automatic recovery


[Demonstration: Query Plan with Lineage]

Query Execution Plan (shows lineage):
──────────────────────────────────────────────────
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonGroupingAgg(keys=[hour_of_da

In [0]:
# ============================================
# DS FEATURE 5 - RESOURCE MANAGEMENT
# ============================================

print("\n" + "="*70)
print("DS FEATURE 5: RESOURCE MANAGEMENT")
print("="*70)

print("\nDynamic Resource Allocation:")
print("─" * 50)

# Get current cluster configuration
print("\nCurrent Cluster Configuration:")

print(f"  Driver: 1 node (coordinator)")
print(f"  Workers: 2-4 nodes (configurable)")
print(f"  Cores per node: 2")
print(f"  Memory per node: 8 GB")

print("\nTotal Resources:")
total_workers = 4  # Assuming scaled to 4
total_cores = total_workers * 2
total_memory = total_workers * 8

print(f"  Total Cores: {total_cores}")
print(f"  Total Memory: {total_memory} GB")
print(f"  Parallelism: {total_cores} concurrent tasks")

print("\n[Demonstration: Spark Configuration]")

# Show key Spark configurations
spark_configs = {
    # Using simpler descriptive strings for safer fallback
    "spark.executor.memory": spark.conf.get("spark.executor.memory", "Dynamically Managed"),
    "spark.executor.cores": spark.conf.get("spark.executor.cores", "Dynamically Managed"),
    "spark.driver.memory": spark.conf.get("spark.driver.memory", "Dynamically Managed"),
    
    # MUST be a numeric string to avoid NumberFormatException
    "spark.sql.shuffle.partitions": spark.conf.get("spark.sql.shuffle.partitions", "200"), 
    
    "spark.default.parallelism": spark.conf.get("spark.default.parallelism", "Inherited")
}

print("\nKey Spark Configurations:")
for config, value in spark_configs.items():
    display_value = f"{value} (Common Default)" if config == "spark.sql.shuffle.partitions" and value == "200" else value
    print(f"  {config}: {display_value}")

print("\n[Task Distribution Example]")
print("""
For a 12M row dataset with 4 workers:

Partitioning:
├─ Total partitions: 200 (default)
├─ Rows per partition: ~60,000
├─ Partitions per worker: ~50
└─ Concurrent tasks: Up to 8 (4 workers × 2 cores)

Task Scheduling:
  Time 0s:   Tasks 1-16 start (all cores busy)
  Time 10s:  Tasks 1-16 complete, tasks 17-32 start
  ...
  Time 120s: Tasks 185-200 complete
  
Result: Optimal resource utilization across cluster
""")

print("\n[Auto-scaling Strategy]")
print("""
Configured Auto-scaling:
├─ Minimum Workers: 2
├─ Maximum Workers: 4
├─ Scale-up trigger: High task queue
├─ Scale-down trigger: 30 min idle
└─ Benefit: Cost optimization + performance

Example:
  09:00 - Light load   → 2 workers (save cost)
  10:00 - Heavy query  → Scale to 4 workers (performance)
  11:00 - Job complete → Scale to 2 workers (cost)
""")

print("\n" + "="*70)
print("KEY INSIGHT:")
print("Dynamic resource management ensures optimal cluster")
print("utilization, balancing performance needs with cost")
print("efficiency through intelligent scaling.")
print("="*70)


DS FEATURE 5: RESOURCE MANAGEMENT

Dynamic Resource Allocation:
──────────────────────────────────────────────────

Current Cluster Configuration:
  Driver: 1 node (coordinator)
  Workers: 2-4 nodes (configurable)
  Cores per node: 2
  Memory per node: 8 GB

Total Resources:
  Total Cores: 8
  Total Memory: 32 GB
  Parallelism: 8 concurrent tasks

[Demonstration: Spark Configuration]

Key Spark Configurations:
  spark.executor.memory: 629m
  spark.executor.cores: Dynamically Managed
  spark.driver.memory: Dynamically Managed
  spark.sql.shuffle.partitions: 200 (Common Default)
  spark.default.parallelism: Inherited

[Task Distribution Example]

For a 12M row dataset with 4 workers:

Partitioning:
├─ Total partitions: 200 (default)
├─ Rows per partition: ~60,000
├─ Partitions per worker: ~50
└─ Concurrent tasks: Up to 8 (4 workers × 2 cores)

Task Scheduling:
  Time 0s:   Tasks 1-16 start (all cores busy)
  Time 10s:  Tasks 1-16 complete, tasks 17-32 start
  ...
  Time 120s: Tasks 185-