In [1]:
import pandas as pd
from pathlib import Path
import numpy as np

In [2]:
# Folder holding the raw competition data, and the test split inside it.
DATA_DIR = Path("data") / "churn-prediction-25-26"
file_path_dataset = DATA_DIR.joinpath("test.parquet")

In [3]:
# Read the raw test-split event log into memory.
df_test = pd.read_parquet(path=file_path_dataset)

In [4]:
# Clean the raw event log: renumber rows, drop unused columns, fill gaps and
# shrink repetitive string columns to the `category` dtype.
df_test.index = range(len(df_test))

# Columns that are not used by the feature builder.
drop_columns = ["firstName", "lastName", "auth", "method", "ts", "location", "userAgent", "status"]
# errors="ignore" keeps this cell re-runnable: on a second execution the
# columns are already gone and a plain drop would raise KeyError.
df_test = df_test.drop(columns=drop_columns, errors="ignore")

df_test['userId'] = df_test['userId'].astype(int)

# Rows without artist/song/length get explicit placeholder values.
df_test['artist'] = df_test['artist'].fillna("No artist")
df_test['song'] = df_test['song'].fillna("No song")
df_test['length'] = df_test['length'].fillna(0)

# Convert object columns whose distinct values are fewer than half the rows
# to categoricals. The row count is loop-invariant, so compute it once.
total = len(df_test)
for col in df_test.select_dtypes(include="object").columns:
    nunique = df_test[col].nunique()

    if nunique < total * 0.5:
        df_test[col] = df_test[col].astype("category")

In [5]:
def _page_count(df_users, column):
    """Return the per-user page-count column, or all zeros if it is absent."""
    if column in df_users.columns:
        return df_users[column]
    return pd.Series(0, index=df_users.index)


