## Data Cleaning and Merging

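### This script:
 1) uniformly samples 15,000 users, then reads their impressions in chunks, keeping at most 10 per user (~100,000 rows)
 2) collects the unique userId/mlogId/creatorId values from that sample
 3) filters the user/card/creator tables to those keys
 4) merges the filtered tables onto the impressions
 5) saves the merged CSV to "../data/"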

In [3]:
from pathlib import Path
import pandas as pd
import numpy as np
from collections import defaultdict

# ---------------------------
# Config
# ---------------------------
# Sampling knobs: users are drawn first, then each user's impressions are capped,
# so the output holds at most N_USERS * MAX_IMPS_PER_USER rows.
SEED = 42                   # seed for the user-sampling RNG
CHUNKSIZE = 250_000         # rows per chunk when streaming the CSVs
N_USERS = 15_000            # number of users drawn uniformly for modeling & EDA
MAX_IMPS_PER_USER = 10      # per-user impression cap

# Directories
DATA_DIR = Path("../csv_data")
OUT_DIR = Path("../data")

# Input tables
IMP_PATH = DATA_DIR.joinpath("impression_data.csv")
USR_PATH = DATA_DIR.joinpath("user_demographics.csv")
CRD_PATH = DATA_DIR.joinpath("mlog_demographics.csv")
CRT_PATH = DATA_DIR.joinpath("creator_demographics.csv")

# Output table
OUT_CSV = OUT_DIR.joinpath("imp_sample_100k_merged.csv")

# ---------------------------
# Reservoir sampling of users
# ---------------------------
def sample_users_uniform(user_csv, n_users=N_USERS, chunksize=CHUNKSIZE, seed=SEED):
    """
    Draw up to ``n_users`` user ids from ``user_csv`` with single-pass reservoir sampling.

    Only the ``userId`` column is read, in chunks of ``chunksize`` rows, and missing
    ids are skipped. Every non-missing row is offered to the reservoir once, so the
    draw is uniform over rows of the user table and independent of how many
    impressions a user has.

    Returns the sampled ids as a ``set``; it holds fewer than ``n_users`` entries
    when the file has fewer non-missing rows (or when the reservoir ends up with
    repeated ids).
    """
    generator = np.random.default_rng(seed)
    picked = []
    n_seen = 0
    reader = pd.read_csv(
        user_csv,
        usecols=["userId"],  # minimal read
        chunksize=chunksize,
        dtype={"userId": "string"},
    )
    for frame in reader:
        for user_id in frame["userId"].dropna():
            n_seen += 1
            # Fill phase: the first n_users ids are taken unconditionally.
            if len(picked) < n_users:
                picked.append(user_id)
                continue
            # Replacement phase: the n_seen-th id lands in the reservoir with
            # probability n_users / n_seen, overwriting a uniformly chosen slot.
            slot = generator.integers(0, n_seen)
            if slot < n_users:
                picked[slot] = user_id
    return set(picked)

# ---------------------------
# Read helpers (filtered joins)
# ---------------------------
def read_filtered(path, key_col, keep_keys, usecols, chunksize=CHUNKSIZE):
    """
    Stream ``path`` in chunks and return only the rows whose ``key_col`` is in ``keep_keys``.

    Keys are compared as strings: ``keep_keys`` is de-duplicated, stripped of missing
    values and stringified, and ``key_col`` is read with the pandas ``string`` dtype.
    Only the columns in ``usecols`` are loaded. When no row matches, an empty
    DataFrame with the ``usecols`` columns is returned.
    """
    wanted = {str(key) for key in pd.Series(list(keep_keys)).dropna().unique()}
    reader = pd.read_csv(
        path,
        usecols=usecols,
        chunksize=chunksize,
        dtype={key_col: "string"},
    )

    matches = []
    for frame in reader:
        frame[key_col] = frame[key_col].astype("string")
        hits = frame.loc[frame[key_col].isin(wanted)]
        if hits.empty:
            continue
        matches.append(hits)

    if not matches:
        return pd.DataFrame(columns=usecols)
    return pd.concat(matches, ignore_index=True)

# ---------------------------
# Build impressions for sampled users
# ---------------------------
def collect_impressions_for_users(imp_path, sampled_users, max_imps_per_user=MAX_IMPS_PER_USER,
                                  chunksize=CHUNKSIZE):
    """
    Stream the impression log and keep up to ``max_imps_per_user`` rows per sampled user.

    The file at ``imp_path`` is read in chunks of ``chunksize`` rows. Each chunk is
    filtered to ``sampled_users``, sorted by ``userId`` and ``impressTime``, and rows
    are accepted per user until that user's running count reaches the cap. Counts
    persist across chunks, and reading stops early once every sampled user has
    reached the cap.

    NOTE: sorting happens inside each chunk only, so the kept rows are the earliest
    ones within the chunks where a user first appears, not necessarily the user's
    globally earliest impressions.

    Returns a DataFrame with the impression columns listed in ``usecols`` (empty, with
    those columns, when nothing matched or ``sampled_users`` is empty).
    """
    usecols = [
        "userId","mlogId","impressTime","dt","impressPosition",
        "isClick","isLike","isComment","isShare","isViewComment",
        "isIntoPersonalHomepage","mlogViewTime"
    ]
    dtypes = {
        "userId": "string",
        "mlogId": "string",
        "dt": "Int16",
        "impressPosition": "Int16",
        "isClick": "Int8",
        "isLike": "Int8",
        "isComment": "Int8",
        "isShare": "Int8",
        "isViewComment": "Int8",
        "isIntoPersonalHomepage": "Int8",
        "mlogViewTime": "float32"
    }

    # Nothing can match, so skip scanning the whole impression file.
    if len(sampled_users) == 0:
        return pd.DataFrame(columns=usecols)

    per_user_count = defaultdict(int)
    parts = []

    for chunk in pd.read_csv(imp_path, usecols=usecols, chunksize=chunksize, dtype=dtypes):
        chunk = chunk[chunk["userId"].isin(sampled_users)]
        if chunk.empty:
            continue
        # Order by time to keep early behavior meaningful. The sort is not done
        # in place: the filtered chunk is a slice of the chunk that was read, and
        # an in-place sort on it raises SettingWithCopyWarning.
        chunk = chunk.sort_values(["userId", "impressTime"], kind="mergesort")

        # Apply per-user cap, carrying each user's count over from earlier chunks
        keep_mask = []
        for u in chunk["userId"]:
            c = per_user_count[u]
            keep = c < max_imps_per_user
            keep_mask.append(keep)
            if keep:
                per_user_count[u] = c + 1

        kept = chunk.loc[keep_mask]
        if not kept.empty:
            parts.append(kept)

        # Early stop if everyone reached cap. The length check comes first so the
        # defaultdict lookup below never inserts users that have not been seen.
        if len(per_user_count) >= len(sampled_users) and all(
            per_user_count[u] >= max_imps_per_user for u in sampled_users
        ):
            break

    if not parts:
        return pd.DataFrame(columns=usecols)
    return pd.concat(parts, ignore_index=True)

# ---------------------------
# Post-merge hygiene
# ---------------------------
def cast_post_merge(df: pd.DataFrame) -> pd.DataFrame:
    """
    Deduplicate the merged table and tighten its dtypes, modifying ``df`` in place.

    Steps, each applied only to columns that are present:
      * drop repeated (userId, mlogId, impressTime) rows, keeping the first;
      * engagement flags: missing -> 0, then cast to ``uint8``;
      * ``impressPosition`` -> nullable ``Int16``; ``mlogViewTime`` -> ``float32``
        (values that cannot be parsed become NaN);
      * low-cardinality descriptors -> ``category`` to save memory.

    Returns the same ``df`` object that was passed in.
    """
    dedup_key = ["userId", "mlogId", "impressTime"]
    flag_cols = (
        "isClick", "isLike", "isComment", "isShare",
        "isViewComment", "isIntoPersonalHomepage",
    )
    category_cols = ("gender", "province", "type", "contentId", "talkId", "creatorType")

    # Only deduplicate when the full impression key is available.
    if all(name in df.columns for name in dedup_key):
        df.drop_duplicates(subset=dedup_key, inplace=True)

    # booleans to uint8
    for name in flag_cols:
        if name not in df.columns:
            continue
        df[name] = df[name].fillna(0).astype("uint8")

    # numerics
    if "impressPosition" in df.columns:
        df["impressPosition"] = df["impressPosition"].astype("Int16")
    if "mlogViewTime" in df.columns:
        view_time = pd.to_numeric(df["mlogViewTime"], errors="coerce")
        df["mlogViewTime"] = view_time.astype("float32")

    # categoricals for memory
    for name in category_cols:
        if name not in df.columns:
            continue
        df[name] = df[name].astype("category")

    return df

# ---------------------------
# Main build
# ---------------------------
def main():
    """
    Build the merged impression sample and write it to ``OUT_CSV``.

    Pipeline: sample users uniformly, collect their capped impressions, load the
    user/card/creator rows referenced by that sample, left-join them onto the
    impressions, derive the ``y_active`` label, normalise dtypes and save as CSV.
    Progress is reported with ``print``.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)

    # 1) Uniformly sample users
    sampled_users = sample_users_uniform(USR_PATH, n_users=N_USERS, chunksize=CHUNKSIZE, seed=SEED)
    print(f"Sampled users: {len(sampled_users):,}")

    # 2) Pull up impressions per sampled user
    imp = collect_impressions_for_users(IMP_PATH, sampled_users, MAX_IMPS_PER_USER, CHUNKSIZE)
    print(f"Collected impressions: {len(imp):,} ")

    # 3) Keys for joins
    user_keys = imp["userId"].astype("string").unique()
    mlog_keys = imp["mlogId"].astype("string").unique()

    # 4) Read user/card/creator dims (static, leakage-free).
    # Both the user and creator tables carry a "level" column; each is renamed
    # here, before merging, so the joined frame never has colliding names and no
    # suffix-dependent renaming is needed afterwards.
    users = read_filtered(
        USR_PATH, "userId", user_keys,
        ["userId","age","gender","province","level","registeredMonthCnt","followCnt"]
    ).rename(columns={"level": "user_level"})
    cards = read_filtered(
        CRD_PATH, "mlogId", mlog_keys,
        ["mlogId","type","contentId","talkId","publishTime","creatorId"]
    )
    creator_keys = cards["creatorId"].dropna().astype("string").unique() if not cards.empty else []
    creators = read_filtered(
        CRT_PATH, "creatorId", creator_keys,
        ["creatorId","creatorType","level"]
    ).rename(columns={"level": "creator_level"})

    # 5) Merge (left joins keep every sampled impression)
    df = (imp
          .merge(users, on="userId", how="left", suffixes=("", "_user"))
          .merge(cards, on="mlogId", how="left", suffixes=("", "_card"))
          .merge(creators, on="creatorId", how="left", suffixes=("", "_creator"))
          )

    # 6) Binary label: active if user_level >= 5.
    # A missing or non-numeric level compares False, so it is labelled 0.
    df["y_active"] = (pd.to_numeric(df["user_level"], errors="coerce") >= 5).astype("uint8")

    # 7) dtypes
    df = cast_post_merge(df)

    # 8) Save
    df.to_csv(OUT_CSV, index=False)
    print(f"Saved: {OUT_CSV}")

if __name__ == "__main__":
    main()


Sampled users: 15,000
Collected impressions: 107,425 
Saved: ../data/imp_sample_100k_merged.csv


#### Sanity Checks

In [5]:
# Reload the saved sample from disk so the checks cover what was actually written.
df = pd.read_csv("../data/imp_sample_100k_merged.csv")

# 1) Row count; expected to be roughly 100,000
length = len(df)
print("Rows: ", length)

# 2) Distinct users/cards/creators in the sample
for label, id_col in (("Users: ", "userId"), ("Cards: ", "mlogId"), ("Creators: ", "creatorId")):
    print(label, df[id_col].nunique())

# 3) Share of missing values in the key descriptive columns
null_rate_cols = ["age","gender","province","type","contentId","talkId","creatorType"]
print("Null rates: ", df[null_rate_cols].isna().mean().round(3))

# 4) Spot-check that clicks exist (second number is the share count)
print("Clicks: ", df["isClick"].sum(), df["isShare"].sum())

Rows:  107425
Users:  15000
Cards:  12019
Creators:  7056
Null rates:  age            0.364
gender         0.364
province       0.000
type           0.000
contentId      0.134
talkId         0.000
creatorType    0.000
dtype: float64
Clicks:  3456 17
