ETL

In [None]:
import json, textwrap, traceback
from io import StringIO
from pathlib import Path
import pathlib

import os
from dotenv import load_dotenv

import numpy as np
import pandas as pd
from openai import OpenAI
import math, re

# ───────── Config ─────────
# NOTE(review): absolute, machine-specific paths — adjust per environment.
# Directory the cleaned CSV is written to (main() creates it if missing).
SAVE_DIR = Path("/Users/jakob/ba_etl/adaptive/cleaned")
# Raw input CSV; its file name is reused for the cleaned output file.
DATA_PATH   = "/Users/jakob/ba_etl/adaptive/data/raw/rotten_tomatoes_movies.csv"
# Chat model used for both the transformation and the validator prompts.
MODEL_NAME  = "o4-mini"
# Number of raw rows shown to the LLM as context, and the sampling seed.
SAMPLE_SIZE = 5
RNG_STATE   = 42

load_dotenv()                     # loads variables from a .env file into the environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")  # None when the variable is unset

# ───────── Load schema ─────────
# Path is relative to the current working directory. The spec is used as a
# list of objects with "field", "type" and "rule" keys (see schema_as_text).
SCHEMA_PATH = Path("schema_testing.json")
SCHEMA_SPEC = json.loads(SCHEMA_PATH.read_text(encoding="utf-8"))

# ───────── Helper ─────────
def schema_as_text(spec):
    """Render a schema spec as prompt text, one bullet line per field.

    Args:
        spec: Iterable of mappings with "field", "type" and "rule" keys.

    Returns:
        Lines of the form ``- <field>: (type: <type>) <rule>`` joined by
        newlines (empty string for an empty spec).
    """
    lines = []
    for entry in spec:
        field, kind, rule = entry["field"], entry["type"], entry["rule"]
        lines.append(f"- {field}: (type: {kind}) {rule}")
    return "\n".join(lines)

# Schema rendered once as prompt text ("- field: (type: T) rule" per line).
SCHEMA_TEXT = schema_as_text(SCHEMA_SPEC)

def load_df(path=DATA_PATH):
    """Read the raw CSV at *path*, skipping lines pandas cannot parse.

    Args:
        path: CSV file path; defaults to the configured DATA_PATH.

    Returns:
        The loaded DataFrame.
    """
    frame = pd.read_csv(path, on_bad_lines="skip")
    return frame

def make_context_and_meta(df, sample_size=None, random_state=None):
    """Build the CONTEXT and METADATA sections of the transformation prompt.

    Args:
        df: The raw input DataFrame.
        sample_size: Number of rows to sample; defaults to SAMPLE_SIZE.
        random_state: Sampling seed; defaults to RNG_STATE.

    Returns:
        ``(ctx, meta)``: ``ctx`` is a random row sample rendered with
        ``DataFrame.to_string(index=False)``; ``meta`` is the text that
        ``DataFrame.info()`` writes for the whole frame.
    """
    n = SAMPLE_SIZE if sample_size is None else sample_size
    seed = RNG_STATE if random_state is None else random_state
    # DataFrame.sample (without replacement) raises ValueError when n exceeds
    # the number of rows, so cap it for small inputs.
    ctx = df.sample(min(n, len(df)), random_state=seed).to_string(index=False)
    buf = StringIO()
    df.info(buf=buf)
    return ctx, buf.getvalue()

def get_completion(prompt: str):
    """Send *prompt* as one user message to MODEL_NAME and return the reply text.

    A fresh OpenAI client is created per call. The return value is the
    ``content`` of the first choice's message, as given by the API.
    """
    response = OpenAI(api_key=OPENAI_API_KEY).chat.completions.create(
        model=MODEL_NAME,
        messages=[{"role": "user", "content": prompt}],
    )
    first_choice = response.choices[0]
    return first_choice.message.content

