In [None]:
!pip -q install requests tqdm

In [None]:
import os
from getpass import getpass

# SECURITY: never commit a real API key inside a notebook. The key that used to
# be hard-coded in this cell must be treated as leaked and rotated.
# Preferred: export ITAD_API_KEY before starting Jupyter. Otherwise prompt once;
# getpass does not echo the value and nothing is stored in the notebook file.
if not os.environ.get("ITAD_API_KEY"):
    os.environ["ITAD_API_KEY"] = getpass("ITAD API key: ").strip()

In [None]:
# =========================================================
# Module 0 — Setup & Config (API key + JSON config)
# =========================================================
# If running in Colab, uncomment:
# !pip -q install requests tqdm

import os, time, json
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime, timedelta, timezone

import numpy as np
import pandas as pd
import requests
from tqdm import tqdm

# --- ITAD API constants ---
ITAD_BASE = "https://api.isthereanydeal.com"
STEAM_SHOP_ID = 61  # Steam

# --- Config JSON: write once if missing, else read ---
CFG_PATH = Path("steam_panel_config.json")
DEFAULT_CFG = {
    "countries": ["AR","AU","BR","CA","CN","DE","FR","GB","JP","PL","TR","US"],
    "weeks_back": 104,
    "label_horizon_weeks": 8
}
if not CFG_PATH.exists():
    CFG_PATH.write_text(json.dumps(DEFAULT_CFG, indent=2), encoding="utf-8")

# Read with the same encoding used for writing (the platform default may differ),
# and let DEFAULT_CFG fill in any key that an older config file does not have.
cfg = {**DEFAULT_CFG, **json.loads(CFG_PATH.read_text(encoding="utf-8"))}
COUNTRIES = cfg["countries"]
WEEKS_BACK = int(cfg["weeks_back"])

# --- ITAD API key (environment variable only) ---
# SECURITY: no literal key is embedded here. A key that has been committed to a
# notebook must be considered leaked and rotated. Set the variable before
# running, e.g. `export ITAD_API_KEY=...` or, in Colab, from a secret store.
ITAD_API_KEY = os.getenv("ITAD_API_KEY")
if not ITAD_API_KEY or ITAD_API_KEY == "REPLACE_ME_WITH_YOUR_ITAD_KEY":
    raise ValueError("Set ITAD_API_KEY via environment or inline string before running.")

pd.options.display.max_rows = 10
pd.options.display.width = 160


In [None]:
# =========================================================
# Module 1 — Read CSVs and build the release dimension
# Inputs required: steam_top_sellers.csv, steam_prices_normalized.csv
# =========================================================

def read_csv_flexible(path_main: str, alt_names: Optional[List[str]] = None) -> pd.DataFrame:
    """
    Read the first CSV that exists among `path_main` and `alt_names`.

    Parameters
    ----------
    path_main : preferred file path; always tried first.
    alt_names : optional fallback paths, tried in the given order.

    Returns
    -------
    The DataFrame parsed by `pd.read_csv` from the first path that exists.

    Raises
    ------
    FileNotFoundError
        When none of the candidates exists. The message lists every path that
        was tried (not just the main one) and chains the first underlying error.
    """
    candidates = [path_main, *(alt_names or [])]
    first_error = None
    for candidate in candidates:
        try:
            return pd.read_csv(candidate)
        except FileNotFoundError as err:
            # Remember the first failure for error chaining, then keep looking.
            if first_error is None:
                first_error = err
    raise FileNotFoundError(
        f"None of the candidate CSV files exist: {candidates}"
    ) from first_error

# Load the two input tables. Each call tries the plain filename first and then
# the " (1)" variant given as the alternate.
# Must contain at least `appid`
df_sellers = read_csv_flexible("steam_top_sellers.csv", ["steam_top_sellers (1).csv"])
# Should contain release info per appid + (country/cc)
df_prices  = read_csv_flexible("steam_prices_normalized.csv", ["steam_prices_normalized (1).csv"])

# Strip stray whitespace from header names so later column lookups by exact
# name (e.g. "appid") succeed.
df_sellers.columns = [c.strip() for c in df_sellers.columns]
df_prices.columns  = [c.strip() for c in df_prices.columns]

def build_release_dim(df_prices: pd.DataFrame) -> pd.DataFrame:
    """
    Normalize release info into a stable schema:
    [appid, country, release_time, release_price, release_currency, release_country].

    Column mapping (priority order):
      - release_time   <= release_date_iso (ISO) or release_date (legacy human format)
      - release_price  <= initial
      - release_currency <= currency
      - country        <= country (or 'cc' if present)

    Cleaning rules:
      - Rows whose `appid` cannot be parsed as a number are dropped, so they can
        no longer collapse into a bogus "<NA>" key.
      - `country` / `release_currency` are upper-cased, but missing values stay
        missing instead of turning into the literal strings "NAN" / "NONE".
      - `release_time` is parsed as UTC and returned timezone-naive.
      - One row is kept per (appid, country): the earliest non-null
        `release_time` when that column exists, otherwise the first occurrence.

    Parameters
    ----------
    df_prices : raw prices/release table; must contain an `appid` column.

    Returns
    -------
    A new DataFrame holding whichever canonical columns could be derived.

    Raises
    ------
    KeyError
        If `appid` is absent from `df_prices`.
    """
    def _upper_keep_missing(col: pd.Series) -> pd.Series:
        """Upper-case present values; leave missing entries untouched."""
        return col.where(col.isna(), col.astype(str).str.upper())

    rel = df_prices.copy()
    rel.columns = [c.strip() for c in rel.columns]

    # --- Map to canonical names (use ISO first, then legacy) ---
    rename_map = {}
    if "release_date_iso" in rel.columns:
        rename_map["release_date_iso"] = "release_time"
    elif "release_date" in rel.columns:
        rename_map["release_date"] = "release_time"

    if "initial" in rel.columns:
        rename_map["initial"] = "release_price"
    if "currency" in rel.columns:
        rename_map["currency"] = "release_currency"
    if "cc" in rel.columns and "country" not in rel.columns:
        rename_map["cc"] = "country"

    rel = rel.rename(columns=rename_map)

    # --- Types & basic cleaning ---
    # Parse appid numerically first (handles "12345" and 12345.0), drop rows
    # that fail to parse, then store the canonical integer-as-string form.
    appid_num = pd.to_numeric(rel["appid"], errors="coerce")
    has_appid = appid_num.notna()
    rel = rel.loc[has_appid].copy()
    # Assign positionally so a non-unique index cannot break alignment.
    rel["appid"] = appid_num[has_appid].astype("Int64").astype(str).to_numpy()

    if "country" in rel.columns:
        rel["country"] = _upper_keep_missing(rel["country"])
    else:
        rel["country"] = pd.NA

    if "release_currency" in rel.columns:
        rel["release_currency"] = _upper_keep_missing(rel["release_currency"])

    # Default release_country to country if not explicitly provided
    rel["release_country"] = rel.get("release_country", rel.get("country"))

    # --- Parse release_time robustly ---
    if "release_time" in rel.columns:
        # ISO parses naturally; legacy strings that fail to parse become NaT.
        rel["release_time"] = (
            pd.to_datetime(rel["release_time"], errors="coerce", utc=True)
              .dt.tz_convert(None)
        )

    if "release_price" in rel.columns:
        rel["release_price"] = pd.to_numeric(rel["release_price"], errors="coerce")

    # --- Keep canonical columns only ---
    cols = ["appid", "country", "release_time",
            "release_price", "release_currency", "release_country"]
    rel = rel[[c for c in cols if c in rel.columns]]

    # --- One row per (appid, country); choose earliest release_time when available ---
    # sort_values places NaT last, so keep="first" prefers a known release time.
    if "release_time" in rel.columns:
        rel = (rel.sort_values(["appid", "country", "release_time"])
                 .drop_duplicates(["appid", "country"], keep="first"))
    else:
        rel = rel.drop_duplicates(["appid", "country"], keep="first")

    return rel



In [None]:
# --- Build and persist release_dim (restart-safe artifact) ---
# NOTE: the frame is bound to the name `rel` here, not `release_dim`. Module 4's
# ensure_release_dim() looks for a global called `release_dim`, so on a first
# pass it will load the parquet/CSV artifact written below instead.
rel = build_release_dim(df_prices)

# Basic hygiene
# Fail fast if any column Module 4 relies on is absent, or if the
# (appid, country) key is not unique (the later merge validates m:1).
need = {"appid","country","release_time","release_price","release_currency"}
assert need.issubset(set(rel.columns)), f"release_dim missing: {need - set(rel.columns)}"
dup = rel.duplicated(["appid","country"]).sum()
assert dup == 0, f"release_dim has {dup} duplicate (appid,country) keys"

# Persist both parquet (preferred) and csv (fallback)
rel.to_parquet("release_dim.parquet", index=False)
rel.to_csv("release_dim.csv", index=False)
print("[M1] release_dim saved -> release_dim.parquet (and csv); shape:", rel.shape)



# --- Build or recover `appids` for lookup (robust to kernel restarts) ---
# Try using an existing `appids`; if missing, rebuild from sellers CSV.
# NOTE(review): reusing a leftover `appids` from kernel state means an edited
# sellers CSV is not picked up until the kernel is restarted (or `del appids`).
try:
    appids  # bare name lookup: raises NameError when `appids` is not defined yet
except NameError:
    # Re-load sellers if needed (works even if M1 wasn't run)
    try:
        df_sellers
    except NameError:
        df_sellers = pd.read_csv("steam_top_sellers.csv")
        df_sellers.columns = [c.strip() for c in df_sellers.columns]

    # Normalize to string appids safely (handles floats like 12345.0)
    # Unparsable values are coerced to NaN and dropped before the int cast;
    # unique() then removes repeated appids.
    appids = (
        pd.to_numeric(df_sellers["appid"], errors="coerce")
          .dropna()
          .astype(int)
          .astype(str)
          .unique()
          .tolist()
    )

print(f"[M2] total appids for lookup: {len(appids)}")
assert len(appids) > 0, "No appids found for /lookup. Check steam_top_sellers.csv."


[M1] release_dim saved -> release_dim.parquet (and csv); shape: (513, 6)
[M2] total appids for lookup: 50


In [None]:
# =========================================================
# Module 2 — Map appid -> gid (Steam, shop_id=61)
# =========================================================



def lookup_gids_for_appids(appids: List[str], shop_id: int = STEAM_SHOP_ID,
                           batch: int = 100) -> Dict[str, str]:
    """
    Translate Steam appids into ITAD game ids via POST /lookup/id/shop/{shop_id}/v1.

    The appids are sent in slices of `batch` as "app/{appid}" strings. The JSON
    response is read as a mapping from those strings to a gid; entries with a
    falsy gid are left out of the result.

    Returns
    -------
    dict mapping appid (string, without the "app/" prefix) -> gid.

    Raises
    ------
    requests.HTTPError
        If any batch request returns an error status.
    """
    endpoint = f"{ITAD_BASE}/lookup/id/shop/{shop_id}/v1"
    gid_by_appid = {}
    batch_starts = range(0, len(appids), batch)
    for start in tqdm(batch_starts, desc="Lookup GIDs"):
        shop_keys = [f"app/{appid}" for appid in appids[start:start + batch]]
        resp = requests.post(endpoint, params={"key": ITAD_API_KEY},
                             json=shop_keys, timeout=30)
        resp.raise_for_status()
        for shop_key, gid in resp.json().items():
            appid = shop_key.split("/", 1)[1]  # "app/12345" -> "12345"
            if not gid:
                continue
            gid_by_appid[appid] = gid
        time.sleep(0.2)  # gentle throttle between batches
    return gid_by_appid

# Resolve every appid to its ITAD gid and tabulate the pairs that were found.
gid_map = lookup_gids_for_appids(appids)
df_gid = pd.DataFrame(list(gid_map.items()), columns=["appid", "gid"])
print("Mapped:", len(df_gid), "/", len(appids))



Lookup GIDs: 100%|██████████| 1/1 [00:01<00:00,  1.19s/it]

Mapped: 50 / 50





In [None]:
# =========================================================
# Module 3 — Pull ITAD history and flatten to raw events (Steam only)
# =========================================================

def _extract_events_list(data) -> list:
    """Tolerant extractor for event arrays nested under several possible keys."""
    if isinstance(data, dict):
        for k in ("data","history","list","events"):
            if k in data and isinstance(data[k], list):
                return data[k]
        return []
    return data or []

def _amount(d: Optional[dict]) -> Optional[float]:
    """Normalize ITAD amount fields to float or None."""
    if not isinstance(d, dict):
        return None
    if d.get("amount") is not None:
        return float(d["amount"])
    if d.get("amountInt") is not None:
        return float(d["amountInt"]) / 100.0
    return None

