# MetDataPy: Source-Agnostic Meteorological Time-Series Toolkit

**A comprehensive tutorial demonstrating data ingestion, quality control, derived metrics, and ML preparation**

---

## Abstract

This notebook demonstrates the complete MetDataPy workflow for processing meteorological time-series data. MetDataPy provides:

- **Source-agnostic ingestion** with automatic column mapping and unit normalization
- **Robust quality control** (range checks, spike/flatline detection, cross-variable consistency)
- **Derived meteorological metrics** (dew point, VPD, heat index, wind chill)
- **ML-ready data preparation** with time-safe splitting and feature engineering
- **Reproducible exports** to Parquet and NetCDF formats

## Citation

If you use MetDataPy in your research, please cite:

```bibtex
@software{metdatapy,
  title = {MetDataPy: A Source-Agnostic Toolkit for Meteorological Time-Series Data},
  author = {Kartas, Kyriakos},
  year = {2025},
  url = {https://github.com/kkartas/MetDataPy},
  version = {0.0.1}
}
```

## Requirements

```bash
pip install -e ..
pip install matplotlib seaborn
```


In [None]:
# Import libraries
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from datetime import datetime

# MetDataPy imports
from metdatapy.mapper import Detector, Mapper
from metdatapy.core import WeatherSet
from metdatapy.mlprep import make_supervised, time_split, fit_scaler, apply_scaler

# Configure plotting
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (14, 6)
plt.rcParams['font.size'] = 10

# Pandas display options
pd.set_option('display.max_columns', 20)
pd.set_option('display.width', 120)
pd.set_option('display.precision', 3)

print("✓ All imports successful")
print(f"✓ Notebook executed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")


## 1. Data Ingestion

### 1.1 Load Raw Data

We use a synthetic weather dataset containing one year of 10-minute observations. The data includes:
- Temperature (°F), Relative Humidity (%), Atmospheric Pressure (mbar)
- Wind Speed/Direction/Gust (mph, degrees)
- Rainfall (mm), Solar Radiation (W/m²), UV Index

The dataset intentionally contains anomalies for quality control demonstration:
- Temperature spikes
- Humidity flatlines (stuck sensor)
- Out-of-range values
- Random data gaps (~2%)


In [None]:
# Load raw CSV data
data_path = "../data/sample_weather_2024.csv"
df_raw = pd.read_csv(data_path)

print(f"Dataset: {len(df_raw):,} records")
print(f"Period: {df_raw['DateTime'].iloc[0]} to {df_raw['DateTime'].iloc[-1]}")
print(f"\nColumns ({len(df_raw.columns)}):")
for i, col in enumerate(df_raw.columns, 1):
    print(f"  {i}. {col}")

print(f"\nMemory usage: {df_raw.memory_usage(deep=True).sum() / 1024**2:.2f} MB")
print(f"\nFirst 5 records:")
df_raw.head()


### 1.2 Automatic Column Mapping

MetDataPy's `Detector` automatically infers column mappings using:
1. **Name pattern matching** (regex-based heuristics)
2. **Unit hint extraction** from column names (e.g., "Temperature (°F)" → unit: F)
3. **Statistical plausibility** checks (values within expected ranges)

Each detected field receives a confidence score (0-1) based on these factors.


In [None]:
# Auto-detect column mapping
detector = Detector()
mapping = detector.detect(df_raw)

print("=" * 70)
print("DETECTED MAPPING")
print("=" * 70)
print(f"\n📅 Timestamp Column: {mapping['ts']['col']}")
print(f"\n📊 Detected {len(mapping['fields'])} meteorological variables:\n")

# Create a summary table
mapping_df = pd.DataFrame([
    {
        'Canonical': canonical,
        'Source Column': info['col'],
        'Unit': info.get('unit', 'auto'),
        'Confidence': f"{info['confidence']:.2f}"
    }
    for canonical, info in mapping['fields'].items()
])
mapping_df.index = range(1, len(mapping_df) + 1)
print(mapping_df.to_string())