def build_user_features(
    df_window,
    new_user_threshold_h=480,
    recent_window_days=3,
    no_downgrade_fill_h=999,
):
    """Aggregate an event log into one row of features per user.

    Parameters
    ----------
    df_window : pandas.DataFrame
        Event-level rows. The columns read here are ``userId``, ``gender``,
        ``registration``, ``time``, ``artist``, ``song``, ``page``,
        ``sessionId``, ``itemInSession``, ``length`` and ``level``.
        ``time`` and ``registration`` must be datetime-like. Intended to hold
        only events up to the cutoff day (the caller is responsible for that
        filtering). The input frame is not modified.
    new_user_threshold_h : float, default 480
        Users whose lifecycle (last event minus registration) is shorter than
        this many hours get ``is_new_user == 1``.
    recent_window_days : int, default 3
        Length of the trailing window used for ``recent_actions_last_3d``.
        The column keeps its name even if a different length is passed.
    no_downgrade_fill_h : float, default 999
        Value stored in ``hours_since_downgrade`` for users without any
        "Downgrade" page event.

    Returns
    -------
    pandas.DataFrame
        One row per ``userId`` (as a regular column) with the aggregated
        features.

    Notes
    -----
    * ``item_per_session`` is the maximum ``itemInSession`` of a user divided
      by that user's number of sessions.
    * ``session_length_variance`` holds the standard deviation (``.std()``)
      of the per-session summed ``length``; the column name is kept as is.
    * Page-count based features fall back to 0 when the corresponding page
      never occurs in ``df_window``.
    """
    # Chronological order matters for the "last level" feature. A stable sort
    # keeps events with identical timestamps in their incoming order, so the
    # result does not depend on the sort algorithm's tie handling.
    df_window = df_window.sort_values(by="time", ascending=True, kind="mergesort")

    # Base DataFrame: one row per user, taken from the user's first event
    df_users = (
        df_window[["userId", "gender", "registration"]]
        .drop_duplicates(subset=["userId"])
        .set_index("userId")
    )

    # Unique artists
    df_unique_artists = (
        df_window.groupby("userId")["artist"]
        .nunique()
        .rename("num_unique_artists")
    )
    df_users = df_users.join(df_unique_artists)

    # Page counts: one "count_<page>" column per distinct page value
    df_page_counts = (
        df_window.groupby("userId")["page"]
        .value_counts()
        .unstack(fill_value=0)
    )
    df_page_counts.columns = [
        f"count_{col.replace(' ', '_').lower()}" for col in df_page_counts.columns
    ]
    df_users = df_users.join(df_page_counts)

    # Total session count
    df_session_count = (
        df_window.groupby("userId")["sessionId"]
        .nunique()
        .rename("count_total_sessions")
    )
    df_users = df_users.join(df_session_count)

    # Lifecycle (hours) between registration and the last observed event
    df_last_time = (
        df_window.groupby("userId")["time"]
        .max()
        .rename("last_time")
    )
    df_users = df_users.join(df_last_time)

    df_users["user_lifecycle_h"] = (
        (df_users["last_time"] - df_users["registration"]).dt.total_seconds() / 3600
    )

    # Total length
    df_length = (
        df_window.groupby("userId")["length"]
        .sum()
        .rename("ttl_length")
    )
    df_users = df_users.join(df_length)

    # Items per session (max itemInSession divided by number of sessions)
    df_item_per_session = (
        df_window.groupby("userId")["itemInSession"].max()
        / df_users["count_total_sessions"]
    )
    df_item_per_session = df_item_per_session.rename("item_per_session")
    df_users = df_users.join(df_item_per_session)

    # Frequency (sessions per hour of user lifecycle)
    df_users["frequency"] = (
        df_users["count_total_sessions"] / df_users["user_lifecycle_h"]
    )
    # A zero-length lifecycle yields inf; treat it as 0
    df_users["frequency"] = df_users["frequency"].replace(np.inf, 0)

    # Avg songs per session
    df_users["avg_songs_session"] = (
        _page_count(df_users, "count_nextsong") / df_users["count_total_sessions"]
    )

    # Thumbs up share of all thumbs events (0 when the user gave none)
    thumbs_up = _page_count(df_users, "count_thumbs_up")
    thumbs_down = _page_count(df_users, "count_thumbs_down")
    df_users["thumbs_ratio"] = thumbs_up / (thumbs_down + thumbs_up)
    df_users["thumbs_ratio"] = df_users["thumbs_ratio"].fillna(0)
    df_users["thumbs_ratio"] = df_users["thumbs_ratio"].replace(np.inf, 0)

    # Errors per session
    df_users["errors_per_session"] = (
        _page_count(df_users, "count_error") / df_users["count_total_sessions"]
    )

    # Ads per session
    df_users["ads_per_session"] = (
        _page_count(df_users, "count_roll_advert") / df_users["count_total_sessions"]
    )

    # Last level (paid or free), i.e. the level of the user's latest event
    df_last_level = (
        df_window.groupby("userId")["level"]
        .last()
        .rename("last_level")
    )
    df_users = df_users.join(df_last_level)

    # Hours between the user's last event and the latest event in the window
    current_time = df_window["time"].max()
    df_users["hours_since_last_session"] = (
        (current_time - df_users["last_time"]).dt.total_seconds() / 3600
    )

    # Active days ratio: distinct calendar days with activity / window length
    df_active_days = (
        df_window.groupby("userId")["time"]
        .apply(lambda x: x.dt.date.nunique())
        .rename("active_days")
    )
    df_users = df_users.join(df_active_days)
    window_duration = (df_window["time"].max() - df_window["time"].min()).days + 1
    df_users["active_days_ratio"] = df_users["active_days"] / window_duration
    df_users["active_days_ratio"] = df_users["active_days_ratio"].fillna(0)

    # Spread of per-session total length (standard deviation; NaN for users
    # with a single session is replaced by 0)
    df_session_lengths = (
        df_window.groupby(["userId", "sessionId"])["length"]
        .sum()
        .reset_index()
    )
    df_session_variance = (
        df_session_lengths.groupby("userId")["length"]
        .std()
        .rename("session_length_variance")
    )
    df_users = df_users.join(df_session_variance)
    df_users["session_length_variance"] = df_users["session_length_variance"].fillna(0)

    # New user flag
    df_users["is_new_user"] = (
        df_users["user_lifecycle_h"] < new_user_threshold_h
    ).astype(int)

    # Hours since last downgrade; users without one get the same fill value
    # regardless of whether anybody else in the window downgraded
    if _page_count(df_users, "count_downgrade").sum() > 0:
        df_last_downgrade = (
            df_window[df_window["page"] == "Downgrade"]
            .groupby("userId")["time"]
            .max()
            .rename("last_downgrade_time")
        )
        df_users = df_users.join(df_last_downgrade)
        df_users["hours_since_downgrade"] = (
            (current_time - df_users["last_downgrade_time"]).dt.total_seconds() / 3600
        )
        df_users["hours_since_downgrade"] = (
            df_users["hours_since_downgrade"].fillna(no_downgrade_fill_h)
        )
        df_users = df_users.drop(columns=["last_downgrade_time"])
    else:
        df_users["hours_since_downgrade"] = float(no_downgrade_fill_h)

    # Unique songs ratio: distinct songs / NextSong events
    is_song = df_window["page"] == "NextSong"
    df_unique_songs = (
        df_window[is_song]
        .groupby("userId")["song"]
        .nunique()
        .rename("num_unique_songs")
    )
    df_users = df_users.join(df_unique_songs)

    df_users["unique_songs_ratio"] = (
        df_users["num_unique_songs"] / _page_count(df_users, "count_nextsong")
    )
    df_users["unique_songs_ratio"] = df_users["unique_songs_ratio"].fillna(0)
    df_users["unique_songs_ratio"] = df_users["unique_songs_ratio"].replace(np.inf, 0)

    # Trend features: compare the first and second half of the window
    window_start = df_window["time"].min()
    window_end = df_window["time"].max()
    window_midpoint = window_start + (window_end - window_start) / 2
    is_early = df_window["time"] <= window_midpoint
    is_late = df_window["time"] > window_midpoint

    # Activity level early and late
    df_early_actions = (
        df_window[is_early]
        .groupby("userId")
        .size()
        .rename("early_actions")
    )
    df_late_actions = (
        df_window[is_late]
        .groupby("userId")
        .size()
        .rename("late_actions")
    )

    df_users = df_users.join(df_early_actions)
    df_users = df_users.join(df_late_actions)
    df_users["early_actions"] = df_users["early_actions"].fillna(0)
    df_users["late_actions"] = df_users["late_actions"].fillna(0)

    # Engagement in window (+1 in the denominator avoids division by zero)
    df_users["within_window_activity_ratio"] = (
        df_users["late_actions"] / (df_users["early_actions"] + 1)
    )
    df_users["within_window_activity_change"] = (
        df_users["late_actions"] - df_users["early_actions"]
    )

    # Songs early vs late
    df_early_songs = (
        df_window[is_early & is_song]
        .groupby("userId")
        .size()
        .rename("early_songs_played")
    )
    df_late_songs = (
        df_window[is_late & is_song]
        .groupby("userId")
        .size()
        .rename("late_songs_played")
    )

    df_users = df_users.join(df_early_songs)
    df_users = df_users.join(df_late_songs)
    df_users["early_songs_played"] = df_users["early_songs_played"].fillna(0)
    df_users["late_songs_played"] = df_users["late_songs_played"].fillna(0)

    df_users["song_listening_change"] = (
        df_users["late_songs_played"] - df_users["early_songs_played"]
    )

    # Engagement in the trailing `recent_window_days` days of the window
    window_last_3_days = window_end - pd.Timedelta(days=recent_window_days)

    df_recent_actions = (
        df_window[df_window["time"] > window_last_3_days]
        .groupby("userId")
        .size()
        .rename("recent_actions_last_3d")
    )
    df_users = df_users.join(df_recent_actions)
    df_users["recent_actions_last_3d"] = df_users["recent_actions_last_3d"].fillna(0)

    # Share of a user's actions that fall into the recent window
    df_users["recent_activity_ratio"] = (
        df_users["recent_actions_last_3d"] / (df_users.index.map(
            df_window.groupby("userId").size()
        ) + 1)
    )

    # Session depth: mean itemInSession in each half of the window
    df_early_session_depth = (
        df_window[is_early]
        .groupby("userId")["itemInSession"]
        .mean()
        .rename("early_avg_items_per_session")
    )
    df_late_session_depth = (
        df_window[is_late]
        .groupby("userId")["itemInSession"]
        .mean()
        .rename("late_avg_items_per_session")
    )

    df_users = df_users.join(df_early_session_depth)
    df_users = df_users.join(df_late_session_depth)
    df_users["early_avg_items_per_session"] = df_users["early_avg_items_per_session"].fillna(0)
    df_users["late_avg_items_per_session"] = df_users["late_avg_items_per_session"].fillna(0)

    df_users["session_depth_change"] = (
        df_users["late_avg_items_per_session"] - df_users["early_avg_items_per_session"]
    )

    df_users = df_users.reset_index()

    return df_users