def fetch_history(gid: str, country: str, since_days: int = 365*12,
                  max_retries=3, backoff=0.8) -> list:
    """
    Fetch the price-history events of one game from GET /games/history/v2.

    Parameters
    ----------
    gid : ITAD game id.
    country : two-letter country code; sent lower-cased.
    since_days : how far back to ask for history, in days before now (UTC).
    max_retries : retries per request for transient failures.
    backoff : base delay in seconds; the wait doubles on every retry.

    Returns
    -------
    The event list extracted by `_extract_events_list`.

    Raises
    ------
    requests.HTTPError
        Non-retryable status, or a retryable one after `max_retries` retries.
    requests.ConnectionError, requests.Timeout
        Network failure that persists after `max_retries` retries.

    Notes
    -----
    Transient failures are retried with exponential backoff: HTTP 5xx, HTTP 429
    (rate limiting; a numeric Retry-After header is honored up to a cap), and
    connection errors / timeouts. Previously only 5xx was retried, so a single
    429 or dropped connection aborted the whole multi-minute pull.

    NOTE(review): when the country-specific query is empty, the request is
    repeated WITHOUT a country. The caller still labels those events with the
    requested `country`, so their prices/currency may belong to whatever region
    the API defaults to — confirm this fallback is wanted.
    """
    url = f"{ITAD_BASE}/games/history/v2"
    since = (
        datetime.now(timezone.utc) - timedelta(days=since_days)
    ).isoformat(timespec="seconds").replace("+00:00","Z")

    max_retry_after = 60.0  # never sleep longer than this on a server hint

    def _retry_after_seconds(resp) -> Optional[float]:
        """Return a numeric Retry-After header as seconds, else None."""
        if resp is None:
            return None
        raw = resp.headers.get("Retry-After")
        try:
            return min(max(float(raw), 0.0), max_retry_after)
        except (TypeError, ValueError):
            return None  # header missing or given as an HTTP date

    def do_req(params: dict) -> list:
        attempt = 0
        while True:
            try:
                r = requests.get(url, params=params, timeout=30)
                r.raise_for_status()
                return _extract_events_list(r.json())
            except requests.HTTPError as e:
                code = e.response.status_code if e.response is not None else None
                retryable = code is not None and (code == 429 or 500 <= code < 600)
                if not retryable or attempt >= max_retries:
                    raise
                delay = backoff * (2 ** attempt)
                hinted = _retry_after_seconds(e.response)
                if hinted is not None:
                    delay = max(delay, hinted)
            except (requests.ConnectionError, requests.Timeout):
                if attempt >= max_retries:
                    raise
                delay = backoff * (2 ** attempt)
            time.sleep(delay)
            attempt += 1

    # Try with country
    params = {"key": ITAD_API_KEY, "id": gid, "country": country.lower(), "since": since}
    events = do_req(params)
    # Fallback without country if empty
    if not events:
        params2 = {"key": ITAD_API_KEY, "id": gid, "since": since}
        events = do_req(params2)
    return events

def flatten_events(gid: str, country: str, events: list, appid: str) -> list:
    """
    Convert raw ITAD history events into flat row dicts.

    Each row has: appid, gid, country, timestamp (UTC, tz-naive), price,
    regular, cut, currency, shop_id. Events whose timestamp (looked up under
    "timestamp", then "recorded", then "time") cannot be parsed are skipped.
    A missing shop id is recorded as 0.
    """
    def _to_row(event):
        stamp = pd.to_datetime(
            event.get("timestamp") or event.get("recorded") or event.get("time"),
            utc=True, errors="coerce",
        )
        if pd.isna(stamp):
            return None
        deal_info = event.get("deal") or {}
        shop_info = event.get("shop") or {}
        sale_price = _amount(deal_info.get("price"))
        list_price = _amount(deal_info.get("regular"))
        raw_cut = deal_info.get("cut")
        discount = None if raw_cut is None else float(raw_cut)
        currency_code = (deal_info.get("price") or {}).get("currency")
        store_id = int(shop_info.get("id") or 0)
        return {
            "appid": str(appid),
            "gid": str(gid),
            "country": country,
            "timestamp": stamp.tz_convert(None),
            "price": float(sale_price) if sale_price is not None else None,
            "regular": float(list_price) if list_price is not None else None,
            "cut": discount,
            "currency": currency_code,
            "shop_id": store_id,
        }

    candidate_rows = (_to_row(event) for event in (events or []))
    return [row for row in candidate_rows if row is not None]

def collect_events_for_universe(df_gid: pd.DataFrame, countries: List[str]) -> pd.DataFrame:
    """
    Download and flatten history for every (game, country) combination.

    For each row of `df_gid` (columns `gid`, `appid`) and each code in
    `countries`, the history is fetched and flattened into rows. The combined
    table is then restricted to shop_id == STEAM_SHOP_ID, the country is
    upper-cased, and rows are sorted by (appid, country, timestamp). An empty
    frame is returned untouched when nothing was collected.
    """
    collected = []
    for _, game in tqdm(df_gid.iterrows(), total=len(df_gid), desc="Fetch events"):
        gid = game["gid"]
        appid = str(game["appid"])
        for country in countries:
            history = fetch_history(gid, country, since_days=365*12)
            collected.extend(flatten_events(gid, country, history, appid))

    events_all = pd.DataFrame(collected)
    if events_all.empty:
        return events_all

    steam_only = events_all.loc[events_all["shop_id"] == STEAM_SHOP_ID].copy()
    steam_only["country"] = steam_only["country"].str.upper()
    return steam_only.sort_values(["appid","country","timestamp"])

# Pull history for every mapped game in every configured country, then persist
# the raw result. This performs at least one HTTP request per (game, country)
# pair, so it is the slow step of the notebook; the CSV written below allows the
# events to be reloaded without calling the API again.
df_events = collect_events_for_universe(df_gid, COUNTRIES)
print("Raw events pulled (Steam only):", df_events.shape)
df_events.to_csv("steam_events_raw.csv", index=False)  # audit/reuse


Fetch events: 100%|██████████| 50/50 [08:42<00:00, 10.45s/it]


Raw events pulled (Steam only): (20416, 9)


In [None]:
# =========================================================
# Module 4 — Join release info & create light event labels
#   - SALE / BASE_PRICE_CHANGE / CURRENCY_CHANGE / OTHER
#   - No thresholding here and no static baseline price.
#   - Robust to pre-release drop and currency boundaries.
# =========================================================
# --- Module-4 bootstrap: ensure release_dim is available ---
import os, pandas as pd
import numpy as np
def ensure_release_dim(df_prices: pd.DataFrame = None) -> pd.DataFrame:
    """
    Return the release dimension table from the first available source.

    Lookup order:
      1. a module-level variable named `release_dim`, if one exists;
      2. `release_dim.parquet` in the working directory;
      3. `release_dim.csv` in the working directory (release_time parsed as dates);
      4. a rebuild from `df_prices` through `build_release_dim`, when provided.

    Raises FileNotFoundError when none of these is available.
    """
    def _announce(message: str, frame: pd.DataFrame) -> pd.DataFrame:
        # Report where the table came from, then hand it back unchanged.
        print(message, frame.shape)
        return frame

    # 1) in-memory copy wins
    if "release_dim" in globals():
        return globals()["release_dim"]

    # 2) persisted artifacts, parquet before csv
    if os.path.exists("release_dim.parquet"):
        return _announce("[M4] loaded release_dim.parquet:",
                         pd.read_parquet("release_dim.parquet"))
    if os.path.exists("release_dim.csv"):
        return _announce("[M4] loaded release_dim.csv:",
                         pd.read_csv("release_dim.csv", parse_dates=["release_time"]))

    # 3) nothing on disk: rebuild if the caller supplied the raw prices
    if df_prices is None:
        raise FileNotFoundError(
            "release_dim not in memory and no artifact on disk. "
            "Run Module 1 or pass df_prices to rebuild."
        )
    return _announce("[M4] rebuilt release_dim from df_prices:",
                     build_release_dim(df_prices))

# If you already have df_prices in memory, you can pass it; else leave None.
release_dim = ensure_release_dim(df_prices=None)

# Final sanity + sort for stable merges
# (appid is cast to str because the CSV fallback re-infers it as a number;
#  country is upper-cased/stripped so it matches the event keys.)
release_dim["appid"] = release_dim["appid"].astype(str)
release_dim["country"] = release_dim["country"].astype(str).str.upper().str.strip()
release_dim = release_dim.sort_values(["appid","country"])

print("[M4] release_dim ready. shape=", release_dim.shape)

# NOTE: the join with the events is intentionally NOT done here.
# attach_release_info() is defined further down in this cell, so calling it at
# this point raised NameError on a fresh kernel (Restart & Run All). The join is
# performed once, in the "From M3 to M4" section after the helper definitions.


# -----------------------------------------------
# 0) Join release info
# -----------------------------------------------
def attach_release_info(df_events: pd.DataFrame, release_dim: pd.DataFrame) -> pd.DataFrame:
    """
    Left-join release info onto the events by (appid, country).

    The event keys are normalized first (appid -> str, country -> upper-case
    str) on a copy, and the merge is validated as many-to-one. Afterwards, any
    release_* column that `release_dim` did not supply AT ALL is created:
      - release_time      -> NaT
      - release_price     -> NaN
      - release_currency  -> copy of the event `currency` column
      - release_country   -> copy of the event `country` column

    These are column-level fallbacks only: when the column exists but a row has
    no match in `release_dim`, that row's value stays missing.
    """
    events = df_events.copy()
    events["appid"] = events["appid"].astype(str)
    events["country"] = events["country"].astype(str).str.upper()

    joined = events.merge(release_dim, on=["appid", "country"], how="left", validate="m:1")

    # Column -> factory for its default; evaluated lazily, in this order, so the
    # added columns appear in the same order as before.
    column_defaults = (
        ("release_time", lambda: pd.NaT),
        ("release_price", lambda: np.nan),
        ("release_currency", lambda: joined.get("currency")),
        ("release_country", lambda: joined.get("country")),
    )
    for column, make_default in column_defaults:
        if column not in joined.columns:
            joined[column] = make_default()

    return joined


# -----------------------------------------------
# 1) Primary labeling (pre-drop, light rules)
# -----------------------------------------------
def label_events_raw(df_raw: pd.DataFrame) -> pd.DataFrame:
    """
    Boolean-only labeling of raw events, computed per (appid, country).

    Steps:
      0) Clean price/regular (≤0 treated as missing) and drop rows where both are NaN
      0b) Normalize currency fields to string to avoid tri-state booleans
      1) Flag sale
      2) Flag currency change (including group-start currency mismatch vs release currency)
      3) Base price change within same-currency stretches (initial pass; will be recomputed post-drop)
      4) Release-day flags
      5) Release-relative deltas (computed only when currencies match)

    Parameters
    ----------
    df_raw : events already joined with release info. The code reads the columns
        appid, country, timestamp, price, regular, cut, currency,
        release_currency, release_time and release_price.

    Returns
    -------
    A new DataFrame (the input is not modified), sorted by
    (appid, country, timestamp), with these columns added: is_sale,
    is_currency_change, is_currency_change_at_start, is_base_change,
    is_release_day, is_release_day_sale, is_release_day_base_change,
    days_since_release, delta_from_release_price, delta_pct_from_release.
    """
    gcols = ["appid", "country"]
    df = df_raw.copy()

    # --- 0) Clean + drop pure-noise rows ---
    for c in ("price", "regular"):
        df[c] = pd.to_numeric(df.get(c), errors="coerce")
        # Non-positive amounts are treated as "no value" rather than a real price.
        df.loc[df[c] <= 0, c] = pd.NA
    df = df[~(df["price"].isna() & df["regular"].isna())].copy()

    # --- 0b) Normalize currency fields to string (robust comparisons) ---
    # NOTE(review): astype(str) also stringifies missing values (e.g. NaN ->
    # "nan"), so the `.notna()` checks on `currency` / `release_currency` further
    # down are always True, and a row with no release currency compares as a
    # mismatch against the event currency. Confirm this is the intended rule.
    for c in ("currency", "release_currency"):
        if c in df.columns:
            df[c] = df[c].astype(str)

    # Stable ordering for groupwise logic
    df = df.sort_values(gcols + ["timestamp"], kind="mergesort").copy()

    # --- 1) Sale rule ---
    # A row is a sale when the reported cut is positive, or when both prices are
    # known and the current price is below the regular price.
    df["is_sale"] = (
        pd.to_numeric(df.get("cut"), errors="coerce").fillna(0) > 0
    ) | (
        df["price"].notna() & df["regular"].notna() & (df["price"] < df["regular"])
    )

    # --- 2) Currency change flags ---
    # shift() leaves the first row of every group without a previous currency,
    # so group heads are never flagged by this comparison alone.
    df["prev_currency"] = df.groupby(gcols, sort=False)["currency"].shift()
    df["is_currency_change"] = (
        df["currency"].notna() & df["prev_currency"].notna() &
        df["currency"].ne(df["prev_currency"])
    )

    # Group-start mismatch vs release currency -> mark as at_start
    df["_is_group_start"] = df.groupby(gcols, sort=False).cumcount().eq(0)
    df["is_currency_change_at_start"] = (
        df["_is_group_start"] &
        df["currency"].notna() &
        df.get("release_currency").notna() &
        df["currency"].ne(df["release_currency"])
    )
    # Consider group-start mismatch as currency change too (for completeness)
    df.loc[df["is_currency_change_at_start"], "is_currency_change"] = True

    # --- 3) Initial base price change (will be recomputed post-drop anyway) ---
    # Anchor method: remember the last non-sale regular price within a
    # same-currency stretch and flag a row when its regular price moves away
    # from that anchor by more than max(0.1, 1% of the anchor).
    df["is_base_change"] = False
    for (_, _), g in df.groupby(gcols, sort=False):
        anchor = None
        last_ccy = None
        for idx, r in g.iterrows():
            ccy = r["currency"]
            reg = r["regular"]

            # Reset on currency boundary or detected currency change
            if (last_ccy is None) or (ccy != last_ccy) \
               or bool(r["is_currency_change"]) or bool(r["is_currency_change_at_start"]):
                anchor = None
                last_ccy = ccy

            # Only consider non-sale rows with a valid regular price
            if (not bool(r["is_sale"])) and pd.notna(reg):
                if anchor is None:
                    anchor = float(reg)  # first anchor is not a change
                else:
                    thr = max(0.1, 0.01 * anchor)  # ignore tiny ticks
                    if abs(float(reg) - anchor) > thr:
                        df.at[idx, "is_base_change"] = True
                        anchor = float(reg)
                    else:
                        # Below threshold: not flagged, but the anchor still
                        # follows the latest regular price.
                        anchor = float(reg)

    # --- 4) Release-day flags ---
    # Both sides are truncated to midnight so the comparison is by calendar day.
    ts_day  = pd.to_datetime(df["timestamp"]).dt.normalize()
    rel_day = pd.to_datetime(df.get("release_time")).dt.normalize()
    df["is_release_day"] = rel_day.notna() & ts_day.eq(rel_day)
    df["is_release_day_sale"] = df["is_release_day"] & df["is_sale"]
    df["is_release_day_base_change"] = df["is_release_day"] & df["is_base_change"]

    # --- 5) Deltas vs release (only when currencies match) ---
    df["days_since_release"] = (ts_day - rel_day).dt.days
    same_ccy = df["currency"].eq(df.get("release_currency")).fillna(False)
    # NOTE(review): assumes `price` and `release_price` are expressed in the same
    # unit (both major units, or both minor units) — confirm against the inputs.
    df["delta_from_release_price"] = np.where(
        same_ccy, df["price"] - df["release_price"], np.nan
    )
    df["delta_pct_from_release"] = np.where(
        same_ccy & df["release_price"].gt(0).fillna(False),
        df["price"] / df["release_price"] - 1.0,
        np.nan
    )

    # Cleanup temp
    df.drop(columns=["prev_currency", "_is_group_start"], inplace=True, errors="ignore")
    return df