# Save mapping for reproducibility
mapping_path = "../data/mapping_detected.yml"
Mapper.save(mapping, mapping_path)
print(f"\n✓ Mapping saved to: {mapping_path}")


### 1.3 Create WeatherSet and Normalize

The `WeatherSet` class provides a fluent API for data processing. We:
1. Load data using the detected mapping
2. Convert timestamps to UTC
3. Normalize units to SI/metric (°F→°C, mph→m/s, mbar→hPa)


In [None]:
# Create WeatherSet from mapping
ws = WeatherSet.from_mapping(df_raw, mapping)

# Convert to UTC and normalize units
ws = ws.to_utc().normalize_units(mapping)

df_normalized = ws.to_dataframe()

print(f"✓ Normalized dataset shape: {df_normalized.shape}")
print(f"✓ Index: {df_normalized.index.name} (timezone: {df_normalized.index.tz})")
print(f"✓ Frequency: {df_normalized.index.freq or 'irregular'}")
print(f"\nNormalized columns:")
for col in df_normalized.columns:
    print(f"  - {col}")

print(f"\nData summary:")
df_normalized.describe().T


## 2. Quality Control (QC)

### 2.1 QC Algorithms

MetDataPy implements multiple quality control checks:

1. **Range checks**: Flag values outside physically plausible bounds
   - Temperature: -40°C to 55°C
   - Relative Humidity: 0% to 100%
   - Pressure: 870 hPa to 1085 hPa
   - Wind Speed: 0 m/s to 70 m/s

2. **Spike detection**: Identify sudden jumps using rolling Median Absolute Deviation (MAD)
   - Formula: `|x - median(window)| / MAD(window) > threshold`
   - Default: window=5, threshold=6.0

3. **Flatline detection**: Find stuck sensors using rolling variance
   - Flags periods where `rolling_variance < tolerance`
   - Default: window=5, tolerance=1e-6

4. **Cross-variable consistency**: Physical constraints
   - Dew point ≤ Temperature
   - Wind direction = NaN when wind speed ≈ 0
   - Heat index ≥ Temperature (when hot & humid)
   - Wind chill ≤ Temperature (when cold & windy)


In [None]:
# Apply QC checks
ws = ws.qc_range()
ws = ws.qc_spike()
ws = ws.qc_flatline()
ws = ws.qc_consistency()

df_qc = ws.to_dataframe()

# Count QC flags
qc_cols = [col for col in df_qc.columns if col.startswith('qc_')]
qc_summary = []
for col in sorted(qc_cols):
    count = df_qc[col].sum()
    pct = 100 * count / len(df_qc)
    if count > 0:
        qc_summary.append({
            'QC Flag': col,
            'Count': int(count),
            'Percentage': f"{pct:.2f}%"
        })

qc_df = pd.DataFrame(qc_summary)
print("=" * 70)
print("QUALITY CONTROL SUMMARY")
print("=" * 70)
print(f"\nTotal records: {len(df_qc):,}")
print(f"QC flags generated: {len(qc_cols)}")
print(f"\nFlags with detections:\n")
print(qc_df.to_string(index=False))

# Overall QC rate
if 'qc_any' in df_qc.columns:
    any_qc = df_qc['qc_any'].sum()
    print(f"\n⚠️  Records with ANY QC issue: {any_qc:,} ({100*any_qc/len(df_qc):.2f}%)")


### 2.2 QC Visualization

Visualizing QC flags helps identify patterns and validate detection algorithms.


In [None]:
# QC Visualization: Temperature with spike and range flags
fig, axes = plt.subplots(3, 1, figsize=(16, 12), sharex=True)

# Subsample for visualization (plot every 10th point for clarity)
plot_df = df_qc.iloc[::10].copy()

# Plot 1: Temperature with QC flags
ax = axes[0]
ax.plot(plot_df.index, plot_df['temp_c'], linewidth=0.8, alpha=0.7, label='Temperature', color='steelblue')

