In [6]:
import numpy as np
import pandas as pd

from dataclasses import dataclass
from __future__ import annotations
from typing import Literal, Optional

In [None]:
root = '/Users/mdiaspinto/Documents/School/Python Data Science/Final Project/kaggle-churn'
df_raw = pd.read_parquet(root + '/data/train.parquet')
df_raw.head()

Unnamed: 0,status,gender,firstName,level,lastName,userId,ts,auth,page,sessionId,location,itemInSession,userAgent,method,length,song,artist,time,registration
0,200,M,Shlok,paid,Johnson,1749042,1538352001000,Logged In,NextSong,22683,"Dallas-Fort Worth-Arlington, TX",278,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",PUT,524.32934,Ich mache einen Spiegel - Dream Part 4,Popol Vuh,2018-10-01 00:00:01,2018-08-08 13:22:21
992,200,M,Shlok,paid,Johnson,1749042,1538352525000,Logged In,NextSong,22683,"Dallas-Fort Worth-Arlington, TX",279,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",PUT,178.02404,Monster (Album Version),Skillet,2018-10-01 00:08:45,2018-08-08 13:22:21
1360,200,M,Shlok,paid,Johnson,1749042,1538352703000,Logged In,NextSong,22683,"Dallas-Fort Worth-Arlington, TX",280,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",PUT,232.61995,Seven Nation Army,The White Stripes,2018-10-01 00:11:43,2018-08-08 13:22:21
1825,200,M,Shlok,paid,Johnson,1749042,1538352935000,Logged In,NextSong,22683,"Dallas-Fort Worth-Arlington, TX",281,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",PUT,265.50812,Under The Bridge (Album Version),Red Hot Chili Peppers,2018-10-01 00:15:35,2018-08-08 13:22:21
2366,200,M,Shlok,paid,Johnson,1749042,1538353200000,Logged In,NextSong,22683,"Dallas-Fort Worth-Arlington, TX",282,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",PUT,471.69261,Circlesong 6,Bobby McFerrin,2018-10-01 00:20:00,2018-08-08 13:22:21


In [5]:
df_raw.dtypes

status                    int64
gender                   object
firstName                object
level                    object
lastName                 object
userId                   object
ts                        int64
auth                     object
page                     object
sessionId                 int64
location                 object
itemInSession             int64
userAgent                object
method                   object
length                  float64
song                     object
artist                   object
time             datetime64[us]
registration     datetime64[us]
dtype: object

In [None]:
@dataclass(frozen=True)
class LabelSpineConfig:
    user_col: str = "userId"
    time_col: str = "time"
    page_col: str = "page"
    cancel_value: str = "cancel_confirmation"

    horizon_days: int = 10  # label is 1 if cancel happens in (t, t+horizon]
    asof_freq: str = "D"    # user-day spine by default

    # How to pick as-of dates:
    # - "active_days": one as-of date per day where user had any event
    # - "all_days": all calendar days from first_seen..last_seen
    asof_mode: Literal["active_days", "all_days"] = "all_days"

    # Optional: drop rows too close to churn (often improves realism)
    # Example: buffer_days=1 drops asof_date in (cancel_time-1d, cancel_time)
    buffer_days: int = 0