# -----------------------------------------------
# 2) Post-drop helpers
# -----------------------------------------------
def reflag_group_start_currency_mismatch(df: pd.DataFrame) -> pd.DataFrame:
    """
    Re-evaluate the start-of-group currency flags after rows were filtered out.

    Works on a copy sorted by (appid, country, timestamp). For the first row of
    every (appid, country) group, a currency different from the release currency
    sets both `is_currency_change_at_start` and `is_currency_change` to True.
    Flags that were already True are kept; missing flag columns are created.
    """
    keys = ["appid","country"]
    work = df.copy()
    for col in ("currency", "release_currency"):
        if col in work.columns:
            work[col] = work[col].astype(str)

    work = work.sort_values(keys + ["timestamp"], kind="mergesort").copy()

    # Head of each group in the sorted order.
    is_head = work.groupby(keys, sort=False).cumcount() == 0
    release_ccy = work.get("release_currency")
    head_mismatch = (
        is_head
        & work["currency"].notna()
        & release_ccy.notna()
        & (work["currency"] != work["release_currency"])
    )

    # OR the new finding into the at-start flag; never clear an existing True.
    if "is_currency_change_at_start" not in work.columns:
        work["is_currency_change_at_start"] = False
    work.loc[:, "is_currency_change_at_start"] = (
        work["is_currency_change_at_start"] | head_mismatch
    )

    # A mismatching head also counts as a plain currency change.
    if "is_currency_change" not in work.columns:
        work["is_currency_change"] = False
    work.loc[head_mismatch, "is_currency_change"] = True

    # Kept for parity: the scratch column name used by this step is never returned.
    return work.drop(columns=["_is_group_start_new"], errors="ignore")


def recompute_is_base_change(df, abs_thr=0.10, rel_thr=0.01) -> pd.DataFrame:
    """
    Recompute `is_base_change` within (appid, country) using an anchor-based logic:
      - Only evaluate on NON-sale rows with valid `regular`.
      - Reset anchor on currency boundary or explicit currency-change flags.
      - A change is flagged if |regular - anchor| > max(abs_thr, rel_thr * anchor).
      - The anchor always moves to the latest evaluated regular price, flagged or not.

    Parameters
    ----------
    df : events table; must contain `appid`, `country` and `timestamp`. The
        columns `regular`, `currency`, `is_sale`, `is_currency_change` and
        `is_currency_change_at_start` are used when present.
    abs_thr : minimum absolute move that counts as a change.
    rel_thr : minimum move relative to the anchor that counts as a change.

    Returns
    -------
    A copy sorted by (appid, country, timestamp) with a boolean `is_base_change`.

    Raises
    ------
    ValueError
        If `appid` or `country` is missing.

    Notes
    -----
    The flags are computed in one positional pass over the sorted rows instead
    of `groupby(...).apply(...)`. That avoids the pandas deprecation warning
    about apply operating on the grouping columns, and it also works for an
    empty frame and for a frame whose index contains duplicate labels.
    """
    d = df.copy()

    # Ensure numeric + sanitize non-positive regulars
    if "regular" in d.columns:
        d["regular"] = pd.to_numeric(d["regular"], errors="coerce")
        d.loc[d["regular"] <= 0, "regular"] = np.nan

    # Normalize currency fields to string
    for c in ("currency", "release_currency"):
        if c in d.columns:
            d[c] = d[c].astype(str)

    gcols = ["appid", "country"]
    if not set(gcols).issubset(d.columns):
        raise ValueError("Missing required grouping columns: appid/country")
    d = d.sort_values(gcols + ["timestamp"], kind="mergesort").copy()

    n_rows = len(d)

    def _values(name, default):
        """Column values as a plain list, or `default` repeated if absent."""
        return d[name].tolist() if name in d.columns else [default] * n_rows

    group_keys = list(zip(d["appid"].tolist(), d["country"].tolist()))
    currencies = _values("currency", None)
    regulars = _values("regular", None)
    sale_flags = _values("is_sale", False)
    change_flags = _values("is_currency_change", False)
    start_flags = _values("is_currency_change_at_start", False)

    flags = np.zeros(n_rows, dtype=bool)
    current_key = object()  # sentinel that equals no real (appid, country) key
    anchor = None
    last_ccy = None
    for pos in range(n_rows):
        # Rows are sorted by the group columns, so each group is contiguous:
        # a key change means a new group, which starts with a clean state.
        if group_keys[pos] != current_key:
            current_key = group_keys[pos]
            anchor = None
            last_ccy = None

        ccy = str(currencies[pos])
        ccy_changed = bool(change_flags[pos]) or bool(start_flags[pos])

        # Reset anchor on currency boundary or currency-change flags
        if (last_ccy is None) or (ccy != last_ccy) or ccy_changed:
            anchor = None
            last_ccy = ccy

        reg = regulars[pos]
        if bool(sale_flags[pos]) or pd.isna(reg):
            continue  # sale rows and rows without a regular price are ignored

        reg = float(reg)
        if anchor is not None and abs(reg - anchor) > max(abs_thr, rel_thr * anchor):
            flags[pos] = True
        anchor = reg  # the first anchor of a stretch is never a change

    d["is_base_change"] = flags
    return d


def neutralize_release_day_mismatch(df):
    """
    Handle release-day rows whose currency differs from the release currency.

    Such rows get `is_currency_change_at_start=True` (the column is created as
    False first if absent) and their `delta_from_release_price` /
    `delta_pct_from_release` values are set to NaN. If `is_release_day`,
    `currency` or `release_currency` is missing, an unchanged copy is returned.
    """
    out = df.copy()
    required = {"is_release_day", "currency", "release_currency"}
    if not required <= set(out.columns):
        return out

    for col in ("currency", "release_currency"):
        out[col] = out[col].astype(str)

    on_release_day = out["is_release_day"].fillna(False)
    mismatch = on_release_day & out["currency"].ne(out["release_currency"])

    if "is_currency_change_at_start" not in out.columns:
        out["is_currency_change_at_start"] = False
    out.loc[mismatch, "is_currency_change_at_start"] = True

    # Deltas are only meaningful within one currency, so blank them here.
    delta_cols = ("delta_from_release_price", "delta_pct_from_release")
    for col in (name for name in delta_cols if name in out.columns):
        out.loc[mismatch, col] = np.nan
    return out


# -----------------------------------------------
# From M3 to M4: attach + label, then persist
# (Only the post-drop pipeline has a small insertion)
# -----------------------------------------------

# 1) Attach release info
df_raw_enriched = attach_release_info(df_events, release_dim)

# 2) Primary labels (pre-drop)
df_raw_enriched = label_events_raw(df_raw_enriched)

# 3) Remove F2P rows (unchanged)
# Hard-coded appids treated as free-to-play and excluded from the analysis.
F2P_APPIDS = {"1085660","1222670","230410","236390","2767030","730"}
df_raw_enriched = df_raw_enriched[~df_raw_enriched["appid"].isin(F2P_APPIDS)].copy()

# 4) Drop pre-release rows (unchanged)
# Rows with an unknown release_time are kept.
df_raw_enriched = df_raw_enriched[
    df_raw_enriched["release_time"].isna() |
    (df_raw_enriched["timestamp"] >= df_raw_enriched["release_time"])
].copy()

# 5) NEW: Re-flag group-start currency mismatch AFTER the drop
#    (fixes the 36/51 start-of-group currency mismatch that were not flagged)
df_raw_enriched = reflag_group_start_currency_mismatch(df_raw_enriched)

# 6) Recompute base changes (anchor method) on the post-drop table (unchanged intent)
df_raw_enriched = recompute_is_base_change(df_raw_enriched, abs_thr=0.10, rel_thr=0.01)

# 7) (Optional) Neutralize deltas for release-day currency mismatch (unchanged)
df_raw_enriched = neutralize_release_day_mismatch(df_raw_enriched)

# 8) Persist (unchanged)
# The table is written and immediately read back, so everything downstream sees
# exactly what is on disk. NOTE(review): read_csv re-infers dtypes, so columns
# that were strings in memory (e.g. appid) may come back as numbers — later
# cells cast them with astype(str) where they need string keys.
df_raw_enriched.to_csv("steam_events_raw_enriched.csv", index=False)
df_raw_enriched = pd.read_csv("steam_events_raw_enriched.csv", parse_dates=["timestamp","release_time"])

# 9) Guardrails (unchanged)
# Every label column produced by Module 4 must have survived the round trip.
need = {
    "is_sale","is_currency_change","is_currency_change_at_start",
    "is_base_change","is_release_day","is_release_day_sale",
    "is_release_day_base_change","days_since_release",
    "delta_from_release_price","delta_pct_from_release",
}
missing = [c for c in need if c not in df_raw_enriched.columns]
assert not missing, f"M4 incomplete; missing: {missing}"

# (Optional) sanity print
# The first row of a group only seeds the anchor, so no group head should carry
# is_base_change=True; the printed count is expected to be 0.
head_idx = (
    df_raw_enriched.sort_values(["appid","country","timestamp"], kind="mergesort")
                   .groupby(["appid","country"]).head(1).index
)
print("First-row base_change AFTER recompute:",
      int(df_raw_enriched.loc[head_idx, "is_base_change"].sum()))

[M4] loaded release_dim.parquet: (513, 6)
[M4] release_dim ready. shape= (513, 6)


  .apply(_flags)


First-row base_change AFTER recompute: 0


In [None]:
# --- Sanity checks (should all be zero) ---
# Of the three printed numbers, `start_mismatch_not_flagged` and
# `delta_nonNaN_on_mismatch` are the ones expected to be 0;
# `start_mismatch_total` is informational.
d = df_raw_enriched.sort_values(["appid","country","timestamp"], kind="mergesort").copy()

# A) group-start currency mismatch should be flagged as at_start
# Currencies are compared as strings, so a missing value counts as a mismatch.
head = d.groupby(["appid","country"], sort=False).head(1).copy()
need = head["currency"].astype(str) != head["release_currency"].astype(str)
miss = need & ~head["is_currency_change_at_start"].fillna(False)
print("start_mismatch_total:", int(need.sum()))
print("start_mismatch_not_flagged:", int(miss.sum()))

# B) cross-currency rows must have NaN deltas
bad_delta = (d["currency"].astype(str) != d["release_currency"].astype(str)) & d["delta_from_release_price"].notna()
print("delta_nonNaN_on_mismatch:", int(bad_delta.sum()))


start_mismatch_total: 51
start_mismatch_not_flagged: 0
delta_nonNaN_on_mismatch: 0


In [None]:
# ============================================================
# Focused audit & optional removal for (2) ccy_mismatch and (5) price!=regular & cut==0
# - Comments in English
# - Safe saves with auto-mkdir; no scraping (only SteamDB URLs are composed)
# - Requires you already have df_raw_enriched from Module 4 and the helpers:
#   reflag_group_start_currency_mismatch, recompute_is_base_change
#   (If not, paste those helpers from your Module 4.)
# ============================================================
import pandas as pd
import numpy as np
from pathlib import Path

def add_steamdb_urls(df):
    """
    Return a copy of `df` with two SteamDB link columns built from
    `appid` (cast to str) and the lower-cased `country`:
      - steamdb_url: the app's main page
      - steamdb_price_url: the app's price page
    """
    out = df.copy()
    out["appid"] = out["appid"].astype(str)
    region = out["country"].astype(str).str.lower()
    app_root = "https://steamdb.info/app/" + out["appid"]
    out["steamdb_url"] = app_root + "/?cc=" + region
    out["steamdb_price_url"] = app_root + "/price/?cc=" + region
    return out

# ---------- 0) Utilities ----------
def _ensure_outdir():
    pref = Path("/mnt/data")
    out_dir = pref if pref.exists() else Path.cwd() / "outputs"
    out_dir.mkdir(parents=True, exist_ok=True)
    return out_dir

def _first_post_on_or_after_release(df: pd.DataFrame) -> pd.DataFrame:
    """Return the first row ON/AFTER release per (appid,country)."""
    d = df.copy()
    d["timestamp"] = pd.to_datetime(d["timestamp"], errors="coerce")
    for c in ("appid","country","currency","release_currency"):
        if c in d.columns:
            d[c] = d[c].astype(str)
    d = d.sort_values(["appid","country","timestamp"], kind="mergesort")
    if "days_since_release" not in d.columns:
        rel_day = pd.to_datetime(d["release_time"], errors="coerce").dt.normalize()
        ts_day  = pd.to_datetime(d["timestamp"],    errors="coerce").dt.normalize()
        d["days_since_release"] = (ts_day - rel_day).dt.days
    mask_post = d["days_since_release"].fillna(0).ge(0)
    first_post = d.loc[mask_post].groupby(["appid","country"], sort=False).head(1).copy()
    # Attach robust SteamDB links (lowercase cc + price tab)
    first_post = add_steamdb_urls(first_post)
    return first_post

def _extract_type_2_and_5(first_post: pd.DataFrame):
    """Build type-2 (ccy mismatch) and type-5 (price!=regular & cut==0) masks and tables."""
    tol = 1e-6
    same_ccy = first_post["currency"].astype(str) == first_post["release_currency"].astype(str)

    # (2) ccy_mismatch at the first post-release row
    t2_mask = ~same_ccy
    t2 = first_post.loc[t2_mask].copy()

    # (5) price != regular but cut==0  (possible hidden sale / mis-typed cut)
    #     Only evaluate when price & regular are present.
    t5_mask = (
        (first_post["cut"].fillna(0) <= 0) &
        first_post["price"].notna() & first_post["regular"].notna() &
        (first_post["price"] - first_post["regular"]).abs().gt(tol)
    )
    t5 = first_post.loc[t5_mask].copy()
    return t2, t5

