# Strompreise

In [None]:
import time
from typing import Optional, Tuple, List
import datetime as dt
import pandas as pd
import pytz
import requests
import streamlit as st
import plotly.graph_objects as go

In [None]:
# Resolution requested first from the SMARD loader. It must be a resolution
# name the loader knows ("quarterhour" or "hour"); an unknown value such as
# "quarterly" makes the index lookup fail and the loader silently falls back
# to hourly data.
resolution_choice = "quarterhour"

In [14]:
# ---------------------------------------------------------
# SMARD Loader
# ---------------------------------------------------------
# Series id that is interpolated into every SMARD URL built below.
# NOTE(review): presumably the day-ahead market price series — confirm.
SERIES_ID: str = "4169"
# Regions are tried in exactly this order by the loader.
REGION_CANDIDATES: List[str] = [
    "DE-LU",
    "DE",
]
# Common URL prefix for the index and data files.
SMARD_BASE: str = "https://www.smard.de/app"
# Request headers sent with every call.
HEADERS: dict = {
    "User-Agent": "Mozilla/5.0 (compatible; Streamlit-SMARD/1.0)",
}
# Every URL requested so far, in request order (appended by _safe_get_json).
_last_tried: List[str] = []

class SmardError(Exception):
    """Raised when a SMARD request fails or returns no usable data."""

def _safe_get_json(url: str, timeout: int = 30) -> dict:
    """Fetch ``url`` and return its decoded JSON body.

    The URL is appended to the module-level ``_last_tried`` list before the
    request is sent, so failed attempts are recorded as well.

    Args:
        url: Absolute URL to request.
        timeout: Timeout in seconds handed to ``requests.get``.

    Returns:
        The decoded JSON payload.

    Raises:
        SmardError: If the request fails at the transport level (timeout,
            connection error, ...), the status code is not 200, or the body
            is not valid JSON.
    """
    _last_tried.append(url)
    try:
        r = requests.get(url, headers=HEADERS, timeout=timeout)
    except requests.RequestException as exc:
        # Transport failures must surface as SmardError so that the callers'
        # ``except SmardError`` fallback logic covers them too.
        raise SmardError(f"Anfrage fehlgeschlagen für {url}: {exc}") from exc
    if r.status_code != 200:
        raise SmardError(f"HTTP {r.status_code} für {url}")
    try:
        return r.json()
    except ValueError as exc:
        # JSON decode errors are ValueError subclasses. Include a short,
        # single-line excerpt of the body to make the failure diagnosable.
        snippet = (r.text or "")[:160].replace("\n", " ")
        ctype = r.headers.get("Content-Type", "")
        raise SmardError(
            f"Kein JSON von {url}. Content-Type='{ctype}', Antwort: '{snippet}...'"
        ) from exc

def _get_index(region: str, resolution: str) -> list[int]:
    """Return the timestamps listed in the index file for ``region``/``resolution``.

    The index is always read from the ``chart_data`` path.

    Raises:
        SmardError: If the index cannot be fetched, or if it contains no
            timestamps.
    """
    index_url = f"{SMARD_BASE}/chart_data/{SERIES_ID}/{region}/index_{resolution}.json"
    timestamps = _safe_get_json(index_url).get("timestamps")
    if timestamps:
        return timestamps
    # A missing key, null and an empty list are all treated as "no data".
    raise SmardError(f"Keine timestamps in index_{resolution}.json ({region})")

def _try_load_series(region: str, resolution: str, ts: int) -> Optional[pd.DataFrame]:
    """Try to load the data file for one index timestamp.

    The ``table_data`` path is tried first, then ``chart_data``. A path is
    skipped when the request raises ``SmardError`` or when the payload has no
    (or an empty) ``"series"`` entry.

    Returns:
        A DataFrame with the columns ``ts_ms`` and ``eur_per_mwh``, or
        ``None`` if neither path yields a series.
    """
    file_name = f"{SERIES_ID}_{region}_{resolution}_{ts}.json"
    for endpoint in ("table_data", "chart_data"):
        candidate_url = f"{SMARD_BASE}/{endpoint}/{SERIES_ID}/{region}/{file_name}"
        try:
            payload = _safe_get_json(candidate_url)
        except SmardError:
            # This path is unavailable; fall through to the next one.
            continue
        points = payload.get("series")
        if points:
            return pd.DataFrame(points, columns=["ts_ms", "eur_per_mwh"])
    return None

