In [5]:
import pandas as pd
import numpy as np
import os
import kagglehub
from scipy.stats import trim_mean

# -------------------------------------------------------------
# I. WIEDERVERWENDBARE HILFSFUNKTIONEN (LE5: Modularität)
# -------------------------------------------------------------

def t_mean_10_intraday(data):
    """Trimmed Mean 10% gesamt (5% pro Seite) für stündliche Daten (Intraday Robustheit)."""
    data = data.dropna()
    if len(data) < 10:
        return np.nan
    # 0.05 cut = 5% unten und 5% oben entfernt (10% gesamt)
    return trim_mean(data, proportiontocut=0.05) 

def t_mean_3_cities(data):
    """Trimmed Mean der 5 Städte: Entfernt den kleinsten/größten Wert (Cross-Sectional Robustheit)."""
    data = data.dropna()
    if len(data) < 3: 
        return np.nan 
    sorted_data = np.sort(data)
    # Entfernt den ersten (min) und letzten (max) Wert und bildet den Mean der restlichen 3
    return np.mean(sorted_data[1:-1])

# -------------------------------------------------------------
# II. STUFE 1: AKQUISE & BEREINIGUNG (LE1 & LE2)
# -------------------------------------------------------------

def acquire_and_prepare_raw_data(handle: str):
    """Lädt Daten, konvertiert Zeitstempel und führt die initiale Imputation durch."""
    print("Status: Rohdatenakquise gestartet...")
    
    # 1. Daten herunterladen und Pfad abrufen (LE1)
    path = kagglehub.dataset_download(handle=handle)
    path_energy = path + "/energy_dataset.csv"
    path_weather = path + "/weather_features.csv"

    energy_df = pd.read_csv(path_energy)
    weather_df = pd.read_csv(path_weather)

    # 2. Zeitspalten konvertieren und als Index setzen (LE1)
    energy_df['time'] = pd.to_datetime(energy_df['time'], utc=True)
    energy_df = energy_df.set_index('time')

    weather_df['dt_iso'] = pd.to_datetime(weather_df['dt_iso'], utc=True)
    weather_df = weather_df.set_index('dt_iso')

    # 3. Initiale Imputation des Kollegen (Fehlende Werte) (LE2)
    energy_df['total load actual'].fillna(energy_df['total load forecast'], inplace=True)
    
    print("Status: Rohdaten geladen und initial vorbereitet.")
    return energy_df, weather_df

# -------------------------------------------------------------
# III. STUFE 2: AGGREGATION & SPEICHERN (LE3 & LE5)
# -------------------------------------------------------------

def aggregate_and_save_trimmed_mean(energy_df: pd.DataFrame, weather_df: pd.DataFrame, output_folder: str) -> bool:
    """Führt die zweistufige T-Mean Aggregation durch und speichert die Ergebnisse."""
    
    # --- 1. ENERGIELAST AGGREGATION (Intraday T-Mean 10%) ---
    final_energy_trimmed = (
        energy_df['total load actual']
        .resample('D')
        .apply(t_mean_10_intraday) 
        .to_frame(name='TotalLoad_TMean')
    )
    print("Status: Energielast erfolgreich aggregiert (Intraday T-Mean 10%).")

    # --- 2. WETTER AGGREGATION (Zweistufige Cross-Sectional T-Mean) ---
    
    # Stufe A: Intraday T-Mean pro Stadt
    daily_temp_per_city = (
        weather_df
        .groupby('city_name')['temp']
        .resample('D')
        .apply(t_mean_10_intraday)
        .to_frame(name='DailyTemp_TMean_PerCity')
    )
    
    # Stufe B: Cross-Sectional T-Mean über 5 Städte
    daily_temp_unstacked = daily_temp_per_city.unstack(level='city_name')
    daily_temp_unstacked.columns = daily_temp_unstacked.columns.droplevel(0) 

    final_weather_trimmed_cross_sectional = (
        daily_temp_unstacked
        .apply(t_mean_3_cities, axis=1)
        .to_frame(name='DailyTemp_TMean_Regional')
    )
    print("Status: Wetterdaten erfolgreich Cross-Sectional aggregiert.")

    # 3. DATEN SPEICHERN (Zustandswechsel)
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    final_energy_trimmed.to_parquet(os.path.join(output_folder, 'LE3_Energy_TMean_Aggregated.parquet'))
    final_weather_trimmed_cross_sectional.to_parquet(os.path.join(output_folder, 'LE3_Weather_TMean_Aggregated.parquet'))
    
    print(f"Erfolg: Pipeline abgeschlossen. Ergebnisse gespeichert unter {output_folder}")
    return True


In [6]:
# -------------------------------------------------------------
# IV. TEST DER KOMPLETTEN PIPELINE
# -------------------------------------------------------------

# Konfigurierbare Zustände:
KAGGLE_HANDLE = "nicholasjhana/energy-consumption-generation-prices-and-weather"
OUTPUT_FOLDER = 'data_pipeline_output_final' 

print("--- Data Wrangling Pipeline gestartet ---")

try:
    # Schritt 1: Akquise und Vorbereitung (LE1/LE2)
    energy_raw, weather_raw = acquire_and_prepare_raw_data(KAGGLE_HANDLE)
    
    # Schritt 2: Aggregation und Speicherung (LE3/LE5)
    success = aggregate_and_save_trimmed_mean(energy_raw, weather_raw, OUTPUT_FOLDER)
    
    print(f"\nGesamt-Pipeline-Ausführung erfolgreich: {success}")
    if success:
        print(f"Prüfen Sie den Ordner '{OUTPUT_FOLDER}' auf die aggregierten .parquet-Dateien.")

except Exception as e:
    print(f"\nKritischer Fehler in der Gesamt-Pipeline: {e}")

--- Data Wrangling Pipeline gestartet ---
Status: Rohdatenakquise gestartet...


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  energy_df['total load actual'].fillna(energy_df['total load forecast'], inplace=True)


Status: Rohdaten geladen und initial vorbereitet.
Status: Energielast erfolgreich aggregiert (Intraday T-Mean 10%).
Status: Wetterdaten erfolgreich Cross-Sectional aggregiert.
Erfolg: Pipeline abgeschlossen. Ergebnisse gespeichert unter data_pipeline_output_final

Gesamt-Pipeline-Ausführung erfolgreich: True
Prüfen Sie den Ordner 'data_pipeline_output_final' auf die aggregierten .parquet-Dateien.
