# Comprehensive Data Processing Pipeline

This notebook consolidates ALL data processing required before forecasting:

1. **Schema-Enforced Ingestion**: Read CSV files with proper Spark schemas
2. **Bronze Layer Creation**: Clean and standardize data with type conversion
3. **Data Quality Validation**: Comprehensive validation highlighting problematic rows
4. **Silver Layer Creation**: Enhanced data with metadata and quality scores
5. **Forecasting Preparation**: Prepare clean, validated data for forecasting workflows

## Key Features

- **Proper Spark Schemas**: Enforced data types and constraints for bronze/silver tables
- **Data Validation**: Highlights source files and specific rows with issues  
- **Quality Scoring**: Each row gets a data quality score for filtering
- **Comprehensive Reporting**: Detailed processing and validation reports
- **Modular Design**: Functions can be reused across different workflows

This notebook replaces the previously scattered processing logic and provides a single source of truth for data preparation.

In [None]:
import logging
from pathlib import Path
from pprint import pprint

from databricks.connect import DatabricksSession as SparkSession
from pyspark.sql.functions import col, count, when, isnan, isnull

from petrinex.config import load_config
from petrinex.process import (
    process_csvs_to_bronze_and_silver,
    prepare_data_for_forecasting
)
from petrinex.schemas import get_schema, get_validation_rules

# Configure logging
# Root logger at INFO so informational records emitted during the run are not filtered out.
logging.basicConfig(level=logging.INFO)
# Module-level logger. NOTE(review): no cell shown in this notebook references it.
logger = logging.getLogger(__name__)

In [None]:
# `SparkSession` is the import alias for databricks.connect's DatabricksSession.
spark = SparkSession.builder.getOrCreate()
# Pipeline configuration; the relative path is presumably resolved against the
# working directory — confirm in petrinex.config.load_config.
config = load_config("config.yaml")

In [None]:
# ==============================================================================
# STEP 1: Process Conventional Volume Data (CSV -> Bronze -> Silver)
# ==============================================================================

print("📊 STEP 1: Processing Conventional Volume Data")
print("-" * 50)

# Only the processing call is guarded. The report is printed in the `else`
# branch and every key is read defensively, so an incomplete report can no
# longer discard DataFrames that were processed successfully.
try:
    conv_bronze_df, conv_silver_df, conv_report = process_csvs_to_bronze_and_silver(
        config=config,
        dataset='conventional',
        spark=spark,
        validate_data=True
    )
except Exception as e:
    # Notebook boundary: report the failure and leave None placeholders so the
    # later cells can tell that this dataset is unavailable.
    print(f"❌ Error processing conventional volume data: {e}")
    conv_bronze_df = None
    conv_silver_df = None
    conv_report = None
else:
    report_view = conv_report or {}
    row_counts = report_view.get('row_counts') or {}
    quality = report_view.get('data_quality_summary') or {}

    print("✅ Conventional Volume Processing Complete!")
    print(f"   Bronze rows: {row_counts.get('bronze') or 0:,}")
    print(f"   Silver rows: {row_counts.get('silver') or 0:,}")
    print(f"   Quality: {quality.get('quality_assessment', 'N/A')}")

    val_report = report_view.get('validation_report')
    if val_report:
        print(f"   Validation errors: {len(val_report.get('validation_errors') or [])}")
        print(f"   Validation warnings: {len(val_report.get('validation_warnings') or [])}")

    print()

In [None]:
# ==============================================================================
# STEP 2: Process NGL Volume Data (CSV -> Bronze -> Silver)
# ==============================================================================

print("📊 STEP 2: Processing NGL Volume Data")
print("-" * 50)

# Only the processing call is guarded. The report is printed in the `else`
# branch and every key is read defensively, so an incomplete report can no
# longer discard DataFrames that were processed successfully.
try:
    ngl_bronze_df, ngl_silver_df, ngl_report = process_csvs_to_bronze_and_silver(
        config=config,
        dataset='ngl',
        spark=spark,
        validate_data=True
    )
except Exception as e:
    # Notebook boundary: report the failure and leave None placeholders so the
    # later cells can tell that this dataset is unavailable.
    print(f"❌ Error processing NGL volume data: {e}")
    ngl_bronze_df = None
    ngl_silver_df = None
    ngl_report = None
