01 – Ingest & Signal Catalog (Prototype)

This notebook:

Loads the raw SCADA wide table, the events log, and (optionally) plant metadata

Builds `signal_catalog` (parsed column headers, canonical signal names, and unit checks)

Writes Parquet snapshots:

outputs/stages/scada_wide.parquet

outputs/stages/events_raw.parquet

outputs/stages/metadata.parquet (if present)

outputs/stages/signal_catalog.parquet

Validation included:

row/column counts

unmapped signals

unit mismatch report

top plants discovered

In [None]:
import sys
from pathlib import Path

# Drop already-imported pv_fleet_health modules from the import cache so the
# imports below re-execute the package source (picks up on-disk edits without
# restarting the kernel).
#
# Only the package itself and its dotted submodules are evicted. A plain
# substring test would also remove unrelated modules whose names merely contain
# "pv_fleet_health" (e.g. a test module or an install-time helper module).
for module in list(sys.modules):  # snapshot the keys: the dict is mutated below
    if module == "pv_fleet_health" or module.startswith("pv_fleet_health."):
        del sys.modules[module]

from pv_fleet_health.config import load_config_yaml  # noqa: E402
from pv_fleet_health.io import load_inputs, save_parquet  # noqa: E402
from pv_fleet_health.paths import Paths  # noqa: E402
from pv_fleet_health.scada_headers import build_signal_catalog  # noqa: E402

In [None]:
# Project root = parent of the current working directory, which assumes the
# kernel was started from notebooks/; adjust if needed.
ROOT = Path("..").resolve()
paths = Paths(ROOT)
paths.ensure()

# The config file is expected directly under the project root.
config_file = ROOT.joinpath("config.yaml")
cfg = load_config_yaml(str(config_file))
print(cfg)
# print() applies str() to its argument, so the joined path is passed as-is.
print(ROOT.joinpath(cfg.scada_path))

Config(scada_path='C:\\00_Developement\\pv-fleet-health\\data\\scada_wide.csv', events_path='C:\\00_Developement\\pv-fleet-health\\data\\events.csv', metadata_path='C:\\00_Developement\\pv-fleet-health\\data\\plant_metadata.csv', timestamp_col='Timestamp', timestamp_format=None, default_timezone='Europe/Athens', standard_freq='15min', daylight_poa_threshold_wm2=50.0, poa_for_kpi_min_wm2=200.0, min_valid_poa_wm2=0.0, max_valid_poa_wm2=1400.0, min_valid_tmod_c=-20.0, max_valid_tmod_c=90.0, max_pf_abs=1.2, max_interp_gap_minutes=15, drop_if_missing_key_fraction=0.1, allow_interp_signals=frozenset({'poa_irradiance_wm2', 'tamb_c', 'tmod_c'}), allow_ffill_signals=frozenset(), counter_reset_negative_kwh_threshold=-0.01, clearsky_qc_quantile=0.98, clearsky_qc_min_points_per_day=50, model_min_points=2000, walkforward_train_days=60, walkforward_test_days=14, residual_z_threshold=4.0, rolling_window_days=7, selected_plant=None, random_seed=42)
C:\00_Developement\pv-fleet-health\data\scada_wide.csv

In [None]:
# Read the inputs named in the config; `metadata` may be None (handled below).
scada_wide, events_raw, metadata = load_inputs(cfg)

print("SCADA:", scada_wide.shape, "cols sample:", list(scada_wide.columns[:5]))
print("Events:", events_raw.shape, "cols:", list(events_raw.columns))
metadata_shape = metadata.shape if metadata is not None else None
print("Metadata:", metadata_shape)

# Quick validation: first and last value of the SCADA timestamp column.
scada_ts = scada_wide[cfg.timestamp_col]
print("SCADA ts min/max:", scada_ts.min(), scada_ts.max())

SCADA: (2400, 244) cols sample: ['Timestamp', '[Solar Concept 3721 KWp Lexaina] Array 1 AC output frequency error (Hz)', '[Solar Concept 3721 KWp Lexaina] Array 1 AC voltage unbalance (%)', '[Solar Concept 3721 KWp Lexaina] Array 1 Array active output power (kW)', '[Solar Concept 3721 KWp Lexaina] Array 1 Array apparent output power (kVA)']
Events: (31812, 8) cols: ['Severity', 'Description', 'State', 'Ack', 'Start Date', 'Duration', 'End Date', 'Source']
Metadata: None
SCADA ts min/max: 2025-12-01 00:15:00+02:00 2025-12-26 00:00:00+02:00


In [None]:
# Build the signal catalog from the SCADA column names; the timestamp column
# name is passed separately.
raw_columns = list(scada_wide.columns)
signal_catalog = build_signal_catalog(raw_columns, cfg.timestamp_col)

# Optional debug filter: when the config selects a plant, keep only its rows.
if cfg.selected_plant is not None:
    is_selected_plant = signal_catalog["plant_name"] == cfg.selected_plant
    signal_catalog = signal_catalog[is_selected_plant].copy()

print("signal_catalog rows:", len(signal_catalog))
display(signal_catalog.head(10))

