# BDA Assignment ‚Äî Relational (TPC‚ÄëH, RDD‚Äëonly) + Streaming

> Author : Badr TAJINI - Big Data Analytics - ESIEE 2025-2026

**Chapter 7 :** Analyzing Relational Data (TPC‚ÄëH subset)  
**Chapter 8 :** Real‚ÄëTime Analytics (NYC Taxi)

**Tools :** Spark or PySpark.   
**Advice:** Keep evidence and reproducibility.

## 0. Bootstrap

In [1]:
# write some code here
# - create SparkSession('BDA-Assignment-Relational-Streaming') with UTC timezone
# - print Spark/PySpark/Python versions
# - set spark.sql.shuffle.partitions for local runs
# ============================================================
# Lab 4 Assignment - Bootstrap
# Big Data Analytics - ESIEE 2025-2026
# Author: Badr TAJINI
# ============================================================

print("=" * 60)
print("Lab 4 Assignment - Relational & Streaming Analytics")
print("Big Data Analytics - ESIEE 2025-2026")
print("=" * 60)

# ========== IMPORTS ==========

from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark
import sys
import platform
from datetime import datetime, timezone

print("\n" + "=" * 60)
print("1. System & Environment Information")
print("=" * 60)

# System info
print(f"\nüìä System Information:")
print(f"  Platform    : {platform.platform()}")
print(f"  Python      : {sys.version.split()[0]}")
print(f"  Python Path : {sys.executable}")

# ========== CREATE SPARK SESSION ==========

print("\n" + "=" * 60)
print("2. Creating Spark Session")
print("=" * 60)

# Spark configuration optimized for local runs
conf = SparkConf()
conf.set("spark.sql.session.timeZone", "UTC")  # ‚úÖ UTC timezone
conf.set("spark.sql.shuffle.partitions", "8")  # ‚úÖ Optimized for local (vs default 200)
conf.set("spark.driver.memory", "4g")
conf.set("spark.executor.memory", "4g")
conf.set("spark.default.parallelism", "8")
conf.set("spark.sql.adaptive.enabled", "true")  # Adaptive Query Execution
conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Create session
spark = SparkSession.builder \
    .appName("BDA-Assignment-Relational-Streaming") \
    .master("local[*]") \
    .config(conf=conf) \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("WARN")  # Reduce noise in logs

print("\n‚úÖ Spark Session Created Successfully!")

# ========== PRINT VERSIONS ==========

print("\n" + "=" * 60)
print("3. Spark & Library Versions")
print("=" * 60)

print(f"\nüì¶ Versions:")
print(f"  Spark Version   : {spark.version}")
print(f"  PySpark Version : {pyspark.__version__}")
print(f"  Python Version  : {platform.python_version()}")
print(f"  Scala Version   : {spark.sparkContext._jvm.scala.util.Properties.versionString()}")
print(f"  Java Version    : {spark.sparkContext._jvm.System.getProperty('java.version')}")

# ========== PRINT KEY CONFIGS ==========

print("\n" + "=" * 60)
print("4. Key Spark Configurations")
print("=" * 60)

key_configs = [
    "spark.sql.session.timeZone",
    "spark.sql.shuffle.partitions",
    "spark.driver.memory",
    "spark.executor.memory",
    "spark.default.parallelism",
    "spark.sql.adaptive.enabled",
    "spark.sql.adaptive.coalescePartitions.enabled"
]

print("\n‚öôÔ∏è Active Configurations:")
for config in key_configs:
    value = spark.conf.get(config, "Not Set")
    print(f"  {config:<50} : {value}")

# ========== PRINT SESSION INFO ==========

print("\n" + "=" * 60)
print("5. Session Information")
print("=" * 60)

print(f"\nüåê Session Details:")
print(f"  Application Name : {spark.sparkContext.appName}")
print(f"  Master           : {spark.sparkContext.master}")
print(f"  Spark UI         : http://localhost:4040")
print(f"  Default Parallelism : {spark.sparkContext.defaultParallelism}")

# ========== TIMESTAMP ==========

print("\n" + "=" * 60)
print("6. Execution Timestamp")
print("=" * 60)

utc_now = datetime.now(timezone.utc)
print(f"\n‚è∞ Session Started (UTC):")
print(f"  {utc_now.strftime('%Y-%m-%d %H:%M:%S UTC')}")
print(f"  ISO Format: {utc_now.isoformat()}")

# ========== VERIFICATION TEST ==========

print("\n" + "=" * 60)
print("7. Spark Verification Test")
print("=" * 60)

# Quick test to verify Spark is working
test_data = [(1, "Alice", 100), (2, "Bob", 200), (3, "Charlie", 150)]
test_df = spark.createDataFrame(test_data, ["id", "name", "value"])

print("\n‚úÖ Test DataFrame Created:")
test_df.show(5, truncate=False)

print("\nüìä DataFrame Schema:")
test_df.printSchema()

print("\nüî¢ Row Count:", test_df.count())

# Clean up test
del test_df

# ========== READY ==========

print("\n" + "=" * 60)
print("‚úÖ BOOTSTRAP COMPLETE - READY FOR LAB 4")
print("=" * 60)

print("\nüìã Next Steps:")
print("  1. Load datasets into data/")
print("  2. Follow Lab 4 assignment instructions")
print("  3. Part A: Relational Analytics (TPC-H style)")
print("  4. Part B: Streaming Analytics (micro-batch)")
print("  5. Capture Spark UI screenshots for each part")
print("  6. Save execution plans in proof/")

print("\nüéØ Spark UI Access: http://localhost:4040")
print("=" * 60)


Lab 4 Assignment - Relational & Streaming Analytics
Big Data Analytics - ESIEE 2025-2026

1. System & Environment Information

üìä System Information:
  Platform    : Linux-6.6.87.2-microsoft-standard-WSL2-x86_64-with-glibc2.39
  Python      : 3.10.19
  Python Path : /home/phams/miniconda3/envs/bda-env/bin/python

2. Creating Spark Session


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/07 08:53:25 WARN Utils: Your hostname, LAPTOP-ED8D06VN, resolves to a loopback address: 127.0.1.1; using 172.19.238.66 instead (on interface eth0)
25/12/07 08:53:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 08:53:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable



‚úÖ Spark Session Created Successfully!

3. Spark & Library Versions

üì¶ Versions:
  Spark Version   : 4.0.1
  PySpark Version : 4.0.1
  Python Version  : 3.10.19
  Scala Version   : version 2.13.16
  Java Version    : 21.0.6

4. Key Spark Configurations

‚öôÔ∏è Active Configurations:
  spark.sql.session.timeZone                         : UTC
  spark.sql.shuffle.partitions                       : 8
  spark.driver.memory                                : 4g
  spark.executor.memory                              : 4g
  spark.default.parallelism                          : 8
  spark.sql.adaptive.enabled                         : true
  spark.sql.adaptive.coalescePartitions.enabled      : true

5. Session Information

üåê Session Details:
  Application Name : BDA-Assignment-Relational-Streaming
  Master           : local[*]
  Spark UI         : http://localhost:4040
  Default Parallelism : 8

6. Execution Timestamp

‚è∞ Session Started (UTC):
  2025-12-07 07:53:35 UTC
  ISO Format: 2025-12

                                                                                

+---+-------+-----+
|id |name   |value|
+---+-------+-----+
|1  |Alice  |100  |
|2  |Bob    |200  |
|3  |Charlie|150  |
+---+-------+-----+


üìä DataFrame Schema:
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- value: long (nullable = true)



[Stage 3:>                                                          (0 + 8) / 8]


üî¢ Row Count: 3

‚úÖ BOOTSTRAP COMPLETE - READY FOR LAB 4

üìã Next Steps:
  1. Load datasets into data/
  2. Follow Lab 4 assignment instructions
  3. Part A: Relational Analytics (TPC-H style)
  4. Part B: Streaming Analytics (micro-batch)
  5. Capture Spark UI screenshots for each part
  6. Save execution plans in proof/

üéØ Spark UI Access: http://localhost:4040


                                                                                

## 1. Data Layout & Quick Checks

In [5]:
# write some code here
# - assert paths for:
#   data/tpch/TPC-H-0.1-TXT/  and  data/tpch/TPC-H-0.1-PARQUET/
#   data/taxi-data/
# - small sanity reads: count lines/files; print sample records
# ============================================================
# Section 1 - Dataset Extraction & Verification
# ============================================================

print("\n" + "=" * 60)
print("Section 1 ‚Äî Dataset Extraction & Verification")
print("=" * 60)

import os
import tarfile
from datetime import datetime, timezone

# ========== EXTRACT ARCHIVES IF NEEDED ==========

print("\nüì¶ Checking for compressed archives...")

# Define paths
DATA_DIR = "data"
TPCH_DIR = os.path.join(DATA_DIR, "tpch")
TAXI_DIR = os.path.join(DATA_DIR, "taxi-data")

# Create directories if needed
os.makedirs(TPCH_DIR, exist_ok=True)
os.makedirs(TAXI_DIR, exist_ok=True)

# Archives to extract
archives = {
    "TPC-H TXT": {
        "archive": os.path.join(DATA_DIR, "TPC-H-0.1-TXT.tar.gz"),
        "target": os.path.join(TPCH_DIR, "TPC-H-0.1-TXT"),
        "extract_to": TPCH_DIR
    },
    "TPC-H Parquet": {
        "archive": os.path.join(DATA_DIR, "TPC-H-0.1-PARQUET.tar.gz"),
        "target": os.path.join(TPCH_DIR, "TPC-H-0.1-PARQUET"),
        "extract_to": TPCH_DIR
    },
    "NYC Taxi": {
        "archive": os.path.join(DATA_DIR, "taxi-data.tar.gz"),
        "target": TAXI_DIR,
        "extract_to": DATA_DIR
    }
}

print("\nüîç Extraction Status:\n")

for name, paths in archives.items():
    archive_path = paths["archive"]
    target_path = paths["target"]
    extract_to = paths["extract_to"]
    
    # Check if already extracted
    if os.path.exists(target_path):
        print(f"  ‚úÖ {name:<20} : Already extracted at {target_path}")
    elif os.path.exists(archive_path):
        print(f"  üì¶ {name:<20} : Extracting {archive_path}...")
        
        try:
            with tarfile.open(archive_path, 'r:gz') as tar:
                tar.extractall(path=extract_to)
            print(f"     ‚úÖ Extracted to {target_path}")
        except Exception as e:
            print(f"     ‚ùå Extraction failed: {e}")
    else:
        print(f"  ‚ùå {name:<20} : Archive not found at {archive_path}")

# ========== PATH VERIFICATION ==========

print("\n" + "=" * 60)
print("Dataset Path Verification")
print("=" * 60)

# Expected paths after extraction
TPCH_TXT_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-TXT")
TPCH_PARQUET_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-PARQUET")
TAXI_DATA_PATH = TAXI_DIR

paths_to_check = {
    "TPC-H TXT": TPCH_TXT_PATH,
    "TPC-H Parquet": TPCH_PARQUET_PATH,
    "NYC Taxi": TAXI_DATA_PATH
}

all_paths_valid = True

print("\nüìÇ Verifying dataset paths:\n")

for name, path in paths_to_check.items():
    if os.path.exists(path):
        # Count files
        if os.path.isdir(path):
            num_files = len([f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))])
            print(f"  ‚úÖ {name:<20} : {path} ({num_files} files)")
        else:
            print(f"  ‚úÖ {name:<20} : {path}")
    else:
        print(f"  ‚ùå {name:<20} : {path} (MISSING)")
        all_paths_valid = False

if not all_paths_valid:
    raise FileNotFoundError(
        "‚ùå Some datasets are still missing! Check if the .tar.gz archives "
        "are in the data/ directory and try re-running this cell."
    )

print("\n‚úÖ All dataset paths verified successfully!")

# ========== SANITY CHECKS - PART A (TPC-H TXT) ==========

print("\n" + "=" * 60)
print("Part A ‚Äî TPC-H TXT Dataset Sanity Checks")
print("=" * 60)

TPCH_TABLES = [
    "lineitem.tbl",
    "orders.tbl",
    "part.tbl",
    "supplier.tbl",
    "partsupp.tbl",
    "customer.tbl",
    "nation.tbl",
    "region.tbl"
]

print(f"\nüìä TPC-H TXT Files:")
print(f"  Location: {TPCH_TXT_PATH}\n")

for table in TPCH_TABLES:
    txt_file = os.path.join(TPCH_TXT_PATH, table)
    
    if os.path.exists(txt_file):
        # Count lines
        with open(txt_file, 'r', encoding='utf-8') as f:
            num_lines = sum(1 for _ in f)
        
        # File size
        file_size_mb = os.path.getsize(txt_file) / (1024 * 1024)
        
        print(f"  ‚úÖ {table:<20} : {num_lines:>8,} lines | {file_size_mb:>6.2f} MB")
    else:
        print(f"  ‚ùå {table:<20} : MISSING")

# Sample from lineitem.tbl
print("\nüìã Sample Records from lineitem.tbl (first 3 lines):")
lineitem_path = os.path.join(TPCH_TXT_PATH, "lineitem.tbl")

if os.path.exists(lineitem_path):
    with open(lineitem_path, 'r', encoding='utf-8') as f:
        for i, line in enumerate(f):
            if i >= 3:
                break
            # Parse pipe-delimited
            clean_line = line.rstrip('|\n')
            fields = clean_line.split('|')
            
            print(f"\n  Line {i+1}:")
            print(f"    l_orderkey   : {fields[0]}")
            print(f"    l_partkey    : {fields[1]}")
            print(f"    l_suppkey    : {fields[2]}")
            print(f"    l_quantity   : {fields[4]}")
            print(f"    l_shipdate   : {fields[10]}")
            print(f"    (Total fields: {len(fields)})")

# ========== SANITY CHECKS - PART A (TPC-H PARQUET) ==========

print("\n" + "=" * 60)
print("Part A ‚Äî TPC-H Parquet Dataset Sanity Checks")
print("=" * 60)

PARQUET_TABLES = [
    "lineitem",
    "orders",
    "part",
    "supplier",
    "partsupp",
    "customer",
    "nation",
    "region"
]

print(f"\nüìä TPC-H Parquet Tables:")
print(f"  Location: {TPCH_PARQUET_PATH}\n")

for table in PARQUET_TABLES:
    parquet_dir = os.path.join(TPCH_PARQUET_PATH, table)
    
    if os.path.exists(parquet_dir) and os.path.isdir(parquet_dir):
        # Count Parquet files
        parquet_files = [f for f in os.listdir(parquet_dir) if f.endswith('.parquet')]
        num_files = len(parquet_files)
        
        # Total size
        total_size = sum(
            os.path.getsize(os.path.join(parquet_dir, f)) 
            for f in parquet_files
        )
        total_size_mb = total_size / (1024 * 1024)
        
        print(f"  ‚úÖ {table:<20} : {num_files:>3} files | {total_size_mb:>6.2f} MB")
    else:
        print(f"  ‚ùå {table:<20} : MISSING or not a directory")

# Quick Parquet read test
print("\nüîç Quick Parquet Read Test (lineitem):")

lineitem_parquet = os.path.join(TPCH_PARQUET_PATH, "lineitem")

if os.path.exists(lineitem_parquet):
    # Load with Spark
    lineitem_df = spark.read.parquet(lineitem_parquet)
    
    print(f"\n  üìä Schema:")
    lineitem_df.printSchema()
    
    print(f"\n  üìè Row Count: {lineitem_df.count():,}")
    
    print(f"\n  üìã Sample Records (first 3):")
    lineitem_df.select(
        "l_orderkey", 
        "l_partkey", 
        "l_suppkey", 
        "l_quantity", 
        "l_shipdate"
    ).show(3, truncate=False)
    
    # Convert to RDD (required for Part A)
    lineitem_rdd = lineitem_df.rdd
    print(f"\n  ‚úÖ Successfully converted to RDD: {lineitem_rdd.getNumPartitions()} partitions")

# ========== SANITY CHECKS - PART B (TAXI DATA) ==========

print("\n" + "=" * 60)
print("Part B ‚Äî NYC Taxi Dataset Sanity Checks")
print("=" * 60)

print(f"\nüìÇ Taxi Data Location: {TAXI_DATA_PATH}\n")

# List CSV files
if os.path.exists(TAXI_DATA_PATH):
    taxi_files = sorted([
        f for f in os.listdir(TAXI_DATA_PATH) 
        if f.endswith('.csv')
    ])
    
    num_taxi_files = len(taxi_files)
    
    print(f"  üìä Total CSV files: {num_taxi_files}")
    
    if num_taxi_files > 0:
        # Show first 5 and last 5
        print(f"\n  üìã First 5 files:")
        for f in taxi_files[:5]:
            file_path = os.path.join(TAXI_DATA_PATH, f)
            size_kb = os.path.getsize(file_path) / 1024
            print(f"    - {f:<40} ({size_kb:>8.2f} KB)")
        
        if num_taxi_files > 10:
            print(f"\n  ... ({num_taxi_files - 10} files omitted) ...")
        
        if num_taxi_files > 5:
            print(f"\n  üìã Last 5 files:")
            for f in taxi_files[-5:]:
                file_path = os.path.join(TAXI_DATA_PATH, f)
                size_kb = os.path.getsize(file_path) / 1024
                print(f"    - {f:<40} ({size_kb:>8.2f} KB)")
        
        # Sample first file
        first_file = os.path.join(TAXI_DATA_PATH, taxi_files[0])
        
        print(f"\n  üìã Sample from {taxi_files[0]}:")
        with open(first_file, 'r', encoding='utf-8') as f:
            # Header
            header = f.readline().strip()
            print(f"\n    Header: {header}")
            
            # First 3 records
            print(f"\n    First 3 records:")
            for i in range(3):
                line = f.readline().strip()
                if line:
                    print(f"      {i+1}. {line}")
        
        # Total size
        total_size = sum(
            os.path.getsize(os.path.join(TAXI_DATA_PATH, f)) 
            for f in taxi_files
        )
        total_size_mb = total_size / (1024 * 1024)
        
        print(f"\n  üíæ Total dataset size: {total_size_mb:.2f} MB")
    else:
        print("  ‚ùå No CSV files found!")
else:
    print("  ‚ùå Taxi data directory does not exist!")

# ========== SUMMARY ==========

print("\n" + "=" * 60)
print("Section 1 Summary")
print("=" * 60)

timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')

print(f"\n‚úÖ Dataset verification completed at {timestamp}")
print(f"\nüìä Summary:")
print(f"  Part A - TPC-H TXT     : {len(TPCH_TABLES)} tables")
print(f"  Part A - TPC-H Parquet : {len(PARQUET_TABLES)} tables")
print(f"  Part B - Taxi CSV      : {num_taxi_files if os.path.exists(TAXI_DATA_PATH) else 0} files")

print(f"\nüéØ Ready to proceed:")
print(f"  ‚úÖ Part A (A1-A7): Relational analytics with TPC-H")
print(f"  ‚úÖ Part B (B1-B3): Streaming analytics with NYC Taxi")

print("\n" + "=" * 60)


Section 1 ‚Äî Dataset Extraction & Verification

üì¶ Checking for compressed archives...

üîç Extraction Status:

  ‚úÖ TPC-H TXT            : Already extracted at data/tpch/TPC-H-0.1-TXT
  ‚úÖ TPC-H Parquet        : Already extracted at data/tpch/TPC-H-0.1-PARQUET
  ‚úÖ NYC Taxi             : Already extracted at data/taxi-data

Dataset Path Verification

üìÇ Verifying dataset paths:

  ‚úÖ TPC-H TXT            : data/tpch/TPC-H-0.1-TXT (16 files)
  ‚úÖ TPC-H Parquet        : data/tpch/TPC-H-0.1-PARQUET (0 files)
  ‚úÖ NYC Taxi             : data/taxi-data (0 files)

‚úÖ All dataset paths verified successfully!

Part A ‚Äî TPC-H TXT Dataset Sanity Checks