else:
    report_view = ngl_report or {}
    row_counts = report_view.get('row_counts') or {}
    quality = report_view.get('data_quality_summary') or {}

    print("✅ NGL Volume Processing Complete!")
    print(f"   Bronze rows: {row_counts.get('bronze') or 0:,}")
    print(f"   Silver rows: {row_counts.get('silver') or 0:,}")
    print(f"   Quality: {quality.get('quality_assessment', 'N/A')}")

    val_report = report_view.get('validation_report')
    if val_report:
        print(f"   Validation errors: {len(val_report.get('validation_errors') or [])}")
        print(f"   Validation warnings: {len(val_report.get('validation_warnings') or [])}")

    print()

In [None]:
# ==============================================================================
# STEP 3: Data Quality Analysis and Reporting
# ==============================================================================

# Cell banner: step title followed by a 50-character rule.
print("📊 STEP 3: Data Quality Analysis", "-" * 50, sep="\n")

def print_validation_details(report, dataset_name, max_items=5):
    """Print the validation section of a processing report.

    Args:
        report: Processing report mapping. Only its ``'validation_report'``
            entry is read. A falsy report, or a missing/empty entry, prints a
            short "not available" notice and nothing else.
        dataset_name: Label used in the printed heading.
        max_items: Maximum number of errors, and of warnings, listed
            individually; the remainder is summarised as "... and N more".
            Negative values are treated as 0.

    Returns:
        None. All output goes to stdout.
    """
    if not report or not report.get('validation_report'):
        print(f"   No validation report available for {dataset_name}")
        return

    val_report = report['validation_report']
    limit = max(0, max_items)

    def _print_issues(issues, icon, noun):
        # Shared layout for the error and warning sections.
        if not issues:
            print(f"   ✅ No validation {noun} found")
            return
        print(f"   {icon} {len(issues)} {noun.upper()} found:")
        for i, issue in enumerate(issues[:limit], 1):
            print(f"      {i}. {issue}")
        if len(issues) > limit:
            print(f"      ... and {len(issues) - limit} more {noun}")

    print(f"\n🔍 {dataset_name.upper()} Validation Details:")
    print(f"   Total rows processed: {val_report.get('total_rows', 0):,}")

    _print_issues(val_report.get('validation_errors', []), "❌", "errors")
    # The warning icon carries an extra trailing space to keep the original layout.
    _print_issues(val_report.get('validation_warnings', []), "⚠️ ", "warnings")

    print()

# Emit the validation breakdown for each dataset whose report is truthy;
# a None (failed step) or empty report is skipped without output.
if conv_report:
    print_validation_details(report=conv_report, dataset_name="Conventional Volume")

if ngl_report:
    print_validation_details(report=ngl_report, dataset_name="NGL Volume")


In [None]:
# ==============================================================================
# STEP 4: Schema Validation and Verification
# ==============================================================================

# Cell banner: step title followed by a 50-character rule.
print("📊 STEP 4: Schema Validation and Verification", "-" * 50, sep="\n")

def verify_table_schema(df, table_type, layer):
    """Compare a DataFrame's column names with the expected schema's field names.

    Prints a short verification report. Only column names are compared; data
    types are not inspected.

    Args:
        df: DataFrame to check, or None.
        table_type: Table identifier passed to ``get_schema``.
        layer: Layer name passed to ``get_schema``.

    Returns:
        True when both sets of column names are identical; False when ``df``
        is None or when any column is missing or extra.
    """
    if df is None:
        print(f"   ❌ {table_type} {layer} DataFrame is None")
        return False

    wanted = {field.name for field in get_schema(table_type, layer).fields}
    present = set(df.columns)
    missing = wanted - present
    extra = present - wanted

    print(f"\n🔍 {table_type.upper()} {layer.upper()} Schema Verification:")
    print(f"   Expected columns: {len(wanted)}")
    print(f"   Actual columns: {len(present)}")

    if missing:
        print(f"   ❌ Missing columns: {missing}")
    if extra:
        print(f"   ℹ️  Extra columns: {extra}")

    # Extra columns count as a difference too, not only missing ones.
    schema_ok = not (missing or extra)
    print("   ✅ Schema matches perfectly!" if schema_ok else "   ⚠️  Schema has differences")
    return schema_ok

# Run the column-name check for every bronze/silver table produced above.
conv_bronze_schema_ok = verify_table_schema(conv_bronze_df, "conv_vol", "bronze")
conv_silver_schema_ok = verify_table_schema(conv_silver_df, "conv_vol", "silver")

ngl_bronze_schema_ok = verify_table_schema(ngl_bronze_df, "ngl_vol", "bronze")
ngl_silver_schema_ok = verify_table_schema(ngl_silver_df, "ngl_vol", "silver")

