# Build a ML pipeline to recommend top 3 games to bet on (within games played that day) for each player each day at ~8AM

In [188]:
from __future__ import annotations

import math
import re
from collections import defaultdict
from collections import Counter
from datetime import timedelta
from itertools import product
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple

import numpy as np
import pandas as pd
import joblib
import scipy.sparse as sp
import s3fs 
from implicit.bpr import BayesianPersonalizedRanking
from sklearn.isotonic import IsotonicRegression
from sklearn.linear_model import LogisticRegression

### Constants ###

In [None]:
# separators
_RE_AT = re.compile(r"\s*@\s*|\s*\bat\b\s*", flags=re.IGNORECASE)     # AWAY @ HOME / AWAY at HOME
_RE_VS = re.compile(r"\s*\bvs\.?\b\s*|&", flags=re.IGNORECASE)         # HOME vs AWAY / HOME & AWAY
_SEP = re.compile(r"\s*(?:@|&|\bvs\.?\b|\bat\b)\s*", flags=re.IGNORECASE)
# ============ EVAL CONFIG ============
BOX_SCORES_XLSX = "NBA Box Scores.xlsx"     # local path
BOX_SCORES_SHEET = "Sheet2"
VENDOR_CSV = "playoff_schedule.csv"         # local path

DATE_START = "2025-05-01"
DATE_END   = "2025-05-14"
OUTPUT_CSV = "./rankings_may01_14.csv"
TOP_K = 3
TOP_N_CAL = 20 

In [89]:
pd.set_option("display.max_rows", 100)
pd.set_option("display.max_colwidth", 120)

In [190]:
DATA_PATH1 = Path(r"df_train.xlsx")
DATA_PATH2 = Path(r"df_validation.xlsx")

### Workflow begins ###

In [191]:
df_train = pd.read_excel(DATA_PATH1)

In [93]:
df_validation = pd.read_excel(DATA_PATH2)

In [95]:
df_validation.shape

(32672, 4)

In [124]:
num_duplicates = df_validation.duplicated().sum()
#print(f"Number of duplicate rows in df_validation: {num_duplicates}")

Number of duplicate rows in df_validation: 2429


In [None]:
playoff_schedule=pd.read_csv('playoff_schedule.csv')

In [97]:
"""
Attach the scheduled game date to each validation row by matching event text.

This routine aligns rows in `df_validation` (e.g., historical bets or validations)
to the vendor's playoff schedule in `playoff_schedule` by normalizing and matching
event descriptions (e.g., "lakers @ warriors"). It handles home/away text
ambiguity by generating both "A @ B" and "B @ A" variants from a schedule field
formatted like "Team A & Team B", then performs a left join on the normalized text.

Matching rules and tie-breaking:
- Text is normalized case- and space-insensitively (lowercased, trimmed, collapsed spaces).
- Only schedule dates on or after the bet's date are considered (bet_date <= Date).
- If multiple scheduled dates match a single validation row (e.g., teams meet several times),
  the earliest valid schedule date is selected deterministically.
  """
# --- Copies so we don't mutate your originals ---
playoff_schedule = playoff_schedule.copy()
df_validation = df_validation.copy()

# 1) Parse dates
playoff_schedule["Date"]   = pd.to_datetime(playoff_schedule["Date"], dayfirst=True, errors="coerce")
df_validation["bet_date"]  = pd.to_datetime(df_validation["betdate"], errors="coerce").dt.date

# 2) Build Game1 / Game2 from "Team A & Team B"
teams = playoff_schedule["Game"].str.split(" & ", n=1, expand=True)
playoff_schedule["TeamA"] = teams[0].str.strip()
playoff_schedule["TeamB"] = teams[1].str.strip()
playoff_schedule["Game1"] = playoff_schedule["TeamA"] + " @ " + playoff_schedule["TeamB"]
playoff_schedule["Game2"] = playoff_schedule["TeamB"] + " @ " + playoff_schedule["TeamA"]

# 3) Long schedule for matching
sched_long = playoff_schedule.melt(
    id_vars=["Date"],
    value_vars=["Game1", "Game2"],
    value_name="event_description"
).drop(columns=["variable"])

# 4) Light normalization (case/space-insensitive match)
def norm(s: pd.Series) -> pd.Series:
    return (s.astype(str)
              .str.replace(r"\s+", " ", regex=True)
              .str.strip()
              .str.lower())

sched_long["event_key"]     = norm(sched_long["event_description"])
df_validation["event_key"]  = norm(df_validation["event_description"])

# 5) Join on normalized event text
tmp = (df_validation
       .reset_index()  # keep original row index as column "index"
       .merge(sched_long[["Date", "event_key"]],
              on="event_key",
              how="left"))

# 6) Keep only matches where bet_date <= Date
tmp = tmp[tmp["bet_date"] <= tmp["Date"]]

# 7) ✅ FIX: pick earliest Date per original row by sorting then deduping
#    (This guarantees at most one match per df_validation row.)
best = (tmp.sort_values(["index", "Date"])
           .drop_duplicates(subset="index", keep="first")
           [["index", "Date"]])

# 8) Write back to df_validation
df_validation["Scheduled Game Date"] = pd.NaT
if not best.empty:
    df_validation.loc[best["index"].to_numpy(), "Scheduled Game Date"] = best["Date"].to_numpy()

# 9) (Optional) Clean up helpers
playoff_schedule.drop(columns=["TeamA", "TeamB"], inplace=True, errors="ignore")
df_validation.drop(columns=["event_key"], inplace=True, errors="ignore")


In [98]:
df_validation.shape

(32672, 6)

In [None]:
Box_Score=pd.read_excel("NBA Box Scores.xlsx",sheet_name="Sheet2")

In [100]:
### Same concept as above but for actual games played (Box Scores)

# --- Copies ---
Box_Score = Box_Score.copy()
df_validation = df_validation.copy()

# 1) Parse dates
Box_Score["Game Date"]   = pd.to_datetime(Box_Score["Game Date"], errors="coerce")
df_validation["bet_date"] = pd.to_datetime(df_validation["bet_date"], errors="coerce")

# 2) Build schedule-like table
sched = Box_Score.rename(columns={"Modified Game Name": "event_description"})[["Game Date", "event_description"]]

# 3) Normalization
def norm(s: pd.Series) -> pd.Series:
    return (s.astype(str)
              .str.replace(r"\s+", " ", regex=True)
              .str.replace(r"\.+$", "", regex=True)  # drop trailing periods
              .str.strip()
              .str.lower())

sched["event_key"]         = norm(sched["event_description"])
df_validation["event_key"] = norm(df_validation["event_description"])

# 4) Left-merge (do NOT filter rows out)
tmp = (df_validation
       .reset_index()  # preserves original row id as "index"
       .merge(sched[["Game Date", "event_key"]],
              on="event_key",
              how="left"))

# 5) Enforce bet_date <= Game Date without dropping rows:
#    mark invalid matches as NaT, then take the earliest valid date per row.
valid = tmp["Game Date"].ge(tmp["bet_date"])
tmp.loc[~valid, "Game Date"] = pd.NaT

# 6) Pick the earliest valid date per original row (min skips NaT)
best = (tmp.groupby("index", as_index=False, sort=False)["Game Date"]
           .min())  # NaT if no valid match

# 7) Write back to df_validation
df_validation["Offical Game Date"] = pd.NaT
df_validation.loc[best["index"].to_numpy(), "Offical Game Date"] = best["Game Date"].to_numpy()

# 8) Clean up
df_validation.drop(columns=["event_key"], inplace=True, errors="ignore")


  df_validation.loc[best["index"].to_numpy(), "Offical Game Date"] = best["Game Date"].to_numpy()


In [101]:
df_validation.shape

(32672, 7)

In [103]:
# EDA :compare preseason schedule to actual games played and quanity mismatches

# --------- Inputs (already loaded) ----------
# playoff_schedule: columns like ["Date", "Game"] where Game = "Team A & Team B"
# Box_Score: columns like ["Official Game Date" or "Game Date", "Modified Game Name" or "Match Up"]
# -------------------------------------------

# 1) Choose date/name columns safely
box = Box_Score.copy()
sched = playoff_schedule.copy()

box_date_col  = "Official Game Date" if "Official Game Date" in box.columns else "Game Date"
box_game_col  = "Modified Game Name" if "Modified Game Name" in box.columns else "Match Up"

# 2) Parse dates (schedule often day-first; box often month/day)
sched["Date"] = pd.to_datetime(sched["Date"], dayfirst=True, errors="coerce")
box[box_date_col] = pd.to_datetime(box[box_date_col], errors="coerce")

# 3) Helpers to normalize team names and extract matchup keys (order-invariant)
def norm_team(t: str) -> str:
    """Lowercase, remove extra spaces/punctuation (but keep alphanumerics and spaces)."""
    s = str(t)
    s = re.sub(r"\s+", " ", s)          # collapse whitespace
    s = s.strip().lower()
    s = re.sub(r"[^a-z0-9 ]+", "", s)   # drop punctuation
    return s

def matchup_key_from_ampersand(s: str):
    """From 'Team A & Team B' -> tuple(sorted(norm(A), norm(B)))."""
    parts = [p.strip() for p in str(s).split("&", 1)]
    if len(parts) != 2:
        return None
    a, b = norm_team(parts[0]), norm_team(parts[1])
    return tuple(sorted((a, b)))

def matchup_key_from_at(s: str):
    """From 'Team A @ Team B' -> tuple(sorted(norm(A), norm(B)))."""
    parts = [p.strip() for p in str(s).split("@", 1)]
    if len(parts) != 2:
        return None
    a, b = norm_team(parts[0]), norm_team(parts[1])
    return tuple(sorted((a, b)))

# 4) Build per-date sets of matchup keys for each source
sched_keys = (sched
    .assign(_key=sched["Game"].map(matchup_key_from_ampersand))
    .dropna(subset=["_key", "Date"])
    .groupby(sched["Date"].dt.date)["_key"]
    .apply(lambda x: set(x))
    .rename("sched_set"))

box_keys = (box
    .assign(_key=box[box_game_col].map(matchup_key_from_at))
    .dropna(subset=["_key", box_date_col])
    .groupby(box[box_date_col].dt.date)["_key"]
    .apply(lambda x: set(x))
    .rename("box_set"))

# 5) Outer join on date to compare both sides
cmp = (pd.concat([sched_keys, box_keys], axis=1)
         .reset_index()
         .rename(columns={"index": "Date"}))

# 6) Compute per-date differences
def set_len(x): return len(x) if isinstance(x, set) else 0
def set_diff(a, b):
    a = a if isinstance(a, set) else set()
    b = b if isinstance(b, set) else set()
    only_a = a - b
    only_b = b - a
    return pd.Series({
        "schedule_count": len(a),
        "official_count": len(b),
        "only_in_schedule": sorted(list(only_a)),
        "only_in_official": sorted(list(only_b)),
        "mismatch_count": len(only_a) + len(only_b),
    })

summary = (cmp
    .apply(lambda row: set_diff(row.get("sched_set"), row.get("box_set")), axis=1)
    .join(cmp[["Date"]]))

# 7) Expand long-form mismatches (optional, handy for investigation)
def explode_mismatches(df):
    rows = []
    for _, r in df.iterrows():
        d = r["Date"]
        for mk in r["only_in_schedule"]:
            rows.append({"Date": d, "source": "schedule_only", "matchup_key": mk})
        for mk in r["only_in_official"]:
            rows.append({"Date": d, "source": "official_only", "matchup_key": mk})
    return pd.DataFrame(rows)

mismatches_long = explode_mismatches(summary)

# ---- Outputs ----
# summary: one row per date with counts and lists of mismatches
# mismatches_long: long table with one row per missing/excess matchup

# Quick prints
#print("Per-date summary (first 10 rows):")
#print(summary.head(10).to_string(index=False))

#print("\nLong-form mismatches (first 20 rows):")
#print(mismatches_long.head(20).to_string(index=False))


In [52]:
# More than 2 thirds of users place 3 or more bets in a day in regular season (caveat - no data on avg games per day being played)

In [107]:
# ---------- 1) Shared cleaning for BOTH training and unseen data ----------
def clean_shared(df):
    """Return a cleaned copy with consistent text and a normalized bet_date."""
    out = df.copy()  # don't mutate caller

    # Parse original timestamp column into pandas datetime (coerce bad values to NaT)
    out["betdate"] = pd.to_datetime(out["betdate"], errors="coerce", utc=False)

    # Make event_description consistent: ensure string type, collapse spaces, trim ends
    out["event_description"] = (
        out["event_description"]
          .astype("string")
          .str.replace(r"\s+", " ", regex=True)
          .str.strip()
    )

    # Split "TeamA @ TeamB" into AwayTeam/HomeTeam (safe even if some rows don't match)
    if "AwayTeam" not in out.columns or "HomeTeam" not in out.columns:
        tmp = out["event_description"].str.split(" @ ", n=1, expand=True)
        out["AwayTeam"] = tmp[0]
        out["HomeTeam"] = tmp[1]

    # Create a normalized date column (midnight, tz-naive) for per-day evaluation
    out["bet_date"] = out["betdate"].dt.normalize()

    return out


# ---------- 2) Train-only wrapper ----------
def preprocess_train(df_train):
    """Apply shared cleaning + train-only hygiene/EDA, and return cleaned train frame."""
    t = clean_shared(df_train)

    # (Optional but common) remove exact duplicate rows in TRAIN to avoid leakage/noise
    # If duplicates truly represent identical records, keep='first' is reasonable.
    before = len(t)
    t = t.drop_duplicates().reset_index(drop=True)
    after = len(t)
    print(f"[train] dropped exact duplicates: {before - after}")

    # Quick QA: how many rows match "TeamA @ TeamB" pattern (for data sanity)
    pattern = re.compile(r"^[A-Za-z0-9\s.\-']+ @ [A-Za-z0-9\s.\-']+$")
    match_rate = t["event_description"].astype(str).str.match(pattern).mean()
    print(f"[train] 'A @ B' format match rate: {match_rate:.1%}")

    # Simple EDA: average daily bets per user and a tiny histogram
    daily_counts = t.groupby(["mask_id", "bet_date"]).size()
    avg_bets_per_user = daily_counts.groupby("mask_id").mean()
    bins = [0, 1, 2, 3, 4, float("inf")]
    labels = ["1", "2", "3", "4", "4+"]
    dist = pd.cut(avg_bets_per_user, bins=bins, labels=labels).value_counts().sort_index()
    dist_df = dist.to_frame(name="unique_users")
    dist_df["percent"] = 100 * dist_df["unique_users"] / dist_df["unique_users"].sum()
    print("[train] avg daily bets per user (buckets):")
    print(dist_df)

    return t