def exec_generated_code(code: str, g: dict):
    """Execute the last fenced code block of an LLM reply in namespace *g*.

    If *code* contains Markdown fences (```), only the content of the last
    fenced block is executed; an unterminated final fence is tolerated. A
    language-tag first line (``python``/``py``/``python3``) is dropped.
    Text without fences is executed as-is.

    Args:
        code: Raw LLM response text.
        g: Globals dict the code runs in; names it assigns end up here.

    Raises:
        Whatever the executed code raises (including SyntaxError).

    Security: this runs model-generated code unsandboxed with the full
    privileges of this process.
    """
    if "```" in code:
        parts = code.split("```")
        # An even number of fences leaves trailing prose after the last
        # closing fence, so the last block is parts[-2]; an odd number means
        # the final block was never closed and its content is parts[-1].
        code = parts[-2] if len(parts) % 2 == 1 else parts[-1]
    # Drop a language tag only when it forms the whole first line; removing
    # the first "python" anywhere would corrupt code that merely mentions it.
    first_line, _, rest = code.lstrip().partition("\n")
    if first_line.strip().lower() in {"python", "py", "python3"}:
        code = rest
    clean = code.replace("```", "").strip()
    exec(clean, g)

def error_snippet(exc, max_lines=10, max_chars=800):
    """Return a short, prompt-friendly description of *exc*.

    Args:
        exc: The exception to summarise.
        max_lines: Keep at most this many trailing traceback lines
            (0 or less keeps none).
        max_chars: Truncate the result to this many characters and append
            " …" when it is longer.

    Returns:
        The tail of the formatted traceback. An AssertionError whose message
        contains "Invalid entries" is replaced by a fixed one-line summary.
    """
    if isinstance(exc, AssertionError) and "Invalid entries" in str(exc):
        # Validator assertions may list many rows; a fixed summary keeps the
        # retry prompt short.
        txt = "AssertionError: many invalid entries (gekürzt)"
    else:
        tb = traceback.TracebackException.from_exception(exc, capture_locals=False)
        # format() yields chunks that can span several lines (e.g. one per
        # frame), so split into real lines before taking the tail.
        lines = "".join(tb.format()).splitlines()
        tail = lines[-max_lines:] if max_lines > 0 else []   # [-0:] would keep all
        txt = "\n".join(tail).strip()
    return (txt[:max_chars] + " …") if len(txt) > max_chars else txt

def completeness_report(df0, out, v_llm, v_det, inv_det, savedir):
    """Print how many records survived each pipeline stage.

    Args:
        df0: Original raw records.
        out: Records produced by the transformation step.
        v_llm: Records accepted by the LLM-generated validator.
        v_det: Unused here (kept for call compatibility).
        inv_det: Invalid records (list of dicts). If non-empty, a few are
            printed and all are written to
            ``<savedir>/<DATA_PATH stem>_invalid_records.csv``.
        savedir: Directory for the invalid-records CSV.
    """
    n_raw, n_out, n_valid = len(df0), len(out), len(v_llm)
    print("\n".join([
        "\n— Completeness Report —",
        f"Original rows:            {n_raw:>6}",
        f"After transformation:     {n_out:>6}   (-{n_raw - n_out})",
        f"LLM validator valid:      {n_valid:>6}   (-{n_out - n_valid})",
    ]))
    if not inv_det:
        return
    invalid = pd.DataFrame(inv_det)
    print("\nBeispiele ungültiger Datensätze (dynamic):")
    print(invalid.head().to_string(index=False))
    report_path = Path(savedir) / f"{Path(DATA_PATH).stem}_invalid_records.csv"
    invalid.to_csv(report_path, index=False)
    print(f"\n❌  Fehlerliste gespeichert unter: {report_path}")