def drop_first_post_rows(df_enriched: pd.DataFrame,
                         first_post_subset: pd.DataFrame,
                         confirm_col: str | None = None) -> pd.DataFrame:
    """
    Drop the rows of `df_enriched` whose (appid, country, timestamp) appear in
    `first_post_subset`, then refresh the derived flags.

    Parameters
    ----------
    df_enriched : events table; not modified (a copy is used).
    first_post_subset : rows to remove; must contain appid, country, timestamp.
        May come straight from memory or from a CSV that was edited and reloaded.
    confirm_col : optional column of `first_post_subset`; when given and present,
        only rows where it is truthy are dropped (manual curation).

    Returns
    -------
    The filtered table after reflag_group_start_currency_mismatch and
    recompute_is_base_change(abs_thr=0.10, rel_thr=0.01). Its appid/country are
    str and its timestamp is datetime, as in the other helpers of this notebook.
    """
    keys = ["appid","country","timestamp"]
    d = df_enriched.copy()

    # Optional manual confirmation column
    if confirm_col is not None and confirm_col in first_post_subset.columns:
        first_post_subset = first_post_subset[first_post_subset[confirm_col].fillna(False)]

    # Bring BOTH sides of the join to the same key dtypes. Without this, a subset
    # reloaded from CSV (int appid, parsed timestamp) and an in-memory frame
    # (str appid, possibly string timestamp) either fail to merge or match nothing.
    drop_keys = first_post_subset[keys].copy()
    for frame in (d, drop_keys):
        for c in ("appid","country"):
            frame[c] = frame[c].astype(str)
        frame["timestamp"] = pd.to_datetime(frame["timestamp"], errors="coerce")

    # NaT keys would match every NaT row on the left; they identify nothing, so skip them.
    # De-duplicate AFTER normalisation so e.g. 1 and "1" collapse to a single key.
    drop_keys = (drop_keys.dropna(subset=["timestamp"])
                          .drop_duplicates()
                          .assign(_drop=1))

    # Mark and drop
    d2 = d.merge(drop_keys, on=keys, how="left")
    hit = d2["_drop"].notna()
    removed = int(hit.sum())
    d2 = d2.loc[~hit].drop(columns=["_drop"])

    # Post-drop fixes: the group start may have moved, so the start-of-group flags
    # and base-change flags are recomputed.
    d2 = reflag_group_start_currency_mismatch(d2)
    d2 = recompute_is_base_change(d2, abs_thr=0.10, rel_thr=0.01)

    print(f"Dropped {removed} first-post rows; reflagged start-of-group FX; recomputed base changes.")
    return d2

# ---------- 1) Build the audit views for type (2) and (5) ----------
# Build the first-post table from the in-memory enriched events, then split it into
# the two audit subsets: (2) currency mismatch and (5) price != regular with no cut.
first_post = _first_post_on_or_after_release(df_raw_enriched)
t2_ccy_mismatch, t5_price_neq_reg_cut0 = _extract_type_2_and_5(first_post)

print(f"(2) ccy_mismatch: {len(t2_ccy_mismatch)} rows")
print(f"(5) price!=regular & cut==0: {len(t5_price_neq_reg_cut0)} rows")

# Save the two lists for manual audit (adds an optional 'confirm_delete' column placeholder)
out_dir = _ensure_outdir()
# Local-time stamp in the file names, so every run writes a new pair of CSVs.
ts = pd.Timestamp.now().strftime("%Y%m%d_%H%M%S")

t2_path = out_dir / f"audit_type2_ccy_mismatch_first_post_{ts}.csv"
t5_path = out_dir / f"audit_type5_price_ne_reg_cut0_first_post_{ts}.csv"

# Only the type-2 table gets the confirmation placeholder; type-5 is saved as-is.
if "confirm_delete" not in t2_ccy_mismatch.columns:
    t2_ccy_mismatch["confirm_delete"] = False  # you can edit this CSV and set True for rows you want to remove

# Columns written to the type-2 CSV, restricted to those actually present.
# NOTE(review): add_steamdb_urls sets "steamdb_price_url", which is not in this list,
# so that column is not exported here -- confirm whether it should be.
cols_keep = [c for c in [
    "appid","country","timestamp","currency","release_currency",
    "price","regular","release_price","cut",
    "is_sale","is_currency_change","is_currency_change_at_start","is_base_change",
    "is_release_day","days_since_release","steamdb_url","confirm_delete"
] if c in t2_ccy_mismatch.columns]

t2_ccy_mismatch.to_csv(t2_path, index=False, columns=cols_keep)
t5_price_neq_reg_cut0.to_csv(t5_path, index=False)

print("Saved audit CSVs:")
print(" -", t2_path)
print(" -", t5_path)

# ---------- 2) OPTION A: delete ALL type-2 rows immediately (no manual confirmation) ----------
# WARNING: this removes the first-post row for those (appid,country); use only if you are sure.
# df_clean = drop_first_post_rows(df_raw_enriched, t2_ccy_mismatch, confirm_col=None)

# ---------- 3) OPTION B: manual confirm, then delete only confirmed rows ----------
# After you edit the CSV (set confirm_delete=True where you verified mismatch on SteamDB),
# load it back and call drop_first_post_rows with confirm_col="confirm_delete".
# confirmed = pd.read_csv(t2_path, parse_dates=["timestamp"])
# df_clean = drop_first_post_rows(df_raw_enriched, confirmed, confirm_col="confirm_delete")

# ---------- 4) (optional) persist the cleaned table ----------
# out_clean = out_dir / f"steam_events_raw_enriched_cleaned_drop_t2_{ts}.csv"
# df_clean.to_csv(out_clean, index=False)
# print("Cleaned table saved to:", out_clean)


(2) ccy_mismatch: 51 rows
(5) price!=regular & cut==0: 0 rows
Saved audit CSVs:
 - /content/outputs/audit_type2_ccy_mismatch_first_post_20251107_062905.csv
 - /content/outputs/audit_type5_price_ne_reg_cut0_first_post_20251107_062905.csv


In [None]:
# --- PRE-REBASE SANITY CHECK (READ-ONLY) ---
# What countries/currencies would need FX, based on "post-release & currency!=release_currency"?

# Read-only check on a copy of the enriched events.
# Requires a 'days_since_release' column to already exist on df_raw_enriched.
d = df_raw_enriched.copy()
# Rows on/after release; a missing days_since_release is treated as 0 (i.e. post-release).
post = d['days_since_release'].fillna(0).ge(0)
# Event currency differs from the release currency (compared as strings).
mismatch = d['currency'].astype(str) != d['release_currency'].astype(str)

# Every post-release row that is in a different currency than the release price.
need_fx_view = d.loc[post & mismatch, ['appid','country','currency','release_currency']]

print("Rows needing FX by country:")
print(need_fx_view['country'].value_counts().head(20))

print("\nRelease_currency among rows needing FX:")
print(need_fx_view['release_currency'].value_counts().head(10))



Rows needing FX by country:
country
PL    1598
TR    1026
JP      80
Name: count, dtype: int64

Release_currency among rows needing FX:
release_currency
PLN    1598
USD    1026
JPY      80
Name: count, dtype: int64


In [None]:
import pandas as pd
import numpy as np

# ---------- you already have this; keep it as-is ----------
def detect_leading_fx_need(df: pd.DataFrame, scope={"TR","PL","JP"}) -> dict:
    """
    Find the "leading segment" of each (appid, country) that needs FX rebasing.

    A group qualifies when its first post-release row (days_since_release >= 0,
    missing counted as 0) is in a currency other than release_currency. Its
    leading segment is every post-release row before the first post-release row
    whose currency matches release_currency.

    `scope` limits the analysis to those country codes (None = all countries).

    Returns a dict with:
      rows_by_country    -- {country: number of rows in leading segments}
      groups_by_country  -- {country: number of (appid, country) groups affected}
      impacted_groups_df -- per group: release_currency and rows_in_leading_segment
      jp_sample_df       -- first 25 rows of impacted_groups_df for country "JP"
      leading_mask       -- boolean Series over the scoped, sorted rows
    """
    gcols = ["appid","country"]

    d = df.copy()
    for raw_col in ("timestamp", "release_time"):
        d[raw_col] = pd.to_datetime(d[raw_col], errors="coerce")
    d["ts_day"] = d["timestamp"].dt.normalize()
    d["rel_day"] = d["release_time"].dt.normalize()
    # always recomputed here, even if the column already exists on the input
    d["days_since_release"] = (d["ts_day"] - d["rel_day"]).dt.days
    for text_col in ("appid","country","currency","release_currency"):
        d[text_col] = d[text_col].astype(str)
    if scope is not None:
        d = d[d["country"].isin(scope)].copy()
    d = d.sort_values(gcols + ["timestamp"], kind="mergesort")

    # group directly on the two key columns
    group_keys = [d[col] for col in gcols]

    post = d["days_since_release"].fillna(0).ge(0)
    mismatch = d["currency"].astype(str) != d["release_currency"].astype(str)

    # running count of post-release rows per group; value 1 marks the first one
    nth_post = post.groupby(group_keys).cumsum()
    first_post_mismatch = post & (nth_post == 1) & mismatch
    needs_group = first_post_mismatch.groupby(group_keys).transform("max").astype(bool)

    # True from the first post-release row that is already in the release currency
    matched_so_far = (post & ~mismatch).groupby(group_keys).cumsum() > 0
    leading_seg_mask = post & needs_group & (~matched_so_far)

    leading_rows = d.loc[leading_seg_mask]
    impacted = (leading_rows[gcols + ["release_currency"]]
                .assign(one=1)
                .groupby(gcols + ["release_currency"], as_index=False)["one"].sum()
                .rename(columns={"one":"rows_in_leading_segment"})
                .sort_values(["country","rows_in_leading_segment","appid"],
                             ascending=[True,False,True]))

    return {
        "rows_by_country": leading_rows["country"].value_counts().to_dict(),
        "groups_by_country": (leading_rows[gcols]
                              .drop_duplicates()
                              .groupby("country").size().to_dict()),
        "impacted_groups_df": impacted,
        "jp_sample_df": impacted[impacted["country"]=="JP"].head(25),
        "leading_mask": leading_seg_mask,
    }

# --- FIXED: build daily FX over the FULL events date span ---
def build_fx_daily_fixed(ffx: pd.DataFrame,
                         events_df: pd.DataFrame,
                         two_sided_fill: bool = True,
                         alias_map: dict | None = None):
    """
    Make a COMPLETE daily USD-per-unit table for all currencies across the FULL
    [min(ts_day) .. max(ts_day)] span seen in events_df.
    This fixes the 'can't bfill earlier than the currency's own first date' problem.
    Returns: (fx_daily, report)
      - fx_daily: ['ccy','date','usd'] with no gaps if that ccy has at least 1 snapshot
      - report: {'span':(min_date,max_date), 'ccy_with_no_snapshot': [...]}
    """
    fx = ffx.copy()
    fx["date"] = pd.to_datetime(fx["date"], errors="coerce")
    fx["ccy"]  = fx["ccy"].astype(str).str.upper().str.strip()
    if alias_map:
        fx["ccy"] = fx["ccy"].replace(alias_map)

    fx = fx.dropna(subset=["date","ccy","usd_per_unit"]).sort_values(["ccy","date"], kind="mergesort")

    # global span from events_df
    ev = events_df.copy()
    ev["timestamp"] = pd.to_datetime(ev["timestamp"], errors="coerce")
    span_min = ev["timestamp"].min().normalize()
    span_max = ev["timestamp"].max().normalize()
    full_idx = pd.date_range(span_min, span_max, freq="D")

    # build per-ccy on the FULL index
    chunks = []
    ccy_with_no_snapshot = []
    for ccy, g in fx.groupby("ccy", sort=False):
        if g.empty:
            ccy_with_no_snapshot.append(ccy)
            continue
        s = g.set_index("date")["usd_per_unit"].sort_index()
        s = s.reindex(full_idx)             # <-- extend to FULL span
        s = s.ffill()
        if two_sided_fill:
            s = s.bfill()
        df_ccy = pd.DataFrame({"ccy": ccy, "date": s.index, "usd": s.values})
        # If even after ffill+bfill it's all NaN, mark as no snapshot
        if not df_ccy["usd"].notna().any():
            ccy_with_no_snapshot.append(ccy)
        chunks.append(df_ccy)

    fx_daily = pd.concat(chunks, ignore_index=True) if chunks else pd.DataFrame(columns=["ccy","date","usd"])
    report = {"span": (span_min, span_max), "ccy_with_no_snapshot": sorted(set(ccy_with_no_snapshot))}
    return fx_daily, report

    return fx_daily