# ---------- 3) Validation/unseen wrapper ----------
def preprocess_validation(df_validation):
    """Apply only the shared cleaning steps needed for inference/evaluation."""
    v = clean_shared(df_validation)

    # NOTE: we typically do NOT drop duplicates in validation, because repeated bets
    # can be real user behavior; your evaluation later collapses to unique items per day anyway.
    # If your business rule says duplicates are true duplicates, you *may* drop them:
    v = v.drop_duplicates().reset_index(drop=True)

    # Optional QA (same check as train, purely informative)
    pattern = re.compile(r"^[A-Za-z0-9\s.\-']+ @ [A-Za-z0-9\s.\-']+$")
    match_rate = v["event_description"].astype(str).str.match(pattern).mean()
    #print(f"[validation] 'A @ B' format match rate: {match_rate:.1%}")

    return v

In [132]:
# Clean training data (shared steps + train-only hygiene/EDA)
df_train = preprocess_train(df_train)
df_train.shape

[train] dropped exact duplicates: 12459
[train] 'A @ B' format match rate: 100.0%
[train] avg daily bets per user (buckets):
    unique_users    percent
1             63   9.077810
2            161  23.198847
3            125  18.011527
4             83  11.959654
4+           262  37.752161


(204392, 7)

In [112]:
def _metrics_from_lists(true_set, recs, k=3):
    """Compute top-k ranking metrics (Hit@k, Precision@k, F1@k, NDCG@k) given a ground-truth set and a recommendation list."""
    recs_k = recs[:k]
    hits = sum(int(i in true_set) for i in recs_k)
    hitk = 1.0 if hits > 0 else 0.0
    preck = hits / k
    reck  = hits / max(1, len(true_set))
    f1k   = 0.0 if (preck + reck) == 0 else 2*preck*reck/(preck+reck)
    dcg = 0.0
    for rank, iid in enumerate(recs_k, start=1):
        if iid in true_set:
            dcg += 1.0 / np.log2(rank + 1)
    ideal = min(len(true_set), k)
    idcg = sum(1.0/np.log2(r+1) for r in range(1, ideal+1)) if ideal>0 else 0.0
    ndcgk = 0.0 if idcg == 0 else dcg / idcg
    return hitk, preck, f1k, ndcgk

In [115]:
def _to_away_home_series(s):
    """Convert matchup strings (e.g., 'A @ B', 'B vs A', 'X ## Y') into standardized 'AWAY ## HOME' format, else return None."""
    s = s.astype(str).str.strip()
    out = pd.Series([None] * len(s), index=s.index, dtype=object)

    # Case 1: AWAY @ HOME  (or 'at')
    mask_at = s.str.contains(_RE_AT)
    if mask_at.any():
        parts = s.loc[mask_at].str.split(_RE_AT, n=1, expand=True)
        away = parts[0].str.strip()
        home = parts[1].str.strip()
        out.loc[mask_at] = away + " ## " + home

    # Case 2: HOME vs AWAY (or '&') -> flip to AWAY ## HOME
    mask_vs = (~mask_at) & s.str.contains(_RE_VS)
    if mask_vs.any():
        parts = s.loc[mask_vs].str.split(_RE_VS, n=1, expand=True)
        home = parts[0].str.strip()
        away = parts[1].str.strip()
        out.loc[mask_vs] = away + " ## " + home

    # Case 3: already looks like 'X ## Y' -> keep (assume directional already)
    mask_hash = out.isna() & s.str.contains(r"##")
    if mask_hash.any():
        cleaned = s.loc[mask_hash].str.replace(r"\s*##\s*", " ## ", regex=True).str.strip()
        out.loc[mask_hash] = cleaned

    # Unparseable rows remain None
    return out

def build_dir_item(df):
    """
    Return a Series of directional game ids in the form 'AWAY ## HOME'.

    Preference:
      1) vendor 'event_description' (e.g., 'LAL @ BOS' or 'BOS vs LAL')
      2) away/home columns (common aliases)
      3) 'item' column (attempt to parse; if already 'X ## Y', keep as-is)
    """
    cmap = {c.lower(): c for c in df.columns}  # case-insensitive column lookup

    # 1) event_description present: parse to AWAY ## HOME
    if "event_description" in cmap:
        return _to_away_home_series(df[cmap["event_description"]])

    # 2) try common away/home aliases
    away_aliases = ["awayteam", "away_team", "away", "visitor"]
    home_aliases = ["hometeam", "home_team", "home", "homeclub"]

    away_col = next((cmap[a] for a in away_aliases if a in cmap), None)
    home_col = next((cmap[h] for h in home_aliases if h in cmap), None)

    if away_col is not None and home_col is not None:
        away = df[away_col].astype(str).str.strip()
        home = df[home_col].astype(str).str.strip()
        return away + " ## " + home

    # 3) last resort: try to parse 'item'; if it's already 'X ## Y', keep cleaned
    if "item" in df.columns:
        return _to_away_home_series(df["item"])

    raise KeyError("build_dir_item: need 'event_description' or Away/Home columns (or parsable 'item').")


In [117]:
def evaluate_k_for_Hyper_Optimization(model,
               user_items,                   # CSR user×item matrix to pass into model.recommend
               val_df, 
               pop_prior, 
               k=3,
                            
               low_decile_uids=None,         # set of uids to force popularity for (optional)
               user_col="uid",               # column name for user ids in val_df
               item_col="iid",               # column name for item ids in val_df
               date_col="bet_date",          # column name for date in val_df (normalized to days)
               weights=(0.60, 0.30, 0.10),   # (w_ndcg, w_prec, w_cov)
               return_preds=True,           # if True, also return a predictions table
               item_names=None               # optional array mapping iid -> item string
              ):
    """
    Evaluate a recommender model at top-k using validation data, returning Hit@k,
    Precision@k, NDCG@k, Coverage@k, and a weighted objective score (optionally
    with per-user prediction tables).

    Parameters
    ----------
    model : object
        Trained recommender with a `.recommend(user, user_items, N)` method and
        optional `user_factors`/`item_factors` attributes for shape checks.
    user_items : scipy.sparse.csr_matrix
        User–item interaction matrix used to generate recommendations.
    val_df : pandas.DataFrame
        Validation set containing user–item interactions to evaluate against.
    pop_prior : array-like or None
        Global item popularity prior (used as fallback for cold-start users).
    k : int, default=3
        Top-k cutoff for evaluation.
    low_decile_uids : set of int, optional
        User IDs to force onto popularity-based recommendations (simulating weak users).
    user_col : str, default="uid"
        Column in `val_df` containing user IDs.
    item_col : str, default="iid"
        Column in `val_df` containing item IDs.
    date_col : str, default="bet_date"
        Column in `val_df` containing interaction dates (grouped by day).
    weights : tuple of float, default=(0.60, 0.30, 0.10)
        Weights applied to (NDCG, Precision, Coverage) when computing the objective.
    return_preds : bool, default=True
        If True, also return a DataFrame of top-k predictions per user/date.
    item_names : array-like, optional
        Optional mapping of item IDs to human-readable names.

    Returns
    -------
    metrics : dict
        Dictionary with average Hit@k, Precision@k, NDCG@k, Coverage@k,
        weighted "Objective", and number of evaluated user-days.
    preds : pandas.DataFrame, optional
        If `return_preds=True`, a tidy table of predictions with columns:
        [user_col, date_col, "rank", item_col, "item"].

    Notes
    -----
    - Cold-start or low-decile users fall back to popularity ordering if `pop_prior` is provided.
    - Coverage measures how diverse the daily top-k recommendations are across users.
    - The objective combines relevance and coverage into a single scalar for hyperparameter search.
    """
 
    # --- Shapes & safety ---
    mf_users = getattr(model, "user_factors", np.empty((0, 0))).shape[0]
    mf_items = getattr(model, "item_factors", np.empty((0, 0))).shape[0]
    UI_safe  = user_items[:mf_users, :]  # ensure #rows == mf_users

    use_pop = (pop_prior is not None) and (len(pop_prior) == mf_items) and (float(np.max(pop_prior)) > 0)
    pop_order = np.argsort(-pop_prior).tolist() if use_pop else []
    low_decile_uids = set(low_decile_uids or [])

    # --- Ground truth & slates ---
    true_by_ud = (val_df.groupby([user_col, date_col])[item_col]
                  .apply(lambda s: set(s.unique())).to_dict())
    slate_by_day = (val_df.groupby(date_col)[item_col]
                    .apply(lambda s: set(s.unique())).to_dict())

    # --- Per-user recommendation lists (keys must match user_col values) ---
    user_recs = {}
    for u in pd.unique(val_df[user_col]):
        u = int(u)
        cold_or_low = (u < 0) or (u >= mf_users) or (u in low_decile_uids)
        if cold_or_low:
            user_recs[u] = pop_order[:] if use_pop else []
            continue
        try:
            rec_i, _ = model.recommend(u, UI_safe, N=200,
                                       filter_items=None,
                                       filter_already_liked_items=False)
            user_recs[u] = rec_i.tolist()
        except IndexError:
            user_recs[u] = pop_order[:] if use_pop else []

    # --- Evaluate @k ---
    hit_sum = prec_sum = ndcg_sum = 0.0
    n = 0
    unique_recs_by_day = {}  # day -> set(iids)
    pred_rows = [] if return_preds else None

    for (u, day), true_set in true_by_ud.items():
        slate = slate_by_day.get(day, set())
        if not slate:
            continue
        recs = [i for i in user_recs.get(int(u), []) if i in slate][:k]
        if day not in unique_recs_by_day:
            unique_recs_by_day[day] = set()
        unique_recs_by_day[day].update(recs)

        # predictions table (optional)
        if return_preds and recs:
            for r, iid in enumerate(recs, 1):
                pred_rows.append({
                    user_col: int(u),
                    date_col: day,
                    "rank": r,
                    item_col: int(iid),
                    "item": (None if item_names is None else item_names[int(iid)])
                })

        # metrics
        hits = sum(1 for i in recs if i in true_set)
        preck = hits / k

        ideal = min(len(true_set), k)
        if ideal > 0:
            idcg = sum(1.0/np.log2(r+1) for r in range(1, ideal+1))
            dcg  = sum(1.0/np.log2(r+1) for r, i in enumerate(recs, 1) if i in true_set)
            ndcg = dcg / idcg
        else:
            ndcg = 0.0

        hit_sum  += 1.0 if hits > 0 else 0.0
        prec_sum += preck
        ndcg_sum += ndcg
        n += 1

    # --- Aggregate + coverage + objective ---
    if n == 0:
        metrics = {f"Hit@{k}": 0.0, f"Precision@{k}": 0.0, f"NDCG@{k}": 0.0,
                   f"Coverage@{k}": 0.0, "Objective": 0.0, "n_eval": 0}
        preds = (pd.DataFrame(pred_rows) if return_preds else None)
        return (metrics, preds) if return_preds else metrics

    ndcg = ndcg_sum / n
    prec = prec_sum / n
    hit  = hit_sum  / n
    total_recs = n * k
    unique_day_item = sum(len(s) for s in unique_recs_by_day.values())
    coverage = unique_day_item / max(1, total_recs)

    w_ndcg, w_prec, w_cov = weights
    objective = w_ndcg * ndcg + w_prec * prec + w_cov * coverage

    metrics = {f"Hit@{k}": hit,
               f"Precision@{k}": prec,
               f"NDCG@{k}": ndcg,
               f"Coverage@{k}": coverage,
               "Objective": objective,
               "n_eval": n}

    preds = (pd.DataFrame(pred_rows).sort_values([user_col, date_col, "rank"])
             if return_preds else None)
    return (metrics, preds) if return_preds else metrics


In [17]:
df_train.head()

Unnamed: 0,mask_id,betdate,event_description,wager_amount,AwayTeam,HomeTeam,bet_date
0,751771,2024-10-22 00:11:01,Minnesota Timberwolves @ Los Angeles Lakers,1.666667,Minnesota Timberwolves,Los Angeles Lakers,2024-10-22
1,751771,2024-10-22 00:11:20,Minnesota Timberwolves @ Los Angeles Lakers,5.0,Minnesota Timberwolves,Los Angeles Lakers,2024-10-22
2,751771,2024-10-22 00:23:25,Minnesota Timberwolves @ Los Angeles Lakers,0.833333,Minnesota Timberwolves,Los Angeles Lakers,2024-10-22
3,751771,2024-10-22 00:24:13,Minnesota Timberwolves @ Los Angeles Lakers,0.909091,Minnesota Timberwolves,Los Angeles Lakers,2024-10-22
4,751771,2024-10-22 00:25:32,Minnesota Timberwolves @ Los Angeles Lakers,0.833333,Minnesota Timberwolves,Los Angeles Lakers,2024-10-22