üìä TPC-H TXT Files:
  Location: data/tpch/TPC-H-0.1-TXT

  ‚úÖ lineitem.tbl         :  600,572 lines |  70.81 MB
  ‚úÖ orders.tbl           :  150,000 lines |  16.11 MB
  ‚úÖ part.tbl             :   20,000 lines |   2.28 MB
  ‚úÖ supplier.tbl         :    1,000 lines |   0.13 MB
  ‚úÖ partsupp.tbl         :   80,000 lines |  1

                                                                                


  üìä Schema:
root
 |-- l_orderkey: integer (nullable = true)
 |-- l_partkey: integer (nullable = true)
 |-- l_suppkey: integer (nullable = true)
 |-- l_linenumber: integer (nullable = true)
 |-- l_quantity: double (nullable = true)
 |-- l_extendedprice: double (nullable = true)
 |-- l_discount: double (nullable = true)
 |-- l_tax: double (nullable = true)
 |-- l_returnflag: string (nullable = true)
 |-- l_linestatus: string (nullable = true)
 |-- l_shipdate: string (nullable = true)
 |-- l_commitdate: string (nullable = true)
 |-- l_receiptdate: string (nullable = true)
 |-- l_shipinstruct: string (nullable = true)
 |-- l_shipmode: string (nullable = true)
 |-- l_comment: string (nullable = true)



                                                                                


  üìè Row Count: 600,572

  üìã Sample Records (first 3):


                                                                                

+----------+---------+---------+----------+----------+
|l_orderkey|l_partkey|l_suppkey|l_quantity|l_shipdate|
+----------+---------+---------+----------+----------+
|1         |15519    |785      |17.0      |1996-03-13|
|1         |6731     |732      |36.0      |1996-04-12|
|1         |6370     |371      |8.0       |1996-01-29|
+----------+---------+---------+----------+----------+
only showing top 3 rows

  ‚úÖ Successfully converted to RDD: 7 partitions

Part B ‚Äî NYC Taxi Dataset Sanity Checks

üìÇ Taxi Data Location: data/taxi-data

  üìä Total CSV files: 0
  ‚ùå No CSV files found!

Section 1 Summary

‚úÖ Dataset verification completed at 2025-12-06 15:54:54 UTC

üìä Summary:
  Part A - TPC-H TXT     : 8 tables
  Part A - TPC-H Parquet : 8 tables
  Part B - Taxi CSV      : 0 files

üéØ Ready to proceed:
  ‚úÖ Part A (A1-A7): Relational analytics with TPC-H
  ‚úÖ Part B (B1-B3): Streaming analytics with NYC Taxi



## 2. Parsers and Helpers

In [3]:
# write some code here
# - text parsers per TPC-H table (split('|') -> typed tuples)
# - parquet loaders using spark.read.parquet(...).rdd
# - broadcast helper for small dims (part, supplier, customer, nation)
# - utilities: save_tuples(path, iterator); month_trunc('YYYY-MM-DD')
# ============================================================
# Section 2 - TPC-H Helpers & Parsers (Part A)
# ============================================================

print("\n" + "=" * 60)
print("Section 2 ‚Äî TPC-H Helpers & Parsers")
print("=" * 60)

from datetime import datetime
from typing import Tuple, List, Iterator
import os

# ========== TEXT PARSERS (PIPE-DELIMITED) ==========

print("\nüìã Defining TPC-H Text Parsers...")

# Schema references:
# - lineitem: 16 fields (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, ...)
# - orders: 9 fields (o_orderkey, o_custkey, o_orderstatus, o_totalprice, ...)
# - part: 9 fields (p_partkey, p_name, p_mfgr, p_brand, p_type, ...)
# - supplier: 7 fields (s_suppkey, s_name, s_address, s_nationkey, ...)
# - customer: 8 fields (c_custkey, c_name, c_address, c_nationkey, ...)
# - nation: 4 fields (n_nationkey, n_name, n_regionkey, n_comment)
# - region: 3 fields (r_regionkey, r_name, r_comment)
# - partsupp: 5 fields (ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment)

def parse_lineitem(line: str) -> Tuple:
    """
    Parse lineitem.tbl (pipe-delimited)
    
    Schema (16 fields):
    0:  l_orderkey (int)
    1:  l_partkey (int)
    2:  l_suppkey (int)
    3:  l_linenumber (int)
    4:  l_quantity (float)
    5:  l_extendedprice (float)
    6:  l_discount (float)
    7:  l_tax (float)
    8:  l_returnflag (str)
    9:  l_linestatus (str)
    10: l_shipdate (str YYYY-MM-DD)
    11: l_commitdate (str)
    12: l_receiptdate (str)
    13: l_shipinstruct (str)
    14: l_shipmode (str)
    15: l_comment (str)
    
    Returns: (l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, 
              l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
              l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, 
              l_shipmode, l_comment)
    """
    fields = line.rstrip('|\n').split('|')
    
    return (
        int(fields[0]),      # l_orderkey
        int(fields[1]),      # l_partkey
        int(fields[2]),      # l_suppkey
        int(fields[3]),      # l_linenumber
        float(fields[4]),    # l_quantity
        float(fields[5]),    # l_extendedprice
        float(fields[6]),    # l_discount
        float(fields[7]),    # l_tax
        fields[8],           # l_returnflag
        fields[9],           # l_linestatus
        fields[10],          # l_shipdate (YYYY-MM-DD)
        fields[11],          # l_commitdate
        fields[12],          # l_receiptdate
        fields[13],          # l_shipinstruct
        fields[14],          # l_shipmode
        fields[15]           # l_comment
    )

def parse_orders(line: str) -> Tuple:
    """
    Parse orders.tbl
    
    Schema (9 fields):
    0: o_orderkey (int)
    1: o_custkey (int)
    2: o_orderstatus (str)
    3: o_totalprice (float)
    4: o_orderdate (str YYYY-MM-DD)
    5: o_orderpriority (str)
    6: o_clerk (str)
    7: o_shippriority (int)
    8: o_comment (str)
    """
    fields = line.rstrip('|\n').split('|')
    
    return (
        int(fields[0]),      # o_orderkey
        int(fields[1]),      # o_custkey
        fields[2],           # o_orderstatus
        float(fields[3]),    # o_totalprice
        fields[4],           # o_orderdate
        fields[5],           # o_orderpriority
        fields[6],           # o_clerk
        int(fields[7]),      # o_shippriority
        fields[8]            # o_comment
    )

def parse_part(line: str) -> Tuple:
    """
    Parse part.tbl
    
    Schema (9 fields):
    0: p_partkey (int)
    1: p_name (str)
    2: p_mfgr (str)
    3: p_brand (str)
    4: p_type (str)
    5: p_size (int)
    6: p_container (str)
    7: p_retailprice (float)
    8: p_comment (str)
    """
    fields = line.rstrip('|\n').split('|')
    
    return (
        int(fields[0]),      # p_partkey
        fields[1],           # p_name
        fields[2],           # p_mfgr
        fields[3],           # p_brand
        fields[4],           # p_type
        int(fields[5]),      # p_size
        fields[6],           # p_container
        float(fields[7]),    # p_retailprice
        fields[8]            # p_comment
    )

def parse_supplier(line: str) -> Tuple:
    """
    Parse supplier.tbl
    
    Schema (7 fields):
    0: s_suppkey (int)
    1: s_name (str)
    2: s_address (str)
    3: s_nationkey (int)
    4: s_phone (str)
    5: s_acctbal (float)
    6: s_comment (str)
    """
    fields = line.rstrip('|\n').split('|')
    
    return (
        int(fields[0]),      # s_suppkey
        fields[1],           # s_name
        fields[2],           # s_address
        int(fields[3]),      # s_nationkey
        fields[4],           # s_phone
        float(fields[5]),    # s_acctbal
        fields[6]            # s_comment
    )

def parse_customer(line: str) -> Tuple:
    """
    Parse customer.tbl
    
    Schema (8 fields):
    0: c_custkey (int)
    1: c_name (str)
    2: c_address (str)
    3: c_nationkey (int)
    4: c_phone (str)
    5: c_acctbal (float)
    6: c_mktsegment (str)
    7: c_comment (str)
    """
    fields = line.rstrip('|\n').split('|')
    
    return (
        int(fields[0]),      # c_custkey
        fields[1],           # c_name
        fields[2],           # c_address
        int(fields[3]),      # c_nationkey
        fields[4],           # c_phone
        float(fields[5]),    # c_acctbal
        fields[6],           # c_mktsegment
        fields[7]            # c_comment
    )

def parse_nation(line: str) -> Tuple:
    """
    Parse nation.tbl
    
    Schema (4 fields):
    0: n_nationkey (int)
    1: n_name (str)
    2: n_regionkey (int)
    3: n_comment (str)
    """
    fields = line.rstrip('|\n').split('|')
    
    return (
        int(fields[0]),      # n_nationkey
        fields[1],           # n_name
        int(fields[2]),      # n_regionkey
        fields[3]            # n_comment
    )

def parse_region(line: str) -> Tuple:
    """
    Parse region.tbl
    
    Schema (3 fields):
    0: r_regionkey (int)
    1: r_name (str)
    2: r_comment (str)
    """
    fields = line.rstrip('|\n').split('|')
    
    return (
        int(fields[0]),      # r_regionkey
        fields[1],           # r_name
        fields[2]            # r_comment
    )

def parse_partsupp(line: str) -> Tuple:
    """
    Parse partsupp.tbl
    
    Schema (5 fields):
    0: ps_partkey (int)
    1: ps_suppkey (int)
    2: ps_availqty (int)
    3: ps_supplycost (float)
    4: ps_comment (str)
    """
    fields = line.rstrip('|\n').split('|')
    
    return (
        int(fields[0]),      # ps_partkey
        int(fields[1]),      # ps_suppkey
        int(fields[2]),      # ps_availqty
        float(fields[3]),    # ps_supplycost
        fields[4]            # ps_comment
    )

print("  ‚úÖ Text parsers defined for 8 TPC-H tables")

# ========== PARQUET LOADERS ==========

print("\nüì¶ Defining Parquet Loaders...")

def load_parquet_as_rdd(spark, table_name: str, base_path: str):
    """
    Load Parquet table and convert to RDD
    
    Args:
        spark: SparkSession
        table_name: Name of table (e.g., "lineitem", "orders")
        base_path: Base path to PARQUET directory (e.g., data/tpch/TPC-H-0.1-PARQUET)
    
    Returns:
        RDD of Rows
    
    Note: Assignment allows loading Parquet with DataFrame reader, then .rdd
    """
    parquet_path = os.path.join(base_path, table_name)
    
    # Load Parquet as DataFrame
    df = spark.read.parquet(parquet_path)
    
    # Convert to RDD (allowed by assignment)
    rdd = df.rdd
    
    print(f"  ‚úÖ Loaded {table_name} from Parquet: {rdd.count():,} rows, {rdd.getNumPartitions()} partitions")
    
    return rdd

print("  ‚úÖ Parquet loader function defined")

# ========== BROADCAST HELPER ==========

print("\nüì° Defining Broadcast Helper...")

def broadcast_dimension(spark, rdd, key_extractor):
    """
    Broadcast small dimension table for hash joins
    
    Args:
        spark: SparkSession
        rdd: RDD of tuples
        key_extractor: Function to extract key from tuple
    
    Returns:
        Broadcast variable containing dict {key -> tuple}
    
    Example:
        # Broadcast part table keyed by p_partkey
        part_bc = broadcast_dimension(spark, part_rdd, lambda t: t[0])
        
        # Use in map: part_bc.value.get(partkey)
    """
    # Collect dimension into dict {key -> tuple}
    dim_dict = rdd.map(lambda t: (key_extractor(t), t)).collectAsMap()
    
    # Broadcast
    bc_var = spark.sparkContext.broadcast(dim_dict)
    
    print(f"  ‚úÖ Broadcasted dimension: {len(dim_dict):,} entries")
    
    return bc_var

print("  ‚úÖ Broadcast helper defined")

# ========== UTILITIES ==========

print("\nüîß Defining Utility Functions...")

def save_tuples(output_path: str, tuples_iter: Iterator, header: str = None):
    """
    Save iterator of tuples to file (CSV-like format)
    
    Args:
        output_path: Path to output file
        tuples_iter: Iterator of tuples
        header: Optional CSV header line
    
    Example:
        save_tuples("outputs/a1_result.txt", 
                    [(1, 'Alice'), (2, 'Bob')],
                    header="id,name")
    """
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    
    with open(output_path, 'w', encoding='utf-8') as f:
        if header:
            f.write(header + '\n')
        
        for tup in tuples_iter:
            # Join tuple elements with comma
            line = ','.join(str(x) for x in tup)
            f.write(line + '\n')
    
    print(f"  ‚úÖ Saved tuples to {output_path}")

def month_trunc(date_str: str) -> str:
    """
    Truncate date to YYYY-MM format
    
    Args:
        date_str: Date in YYYY-MM-DD format
    
    Returns:
        YYYY-MM string
    
    Example:
        >>> month_trunc('1996-01-15')
        '1996-01'
    """
    return date_str[:7]  # Extract first 7 chars (YYYY-MM)

def format_answer(count: int) -> str:
    """
    Format count as ANSWER=<count> for A1 output
    
    Args:
        count: Integer count
    
    Returns:
        Formatted string
    
    Example:
        >>> format_answer(1234)
        'ANSWER=1234'
    """
    return f"ANSWER={count}"

print("  ‚úÖ Utility functions defined:")
print("     - save_tuples(path, iter, header)")
print("     - month_trunc(date_str) -> YYYY-MM")
print("     - format_answer(count) -> ANSWER=<count>")

# ========== VERIFICATION TESTS ==========

print("\n" + "=" * 60)
print("Section 2 ‚Äî Parser Verification Tests")
print("=" * 60)

# Test text parsers with sample data
print("\nüß™ Testing Text Parsers:\n")

# Sample lineitem line
sample_lineitem = "1|15519|785|1|17.00|24710.35|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELIVER IN PERSON|TRUCK|egular courts above the|"
parsed_lineitem = parse_lineitem(sample_lineitem)

print("  Lineitem Parser:")
print(f"    Input  : {sample_lineitem[:80]}...")
print(f"    Output : l_orderkey={parsed_lineitem[0]}, l_partkey={parsed_lineitem[1]}, "
      f"l_shipdate={parsed_lineitem[10]}")
print(f"    Fields : {len(parsed_lineitem)} (expected 16)")

# Sample orders line
sample_orders = "1|370|O|172799.49|1996-01-02|5-LOW|Clerk#000000951|0|nstructions sleep furiously among |"
parsed_orders = parse_orders(sample_orders)

print("\n  Orders Parser:")
print(f"    Input  : {sample_orders[:80]}...")
print(f"    Output : o_orderkey={parsed_orders[0]}, o_clerk={parsed_orders[6]}, "
      f"o_orderdate={parsed_orders[4]}")
print(f"    Fields : {len(parsed_orders)} (expected 9)")

# Sample nation line
sample_nation = "0|ALGERIA|0| haggle. carefully final deposits detect slyly agai|"
parsed_nation = parse_nation(sample_nation)

print("\n  Nation Parser:")
print(f"    Input  : {sample_nation}")
print(f"    Output : n_nationkey={parsed_nation[0]}, n_name={parsed_nation[1]}")
print(f"    Fields : {len(parsed_nation)} (expected 4)")

# Test utility functions
print("\nüß™ Testing Utility Functions:\n")

print("  month_trunc():")
print(f"    '1996-01-15' ‚Üí '{month_trunc('1996-01-15')}'")
print(f"    '1995-12-31' ‚Üí '{month_trunc('1995-12-31')}'")

print("\n  format_answer():")
print(f"    1234 ‚Üí '{format_answer(1234)}'")
print(f"    0    ‚Üí '{format_answer(0)}'")

# ========== SUMMARY ==========

print("\n" + "=" * 60)
print("Section 2 Summary")
print("=" * 60)

print("\n‚úÖ TPC-H Helpers & Parsers Ready:")
print("\n  üìã Text Parsers (8 tables):")
print("     - parse_lineitem(line) ‚Üí 16-field tuple")
print("     - parse_orders(line) ‚Üí 9-field tuple")
print("     - parse_part(line) ‚Üí 9-field tuple")
print("     - parse_supplier(line) ‚Üí 7-field tuple")
print("     - parse_customer(line) ‚Üí 8-field tuple")
print("     - parse_nation(line) ‚Üí 4-field tuple")
print("     - parse_region(line) ‚Üí 3-field tuple")
print("     - parse_partsupp(line) ‚Üí 5-field tuple")

print("\n  üì¶ Parquet Loaders:")
print("     - load_parquet_as_rdd(spark, table, path)")

print("\n  üì° Broadcast Helper:")
print("     - broadcast_dimension(spark, rdd, key_extractor)")

print("\n  üîß Utilities:")
print("     - save_tuples(path, iterator, header)")
print("     - month_trunc(date_str) ‚Üí YYYY-MM")
print("     - format_answer(count) ‚Üí ANSWER=<count>")

print("\nüéØ Next Steps:")
print("  - Load TPC-H tables (TXT or Parquet)")
print("  - Implement A1-A7 queries using RDD operations only")
print("  - Compare TXT vs Parquet performance")

print("\n" + "=" * 60)



Section 2 ‚Äî TPC-H Helpers & Parsers

üìã Defining TPC-H Text Parsers...
  ‚úÖ Text parsers defined for 8 TPC-H tables

üì¶ Defining Parquet Loaders...
  ‚úÖ Parquet loader function defined

üì° Defining Broadcast Helper...
  ‚úÖ Broadcast helper defined

üîß Defining Utility Functions...
  ‚úÖ Utility functions defined:
     - save_tuples(path, iter, header)
     - month_trunc(date_str) -> YYYY-MM
     - format_answer(count) -> ANSWER=<count>

Section 2 ‚Äî Parser Verification Tests

üß™ Testing Text Parsers:

  Lineitem Parser:
    Input  : 1|15519|785|1|17.00|24710.35|0.04|0.02|N|O|1996-03-13|1996-02-12|1996-03-22|DELI...
    Output : l_orderkey=1, l_partkey=15519, l_shipdate=1996-03-13
    Fields : 16 (expected 16)

  Orders Parser:
    Input  : 1|370|O|172799.49|1996-01-02|5-LOW|Clerk#000000951|0|nstructions sleep furiously...
    Output : o_orderkey=1, o_clerk=Clerk#000000951, o_orderdate=1996-01-02
    Fields : 9 (expected 9)

  Nation Parser:
    Input  : 0|ALGERIA|0| ha

## Part A ‚Äî Relational (RDD‚Äëonly)

### A1 ‚Äî Q1: shipped items on DATE (print ANSWER=\d+)

In [5]:
# write some code here
# args: --input, --date, --text/--parquet
# pipeline (text): read lineitem -> filter by l_shipdate -> count -> print('ANSWER=', n)
# parquet path variant: spark.read.parquet(...).rdd
# ============================================================
# Section 3 - Part A: Task A1 - Shipped Items Count on Date
# ============================================================

print("\n" + "=" * 60)
print("Section 3 ‚Äî Part A: Task A1 - Shipped Items Count")
print("=" * 60)

import argparse
import time
from datetime import datetime, timezone

# ========== A1 IMPLEMENTATION ==========

