### Temperature Data Preprocessing

1. **Data Extraction**  
   - Extract hourly air temperature data from the DWD API.  
   - Calculate the mean temperature across all measurement stations.  
   - Round the result to two decimal places.

2. **Upsampling & Interpolation**  
   - Upsample the hourly temperature series to a 15-minute frequency.  
   - Apply **Inverse Distance Weighted (IDW) linear interpolation** to fill intermediate timestamps.

3. **Forecast Alignment**  
   - Shift the entire temperature series by +1 day, so that each timestamp carries the temperature observed 24 hours earlier, which serves as a proxy for forecasted temperature values.  
   - Merge with the price dataset on the `delivery_time` column.

4. **Standardization**  
   - Apply **Z-score standardization** to the temperature feature.


### Temperature Data Preprocessing

**1. Data Extraction**

Hourly air temperature observations were retrieved from the open data server of the German Weather Service (DWD) Climate Data Center, combining the `historical` and `recent` archive directories.
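
Archive selection happens before any download: the coverage period is encoded in each historical filename, and only archives whose period overlaps the requested range are kept (all `*_akt.zip` files from `recent` are downloaded and filtered by date afterwards). A stand-alone sketch of that overlap test follows; the function name `overlapping_archives` is hypothetical, the first filename is the example from the code comments, and the other two are hypothetical names in the same scheme:

```python
import re
from datetime import datetime

# Naming scheme of historical archives, as matched by get_file_list:
# stundenwerte_TU_<station>_<coverage start YYYYMMDD>_<coverage end YYYYMMDD>_hist.zip
HIST_PATTERN = re.compile(r"stundenwerte_TU_(\d{5})_(\d{8})_(\d{8})_hist\.zip")


def overlapping_archives(names, start: datetime, end: datetime) -> list[str]:
    """Keep historical archives whose coverage [file_start, file_end] overlaps [start, end]."""
    selected = []
    for name in names:
        m = HIST_PATTERN.search(name)
        if m is None:
            continue  # not a historical hourly temperature archive
        _, s, e = m.groups()
        file_start = datetime.strptime(s, "%Y%m%d")
        file_end = datetime.strptime(e, "%Y%m%d")
        # Two closed intervals overlap iff each one starts before the other ends
        if file_start <= end and file_end >= start:
            selected.append(name)
    return selected


names = [
    "stundenwerte_TU_00003_19500401_20110331_hist.zip",  # ends in 2011
    "stundenwerte_TU_00044_20070401_20231231_hist.zip",  # hypothetical, covers 2023
    "stundenwerte_TU_00044_akt.zip",                      # 'recent' naming, ignored here
]
selected = overlapping_archives(names, datetime(2023, 1, 1), datetime(2023, 1, 31, 23, 59))
print(selected)  # ['stundenwerte_TU_00044_20070401_20231231_hist.zip']
```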

For each hourly timestamp, the mean temperature was calculated across all stations reporting at that hour; missing readings (coded as -999 by DWD) were excluded.

The aggregated values were rounded to two decimal places.
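
A minimal sketch of the aggregation and rounding on hypothetical station-level data. Note that `transform_to_hourly_average` in the code cell below computes the mean, count and standard deviation per hour but does not itself round (its printed output shows unrounded values), so the rounding is shown here as a separate step:

```python
import pandas as pd

# Hypothetical station-level observations: one row per station and hour
obs = pd.DataFrame({
    "datetime": pd.to_datetime(["2023-01-01 00:00"] * 3 + ["2023-01-01 01:00"] * 2),
    "station": ["A", "B", "C", "A", "B"],
    "temperature": [12.1, 11.555, 13.0, 11.9, 12.0],
})

# Mean over all stations reporting in each hour, then round to two decimals
hourly = (obs.groupby("datetime", as_index=False)["temperature"]
             .mean()
             .round({"temperature": 2}))
print(hourly)
```

Because the number of reporting stations can differ from hour to hour (three at 00:00, two at 01:00 here), each hourly mean is taken over whichever stations delivered a valid value.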

**2. Upsampling & Interpolation**

The hourly mean temperature series was upsampled to a 15-minute frequency to match the resolution of the electricity price data.
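
In pandas, upsampling only creates the new 15-minute slots; their values stay missing until an interpolation step fills them. A sketch on a hypothetical three-hour series:

```python
import pandas as pd

# Hypothetical hourly means on a regular hourly index
hourly = pd.Series([10.0, 12.0, 11.0],
                   index=pd.date_range("2023-01-01 00:00", periods=3, freq="h"),
                   name="temperature_avg")

# Upsample to 15-minute resolution; the new intermediate slots are NaN
quarter_hourly = hourly.resample("15min").asfreq()
print(quarter_hourly)
```

The upsampled index ends at the last hourly observation, so the three quarter-hours after it (here 02:15 to 02:45) are not created. If the price data covers those slots, the index would need to be extended (for example with `reindex`) before filling.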

Missing intermediate timestamps were filled using **Inverse Distance Weighted (IDW) linear interpolation**: each 15-minute value is a weighted average of the neighboring hourly observations, with weights inversely proportional to their temporal distance. The hourly observations themselves are left unchanged, and the interpolated values transition gradually between consecutive hours.
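
With one neighbor on each side, IDW weights w = 1/d^p reduce for p = 1 to ordinary linear interpolation: the earlier observation receives the share w0/(w0 + w1) = d1/(d0 + d1). A minimal sketch, assuming the method uses the nearest hourly observation on each side and power 1 by default; `idw_fill` and its `power` parameter are hypothetical names, not the author's implementation:

```python
import numpy as np
import pandas as pd


def idw_fill(series: pd.Series, power: float = 1.0) -> pd.Series:
    """Fill NaNs by inverse-distance weighting of the nearest known value on each side.

    Distances are measured in seconds along the (sorted, unique) DatetimeIndex.
    Known values are returned unchanged; gaps without a neighbor on both sides stay NaN.
    """
    values = series.to_numpy(dtype=float)
    t = (series.index - series.index[0]).total_seconds().to_numpy()
    known = ~np.isnan(values)
    t_known, v_known = t[known], values[known]
    filled = values.copy()
    for i in np.flatnonzero(~known):
        pos = np.searchsorted(t_known, t[i])  # index of the first known time after t[i]
        if pos == 0 or pos == len(t_known):
            continue  # no extrapolation at the edges
        d0 = t[i] - t_known[pos - 1]          # distance to the previous observation
        d1 = t_known[pos] - t[i]              # distance to the next observation
        w0, w1 = d0 ** -power, d1 ** -power   # inverse-distance weights
        filled[i] = (w0 * v_known[pos - 1] + w1 * v_known[pos]) / (w0 + w1)
    return pd.Series(filled, index=series.index, name=series.name)


hourly = pd.Series([10.0, 12.0, 11.0],
                   index=pd.date_range("2023-01-01", periods=3, freq="h"))
quarter_hourly = idw_fill(hourly.resample("15min").asfreq())
print(quarter_hourly.round(2).tolist())
# [10.0, 10.5, 11.0, 11.5, 12.0, 11.75, 11.5, 11.25, 11.0]
```

With `power=1` the result matches pandas' `interpolate(method="time")`; raising `power` above 1 pulls each filled value toward the nearer observation instead of varying linearly.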

**3. Forecast Alignment**

To simulate the availability of weather forecasts, the temperature series was shifted by +1 day, so that each timestamp carries the temperature observed 24 hours earlier.
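
The +1 day shift can be applied to the timestamps or, on a gap-free grid, to row positions. The sketch below, on hypothetical data, uses `Series.shift(freq=...)`, which relabels the index and stays correct even if some timestamps are missing; the positional `shift(96)` (96 quarter-hours per day) is included only for comparison:

```python
import numpy as np
import pandas as pd

# Two days of hypothetical 15-minute temperatures
idx = pd.date_range("2023-01-01", periods=2 * 96, freq="15min")
temps = pd.Series(np.linspace(2.0, 8.0, len(idx)), index=idx, name="temperature")

# Time-based shift: the index label t becomes t + 1 day, so the value
# observed on day D is attached to day D+1
forecast_proxy = temps.shift(freq="1D")

# On a gap-free 15-minute grid this matches a positional shift of 96 rows
positional = temps.shift(96)
print(forecast_proxy.head(2))
```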

The resulting series was aligned with the electricity price dataset on the `delivery_time` timestamp column.
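
The extractor's hourly output uses a `datetime` column, so the key has to be renamed before merging on `delivery_time`. A sketch with hypothetical frames; the left join and `validate="one_to_one"` are choices made here for illustration, not confirmed by the source:

```python
import pandas as pd

# Hypothetical 15-minute price data keyed by delivery_time
prices = pd.DataFrame({
    "delivery_time": pd.date_range("2023-01-02 00:00", periods=4, freq="15min"),
    "price": [110.2, 108.9, 107.5, 105.0],
})
# Hypothetical shifted temperature series keyed by datetime
temperature = pd.DataFrame({
    "datetime": pd.date_range("2023-01-02 00:00", periods=3, freq="15min"),
    "temperature": [10.5, 10.6, 10.8],
})

merged = prices.merge(
    temperature.rename(columns={"datetime": "delivery_time"}),
    on="delivery_time",
    how="left",              # keep every price row; unmatched slots get NaN
    validate="one_to_one",   # raise if either side has duplicate timestamps
)
print(merged)
```

Both frames must use the same timestamp convention (time zone, interval start versus end); otherwise the join silently pairs mismatched slots.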

**4. Reproducibility**

All preprocessing steps were implemented in Python 3.12 using pandas (≥2.2), numpy (≥1.26), and scipy (≥1.13).

The IDW interpolation was implemented manually without reliance on geospatial libraries, to ensure transparency and control over the weighting scheme.

In [None]:
import re, io, zipfile
from datetime import datetime, timedelta
from urllib.parse import urljoin
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import pandas as pd

class DWDTemperatureExtractor:
    def __init__(self):
        # Base URLs for historical and recent hourly air temperature datasets provided by DWD (Climate Data Center)
        self.historical_url = ("https://opendata.dwd.de/climate_environment/CDC/"
                               "observations_germany/climate/hourly/air_temperature/historical/")
        self.recent_url = ("https://opendata.dwd.de/climate_environment/CDC/"
                           "observations_germany/climate/hourly/air_temperature/recent/")
        # Configure HTTP session with retry policy and user agent header
        self.session = requests.Session()
        retry = Retry(total=5, backoff_factor=0.4,
                      status_forcelist=[429, 500, 502, 503, 504],
                      allowed_methods=["GET"])
        self.session.mount("https://", HTTPAdapter(max_retries=retry))
        self.headers = {"User-Agent": "DWDTemperatureExtractor/1.0"}

    def get_file_list(self, base_url, start_date: datetime, end_date: datetime):
        """
        Retrieve a list of candidate ZIP archives from the given DWD directory.
        - For the 'historical' directory: include files whose covered period [file_start, file_end]
          overlaps with the requested [start_date, end_date].
        - For the 'recent' directory: include all files matching '*_akt.zip'.
        """
        try:
            r = self.session.get(base_url, headers=self.headers, timeout=30)
            r.raise_for_status()
            html = r.text

            # Extract all links ending with '.zip'
            zip_names = re.findall(r'href="([^"]+\.zip)"', html)
            out = []

            is_hist = "historical" in base_url
            if is_hist:
                # Historical filenames: e.g., stundenwerte_TU_00003_19500401_20110331_hist.zip
                pat = re.compile(r"stundenwerte_TU_(\d{5})_(\d{8})_(\d{8})_hist\.zip")
                for name in zip_names:
                    m = pat.search(name)
                    if not m:
                        continue
                    _, start_s, end_s = m.groups()
                    fs = datetime.strptime(start_s, "%Y%m%d")
                    fe = datetime.strptime(end_s, "%Y%m%d")
                    # Keep if file coverage overlaps requested range
                    if (fs <= end_date) and (fe >= start_date):
                        out.append(urljoin(base_url, name))
            else:
                # Recent filenames: e.g., stundenwerte_TU_<station>_akt.zip
                pat = re.compile(r"stundenwerte_TU_\d{5}_akt\.zip")
                for name in zip_names:
                    if pat.search(name):
                        out.append(urljoin(base_url, name))

            return sorted(set(out))
        except Exception as e:
            print(f"Failed to retrieve file list: {e}")
            return []

    def download_and_extract_zip(self, zip_url):
        """
        Download a ZIP archive and return the content of the first file starting with 'produkt_' as text.
        DWD files are semicolon-separated text files, typically encoded in UTF-8.
        """
        try:
            resp = self.session.get(zip_url, headers=self.headers, timeout=60, stream=True)
            resp.raise_for_status()
            with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
                # Select the first 'produkt_*' file (usually containing hourly observations)
                data_files = [f for f in zf.namelist() if f.startswith("produkt_")]
                if not data_files:
                    print(f"No product file found in {zip_url}")
                    return None
                with zf.open(data_files[0]) as f:
                    return f.read().decode("utf-8", errors="replace")
        except Exception as e:
            print(f"Error processing file {zip_url}: {e}")
            return None

    def parse_dwd_data(self, content):
        """Parse a DWD 'produkt_*' CSV text file into a pandas DataFrame."""
        if not content:
            return None
        try:
            df = pd.read_csv(io.StringIO(content), sep=';', dtype=str)
            df.columns = [c.strip() for c in df.columns]
            return df
        except Exception as e:
            print(f"Parsing failed: {e}")
            return None

    def filter_by_date_range(self, df, start_date: datetime, end_date: datetime):
        """Filter rows by the inclusive range [start_date, end_date] using column MESS_DATUM (format YYYYMMDDHH)."""
        if df is None or df.empty:
            return None
        if 'MESS_DATUM' not in df.columns:
            print("Column 'MESS_DATUM' not found")
            return None
        try:
            df = df.copy()
            df['datetime'] = pd.to_datetime(df['MESS_DATUM'], format='%Y%m%d%H', errors='coerce')
            df = df.dropna(subset=['datetime'])
            mask = (df['datetime'] >= start_date) & (df['datetime'] <= end_date)
            return df.loc[mask]
        except Exception as e:
            print(f"Date conversion error: {e}")
            return None

    def extract_temperature_data(self, start_date_str: str, end_date_str: str):
        """
        Retrieve and merge all available temperature records within the specified date range.
        Both historical and recent archives are considered. The final dataset is de-duplicated
        and sorted by station ID and timestamp.
        """
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(hours=23, minutes=59)
        print(f"Extracting date range: {start_date} to {end_date}")

        all_dfs = []

        # Process historical archives
        print("Processing historical data...")
        for zip_url in self.get_file_list(self.historical_url, start_date, end_date):
            content = self.download_and_extract_zip(zip_url)
            df = self.parse_dwd_data(content)
            fdf = self.filter_by_date_range(df, start_date, end_date)
            if fdf is not None and not fdf.empty:
                all_dfs.append(fdf)

        # Process recent archives
        print("Processing recent data...")
        for zip_url in self.get_file_list(self.recent_url, start_date, end_date):
            content = self.download_and_extract_zip(zip_url)
            df = self.parse_dwd_data(content)
            fdf = self.filter_by_date_range(df, start_date, end_date)
            if fdf is not None and not fdf.empty:
                all_dfs.append(fdf)

        if not all_dfs:
            print("No records found within the specified range")
            return None

        combined = pd.concat(all_dfs, ignore_index=True)
        combined = combined.drop_duplicates(subset=['STATIONS_ID', 'MESS_DATUM'])
        combined = combined.sort_values(['STATIONS_ID', 'datetime']).reset_index(drop=True)
        print(f"Total records retrieved: {len(combined)}")
        return combined, start_date, end_date

    def transform_to_hourly_average(self, df, start_date_str: str, end_date_str: str):
        """
        Aggregate station-level observations to hourly averages across all available stations.
        The output includes average temperature, number of stations reporting, and standard deviation.
        """
        if df is None or df.empty:
            print("Input dataset is empty")
            return None
        start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
        end_date = datetime.strptime(end_date_str, "%Y-%m-%d") + timedelta(hours=23, minutes=59)

        # Identify a valid temperature column
        temp_col = next((c for c in ['TT_TU', 'LUFTTEMPERATUR', 'TEMPERATURE'] if c in df.columns), None)
        if temp_col is None:
            print(f"No temperature column found. Available columns: {list(df.columns)}")
            return None

        try:
            work = df.copy()
            work[temp_col] = pd.to_numeric(work[temp_col], errors='coerce')
            # Exclude missing values (DWD convention: -999)
            work = work[work[temp_col] > -900]

            work['datetime_hour'] = work['datetime'].dt.floor('h')  # 'H' is deprecated since pandas 2.2
            agg = (work.groupby('datetime_hour')[temp_col]
                        .agg(['mean', 'count', 'std'])
                        .rename(columns={'mean': 'temperature_avg',
                                         'count': 'station_count',
                                         'std': 'temperature_std'})
                        .reset_index()
                        .rename(columns={'datetime_hour': 'datetime'}))

            result = agg[['datetime', 'temperature_avg', 'station_count', 'temperature_std']].sort_values('datetime')
            print("Transformation complete:")
            print(f"Raw records: {len(df)}, after cleaning: {len(work)}, hourly aggregates: {len(result)}")
            print(f"Time span: {result['datetime'].min()} → {result['datetime'].max()}")
            print(f"Average number of reporting stations per hour: {agg['station_count'].mean():.1f}")
            return result, start_date, end_date
        except Exception as e:
            print(f"Data transformation error: {e}")
            return None

    @staticmethod
    def save_csv_with_start_year(df: pd.DataFrame, start_date: datetime, prefix: str):
        """
        Save the DataFrame to a CSV file named <prefix>_<YYYY>.csv, where <YYYY> corresponds
        to the year of the start_date. This ensures platform-independent file naming.
        """
        year = start_date.strftime('%Y')
        fname = f"{prefix}_{year}.csv"
        df.to_csv(fname, index=False)
        print(f"Saved: {fname}")
        return fname
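
Since the class above downloads archives over the network, its parsing and cleaning steps can be checked offline on a short excerpt. The sketch below approximates `parse_dwd_data`, `filter_by_date_range` and `transform_to_hourly_average` on a hypothetical three-row excerpt in the semicolon-separated `produkt_*` layout; it uses `skipinitialspace=True` instead of `dtype=str`, and the lowercase hourly alias `'h'`:

```python
import io

import pandas as pd

# Hypothetical excerpt: semicolon-separated, space-padded fields, -999 marking missing values
sample = """STATIONS_ID;MESS_DATUM;QN_9;TT_TU;RF_TU;eor
         44;2023010100;    3;  12.3;  85.0;eor
         44;2023010101;    3;-999;  86.0;eor
         73;2023010100;    3;  11.7;  90.0;eor
"""

df = pd.read_csv(io.StringIO(sample), sep=";", skipinitialspace=True)
df.columns = [c.strip() for c in df.columns]
df["datetime"] = pd.to_datetime(df["MESS_DATUM"].astype(str), format="%Y%m%d%H")
df["TT_TU"] = pd.to_numeric(df["TT_TU"], errors="coerce")

clean = df[df["TT_TU"] > -900]  # drops the -999 sentinel (and any NaN)
hourly_mean = clean.groupby(clean["datetime"].dt.floor("h"))["TT_TU"].mean()
print(hourly_mean)
```

The `-999` row is removed by the same `> -900` threshold the class uses, so only the 00:00 hour remains, averaged over the two valid stations.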


In [None]:
# === Sample Usage of DWDTemperatureExtractor ===

# Initialize extractor
extractor = DWDTemperatureExtractor()

# Define date range (January 2023)
start_date = "2023-01-01"
end_date   = "2023-01-31"

# Step 1: Extract raw station-level temperature data
res = extractor.extract_temperature_data(start_date, end_date)
if res is None:
    raise RuntimeError("No data retrieved for the specified period.")
df_raw, start_dt, end_dt = res

# Step 2: Aggregate to hourly averages across stations
res2 = extractor.transform_to_hourly_average(df_raw, start_date, end_date)
if res2 is None:
    raise RuntimeError("Hourly aggregation failed.")
hourly_df, _, _ = res2

# Step 3: Save results as CSV (prefix 'temperature_hourly')
extractor.save_csv_with_start_year(hourly_df, start_dt, prefix="temperature_hourly")

print(hourly_df.head())


Extracting date range: 2023-01-01 00:00:00 to 2023-01-31 23:59:00
Processing historical data...
Processing recent data...
Total records retrieved: 366656
Transformation complete:
Raw records: 366656, after cleaning: 365984, hourly aggregates: 744
Time span: 2023-01-01 00:00:00 → 2023-01-31 23:00:00
Average number of reporting stations per hour: 491.9
Saved: temperature_hourly_2023.csv
             datetime  temperature_avg  station_count  temperature_std
0 2023-01-01 00:00:00        12.501224            490         3.430335
1 2023-01-01 01:00:00        12.345808            489         3.516412
2 2023-01-01 02:00:00        12.224286            490         3.544961
3 2023-01-01 03:00:00        12.060000            490         3.492275
4 2023-01-01 04:00:00        11.856735            490         3.487102