In [6]:
# Collapse the cleaned test event log into one feature row per user.
user_base_test_df = build_user_features(df_window=df_test)

In [7]:
# Persist the user-level feature table as a processing checkpoint.
PROCESSED_DATA_DIR = Path("data/processing_checkpoint")
# Create the checkpoint folder if needed so the write below does not fail
# on a fresh checkout where the directory does not exist yet.
PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)
checkpoint_file_path = PROCESSED_DATA_DIR / "04_user_base_test.parquet"
user_base_test_df.to_parquet(checkpoint_file_path, index=False)

In [8]:
# Spot-check five randomly chosen users from the feature table.
user_base_test_df.sample(n=5)

Unnamed: 0,userId,gender,registration,num_unique_artists,count_about,count_add_friend,count_add_to_playlist,count_downgrade,count_error,count_help,...,within_window_activity_ratio,within_window_activity_change,early_songs_played,late_songs_played,song_listening_change,recent_actions_last_3d,recent_activity_ratio,early_avg_items_per_session,late_avg_items_per_session,session_depth_change
682,1030443,M,2018-09-12 08:45:20,572,2,14,24,6,0,4,...,0.217224,-608.0,603.0,126.0,-477.0,0.0,0.0,67.427284,40.798817,-26.628468
329,1487060,M,2018-08-26 02:15:14,2905,11,93,174,52,5,36,...,0.785892,-861.0,3362.0,2583.0,-779.0,525.0,0.073018,151.321988,90.496839,-60.825148
2128,1658531,M,2018-08-03 08:12:12,771,2,23,22,0,2,8,...,3.903346,782.0,187.0,782.0,595.0,35.0,0.026535,38.134328,99.120952,60.986624
2012,1429977,M,2018-09-26 06:10:46,303,0,4,13,1,0,1,...,3.61,262.0,70.0,295.0,225.0,92.0,0.199566,42.636364,56.429363,13.792999
1170,1046525,M,2018-08-22 11:44:57,1594,6,71,65,17,5,15,...,0.932424,-108.0,1340.0,1244.0,-96.0,643.0,0.206288,157.561414,176.445479,18.884064