# ───────── Prompt-Builder ─────────
def build_transformation_prompt(ctx, meta, schema_text=None):
    """Assemble the prompt that asks the LLM for pandas transformation code.

    Args:
        ctx: Sample rows of the raw data (text).
        meta: ``DataFrame.info()`` text of the raw data.
        schema_text: Target schema as bullet text; defaults to SCHEMA_TEXT.

    Returns:
        The prompt with the template's source indentation removed and
        surrounding whitespace stripped.
    """
    if schema_text is None:
        schema_text = SCHEMA_TEXT
    # Dedent the template *before* inserting values: the multi-line inserts
    # (schema, sample rows, df.info() output) start at column 0. Dedenting
    # after interpolation found no common margin and left every template line
    # indented by four spaces.
    template = textwrap.dedent("""
        [ROLE]
        You are an expert data scientist specialised in cleansing and standardising film datasets.

        [PROCESS]
          (a) [PLAN] Outline your high-level approach.
          (b) [THINK] Write Pandas code step by step.
          (c) [CHECK] Self-verify logic.
          (d) [ANSWER] Return only executable Python starting with `output = []`

        [SCHEMA]
        {schema_text}

        [CONTEXT]
        {ctx}

        [METADATA]
        {meta}
    """)
    return template.format(schema_text=schema_text, ctx=ctx, meta=meta).strip()

def build_validator_prompt(schema_spec=None):
    """Assemble the prompt that asks the LLM for validation code.

    Args:
        schema_spec: Schema spec to embed as JSON; defaults to SCHEMA_SPEC.

    Returns:
        The prompt with the template's source indentation removed and
        surrounding whitespace stripped. It asks for code that iterates
        ``output``, builds ``valid_output``/``invalid_entries``/
        ``duplicate_count`` and raises AssertionError on invalid entries.
    """
    spec = SCHEMA_SPEC if schema_spec is None else schema_spec
    schema_json = json.dumps(spec, indent=2, ensure_ascii=False)
    # Dedent before substitution: the JSON dump has lines at column 0, which
    # made dedent-after-interpolation a no-op.
    template = textwrap.dedent("""
        [ROLE]
        Validation stage of the ETL pipeline.

        [SCHEMA_JSON]
        {schema_json}

        [PROCESS]
          (a) [PLAN] outline checks
          (b) [THINK] derive assertions
          (c) [CHECK] ensure logic
          (d) [ANSWER] one ```python``` block that
              • iterates **over `output` (list of dicts)**        <─ WICHTIG
              • builds valid_output / invalid_entries / duplicate_count
              • raises AssertionError if invalid_entries is non-empty.

        [FORMAT] Use [PLAN] [THINK] [CHECK] [ANSWER].
        [OUTPUT] show only code inside the block.
    """)
    return template.format(schema_json=schema_json).strip()

# ───────── Main ─────────
def _reset_llm_state(g):
    """Remove names that LLM-generated code is expected to (re)define from *g*.

    Values left over from an earlier run in the same interpreter (e.g. a
    re-executed notebook cell) would otherwise satisfy the ``"output" in g``
    checks even when freshly generated code never assigned them.
    """
    for var in ("output", "valid_output", "invalid_entries", "duplicate_count"):
        g.pop(var, None)


