# EIA-860 Generator Data Ingestion (PySpark)

This notebook:
- Pulls EIA-860 operating generator capacity data via API (last 15 years)
- Uses PySpark for data processing (local mode for now, scales to cluster later)
- Saves cleaned data to Parquet format
- Handles pagination, retries, and resume capability

**Requirements:**
- `.env` file with `EIA_API_KEY=YOUR_KEY`
- Java 11+ installed (we're using Temurin 17)
- PySpark installed (`pip install pyspark`)

In [1]:
import os
import sys
import time
import requests
from pathlib import Path
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Load environment variables
load_dotenv()
EIA_API_KEY = os.getenv("EIA_API_KEY")

print(f"API key loaded: {bool(EIA_API_KEY)}")
print(f"Java version check: {os.environ.get('JAVA_HOME', 'Not set - using system default')}")

API key loaded: True
Java version check: Not set - using system default


In [2]:
# Tell Spark where Python is
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Initialize Spark session (local mode)
spark = SparkSession.builder \
    .appName("EIA_Generator_Ingestion") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem") \
    .master("local[*]") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print(f"Spark master: {spark.sparkContext.master}")

Spark version: 4.1.1
Spark master: local[*]


In [3]:
# Define paths
RAW_DATA_DIR = Path("../data/raw")
PROCESSED_DATA_DIR = Path("../data/processed")

RAW_DATA_DIR.mkdir(parents=True, exist_ok=True)
PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)

RAW_PARQUET_PATH = RAW_DATA_DIR / "eia_generators_raw.parquet"
PROCESSED_PARQUET_PATH = PROCESSED_DATA_DIR / "eia_generators_cleaned.parquet"

In [4]:
# EIA API configuration
BASE_URL = "https://api.eia.gov/v2/electricity/operating-generator-capacity/data/"

PARAMS = {
    "api_key": EIA_API_KEY,
    "frequency": "monthly",
    
    # Data columns
    "data[0]": "county",
    "data[1]": "latitude",
    "data[2]": "longitude",
    "data[3]": "nameplate-capacity-mw",
    "data[4]": "net-summer-capacity-mw",
    "data[5]": "net-winter-capacity-mw",
    "data[6]": "operating-year-month",
    "data[7]": "planned-derate-summer-cap-mw",
    "data[8]": "planned-derate-year-month",
    "data[9]": "planned-retirement-year-month",
    "data[10]": "planned-uprate-summer-cap-mw",
    "data[11]": "planned-uprate-year-month",
    
    # Sort and filter
    "sort[0][column]": "period",
    "sort[0][direction]": "desc",
    "start": "2011-01",
    "end": "2025-12",
}

In [5]:
# Define schema for the data (load everything as strings first)
schema = StructType([
    StructField("period", StringType(), True),
    StructField("stateDescription", StringType(), True),
    StructField("balancingAuthorityCode", StringType(), True),
    StructField("plantCode", StringType(), True),
    StructField("plantName", StringType(), True),
    StructField("generatorId", StringType(), True),
    StructField("sector", StringType(), True),
    StructField("technology", StringType(), True),
    StructField("energySourceCode", StringType(), True),
    StructField("primeMoverCode", StringType(), True),
    StructField("county", StringType(), True),
    StructField("latitude", StringType(), True),  # ‚Üê Changed to StringType
    StructField("longitude", StringType(), True),  # ‚Üê Changed to StringType
    StructField("nameplate-capacity-mw", StringType(), True),  # ‚Üê Changed to StringType
    StructField("net-summer-capacity-mw", StringType(), True),  # ‚Üê Changed to StringType
    StructField("net-winter-capacity-mw", StringType(), True),  # ‚Üê Changed to StringType
    StructField("operating-year-month", StringType(), True),
    StructField("planned-derate-summer-cap-mw", StringType(), True),  # ‚Üê Changed to StringType
    StructField("planned-derate-year-month", StringType(), True),
    StructField("planned-retirement-year-month", StringType(), True),
    StructField("planned-uprate-summer-cap-mw", StringType(), True),  # ‚Üê Changed to StringType
    StructField("planned-uprate-year-month", StringType(), True),
])

