# Data Acquisition and Preprocessing

This notebook handles the cleaning and preprocessing of raw power market data:
- **EPEX Spot (DE)**: Day-ahead prices for Germany/Luxembourg (plus a Hungary column) at their original quarter-hour resolution
- **HUPX (HU)**: Hungarian power exchange data with hourly resolution

## Key preprocessing tasks:
1. Load and inspect raw data files
2. Standardize timestamp formats across datasets
3. Clean column names and remove unnecessary columns
4. Handle missing values and data quality issues
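5. Aggregate quarter-hour EPEX data to hourly resolution
6. Merge EPEX and HUPX into a unified hourly dataset
7. Add derived columns (calendar fields, DE–HU price spreads, data-quality flags) and select the target ISO weeks 28–30
8. Export cleaned data for analysis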

In [3]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
import os
from pathlib import Path

# Display configuration for inspecting wide frames in this notebook:
# no column limit, auto-detected output width, cell text truncated at 50 chars.
display_options = {
    'display.max_columns': None,
    'display.width': None,
    'display.max_colwidth': 50,
}
for option_name, option_value in display_options.items():
    pd.set_option(option_name, option_value)

# Record library versions so the execution environment is visible in the output.
print(f"Pd --v: {pd.__version__}")
print(f"NPY --v: {np.__version__}")

Pd --v: 2.3.1
NPY --v: 2.3.2


In [None]:
# Project directories, resolved relative to the notebook's working directory.
# NOTE(review): assumes the notebook runs from a direct subfolder of the
# project root (e.g. notebooks/) — TODO confirm.
project_root = Path.cwd().parent
data_dir = project_root / "data"
data_raw_path = data_dir / "raw"
data_processed_path = data_dir / "processed"

# Make sure the export target exists before any later cell writes to it.
data_processed_path.mkdir(parents=True, exist_ok=True)

# Raw input files.
epex_file = data_raw_path / "Day-ahead_prices_202507070000_202508040000_Hour_Quarterhour.csv"
hupx_file = data_raw_path / "Labs_DAM_Aggregated_Trading_Data_20250802_190631.csv"

print("File paths:")
print(f"EPEX (DE) data: {epex_file}")
print(f"HUPX (HU) data: {hupx_file}")
print(f"Processed data output: {data_processed_path}")

print(f"\nFile existence check:")
for source_label, raw_file in (("EPEX", epex_file), ("HUPX", hupx_file)):
    print(f"{source_label} file exists: {raw_file.exists()}")

In [None]:
# Load EPEX (DE) day-ahead price data.
# Section banner added for consistency with the other cells in this notebook.
print("=" * 60)
print("LOADING AND INSPECTING EPEX (DE) DATA")
print("=" * 60)

# The EPEX export is semicolon-delimited (unlike the comma-delimited HUPX file).
epex_raw = pd.read_csv(epex_file, sep=';')

print(f"EPEX data shape: {epex_raw.shape}")
print(f"Columns ({len(epex_raw.columns)}):")
for i, col in enumerate(epex_raw.columns):
    print(f"  {i+1:2d}. {col}")

print(f"\nFirst few rows:")
print(epex_raw.head(3))

print(f"\nData types:")
print(epex_raw.dtypes)

In [None]:
# Load HUPX (Hungarian) data
print("=" * 60)
print("LOADING AND INSPECTING HUPX (HUNGARIAN) DATA")
print("=" * 60)

# Read with pandas' default comma delimiter.
hupx_raw = pd.read_csv(hupx_file)

n_hupx_cols = hupx_raw.shape[1]
print(f"HUPX data shape: {hupx_raw.shape}")
print(f"Columns ({n_hupx_cols}):")
for position, column_name in enumerate(hupx_raw.columns, start=1):
    print(f"  {position}. {column_name}")

print(f"\nFirst few rows:")
print(hupx_raw.head(3))

print(f"\nData types:")
print(hupx_raw.dtypes)

# 'Status' is dropped during cleaning, so inspect its values here first.
print(f"\nUnique values in 'Status' column:")
status_counts = hupx_raw['Status'].value_counts()
print(status_counts)

In [None]:
print("=" * 60)
print("CLEANING EPEX DATA")
print("=" * 60)

# Raw EPEX column -> short snake_case name used downstream.
# Only the Germany/Luxembourg and Hungary price series are kept.
epex_column_map = {
    'Start date': 'timestamp_start',
    'Germany/Luxembourg [€/MWh] Original resolutions': 'de_price_epex',
    'Hungary [€/MWh] Original resolutions': 'hu_price_epex',
}
epex_clean = epex_raw[list(epex_column_map)].copy()
epex_clean.columns = list(epex_column_map.values())

# Parse interval start timestamps.
epex_clean['timestamp_start'] = pd.to_datetime(epex_clean['timestamp_start'])

# Convert prices to numeric; non-numeric placeholders (e.g. '-') become NaN.
epex_price_cols = ['de_price_epex', 'hu_price_epex']
for price_col in epex_price_cols:
    epex_clean[price_col] = pd.to_numeric(epex_clean[price_col], errors='coerce')

# Discard rows in which neither market has a price (quarter-hour resolution artifacts).
epex_clean = epex_clean.dropna(subset=epex_price_cols, how='all')

first_ts = epex_clean['timestamp_start'].min()
last_ts = epex_clean['timestamp_start'].max()
print(f"EPEX cleaned data shape: {epex_clean.shape}")
print(f"Date range: {first_ts} to {last_ts}")
print(f"Missing values:")
print(epex_clean.isnull().sum())
print(f"\nFirst few rows:")
print(epex_clean.head())

In [None]:
print("=" * 60)
print("AGGREGATING EPEX DATA TO HOURLY RESOLUTION")
print("=" * 60)

# Hour bucket for each row: truncate the interval start to the full hour.
# Lowercase 'h' is used because the uppercase 'H' alias is deprecated since
# pandas 2.2 and emits a FutureWarning.
epex_clean['hour'] = epex_clean['timestamp_start'].dt.floor('h')

# Aggregate quarter-hour data to hourly by averaging each hour's prices
# (groupby mean skips NaN values).
epex_hourly = (
    epex_clean
    .groupby('hour')
    .agg({'de_price_epex': 'mean', 'hu_price_epex': 'mean'})
    .reset_index()
    .rename(columns={'hour': 'timestamp'})  # common key name for the merge
)

print(f"EPEX hourly data shape: {epex_hourly.shape}")
print(f"Date range: {epex_hourly['timestamp'].min()} to {epex_hourly['timestamp'].max()}")
print(f"Missing values:")
print(epex_hourly.isnull().sum())
print(f"\nFirst few rows:")
print(epex_hourly.head())

# Check for time gaps: consecutive timestamps should be exactly one hour apart.
expected_step = pd.Timedelta('1h')
time_diff = epex_hourly['timestamp'].diff().dropna()
print(f"\nTime gaps check:")
if time_diff.empty:
    # mode() of an empty Series is empty, so .iloc[0] would raise IndexError.
    print("Not enough timestamps to check intervals")
else:
    n_irregular = int((time_diff != expected_step).sum())
    print(f"Standard interval: {time_diff.mode().iloc[0]}")
    print(f"Any non-standard intervals: {n_irregular > 0}")
    print(f"Number of non-standard intervals: {n_irregular}")

In [None]:
print("=" * 60)
print("CLEANING HUPX DATA")
print("=" * 60)

# Drop unnecessary columns. The remaining columns are renamed by POSITION,
# so verify the count first to fail with a clear message if the export layout changes.
columns_to_drop = ['Status', 'Baseload price', 'Traded volume']
hupx_clean = hupx_raw.drop(columns=columns_to_drop)
assert hupx_clean.shape[1] == 3, (
    f"Expected 3 HUPX columns after dropping {columns_to_drop}, "
    f"got {list(hupx_clean.columns)}"
)
# Assumes the remaining columns are, in order: delivery day, hour, price.
hupx_clean.columns = ['delivery_day', 'hour', 'hu_price_hupx']

# Convert delivery day to datetime
hupx_clean['delivery_day'] = pd.to_datetime(hupx_clean['delivery_day'])

# HUPX numbers delivery hours 1-24; shift to 0-23 so hour h maps to the
# interval starting at delivery_day + (h - 1) hours.
# NOTE(review): DST changeover days (23/25 local hours) may need special
# handling — TODO confirm how HUPX encodes them.
hupx_clean['hour_0based'] = hupx_clean['hour'] - 1
hupx_clean['timestamp'] = hupx_clean['delivery_day'] + pd.to_timedelta(hupx_clean['hour_0based'], unit='h')

# Keep only the merge key and the price, ordered chronologically.
hupx_clean = (
    hupx_clean[['timestamp', 'hu_price_hupx']]
    .sort_values('timestamp')
    .reset_index(drop=True)
)

print(f"HUPX cleaned data shape: {hupx_clean.shape}")
print(f"Date range: {hupx_clean['timestamp'].min()} to {hupx_clean['timestamp'].max()}")
print(f"Missing values:")
print(hupx_clean.isnull().sum())
# Duplicate timestamps would multiply rows in the later merge.
print(f"Duplicate timestamps: {int(hupx_clean['timestamp'].duplicated().sum())}")
print(f"\nFirst few rows:")
print(hupx_clean.head())

# Check for time gaps ('1h' rather than the deprecated '1H' alias).
expected_step = pd.Timedelta('1h')
time_diff = hupx_clean['timestamp'].diff().dropna()
print(f"\nTime gaps check:")
if time_diff.empty:
    # mode() of an empty Series is empty, so .iloc[0] would raise IndexError.
    print("Not enough timestamps to check intervals")
else:
    n_irregular = int((time_diff != expected_step).sum())
    print(f"Standard interval: {time_diff.mode().iloc[0]}")
    print(f"Any non-standard intervals: {n_irregular > 0}")
    print(f"Number of non-standard intervals: {n_irregular}")

In [None]:
print("=" * 60)
print("MERGING DATASETS")
print("=" * 60)

# Outer join keeps hours present in only one source (the other side becomes NaN).
# validate='one_to_one' raises MergeError if either side has duplicate
# timestamps, which would otherwise silently multiply rows.
merged_data = (
    pd.merge(epex_hourly, hupx_clean, on='timestamp', how='outer', validate='one_to_one')
    .sort_values('timestamp')
    .reset_index(drop=True)
)

print(f"Merged data shape: {merged_data.shape}")
print(f"Date range: {merged_data['timestamp'].min()} to {merged_data['timestamp'].max()}")
print(f"Missing values:")
print(merged_data.isnull().sum())

# Check data coverage
print(f"\nData coverage analysis:")
print(f"EPEX DE prices available: {merged_data['de_price_epex'].notna().sum()} hours")
print(f"EPEX HU prices available: {merged_data['hu_price_epex'].notna().sum()} hours")
print(f"HUPX HU prices available: {merged_data['hu_price_hupx'].notna().sum()} hours")
print(f"Total time periods: {len(merged_data)} hours")

print(f"\nFirst few rows:")
print(merged_data.head(10))

print(f"\nLast few rows:")
print(merged_data.tail(10))

In [None]:
print("=" * 60)
print("ADDING DERIVED COLUMNS FOR ANALYSIS")
print("=" * 60)

# Calendar fields derived from the hourly timestamp.
stamp = merged_data['timestamp'].dt
merged_data = merged_data.assign(
    date=stamp.date,
    hour=stamp.hour,
    weekday=stamp.day_name(),
    week_number=stamp.isocalendar().week,
    month=stamp.month,
)

# Price columns and data-quality flags. Each lambda sees the frame including
# the columns assigned before it, so hu_price_primary is available to the spread.
merged_data = merged_data.assign(
    # Prefer HUPX for Hungary; fall back to the EPEX HU price where HUPX is missing.
    hu_price_primary=lambda frame: frame['hu_price_hupx'].fillna(frame['hu_price_epex']),
    de_hu_spread=lambda frame: frame['de_price_epex'] - frame['hu_price_primary'],
    epex_hupx_hu_spread=lambda frame: frame['hu_price_epex'] - frame['hu_price_hupx'],
    has_de_price=lambda frame: frame['de_price_epex'].notna(),
    has_hu_hupx=lambda frame: frame['hu_price_hupx'].notna(),
    has_hu_epex=lambda frame: frame['hu_price_epex'].notna(),
)

print(f"Enhanced data shape: {merged_data.shape}")
print(f"Columns: {list(merged_data.columns)}")

weeks_covered = sorted(merged_data['week_number'].dropna().unique())
print(f"\nWeeks covered: {weeks_covered}")

# Focus on ISO weeks 28, 29, 30.
# NOTE(review): week numbers are not qualified by ISO year; this is only safe
# while the data stays within a single year.
target_weeks = [28, 29, 30]
in_target_weeks = merged_data['week_number'].isin(target_weeks)
target_data = merged_data.loc[in_target_weeks].copy()

print(f"\nTarget weeks (28, 29, 30) data shape: {target_data.shape}")
print(f"Date range for target weeks: {target_data['timestamp'].min()} to {target_data['timestamp'].max()}")

print(f"\nData availability by week:")
for week in target_weeks:
    week_data = target_data[target_data['week_number'] == week]
    de_count = week_data['has_de_price'].sum()
    hupx_count = week_data['has_hu_hupx'].sum()
    print(f"Week {week}: {len(week_data)} hours, DE prices: {de_count}, HU HUPX: {hupx_count}")

In [None]:
print("=" * 60)
print("DATA QUALITY REPORT")
print("=" * 60)

# Descriptive statistics over the target weeks.
print("SUMMARY STATISTICS FOR WEEKS 28, 29, 30:")
print("=" * 50)

stats_columns = ['de_price_epex', 'hu_price_hupx', 'hu_price_epex',
                 'hu_price_primary', 'de_hu_spread']
summary_stats = target_data[stats_columns].describe()
print(summary_stats)

# Missing-value counts and shares for the raw source price columns.
print(f"\nMISSING DATA ANALYSIS:")
print("=" * 30)
source_price_columns = ['de_price_epex', 'hu_price_hupx', 'hu_price_epex']
missing_analysis = target_data[source_price_columns].isnull().sum()
total_hours = len(target_data)
print(f"Total hours in target weeks: {total_hours}")
for col, missing_count in missing_analysis.items():
    pct_missing = (missing_count / total_hours) * 100
    print(f"{col}: {missing_count} missing ({pct_missing:.1f}%)")

# Per-week availability of DE and HUPX prices.
print(f"\nDATA AVAILABILITY BY WEEK:")
print("=" * 30)
for week in target_weeks:
    week_data = target_data[target_data['week_number'] == week]
    total_week_hours = len(week_data)
    de_available = week_data['de_price_epex'].notna().sum()
    hu_hupx_available = week_data['hu_price_hupx'].notna().sum()
    de_share = de_available/total_week_hours*100
    hupx_share = hu_hupx_available/total_week_hours*100

    print(f"\nWeek {week} ({total_week_hours} hours):")
    print(f"  DE prices: {de_available}/{total_week_hours} ({de_share:.1f}%)")
    print(f"  HU HUPX:   {hu_hupx_available}/{total_week_hours} ({hupx_share:.1f}%)")

# Check for price anomalies
print(f"\nPRICE ANOMALY CHECK:")
print("=" * 25)
de_prices = target_data['de_price_epex']
hu_prices = target_data['hu_price_primary']

# Negative prices
neg_de, neg_hu = (de_prices < 0).sum(), (hu_prices < 0).sum()
print(f"Negative DE prices: {neg_de}")
print(f"Negative HU prices: {neg_hu}")

# Extremely high prices (>500 EUR/MWh)
high_de, high_hu = (de_prices > 500).sum(), (hu_prices > 500).sum()
print(f"Very high DE prices (>500): {high_de}")
print(f"Very high HU prices (>500): {high_hu}")

In [None]:
print("=" * 60)
print("EXPORTING CLEANED DATA")
print("=" * 60)

# Full merged dataset (all hours, all derived columns).
full_output_file = data_processed_path / "merged_spot_prices_full.csv"
merged_data.to_csv(full_output_file, index=False)
print(f"Full dataset exported to: {full_output_file}")

# Target weeks only (28, 29, 30).
target_output_file = data_processed_path / "spot_prices_weeks_28_29_30.csv"
target_data.to_csv(target_output_file, index=False)
print(f"Target weeks dataset exported to: {target_output_file}")

# Daily summary: mean/min/max of prices and spread, plus counts of hours with
# DE and HUPX prices. Named aggregation yields flat '<column>_<func>' names directly.
daily_aggs = {
    f"{column}_{func}": (column, func)
    for column in ['de_price_epex', 'hu_price_primary', 'de_hu_spread']
    for func in ['mean', 'min', 'max']
}
daily_aggs.update({f"{flag}_sum": (flag, 'sum') for flag in ['has_de_price', 'has_hu_hupx']})

summary_data = (
    target_data
    .groupby(['week_number', 'date'])
    .agg(**daily_aggs)
    .round(2)
    .reset_index()
)

summary_output_file = data_processed_path / "daily_summary_weeks_28_29_30.csv"
summary_data.to_csv(summary_output_file, index=False)
print(f"Daily summary exported to: {summary_output_file}")

print(f"\nData processing completed successfully!")
print(f"Files created:")
print(f"  1. {full_output_file.name} - Full merged dataset ({len(merged_data)} hours)")
print(f"  2. {target_output_file.name} - Target weeks only ({len(target_data)} hours)")
print(f"  3. {summary_output_file.name} - Daily summaries ({len(summary_data)} days)")

# Quick preview of target data
preview_columns = ['timestamp', 'week_number', 'de_price_epex', 'hu_price_hupx',
                   'hu_price_primary', 'de_hu_spread']
print(f"\nFINAL PREVIEW - WEEKS 28, 29, 30:")
print("=" * 40)
print(target_data[preview_columns].head(10))