signal_catalog rows: 243


Unnamed: 0,raw_column_name,plant_name,component_type,component_id,raw_signal_name,unit_raw,unit,canonical_signal,mapped,pattern,expected_unit,unit_ok
0,[Solar Concept 3721 KWp Lexaina] Array 1 AC ou...,Solar Concept 3721 KWp Lexaina,array,1,AC output frequency error,Hz,Hz,ac_frequency_error_hz,True,(?i)\bAC output frequency error\b,Hz,True
1,[Solar Concept 3721 KWp Lexaina] Array 1 AC vo...,Solar Concept 3721 KWp Lexaina,array,1,AC voltage unbalance,%,%,ac_voltage_unbalance_pct,True,(?i)\bAC voltage unbalance\b,%,True
2,[Solar Concept 3721 KWp Lexaina] Array 1 Array...,Solar Concept 3721 KWp Lexaina,array,1,Array active output power,kW,kW,ac_power_kw,True,(?i)\bArray active output power\b,kW,True
3,[Solar Concept 3721 KWp Lexaina] Array 1 Array...,Solar Concept 3721 KWp Lexaina,array,1,Array apparent output power,kVA,kva,ac_power_kva,True,(?i)\bArray apparent output power\b,kva,True
4,[Solar Concept 3721 KWp Lexaina] Array 1 Array...,Solar Concept 3721 KWp Lexaina,array,1,Array output current,A,A,ac_current_a,True,(?i)\bArray output current\b,A,True
5,[Solar Concept 3721 KWp Lexaina] Array 1 Array...,Solar Concept 3721 KWp Lexaina,array,1,Array output current of phase L1,A,A,ac_current_a_L1,True,(?i)\bArray output current of phase (?P<phase>...,A,True
6,[Solar Concept 3721 KWp Lexaina] Array 1 Array...,Solar Concept 3721 KWp Lexaina,array,1,Array output current of phase L2,A,A,ac_current_a_L2,True,(?i)\bArray output current of phase (?P<phase>...,A,True
7,[Solar Concept 3721 KWp Lexaina] Array 1 Array...,Solar Concept 3721 KWp Lexaina,array,1,Array output current of phase L3,A,A,ac_current_a_L3,True,(?i)\bArray output current of phase (?P<phase>...,A,True
8,[Solar Concept 3721 KWp Lexaina] Array 1 Array...,Solar Concept 3721 KWp Lexaina,array,1,Array output energy,kWh,kWh,energy_kwh_interval,True,(?i)\bArray output energy\b,kWh,True
9,[Solar Concept 3721 KWp Lexaina] Array 1 Array...,Solar Concept 3721 KWp Lexaina,array,1,Array output frequency,Hz,Hz,ac_frequency_hz,True,(?i)\bArray output frequency\b,Hz,True


In [None]:
# Validation reports.
# Unmapped: rows whose canonical_signal carries the "unmapped::" prefix.
is_unmapped = signal_catalog["canonical_signal"].str.startswith("unmapped::")
unmapped = signal_catalog[is_unmapped]
# Unit mismatch: rows flagged as mapped whose unit_ok flag is False.
has_unit_mismatch = signal_catalog["mapped"] & ~signal_catalog["unit_ok"]
unit_bad = signal_catalog[has_unit_mismatch]

print("Unmapped signals:", len(unmapped))
print("Unit mismatches:", len(unit_bad))

# Columns shown in each report; at most the first 30 rows are displayed.
unmapped_report_cols = [
    "plant_name",
    "component_type",
    "component_id",
    "raw_signal_name",
    "unit_raw",
    "raw_column_name",
]
unit_report_cols = [
    "plant_name",
    "raw_signal_name",
    "unit_raw",
    "expected_unit",
    "raw_column_name",
]
display(unmapped[unmapped_report_cols].head(30))
display(unit_bad[unit_report_cols].head(30))

# Catalog rows per plant, ten most frequent plants.
plant_counts = signal_catalog["plant_name"].value_counts()
print("Top plants found:", plant_counts.head(10).to_dict())

Unmapped signals: 0
Unit mismatches: 0


Unnamed: 0,plant_name,component_type,component_id,raw_signal_name,unit_raw,raw_column_name


Unnamed: 0,plant_name,raw_signal_name,unit_raw,expected_unit,raw_column_name


Top plants found: {'Solar Concept 3721 KWp Lexaina': 243}


In [None]:
# Save stage artifacts (Parquet) into the stage directory.
# The dict keeps insertion order, so files are written in the order listed here.
stage_frames = {
    "scada_wide.parquet": scada_wide,
    "events_raw.parquet": events_raw,
}
if metadata is not None:
    # Metadata is optional; its file is written only when it was loaded.
    stage_frames["metadata.parquet"] = metadata
stage_frames["signal_catalog.parquet"] = signal_catalog

for filename, frame in stage_frames.items():
    save_parquet(frame, str(paths.stage_dir / filename))

print("Wrote stage outputs to:", paths.stage_dir)

Wrote stage outputs to: C:\00_Developement\pv-fleet-health\outputs\stages