# --- Strict rebase using a PREBUILT fx_daily (no manual FX editing) ---
def fx_rebase_on_detected_prebuilt(df_enriched: pd.DataFrame,
                                   fx_daily: pd.DataFrame,
                                   detected: dict,
                                   scope_countries: set[str] | None = {"TR","PL","JP"}) -> pd.DataFrame:
    """
    Same semantics as your fx_rebase_on_detected, but consumes a prebuilt fx_daily ['ccy','date','usd'].
    """
    d = df_enriched.copy()
    d["timestamp"] = pd.to_datetime(d["timestamp"], errors="coerce")
    d["release_time"] = pd.to_datetime(d["release_time"], errors="coerce")
    d["ts_day"] = d["timestamp"].dt.normalize()
    if "days_since_release" not in d.columns:
        d["days_since_release"] = (d["ts_day"] - d["release_time"].dt.normalize()).dt.days
    for c in ("appid","country","currency","release_currency"):
        d[c] = d[c].astype(str).str.upper().str.strip()

    mask = detected["leading_mask"]
    if getattr(mask, "index", None) is not d.index:
        mask = mask.reindex(d.index, fill_value=False)
    if scope_countries is not None:
        mask = mask & d["country"].isin(scope_countries)

    if "note_day0_fx_needed" not in d.columns:
        d["note_day0_fx_needed"] = False
    if "note_day0_fx_rebased" not in d.columns:
        d["note_day0_fx_rebased"] = False
    d.loc[mask, "note_day0_fx_needed"] = True

    if not bool(mask.any()):
        print("[Info] No rows to rebase under current scope.")
        return d

    # Map FX from prebuilt grid
    left = d.loc[mask].copy()
    left["_row_id"] = left.index
    ev = fx_daily.rename(columns={"ccy":"currency","usd":"usd_per_event"})
    rl = fx_daily.rename(columns={"ccy":"release_currency","usd":"usd_per_release"})
    left = (left.merge(ev, how="left", left_on=["currency","ts_day"], right_on=["currency","date"]).drop(columns=["date"])
                 .merge(rl, how="left", left_on=["release_currency","ts_day"], right_on=["release_currency","date"]).drop(columns=["date"]))

    ok = left["usd_per_event"].gt(0) & left["usd_per_release"].gt(0)
    left_ok = left.loc[ok].copy()

    ratio = left_ok["usd_per_event"] / left_ok["usd_per_release"]
    for col in ("price","regular"):
        if col in left_ok.columns:
            left_ok[col] = pd.to_numeric(left_ok[col], errors="coerce") * ratio

    left_ok["currency"] = left_ok["release_currency"]
    left_ok["note_day0_fx_rebased"] = True

    row_ids = left_ok["_row_id"].to_numpy()
    d.loc[row_ids, ["price","regular","currency","note_day0_fx_rebased"]] = \
        left_ok[["price","regular","currency","note_day0_fx_rebased"]].to_numpy()

    # light relabel + sanity (same as你的版本)
    d["is_sale"] = (pd.to_numeric(d.get("cut"), errors="coerce").fillna(0) > 0) | (
        d["price"].notna() & d["regular"].notna() & (d["price"] < d["regular"])
    )
    same_ccy = d["currency"] == d["release_currency"]
    d["delta_from_release_price"] = np.where(same_ccy,
        pd.to_numeric(d["price"], errors="coerce") - pd.to_numeric(d["release_price"], errors="coerce"),
        np.nan)
    d["delta_pct_from_release"] = np.where(same_ccy & pd.to_numeric(d["release_price"], errors="coerce").gt(0),
        pd.to_numeric(d["price"], errors="coerce") / pd.to_numeric(d["release_price"], errors="coerce") - 1.0,
        np.nan)

    d_chk = d.sort_values(["appid","country","timestamp"], kind="mergesort")
    fp = d_chk[d_chk["days_since_release"].fillna(0).ge(0)].groupby(["appid","country"], sort=False).head(1)
    ok_first = bool((fp["currency"] == fp["release_currency"]).all())
    print(f"[Sanity] first-post matches release_currency: {ok_first}")
    print(f"[Sanity] rebased rows: {len(row_ids)} | changed countries: {dict(d.loc[row_ids, 'country'].value_counts())}")
    return d




In [None]:
# Hand-entered FX snapshots: (date, currency code, USD per one unit of that currency).
# Three snapshot dates x five currencies; build_fx_daily_fixed fills the days in between.
# NOTE(review): values look like rounded, manually typed rates -- confirm the source.

rows = [
    ("2023-08-03","USD",1.0000), ("2023-08-03","EUR",1.1000), ("2023-08-03","PLN",0.2500), ("2023-08-03","TRY",0.0350), ("2023-08-03","JPY",0.0070),
    ("2024-06-01","USD",1.0000), ("2024-06-01","EUR",1.0800), ("2024-06-01","PLN",0.2600), ("2024-06-01","TRY",0.0300), ("2024-06-01","JPY",0.0068),
    ("2025-06-01","USD",1.0000), ("2025-06-01","EUR",1.0700), ("2025-06-01","PLN",0.2550), ("2025-06-01","TRY",0.0280), ("2025-06-01","JPY",0.0069),
]
fx_df = pd.DataFrame(rows, columns=["date","ccy","usd_per_unit"])
fx_df["date"] = pd.to_datetime(fx_df["date"])
# Countries whose leading segments are examined and rebased.
scope = {"TR","PL","JP"}
# Detect the leading (pre-match) segment per (appid, country) within the scope.
fx_need = detect_leading_fx_need(df_raw_enriched, scope=scope)

# Prebuild FX DAILY over FULL event span (this is the key fix)
fx_daily_fixed, fx_report = build_fx_daily_fixed(fx_df, df_raw_enriched, two_sided_fill=True)
print("[FX span]", fx_report["span"])
print("[Currencies with NO snapshot at all]", fx_report["ccy_with_no_snapshot"])

# Rebase using the prebuilt grid (no manual edits)
df_rebased = fx_rebase_on_detected_prebuilt(
    df_enriched=df_raw_enriched,
    fx_daily=fx_daily_fixed,
    detected=fx_need,
    scope_countries=scope
)

# Save with your fixed name (no timestamp)
# The file name is fixed, so each run overwrites the previous export.
df_rebased.to_csv("steam_events_raw_enriched_rebased_fx.csv", index=False)
print("[SAVE] -> steam_events_raw_enriched_rebased_fx.csv")



[FX span] (Timestamp('2015-11-06 00:00:00'), Timestamp('2025-11-06 00:00:00'))
[Currencies with NO snapshot at all] []
[Sanity] first-post matches release_currency: True
[Sanity] rebased rows: 2704 | changed countries: {'PL': np.int64(1598), 'TR': np.int64(1026), 'JP': np.int64(80)}
[SAVE] -> steam_events_raw_enriched_rebased_fx.csv


In [None]:
# Verify where 3354750 appears
import pandas as pd

# The appid is kept as a string; each table's appid column is cast to str before comparing.
appid = "3354750"
# All three files are read from the current working directory.
sellers = pd.read_csv("steam_top_sellers.csv")
prices  = pd.read_csv("steam_prices.csv")
events  = pd.read_csv("steam_events_raw.csv")  # or use your in-memory df_events

# True/False per table: does at least one row carry this appid?
print("in sellers:", (sellers["appid"].astype(str) == appid).any())
print("in prices:",  (prices["appid"].astype(str)  == appid).any())
print("in events:",  (events["appid"].astype(str)  == appid).any())


in sellers: True
in prices: False
in events: False


In [None]:
# --- Why is appid 3354750 missing? Quick triage ---

import pandas as pd

# appid under investigation, as a string (table columns are cast to str before comparing)
appid = "3354750"

# Load what you already produced
df_events  = pd.read_csv("steam_events_raw.csv")                 # M3 output
df_sellers = pd.read_csv("steam_top_sellers.csv")                # seed list
# If df_gid exists in memory from M2, use it; otherwise load if you saved it.
# df_gid should have columns: ['appid','gid']
# If you never saved df_gid, re-run your M2 lookup for this single app.

print("in sellers:", (df_sellers["appid"].astype(str) == appid).any())
print("in events :", (df_events["appid"].astype(str)  == appid).any())

# df_gid is not created in this cell; it only exists if an earlier cell left it in the
# kernel. Referencing the bare name raises NameError when it is absent, which is
# caught below and reported instead of failing the cell.
try:
    df_gid
    has_map = (df_gid["appid"].astype(str) == appid).any()
    print("in gid map:", has_map)
    if has_map:
        # show the appid -> gid mapping row(s) for this app
        print(df_gid.loc[df_gid["appid"].astype(str)==appid])
    else:
        print("Likely cause: /lookup did not return a gid for this appid.")
except NameError:
    print("df_gid not in memory. If needed, re-run your M2 lookup for this appid to confirm mapping.")


in sellers: True
in events : False
in gid map: True
      appid                                   gid
39  3354750  018d937f-4b80-737f-9a8a-409b0056f110


In [None]:
# --- Robust one-off probe for a single GID (handles dict OR list JSON shapes) ---

import os, requests, pandas as pd
from datetime import datetime, timedelta, timezone

# Base URL of the IsThereAnyDeal API; endpoint paths are appended to it.
ITAD_BASE = "https://api.isthereanydeal.com"
# Read from the environment; this is None when the variable is unset.
ITAD_API_KEY = os.environ.get("ITAD_API_KEY")  # make sure it's set
# Shop id that probe_history counts as "Steam events".
STEAM_SHOP_ID = 61

def _extract_events_list(data):
    """Return a list of events regardless of JSON envelope shape."""
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        for k in ("data", "history", "list", "events"):
            v = data.get(k)
            if isinstance(v, list):
                return v
        return []
    return []

def probe_history(gid: str, countries=("US","GB","DE","JP","CN"), days=365*12) -> pd.DataFrame:
    """
    Query {ITAD_BASE}/games/history/v2 for one game id, once per country in
    `countries` plus one final request without a country, and collect the
    returned events into a single table.

    `days` sets how far back the `since` parameter reaches from now (UTC).
    Prints the total event count, the count for STEAM_SHOP_ID, a per-shop
    breakdown and the first 10 rows. Returns the combined DataFrame (empty
    when no request returned events). An HTTP error status on any request
    raises via raise_for_status().
    """
    assert ITAD_API_KEY, "ITAD_API_KEY is not set in environment."

    def _money_amount(money):
        # prefer amountInt (divided by 100) when present, else the plain amount
        minor_units = money.get("amountInt")
        return money.get("amount") if minor_units is None else minor_units / 100.0

    def _event_row(event, country_label):
        shop = event.get("shop") or {}
        deal = event.get("deal") or {}
        price = deal.get("price") or {}      # missing/None becomes an empty dict
        regular = deal.get("regular") or {}
        return {
            "ts": event.get("timestamp") or event.get("recorded") or event.get("time"),
            "shop_id": int(shop.get("id") or 0),
            "shop_name": shop.get("name"),
            "ccy": price.get("currency"),
            "price_amt": _money_amount(price),
            "regular_amt": _money_amount(regular),
            "cut": deal.get("cut"),
            "query_country": country_label,
        }

    endpoint = f"{ITAD_BASE}/games/history/v2"
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    since = cutoff.isoformat(timespec="seconds").replace("+00:00","Z")

    frames = []
    for cc in [*countries, None]:  # the trailing None is the request without a country
        query = {"key": ITAD_API_KEY, "id": gid, "since": since}
        if cc:
            query["country"] = cc.lower()
        resp = requests.get(endpoint, params=query, timeout=30)
        resp.raise_for_status()
        events = _extract_events_list(resp.json())
        if events:
            label = cc or "no-country"
            frames.append(pd.DataFrame([_event_row(e, label) for e in events]))

    if frames:
        out = pd.concat(frames, ignore_index=True)
    else:
        out = pd.DataFrame()
    if out.empty:
        print("No events returned from API for this gid.")
        return out

    # Normalize and summarize
    out["shop_id"] = pd.to_numeric(out["shop_id"], errors="coerce").fillna(0).astype(int)
    steam_cnt = int((out["shop_id"] == STEAM_SHOP_ID).sum())
    print(f"Total events: {len(out)} | Steam events: {steam_cnt}")
    print("By shop_id:", out["shop_id"].value_counts().to_dict())
    print(out.head(10))
    return out


# Example: pass the gid you printed from your df_gid table
# (this is the gid the triage cell's output shows for appid 3354750).
# As the last expression of the cell, the returned DataFrame is also displayed.
probe_history("018d937f-4b80-737f-9a8a-409b0056f110")


Total events: 11 | Steam events: 0
By shop_id: {16: 6, 52: 5}
                          ts  shop_id        shop_name  ccy  price_amt  regular_amt  cut query_country
0  2025-09-16T19:48:25+02:00       16  Epic Game Store  USD        0.0          0.0    0            US
1  2025-09-16T19:11:10+02:00       52         EA Store  USD        0.0          0.0    0            US
2  2025-09-16T20:02:32+02:00       16  Epic Game Store  GBP        0.0          0.0    0            GB
3  2025-09-16T19:50:37+02:00       52         EA Store  GBP        0.0          0.0    0            GB
4  2025-09-16T20:16:01+02:00       16  Epic Game Store  EUR        0.0          0.0    0            DE
5  2025-09-16T19:11:19+02:00       52         EA Store  EUR        0.0          0.0    0            DE
6  2025-09-16T19:51:46+02:00       52         EA Store  JPY        0.0          0.0    0            JP
7  2025-09-16T19:50:49+02:00       16  Epic Game Store  JPY        0.0          0.0    0            JP
8  2025-09-

Unnamed: 0,ts,shop_id,shop_name,ccy,price_amt,regular_amt,cut,query_country
0,2025-09-16T19:48:25+02:00,16,Epic Game Store,USD,0.0,0.0,0,US
1,2025-09-16T19:11:10+02:00,52,EA Store,USD,0.0,0.0,0,US
2,2025-09-16T20:02:32+02:00,16,Epic Game Store,GBP,0.0,0.0,0,GB
3,2025-09-16T19:50:37+02:00,52,EA Store,GBP,0.0,0.0,0,GB
4,2025-09-16T20:16:01+02:00,16,Epic Game Store,EUR,0.0,0.0,0,DE
...,...,...,...,...,...,...,...,...
6,2025-09-16T19:51:46+02:00,52,EA Store,JPY,0.0,0.0,0,JP
7,2025-09-16T19:50:49+02:00,16,Epic Game Store,JPY,0.0,0.0,0,JP
8,2025-09-16T19:55:57+02:00,16,Epic Game Store,CNY,0.0,0.0,0,CN
9,2025-09-16T19:48:25+02:00,16,Epic Game Store,USD,0.0,0.0,0,no-country


In [None]:
import numpy as np
import pandas as pd

