A power market data pipeline for ingesting, processing, and querying electricity market data from multiple Independent System Operators (ISOs).
- Multi-ISO Support: Fetch data from CAISO, IESO, and PJM (with more coming)
- Standardized Schemas: Unified data format across all ISOs for LMP, adequacy, and load data
- Two-Layer Storage: Raw files preserved for audit trails; processed Parquet for fast analytics
- DuckDB Queries: Query terabytes of data directly from Parquet without a separate database
- Dagster Orchestration: Schedule and monitor data pipelines with a web UI
- Validation Framework: Automatic data quality checks with retry logic
# Clone the repository
git clone https://github.com/yourusername/wattson.git
cd wattson
# Install with pip (editable mode for development)
pip install -e .
# Or with uv
uv pip install -e .

- Copy the environment template:
  cp .env.example .env
- Configure your R2/S3 credentials in .env:
  R2_ENDPOINT_URL=https://YOUR_ACCOUNT_ID.r2.cloudflarestorage.com
  R2_ACCESS_KEY_ID=your_access_key
  R2_SECRET_ACCESS_KEY=your_secret_key
  R2_BUCKET_NAME=wattson-data
- Enable feeds for each ISO in config/{iso}.toml:
  [ieso]
  enabled = ["Adequacy3", "Demand"]
wattson discover ieso
wattson discover caiso

wattson feeds show # List all enabled feeds
wattson feeds enable ieso Adequacy3
wattson feeds disable caiso lmp

wattson fetch ieso Adequacy3 --date 2025-02-08
wattson fetch caiso lmp --date 2025-02-08 --node TH_NP15_GEN-APND

wattson process ieso Adequacy3 --date 2025-02-08
wattson process caiso lmp --date 2025-02-08

wattson run ieso --date 2025-02-08
wattson run caiso --date 2025-02-08

wattson query summary
wattson query lmp --iso caiso --date 2025-02-07 --node NP15
wattson query sql "SELECT * FROM read_parquet('s3://wattson-data/processed/lmp/**/*.parquet') LIMIT 10"

dagster dev

ISO API → Fetch (raw bytes) → R2 raw storage
↓
Process (parse + validate)
↓
Parquet files → R2 processed storage
↓
DuckDB queries
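
The last step of the flow reads Parquet directly with DuckDB. As a rough sketch of how a query could take advantage of the Hive-style layout, the snippet below builds a SQL string that filters on the `iso` and `year` partition columns. The `lmp_partition_query` helper is hypothetical and not part of the project; `read_parquet` and its `hive_partitioning` option are DuckDB features, and reading from `s3://` additionally assumes DuckDB's httpfs extension and R2 credentials are configured.

```python
import re


def lmp_partition_query(bucket: str, iso: str, year: int, limit: int = 10) -> str:
    """Build a DuckDB query over processed LMP Parquet files.

    Hypothetical helper. Inputs are checked because they are interpolated
    into the SQL text rather than passed as bound parameters.
    """
    if not re.fullmatch(r"[a-z0-9][a-z0-9.-]*", bucket):
        raise ValueError(f"unexpected bucket name: {bucket!r}")
    if not re.fullmatch(r"[a-z0-9]+", iso):
        raise ValueError(f"unexpected ISO name: {iso!r}")
    path = f"s3://{bucket}/processed/lmp/**/*.parquet"
    # With hive_partitioning enabled, iso=... and year=... directory names
    # become queryable columns, so the filters can prune whole directories.
    return (
        f"SELECT * FROM read_parquet('{path}', hive_partitioning = true) "
        f"WHERE iso = '{iso}' AND year = {int(year)} LIMIT {int(limit)}"
    )


sql = lmp_partition_query("wattson-data", "caiso", 2025)
print(sql)
```

The resulting string could be passed to `wattson query sql` or to a DuckDB connection; filtering on partition columns tends to reduce how many files DuckDB has to open.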
| Layer | Path | Format |
|---|---|---|
| Raw | raw/{iso}/{datatype}/{year}/{month}/{day}/ | Original (XML, CSV, ZIP) |
| Processed | processed/{datatype}/iso={iso}/year={year}/... | Parquet (Hive partitioned) |
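
To make the two layouts concrete, here is a small sketch that builds object-key prefixes matching the table above. Both helpers are hypothetical, and zero-padding the month and day is an assumption. The processed prefix stops at `year=` because the table does not spell out the deeper partition levels.

```python
from datetime import date


def raw_prefix(iso: str, datatype: str, day: date) -> str:
    """Prefix for raw files: raw/{iso}/{datatype}/{year}/{month}/{day}/.

    Assumption: month and day are zero-padded to two digits.
    """
    return f"raw/{iso}/{datatype}/{day.year}/{day.month:02d}/{day.day:02d}/"


def processed_prefix(iso: str, datatype: str, year: int) -> str:
    """Hive-style prefix for processed files, down to the year partition only."""
    return f"processed/{datatype}/iso={iso}/year={year}/"


print(raw_prefix("ieso", "adequacy", date(2025, 2, 8)))
print(processed_prefix("caiso", "lmp", 2025))
```

Note the difference in ordering: raw keys are grouped by ISO first, while processed keys are grouped by data type first, which lets a single glob such as `processed/lmp/**/*.parquet` span every ISO.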
All ISOs produce unified output. Example LMP record:
| Field | Type | Description |
|---|---|---|
| timestamp | datetime | UTC timestamp |
| iso | string | caiso, ieso, pjm, etc. |
| node_id | string | Unique node identifier |
| price_da | float | Day-ahead price ($/MWh) |
| price_rt | float | Real-time price ($/MWh) |
| ISO | Status | Data Types |
|---|---|---|
| CAISO | Active | LMP (DAM), Load Forecast |
| IESO | Active | Adequacy, Demand |
| PJM | Scaffolded | Awaiting API access |
pytest
ruff check .
mypy src/

src/wattson/
├── cli/ # Click-based CLI
├── core/ # Base classes, schemas, validation
├── providers/ # ISO-specific implementations
│ ├── caiso/
│ ├── ieso/
│ └── pjm/
├── storage/ # R2/S3 client
├── dagster/ # Orchestration assets
└── query/ # DuckDB interface
- Architecture Guide - Design decisions and tradeoffs
- Roadmap - Planned features and priorities
- Python 3.11+
- Dagster - Pipeline orchestration
- DuckDB - Embedded OLAP queries
- Parquet + PyArrow - Columnar storage
- Cloudflare R2 - S3-compatible object storage
- Pydantic - Data validation
MIT