In [1]:
!pip install -q pandas numpy pyarrow pandarallel altair

  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pandarallel (setup.py) ... [?25l[?25hdone


In [3]:
# =============================================================================
# MONADSQUISHY: DEDE THAILAND DATA LAKEHOUSE PIPELINE
# =============================================================================
# This code implements the DEDE Data Lakehouse Pipeline.
# FOCUS: Full 30 Test Cases (Group A-D), Chart 1 Only, Top 5 Dirty Values Tooltip.

import pandas as pd
import numpy as np
import altair as alt
import re
from datetime import datetime, timedelta

# Optional speed-up: pandarallel provides DataFrame.parallel_apply (used by
# SquishyEngine.run). Parallelism is best-effort, so any failure to import *or*
# initialise it falls back to the serial DataFrame.apply path.
try:
    from pandarallel import pandarallel
    pandarallel.initialize(progress_bar=False, verbose=0)
    HAS_PARALLEL = True
except Exception:
    HAS_PARALLEL = False

# Best-effort: ask Altair to use its default renderer in this frontend.
# `except Exception` (not a bare `except:`) so KeyboardInterrupt/SystemExit still propagate.
try:
    alt.renderers.enable('default')
except Exception:
    pass

# =============================================================================
# 1. CORE ENGINE
# =============================================================================

# Per-output-type settings consumed by SquishyEngine.run:
#   default - value written for a cell whose pipeline did not end in 'success'
#   dtype   - intended pandas dtype of the output column (not read by the code visible here)
#   coerce  - callable applied to the whole output Series after all rows are processed
TYPE_SPECS = {
    # Nullable "string" dtype: astype(str) would turn the None default of dirty
    # cells into the literal text 'None'; astype("string") keeps them as <NA>.
    "string":   {"default": None,   "dtype": "string",  "coerce": lambda x: x.astype("string")},
    "float":    {"default": np.nan, "dtype": "float64", "coerce": lambda x: pd.to_numeric(x, errors='coerce')},
    "integer":  {"default": 0,      "dtype": "Int64",   "coerce": lambda x: pd.to_numeric(x, errors='coerce').astype('Int64')},
    "datetime": {"default": pd.NaT, "dtype": "datetime64[ns]", "coerce": lambda x: pd.to_datetime(x, errors='coerce')},
    "boolean":  {"default": False,  "dtype": "bool",    "coerce": lambda x: x.astype(bool)}
}

class Monad:
    """Carries one cell's value through a validator/transformer pipeline.

    Pipeline steps are callables tagged with a ``_role`` attribute
    (``'validator'`` or ``'transformer'``; untagged callables count as validators):

    * A validator returns the (possibly normalised) value or raises. The first
      failure marks the cell ``'dirty'`` and stops the chain.
    * Transformers are alternatives tried in order until one succeeds. Later
      transformers are then skipped, but validators placed *after* the
      successful transformer still run on the transformed value (e.g. a
      future-date check after date parsing).
    * A failing transformer is logged and the next step is tried; if every
      transformer fails, ``apply`` marks the cell dirty ('chain_exhausted').

    Every executed step appends a dict to ``logs`` with keys
    row, col, step, role_type, role, status, details.
    """

    __slots__ = ['value', 'input_row', 'output_column', 'status', 'final_value',
                 'logs', 'stopped', 'step', 'transformed']

    def __init__(self, value, row, col):
        self.value = value
        self.input_row = row
        self.output_column = col
        self.status = 'pending'      # pending -> valid / success / dirty
        self.final_value = None
        self.logs = []
        self.stopped = False         # True once a validator has failed
        self.step = 0
        self.transformed = False     # True once any transformer has succeeded

    def _log(self, func, status, details=None):
        """Append one log record for ``func`` at the current step."""
        role = getattr(func, '_role', 'validator')
        name = getattr(func, '__name__', 'unknown')
        self.logs.append({
            'row': self.input_row,
            'col': self.output_column,
            'step': self.step,
            'role_type': role,
            'role': name,
            'status': status,
            'details': details
        })

    def __or__(self, func):
        """Run one pipeline step (``monad | func``) and return ``self`` for chaining."""
        self.step += 1
        if self.stopped:
            return self

        role = getattr(func, '_role', 'validator')

        # A transformer already succeeded: remaining alternative transformers are skipped.
        if role == 'transformer' and self.transformed:
            return self

        try:
            res = func(self.value)
        except Exception as e:
            if role == 'validator':
                self.status = 'dirty'
                self.final_value = None
                self.stopped = True
                self._log(func, 'failed', str(e))
            elif role == 'transformer':
                # Not fatal: a later transformer may still succeed.
                self._log(func, 'failed', str(e))
            return self

        if role == 'validator':
            self.value = res
            if self.transformed:
                # Post-transform validation passed: keep 'success', track the value.
                self.final_value = res
            else:
                self.status = 'valid'
            self._log(func, 'passed')
        elif role == 'transformer':
            self.value = res
            self.status = 'success'
            self.final_value = res
            self.transformed = True
            self._log(func, 'passed')
        return self

    def apply(self, pipeline):
        """Run every step of ``pipeline`` in order and settle the final status/value.

        Returns ``self``. A cell ends 'success' (``final_value`` set) or 'dirty'
        (``final_value`` None). A pipeline that contains transformers but none
        succeeded is marked dirty with a 'chain_exhausted' log entry.
        """
        transformer_seen = False
        for func in pipeline:
            if getattr(func, '_role', 'validator') == 'transformer':
                transformer_seen = True
            self | func

        if self.status == 'dirty':
            self.final_value = None
        elif self.status in ('valid', 'pending'):
            if transformer_seen:
                self.status = 'dirty'
                self.final_value = None
                self.logs.append({
                    'row': self.input_row, 'col': self.output_column, 'step': self.step + 1,
                    'role_type': 'system', 'role': 'chain_exhausted', 'status': 'failed', 'details': 'All transformers failed'
                })
            else:
                self.status = 'success'
                self.final_value = self.value
        return self

class SquishyEngine:
    """Runs a column-wise validation/transformation config over a source DataFrame.

    Each entry of ``config_list`` is a dict with:
      * ``target``   - output column name,
      * ``source``   - input column name, or list of names (defaults to ``target``);
                       a list makes each pipeline step receive the row sub-Series,
      * ``pipeline`` - list of validator/transformer callables run by ``Monad``,
      * ``type``     - key into ``TYPE_SPECS`` (defaults to ``'string'``).

    After ``run()``, ``final_df`` holds the cleaned columns and ``logs`` is a
    DataFrame with one record per executed pipeline step per cell.
    """

    LOG_COLUMNS = ['row', 'col', 'step', 'role_type', 'role', 'status', 'details']

    def __init__(self, config_list, source_df):
        self.config = config_list
        self.df = source_df
        # Empty frame with the log schema, so callers can test `.empty` before run().
        self.logs = pd.DataFrame(columns=self.LOG_COLUMNS)
        self.final_df = None

    def run(self):
        """Apply every configured pipeline and return the cleaned DataFrame.

        Config entries whose source column(s) are missing are skipped with a
        message. Cells whose pipeline does not succeed get the type's default;
        each output column is then passed through the type's ``coerce``.

        Raises
        ------
        ValueError
            If an entry names a ``type`` that is not in ``TYPE_SPECS``.
        """
        print(f"Processing {len(self.df)} rows across {len(self.config)} columns...")
        final_df = pd.DataFrame(index=self.df.index)
        all_logs = []

        for col_def in self.config:
            target = col_def['target']
            source = col_def.get('source', target)
            pipeline = col_def['pipeline']

            required = source if isinstance(source, list) else [source]
            if any(col not in self.df.columns for col in required):
                if isinstance(source, list):
                    print(f"Skipping {target}: One or more sources missing from {source}")
                else:
                    print(f"Skipping {target}: Source {source} not found.")
                continue

            type_name = col_def.get('type', 'string')
            spec = TYPE_SPECS.get(type_name)
            if spec is None:
                # Fail loudly on a config typo instead of a per-row TypeError later.
                raise ValueError(f"Unknown type {type_name!r} for column {target!r}")

            def apply_logic(row):
                # row[source] works for both a single name and a list of names.
                m = Monad(row[source], row.name, target).apply(pipeline)
                val_out = m.final_value if m.status == 'success' else spec['default']
                return (val_out, m.logs)

            if self.df.empty:
                # DataFrame.apply on zero rows does not return a Series of tuples.
                outcomes = []
            elif HAS_PARALLEL:
                outcomes = self.df.parallel_apply(apply_logic, axis=1).tolist()
            else:
                outcomes = self.df.apply(apply_logic, axis=1).tolist()

            values = pd.Series([out[0] for out in outcomes], index=self.df.index, dtype=object)
            for _, cell_logs in outcomes:
                all_logs.extend(cell_logs)
            final_df[target] = spec['coerce'](values)

        self.final_df = final_df
        self.logs = pd.DataFrame(all_logs, columns=self.LOG_COLUMNS)
        return final_df

def validator(f):
    """Tag ``f`` with ``_role = 'validator'`` for a Monad pipeline and return ``f`` itself."""
    setattr(f, '_role', 'validator')
    return f


def transformer(f):
    """Tag ``f`` with ``_role = 'transformer'`` for a Monad pipeline and return ``f`` itself."""
    setattr(f, '_role', 'transformer')
    return f

# =============================================================================
# 2. FULL 30 TEST CASES (Group A-D)
# =============================================================================

# --- GROUP A: BRONZE INGESTION ---
@validator
def TC01_Null_Check(v):
    """Reject missing values: anything ``pd.isna`` flags, plus blank/placeholder strings.

    Placeholder literals ('null', 'n/a', '-', 'tbd', 'nan') are matched
    case-insensitively *after* trimming surrounding whitespace, so padded
    entries such as ' N/A ' are caught too. Returns ``v`` unchanged.
    Raises ValueError when the value is considered missing.
    """
    if pd.isna(v): raise ValueError("Missing Value")
    if isinstance(v, str):
        text = v.strip()
        if text == '' or text.lower() in ['null', 'n/a', '-', 'tbd', 'nan']:
            raise ValueError("Missing Value (Literal)")
    return v

@transformer
def TC02_Thai_Year_Conversion(v):
    """Parse a date, converting a Buddhist Era (B.E.) year to Gregorian first.

    The first 4-digit run in ``str(v)`` is taken as the year; if it is > 2400
    it is treated as B.E. and 543 is subtracted before parsing. Only the matched
    span is rewritten, so years written with Thai digits are converted too.

    Returns a ``pd.Timestamp``. Raises ValueError when the input cannot be
    parsed, so the Monad records a transformer failure instead of silently
    accepting NaT as a success.
    """
    s = str(v).strip()
    match = re.search(r'(\d{4})', s)
    if match and int(match.group(1)) > 2400:
        ad_year = int(match.group(1)) - 543
        gregorian = s[:match.start(1)] + str(ad_year) + s[match.end(1):]
        parsed = pd.to_datetime(gregorian, errors='coerce')
    else:
        parsed = pd.to_datetime(v, errors='coerce')
    if pd.isna(parsed):
        raise ValueError("Unparseable Date")
    return parsed

@transformer
def TC03_Currency_Sanitization(v):
    """Parse a Thai-baht amount written as free text into a float.

    Removes 'THB' (case-insensitive, via upper-casing), the '฿' sign and ',' thousands
    separators, then parses with ``float``. Non-finite results ('nan', 'inf')
    are rejected as well. Raises ValueError("Invalid Currency Format") on failure.
    """
    s = str(v).upper().replace('THB', '').replace('฿', '').replace(',', '').strip()
    try:
        amount = float(s)
    except ValueError:
        raise ValueError("Invalid Currency Format") from None
    if not np.isfinite(amount):
        raise ValueError("Invalid Currency Format")
    return amount

@validator
def TC04_Thai_Char_Enforcement(v):
    """Require at least one character from the Thai Unicode block (U+0E00-U+0E7F).

    Returns ``v`` unchanged; raises ValueError when ``str(v)`` contains no Thai character.
    """
    has_thai = re.search(r'[\u0E00-\u0E7F]', str(v)) is not None
    if has_thai:
        return v
    raise ValueError("Thai Characters Required")

@validator
def TC05_English_Cleanliness(v):
    """Reject values containing any Thai-block character (U+0E00-U+0E7F).

    Returns ``v`` unchanged when ``str(v)`` has no Thai character.
    """
    thai_hit = re.search(r'[\u0E00-\u0E7F]', str(v))
    if thai_hit is not None:
        raise ValueError("English Characters Only Required")
    return v

@validator
def TC06_Thailand_Geofence(v, axis='lat'):
    """Check that a single coordinate lies inside the Thailand bounding box used here.

    Bounds: latitude 5.0-21.0, longitude 97.0-106.0 (inclusive).
    Previously a value was accepted if it fell in *either* range, so a
    latitude of 99.9 passed because it looks like a longitude.

    Parameters
    ----------
    v : number-like
        The coordinate (converted with ``float``).
    axis : {'lat', 'lon', 'any'}
        Which range to enforce; 'any' reproduces the old either-range check.

    Returns ``v`` unchanged; raises ValueError when outside the range.
    """
    val = float(v)
    in_lat = 5.0 <= val <= 21.0
    in_lon = 97.0 <= val <= 106.0
    allowed = {'lat': in_lat, 'lon': in_lon, 'any': in_lat or in_lon}
    if axis not in allowed:
        raise ValueError(f"Unknown axis {axis!r}; expected 'lat', 'lon' or 'any'")
    if not allowed[axis]: raise ValueError("Coordinates outside Thailand")
    return v

@transformer
def TC07_Fuel_Standardization(v):
    """Map free-text fuel descriptions onto a canonical renewable fuel label.

    Keywords are matched case-insensitively as substrings and checked in order
    (solar, wind, hydro, biomass); the first hit wins.
    Raises ValueError when no keyword matches.
    """
    text = str(v).lower()
    rules = (
        (('solar', 'sun', 'pv'), 'Solar'),
        (('wind',), 'Wind'),
        (('hydro',), 'Hydro'),
        (('biomass',), 'Biomass'),
    )
    for keywords, label in rules:
        if any(keyword in text for keyword in keywords):
            return label
    raise ValueError("Unknown Fuel Type")

@validator
def TC08_Plant_Code_Format(v):
    """Require a plant code of the form 'PLT-' followed by exactly four ASCII digits.

    ``re.fullmatch`` with ``[0-9]`` is used because ``$`` would also accept a
    trailing newline, and ``\\d`` would also accept Thai/other Unicode digits.
    Returns ``v`` unchanged; raises ValueError on mismatch.
    """
    if re.fullmatch(r'PLT-[0-9]{4}', str(v)) is None: raise ValueError("Invalid Plant Code Format")
    return v

# --- GROUP B: FACT TABLE LOGIC ---
@validator
def TC09_Positive_Capacity(v):
    """Require a strictly positive capacity; returns ``v`` unchanged.

    Written as ``not (x > 0)`` so NaN (which compares False to everything) is
    rejected too, instead of slipping past a ``x <= 0`` test.
    """
    if not float(v) > 0: raise ValueError("Capacity must be positive")
    return v

@validator
def TC10_Non_Negative_Generation(v):
    """Require generation >= 0; returns ``v`` unchanged.

    Written as ``not (x >= 0)`` so NaN is rejected too, instead of slipping
    past a ``x < 0`` test.
    """
    if not float(v) >= 0: raise ValueError("Generation cannot be negative")
    return v

@validator
def TC11_Physics_Grid_Check(row):
    """Energy sold to the grid may not exceed gross generation.

    ``row`` must provide 'gross_generation_mwh' and 'sold_to_grid_mwh'.
    Returns the sold amount as a float.
    """
    gross_mwh = float(row['gross_generation_mwh'])
    sold_mwh = float(row['sold_to_grid_mwh'])
    exceeds_generation = sold_mwh > gross_mwh
    if exceeds_generation:
        raise ValueError("Physics Violation: Sold > Gross")
    return sold_mwh

@validator
def TC12_Curtailment_Cap(row):
    """Curtailed energy may not exceed gross generation.

    ``row`` must provide 'gross_generation_mwh' and 'curtailment_mwh'.
    Returns the curtailment as a float.
    """
    gross_mwh = float(row['gross_generation_mwh'])
    curtailed_mwh = float(row['curtailment_mwh'])
    if curtailed_mwh > gross_mwh:
        raise ValueError("Curtailment exceeds Generation")
    return curtailed_mwh

@validator
def TC13_Availability_Factor(v):
    """Require an availability factor within [0, 1]; NaN fails. Returns ``v`` unchanged."""
    factor = float(v)
    within_unit_interval = 0.0 <= factor <= 1.0
    if not within_unit_interval:
        raise ValueError("Factor must be 0-1")
    return v

@validator
def TC14_CO2_Calculation(row):
    """Check reported CO2 avoided against a mock emission factor.

    Expected = gross_generation_mwh * 0.5 (mock factor); the reported
    'co2_avoided_tonnes' must lie within [0.5x, 1.5x] of expected (loose
    demo tolerance). Returns the reported CO2 as a float.
    """
    gross_mwh = float(row['gross_generation_mwh'])
    co2_tonnes = float(row['co2_avoided_tonnes'])
    mock_factor = 0.5
    expected = gross_mwh * mock_factor
    low, high = expected * 0.5, expected * 1.5
    if not (low <= co2_tonnes <= high):
        raise ValueError("CO2 Calculation Mismatch")
    return co2_tonnes

@validator
def TC15_Uniqueness_Composite(row):
    """Build the '<report_date>_<plant_code>' fact key; mock duplicate detection.

    Only the hard-coded key '2024-01-01_PLT-0000' is treated as a duplicate
    (a real check would need the whole table). Returns the key string.
    """
    composite_key = "{}_{}".format(row['report_date'], row['plant_code'])
    if composite_key == "2024-01-01_PLT-0000":
        raise ValueError("Duplicate Fact Record")  # Mock
    return composite_key

@validator
def TC16_Subsidy_Policy(row):
    """A positive subsidy requires a policy: policy_key 'None' with subsidy > 0 fails.

    Returns the subsidy as a float.
    """
    policy = str(row['policy_key'])
    subsidy = float(row['subsidy_paid_thb'])
    has_no_policy = policy == 'None'
    if has_no_policy and subsidy > 0:
        raise ValueError("Subsidy paid without Policy")
    return subsidy

# --- GROUP C: DIMENSIONAL INTEGRITY ---
@validator
def TC17_Valid_Region(v):
    """Allow only the six known region names; returns ``v`` unchanged."""
    known_regions = ('North', 'South', 'East', 'West', 'Central', 'North-East')
    if v in known_regions:
        return v
    raise ValueError("Invalid Region")

@validator
def TC18_Year_Sync(row):
    """Require year_be == year_ad + 543 (Buddhist Era offset); returns year_be as int."""
    year_ad = int(row['year_ad'])
    year_be = int(row['year_be'])
    if year_be == year_ad + 543:
        return year_be
    raise ValueError("Year AD/BE Mismatch")

@validator
def TC19_Future_Date_Block(v):
    """Reject values later than ``pd.Timestamp.now()``; returns ``v`` unchanged.

    ``v`` is compared directly, so it is expected to be Timestamp-like
    (e.g. the output of a date-parsing transformer). NaT compares False and passes.
    """
    now = pd.Timestamp.now()
    if v > now:
        raise ValueError("Future Date")
    return v

@validator
def TC20_COD_Validity(row):
    """Fail when cod_date is later than report_date; returns cod_date as a Timestamp.

    NOTE(review): COD presumably means commercial operation date — confirm.
    """
    cod_ts = pd.to_datetime(row['cod_date'])
    report_ts = pd.to_datetime(row['report_date'])
    if cod_ts > report_ts:
        raise ValueError("Report date before COD")
    return cod_ts

@validator
def TC21_Active_Status(v):
    """Allow only 'Active', 'Inactive' or 'Construction'; returns ``v`` unchanged."""
    statuses = ('Active', 'Inactive', 'Construction')
    if v in statuses:
        return v
    raise ValueError("Invalid Plant Status")

@validator
def TC22_Renewable_Only(v):
    """Reject the non-renewable fuels 'Coal', 'Gas' and 'Nuclear'; returns ``v`` unchanged."""
    non_renewable = ('Coal', 'Gas', 'Nuclear')
    if v in non_renewable:
        raise ValueError("Non-Renewable Fuel")
    return v

@validator
def TC23_Adder_Rate_Cap(v):
    """Reject adder rates above the 10.0 cap; returns ``v`` unchanged.

    Written as ``not (rate <= cap)`` so a NaN rate is rejected too. The
    valid_adder pipeline runs this without a preceding null check, so NaN
    would otherwise pass as valid.
    """
    if not float(v) <= 10.0: raise ValueError("Adder Rate Exceeds Cap")
    return v

@validator
def TC24_Grid_Authority_Map(row):
    """Bangkok rows must use 'MEA'; every other province must not.

    Returns the grid authority string.
    """
    province = str(row['province_en'])
    authority = str(row['grid_authority'])
    is_bangkok = province == 'Bangkok'
    is_mea = authority == 'MEA'
    if is_bangkok and not is_mea:
        raise ValueError("Bangkok must be MEA")
    if is_mea and not is_bangkok:
        raise ValueError("Upcountry cannot be MEA")
    return authority

# --- GROUP D: CROSS-REFERENCE ---
@validator
def TC25_Orphan_Plant_Key(v):
    """Require the plant key to be one of the mock dimension keys; returns ``v`` unchanged."""
    known_keys = (101, 102, 103, 999)
    if v in known_keys:
        return v
    raise ValueError("Orphan Plant Key")

@validator
def TC26_Data_Freshness(v):
    """Reject timestamps more than 30 whole days before now; returns ``v`` unchanged."""
    stamp = pd.to_datetime(v)
    age_days = (pd.Timestamp.now() - stamp).days
    if age_days > 30:
        raise ValueError("Data Stale (>30 Days)")
    return v

@validator
def TC27_Owner_Type_Consistency(v):
    """Allow only 'Private', 'State' or 'Public' owners; returns ``v`` unchanged."""
    owner_types = ('Private', 'State', 'Public')
    if v in owner_types:
        return v
    raise ValueError("Invalid Owner Type")

@validator
def TC28_Solar_Irradiance_Check(row):
    """Require an irradiance zone for solar plants; returns the raw fuel value.

    A plant counts as solar when its raw fuel text contains 'solar', 'sun' or
    'pv' (case-insensitive). These are the same keywords TC07_Fuel_Standardization
    maps to 'Solar', so raw values like 'Sun' are checked too. ``str()`` is
    applied first so a missing fuel no longer raises TypeError.
    """
    fuel = row['fuel_raw']
    zone = row['solar_irradiance_zone']
    fuel_text = str(fuel).lower()
    is_solar = any(keyword in fuel_text for keyword in ('solar', 'sun', 'pv'))
    if is_solar and pd.isna(zone): raise ValueError("Solar missing Irradiance Zone")
    return fuel

@validator
def TC29_District_Hierarchy(row):
    """Mock hierarchy check: a Bangkok row whose district contains 'Chiang' fails.

    Returns the district value.
    """
    province = row['province_en']
    district = row['district_th']
    if province == 'Bangkok' and 'Chiang' in district:
        raise ValueError("District mismatch")
    return district

@validator
def TC30_Generation_Outlier(row):
    """Generation may not exceed capacity running 24 h/day for 31 days.

    Returns gross generation as a float.
    """
    generation = float(row['gross_generation_mwh'])
    capacity = float(row['installed_capacity_mw'])
    ceiling = capacity * 24 * 31  # hours in the longest month
    if generation > ceiling:
        raise ValueError("Generation exceeds Physical Limit")
    return generation

# =============================================================================
# 3. PIPELINE CONFIGURATION (ALL 30 TEST CASES ACROSS 27 OUTPUT COLUMNS)
# =============================================================================

def _col_spec(target, source, pipeline, col_type=None):
    """Build one SquishyEngine column entry; the 'type' key is included only when given."""
    entry = {"target": target, "source": source}
    if col_type is not None:
        entry["type"] = col_type
    entry["pipeline"] = pipeline
    return entry


dede_pipeline_config = [
    # Group A: bronze ingestion
    _col_spec("clean_plant_id", "plant_code", [TC01_Null_Check, TC08_Plant_Code_Format]),
    _col_spec("clean_date", "report_date", [TC01_Null_Check, TC02_Thai_Year_Conversion, TC19_Future_Date_Block], col_type="datetime"),
    _col_spec("clean_fuel", "fuel_raw", [TC01_Null_Check, TC07_Fuel_Standardization, TC22_Renewable_Only]),
    _col_spec("province_th_std", "prov_th_raw", [TC01_Null_Check, TC04_Thai_Char_Enforcement]),
    _col_spec("province_en_std", "province_en", [TC01_Null_Check, TC05_English_Cleanliness]),
    _col_spec("clean_lat", "latitude", [TC01_Null_Check, TC06_Thailand_Geofence], col_type="float"),
    _col_spec("clean_currency", "subsidy_raw_text", [TC03_Currency_Sanitization], col_type="float"),

    # Group B: fact table logic (list sources hand the row sub-Series to the check)
    _col_spec("valid_capacity", "installed_capacity_mw", [TC01_Null_Check, TC09_Positive_Capacity], col_type="float"),
    _col_spec("valid_gen", "gross_generation_mwh", [TC01_Null_Check, TC10_Non_Negative_Generation], col_type="float"),
    _col_spec("valid_sold", ["gross_generation_mwh", "sold_to_grid_mwh"], [TC11_Physics_Grid_Check], col_type="float"),
    _col_spec("valid_curt", ["gross_generation_mwh", "curtailment_mwh"], [TC12_Curtailment_Cap], col_type="float"),
    _col_spec("valid_avail", "availability_factor", [TC13_Availability_Factor], col_type="float"),
    _col_spec("valid_co2", ["gross_generation_mwh", "co2_avoided_tonnes"], [TC14_CO2_Calculation], col_type="float"),
    _col_spec("unique_fact", ["report_date", "plant_code"], [TC15_Uniqueness_Composite]),
    _col_spec("valid_sub_pol", ["policy_key", "subsidy_paid_thb"], [TC16_Subsidy_Policy], col_type="float"),

    # Group C: dimensional integrity
    _col_spec("valid_region", "region", [TC17_Valid_Region]),
    _col_spec("year_sync", ["year_ad", "year_be"], [TC18_Year_Sync], col_type="integer"),
    _col_spec("valid_cod", ["cod_date", "report_date"], [TC20_COD_Validity], col_type="datetime"),
    _col_spec("valid_status", "plant_status", [TC21_Active_Status]),
    _col_spec("valid_adder", "adder_rate", [TC23_Adder_Rate_Cap], col_type="float"),
    _col_spec("valid_auth", ["province_en", "grid_authority"], [TC24_Grid_Authority_Map]),

    # Group D: cross-reference
    _col_spec("fk_check", "plant_key", [TC25_Orphan_Plant_Key]),
    _col_spec("freshness", "last_updated", [TC26_Data_Freshness], col_type="datetime"),
    _col_spec("owner_check", "owner_type", [TC27_Owner_Type_Consistency]),
    _col_spec("solar_tech", ["fuel_raw", "solar_irradiance_zone"], [TC28_Solar_Irradiance_Check]),
    _col_spec("district_chk", ["province_en", "district_th"], [TC29_District_Hierarchy]),
    _col_spec("outlier_chk", ["gross_generation_mwh", "installed_capacity_mw"], [TC30_Generation_Outlier], col_type="float"),
]

# =============================================================================
# 4. DATA GENERATION (RICH MOCK DATA)
# =============================================================================

def generate_dede_data(n=600, seed=None):
    """Build a synthetic DEDE-style source DataFrame seeded with deliberately dirty values.

    Parameters
    ----------
    n : int
        Number of rows to generate (0 yields an empty frame with all columns).
    seed : int or None
        When given, a dedicated ``np.random.RandomState(seed)`` is used so the
        data is reproducible. When None (default), NumPy's global RNG is used,
        as before.

    Returns
    -------
    pd.DataFrame
        One row per synthetic report. Row 0 is forced to the 'PLT-0000' /
        '2024-01-01' key that TC15_Uniqueness_Composite treats as a duplicate.
    """
    print(f"Generating {n} rows with full DEDE schema coverage...")
    rng = np.random if seed is None else np.random.RandomState(seed)
    df = pd.DataFrame()

    # Basic
    df['plant_code'] = rng.choice(['PLT-1001', 'PLT-1002', 'BAD-CODE', 'PLT-0000'], n)
    df['report_date'] = rng.choice(['2024-01-01', '01/01/2567', '2099-01-01', 'TBD'], n)
    if n > 0:
        # Force a TC15 duplicate key on the first row
        df.loc[df.index[0], ['plant_code', 'report_date']] = ['PLT-0000', '2024-01-01']

    # Locations
    df['prov_th_raw'] = rng.choice(['เชียงใหม่', 'Bangkok', 'China', '???'], n)
    df['province_en'] = rng.choice(['Bangkok', 'Chiang Mai'], n)
    df['district_th'] = rng.choice(['Mueang Chiang Mai', 'Chatuchak'], n)
    df['region'] = rng.choice(['North', 'Mars', 'Central'], n)
    df['latitude'] = rng.choice([13.75, 99.9], n)
    df['grid_authority'] = rng.choice(['MEA', 'PEA'], n)

    # Tech
    df['fuel_raw'] = rng.choice(['Solar', 'Sun', 'Coal', 'Wind', 'Gas'], n)
    # A plain list ['Zone 1', np.nan] becomes a *string* array in which NaN is the
    # literal 'nan'; an object array keeps a real NaN so pd.isna() (TC28) can fire.
    df['solar_irradiance_zone'] = rng.choice(np.array(['Zone 1', np.nan], dtype=object), n)
    df['plant_status'] = rng.choice(['Active', 'Exploded', 'Construction'], n)
    df['plant_key'] = rng.choice([101, 102, 103, 0], n)

    # Physics
    df['installed_capacity_mw'] = rng.uniform(1, 10, n)
    df['gross_generation_mwh'] = df['installed_capacity_mw'] * 5 * rng.uniform(0.5, 1.5, n)
    df['sold_to_grid_mwh'] = df['gross_generation_mwh'] * rng.uniform(0.9, 1.2, n)  # >1.0 fails TC11
    df['curtailment_mwh'] = df['gross_generation_mwh'] * rng.uniform(0.0, 1.2, n)
    df['availability_factor'] = rng.uniform(0.8, 1.2, n)
    df['co2_avoided_tonnes'] = df['gross_generation_mwh'] * 0.5 * rng.uniform(0.1, 2.0, n)

    # Policy / Dates
    df['policy_key'] = rng.choice(['FiT', 'None'], n)
    df['subsidy_paid_thb'] = np.where(df['policy_key'] == 'None', rng.choice([0, 1000], n), 5000)
    df['subsidy_raw_text'] = rng.choice(['฿5,000', '1000', 'Ten Baht', '???'], n)
    df['adder_rate'] = rng.choice([0.5, 12.0], n)

    df['year_ad'] = 2024
    df['year_be'] = rng.choice([2567, 2024], n)
    df['cod_date'] = rng.choice(['2023-01-01', '2025-01-01'], n)
    now = pd.Timestamp.now()  # single reference time for both freshness options
    df['last_updated'] = rng.choice([now, now - pd.Timedelta(days=100)], n)
    df['owner_type'] = rng.choice(['Private', 'State', 'Alien'], n)

    return df

# =============================================================================
# 5. DASHBOARD: CHART 1 ONLY (WITH BAD INPUT TOOLTIPS)
# =============================================================================

def get_top_bad_inputs(engine, df_source, target_col, config):
    """Summarise the most frequent raw inputs behind non-null failures of one output column.

    Parameters
    ----------
    engine : object with a ``logs`` DataFrame (as produced by ``SquishyEngine.run``)
    df_source : pd.DataFrame
        Raw input frame whose index matches the ``row`` values in the logs.
    target_col : str
        Output column to summarise.
    config : list of dict
        Pipeline config, used to look up ``target_col``'s ``source`` column(s).

    Returns
    -------
    str
        The result depends on what is found:

        * ``"N/A"`` when ``target_col`` has no config entry (or an empty source).
        * ``"None"`` when there are no failures other than ``TC01_Null_Check``.
        * ``"Error"`` when the source column(s) are absent from ``df_source``.
        * Otherwise, up to five ``"'value' (count)"`` items joined by ``", "``.
          Each failing row is counted once, and multi-column sources are
          rendered as ``"a | b"``.
    """
    source_col = next(
        (c.get('source', target_col) for c in config if c['target'] == target_col),
        None,
    )
    if not source_col:
        return "N/A"

    logs = engine.logs
    if logs.empty:
        # Also covers a column-less frame, which would otherwise raise KeyError below.
        return "None"

    failed = logs[
        (logs['col'] == target_col)
        & (logs['status'] == 'failed')
        & (logs['role'] != 'TC01_Null_Check')  # null failures are reported separately
    ]
    if failed.empty:
        return "None"

    # One row can log several failures for the same column (a transformer failure
    # followed by 'chain_exhausted'); count each offending row only once.
    row_indices = failed['row'].drop_duplicates().to_numpy()

    if isinstance(source_col, list):
        valid_cols = [c for c in source_col if c in df_source.columns]
        if not valid_cols:
            return "Error"
        raw_values = df_source.loc[row_indices, valid_cols].astype(str).agg(' | '.join, axis=1)
    else:
        if source_col not in df_source.columns:
            return "Error"
        raw_values = df_source.loc[row_indices, source_col].astype(str)

    top_5 = raw_values.value_counts().head(5)
    return ", ".join(f"'{val}' ({count})" for val, count in top_5.items())

def show_chart1_only(engine, df_orig, config):
    """Render the normalised stacked bar chart "Data Quality Summary" from ``engine.logs``.

    For each logged output column the rows of ``df_orig`` are split into:
      * Missing - distinct rows where ``TC01_Null_Check`` failed,
      * Invalid - distinct rows with any other failed log entry,
      * Passed  - everything else.
    The Invalid segment's tooltip lists the most frequent raw inputs via
    ``get_top_bad_inputs``. Prints "No logs." and returns if there is nothing logged.
    """
    logs = engine.logs
    if logs.empty:
        print("No logs.")
        return

    print("\nGenerating Chart 1 (Data Quality Summary)...")
    total_rows = len(df_orig)
    records = []

    for col in logs['col'].unique():
        failed = logs[(logs['col'] == col) & (logs['status'] == 'failed')]
        is_null_failure = failed['role'] == 'TC01_Null_Check'
        n_missing = failed.loc[is_null_failure, 'row'].nunique()
        n_invalid = failed.loc[~is_null_failure, 'row'].nunique()

        top_issues = get_top_bad_inputs(engine, df_orig, col, config) if n_invalid > 0 else "N/A"

        records += [
            {'Column': col, 'Status': 'Passed', 'Count': total_rows - n_missing - n_invalid, 'Top_Issues': "N/A"},
            {'Column': col, 'Status': 'Missing', 'Count': n_missing, 'Top_Issues': "Null/Empty values"},
            {'Column': col, 'Status': 'Invalid', 'Count': n_invalid, 'Top_Issues': top_issues},
        ]

    df_sum = pd.DataFrame(records)
    df_sum['rank'] = df_sum['Status'].map({'Passed': 0, 'Invalid': 1, 'Missing': 2})

    status_scale = alt.Scale(domain=['Passed', 'Missing', 'Invalid'],
                             range=['#2ca02c', '#ff7f0e', '#d62728'])
    tooltips = [
        alt.Tooltip('Column', title='Column'),
        alt.Tooltip('Status', title='Status'),
        alt.Tooltip('Count', title='Row Count'),
        alt.Tooltip('Top_Issues', title='Top 5 Dirty Values'),
    ]
    chart = (
        alt.Chart(df_sum, title="1. Data Quality Summary (All 30 Test Cases)")
        .mark_bar()
        .encode(
            x=alt.X('Count:Q', stack='normalize', axis=alt.Axis(format='%')),
            y=alt.Y('Column:N', sort='ascending'),
            color=alt.Color('Status:N', scale=status_scale),
            order=alt.Order('rank', sort='ascending'),
            tooltip=tooltips,
        )
        .properties(width=700, height=800)  # tall enough for one bar per output column
    )

    display(chart)

if __name__ == "__main__":
    df = generate_dede_data(600)
    engine = SquishyEngine(dede_pipeline_config, df)
    res = engine.run()

    show_chart1_only(engine, df, dede_pipeline_config)

Generating 600 rows with full DEDE schema coverage...
Processing 600 rows across 27 columns...

Generating Chart 1 (Data Quality Summary)...
