# 05 - Sink and Output

## Overview
This notebook writes the transformed and aggregated streaming data to persistent storage using Parquet format.

## Sink Configuration
We will configure streaming sinks with:
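- **Checkpointing**: Records progress so a restarted query resumes where it left off; combined with a replayable source and the file sink, this gives exactly-once output
- **Parquet format**: Columnar storage for efficient analytics
- **Partitioning**: Lays out files by date so date-filtered queries can skip irrelevant directories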
- **Trigger intervals**: Controls micro-batch processing frequency

## Production Considerations

### Checkpointing
Checkpoints store:
- Stream offset information
- Metadata about processed data
- State store data for aggregations

On restart, a query resumes from its checkpoint. This provides fault tolerance and, with replayable sources and idempotent sinks, exactly-once guarantees.

### Output Modes
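- **Append**: Only rows added since the last trigger (best for most ETL scenarios, and the only mode the built-in file sinks such as Parquet support; aggregations need a watermark)
- **Complete**: The entire result table on every trigger (aggregation queries only)
- **Update**: Only rows that changed since the last trigger (supported by sinks such as `foreachBatch`, console, and Kafka, but not by file sinks)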
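To make the difference between complete and update concrete, here is a toy, pure-Python model of what each mode would emit for a running count per key across two micro-batches. It illustrates the semantics only, not how Spark implements state; the event data and the `run_counts` function name are hypothetical.

```python
from collections import Counter


def run_counts(batches):
    """Toy model of a streaming count-by-key aggregation.

    For each micro-batch, returns what a sink would receive in
    complete mode (the whole result table) and in update mode
    (only the keys whose counts changed in that batch).
    """
    state = Counter()
    emitted = []
    for batch in batches:
        batch_counts = Counter(batch)
        state.update(batch_counts)
        complete = dict(sorted(state.items()))
        update = {key: state[key] for key in sorted(batch_counts)}
        emitted.append({"complete": complete, "update": update})
    return emitted


# Two hypothetical micro-batches of product categories
batches = [
    ["books", "games", "books"],
    ["games", "toys"],
]
for batch_id, output in enumerate(run_counts(batches)):
    print(batch_id, "complete:", output["complete"])
    print(batch_id, "update:  ", output["update"])
```

Append mode is left out of the toy because Spark accepts it for an aggregation only with a watermark; each group is then emitted once, after the watermark guarantees it can no longer change.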

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pathlib import Path
import os
import shutil

## Initialize Spark Session

In [None]:
# Reuse the active Spark session if one exists; otherwise create a local one
spark = SparkSession.getActiveSession()
if spark is not None:
    print("Using existing Spark session")
else:
    spark = SparkSession.builder \
        .appName("TransactionStreamingETL") \
        .master("local[*]") \
        .config("spark.sql.streaming.schemaInference", "false") \
        .config("spark.sql.shuffle.partitions", "4") \
        .getOrCreate()
    print("Created new Spark session")

spark.sparkContext.setLogLevel("WARN")
print(f"Spark Version: {spark.version}")

## Configure Paths

In [None]:
# Configure paths
BASE_DIR = Path(os.path.abspath('')).parent
SQL_DIR = BASE_DIR / 'sql'
INPUT_DIR = str(BASE_DIR / 'data' / 'input')
OUTPUT_DIR = str(BASE_DIR / 'data' / 'output')

# Output subdirectories
TRANSFORMED_OUTPUT = f"{OUTPUT_DIR}/transformed_transactions"
AGGREGATED_OUTPUT = f"{OUTPUT_DIR}/transaction_metrics"

# Checkpoint directories
TRANSFORMED_CHECKPOINT = f"{OUTPUT_DIR}/checkpoints/transformed"
AGGREGATED_CHECKPOINT = f"{OUTPUT_DIR}/checkpoints/aggregated"

print(f"Output Directory: {OUTPUT_DIR}")
print(f"Transformed Output: {TRANSFORMED_OUTPUT}")
print(f"Aggregated Output: {AGGREGATED_OUTPUT}")
print(f"\nCheckpoint Locations:")
print(f"  Transformed: {TRANSFORMED_CHECKPOINT}")
print(f"  Aggregated: {AGGREGATED_CHECKPOINT}")

## Clean Previous Outputs (Optional)

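For fresh runs, clean up existing output and checkpoint directories. Always delete a sink's output and its checkpoint together.

**Warning**: In production, never delete checkpoints! The query loses its committed offsets and aggregation state, so on restart it either reprocesses input (producing duplicates) or, because the file sink's `_spark_metadata` log in the output directory still records the old batch IDs, skips new batches whose IDs were already committed (data loss).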

In [None]:
# Clean up previous outputs (optional - for demo purposes)
def clean_directory(path):
    if os.path.exists(path):
        shutil.rmtree(path)
        print(f"Cleaned: {path}")
    os.makedirs(path, exist_ok=True)

# Uncomment to clean (use with caution)
# clean_directory(TRANSFORMED_OUTPUT)
# clean_directory(AGGREGATED_OUTPUT)
# clean_directory(TRANSFORMED_CHECKPOINT)
# clean_directory(AGGREGATED_CHECKPOINT)

# Ensure directories exist
os.makedirs(TRANSFORMED_OUTPUT, exist_ok=True)
os.makedirs(AGGREGATED_OUTPUT, exist_ok=True)
os.makedirs(TRANSFORMED_CHECKPOINT, exist_ok=True)
os.makedirs(AGGREGATED_CHECKPOINT, exist_ok=True)

print("Output directories ready")

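When resetting a demo run, a sink's output directory and its checkpoint should be removed as a pair, because the file sink also keeps a `_spark_metadata` log of committed batches inside the output directory. The helper below is a sketch of that rule; `reset_sink` is a hypothetical name, not part of this project or of Spark, and it is meant only for local demo data.

```python
import os
import shutil


def reset_sink(output_dir, checkpoint_dir):
    """Delete a sink's output and checkpoint together, then recreate both.

    For demo runs only (hypothetical helper): deleting one without the other
    leaves the query's checkpoint and the output's _spark_metadata log out of step.
    """
    for path in (output_dir, checkpoint_dir):
        if os.path.exists(path):
            shutil.rmtree(path)
        os.makedirs(path, exist_ok=True)
```

In this notebook it would be called as `reset_sink(TRANSFORMED_OUTPUT, TRANSFORMED_CHECKPOINT)` before starting the first sink.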
## Set Up Complete Pipeline

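Temporary views exist only within the Spark session that created them. If the views from the earlier notebooks are not registered in this session, rebuild the complete pipeline here.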

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Check existing views
existing_views = [table.name for table in spark.catalog.listTables() if table.isTemporary]

if 'transformed_transactions' not in existing_views or 'transaction_metrics' not in existing_views:
    print("Setting up complete pipeline...")
    
    # Define schema
    transaction_schema = StructType([
        StructField("transaction_id", StringType(), False),
        StructField("user_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("product_category", StringType(), True),
        StructField("amount", DoubleType(), False),
        StructField("quantity", IntegerType(), False),
        StructField("payment_method", StringType(), True),
        StructField("status", StringType(), False),
        StructField("event_time", StringType(), False),
        StructField("country_code", StringType(), True),
        StructField("discount_percent", DoubleType(), True),
        StructField("customer_segment", StringType(), True)
    ])
    
    # Create raw stream
    raw_stream = spark.readStream \
        .format("csv") \
        .schema(transaction_schema) \
        .option("header", "true") \
        .option("maxFilesPerTrigger", 1) \
        .load(INPUT_DIR)
    
    raw_stream.createOrReplaceTempView("raw_transactions")
    
    # Apply transformations
    with open(SQL_DIR / 'transformations.sql', 'r') as f:
        transformation_sql = f.read()
    
    transformed_stream = spark.sql(transformation_sql)
    transformed_stream.createOrReplaceTempView("transformed_transactions")
    
    # Apply aggregations
    with open(SQL_DIR / 'aggregations.sql', 'r') as f:
        aggregation_sql = f.read()
    
    aggregated_stream = spark.sql(aggregation_sql)
    aggregated_stream.createOrReplaceTempView("transaction_metrics")
    
    print("Pipeline setup complete")
else:
    print("Using existing views")
    transformed_stream = spark.table("transformed_transactions")
    aggregated_stream = spark.table("transaction_metrics")

## Sink 1: Write Transformed Data

Write the transformed transaction data to Parquet with partitioning by date.

### Configuration:
- **Format**: Parquet (columnar, compressed)
- **Output Mode**: Append (write only new records)
- **Partitioning**: By event_date for query optimization
- **Checkpointing**: Enabled for fault tolerance
- **Trigger**: Start a micro-batch every 30 seconds when new data is available
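With `partitionBy("event_date")`, Spark writes each row under a Hive-style directory named `event_date=<value>`, and a query that filters on `event_date` only needs to read the matching directories. The sketch below mimics that pruning step in plain Python over a list of relative file paths; the paths (with simplified file names) and the `prune_partitions` and `partition_value` helpers are hypothetical and only illustrate the layout.

```python
def partition_value(relative_path, column):
    """Return the value of a Hive-style `column=value` path segment, or None."""
    prefix = f"{column}="
    for segment in relative_path.split("/"):
        if segment.startswith(prefix):
            return segment[len(prefix):]
    return None


def prune_partitions(relative_paths, column, wanted_values):
    """Keep only files whose partition directory matches one of wanted_values."""
    wanted = set(wanted_values)
    return [p for p in relative_paths if partition_value(p, column) in wanted]


# Hypothetical file listing under the transformed output directory
files = [
    "event_date=2024-01-14/part-00000.snappy.parquet",
    "event_date=2024-01-15/part-00000.snappy.parquet",
    "event_date=2024-01-15/part-00001.snappy.parquet",
]
print(prune_partitions(files, "event_date", ["2024-01-15"]))
```

Files outside any partition directory, such as entries in `_spark_metadata/`, have no partition value and are never selected.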

In [None]:
# Get transformed stream
transformed_stream = spark.table("transformed_transactions")

# Write transformed data to Parquet
transformed_query = transformed_stream.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", TRANSFORMED_OUTPUT) \
    .option("checkpointLocation", TRANSFORMED_CHECKPOINT) \
    .partitionBy("event_date") \
    .trigger(processingTime='30 seconds') \
    .queryName("TransformedTransactionsWriter") \
    .start()

print("Transformed data streaming query started!")
print(f"Query ID: {transformed_query.id}")
print(f"Query Name: {transformed_query.name}")
print(f"Output Path: {TRANSFORMED_OUTPUT}")
print(f"Checkpoint: {TRANSFORMED_CHECKPOINT}")
print(f"Partitioned by: event_date")

## Sink 2: Write Aggregated Metrics

Write the aggregated metrics to Parquet.

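### Configuration:
- **Format**: Parquet, written through `foreachBatch`
- **Output Mode**: Complete (full result table on every trigger)
- **Checkpointing**: Enabled
- **Trigger**: Process available data every 60 seconds

**Note**: The built-in Parquet file sink supports only append mode, and append mode for an aggregation requires a watermark. Because these aggregations have no watermark, the query runs in complete mode and a `foreachBatch` function overwrites the output directory with the full result table on each trigger. In production, consider windowed aggregations with a watermark so the file sink can append finalized windows.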

In [None]:
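# Get aggregated stream
aggregated_stream = spark.table("transaction_metrics")

# The built-in Parquet file sink supports only append mode, so write the
# complete result table with foreachBatch, overwriting the output each trigger
def write_metrics_batch(batch_df, batch_id):
    batch_df.write.mode("overwrite").parquet(AGGREGATED_OUTPUT)

# Write aggregated data to Parquet
aggregated_query = aggregated_stream.writeStream \
    .outputMode("complete") \
    .foreachBatch(write_metrics_batch) \
    .option("checkpointLocation", AGGREGATED_CHECKPOINT) \
    .trigger(processingTime='60 seconds') \
    .queryName("AggregatedMetricsWriter") \
    .start()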

print("Aggregated metrics streaming query started!")
print(f"Query ID: {aggregated_query.id}")
print(f"Query Name: {aggregated_query.name}")
print(f"Output Path: {AGGREGATED_OUTPUT}")
print(f"Checkpoint: {AGGREGATED_CHECKPOINT}")

## Monitor Streaming Queries

Check the status and progress of all active streaming queries.

In [None]:
# List all active queries
print("Active Streaming Queries:")
print("=" * 80)

for stream in spark.streams.active:
    print(f"\nQuery Name: {stream.name}")
    print(f"Query ID: {stream.id}")
    print(f"Status: {stream.status['message']}")
    print(f"Is Active: {stream.isActive}")
    
    # Get recent progress if available
    if stream.lastProgress:
        progress = stream.lastProgress
        print(f"Batch ID: {progress.get('batchId', 'N/A')}")
        print(f"Input Rows: {progress.get('numInputRows', 0)}")
        print(f"Processing Rate: {progress.get('processedRowsPerSecond', 0):.2f} rows/sec")
    
    print("-" * 80)

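The cell above reads `lastProgress` as a dictionary (it is empty or None before the first batch completes). The helper below is a sketch of turning one such report into a compact status line with a simple lag signal: the input rate exceeding the processing rate. The heuristic and the `summarize_progress` name are assumptions, and the sample dict holds illustrative values, not real output from this pipeline.

```python
def summarize_progress(progress):
    """Build a one-line summary from a streaming progress report (dict).

    Flags the query as falling behind when rows arrive faster than they
    are processed; this is a simple heuristic, not a Spark-provided metric.
    """
    if not progress:
        return "no progress reported yet"
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    trigger_ms = progress.get("durationMs", {}).get("triggerExecution", 0)
    name = progress.get("name") or "<unnamed>"
    status = "FALLING BEHIND" if input_rate > processed_rate else "ok"
    return (f"{name} batch={progress.get('batchId', 'N/A')} "
            f"rows={progress.get('numInputRows', 0)} "
            f"in={input_rate:.1f}/s out={processed_rate:.1f}/s "
            f"trigger={trigger_ms}ms [{status}]")


# Illustrative progress report (values made up for the example)
sample = {
    "name": "TransformedTransactionsWriter",
    "batchId": 3,
    "numInputRows": 500,
    "inputRowsPerSecond": 16.7,
    "processedRowsPerSecond": 250.0,
    "durationMs": {"triggerExecution": 2000},
}
print(summarize_progress(sample))
```

In the notebook it would be used as `for stream in spark.streams.active: print(summarize_progress(stream.lastProgress))`.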
## Run for Demonstration

Let the queries run for a period to process data and write to output.

In [None]:
import time

# Run for 2 minutes to process data
runtime_seconds = 120
print(f"Running streaming queries for {runtime_seconds} seconds...")
print("Processing data and writing to Parquet...\n")

for i in range(0, runtime_seconds, 30):
    time.sleep(30)
    elapsed = i + 30
    print(f"[{elapsed}s] Queries running... Check output directories for results.")
    
    # Show progress
    for stream in spark.streams.active:
        if stream.lastProgress:
            progress = stream.lastProgress
            batch_id = progress.get('batchId', 'N/A')
            num_rows = progress.get('numInputRows', 0)
            print(f"  {stream.name}: Batch {batch_id}, {num_rows} rows processed")

print("\nStreaming queries have been running. Check output for results.")

## Verify Output

Check that data has been written to the output directories.

In [None]:
# List Parquet output files with their sizes
def list_parquet_files(base_path):
    for root, _dirs, files in os.walk(base_path):
        for file in sorted(files):
            if file.endswith('.parquet'):
                filepath = os.path.join(root, file)
                size = os.path.getsize(filepath)
                print(f"  {os.path.relpath(filepath, base_path)} ({size:,} bytes)")

print("Output Files:")
print("=" * 80)

for label, path in [("1. Transformed Transactions", TRANSFORMED_OUTPUT),
                    ("2. Aggregated Metrics", AGGREGATED_OUTPUT)]:
    print(f"\n{label}: {path}")
    if os.path.exists(path):
        list_parquet_files(path)

## Read Back Data for Verification

Verify written data by reading it back as batch DataFrames.

In [None]:
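# Read transformed data back. The output directory always exists (it was
# created above), so check for data files instead: reading a directory that
# has no Parquet files fails because Spark cannot infer a schema.
has_transformed_data = any(
    name.endswith('.parquet')
    for _, _, files in os.walk(TRANSFORMED_OUTPUT)
    for name in files
)
if has_transformed_data:
    transformed_df = spark.read.parquet(TRANSFORMED_OUTPUT)
    print("Transformed Transactions:")
    print(f"Total Records: {transformed_df.count()}")
    print("\nSchema:")
    transformed_df.printSchema()
    print("\nSample Data:")
    transformed_df.show(5, truncate=False)
else:
    print("No transformed data found yet. Wait for queries to process data.")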

In [None]:
# Read aggregated data back once Parquet files exist (an empty directory
# cannot be read because Spark has no schema to infer)
has_aggregated_data = any(
    name.endswith('.parquet')
    for _, _, files in os.walk(AGGREGATED_OUTPUT)
    for name in files
)
if has_aggregated_data:
    aggregated_df = spark.read.parquet(AGGREGATED_OUTPUT)
    print("Aggregated Metrics:")
    print(f"Total Records: {aggregated_df.count()}")
    print("\nSchema:")
    aggregated_df.printSchema()
    print("\nSample Data:")
    aggregated_df.orderBy("total_revenue", ascending=False).show(10, truncate=False)
else:
    print("No aggregated data found yet. Wait for queries to process data.")

## Stop Streaming Queries

Gracefully stop all active streaming queries.

**Note**: In production, queries would run continuously until explicitly stopped.

In [None]:
# Stop all streaming queries
print("Stopping all streaming queries...\n")

for stream in spark.streams.active:
    print(f"Stopping: {stream.name} (ID: {stream.id})")
    stream.stop()
    print(f"  Stopped successfully")

print("\nAll streaming queries stopped.")

## Checkpoint Inspection

Examine checkpoint directories to understand fault tolerance mechanisms.

In [None]:
# List checkpoint contents
print("Checkpoint Directories:")
print("=" * 80)

for checkpoint_dir in [TRANSFORMED_CHECKPOINT, AGGREGATED_CHECKPOINT]:
    print(f"\n{checkpoint_dir}:")
    if os.path.exists(checkpoint_dir):
        for item in os.listdir(checkpoint_dir):
            item_path = os.path.join(checkpoint_dir, item)
            if os.path.isdir(item_path):
                print(f"  [DIR]  {item}")
            else:
                size = os.path.getsize(item_path)
                print(f"  [FILE] {item} ({size:,} bytes)")
    else:
        print("  (not created yet)")

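The `offsets/` and `commits/` subdirectories listed above are what make recovery work: Spark writes `offsets/<batchId>` before running a micro-batch and `commits/<batchId>` after it finishes, so a batch that has an offsets entry but no commit is re-run with the same offsets on restart. The helper below is a sketch, assuming that layout and plain integer file names, that reports the latest planned and committed batch IDs for a checkpoint directory. `checkpoint_batch_status` and `latest_batch_id` are hypothetical names, and the code reads only file names, not the log contents.

```python
import os


def latest_batch_id(log_dir):
    """Return the highest integer file name in a metadata log directory, or None."""
    if not os.path.isdir(log_dir):
        return None
    # Skip checksum and temporary files such as ".1.crc"
    batch_ids = [int(name) for name in os.listdir(log_dir) if name.isdigit()]
    return max(batch_ids, default=None)


def checkpoint_batch_status(checkpoint_dir):
    """Compare the latest planned (offsets) and committed batch IDs."""
    planned = latest_batch_id(os.path.join(checkpoint_dir, "offsets"))
    committed = latest_batch_id(os.path.join(checkpoint_dir, "commits"))
    pending = planned is not None and planned != committed
    return {"planned": planned, "committed": committed, "pending_rerun": pending}
```

In this notebook it would be called as `checkpoint_batch_status(TRANSFORMED_CHECKPOINT)` after the queries have stopped; `pending_rerun: True` means the last planned batch will be replayed when the query restarts.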
## Summary

This notebook successfully:

1. Configured streaming sinks with checkpointing
2. Wrote transformed data to Parquet with date partitioning
3. Wrote aggregated metrics to Parquet
4. Monitored streaming query progress
5. Read the output back to confirm it was written
6. Demonstrated graceful query shutdown

**Key Components:**

### Checkpointing
- Stores streaming offsets, commit logs, and aggregation state
- Enables exactly-once output with replayable sources and idempotent sinks
- Required for production streaming
- Never delete in production

### Output Formats
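- **Parquet**: Columnar, compressed, efficient for analytics
- **Partitioning**: Improves query performance through partition pruning
- **Append Mode**: Best for most ETL scenarios, and the only mode the Parquet file sink supports
- **Complete Mode**: Rewrites the full result table on each trigger; allowed only for aggregations, and needs a sink such as `foreachBatch` to reach Parquet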

### Triggers
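- **Processing Time**: Fixed-interval micro-batches
- **Once**: Process all available data in a single batch, then stop (deprecated since Spark 3.4 in favor of Available Now)
- **Available Now**: Process all available data, possibly across several batches that respect limits such as `maxFilesPerTrigger`, then stop (Spark 3.3+)
- **Continuous**: Low-latency processing (experimental; supports only map-like queries and a few sinks)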

## Production Recommendations

1. **Monitoring**: Integrate with monitoring systems (Prometheus, Datadog)
2. **Alerting**: Set up alerts for query failures and lag
3. **Scaling**: Adjust shuffle partitions based on data volume
4. **Watermarking**: Use event-time watermarks for late data handling
5. **State Management**: Monitor state store size for memory usage
6. **Backpressure**: Configure maxFilesPerTrigger to control ingestion rate
7. **Testing**: Test recovery from checkpoint after failures

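For the backpressure recommendation, a back-of-the-envelope estimate helps when choosing `maxFilesPerTrigger` together with a processing-time trigger: a backlog of N files read k at a time needs ceil(N / k) micro-batches, and if each batch finishes within the trigger interval, the last one starts (ceil(N / k) - 1) × interval after the first. The function below is a sketch of that arithmetic only; it ignores batches that overrun the interval and files that arrive meanwhile, and `estimate_backlog_drain` is a hypothetical name.

```python
import math


def estimate_backlog_drain(pending_files, max_files_per_trigger, interval_seconds):
    """Estimate micro-batch count and the start time of the last batch.

    Assumes every batch finishes within the trigger interval and no new
    files arrive; slower batches push later batches back.
    """
    if pending_files <= 0:
        return 0, 0
    if max_files_per_trigger <= 0:
        raise ValueError("max_files_per_trigger must be positive")
    batches = math.ceil(pending_files / max_files_per_trigger)
    last_batch_start = (batches - 1) * interval_seconds
    return batches, last_batch_start


# e.g. 10 pending files, maxFilesPerTrigger=1, 30-second trigger
print(estimate_backlog_drain(10, 1, 30))  # → (10, 270)
```

Raising `maxFilesPerTrigger` shortens the drain time but makes each micro-batch larger, so the right value depends on how long one batch takes relative to the trigger interval.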
## Next Steps

- Set up orchestration with Airflow or similar
- Add data quality checks and validation
- Implement alerts and monitoring dashboards
- Consider Delta Lake for ACID transactions
- Add schema evolution handling