# Feature engineering for two-tower + ranking RecSys

Build interaction, item, and visitor feature tables with `dask_cudf`, write to parquet, and log artifacts to MLflow.

In [None]:
import pathlib
import zipfile
from datetime import datetime

import os
import json
from urllib.parse import urlparse
import dask_cudf as dc
import dask.dataframe as dd
import cudf
from dask.distributed import Client

# Filesystem layout: raw archives live in DATA_DIR, extracted CSVs in UNZIP_DIR,
# and the parquet feature tables are written under OUTPUT_DIR inside SHARED_ROOT.
DATA_DIR = pathlib.Path("/tmp")
UNZIP_DIR = DATA_DIR.joinpath("unzipped")
SHARED_ROOT = pathlib.Path("/home/rapids/shared_data")
OUTPUT_DIR = SHARED_ROOT.joinpath("feature_store")

# Create the directories in dependency order; only SHARED_ROOT is allowed to
# create missing parents, so OUTPUT_DIR relies on it having been made first.
for _directory, _with_parents in (
    (UNZIP_DIR, False),
    (SHARED_ROOT, True),
    (OUTPUT_DIR, False),
):
    _directory.mkdir(parents=_with_parents, exist_ok=True)

# In-process (threaded) Dask scheduler and worker.
client = Client(processes=False)


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://172.18.0.12:8787/status,

0,1
Dashboard: http://172.18.0.12:8787/status,Workers: 1
Total threads: 12,Total memory: 62.64 GiB
Status: running,Using processes: False

0,1
Comm: inproc://172.18.0.12/75/1,Workers: 0
Dashboard: http://172.18.0.12:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: inproc://172.18.0.12/75/4,Total threads: 12
Dashboard: http://172.18.0.12:45873/status,Memory: 62.64 GiB
Nanny: None,
Local directory: /tmp/dask-scratch-space/worker-_4m4hjn6,Local directory: /tmp/dask-scratch-space/worker-_4m4hjn6
GPU: NVIDIA GeForce GTX 1650 Ti,GPU memory: 4.00 GiB


In [None]:
def ensure_unzipped(zip_path: pathlib.Path) -> pathlib.Path:
    """Extract the first file stored in ``zip_path`` into ``UNZIP_DIR``, once.

    The extracted file is named after the archive with its last suffix
    removed (``events.csv.zip`` -> ``events.csv``). When that file already
    exists the archive is not opened and the existing path is returned.

    Args:
        zip_path: Path to a zip archive containing the file to extract.

    Returns:
        Path of the extracted file under ``UNZIP_DIR``.

    Raises:
        ValueError: If the archive contains no file members.
    """
    import shutil  # local import: only needed on the extraction path

    target = UNZIP_DIR / zip_path.with_suffix("").name
    if target.exists():
        return target

    with zipfile.ZipFile(zip_path) as zf:
        # Directory entries carry no data; taking entry 0 blindly would
        # produce an empty output file when the archive starts with one.
        member = next((m for m in zf.infolist() if not m.is_dir()), None)
        if member is None:
            raise ValueError(f"{zip_path} contains no files to extract")

        target.parent.mkdir(parents=True, exist_ok=True)
        # Extract to a sibling temp file and rename at the end, so an
        # interrupted run never leaves a truncated file that the
        # `target.exists()` check above would treat as complete.
        partial = target.with_name(target.name + ".part")
        try:
            with zf.open(member) as src, partial.open("wb") as dst:
                # Stream in chunks instead of loading the whole member in RAM.
                shutil.copyfileobj(src, dst)
            partial.replace(target)
        finally:
            # No-op after a successful rename; removes leftovers on failure.
            partial.unlink(missing_ok=True)
    return target

# Extract the zipped CSVs once; the category tree ships uncompressed.
EVENTS_CSV = ensure_unzipped(DATA_DIR / "events.csv.zip")
ITEMS1_CSV = ensure_unzipped(DATA_DIR / "item_properties_part1.csv.zip")
ITEMS2_CSV = ensure_unzipped(DATA_DIR / "item_properties_part2.csv.zip")
TREE_CSV = DATA_DIR / "category_tree.csv"

# Pin every id column explicitly. Left to inference, the schema printed by
# this cell reported `transactionid` as int8, which is too narrow for an id.
# NOTE(review): assumes transactionid holds integers or blanks - confirm
# against the raw file.
EVENTS_DF = dc.read_csv(
    EVENTS_CSV,
    dtype={
        "timestamp": "int64",
        "visitorid": "int64",
        "itemid": "int64",
        "transactionid": "int64",
    },
)
ITEM_PROPS_DF = dc.read_csv([ITEMS1_CSV, ITEMS2_CSV], dtype={"timestamp": "int64", "itemid": "int64"})
TREE_DF = dc.read_csv(TREE_CSV)

print(EVENTS_DF)



Unnamed: 0_level_0,timestamp,visitorid,event,itemid,transactionid
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
,int64,int64,object,int64,int8
,...,...,...,...,...


In [None]:
def build_events(df: dc.DataFrame) -> dc.DataFrame:
    """Return the interaction table with a parsed timestamp and renamed ids.

    Args:
        df: Raw events frame with ``timestamp`` (epoch milliseconds),
            ``visitorid``, ``itemid``, ``event`` and ``transactionid``.

    Returns:
        A frame with columns ``visitor_id``, ``item_id``, ``event``,
        ``transactionid`` and ``event_ts``, in that order.
    """
    events = df.copy()

    # Convert epoch milliseconds to datetimes partition by partition.
    # NOTE(review): meta declares datetime64[ns]; confirm that matches what
    # cudf.to_datetime returns for unit="ms".
    parsed_ts = events["timestamp"].map_partitions(
        cudf.to_datetime, unit="ms", meta=("event_ts", "datetime64[ns]")
    )
    events["event_ts"] = parsed_ts

    column_renames = {"visitorid": "visitor_id", "itemid": "item_id"}
    events = events.rename(columns=column_renames)

    selected = ["visitor_id", "item_id", "event", "transactionid", "event_ts"]
    return events[selected]


def build_item_features(events: dc.DataFrame, props: dc.DataFrame):
    """Build per-item event counts (wide) and per-item property counts (tall).

    Args:
        events: Interaction frame with ``item_id`` and ``event`` columns.
        props: Item property frame with ``itemid`` and ``property`` columns.

    Returns:
        A pair ``(event_counts, property_counts)``:
        ``event_counts`` is a single-partition frame with one row per item
        and one ``event_<name>`` column per event type; ``property_counts``
        is a lazy frame with ``item_id``, ``property`` and ``prop_count``.
    """
    # The (item, event) count table is computed eagerly and pivoted with
    # cudf; the original author judged this pivot small enough to do so.
    grouped_events = events.groupby(["item_id", "event"]).size()
    counts = grouped_events.reset_index().rename(columns={0: "count"})
    counts = counts.compute()

    if len(counts) > 0:
        wide = counts.pivot(index="item_id", columns="event", values="count").fillna(0)
        wide.columns = [f"event_{name}" for name in wide.columns]
        wide = wide.reset_index()
    else:
        # No interactions at all: fall back to an empty frame keyed by item.
        wide = cudf.DataFrame({"item_id": []})
    item_event_counts = dc.from_cudf(wide, npartitions=1)

    # Property counts are kept in tall form to avoid a wide pivot running
    # out of memory.
    grouped_props = props.groupby(["itemid", "property"]).size()
    tall = grouped_props.rename("prop_count").reset_index()
    item_prop_counts = tall.rename(columns={"itemid": "item_id"})

    return item_event_counts, item_prop_counts


def build_visitor_features(events: dc.DataFrame) -> dc.DataFrame:
    """Build per-visitor event counts joined with the latest event time.

    Args:
        events: Interaction frame with ``visitor_id``, ``event`` and
            ``event_ts`` columns.

    Returns:
        A frame with one row per visitor holding one ``evt_<name>`` count
        column per event type plus ``event_ts``, the maximum event
        timestamp seen for that visitor.
    """
    # Count events per (visitor, event type) and bring the result to the
    # client so it can be pivoted with cudf.
    grouped = events.groupby(["visitor_id", "event"]).size()
    counts = grouped.reset_index().rename(columns={0: "count"}).compute()

    if len(counts) > 0:
        wide = counts.pivot(index="visitor_id", columns="event", values="count").fillna(0)
        wide.columns = [f"evt_{name}" for name in wide.columns]
        wide = wide.reset_index()
    else:
        # No interactions at all: fall back to an empty frame keyed by visitor.
        wide = cudf.DataFrame({"visitor_id": []})

    # Most recent interaction time per visitor.
    last_seen = events.groupby("visitor_id")["event_ts"].max().compute().reset_index()

    counts_dd = dc.from_cudf(wide, npartitions=1)
    last_seen_dd = dc.from_cudf(last_seen, npartitions=1)
    return counts_dd.merge(last_seen_dd, on="visitor_id", how="left")


# Derive the three feature tables from the raw frames loaded above.
EVENTS_FEATS = build_events(df=EVENTS_DF)
ITEM_FEATS, ITEM_PROP_COUNTS = build_item_features(
    events=EVENTS_FEATS,
    props=ITEM_PROPS_DF,
)
VISITOR_FEATS = build_visitor_features(events=EVENTS_FEATS)

# Show the first rows of the cleaned interactions.
_events_preview = EVENTS_FEATS.head()
print(_events_preview)

Unnamed: 0,visitor_id,item_id,event,transactionid,event_ts
0,257597,355908,view,,2015-06-02 05:02:12.117
1,992329,248676,view,,2015-06-02 05:50:14.164
2,111016,318965,view,,2015-06-02 05:13:19.827
3,483717,253185,view,,2015-06-02 05:12:35.914
4,951259,367447,view,,2015-06-02 05:02:17.106


In [None]:
# Persist every feature table as snappy-compressed parquet under OUTPUT_DIR.
# Existing output is overwritten, so the cell can be rerun safely.
events_out = OUTPUT_DIR / "events"
items_out = OUTPUT_DIR / "item_features"
item_prop_out = OUTPUT_DIR / "item_property_counts"
visitors_out = OUTPUT_DIR / "visitor_features"

for _frame, _destination in (
    (EVENTS_FEATS, events_out),
    (ITEM_FEATS, items_out),
    (ITEM_PROP_COUNTS, item_prop_out),
    (VISITOR_FEATS, visitors_out),
):
    _frame.to_parquet(_destination, compression="snappy", overwrite=True)

print(events_out, items_out, item_prop_out, visitors_out)

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


(PosixPath('/home/rapids/shared_data/feature_store/events'),
 PosixPath('/home/rapids/shared_data/feature_store/item_features'),
 PosixPath('/home/rapids/shared_data/feature_store/item_property_counts'),
 PosixPath('/home/rapids/shared_data/feature_store/visitor_features'))

### Outputs

- `events/` : cleaned interactions with timestamps.
- `item_features/` : per-item event counts (wide, small).
- `item_property_counts/` : tall table of per-item property counts (avoid wide pivot).
- `visitor_features/` : per-visitor event counts and recency.

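Artifacts are written under `/home/rapids/shared_data/feature_store` (`OUTPUT_DIR`). They are intended to be logged to MLflow at `datasets/feature_store`; note that the cells shown above do not perform that logging step.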