# 🔍 Silver Layer Data Quality Validation
**Date:** November 12, 2025  
**Objective:** Validate Silver layer data after the ETL improvements: check energy-radiation alignment, anomaly detection, and data cleanliness.

---

## Key Improvements Made:
- ✅ Facility-specific capacity thresholds (COLEASF: 145 MWh, others: 115 MWh)
- ✅ Night energy detection (flag > 1 MWh during 22-6h)
- ✅ Daytime zero-energy detection (flag when energy = 0 but radiation > 300 W/m²)
- ✅ Peak-hour anomalies (flag < 5 MWh during 11-15h)
- ✅ Weather quality bounds validation
- ✅ Air quality range checks
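
The energy rules in this list can be sketched as a small pandas function. This is a minimal illustration, not the ETL's actual implementation: the function name `flag_energy_rules`, the output column names, and the assumption that `hour` and `shortwave_radiation` sit on the same frame as `energy_mwh` are all hypothetical. Only the thresholds come from the list, and whether the hour ranges include their end points is also an assumption.

```python
import pandas as pd

# Thresholds taken from the list above; names and structure are illustrative.
CAPACITY_MWH = {"COLEASF": 145.0}
DEFAULT_CAPACITY_MWH = 115.0


def flag_energy_rules(df: pd.DataFrame) -> pd.DataFrame:
    """Return one boolean column per rule for each hourly record.

    Expects columns: facility_code, hour (0-23), energy_mwh, shortwave_radiation.
    """
    # Facilities without a specific entry fall back to the standard threshold.
    capacity = df["facility_code"].map(CAPACITY_MWH).fillna(DEFAULT_CAPACITY_MWH)
    # Assumption: "22-6h" means hours 22, 23, 0, ..., 5.
    night = (df["hour"] >= 22) | (df["hour"] < 6)
    # Assumption: "11-15h" includes both hour 11 and hour 15.
    peak = df["hour"].between(11, 15)
    return pd.DataFrame(
        {
            "over_capacity": df["energy_mwh"] > capacity,
            "night_energy": night & (df["energy_mwh"] > 1),
            "daytime_zero": (df["energy_mwh"] == 0) & (df["shortwave_radiation"] > 300),
            "low_peak": peak & (df["energy_mwh"] < 5),
        },
        index=df.index,
    )


sample = pd.DataFrame(
    {
        "facility_code": ["COLEASF", "OTHER", "OTHER", "OTHER"],
        "hour": [12, 23, 13, 12],
        "energy_mwh": [140.0, 2.5, 0.0, 120.0],
        "shortwave_radiation": [800.0, 0.0, 650.0, 900.0],
    }
)
flags = flag_energy_rules(sample)
print(flags)
```

Note that a zero-energy hour inside the peak window trips both `daytime_zero` and `low_peak`, so a real pipeline would need to decide whether to report both.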

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import subprocess
import json
from datetime import datetime

# Set style
sns.set_style("darkgrid")
plt.rcParams['figure.figsize'] = (14, 6)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

print("✅ Libraries loaded successfully")

## 1️⃣ Load Silver Data & Compare with Bronze

In [None]:
from pathlib import Path

# Configuration
SILVER_DIR = Path('/home/pvlakehouse/dlh-pv/src/pv_lakehouse/exported_data')

print("✅ Loading Silver Layer Data from CSV files...")
print(f"📂 Data directory: {SILVER_DIR}")

# Load Silver Energy Data
print("\n⚡ Loading Silver Energy Data...")
try:
    energy_df = pd.read_csv(SILVER_DIR / 'lh_silver_clean_hourly_energy.csv')
    print(f"✅ Loaded {len(energy_df):,} energy records")
    print(f"   Columns: {list(energy_df.columns)}")
    print("\n   Sample data:")
    print(energy_df.head(3))
except Exception as e:
    print(f"❌ Error loading energy data: {e}")
    energy_df = None

# Load Silver Weather Data
print("\n🌤️  Loading Silver Weather Data...")
try:
    weather_df = pd.read_csv(SILVER_DIR / 'lh_silver_clean_hourly_weather.csv')
    print(f"✅ Loaded {len(weather_df):,} weather records")
    print(f"   Columns: {list(weather_df.columns)}")
    print("\n   Sample data:")
    print(weather_df.head(3))
except Exception as e:
    print(f"❌ Error loading weather data: {e}")
    weather_df = None

# Load Silver Air Quality Data
print("\n💨 Loading Silver Air Quality Data...")
try:
    aqi_df = pd.read_csv(SILVER_DIR / 'lh_silver_clean_hourly_air_quality.csv')
    print(f"✅ Loaded {len(aqi_df):,} air quality records")
    print(f"   Columns: {list(aqi_df.columns)}")
    print("\n   Sample data:")
    print(aqi_df.head(3))
except Exception as e:
    print(f"❌ Error loading air quality data: {e}")
    aqi_df = None

# Report success only when every dataset actually loaded
print("\n" + "="*80)
if all(df is not None for df in (energy_df, weather_df, aqi_df)):
    print("✅ ALL DATA LOADED SUCCESSFULLY!")
else:
    print("⚠️ Some datasets failed to load - see the errors above")
print("="*80)
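
The three loading blocks repeat the same pattern, so they could be folded into one helper. The sketch below is one possible shape, not part of the original notebook: `load_silver_csv` is a hypothetical name, and parsing `date_hour` at read time assumes every export has that column. It is demonstrated on a temporary file rather than on the real export directory.

```python
import tempfile
from pathlib import Path
from typing import Optional

import pandas as pd


def load_silver_csv(directory: Path, filename: str) -> Optional[pd.DataFrame]:
    """Read one exported Silver CSV, parsing date_hour; return None on failure."""
    path = Path(directory) / filename
    try:
        df = pd.read_csv(path, parse_dates=["date_hour"])
    except (OSError, ValueError) as exc:
        # OSError covers a missing or unreadable file; ValueError covers empty
        # or malformed CSVs and a missing date_hour column.
        print(f"Error loading {path.name}: {exc}")
        return None
    print(f"Loaded {len(df):,} rows from {path.name}")
    return df


# Demonstration on a throwaway file (hypothetical contents).
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "demo.csv"
    demo_path.write_text(
        "facility_code,date_hour,energy_mwh\n"
        "COLEASF,2025-01-01 10:00:00,12.5\n"
    )
    demo_df = load_silver_csv(Path(tmp), "demo.csv")
    missing_df = load_silver_csv(Path(tmp), "does_not_exist.csv")
```

Parsing `date_hour` while reading means later cells can use `.dt` accessors without a separate conversion step.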


## 2️⃣ Quality Metrics Summary - Check Improvements

In [None]:
# Quality Metrics from Silver Energy
print("📊 ENERGY QUALITY DISTRIBUTION:")
print("=" * 80)
if energy_df is not None:
    quality_dist = energy_df['quality_flag'].value_counts()
    total_records = len(energy_df)
    print(f"\nTotal records: {total_records:,}\n")
    for flag, count in quality_dist.items():
        pct = (count / total_records) * 100
        print(f"  {flag:10s}: {count:6,} records ({pct:6.2f}%)")
else:
    print("❌ Energy data not loaded")

# Quality issues breakdown
print("\nENERGY QUALITY ISSUES BREAKDOWN:")
print("=" * 80)
if energy_df is not None:
    issues_data = energy_df[energy_df['quality_issues'].notna() & (energy_df['quality_issues'] != '')]
    print(f"\nRecords with quality issues: {len(issues_data):,}\n")
    
    # Parse and count issue types
    issue_types = {}
    for issues_str in issues_data['quality_issues']:
        if pd.notna(issues_str) and issues_str != '':
            for issue in str(issues_str).split('|'):
                issue = issue.strip()
                if issue:
                    issue_types[issue] = issue_types.get(issue, 0) + 1
    
    # Sort by frequency
    for issue, count in sorted(issue_types.items(), key=lambda x: x[1], reverse=True):
        pct = (count / total_records) * 100
        print(f"  {issue:35s}: {count:6,} ({pct:6.2f}%)")
else:
    print("❌ Energy data not loaded")
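
The issue-counting loop above can also be written without explicit Python loops. The following is a sketch of an equivalent vectorized count, assuming (as the loop does) that `quality_issues` holds `|`-separated labels. The function name `count_issue_types` and the sample labels are made up for illustration.

```python
import pandas as pd


def count_issue_types(issues: pd.Series) -> pd.Series:
    """Count each label in a column of '|'-separated issue strings."""
    labels = (
        issues.dropna()
        .astype(str)
        .str.split("|")  # a single-character pattern is treated literally
        .explode()
        .str.strip()
    )
    # Empty strings come from blank cells or trailing separators.
    return labels[labels != ""].value_counts()


# Hypothetical labels, only to show the mechanics.
demo_issues = pd.Series(["A|B", "", None, "B | C", "A"])
issue_counts = count_issue_types(demo_issues)
print(issue_counts)
```

On the notebook's data this would be called as `count_issue_types(energy_df['quality_issues'])`.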


## 3️⃣ Energy vs Radiation Analysis - Divergence Check

In [None]:
# Analyze Energy-Radiation correlation
print("📈 ENERGY-RADIATION CORRELATION BY FACILITY:")
print("=" * 100)

if energy_df is not None and weather_df is not None:
    # Merge energy and weather data
    merged_df = pd.merge(
        energy_df,
        weather_df[['facility_code', 'date_hour', 'shortwave_radiation']],
        on=['facility_code', 'date_hour'],
        how='inner'
    )
    
    print(f"\nMerged {len(merged_df):,} records (energy + weather)\n")
    
    # Calculate correlation by facility
    for facility in sorted(merged_df['facility_code'].unique()):
        facility_data = merged_df[merged_df['facility_code'] == facility]
        facility_data_positive = facility_data[facility_data['energy_mwh'] > 0]
        
        if len(facility_data_positive) > 0:
            correlation = facility_data_positive['energy_mwh'].corr(facility_data_positive['shortwave_radiation'])
            
            print(f"  {facility:12s}:")
            print(f"    Records: {len(facility_data):6,}  |  Energy>0: {len(facility_data_positive):6,}")
            print(f"    Energy (MWh):    Min={facility_data['energy_mwh'].min():8.2f}  Max={facility_data['energy_mwh'].max():8.2f}  Avg={facility_data['energy_mwh'].mean():8.2f}")
            print(f"    Radiation (W/m²): Min={facility_data['shortwave_radiation'].min():8.2f}  Max={facility_data['shortwave_radiation'].max():8.2f}  Avg={facility_data['shortwave_radiation'].mean():8.2f}")
            print(f"    📊 Correlation: {correlation:7.4f} {'✅' if correlation > 0.85 else '⚠️' if correlation > 0.7 else '❌'}")
            print()

# Check for anomalies: High radiation but zero energy (during daytime)
print("\n⚠️ ANOMALIES: Zero Energy with High Radiation (Daytime):")
print("=" * 100)

if energy_df is not None and weather_df is not None:
    anomaly_data = pd.merge(
        energy_df,
        weather_df[['facility_code', 'date_hour', 'shortwave_radiation']],
        on=['facility_code', 'date_hour'],
        how='inner'
    )
    
    # Convert date_hour to datetime (a no-op when it is already datetime);
    # converting unconditionally avoids depending on how pandas labels string columns
    anomaly_data['date_hour'] = pd.to_datetime(anomaly_data['date_hour'])
    
    # Extract hour
    anomaly_data['hour'] = anomaly_data['date_hour'].dt.hour
    
    # Find anomalies: zero energy, high radiation, daytime (6-18)
    anomalies = anomaly_data[
        (anomaly_data['energy_mwh'] == 0) & 
        (anomaly_data['shortwave_radiation'] > 300) & 
        (anomaly_data['hour'] >= 6) & 
        (anomaly_data['hour'] <= 18)
    ]
    
    print(f"\nFound {len(anomalies):,} anomalies\n")
    
    if len(anomalies) > 0:
        anomaly_summary = anomalies.groupby('facility_code').agg({
            'energy_mwh': 'count',
            'shortwave_radiation': 'mean'
        }).rename(columns={'energy_mwh': 'anomaly_count', 'shortwave_radiation': 'avg_radiation'})
        
        print(anomaly_summary.to_string())
        print(f"\n  Sample anomaly records:")
        print(anomalies[['facility_code', 'date_hour', 'energy_mwh', 'shortwave_radiation', 'quality_flag']].head(10).to_string(index=False))
    else:
        print("  ✅ No anomalies found!")
else:
    print("❌ Data not loaded")
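
Both merges above use `how='inner'`, which silently drops any hour present in only one of the two tables. A quick way to see how much is dropped is an outer merge on the keys with `indicator=True`. This is a sketch only: `merge_coverage` is a hypothetical helper, and `validate="one_to_one"` assumes each `(facility_code, date_hour)` pair appears at most once per table, raising a `MergeError` otherwise.

```python
import pandas as pd

KEYS = ["facility_code", "date_hour"]


def merge_coverage(energy: pd.DataFrame, weather: pd.DataFrame) -> dict:
    """Count keys found in both tables, only in energy, or only in weather."""
    joined = energy[KEYS].merge(
        weather[KEYS],
        on=KEYS,
        how="outer",
        indicator=True,         # adds a _merge column: left_only / right_only / both
        validate="one_to_one",  # fails loudly if either table has duplicate keys
    )
    counts = joined["_merge"].value_counts()
    return {str(label): int(n) for label, n in counts.items()}


# Tiny made-up frames to show the output shape.
energy_demo = pd.DataFrame(
    {"facility_code": ["A", "A"], "date_hour": ["2025-01-01 10:00", "2025-01-01 11:00"]}
)
weather_demo = pd.DataFrame(
    {"facility_code": ["A", "A"], "date_hour": ["2025-01-01 11:00", "2025-01-01 12:00"]}
)
coverage = merge_coverage(energy_demo, weather_demo)
print(coverage)
```

Here `left_only` counts energy hours with no weather record and `right_only` the reverse. The `date_hour` columns must share a dtype in both frames for the keys to match.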


## 4️⃣ Weather Data Validation - Physical Bounds Check

In [None]:
# Weather data validation
print("🌡️  WEATHER DATA STATISTICS & BOUNDS CHECK:")
print("=" * 100)

if weather_df is not None:
    metrics = {
        'Temperature (°C)': ('temperature_2m', -50, 60),
        'Radiation (W/m²)': ('shortwave_radiation', 0, 1361),
        'Cloud Cover (%)': ('cloud_cover', 0, 100)
    }
    
    print()
    for metric_name, (column, min_bound, max_bound) in metrics.items():
        if column in weather_df.columns:
            data = weather_df[column]
            out_of_bounds = len(data[(data < min_bound) | (data > max_bound)])
            
            print(f"  {metric_name:25s}:")
            print(f"    Min: {data.min():10.2f}  |  Max: {data.max():10.2f}  |  Avg: {data.mean():10.2f}")
            print(f"    Out of bounds [{min_bound:6.2f}, {max_bound:6.2f}]: {out_of_bounds:6,} ({100*out_of_bounds/len(data):6.2f}%)")
            print()

# Check quality flags distribution for weather
print("\n📊 WEATHER QUALITY DISTRIBUTION:")
print("=" * 80)
if weather_df is not None:
    quality_dist = weather_df['quality_flag'].value_counts()
    total_records = len(weather_df)
    print(f"\nTotal records: {total_records:,}\n")
    for flag, count in quality_dist.items():
        pct = (count / total_records) * 100
        print(f"  {flag:10s}: {count:6,} records ({pct:6.2f}%)")
else:
    print("❌ Weather data not loaded")


## 5️⃣ Air Quality Data Validation - AQI & PM2.5 Analysis

In [None]:
# Air quality validation, computed from the aqi_df loaded from CSV
print("💨 AIR QUALITY DATA STATISTICS & BOUNDS CHECK:")
print("=" * 100)

if aqi_df is not None:
    # Display name -> column; every metric is checked against the 0-500 range
    aqi_metrics = {
        'PM2.5': 'pm2_5',
        'PM10': 'pm10',
        'NO2': 'nitrogen_dioxide',
        'AQI Value': 'aqi_value',
    }
    print(f"\n  {'Metric':12s} {'Min':>10s} {'Max':>10s} {'Mean':>10s} {'Out of bounds':>15s}")
    for metric_name, column in aqi_metrics.items():
        if column not in aqi_df.columns:
            print(f"  {metric_name:12s} column '{column}' not found")
            continue
        data = aqi_df[column]
        out_of_bounds = int(((data < 0) | (data > 500)).sum())
        print(f"  {metric_name:12s} {data.min():10.2f} {data.max():10.2f} {data.mean():10.2f} {out_of_bounds:15,}")
else:
    print("❌ Air quality data not loaded")

# AQI distribution by category
print("\n📊 AQI DISTRIBUTION BY CATEGORY:")
print("=" * 60)
if aqi_df is not None and 'aqi_value' in aqi_df.columns:
    aqi_values = aqi_df['aqi_value'].dropna()
    # Right-closed bins give the <=50 / <=100 / <=200 / above-200 categories
    aqi_category = pd.cut(
        aqi_values,
        bins=[-np.inf, 50, 100, 200, np.inf],
        labels=['Good (0-50)', 'Moderate (51-100)', 'Unhealthy (101-200)', 'Hazardous (201+)'],
    )
    # sort=False keeps the categories in severity order
    aqi_dist = aqi_category.value_counts(sort=False)
    for category, count in aqi_dist.items():
        pct = 100 * count / len(aqi_values) if len(aqi_values) else 0.0
        print(f"  {category:22s}: {count:8,} ({pct:6.2f}%)")
else:
    print("❌ Air quality data not loaded")

## 6️⃣ Data Completeness & Anomaly Summary

In [None]:
# Completeness check, computed from the DataFrames loaded from CSV
print("📋 DATA COMPLETENESS SUMMARY:")
print("=" * 140)

# Table name -> (DataFrame, column used for the non-null / negative / zero counts)
completeness_sources = {
    'Energy': (energy_df, 'energy_mwh'),
    'Weather': (weather_df, 'shortwave_radiation'),
    'Air Quality': (aqi_df, 'pm2_5'),
}

completeness_rows = []
for table_name, (df, column) in completeness_sources.items():
    if df is None:
        print(f"❌ {table_name} data not loaded")
        continue
    completeness_rows.append({
        'table_name': table_name,
        'value_column': column,
        'total_rows': len(df),
        'non_null': int(df[column].notna().sum()),
        'negative': int((df[column] < 0).sum()),
        'zero': int((df[column] == 0).sum()),
        'facilities': df['facility_code'].nunique(),
        'unique_dates': pd.to_datetime(df['date_hour']).dt.date.nunique(),
    })

if completeness_rows:
    print(pd.DataFrame(completeness_rows).to_string(index=False))

# Missing AQI values are reported separately because they are not a zero count
if aqi_df is not None and 'aqi_value' in aqi_df.columns:
    print(f"\n  Air Quality rows with missing aqi_value: {aqi_df['aqi_value'].isna().sum():,}")

# Facility-level quality metrics
print("\n🏢 FACILITY-LEVEL QUALITY METRICS:")
print("=" * 100)
if energy_df is not None:
    facility_dates = pd.to_datetime(energy_df['date_hour']).dt.date
    # groupby sorts by facility_code, so rows come out in facility order
    facility_dist = energy_df.assign(date=facility_dates).groupby('facility_code').agg(
        unique_dates=('date', 'nunique'),
        total_records=('quality_flag', 'size'),
        pct_good=('quality_flag', lambda s: round(100 * (s == 'GOOD').mean(), 2)),
        caution_count=('quality_flag', lambda s: int((s == 'CAUTION').sum())),
        reject_count=('quality_flag', lambda s: int((s == 'REJECT').sum())),
    )
    print(facility_dist.to_string())
else:
    print("❌ Energy data not loaded")
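
Counting distinct dates does not reveal which hours are missing. One way to look for systematic gaps is to compare each facility's timestamps with a complete hourly range between its first and last record. This is a sketch under assumptions: `missing_hours` is a hypothetical helper, and it assumes `date_hour` values fall exactly on the hour.

```python
import pandas as pd


def missing_hours(
    df: pd.DataFrame,
    facility_col: str = "facility_code",
    time_col: str = "date_hour",
) -> pd.DataFrame:
    """List hourly timestamps absent between each facility's first and last record."""
    times = pd.to_datetime(df[time_col])
    rows = []
    for facility, ts in times.groupby(df[facility_col]):
        # Every hour the facility should have between its first and last record.
        expected = pd.date_range(ts.min(), ts.max(), freq=pd.Timedelta(hours=1))
        for t in expected.difference(pd.DatetimeIndex(ts)):
            rows.append({facility_col: facility, time_col: t})
    return pd.DataFrame(rows, columns=[facility_col, time_col])


# Made-up records: facility A is missing 02:00, facility B is complete.
gap_demo = pd.DataFrame(
    {
        "facility_code": ["A", "A", "A", "B", "B"],
        "date_hour": [
            "2025-01-01 00:00",
            "2025-01-01 01:00",
            "2025-01-01 03:00",
            "2025-01-01 00:00",
            "2025-01-01 01:00",
        ],
    }
)
gaps = missing_hours(gap_demo)
print(gaps)
```

Grouping the result by hour of day or by date would then show whether gaps cluster at particular times.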

## 7️⃣ Key Findings & Recommendations

In [None]:
print("""
╔══════════════════════════════════════════════════════════════════════════════╗
║               🎯 SILVER LAYER DATA QUALITY VALIDATION SUMMARY                ║
╚══════════════════════════════════════════════════════════════════════════════╝

✅ IMPROVEMENTS IMPLEMENTED:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  1. Facility-specific capacity thresholds
     - COLEASF: 145 MWh (highest capacity)
     - Others: 115 MWh (standard)
  
  2. Quality flag enhancements:
     - Night energy anomaly detection (>1 MWh at 22-6h)
     - Daytime zero-energy detection (energy=0, radiation>300)
     - Peak hour validation (<5 MWh during 11-15h)
  
  3. Weather validation:
     - Temperature bounds: -50°C to +60°C
     - Radiation bounds: 0-1361 W/m²
     - Cloud cover: 0-100%
  
  4. Air quality validation:
     - PM2.5, PM10, NO2, SO2, O3: 0-500 μg/m³
     - AQI calculation and categorization
     - Hazard level flagging

✅ EXPECTED RESULTS:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  - Energy-Radiation correlation: >0.85 (strong positive)
  - Quality GOOD percentage: >92% (up from 91.19%)
  - No negative energy values (physical bounds enforced)
  - No extreme radiation spikes (validation bounds applied)
  - Zero quality issues in flagged records

⚠️ AREAS TO MONITOR:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  1. Correlation by facility - ensure >0.85 for all
  2. Daytime zero-energy incidents - should be captured as CAUTION
  3. Night energy anomalies - should be properly flagged
  4. Weather radiation spikes - should trigger validation
  5. Missing data patterns - check for systematic gaps

🔍 NEXT STEPS:
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  [ ] Review correlation matrix - verify >0.85 thresholds
  [ ] Check anomaly flags - ensure detection is working
  [ ] Validate facility thresholds - check facility-specific distributions
  [ ] Cross-check with manual data samples
  [ ] If quality >92%, move to Phase 2 improvements (advanced rules)
  [ ] If issues found, iterate on detection logic

""")

print("✅ VALIDATION COMPLETE - See results above")
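
The expected results in the summary are stated as targets, so they can be checked in code rather than read off the output. The sketch below evaluates three of them on a merged energy-weather frame. `check_targets` and its return keys are hypothetical, and the demo data is invented purely to show the mechanics. The 0.85 and 92% defaults are the targets named in the summary.

```python
import pandas as pd


def check_targets(merged: pd.DataFrame, min_corr: float = 0.85,
                  min_good_pct: float = 92.0) -> dict:
    """Compare a merged energy/weather frame with the summary's targets.

    Expects columns: facility_code, energy_mwh, shortwave_radiation, quality_flag.
    """
    good_pct = float(100 * (merged["quality_flag"] == "GOOD").mean())
    # Correlation is computed on producing hours only, as in section 3.
    producing = merged[merged["energy_mwh"] > 0]
    corr = {
        facility: group["energy_mwh"].corr(group["shortwave_radiation"])
        for facility, group in producing.groupby("facility_code")
    }
    return {
        "good_pct": good_pct,
        "good_pct_ok": good_pct > min_good_pct,
        "negative_energy_rows": int((merged["energy_mwh"] < 0).sum()),
        # "not c > min_corr" also catches NaN correlations (too few points).
        "facilities_below_corr": sorted(f for f, c in corr.items() if not c > min_corr),
    }


# Invented data: facility A tracks radiation perfectly, facility B does not.
targets_demo = pd.DataFrame(
    {
        "facility_code": ["A"] * 4 + ["B"] * 4,
        "energy_mwh": [1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0],
        "shortwave_radiation": [100.0, 200.0, 300.0, 400.0, 400.0, 100.0, 300.0, 200.0],
        "quality_flag": ["GOOD"] * 7 + ["CAUTION"],
    }
)
target_report = check_targets(targets_demo)
print(target_report)
```

A report with `good_pct_ok` true, no negative energy rows, and an empty `facilities_below_corr` would correspond to the "move to Phase 2" branch of the next steps.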