In [121]:
def run_bpr_training_and_evaluation(
    df_train: pd.DataFrame,
    df_validation: pd.DataFrame,
    preprocess_validation: Callable[[pd.DataFrame], pd.DataFrame],
    evaluate_k_for_Hyper_Optimization: Callable[..., Dict[str, float]],
    build_dir_item: Callable[[pd.DataFrame], pd.Series],
    factors_grid=(32, 64),
    learning_rates=(0.001, 0.01),
    regularizations=(1e-5, 1e-4, 1e-3, 1e-2),
    iterations_grid=(40, 60, 80),
    k_eval: int = 3,
    objective_weights: Tuple[float, float, float] = (0.60, 0.30, 0.10),
) -> Dict[str, Any]:
    """
    End-to-end pipeline that: prepares data, performs a temporal split, encodes users/items,
    builds implicit matrices, runs grid-search for implicit BPR, reports the best config,
    then refits a final BPR model on all positives and preprocesses validation for downstream use.

    Steps & behavior (mirrors the provided script exactly):
      1) Prepare: copy, parse bet_date, build canonical 'item' via build_dir_item, keep positives (wager_amount > 0).
      2) Temporal split: last 20% by bet_date -> validation; first 80% -> train; print user/item coverage diagnostics.
      3) Encode users/items from TRAIN ONLY; create mapping DataFrames and map train/val into train space.
      4) Popularity prior from TRAIN (normalized 0..1) for evaluation fallback; pop_order computed (desc).
      5) Build sparse matrices:
         - TRAIN matrices: IU_train (items×users) and UI_train (users×items) with **binary** weights (standard BPR).
         - Cast CSR internals to int32 and guard UI rows to match model.user_factors when recommending.
      6) Hyperparameter tuning:
         - Grid: Cartesian product of factors, learning_rates, regularizations, iterations.
         - Fit each config on IU_train; align validation to model shapes; evaluate at k using provided evaluator.
         - Track and print Hit@k, Precision@k, NDCG@k, Coverage@k, and weighted Objective; keep best config/model.
      7) Report best config and metrics.
      8) Final refit on **ALL** positives:
         - Factorize across ALL positives; build IU_all/UI_all with **log1p(wager_amount)** weights (as in original).
         - Train final BayesianPersonalizedRanking on IU_all.
         - Build user/item index maps and iid_to_item reverse map.
      9) Preprocess the provided df_validation (copy), ensure bet_date is datetime.date, and print .info().

    Returns a dict with: best config/metrics/model, train/val artifacts, final model, matrices, mappings, and cleaned validation.
    """
    # --- 1) Prepare data ---
    df = df_train.copy()
    df["bet_date"] = pd.to_datetime(df["bet_date"])
    df["item"] = build_dir_item(df)  # canonical item id "AWAY ## HOME"
    pos = df.loc[df["wager_amount"] > 0]

    # --- Split: last 10% dates as validation ---
    cut = pos["bet_date"].quantile(0.80)
    train_pos = pos.loc[pos["bet_date"] < cut].copy()
    val_pos = pos.loc[pos["bet_date"] >= cut].copy()

    # --- Coverage diagnostics ---
    train_users_raw = set(train_pos["mask_id"].unique())
    val_users_raw = set(val_pos["mask_id"].unique())
    train_items_raw = set(train_pos["item"].unique())
    val_items_raw = set(val_pos["item"].unique())

    val_users_unseen = val_users_raw - train_users_raw
    val_items_unseen = val_items_raw - train_items_raw

    print(
        f"[COVERAGE] VAL users unseen in TRAIN: {len(val_users_unseen)} / {len(val_users_raw)} "
        f"({(len(val_users_unseen)/max(1,len(val_users_raw))):.1%})"
    )
    print(
        f"[COVERAGE] VAL items unseen in TRAIN: {len(val_items_unseen)} / {len(val_items_raw)} "
        f"({(len(val_items_unseen)/max(1,len(val_items_raw))):.1%})"
    )
    _unseen_users_list = sorted(list(val_users_unseen))[:10]
    _unseen_items_list = sorted(list(val_items_unseen))[:10]
    print(f"[COVERAGE] sample unseen user ids: {_unseen_users_list}")
    print(f"[COVERAGE] sample unseen item ids: {_unseen_items_list}")

    # --- 2) Encode from TRAIN ONLY ---
    u_codes, users = pd.factorize(train_pos["mask_id"], sort=True)
    i_codes, items = pd.factorize(train_pos["item"], sort=True)
    u_map = pd.DataFrame({"mask_id": users, "uid": np.arange(len(users), dtype=np.int32)})
    i_map = pd.DataFrame({"item": items, "iid": np.arange(len(items), dtype=np.int32)})

    def to_train_space(df_: pd.DataFrame) -> pd.DataFrame:
        return (
            df_.merge(u_map, on="mask_id", how="inner")
               .merge(i_map, on="item", how="inner")
        )

    train_m = to_train_space(train_pos)
    val_m = to_train_space(val_pos)

    # --- Popularity prior from TRAIN (0..1) ---
    _item_counts = train_m.groupby("iid").size()
    pop_prior = _item_counts.reindex(range(len(items)), fill_value=0).to_numpy(dtype=np.float32)
    if pop_prior.max() > 0:
        pop_prior = pop_prior / pop_prior.max()
    else:
        pop_prior = np.zeros_like(pop_prior, dtype=np.float32)

    pop_order = np.argsort(-pop_prior)

    n_users, n_items = len(users), len(items)
    if (n_users == 0) or (n_items == 0) or train_m.empty or val_m.empty:
        raise ValueError("Empty train/val after split+encoding. Adjust split or check coverage.")

    # --- 3) Sparse matrices for TRAIN (binary weights for standard BPR) ---
    W = train_m["wager_amount"].astype(bool).astype(np.float32).to_numpy()
    IU_train = sp.coo_matrix(
        (W, (train_m["iid"].to_numpy(), train_m["uid"].to_numpy())),
        shape=(n_items, n_users),
        dtype=np.float32,
    ).tocsr()
    UI_train = IU_train.T.tocsr()

    for M in (IU_train, UI_train):
        M.indptr = M.indptr.astype(np.int32, copy=False)
        M.indices = M.indices.astype(np.int32, copy=False)

    if UI_train.shape[0] != IU_train.shape[1]:
        UI_train = UI_train[:IU_train.shape[1], :]

    # --- 4) Hyperparameter tuning ---
    FACTORS = list(factors_grid)
    LEARNING_RATES = list(learning_rates)
    REGULARIZATIONS = list(regularizations)
    ITERATIONS = list(iterations_grid)

    grid = [
        dict(factors=f, learning_rate=lr, regularization=reg, iterations=it)
        for f, lr, reg, it in product(FACTORS, LEARNING_RATES, REGULARIZATIONS, ITERATIONS)
    ]
    grid.sort(key=lambda p: (p["factors"] * p["iterations"], p["learning_rate"], p["regularization"]))

    print(f"Total configs: {len(grid)}")  # 2*2*4*3 = 48 by default

    best: Dict[str, Any] = {"score": -1.0, "metrics": None, "cfg": None, "model": None}

    print("TUNING (implicit BPR; non-directional items; UI CSR for recommend):")
    for p in grid:
        model = BayesianPersonalizedRanking(
            factors=p["factors"],
            learning_rate=p["learning_rate"],
            regularization=p["regularization"],
            iterations=p["iterations"],
            verify_negative_samples=True,
            num_threads=0,  # 0=all cores; set 1 for deterministic single-thread if desired
        )
        model.fit(IU_train)

        # Sanity checks: model vs matrices
        mf_users = getattr(model, "user_factors", np.empty((0, 0))).shape[0]
        mf_items = getattr(model, "item_factors", np.empty((0, 0))).shape[0]

        if mf_users != IU_train.shape[1]:
            print(f"[WARN] model.user_factors={mf_users} but IU_train.shape[1]={IU_train.shape[1]}")

        UI_train_safe = UI_train[:mf_users, :] if UI_train.shape[0] != mf_users else UI_train

        print(
            "model users:", mf_users, " | UI rows:", UI_train_safe.shape[0],
            " | max val uid:", int(val_m["uid"].max()),
            " | items:", mf_items, " | IU items:", IU_train.shape[0]
        )

        # Align validation DF to the trained model's user/item ranges
        val_eval = val_m[(val_m["uid"] < mf_users) & (val_m["iid"] < mf_items)].copy()
        if val_eval.empty:
            print("[WARN] validation set became empty after alignment; skipping config")
            continue

        # Diagnostics: show how much we dropped after alignment
        u0, u1 = val_m["uid"].nunique(), val_eval["uid"].nunique()
        i0, i1 = val_m["iid"].nunique(), val_eval["iid"].nunique()
        if (u0 != u1) or (i0 != i1):
            print(
                f"[INFO] Dropped users: {u0 - u1} / {u0} ({(u0-u1)/max(1,u0):.1%}), "
                f"items: {i0 - i1} / {i0} ({(i0-i1)/max(1,i0):.1%})"
            )

        metrics = evaluate_k_for_Hyper_Optimization(
            model=model,
            user_items=UI_train_safe,
            val_df=val_eval,               # must have columns uid, iid, bet_date
            pop_prior=pop_prior,
            k=k_eval,
            low_decile_uids=None,          # or set()
            user_col="uid",
            item_col="iid",
            date_col="bet_date",
            weights=objective_weights,
            return_preds=False,            # tuning: metrics only
        )

        print(
            f"  cfg={p} -> Hit@{k_eval}={metrics[f'Hit@{k_eval}']:.4f}  "
            f"Prec@{k_eval}={metrics[f'Precision@{k_eval}']:.4f}  "
            f"NDCG@{k_eval}={metrics[f'NDCG@{k_eval}']:.4f}  "
            f"Coverage@{k_eval}={metrics[f'Coverage@{k_eval}']:.4f}  "
            f"Objective={metrics['Objective']:.4f}"
        )

        if metrics["Objective"] > best["score"]:
            best["score"] = metrics["Objective"]
            best["metrics"] = metrics
            best["cfg"] = p
            best["model"] = model

    # --- Report best ---
    m = best["metrics"]
    #print("\nBEST ON VALIDATION:")
    #print(f"  cfg={best['cfg']}")
#     print(
#         f"  Hit@{k_eval}={m[f'Hit@{k_eval}']:.4f} | "
#         f"Precision@{k_eval}={m[f'Precision@{k_eval}']:.4f} | "
#         f"NDCG@{k_eval}={m[f'NDCG@{k_eval}']:.4f} | "
#         f"Coverage@{k_eval}={m[f'Coverage@{k_eval}']:.4f} | "
#         f"Objective={m['Objective']:.4f}")

    # --- Final training on ALL positives  ---
    u_codes_all, users_all = pd.factorize(pos["mask_id"], sort=True)
    i_codes_all, items_all = pd.factorize(pos["item"], sort=True)
    #W_all = np.log1p(pos["wager_amount"].astype(np.float32).to_numpy())
    W_all=pos["wager_amount"].astype(bool).astype(np.float32).to_numpy()
    

    IU_all = sp.coo_matrix(
        (W_all, (i_codes_all, u_codes_all)),
        shape=(len(items_all), len(users_all)),
        dtype=np.float32,
    ).tocsr()
    UI_all = IU_all.T.tocsr()

    for M in (IU_all, UI_all):
        M.indptr = M.indptr.astype(np.int32, copy=False)
        M.indices = M.indices.astype(np.int32, copy=False)

    final_model = BayesianPersonalizedRanking(
        factors=best["cfg"]["factors"],
        learning_rate=best["cfg"]["learning_rate"],
        regularization=best["cfg"]["regularization"],
        iterations=best["cfg"]["iterations"],
        verify_negative_samples=True,
        num_threads=0,
    )
    final_model.fit(IU_all)

    user_index_all = {u: i for i, u in enumerate(users_all)}
    item_index_all = {it: i for i, it in enumerate(items_all)}
    iid_to_item = {i: it for it, i in item_index_all.items()}

    # --- Clean validation/unseen data (shared steps) ---
    df_val_clean = preprocess_validation(df_validation.copy())
    df_val_clean["bet_date"] = pd.to_datetime(df_val_clean["bet_date"]).dt.date
    df_val_clean.info()

    return {
        # tuning artifacts
        "best": best,                          # {"score","metrics","cfg","model"}
        "UI_train": UI_train,
        "IU_train": IU_train,
        "train_m": train_m,
        "val_m": val_m,
        "pop_prior": pop_prior,
        "pop_order": pop_order,
        "users": users,
        "items": items,
        # final model artifacts
        "final_model": final_model,
        "UI_all": UI_all,
        "IU_all": IU_all,
        "users_all": users_all,
        "items_all": items_all,
        "user_index_all": user_index_all,
        "item_index_all": item_index_all,
        "iid_to_item": iid_to_item,
        # cleaned validation
        "df_validation": df_val_clean,
    }



##Call the function and unpack

results = run_bpr_training_and_evaluation(
    df_train=df_train,
    df_validation=df_validation,
    preprocess_validation=preprocess_validation,
    evaluate_k_for_Hyper_Optimization=evaluate_k_for_Hyper_Optimization,
    build_dir_item=build_dir_item,
    factors_grid=(32, 64),
    learning_rates=(0.001, 0.01),
    regularizations=(1e-5, 1e-4, 1e-3, 1e-2),
    iterations_grid=(40, 60, 80),
    k_eval=3,
    objective_weights=(0.60, 0.30, 0.10),
)