In [6]:
def fetch_page_with_retries(url, params, max_retries=5, backoff_seconds=5):
    """
    Fetch a single page from the EIA API with exponential backoff retry logic.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, params=params, timeout=60)
            
            if response.status_code == 200:
                return response
            elif response.status_code == 503:
                wait_time = backoff_seconds * (attempt + 1)
                print(f"Server error 503. Retry {attempt + 1}/{max_retries} after {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print(f"Unexpected status code: {response.status_code}")
                response.raise_for_status()
                
        except requests.exceptions.Timeout:
            wait_time = backoff_seconds * (attempt + 1)
            print(f"Request timeout. Retry {attempt + 1}/{max_retries} after {wait_time} seconds...")
            time.sleep(wait_time)
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(backoff_seconds * (attempt + 1))
    
    raise Exception(f"Failed to fetch data after {max_retries} retries")

In [7]:
def ingest_eia_data_simple():
    """
    Simpler ingestion: fetch data with pandas, save to parquet.
    No Spark workers needed during ingestion.
    """
    import pandas as pd
    
    length = 5000
    offset = 0
    total = None
    all_batches = []
    batch_count = 0
    
    # Check if we should resume
    if RAW_PARQUET_PATH.exists():
        existing = pd.read_parquet(RAW_PARQUET_PATH)
        rows_loaded = len(existing)
        print(f"Found existing data: {rows_loaded:,} rows")
        offset = (rows_loaded // length) * length
        batch_count = offset // length
        print(f"Resuming from offset: {offset:,} (batch {batch_count + 1})")
    else:
        print("Starting fresh ingestion with pandas...")
    
    while True:
        params = PARAMS.copy()
        params["offset"] = offset
        params["length"] = length
        
        try:
            response = fetch_page_with_retries(BASE_URL, params)
            data_json = response.json()
        except Exception as e:
            print(f"Fatal error at offset {offset}: {e}")
            print("Saving progress...")
            break
        
        records = data_json.get("response", {}).get("data", [])
        
        if total is None:
            total = int(data_json["response"]["total"])
            estimated_batches = (total + length - 1) // length
            print(f"Total records available: {total:,}")
            print(f"Estimated batches needed: {estimated_batches:,}")
        
        if not records:
            print("No more records.")
            break
        
        batch_df = pd.DataFrame(records)
        all_batches.append(batch_df)
        batch_count += 1
        
        # Progress display
        fetched_so_far = offset + len(records)
        pct_complete = (fetched_so_far / total * 100) if total else 0
        print(f"Batch {batch_count:,} | Fetched {len(records):,} rows at offset {offset:,} | "
              f"Total: {fetched_so_far:,}/{total:,} ({pct_complete:.1f}%)")
        
        offset += length
        
        # Save checkpoint every 100k rows
        if offset % 100000 == 0:
            print(f"üîÑ Checkpoint save at {offset:,} rows...")
            combined = pd.concat(all_batches, ignore_index=True)
            
            if RAW_PARQUET_PATH.exists():
                existing = pd.read_parquet(RAW_PARQUET_PATH)
                combined = pd.concat([existing, combined], ignore_index=True)
            
            combined.to_parquet(RAW_PARQUET_PATH, index=False)
            all_batches = []  # Clear memory
            print(f"‚úÖ Checkpoint saved ({len(combined):,} total rows)")
        
        if offset >= total:
            print("Reached end of dataset.")
            break
    
    # Final save
    if all_batches:
        print("üíæ Saving final batch...")
        new_data = pd.concat(all_batches, ignore_index=True)
        
        if RAW_PARQUET_PATH.exists():
            existing = pd.read_parquet(RAW_PARQUET_PATH)
            combined = pd.concat([existing, new_data], ignore_index=True)
        else:
            combined = new_data
        
        combined.to_parquet(RAW_PARQUET_PATH, index=False)
        print(f"‚úÖ Done! Total rows: {len(combined):,}")
        print(f"üìä Total batches processed: {batch_count:,}")
    else:
        combined = pd.read_parquet(RAW_PARQUET_PATH)
        print(f"‚úÖ Ingestion complete! Total rows: {len(combined):,}")
    
    return combined

In [19]:
# Run pandas ingestion
df_pandas = ingest_eia_data_simple()

Found existing data: 3,972,719 rows
Resuming from offset: 3,970,000 (batch 795)
Total records available: 3,972,719
Estimated batches needed: 795
Batch 795 | Fetched 2,719 rows at offset 3,970,000 | Total: 3,972,719/3,972,719 (100.0%)
Reached end of dataset.
üíæ Saving final batch...
‚úÖ Done! Total rows: 3,975,438
üìä Total batches processed: 795


In [9]:
# Load the raw data (if resuming from a previous run)
if df_raw is None and RAW_PARQUET_PATH.exists():
    df_raw = spark.read.parquet(str(RAW_PARQUET_PATH))
    df_raw.cache()  # ‚Üê ADD THIS
    print(f"Cached raw data in memory")

# Basic stats
print(f"Total rows: {df_raw.count():,}")  # This triggers the cache
print(f"\nSchema:")
df_raw.printSchema()

NameError: name 'df_raw' is not defined