# ---------- Utility: coalesce duplicates by (appid, country, timestamp) ----------
def coalesce_by_key(df: pd.DataFrame,
                    gcols=("appid","country"),
                    time_col="timestamp",
                    keep="last") -> pd.DataFrame:
    """
    Reduce the table to one row per (*gcols, time_col).

    Works on a copy: `time_col` is parsed to datetime (unparseable -> NaT) and
    the group columns are cast to str. Rows are stably sorted by the key and
    `keep` ("first"/"last") decides which of the duplicates survives.
    """
    group_cols = list(gcols)
    dedup_key = group_cols + [time_col]

    frame = df.copy()
    frame[time_col] = pd.to_datetime(frame[time_col], errors="coerce")
    for col in group_cols:
        frame[col] = frame[col].astype(str)

    # mergesort is stable, so "first"/"last" refer to the incoming row order
    ordered = frame.sort_values(dedup_key, kind="mergesort")
    return ordered.drop_duplicates(subset=dedup_key, keep=keep)

# ---------- Tolerant sale_state (unchanged) ----------
def compute_sale_state(df: pd.DataFrame,
                       tol_abs: float = 1.0,
                       tol_pct: float = 0.01,
                       price_col: str = "price",
                       reg_col: str = "regular") -> pd.Series:
    """
    Return a boolean Series marking rows that count as "in sale".

    A row is in sale when price and regular are both numeric, regular > 0 and
    (regular - price) exceeds the larger of `tol_abs` and `tol_pct * regular`.
    If the frame has a 'cut' column, any row with cut > 0 is also in sale.
    """
    price = pd.to_numeric(df[price_col], errors="coerce")
    regular = pd.to_numeric(df[reg_col], errors="coerce")

    usable = price.notna() & regular.notna() & regular.gt(0)
    # the discount must clear the absolute AND the relative tolerance
    threshold = np.maximum(tol_abs, tol_pct * regular)
    in_sale = usable & (regular - price).gt(threshold)

    if "cut" in df.columns:
        cut = pd.to_numeric(df["cut"], errors="coerce").fillna(0)
        in_sale = in_sale | cut.gt(0)
    return in_sale.fillna(False)

# ---------- FIXED: Build SALE episodes (use group.name for keys) ----------
# --- FIXED: use numpy timedelta division to compute days ---
def build_episodes_with_end_event(df: pd.DataFrame,
                                  gcols=("appid","country"),
                                  time_col="timestamp",
                                  tol_abs: float = 1.0,
                                  tol_pct: float = 0.01) -> pd.DataFrame:
    """
    Scan the tolerant sale_state of each (gcols[0], gcols[1]) group in time order
    and emit one row per sale episode.

    Output columns:
      <gcols[0]>, <gcols[1]>,
      start_ts                -- first in-sale row of the episode
      end_last_sale_ts        -- last in-sale row of the episode
      end_event_ts            -- first not-in-sale row after it (NaT if the group
                                 ends while still in sale)
      duration_days_in_sale   -- days from start_ts to end_last_sale_ts
      duration_days_inclusive -- days from start_ts to end_event_ts (falls back to
                                 duration_days_in_sale when there is no end row)
      ep_id                   -- 1-based episode counter within the group

    Exactly two group columns are expected. When no episode is found, an empty
    frame with the same columns is returned.
    NOTE: durations are computed via numpy timedelta division to avoid
    .total_seconds() on numpy.timedelta64.
    """
    d = df.copy()
    d[time_col] = pd.to_datetime(d[time_col], errors="coerce")
    d[gcols[0]] = d[gcols[0]].astype(str).str.upper().str.strip()
    d[gcols[1]] = d[gcols[1]].astype(str).str.upper().str.strip()
    d = d.sort_values(list(gcols)+[time_col], kind="mergesort")

    d["sale_state"] = compute_sale_state(d, tol_abs=tol_abs, tol_pct=tol_pct)

    one_day = np.timedelta64(1, "D")

    def _as_ms(stamp):
        # timestamps are stored at millisecond resolution in the episode records
        return stamp.astype("datetime64[ns]").astype("datetime64[ms]")

    out = []
    for (a,c), g in d.groupby(list(gcols), sort=False):
        ss = g["sale_state"].to_numpy()
        # ensure numpy datetime64[ns] so that differences are numpy.timedelta64
        ts = g[time_col].to_numpy(dtype="datetime64[ns]")

        in_sale, start_idx = False, None
        for i in range(len(g)):
            if (not in_sale) and ss[i]:
                in_sale, start_idx = True, i
            elif in_sale and (not ss[i]):
                # row i is the explicit end event; the sale itself ended on row i-1
                end_last_sale_idx = i - 1
                out.append({
                    gcols[0]: a, gcols[1]: c,
                    "start_ts": _as_ms(ts[start_idx]),
                    "end_last_sale_ts": _as_ms(ts[end_last_sale_idx]),
                    "end_event_ts": _as_ms(ts[i]),
                    "duration_days_in_sale": float((ts[end_last_sale_idx] - ts[start_idx]) / one_day),
                    "duration_days_inclusive": float((ts[i] - ts[start_idx]) / one_day),
                })
                in_sale, start_idx = False, None
        if in_sale:
            # group ends while still in sale: there is no explicit end row
            end_last_sale_idx = len(g) - 1
            dur_in_sale = float((ts[end_last_sale_idx] - ts[start_idx]) / one_day)
            out.append({
                gcols[0]: a, gcols[1]: c,
                "start_ts": _as_ms(ts[start_idx]),
                "end_last_sale_ts": _as_ms(ts[end_last_sale_idx]),
                "end_event_ts": pd.NaT,
                "duration_days_in_sale": dur_in_sale,
                "duration_days_inclusive": dur_in_sale,  # fallback when no explicit end row
            })

    # Name the columns explicitly: with zero episodes the frame would otherwise have
    # no columns at all and the sort below would raise KeyError.
    episode_cols = [gcols[0], gcols[1], "start_ts", "end_last_sale_ts", "end_event_ts",
                    "duration_days_in_sale", "duration_days_inclusive"]
    episodes = pd.DataFrame(out, columns=episode_cols).sort_values(list(gcols)+["start_ts"], kind="mergesort")
    episodes["ep_id"] = (episodes.groupby(list(gcols), sort=False).cumcount()+1).astype(int)
    # convert numpy datetime64 back to pandas Timestamps for downstream merges
    for col in ["start_ts", "end_last_sale_ts", "end_event_ts"]:
        if col in episodes.columns:
            episodes[col] = pd.to_datetime(episodes[col], errors="coerce")
    return episodes


# (Optional) Backward-compat alias if some old code still calls this name:
# def build_episodes_fsm_from_df(*args, **kwargs):
#     return build_episodes_with_end_event(*args, **kwargs)


# ---------- FIXED: Annotate starts-only with dedup and safe coalescing ----------
def annotate_starts_duration_only(df_rebased: pd.DataFrame,
                                  episodes: pd.DataFrame,
                                  gcols=("appid","country"),
                                  time_col="timestamp",
                                  keep_dup="first",
                                  strict=False) -> pd.DataFrame:
    """
    Return only the episode-start rows as
    [appid, country, <time_col>, sale_ep_duration_days].

    `df_rebased` is first coalesced to one row per (*gcols, time_col) (`keep_dup`
    picks the survivor), then inner-joined to the episode starts.

    The duration is read from `episodes["duration_days"]` when that column exists
    (older episode tables); otherwise from `episodes["duration_days_inclusive"]`,
    which is what build_episodes_with_end_event produces.

    Raises KeyError if `episodes` has neither duration column, and
    AssertionError when `strict` is set and duplicate keys remain.
    """
    duration_col = next((name for name in ("duration_days", "duration_days_inclusive")
                         if name in episodes.columns), None)
    if duration_col is None:
        raise KeyError("episodes needs a 'duration_days' or 'duration_days_inclusive' column.")

    base = coalesce_by_key(df_rebased, gcols=gcols, time_col=time_col, keep=keep_dup)
    key = (episodes[[*gcols, "start_ts", duration_col]]
           .rename(columns={"start_ts": time_col, duration_col: "sale_ep_duration_days"}))
    # m:1 -- every event row may match at most one episode start
    starts_only = base.merge(key, on=[*gcols, time_col], how="inner", validate="m:1")

    keep_cols = ["appid","country",time_col,"sale_ep_duration_days"]
    keep_cols = [c for c in keep_cols if c in starts_only.columns]
    starts_only = (starts_only[keep_cols]
                   .sort_values([*gcols, time_col], kind="mergesort")
                   .drop_duplicates(subset=[*gcols, time_col], keep="first")
                   .reset_index(drop=True))

    if strict and starts_only.duplicated([*gcols, time_col]).any():
        raise AssertionError("Duplicate (appid,country,timestamp) remain in starts_only.")
    return starts_only

# === Make durations on df_rebased and drop explicit end events ===

# --- 2) Merge inclusive duration back and DROP explicit end-event rows ---
def make_events_with_duration(df_rebased: pd.DataFrame,
                              tol_abs=1.0, tol_pct=0.01,
                              gcols=("appid","country"), time_col="timestamp",
                              add_is_start=True, drop_ends=True,
                              coalesce_out=True, keep_out="last"):
    """
    Attach the INCLUSIVE episode duration to the episode-start rows of
    `df_rebased` and optionally remove the explicit end-event rows.

    Parameters
    ----------
    tol_abs, tol_pct : tolerances forwarded to build_episodes_with_end_event.
    gcols, time_col  : group and time columns.
    add_is_start     : also add a boolean 'sale_ep_is_start' column.
    drop_ends        : drop rows whose timestamp equals an episode's end_event_ts,
                       unless the same row is also an episode start.
    coalesce_out     : keep one row per (*gcols, time_col) in the output;
                       `keep_out` picks the survivor.

    Returns
    -------
    (events, episodes, removed_ends)
      events       -- df_rebased plus 'sale_ep_duration_days' (NaN on non-start rows)
      episodes     -- table from build_episodes_with_end_event
      removed_ends -- number of end-event rows dropped

    NOTE(review): episodes are built from the un-coalesced input while the output
    may be coalesced; confirm this is intended when duplicate timestamps occur.
    """
    episodes = build_episodes_with_end_event(df_rebased, gcols=gcols, time_col=time_col,
                                             tol_abs=tol_abs, tol_pct=tol_pct)

    out = df_rebased.copy()
    out[time_col] = pd.to_datetime(out[time_col], errors="coerce")
    for c in gcols:
        out[c] = out[c].astype(str).str.upper().str.strip()
    if coalesce_out:
        out = (out.sort_values(list(gcols)+[time_col], kind="mergesort")
                 .drop_duplicates(subset=[*gcols, time_col], keep=keep_out))

    # write back INCLUSIVE duration to starts
    start_key = (episodes[[*gcols, "start_ts", "duration_days_inclusive"]]
                 .rename(columns={"start_ts": time_col,
                                  "duration_days_inclusive":"sale_ep_duration_days"}))
    out = out.merge(start_key, on=[*gcols, time_col], how="left", validate="m:1")

    if add_is_start:
        out["sale_ep_is_start"] = out["sale_ep_duration_days"].notna()

    removed_ends = 0
    if drop_ends:
        end_key = (episodes[[*gcols, "end_event_ts"]].dropna()
                   .rename(columns={"end_event_ts": time_col})
                   .assign(__is_end=True))
        out = out.merge(end_key, on=[*gcols, time_col], how="left", validate="m:1")
        out["__is_end"] = out["__is_end"].astype("boolean").fillna(False)
        # Derive "is a start" from the duration column itself, so this also works
        # when add_is_start=False and 'sale_ep_is_start' was never created.
        is_start = out["sale_ep_duration_days"].notna()
        pure_end_mask = out["__is_end"] & ~is_start
        removed_ends = int(pure_end_mask.sum())
        out = out.loc[~pure_end_mask].drop(columns=["__is_end"])

    return out, episodes, removed_ends




def build_and_save_events_with_duration(df_rebased: pd.DataFrame,
                                        tol_abs=1.0, tol_pct=0.01,
                                        gcols=("appid","country"), time_col="timestamp",
                                        out_events_csv="events_with_duration_no_end.csv",
                                        out_episodes_csv="episodes_duration_only.csv",
                                        out_starts_csv="starts_only_duration.csv"):
    """
    Produce three outputs:
      - events_with_duration_no_end.csv  (df_rebased + sale_ep_duration_days; end events removed)
      - episodes_duration_only.csv       (appid,country,start_ts,sale_ep_duration_days)
      - starts_only_duration.csv         (appid,country,timestamp,sale_ep_duration_days)
    Also sanity-check that starts-only rows match the number of episodes.

    sale_ep_duration_days is the INCLUSIVE duration (start to explicit end event),
    the same value make_events_with_duration writes onto the start rows.

    Returns (events_with_dur, episodes_min, starts_only, summary), where summary is
    a dict of row counts. Raises AssertionError when the number of start rows
    differs from the number of episodes.

    NOTE: a later cell of this notebook defines a function with the same name;
    once that cell runs, its definition replaces this one.
    """
    events_with_dur, episodes, removed_ends = make_events_with_duration(
        df_rebased, tol_abs=tol_abs, tol_pct=tol_pct,
        gcols=gcols, time_col=time_col,
        add_is_start=True, drop_ends=True,
        coalesce_out=True, keep_out="last"
    )

    # The episode table carries 'duration_days_in_sale' and 'duration_days_inclusive';
    # there is no plain 'duration_days' column to select.
    episodes_min = (episodes[[*gcols, "start_ts", "duration_days_inclusive"]]
                    .rename(columns={"duration_days_inclusive":"sale_ep_duration_days"}))

    starts_only = (events_with_dur.loc[events_with_dur["sale_ep_duration_days"].notna(),
                                       [*gcols, time_col, "sale_ep_duration_days"]]
                   .sort_values([*gcols, time_col], kind="mergesort")
                   .drop_duplicates(subset=[*gcols, time_col], keep="first")
                   .reset_index(drop=True))

    # Sanity
    assert len(starts_only) == len(episodes_min), \
        f"Mismatch: starts_only={len(starts_only)} vs episodes={len(episodes_min)}"

    # Save CSVs (paths configurable)
    events_with_dur.to_csv(out_events_csv, index=False)
    episodes_min.to_csv(out_episodes_csv, index=False)
    starts_only.to_csv(out_starts_csv, index=False)

    summary = {
        "events_in": int(len(df_rebased)),
        "episodes": int(len(episodes_min)),
        "events_removed_end": removed_ends,
        "events_out": int(len(events_with_dur)),
        "starts_only_rows": int(len(starts_only)),
    }
    return events_with_dur, episodes_min, starts_only, summary