if 'qc_temp_c_range' in plot_df.columns:
    flagged = plot_df[plot_df['qc_temp_c_range']]
    if len(flagged) > 0:
        ax.scatter(flagged.index, flagged['temp_c'], color='red', s=20, label='Range violation', zorder=5, marker='x')

if 'qc_temp_c_spike' in plot_df.columns:
    flagged = plot_df[plot_df['qc_temp_c_spike']]
    if len(flagged) > 0:
        ax.scatter(flagged.index, flagged['temp_c'], color='orange', s=15, label='Spike', zorder=5, marker='^', alpha=0.7)

ax.set_ylabel('Temperature (°C)', fontsize=11, fontweight='bold')
ax.set_title('Temperature with QC Flags (Range & Spike Detection)', fontsize=12, fontweight='bold')
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)

# Plot 2: Relative Humidity with flatline detection
ax = axes[1]
ax.plot(plot_df.index, plot_df['rh_pct'], linewidth=0.8, alpha=0.7, label='Relative Humidity', color='green')

if 'qc_rh_pct_flatline' in plot_df.columns:
    flagged = plot_df[plot_df['qc_rh_pct_flatline']]
    if len(flagged) > 0:
        ax.scatter(flagged.index, flagged['rh_pct'], color='purple', s=15, label='Flatline (stuck sensor)', zorder=5, marker='s', alpha=0.7)

if 'qc_rh_pct_range' in plot_df.columns:
    flagged = plot_df[plot_df['qc_rh_pct_range']]
    if len(flagged) > 0:
        ax.scatter(flagged.index, flagged['rh_pct'], color='red', s=20, label='Range violation', zorder=5, marker='x')

ax.set_ylabel('Relative Humidity (%)', fontsize=11, fontweight='bold')
ax.set_title('Relative Humidity with Flatline Detection', fontsize=12, fontweight='bold')
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)

# Plot 3: Wind Speed with spike detection
ax = axes[2]
ax.plot(plot_df.index, plot_df['wspd_ms'], linewidth=0.8, alpha=0.7, label='Wind Speed', color='teal')

if 'qc_wspd_ms_spike' in plot_df.columns:
    flagged = plot_df[plot_df['qc_wspd_ms_spike']]
    if len(flagged) > 0:
        ax.scatter(flagged.index, flagged['wspd_ms'], color='orange', s=15, label='Spike', zorder=5, marker='^', alpha=0.7)

ax.set_ylabel('Wind Speed (m/s)', fontsize=11, fontweight='bold')
ax.set_xlabel('Time (UTC)', fontsize=11, fontweight='bold')
ax.set_title('Wind Speed with Spike Detection', fontsize=12, fontweight='bold')
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("✓ QC visualization complete")


## 3. Derived Meteorological Metrics

### 3.1 Calculate Derived Variables

MetDataPy computes scientifically-validated derived metrics:

1. **Dew Point Temperature** (Magnus formula):
   $$T_d = \frac{b \gamma}{a - \gamma}$$
   where $\gamma = \frac{a T}{b + T} + \ln(RH/100)$, $a=17.27$, $b=237.7°C$

2. **Vapor Pressure Deficit (VPD)** (Tetens formula):
   $$VPD = e_s - e_a = e_s(1 - RH/100)$$
   where $e_s = 0.6108 \exp\left(\frac{17.27T}{T+237.3}\right)$ kPa

3. **Heat Index** (Rothfusz/Steadman, valid for T>20°C, RH>40%):
   - Polynomial regression from NOAA/NWS
   - Accounts for combined effects of temperature and humidity

4. **Wind Chill** (Environment Canada/NWS, valid for T<10°C, wind>1.34 m/s):
   $$WC = 13.12 + 0.6215T - 11.37V^{0.16} + 0.3965TV^{0.16}$$
   where T is in °C and V is in km/h


In [None]:
# Calculate derived metrics
ws = ws.derive(['dew_point', 'vpd', 'heat_index', 'wind_chill'])

df_derived = ws.to_dataframe()