# One pass/fail line per table; the trailing empty item yields the final blank line.
print(
    "\n📋 Schema Verification Summary:",
    f"   Conv Bronze: {'✅' if conv_bronze_schema_ok else '❌'}",
    f"   Conv Silver: {'✅' if conv_silver_schema_ok else '❌'}",
    f"   NGL Bronze: {'✅' if ngl_bronze_schema_ok else '❌'}",
    f"   NGL Silver: {'✅' if ngl_silver_schema_ok else '❌'}",
    "",
    sep="\n",
)


In [None]:
# ==============================================================================
# STEP 5: Data Quality Score Analysis
# ==============================================================================

# Cell banner: step title followed by a 50-character rule.
print("📊 STEP 5: Data Quality Score Analysis", "-" * 50, sep="\n")

def analyze_data_quality_scores(df, dataset_name):
    """Print summary statistics and band counts for ``_data_quality_score``.

    Args:
        df: DataFrame expected to carry a ``_data_quality_score`` column, or
            None.
        dataset_name: Label used in the printed headings.

    Returns:
        None. Prints a notice and returns early when ``df`` is None or has no
        ``_data_quality_score`` column.
    """
    if df is None or "_data_quality_score" not in df.columns:
        print(f"   ❌ No data quality scores available for {dataset_name}")
        return

    # Calculate quality score statistics
    quality_stats = df.select("_data_quality_score").describe().collect()

    print(f"\n📈 {dataset_name} Data Quality Score Distribution:")
    for stat in quality_stats:
        metric = stat["summary"]
        value = stat["_data_quality_score"]
        # Falsy values (e.g. a statistic that could not be computed) are skipped.
        if value and metric in ("count", "mean", "min", "max", "stddev"):
            if metric == "count":
                print(f"   Total rows: {value}")
            else:
                print(f"   {metric.capitalize()}: {float(value):.2f}")

    # Count rows by quality bands in a single aggregation.
    score = col("_data_quality_score")
    bands = df.select(
        count(when(score >= 95, 1)).alias("excellent"),
        count(when((score >= 85) & (score < 95), 1)).alias("good"),
        count(when((score >= 70) & (score < 85), 1)).alias("fair"),
        count(when(score < 70, 1)).alias("poor"),
        count(when(score.isNull(), 1)).alias("no_score")
    ).collect()[0].asDict()

    total_rows = sum(bands.values())

    def _share(n):
        # An empty DataFrame gives total_rows == 0; report 0.0% instead of
        # raising ZeroDivisionError.
        return n / total_rows * 100 if total_rows else 0.0

    print("   Quality Bands:")
    for label, key in (
        ("Excellent (≥95)", "excellent"),
        ("Good (85-94)", "good"),
        ("Fair (70-84)", "fair"),
        ("Poor (<70)", "poor"),
        ("No Score", "no_score"),
    ):
        print(f"      {label}: {bands[key]:,} ({_share(bands[key]):.1f}%)")

# Analyze quality scores
# Explicit None checks: the silver DataFrames are set to None when their
# processing step failed, and a DataFrame's truth value is not a meaningful
# "is available" test.
if conv_silver_df is not None:
    analyze_data_quality_scores(conv_silver_df, "Conventional Volume")

if ngl_silver_df is not None:
    analyze_data_quality_scores(ngl_silver_df, "NGL Volume")


In [None]:
# ==============================================================================
# STEP 6: Prepare Data for Forecasting
# ==============================================================================

print("📊 STEP 6: Prepare Data for Forecasting")
print("-" * 50)

# Prepare NGL data for forecasting (this is what the forecasting module needs)
forecast_ready_df = None

# Explicit None check: ngl_silver_df is None when STEP 2 failed.
if ngl_silver_df is None:
    print("❌ No NGL silver data available for forecasting preparation")
else:
    try:
        forecast_ready_df = prepare_data_for_forecasting(
            spark=spark,
            config=config,
            table_type="ngl_vol",
            quality_threshold=70.0  # Only include rows with quality score >= 70
        )

        forecast_count = forecast_ready_df.count()
        original_count = ngl_silver_df.count()
    except Exception as e:
        print(f"❌ Error preparing data for forecasting: {e}")
        forecast_ready_df = None
    else:
        print("✅ NGL Data prepared for forecasting:")
        print(f"   Original rows: {original_count:,}")
        print(f"   Forecast-ready rows: {forecast_count:,}")
        print(f"   Quality filtered out: {original_count - forecast_count:,} rows")

        # Show sample of forecast-ready data. The sample is display-only, so a
        # failure here (e.g. one of the listed columns is absent) must not
        # discard the prepared DataFrame.
        print("\n📋 Sample of forecast-ready data:")
        try:
            sample_df = forecast_ready_df.select(
                "WellID", "ProductionMonth", "OperatorName",
                "GasProduction", "OilProduction", "CondensateProduction",
                "_data_quality_score"
            ).limit(5)

            sample_df.show(truncate=False)
        except Exception as e:
            print(f"⚠️  Could not display sample of forecast-ready data: {e}")

        print("✅ Data is now ready for forecasting workflows!")