# @st.cache_data(ttl=900)
def load_smard_series(prefer_resolution: str = "quarterhour", max_backsteps: int = 12) -> Tuple[pd.DataFrame, str, str]:
    """Load the first SMARD data file that actually contains rows.

    Regions are tried in ``REGION_CANDIDATES`` order. For each region the
    preferred resolution is tried first, then ``"hour"`` (unless that already
    is the preferred one). Within one region/resolution the last
    ``max_backsteps + 1`` index entries are tried in reverse order
    (presumably newest first — the index order is not verified here).

    Args:
        prefer_resolution: Resolution name to try before the hourly fallback.
        max_backsteps: How many index entries before the last one may be tried.

    Returns:
        ``(frame, resolution, region)`` for the first non-empty file found.

    Raises:
        SmardError: If no region/resolution/timestamp combination yields data.
    """
    fallback_order = [prefer_resolution]
    if prefer_resolution != "hour":
        fallback_order.append("hour")
    window = max_backsteps + 1

    for region in REGION_CANDIDATES:
        for resolution in fallback_order:
            try:
                timestamps = _get_index(region, resolution)
            except SmardError:
                # No index for this combination; try the next one.
                continue
            for stamp in reversed(timestamps[-window:]):
                frame = _try_load_series(region, resolution, stamp)
                if frame is None or frame.empty:
                    # Short pause between failed attempts before stepping back.
                    time.sleep(0.15)
                    continue
                return frame, resolution, region
    raise SmardError("Keine gültige SMARD-Datei gefunden (region/auflösung/ts).")

In [29]:
# Load the SMARD series. Besides the frame, the loader reports which resolution
# and region actually produced data, since it may fall back from the requested ones.
df_raw, used_resolution, used_region = load_smard_series(prefer_resolution=resolution_choice, max_backsteps=12)

In [30]:
# Display the raw frame exactly as returned by the loader.
df_raw

Unnamed: 0,ts_ms,eur_per_mwh
0,1760306400000,93.45
1,1760310000000,90.26
2,1760313600000,90.83
3,1760317200000,89.12
4,1760320800000,93.45
...,...,...
163,1760893200000,
164,1760896800000,
165,1760900400000,
166,1760904000000,


In [26]:
# ---------------------------------------------------------
# Data preparation
# ---------------------------------------------------------
tz_berlin = pytz.timezone("Europe/Berlin")

# Derive a tz-aware Europe/Berlin timestamp from the epoch-millisecond column
# and convert the price from EUR/MWh to ct/kWh (factor 0.1).
df_raw["ts"] = pd.to_datetime(df_raw["ts_ms"], unit="ms", utc=True).dt.tz_convert("Europe/Berlin")
df_raw["ct_per_kwh"] = df_raw["eur_per_mwh"] * 0.1

# ---------------------------------------------------------
# Time window: from today's noon (12:00) to the next day's noon (12:00)
# ---------------------------------------------------------
now = dt.datetime.now(tz=tz_berlin)
today = now.date()

# Nominal window 12:00 -> 12:00. Both ends are localized separately: adding a
# timedelta to a pytz-aware datetime keeps the old UTC offset, which would put
# the end one hour away from local noon on DST-change days.
start_window = tz_berlin.localize(dt.datetime.combine(today, dt.time(12, 0)))
end_window = tz_berlin.localize(
    dt.datetime.combine(today + dt.timedelta(days=1), dt.time(12, 0))
)