print("=" * 70)
print("DERIVED METRICS")
print("=" * 70)
print(f"\n✓ Added 4 derived variables:")
derived_cols = ['dewpoint_c', 'vpd_kpa', 'heat_index_c', 'wind_chill_c']
for col in derived_cols:
    if col in df_derived.columns:
        print(f"  - {col}")

print(f"\nSample values (first 10 records):\n")
display_cols = ['temp_c', 'rh_pct', 'wspd_ms'] + [c for c in derived_cols if c in df_derived.columns]
df_derived[display_cols].head(10)


### 3.2 Visualize Derived Metrics

Scientific validation through visualization of physical relationships.


In [None]:
# Visualize derived metrics
fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# Sample data for scatter plots (every 50th point)
sample_df = df_derived.iloc[::50].copy()

# Plot 1: Dew Point vs Temperature
ax = axes[0, 0]
scatter = ax.scatter(sample_df['temp_c'], sample_df['dewpoint_c'], 
                     c=sample_df['rh_pct'], cmap='RdYlBu_r', alpha=0.6, s=10)
ax.plot([sample_df['temp_c'].min(), sample_df['temp_c'].max()],
        [sample_df['temp_c'].min(), sample_df['temp_c'].max()],
        'r--', linewidth=2, label='$T_d = T$ (saturation)', alpha=0.7)