def main():
    """Run the LLM-driven ETL: transform, validate (one retry), report, save.

    1. Load the raw CSV and ask the LLM for transformation code, which must
       leave ``output`` in this module's globals.
    2. Ask the LLM for validator code that reads ``output`` and sets
       ``valid_output``. If that code raises, the transformation is
       regenerated once with the error snippet appended, then validated again.
    3. Print a completeness report and write ``valid_output`` as CSV to
       SAVE_DIR under the raw file's name.

    Any exception is printed with its traceback instead of propagating.
    """
    g = globals()
    try:
        print("Running with pandas", pd.__version__)
        df = load_df()
        g["df"] = df                      # generated code reads `df` from globals
        ctx, meta = make_context_and_meta(df)

        # 1) Transformation --------------------------------------------------
        _reset_llm_state(g)
        t_code = get_completion(build_transformation_prompt(ctx, meta))
        print("\n— Transformation Code —\n", t_code)
        exec_generated_code(t_code, g)
        if "output" not in g:
            raise RuntimeError("LLM produced no `output`")

        # 2) LLM validator ---------------------------------------------------
        v_code = get_completion(build_validator_prompt())
        print("\n— Validator Code —\n", v_code)
        try:
            exec_generated_code(v_code, g)      # expected to set valid_output
        except Exception as exc:
            # One retry: regenerate the transformation with the error appended
            # and validate again; a second failure reaches the outer handler.
            snippet = error_snippet(exc)
            print("⚠️  validator crashed – retrying …\n", snippet)
            _reset_llm_state(g)

            retry_prompt = (build_transformation_prompt(ctx, meta)
                            + f"\n[VALIDATION_ERROR]\n{snippet}")
            t_code = get_completion(retry_prompt)
            print("\n— Retry Transformation Code —\n", t_code)
            exec_generated_code(t_code, g)
            if "output" not in g:
                raise RuntimeError("Retry produced no `output`")

            v_code = get_completion(build_validator_prompt())
            print("\n— Retry Validator Code —\n", v_code)
            exec_generated_code(v_code, g)
        if "valid_output" not in g:
            raise RuntimeError("Validator produced no `valid_output`")
        output, valid_output = g["output"], g["valid_output"]

        # 3) Report + persist ------------------------------------------------
        # Create SAVE_DIR first: completeness_report writes its invalid-records
        # CSV into the directory it is given when there are invalid entries.
        SAVE_DIR.mkdir(parents=True, exist_ok=True)
        completeness_report(df, output, valid_output,
                            valid_output, [],          # placeholders for v_det / inv_det
                            SAVE_DIR)

        out_path = SAVE_DIR / Path(DATA_PATH).name
        pd.DataFrame(valid_output).to_csv(out_path, index=False)
        print(f"\n✅ Bereinigter Datensatz gespeichert unter: {out_path}")

    except Exception:
        # Best-effort behaviour: report the failure without re-raising.
        traceback.print_exc()


if __name__ == "__main__":
    main()

Merging

In [None]:
# ─── Fuzzy-Merge aller Clean-CSVs (Titel-Normalisierung + ±1-Jahr) ──────
import pandas as pd
from pathlib import Path
import numpy as np
import re, textwrap, datetime as dt

try:
    from unidecode import unidecode          # accents ➜ ASCII transliteration
except ImportError:
    unidecode = lambda s: s                  # fallback if the package is missing: titles keep their accents

# Base folder holding the cleaned per-source CSVs (relative to the CWD).
# NOTE(review): the ETL cell writes to SAVE_DIR (an absolute path); this only
# refers to the same folder when the notebook runs from its parent — confirm.
CLEAN_DIR  = Path("cleaned")

# Output paths (relative to the CWD)
MERGE_OUT  = Path("merged/all_movies_wide_fuzzy.csv")
DUPL_OUT   = Path("merged/all_movies_fuzzy_duplicates.csv")

# 0. Canonical column mapping  ------------------------------------------------
# Source file stem → name that source's "rating" column is renamed to.
CANON = {
    "imdb_data":              "rating_imdb",
    "movielens_aggregated":   "rating_movielens",
    "metacritic_movies":      "rating_metacritic",
    "rotten_tomatoes_movies": "rating_rt_audience",
}

# 1. Collect the cleaned CSVs  -------------------------------------------------
# Files are read in sorted name order so that the concatenated long table (and
# therefore every later per-group "first" pick) is reproducible; Path.glob
# yields entries in an arbitrary, filesystem-dependent order.
frames = []
for csv in sorted(CLEAN_DIR.glob("*.csv")):
    name = csv.name
    # Skip earlier merge outputs and the invalid-record / duplicate side files.
    if (
        name.endswith(("merged.csv", "all_movies_wide.csv", "all_movies_wide_fuzzy.csv"))
        or "_invalid_records" in name
        or "_duplicates"      in name
    ):
        continue

    src = csv.stem                      # e.g. imdb_data, movielens_aggregated, …
    df  = pd.read_csv(csv)

    # Unify the year column name
    if "year" in df.columns and "release_year" not in df.columns:
        df = df.rename(columns={"year": "release_year"})

    # Unify the rating column name: known sources get their canonical name,
    # any other source falls back to rating_<file stem>.
    rating_col = CANON.get(src, f"rating_{src}")
    df = df.rename(columns={"rating": rating_col})

    df["source"] = src
    frames.append(df)