[COVERAGE] VAL users unseen in TRAIN: 43 / 496 (8.7%)
[COVERAGE] VAL items unseen in TRAIN: 108 / 207 (52.2%)
[COVERAGE] sample unseen user ids: [134111, 139018, 155268, 155662, 226897, 232637, 236153, 265134, 277155, 298583]
[COVERAGE] sample unseen item ids: ['Atlanta Hawks ## Dallas Mavericks', 'Atlanta Hawks ## Houston Rockets', 'Atlanta Hawks ## Philadelphia 76ers', 'Boston Celtics ## Memphis Grizzlies', 'Boston Celtics ## Phoenix Suns', 'Boston Celtics ## Portland Trail Blazers', 'Boston Celtics ## Sacramento Kings', 'Boston Celtics ## San Antonio Spurs', 'Boston Celtics ## Utah Jazz', 'Brooklyn Nets ## Dallas Mavericks']
Total configs: 48
TUNING (implicit BPR; non-directional items; UI CSR for recommend):


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 1e-05, 'iterations': 40} -> Hit@3=0.4654  Prec@3=0.1905  NDCG@3=0.2855  Coverage@3=0.0054  Objective=0.2290


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.0001, 'iterations': 40} -> Hit@3=0.4440  Prec@3=0.1766  NDCG@3=0.2871  Coverage@3=0.0052  Objective=0.2258


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.001, 'iterations': 40} -> Hit@3=0.4447  Prec@3=0.1844  NDCG@3=0.3010  Coverage@3=0.0053  Objective=0.2365


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.01, 'iterations': 40} -> Hit@3=0.4457  Prec@3=0.1937  NDCG@3=0.2870  Coverage@3=0.0054  Objective=0.2308


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 1e-05, 'iterations': 40} -> Hit@3=0.4661  Prec@3=0.1909  NDCG@3=0.2848  Coverage@3=0.0113  Objective=0.2293


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 0.0001, 'iterations': 40} -> Hit@3=0.4861  Prec@3=0.1950  NDCG@3=0.2975  Coverage@3=0.0118  Objective=0.2382


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 0.001, 'iterations': 40} -> Hit@3=0.4657  Prec@3=0.1868  NDCG@3=0.2834  Coverage@3=0.0118  Objective=0.2273


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 0.01, 'iterations': 40} -> Hit@3=0.4895  Prec@3=0.1987  NDCG@3=0.3022  Coverage@3=0.0111  Objective=0.2420


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 1e-05, 'iterations': 60} -> Hit@3=0.4532  Prec@3=0.1808  NDCG@3=0.2840  Coverage@3=0.0053  Objective=0.2252


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.0001, 'iterations': 60} -> Hit@3=0.5431  Prec@3=0.2255  NDCG@3=0.3420  Coverage@3=0.0059  Objective=0.2734


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.001, 'iterations': 60} -> Hit@3=0.5173  Prec@3=0.2037  NDCG@3=0.3188  Coverage@3=0.0057  Objective=0.2530


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.01, 'iterations': 60} -> Hit@3=0.4756  Prec@3=0.1978  NDCG@3=0.3016  Coverage@3=0.0052  Objective=0.2408


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 1e-05, 'iterations': 60} -> Hit@3=0.4745  Prec@3=0.1946  NDCG@3=0.2956  Coverage@3=0.0166  Objective=0.2374


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 0.0001, 'iterations': 60} -> Hit@3=0.4532  Prec@3=0.1841  NDCG@3=0.2823  Coverage@3=0.0153  Objective=0.2261


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 0.001, 'iterations': 60} -> Hit@3=0.4559  Prec@3=0.1862  NDCG@3=0.2834  Coverage@3=0.0156  Objective=0.2275


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 0.01, 'iterations': 60} -> Hit@3=0.4661  Prec@3=0.1924  NDCG@3=0.2921  Coverage@3=0.0153  Objective=0.2345


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 1e-05, 'iterations': 80} -> Hit@3=0.4986  Prec@3=0.2068  NDCG@3=0.3074  Coverage@3=0.0058  Objective=0.2471


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 1e-05, 'iterations': 40} -> Hit@3=0.4722  Prec@3=0.1981  NDCG@3=0.3022  Coverage@3=0.0054  Objective=0.2413


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.0001, 'iterations': 80} -> Hit@3=0.5166  Prec@3=0.1974  NDCG@3=0.3170  Coverage@3=0.0057  Objective=0.2500


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 0.0001, 'iterations': 40} -> Hit@3=0.4396  Prec@3=0.1718  NDCG@3=0.2810  Coverage@3=0.0049  Objective=0.2206


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.001, 'iterations': 80} -> Hit@3=0.4749  Prec@3=0.1964  NDCG@3=0.2973  Coverage@3=0.0055  Objective=0.2379


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 0.001, 'iterations': 40} -> Hit@3=0.4647  Prec@3=0.1808  NDCG@3=0.2989  Coverage@3=0.0051  Objective=0.2341


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.01, 'iterations': 80} -> Hit@3=0.4844  Prec@3=0.2007  NDCG@3=0.3139  Coverage@3=0.0054  Objective=0.2491


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 0.01, 'iterations': 40} -> Hit@3=0.4379  Prec@3=0.1754  NDCG@3=0.2865  Coverage@3=0.0054  Objective=0.2250


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 1e-05, 'iterations': 80} -> Hit@3=0.4654  Prec@3=0.1888  NDCG@3=0.2875  Coverage@3=0.0179  Objective=0.2309


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 1e-05, 'iterations': 40} -> Hit@3=0.4932  Prec@3=0.2026  NDCG@3=0.3077  Coverage@3=0.0122  Objective=0.2466


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 0.0001, 'iterations': 80} -> Hit@3=0.4532  Prec@3=0.1851  NDCG@3=0.2809  Coverage@3=0.0173  Objective=0.2258


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 0.0001, 'iterations': 40} -> Hit@3=0.4830  Prec@3=0.1960  NDCG@3=0.2973  Coverage@3=0.0117  Objective=0.2383


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 0.001, 'iterations': 80} -> Hit@3=0.4613  Prec@3=0.1904  NDCG@3=0.2864  Coverage@3=0.0173  Objective=0.2307


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 0.001, 'iterations': 40} -> Hit@3=0.4681  Prec@3=0.1911  NDCG@3=0.2871  Coverage@3=0.0109  Objective=0.2307


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 32, 'learning_rate': 0.01, 'regularization': 0.01, 'iterations': 80} -> Hit@3=0.4508  Prec@3=0.1826  NDCG@3=0.2800  Coverage@3=0.0172  Objective=0.2245


  0%|          | 0/40 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 0.01, 'iterations': 40} -> Hit@3=0.4684  Prec@3=0.1916  NDCG@3=0.2854  Coverage@3=0.0109  Objective=0.2298


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 1e-05, 'iterations': 60} -> Hit@3=0.4749  Prec@3=0.1903  NDCG@3=0.2981  Coverage@3=0.0053  Objective=0.2365


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 0.0001, 'iterations': 60} -> Hit@3=0.4657  Prec@3=0.1852  NDCG@3=0.3002  Coverage@3=0.0055  Objective=0.2362


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 0.001, 'iterations': 60} -> Hit@3=0.4742  Prec@3=0.1835  NDCG@3=0.3012  Coverage@3=0.0051  Objective=0.2363


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 0.01, 'iterations': 60} -> Hit@3=0.4053  Prec@3=0.1712  NDCG@3=0.2675  Coverage@3=0.0051  Objective=0.2123


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 1e-05, 'iterations': 60} -> Hit@3=0.4718  Prec@3=0.1924  NDCG@3=0.2910  Coverage@3=0.0161  Objective=0.2339


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 0.0001, 'iterations': 60} -> Hit@3=0.4654  Prec@3=0.1912  NDCG@3=0.2891  Coverage@3=0.0153  Objective=0.2324


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 0.001, 'iterations': 60} -> Hit@3=0.4644  Prec@3=0.1916  NDCG@3=0.2899  Coverage@3=0.0154  Objective=0.2330


  0%|          | 0/60 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 0.01, 'iterations': 60} -> Hit@3=0.4701  Prec@3=0.1926  NDCG@3=0.2950  Coverage@3=0.0147  Objective=0.2363


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 1e-05, 'iterations': 80} -> Hit@3=0.4837  Prec@3=0.1961  NDCG@3=0.2854  Coverage@3=0.0055  Objective=0.2306


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 0.0001, 'iterations': 80} -> Hit@3=0.4742  Prec@3=0.1973  NDCG@3=0.3048  Coverage@3=0.0052  Objective=0.2426


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 0.001, 'iterations': 80} -> Hit@3=0.4786  Prec@3=0.1991  NDCG@3=0.3023  Coverage@3=0.0053  Objective=0.2416


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.001, 'regularization': 0.01, 'iterations': 80} -> Hit@3=0.4956  Prec@3=0.1914  NDCG@3=0.3163  Coverage@3=0.0053  Objective=0.2478


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 1e-05, 'iterations': 80} -> Hit@3=0.4616  Prec@3=0.1900  NDCG@3=0.2880  Coverage@3=0.0171  Objective=0.2315


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 0.0001, 'iterations': 80} -> Hit@3=0.4576  Prec@3=0.1896  NDCG@3=0.2891  Coverage@3=0.0171  Objective=0.2320


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 0.001, 'iterations': 80} -> Hit@3=0.4650  Prec@3=0.1912  NDCG@3=0.2922  Coverage@3=0.0171  Objective=0.2344


  0%|          | 0/80 [00:00<?, ?it/s]

[WARN] model.user_factors=762 but IU_train.shape[1]=651
model users: 762  | UI rows: 651  | max val uid: 649  | items: 651  | IU items: 762
[INFO] Dropped users: 10 / 413 (2.4%), items: 17 / 99 (17.2%)
  cfg={'factors': 64, 'learning_rate': 0.01, 'regularization': 0.01, 'iterations': 80} -> Hit@3=0.4640  Prec@3=0.1909  NDCG@3=0.2890  Coverage@3=0.0172  Objective=0.2324

BEST ON VALIDATION:
  cfg={'factors': 32, 'learning_rate': 0.001, 'regularization': 0.0001, 'iterations': 60}
  Hit@3=0.5431 | Precision@3=0.2255 | NDCG@3=0.3420 | Coverage@3=0.0059 | Objective=0.2734


  0%|          | 0/60 [00:00<?, ?it/s]