# Filter: keep rows from 12 h before the nominal window start up to (but not
# including) 12 h after its end.
df = df_raw[(df_raw["ts"] >= start_window - dt.timedelta(hours=12)) &
            (df_raw["ts"] < end_window + dt.timedelta(hours=12))].copy()

# If the data ends before the nominal window end, move end_window to 12 h
# after the last available timestamp.
if not df.empty:
    last_ts = df["ts"].max()
    if last_ts < end_window:
        end_window = last_ts + dt.timedelta(hours=12)

# if df.empty:
#     st.info("Für dieses Zeitfenster liegen noch keine Day-Ahead-Daten vor.")
#     st.stop()

# # ---------------------------------------------------------
# # Components: spot price + fees (incl. VAT)
# # ---------------------------------------------------------
# fees = st.session_state.fees
# df["spot_ct"] = df["ct_per_kwh"]

# fees_no_vat = (
#     fees["stromsteuer_ct"]
#     + fees["umlagen_ct"]
#     + fees["konzessionsabgabe_ct"]
#     + fees["netzentgelt_ct"]
# )
# df["fees_incl_vat_ct"] = (fees_no_vat + df["spot_ct"]) * (fees["mwst"] / 100.0) + fees_no_vat
# df["total_ct"] = df["spot_ct"] + df["fees_incl_vat_ct"]


In [27]:
# Display the raw frame again; it now also carries the derived "ts" and "ct_per_kwh" columns.
df_raw

Unnamed: 0,ts_ms,eur_per_mwh,ts,ct_per_kwh
0,1760306400000,93.45,2025-10-13 00:00:00+02:00,9.345
1,1760310000000,90.26,2025-10-13 01:00:00+02:00,9.026
2,1760313600000,90.83,2025-10-13 02:00:00+02:00,9.083
3,1760317200000,89.12,2025-10-13 03:00:00+02:00,8.912
4,1760320800000,93.45,2025-10-13 04:00:00+02:00,9.345
...,...,...,...,...
163,1760893200000,,2025-10-19 19:00:00+02:00,
164,1760896800000,,2025-10-19 20:00:00+02:00,
165,1760900400000,,2025-10-19 21:00:00+02:00,
166,1760904000000,,2025-10-19 22:00:00+02:00,


In [28]:
# Display the rows that survived the time-window filter.
df

Unnamed: 0,ts_ms,eur_per_mwh,ts,ct_per_kwh
72,1760565600000,88.16,2025-10-16 00:00:00+02:00,8.816
73,1760569200000,85.81,2025-10-16 01:00:00+02:00,8.581
74,1760572800000,88.0,2025-10-16 02:00:00+02:00,8.8
75,1760576400000,86.66,2025-10-16 03:00:00+02:00,8.666
76,1760580000000,88.34,2025-10-16 04:00:00+02:00,8.834
77,1760583600000,85.05,2025-10-16 05:00:00+02:00,8.505
78,1760587200000,96.19,2025-10-16 06:00:00+02:00,9.619
79,1760590800000,116.12,2025-10-16 07:00:00+02:00,11.612
80,1760594400000,132.77,2025-10-16 08:00:00+02:00,13.277
81,1760598000000,119.93,2025-10-16 09:00:00+02:00,11.993


## Verschiedene Quellen

In [10]:
# Enable IPython's autoreload extension in mode 2, so that edits to imported
# modules are picked up without restarting the kernel.
%reload_ext autoreload
%autoreload 2


In [11]:
from datetime import datetime, timedelta, timezone
import pandas as pd

from price_sources import (
    fetch_smard_day_ahead,
    fetch_entsoe_day_ahead,
    fetch_smartenergy,
    _fmt_utc,
    DE_LU_EIC,
)

In [20]:
# 1) SMARD (no key)
# Requests quarter-hour data; per the function's docstring it falls back to
# hourly data if quarter-hour fails entirely, so check the "resolution" column.
df_smard = fetch_smard_day_ahead(resolution="quarterhour")

In [19]:
# Display the SMARD frame.
df_smard