if not frames:
    raise RuntimeError(f"Keine geeigneten Clean-CSVs in {CLEAN_DIR}")

long_df = pd.concat(frames, ignore_index=True)

# 2. Aggressive title normalisation  -------------------------------------------
def norm_title(title: str) -> str:
    """Return an aggressive matching key for a film title.

    Steps:
    1. Transliterate with ``unidecode`` (identity if the package is missing),
       strip and lower-case.
    2. Drop a trailing "(YYYY)", then one more trailing parenthesised alias.
    3. Turn punctuation into spaces and collapse whitespace.
    4. Move a trailing article "the" (as in "Matrix, The") to the front, so
       the key matches "The Matrix".
    5. Drop repeated tokens.

    Non-string input (e.g. NaN) yields "".
    """
    if not isinstance(title, str):
        return ""
    # Strip first: trailing whitespace after "(1999)" would defeat the
    # end-anchored patterns below.
    t = unidecode(title).strip().lower()
    t = re.sub(r"\s*\(\d{4}\)$", "", t)          # drop "(YYYY)" at the end
    t = re.sub(r"\s*\([^)]*\)$", "", t)          # drop any trailing alias bracket
    t = re.sub(r"[^\w\s]", " ", t)               # punctuation → space
    t = re.sub(r"\s+", " ", t).strip()
    # "matrix the" → "the matrix". Deleting the trailing article (as before)
    # never matched the regular "The Matrix" spelling of the same title.
    if t.endswith(" the"):
        t = "the " + t[: -len(" the")]

    # Remove repeated tokens, keeping first occurrences in order.
    return " ".join(dict.fromkeys(t.split()))

# Matching keys: normalised title, and release year as a nullable integer
# (values that cannot be parsed as numbers become <NA>).
# NOTE(review): astype("Int64") raises on non-integral floats (e.g. 1999.5).
long_df["norm_title"]   = long_df["title"].map(norm_title)
long_df["release_year"] = pd.to_numeric(long_df["release_year"],
                                        errors="coerce").astype("Int64")

# 3. Year clusters within ±1  --------------------------------------------------
def year_cluster(sub):
    """Assign cluster ids to the release years of one title group.

    Distinct years are visited in ascending order. A year more than one year
    after its predecessor opens a new cluster, so consecutive years chain into
    a single cluster (1999, 2000, 2001 → one id). Ids start at 1. Rows with a
    missing year all share the id following the last cluster.

    Args:
        sub: Rows of one title group with a ``release_year`` column.

    Returns:
        Integer Series aligned with ``sub``'s index.
    """
    labels = {}
    current = 0
    previous = None
    for year in sorted(set(sub["release_year"].dropna())):
        # Open a new cluster unless this year lies within 1 of the previous one.
        if previous is None or abs(year - previous) > 1:
            current += 1
        labels[year] = current
        previous = year
    missing_id = current + 1
    return sub["release_year"].map(labels).fillna(missing_id).astype(int)

# Cluster ids are local to each norm_title, so (norm_title, year_cluster) is
# the key used below. transform() hands each group's years to year_cluster,
# wrapped back into a one-column frame, and returns a result aligned with
# long_df's index. This avoids DataFrameGroupBy.apply over the grouping
# column, which pandas 2.2 deprecates. The frame column is named explicitly
# because pandas sets each group Series' name to the group key.
long_df["year_cluster"] = (
    long_df.groupby("norm_title")["release_year"]
           .transform(lambda years: year_cluster(years.to_frame(name="release_year")))
)

