In [None]:
# 02_harmonisierung — Cell 1
from __future__ import annotations

from pathlib import Path
import sys, os, json, re
import pandas as pd

# --- Load the project configuration (written by 01_import_und_setup) ---
# When the kernel runs inside a "notebooks" folder, the project root is its parent.
_cwd = Path.cwd()
if _cwd.name.lower() == "notebooks":
    BASE = _cwd.resolve().parents[0]
else:
    BASE = _cwd

# Bootstrap location of the config file; OUT is re-bound from the config below.
OUT = BASE / "data" / "processed"
with (OUT / "project_config.json").open(encoding="utf-8") as _config_file:
    CONFIG = json.load(_config_file)

_paths = CONFIG["paths"]
RAW = Path(_paths["raw"])
OUT = Path(_paths["processed"])
FIG = Path(_paths["figures"])
MAP_CSV = Path(_paths["mapping_csv"])
KANON = CONFIG["kanon"]

# Make sure every working directory exists before later cells read or write.
for _directory in (RAW, OUT, FIG):
    _directory.mkdir(parents=True, exist_ok=True)

# --- Years / files to process (adjust here if needed) ---
YEAR_TO_FILE = {
    2021: "statista_2021.csv",
    2022: "statista_2022.csv",
    2024: "statista_2024.csv",
}

print("Konfiguration geladen.")
for _label, _location in (("RAW:", RAW), ("OUT:", OUT), ("FIG:", FIG), ("Mapping CSV:", MAP_CSV)):
    print(_label, _location)


In [None]:
# 02_harmonisierung — Cell 2
# Versuche, Utils aus src/ zu importieren; sonst lokale Fallbacks bereitstellen.
try:
    sys.path.append(str(BASE / "src"))  # erlaubt from utils.* Imports
    from utils.io_utils import read_csv_robust, pick_statista_cols, clean_value_col
    from utils.text_utils import norm_text
    print("Utils aus src/ geladen.")
except Exception as e:
    print("Warnung: Utils nicht gefunden, nutze Fallback-Implementierungen.", e)

    def read_csv_robust(path: Path) -> pd.DataFrame:
        encodings = ["utf-8-sig", "cp1252", "latin1"]
        last = None
        for enc in encodings:
            try:
                return pd.read_csv(path, encoding=enc, sep=None, engine="python")
            except UnicodeDecodeError as ex:
                last = ex
                continue
        return pd.read_csv(path, encoding="latin1", sep=None, engine="python")

    def pick_statista_cols(df: pd.DataFrame) -> tuple[str, str | None]:
        cat_col = val_col = None
        normalized = {str(c).strip().lower(): c for c in df.columns}
        for key, c in normalized.items():
            if key in {"category","kategorie","warengruppe","produktkategorie","bereich"}:
                cat_col = c
            if key in {"value","wert","prozent","anteil","share","%","rate","anzahl"}:
                val_col = c
        if cat_col is None:
            raise ValueError("Kategorien-Spalte fehlt (category/kategorie/warengruppe/...).")
        return cat_col, val_col

    def clean_value_col(series: pd.Series) -> pd.Series:
        return (series.astype(str)
                     .str.replace("%","", regex=False)
                     .str.replace(",",".", regex=False)
                     .str.strip()
                     .pipe(pd.to_numeric, errors="coerce")
                     .fillna(0.0))

    def norm_text(s: str) -> str:
        if pd.isna(s): return ""
        s = str(s).strip().lower()
        s = s.replace("&","und").replace("-"," ")
        s = re.sub(r"\s+"," ", s)
        s = s.replace("accressoires","accessoires").replace("hi tech","high tech")
        return s


In [None]:
# 02_harmonisierung — Cell 3
mapping_df = pd.read_csv(MAP_CSV, encoding="utf-8")
_required_cols = {"source", "source_normalized", "kanon"}
assert _required_cols.issubset(mapping_df.columns), "Mapping CSV hat unerwartetes Format."

# MAP_NORM: normalised source label -> canonical (KANON) category
MAP_NORM = {
    source_norm: kanon
    for source_norm, kanon in mapping_df[["source_normalized", "kanon"]].values
}

# Sanity check: normalised keys that occur more than once in the mapping
_key_counts = (mapping_df.groupby("source_normalized")
                         .size()
                         .reset_index(name="n"))
dups = _key_counts.query("n>1")
if dups.empty:
    print("MAP_NORM OK – keine doppelten normalisierten Keys.")
else:
    display(dups.head())
    print("Hinweis: Es gibt mehrfach vorkommende normalisierte Keys. Prüfe semantische Duplikate im Mapping.")