Unnamed: 0,ts,eur_per_mwh,ct_per_kwh,source,resolution
0,2025-10-13 00:00:00+02:00,93.45,9.345,SMARD,hour
1,2025-10-13 01:00:00+02:00,90.26,9.026,SMARD,hour
2,2025-10-13 02:00:00+02:00,90.83,9.083,SMARD,hour
3,2025-10-13 03:00:00+02:00,89.12,8.912,SMARD,hour
4,2025-10-13 04:00:00+02:00,93.45,9.345,SMARD,hour
...,...,...,...,...,...
91,2025-10-17 19:00:00+02:00,130.69,13.069,SMARD,hour
92,2025-10-17 20:00:00+02:00,116.99,11.699,SMARD,hour
93,2025-10-17 21:00:00+02:00,104.05,10.405,SMARD,hour
94,2025-10-17 22:00:00+02:00,102.18,10.218,SMARD,hour


In [6]:
import datetime as dt
import requests
from xml.etree import ElementTree as ET
# Endpoint that the ENTSO-E request in this notebook is sent to.
ENTSOE_BASE: str = "https://web-api.tp.entsoe.eu/api"
# Bidding zone code for DE/LU (BZN); used as both in_Domain and out_Domain.
DE_LU_EIC: str = "10Y1001A1001A82H"


def fetch_entsoe_day_ahead_test(
    token: str,
    start_utc: dt.datetime,
    end_utc: dt.datetime,
    eic_bzn: str = DE_LU_EIC,
) -> "requests.Response":
    """
    Request Day-Ahead prices (documentType A44) via ENTSO-E REST and return
    the raw HTTP response.

    This exploratory variant does NOT parse the XML into a DataFrame; callers
    inspect ``resp.content`` / ``resp.text`` themselves.

    Args:
        token: ENTSO-E security token, sent as the ``securityToken`` parameter.
        start_utc: Start of the queried period; must be timezone-aware.
        end_utc: End of the queried period; must be timezone-aware.
        eic_bzn: Code used for both ``in_Domain`` and ``out_Domain``.

    Returns:
        The ``requests.Response`` of the successful request.

    Raises:
        ValueError: If ``start_utc`` or ``end_utc`` is naive.
        PermissionError: If the API answers with HTTP 401.
        requests.HTTPError: For any other error status (``raise_for_status``).
    """
    if start_utc.tzinfo is None or end_utc.tzinfo is None:
        raise ValueError("start_utc and end_utc must be timezone-aware")

    params = {
        "securityToken": token,
        "documentType": "A44",
        "in_Domain": eic_bzn,
        "out_Domain": eic_bzn,
        # _fmt_utc turns the datetimes into the string form sent to the API.
        "periodStart": _fmt_utc(start_utc),
        "periodEnd": _fmt_utc(end_utc),
    }

    resp = requests.get(ENTSOE_BASE, params=params, timeout=45)
    if resp.status_code == 401:
        # Give the most likely cause (bad token) a dedicated, readable error.
        raise PermissionError("ENTSO-E: Unauthorized (check token)")
    resp.raise_for_status()

    return resp

In [21]:
# 2) ENTSO-E (needs token) – query yesterday→tomorrow to be safe; UTC times
import os

now_utc = dt.datetime.now(timezone.utc)
yest_utc = now_utc - timedelta(days=1)
tom_utc  = now_utc + timedelta(days=1)
# The API token is a credential: read it from the environment instead of
# committing it to the notebook. Any token that was ever stored in this file
# should be treated as leaked and rotated.
ENTSOE_TOKEN = os.environ.get("ENTSOE_TOKEN", "")
if not ENTSOE_TOKEN:
    raise RuntimeError("ENTSOE_TOKEN environment variable is not set")
resp = fetch_entsoe_day_ahead_test(ENTSOE_TOKEN, yest_utc, tom_utc, eic_bzn=DE_LU_EIC)


In [None]:
# Parse XML
# Namespace map so that XPath queries can use the "ns:" prefix for document elements.
ns = {"ns": "urn:iec62325.351:tc57wg16:451-3:publicationdocument:7:3"}
try:
    root = ET.fromstring(resp.content)