# 4. Genre priority: first non-null genres value per (norm_title, year_cluster),
#    taking sources in alphabetical order of their name ----------------------
#    kind="stable" keeps the original row order within one source. The default
#    single-column sort (quicksort) is not stable, which made the pick
#    arbitrary when a source contributes several rows to a group.
#    NOTE(review): only missing values are skipped — an empty string or "[]"
#    still counts as a value.
genres_map = (
    long_df
      .sort_values("source", kind="stable")
      .groupby(["norm_title", "year_cluster"])["genres"]
      .first()
      .rename("genres_any")
      .reset_index()
)

# 5. Pivot long → wide: one row per (norm_title, year_cluster) ----------------
#    Presumably each rating_* column is filled by a single source (step 1
#    gives every source its own rating column name), so the pivot's
#    (rating column, source) pairs are flattened to the rating name alone.
#    NOTE(review): pivot_table's default dropna=True drops all-NaN columns and
#    rows — films without any rating likely never reach the wide frame; confirm.
rating_cols = [c for c in long_df.columns if c.startswith("rating_")]
wide = long_df.pivot_table(
    index=["norm_title", "year_cluster"],
    columns="source",
    values=rating_cols,
    aggfunc="first",
).reset_index()

# Keep only the first level of the column MultiIndex
wide.columns = list(wide.columns.get_level_values(0))

# 6. Representative title (first after sorting by source, then title) and the
#    earliest release year of each (norm_title, year_cluster) ---------------
meta = (
    long_df
      .sort_values(["source", "title"])
      .groupby(["norm_title", "year_cluster"])
      .agg(title=("title", "first"), release_year=("release_year", "min"))
      .reset_index()
)

# 7. Attach title/year and genres to the wide frame --------------------------
wide = (
    wide
      .merge(meta, on=["norm_title", "year_cluster"], how="left")
      .merge(genres_map, on=["norm_title", "year_cluster"], how="left")
)

# --- Save the intermediate result BEFORE the ≥2-ratings filter ---------------
UNFILTERED_OUT = Path("merged/all_movies.csv")   # deliberately a different name
# This is the first write into merged/. The mkdir before saving MERGE_OUT
# further down comes too late on a fresh checkout, so create the folder here.
UNFILTERED_OUT.parent.mkdir(parents=True, exist_ok=True)
wide.to_csv(UNFILTERED_OUT, index=False)
print(f"💾 Ungefilterter Wide-Frame gespeichert: {UNFILTERED_OUT}")

# 8. Keep only films with at least 2 available ratings ------------------------
rating_cols = [c for c in wide.columns if c.startswith("rating_")]
wide = wide.loc[wide[rating_cols].count(axis=1).ge(2)]

# 9. Order the columns and rename genres_any → genres -------------------------
cols_order = ["title", "release_year", "genres_any", *rating_cols]
wide = wide[cols_order].rename(columns={"genres_any": "genres"})

# 10. Fuzzy duplicates: every row that still shares title + release year with
#     another row is set aside (keep=False) --------------------------------
dup_mask   = wide.duplicated(subset=["title", "release_year"], keep=False)
uniques    = wide.loc[~dup_mask].copy()
duplicates = wide.loc[dup_mask].copy()

# 11. Save  ---------------------------------------------------------------------
# MERGE_OUT and DUPL_OUT share the same parent folder, so one mkdir covers both.
MERGE_OUT.parent.mkdir(parents=True, exist_ok=True)
uniques.to_csv(MERGE_OUT, index=False)
duplicates.to_csv(DUPL_OUT, index=False)

# Summary. "Filme mit ≥2 Votes" counts only rows without a title/year duplicate.
print(textwrap.dedent(f"""
  — Fuzzy-Merge abgeschlossen ({dt.date.today()}) —
  Eingelesene Quellen : {len(frames)}
  Long-Records        : {len(long_df)}
  Filme mit ≥2 Votes  : {len(uniques)}
  Fuzzy-Duplikate     : {len(duplicates)}
  👉 Gemergt  : {MERGE_OUT}
  👉 Duplikate: {DUPL_OUT}
""").strip())