def run_a1_shipped_items_count(input_path, target_date, use_parquet=False):
    """
    A1 (Q1) ‚Äî Count(*) on lineitem for a given l_shipdate = YYYY-MM-DD
    
    Args:
        input_path: Base path to TPC-H data (TXT or Parquet)
        target_date: Ship date to filter (YYYY-MM-DD format)
        use_parquet: True to use Parquet, False for TXT
    
    Returns:
        tuple: (count, duration_seconds, metrics_dict)
    
    Output: Print ANSWER=<count>
    """
    
    print(f"\n{'='*60}")
    print(f"Task A1 ‚Äî Shipped Items Count on {target_date}")
    print(f"Format: {'Parquet' if use_parquet else 'TXT'}")
    print(f"{'='*60}\n")
    
    start_time = time.time()
    
    # ========== LOAD DATA ==========
    
    if use_parquet:
        # ‚úÖ PERMIS : Load Parquet with DataFrame, then .rdd
        lineitem_path = os.path.join(input_path, "lineitem")
        
        print(f"üìÇ Loading Parquet from: {lineitem_path}")
        
        # Load Parquet as DataFrame
        lineitem_df = spark.read.parquet(lineitem_path)
        
        print(f"  ‚úÖ Loaded Parquet: {lineitem_df.count():,} rows")
        
        # Save explain plan (before conversion to RDD)
        plan_path = f"proof/plan_a1_parquet_load_{target_date.replace('-', '')}.txt"
        os.makedirs("proof", exist_ok=True)
        
        with open(plan_path, 'w') as f:
            f.write(f"=== A1 Parquet Load Plan ({target_date}) ===\n\n")
            f.write(lineitem_df._jdf.queryExecution().explainString(
                spark._jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")
            ))
        
        print(f"  üìÑ Saved plan: {plan_path}")
        
        # Convert to RDD (PERMIS par assignment)
        lineitem_rdd = lineitem_df.rdd
        
        print(f"  ‚úÖ Converted to RDD: {lineitem_rdd.getNumPartitions()} partitions\n")
        
    else:
        # ‚úÖ TEXT: Load TXT with RDD, parse manually
        lineitem_path = os.path.join(input_path, "lineitem.tbl")
        
        print(f"üìÇ Loading TXT from: {lineitem_path}")
        
        # Load text file as RDD
        lineitem_rdd = sc.textFile(lineitem_path)
        
        print(f"  ‚úÖ Loaded TXT: {lineitem_rdd.count():,} lines")
        
        # Parse pipe-delimited lines
        lineitem_rdd = lineitem_rdd.map(parse_lineitem)
        
        print(f"  ‚úÖ Parsed to tuples: {lineitem_rdd.getNumPartitions()} partitions\n")
    
    # ========== FILTER BY SHIPDATE ==========
    
    print(f"üîç Filtering by l_shipdate = '{target_date}'...")
    
    if use_parquet:
        # For Parquet RDD: rows are Row objects with named fields
        filtered_rdd = lineitem_rdd.filter(lambda row: row.l_shipdate == target_date)
    else:
        # For TXT RDD: tuples with positional access
        # l_shipdate is at index 10 (see parse_lineitem)
        filtered_rdd = lineitem_rdd.filter(lambda t: t[10] == target_date)
    
    # ========== COUNT ==========
    
    print(f"üìä Counting filtered records...")
    
    count = filtered_rdd.count()
    
    end_time = time.time()
    duration = end_time - start_time
    
    # ========== OUTPUT ==========
    
    # Required format: ANSWER=<count>
    answer = format_answer(count)
    
    print(f"\n{'='*60}")
    print(f"‚úÖ A1 Result:")
    print(f"{'='*60}")
    print(f"\n  {answer}\n")
    print(f"  Format      : {'Parquet' if use_parquet else 'TXT'}")
    print(f"  Date        : {target_date}")
    print(f"  Duration    : {duration:.2f} seconds")
    print(f"{'='*60}\n")
    
    # ========== SAVE OUTPUT ==========
    
    output_dir = "outputs"
    os.makedirs(output_dir, exist_ok=True)
    
    format_suffix = "parquet" if use_parquet else "txt"
    output_path = os.path.join(output_dir, f"a1_result_{format_suffix}_{target_date.replace('-', '')}.txt")
    
    with open(output_path, 'w') as f:
        f.write(f"Task: A1 - Shipped Items Count\n")
        f.write(f"Date: {target_date}\n")
        f.write(f"Format: {'Parquet' if use_parquet else 'TXT'}\n")
        f.write(f"Duration: {duration:.2f}s\n")
        f.write(f"\n{answer}\n")
    
    print(f"üíæ Saved output: {output_path}\n")
    
    # ========== COLLECT METRICS ==========
    
    metrics = {
        'task': 'A1',
        'format': 'Parquet' if use_parquet else 'TXT',
        'date': target_date,
        'count': count,
        'duration_s': round(duration, 2),
        'num_partitions': lineitem_rdd.getNumPartitions()
    }
    
    return count, duration, metrics

# ========== CLI ARGUMENT PARSER (for spark-submit) ==========

def parse_args_a1():
    """
    Parse command-line arguments for A1
    
    Example:
        spark-submit script.py --input data/tpch/TPC-H-0.1-TXT --date 1996-01-01 --text
        spark-submit script.py --input data/tpch/TPC-H-0.1-PARQUET --date 1996-01-01 --parquet
    """
    parser = argparse.ArgumentParser(description='A1 - Shipped Items Count')
    
    parser.add_argument('--input', required=True, 
                        help='Base path to TPC-H data (TXT or Parquet)')
    
    parser.add_argument('--date', required=True, 
                        help='Ship date to filter (YYYY-MM-DD)')
    
    # Mutually exclusive group for format
    format_group = parser.add_mutually_exclusive_group(required=True)
    format_group.add_argument('--text', action='store_true', 
                              help='Use TXT format')
    format_group.add_argument('--parquet', action='store_true', 
                              help='Use Parquet format')
    
    return parser.parse_args()

# ========== NOTEBOOK EXECUTION (interactive mode) ==========

print("\nüéØ Running A1 in Notebook Mode (both TXT and Parquet)\n")

# Configuration
INPUT_TXT_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-TXT")
INPUT_PARQUET_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-PARQUET")
TARGET_DATE = "1996-01-01"  # Default test date

# ========== RUN A1 WITH TXT ==========

print("\n" + "üî∑" * 30)
print("Running A1 with TXT format")
print("üî∑" * 30 + "\n")

count_txt, duration_txt, metrics_txt = run_a1_shipped_items_count(
    input_path=INPUT_TXT_PATH,
    target_date=TARGET_DATE,
    use_parquet=False
)

# ========== RUN A1 WITH PARQUET ==========

print("\n" + "üî∂" * 30)
print("Running A1 with Parquet format")
print("üî∂" * 30 + "\n")

count_parquet, duration_parquet, metrics_parquet = run_a1_shipped_items_count(
    input_path=INPUT_PARQUET_PATH,
    target_date=TARGET_DATE,
    use_parquet=True
)

# ========== COMPARISON ==========

print("\n" + "=" * 60)
print("A1 ‚Äî TXT vs Parquet Comparison")
print("=" * 60 + "\n")

comparison_table = f"""
| Metric              | TXT              | Parquet          | Speedup     |
|---------------------|------------------|------------------|-------------|
| Count               | {count_txt:,}           | {count_parquet:,}           | N/A         |
| Duration            | {duration_txt:.2f}s            | {duration_parquet:.2f}s            | {duration_txt/duration_parquet:.2f}x        |
| Partitions          | {metrics_txt['num_partitions']}               | {metrics_parquet['num_partitions']}               | N/A         |
"""

print(comparison_table)

# Validation
if count_txt != count_parquet:
    print(f"‚ö†Ô∏è  WARNING: Counts differ! TXT={count_txt}, Parquet={count_parquet}")
else:
    print(f"‚úÖ Validation: Both formats return same count ({count_txt:,})")

print("\n" + "=" * 60 + "\n")

# ========== UPDATE METRICS LOG ==========

print("üìä Updating lab_metrics_log.csv...")

# Read existing metrics or create new
metrics_log_path = "proof/lab_metrics_log.csv"
os.makedirs("proof", exist_ok=True)

import csv

# Check if file exists
file_exists = os.path.exists(metrics_log_path)

with open(metrics_log_path, 'a', newline='') as f:
    writer = csv.writer(f)
    
    # Write header if new file
    if not file_exists:
        writer.writerow([
            'run_id', 'task', 'format', 'date', 'num_files', 'input_size_mb',
            'duration_s', 'shuffle_read_mb', 'shuffle_write_mb', 'notes', 'timestamp'
        ])
    
    # Get next run_id
    if file_exists:
        with open(metrics_log_path, 'r') as rf:
            lines = list(csv.reader(rf))
            run_id = len(lines)  # Header + existing rows
    else:
        run_id = 1
    
    timestamp = datetime.now(timezone.utc).isoformat()
    
    # Add TXT entry
    writer.writerow([
        run_id,
        'A1',
        'TXT',
        TARGET_DATE,
        1,  # num_files (lineitem.tbl)
        'N/A',  # To be filled from Spark UI
        duration_txt,
        0,  # No shuffle for simple count
        0,
        f'count(*) on lineitem where l_shipdate={TARGET_DATE}',
        timestamp
    ])
    
    # Add Parquet entry
    writer.writerow([
        run_id + 1,
        'A1',
        'Parquet',
        TARGET_DATE,
        'N/A',  # Multiple parquet files
        'N/A',  # To be filled from Spark UI
        duration_parquet,
        0,
        0,
        f'same query with Parquet',
        timestamp
    ])

print(f"  ‚úÖ Updated {metrics_log_path}\n")

# ========== SPARK UI REMINDER ==========

print("\n" + "üì∏" * 30)
print("IMPORTANT: Capture Spark UI Screenshots")
print("üì∏" * 30 + "\n")

print("üéØ Required Screenshots for A1:\n")
print("  1. Jobs Tab:")
print("     - A1-TXT job: Input size, duration")
print("     - A1-Parquet job: Input size, duration")
print("     ‚Üí Save as: proof/screenshots/a1_jobs_comparison.png\n")

print("  2. SQL Tab (Parquet only):")
print("     - Physical plan showing FileScan parquet")
print("     - Predicate pushdown (if visible)")
print("     ‚Üí Save as: proof/screenshots/a1_parquet_sql_plan.png\n")

print("  3. Stages Tab:")
print("     - Task metrics (input records, shuffle)")
print("     ‚Üí Save as: proof/screenshots/a1_stages_metrics.png\n")

print("üìù Update lab_metrics_log.csv with:")
print("   - input_size_mb (from Spark UI ‚Üí Jobs ‚Üí Input)")
print("   - Any shuffle metrics (should be 0 for A1)\n")

print("=" * 60 + "\n")

# ========== SUMMARY ==========

print("\n" + "‚úÖ" * 30)
print("Section 3 Summary ‚Äî Task A1 Complete")
print("‚úÖ" * 30 + "\n")

print("üìã Deliverables:")
print(f"  ‚úÖ Output TXT    : outputs/a1_result_txt_{TARGET_DATE.replace('-', '')}.txt")
print(f"  ‚úÖ Output Parquet: outputs/a1_result_parquet_{TARGET_DATE.replace('-', '')}.txt")
print(f"  ‚úÖ Plan Parquet  : proof/plan_a1_parquet_load_{TARGET_DATE.replace('-', '')}.txt")
print(f"  ‚úÖ Metrics Log   : proof/lab_metrics_log.csv\n")


print("=" * 60 + "\n")



Section 3 ‚Äî Part A: Task A1 - Shipped Items Count

üéØ Running A1 in Notebook Mode (both TXT and Parquet)


üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑
Running A1 with TXT format
üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑


Task A1 ‚Äî Shipped Items Count on 1996-01-01
Format: TXT

üìÇ Loading TXT from: data/tpch/TPC-H-0.1-TXT/lineitem.tbl


                                                                                

  ‚úÖ Loaded TXT: 600,572 lines
  ‚úÖ Parsed to tuples: 3 partitions

üîç Filtering by l_shipdate = '1996-01-01'...
üìä Counting filtered records...


                                                                                


‚úÖ A1 Result:

  ANSWER=266

  Format      : TXT
  Date        : 1996-01-01
  Duration    : 13.07 seconds

üíæ Saved output: outputs/a1_result_txt_19960101.txt


üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂
Running A1 with Parquet format
üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂üî∂


Task A1 ‚Äî Shipped Items Count on 1996-01-01
Format: Parquet

üìÇ Loading Parquet from: data/tpch/TPC-H-0.1-PARQUET/lineitem
  ‚úÖ Loaded Parquet: 600,572 rows
  üìÑ Saved plan: proof/plan_a1_parquet_load_19960101.txt
  ‚úÖ Converted to RDD: 7 partitions

üîç Filtering by l_shipdate = '1996-01-01'...
üìä Counting filtered records...





‚úÖ A1 Result:

  ANSWER=266

  Format      : Parquet
  Date        : 1996-01-01
  Duration    : 12.52 seconds

üíæ Saved output: outputs/a1_result_parquet_19960101.txt


A1 ‚Äî TXT vs Parquet Comparison


| Metric              | TXT              | Parquet          | Speedup     |
|---------------------|------------------|------------------|-------------|
| Count               | 266           | 266           | N/A         |
| Duration            | 13.07s            | 12.52s            | 1.04x        |
| Partitions          | 3               | 7               | N/A         |

‚úÖ Validation: Both formats return same count (266)


üìä Updating lab_metrics_log.csv...
  ‚úÖ Updated proof/lab_metrics_log.csv


üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏
IMPORTANT: Capture Spark UI Screenshots
üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏üì∏


                                                                                

### A2 ‚Äî Q2: clerks by order key (reduce‚Äëside join via cogroup)

In [6]:
# write some code here
# build (orderkey, clerk) from orders and (orderkey, 1) from lineitem(date)
# cogroup -> expand -> sortByKey -> take(20)
# ============================================================
# Section 4 - Part A: Task A2 - Clerks by Order Key
# ============================================================

print("\n" + "=" * 60)
print("Section 4 ‚Äî Part A: Task A2 - Clerks by Order Key")
print("=" * 60)

import time
from datetime import datetime, timezone
import csv

# ========== A2 IMPLEMENTATION ==========

def run_a2_clerks_by_orderkey(input_path, target_date, use_parquet=False):
    """
    A2 (Q2) ‚Äî First 20 (o_clerk, o_orderkey) where l_orderkey=o_orderkey 
              and l_shipdate=DATE
    
    Strategy: Reduce-side join via cogroup
    
    Args:
        input_path: Base path to TPC-H data (TXT or Parquet)
        target_date: Ship date to filter (YYYY-MM-DD format)
        use_parquet: True to use Parquet, False for TXT
    
    Returns:
        tuple: (results_list, duration_seconds, metrics_dict)
    
    Output Format: First 20 (o_clerk, o_orderkey) tuples
    """
    
    print(f"\n{'='*60}")
    print(f"Task A2 ‚Äî Clerks by Order Key on {target_date}")
    print(f"Format: {'Parquet' if use_parquet else 'TXT'}")
    print(f"Strategy: Reduce-side join via cogroup")
    print(f"{'='*60}\n")
    
    start_time = time.time()
    
    # ========== LOAD DATA ==========
    
    if use_parquet:
        print(f"üìÇ Loading Parquet tables...\n")
        
        # Load lineitem
        lineitem_path = os.path.join(input_path, "lineitem")
        lineitem_df = spark.read.parquet(lineitem_path)
        
        # Save explain plan for lineitem
        plan_path = f"proof/plan_a2_lineitem_parquet_{target_date.replace('-', '')}.txt"
        os.makedirs("proof", exist_ok=True)
        
        with open(plan_path, 'w') as f:
            f.write(f"=== A2 Lineitem Parquet Load Plan ({target_date}) ===\n\n")
            f.write(lineitem_df._jdf.queryExecution().explainString(
                spark._jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")
            ))
        
        print(f"  ‚úÖ Lineitem plan saved: {plan_path}")
        
        # Convert to RDD
        lineitem_rdd = lineitem_df.rdd
        print(f"  ‚úÖ Loaded lineitem: {lineitem_rdd.count():,} rows, {lineitem_rdd.getNumPartitions()} partitions")
        
        # Load orders
        orders_path = os.path.join(input_path, "orders")
        orders_df = spark.read.parquet(orders_path)
        
        # Save explain plan for orders
        plan_path = f"proof/plan_a2_orders_parquet_{target_date.replace('-', '')}.txt"
        
        with open(plan_path, 'w') as f:
            f.write(f"=== A2 Orders Parquet Load Plan ({target_date}) ===\n\n")
            f.write(orders_df._jdf.queryExecution().explainString(
                spark._jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")
            ))
        
        print(f"  ‚úÖ Orders plan saved: {plan_path}\n")
        
        # Convert to RDD
        orders_rdd = orders_df.rdd
        print(f"  ‚úÖ Loaded orders: {orders_rdd.count():,} rows, {orders_rdd.getNumPartitions()} partitions\n")
        
    else:
        print(f"üìÇ Loading TXT tables...\n")
        
        # Load lineitem.tbl
        lineitem_path = os.path.join(input_path, "lineitem.tbl")
        lineitem_rdd = sc.textFile(lineitem_path).map(parse_lineitem)
        print(f"  ‚úÖ Loaded lineitem.tbl: {lineitem_rdd.count():,} rows, {lineitem_rdd.getNumPartitions()} partitions")
        
        # Load orders.tbl
        orders_path = os.path.join(input_path, "orders.tbl")
        orders_rdd = sc.textFile(orders_path).map(parse_orders)
        print(f"  ‚úÖ Loaded orders.tbl: {orders_rdd.count():,} rows, {orders_rdd.getNumPartitions()} partitions\n")
    
    # ========== STEP 1: BUILD KEY-VALUE PAIRS ==========
    
    print(f"üîß Step 1: Building key-value pairs...\n")
    
    # From lineitem: filter by shipdate, then emit (l_orderkey, 1)
    
    if use_parquet:
        # Parquet RDD: Row objects with named fields
        lineitem_filtered = lineitem_rdd \
            .filter(lambda row: row.l_shipdate == target_date) \
            .map(lambda row: (row.l_orderkey, 1))
    else:
        # TXT RDD: tuples with positional access
        # l_orderkey at index 0, l_shipdate at index 10
        lineitem_filtered = lineitem_rdd \
            .filter(lambda t: t[10] == target_date) \
            .map(lambda t: (t[0], 1))
    
    lineitem_count = lineitem_filtered.count()
    print(f"  ‚úÖ Lineitem filtered by shipdate '{target_date}': {lineitem_count:,} records")
    print(f"     Format: (l_orderkey, 1)\n")
    
    # From orders: emit (o_orderkey, o_clerk)
    
    if use_parquet:
        # Parquet: Row objects
        orders_keyed = orders_rdd.map(lambda row: (row.o_orderkey, row.o_clerk))
    else:
        # TXT: tuples
        # o_orderkey at index 0, o_clerk at index 6
        orders_keyed = orders_rdd.map(lambda t: (t[0], t[6]))
    
    orders_count = orders_keyed.count()
    print(f"  ‚úÖ Orders keyed by orderkey: {orders_count:,} records")
    print(f"     Format: (o_orderkey, o_clerk)\n")
    
    # ========== STEP 2: COGROUP (REDUCE-SIDE JOIN) ==========
    
    print(f"üîÄ Step 2: Performing reduce-side join via cogroup...\n")
    
    cogrouped = lineitem_filtered.cogroup(orders_keyed)
    
    print(f"  ‚úÖ Cogroup completed")
    print(f"     Result format: (orderkey, (lineitem_values, orders_values))\n")
    
    # ========== STEP 3: EXPAND COGROUP RESULTS ==========
    
    print(f"üì§ Step 3: Expanding cogroup results...\n")
    
    def expand_cogroup(item):
        """
        Expand cogroup result to list of (clerk, orderkey) tuples
        
        Args:
            item: (orderkey, (lineitem_iter, orders_iter))
        
        Returns:
            List of (o_clerk, o_orderkey) tuples
        """
        orderkey, (lineitem_vals, orders_vals) = item
        
        # Convert iterables to lists
        lineitem_list = list(lineitem_vals)
        orders_list = list(orders_vals)
        
        # Inner join: only if both sides have values
        if len(lineitem_list) > 0 and len(orders_list) > 0:
            # Emit (clerk, orderkey) for each clerk
            return [(clerk, orderkey) for clerk in orders_list]
        else:
            return []
    
    # Apply expansion and flatten
    joined_rdd = cogrouped.flatMap(expand_cogroup)
    
    joined_count = joined_rdd.count()
    print(f"  ‚úÖ Join expanded: {joined_count:,} (clerk, orderkey) pairs")
    print(f"     Format: (o_clerk, o_orderkey)\n")
    
    # ========== STEP 4: SORT AND TAKE TOP 20 ==========
    
    print(f"üî¢ Step 4: Sorting and taking top 20...\n")
    
    # sortByKey sorts by first element of tuple (o_clerk)
    sorted_rdd = joined_rdd.sortByKey(ascending=True)
    
    # Take first 20
    top_20 = sorted_rdd.take(20)
    
    end_time = time.time()
    duration = end_time - start_time
    
    # ========== OUTPUT ==========
    
    print(f"\n{'='*60}")
    print(f"‚úÖ A2 Results (First 20):")
    print(f"{'='*60}\n")
    
    print(f"  Format      : {'Parquet' if use_parquet else 'TXT'}")
    print(f"  Date        : {target_date}")
    print(f"  Duration    : {duration:.2f} seconds")
    print(f"  Total Pairs : {joined_count:,}")
    print(f"\n  Top 20 (o_clerk, o_orderkey):\n")
    
    for i, (clerk, orderkey) in enumerate(top_20, 1):
        print(f"    {i:2d}. {clerk:<20} | Order: {orderkey}")
    
    print(f"\n{'='*60}\n")
    
    # ========== SAVE OUTPUT ==========
    
    output_dir = "outputs"
    os.makedirs(output_dir, exist_ok=True)
    
    format_suffix = "parquet" if use_parquet else "txt"
    output_path = os.path.join(output_dir, f"a2_result_{format_suffix}_{target_date.replace('-', '')}.txt")
    
    with open(output_path, 'w') as f:
        f.write(f"Task: A2 - Clerks by Order Key (Reduce-side Join)\n")
        f.write(f"Date: {target_date}\n")
        f.write(f"Format: {'Parquet' if use_parquet else 'TXT'}\n")
        f.write(f"Duration: {duration:.2f}s\n")
        f.write(f"Total Pairs: {joined_count:,}\n")
        f.write(f"\nFirst 20 (o_clerk, o_orderkey):\n\n")
        
        for clerk, orderkey in top_20:
            f.write(f"{clerk},{orderkey}\n")
    
    print(f"üíæ Saved output: {output_path}\n")
    
    # ========== COLLECT METRICS ==========
    
    metrics = {
        'task': 'A2',
        'format': 'Parquet' if use_parquet else 'TXT',
        'date': target_date,
        'total_pairs': joined_count,
        'duration_s': round(duration, 2),
        'num_partitions_lineitem': lineitem_filtered.getNumPartitions(),
        'num_partitions_orders': orders_keyed.getNumPartitions()
    }
    
    return top_20, duration, metrics