In [None]:
import numpy as np
import pandas as pd

# --- 3) Wrapper that saves CSVs (now using inclusive duration) ---
def build_and_save_events_with_duration(df_rebased: pd.DataFrame,
                                        tol_abs=1.0, tol_pct=0.01,
                                        gcols=("appid","country"), time_col="timestamp",
                                        out_events_csv="events_with_duration_no_end.csv",
                                        out_episodes_csv="episodes_duration_only.csv",
                                        out_starts_csv="starts_only_duration.csv"):
    """
    Annotate events with episode durations and persist three CSV files.

    The heavy lifting is delegated to `make_events_with_duration` (called with
    add_is_start=True, drop_ends=True, coalesce_out=True, keep_out="last");
    this wrapper only reshapes its results and writes them to disk.

    Parameters
    ----------
    df_rebased : event-level frame forwarded unchanged to `make_events_with_duration`.
    tol_abs, tol_pct : tolerances forwarded to `make_events_with_duration`.
    gcols : grouping key columns (default: appid, country).
    time_col : name of the event timestamp column.
    out_events_csv, out_episodes_csv, out_starts_csv : output file paths.

    Files written
    -------------
      - out_events_csv   : the annotated event table returned by the helper
      - out_episodes_csv : gcols + start_ts + sale_ep_duration_days
                           (taken from the helper's `duration_days_inclusive`)
      - out_starts_csv   : gcols + time_col + sale_ep_duration_days, one row per
                           distinct (gcols, time_col) that carries a duration

    Returns
    -------
    (events_with_dur, episodes_min, starts_only, summary_dict)

    Raises
    ------
    AssertionError if the number of start rows differs from the number of episodes.
    """
    events_with_dur, episodes, removed_ends = make_events_with_duration(
        df_rebased,
        tol_abs=tol_abs,
        tol_pct=tol_pct,
        gcols=gcols,
        time_col=time_col,
        add_is_start=True,
        drop_ends=True,
        coalesce_out=True,
        keep_out="last",
    )

    group_keys = list(gcols)
    row_keys = group_keys + [time_col]

    # Minimal episode table: the inclusive duration is exposed under the public column name.
    episodes_min = episodes[group_keys + ["start_ts", "duration_days_inclusive"]]
    episodes_min = episodes_min.rename(
        columns={"duration_days_inclusive": "sale_ep_duration_days"}
    )

    # Rows carrying a duration, de-duplicated per (group, timestamp) with a stable sort.
    has_duration = events_with_dur["sale_ep_duration_days"].notna()
    starts_only = events_with_dur.loc[has_duration, row_keys + ["sale_ep_duration_days"]]
    starts_only = starts_only.sort_values(row_keys, kind="mergesort")
    starts_only = starts_only.drop_duplicates(subset=row_keys, keep="first")
    starts_only = starts_only.reset_index(drop=True)

    # Sanity: exactly one start row per episode.
    assert len(starts_only) == len(episodes_min), \
        f"Mismatch: starts_only={len(starts_only)} vs episodes={len(episodes_min)}"

    # Persist all three tables.
    for frame, path in ((events_with_dur, out_events_csv),
                        (episodes_min, out_episodes_csv),
                        (starts_only, out_starts_csv)):
        frame.to_csv(path, index=False)

    summary = {
        "events_in": int(len(df_rebased)),
        "episodes": int(len(episodes_min)),
        "events_removed_end": removed_ends,
        "events_out": int(len(events_with_dur)),
        "starts_only_rows": int(len(starts_only)),
    }
    return events_with_dur, episodes_min, starts_only, summary



def drop_neutral_rows(
    df: pd.DataFrame,
    cut_col: str = "cut",
    flag_prefix: str = "is_",
    protect_cols = ("sale_ep_is_start",),  # never drop episode starts
    also_require_price_regular_match: bool = True,
    tol_abs: float = 1.0,
    tol_pct: float = 0.01,
) -> tuple[pd.DataFrame, int, dict]:
    """
    Remove 'neutral' rows AFTER episodes have been built and explicit ends removed.

    A row is neutral (and dropped) when ALL of the following hold:
      - `cut_col` is 0 (NaN / unparsable / missing column are treated as 0), AND
      - every column whose name starts with `flag_prefix` is falsy, AND
      - (optional) price is within tolerance of regular, or either value is missing, AND
      - none of the `protect_cols` columns is truthy for that row.

    Parameters
    ----------
    df : input events; not modified (a copy is filtered).
    cut_col : name of the discount-percentage column.
    flag_prefix : prefix identifying boolean label columns.
    protect_cols : column name or iterable of column names; a row with a truthy
        value in any of them is never dropped. Names absent from `df` are ignored.
    also_require_price_regular_match : when True and both "price" and "regular"
        exist, additionally require |regular - price| <= max(tol_abs, tol_pct * regular).
    tol_abs, tol_pct : absolute / relative tolerance for the price check.

    Returns
    -------
    (df_clean, dropped_count, meta_info) where df_clean has a fresh RangeIndex.
    """
    d = df.copy()
    n_rows = len(d)

    # 1) cut == 0 (a missing cut column means "no discount information" -> treated as 0)
    if cut_col in d.columns:
        cut_zero = pd.to_numeric(d[cut_col], errors="coerce").fillna(0).eq(0).to_numpy()
    else:
        cut_zero = np.ones(n_rows, dtype=bool)

    # 2) all '<flag_prefix>*' flags are False (internal marker '__is_end' is never a label)
    flag_cols = [
        c for c in d.columns
        if isinstance(c, str) and c.startswith(flag_prefix) and c != "__is_end"
    ]
    if flag_cols:
        flags_any_true = np.column_stack(
            [pd.to_numeric(d[c], errors="coerce").fillna(0).astype(bool) for c in flag_cols]
        ).any(axis=1)
    else:
        flags_any_true = np.zeros(n_rows, dtype=bool)

    # 3) optional: no price move relative to regular under tolerance
    if also_require_price_regular_match and {"price", "regular"} <= set(d.columns):
        p = pd.to_numeric(d["price"], errors="coerce")
        r = pd.to_numeric(d["regular"], errors="coerce")
        thr = np.maximum(tol_abs, tol_pct * r)
        near_regular = p.notna() & r.notna() & (r > 0) & ((r - p).abs() <= thr)
        # Missing price/regular cannot prove a price move, so it does not block the drop.
        no_price_move = (near_regular | r.isna() | p.isna()).to_numpy()
    else:
        no_price_move = np.ones(n_rows, dtype=bool)

    # 4) never drop protected rows: honour every column listed in `protect_cols`
    if protect_cols is None:
        protect_names = ()
    elif isinstance(protect_cols, str):
        protect_names = (protect_cols,)
    else:
        protect_names = tuple(protect_cols)

    protected = np.zeros(n_rows, dtype=bool)
    for col in protect_names:
        if col in d.columns:
            protected |= d[col].fillna(False).astype(bool).to_numpy()

    # 5) build mask and apply (plain numpy mask: no index alignment involved)
    drop_mask = cut_zero & ~flags_any_true & no_price_move & ~protected
    dropped = int(drop_mask.sum())
    d_clean = d.loc[~drop_mask].reset_index(drop=True)

    meta = {
        "checked_flag_cols": flag_cols,
        "tol_abs": tol_abs,
        "tol_pct": tol_pct,
    }
    return d_clean, dropped, meta



In [None]:
# === Caller script: build episodes, write duration, drop explicit ends, then drop neutral rows ===
# All comments in English as requested.

import pandas as pd

# 0) Safety: this cell needs the notebook-level frame `df_rebased` to exist already
if "df_rebased" not in globals():
    raise RuntimeError("`df_rebased` not found. Run your Module-4 pipeline first to create it in memory.")

# 1) Build episodes (inclusive), write duration to starts, drop explicit end-event rows, save CSVs
#    Side effect: three CSV files are written to the working directory (paths below).
events_with_dur, episodes_min, starts_only, summary = build_and_save_events_with_duration(
    df_rebased,
    tol_abs=1.0,           # absolute tolerance (release currency units)
    tol_pct=0.01,          # relative tolerance (1%)
    gcols=("appid","country"),
    time_col="timestamp",
    out_events_csv="events_with_duration_no_end.csv",
    out_episodes_csv="episodes_duration_only.csv",
    out_starts_csv="starts_only_duration.csv",
)

# 2) Quick overview of the pre-clean result: summary counters, per-country episode counts, sample rows
print("Quick Overview (pre-clean)")
# int(v) assumes every summary value is numeric — holds for the keys built by the wrapper above
print({k:int(v) for k,v in summary.items()})

# Episodes per country, largest first (mergesort keeps ties in their grouped order)
by_country = (
    episodes_min.groupby("country", dropna=False)
    .size().reset_index(name="episodes")
    .sort_values("episodes", ascending=False, kind="mergesort")
)
print("\nEpisode count by country (top 20):")
print(by_country.head(20).to_string(index=False))

print("\nEpisodes (head 20):")
print(episodes_min.head(20).to_string(index=False))

# 3) Consistency checks (asserts) on the pre-clean dataset
#    (the wrapper already asserts this; repeated here as an explicit notebook-level check)
assert len(starts_only) == len(episodes_min), "starts_only rows must equal episodes"

def _audit_no_explicit_ends(ev: pd.DataFrame, base: pd.DataFrame) -> int:
    """
    Count 'pure' explicit end-event rows still present in `ev`.

    Episodes are rebuilt from `base` with `build_episodes_with_end_event`
    (appid/country grouping, tol_abs=1.0, tol_pct=0.01). A row of `ev` counts as a
    pure end when its (appid, country, timestamp) equals an episode's `end_event_ts`
    and the row is not flagged as an episode start.

    Parameters
    ----------
    ev : event table to audit; must contain appid, country, timestamp.
    base : event table the episodes are rebuilt from.

    Returns
    -------
    Number of pure end-event rows (0 means the audit passes).
    """
    # NOTE(review): in the visible part of this notebook `build_episodes_with_end_event`
    # is defined in a later cell — confirm it exists before this cell runs on a fresh kernel.
    # NOTE(review): episodes are rebuilt on every call; pass a prebuilt table if this gets slow.
    eps_full = build_episodes_with_end_event(base, gcols=("appid","country"), time_col="timestamp",
                                             tol_abs=1.0, tol_pct=0.01)
    # No episodes at all -> nothing can be an end event (also avoids selecting
    # columns from a column-less empty frame).
    if eps_full is None or eps_full.empty or "end_event_ts" not in eps_full.columns:
        return 0

    end_key = (eps_full[["appid","country","end_event_ts"]].dropna()
               .rename(columns={"end_event_ts":"timestamp"})
               .drop_duplicates()  # one marker per key so the left merge cannot multiply rows
               .assign(__end_marker=True))
    chk = ev.merge(end_key, on=["appid","country","timestamp"], how="left")

    # Nullable boolean first (unmatched rows are NaN), then plain bool for a safe `~`.
    is_end = chk["__end_marker"].astype("boolean").fillna(False).astype(bool)
    if "sale_ep_is_start" in chk.columns:
        is_start = chk["sale_ep_is_start"].fillna(False).astype(bool)
    else:
        # Without a start flag no row can be excused as an episode start.
        is_start = pd.Series(False, index=chk.index)

    pure_end = is_end & ~is_start
    return int(pure_end.sum())

# Audit the pre-clean table: no explicit end-event rows may remain.
pure_end_left = _audit_no_explicit_ends(events_with_dur, df_rebased)
print("\nPure explicit end-event rows remaining in events_with_dur (pre-clean):", pure_end_left)
assert pure_end_left == 0, "There are still explicit end-event rows present."

# 4) Neutral cleanup (must be AFTER explicit ends are removed)
events_clean, neutral_dropped, meta = drop_neutral_rows(
    events_with_dur,
    cut_col="cut",
    flag_prefix="is_",
    protect_cols=("sale_ep_is_start",),     # protects episode starts
    also_require_price_regular_match=True,  # safer: also require price~regular within tolerance
    tol_abs=1.0,
    tol_pct=0.01,
)
# Side effect: persist the cleaned table; a later cell reloads this file by name.
events_clean.to_csv("events_with_duration_no_end_clean.csv", index=False)

# 5) Post-clean sanity and summary
# 5.1 No explicit end rows (should remain zero)
pure_end_left_after = _audit_no_explicit_ends(events_clean, df_rebased)
assert pure_end_left_after == 0, "End rows leaked after neutral cleanup."

# 5.2 starts count unchanged (every episode start must survive the neutral cleanup)
assert events_clean["sale_ep_is_start"].fillna(False).sum() == len(episodes_min), \
    "Number of start rows changed after neutral cleanup."

# 5.3 Update and print final summary
# NOTE: this mutates the `summary` dict created earlier in this cell (adds two keys).
summary["neutral_removed"] = int(neutral_dropped)
summary["events_out_clean"] = int(len(events_clean))

print("\nNeutral rows removed:", neutral_dropped)
print("Saved: events_with_duration_no_end_clean.csv")

print("\nQuick Overview (post-clean)")
print({k:int(v) for k,v in summary.items()})

# 6) (Optional) a tiny head to eyeball
#    Raises KeyError if any of the listed columns is missing from events_clean.
print("\nClean sample (head 10):")
print(events_clean.head(10)[["appid","country","timestamp","price","regular","cut",
                             "sale_ep_is_start","sale_ep_duration_days"]].to_string(index=False))

# Files produced in working directory:
#  - events_with_duration_no_end.csv           (episode duration annotated; explicit ends removed)
#  - episodes_duration_only.csv                (appid,country,start_ts,sale_ep_duration_days)
#  - starts_only_duration.csv                  (appid,country,timestamp,sale_ep_duration_days)
#  - events_with_duration_no_end_clean.csv     (further removed neutral "no-op" rows)


Quick Overview (pre-clean)
{'events_in': 17895, 'episodes': 8476, 'events_removed_end': 8452, 'events_out': 9442, 'starts_only_rows': 8476}