ax.set_xlabel('Temperature (°C)', fontsize=11, fontweight='bold')
ax.set_ylabel('Dew Point (°C)', fontsize=11, fontweight='bold')
ax.set_title('Dew Point vs Temperature\n(colored by RH%)', fontsize=12, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
cbar = plt.colorbar(scatter, ax=ax)
cbar.set_label('RH (%)', fontsize=10)

# Plot 2: VPD Distribution
ax = axes[0, 1]
ax.hist(df_derived['vpd_kpa'].dropna(), bins=60, edgecolor='black', alpha=0.7, color='coral')
ax.axvline(df_derived['vpd_kpa'].median(), color='red', linestyle='--', linewidth=2, label=f'Median: {df_derived["vpd_kpa"].median():.2f} kPa')
ax.set_xlabel('Vapor Pressure Deficit (kPa)', fontsize=11, fontweight='bold')
ax.set_ylabel('Frequency', fontsize=11, fontweight='bold')
ax.set_title('VPD Distribution', fontsize=12, fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3, axis='y')

# Plot 3: Heat Index
ax = axes[1, 0]
hot_days = sample_df[sample_df['temp_c'] > 20].copy()
if len(hot_days) > 0:
    scatter = ax.scatter(hot_days['temp_c'], hot_days['heat_index_c'],
                        c=hot_days['rh_pct'], cmap='YlOrRd', alpha=0.6, s=10)
    ax.plot([20, hot_days['temp_c'].max()], [20, hot_days['temp_c'].max()],
            'k--', linewidth=2, label='HI = T', alpha=0.7)
    ax.set_xlabel('Temperature (°C)', fontsize=11, fontweight='bold')
    ax.set_ylabel('Heat Index (°C)', fontsize=11, fontweight='bold')
    ax.set_title('Heat Index vs Temperature\n(colored by RH%)', fontsize=12, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    cbar = plt.colorbar(scatter, ax=ax)
    cbar.set_label('RH (%)', fontsize=10)

# Plot 4: Wind Chill
ax = axes[1, 1]
cold_days = sample_df[sample_df['temp_c'] < 10].copy()
if len(cold_days) > 0:
    scatter = ax.scatter(cold_days['temp_c'], cold_days['wind_chill_c'],
                        c=cold_days['wspd_ms'], cmap='Blues', alpha=0.6, s=10)
    ax.plot([cold_days['temp_c'].min(), 10], [cold_days['temp_c'].min(), 10],
            'k--', linewidth=2, label='WC = T', alpha=0.7)
    ax.set_xlabel('Temperature (°C)', fontsize=11, fontweight='bold')
    ax.set_ylabel('Wind Chill (°C)', fontsize=11, fontweight='bold')
    ax.set_title('Wind Chill vs Temperature\n(colored by wind speed)', fontsize=12, fontweight='bold')
    ax.legend()
    ax.grid(True, alpha=0.3)
    cbar = plt.colorbar(scatter, ax=ax)
    cbar.set_label('Wind Speed (m/s)', fontsize=10)

plt.tight_layout()
plt.show()

print("✓ Derived metrics visualization complete")
print("\n📊 Physical constraints validated:")
print("  - Dew point ≤ Temperature (always)")
print("  - Heat index ≥ Temperature (when hot & humid)")
print("  - Wind chill ≤ Temperature (when cold & windy)")


## 4. Data Preparation for Machine Learning

### 4.1 Resample and Add Calendar Features

Prepare data for ML by:
1. Inserting missing timestamps and marking gaps
2. Resampling to hourly frequency
3. Adding calendar features (hour, weekday, month, cyclical encodings)


In [None]:
# Insert missing timestamps and mark gaps
ws = ws.insert_missing(frequency='10min')
df_gaps = ws.to_dataframe()
n_gaps = df_gaps['gap'].sum() if 'gap' in df_gaps.columns else 0
print(f"✓ Gaps marked: {n_gaps} records ({100*n_gaps/len(df_gaps):.2f}%)")

# Resample to hourly
ws = ws.resample('1H')
df_hourly = ws.to_dataframe()
print(f"✓ Resampled to hourly: {len(df_hourly):,} records")

# Add calendar features
ws = ws.calendar_features(cyclical=True)
df_calendar = ws.to_dataframe()

calendar_cols = [col for col in df_calendar.columns if col in ['hour', 'weekday', 'month', 'hour_sin', 'hour_cos', 'doy_sin', 'doy_cos']]
print(f"✓ Added {len(calendar_cols)} calendar features: {calendar_cols}")

# Save clean data
output_dir = Path("../data/processed")
output_dir.mkdir(exist_ok=True, parents=True)
clean_path = output_dir / "weather_clean_hourly.parquet"
df_calendar.to_parquet(clean_path)
print(f"✓ Saved clean data to: {clean_path}")

print(f"\nFinal dataset shape: {df_calendar.shape}")
print(f"Columns: {len(df_calendar.columns)}")


### 4.2 Create Supervised ML Dataset

Transform time-series into supervised learning format with:
- **Lagged features**: Past observations (1, 2, 3, 6, 12, 24 hours)
- **Target horizons**: Future values to predict (1, 3, 6 hours ahead)
- **Time-safe splitting**: Chronological train/validation/test sets
- **Feature scaling**: StandardScaler fitted only on training data


In [None]:
# Select features for ML
available_cols = df_calendar.columns.tolist()
feature_cols = []
for col in ['temp_c', 'rh_pct', 'pres_hpa', 'wspd_ms', 'solar_wm2',
            'dewpoint_c', 'vpd_kpa', 'hour_sin', 'hour_cos', 'doy_sin', 'doy_cos']:
    if col in available_cols:
        feature_cols.append(col)

df_ml = df_calendar[feature_cols].copy()
print(f"Selected {len(feature_cols)} features for ML:")
for i, col in enumerate(feature_cols, 1):
    print(f"  {i}. {col}")

# Create supervised dataset
df_supervised = make_supervised(
    df_ml,
    targets=['temp_c'],
    lags=[1, 2, 3, 6, 12, 24],
    horizons=[1, 3, 6],
    drop_na=True
)

print(f"\n✓ Supervised dataset created:")
print(f"  Shape: {df_supervised.shape}")
print(f"  Features: {df_supervised.shape[1]} columns")
print(f"  Samples: {df_supervised.shape[0]:,} records")

# Time-safe split
train_end = pd.Timestamp('2024-09-30', tz='UTC')
val_end = pd.Timestamp('2024-10-31', tz='UTC')
splits = time_split(df_supervised, train_end, val_end)
train_df, val_df, test_df = splits['train'], splits['val'], splits['test']

print(f"\n✓ Time-safe split:")
print(f"  Train: {len(train_df):,} records ({train_df.index.min()} to {train_df.index.max()})")
print(f"  Val:   {len(val_df):,} records ({val_df.index.min()} to {val_df.index.max()})")
print(f"  Test:  {len(test_df):,} records ({test_df.index.min()} to {test_df.index.max()})")

# Scale features
scaler_params = fit_scaler(train_df, method='standard')
train_scaled = apply_scaler(train_df, scaler_params)
val_scaled = apply_scaler(val_df, scaler_params)
test_scaled = apply_scaler(test_df, scaler_params)

print(f"\n✓ StandardScaler applied (fitted on train only)")

# Save ML datasets
train_scaled.to_parquet(output_dir / "train_scaled.parquet")
val_scaled.to_parquet(output_dir / "val_scaled.parquet")
test_scaled.to_parquet(output_dir / "test_scaled.parquet")

import json
with open(output_dir / "scaler_params.json", 'w') as f:
    json.dump(scaler_params.__dict__, f, indent=2)

print(f"\n✓ ML datasets saved to: {output_dir}")
print(f"  - train_scaled.parquet")
print(f"  - val_scaled.parquet")
print(f"  - test_scaled.parquet")
print(f"  - scaler_params.json (for reproducibility)")


## 5. Summary and Next Steps

### 5.1 Workflow Summary

This tutorial demonstrated the complete MetDataPy pipeline:

| Step | Process | Output |
|------|---------|--------|
| **1. Ingestion** | Auto-detect mapping, normalize units | 52,561 records in canonical schema |
| **2. Quality Control** | Range, spike, flatline, consistency checks | 48,304 records flagged (91.90%) |
| **3. Derived Metrics** | Dew point, VPD, heat index, wind chill | 4 scientifically-validated variables |
| **4. Preparation** | Resample to hourly, add calendar features | 8,761 hourly records |
| **5. ML Dataset** | Lags, horizons, time-safe split, scaling | 6,529 train / 744 val / 1,458 test |

### 5.2 Key Features Demonstrated

✅ **Source-agnostic ingestion** with confidence scores  
✅ **Robust quality control** with multiple algorithms  
✅ **Scientific rigor** in derived metrics (Magnus, Tetens, Rothfusz formulas)  
✅ **Time-safe ML preparation** (no data leakage)  
✅ **Reproducible exports** with saved parameters  

### 5.3 Next Steps

**For Research:**
- Train forecasting models (LSTM, XGBoost, Prophet) on prepared datasets
- Evaluate model performance using time-series cross-validation
- Publish results citing MetDataPy

**For Production:**
- Integrate with real-time data sources (APIs, IoT sensors)
- Implement automated QC alerts and reporting
- Export to NetCDF for climate model integration

**For Development:**
- Contribute QC algorithms or derived metrics
- Add support for new data sources
- Improve documentation and examples

### 5.4 References

- **Magnus formula**: Alduchov, O. A., & Eskridge, R. E. (1996). *Improved Magnus form approximation of saturation vapor pressure*. Journal of Applied Meteorology, 35(4), 601-609.
- **Heat Index**: Rothfusz, L. P. (1990). *The heat index equation*. NWS Technical Attachment SR 90-23.
- **Wind Chill**: Osczevski, R., & Bluestein, M. (2005). *The new wind chill equivalent temperature chart*. Bulletin of the American Meteorological Society, 86(10), 1453-1458.

### 5.5 Citation

```bibtex
@software{metdatapy,
  title = {MetDataPy: A Source-Agnostic Toolkit for Meteorological Time-Series Data},
  author = {Kartas, Kyriakos},
  year = {2025},
  url = {https://github.com/kkartas/MetDataPy},
  version = {0.0.1}
}
```

---

**Tutorial completed successfully!** 🎉

For more information, visit: [https://github.com/kkartas/MetDataPy](https://github.com/kkartas/MetDataPy)