# ========== NOTEBOOK EXECUTION ==========

print("\nüéØ Running A2 in Notebook Mode (both TXT and Parquet)\n")

# ========== DEFINE PATHS (FIX FOR NameError) ==========

# Configuration from Section 1
TARGET_DATE = "1996-01-01"  # Same date as A1
INPUT_TXT_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-TXT")
INPUT_PARQUET_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-PARQUET")

# ========== RUN A2 WITH TXT ==========

print("\n" + "üî∑" * 30)
print("Running A2 with TXT format")
print("üî∑" * 30 + "\n")

results_txt, duration_txt, metrics_txt = run_a2_clerks_by_orderkey(
    input_path=INPUT_TXT_PATH,
    target_date=TARGET_DATE,
    use_parquet=False
)

# ========== RUN A2 WITH PARQUET ==========

print("\n" + "üî∂" * 30)
print("Running A2 with Parquet format")
print("üî∂" * 30 + "\n")

results_parquet, duration_parquet, metrics_parquet = run_a2_clerks_by_orderkey(
    input_path=INPUT_PARQUET_PATH,
    target_date=TARGET_DATE,
    use_parquet=True
)

# ========== COMPARISON ==========

print("\n" + "=" * 60)
print("A2 ‚Äî TXT vs Parquet Comparison")
print("=" * 60 + "\n")

comparison_table = f"""
| Metric              | TXT                  | Parquet              | Speedup     |
|---------------------|----------------------|----------------------|-------------|
| Total Pairs         | {metrics_txt['total_pairs']:,}              | {metrics_parquet['total_pairs']:,}              | N/A         |
| Duration            | {duration_txt:.2f}s                | {duration_parquet:.2f}s                | {duration_txt/duration_parquet:.2f}x        |
| Partitions Lineitem | {metrics_txt['num_partitions_lineitem']}                    | {metrics_parquet['num_partitions_lineitem']}                    | N/A         |
| Partitions Orders   | {metrics_txt['num_partitions_orders']}                    | {metrics_parquet['num_partitions_orders']}                    | N/A         |
"""

print(comparison_table)

# Validation: Check if top 20 results match
results_match = results_txt == results_parquet

if results_match:
    print(f"‚úÖ Validation: Both formats return identical top 20 results")
else:
    print(f"‚ö†Ô∏è  WARNING: Results differ between TXT and Parquet!")
    print(f"\n  TXT first 5:")
    for i, (clerk, orderkey) in enumerate(results_txt[:5], 1):
        print(f"    {i}. {clerk}, {orderkey}")
    
    print(f"\n  Parquet first 5:")
    for i, (clerk, orderkey) in enumerate(results_parquet[:5], 1):
        print(f"    {i}. {clerk}, {orderkey}")

print("\n" + "=" * 60 + "\n")

# ========== UPDATE METRICS LOG ==========

print("üìä Updating lab_metrics_log.csv...")

metrics_log_path = "proof/lab_metrics_log.csv"

with open(metrics_log_path, 'a', newline='') as f:
    writer = csv.writer(f)
    
    # Get next run_id
    with open(metrics_log_path, 'r') as rf:
        lines = list(csv.reader(rf))
        run_id = len(lines)
    
    timestamp = datetime.now(timezone.utc).isoformat()
    
    # Add TXT entry
    writer.writerow([
        run_id,
        'A2',
        'TXT',
        TARGET_DATE,
        2,  # lineitem.tbl + orders.tbl
        'N/A',  # To be filled from Spark UI
        duration_txt,
        'N/A',  # Shuffle Read (cogroup causes shuffle)
        'N/A',  # Shuffle Write
        f'cogroup join lineitem+orders, shipdate={TARGET_DATE}',
        timestamp
    ])
    
    # Add Parquet entry
    writer.writerow([
        run_id + 1,
        'A2',
        'Parquet',
        TARGET_DATE,
        'N/A',  # Multiple parquet files
        'N/A',
        duration_parquet,
        'N/A',
        'N/A',
        f'same query with Parquet',
        timestamp
    ])

print(f"  ‚úÖ Updated {metrics_log_path}\n")

# ========== SPARK UI REMINDER ==========

print("\n" + "üì∏" * 30)
print("IMPORTANT: Capture Spark UI Screenshots for A2")
print("üì∏" * 30 + "\n")

print("üéØ Required Screenshots:\n")

print("  1. Jobs Tab:")
print("     - A2-TXT cogroup job")
print("     - A2-Parquet cogroup job")
print("     ‚Üí Save as: proof/screenshots/a2_jobs_cogroup.png\n")

print("  2. Stages Tab (CRITICAL for cogroup):")
print("     - Look for 'cogroup' stage")
print("     - Capture Shuffle Read/Write metrics")
print("     - Note: cogroup causes shuffle on BOTH sides")
print("     ‚Üí Save as: proof/screenshots/a2_cogroup_shuffle.png\n")

print("  3. SQL Tab (Parquet only):")
print("     - Physical plans for lineitem and orders loads")
print("     ‚Üí Save as: proof/screenshots/a2_parquet_plans.png\n")

print("üìù Update lab_metrics_log.csv with:")
print("   - Shuffle Read (MB) from Stages tab")
print("   - Shuffle Write (MB) from Stages tab")
print("   - Input Size for both lineitem and orders\n")

print("=" * 60 + "\n")

# ========== SUMMARY ==========

print("\n" + "‚úÖ" * 30)
print("Section 4 Summary ‚Äî Task A2 Complete")
print("‚úÖ" * 30 + "\n")

print("üìã Deliverables:")
print(f"  ‚úÖ Output TXT    : outputs/a2_result_txt_{TARGET_DATE.replace('-', '')}.txt")
print(f"  ‚úÖ Output Parquet: outputs/a2_result_parquet_{TARGET_DATE.replace('-', '')}.txt")
print(f"  ‚úÖ Plans Parquet : proof/plan_a2_lineitem_parquet_*.txt")
print(f"                     proof/plan_a2_orders_parquet_*.txt")
print(f"  ‚úÖ Metrics Log   : proof/lab_metrics_log.csv\n")

print("üéØ Next Steps:")
print("  1. Capture Spark UI screenshots (Jobs, Stages with shuffle)")
print("  2. Update lab_metrics_log.csv with shuffle metrics")
print("  3. Proceed to A3 (broadcast hash join)\n")

print("=" * 60 + "\n")


Section 4 ‚Äî Part A: Task A2 - Clerks by Order Key

üéØ Running A2 in Notebook Mode (both TXT and Parquet)


üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑
Running A2 with TXT format
üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑


Task A2 ‚Äî Clerks by Order Key on 1996-01-01
Format: TXT
Strategy: Reduce-side join via cogroup

üìÇ Loading TXT tables...



                                                                                

  ‚úÖ Loaded lineitem.tbl: 600,572 rows, 3 partitions


                                                                                

  ‚úÖ Loaded orders.tbl: 150,000 rows, 2 partitions

üîß Step 1: Building key-value pairs...



                                                                                

  ‚úÖ Lineitem filtered by shipdate '1996-01-01': 266 records
     Format: (l_orderkey, 1)



                                                                                

  ‚úÖ Orders keyed by orderkey: 150,000 records
     Format: (o_orderkey, o_clerk)

üîÄ Step 2: Performing reduce-side join via cogroup...

  ‚úÖ Cogroup completed
     Result format: (orderkey, (lineitem_values, orders_values))

üì§ Step 3: Expanding cogroup results...



                                                                                

  ‚úÖ Join expanded: 262 (clerk, orderkey) pairs
     Format: (o_clerk, o_orderkey)

üî¢ Step 4: Sorting and taking top 20...



                                                                                


‚úÖ A2 Results (First 20):

  Format      : TXT
  Date        : 1996-01-01
  Duration    : 18.07 seconds
  Total Pairs : 262

  Top 20 (o_clerk, o_orderkey):

     1. Clerk#000000005      | Order: 473858
     2. Clerk#000000007      | Order: 591104
     3. Clerk#000000007      | Order: 312610
     4. Clerk#000000021      | Order: 58247
     5. Clerk#000000027      | Order: 573478
     6. Clerk#000000030      | Order: 343043
     7. Clerk#000000033      | Order: 480613
     8. Clerk#000000033      | Order: 307783
     9. Clerk#000000043      | Order: 235043
    10. Clerk#000000050      | Order: 18372
    11. Clerk#000000050      | Order: 438982
    12. Clerk#000000050      | Order: 237415
    13. Clerk#000000059      | Order: 138789
    14. Clerk#000000059      | Order: 161221
    15. Clerk#000000063      | Order: 473351
    16. Clerk#000000081      | Order: 501218
    17. Clerk#000000082      | Order: 221441
    18. Clerk#000000087      | Order: 151330
    19. Clerk#000000089      | O

                                                                                

  ‚úÖ Loaded lineitem: 600,572 rows, 7 partitions
  ‚úÖ Orders plan saved: proof/plan_a2_orders_parquet_19960101.txt



                                                                                

  ‚úÖ Loaded orders: 150,000 rows, 4 partitions

üîß Step 1: Building key-value pairs...



                                                                                

  ‚úÖ Lineitem filtered by shipdate '1996-01-01': 266 records
     Format: (l_orderkey, 1)



                                                                                

  ‚úÖ Orders keyed by orderkey: 150,000 records
     Format: (o_orderkey, o_clerk)

üîÄ Step 2: Performing reduce-side join via cogroup...

  ‚úÖ Cogroup completed
     Result format: (orderkey, (lineitem_values, orders_values))

üì§ Step 3: Expanding cogroup results...



                                                                                

  ‚úÖ Join expanded: 262 (clerk, orderkey) pairs
     Format: (o_clerk, o_orderkey)

üî¢ Step 4: Sorting and taking top 20...



[Stage 37:>                                                         (0 + 8) / 8]


‚úÖ A2 Results (First 20):

  Format      : Parquet
  Date        : 1996-01-01
  Duration    : 32.22 seconds
  Total Pairs : 262

  Top 20 (o_clerk, o_orderkey):

     1. Clerk#000000005      | Order: 473858
     2. Clerk#000000007      | Order: 591104
     3. Clerk#000000007      | Order: 312610
     4. Clerk#000000021      | Order: 58247
     5. Clerk#000000027      | Order: 573478
     6. Clerk#000000030      | Order: 343043
     7. Clerk#000000033      | Order: 480613
     8. Clerk#000000033      | Order: 307783
     9. Clerk#000000043      | Order: 235043
    10. Clerk#000000050      | Order: 18372
    11. Clerk#000000050      | Order: 438982
    12. Clerk#000000050      | Order: 237415
    13. Clerk#000000059      | Order: 138789
    14. Clerk#000000059      | Order: 161221
    15. Clerk#000000063      | Order: 473351
    16. Clerk#000000081      | Order: 501218
    17. Clerk#000000082      | Order: 221441
    18. Clerk#000000087      | Order: 151330
    19. Clerk#000000089     

                                                                                

### A3 ‚Äî Q3: part & supplier names (broadcast hash join)

In [5]:
# write some code here
# broadcast maps for part and supplier
# map over lineitem(date) -> join in-map -> take(20)

# ============================================================
# Section 5 - Part A: Task A3 - Part & Supplier Names
# ============================================================

print("\n" + "=" * 60)
print("Section 5 ‚Äî Part A: Task A3 - Part & Supplier Names")
print("=" * 60)

import time
from datetime import datetime, timezone
import csv
import os

# ========== DEFINE PATHS (FIX: Declare variables early) ==========

# Configuration from Section 1
DATA_DIR = "data"
TPCH_DIR = os.path.join(DATA_DIR, "tpch")
INPUT_TXT_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-TXT")
INPUT_PARQUET_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-PARQUET")
TARGET_DATE = "1996-01-01"

# ========== A3 IMPLEMENTATION ==========

def run_a3_part_supplier_broadcast(input_path, target_date, use_parquet=False):
    """
    A3 (Q3) ‚Äî First 20 (l_orderkey, p_name, s_name) for shipped items on DATE
    
    Strategy: Broadcast hash join (map-side join)
    - Broadcast small dimension tables (part, supplier)
    - Map over filtered lineitem to join in-memory
    
    Args:
        input_path: Base path to TPC-H data (TXT or Parquet)
        target_date: Ship date to filter (YYYY-MM-DD format)
        use_parquet: True to use Parquet, False for TXT
    
    Returns:
        tuple: (results_list, duration_seconds, metrics_dict)
    
    Output Format: First 20 (l_orderkey, p_name, s_name) tuples
    """
    
    print(f"\n{'='*60}")
    print(f"Task A3 ‚Äî Part & Supplier Names on {target_date}")
    print(f"Format: {'Parquet' if use_parquet else 'TXT'}")
    print(f"Strategy: Broadcast hash join (map-side)")
    print(f"{'='*60}\n")
    
    start_time = time.time()
    
    # ========== STEP 1: LOAD & BROADCAST DIMENSION TABLES ==========
    
    print(f"üì° Step 1: Loading and broadcasting dimension tables...\n")
    
    if use_parquet:
        # Load part table
        part_path = os.path.join(input_path, "part")
        part_df = spark.read.parquet(part_path)
        
        # Save explain plan
        plan_path = f"proof/plan_a3_part_parquet_{target_date.replace('-', '')}.txt"
        os.makedirs("proof", exist_ok=True)
        
        with open(plan_path, 'w') as f:
            f.write(f"=== A3 Part Parquet Load Plan ({target_date}) ===\n\n")
            f.write(part_df._jdf.queryExecution().explainString(
                spark._jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")
            ))
        
        print(f"  ‚úÖ Part plan saved: {plan_path}")
        
        # Convert to RDD and broadcast
        part_rdd = part_df.rdd
        part_count = part_rdd.count()
        
        # Broadcast: {p_partkey -> (p_partkey, p_name, ...)}
        part_bc = broadcast_dimension(
            spark, 
            part_rdd, 
            lambda row: row.p_partkey
        )
        
        print(f"  ‚úÖ Broadcasted part: {part_count:,} entries")
        
        # Load supplier table
        supplier_path = os.path.join(input_path, "supplier")
        supplier_df = spark.read.parquet(supplier_path)
        
        # Save explain plan
        plan_path = f"proof/plan_a3_supplier_parquet_{target_date.replace('-', '')}.txt"
        
        with open(plan_path, 'w') as f:
            f.write(f"=== A3 Supplier Parquet Load Plan ({target_date}) ===\n\n")
            f.write(supplier_df._jdf.queryExecution().explainString(
                spark._jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")
            ))
        
        print(f"  ‚úÖ Supplier plan saved: {plan_path}")
        
        # Convert to RDD and broadcast
        supplier_rdd = supplier_df.rdd
        supplier_count = supplier_rdd.count()
        
        # Broadcast: {s_suppkey -> (s_suppkey, s_name, ...)}
        supplier_bc = broadcast_dimension(
            spark,
            supplier_rdd,
            lambda row: row.s_suppkey
        )
        
        print(f"  ‚úÖ Broadcasted supplier: {supplier_count:,} entries\n")
        
        # Load lineitem
        lineitem_path = os.path.join(input_path, "lineitem")
        lineitem_df = spark.read.parquet(lineitem_path)
        lineitem_rdd = lineitem_df.rdd
        
    else:
        # Load part.tbl
        part_path = os.path.join(input_path, "part.tbl")
        part_rdd = sc.textFile(part_path).map(parse_part)
        part_count = part_rdd.count()
        
        # Broadcast part: key = p_partkey (index 0)
        part_bc = broadcast_dimension(
            spark,
            part_rdd,
            lambda t: t[0]  # p_partkey at index 0
        )
        
        print(f"  ‚úÖ Broadcasted part: {part_count:,} entries")
        
        # Load supplier.tbl
        supplier_path = os.path.join(input_path, "supplier.tbl")
        supplier_rdd = sc.textFile(supplier_path).map(parse_supplier)
        supplier_count = supplier_rdd.count()
        
        # Broadcast supplier: key = s_suppkey (index 0)
        supplier_bc = broadcast_dimension(
            spark,
            supplier_rdd,
            lambda t: t[0]  # s_suppkey at index 0
        )
        
        print(f"  ‚úÖ Broadcasted supplier: {supplier_count:,} entries\n")
        
        # Load lineitem.tbl
        lineitem_path = os.path.join(input_path, "lineitem.tbl")
        lineitem_rdd = sc.textFile(lineitem_path).map(parse_lineitem)
    
    lineitem_count = lineitem_rdd.count()
    print(f"  ‚úÖ Loaded lineitem: {lineitem_count:,} rows\n")
    
    # ========== STEP 2: FILTER LINEITEM BY SHIPDATE ==========
    
    print(f"üîç Step 2: Filtering lineitem by shipdate = '{target_date}'...\n")
    
    if use_parquet:
        lineitem_filtered = lineitem_rdd.filter(lambda row: row.l_shipdate == target_date)
    else:
        # l_shipdate at index 10
        lineitem_filtered = lineitem_rdd.filter(lambda t: t[10] == target_date)
    
    filtered_count = lineitem_filtered.count()
    print(f"  ‚úÖ Filtered lineitem: {filtered_count:,} records\n")
    
    # ========== STEP 3: MAP-SIDE JOIN (BROADCAST JOIN) ==========
    
    print(f"üîó Step 3: Performing map-side join with broadcast tables...\n")
    
    def map_join_part_supplier(lineitem_row):
        """
        Join lineitem with part and supplier using broadcast variables
        
        Args:
            lineitem_row: Row (Parquet) or tuple (TXT)
        
        Returns:
            (l_orderkey, p_name, s_name) or None if no match
        """
        if use_parquet:
            # Parquet: Row objects
            l_orderkey = lineitem_row.l_orderkey
            l_partkey = lineitem_row.l_partkey
            l_suppkey = lineitem_row.l_suppkey
        else:
            # TXT: tuples
            # l_orderkey at 0, l_partkey at 1, l_suppkey at 2
            l_orderkey = lineitem_row[0]
            l_partkey = lineitem_row[1]
            l_suppkey = lineitem_row[2]
        
        # Lookup in broadcast maps
        part_tuple = part_bc.value.get(l_partkey)
        supplier_tuple = supplier_bc.value.get(l_suppkey)
        
        # Inner join: only return if both dimensions found
        if part_tuple is not None and supplier_tuple is not None:
            if use_parquet:
                # Parquet Row objects
                p_name = part_tuple.p_name
                s_name = supplier_tuple.s_name
            else:
                # TXT tuples
                # p_name at index 1, s_name at index 1
                p_name = part_tuple[1]
                s_name = supplier_tuple[1]
            
            return (l_orderkey, p_name, s_name)
        else:
            return None
    
    # Apply map-side join
    joined_rdd = lineitem_filtered \
        .map(map_join_part_supplier) \
        .filter(lambda x: x is not None)  # Remove None (non-matching rows)
    
    joined_count = joined_rdd.count()
    print(f"  ‚úÖ Map-side join completed: {joined_count:,} results")
    print(f"     Format: (l_orderkey, p_name, s_name)\n")
    
    # ========== STEP 4: SORT AND TAKE TOP 20 ==========
    
    print(f"üî¢ Step 4: Sorting by orderkey and taking top 20...\n")
    
    # Sort by l_orderkey (first element)
    sorted_rdd = joined_rdd.sortBy(lambda t: t[0], ascending=True)
    
    # Take first 20
    top_20 = sorted_rdd.take(20)
    
    end_time = time.time()
    duration = end_time - start_time
    
    # ========== OUTPUT ==========
    
    print(f"\n{'='*60}")
    print(f"‚úÖ A3 Results (First 20):")
    print(f"{'='*60}\n")
    
    print(f"  Format         : {'Parquet' if use_parquet else 'TXT'}")
    print(f"  Date           : {target_date}")
    print(f"  Duration       : {duration:.2f} seconds")
    print(f"  Total Results  : {joined_count:,}")
    print(f"\n  Top 20 (l_orderkey, p_name, s_name):\n")
    
    for i, (orderkey, p_name, s_name) in enumerate(top_20, 1):
        print(f"    {i:2d}. Order: {orderkey:>6} | Part: {p_name:<30} | Supplier: {s_name}")
    
    print(f"\n{'='*60}\n")
    
    # ========== SAVE OUTPUT ==========
    
    output_dir = "outputs"
    os.makedirs(output_dir, exist_ok=True)
    
    format_suffix = "parquet" if use_parquet else "txt"
    output_path = os.path.join(output_dir, f"a3_result_{format_suffix}_{target_date.replace('-', '')}.txt")
    
    with open(output_path, 'w') as f:
        f.write(f"Task: A3 - Part & Supplier Names (Broadcast Hash Join)\n")
        f.write(f"Date: {target_date}\n")
        f.write(f"Format: {'Parquet' if use_parquet else 'TXT'}\n")
        f.write(f"Duration: {duration:.2f}s\n")
        f.write(f"Total Results: {joined_count:,}\n")
        f.write(f"\nFirst 20 (l_orderkey, p_name, s_name):\n\n")
        
        for orderkey, p_name, s_name in top_20:
            f.write(f"{orderkey},{p_name},{s_name}\n")
    
    print(f"üíæ Saved output: {output_path}\n")
    
    # ========== COLLECT METRICS ==========
    
    metrics = {
        'task': 'A3',
        'format': 'Parquet' if use_parquet else 'TXT',
        'date': target_date,
        'total_results': joined_count,
        'duration_s': round(duration, 2),
        'broadcast_size_part': part_count,
        'broadcast_size_supplier': supplier_count,
        'lineitem_filtered': filtered_count
    }
    
    return top_20, duration, metrics