In [None]:
# ==============================================================================
# STEP 7: Generate Comprehensive Processing Report
# ==============================================================================

# Cell banner: step title followed by a 50-character double rule.
print("📊 STEP 7: Processing Summary Report", "=" * 50, sep="\n")

def generate_processing_summary(reports=None):
    """Generate a comprehensive summary of the processing pipeline.

    Args:
        reports: Optional mapping of dataset label -> processing report. A
            falsy report (None or empty) marks a dataset that was not
            processed. When omitted, the notebook's ``conv_report`` and
            ``ngl_report`` are used under the labels ``"conventional"`` and
            ``"ngl"``.

    Returns:
        dict with the keys ``timestamp`` (UTC, ISO 8601), ``pipeline_status``,
        ``datasets_processed``, ``total_errors``, ``total_warnings``,
        ``tables_created`` and ``data_quality_overview``.

        ``pipeline_status`` is one of:
          * ``"FAILED"`` - no dataset produced a report;
          * ``"COMPLETED_WITH_ERRORS"`` - validation errors were recorded or at
            least one dataset produced no report;
          * ``"COMPLETED_WITH_WARNINGS"`` - only validation warnings;
          * ``"SUCCESS"`` - every dataset processed without errors or warnings.
    """
    from datetime import datetime, timezone

    if reports is None:
        reports = {"conventional": conv_report, "ngl": ngl_report}

    summary = {
        "timestamp": datetime.now(timezone.utc).isoformat(timespec="seconds"),
        "pipeline_status": "COMPLETED",
        "datasets_processed": 0,
        "total_errors": 0,
        "total_warnings": 0,
        "tables_created": [],
        "data_quality_overview": {}
    }

    for label, report in reports.items():
        if not report:
            continue

        summary["datasets_processed"] += 1

        val_report = report.get('validation_report')
        if val_report:
            summary["total_errors"] += len(val_report.get('validation_errors', []))
            summary["total_warnings"] += len(val_report.get('validation_warnings', []))

        summary["tables_created"].extend([
            report.get('bronze_table', 'N/A'),
            report.get('silver_table', 'N/A')
        ])

        summary["data_quality_overview"][label] = report.get('data_quality_summary', {})

    # Determine overall status. A dataset without a report counts as a failure,
    # so an all-failed run can no longer be reported as "SUCCESS".
    failed_datasets = len(reports) - summary["datasets_processed"]
    if summary["datasets_processed"] == 0:
        summary["pipeline_status"] = "FAILED"
    elif summary["total_errors"] > 0 or failed_datasets > 0:
        summary["pipeline_status"] = "COMPLETED_WITH_ERRORS"
    elif summary["total_warnings"] > 0:
        summary["pipeline_status"] = "COMPLETED_WITH_WARNINGS"
    else:
        summary["pipeline_status"] = "SUCCESS"

    return summary

# Generate and display summary
processing_summary = generate_processing_summary()

print("🎯 PROCESSING PIPELINE SUMMARY")
print(f"Status: {processing_summary['pipeline_status']}")
print(f"Datasets Processed: {processing_summary['datasets_processed']}")
print(f"Total Validation Errors: {processing_summary['total_errors']}")
print(f"Total Validation Warnings: {processing_summary['total_warnings']}")

print("\n📋 Tables Created:")
for table in processing_summary['tables_created']:
    print(f"   • {table}")

print("\n📊 Data Quality Overview:")
for dataset, quality_info in processing_summary['data_quality_overview'].items():
    # Datasets with an empty quality summary are omitted from the overview.
    if quality_info:
        print(f"   {dataset.upper()}:")
        print(f"      Rows: {quality_info.get('total_rows', 0):,}")
        print(f"      Quality: {quality_info.get('quality_assessment', 'N/A')}")
        print(f"      Avg Score: {quality_info.get('average_quality_score', 0):.1f}")

print("\n🏁 Processing pipeline complete!")
# Explicit None check: forecast_ready_df is None when STEP 6 could not prepare
# the data, and a DataFrame's truth value is not a meaningful readiness test.
if forecast_ready_df is not None:
    print("✅ Data is ready for forecasting workflows")
else:
    print("⚠️  Check processing results before proceeding to forecasting")

print("\n" + "=" * 50)
