In [1]:
# Cell 1 – config & imports
import gc, json, numpy as np, pandas as pd, os
import pyarrow as pa
import pyarrow.parquet as pq

# --- Paths ---------------------------------------------------------------
TX_PATH = "../data/input_data/transactions_train.csv"   # raw transactions CSV
FEATURE_DIR = "../data/outputs"                         # feature files + metadata

# --- Label weeks ---------------------------------------------------------
# Each id is the YYYYMMDD of the last day of a label window; features have
# been built for every one of these weeks.
TRAIN_WEEKS = [
    20200819,
    20200826,
    20200902,
    20200909,
]                      # weeks used as training labels
VALID_WEEK = 20200916  # week used as validation labels

# --- Misc ----------------------------------------------------------------
LABEL_LOOKBACK_DAYS = 7   # length of each label window, in days
RANDOM_SEED = 42          # also seeds numpy's legacy global RNG below
RANK_TOP_K_EVAL = 12      # top-k cutoff used for ranking evaluation

np.random.seed(RANDOM_SEED)

def hex16_to_int(s):
    """Map the last 16 hex digits of ``s`` to a signed 64-bit integer.

    The parsed value (0 .. 2**64 - 1) is reinterpreted as two's-complement
    int64, so e.g. ``"ffffffffffffffff"`` maps to -1. Used below to turn hex
    customer ids into compact int64 keys.
    """
    unsigned_value = int(s[-16:], 16)
    if unsigned_value >= (1 << 63):
        unsigned_value -= 1 << 64
    return np.int64(unsigned_value)

In [2]:
# Cell 2 – load feature column order (saved from features pipeline)
with open(f"{FEATURE_DIR}/feature_cols.json") as fh:
    meta = json.load(fh)

# Full column order exactly as saved by the features pipeline.
FULL_FEATURE_COLS = meta["feature_cols"]
print("Total available features:", len(FULL_FEATURE_COLS))

# Identifier columns: carried through every file, never used as model inputs.
ID_COLS = ["customer_id", "article_id"]

# Model inputs = every saved column except the identifiers (order preserved).
MODEL_FEATURES = list(filter(lambda col: col not in ID_COLS, FULL_FEATURE_COLS))
print("Model feature count:", len(MODEL_FEATURES))

Total available features: 36
Model feature count: 34


In [3]:
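# # Cell 3 – stream-load each week's features to a normalized file
# # NOTE: this cell is currently disabled. Cell 4.2 expects
# # ../data/outputs/features_stream_week={w}.parquet to exist for every week in
# # TRAIN_WEEKS + [VALID_WEEK]; uncomment and re-run it whenever those weeks or
# # MODEL_FEATURES change. (The output shown below comes from an earlier run that
# # covered fewer training weeks.)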
# train_files = []
# valid_file = None

# for w in TRAIN_WEEKS + [VALID_WEEK]:
#     out_path = f"../data/outputs/features_stream_week={w}.parquet"

#     # MODIFIED: Always overwrite, never skip
#     if os.path.exists(out_path):
#         print(f"Removing old stream file: {out_path}")
#         os.remove(out_path)

#     file_path = f"{FEATURE_DIR}/features_week={w}.parquet"
#     print("Creating stream file from:", file_path)

#     # Load only required columns
#     df = pd.read_parquet(file_path, columns=ID_COLS + MODEL_FEATURES)
#     df["week_end"] = np.int32(w)

#     # Save normalized version
#     df.to_parquet(out_path, index=False)
#     print("Saved streamed:", out_path)

#     del df
#     gc.collect()

#     if w == VALID_WEEK:
#         valid_file = out_path
#     else:
#         train_files.append(out_path)

# print("Train stream files:", train_files)
# print("Valid stream file:", valid_file)

# # 11 min, 22s

Creating stream file from: ../data/outputs/features_week=20200819.parquet
Saved streamed: ../data/outputs/features_stream_week=20200819.parquet
Creating stream file from: ../data/outputs/features_week=20200826.parquet
Saved streamed: ../data/outputs/features_stream_week=20200826.parquet
Removing old stream file: ../data/outputs/features_stream_week=20200916.parquet
Creating stream file from: ../data/outputs/features_week=20200916.parquet
Saved streamed: ../data/outputs/features_stream_week=20200916.parquet
Train stream files: ['../data/outputs/features_stream_week=20200819.parquet', '../data/outputs/features_stream_week=20200826.parquet']
Valid stream file: ../data/outputs/features_stream_week=20200916.parquet


In [4]:
# Cell 4 – build labels and merge into train/valid (FULLY STREAMING)

import shutil
import pyarrow as pa
import pyarrow.parquet as pq

# Rows per pandas chunk when streaming the transactions CSV.
TX_CHUNK_ROWS = 5_000_000
# Scratch directory for the per-week label parquet files (wiped and rebuilt in 4.1).
LABELS_DIR = "../data/outputs/labels_temp"


def labels_path_for_week(w):
    """Return the parquet path that holds the positive labels of label week ``w``."""
    file_name = "labels_week={}.parquet".format(w)
    return "/".join((LABELS_DIR, file_name))

def empty_labels_df():
    """Return a zero-row labels frame with the canonical column order and dtypes.

    Columns: customer_id (int64), article_id (int32), label (int8),
    week_end (int32) — the same layout the label files are written with.
    """
    column_dtypes = [
        ("customer_id", "int64"),
        ("article_id", "int32"),
        ("label", "int8"),
        ("week_end", "int32"),
    ]
    return pd.DataFrame({name: pd.Series(dtype=dtype) for name, dtype in column_dtypes})

def infer_week_from_features_path(p):
    """Extract the integer week id from a path like ``.../features_stream_week=20200819.parquet``.

    Only the file name is inspected and the *last* ``week=`` marker in it is
    used, so directories containing dots or their own ``week=`` segment
    (e.g. hive-style partitions) do not confuse the parser.

    Raises
    ------
    ValueError
        If the file name has no ``week=`` marker or the id is not an integer.
    """
    name = os.path.basename(p)
    if "week=" not in name:
        raise ValueError(f"No 'week=' marker in file name: {p!r}")
    return int(name.rsplit("week=", 1)[1].split(".", 1)[0])

def load_labels_for_week(w):
    """Read the positive labels for label week ``w``.

    Returns the contents of the week's label parquet file, or a zero-row frame
    with the canonical label columns when no file has been written for ``w``.
    """
    path = labels_path_for_week(w)
    if not os.path.exists(path):
        return empty_labels_df()
    return pd.read_parquet(path)

In [4]:
# 4.1 – Build labels FIRST in streaming mode
# A (customer, article) pair is a positive for label week w when it was bought
# in the window (w - LABEL_LOOKBACK_DAYS, w]. Transactions are streamed in
# chunks and each week's positives are appended to its own parquet file.
print("Streaming transactions to build labels...")
week_windows = {
    w: {
        "cut_ts": pd.to_datetime(str(w)) - pd.Timedelta(days=LABEL_LOOKBACK_DAYS),
        "last_ts": pd.to_datetime(str(w)),
    }
    for w in TRAIN_WEEKS + [VALID_WEEK]
}

# Envelope covering every label window. Rows outside it cannot be a label for
# any week, so they are dropped before the slow per-row customer_id conversion.
span_cut_ts = min(b["cut_ts"] for b in week_windows.values())
span_last_ts = max(b["last_ts"] for b in week_windows.values())

shutil.rmtree(LABELS_DIR, ignore_errors=True)
os.makedirs(LABELS_DIR, exist_ok=True)

label_paths = {w: labels_path_for_week(w) for w in week_windows}
label_writers = {}

tx_iter = pd.read_csv(
    TX_PATH,
    usecols=["t_dat", "customer_id", "article_id"],
    dtype={"t_dat": "string", "customer_id": "string", "article_id": "int32"},
    chunksize=TX_CHUNK_ROWS,
)

try:
    for idx, chunk in enumerate(tx_iter, start=1):
        chunk["t_dat"] = pd.to_datetime(chunk["t_dat"])
        in_span = (chunk["t_dat"] > span_cut_ts) & (chunk["t_dat"] <= span_last_ts)
        # .copy() so the column assignment below works on an independent frame.
        chunk = chunk.loc[in_span].copy()
        chunk["customer_id"] = chunk["customer_id"].str[-16:].apply(hex16_to_int)

        for w, bounds in week_windows.items():
            mask = (chunk["t_dat"] > bounds["cut_ts"]) & (chunk["t_dat"] <= bounds["last_ts"])
            if not mask.any():
                continue

            lbl = chunk.loc[mask, ["customer_id", "article_id"]].drop_duplicates()
            if lbl.empty:
                continue

            lbl["label"] = 1
            lbl["week_end"] = np.int32(w)

            table = pa.Table.from_pandas(
                lbl[["customer_id", "article_id", "label", "week_end"]],
                preserve_index=False,
            )

            # One writer per week, opened lazily with the first chunk's schema.
            writer = label_writers.get(w)
            if writer is None:
                writer = pq.ParquetWriter(label_paths[w], table.schema, compression="snappy")
                label_writers[w] = writer

            writer.write_table(table)

        del chunk
        if idx % 10 == 0:
            gc.collect()
finally:
    # Release every parquet writer even if a chunk fails mid-stream.
    for writer in label_writers.values():
        writer.close()

# Deduplicate and finalize labels: the same pair can appear in several chunks.
total_positive = 0
for w, path in label_paths.items():
    if os.path.exists(path):
        week_df = pd.read_parquet(path, engine="pyarrow")
        if not week_df.empty:
            week_df = week_df.drop_duplicates(["customer_id", "article_id", "week_end"])
            week_df["label"] = week_df["label"].astype("int8")
        else:
            week_df = empty_labels_df()
        week_df.to_parquet(path, index=False)
        total_positive += len(week_df)
        del week_df
    else:
        # No purchases fell in this week's window: write an empty, typed file.
        empty_labels_df().to_parquet(path, index=False)

print("Total positive labels (all weeks):", total_positive)
gc.collect()

# 2 min, 12 sec

Streaming transactions to build labels...
Total positive labels (all weeks): 1150836


0

In [5]:
# %% 
# Cell 4.2 – Winner-style GLOBAL negative sampling
# --------------------------------------------------------------

import os, gc
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa

NEG_SAMPLES = 2_000_000          # negatives kept per TRAIN week (global sample)
FEATURE_CHUNK_ROWS = 5_000_000   # rows per parquet batch when streaming features
# Seed from the notebook-wide RANDOM_SEED instead of a literal, so one constant
# controls every random draw in the pipeline (same value, 42, today).
rng = np.random.RandomState(RANDOM_SEED)

train_files = [f"../data/outputs/features_stream_week={w}.parquet" for w in TRAIN_WEEKS]
valid_file  = f"../data/outputs/features_stream_week={VALID_WEEK}.parquet"

all_week_files = train_files + [valid_file]
output_files = {}

print("Using feature stream files:")
print("Train:", train_files)
print("Valid:", valid_file)


# --------------------------------------------------------------
# GLOBAL-sampling function
# --------------------------------------------------------------
def sample_global_negatives(all_neg_chunks, target, rng):
    """Uniformly sample ``target`` rows from the union of negative chunks.

    Parameters
    ----------
    all_neg_chunks : list of pd.DataFrame
        Negative (label == 0) rows for one week, collected batch by batch.
    target : int
        Number of rows to keep (``NEG_SAMPLES`` in this notebook).
    rng : anything accepted by ``DataFrame.sample(random_state=...)``
        e.g. a ``np.random.RandomState`` or an int seed.

    Returns
    -------
    pd.DataFrame
        All rows, in original order, when there are at most ``target`` of them;
        otherwise a random sample of exactly ``target`` rows drawn without
        replacement across all chunks. An empty DataFrame when
        ``all_neg_chunks`` is empty.
    """
    if not all_neg_chunks:
        # pd.concat([]) raises "No objects to concatenate"; a week without
        # negatives should simply contribute none.
        return pd.DataFrame()

    neg_df = pd.concat(all_neg_chunks, ignore_index=True)
    if len(neg_df) <= target:
        return neg_df
    return neg_df.sample(target, random_state=rng)


# --------------------------------------------------------------
# PROCESS EACH WEEK
# --------------------------------------------------------------
def _iter_labeled_batches(features_path, week_labels, keep_customers=None):
    """Stream one week's feature file and attach 0/1 labels, batch by batch.

    Parameters
    ----------
    features_path : str
        Feature stream parquet for one week (ID columns, week_end, features).
    week_labels : pd.DataFrame
        Positive (customer_id, article_id, week_end) pairs with ``label`` == 1.
    keep_customers : set or None
        When given, rows whose ``customer_id`` is not in it are dropped before
        the merge (TRAIN weeks keep only customers that have positives).

    Yields
    ------
    pd.DataFrame
        Non-empty labelled batch; ``label`` is int8 (1 = bought, 0 = not).
    """
    pqfile = pq.ParquetFile(features_path)
    for batch_idx, batch in enumerate(pqfile.iter_batches(batch_size=FEATURE_CHUNK_ROWS)):
        df = batch.to_pandas()
        df["customer_id"] = df["customer_id"].astype("int64")
        df["article_id"] = df["article_id"].astype("int32")
        df["week_end"] = df["week_end"].astype("int32")

        if keep_customers is not None:
            df = df[df["customer_id"].isin(keep_customers)]
        if df.empty:
            continue

        df = df.merge(
            week_labels,
            on=["customer_id", "article_id", "week_end"],
            how="left",
        )
        # Candidates without a matching purchase get NaN from the left merge.
        df["label"] = df["label"].fillna(0).astype("int8")
        yield df

        del df
        if batch_idx % 10 == 0:
            gc.collect()


def _write_valid_week(features_path, week_labels, out_path):
    """Label every validation candidate and stream it straight to ``out_path``.

    No negative sampling is done for validation, so the week can hold hundreds
    of millions of rows; batches are appended to one parquet file instead of
    being concatenated in memory.

    Returns
    -------
    (int, int)
        Number of positive and negative rows written.

    Raises
    ------
    ValueError
        If the feature file yields no rows at all.
    """
    writer = None
    schema = None
    n_pos = n_rows = 0
    try:
        for df in _iter_labeled_batches(features_path, week_labels):
            # Reuse the first batch's schema so every batch is written identically.
            table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
            if writer is None:
                schema = table.schema
                writer = pq.ParquetWriter(out_path, schema, compression="snappy")
            writer.write_table(table)
            n_pos += int((df["label"] == 1).sum())
            n_rows += len(df)
            del df, table
    finally:
        if writer is not None:
            writer.close()
    if writer is None:
        raise ValueError(f"No candidate rows found in {features_path}")
    return n_pos, n_rows - n_pos


for f in all_week_files:
    week = infer_week_from_features_path(f)
    is_valid = (week == VALID_WEEK)

    print(f"\n{'='*60}")
    print(f"Processing week {week} ({'VALID' if is_valid else 'TRAIN'})")
    print(f"{'='*60}")

    # Positive labels, cast to the same key dtypes as the feature batches.
    week_labels = load_labels_for_week(week).astype({
        "customer_id": "int64",
        "article_id":  "int32",
        "week_end":    "int32",
        "label":       "int8"
    })

    # IMPORTANT PART:
    # In TRAIN weeks only customers with positives are kept (for both positives
    # and negatives); VALID keeps every candidate.
    training_customers = set(week_labels["customer_id"].unique())
    print(f"Training customers: {len(training_customers)}")

    # ------------- VALID CASE: keep all rows, streamed to disk -------------
    if is_valid:
        out_valid = "../data/outputs/valid_merged_temp.parquet"
        n_pos, n_neg = _write_valid_week(f, week_labels, out_valid)
        print(f"Positives: {n_pos}  Negatives: {n_neg}")
        print(f"Saved: {out_valid}  rows={n_pos + n_neg:,}")
        del week_labels, training_customers
        gc.collect()
        continue

    # ------------- TRAIN CASE: all positives + NEG_SAMPLES global negatives -------------
    pos_chunks, neg_chunks = [], []
    for df in _iter_labeled_batches(f, week_labels, keep_customers=training_customers):
        is_pos = df["label"] == 1
        if is_pos.any():
            pos_chunks.append(df[is_pos])
        if not is_pos.all():
            neg_chunks.append(df[~is_pos])
        del df, is_pos

    neg_df = sample_global_negatives(neg_chunks, NEG_SAMPLES, rng) if neg_chunks else None
    parts = pos_chunks + ([neg_df] if neg_df is not None else [])
    if not parts:
        raise ValueError(f"Week {week}: no candidate rows left after filtering {f}")
    if not pos_chunks:
        print(f"WARNING: week {week} has no positives among its candidates")

    # Positives first, then the sampled negatives.
    final_df = pd.concat(parts, ignore_index=True)

    print(f"Sampled negatives: {0 if neg_df is None else len(neg_df)}")
    print(f"Positives kept:   {sum(len(c) for c in pos_chunks)}")

    # Write to a temp name, then move into place. os.replace (unlike os.rename)
    # overwrites an existing target on every OS, so the cell can be re-run.
    out_tmp = f"../data/outputs/train_part_week={week}_temp.parquet"
    out_final = f"../data/outputs/train_part_week={week}.parquet"
    final_df.to_parquet(out_tmp, index=False, compression="snappy")
    os.replace(out_tmp, out_final)
    print(f"Saved: {out_final}  rows={len(final_df):,}")

    del final_df, parts, pos_chunks, neg_chunks, neg_df
    gc.collect()

print("\nDone building merged train/valid files.")

# 4m with 2m negatives per week


Using feature stream files:
Train: ['../data/outputs/features_stream_week=20200819.parquet', '../data/outputs/features_stream_week=20200826.parquet', '../data/outputs/features_stream_week=20200902.parquet', '../data/outputs/features_stream_week=20200909.parquet']
Valid: ../data/outputs/features_stream_week=20200916.parquet

Processing week 20200819 (TRAIN)
Training customers: 66199
Sampled negatives: 2000000
Positives kept:   40551
Saved: ../data/outputs/train_part_week=20200819_temp.parquet  rows=2,040,551

Processing week 20200826 (TRAIN)
Training customers: 74010
Sampled negatives: 2000000
Positives kept:   43777
Saved: ../data/outputs/train_part_week=20200826_temp.parquet  rows=2,043,777

Processing week 20200902 (TRAIN)
Training customers: 77684
Sampled negatives: 2000000
Positives kept:   47317
Saved: ../data/outputs/train_part_week=20200902_temp.parquet  rows=2,047,317

Processing week 20200909 (TRAIN)
Training customers: 77091
Sampled negatives: 2000000
Positives kept:   48437


In [6]:
# %%
# Cell 5.1 – Process VALID using PURE PYARROW (Fast, low-memory)

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
import numpy as np
import gc

valid_rank_path = "../data/outputs/valid_rank.parquet"
valid_groups_path = "../data/outputs/groups_valid.npy"
valid_merged_file = "../data/outputs/valid_merged_temp.parquet"

print("Processing VALID for ranking (PyArrow fast sort)...")

# 1. Read the merged validation file as an Arrow table.
table = pq.read_table(valid_merged_file)

# 2. query_id = week_end * 1e8 + customer_id identifies one (week, customer)
#    ranking group (same formula as the TRAIN cell). customer_id appears to be
#    a 64-bit hashed id (see hex16_to_int), so the sum may overflow; Arrow's
#    unchecked `add` wraps around, which keeps ids distinct within one week.
query_id = pc.add(
    pc.multiply(table["week_end"].cast(pa.int64()), pa.scalar(100_000_000, pa.int64())),
    table["customer_id"].cast(pa.int64()),
)
table = table.append_column("query_id", query_id)

# 3. Seeded random tie-breaker so rows inside each query are shuffled reproducibly.
n = table.num_rows
rand_col = pa.array(np.random.RandomState(RANDOM_SEED).rand(n).astype("float32"))
table = table.append_column("__rand", rand_col)

# 4. Sort by query_id (primary) then random (secondary).
print("Sorting Arrow table...")
sort_keys = [("query_id", "ascending"), ("__rand", "ascending")]
table = table.sort_by(sort_keys)

# 5. Group sizes = lengths of runs of equal query_id in the sorted order.
q = table["query_id"].to_numpy(zero_copy_only=False)
if q.size:
    boundary_idx = np.flatnonzero(np.r_[True, q[1:] != q[:-1], True])
    valid_group_sizes = np.diff(boundary_idx)
else:
    # For an empty table np.r_[True, <empty>, True] would fabricate one group
    # of size 1; there are simply no groups.
    boundary_idx = np.array([], dtype=np.int64)
    valid_group_sizes = np.array([], dtype=np.int64)
assert valid_group_sizes.sum() == table.num_rows

# 6. Remove temporary random column.
table = table.drop_columns(["__rand"])

# 7. Write final sorted validation file.
pq.write_table(table, valid_rank_path, compression="snappy")
np.save(valid_groups_path, valid_group_sizes)

n_groups = len(valid_group_sizes)
avg_size = table.num_rows / n_groups if n_groups else 0.0
print(f"Valid rows: {table.num_rows}")
print(f"Valid groups: {n_groups}, avg size={avg_size:.2f}")
print("Saved valid_rank.parquet and groups_valid.npy")

# Cleanup: query_id and rand_col are module-level Arrow arrays that would
# otherwise keep several GB alive after the table is deleted.
del table, q, boundary_idx, valid_group_sizes, query_id, rand_col
gc.collect()

# 42s


Processing VALID for ranking (PyArrow fast sort)...
Sorting Arrow table...
Valid rows: 253714158
Valid groups: 1371980, avg size=184.93
Saved valid_rank.parquet and groups_valid.npy


0

In [7]:
# Cell 5.2 – Process TRAIN (Load ALL parts, Sort, Write)
train_rank_path = "../data/outputs/train_rank.parquet"
train_groups_path = "../data/outputs/groups_train.npy"
train_parts_files = [f"../data/outputs/train_part_week={w}.parquet" for w in TRAIN_WEEKS]

print("\nProcessing TRAIN for ranking (Global Sort)...")

# 1. Load ALL train parts
# OPTIMIZATION: Load one by one and append to list, then concat.
dfs = []
for f in train_parts_files:
    print(f"Loading {f}...")
    _df = pd.read_parquet(f)
    dfs.append(_df)

print("Concatenating...")
df_train = pd.concat(dfs, ignore_index=True)
del dfs
gc.collect()

# 2. Create Query ID
print("Creating Query IDs...")
df_train["query_id"] = (
    df_train["week_end"].astype("int64") * 100_000_000
    + df_train["customer_id"].astype("int64")
)

# 3. GLOBAL SORT
print("Generating random sort key...")
rng = np.random.RandomState(RANDOM_SEED)
df_train["__rand"] = rng.rand(len(df_train))

print("Sorting train data (this may take a while)...")
df_train = df_train.sort_values(["query_id", "__rand"])
df_train.drop(columns=["__rand"], inplace=True)

# 4. Calculate Groups
print("Calculating groups...")
q_ids = df_train["query_id"].values
unique_indices = np.flatnonzero(np.r_[True, q_ids[1:] != q_ids[:-1], True])
train_group_sizes = np.diff(unique_indices)

# 5. Save
print("Saving train data...")
table = pa.Table.from_pandas(df_train, preserve_index=False)
pq.write_table(table, train_rank_path, compression="snappy")
np.save(train_groups_path, train_group_sizes)

print(f"Train rows: {len(df_train)}")
print(f"Train groups: {len(train_group_sizes)} (Avg size: {len(df_train)/len(train_group_sizes):.2f})")
print("Saved train_rank.parquet and groups_train.npy")

del df_train, table, q_ids, unique_indices
gc.collect()

# 2 min, 4s


Processing TRAIN for ranking (Global Sort)...
Loading ../data/outputs/train_part_week=20200819.parquet...
Loading ../data/outputs/train_part_week=20200826.parquet...
Loading ../data/outputs/train_part_week=20200902.parquet...
Loading ../data/outputs/train_part_week=20200909.parquet...
Concatenating...
Creating Query IDs...
Generating random sort key...
Sorting train data (this may take a while)...
Calculating groups...
Saving train data...
Train rows: 8180082
Train groups: 294983 (Avg size: 27.73)
Saved train_rank.parquet and groups_train.npy


0

In [8]:
# Cell 6 – choose model features & save dataset_meta.json

# We no longer have the train frame in memory; infer the columns from the
# schema of one train part (only the parquet footer is read, not the rows).
example_train_path = train_parts_files[0]
example_columns = pq.read_schema(example_train_path).names

drop_cols = ["label", "week_end", "query_id"]
id_cols   = ["customer_id", "article_id"]
excluded_cols = set(drop_cols + id_cols)

model_features = [c for c in example_columns if c not in excluded_cols]
print("Number of model features:", len(model_features))

# Row counts come from parquet metadata, so no column data is loaded
# (valid_rank.parquet alone holds ~254M rows).
train_rank_rows = sum(pq.read_metadata(p).num_rows for p in train_parts_files)
valid_rank_rows = pq.read_metadata("../data/outputs/valid_rank.parquet").num_rows

with open("../data/outputs/dataset_meta.json", "w") as f:
    json.dump({
        "train_weeks": TRAIN_WEEKS,
        "valid_week": VALID_WEEK,
        "label_lookback_days": LABEL_LOOKBACK_DAYS,
        "neg_sample_clf": NEG_SAMPLES,
        "model_features": model_features,
        "rank_train_rows": int(train_rank_rows),
        "rank_valid_rows": int(valid_rank_rows),
        "rank_at": RANK_TOP_K_EVAL,
    }, f)

print("Saved ../data/outputs/dataset_meta.json")

Number of model features: 34
Saved ../data/outputs/dataset_meta.json