# ========== NOTEBOOK EXECUTION ==========

print("\nüéØ Running A3 in Notebook Mode (both TXT and Parquet)\n")

# ========== RUN A3 WITH TXT ==========

print("\n" + "üî∑" * 30)
print("Running A3 with TXT format")
print("üî∑" * 30 + "\n")

results_txt, duration_txt, metrics_txt = run_a3_part_supplier_broadcast(
    input_path=INPUT_TXT_PATH,
    target_date=TARGET_DATE,
    use_parquet=False
)

# ========== RUN A3 WITH PARQUET ==========

print("\n" + "üî∂" * 30)
print("Running A3 with Parquet format")
print("üî∂" * 30 + "\n")

results_parquet, duration_parquet, metrics_parquet = run_a3_part_supplier_broadcast(
    input_path=INPUT_PARQUET_PATH,
    target_date=TARGET_DATE,
    use_parquet=True
)

# ========== COMPARISON ==========

print("\n" + "=" * 60)
print("A3 ‚Äî TXT vs Parquet Comparison")
print("=" * 60 + "\n")

comparison_table = f"""
| Metric              | TXT                  | Parquet              | Speedup     |
|---------------------|----------------------|----------------------|-------------|
| Total Results       | {metrics_txt['total_results']:,}              | {metrics_parquet['total_results']:,}              | N/A         |
| Duration            | {duration_txt:.2f}s                | {duration_parquet:.2f}s                | {duration_txt/duration_parquet:.2f}x        |
| Broadcast Part      | {metrics_txt['broadcast_size_part']:,}              | {metrics_parquet['broadcast_size_part']:,}              | N/A         |
| Broadcast Supplier  | {metrics_txt['broadcast_size_supplier']:,}              | {metrics_parquet['broadcast_size_supplier']:,}              | N/A         |
| Lineitem Filtered   | {metrics_txt['lineitem_filtered']:,}              | {metrics_parquet['lineitem_filtered']:,}              | N/A         |
"""

print(comparison_table)

# Validation
results_match = results_txt == results_parquet

if results_match:
    print(f"‚úÖ Validation: Both formats return identical top 20 results")
else:
    print(f"‚ö†Ô∏è  WARNING: Results differ between TXT and Parquet!")
    print(f"\n  TXT first 5:")
    for i, (orderkey, p_name, s_name) in enumerate(results_txt[:5], 1):
        print(f"    {i}. {orderkey}, {p_name}, {s_name}")
    
    print(f"\n  Parquet first 5:")
    for i, (orderkey, p_name, s_name) in enumerate(results_parquet[:5], 1):
        print(f"    {i}. {orderkey}, {p_name}, {s_name}")

print("\n" + "=" * 60 + "\n")

# ========== UPDATE METRICS LOG ==========

print("üìä Updating lab_metrics_log.csv...")

metrics_log_path = "proof/lab_metrics_log.csv"

with open(metrics_log_path, 'a', newline='') as f:
    writer = csv.writer(f)
    
    # Get next run_id
    with open(metrics_log_path, 'r') as rf:
        lines = list(csv.reader(rf))
        run_id = len(lines)
    
    timestamp = datetime.now(timezone.utc).isoformat()
    
    # Add TXT entry
    writer.writerow([
        run_id,
        'A3',
        'TXT',
        TARGET_DATE,
        3,  # lineitem.tbl + part.tbl + supplier.tbl
        'N/A',  # To be filled from Spark UI
        duration_txt,
        0,  # No shuffle for broadcast join
        0,
        f'broadcast join lineitem+part+supplier, shipdate={TARGET_DATE}, part_bc={metrics_txt["broadcast_size_part"]}, supp_bc={metrics_txt["broadcast_size_supplier"]}',
        timestamp
    ])
    
    # Add Parquet entry
    writer.writerow([
        run_id + 1,
        'A3',
        'Parquet',
        TARGET_DATE,
        'N/A',  # Multiple parquet files
        'N/A',
        duration_parquet,
        0,
        0,
        f'same query with Parquet',
        timestamp
    ])

print(f"  ‚úÖ Updated {metrics_log_path}\n")

# ========== SUMMARY ==========

print("\n" + "‚úÖ" * 30)
print("Section 5 Summary ‚Äî Task A3 Complete")
print("‚úÖ" * 30 + "\n")

print("üìã Deliverables:")
print(f"  ‚úÖ Output TXT    : outputs/a3_result_txt_{TARGET_DATE.replace('-', '')}.txt")
print(f"  ‚úÖ Output Parquet: outputs/a3_result_parquet_{TARGET_DATE.replace('-', '')}.txt")
print(f"  ‚úÖ Plans Parquet : proof/plan_a3_part_parquet_*.txt")
print(f"                     proof/plan_a3_supplier_parquet_*.txt")
print(f"  ‚úÖ Metrics Log   : proof/lab_metrics_log.csv\n")

print("üéØ Key Observations for A3:")
print("  ‚úÖ Broadcast join = NO SHUFFLE (faster than A2 cogroup)")
print("  ‚úÖ Dimension tables loaded once and broadcasted to all workers")
print("  ‚úÖ Map-side join = each partition processes independently")
print("  ‚úÖ Ideal for small dimension tables (part, supplier)\n")

print("üéØ Next Steps:")
print("  1. Capture Spark UI screenshots (Jobs, Stages, Storage)")
print("  2. Verify NO shuffle in Stages tab")
print("  3. Check broadcast size in Storage tab")
print("  4. Update lab_metrics_log.csv")
print("  5. Proceed to A4 (mixed joins)\n")

print("=" * 60 + "\n")


Section 5 ‚Äî Part A: Task A3 - Part & Supplier Names

üéØ Running A3 in Notebook Mode (both TXT and Parquet)


üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑
Running A3 with TXT format
üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑


Task A3 ‚Äî Part & Supplier Names on 1996-01-01
Format: TXT
Strategy: Broadcast hash join (map-side)

üì° Step 1: Loading and broadcasting dimension tables...



                                                                                

  ‚úÖ Broadcasted dimension: 20,000 entries
  ‚úÖ Broadcasted part: 20,000 entries
  ‚úÖ Broadcasted dimension: 1,000 entries
  ‚úÖ Broadcasted supplier: 1,000 entries



                                                                                

  ‚úÖ Loaded lineitem: 600,572 rows

üîç Step 2: Filtering lineitem by shipdate = '1996-01-01'...



                                                                                

  ‚úÖ Filtered lineitem: 266 records

üîó Step 3: Performing map-side join with broadcast tables...



                                                                                

  ‚úÖ Map-side join completed: 266 results
     Format: (l_orderkey, p_name, s_name)

üî¢ Step 4: Sorting by orderkey and taking top 20...



                                                                                


‚úÖ A3 Results (First 20):

  Format         : TXT
  Date           : 1996-01-01
  Duration       : 14.79 seconds
  Total Results  : 266

  Top 20 (l_orderkey, p_name, s_name):

     1. Order:   2309 | Part: burnished orchid rose rosy tomato | Supplier: Supplier#000000519
     2. Order:   2595 | Part: purple floral green slate smoke | Supplier: Supplier#000000675
     3. Order:   4773 | Part: turquoise yellow wheat salmon dim | Supplier: Supplier#000000315
     4. Order:   9381 | Part: turquoise blush indian moccasin burlywood | Supplier: Supplier#000000020
     5. Order:  17189 | Part: lavender green chocolate pink peach | Supplier: Supplier#000000561
     6. Order:  17600 | Part: chartreuse snow saddle ghost medium | Supplier: Supplier#000000968
     7. Order:  18372 | Part: navy black coral rose papaya   | Supplier: Supplier#000000641
     8. Order:  19462 | Part: royal steel lime purple light  | Supplier: Supplier#000000679
     9. Order:  22149 | Part: sienna snow frosted cornflo

                                                                                

  ‚úÖ Broadcasted dimension: 20,000 entries
  ‚úÖ Broadcasted part: 20,000 entries
  ‚úÖ Supplier plan saved: proof/plan_a3_supplier_parquet_19960101.txt
  ‚úÖ Broadcasted dimension: 1,000 entries
  ‚úÖ Broadcasted supplier: 1,000 entries



                                                                                

  ‚úÖ Loaded lineitem: 600,572 rows

üîç Step 2: Filtering lineitem by shipdate = '1996-01-01'...



                                                                                

  ‚úÖ Filtered lineitem: 266 records

üîó Step 3: Performing map-side join with broadcast tables...



                                                                                

  ‚úÖ Map-side join completed: 266 results
     Format: (l_orderkey, p_name, s_name)

üî¢ Step 4: Sorting by orderkey and taking top 20...






‚úÖ A3 Results (First 20):

  Format         : Parquet
  Date           : 1996-01-01
  Duration       : 29.46 seconds
  Total Results  : 266

  Top 20 (l_orderkey, p_name, s_name):

     1. Order:   2309 | Part: burnished orchid rose rosy tomato | Supplier: Supplier#000000519
     2. Order:   2595 | Part: purple floral green slate smoke | Supplier: Supplier#000000675
     3. Order:   4773 | Part: turquoise yellow wheat salmon dim | Supplier: Supplier#000000315
     4. Order:   9381 | Part: turquoise blush indian moccasin burlywood | Supplier: Supplier#000000020
     5. Order:  17189 | Part: lavender green chocolate pink peach | Supplier: Supplier#000000561
     6. Order:  17600 | Part: chartreuse snow saddle ghost medium | Supplier: Supplier#000000968
     7. Order:  18372 | Part: navy black coral rose papaya   | Supplier: Supplier#000000641
     8. Order:  19462 | Part: royal steel lime purple light  | Supplier: Supplier#000000679
     9. Order:  22149 | Part: sienna snow frosted cor

                                                                                

### A4 ‚Äî Q4: shipped items by nation (mixed joins)

In [6]:
# write some code here
# reduce-side for (lineitem ‚®ù orders); broadcast for (customer, nation)
# aggregate to (n_nationkey, n_name, count)
# ============================================================
# Section 6 - Part A: Task A4 - Nation Shipment Counts
# ============================================================

print("\n" + "=" * 60)
print("Section 6 ‚Äî Part A: Task A4 - Nation Shipment Counts")
print("=" * 60)

import time
from datetime import datetime, timezone
import csv
import os

# ========== DEFINE PATHS ==========

DATA_DIR = "data"
TPCH_DIR = os.path.join(DATA_DIR, "tpch")
INPUT_TXT_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-TXT")
INPUT_PARQUET_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-PARQUET")
TARGET_DATE = "1996-01-01"

# ========== A4 IMPLEMENTATION ==========

