# 🏭 Apache Iceberg Production Pipeline Tutorial

Welcome to the Production Pipeline tutorial. This notebook covers:

1. **Production-Ready Architecture Patterns**
2. **Data Ingestion Pipelines**
3. **Batch and Streaming Processing**
4. **Data Quality and Validation**
5. **Table Maintenance and Optimization**
6. **Monitoring and Alerting**
7. **CI/CD for Data Pipelines**

## 📋 Prerequisites

- Understanding of Iceberg core concepts
- Experience with schema evolution and time travel
- Basic knowledge of data engineering patterns

## 1. 🚀 Initialize Production Environment

Set up a production-like Spark session: an Iceberg catalog named `prod`, adaptive query execution, and default write settings for file size and compression.

In [None]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta
import time
import json

# Set Python path for Spark consistency
os.environ['PYSPARK_PYTHON'] = '/opt/conda/bin/python'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/conda/bin/python'

print("🏭 Production Pipeline Tutorial Environment Setup")
print("ℹ️ This tutorial demonstrates production-grade patterns")
print("ℹ️ Focus on reliability, scalability, and maintainability")

In [None]:
# Production Spark Session Configuration
def create_production_spark_session(app_name="ProductionIcebergPipeline"):
    """Create a production-ready Spark session with optimized settings"""
    
    spark = SparkSession.builder \
        .appName(app_name) \
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.prod", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.prod.type", "hadoop") \
        .config("spark.sql.catalog.prod.warehouse", "file:///home/jovyan/work/warehouse") \
        \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.adaptive.skewJoin.enabled", "true") \
        \
        .config("spark.sql.catalog.prod.table-default.write.target-file-size-bytes", "134217728") \
        .config("spark.sql.catalog.prod.table-default.write.parquet.compression-codec", "zstd") \
        .config("spark.sql.catalog.prod.table-default.write.metadata.compression-codec", "gzip") \
        .getOrCreate()
    
    # Set log level to reduce noise
    spark.sparkContext.setLogLevel("WARN")
    
    return spark

# Stop any existing session (on a fresh kernel `spark` is not defined yet)
try:
    spark.stop()
    print("🛑 Stopped existing Spark session")
except NameError:
    print("ℹ️ No existing Spark session to stop")

# Create production session
spark = create_production_spark_session()
print("✅ Production Spark session initialized!")
print(f"Spark version: {spark.version}")
print(f"Warehouse location: {spark.conf.get('spark.sql.catalog.prod.warehouse')}")

## 2. 🏗️ Production Architecture Patterns

Review three common architecture patterns and the Iceberg features that support each one.

In [None]:
# Production Architecture Patterns
print("🏗️ PRODUCTION ARCHITECTURE PATTERNS")
print("\n📐 Common production architectures:")

architecture_patterns = {
    "🔄 Lambda Architecture": {
        "description": "Batch + Real-time processing",
        "components": [
            "Batch Layer: Historical data processing",
            "Speed Layer: Real-time stream processing",
            "Serving Layer: Query interface combining both"
        ],
        "iceberg_benefits": [
            "ACID transactions for consistency",
            "Schema evolution for changing requirements",
            "Time travel for historical analysis"
        ]
    },
    "📊 Medallion Architecture": {
        "description": "Bronze → Silver → Gold data flow",
        "components": [
            "Bronze: Raw data ingestion",
            "Silver: Cleaned and validated data",
            "Gold: Business-ready aggregated data"
        ],
        "iceberg_benefits": [
            "Hidden partitioning for performance",
            "Snapshot isolation for quality gates",
            "Metadata tables for lineage tracking"
        ]
    },
    "🌊 Event-Driven Architecture": {
        "description": "Event-based data processing",
        "components": [
            "Event Producers: Data sources",
            "Event Streams: Message queues",
            "Event Consumers: Processing applications"
        ],
        "iceberg_benefits": [
            "Atomic writes for reliability",
            "Table versioning for rollbacks",
            "Multi-engine support for flexibility"
        ]
    }
}

for pattern, details in architecture_patterns.items():
    print(f"\n{pattern}: {details['description']}")
    print("  📋 Components:")
    for component in details['components']:
        print(f"    • {component}")
    print("  ✅ Iceberg Benefits:")
    for benefit in details['iceberg_benefits']:
        print(f"    • {benefit}")

## 3. 📥 Data Ingestion Pipeline

Build a production data ingestion pipeline with quality checks.

In [None]:
# Create production database and tables
spark.sql("CREATE DATABASE IF NOT EXISTS prod.data_lake")
print("✅ Created production database")

# Medallion Architecture Implementation
class MedallionPipeline:
    def __init__(self, spark_session):
        self.spark = spark_session
        self.catalog = "prod"
        self.database = "data_lake"
        
    def create_bronze_table(self):
        """Create bronze table for raw data ingestion"""
        create_bronze_sql = f"""
        CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.bronze_events (
            event_id string,
            source_system string,
            raw_data string,
            ingestion_timestamp timestamp,
            file_name string,
            file_size bigint
        ) USING ICEBERG
        PARTITIONED BY (days(ingestion_timestamp), source_system)
        TBLPROPERTIES (
            'write.target-file-size-bytes' = '134217728',
            'write.parquet.compression-codec' = 'zstd'
        )
        """
        self.spark.sql(create_bronze_sql)
        print("✅ Created bronze_events table")
        
    def create_silver_table(self):
        """Create silver table for cleaned data"""
        create_silver_sql = f"""
        CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.silver_events (
            event_id string,
            user_id bigint,
            event_type string,
            event_time timestamp,
            properties map<string, string>,
            source_system string,
            processed_timestamp timestamp,
            data_quality_score double
        ) USING ICEBERG
        PARTITIONED BY (days(event_time), source_system)
        TBLPROPERTIES (
            'write.target-file-size-bytes' = '134217728',
            'write.parquet.compression-codec' = 'zstd'
        )
        """
        self.spark.sql(create_silver_sql)
        print("✅ Created silver_events table")
        
    def create_gold_table(self):
        """Create gold table for business metrics"""
        create_gold_sql = f"""
        CREATE TABLE IF NOT EXISTS {self.catalog}.{self.database}.gold_daily_metrics (
            metric_date date,
            source_system string,
            event_type string,
            event_count bigint,
            unique_users bigint,
            avg_quality_score double,
            calculated_timestamp timestamp
        ) USING ICEBERG
        PARTITIONED BY (metric_date, source_system)
        TBLPROPERTIES (
            'write.target-file-size-bytes' = '67108864',
            'write.parquet.compression-codec' = 'zstd'
        )
        """
        self.spark.sql(create_gold_sql)
        print("✅ Created gold_daily_metrics table")

# Initialize pipeline
pipeline = MedallionPipeline(spark)
pipeline.create_bronze_table()
pipeline.create_silver_table()
pipeline.create_gold_table()

print("\n🏗️ Medallion architecture tables created successfully!")
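
The `days(ingestion_timestamp)` and `days(event_time)` clauses above use Iceberg's hidden partitioning: the partition value is derived from the timestamp column, so neither writers nor queries handle a separate date column. As a rough illustration, the Iceberg table spec defines the day transform as the number of days since 1970-01-01. The sketch below mirrors that definition in plain Python; `day_transform` is a hypothetical helper for illustration, not part of Iceberg's API.

```python
from datetime import date, datetime

EPOCH = date(1970, 1, 1)

def day_transform(ts: datetime) -> int:
    """Days since the Unix epoch for a timezone-naive timestamp (illustrative only)."""
    return (ts.date() - EPOCH).days

# Two events on the same calendar day map to the same partition value
morning = datetime(2024, 1, 15, 10, 30)
evening = datetime(2024, 1, 15, 23, 59)
next_day = datetime(2024, 1, 16, 0, 0)

print(day_transform(morning) == day_transform(evening))  # True
print(day_transform(next_day) - day_transform(morning))  # 1
```

Because the value is computed from the column itself, a filter such as `WHERE event_time >= '2024-01-15'` can prune partitions without the query naming a partition column.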

In [None]:
# Data Ingestion Pipeline Implementation
class DataIngestionPipeline:
    def __init__(self, spark_session, pipeline):
        self.spark = spark_session
        self.pipeline = pipeline
        
    def ingest_raw_data(self, source_system, raw_data_list):
        """Ingest raw data into bronze layer"""
        print(f"📥 Ingesting data from {source_system}...")
        
        # Create bronze records; capture one batch timestamp so every record agrees
        batch_time = datetime.now()
        file_name = f"{source_system}_batch_{batch_time.strftime('%Y%m%d_%H%M%S')}.json"
        bronze_records = []
        for i, raw_data in enumerate(raw_data_list):
            payload = json.dumps(raw_data)
            record = (
                # Include the time of day so re-ingesting on the same date does not reuse IDs
                f"{source_system}_{batch_time.strftime('%Y%m%d%H%M%S%f')}_{i:06d}",  # event_id
                source_system,
                payload,  # raw_data as JSON string
                batch_time,  # ingestion_timestamp
                file_name,  # file_name
                len(payload.encode("utf-8"))  # file_size in bytes
            )
            bronze_records.append(record)
        
        # Create DataFrame and insert
        bronze_schema = StructType([
            StructField("event_id", StringType(), True),
            StructField("source_system", StringType(), True),
            StructField("raw_data", StringType(), True),
            StructField("ingestion_timestamp", TimestampType(), True),
            StructField("file_name", StringType(), True),
            StructField("file_size", LongType(), True)
        ])
        
        bronze_df = self.spark.createDataFrame(bronze_records, bronze_schema)
        bronze_df.writeTo(f"{self.pipeline.catalog}.{self.pipeline.database}.bronze_events").append()
        
        print(f"✅ Ingested {len(bronze_records)} records into bronze layer")
        return bronze_df
    
    def process_to_silver(self, source_system):
        """Process bronze data to silver layer with quality checks"""
        print(f"🔄 Processing {source_system} data to silver layer...")
        
        # Read latest bronze data
        bronze_data = self.spark.sql(f"""
        SELECT * FROM {self.pipeline.catalog}.{self.pipeline.database}.bronze_events
        WHERE source_system = '{source_system}'
        AND ingestion_timestamp >= current_timestamp() - INTERVAL 1 HOUR
        """)
        
        if bronze_data.count() == 0:
            print("⚠️ No new bronze data to process")
            return
        
        # Parse JSON and apply transformations
        silver_data = bronze_data.select(
            F.col("event_id"),
            F.get_json_object(F.col("raw_data"), "$.user_id").cast(LongType()).alias("user_id"),
            F.get_json_object(F.col("raw_data"), "$.event_type").alias("event_type"),
            F.get_json_object(F.col("raw_data"), "$.event_time").cast(TimestampType()).alias("event_time"),
            F.from_json(F.get_json_object(F.col("raw_data"), "$.properties"), 
                       MapType(StringType(), StringType())).alias("properties"),
            F.col("source_system"),
            F.current_timestamp().alias("processed_timestamp")
        )
        
        # Add data quality score
        silver_data = silver_data.withColumn(
            "data_quality_score",
            F.when(F.col("user_id").isNull(), 0.0)
            .when(F.col("event_type").isNull(), 0.3)
            .when(F.col("event_time").isNull(), 0.5)
            .otherwise(1.0)
        )
        
        # Filter out low quality records
        quality_data = silver_data.filter(F.col("data_quality_score") >= 0.7)
        
        # Write to silver table
        quality_data.writeTo(f"{self.pipeline.catalog}.{self.pipeline.database}.silver_events").append()
        
        total_records = silver_data.count()
        quality_records = quality_data.count()
        print(f"✅ Processed {quality_records}/{total_records} quality records to silver layer")
        
        return quality_data

# Initialize ingestion pipeline
ingestion = DataIngestionPipeline(spark, pipeline)
print("\n📥 Data ingestion pipeline initialized!")
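
`process_to_silver` appends every bronze row from the last hour, so running it twice within that window writes the same events to silver twice. One way to make the load idempotent, assuming `event_id` identifies an event, is Iceberg's `MERGE INTO` (available through the SQL extensions configured on the session). The helper below only builds the statement; `build_silver_merge_sql` and the `silver_updates` view name are hypothetical names used for illustration.

```python
def build_silver_merge_sql(target_table: str, source_view: str, key: str = "event_id") -> str:
    """Build a MERGE statement that inserts only rows whose key is not yet in the target."""
    return (
        f"MERGE INTO {target_table} AS t\n"
        f"USING {source_view} AS s\n"
        f"ON t.{key} = s.{key}\n"
        f"WHEN NOT MATCHED THEN INSERT *"
    )

merge_sql = build_silver_merge_sql("prod.data_lake.silver_events", "silver_updates")
print(merge_sql)
```

To use it, register the filtered DataFrame with `quality_data.createOrReplaceTempView("silver_updates")` and pass the statement to `spark.sql`. `INSERT *` relies on the view's column names matching the table's.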

## 4. 🧪 Data Quality and Validation

Implement configurable data quality checks: row count, null percentage, uniqueness, freshness, and custom SQL.

In [None]:
# Data Quality Framework
class DataQualityFramework:
    def __init__(self, spark_session):
        self.spark = spark_session
        
    def run_quality_checks(self, table_name, checks_config):
        """Run comprehensive data quality checks"""
        print(f"🧪 Running quality checks for {table_name}...")
        
        results = {}
        
        for check_name, check_config in checks_config.items():
            try:
                result = self._execute_check(table_name, check_name, check_config)
                results[check_name] = result
                status = "✅ PASS" if result['passed'] else "❌ FAIL"
                print(f"  {status}: {check_name} - {result['message']}")
            except Exception as e:
                results[check_name] = {
                    'passed': False,
                    'message': f"Error: {str(e)}",
                    'value': None
                }
                print(f"  ❌ ERROR: {check_name} - {str(e)}")
                
        return results
    
    def _execute_check(self, table_name, check_name, check_config):
        """Execute individual quality check"""
        check_type = check_config['type']
        
        if check_type == 'row_count':
            return self._check_row_count(table_name, check_config)
        elif check_type == 'null_percentage':
            return self._check_null_percentage(table_name, check_config)
        elif check_type == 'uniqueness':
            return self._check_uniqueness(table_name, check_config)
        elif check_type == 'freshness':
            return self._check_data_freshness(table_name, check_config)
        elif check_type == 'custom_sql':
            return self._check_custom_sql(table_name, check_config)
        else:
            raise ValueError(f"Unknown check type: {check_type}")
    
    def _check_row_count(self, table_name, config):
        count = self.spark.sql(f"SELECT COUNT(*) as count FROM {table_name}").collect()[0]['count']
        min_count = config.get('min_count', 0)
        max_count = config.get('max_count', float('inf'))
        
        passed = min_count <= count <= max_count
        return {
            'passed': passed,
            'value': count,
            'message': f"Row count: {count} (expected: {min_count}-{max_count})"
        }
    
    def _check_null_percentage(self, table_name, config):
        column = config['column']
        max_null_pct = config.get('max_null_percentage', 5.0)
        
        result = self.spark.sql(f"""
        SELECT 
            COUNT(*) as total_rows,
            SUM(CASE WHEN {column} IS NULL THEN 1 ELSE 0 END) as null_rows
        FROM {table_name}
        """).collect()[0]
        
        null_pct = (result['null_rows'] / result['total_rows']) * 100 if result['total_rows'] > 0 else 0
        passed = null_pct <= max_null_pct
        
        return {
            'passed': passed,
            'value': null_pct,
            'message': f"Null percentage for {column}: {null_pct:.2f}% (max allowed: {max_null_pct}%)"
        }
    
    def _check_uniqueness(self, table_name, config):
        column = config['column']
        
        result = self.spark.sql(f"""
        SELECT 
            COUNT(*) as total_rows,
            COUNT(DISTINCT {column}) as distinct_rows
        FROM {table_name}
        """).collect()[0]
        
        uniqueness_pct = (result['distinct_rows'] / result['total_rows']) * 100 if result['total_rows'] > 0 else 0
        min_uniqueness = config.get('min_uniqueness_percentage', 100.0)
        passed = uniqueness_pct >= min_uniqueness
        
        return {
            'passed': passed,
            'value': uniqueness_pct,
            'message': f"Uniqueness for {column}: {uniqueness_pct:.2f}% (min required: {min_uniqueness}%)"
        }
    
    def _check_data_freshness(self, table_name, config):
        timestamp_column = config['timestamp_column']
        max_age_hours = config.get('max_age_hours', 24)
        
        result = self.spark.sql(f"""
        SELECT MAX({timestamp_column}) as latest_timestamp
        FROM {table_name}
        """).collect()[0]
        
        if result['latest_timestamp'] is None:
            return {
                'passed': False,
                'value': None,
                'message': f"No data found in {timestamp_column}"
            }
        
        age_hours = (datetime.now() - result['latest_timestamp']).total_seconds() / 3600
        passed = age_hours <= max_age_hours
        
        return {
            'passed': passed,
            'value': age_hours,
            'message': f"Data age: {age_hours:.2f} hours (max allowed: {max_age_hours} hours)"
        }
    
    def _check_custom_sql(self, table_name, config):
        sql_query = config['query'].format(table_name=table_name)
        expected_result = config.get('expected_result', True)
        
        result = self.spark.sql(sql_query).collect()[0][0]
        passed = result == expected_result
        
        return {
            'passed': passed,
            'value': result,
            'message': f"Custom check result: {result} (expected: {expected_result})"
        }

# Initialize quality framework
quality_framework = DataQualityFramework(spark)
print("🧪 Data quality framework initialized!")
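
The checks above interpolate `table_name` and `column` straight into SQL text, so a malformed config value becomes a confusing parse error or an unintended query. A minimal guard, sketched below, validates identifiers before they reach `spark.sql`. The function name `validate_identifier` and the allowed pattern (dot-separated names made of letters, digits, and underscores) are assumptions that fit this tutorial's naming scheme, not a Spark or Iceberg rule.

```python
import re

# Dot-separated parts, each starting with a letter or underscore (assumed naming scheme)
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")

def validate_identifier(name: str) -> str:
    """Return the name unchanged if it looks like a plain identifier, else raise."""
    if not isinstance(name, str) or not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name

print(validate_identifier("prod.data_lake.silver_events"))

try:
    validate_identifier("user_id; DROP TABLE x")
except ValueError as err:
    print(err)
```

A check method could call it on the way in, for example `column = validate_identifier(config['column'])`.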

In [None]:
# Generate sample data for testing
print("🔄 Generating sample data for quality testing...")

# Sample data for different source systems
web_events = [
    {"user_id": 1001, "event_type": "page_view", "event_time": "2024-01-15 10:30:00", "properties": {"page": "/home", "browser": "chrome"}},
    {"user_id": 1002, "event_type": "click", "event_time": "2024-01-15 10:35:00", "properties": {"element": "button", "page": "/products"}},
    {"user_id": 1003, "event_type": "purchase", "event_time": "2024-01-15 10:40:00", "properties": {"amount": "99.99", "currency": "USD"}},
    {"user_id": None, "event_type": "error", "event_time": "2024-01-15 10:45:00", "properties": {"error_code": "404"}},  # Null user_id for quality testing
]

mobile_events = [
    {"user_id": 2001, "event_type": "app_open", "event_time": "2024-01-15 11:00:00", "properties": {"app_version": "1.2.3", "os": "iOS"}},
    {"user_id": 2002, "event_type": "screen_view", "event_time": "2024-01-15 11:05:00", "properties": {"screen": "profile", "duration": "30"}},
    {"user_id": 2003, "event_type": None, "event_time": "2024-01-15 11:10:00", "properties": {"action": "swipe"}},  # Null event_type for quality testing
]

# Ingest data
ingestion.ingest_raw_data("web_analytics", web_events)
ingestion.ingest_raw_data("mobile_app", mobile_events)

# Process to silver
ingestion.process_to_silver("web_analytics")
ingestion.process_to_silver("mobile_app")

print("✅ Sample data ingested and processed!")
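
The scoring rule in `process_to_silver` assigns 1.0 only when `user_id`, `event_type`, and `event_time` are all present, and the filter keeps scores of at least 0.7, so any record missing one of those fields is dropped. The plain-Python sketch below re-implements the rule as `quality_score`, a hypothetical helper that mirrors the Spark `when` chain, and applies it to three events shaped like the samples above. It ignores the casts: in Spark, a value that fails to cast to its target type is also treated as null.

```python
def quality_score(event: dict) -> float:
    """Mirror of the when/otherwise chain: the first missing field decides the score."""
    if event.get("user_id") is None:
        return 0.0
    if event.get("event_type") is None:
        return 0.3
    if event.get("event_time") is None:
        return 0.5
    return 1.0

sample = [
    {"user_id": 1001, "event_type": "page_view", "event_time": "2024-01-15 10:30:00"},
    {"user_id": None, "event_type": "error", "event_time": "2024-01-15 10:45:00"},
    {"user_id": 2003, "event_type": None, "event_time": "2024-01-15 11:10:00"},
]

scores = [quality_score(e) for e in sample]
kept = [e for e, s in zip(sample, scores) if s >= 0.7]

print(scores)     # [1.0, 0.0, 0.3]
print(len(kept))  # 1
```

By the same rule, the sample batches above should yield 3 of 4 web events and 2 of 3 mobile events in the silver table.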

In [None]:
# Define quality checks configuration
quality_checks_config = {
    "row_count_check": {
        "type": "row_count",
        "min_count": 1,
        "max_count": 10000
    },
    "user_id_null_check": {
        "type": "null_percentage",
        "column": "user_id",
        "max_null_percentage": 10.0
    },
    "event_id_uniqueness": {
        "type": "uniqueness",
        "column": "event_id",
        "min_uniqueness_percentage": 100.0
    },
    "data_freshness_check": {
        "type": "freshness",
        "timestamp_column": "processed_timestamp",
        "max_age_hours": 1.0
    },
    "quality_score_check": {
        "type": "custom_sql",
        "query": "SELECT AVG(data_quality_score) >= 0.8 FROM {table_name}",
        "expected_result": True
    }
}

# Run quality checks on silver table
print("\n🧪 Running quality checks on silver_events table...")
silver_table = "prod.data_lake.silver_events"
quality_results = quality_framework.run_quality_checks(silver_table, quality_checks_config)

# Summary
passed_checks = sum(1 for result in quality_results.values() if result['passed'])
total_checks = len(quality_results)
print(f"\n📊 Quality Check Summary: {passed_checks}/{total_checks} checks passed")

if passed_checks == total_checks:
    print("✅ All quality checks passed! Data is ready for gold layer processing.")
else:
    print("⚠️ Some quality checks failed. Review data quality before proceeding.")
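
Printing a warning leaves it to the reader to stop the pipeline. In a scheduled job, a failed check usually needs to halt the downstream steps. One possible gate, assuming the result shape returned by `run_quality_checks` (check name mapped to a dict with `passed`, `message`, and `value`), raises an exception that an orchestrator can catch. `QualityGateError` and `enforce_quality_gate` are hypothetical names, and the example results are made up.

```python
class QualityGateError(Exception):
    """Raised when one or more data quality checks fail."""

def enforce_quality_gate(results: dict) -> None:
    """Raise QualityGateError listing every failed check; return None if all passed."""
    failed = {name: r["message"] for name, r in results.items() if not r["passed"]}
    if failed:
        details = "; ".join(f"{name}: {msg}" for name, msg in failed.items())
        raise QualityGateError(f"{len(failed)} check(s) failed: {details}")

# Made-up results in the same shape that run_quality_checks returns
example_results = {
    "row_count_check": {"passed": True, "message": "Row count: 5", "value": 5},
    "data_freshness_check": {"passed": False, "message": "Data age: 3.20 hours", "value": 3.2},
}

try:
    enforce_quality_gate(example_results)
except QualityGateError as err:
    print(err)
```

In this notebook the call would be `enforce_quality_gate(quality_results)` placed before the gold layer step.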

## 5. 📊 Gold Layer Analytics Pipeline

Build the gold layer with business metrics and aggregations.

In [None]:
# Gold Layer Analytics Pipeline
class GoldLayerPipeline:
    def __init__(self, spark_session, pipeline):
        self.spark = spark_session
        self.pipeline = pipeline
        
    def calculate_daily_metrics(self, target_date=None):
        """Calculate daily business metrics for gold layer"""
        if target_date is None:
            target_date = datetime.now().date()
            
        print(f"📊 Calculating daily metrics for {target_date}...")
        
        # Calculate metrics from silver layer
        daily_metrics = self.spark.sql(f"""
        SELECT 
            DATE(event_time) as metric_date,
            source_system,
            event_type,
            COUNT(*) as event_count,
            COUNT(DISTINCT user_id) as unique_users,
            AVG(data_quality_score) as avg_quality_score,
            current_timestamp() as calculated_timestamp
        FROM {self.pipeline.catalog}.{self.pipeline.database}.silver_events
        WHERE DATE(event_time) = DATE('{target_date}')
        AND data_quality_score >= 0.7
        GROUP BY DATE(event_time), source_system, event_type
        """)
        
        if daily_metrics.count() == 0:
            print("⚠️ No data found for the specified date")
            return
        
        # Write to gold table (delete-then-append: two separate commits, so not an atomic upsert)
        print("💾 Writing metrics to gold layer...")
        
        # First, delete existing data for this date
        self.spark.sql(f"""
        DELETE FROM {self.pipeline.catalog}.{self.pipeline.database}.gold_daily_metrics
        WHERE metric_date = DATE('{target_date}')
        """)
        
        # Insert new metrics
        daily_metrics.writeTo(f"{self.pipeline.catalog}.{self.pipeline.database}.gold_daily_metrics").append()
        
        metrics_count = daily_metrics.count()
        print(f"✅ Calculated and stored {metrics_count} daily metrics")
        
        return daily_metrics
    
    def generate_business_report(self):
        """Generate business intelligence report"""
        print("📈 Generating business intelligence report...")
        
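        # Top events by volume
        # Note: SUM(unique_users) counts a user once per date and source system,
        # so it can overstate the number of distinct users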
        top_events = self.spark.sql(f"""
        SELECT 
            event_type,
            SUM(event_count) as total_events,
            SUM(unique_users) as total_users,
            AVG(avg_quality_score) as avg_quality
        FROM {self.pipeline.catalog}.{self.pipeline.database}.gold_daily_metrics
        GROUP BY event_type
        ORDER BY total_events DESC
        """)
        
        print("\n📊 Top Events by Volume:")
        top_events.show()
        
        # Source system performance
        source_performance = self.spark.sql(f"""
        SELECT 
            source_system,
            COUNT(DISTINCT metric_date) as active_days,
            SUM(event_count) as total_events,
            AVG(avg_quality_score) as avg_quality_score
        FROM {self.pipeline.catalog}.{self.pipeline.database}.gold_daily_metrics
        GROUP BY source_system
        ORDER BY total_events DESC
        """)
        
        print("\n📊 Source System Performance:")
        source_performance.show()
        
        # Daily trends
        daily_trends = self.spark.sql(f"""
        SELECT 
            metric_date,
            SUM(event_count) as daily_events,
            SUM(unique_users) as daily_users,
            AVG(avg_quality_score) as daily_quality
        FROM {self.pipeline.catalog}.{self.pipeline.database}.gold_daily_metrics
        GROUP BY metric_date
        ORDER BY metric_date DESC
        """)
        
        print("\n📊 Daily Trends:")
        daily_trends.show()
        
        return {
            'top_events': top_events,
            'source_performance': source_performance,
            'daily_trends': daily_trends
        }

# Initialize gold layer pipeline
gold_pipeline = GoldLayerPipeline(spark, pipeline)

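# Calculate metrics for the sample data's event date
# (the default target_date is today, which has no sample events)
metrics = gold_pipeline.calculate_daily_metrics(target_date="2024-01-15")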

# Generate report
report = gold_pipeline.generate_business_report()

print("\n✅ Gold layer analytics pipeline completed!")
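
A caveat on the report queries: `unique_users` is a distinct count per date, source system, and event type, and distinct counts are not additive. `SUM(unique_users)` therefore counts a user once for every group they appear in. The toy example below uses made-up user IDs to show the gap.

```python
# Made-up data: which users produced each event type on one day
users_by_event_type = {
    "page_view": {1001, 1002, 1003},
    "click": {1002, 1003},
    "purchase": {1003},
}

# What SUM(unique_users) computes: each group's distinct count, added together
summed = sum(len(users) for users in users_by_event_type.values())

# What a reader usually expects: distinct users across all groups
distinct = len(set().union(*users_by_event_type.values()))

print(summed)    # 6
print(distinct)  # 3
```

For exact totals, recompute `COUNT(DISTINCT user_id)` from the silver table at the grain the report needs.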

## 6. 🔧 Table Maintenance and Optimization

Inspect table health through Iceberg metadata tables, then compact small files and expire old snapshots.

In [None]:
# Table Maintenance Framework
class TableMaintenanceFramework:
    def __init__(self, spark_session):
        self.spark = spark_session
        
    def analyze_table_health(self, table_name):
        """Analyze table health and provide optimization recommendations"""
        print(f"🔍 Analyzing table health for {table_name}...")
        
        health_metrics = {}
        
        try:
            # File statistics
            file_stats = self.spark.sql(f"""
            SELECT 
                COUNT(*) as file_count,
                SUM(file_size_in_bytes) / 1024 / 1024 / 1024 as total_size_gb,
                AVG(file_size_in_bytes) / 1024 / 1024 as avg_file_size_mb,
                MIN(file_size_in_bytes) / 1024 / 1024 as min_file_size_mb,
                MAX(file_size_in_bytes) / 1024 / 1024 as max_file_size_mb,
                SUM(record_count) as total_records
            FROM {table_name}.files
            """).collect()[0]
            
            # Aggregates are NULL when the table has no data files, so default to 0
            health_metrics['files'] = {
                'count': file_stats['file_count'],
                'total_size_gb': round(file_stats['total_size_gb'] or 0, 2),
                'avg_size_mb': round(file_stats['avg_file_size_mb'] or 0, 2),
                'min_size_mb': round(file_stats['min_file_size_mb'] or 0, 2),
                'max_size_mb': round(file_stats['max_file_size_mb'] or 0, 2),
                'total_records': file_stats['total_records'] or 0
            }
            
            # Snapshot statistics
            snapshot_stats = self.spark.sql(f"""
            SELECT 
                COUNT(*) as snapshot_count,
                MIN(committed_at) as oldest_snapshot,
                MAX(committed_at) as newest_snapshot
            FROM {table_name}.snapshots
            """).collect()[0]
            
            health_metrics['snapshots'] = {
                'count': snapshot_stats['snapshot_count'],
                'oldest': snapshot_stats['oldest_snapshot'],
                'newest': snapshot_stats['newest_snapshot']
            }
            
            # Partition statistics
            try:
                partition_stats = self.spark.sql(f"""
                SELECT 
                    COUNT(*) as partition_count,
                    AVG(record_count) as avg_records_per_partition,
                    AVG(file_count) as avg_files_per_partition
                FROM {table_name}.partitions
                """).collect()[0]
                
                health_metrics['partitions'] = {
                    'count': partition_stats['partition_count'],
                    'avg_records': round(partition_stats['avg_records_per_partition'], 0),
                    'avg_files': round(partition_stats['avg_files_per_partition'], 1)
                }
            except Exception:
                health_metrics['partitions'] = {'count': 'N/A'}
                
        except Exception as e:
            print(f"⚠️ Error analyzing table health: {e}")
            # Return a pair so callers can always unpack (metrics, recommendations)
            return None, []
            
        # Generate recommendations
        recommendations = self._generate_recommendations(health_metrics)
        
        # Display results
        self._display_health_report(table_name, health_metrics, recommendations)
        
        return health_metrics, recommendations
    
    def _generate_recommendations(self, metrics):
        """Generate optimization recommendations based on health metrics"""
        recommendations = []
        
        # File size recommendations
        if metrics['files']['count'] > 0 and metrics['files']['avg_size_mb'] < 50:
            recommendations.append({
                'type': 'COMPACTION',
                'priority': 'HIGH',
                'message': f"Small files detected (avg: {metrics['files']['avg_size_mb']:.1f}MB). Consider compaction."
            })
        
        if metrics['files']['count'] > 1000:
            recommendations.append({
                'type': 'COMPACTION',
                'priority': 'MEDIUM',
                'message': f"High file count ({metrics['files']['count']}). Compaction may improve performance."
            })
        
        # Snapshot recommendations
        if metrics['snapshots']['count'] > 50:
            recommendations.append({
                'type': 'SNAPSHOT_CLEANUP',
                'priority': 'MEDIUM',
                'message': f"High snapshot count ({metrics['snapshots']['count']}). Consider cleanup."
            })
        
        # Partition recommendations
        if isinstance(metrics['partitions']['count'], int) and metrics['partitions']['avg_files'] > 10:
            recommendations.append({
                'type': 'PARTITION_TUNING',
                'priority': 'MEDIUM', 
                'message': f"High files per partition ({metrics['partitions']['avg_files']:.1f}). Review partitioning strategy."
            })
            
        return recommendations
    
    def _display_health_report(self, table_name, metrics, recommendations):
        """Display formatted health report"""
        print(f"\n📊 Health Report for {table_name}:")
        print(f"\n📁 File Statistics:")
        print(f"  • Total files: {metrics['files']['count']}")
        print(f"  • Total size: {metrics['files']['total_size_gb']} GB")
        print(f"  • Average file size: {metrics['files']['avg_size_mb']} MB")
        print(f"  • File size range: {metrics['files']['min_size_mb']:.1f} - {metrics['files']['max_size_mb']:.1f} MB")
        print(f"  • Total records: {metrics['files']['total_records']:,}")
        
        print(f"\n📸 Snapshot Statistics:")
        print(f"  • Total snapshots: {metrics['snapshots']['count']}")
        print(f"  • Date range: {metrics['snapshots']['oldest']} to {metrics['snapshots']['newest']}")
        
        if isinstance(metrics['partitions']['count'], int):
            print(f"\n🗂️ Partition Statistics:")
            print(f"  • Total partitions: {metrics['partitions']['count']}")
            print(f"  • Average records per partition: {metrics['partitions']['avg_records']:,}")
            print(f"  • Average files per partition: {metrics['partitions']['avg_files']}")
        
        print(f"\n💡 Recommendations:")
        if recommendations:
            for rec in recommendations:
                priority_icon = "🔴" if rec['priority'] == 'HIGH' else "🟡" if rec['priority'] == 'MEDIUM' else "🟢"
                print(f"  {priority_icon} {rec['type']}: {rec['message']}")
        else:
            print(f"  ✅ No immediate optimizations needed")
    
    def compact_table(self, table_name, target_file_size_mb=128):
        """Compact table files with Iceberg's rewrite_data_files procedure"""
        print(f"🔧 Compacting table {table_name} (target file size: {target_file_size_mb}MB)...")
        
        try:
            # Split "catalog.database.table" into the catalog and the identifier the procedure expects
            catalog, _, identifier = table_name.partition(".")
            target_bytes = target_file_size_mb * 1024 * 1024
            
            # Get file count before compaction
            before_count = self.spark.sql(f"SELECT COUNT(*) as count FROM {table_name}.files").collect()[0]['count']
            
            # Rewrite small files into larger ones (requires the Iceberg SQL extensions on the session)
            self.spark.sql(f"""
            CALL {catalog}.system.rewrite_data_files(
                table => '{identifier}',
                options => map('target-file-size-bytes', '{target_bytes}')
            )
            """)
            
            # Get file count after compaction
            after_count = self.spark.sql(f"SELECT COUNT(*) as count FROM {table_name}.files").collect()[0]['count']
            
            print(f"✅ Compaction completed for {table_name}")
            print(f"📊 Files before compaction: {before_count}")
            print(f"📊 Files after compaction: {after_count}")
            
        except Exception as e:
            print(f"❌ Compaction failed: {e}")
    
    def cleanup_snapshots(self, table_name, retention_days=30):
        """Clean up old snapshots beyond retention period"""
        print(f"🧹 Cleaning up snapshots for {table_name} (retention: {retention_days} days)...")
        
        try:
            # Calculate cutoff date
            cutoff_date = datetime.now() - timedelta(days=retention_days)
            
            # Get snapshot count before cleanup
            before_count = self.spark.sql(f"SELECT COUNT(*) as count FROM {table_name}.snapshots").collect()[0]['count']
            
            # In a real implementation, you would use:
            # CALL prod.system.expire_snapshots('table_name', TIMESTAMP 'cutoff_date')
            print(f"📊 Snapshots before cleanup: {before_count}")
            print(f"💡 Would expire snapshots older than {cutoff_date}")
            print(f"⚠️ Snapshot cleanup not executed in tutorial (use CALL prod.system.expire_snapshots)")
            
        except Exception as e:
            print(f"❌ Snapshot cleanup failed: {e}")

# Initialize maintenance framework
maintenance = TableMaintenanceFramework(spark)
print("🔧 Table maintenance framework initialized!")
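
The `cleanup_snapshots` method above only reports what it would expire. As a minimal sketch of the missing step, the helper below builds the `CALL ... expire_snapshots` statement from a table name and a retention window. The helper name `build_expire_snapshots_sql` and the `retain_last=5` default are assumptions made for illustration; the `table`, `older_than`, and `retain_last` arguments follow Iceberg's documented Spark procedure. The statement is only printed here, not executed.

```python
from datetime import datetime, timedelta


def build_expire_snapshots_sql(table_name, retention_days=30, retain_last=5, now=None):
    """Build a CALL statement for Iceberg's expire_snapshots procedure.

    table_name is expected in 'catalog.namespace.table' form.
    This helper is hypothetical and only builds the SQL text.
    """
    catalog, _, identifier = table_name.partition(".")
    if not identifier:
        raise ValueError(f"Expected catalog.namespace.table, got {table_name!r}")

    # Snapshots older than this cutoff become candidates for expiration
    cutoff = (now or datetime.now()) - timedelta(days=retention_days)
    cutoff_literal = cutoff.strftime("%Y-%m-%d %H:%M:%S")

    return (
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{identifier}', "
        f"older_than => TIMESTAMP '{cutoff_literal}', "
        f"retain_last => {int(retain_last)})"
    )


# A fixed 'now' keeps the example deterministic
sql = build_expire_snapshots_sql(
    "prod.data_lake.bronze_events",
    retention_days=30,
    now=datetime(2024, 3, 31, 12, 0, 0),
)
print(sql)

# To run it against a live session: spark.sql(sql).show()
```

Keeping `retain_last` above 1 preserves a few recent snapshots for time travel and rollback even when every snapshot is older than the cutoff.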

In [None]:
# Run table health analysis
print("🔍 Running table health analysis...")

# Analyze all tables in our data lake
tables_to_analyze = [
    "prod.data_lake.bronze_events",
    "prod.data_lake.silver_events", 
    "prod.data_lake.gold_daily_metrics"
]

for table in tables_to_analyze:
    print(f"\n{'='*60}")
    try:
        health_metrics, recommendations = maintenance.analyze_table_health(table)
        
        # Take action on high priority recommendations
        high_priority_recs = [r for r in recommendations if r['priority'] == 'HIGH']
        if high_priority_recs:
            print(f"\n🚨 High priority issues found for {table}:")
            for rec in high_priority_recs:
                print(f"  • {rec['message']}")
                
                if rec['type'] == 'COMPACTION':
                    # In production, you might automatically trigger compaction
                    print(f"  🔧 Recommendation: Schedule compaction for {table}")
                    
    except Exception as e:
        print(f"⚠️ Could not analyze {table}: {e}")

print(f"\n{'='*60}")
print("✅ Table health analysis completed!")
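
The loop above only prints a suggestion when it finds a high-priority `COMPACTION` recommendation. One possible way to move toward automatic triggering is to first turn recommendations into a plan of maintenance calls and then dispatch that plan. The sketch below assumes each recommendation is a dictionary with `type`, `priority`, and `message` keys, as used in the cell above. The function name `plan_maintenance_actions` and the `SNAPSHOT_CLEANUP` type are hypothetical.

```python
def plan_maintenance_actions(table_name, recommendations, auto_priorities=("HIGH",)):
    """Turn health recommendations into (method_name, table) pairs without running them."""
    # Maps a recommendation type to a TableMaintenanceFramework method name.
    # 'COMPACTION' appears in the tutorial; 'SNAPSHOT_CLEANUP' is a hypothetical type.
    handlers = {
        "COMPACTION": "compact_table",
        "SNAPSHOT_CLEANUP": "cleanup_snapshots",
    }

    planned = []
    for rec in recommendations:
        # Only act automatically on the priorities the caller opted into
        if rec.get("priority") not in auto_priorities:
            continue
        method = handlers.get(rec.get("type"))
        # Skip unknown types and avoid scheduling the same action twice
        if method and (method, table_name) not in planned:
            planned.append((method, table_name))
    return planned


sample_recs = [
    {"type": "COMPACTION", "priority": "HIGH", "message": "Many small files"},
    {"type": "COMPACTION", "priority": "HIGH", "message": "Average file size is low"},
    {"type": "SNAPSHOT_CLEANUP", "priority": "MEDIUM", "message": "Many snapshots"},
]
actions = plan_maintenance_actions("prod.data_lake.bronze_events", sample_recs)
print(actions)

# With a live framework, each planned action could be dispatched like this:
# for method, table in actions:
#     getattr(maintenance, method)(table)
```

Separating planning from execution makes it easy to log or review the plan before any table is rewritten.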

## 7. 📊 Production Monitoring Dashboard

Build a text-based monitoring dashboard that reports pipeline freshness, data quality, storage health, business metrics, and active alerts.

In [None]:
# Production Monitoring Dashboard
class ProductionMonitoringDashboard:
    def __init__(self, spark_session):
        self.spark = spark_session
        
    def generate_operational_dashboard(self):
        """Generate comprehensive operational dashboard"""
        print("📊 PRODUCTION MONITORING DASHBOARD")
        print("=" * 50)
        
        # Data Pipeline Health
        self._show_pipeline_health()
        
        # Data Quality Metrics
        self._show_data_quality_metrics()
        
        # Storage and Performance
        self._show_storage_performance()
        
        # Business Metrics
        self._show_business_metrics()
        
        # System Alerts
        self._show_system_alerts()
    
    def _show_pipeline_health(self):
        """Show data pipeline health status"""
        print("\n🏥 PIPELINE HEALTH STATUS")
        print("-" * 30)
        
        try:
            # Check data freshness across layers
            layers = {
                'Bronze': 'prod.data_lake.bronze_events',
                'Silver': 'prod.data_lake.silver_events',
                'Gold': 'prod.data_lake.gold_daily_metrics'
            }
            
            for layer_name, table_name in layers.items():
                try:
                    # Get latest timestamp
                    if layer_name == 'Gold':
                        timestamp_col = 'calculated_timestamp'
                    elif layer_name == 'Silver':
                        timestamp_col = 'processed_timestamp'
                    else:
                        timestamp_col = 'ingestion_timestamp'
                    
                    latest_result = self.spark.sql(f"""
                    SELECT 
                        MAX({timestamp_col}) as latest_timestamp,
                        COUNT(*) as record_count
                    FROM {table_name}
                    """).collect()[0]
                    
                    if latest_result['latest_timestamp']:
                        age_minutes = (datetime.now() - latest_result['latest_timestamp']).total_seconds() / 60
                        status = "🟢 HEALTHY" if age_minutes < 60 else "🟡 WARNING" if age_minutes < 180 else "🔴 CRITICAL"
                        print(f"{layer_name:8}: {status} | Records: {latest_result['record_count']:,} | Age: {age_minutes:.1f}min")
                    else:
                        print(f"{layer_name:8}: 🔴 NO DATA")
                        
                except Exception as e:
                    print(f"{layer_name:8}: ❌ ERROR - {str(e)[:50]}")
        
        except Exception as e:
            print(f"❌ Pipeline health check failed: {e}")
    
    def _show_data_quality_metrics(self):
        """Show data quality metrics"""
        print("\n📏 DATA QUALITY METRICS")
        print("-" * 30)
        
        try:
            # Silver layer quality metrics
            quality_metrics = self.spark.sql("""
            SELECT 
                source_system,
                COUNT(*) as total_records,
                AVG(data_quality_score) as avg_quality_score,
                SUM(CASE WHEN data_quality_score >= 0.8 THEN 1 ELSE 0 END) / COUNT(*) * 100 as high_quality_pct
            FROM prod.data_lake.silver_events
            WHERE processed_timestamp >= current_timestamp() - INTERVAL 24 HOURS
            GROUP BY source_system
            """)
            
            print("📊 Last 24 Hours Quality Summary:")
            quality_data = quality_metrics.collect()
            
            if quality_data:
                for row in quality_data:
                    quality_status = "🟢" if row['avg_quality_score'] >= 0.8 else "🟡" if row['avg_quality_score'] >= 0.6 else "🔴"
                    print(f"  {row['source_system']:15}: {quality_status} Score: {row['avg_quality_score']:.2f} | High Quality: {row['high_quality_pct']:.1f}% | Records: {row['total_records']:,}")
            else:
                print("  ⚠️ No recent data for quality analysis")
                
        except Exception as e:
            print(f"❌ Quality metrics check failed: {e}")
    
    def _show_storage_performance(self):
        """Show storage and performance metrics"""
        print("\n💾 STORAGE & PERFORMANCE")
        print("-" * 30)
        
        try:
            tables = [
                ('Bronze', 'prod.data_lake.bronze_events'),
                ('Silver', 'prod.data_lake.silver_events'),
                ('Gold', 'prod.data_lake.gold_daily_metrics')
            ]
            
            total_size_gb = 0
            total_files = 0
            
            for layer, table in tables:
                try:
                    storage_stats = self.spark.sql(f"""
                    SELECT 
                        COUNT(*) as file_count,
                        SUM(file_size_in_bytes) / 1024 / 1024 / 1024 as size_gb,
                        AVG(file_size_in_bytes) / 1024 / 1024 as avg_file_size_mb
                    FROM {table}.files
                    """).collect()[0]
                    
                    size_gb = storage_stats['size_gb'] or 0
                    file_count = storage_stats['file_count'] or 0
                    avg_size_mb = storage_stats['avg_file_size_mb'] or 0
                    
                    total_size_gb += size_gb
                    total_files += file_count
                    
                    file_health = "🟢" if avg_size_mb >= 64 else "🟡" if avg_size_mb >= 32 else "🔴"
                    print(f"{layer:8}: {file_health} Size: {size_gb:.2f}GB | Files: {file_count:,} | Avg: {avg_size_mb:.1f}MB")
                    
                except Exception as e:
                    print(f"{layer:8}: ❌ ERROR - {str(e)[:30]}")
            
            print(f"\n📊 Total Storage: {total_size_gb:.2f}GB across {total_files:,} files")
            
        except Exception as e:
            print(f"❌ Storage metrics check failed: {e}")
    
    def _show_business_metrics(self):
        """Show key business metrics"""
        print("\n📈 BUSINESS METRICS")
        print("-" * 30)
        
        try:
            # Today's metrics
            today_metrics = self.spark.sql("""
            SELECT 
                SUM(event_count) as total_events,
                SUM(unique_users) as total_users,
                COUNT(DISTINCT source_system) as active_sources
            FROM prod.data_lake.gold_daily_metrics
            WHERE metric_date = CURRENT_DATE
            """).collect()[0]
            
            # Yesterday's metrics for comparison
            yesterday_metrics = self.spark.sql("""
            SELECT 
                SUM(event_count) as total_events,
                SUM(unique_users) as total_users
            FROM prod.data_lake.gold_daily_metrics
            WHERE metric_date = CURRENT_DATE - INTERVAL 1 DAY
            """).collect()[0]
            
            print("📊 Today's Activity:")
            print(f"  Events: {today_metrics['total_events'] or 0:,}")
            print(f"  Users: {today_metrics['total_users'] or 0:,}")
            print(f"  Active Sources: {today_metrics['active_sources'] or 0}")
            
            # Growth calculation
            if yesterday_metrics['total_events'] and today_metrics['total_events']:
                event_growth = ((today_metrics['total_events'] - yesterday_metrics['total_events']) / yesterday_metrics['total_events']) * 100
                growth_icon = "📈" if event_growth > 0 else "📉" if event_growth < 0 else "➡️"
                print(f"\n📊 Day-over-Day Growth:")
                print(f"  Events: {growth_icon} {event_growth:+.1f}%")
            
        except Exception as e:
            print(f"❌ Business metrics check failed: {e}")
    
    def _show_system_alerts(self):
        """Show system alerts and recommendations"""
        print("\n🚨 SYSTEM ALERTS")
        print("-" * 30)
        
        alerts = []
        
        try:
            # Check for tables with many small files
            tables_to_check = [
                'prod.data_lake.bronze_events',
                'prod.data_lake.silver_events',
                'prod.data_lake.gold_daily_metrics'
            ]
            
            for table in tables_to_check:
                try:
                    file_stats = self.spark.sql(f"""
                    SELECT 
                        COUNT(*) as file_count,
                        AVG(file_size_in_bytes) / 1024 / 1024 as avg_size_mb
                    FROM {table}.files
                    """).collect()[0]
                    
                    if file_stats['avg_size_mb'] and file_stats['avg_size_mb'] < 32:
                        alerts.append(f"🔴 PERFORMANCE: {table} has small files (avg: {file_stats['avg_size_mb']:.1f}MB)")
                    
                    if file_stats['file_count'] > 500:
                        alerts.append(f"🟡 MAINTENANCE: {table} has many files ({file_stats['file_count']:,})")
                        
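                except Exception as e:
                    # Surface the failure instead of silently skipping the table
                    alerts.append(f"⚠️ CHECK FAILED: could not read file stats for {table} ({str(e)[:50]})")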
            
            # Check for data freshness issues
            try:
                latest_silver = self.spark.sql("""
                SELECT MAX(processed_timestamp) as latest
                FROM prod.data_lake.silver_events
                """).collect()[0]['latest']
                
                if latest_silver:
                    age_hours = (datetime.now() - latest_silver).total_seconds() / 3600
                    if age_hours > 2:
                        alerts.append(f"🔴 DATA FRESHNESS: Silver layer data is {age_hours:.1f} hours old")
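            except Exception as e:
                # A failed freshness check is itself worth alerting on
                alerts.append(f"⚠️ CHECK FAILED: could not determine silver layer freshness ({str(e)[:50]})")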
            
            # Display alerts
            if alerts:
                for alert in alerts:
                    print(f"  {alert}")
            else:
                print("  ✅ No active alerts")
                
        except Exception as e:
            print(f"❌ Alert check failed: {e}")

# Initialize monitoring dashboard
dashboard = ProductionMonitoringDashboard(spark)

# Generate dashboard
dashboard.generate_operational_dashboard()

print("\n✅ Production monitoring dashboard generated!")
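
The dashboard prints its alerts as formatted strings, which are hard for other tools to consume. As a rough sketch, the same thresholds used in `_show_system_alerts` (average file size below 32 MB, more than 500 files, silver data older than 2 hours) can be applied in a pure function that returns structured records. The function name `evaluate_alerts`, the record fields, and the sample numbers are assumptions for illustration, not part of the tutorial's classes.

```python
import json


def evaluate_alerts(file_stats_by_table, silver_age_hours=None):
    """Apply the dashboard's alert thresholds and return alerts as dictionaries."""
    alerts = []

    for table, stats in file_stats_by_table.items():
        avg_size_mb = stats.get("avg_size_mb")
        file_count = stats.get("file_count", 0)

        # Small files hurt scan performance
        if avg_size_mb is not None and avg_size_mb < 32:
            alerts.append({
                "severity": "critical",
                "category": "PERFORMANCE",
                "table": table,
                "detail": f"small files (avg: {avg_size_mb:.1f}MB)",
            })

        # Many files increase planning and metadata overhead
        if file_count > 500:
            alerts.append({
                "severity": "warning",
                "category": "MAINTENANCE",
                "table": table,
                "detail": f"many files ({file_count:,})",
            })

    # Stale silver data suggests the pipeline has stopped or is lagging
    if silver_age_hours is not None and silver_age_hours > 2:
        alerts.append({
            "severity": "critical",
            "category": "DATA_FRESHNESS",
            "table": "prod.data_lake.silver_events",
            "detail": f"data is {silver_age_hours:.1f} hours old",
        })

    return alerts


# Made-up sample statistics, standing in for the results of the .files queries
sample_stats = {
    "prod.data_lake.bronze_events": {"avg_size_mb": 12.5, "file_count": 820},
    "prod.data_lake.silver_events": {"avg_size_mb": 96.0, "file_count": 40},
}
alerts = evaluate_alerts(sample_stats, silver_age_hours=3.4)

for alert in alerts:
    # One JSON object per line is easy to ship to a log collector
    print(json.dumps(alert))
```

Because the function takes plain dictionaries rather than a Spark session, the thresholds can be unit-tested without running any queries.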

## 8. 🎉 Production Pipeline Summary

This final cell recaps what you built, the production best practices covered, and suggested next steps.

In [None]:
# Production Pipeline Summary
print("🎉 PRODUCTION PIPELINE TUTORIAL COMPLETE!")
print("\n✅ What You've Built:")

accomplishments = [
    "Production-ready Spark session with optimized configurations",
    "Medallion architecture (Bronze → Silver → Gold) implementation",
    "Comprehensive data quality framework with multiple check types",
    "Automated data ingestion pipeline with error handling",
    "Business metrics calculation and gold layer analytics",
    "Table maintenance framework with health analysis",
    "Production monitoring dashboard with alerts",
    "End-to-end data lineage and quality tracking"
]

for i, accomplishment in enumerate(accomplishments, 1):
    print(f"   {i}. {accomplishment}")

print("\n💡 PRODUCTION BEST PRACTICES LEARNED:")

best_practices = {
    "🏗️ Architecture": [
        "Use medallion architecture for clear data flow",
        "Implement proper separation of concerns",
        "Design for scalability and maintainability",
        "Plan for schema evolution from day one"
    ],
    "📊 Data Quality": [
        "Implement comprehensive data validation",
        "Use quality scores for automated decisions",
        "Monitor data freshness and completeness",
        "Set up automated quality alerts"
    ],
    "⚡ Performance": [
        "Optimize file sizes for your workload",
        "Use appropriate compression algorithms",
        "Implement regular table maintenance",
        "Monitor and tune partition strategies"
    ],
    "🔧 Operations": [
        "Automate monitoring and alerting",
        "Implement proper error handling",
        "Plan for disaster recovery",
        "Document operational procedures"
    ],
    "🔒 Reliability": [
        "Use ACID transactions for consistency",
        "Implement proper retry mechanisms",
        "Plan for graceful degradation",
        "Test failure scenarios regularly"
    ]
}

for category, practices in best_practices.items():
    print(f"\n{category}:")
    for practice in practices:
        print(f"   • {practice}")

print("\n🚀 NEXT STEPS FOR PRODUCTION:")
next_steps = [
    "Implement CI/CD pipelines for data code",
    "Set up proper security and access controls",
    "Integrate with enterprise monitoring systems",
    "Implement advanced data governance",
    "Scale to handle production data volumes",
    "Add stream processing for real-time use cases"
]

for step in next_steps:
    print(f"   → {step}")

print("\n🎯 Key Takeaway:")
print("   Production Iceberg pipelines require careful attention to data quality,")
print("   performance optimization, monitoring, and operational excellence.")
print("   With proper implementation, Iceberg provides a robust foundation")
print("   for enterprise-scale data lake solutions!")

print("\n🏆 Congratulations on completing the Production Pipeline Tutorial!")