Superscore

In [None]:
# ─── Superscore (0-10 Skala, ungewichtet – wie static_pipeline) ──────────
import pandas as pd
import numpy as np
import textwrap, datetime as dt
from pathlib import Path

# NOTE(review): the merge cell writes merged/all_movies_wide_fuzzy.csv relative
# to its CWD, while this reads testing/cleaned/merged/… — confirm both cells
# run from directories where these resolve to the same file.
MERGED      = Path("testing/cleaned/merged/all_movies_wide_fuzzy.csv")
OUT_CSV     = Path("testing/cleaned/merged/all_movies_superscore_static.csv")
# Superscores are computed only for films with at least this many ratings.
MIN_RATINGS = 2

df = pd.read_csv(MERGED)
# Each rating_* column is treated as one source's rating.
rating_cols = [c for c in df.columns if c.startswith("rating_")]
if not rating_cols:
    raise RuntimeError("Keine rating_*-Spalten gefunden.")

# 1. Source scale → 0-10 normalisation (heuristic, as in static_pipeline)
def to_0_10(series, scale=None):
    """Rescale a rating column to the 0–10 range.

    Args:
        series: Numeric ratings of one source.
        scale: Optional known upper bound of the source scale (e.g. 5, 10 or
            100). When given, the result is ``series * 10 / scale`` and no
            guessing takes place. When omitted, the scale is guessed from the
            column maximum: max <= 5.5 → 0–5 (×2, MovieLens), max > 10 →
            0–100 (/10, Metacritic / RT %), otherwise 0–10 (unchanged, IMDb).

    Returns:
        The rescaled Series. An empty or all-NaN input is returned unchanged.

    Raises:
        ValueError: If *scale* is given and not positive.

    The heuristic misjudges columns whose observed maximum stays well below
    the top of their scale (a 0–10 column with values <= 5.5 gets doubled);
    pass *scale* whenever the source scale is known.
    """
    if series.dropna().empty:
        return series
    if scale is not None:
        if scale <= 0:
            raise ValueError(f"scale must be positive, got {scale!r}")
        return series * 10 / scale
    mx = series.max()
    # 0-5 → ×2 (MovieLens)
    if mx <= 5.5:
        return series * 2
    # 0-100 → /10 (Metacritic, RT %)
    if mx > 10:
        return series / 10
    # otherwise 0-10, unchanged (IMDb & co.)
    return series

# Normalised copies on the 0–10 scale: rating_<name> → <name>_norm
norm_cols = [col.replace("rating_", "") + "_norm" for col in rating_cols]
for col, ncol in zip(rating_cols, norm_cols):
    df[ncol] = to_0_10(df[col])

# 2. Number of available normalised ratings per film
df["num_available_ratings"] = df[norm_cols].count(axis=1)

# 3. Superscores (mean and median, rounded to one decimal) only for films
#    with at least MIN_RATINGS ratings; other rows are not assigned.
mask = df["num_available_ratings"].ge(MIN_RATINGS)
df.loc[mask, "superscore_mean_0_10"] = (
    df.loc[mask, norm_cols].mean(axis=1).round(1)
)
df.loc[mask, "superscore_median_0_10"] = (
    df.loc[mask, norm_cols].median(axis=1).round(1)
)

# 4. Save all rows (rows outside `mask` were not assigned a superscore)
df.to_csv(OUT_CSV, index=False)

print(textwrap.dedent(f"""
  — Superscore-Static abgeschlossen ({dt.date.today()}) —
  Rating-Spalten erkannt : {rating_cols}
  Normierte Spalten      : {norm_cols}
  Filme mit ≥{MIN_RATINGS} Ratings : {mask.sum()} / {len(df)}
  👉 Ergebnis : {OUT_CSV}
""").strip())