### 05 — Sessionization & Prefix→Target Generation

**Goal:**  
- For **Amazon Books 2023**: build synthetic sessions using **sliding windows** (Option A) to convert sparse user timelines into many sequence training examples.
- For **MARS**: sessionize by **1 hour gap** (as decided in Step 04).
- For **YOOCHOOSE**: use existing session IDs (already processed).
- Generate `prefix -> target` pairs for training (next-item prediction).
- Save outputs and report diagnostics.

Outputs saved to `data/processed/`:
- `amazon_windows_sessions.parquet` (or chunked files)
- `amazon_prefix_target.parquet` (or chunked)
- `mars_sessions.parquet`
- `mars_prefix_target.parquet`
- summary CSVs


In [2]:
# Cell: imports & settings
import pandas as pd
import numpy as np
from pathlib import Path
from tqdm import tqdm
import math
import json

%matplotlib inline

DATA_DIR = Path("../data/processed")
OUT_DIR = DATA_DIR
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Parameters (tune as needed)
WINDOW_SIZE = 20    # number of items in a sliding window
STRIDE = 10         # step between windows
MIN_WINDOW_ITEMS = 2  # min items to keep a window (>=2 gives at least one pair)
MAX_PREFIX_LEN = 50  # cap prefix length when generating prefix-target
MIN_SESSION_LEN = 2
CHUNK_WRITE = True  # if True, write parquet in chunk files to avoid excessive RAM use


#### A — Helper functions
- `build_sliding_windows()` : builds sliding-window sessions for Amazon (memory-efficient).
- `sessionize_by_gap()` : sessionize a dataset by time-gap (used for MARS).
- `generate_prefix_target()` : generate prefix→target pairs from a sessions DataFrame.


In [3]:
# Cell: helper function definitions
def build_sliding_windows(df, user_col='user_id', item_col='item_id',
                          time_col='timestamp', window_size=WINDOW_SIZE, stride=STRIDE,
                          out_session_col='session_id_real'):
    """
    Build sliding-window synthetic sessions from a user timeline DataFrame.
    Returns a DataFrame with columns: dataset, user_id, session_id_real, item_id, timestamp
    This version streams per-user and yields rows to reduce peak memory usage.
    """
    rows = []
    # Ensure sorted
    df = df.sort_values([user_col, time_col])
    users = df[user_col].unique()
    total_users = len(users)
    print(f"Users to process: {total_users:,}")
    
    # iterate per user
    for user in tqdm(users, desc="Building windows per user"):
        user_slice = df[df[user_col] == user]
        items = user_slice[item_col].astype(str).tolist()
        times = pd.to_datetime(user_slice[time_col]).tolist()
        n = len(items)
        if n < MIN_WINDOW_ITEMS:
            continue
        start = 0
        win_idx = 0
        while start < n:
            end = min(start + window_size, n)
            window_items = items[start:end]
            window_times = times[start:end]
            if len(window_items) >= MIN_WINDOW_ITEMS:
                sess_id = f"{user}__w{win_idx}_{start}"
                for it, ts in zip(window_items, window_times):
                    rows.append({
                        "dataset": "amazon_books_2023",
                        "user_id": str(user),
                        out_session_col: sess_id,
                        "item_id": str(it),
                        "timestamp": pd.to_datetime(ts)
                    })
                win_idx += 1
            start += stride
            # if rows accumulate too large, yield in batches (handled by caller)
            if len(rows) >= 500_000:
                yield pd.DataFrame(rows)
                rows = []
    # final flush
    if rows:
        yield pd.DataFrame(rows)


def sessionize_by_gap(df, gap_seconds=3600, user_col='user_id', time_col='timestamp',
                      out_session_col='session_id_real'):
    """
    Sessionize by gap_seconds per user. Returns sessionized df and session summary.
    """
    df = df.sort_values([user_col, time_col]).copy()
    df[time_col] = pd.to_datetime(df[time_col])
    df['prev_ts'] = df.groupby(user_col)[time_col].shift(1)
    df['gap_s'] = (df[time_col] - df['prev_ts']).dt.total_seconds()
    df['new_session'] = (df['gap_s'].isna()) | (df['gap_s'] > gap_seconds)
    df['sess_idx'] = df.groupby(user_col)['new_session'].cumsum().astype(int)
    df[out_session_col] = df[user_col].astype(str) + "__s" + df['sess_idx'].astype(str)
    # summary
    session_summary = df.groupby(out_session_col).agg(
        dataset=('dataset','first'),
        user_id=(user_col,'first'),
        start_time=(time_col,'min'),
        end_time=(time_col,'max'),
        session_length=('item_id','size')
    ).reset_index()
    # filter short and very long sessions
    valid_sessions = session_summary[
        (session_summary['session_length'] >= MIN_SESSION_LEN)
    ][out_session_col].tolist()
    df = df[df[out_session_col].isin(valid_sessions)].copy()
    df = df.drop(columns=['prev_ts','gap_s','new_session','sess_idx'], errors='ignore')
    return df, session_summary