In [None]:
# 02_harmonisierung — Cell 4
def load_and_harmonize_year(year: int, filename: str,
                            raw_dir: Path = RAW,
                            map_norm: dict[str,str] = MAP_NORM) -> pd.DataFrame:
    """
    Lädt eine Statista-CSV, erkennt Kategorien- und Werte-Spalte, bereinigt Prozentwerte,
    mappt Quellkategorien auf KANON und aggregiert Teilkategorien.
    Rückgabe: DataFrame mit Spalten [year, Kategorie, value]
    """
    path = raw_dir / filename
    df = read_csv_robust(path)

    cat_col, val_col = pick_statista_cols(df)
    work = df[[cat_col] + ([val_col] if val_col else [])].copy()
    work.rename(columns={cat_col: "source_category"}, inplace=True)
    work["value"] = clean_value_col(work[val_col]) if val_col else 0.0

    # Mapping
    work["Kategorie"] = work["source_category"].map(lambda x: map_norm.get(norm_text(x), "IGNORE"))
    before = len(work)
    work = work[work["Kategorie"] != "IGNORE"].copy()
    dropped = before - len(work)

    if dropped:
        print(f"[{year}] {dropped} Zeile(n) ohne Mapping entfernt (IGNORE).")

    # Aggregation (z. B. Bekleidung/Schuhe/Accessoires)
    out = (work.groupby("Kategorie", as_index=False)["value"].sum()
                 .assign(year=year)[["year","Kategorie","value"]])

    # Numerik & Bounds
    out["value"] = pd.to_numeric(out["value"], errors="coerce").fillna(0.0)
    return out


In [None]:
# 02_harmonisierung — Cell 5
# Load every configured year; files that are not on disk are collected and reported.
frames, missing_files = [], []
for yr, fname in YEAR_TO_FILE.items():
    fpath = RAW / fname
    if fpath.exists():
        frames.append(load_and_harmonize_year(yr, fname))
    else:
        missing_files.append((yr, fpath))

if len(missing_files) > 0:
    print("Warnung: fehlende Dateien:")
    for yr, p in missing_files:
        print(f"  - {yr}: {p}")

assert frames, "Keine Daten geladen. Prüfe YEAR_TO_FILE und data/raw."

# Long format: one row per (year, Kategorie)
stat_all = pd.concat(frames, ignore_index=True)

# Wide format (categories x year), rows in the fixed KANON order, gaps filled with 0.0
_wide_unordered = stat_all.pivot(index="Kategorie", columns="year", values="value")
stat_pivot = _wide_unordered.reindex(KANON).fillna(0.0)

display(stat_all.head(10))
display(stat_pivot.head(len(KANON)))


In [None]:
# 02_harmonisierung — Cell 6
# 1) Value range 0–100 %
_in_range = stat_all["value"].between(0, 100)
if not _in_range.all():
    offenders = stat_all[~_in_range].sort_values("value")
    display(offenders.head(10))
    raise ValueError("Es existieren Prozentwerte außerhalb 0–100 %. Quelle prüfen.")

# 2) Duplicates per (year, Kategorie) after aggregation?
dups = (stat_all.groupby(["year","Kategorie"]).size()
                  .reset_index(name="n")
                  .query("n>1"))
if not dups.empty:
    display(dups)
    raise ValueError("Duplikate je (year, Kategorie) nach Aggregation gefunden.")

# 3) Completeness versus KANON (hint only, not a hard error).
# The comparison uses the categories actually present in the long table:
# stat_pivot was reindexed to KANON, so its index always contains every
# KANON entry and comparing against it could never report a gap.
_present = set(stat_all["Kategorie"])
fehlend = [k for k in KANON if k not in _present]
if fehlend:
    print("Hinweis – folgende KANON-Kategorien fehlen in den vorliegenden Daten:", fehlend)
else:
    print("Alle KANON-Kategorien in den Daten vorhanden.")

# Categories outside KANON are dropped from the wide table by the reindex; report them.
_kanon_set = set(KANON)
_not_in_kanon = [k for k in stat_all["Kategorie"].drop_duplicates() if k not in _kanon_set]
if _not_in_kanon:
    print("Hinweis – folgende Kategorien sind nicht in KANON und fehlen daher im Wide-Format:", _not_in_kanon)

print("Qualitätschecks erfolgreich.")


In [None]:
# 02_harmonisierung — Cell 7
# Target files for the long and the wide table
LONG_OUT = OUT / "statista_long_2021_2022_2024.csv"
WIDE_OUT = OUT / "statista_harmonisiert_2021_2022_2024.csv"

# Long format, sorted by year and category
_long_sorted = stat_all.sort_values(["year", "Kategorie"])
_long_sorted.to_csv(LONG_OUT, index=False, encoding="utf-8")

# Wide format with the category index turned back into a regular column
_wide_flat = stat_pivot.reset_index()
_wide_flat.to_csv(WIDE_OUT, index=False, encoding="utf-8")

print("Exportiert:")
for _target in (LONG_OUT, WIDE_OUT):
    print(" -", _target)
print("Weiter mit 30_reporting.ipynb → Abbildungen generieren.")