Episode count by country (top 20):
country  episodes
     BR       761
     AR       760
     US       760
     AU       759
     GB       756
     CA       755
     TR       753
     DE       747
     FR       747
     PL       747
     CN       717
     JP       214

Episodes (head 20):
  appid country            start_ts  sale_ep_duration_days
1086940      AR 2023-12-21 20:22:31              13.977488
1086940      AR 2024-03-14 17:37:53               7.005648
1086940      AR 2024-05-16 17:18:11               6.998171
1086940      AR 2024-06-27 17:59:38              16.981331
1086940      AR 2024-09-05 17:18:42               4.002419
1086940      AR 2024-11-04 18:18:01               7.002465
1086940      AR 2024-11-27 21:03:11               6.901956
1086940      AR 2024-12-19 21:27:38              13.888889
1086940      AR 2025-02-10 18:32:12   

In [None]:
import numpy as np
import pandas as pd

# --- 1) Build a full episode table once (if not already available) ---
def build_episodes_with_end_event(df: pd.DataFrame,
                                  gcols=("appid","country"),
                                  time_col="timestamp",
                                  tol_abs: float = 1.0,
                                  tol_pct: float = 0.01) -> pd.DataFrame:
    """
    Build one row per sale episode for every group defined by `gcols`.

    A row is "on sale" when regular - price > max(tol_abs, tol_pct * regular)
    (both values present, regular > 0) OR when `cut` > 0. A missing `cut` column
    simply contributes no votes. Within each group (sorted by `time_col`) an episode
    starts at the first on-sale row and ends at the next row that is not on sale.

    Parameters
    ----------
    df : events with `gcols`, `time_col`, "price", "regular" and optionally "cut".
         Not modified.
    gcols : grouping key columns (tuple or list). Values are upper-cased/stripped
            strings in the output.
    time_col : event timestamp column (coerced with pd.to_datetime).
    tol_abs, tol_pct : absolute / relative tolerance for the price-vs-regular test.

    Returns
    -------
    DataFrame with columns: *gcols, start_ts, end_last_sale_ts, end_event_ts,
    duration_days_inclusive, sorted by (*gcols, start_ts).
      - end_last_sale_ts : timestamp of the last on-sale row of the episode
      - end_event_ts     : timestamp of the first not-on-sale row after it,
                           or NaT when the group ends while still on sale
      - duration_days_inclusive : end_event_ts - start_ts in days; for an open
                           episode, end_last_sale_ts - start_ts
    When no episode is found the frame is empty but still carries these columns.
    """
    gcols = tuple(gcols)  # accept lists too; tuple concatenation below needs a tuple
    out_cols = [*gcols, "start_ts", "end_last_sale_ts", "end_event_ts", "duration_days_inclusive"]

    d = df.copy()
    d[time_col] = pd.to_datetime(d[time_col], errors="coerce")
    for c in gcols + ("currency", "release_currency"):
        if c in d.columns:
            d[c] = d[c].astype(str).str.upper().str.strip()
    d = d.sort_values([*gcols, time_col], kind="mergesort")

    # sale_state: price threshold test OR a positive cut
    p = pd.to_numeric(d["price"], errors="coerce")
    r = pd.to_numeric(d["regular"], errors="coerce")
    thr = np.maximum(tol_abs, tol_pct * r)
    price_vote = (p.notna() & r.notna() & r.gt(0) & ((r - p) > thr)).to_numpy(dtype=bool)
    if "cut" in d.columns:
        cut_vote = (pd.to_numeric(d["cut"], errors="coerce").fillna(0) > 0).to_numpy(dtype=bool)
    else:
        cut_vote = np.zeros(len(d), dtype=bool)
    d["sale_state"] = price_vote | cut_vote

    def _days(t_from, t_to) -> float:
        # whole-second resolution, expressed in days
        return (t_to - t_from).astype("timedelta64[s]").astype(float) / 86400.0

    out = []
    for key, g in d.groupby(list(gcols), sort=False):
        key = key if isinstance(key, tuple) else (key,)
        key_fields = {col: str(val) for col, val in zip(gcols, key)}

        ts = pd.to_datetime(g[time_col]).to_numpy("datetime64[ns]")
        ss = g["sale_state"].to_numpy(bool)

        start_idx = None  # index of the current episode's first on-sale row, if any
        for i in range(len(g)):
            if start_idx is None:
                if ss[i]:
                    start_idx = i
            elif not ss[i]:
                # first non-sale row after a sale run closes the episode
                out.append({
                    **key_fields,
                    "start_ts": ts[start_idx],
                    "end_last_sale_ts": ts[i - 1],
                    "end_event_ts": ts[i],
                    "duration_days_inclusive": _days(ts[start_idx], ts[i]),
                })
                start_idx = None

        if start_idx is not None:
            # group ended while still on sale: open episode, no explicit end event
            last = len(g) - 1
            out.append({
                **key_fields,
                "start_ts": ts[start_idx],
                "end_last_sale_ts": ts[last],
                "end_event_ts": pd.NaT,
                "duration_days_inclusive": _days(ts[start_idx], ts[last]),
            })

    # Explicit columns keep the schema stable even when no episode was found.
    eps = pd.DataFrame(out, columns=out_cols)
    if not eps.empty:
        eps = eps.sort_values([*gcols, "start_ts"], kind="mergesort").reset_index(drop=True)
    return eps

# --- 2) Propagate duration to all sale rows (group-wise; no merge_asof) ---
def propagate_duration_to_all_sale_rows_clean(
    events_clean: pd.DataFrame,
    episodes_full: pd.DataFrame,
    gcols=("appid","country"),
    time_col="timestamp",
    only_fill_cut_pos: bool = True,
) -> tuple[pd.DataFrame, int]:
    """
    Copy each episode's duration onto the non-start event rows that fall inside it.

    For each group in `gcols`, every event is mapped to the latest episode whose
    start_ts <= timestamp and counts as a member when timestamp <= end_last_sale_ts.
    `sale_ep_duration_days` is filled only where it is currently NaN, the row is a
    member, the row is not an episode start and (optionally) cut > 0.

    Parameters
    ----------
    events_clean : events with `gcols`, `time_col`, "sale_ep_duration_days" and
        optionally "sale_ep_is_start" / "cut". Not modified.
    episodes_full : episode table with `gcols`, "start_ts", "end_last_sale_ts",
        "duration_days_inclusive". None/empty -> events are returned unchanged (copy).
    gcols : grouping key columns (tuple or list); compared as upper-cased strings.
    time_col : event timestamp column.
    only_fill_cut_pos : when True and a "cut" column exists, only rows with cut > 0
        are filled.

    Returns
    -------
    (events, filled_count). Rows come back group by group; groups that have episodes
    are sorted by `time_col`. The index is reset. Group key columns are normalised
    to upper-cased strings and `time_col` to datetime.
    """
    if episodes_full is None or episodes_full.empty:
        return events_clean.copy(), 0

    gcols = list(gcols)

    ev = events_clean.copy()
    ev[time_col] = pd.to_datetime(ev[time_col], errors="coerce")

    ep = episodes_full.copy()
    ep["start_ts"] = pd.to_datetime(ep["start_ts"], errors="coerce")
    ep["end_last_sale_ts"] = pd.to_datetime(ep["end_last_sale_ts"], errors="coerce")

    for col in gcols:
        ev[col] = ev[col].astype(str).str.upper().str.strip()
        ep[col] = ep[col].astype(str).str.upper().str.strip()

    def _as_tuple(key):
        # groupby yields scalars for a single key on some pandas versions
        return key if isinstance(key, tuple) else (key,)

    # Split the episode table once instead of re-filtering it for every event group.
    ep_by_key = {
        _as_tuple(key): grp.sort_values("start_ts", kind="mergesort")
        for key, grp in ep.groupby(gcols, sort=False)
    }

    filled_total = 0
    chunks = []

    for key, ev_g in ev.groupby(gcols, sort=False):
        ep_g = ep_by_key.get(_as_tuple(key))
        if ep_g is None:
            # no episodes for this group: pass rows through untouched
            chunks.append(ev_g)
            continue

        ev_g = ev_g.sort_values(time_col, kind="mergesort").reset_index(drop=True)

        t = ev_g[time_col].to_numpy("datetime64[ns]")
        starts = ep_g["start_ts"].to_numpy("datetime64[ns]")
        ends = ep_g["end_last_sale_ts"].to_numpy("datetime64[ns]")
        durations = ep_g["duration_days_inclusive"].to_numpy(float)

        # index of the last episode with start_ts <= t (-1 when the event precedes all episodes)
        pos = np.searchsorted(starts, t, side="right") - 1
        valid = pos >= 0
        member = np.zeros(len(ev_g), dtype=bool)
        member[valid] = t[valid] <= ends[pos[valid]]

        # start flag: missing column means "no starts"; force real bool so `~` is a logical NOT
        if "sale_ep_is_start" in ev_g.columns:
            is_start = ev_g["sale_ep_is_start"].fillna(False).astype(bool).to_numpy()
        else:
            is_start = np.zeros(len(ev_g), dtype=bool)

        if only_fill_cut_pos and ("cut" in ev_g.columns):
            cut_pos = pd.to_numeric(ev_g["cut"], errors="coerce").fillna(0).to_numpy() > 0
        else:
            cut_pos = np.ones(len(ev_g), dtype=bool)

        need_fill = ~is_start & member & cut_pos & ev_g["sale_ep_duration_days"].isna().to_numpy()

        if need_fill.any():
            ev_g.loc[need_fill, "sale_ep_duration_days"] = durations[pos[need_fill]]
            filled_total += int(need_fill.sum())

        chunks.append(ev_g)

    if not chunks:
        # empty events table: nothing to concatenate
        return ev, 0

    out = pd.concat(chunks, axis=0, ignore_index=True)
    return out, filled_total

# --- 3) Simple audits that reuse episodes_full (no rebuild) ---
def audit_no_explicit_ends(evs: pd.DataFrame, eps_full: pd.DataFrame,
                           gcols=("appid","country"), time_col="timestamp") -> int:
    """
    Count 'pure' explicit end-event rows in `evs`, reusing a prebuilt episode table.

    A row counts when its (gcols, time_col) equals an episode's `end_event_ts` in
    `eps_full` and the row is not flagged in "sale_ep_is_start". If `evs` has no
    "sale_ep_is_start" column, no row is treated as a start.

    Parameters
    ----------
    evs : event table to audit (must contain `gcols` and `time_col`).
    eps_full : episode table with `gcols` and "end_event_ts"; None, empty or
        lacking "end_event_ts" means there are no end events to look for.
    gcols : grouping key columns (tuple or list).
    time_col : event timestamp column.

    Returns
    -------
    Number of pure end-event rows (0 means the audit passes).

    Raises
    ------
    pandas.errors.MergeError if the end-event keys are not unique (validate="m:1").
    """
    gcols = list(gcols)
    if eps_full is None or eps_full.empty or "end_event_ts" not in eps_full.columns:
        return 0

    key = (eps_full[[*gcols, "end_event_ts"]].dropna()
           .rename(columns={"end_event_ts": time_col})
           .assign(__end=True))
    merged = evs.merge(key, on=[*gcols, time_col], how="left", validate="m:1")

    # Unmatched rows carry NaN in __end: go through nullable boolean, then plain bool.
    is_end = merged["__end"].astype("boolean").fillna(False).astype(bool)
    if "sale_ep_is_start" in merged.columns:
        # cast to real bool so `~` is a logical NOT even for object-dtype flags
        is_start = merged["sale_ep_is_start"].fillna(False).astype(bool)
    else:
        is_start = pd.Series(False, index=merged.index)

    pure_end = is_end & ~is_start
    return int(pure_end.sum())
# Build the full episode table from the in-memory `df_rebased`
# (this statement rebuilds it unconditionally each time the cell runs).
episodes_full = build_episodes_with_end_event(df_rebased, gcols=("appid","country"), time_col="timestamp",
                                              tol_abs=1.0, tol_pct=0.01)

# Propagate on CLEAN
# The cleaned events are reloaded from disk, replacing any in-memory `events_clean`.
# NOTE(review): after the CSV round-trip, column dtypes may differ from the in-memory
# frame (only `timestamp` is parsed explicitly) — confirm this is intended.
events_clean = pd.read_csv("events_with_duration_no_end_clean.csv", parse_dates=["timestamp"])
events_clean_prop, filled_cnt = propagate_duration_to_all_sale_rows_clean(
    events_clean, episodes_full, gcols=("appid","country"), time_col="timestamp", only_fill_cut_pos=True
)
print("Filled non-start sale rows on CLEAN:", filled_cnt)
# safety: starts count unchanged; no explicit end reintroduced
# (the expected episode count is read back from the CSV written by the earlier build step)
assert events_clean_prop["sale_ep_is_start"].fillna(False).sum() == len(pd.read_csv("episodes_duration_only.csv"))
assert audit_no_explicit_ends(events_clean_prop, episodes_full) == 0

# Side effect: write the propagated table next to the other outputs.
events_clean_prop.to_csv("events_with_duration_no_end_clean_propagated.csv", index=False)
print("Saved: events_with_duration_no_end_clean_propagated.csv")

In [None]:
# Share of episodes whose recorded duration is exactly zero days
episodes_min["sale_ep_duration_days"].eq(0).mean()


np.float64(0.002831524303916942)

In [None]:
# Median, 90th and 99th percentile of episode duration, one row per country
(
    episodes_min
    .groupby("country")["sale_ep_duration_days"]
    .quantile([0.5, 0.9, 0.99])
    .unstack()
)


Unnamed: 0_level_0,0.50,0.90,0.99
country,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
AR,7.044606,14.004135,23.417762
AU,7.072986,14.002655,25.593628
BR,7.090625,14.005856,27.978560
CA,7.080856,14.005664,27.786109
CN,7.997697,14.004377,27.175597
...,...,...,...
GB,7.093119,14.008762,27.986126
JP,9.988681,14.002337,25.826134
PL,7.050741,14.004218,27.787101
TR,7.072141,14.002970,27.777685