except ET.ParseError as ex:
    # Chain explicitly so the traceback shows the parser error as the cause.
    raise RuntimeError(f"ENTSO-E: XML parse error: {ex}") from ex

In [24]:
# Day-ahead doc can include multiple TimeSeries: collect every TimeSeries
# element found anywhere below the root, in document order.
rows = list(root.findall(".//ns:TimeSeries", ns))

In [25]:
# Display the parsed root element to check the document type and namespace.
root

<Element '{urn:iec62325.351:tc57wg16:451-3:publicationdocument:7:3}Publication_MarketDocument' at 0x00000118F06E23E0>

In [26]:
# Display the collected TimeSeries elements.
rows

[<Element '{urn:iec62325.351:tc57wg16:451-3:publicationdocument:7:3}TimeSeries' at 0x00000118F06E28E0>,
 <Element '{urn:iec62325.351:tc57wg16:451-3:publicationdocument:7:3}TimeSeries' at 0x00000118F0792FC0>,
 <Element '{urn:iec62325.351:tc57wg16:451-3:publicationdocument:7:3}TimeSeries' at 0x00000118EFEFD7B0>,
 <Element '{urn:iec62325.351:tc57wg16:451-3:publicationdocument:7:3}TimeSeries' at 0x00000118EFEC71A0>,
 <Element '{urn:iec62325.351:tc57wg16:451-3:publicationdocument:7:3}TimeSeries' at 0x00000118EFE2CE00>]

In [None]:
# Peek at the first 1000 characters of the raw response body.
print(resp.text[:1000])

In [None]:
import xml.dom.minidom as md

# Re-parse the response with minidom only to get an indented rendering for reading.
xml_pretty = md.parseString(resp.content).toprettyxml(indent="  ")
print(xml_pretty[:2000])  # show first 2000 chars

In [None]:

# 3) smartENERGY (if/when you have their JSON endpoint)
# Example placeholders – update url/fields once you have the real endpoint
# url = "https://www.smartenergy.at/api/spot"
# df_se = fetch_smartenergy(url, ts_field="timestamp", price_field="marketprice_eur_mwh", tz="Europe/Vienna")

# Compare coverage recency
def summarize(df, name):
    """Return a one-line coverage summary for a price frame.

    Args:
        df: DataFrame with a ``ts`` column.
        name: Label placed at the start of the summary.

    Returns:
        ``"<name>: empty"`` for an empty frame, otherwise
        ``"<name>: <min ts> → <max ts> (<row count> rows)"``.
    """
    if not df.empty:
        first, last = df["ts"].min(), df["ts"].max()
        return f"{name}: {first} → {last} ({len(df)} rows)"
    return f"{name}: empty"

print(summarize(df_smard, "SMARD"))
# NOTE(review): df_entsoe is not assigned in any cell visible here (only the
# raw `resp` is fetched) — confirm it is defined before running this line.
print(summarize(df_entsoe, "ENTSO-E"))
# print(summarize(df_se, "smartENERGY"))

[31mSignature:[39m
fetch_smard_day_ahead(
    resolution: [33m"Literal['quarterhour', 'hour']"[39m = [33m'quarterhour'[39m,
    max_backsteps: [33m'int'[39m = [32m16[39m,
) -> [33m'pd.DataFrame'[39m
[31mDocstring:[39m
Fetch Day-Ahead market prices from SMARD for DE/LU.
Version 0.0.1
Returns DataFrame: ts (Europe/Berlin), eur_per_mwh, ct_per_kwh, source="SMARD", resolution.
Strategy:
  - try regions in order: DE-LU, DE
  - get index_{resolution}.json from chart_data
  - walk timestamps backwards, try table_data file first, then chart_data file
  - if quarterhour fails entirely, fall back to hour
[31mFile:[39m      c:\users\christophpromberger\dev\strompreise-app\price_sources.py
[31mType:[39m      function