def generate_prefix_target(df, session_col='session_id_real', item_col='item_id', max_prefix_len=MAX_PREFIX_LEN):
    """
    From a sessionized df ordered by timestamp, generate prefix->target pairs.
    Returns DataFrame with columns: dataset, user_id, session_id_real, prefix_len, prefix (space-separated), target
    """
    rows = []
    df = df.sort_values([session_col, 'timestamp'])
    grouped = df.groupby(session_col)
    for session_id, g in tqdm(grouped, desc="Generating prefix-target"):
        items = g[item_col].astype(str).tolist()
        if len(items) < 2: continue
        # optionally truncate to last (max_prefix_len + 1) items to limit explosion
        if len(items) > max_prefix_len + 1:
            items = items[-(max_prefix_len + 1):]
        for t in range(1, len(items)):
            prefix = items[:t]
            target = items[t]
            rows.append({
                'dataset': g['dataset'].iloc[0],
                'user_id': g['user_id'].iloc[0],
                session_col: session_id,
                'prefix_len': len(prefix),
                'prefix': " ".join(prefix),
                'target': str(target)
            })
        # flush periodically if very large
        if len(rows) >= 500_000:
            yield pd.DataFrame(rows)
            rows = []
    if rows:
        yield pd.DataFrame(rows)


#### B — Process Amazon using sliding windows (streaming)
This cell will build windowed sessions and save them to disk in chunked parquet files to avoid using too much RAM.


In [None]:
# Cell: process Amazon in streaming mode and write chunked parquet files
amazon_path = DATA_DIR / "amazon_books_2023_interactions.parquet"
assert amazon_path.exists(), "Amazon parquet not found; check path."

amazon_df = pd.read_parquet(amazon_path)  # full file; ensure enough RAM or use query to limit users
print("Loaded Amazon interactions:", len(amazon_df))

# Optional: focus on most active users to reduce processing time (uncomment to use)
# active_users = amazon_df['user_id'].value_counts().nlargest(500000).index.tolist()
# amazon_df = amazon_df[amazon_df['user_id'].isin(active_users)].copy()

out_base = OUT_DIR / "amazon_windows_sessions"
# remove old files if rerunning
for p in out_base.parent.glob("amazon_windows_sessions*.parquet"):
    try:
        p.unlink()
    except:
        pass

# stream generator yields DataFrame chunks
gen = build_sliding_windows(amazon_df, window_size=WINDOW_SIZE, stride=STRIDE)
chunk_i = 0
total_rows = 0
for chunk in gen:
    if len(chunk)==0:
        continue
    path = out_base.with_name(f"amazon_windows_sessions_part{chunk_i:03d}.parquet")
    chunk.to_parquet(path, index=False)
    chunk_i += 1
    total_rows += len(chunk)
    print(f"Wrote {path} rows={len(chunk):,}; total so far={total_rows:,}")

print("Done. Total rows written:", total_rows)


Loaded Amazon interactions: 27078467
Users to process: 9,614,012


Building windows per user:   0%|          | 16/9614012 [00:22<3671:01:18,  1.37s/it]

#### C — Combine chunked windows (optional) and inspect session stats
If you created chunked parquet parts you can inspect their stats and optionally concatenate into one file.


In [None]:
# Cell: read chunk files, compute session summary stats and optionally combine into single parquet (careful on RAM)
parts = sorted(OUT_DIR.glob("amazon_windows_sessions_part*.parquet"))
print("Found parts:", len(parts))
if len(parts)==0:
    raise FileNotFoundError("No amazon windows parts found. Did previous cell run?")

# compute per-part session summary (stream)
sess_rows = []
total_events = 0
for p in parts:
    df_part = pd.read_parquet(p)
    total_events += len(df_part)
    summary = df_part.groupby('session_id_real').size().reset_index(name='length')
    sess_rows.append(summary)
    print(p, "events:", len(df_part), "sessions:", len(summary))

# concatenate summaries to get global stats (session_ids unique by construction)
sess_all = pd.concat(sess_rows, ignore_index=True)
print("Approx sessions (may include duplicates across parts if same session spans parts):", len(sess_all))
print("Session length describe:")
print(sess_all['length'].describe(percentiles=[0.25,0.5,0.75,0.9,0.95,0.99]))
# optionally write a single combined sessions file (if RAM allows)
combined_path = OUT_DIR / "amazon_windows_sessions_combined.parquet"
# caution: concatenating all parts may require RAM; skip if huge
do_combine = False
if do_combine:
    frames = [pd.read_parquet(p) for p in parts]
    df_combined = pd.concat(frames, ignore_index=True)
    df_combined.to_parquet(combined_path, index=False)
    print("Wrote combined sessions to:", combined_path)


#### D — Generate prefix→target pairs for Amazon (stream chunked parts)
Read each window-session chunk and generate prefix-target pairs streaming to chunked parquet files.


In [None]:
# Cell: generate prefix-target for amazon parts and write chunked parquet
out_pairs_base = OUT_DIR / "amazon_prefix_target_part"
pair_i = 0
total_pairs = 0