[validation] 'A @ B' format match rate: 100.0%
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 30243 entries, 0 to 30242
Data columns (total 9 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   mask_id              30243 non-null  int64         
 1   betdate              30243 non-null  datetime64[ns]
 2   event_description    30243 non-null  string        
 3   wager_amount         30243 non-null  float64       
 4   bet_date             30243 non-null  object        
 5   Scheduled Game Date  28517 non-null  datetime64[ns]
 6   Offical Game Date    28468 non-null  datetime64[ns]
 7   AwayTeam             30243 non-null  string        
 8   HomeTeam             30243 non-null  string        
dtypes: datetime64[ns](3), float64(1), int64(1), object(1), string(3)
memory usage: 2.1+ MB


In [123]:
results["df_validation"].shape

(30243, 9)

In [None]:
##Access what you need for model evaluation:
best_cfg = results["best"]["cfg"]
best_metrics = results["best"]["metrics"]
best_model = results["best"]["model"]  # tuned on TRAIN
final_model = results["final_model"]   # refit on ALL positives
UI_all = results["UI_all"]
IU_all = results["IU_all"]
user_index_all = results["user_index_all"]
item_index_all = results["item_index_all"]
iid_to_item = results["iid_to_item"]
df_validation = results["df_validation"]

# MODEL EVALUATION ! # 

In [128]:
def directional_item_from_string(s):
    """
    Convert matchup text to directional 'LEFT ## RIGHT' (preserve order).
    Examples:
      'Team A @ Team B'  -> 'Team A ## Team B'
      'Team A at Team B' -> 'Team A ## Team B'
      'Team A vs Team B' -> 'Team A ## Team B'
      'Team A & Team B'  -> 'Team A ## Team B'
    Returns None if it can't parse exactly two sides.
    """
    if s is None:
        return None
    if isinstance(s, float) and math.isnan(s):
        return None

    text = str(s).strip()
    if not text:
        return None

    parts = _SEP.split(text)
    parts = [p.strip() for p in parts if p.strip()]
    if len(parts) != 2:
        return None

    return parts[0] + " ## " + parts[1]
                
def build_popularity_prior_from_iids(i_all, num_items):
    """Compute popularity scores as item frequencies normalized by the maximum (0–1) and return both scores and descending rank order."""
    counts = np.bincount(i_all, minlength=num_items).astype(np.float32)
    scores = (counts / counts.max()) if counts.max() > 0 else np.zeros(num_items, dtype=np.float32)
    order = np.argsort(-scores)
    return scores, order

def to_date(s):
    return pd.to_datetime(s, errors="coerce").dt.date

In [129]:
# Types for clarity
MaskId = int | str
ItemStr = str
UID = int
IID = int


def load_schedules_from_sources(
    box_scores_xlsx: str,
    box_scores_sheet: str,
    vendor_csv: str,
    directional_item_from_string: Callable[[str], Optional[str]],
) -> Tuple[pd.DataFrame, pd.DataFrame, Dict[pd.Timestamp, Set[ItemStr]], Dict[pd.Timestamp, Set[ItemStr]]]:
    """
    Load and normalize full schedule (box scores XLSX) and vendor schedule (CSV) into
    canonical (date, item) rows where `item` is a directional matchup like "AWAY ## HOME",
    then build fast per-day lookup sets for each source.

    Returns
    -------
    full_sched : DataFrame
        Columns: ["date", "item"] deduplicated from box scores (dates parsed as MM/DD/YYYY).
    vendor_sched : DataFrame
        Columns: ["date", "item"] deduplicated from vendor file (dates parsed as DD-MM-YYYY).
    full_by_day : dict[date -> set[item]]
        Map of unique items per date from full_sched.
    vendor_by_day : dict[date -> set[item]]
        Map of unique items per date from vendor_sched.
    """
    # Full slate (Box Scores)
    box = pd.read_excel(box_scores_xlsx, sheet_name=box_scores_sheet)
    box["date"] = pd.to_datetime(box["Game Date"], format="%m/%d/%Y", errors="coerce").dt.date
    box["item"] = box["Modified Game Name"].apply(directional_item_from_string)
    full_sched = (
        box[["date", "item"]]
        .dropna()
        .drop_duplicates()
        .reset_index(drop=True)
    )

    # Vendor slate
    vend = pd.read_csv(vendor_csv)
    vend["date"] = pd.to_datetime(
        vend["Date"].astype(str).str.strip(), format="%d-%m-%Y", errors="coerce"
    ).dt.date
    vend["item"] = vend["Game"].apply(directional_item_from_string)
    vendor_sched = (
        vend[["date", "item"]]
        .dropna()
        .drop_duplicates()
        .reset_index(drop=True)
    )

    # Per-day lookup sets
    full_by_day = full_sched.groupby("date")["item"].apply(lambda s: set(s.unique())).to_dict()
    vendor_by_day = vendor_sched.groupby("date")["item"].apply(lambda s: set(s.unique())).to_dict()

    return full_sched, vendor_sched, full_by_day, vendor_by_day


def prepare_validation_truth(
    df_validation: pd.DataFrame,
    date_start: str | pd.Timestamp,
    date_end: str | pd.Timestamp,
    directional_item_from_string: Callable[[str], Optional[str]],
) -> Tuple[pd.DataFrame, Dict[Tuple[MaskId, pd.Timestamp], Set[ItemStr]], List[MaskId]]:
    """
    Build a clean validation table limited to [date_start, date_end], derive canonical
    directional items from event text, and produce ground-truth sets per (mask_id, date)
    along with the list of users to score.

    Returns
    -------
    val : DataFrame
        Filtered validation rows with columns at least ["mask_id","date","item"].
    truth_by_ud : dict[(mask_id,date) -> set[item]]
        Ground-truth unique items per user-day.
    users_to_score : list[mask_id]
        Sorted list of unique users present in `val`.
    """
    val = df_validation.copy()
    # Scheduled Game Date -> date
    val["date"] = pd.to_datetime(val["Scheduled Game Date"], errors="coerce").dt.date
    # Directional item from event_description
    val["item"] = val["event_description"].apply(directional_item_from_string)
    val = val.dropna(subset=["mask_id", "date", "item"]).copy()

    start_d = pd.to_datetime(date_start).date()
    end_d = pd.to_datetime(date_end).date()
    val = val[(val["date"] >= start_d) & (val["date"] <= end_d)].copy()

    print("Initial num of users in val", val["mask_id"].nunique())

    truth_by_ud = (
        val.groupby(["mask_id", "date"])["item"]
        .apply(lambda s: set(s.unique()))
        .to_dict()
    )
    users_to_score = sorted(val["mask_id"].unique().tolist())
    return val, truth_by_ud, users_to_score


def build_i_all_from_train(
    df_train: pd.DataFrame,
    item_index_all: Dict[ItemStr, IID],
    directional_item_from_string: Callable[[str], Optional[str]],
) -> np.ndarray:
    """
    Construct an item-id array `i_all` for popularity priors using TRAIN ONLY (no leakage):
    parse directional items from train event text and map to global item indices, dropping unknowns.

    Returns
    -------
    i_all : np.ndarray[int]
        Array of item indices seen in training (possibly with repeats), suitable for bincount.
    """
    _tmp = df_train.copy()
    _tmp["item"] = _tmp["event_description"].apply(directional_item_from_string)
    i_all = _tmp["item"].map(item_index_all).dropna().astype(int).to_numpy()
    print(f"[INFO] Derived i_all from df_train: n={len(i_all)}")
    return i_all


def make_id_mappers_and_ui(
    final_model,
    UI_all,
    user_index_all: Dict[MaskId, UID],
    item_index_all: Dict[ItemStr, IID],
):
    """
    Create safe mappers from external ids to model indices and return a UI matrix trimmed
    to the model's user count for use with `model.recommend`.

    Returns
    -------
    uid_for : Callable[[MaskId], Optional[int]]
        Maps mask_id -> uid if within model range, else None.
    iid_for : Callable[[ItemStr], Optional[int]]
        Maps item string -> iid if within model item range, else None.
    UI_safe : scipy.sparse.csr_matrix
        User×Item matrix sliced to `final_model.user_factors.shape[0]` rows.
    model_user_count : int
        Number of users known by the model.
    model_item_count : int
        Number of items known by the model.
    """
    model_user_count = getattr(final_model, "user_factors", np.empty((0, 0))).shape[0]
    model_item_count = getattr(final_model, "item_factors", np.empty((0, 0))).shape[0]
    UI_safe = UI_all[:model_user_count, :]

    def uid_for(mask_id: MaskId) -> Optional[int]:
        uid = user_index_all.get(mask_id, None)
        if uid is None or uid < 0 or uid >= model_user_count:
            return None
        return int(uid)

    def iid_for(item: ItemStr) -> Optional[int]:
        iid = item_index_all.get(item, None)
        if iid is None or iid < 0 or iid >= model_item_count:
            return None
        return int(iid)

    return uid_for, iid_for, UI_safe, model_user_count, model_item_count


def precompute_user_recommendations(
    final_model,
    UI_safe,
    users_to_score: Iterable[MaskId],
    uid_for: Callable[[MaskId], Optional[int]],
    items_all: Iterable[ItemStr],
    N_req: Optional[int] = None,
) -> Tuple[Dict[MaskId, Optional[Dict[int, float]]], int, int]:
    """
    Run `model.recommend` once per warm user to cache recommendation scores, leaving cold
    users as None (to be served by popularity later).

    Parameters
    ----------
    final_model : object
        Trained implicit BPR model with `.recommend(uid, UI, N, ...)`.
    UI_safe : csr_matrix
        User×Item matrix trimmed to the model's user count.
    users_to_score : Iterable
        External user identifiers (mask_id) to score.
    uid_for : Callable
        Mapper from mask_id -> uid in model space (or None if out-of-range).
    items_all : Iterable
        Collection of all item labels (used only to set a reasonable N_req if not provided).
    N_req : int, optional
        Number of recommendations to request per user; defaults to min(len(items_all), 10000).

    Returns
    -------
    user_rec_map : dict[mask_id -> dict[iid -> score] | None]
        Per-user map of model item indices to relevance scores; None for cold users.
    n_warm : int
        Count of warm users (seen by the model).
    n_cold : int
        Count of cold users (not seen by the model).
    """
    N_req = min(len(list(items_all)), 10000) if N_req is None else int(N_req)

    user_rec_map: Dict[MaskId, Optional[Dict[int, float]]] = {}
    n_warm = n_cold = 0

    for mask_id in users_to_score:
        uid = uid_for(mask_id)
        if uid is not None:
            rec_i, rec_s = final_model.recommend(
                uid,
                UI_safe,
                N=N_req,
                filter_items=None,
                filter_already_liked_items=False,
            )
            rec_dict = {int(i): float(s) for i, s in zip(rec_i.tolist(), rec_s.tolist())}
            user_rec_map[mask_id] = rec_dict
            n_warm += 1
        else:
            user_rec_map[mask_id] = None  # cold -> popularity later
            n_cold += 1

    #print(f"[INFO] Users to score: {len(list(users_to_score))} | warm={n_warm} cold={n_cold}")
    return user_rec_map, n_warm, n_cold


# -------------------------
# Orchestrated convenience
# -------------------------
def build_scoring_artifacts(
    BOX_SCORES_XLSX: str,
    BOX_SCORES_SHEET: str,
    VENDOR_CSV: str,
    df_validation: pd.DataFrame,
    df_train: pd.DataFrame,
    items_all: Iterable[ItemStr],
    user_index_all: Dict[MaskId, UID],
    item_index_all: Dict[ItemStr, IID],
    UI_all,                         # csr_matrix (user×item) from your trained pipeline
    final_model,                    # fitted implicit BPR model on ALL positives
    DATE_START: str | pd.Timestamp,
    DATE_END: str | pd.Timestamp,
    directional_item_from_string: Callable[[str], Optional[str]],
    build_popularity_prior_from_iids: Callable[[np.ndarray, int], Tuple[np.ndarray, np.ndarray]],
) -> Dict[str, object]:
    """
    High-level wrapper: load schedules, prepare validation truth,
    derive popularity priors from TRAIN, build safe mappers, and precompute per-user recommendations.

    Returns
    -------
    artifacts : dict
        Dictionary containing:
          - "full_sched", "vendor_sched", "full_by_day", "vendor_by_day"
          - "val", "truth_by_ud", "users_to_score"
          - "i_all", "pop_scores", "pop_order"
          - "uid_for", "iid_for", "UI_safe", "model_user_count", "model_item_count"
          - "user_rec_map", "n_warm", "n_cold"
    """
    # 1) Schedules
    full_sched, vendor_sched, full_by_day, vendor_by_day = load_schedules_from_sources(
        BOX_SCORES_XLSX, BOX_SCORES_SHEET, VENDOR_CSV, directional_item_from_string
    )

    # 2) Validation truth
    val, truth_by_ud, users_to_score = prepare_validation_truth(
        df_validation, DATE_START, DATE_END, directional_item_from_string
    )

    # 3) Popularity prior i_all from TRAIN (no leakage)
    i_all = build_i_all_from_train(df_train, item_index_all, directional_item_from_string)

    # 4) Priors (scores 0..1 + descending order)
    pop_scores, pop_order = build_popularity_prior_from_iids(i_all, num_items=len(list(items_all)))

    # 5) Mappers and safe UI
    uid_for, iid_for, UI_safe, model_user_count, model_item_count = make_id_mappers_and_ui(
        final_model, UI_all, user_index_all, item_index_all
    )

    # 6) Precompute recommendations per user
    user_rec_map, n_warm, n_cold = precompute_user_recommendations(
        final_model=final_model,
        UI_safe=UI_safe,
        users_to_score=users_to_score,
        uid_for=uid_for,
        items_all=items_all,
        N_req=min(len(list(items_all)), 10000),
    )

    return {
        # schedules
        "full_sched": full_sched,
        "vendor_sched": vendor_sched,
        "full_by_day": full_by_day,
        "vendor_by_day": vendor_by_day,
        # validation truth
        "val": val,
        "truth_by_ud": truth_by_ud,
        "users_to_score": users_to_score,
        # priors
        "i_all": i_all,
        "pop_scores": pop_scores,
        "pop_order": pop_order,
        # mappers and matrices
        "uid_for": uid_for,
        "iid_for": iid_for,
        "UI_safe": UI_safe,
        "model_user_count": model_user_count,
        "model_item_count": model_item_count,
        # precomputed recs
        "user_rec_map": user_rec_map,
        "n_warm": n_warm,
        "n_cold": n_cold,
    }


artifacts = build_scoring_artifacts(
    BOX_SCORES_XLSX=BOX_SCORES_XLSX,
    BOX_SCORES_SHEET=BOX_SCORES_SHEET,
    VENDOR_CSV=VENDOR_CSV,
    df_validation=df_validation,
    df_train=df_train,
    items_all=items_all,
    user_index_all=user_index_all,
    item_index_all=item_index_all,
    UI_all=UI_all,
    final_model=final_model,
    DATE_START=DATE_START,
    DATE_END=DATE_END,
    directional_item_from_string=directional_item_from_string,
    build_popularity_prior_from_iids=build_popularity_prior_from_iids,
)


# # Reuse anywhere in your workflow:
full_by_day = artifacts["full_by_day"]
vendor_by_day = artifacts["vendor_by_day"]
truth_by_ud = artifacts["truth_by_ud"]
pop_scores, pop_order = artifacts["pop_scores"], artifacts["pop_order"]
uid_for = artifacts["uid_for"]
user_rec_map = artifacts["user_rec_map"]

Initial num of users in val 439
[INFO] Derived i_all from df_train: n=204392
[INFO] Users to score: 439 | warm=439 cold=0


In [65]:
def _compute_week_of_month(dts: pd.Series) -> pd.Series:
    """Week-of-month as 1..5 using simple 7-day buckets by day-of-month."""
    dom = dts.dt.day
    return ((dom - 1) // 7 + 1).astype(int)

def _infer_date_col(df: pd.DataFrame) -> str:
    """
    Try to find a sensible date column in rankings.
    Priority order is adjustable.
    """
    candidates = ["bet_date", "date", "game_date", "event_date"]
    for c in candidates:
        if c in df.columns:
            return c
    # fallback: any column with 'date' in its name
    for c in df.columns:
        if "date" in str(c).lower():
            return c
    raise KeyError(
        f"Could not infer a date column. Looked for any of {candidates} or *date* in {list(df.columns)}."
    )

def augment_rankings_for_export(
    rankings: pd.DataFrame,
    *,
    date_col: str | None = None,     # None => auto-detect
    model=None,                      # trained implicit BPR model (optional)
    user_index: dict | None = None   # {mask_id -> uid} (optional)
) -> pd.DataFrame:
    """
    Modifies a rankings DataFrame for export:
      • drops 'score' (if present)
      • adds 'new_user' (1=cold, 0=warm)
      • adds time columns: 'month', 'week_of_month', 'day_type' derived from the date column

    new_user logic (checked in this order):
      1) If 'uid' present and `model` provided:
         uid >= model.user_factors.shape[0] → cold (1)
      2) Else if 'mask_id' present and `user_index` provided:
         mask_id not in user_index → cold (1)
      3) Else: default warm (0) and print a warning.
    """
    df = rankings.copy()

    # Drop score safely
    df = df.drop(columns=["score"], errors="ignore")

    # new_user
    # If you've already computed df["new_user"] upstream, KEEP it and don't overwrite.
    if "new_user" not in df.columns:
        new_user = pd.Series(0, index=df.index, dtype=np.int8)
        try:
            if "uid" in df.columns and model is not None:
                n_model_users = int(getattr(model, "user_factors", np.empty((0, 0))).shape[0])
                uid_vals = pd.to_numeric(df["uid"], errors="coerce").fillna(-1).astype(int)
                new_user = ((uid_vals < 0) | (uid_vals >= n_model_users)).astype(np.int8)
            elif "mask_id" in df.columns and user_index is not None:
                known = set(user_index.keys())
                new_user = (~df["mask_id"].isin(known)).astype(np.int8)
            else:
                #print("[WARN] Could not determine warm/cold users; 'new_user' defaulted to 0 for all rows.")
        except Exception as e:
            #print(f"[WARN] Failed to compute 'new_user' ({e}); defaulting to 0.")
        df["new_user"] = new_user
    # else: leave the existing df["new_user"] as-is



    # Time features
    if date_col is None:
        date_col = _infer_date_col(df)

    dts = pd.to_datetime(df[date_col], errors="coerce", utc=False)
    if dts.isna().any():
        bad = df.loc[dts.isna(), date_col].head(3).tolist()
        raise ValueError(f"Some '{date_col}' values could not be parsed as datetime. Examples: {bad}")

    df["month"] = dts.dt.month.astype(np.int8)                # 1..12
    df["week_of_month"] = _compute_week_of_month(dts)         # 1..5
    df["day_type"] = np.where(dts.dt.dayofweek >= 5, "Weekend", "Weekday")
    return df


In [137]:
# ============ BUILD RANKINGS (BOTH TRACKS) ============

def both_orientations(vendor_item: str) -> Set[str]:
    """
    Expand a vendor matchup string into both possible orientations.
    Examples
    --------
    - "LAL ## BOS" → {"LAL ## BOS", "BOS ## LAL"}
    - "LAL @ BOS"  → {"LAL @ BOS"}  (unchanged if no "##")
    Parameters
    ----------
    vendor_item : str
        Vendor-supplied matchup string, possibly using "##" to separate teams.
    Returns
    -------
    set of str
        One or two canonical item strings representing both home/away orientations.
    """
    if "##" not in vendor_item:
        return {vendor_item}
    a, b = [p.strip() for p in vendor_item.split("##", 1)]
    return {f"{a} ## {b}", f"{b} ## {a}"}

def build_daily_slates(
    d,full_by_day: Dict,vendor_by_day: Dict,item_to_iid: Dict[str, int],unknown_full_log: Dict,unknown_vendor_log: Dict,) -> Tuple[List[int], List[int]]:
    """
    Build the set of known items (mapped to iids) for a given date from both the official (full) slate and the vendor slate.

    Behavior
    --------
    - Full slate: uses directional keys like "A @ B" directly; logs unknowns.
    - Vendor slate: expands "A ## B" into both orientations, deduplicates, and maps to iids; logs unknowns.

    Parameters
    ----------
    d : date
        Target game date.
    full_by_day : dict[date -> set[str]]
        Mapping from date to full schedule items in "Away @ Home" format.
    vendor_by_day : dict[date -> set[str]]
        Mapping from date to vendor schedule items in "A ## B" format.
    item_to_iid : dict[str, int]
        Dictionary mapping canonical item strings to integer ids.
    unknown_full_log : dict
        Updated in-place with counts of full slate items missing from catalog.
    unknown_vendor_log : dict
        Updated in-place with counts of vendor slate items missing from catalog.

    Returns
    -------
    full_iids : list[int]
        List of iids from the full slate for date d.
    vendor_iids : list[int]
        List of iids from the vendor slate (both orientations) for date d.
    """
    all_known = set(item_to_iid.keys())

    # --- FULL track (directional, as-is) ---
    full_items = full_by_day.get(d, set())
    #print("Full items",full_items)
    # unknowns (directional keys not found in catalog)
    unk_full = [it for it in full_items if it not in all_known]
    if unk_full:
        unknown_full_log[d] = len(unk_full)
    full_iids = [item_to_iid[it] for it in full_items if it in item_to_iid]

    # --- VENDOR track (expand, then map) ---
    vendor_items = vendor_by_day.get(d, set())
    vendor_items_expanded: Set[str] = set()
    for it in vendor_items:
        vendor_items_expanded |= both_orientations(it)

    #print("Vendor items:",vendor_items_expanded)
    vendor_iids = [item_to_iid[it] for it in vendor_items_expanded if it in item_to_iid]
    
    # unknowns after expansion (i.e., 'A @ B'/'B @ A' not in catalog)
    unk_vendor = [it for it in vendor_items_expanded if it not in all_known]
    if unk_vendor:
        unknown_vendor_log[d] = len(unk_vendor)

    vendor_iids = list({item_to_iid[it] for it in vendor_items_expanded if it in item_to_iid})
    return full_iids, vendor_iids

def build_score_map(union_iids: Iterable[int],rec_map,pop_scores: Dict[int, float], iid_to_index: Optional[Dict[int, int]] = None) -> Dict[int, float]:
    """
    Construct a consistent {iid -> score} mapping from various recommendation formats.

    Resolution order
    ----------------
    - If `rec_map` is a dict: use rec_map[iid], else back off to popularity.
    - If `rec_map` is a pandas Series: align by iid index, missing values fall back to popularity.
    - If `rec_map` is a NumPy array:
        * If `iid_to_index` is provided, map iids to array positions.
        * Else assume direct indexing (iids 0..n-1); out-of-range values fall back to popularity.
    - Any other type: assign popularity scores only.

    Parameters
    ----------
    union_iids : iterable of int
        Item ids that need scores.
    rec_map : dict, pandas.Series, or numpy.ndarray
        Recommendation scores in one of the supported formats.
    pop_scores : dict[int, float]
        Popularity-based fallback scores (0–1).
    iid_to_index : dict[int, int], optional
        Mapping from iid -> position in rec_map if rec_map is an ndarray.

    Returns
    -------
    scores : dict[int, float]
        Dictionary of iid -> score for all requested items.
    """
    scores = {}

    # Case 1: dict-like (fast path)
    if isinstance(rec_map, dict):
        for iid in union_iids:
            scores[iid] = float(rec_map.get(iid, pop_scores.get(iid, 0.0)))
        return scores

    # Case 2: pandas Series (index are iids)
    if isinstance(rec_map, pd.Series):
        # Pull only needed iids; unseen -> NaN -> backoff to pop
        sub = rec_map.reindex(list(union_iids))
        for iid, val in sub.items():
            if pd.isna(val):
                scores[iid] = float(pop_scores.get(iid, 0.0))
            else:
                scores[iid] = float(val)
        return scores

    # Case 3: numpy array
    if isinstance(rec_map, np.ndarray):
        n = len(rec_map)
        for iid in union_iids:
            if iid_to_index is not None:
                pos = iid_to_index.get(iid)
                if pos is not None and 0 <= pos < n:
                    scores[iid] = float(rec_map[pos])
                else:
                    scores[iid] = float(pop_scores.get(iid, 0.0))
            else:
                # Assume iids are 0..n-1
                if 0 <= iid < n:
                    scores[iid] = float(rec_map[iid])
                else:
                    scores[iid] = float(pop_scores.get(iid, 0.0))
        return scores

    # Unknown type: fall back entirely to popularity
    for iid in union_iids:
        scores[iid] = float(pop_scores.get(iid, 0.0))
    return scores

In [150]:
def score_iid(iid: int, rec_map: Optional[Dict[int, float]], pop_scores: Dict[int, float]) -> float:
    """
    Return a numeric score for an item id `iid`, preferring the user's recommendation map if available, otherwise falling back to popularity.
    """
    if rec_map is None:
        return float(pop_scores.get(iid, 0.0))
    try:
        return float(rec_map[iid])
    except (IndexError, TypeError, KeyError):
        return float(pop_scores.get(iid, 0.0))

def build_calibration_pairs_topn(
    dates_range: Iterable[pd.Timestamp],
    users_to_score: Iterable,
    full_by_day: Dict,              # date -> set[str] like {'LAL @ BOS', ...}
    vendor_by_day: Dict,            # date -> set[str] like {'BOS ## LAL', ...}
    item_to_iid: Dict[str, int],
    user_rec_map: Dict,             # mask_id -> dict[iid->score] or None
    pop_scores: Dict[int, float],
    truth_by_ud: Dict[Tuple, Set],  # (mask_id,date) -> set of truth items (iid or item str)
    iid_to_item: Dict[int, str],
    build_daily_slates: Callable[..., Tuple[List[int], List[int]]],
    unknown_full_log: Dict,         # updated in-place
    unknown_vendor_log: Dict,       # updated in-place
    top_n: int,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Construct (scores, labels) used to train a confidence calibrator by taking the top-N scored items
    per (user, day) on the union of full and vendor slates and labeling them by ground-truth membership.
    """
    scores: List[float] = []
    labels: List[int] = []

    for d in dates_range:
        full_iids, vendor_iids = build_daily_slates(
            d, full_by_day, vendor_by_day, item_to_iid, unknown_full_log, unknown_vendor_log
        )
        if not full_iids and not vendor_iids:
            continue

        union_iids = set(full_iids) | set(vendor_iids)

        for mask_id in users_to_score:
            rec_map = user_rec_map.get(mask_id)
            truth = truth_by_ud.get((mask_id, d), set())
            # Truth may be a set of iids or item strings; detect and compare accordingly.
            truth_is_iid = len(truth) > 0 and isinstance(next(iter(truth)), (int, np.integer))

            scored = [(iid, score_iid(iid, rec_map, pop_scores)) for iid in union_iids]
            scored.sort(key=lambda t: -t[1])

            for iid, s in scored[:top_n]:
                is_hit = (iid in truth) if truth_is_iid else (iid_to_item.get(iid) in truth)
                scores.append(float(s))
                labels.append(1 if is_hit else 0)

    return np.asarray(scores, dtype=float), np.asarray(labels, dtype=int)


def fit_platt_calibrator(val_scores: np.ndarray, val_labels: np.ndarray) -> LogisticRegression:
    """
    Fit a Platt-scaling logistic regression calibrator P(y=1 | score) from (score, label) pairs, with a safe fallback if labels are degenerate.
    """
    if val_scores.size == 0 or len(np.unique(val_labels)) < 2:
        # Fallback: approximate identity mapping (monotone) using two anchor points.
        platt = LogisticRegression(solver="lbfgs", max_iter=1000)
        platt.fit(np.array([[0.0], [1.0]]), np.array([0, 1]))
        return platt

    platt = LogisticRegression(
        solver="lbfgs",
        max_iter=1000,
        class_weight="balanced",  # mitigate class imbalance
    )
    platt.fit(val_scores.reshape(-1, 1), val_labels)
    return platt


def generate_rankings_with_calibrated_confidence(
    dates_range: Iterable[pd.Timestamp],
    users_to_score: Iterable,
    full_by_day: Dict,
    vendor_by_day: Dict,
    item_to_iid: Dict[str, int],
    user_rec_map: Dict,                      # mask_id -> dict[iid->score] or None
    pop_scores: Dict[int, float],
    iid_to_item: Dict[int, str],
    platt: LogisticRegression,
    build_daily_slates: Callable[..., Tuple[List[int], List[int]]],
    unknown_full_log: Dict,
    unknown_vendor_log: Dict,
    top_k: int,
) -> List[Dict]:
    """
    Produce per-user, per-day rankings for both the full and vendor tracks with calibrated confidence scores using a fitted Platt model.
    """
    rows: List[Dict] = []

    for d in dates_range:
        full_iids, vendor_iids = build_daily_slates(
            d, full_by_day, vendor_by_day, item_to_iid, unknown_full_log, unknown_vendor_log
        )
        if not full_iids and not vendor_iids:
            continue

        # Score the union once so both tracks reuse the same scores.
        union_iids = set(full_iids) | set(vendor_iids)

        for mask_id in users_to_score:
            rec_map = user_rec_map.get(mask_id)  # may be None (cold)
            score_map = {iid: score_iid(iid, rec_map, pop_scores) for iid in union_iids}

            # FULL track
            full_ranked = sorted(((iid, score_map[iid]) for iid in full_iids), key=lambda t: -t[1])
            full_top = full_ranked[:top_k]
            if len(full_top) > 0:
                slate_scores = np.array([s for _, s in full_top], dtype=float)
                conf_full = platt.predict_proba(slate_scores.reshape(-1, 1))[:, 1]
            else:
                conf_full = np.array([], dtype=float)

            for rank, ((iid, score), conf) in enumerate(zip(full_top, conf_full), 1):
                rows.append(
                    {
                        "date": d,
                        "mask_id": mask_id,
                        "iid": iid,
                        "item": iid_to_item[iid],
                        "score": float(score),
                        "confidence": float(conf),
                        "rank": rank,
                        "track": "full",
                    }
                )

            # VENDOR track
            vendor_ranked = sorted(((iid, score_map[iid]) for iid in vendor_iids), key=lambda t: -t[1])
            vendor_top = vendor_ranked[:top_k]
            if len(vendor_top) > 0:
                slate_scores = np.array([s for _, s in vendor_top], dtype=float)
                conf_vendor = platt.predict_proba(slate_scores.reshape(-1, 1))[:, 1]
            else:
                conf_vendor = np.array([], dtype=float)

            for rank, ((iid, score), conf) in enumerate(zip(vendor_top, conf_vendor), 1):
                rows.append(
                    {
                        "date": d,
                        "mask_id": mask_id,
                        "iid": iid,
                        "item": iid_to_item[iid],
                        "score": float(score),
                        "confidence": float(conf),
                        "rank": rank,
                        "track": "vendor",
                    }
                )

    return rows


def run_calibrated_scoring_and_export(
    *,
    # Inputs carried from previous blocks
    users_to_score: Iterable,
    user_rec_map: Dict[object, Optional[Dict[int, float]]],
    pop_scores: Dict[int, float],
    iid_to_item: Dict[int, str],
    uid_for: Callable[[object], Optional[int]],
    full_by_day: Dict,
    vendor_by_day: Dict,
    item_to_iid: Dict[str, int],
    truth_by_ud: Dict[Tuple, Set],
    # Config
    DATE_START: str | pd.Timestamp,
    DATE_END: str | pd.Timestamp,
    TOP_K: int,
    TOP_N_CAL: int,
    # Utilities
    build_daily_slates: Callable[..., Tuple[List[int], List[int]]],
    augment_rankings_for_export: Callable[..., pd.DataFrame],
    OUTPUT_CSV: str,
) -> Dict[str, object]:
    """
    Orchestrate calibrated ranking generation: build calibration pairs, fit Platt scaler, score both tracks with calibrated confidence, augment results, and save to CSV.

    Returns
    -------
    artifacts : dict
        {
          "unknown_full_log": defaultdict(int),
          "unknown_vendor_log": defaultdict(int),
          "dates_range": np.ndarray of dates,
          "val_scores": np.ndarray,
          "val_labels": np.ndarray,
          "platt": LogisticRegression,
          "rows": list[dict],
          "df_all": pd.DataFrame,
          "df_all_aug": pd.DataFrame,
          "output_csv": str
        }
    """
    # Coverage diagnostics
    unknown_full_log: Dict = defaultdict(int)
    unknown_vendor_log: Dict = defaultdict(int)

    # Date range [inclusive]
    dates_range = pd.date_range(DATE_START, DATE_END, freq="D").date

    # --- Build calibration pairs and fit Platt scaler ---
    val_scores, val_labels = build_calibration_pairs_topn(
        dates_range=dates_range,
        users_to_score=users_to_score,
        full_by_day=full_by_day,
        vendor_by_day=vendor_by_day,
        item_to_iid=item_to_iid,
        user_rec_map=user_rec_map,
        pop_scores=pop_scores,
        truth_by_ud=truth_by_ud,
        iid_to_item=iid_to_item,
        build_daily_slates=build_daily_slates,
        unknown_full_log=unknown_full_log,
        unknown_vendor_log=unknown_vendor_log,
        top_n=TOP_N_CAL,
    )

    platt = fit_platt_calibrator(val_scores, val_labels)

    # --- Main evaluation loop with calibrated confidence ---
    rows = generate_rankings_with_calibrated_confidence(
        dates_range=dates_range,
        users_to_score=users_to_score,
        full_by_day=full_by_day,
        vendor_by_day=vendor_by_day,
        item_to_iid=item_to_iid,
        user_rec_map=user_rec_map,
        pop_scores=pop_scores,
        iid_to_item=iid_to_item,
        platt=platt,
        build_daily_slates=build_daily_slates,
        unknown_full_log=unknown_full_log,
        unknown_vendor_log=unknown_vendor_log,
        top_k=TOP_K,
    )

    print("len(rows) =", len(rows))
    print("users_to_score size =", len(list(users_to_score)))
    print("any full_iids ever?", any(full_by_day.get(d, set()) for d in dates_range))
    print("any vendor_iids ever?", any(vendor_by_day.get(d, set()) for d in dates_range))

    # Build DataFrame
    df_all = pd.DataFrame(rows)

    # Add new_user flag via model visibility (matches warm/cold log)
    df_all["new_user"] = df_all["mask_id"].apply(lambda mid: 1 if uid_for(mid) is None else 0).astype("int8")

    # Augment + export
    df_all_aug = augment_rankings_for_export(df_all, date_col=None)
    Path(OUTPUT_CSV).parent.mkdir(parents=True, exist_ok=True)
    df_all_aug.to_csv(OUTPUT_CSV, index=False)
    print(f"[INFO] Saved rankings: {OUTPUT_CSV} | rows={len(df_all_aug)}")

    if unknown_full_log or unknown_vendor_log:
        miss_full_total = sum(unknown_full_log.values())
        miss_vendor_total = sum(unknown_vendor_log.values())
        #print(f"[COVERAGE] Unknown items (not in items_all) -> full_track: {miss_full_total} | vendor_track: {miss_vendor_total}")

    return {
        "unknown_full_log": unknown_full_log,
        "unknown_vendor_log": unknown_vendor_log,
        "dates_range": dates_range,
        "val_scores": val_scores,
        "val_labels": val_labels,
        "platt": platt,
        "rows": rows,
        "df_all": df_all,
        "df_all_aug": df_all_aug,
        "output_csv": OUTPUT_CSV,
    }


# --------------------------
# Example orchestrated call
# --------------------------
artifacts = run_calibrated_scoring_and_export(
    users_to_score=users_to_score,
    user_rec_map=user_rec_map,
    pop_scores=pop_scores,
    iid_to_item=iid_to_item,
    uid_for=uid_for,
    full_by_day=full_by_day,
    vendor_by_day=vendor_by_day,
    item_to_iid=item_index_all,  # or item_to_iid if that's your mapping
    truth_by_ud=truth_by_ud,
    DATE_START=DATE_START,
    DATE_END=DATE_END,
    TOP_K=3,
    TOP_N_CAL=5,
    build_daily_slates=build_daily_slates,
    augment_rankings_for_export=augment_rankings_for_export,
    OUTPUT_CSV=OUTPUT_CSV)
df_all_aug = artifacts["df_all_aug"]  # ready for app/export

len(rows) = 28974
users_to_score size = 439
any full_iids ever? True
any vendor_iids ever? True
[INFO] Saved rankings: ./rankings_may01_14.csv | rows=28974


In [177]:
# =========================
# Core metric computation
# =========================
def eval_metrics_at_k(
    df_rankings: pd.DataFrame,
    truth_by_ud: Dict[Tuple[object, object], set],
    k: int = 3,
) -> Dict[str, float]:
    """
    Compute Precision@k, NDCG@k, HitCoverage@k over user-day slices in `df_rankings`,
    where ground-truth items are provided by `truth_by_ud[(mask_id, date)]`.
    """
    if df_rankings is None or df_rankings.empty:
        return {f"Precision@{k}": 0.0, f"NDCG@{k}": 0.0, f"HitCoverage@{k}": 0.0, "n_eval": 0}

    # Make a safe copy and ensure numeric rank
    recs = df_rankings.copy()
    recs["rank"] = pd.to_numeric(recs["rank"], errors="coerce")
    recs = recs[recs["rank"].notna()]

    # Keep only top-k rows
    recs = recs[recs["rank"] <= k]
    if recs.empty:
        return {f"Precision@{k}": 0.0, f"NDCG@{k}": 0.0, f"HitCoverage@{k}": 0.0, "n_eval": 0}

    total_prec = total_ndcg = total_cov = 0.0
    n = 0

    for (mask_id, d), grp in recs.groupby(["mask_id", "date"]):
        topk_items = grp.sort_values("rank")["item"].tolist()
        if not topk_items:
            continue

        true_items = truth_by_ud.get((mask_id, d), set())
        hits = sum(1 for it in topk_items if it in true_items)

        # Precision@k denominator uses the actual number of recs shown (<= k)
        denom = min(k, len(topk_items))
        prec_k = (hits / denom) if denom > 0 else 0.0

        # NDCG@k
        ideal = min(len(true_items), len(topk_items), k)
        if ideal > 0:
            idcg = sum(1.0 / np.log2(r + 1) for r in range(1, ideal + 1))
            dcg = sum(1.0 / np.log2(r + 1) for r, it in enumerate(topk_items, 1) if it in true_items)
            ndcg = dcg / idcg
        else:
            ndcg = 0.0

        # HitCoverage@k = at least one correct in the slate
        cov = 1.0 if hits > 0 else 0.0

        total_prec += prec_k
        total_ndcg += ndcg
        total_cov += cov
        n += 1

    if n == 0:
        return {f"Precision@{k}": 0.0, f"NDCG@{k}": 0.0, f"HitCoverage@{k}": 0.0, "n_eval": 0}

    return {
        f"Precision@{k}": total_prec / n,
        f"NDCG@{k}": total_ndcg / n,
        f"HitCoverage@{k}": total_cov / n,
        "n_eval": n,
    }


def track_metrics(df_all: pd.DataFrame, truth_by_ud: Dict, k: int = 3) -> Dict[str, Dict[str, float]]:
    """
    Evaluate metrics per track (e.g., 'full' vs 'vendor') in `df_all` and return a dict of results keyed by track name.
    """
    out: Dict[str, Dict[str, float]] = {}
    for track in sorted(df_all["track"].unique()):
        m = eval_metrics_at_k(df_all[df_all["track"] == track], truth_by_ud, k=k)
        out[track] = m
    return out


# =========================
# Segmentation utilities
# =========================
def ensure_day_type(df: pd.DataFrame, date_col: str = "date") -> pd.DataFrame:
    """
    Ensure a 'day_type' column exists ('Weekend'/'Weekday') based on `date_col`, returning a copy if needed.
    """
    if "day_type" in df.columns:
        return df
    out = df.copy()
    dts = pd.to_datetime(out[date_col], errors="coerce")
    out["day_type"] = np.where(dts.dt.dayofweek >= 5, "Weekend", "Weekday")
    return out


def compute_segmented_metrics(
    df_all_aug: pd.DataFrame,
    truth_by_ud: Dict[Tuple[object, object], set],
    user_segments: Optional[Sequence[Tuple[str, Callable[[pd.DataFrame], pd.Series]]]] = None,
    k: int = 3,
) -> Tuple[pd.DataFrame, List[Dict[str, object]]]:
    """
    Compute metrics for:
      • All players (new segment)
      • Weekend games / Weekday games (built-in)
      • Optional custom segments (if provided)

    Returns a tidy DataFrame and the raw rows list (for JSON).
    """
    df_seg = ensure_day_type(df_all_aug)

    rows: List[Dict[str, object]] = []

    def _append_rows(segment_type: str, seg_key: str, title: str, mask: pd.Series):
        df_full = df_seg.loc[mask & (df_seg["track"] == "full")]
        df_vendor = df_seg.loc[mask & (df_seg["track"] == "vendor")]

        m_full = eval_metrics_at_k(df_full, truth_by_ud, k=k)
        m_vendor = eval_metrics_at_k(df_vendor, truth_by_ud, k=k)

        # Console print (optional)
        print(f"\n[SEGMENT] {title}")
        print("  (Actual Games Played)")
        print(f"    Number of bets = {m_full['n_eval']}")
        print(f"    Precision@{k}    = {m_full[f'Precision@{k}']:.4f}")
        print(f"    NDCG@{k}         = {m_full[f'NDCG@{k}']:.4f}")
        print(f"    HitCoverage@{k}  = {m_full[f'HitCoverage@{k}']:.4f}")
        print("  (Pre-season schedule)")
        print(f"    Number of bets = {m_vendor['n_eval']}")
        print(f"    Precision@{k}    = {m_vendor[f'Precision@{k}']:.4f}")
        print(f"    NDCG@{k}         = {m_vendor[f'NDCG@{k}']:.4f}")
        print(f"    HitCoverage@{k}  = {m_vendor[f'HitCoverage@{k}']:.4f}")

        rows.append({
            "segment_type": segment_type,
            "segment": seg_key,
            "segment_label": title,
            "track": "Actual Games Played",
            "number_of_bets": m_full["n_eval"],
            "precision@k": m_full[f"Precision@{k}"],
            "ndcg@k": m_full[f"NDCG@{k}"],
            "hitcoverage@k": m_full[f"HitCoverage@{k}"],
            "k": k,
        })
        rows.append({
            "segment_type": segment_type,
            "segment": seg_key,
            "segment_label": title,
            "track": "Pre-season schedule",
            "number_of_bets": m_vendor["n_eval"],
            "precision@k": m_vendor[f"Precision@{k}"],
            "ndcg@k": m_vendor[f"NDCG@{k}"],
            "hitcoverage@k": m_vendor[f"HitCoverage@{k}"],
            "k": k,
        })

    # 1) New built-in: All players
    mask_all = pd.Series(True, index=df_seg.index)
    _append_rows(segment_type="user", seg_key="all", title="All players", mask=mask_all)

    # 2) Optional user-provided segments
    if user_segments:
        for title, mask_fn in user_segments:
            mask = mask_fn(df_seg)
            _append_rows(segment_type="custom", seg_key=title, title=title, mask=mask)

    # 3) Built-in day-type segments
    day_segments = [
        ("Weekend games", "daytype", "weekend", df_seg["day_type"].eq("Weekend")),
        ("Weekday games", "daytype", "weekday", df_seg["day_type"].eq("Weekday")),
    ]
    for title, seg_type, seg_key, mask in day_segments:
        _append_rows(segment_type=seg_type, seg_key=seg_key, title=title, mask=mask)

    metrics_df = pd.DataFrame(rows)
    return metrics_df, rows


# =========================
# Orchestrator + I/O
# =========================
def evaluate_and_save_all_metrics(
    df_all: pd.DataFrame,
    df_all_aug: pd.DataFrame,
    truth_by_ud: Dict[Tuple[object, object], set],
    output_csv_base: str,
    user_segments: Optional[Sequence[Tuple[str, Callable[[pd.DataFrame], pd.Series]]]] = None,
    k: int = 3,
) -> Dict[str, object]:
    """
    Run track-level and segmented evaluations, print a concise report,
    and save segmented results to CSV/JSON files next to `output_csv_base`.
    """
    # Track-level
    metrics_by_track = track_metrics(df_all, truth_by_ud, k=k)

    # Printing (matches your original style)
    for track_label, pretty in (
        ("Full track (Box Scores)", "full"),
        ("Vendor track (Pre-season schedule)", "vendor"),
    ):
        m = metrics_by_track.get(
            pretty,
            {"n_eval": 0, f"Precision@{k}": 0.0, f"NDCG@{k}": 0.0, f"HitCoverage@{k}": 0.0},
        )
        #print(f"\n[METRICS] {track_label}")
        #print(f"  n_eval         = {m['n_eval']}")
        #print(f"  Precision@{k}  = {m[f'Precision@{k}']:.4f}")
        #print(f"  NDCG@{k}       = {m[f'NDCG@{k}']:.4f}")
        #print(f"  HitCoverage@{k}= {m[f'HitCoverage@{k}']:.4f}")

    # Segmented (now includes All players + day-type + user segments)
    seg_df, seg_rows = compute_segmented_metrics(
        df_all_aug=df_all_aug,
        truth_by_ud=truth_by_ud,
        user_segments=user_segments,
        k=k,
    )

    # Save segmented metrics
    metrics_csv = output_csv_base.replace(".csv", "_metrics.csv")
    metrics_json = output_csv_base.replace(".csv", "_metrics.json")
    Path(metrics_csv).parent.mkdir(parents=True, exist_ok=True)
    #seg_df.to_csv(metrics_csv, index=False)
    #metrics_csv_s3 = f"s3://{olg-reco-outputs}/rankings_may01_14.csv"
    #seg_df.to_csv(metrics_csv_s3, index=False)
    bucket = "olg-reco-outputs"
    key = "rankings_may01_14_metrics.csv"  # or f"{prefix}/rankings_may01_14.csv"
    metrics_csv_s3 = f"s3://{bucket}/{key}"
    seg_df.to_csv(metrics_csv_s3, index=False)


    #print(f"\n[INFO] Saved segmented metrics: {metrics_csv} & {metrics_json}")

    # Optional preview
    #print("\n[PREVIEW] Rankings head()")
    #print(df_all.head(10).to_string(index=False))

    return {
        "k": k,
        "metrics_by_track": metrics_by_track,
        "segmented_metrics_df": seg_df,
        "segmented_metrics_rows": seg_rows,
        "metrics_csv": metrics_csv,
        "metrics_json": metrics_json,
    }


results = evaluate_and_save_all_metrics(
    df_all=df_all,
    df_all_aug=df_all_aug,
    truth_by_ud=truth_by_ud,
    output_csv_base="rankings_may01_14.csv",
    user_segments=None,  # or e.g., [("New users", lambda df: df["new_user"] == 1)]
    k=3,
)
seg_df = results["segmented_metrics_df"]


# Access artifacts later:
track_metrics_dict = results["metrics_by_track"]
seg_df = results["segmented_metrics_df"]
#print("Track metrics:", track_metrics_dict)


[SEGMENT] All players
  (Actual Games Played)
    Number of bets = 6146
    Precision@3    = 0.2929
    NDCG@3         = 0.3348
    HitCoverage@3  = 0.3581
  (Pre-season schedule)
    Number of bets = 6146
    Precision@3    = 0.1164
    NDCG@3         = 0.1702
    HitCoverage@3  = 0.2790

[SEGMENT] Weekend games
  (Actual Games Played)
    Number of bets = 1756
    Precision@3    = 0.3118
    NDCG@3         = 0.3539
    HitCoverage@3  = 0.3770
  (Pre-season schedule)
    Number of bets = 1756
    Precision@3    = 0.1281
    NDCG@3         = 0.1920
    HitCoverage@3  = 0.2779

[SEGMENT] Weekday games
  (Actual Games Played)
    Number of bets = 4390
    Precision@3    = 0.2853
    NDCG@3         = 0.3272
    HitCoverage@3  = 0.3506
  (Pre-season schedule)
    Number of bets = 4390
    Precision@3    = 0.1118
    NDCG@3         = 0.1615
    HitCoverage@3  = 0.2795


## Personalized Messaging ## 

In [181]:
from __future__ import annotations

import pandas as pd
from typing import Tuple


# =========================
# String Cleaning Helpers
# =========================

def _clean_spaces_upper(s: pd.Series) -> pd.Series:
    """
    Collapse multiple spaces into one, trim leading/trailing whitespace, and convert to UPPERCASE.
    Useful for robust string matching across different event description formats.
    """
    return (
        s.astype(str)
         .str.replace(r"\s+", " ", regex=True)
         .str.strip()
         .str.upper()
    )


def item_to_event(item_series: pd.Series) -> pd.Series:
    """
    Convert matchup strings of the form 'Team A ## Team B' into normalized 'TEAM A @ TEAM B'.
    If a row does not contain '##', the result will be NaN.
    """
    parts = item_series.astype(str).str.split(r"\s*##\s*", n=1, expand=True)
    event = parts[0].str.strip() + " @ " + parts[1].str.strip()
    return _clean_spaces_upper(event)


def normalize_event_desc(event_series: pd.Series) -> pd.Series:
    """
    Normalize event_description strings that already use '@' format into 'TEAM A @ TEAM B',
    ensuring uppercase and consistent spacing around '@'.
    """
    s = _clean_spaces_upper(event_series)
    return s.str.replace(r"\s*@\s*", " @ ", regex=True)


# =========================
# Feature Builders
# =========================

def build_player_game_history(df_all: pd.DataFrame, df_train: pd.DataFrame) -> pd.DataFrame:
    """
    Enrich `df_all` with per-player and overall game betting history from `df_train`.

    Features added
    --------------
    - No_Of_Bets_Player_Game_History : Number of bets by this player on this game historically.
    - Total_No_Of_Bets_For_Game_History : Total number of bets across all players on this game.
    - Total_Amount_History : Total wagered amount across all players on this game.

    Parameters
    ----------
    df_all : DataFrame
        Rankings or scoring DataFrame to enrich; must contain 'mask_id' and 'item'.
    df_train : DataFrame
        Training bet history with columns ['mask_id', 'event_description', 'wager_amount'].

    Returns
    -------
    DataFrame
        Copy of df_all with new history features added.
    """
    out = df_all.copy()
    train = df_train.copy()

    # Normalized event keys for joins
    out["event_norm"] = item_to_event(out["item"])
    train["event_norm"] = normalize_event_desc(train["event_description"])

    # Ensure wager_amount is numeric
    train["wager_amount"] = pd.to_numeric(train["wager_amount"], errors="coerce").fillna(0)

    # --- 1) Player-specific bet history ---
    user_event_counts = (
        train.groupby(["mask_id", "event_norm"])
             .size()
             .reset_index(name="No_Of_Bets_Player_Game_History")
    )
    out = out.merge(user_event_counts, how="left", on=["mask_id", "event_norm"])
    out["No_Of_Bets_Player_Game_History"] = (
        out["No_Of_Bets_Player_Game_History"].fillna(0).astype(int)
    )

    # --- 2) Overall game bet history ---
    overall = (
        train.groupby("event_norm")
             .agg(
                 Total_No_Of_Bets_For_Game_History=("event_norm", "size"),
                 Total_Amount_History=("wager_amount", "sum"),
             )
             .reset_index()
    )
    out = out.merge(overall, how="left", on="event_norm")
    out["Total_No_Of_Bets_For_Game_History"] = (
        out["Total_No_Of_Bets_For_Game_History"].fillna(0).astype(int)
    )
    out["Total_Amount_History"] = out["Total_Amount_History"].fillna(0.0)

    return out

enriched_df = build_player_game_history(df_all, df_train)

In [184]:
# =========================
# Data coercion utilities
# =========================

def coerce_history_columns(out: pd.DataFrame) -> pd.DataFrame:
    """
    Ensure expected history columns exist in `out` and coerce them to numeric types:
    - "No_Of_Bets_Player_Game_History" → int (missing/invalid → 0)
    - "Total_Amount_History"           → float (missing/invalid → 0.0)
    Returns a new DataFrame without mutating the input.
    """
    df = out.copy()

    df["No_Of_Bets_Player_Game_History"] = pd.to_numeric(
        df.get("No_Of_Bets_Player_Game_History", 0), errors="coerce"
    ).fillna(0).astype(int)

    df["Total_Amount_History"] = pd.to_numeric(
        df.get("Total_Amount_History", 0), errors="coerce"
    ).fillna(0.0)

    return df


# =========================
# Messaging helpers
# =========================

def fmt_currency(x: float) -> str:
    """
    Format a numeric value as a whole-dollar string with thousands separators (e.g., 12345 → '12,345').
    """
    return f"{x:,.0f}"


def build_message_for_row(
    row: pd.Series,
    currency_formatter: Callable[[float], str] = fmt_currency,
    bet_threshold_repeat: int = 3,
) -> str:
    """
    Create a personalized message for a ranking row based on the user's prior bets and total market activity.
    Logic mirrors the original snippet: >3 bets → “again”, 0 bets → “never bet”, else generic social proof.
    """
    n_bets = int(row.get("No_Of_Bets_Player_Game_History", 0))
    item = str(row.get("item", "this matchup"))
    total_amt = currency_formatter(float(row.get("Total_Amount_History", 0.0)))

    if n_bets > bet_threshold_repeat:
        return (
            f"You’ve placed {n_bets} bets on {item} before — it’s back on the schedule! Time to play again?"
        )
    elif n_bets == 0:
        return (
            f"You’ve never bet on {item} before, but bettors have already wagered over ${total_amt} on it historically. Wanna play?"
        )
    else:
        return (
            f"Bettors have wagered over ${total_amt} on {item} historically. It’s a matchup worth betting."
        )


def add_personalized_messaging(
    out: pd.DataFrame,
    currency_formatter: Callable[[float], str] = fmt_currency,
    bet_threshold_repeat: int = 3,
    column_name: str = "messaging",
) -> pd.DataFrame:
    """
    Return a copy of `out` with a new column (default 'messaging') containing per-row personalized messages,
    after safely coercing required history columns to numeric.
    """
    df = coerce_history_columns(out)
    df[column_name] = df.apply(
        lambda r: build_message_for_row(
            r, currency_formatter=currency_formatter, bet_threshold_repeat=bet_threshold_repeat
        ),
        axis=1,
    )
    return df


out = coerce_history_columns(enriched_df)
out = add_personalized_messaging(out, column_name="messaging")
#out[["item", "No_Of_Bets_Player_Game_History", "Total_Amount_History", "messaging"]].head()
bucket = "olg-reco-outputs"
key = "rankings_may01_14.csv"  
rankings_csv_s3 = f"s3://{bucket}/{key}"
out.to_csv(rankings_csv_s3, index=False)

### Compare model reco based on pre-season schdeule vs actual games played

In [186]:
def analyze_out(out: pd.DataFrame, TOP_K: int = 3):
    """
    Compare recommendation quality between 'full' and 'vendor' tracks in a
    per-user-per-day setting using pandas.

    What this computes
    ------------------
    A) Coverage & Volume
       - #pairs with full, #pairs with vendor, union size, intersection size
       - Intersection rate = intersection / union
       - Average list length per track after dedupe and TOP_K

    B) Exact Match Rates (over intersection of pairs)
       - Strict exact match: same length AND identical ordered list (up to K)
       - Set exact match: same multiset of items, ignoring order (up to K)

    D) Confidence Comparison
       - Mean/median confidence by track overall
       - Mean/median confidence by rank position (1..K)
       - Δconfidence (full − vendor) at matched rank positions

    E) Discrepancy Report (Top 20 pairs)
       - Overlap@K (see definition below)
       - Mean/median rank shift for common items
       - Items only in full / only in vendor
       - Sorted by lowest overlap first, then highest mean rank shift

    Overlap@K (exact definition)
    ----------------------------
    For a given pair (date, mask_id), take the Top-K lists from 'full' and 'vendor'
    after deduplication. Convert items to lowercase for matching. Then:
        Overlap@K = count of items that appear in BOTH lists.
    Range: 0 .. min(len(full_list), len(vendor_list), K).
    (Order does not matter for overlap; it’s a simple intersection size.)

    """

    # -------------------------
    # 0) Preprocess
    # -------------------------
    df = out.copy()

    # Coerce types
    df["date"] = pd.to_datetime(df["date"], errors="coerce").dt.date
    df["rank"] = pd.to_numeric(df["rank"], errors="coerce")
    df["confidence"] = pd.to_numeric(df["confidence"], errors="coerce")
    for col in ["item", "track", "day_type"]:
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()

    # Keep only needed tracks and basic non-nulls
    df = df[df["track"].isin(["full", "vendor"])].dropna(
        subset=["date", "mask_id", "track", "rank", "item"]
    )

    # Deduplicate within (date, mask_id, track, item): keep lowest rank
    df = df.sort_values(["date", "mask_id", "track", "item", "rank"])
    df = df.drop_duplicates(["date", "mask_id", "track", "item"], keep="first")

    # Keep TOP_K per (date, mask_id, track)
    df = df.sort_values(["date", "mask_id", "track", "rank"])
    df = df.groupby(["date", "mask_id", "track"]).head(TOP_K)

    # Helper: build per-pair recs
    def build_recs(frame: pd.DataFrame):
        recs = {}
        for (d, u, t), grp in frame.groupby(["date", "mask_id", "track"]):
            grp = grp.sort_values("rank")
            items = list(grp["item"])
            items_norm = [str(s).lower() for s in items]
            ranks = list(grp["rank"].astype(int))
            confs = list(grp["confidence"])
            recs.setdefault((d, u), {})[t] = {
                "items": items,
                "items_norm": items_norm,
                "ranks": ranks,
                "conf": confs,
            }
        return recs

    recs = build_recs(df)

    pairs_full   = {p for p, d in recs.items() if "full" in d}
    pairs_vendor = {p for p, d in recs.items() if "vendor" in d}
    union_pairs  = pairs_full | pairs_vendor
    inter_pairs  = pairs_full & pairs_vendor

    # -------------------------
    # A) Coverage & Volume
    # -------------------------
    avg_len_full = np.mean([len(recs[p]["full"]["items"]) for p in pairs_full]) if pairs_full else np.nan
    avg_len_vendor = np.mean([len(recs[p]["vendor"]["items"]) for p in pairs_vendor]) if pairs_vendor else np.nan

    coverage_volume = pd.DataFrame({
        "metric": [
            "pairs_full",
            "pairs_vendor",
            "union_pairs",
            "intersection_pairs",
            "intersection_rate",
            f"avg_list_len_full@{TOP_K}",
            f"avg_list_len_vendor@{TOP_K}",
        ],
        "value": [
            len(pairs_full),
            len(pairs_vendor),
            len(union_pairs),
            len(inter_pairs),
            (len(inter_pairs) / len(union_pairs)) if len(union_pairs) else np.nan,
            avg_len_full,
            avg_len_vendor,
        ],
    })

    # -------------------------
    # B) Exact Match Rates (over intersection)
    # -------------------------
    strict_matches = 0
    set_matches = 0

    # Also compute pair-level overlap & rank shift for discrepancies
    pair_rows = []

    for p in inter_pairs:
        f = recs[p]["full"]["items_norm"]
        v = recs[p]["vendor"]["items_norm"]

        # Strict: same length and identical ranked list
        if (len(f) == len(v)) and (f == v):
            strict_matches += 1

        # Set exact: same multiset ignoring order
        if Counter(f) == Counter(v):
            set_matches += 1

        # Overlap
        sf, sv = set(f), set(v)
        overlap = len(sf & sv)

        # Rank shift for common items
        franks = {it: r for it, r in zip(f, recs[p]["full"]["ranks"])}
        vranks = {it: r for it, r in zip(v, recs[p]["vendor"]["ranks"])}
        common = list(sf & sv)
        if common:
            shifts = [abs(franks[it] - vranks[it]) for it in common]
            mean_shift = float(np.mean(shifts))
            median_shift = float(np.median(shifts))
        else:
            mean_shift = np.nan
            median_shift = np.nan

        # Items only in each
        only_full = [it for it in recs[p]["full"]["items"] if it.lower() not in sv]
        only_vendor = [it for it in recs[p]["vendor"]["items"] if it.lower() not in sf]

        pair_rows.append({
            "date": p[0], "mask_id": p[1],
            "overlap_at_k": overlap,
            "mean_rank_shift": mean_shift,
            "median_rank_shift": median_shift,
            "items_only_full": only_full,
            "items_only_vendor": only_vendor,
        })

    exact_match = pd.DataFrame({
        "metric": ["strict_exact_match_count", "strict_exact_match_rate",
                   "set_exact_match_count", "set_exact_match_rate"],
        "value": [
            strict_matches,
            (strict_matches / len(inter_pairs)) if len(inter_pairs) else np.nan,
            set_matches,
            (set_matches / len(inter_pairs)) if len(inter_pairs) else np.nan,
        ],
    })

    # -------------------------
    # D) Confidence Comparison
    # -------------------------

    # Overall by track (SeriesGroupBy -> wide, then rename)
    conf_overall = (
        df.groupby("track")["confidence"]
          .agg(["mean", "median", "count"])
          .reset_index()
          .rename(columns={"mean": "mean_conf", "median": "median_conf", "count": "n"})
    )

    # By rank position
    conf_by_rank = (
        df.groupby(["track", "rank"])["confidence"]
          .agg(["mean", "median", "count"])
          .reset_index()
          .rename(columns={"mean": "mean_conf", "median": "median_conf", "count": "n"})
          .sort_values(["rank", "track"])
    )

    # Δconfidence for matched positions (pairs that have both tracks and that rank)
    deltas = []
    for p in inter_pairs:
        f_map = {r: c for r, c in zip(recs[p]["full"]["ranks"],   recs[p]["full"]["conf"])}
        v_map = {r: c for r, c in zip(recs[p]["vendor"]["ranks"], recs[p]["vendor"]["conf"])}
        for r in range(1, TOP_K + 1):
            if (r in f_map) and (r in v_map):
                cf, cv = f_map[r], v_map[r]
                if pd.notna(cf) and pd.notna(cv):
                    deltas.append({"rank": r, "delta_conf": cf - cv})

    if deltas:
        deltas_df = pd.DataFrame(deltas)
        conf_deltas_by_rank = (
            deltas_df.groupby("rank")["delta_conf"]
                     .agg(["mean", "median", "count"])
                     .reset_index()
                     .rename(columns={"mean": "mean_delta", "median": "median_delta", "count": "n_pairs"})
                     .sort_values("rank")
        )
    else:
        conf_deltas_by_rank = pd.DataFrame(columns=["rank", "mean_delta", "median_delta", "n_pairs"])

    
    # -------------------------
    # E) Discrepancy Report (Top 20)
    # -------------------------
    discrepancies = pd.DataFrame(pair_rows)
    if not discrepancies.empty:
        discrepancies = discrepancies.sort_values(
            by=["overlap_at_k", "mean_rank_shift"],
            ascending=[True, False]
        ).head(20)

    # -------------------------
    # Plain-English Summary
    # -------------------------
    def bullet(text): return f"• {text}"
    bullets = []
    if len(inter_pairs):
        bullets.append(bullet(
            f"Strict exact match on {strict_matches}/{len(inter_pairs)} pairs "
            f"({(strict_matches/len(inter_pairs)):.1%}); set-exact match {(set_matches/len(inter_pairs)):.1%}."
        ))
    if pair_rows:
        mean_overlap = np.nanmean([r["overlap_at_k"] for r in pair_rows])
        mean_shift   = np.nanmean([r["mean_rank_shift"] for r in pair_rows])
        bullets.append(bullet(
            f"Average overlap@{TOP_K}: {mean_overlap:.2f}; average rank shift for common items: {mean_shift:.2f}."
        ))
    if len(union_pairs):
        bullets.append(bullet(
            f"Intersection rate (pairs with both lists): {(len(inter_pairs)/len(union_pairs)):.1%}."
        ))
    if not conf_deltas_by_rank.empty:
        row1 = conf_deltas_by_rank[conf_deltas_by_rank["rank"] == 1]
        if not row1.empty:
            bullets.append(bullet(
                f"At rank 1, mean Δconfidence (full−vendor) = {row1['mean_delta'].iloc[0]:.4f}."
            ))

    summary_text = "\n".join(bullets) if bullets else "• No intersection pairs to compare."

    # -------------------------
    # Return everything
    # -------------------------
    return {
        "coverage_volume": coverage_volume,
        "exact_match": exact_match,
        "conf_overall": conf_overall,
        "conf_by_rank": conf_by_rank,
        "conf_deltas_by_rank": conf_deltas_by_rank,
        "discrepancies": discrepancies,
        "summary_text": summary_text,
    }


results = analyze_out(out, TOP_K=3)
# print(results["coverage_volume"])
# print(results["exact_match"])
# print(results["conf_overall"])
# print(results["conf_by_rank"])
# print(results["conf_deltas_by_rank"])
# print(results["discrepancies"])
# print(results["summary_text"])

In [189]:
Path("artifacts").mkdir(exist_ok=True)

# Save the model
MODEL_PATH = "artifacts/model.joblib"
joblib.dump(model, MODEL_PATH)

['artifacts/model.joblib']