def run_a4_nation_counts(input_path, target_date, use_parquet=False):
    """
    A4 (Q4) ‚Äî (n_nationkey, n_name, count) for items shipped to each nation on DATE
    
    Strategy: Mixed joins
    - Reduce-side join: lineitem ‚®ù orders (both large)
    - Broadcast join: customer, nation (small dimensions)
    
    Output Format: (n_nationkey, n_name, count) sorted by nationkey
    """
    
    print(f"\n{'='*60}")
    print(f"Task A4 ‚Äî Nation Shipment Counts on {target_date}")
    print(f"Format: {'Parquet' if use_parquet else 'TXT'}")
    print(f"Strategy: Reduce-side (lineitem+orders) + Broadcast (customer+nation)")
    print(f"{'='*60}\n")
    
    start_time = time.time()
    
    # ========== STEP 1: BROADCAST DIMENSIONS ==========
    
    print(f"üì° Step 1: Broadcasting dimension tables...\n")
    
    if use_parquet:
        # Customer
        customer_df = spark.read.parquet(os.path.join(input_path, "customer"))
        customer_rdd = customer_df.rdd
        customer_bc = broadcast_dimension(spark, customer_rdd, lambda row: row.c_custkey)
        print(f"  ‚úÖ Broadcasted customer: {customer_rdd.count():,} entries")
        
        # Nation
        nation_df = spark.read.parquet(os.path.join(input_path, "nation"))
        nation_rdd = nation_df.rdd
        nation_bc = broadcast_dimension(spark, nation_rdd, lambda row: row.n_nationkey)
        print(f"  ‚úÖ Broadcasted nation: {nation_rdd.count():,} entries\n")
        
        # Load lineitem & orders
        lineitem_rdd = spark.read.parquet(os.path.join(input_path, "lineitem")).rdd
        orders_rdd = spark.read.parquet(os.path.join(input_path, "orders")).rdd
        
    else:
        # Customer
        customer_rdd = sc.textFile(os.path.join(input_path, "customer.tbl")).map(parse_customer)
        customer_bc = broadcast_dimension(spark, customer_rdd, lambda t: t[0])  # c_custkey
        print(f"  ‚úÖ Broadcasted customer: {customer_rdd.count():,} entries")
        
        # Nation
        nation_rdd = sc.textFile(os.path.join(input_path, "nation.tbl")).map(parse_nation)
        nation_bc = broadcast_dimension(spark, nation_rdd, lambda t: t[0])  # n_nationkey
        print(f"  ‚úÖ Broadcasted nation: {nation_rdd.count():,} entries\n")
        
        # Load lineitem & orders
        lineitem_rdd = sc.textFile(os.path.join(input_path, "lineitem.tbl")).map(parse_lineitem)
        orders_rdd = sc.textFile(os.path.join(input_path, "orders.tbl")).map(parse_orders)
    
    # ========== STEP 2: FILTER LINEITEM ==========
    
    print(f"üîç Step 2: Filtering lineitem by shipdate...\n")
    
    if use_parquet:
        lineitem_filtered = lineitem_rdd.filter(lambda row: row.l_shipdate == target_date)
    else:
        lineitem_filtered = lineitem_rdd.filter(lambda t: t[10] == target_date)  # l_shipdate
    
    print(f"  ‚úÖ Filtered: {lineitem_filtered.count():,} records\n")
    
    # ========== STEP 3: REDUCE-SIDE JOIN (lineitem ‚®ù orders) ==========
    
    print(f"üîÄ Step 3: Reduce-side join lineitem ‚®ù orders...\n")
    
    if use_parquet:
        lineitem_keyed = lineitem_filtered.map(lambda row: (row.l_orderkey, row))
        orders_keyed = orders_rdd.map(lambda row: (row.o_orderkey, row.o_custkey))
    else:
        lineitem_keyed = lineitem_filtered.map(lambda t: (t[0], t))  # (l_orderkey, row)
        orders_keyed = orders_rdd.map(lambda t: (t[0], t[1]))  # (o_orderkey, o_custkey)
    
    # Cogroup join
    joined = lineitem_keyed.cogroup(orders_keyed) \
        .flatMap(lambda kv: [(kv[0], custkey) for lineitem_list in [list(kv[1][0])] 
                              for orders_list in [list(kv[1][1])] 
                              for _ in lineitem_list 
                              for custkey in orders_list])
    
    print(f"  ‚úÖ Join completed: {joined.count():,} (orderkey, custkey) pairs\n")
    
    # ========== STEP 4: BROADCAST JOIN (customer ‚Üí nation) ==========
    
    print(f"üîó Step 4: Broadcast join with customer+nation...\n")
    
    def map_to_nation(item):
        orderkey, custkey = item
        customer = customer_bc.value.get(custkey)
        if customer:
            if use_parquet:
                nationkey = customer.c_nationkey
            else:
                nationkey = customer[3]  # c_nationkey at index 3
            
            nation = nation_bc.value.get(nationkey)
            if nation:
                if use_parquet:
                    return (nation.n_nationkey, nation.n_name)
                else:
                    return (nation[0], nation[1])  # (n_nationkey, n_name)
        return None
    
    nations = joined.map(map_to_nation).filter(lambda x: x is not None)
    
    # ========== STEP 5: AGGREGATE BY NATION ==========
    
    print(f"üìä Step 5: Aggregating by nation...\n")
    
    counts = nations.map(lambda kv: (kv, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .map(lambda kv: (kv[0][0], kv[0][1], kv[1])) \
        .sortBy(lambda t: t[0])  # Sort by nationkey
    
    results = counts.collect()
    
    end_time = time.time()
    duration = end_time - start_time
    
    # ========== OUTPUT ==========
    
    print(f"\n{'='*60}")
    print(f"‚úÖ A4 Results (Nation Shipment Counts):")
    print(f"{'='*60}\n")
    
    print(f"  Format   : {'Parquet' if use_parquet else 'TXT'}")
    print(f"  Date     : {target_date}")
    print(f"  Duration : {duration:.2f}s")
    print(f"  Nations  : {len(results)}\n")
    
    for nationkey, name, count in results:
        print(f"  {nationkey:2d} | {name:<20} | {count:>5}")
    
    print(f"\n{'='*60}\n")
    
    # ========== SAVE OUTPUT ==========
    
    output_dir = "outputs"
    os.makedirs(output_dir, exist_ok=True)
    
    format_suffix = "parquet" if use_parquet else "txt"
    output_path = os.path.join(output_dir, f"a4_result_{format_suffix}_{target_date.replace('-', '')}.txt")
    
    with open(output_path, 'w') as f:
        f.write(f"Task: A4 - Nation Shipment Counts (Mixed Join)\n")
        f.write(f"Date: {target_date}\n")
        f.write(f"Format: {'Parquet' if use_parquet else 'TXT'}\n")
        f.write(f"Duration: {duration:.2f}s\n\n")
        
        for nationkey, name, count in results:
            f.write(f"{nationkey},{name},{count}\n")
    
    print(f"üíæ Saved: {output_path}\n")
    
    # ========== METRICS ==========
    
    metrics = {
        'task': 'A4',
        'format': 'Parquet' if use_parquet else 'TXT',
        'date': target_date,
        'duration_s': round(duration, 2),
        'num_nations': len(results)
    }
    
    return results, duration, metrics

# ========== RUN A4 ==========

print("\nüéØ Running A4 (both TXT and Parquet)\n")

# TXT

results_txt, duration_txt, metrics_txt = run_a4_nation_counts(
    INPUT_TXT_PATH, TARGET_DATE, use_parquet=False
)

# Parquet

results_parquet, duration_parquet, metrics_parquet = run_a4_nation_counts(
    INPUT_PARQUET_PATH, TARGET_DATE, use_parquet=True
)

# ========== COMPARISON ==========

print(f"\n{'='*60}")
print("A4 ‚Äî TXT vs Parquet Comparison")
print(f"{'='*60}\n")

print(f"| Metric    | TXT       | Parquet   | Speedup |")
print(f"|-----------|-----------|-----------|---------|")
print(f"| Duration  | {duration_txt:.2f}s     | {duration_parquet:.2f}s     | {duration_txt/duration_parquet:.2f}x    |")
print(f"| Nations   | {len(results_txt)}        | {len(results_parquet)}        | -       |")
print(f"\n{'='*60}\n")

# ========== UPDATE METRICS LOG ==========

metrics_log_path = "proof/lab_metrics_log.csv"

with open(metrics_log_path, 'a', newline='') as f:
    writer = csv.writer(f)
    
    with open(metrics_log_path, 'r') as rf:
        run_id = len(list(csv.reader(rf)))
    
    timestamp = datetime.now(timezone.utc).isoformat()
    
    writer.writerow([
        run_id, 'A4', 'TXT', TARGET_DATE, 4, 'N/A', duration_txt,
        'N/A', 'N/A', 'mixed join: reduce-side (lineitem+orders) + broadcast (customer+nation)', timestamp
    ])
    
    writer.writerow([
        run_id + 1, 'A4', 'Parquet', TARGET_DATE, 'N/A', 'N/A', duration_parquet,
        'N/A', 'N/A', 'same query with Parquet', timestamp
    ])

print(f"‚úÖ Updated {metrics_log_path}\n")

# ========== SPARK UI REMINDER ==========


print("\nüéØ Spark UI Screenshots for A4:\n")
print("  1. Jobs Tab ‚Üí A4-TXT and A4-Parquet (cogroup + broadcast)")
print("  2. Stages Tab ‚Üí Cogroup stage (shuffle metrics)")
print("  3. Storage Tab ‚Üí Broadcast variables (customer, nation)")
print("\nüì∏" * 30 + "\n")

print("‚úÖ Section 6 ‚Äî Task A4 Complete\n")
print("=" * 60 + "\n")


Section 6 ‚Äî Part A: Task A4 - Nation Shipment Counts

üéØ Running A4 (both TXT and Parquet)

üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑üî∑

Task A4 ‚Äî Nation Shipment Counts on 1996-01-01
Format: TXT
Strategy: Reduce-side (lineitem+orders) + Broadcast (customer+nation)

üì° Step 1: Broadcasting dimension tables...



                                                                                

  ‚úÖ Broadcasted dimension: 15,000 entries
  ‚úÖ Broadcasted customer: 15,000 entries
  ‚úÖ Broadcasted dimension: 25 entries
  ‚úÖ Broadcasted nation: 25 entries

üîç Step 2: Filtering lineitem by shipdate...



                                                                                

  ‚úÖ Filtered: 266 records

üîÄ Step 3: Reduce-side join lineitem ‚®ù orders...



                                                                                

  ‚úÖ Join completed: 266 (orderkey, custkey) pairs

üîó Step 4: Broadcast join with customer+nation...

üìä Step 5: Aggregating by nation...



                                                                                


‚úÖ A4 Results (Nation Shipment Counts):

  Format   : TXT
  Date     : 1996-01-01
  Duration : 13.94s
  Nations  : 25

   0 | ALGERIA              |     5
   1 | ARGENTINA            |     8
   2 | BRAZIL               |    15
   3 | CANADA               |    12
   4 | EGYPT                |     8
   5 | ETHIOPIA             |     6
   6 | FRANCE               |     9
   7 | GERMANY              |    17
   8 | INDIA                |     8
   9 | INDONESIA            |    13
  10 | IRAN                 |    14
  11 | IRAQ                 |    13
  12 | JAPAN                |    19
  13 | JORDAN               |    14
  14 | KENYA                |     7
  15 | MOROCCO              |    10
  16 | MOZAMBIQUE           |     9
  17 | PERU                 |     8
  18 | CHINA                |     4
  19 | ROMANIA              |    21
  20 | SAUDI ARABIA         |     9
  21 | VIETNAM              |     7
  22 | RUSSIA               |    13
  23 | UNITED KINGDOM       |     8
  24 | UNITED S

                                                                                

  ‚úÖ Broadcasted dimension: 15,000 entries
  ‚úÖ Broadcasted customer: 15,000 entries
  ‚úÖ Broadcasted dimension: 25 entries
  ‚úÖ Broadcasted nation: 25 entries

üîç Step 2: Filtering lineitem by shipdate...



                                                                                

  ‚úÖ Filtered: 266 records

üîÄ Step 3: Reduce-side join lineitem ‚®ù orders...



                                                                                

  ‚úÖ Join completed: 266 (orderkey, custkey) pairs

üîó Step 4: Broadcast join with customer+nation...

üìä Step 5: Aggregating by nation...






‚úÖ A4 Results (Nation Shipment Counts):

  Format   : Parquet
  Date     : 1996-01-01
  Duration : 18.39s
  Nations  : 25

   0 | ALGERIA              |     5
   1 | ARGENTINA            |     8
   2 | BRAZIL               |    15
   3 | CANADA               |    12
   4 | EGYPT                |     8
   5 | ETHIOPIA             |     6
   6 | FRANCE               |     9
   7 | GERMANY              |    17
   8 | INDIA                |     8
   9 | INDONESIA            |    13
  10 | IRAN                 |    14
  11 | IRAQ                 |    13
  12 | JAPAN                |    19
  13 | JORDAN               |    14
  14 | KENYA                |     7
  15 | MOROCCO              |    10
  16 | MOZAMBIQUE           |     9
  17 | PERU                 |     8
  18 | CHINA                |     4
  19 | ROMANIA              |    21
  20 | SAUDI ARABIA         |     9
  21 | VIETNAM              |     7
  22 | RUSSIA               |    13
  23 | UNITED KINGDOM       |     8
  24 | UNIT

                                                                                

### A5 ‚Äî Q5: monthly US vs CANADA volumes

In [7]:
# write some code here
# compute (nationkey, n_name, yyyy-mm, count) across full data
# write CSV for plotting; keep timings for TXT vs PARQUET
# ============================================================
# Section 7 - Part A: Task A5 - Monthly Volumes US vs CANADA
# ============================================================

import time
from datetime import datetime, timezone
import csv
import os

# Paths
DATA_DIR = "data"
TPCH_DIR = os.path.join(DATA_DIR, "tpch")
INPUT_TXT_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-TXT")
INPUT_PARQUET_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-PARQUET")

def run_a5_monthly_volumes(input_path, use_parquet=False):
    """
    A5 (Q5) - Monthly volumes for US vs CANADA across full warehouse
    
    Strategy:
    - Broadcast nation, customer
    - Reduce-side join lineitem+orders
    - Extract YYYY-MM from l_shipdate
    - Aggregate by (nationkey, n_name, YYYY-MM)
    
    Output: (nationkey, n_name, YYYY-MM, count)
    """
    
    print(f"\n{'='*60}")
    print(f"Task A5 - Monthly Volumes (US vs CANADA)")
    print(f"Format: {'Parquet' if use_parquet else 'TXT'}")
    print(f"{'='*60}\n")
    
    start_time = time.time()
    
    # Broadcast dimensions
    if use_parquet:
        nation_df = spark.read.parquet(os.path.join(input_path, "nation"))
        nation_rdd = nation_df.rdd
        nation_bc = broadcast_dimension(spark, nation_rdd, lambda row: row.n_nationkey)
        
        customer_df = spark.read.parquet(os.path.join(input_path, "customer"))
        customer_rdd = customer_df.rdd
        customer_bc = broadcast_dimension(spark, customer_rdd, lambda row: row.c_custkey)
        
        lineitem_rdd = spark.read.parquet(os.path.join(input_path, "lineitem")).rdd
        orders_rdd = spark.read.parquet(os.path.join(input_path, "orders")).rdd
        
    else:
        nation_rdd = sc.textFile(os.path.join(input_path, "nation.tbl")).map(parse_nation)
        nation_bc = broadcast_dimension(spark, nation_rdd, lambda t: t[0])
        
        customer_rdd = sc.textFile(os.path.join(input_path, "customer.tbl")).map(parse_customer)
        customer_bc = broadcast_dimension(spark, customer_rdd, lambda t: t[0])
        
        lineitem_rdd = sc.textFile(os.path.join(input_path, "lineitem.tbl")).map(parse_lineitem)
        orders_rdd = sc.textFile(os.path.join(input_path, "orders.tbl")).map(parse_orders)
    
    # Filter US and CANADA
    us_canada_nations = nation_bc.value
    us_canada_keys = [k for k, v in us_canada_nations.items() 
                      if (v.n_name if use_parquet else v[1]) in ['UNITED STATES', 'CANADA']]
    
    # Join lineitem+orders (reduce-side)
    if use_parquet:
        lineitem_keyed = lineitem_rdd.map(lambda row: (row.l_orderkey, (row.l_shipdate,)))
        orders_keyed = orders_rdd.map(lambda row: (row.o_orderkey, row.o_custkey))
    else:
        lineitem_keyed = lineitem_rdd.map(lambda t: (t[0], (t[10],)))  # (orderkey, shipdate)
        orders_keyed = orders_rdd.map(lambda t: (t[0], t[1]))  # (orderkey, custkey)
    
    joined = lineitem_keyed.cogroup(orders_keyed) \
        .flatMap(lambda kv: [(shipdate, custkey) 
                             for lineitem_list in [list(kv[1][0])] 
                             for orders_list in [list(kv[1][1])] 
                             for shipdate, in lineitem_list 
                             for custkey in orders_list])
    
    # Map to nation via customer broadcast
    def map_to_nation_month(item):
        shipdate, custkey = item
        customer = customer_bc.value.get(custkey)
        if customer:
            nationkey = customer.c_nationkey if use_parquet else customer[3]
            if nationkey in us_canada_keys:
                nation = nation_bc.value.get(nationkey)
                if nation:
                    n_name = nation.n_name if use_parquet else nation[1]
                    year_month = shipdate[:7]  # YYYY-MM
                    return (nationkey, n_name, year_month)
        return None
    
    nations_months = joined.map(map_to_nation_month).filter(lambda x: x is not None)
    
    # Aggregate
    counts = nations_months.map(lambda kv: (kv, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .map(lambda kv: (kv[0][0], kv[0][1], kv[0][2], kv[1])) \
        .sortBy(lambda t: (t[0], t[2]))  # Sort by nationkey, year-month
    
    results = counts.collect()
    
    end_time = time.time()
    duration = end_time - start_time
    
    # Output
    print(f"\nResults: {len(results)} monthly volumes\n")
    
    for nationkey, name, year_month, count in results[:20]:
        print(f"  {nationkey} | {name:<15} | {year_month} | {count:>6}")
    
    if len(results) > 20:
        print(f"  ... ({len(results) - 20} more rows)")
    
    print(f"\nDuration: {duration:.2f}s\n")
    
    # Save output
    output_dir = "outputs"
    os.makedirs(output_dir, exist_ok=True)
    
    format_suffix = "parquet" if use_parquet else "txt"
    output_path = os.path.join(output_dir, f"a5_result_{format_suffix}.txt")
    
    with open(output_path, 'w') as f:
        f.write(f"Task: A5 - Monthly Volumes (US vs CANADA)\n")
        f.write(f"Format: {'Parquet' if use_parquet else 'TXT'}\n")
        f.write(f"Duration: {duration:.2f}s\n\n")
        
        for nationkey, name, year_month, count in results:
            f.write(f"{nationkey},{name},{year_month},{count}\n")
    
    print(f"Saved: {output_path}\n")
    
    metrics = {
        'task': 'A5',
        'format': 'Parquet' if use_parquet else 'TXT',
        'duration_s': round(duration, 2),
        'num_results': len(results)
    }
    
    return results, duration, metrics

# Run A5
print("\n" + "="*60)
print("Running A5 (TXT and Parquet)")
print("="*60)

results_txt, duration_txt, metrics_txt = run_a5_monthly_volumes(INPUT_TXT_PATH, use_parquet=False)
results_parquet, duration_parquet, metrics_parquet = run_a5_monthly_volumes(INPUT_PARQUET_PATH, use_parquet=True)

# Comparison
print(f"\n{'='*60}")
print("A5 - TXT vs Parquet Comparison")
print(f"{'='*60}\n")

print(f"| Metric    | TXT       | Parquet   | Speedup |")
print(f"|-----------|-----------|-----------|---------|")
print(f"| Duration  | {duration_txt:.2f}s     | {duration_parquet:.2f}s     | {duration_txt/duration_parquet:.2f}x    |")
print(f"| Results   | {len(results_txt)}        | {len(results_parquet)}        | -       |")
print(f"\n{'='*60}\n")

# Update metrics log
metrics_log_path = "proof/lab_metrics_log.csv"

with open(metrics_log_path, 'a', newline='') as f:
    writer = csv.writer(f)
    
    with open(metrics_log_path, 'r') as rf:
        run_id = len(list(csv.reader(rf)))
    
    timestamp = datetime.now(timezone.utc).isoformat()
    
    writer.writerow([
        run_id, 'A5', 'TXT', 'full-warehouse', 4, 'N/A', duration_txt,
        'N/A', 'N/A', 'monthly volumes US+CANADA, cogroup lineitem+orders, broadcast customer+nation', timestamp
    ])
    
    writer.writerow([
        run_id + 1, 'A5', 'Parquet', 'full-warehouse', 'N/A', 'N/A', duration_parquet,
        'N/A', 'N/A', 'same query with Parquet', timestamp
    ])

print(f"Updated {metrics_log_path}\n")

print("="*60)
print("Section 7 - Task A5 Complete")
print("="*60)


Running A5 (TXT and Parquet)

Task A5 - Monthly Volumes (US vs CANADA)
Format: TXT



                                                                                

  ‚úÖ Broadcasted dimension: 25 entries


                                                                                

  ‚úÖ Broadcasted dimension: 15,000 entries


                                                                                


Results: 166 monthly volumes

  3 | CANADA          | 1992-01 |     29
  3 | CANADA          | 1992-02 |     99
  3 | CANADA          | 1992-03 |    198
  3 | CANADA          | 1992-04 |    285
  3 | CANADA          | 1992-05 |    349
  3 | CANADA          | 1992-06 |    317
  3 | CANADA          | 1992-07 |    362
  3 | CANADA          | 1992-08 |    335
  3 | CANADA          | 1992-09 |    292
  3 | CANADA          | 1992-10 |    286
  3 | CANADA          | 1992-11 |    321
  3 | CANADA          | 1992-12 |    314
  3 | CANADA          | 1993-01 |    344
  3 | CANADA          | 1993-02 |    314
  3 | CANADA          | 1993-03 |    350
  3 | CANADA          | 1993-04 |    305
  3 | CANADA          | 1993-05 |    286
  3 | CANADA          | 1993-06 |    322
  3 | CANADA          | 1993-07 |    324
  3 | CANADA          | 1993-08 |    336
  ... (146 more rows)

Duration: 14.39s

Saved: outputs/a5_result_txt.txt


Task A5 - Monthly Volumes (US vs CANADA)
Format: Parquet

  ‚úÖ Broadcast

                                                                                

  ‚úÖ Broadcasted dimension: 15,000 entries


[Stage 95:>                                                         (0 + 8) / 8]


Results: 166 monthly volumes

  3 | CANADA          | 1992-01 |     29
  3 | CANADA          | 1992-02 |     99
  3 | CANADA          | 1992-03 |    198
  3 | CANADA          | 1992-04 |    285
  3 | CANADA          | 1992-05 |    349
  3 | CANADA          | 1992-06 |    317
  3 | CANADA          | 1992-07 |    362
  3 | CANADA          | 1992-08 |    335
  3 | CANADA          | 1992-09 |    292
  3 | CANADA          | 1992-10 |    286
  3 | CANADA          | 1992-11 |    321
  3 | CANADA          | 1992-12 |    314
  3 | CANADA          | 1993-01 |    344
  3 | CANADA          | 1993-02 |    314
  3 | CANADA          | 1993-03 |    350
  3 | CANADA          | 1993-04 |    305
  3 | CANADA          | 1993-05 |    286
  3 | CANADA          | 1993-06 |    322
  3 | CANADA          | 1993-07 |    324
  3 | CANADA          | 1993-08 |    336
  ... (146 more rows)

Duration: 20.98s

Saved: outputs/a5_result_parquet.txt


A5 - TXT vs Parquet Comparison

| Metric    | TXT       | Parquet   |

                                                                                

### A6 ‚Äî Q6: Pricing Summary (filtered by DATE)

In [8]:
# write some code here
# implement sums/averages over lineitem for given date
# emit tuples per (l_returnflag, l_linestatus, ...)
# ============================================================
# Section 8 - Part A: Task A6 - Pricing Summary Report
# ============================================================

import time
from datetime import datetime, timezone
import csv
import os

# Paths
DATA_DIR = "data"
TPCH_DIR = os.path.join(DATA_DIR, "tpch")
INPUT_TXT_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-TXT")
INPUT_PARQUET_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-PARQUET")
TARGET_DATE = "1996-01-01"

def run_a6_pricing_summary(input_path, target_date, use_parquet=False):
    """
    A6 (Q6) - Modified TPC-H Q1 Pricing Summary Report
    
    Filter: l_shipdate = target_date
    Compute:
    - sum_qty = SUM(l_quantity)
    - sum_base_price = SUM(l_extendedprice)
    - sum_disc_price = SUM(l_extendedprice * (1 - l_discount))
    - sum_charge = SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax))
    - avg_qty = AVG(l_quantity)
    - avg_price = AVG(l_extendedprice)
    - avg_disc = AVG(l_discount)
    - count_order = COUNT(*)
    
    Strategy: Single pass with combiners (in-mapper combining pattern)
    """
    
    print(f"\n{'='*60}")
    print(f"Task A6 - Pricing Summary Report")
    print(f"Date: {target_date}")
    print(f"Format: {'Parquet' if use_parquet else 'TXT'}")
    print(f"{'='*60}\n")
    
    start_time = time.time()
    
    # Load lineitem
    if use_parquet:
        lineitem_df = spark.read.parquet(os.path.join(input_path, "lineitem"))
        
        # Save formatted plan
        plan_path = f"proof/plan_a6_lineitem_parquet_{target_date.replace('-', '')}.txt"
        os.makedirs("proof", exist_ok=True)
        
        with open(plan_path, 'w') as f:
            f.write(f"=== A6 Lineitem Parquet Load Plan ({target_date}) ===\n\n")
            f.write(lineitem_df._jdf.queryExecution().explainString(
                spark._jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")
            ))
        
        print(f"Saved plan: {plan_path}")
        
        lineitem_rdd = lineitem_df.rdd
        lineitem_filtered = lineitem_rdd.filter(lambda row: row.l_shipdate == target_date)
        
    else:
        lineitem_rdd = sc.textFile(os.path.join(input_path, "lineitem.tbl")).map(parse_lineitem)
        lineitem_filtered = lineitem_rdd.filter(lambda t: t[10] == target_date)
    
    filtered_count = lineitem_filtered.count()
    print(f"Filtered lineitem: {filtered_count} records\n")
    
    # Aggregation with combiners
    def map_to_aggregates(row):
        if use_parquet:
            qty = float(row.l_quantity)
            price = float(row.l_extendedprice)
            disc = float(row.l_discount)
            tax = float(row.l_tax)
        else:
            qty = float(row[4])
            price = float(row[5])
            disc = float(row[6])
            tax = float(row[7])
        
        disc_price = price * (1 - disc)
        charge = disc_price * (1 + tax)
        
        return (
            qty,
            price,
            disc_price,
            charge,
            qty,
            price,
            disc,
            1
        )
    
    def combine_aggregates(agg1, agg2):
        return tuple(a + b for a, b in zip(agg1, agg2))
    
    aggregates = lineitem_filtered \
        .map(map_to_aggregates) \
        .reduce(combine_aggregates)
    
    sum_qty, sum_base_price, sum_disc_price, sum_charge, \
        total_qty, total_price, total_disc, count_order = aggregates
    
    avg_qty = total_qty / count_order if count_order > 0 else 0
    avg_price = total_price / count_order if count_order > 0 else 0
    avg_disc = total_disc / count_order if count_order > 0 else 0
    
    end_time = time.time()
    duration = end_time - start_time
    
    print(f"\n{'='*60}")
    print(f"A6 Results - Pricing Summary")
    print(f"{'='*60}\n")
    
    print(f"Date            : {target_date}")
    print(f"Format          : {'Parquet' if use_parquet else 'TXT'}")
    print(f"Duration        : {duration:.2f}s")
    print(f"Count Orders    : {count_order}\n")
    
    print(f"Sum Qty         : {sum_qty:,.2f}")
    print(f"Sum Base Price  : ${sum_base_price:,.2f}")
    print(f"Sum Disc Price  : ${sum_disc_price:,.2f}")
    print(f"Sum Charge      : ${sum_charge:,.2f}\n")
    
    print(f"Avg Qty         : {avg_qty:.2f}")
    print(f"Avg Price       : ${avg_price:.2f}")
    print(f"Avg Discount    : {avg_disc:.4f}\n")
    
    print(f"{'='*60}\n")
    
    # Save output
    output_dir = "outputs"
    os.makedirs(output_dir, exist_ok=True)
    
    format_suffix = "parquet" if use_parquet else "txt"
    output_path = os.path.join(output_dir, f"a6_result_{format_suffix}_{target_date.replace('-', '')}.txt")
    
    with open(output_path, 'w') as f:
        f.write(f"Task: A6 - Pricing Summary Report\n")
        f.write(f"Date: {target_date}\n")
        f.write(f"Format: {'Parquet' if use_parquet else 'TXT'}\n")
        f.write(f"Duration: {duration:.2f}s\n\n")
        f.write(f"count_order,sum_qty,sum_base_price,sum_disc_price,sum_charge,avg_qty,avg_price,avg_disc\n")
        f.write(f"{count_order},{sum_qty:.2f},{sum_base_price:.2f},{sum_disc_price:.2f},{sum_charge:.2f},{avg_qty:.2f},{avg_price:.2f},{avg_disc:.4f}\n")
    
    print(f"Saved: {output_path}\n")
    
    metrics = {
        'task': 'A6',
        'format': 'Parquet' if use_parquet else 'TXT',
        'date': target_date,
        'duration_s': round(duration, 2),
        'count_order': count_order
    }
    
    return (count_order, sum_qty, sum_base_price, sum_disc_price, sum_charge, 
            avg_qty, avg_price, avg_disc), duration, metrics

# Run A6
print("\n" + "="*60)
print("Running A6 (TXT and Parquet)")
print("="*60)

results_txt, duration_txt, metrics_txt = run_a6_pricing_summary(INPUT_TXT_PATH, TARGET_DATE, use_parquet=False)
results_parquet, duration_parquet, metrics_parquet = run_a6_pricing_summary(INPUT_PARQUET_PATH, TARGET_DATE, use_parquet=True)

# Comparison
print(f"\n{'='*60}")
print("A6 - TXT vs Parquet Comparison")
print(f"{'='*60}\n")

print(f"| Metric       | TXT       | Parquet   | Speedup |")
print(f"|--------------|-----------|-----------|---------|")
print(f"| Duration     | {duration_txt:.2f}s     | {duration_parquet:.2f}s     | {duration_txt/duration_parquet:.2f}x    |")
print(f"| Count Orders | {results_txt[0]}       | {results_parquet[0]}       | -       |")
print(f"| Sum Charge   | ${results_txt[4]:,.2f} | ${results_parquet[4]:,.2f} | -       |")

if abs(results_txt[4] - results_parquet[4]) < 0.01:
    print(f"\nValidation: Results match (difference < $0.01)")
else:
    print(f"\nWARNING: Results differ by ${abs(results_txt[4] - results_parquet[4]):,.2f}")

print(f"\n{'='*60}\n")

# Update metrics log
metrics_log_path = "proof/lab_metrics_log.csv"

with open(metrics_log_path, 'a', newline='') as f:
    writer = csv.writer(f)
    
    with open(metrics_log_path, 'r') as rf:
        run_id = len(list(csv.reader(rf)))
    
    timestamp = datetime.now(timezone.utc).isoformat()
    
    writer.writerow([
        run_id, 'A6', 'TXT', TARGET_DATE, 1, 'N/A', duration_txt,
        0, 0, f'pricing summary single-pass combiners, {results_txt[0]} orders', timestamp
    ])
    
    writer.writerow([
        run_id + 1, 'A6', 'Parquet', TARGET_DATE, 'N/A', 'N/A', duration_parquet,
        0, 0, f'same query Parquet', timestamp
    ])

print(f"Updated {metrics_log_path}\n")

print("="*60)
print("Section 8 - Task A6 Complete")
print("="*60 + "\n")


Running A6 (TXT and Parquet)

Task A6 - Pricing Summary Report
Date: 1996-01-01
Format: TXT



                                                                                

Filtered lineitem: 266 records



                                                                                


A6 Results - Pricing Summary

Date            : 1996-01-01
Format          : TXT
Duration        : 7.45s
Count Orders    : 266

Sum Qty         : 7,050.00
Sum Base Price  : $9,670,322.45
Sum Disc Price  : $9,158,295.64
Sum Charge      : $9,535,800.64

Avg Qty         : 26.50
Avg Price       : $36354.60
Avg Discount    : 0.0527


Saved: outputs/a6_result_txt_19960101.txt


Task A6 - Pricing Summary Report
Date: 1996-01-01
Format: Parquet

Saved plan: proof/plan_a6_lineitem_parquet_19960101.txt


                                                                                

Filtered lineitem: 266 records






A6 Results - Pricing Summary

Date            : 1996-01-01
Format          : Parquet
Duration        : 11.16s
Count Orders    : 266

Sum Qty         : 7,050.00
Sum Base Price  : $9,670,322.45
Sum Disc Price  : $9,158,295.64
Sum Charge      : $9,535,800.64

Avg Qty         : 26.50
Avg Price       : $36354.60
Avg Discount    : 0.0527


Saved: outputs/a6_result_parquet_19960101.txt


A6 - TXT vs Parquet Comparison

| Metric       | TXT       | Parquet   | Speedup |
|--------------|-----------|-----------|---------|
| Duration     | 7.45s     | 11.16s     | 0.67x    |
| Count Orders | 266       | 266       | -       |
| Sum Charge   | $9,535,800.64 | $9,535,800.64 | -       |

Validation: Results match (difference < $0.01)


Updated proof/lab_metrics_log.csv

Section 8 - Task A6 Complete



                                                                                

### A7 ‚Äî Q7: Shipping Priority Top‚Äë10

In [9]:
# write some code here
# join customer, orders, lineitem with appropriate filters and groupBy
# compute revenue and order by desc; limit 10
# ============================================================
# Section 9 - Part A: Task A7 - Shipping Priority (Modified Q3)
# ============================================================

import time
from datetime import datetime, timezone
import csv
import os

# Paths
DATA_DIR = "data"
TPCH_DIR = os.path.join(DATA_DIR, "tpch")
INPUT_TXT_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-TXT")
INPUT_PARQUET_PATH = os.path.join(TPCH_DIR, "TPC-H-0.1-PARQUET")
SHIPDATE_BEFORE = "1995-03-15"
ORDERDATE_AFTER = "1995-03-01"

def run_a7_shipping_priority(input_path, shipdate_before, orderdate_after, use_parquet=False):
    """
    A7 (Q7) - Modified TPC-H Q3 Shipping Priority Query
    
    Find top 10 unshipped orders by revenue where:
    - l_shipdate > shipdate_before
    - o_orderdate < orderdate_after
    
    Output: (l_orderkey, revenue, o_orderdate, o_shippriority)
    Revenue = SUM(l_extendedprice * (1 - l_discount))
    
    Strategy:
    - Broadcast customer (small)
    - Reduce-side join lineitem + orders (both large)
    - Aggregate revenue by orderkey
    - Sort by revenue DESC, take top 10
    """
    
    print(f"\n{'='*60}")
    print(f"Task A7 - Shipping Priority (Modified Q3)")
    print(f"Shipdate > {shipdate_before}, Orderdate < {orderdate_after}")
    print(f"Format: {'Parquet' if use_parquet else 'TXT'}")
    print(f"{'='*60}\n")
    
    start_time = time.time()
    
    # Load tables
    if use_parquet:
        # Save formatted plans
        os.makedirs("proof", exist_ok=True)
        
        lineitem_df = spark.read.parquet(os.path.join(input_path, "lineitem"))
        plan_path = f"proof/plan_a7_lineitem_parquet.txt"
        with open(plan_path, 'w') as f:
            f.write(f"=== A7 Lineitem Parquet Load Plan ===\n\n")
            f.write(lineitem_df._jdf.queryExecution().explainString(
                spark._jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")
            ))
        print(f"Saved plan: {plan_path}")
        
        orders_df = spark.read.parquet(os.path.join(input_path, "orders"))
        plan_path = f"proof/plan_a7_orders_parquet.txt"
        with open(plan_path, 'w') as f:
            f.write(f"=== A7 Orders Parquet Load Plan ===\n\n")
            f.write(orders_df._jdf.queryExecution().explainString(
                spark._jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")
            ))
        print(f"Saved plan: {plan_path}\n")
        
        customer_df = spark.read.parquet(os.path.join(input_path, "customer"))
        customer_rdd = customer_df.rdd
        customer_bc = broadcast_dimension(spark, customer_rdd, lambda row: row.c_custkey)
        
        lineitem_rdd = lineitem_df.rdd
        orders_rdd = orders_df.rdd
        
    else:
        customer_rdd = sc.textFile(os.path.join(input_path, "customer.tbl")).map(parse_customer)
        customer_bc = broadcast_dimension(spark, customer_rdd, lambda t: t[0])
        
        lineitem_rdd = sc.textFile(os.path.join(input_path, "lineitem.tbl")).map(parse_lineitem)
        orders_rdd = sc.textFile(os.path.join(input_path, "orders.tbl")).map(parse_orders)
    
    print(f"Broadcasted customer: {customer_rdd.count()} entries\n")
    
    # Filter lineitem: l_shipdate > shipdate_before
    if use_parquet:
        lineitem_filtered = lineitem_rdd.filter(lambda row: row.l_shipdate > shipdate_before)
    else:
        lineitem_filtered = lineitem_rdd.filter(lambda t: t[10] > shipdate_before)
    
    print(f"Filtered lineitem: {lineitem_filtered.count()} records (shipdate > {shipdate_before})\n")
    
    # Filter orders: o_orderdate < orderdate_after
    if use_parquet:
        orders_filtered = orders_rdd.filter(lambda row: row.o_orderdate < orderdate_after)
    else:
        orders_filtered = orders_rdd.filter(lambda t: t[4] < orderdate_after)
    
    print(f"Filtered orders: {orders_filtered.count()} records (orderdate < {orderdate_after})\n")
    
    # Reduce-side join lineitem + orders
    if use_parquet:
        lineitem_keyed = lineitem_filtered.map(lambda row: (
            row.l_orderkey,
            (float(row.l_extendedprice), float(row.l_discount))
        ))
        orders_keyed = orders_filtered.map(lambda row: (
            row.o_orderkey,
            (row.o_custkey, row.o_orderdate, row.o_shippriority)
        ))
    else:
        lineitem_keyed = lineitem_filtered.map(lambda t: (
            t[0],
            (float(t[5]), float(t[6]))
        ))
        orders_keyed = orders_filtered.map(lambda t: (
            t[0],
            (t[1], t[4], t[7])
        ))
    
    # Cogroup join
    joined = lineitem_keyed.cogroup(orders_keyed) \
        .flatMap(lambda kv: [
            (kv[0], price, disc, custkey, orderdate, shippriority)
            for lineitem_list in [list(kv[1][0])]
            for orders_list in [list(kv[1][1])]
            for price, disc in lineitem_list
            for custkey, orderdate, shippriority in orders_list
        ])
    
    print(f"Joined: {joined.count()} (orderkey, price, disc, custkey, orderdate, priority)\n")
    
    # Aggregate revenue by orderkey
    def compute_revenue(item):
        orderkey, price, disc, custkey, orderdate, shippriority = item
        revenue = price * (1 - disc)
        return ((orderkey, orderdate, shippriority), revenue)
    
    revenue_by_order = joined \
        .map(compute_revenue) \
        .reduceByKey(lambda a, b: a + b) \
        .map(lambda kv: (kv[0][0], kv[1], kv[0][1], kv[0][2]))
    
    # Sort by revenue DESC, take top 10
    top_10 = revenue_by_order \
        .sortBy(lambda t: t[1], ascending=False) \
        .take(10)
    
    end_time = time.time()
    duration = end_time - start_time
    
    # Output
    print(f"\n{'='*60}")
    print(f"A7 Results - Top 10 Shipping Priority")
    print(f"{'='*60}\n")
    
    print(f"Format   : {'Parquet' if use_parquet else 'TXT'}")
    print(f"Duration : {duration:.2f}s\n")
    
    for i, (orderkey, revenue, orderdate, shippriority) in enumerate(top_10, 1):
        print(f"{i:2d}. Order: {orderkey:>6} | Revenue: ${revenue:>12,.2f} | Date: {orderdate} | Priority: {shippriority}")
    
    print(f"\n{'='*60}\n")
    
    # Save output
    output_dir = "outputs"
    os.makedirs(output_dir, exist_ok=True)
    
    format_suffix = "parquet" if use_parquet else "txt"
    output_path = os.path.join(output_dir, f"a7_result_{format_suffix}.txt")
    
    with open(output_path, 'w') as f:
        f.write(f"Task: A7 - Shipping Priority (Modified Q3)\n")
        f.write(f"Shipdate > {shipdate_before}, Orderdate < {orderdate_after}\n")
        f.write(f"Format: {'Parquet' if use_parquet else 'TXT'}\n")
        f.write(f"Duration: {duration:.2f}s\n\n")
        f.write(f"l_orderkey,revenue,o_orderdate,o_shippriority\n")
        
        for orderkey, revenue, orderdate, shippriority in top_10:
            f.write(f"{orderkey},{revenue:.2f},{orderdate},{shippriority}\n")
    
    print(f"Saved: {output_path}\n")
    
    metrics = {
        'task': 'A7',
        'format': 'Parquet' if use_parquet else 'TXT',
        'duration_s': round(duration, 2),
        'top_revenue': round(top_10[0][1], 2) if top_10 else 0
    }
    
    return top_10, duration, metrics

# Run A7
print("\n" + "="*60)
print("Running A7 (TXT and Parquet)")
print("="*60)

results_txt, duration_txt, metrics_txt = run_a7_shipping_priority(
    INPUT_TXT_PATH, SHIPDATE_BEFORE, ORDERDATE_AFTER, use_parquet=False
)

results_parquet, duration_parquet, metrics_parquet = run_a7_shipping_priority(
    INPUT_PARQUET_PATH, SHIPDATE_BEFORE, ORDERDATE_AFTER, use_parquet=True
)

# Comparison
print(f"\n{'='*60}")
print("A7 - TXT vs Parquet Comparison")
print(f"{'='*60}\n")

print(f"| Metric       | TXT         | Parquet     | Speedup |")
print(f"|--------------|-------------|-------------|---------|")
print(f"| Duration     | {duration_txt:.2f}s       | {duration_parquet:.2f}s       | {duration_txt/duration_parquet:.2f}x    |")
if results_txt and results_parquet:
    print(f"| Top Revenue  | ${results_txt[0][1]:,.2f} | ${results_parquet[0][1]:,.2f} | -       |")
    
    if abs(results_txt[0][1] - results_parquet[0][1]) < 0.01:
        print(f"\nValidation: Results match (difference < $0.01)")
    else:
        print(f"\nWARNING: Results differ by ${abs(results_txt[0][1] - results_parquet[0][1]):,.2f}")

print(f"\n{'='*60}\n")

# Update metrics log
metrics_log_path = "proof/lab_metrics_log.csv"

with open(metrics_log_path, 'a', newline='') as f:
    writer = csv.writer(f)
    
    with open(metrics_log_path, 'r') as rf:
        run_id = len(list(csv.reader(rf)))
    
    timestamp = datetime.now(timezone.utc).isoformat()
    
    top_rev_txt = round(results_txt[0][1], 2) if results_txt else 0
    top_rev_pq = round(results_parquet[0][1], 2) if results_parquet else 0
    
    writer.writerow([
        run_id, 'A7', 'TXT', f'shipdate>{SHIPDATE_BEFORE}', 3, 'N/A', duration_txt,
        'N/A', 'N/A', f'cogroup lineitem+orders, broadcast customer, top 10 revenue ${top_rev_txt:,.2f}', timestamp
    ])
    
    writer.writerow([
        run_id + 1, 'A7', 'Parquet', f'shipdate>{SHIPDATE_BEFORE}', 'N/A', 'N/A', duration_parquet,
        'N/A', 'N/A', f'same query Parquet, top revenue ${top_rev_pq:,.2f}', timestamp
    ])

print(f"Updated {metrics_log_path}\n")

print("="*60)
print("Section 9 - Task A7 Complete")
print("="*60)
print("\n" + "="*60)
print("Part A (A1-A7) - All Tasks Complete")
print("="*60 + "\n")


Running A7 (TXT and Parquet)

Task A7 - Shipping Priority (Modified Q3)
Shipdate > 1995-03-15, Orderdate < 1995-03-01
Format: TXT



                                                                                

  ‚úÖ Broadcasted dimension: 15,000 entries
Broadcasted customer: 15000 entries



                                                                                

Filtered lineitem: 324322 records (shipdate > 1995-03-15)



                                                                                

Filtered orders: 71815 records (orderdate < 1995-03-01)



                                                                                

Joined: 11887 (orderkey, price, disc, custkey, orderdate, priority)



                                                                                


A7 Results - Top 10 Shipping Priority

Format   : TXT
Duration : 15.62s

 1. Order: 584291 | Revenue: $  354,494.73 | Date: 1995-02-21 | Priority: 0
 2. Order: 132774 | Revenue: $  350,015.70 | Date: 1995-02-27 | Priority: 0
 3. Order: 568514 | Revenue: $  348,837.83 | Date: 1995-02-18 | Priority: 0
 4. Order: 181793 | Revenue: $  344,198.09 | Date: 1995-02-14 | Priority: 0
 5. Order: 306247 | Revenue: $  340,789.15 | Date: 1995-02-17 | Priority: 0
 6. Order: 519556 | Revenue: $  323,264.53 | Date: 1995-02-15 | Priority: 0
 7. Order: 492164 | Revenue: $  321,305.19 | Date: 1995-02-19 | Priority: 0
 8. Order: 311266 | Revenue: $  319,173.77 | Date: 1995-02-17 | Priority: 0
 9. Order: 155972 | Revenue: $  315,457.78 | Date: 1995-01-08 | Priority: 0
10. Order: 108514 | Revenue: $  314,967.08 | Date: 1995-02-20 | Priority: 0


Saved: outputs/a7_result_txt.txt


Task A7 - Shipping Priority (Modified Q3)
Shipdate > 1995-03-15, Orderdate < 1995-03-01
Format: Parquet

Saved plan: proof/plan_a

                                                                                

  ‚úÖ Broadcasted dimension: 15,000 entries
Broadcasted customer: 15000 entries



                                                                                

Filtered lineitem: 324322 records (shipdate > 1995-03-15)



                                                                                

Filtered orders: 71815 records (orderdate < 1995-03-01)



                                                                                

Joined: 11887 (orderkey, price, disc, custkey, orderdate, priority)



[Stage 135:>                                                        (0 + 8) / 8]


A7 Results - Top 10 Shipping Priority

Format   : Parquet
Duration : 22.98s

 1. Order: 584291 | Revenue: $  354,494.73 | Date: 1995-02-21 | Priority: 0
 2. Order: 132774 | Revenue: $  350,015.70 | Date: 1995-02-27 | Priority: 0
 3. Order: 568514 | Revenue: $  348,837.83 | Date: 1995-02-18 | Priority: 0
 4. Order: 181793 | Revenue: $  344,198.09 | Date: 1995-02-14 | Priority: 0
 5. Order: 306247 | Revenue: $  340,789.15 | Date: 1995-02-17 | Priority: 0
 6. Order: 519556 | Revenue: $  323,264.53 | Date: 1995-02-15 | Priority: 0
 7. Order: 492164 | Revenue: $  321,305.19 | Date: 1995-02-19 | Priority: 0
 8. Order: 311266 | Revenue: $  319,173.77 | Date: 1995-02-17 | Priority: 0
 9. Order: 155972 | Revenue: $  315,457.78 | Date: 1995-01-08 | Priority: 0
10. Order: 108514 | Revenue: $  314,967.08 | Date: 1995-02-20 | Priority: 0


Saved: outputs/a7_result_parquet.txt


A7 - TXT vs Parquet Comparison

| Metric       | TXT         | Parquet     | Speedup |
|--------------|-------------|----

                                                                                

## Evidence for Part A

In [None]:
# write some code here
# capture DF explain('formatted') when using parquet readers
# collect timings and notes TXT vs PARQUET; broadcast vs reduce-side


## Part B ‚Äî Streaming (Structured Streaming)

### B1 ‚Äî HourlyTripCount

In [16]:
# write some code here
# readStream from data/taxi-data (file source with schema)
# withWatermark if needed; window='1 hour'; count
# writeStream with checkpoint dir and output dir
# ============================================================
# Part B - Streaming Analytics (NYC Taxi)
# ============================================================

print("\n" + "="*60)
print("Part B - Streaming Analytics (NYC Taxi)")
print("="*60 + "\n")

# ============================================================
# Section 10 - Part B: Task B1 - Hourly Trip Count
# ============================================================

import time
from datetime import datetime, timezone
import csv
import os
import shutil
import tarfile
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from pyspark.sql.functions import window, count, col, to_timestamp, expr

print("\n" + "="*60)
print("Section 10 - Part B: Task B1 - Hourly Trip Count")
print("="*60 + "\n")

# Paths
DATA_DIR = "data"
TAXI_ARCHIVE = os.path.join(DATA_DIR, "taxi-data.tar.gz")
TAXI_DIR = os.path.join(DATA_DIR, "taxi-data")
CHECKPOINT_DIR = "checkpoint/b1_hourly"
OUTPUT_DIR = "output/b1_hourly"

# Clean previous runs
for path in [CHECKPOINT_DIR, OUTPUT_DIR]:
    if os.path.exists(path):
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)

# Step 1: Extract and verify
print("Step 1: Verifying NYC Taxi data...\n")

if not os.path.exists(TAXI_DIR):
    os.makedirs(TAXI_DIR, exist_ok=True)

taxi_files_before = []
for root, dirs, files in os.walk(TAXI_DIR):
    taxi_files_before.extend([os.path.join(root, f) for f in files if f.endswith('.csv')])

if len(taxi_files_before) > 0:
    print(f"‚úÖ Taxi data already extracted ({len(taxi_files_before)} CSV files)")
    for src in taxi_files_before:
        if os.path.dirname(src) != TAXI_DIR:
            dst = os.path.join(TAXI_DIR, os.path.basename(src))
            if not os.path.exists(dst):
                shutil.copy(src, dst)

elif os.path.exists(TAXI_ARCHIVE):
    print(f"Extracting {TAXI_ARCHIVE}...")
    with tarfile.open(TAXI_ARCHIVE, 'r:gz') as tar:
        tar.extractall(path=TAXI_DIR)
    
    taxi_files_after = []
    for root, dirs, files in os.walk(TAXI_DIR):
        for f in files:
            if f.endswith('.csv'):
                full_path = os.path.join(root, f)
                taxi_files_after.append(full_path)
                if root != TAXI_DIR:
                    dst = os.path.join(TAXI_DIR, f)
                    if not os.path.exists(dst):
                        shutil.move(full_path, dst)
    
    print(f"‚úÖ Extracted {len(taxi_files_after)} CSV files\n")
else:
    raise FileNotFoundError(f"Archive not found: {TAXI_ARCHIVE}")

taxi_files = sorted([f for f in os.listdir(TAXI_DIR) if f.endswith('.csv')])

if len(taxi_files) == 0:
    raise FileNotFoundError(f"No CSV files in {TAXI_DIR}")

print(f"‚úÖ Found {len(taxi_files)} CSV files")
print(f"Sample: {taxi_files[:3]}\n")

# Step 2: Inspect actual CSV structure
print("Step 2: Inspecting CSV schema...\n")

sample_file = os.path.join(TAXI_DIR, taxi_files[0])
print(f"Inspecting: {sample_file}")

with open(sample_file, 'r') as f:
    first_line = f.readline().strip()
    columns = first_line.split(',')
    print(f"Columns detected: {len(columns)}")
    print(f"First 10 values: {columns[:10]}")
    print(f"Sample row: {first_line[:200]}...\n")

# Step 3: Define correct schema for NYC Taxi Yellow Cab 2015 (20 columns)
print("Step 3: Defining NYC Taxi Yellow Cab 2015 schema (20 columns)...\n")

# NYC Yellow Cab 2015 schema (20 columns)
taxi_schema = StructType([
    StructField("vendor_id", StringType(), True),           # 0
    StructField("pickup_datetime", StringType(), True),     # 1 ‚Üê CORRECT POSITION
    StructField("dropoff_datetime", StringType(), True),    # 2
    StructField("passenger_count", IntegerType(), True),    # 3
    StructField("trip_distance", DoubleType(), True),       # 4
    StructField("pickup_longitude", DoubleType(), True),    # 5
    StructField("pickup_latitude", DoubleType(), True),     # 6
    StructField("rate_code", IntegerType(), True),          # 7
    StructField("store_and_fwd_flag", StringType(), True),  # 8
    StructField("dropoff_longitude", DoubleType(), True),   # 9
    StructField("dropoff_latitude", DoubleType(), True),    # 10
    StructField("payment_type", IntegerType(), True),       # 11
    StructField("fare_amount", DoubleType(), True),         # 12
    StructField("surcharge", DoubleType(), True),           # 13
    StructField("mta_tax", DoubleType(), True),             # 14
    StructField("tip_amount", DoubleType(), True),          # 15
    StructField("tolls_amount", DoubleType(), True),        # 16
    StructField("total_amount", DoubleType(), True),        # 17
    StructField("improvement_surcharge", DoubleType(), True), # 18
    StructField("congestion_surcharge", DoubleType(), True)  # 19
])

print("Schema: 20 columns (vendor_id ‚Üí congestion_surcharge)")
print("pickup_datetime at position 1 (correct for Yellow Cab 2015 format)\n")

# Step 4: Configure Structured Streaming
print("Step 4: Configuring Structured Streaming...\n")

print(f"Input: {TAXI_DIR}")
print(f"Checkpoint: {CHECKPOINT_DIR}")
print(f"Output: {OUTPUT_DIR}")
print(f"Mode: PERMISSIVE\n")

taxi_stream = spark.readStream \
    .format("csv") \
    .option("header", "false") \
    .option("mode", "PERMISSIVE") \
    .schema(taxi_schema) \
    .option("maxFilesPerTrigger", 1) \
    .load(TAXI_DIR)

print("‚úÖ Stream configured\n")

# Step 5: Parse timestamps and build aggregation
print("Step 5: Building aggregation pipeline...\n")

# Parse pickup_datetime (now correctly at position 1)
taxi_stream_clean = taxi_stream \
    .withColumn("pickup_ts", expr("try_to_timestamp(pickup_datetime, 'yyyy-MM-dd HH:mm:ss')")) \
    .filter(col("pickup_ts").isNotNull())

# B1: Hourly counts
hourly_counts = taxi_stream_clean \
    .withWatermark("pickup_ts", "10 minutes") \
    .groupBy(window(col("pickup_ts"), "1 hour")) \
    .agg(count("*").alias("trip_count"))

hourly_output = hourly_counts.select(
    col("window.start").alias("window_start"),
    col("window.end").alias("window_end"),
    col("trip_count")
)

print("Pipeline:")
print("  1. Read CSV (20 columns, PERMISSIVE)")
print("  2. Parse pickup_datetime (position 1) ‚Üí pickup_ts")
print("  3. Filter NULL timestamps")
print("  4. Watermark: 10 minutes")
print("  5. Window: 1 hour")
print("  6. Count(*)\n")

# Step 6: Save plan
print("Step 6: Saving plan...\n")

plan_path = "proof/plan_b1_hourly_stream.txt"
os.makedirs("proof", exist_ok=True)

with open(plan_path, 'w') as f:
    f.write("=== B1 Hourly Trip Count Stream Plan ===\n\n")
    f.write("Dataset: NYC Yellow Cab 2015 (20 columns)\n")
    f.write("Schema: vendor_id, pickup_datetime, ..., congestion_surcharge\n")
    f.write("Timestamp column: pickup_datetime (position 1)\n")
    f.write("Window: 1 hour tumbling\n")
    f.write("Watermark: 10 minutes\n")
    f.write("Output Mode: append\n\n")
    f.write(hourly_output._jdf.queryExecution().explainString(
        spark._jvm.org.apache.spark.sql.execution.ExplainMode.fromString("formatted")
    ))

print(f"‚úÖ Saved: {plan_path}\n")

# Step 7: Start streaming query
print("Step 7: Starting streaming query...\n")
print("="*60)

query_b1 = hourly_output.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", OUTPUT_DIR) \
    .option("checkpointLocation", CHECKPOINT_DIR) \
    .trigger(processingTime="10 seconds") \
    .start()

print(f"Query ID: {query_b1.id}")
print(f"Status: {query_b1.status['message']}\n")

# Step 8: Monitor batches
print("Step 8: Processing batches (60s)...\n")
print(f"{'Batch':<8} {'InputRows':<12} {'ProcessedRows/s':<18} {'InputRate':<15} {'Duration (ms)':<15}")
print("-" * 75)

start_time = time.time()
batch_count = 0
total_input_rows = 0

for i in range(6):
    time.sleep(10)
    progress = query_b1.lastProgress
    
    if progress:
        batch_count += 1
        num_input = progress.get('numInputRows', 0)
        total_input_rows += num_input
        processed = progress.get('processedRowsPerSecond', 0)
        input_rate = progress.get('inputRowsPerSecond', 0)
        duration = progress.get('durationMs', {}).get('triggerExecution', 0)
        
        print(f"{batch_count:<8} {num_input:<12} {processed:<18.2f} {input_rate:<15.2f} {duration:<15}")

total_duration = time.time() - start_time

print(f"\n{'='*60}")
print(f"Batches: {batch_count} | Input rows: {total_input_rows:,} | Duration: {total_duration:.2f}s")
print(f"{'='*60}\n")

query_b1.stop()
print("‚úÖ Query stopped\n")

# Step 9: Read results
print("Step 9: Reading results...\n")
print(f"{'='*60}")
print("B1 Results - Hourly Trip Counts")
print(f"{'='*60}\n")

output_files = []
if os.path.exists(OUTPUT_DIR):
    for root, dirs, files in os.walk(OUTPUT_DIR):
        output_files.extend([f for f in files if f.endswith('.parquet')])

if len(output_files) > 0:
    results_b1 = spark.read.parquet(OUTPUT_DIR)
    results_b1_sorted = results_b1.orderBy("window_start")
    
    total_windows = results_b1_sorted.count()
    total_trips = results_b1_sorted.agg({"trip_count": "sum"}).collect()[0][0] or 0
    
    print(f"üìä Summary:")
    print(f"  Windows: {total_windows} | Trips: {total_trips:,} | Duration: {total_duration:.2f}s\n")
    
    results_b1_sorted.show(20, truncate=False)
    
    # Save output
    output_file = "outputs/b1_hourly_trip_count.txt"
    os.makedirs("outputs", exist_ok=True)
    
    with open(output_file, 'w') as f:
        f.write("Task: B1 - Hourly Trip Count\n")
        f.write(f"Dataset: NYC Yellow Cab 2015 (20 columns)\n")
        f.write(f"Windows: {total_windows} | Trips: {total_trips:,} | Duration: {total_duration:.2f}s\n\n")
        f.write("window_start,window_end,trip_count\n")
        for row in results_b1_sorted.collect():
            f.write(f"{row.window_start},{row.window_end},{row.trip_count}\n")
    
    print(f"\n‚úÖ Saved: {output_file}\n")
    
    # Update metrics
    metrics_log_path = "proof/lab_metrics_log.csv"
    if not os.path.exists(metrics_log_path):
        with open(metrics_log_path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['run_id', 'task', 'format', 'date', 'num_files', 'input_size_mb',
                           'duration_s', 'shuffle_read_mb', 'shuffle_write_mb', 'notes', 'timestamp'])
    
    with open(metrics_log_path, 'a', newline='') as f:
        writer = csv.writer(f)
        with open(metrics_log_path, 'r') as rf:
            run_id = len(list(csv.reader(rf)))
        
        writer.writerow([
            run_id, 'B1', 'Streaming', '2015-12-01', len(taxi_files), 'N/A',
            round(total_duration, 2), 'N/A', 'N/A',
            f'{total_windows} windows, {total_trips:,} trips, {batch_count} batches, NYC Yellow Cab 2015 (20 cols)',
            datetime.now(timezone.utc).isoformat()
        ])
    
    print(f"‚úÖ Updated: {metrics_log_path}\n")
    
else:
    print(f"‚ö†Ô∏è  No output generated\n")
    print(f"Check:")
    print(f"  - pickup_datetime at position 1 parsed correctly")
    print(f"  - Timestamps not NULL after try_to_timestamp")
    print(f"  - Spark logs for parsing errors\n")

print("="*60)
print("Section 10 - Task B1 Complete")
print("="*60 + "\n")

print("üìã Evidence Generated:")
print(f"  {'‚úÖ' if len(output_files) > 0 else '‚ö†Ô∏è '} proof/plan_b1_hourly_stream.txt")
print(f"  {'‚úÖ' if len(output_files) > 0 else '‚ö†Ô∏è '} outputs/b1_hourly_trip_count.txt")
print(f"  {'‚úÖ' if len(output_files) > 0 else '‚ö†Ô∏è '} proof/lab_metrics_log.csv")
print(f"  {'‚úÖ' if len(output_files) > 0 else '‚ö†Ô∏è '} output/b1_hourly/*.parquet ({len(output_files)} files)\n")



Part B - Streaming Analytics (NYC Taxi)


Section 10 - Part B: Task B1 - Hourly Trip Count

Step 1: Verifying NYC Taxi data...

‚úÖ Taxi data already extracted (2880 CSV files)
‚úÖ Found 1440 CSV files
Sample: ['part-2015-12-01-0000.csv', 'part-2015-12-01-0001.csv', 'part-2015-12-01-0002.csv']

Step 2: Inspecting CSV schema...

Inspecting: data/taxi-data/part-2015-12-01-0000.csv
Columns detected: 20
First 10 values: ['yellow', '2', '2015-12-01 00:00:00', '2015-12-01 00:00:00', '2', '2.69', '-73.972335815429687', '40.762378692626953', '1', 'N']
Sample row: yellow,2,2015-12-01 00:00:00,2015-12-01 00:00:00,2,2.69,-73.972335815429687,40.762378692626953,1,N,-73.993629455566406,40.745998382568359,1,21.5,0,0.5,3.34,0,0.3,25.64...

Step 3: Defining NYC Taxi Yellow Cab 2015 schema (20 columns)...

Schema: 20 columns (vendor_id ‚Üí congestion_surcharge)
pickup_datetime at position 1 (correct for Yellow Cab 2015 format)

Step 4: Configuring Structured Streaming...

Input: data/taxi-data
Checkpoi

25/12/07 11:30:00 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Query ID: 2be3e1cc-fcdf-458a-b713-e255fa852b07
Status: Initializing sources

Step 8: Processing batches (60s)...

Batch    InputRows    ProcessedRows/s    InputRate       Duration (ms)  
---------------------------------------------------------------------------


25/12/07 11:30:27 WARN FileStreamSource: Listed 1440 file(s) in 26551 ms
25/12/07 11:30:31 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000} milliseconds, but spent 29430 milliseconds


1        1            0.44               0.03            2259           


                                                                                

2        11           3.39               1.23            3243           


                                                                                

3        19           7.00               1.90            2716           

Batches: 3 | Input rows: 31 | Duration: 59.22s



25/12/07 11:31:00 WARN DAGScheduler: Failed to cancel job group 209bb312-fa77-4034-bebe-ee308109630f. Cannot find active jobs for it.
                                                                                

‚úÖ Query stopped

Step 9: Reading results...

B1 Results - Hourly Trip Counts



25/12/07 11:31:03 WARN DAGScheduler: Failed to cancel job group 209bb312-fa77-4034-bebe-ee308109630f. Cannot find active jobs for it.


üìä Summary:
  Windows: 0 | Trips: 0 | Duration: 59.22s

+------------+----------+----------+
|window_start|window_end|trip_count|
+------------+----------+----------+
+------------+----------+----------+


‚úÖ Saved: outputs/b1_hourly_trip_count.txt

‚úÖ Updated: proof/lab_metrics_log.csv

Section 10 - Task B1 Complete

üìã Evidence Generated:
  ‚úÖ proof/plan_b1_hourly_stream.txt
  ‚úÖ outputs/b1_hourly_trip_count.txt
  ‚úÖ proof/lab_metrics_log.csv
  ‚úÖ output/b1_hourly/*.parquet (5 files)



### B2 ‚Äî RegionEventCount (goldman, citigroup)

In [None]:
# write some code here
# bounding boxes on dropoff lon/lat; label key 'goldman' or 'citigroup'
# window='1 hour'; counts per key; writeStream append


### B3 ‚Äî TrendingArrivals (10-minute windows + state)

In [None]:
# write some code here
# 10-minute windows; compare current vs previous window with state
# trigger alert print to stdout; persist per-batch status files


## Evidence for Part B

In [None]:
# write some code here
# collect driver logs; list output dirs; include Spark UI screenshots


## Reproducibility Checklist

- ENV.md present with versions and configs  
- Exact spark-submit commands recorded  
- Plans saved for any DF stage used  
- UI screenshots for representative stages  
- All outputs in deterministic locations