for p in parts:
    df_part = pd.read_parquet(p)
    # generate prefix-target in streaming yields
    for chunk_pairs in generate_prefix_target(df_part, session_col='session_id_real', item_col='item_id', max_prefix_len=MAX_PREFIX_LEN):
        path = out_pairs_base.with_name(f"amazon_prefix_target_part{pair_i:03d}.parquet")
        chunk_pairs.to_parquet(path, index=False)
        pair_i += 1
        total_pairs += len(chunk_pairs)
        print(f"Wrote {path} (#pairs={len(chunk_pairs):,}); total pairs so far={total_pairs:,}")

print("Done generating amazon prefix-target pairs. Total pairs:", total_pairs)


#### E — Process MARS using 1-hour gap, then generate prefix-targets
This uses the sessionization-by-gap function and then generate prefix-target pairs (streamed).


In [None]:
# Cell: sessionize MARS by 1 hour gap
mars_path = DATA_DIR / "mars_interactions.parquet"
assert mars_path.exists(), "MARS parquet not found; check path"

mars_df = pd.read_parquet(mars_path)
print("Loaded MARS rows:", len(mars_df))

mars_session_gap = 60*60  # 1 hour
mars_sess_df, mars_summary = sessionize_by_gap(mars_df, gap_seconds=mars_session_gap, user_col='user_id', time_col='timestamp', out_session_col='session_id_real')

# Save sessionized mars
mars_sessions_path = OUT_DIR / "mars_sessions.parquet"
mars_sess_df.to_parquet(mars_sessions_path, index=False)
mars_summary_path = OUT_DIR / "mars_session_summary.csv"
mars_summary.to_csv(mars_summary_path, index=False)
print("Saved mars sessions and summary:", mars_sessions_path, mars_summary_path)
print("MARS sessions total:", mars_summary.shape[0])
print(mars_summary['session_length'].describe())


In [None]:
# Cell: generate prefix-target for MARS and save (small, in-memory ok)
mars_pairs = []
for chunk_pairs in generate_prefix_target(mars_sess_df, session_col='session_id_real', item_col='item_id', max_prefix_len=MAX_PREFIX_LEN):
    mars_pairs.append(chunk_pairs)
if len(mars_pairs):
    mars_pairs_df = pd.concat(mars_pairs, ignore_index=True)
else:
    mars_pairs_df = pd.DataFrame(columns=['dataset','user_id','session_id_real','prefix_len','prefix','target'])

mars_pairs_path = OUT_DIR / "mars_prefix_target.parquet"
mars_pairs_df.to_parquet(mars_pairs_path, index=False)
print("Saved MARS prefix-target pairs:", mars_pairs_path)
print("MARS pairs:", len(mars_pairs_df))
mars_pairs_df.head()


#### F — Quick diagnostics to print & paste back for review

Please paste the outputs (numbers & small table) here after you run:
- # amazon parts written, total events written
- approximate # amazon sessions (from summaries) and session length stats
- total amazon prefix-target pairs generated
- mars sessions count, mars session length describe
- mars prefix-target pairs count and pairs.head()


In [None]:
# Cell: print quick diagnostics summary (amazon and mars)
parts = sorted(OUT_DIR.glob("amazon_windows_sessions_part*.parquet"))
amazon_pairs_parts = sorted(OUT_DIR.glob("amazon_prefix_target_part*.parquet"))

print("Amazon window parts:", len(parts))
total_amz_events = sum(pd.read_parquet(p).shape[0] for p in parts)
print("Total amazon window events (sum of parts):", total_amz_events)

# session length approx from earlier step (sess_all may be available)
try:
    print("Approx amazon sessions (summary parts concat):", len(sess_all))
    print(sess_all['length'].describe(percentiles=[0.25,0.5,0.75,0.9,0.95,0.99]))
except NameError:
    print("Session summary not available in memory; check part outputs.")

# amazon pairs
total_amz_pairs = 0
for p in amazon_pairs_parts:
    dfp = pd.read_parquet(p)
    total_amz_pairs += len(dfp)
print("Amazon prefix-target parts:", len(amazon_pairs_parts), "total pairs:", total_amz_pairs)

# MARS stats
print("\nMARS sessions:", mars_summary.shape[0])
print(mars_summary['session_length'].describe())
print("MARS pairs:", len(mars_pairs_df))
mars_pairs_df.head()


#### G — Save metadata (paths & counts) for downstream notebooks

Write a small JSON manifest so later notebooks can find files easily.


In [None]:
manifest = {
    "amazon_windows_parts": [str(p) for p in sorted(OUT_DIR.glob("amazon_windows_sessions_part*.parquet"))],
    "amazon_prefix_target_parts": [str(p) for p in sorted(OUT_DIR.glob("amazon_prefix_target_part*.parquet"))],
    "mars_sessions": str(OUT_DIR / "mars_sessions.parquet"),
    "mars_session_summary": str(OUT_DIR / "mars_session_summary.csv"),
    "mars_prefix_target": str(OUT_DIR / "mars_prefix_target.parquet")
}
with open(OUT_DIR / "sessionization_manifest.json", "w") as f:
    json.dump(manifest, f, indent=2)
print("Saved manifest:", OUT_DIR / "sessionization_manifest.json")