def build_label_spine(events: pd.DataFrame, cfg: LabelSpineConfig) -> pd.DataFrame:
    """
    Build a leak-resistant label spine with one row per (user, asof_date).

    Output columns:
      - userId
      - asof_date (normalized to cfg.asof_freq boundaries; daily by default)
      - cancel_time (first cancel_confirmation timestamp per user, NaT if never)
      - y (1 if cancel_time in (asof_date, asof_date + horizon], else 0)

    Notes:
    - Drops rows where asof_date >= cancel_time (no predicting after churn).
    - Uses an open interval on the left: (t, t+horizon], so cancel on the same
      day/time as t does NOT count.
    """
    df = events.copy()

    # ---- Basic hygiene ----
    required = {cfg.user_col, cfg.time_col, cfg.page_col}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # Ensure time is datetime
    df[cfg.time_col] = pd.to_datetime(df[cfg.time_col], errors="coerce")
    df = df.dropna(subset=[cfg.user_col, cfg.time_col])

    # Normalize user id to stable string (avoids mixed object types)
    df[cfg.user_col] = df[cfg.user_col].astype(str)

    # ---- 1) First churn timestamp per user (first cancel_confirmation) ----
    is_cancel = df[cfg.page_col].astype(str).eq(cfg.cancel_value)
    cancel_times = (
        df.loc[is_cancel, [cfg.user_col, cfg.time_col]]
          .groupby(cfg.user_col, as_index=False)[cfg.time_col]
          .min()
          .rename(columns={cfg.time_col: "cancel_time"})
    )

    # ---- 2) Candidate as-of dates per user ----
    # Round each event timestamp down to the as-of granularity (daily by default)
    df["asof_date"] = df[cfg.time_col].dt.floor(cfg.asof_freq)

    # Build full date range per user: first_seen..last_seen at cfg.asof_freq
    user_ranges = (
        df.groupby(cfg.user_col)["asof_date"]
            .agg(first_seen="min", last_seen="max")
            .reset_index()
    )
    parts = []
    for _, row in user_ranges.iterrows():
        rng = pd.date_range(row["first_seen"], row["last_seen"], freq=cfg.asof_freq)
        parts.append(pd.DataFrame({cfg.user_col: row[cfg.user_col], "asof_date": rng}))
    spine = pd.concat(parts, ignore_index=True).sort_values([cfg.user_col, "asof_date"])

    else:
        raise ValueError(f"Unknown asof_mode: {cfg.asof_mode}")

    # ---- 3) Attach cancel_time and create label ----
    spine = spine.merge(cancel_times, on=cfg.user_col, how="left")

    horizon = pd.to_timedelta(cfg.horizon_days, unit="D")

    # y=1 if cancel_time in (t, t+horizon]
    # - open on the left, closed on right
    spine["y"] = (
        spine["cancel_time"].notna()
        & (spine["cancel_time"] > spine["asof_date"])
        & (spine["cancel_time"] <= spine["asof_date"] + horizon)
    ).astype(np.int8)

    # ---- 4) Temporal validity filters ----
    # Drop as-of rows at/after churn
    spine = spine[spine["cancel_time"].isna() | (spine["asof_date"] < spine["cancel_time"])]

    # Optional buffer window right before churn
    if cfg.buffer_days > 0:
        buffer = pd.to_timedelta(cfg.buffer_days, unit="D")
        spine = spine[
            spine["cancel_time"].isna()
            | ~((spine["asof_date"] > spine["cancel_time"] - buffer) & (spine["asof_date"] < spine["cancel_time"]))
        ]

    spine = spine.reset_index(drop=True)

    # Basic sanity: no y=1 without a cancel_time
    if (spine["y"].eq(1) & spine["cancel_time"].isna()).any():
        raise RuntimeError("Found y=1 rows with missing cancel_time; labeling logic is inconsistent.")

    return spine


In [11]:
def test_label_horizon_logic_daily_midnight():
    df = pd.DataFrame({
        "userId": ["u1"] * 3,
        "time": pd.to_datetime([
            "2020-01-01 10:00",
            "2020-01-05 12:00",   # cancel at noon on Jan 5
            "2020-01-20 09:00"
        ]),
        "page": ["home", "cancel_confirmation", "home"]
    })

    cfg = LabelSpineConfig(horizon_days=10, asof_freq="D", asof_mode="active_days")
    spine = build_label_spine(df, cfg)

    # Jan 1 as-of: cancel on Jan 5 noon is within (Jan 1 00:00, Jan 11 00:00] -> 1
    y_jan1 = spine.loc[spine["asof_date"] == pd.Timestamp("2020-01-01"), "y"].iloc[0]
    assert y_jan1 == 1

    # Jan 5 as-of (00:00): cancel at 12:00 is still in the future -> also 1
    y_jan5 = spine.loc[spine["asof_date"] == pd.Timestamp("2020-01-05"), "y"].iloc[0]
    assert y_jan5 == 1


In [12]:
test_label_horizon_logic()

AssertionError: 