# eCommerce Visit Segmentation (Session Clustering) — Scalable, Production-Minded Baseline

This notebook is a **Mercor-style** rewrite of `tds_session_clustering.ipynb` with a focus on:

- Correct **visit/session grain** (with guardrails)
- Warehouse-first feature engineering (**BigQuery SQL → visit-level feature table**)
- Standardized preprocessing (NA handling, outlier clipping, scaling)
- Defensible clustering workflow (K diagnostics + interpretability)
- Artifact outputs designed for production (labels, centroids, diagnostics, run report)
- **SMOKE_MODE** so the notebook runs end-to-end without BigQuery credentials

> You can productionize this by extracting the SQL into `sql/features.sql`, packaging the Python into `src/`, and orchestrating runs with Airflow/Prefect/Dagster.


## 0) Configuration

Environment variables (optional):
- `GOOGLE_PROJECT_ID` (required unless `SMOKE_MODE=1`)
- `DATASET` (default: `bigquery-public-data.google_analytics_sample`)
- `TABLE_PATTERN` (default: `ga_sessions_*`)
- `DATE_FROM`, `DATE_TO` (YYYYMMDD; end is exclusive)
- `K_MIN`, `K_MAX`, `K_FINAL`
- `Z_CLIP` (winsorization threshold in z-score space)
- `OUT_DIR` (artifact path)
- `SEED`
- `SMOKE_MODE=1` (run on synthetic sessions; good for CI / quick review)


In [None]:
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Config:
    # BigQuery inputs
    project_id: str = os.environ.get("GOOGLE_PROJECT_ID", "")
    dataset: str = os.environ.get("DATASET", "bigquery-public-data.google_analytics_sample")
    table_pattern: str = os.environ.get("TABLE_PATTERN", "ga_sessions_*")
    date_from: str = os.environ.get("DATE_FROM", "20170101")
    date_to: str = os.environ.get("DATE_TO", "20170201")  # exclusive end

    # Modeling
    k_min: int = int(os.environ.get("K_MIN", "2"))
    k_max: int = int(os.environ.get("K_MAX", "12"))
    k_final: int = int(os.environ.get("K_FINAL", "6"))
    z_clip: float = float(os.environ.get("Z_CLIP", "5.0"))
    seed: int = int(os.environ.get("SEED", "42"))

    # Outputs
    out_dir: str = os.environ.get("OUT_DIR", "./artifacts")

    # Execution mode
    smoke_mode: bool = os.environ.get("SMOKE_MODE", "0") == "1"

cfg = Config()
cfg


In [None]:
import json
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score

np.random.seed(cfg.seed)


## 1) Helper functions (guardrails + preprocessing)

Production mindset:
- Fail fast on grain violations (`visit_id` must be unique).
- Clip extreme outliers before scaling so K-Means doesn't chase anomalies.
- Keep the feature set explicit and auditable.


In [None]:
def assert_unique_grain(df: pd.DataFrame, key: str = "visit_id") -> None:
    if key not in df.columns:
        raise ValueError(f"Missing required grain key column: {key}")
    dupes = int(df[key].duplicated().sum())
    if dupes:
        raise ValueError(
            f"{key} is not unique ({dupes} duplicates). "
            "Your joins/aggregations likely multiplied rows."
        )

def preprocess_features(df: pd.DataFrame, features: list[str], z_clip: float = 5.0):
    missing = [c for c in features if c not in df.columns]
    if missing:
        raise ValueError(f"Missing feature columns: {missing}")

    X = df[features].copy().fillna(0)

    # winsorize via z-score clipping (robust enough baseline; swap with quantile clip if preferred)
    mu = X.mean()
    sigma = X.std(ddof=0).replace(0, np.nan)
    z = (X - mu) / sigma
    X_clip = (
        X.mask(z > z_clip, mu + z_clip * sigma)
         .mask(z < -z_clip, mu - z_clip * sigma)
         .fillna(X)
    )

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_clip)

    return X_clip, X_scaled, scaler

def k_diagnostics(X_scaled: np.ndarray, k_min: int, k_max: int, seed: int) -> pd.DataFrame:
    rows = []
    for k in range(k_min, k_max + 1):
        km = KMeans(n_clusters=k, n_init="auto", random_state=seed)
        labels = km.fit_predict(X_scaled)
        rows.append({
            "k": k,
            "inertia_ssd": float(km.inertia_),
            "silhouette": float(silhouette_score(X_scaled, labels)),
            "davies_bouldin": float(davies_bouldin_score(X_scaled, labels)),
            "calinski_harabasz": float(calinski_harabasz_score(X_scaled, labels)),
        })
    return pd.DataFrame(rows)


## 2) Ingestion

Two modes:

### A) BigQuery mode (default)
- Runs a **visit-level** feature SQL query.
- Requires `GOOGLE_PROJECT_ID`.

### B) Smoke mode
- Set `SMOKE_MODE=1` to run end-to-end on synthetic sessions.
- This is useful for CI, quick validation, and reviewer environments.


In [None]:
def make_synthetic_sessions(n: int = 2000, seed: int = 42) -> pd.DataFrame:
    rng = np.random.default_rng(seed)

    # Three archetypes: bouncers, researchers, buyers
    archetype = rng.choice([0, 1, 2], size=n, p=[0.45, 0.45, 0.10])

    pageviews = np.where(archetype == 0, rng.poisson(2, n),
                np.where(archetype == 1, rng.poisson(8, n), rng.poisson(12, n)))
    hits = pageviews + rng.poisson(5, n)
    unique_pages = np.minimum(pageviews, np.where(archetype == 0, rng.poisson(2, n), rng.poisson(6, n)))

    time_on_site_sec = np.where(archetype == 0, rng.normal(35, 20, n),
                       np.where(archetype == 1, rng.normal(240, 80, n), rng.normal(420, 120, n)))
    time_on_site_sec = np.clip(time_on_site_sec, 0, None)

    transactions = np.where(archetype == 2, rng.binomial(1, 0.55, n), rng.binomial(1, 0.02, n))
    transaction_revenue = transactions * np.where(archetype == 2, rng.normal(85000, 25000, n), rng.normal(15000, 5000, n))
    transaction_revenue = np.clip(transaction_revenue, 0, None)

    df = pd.DataFrame({
        "visit_id": [f"v{i}" for i in range(n)],
        "date_id": pd.to_datetime("2026-01-01"),
        "pageviews": pageviews.astype(int),
        "hits": hits.astype(int),
        "unique_pages": unique_pages.astype(int),
        "time_on_site_sec": time_on_site_sec.astype(float),
        "transactions": transactions.astype(int),
        "transaction_revenue": transaction_revenue.astype(float),
    })

    df["time_on_site_sec_per_pageview"] = df["time_on_site_sec"] / df["pageviews"].replace(0, 1)
    df["hits_per_pageview"] = df["hits"] / df["pageviews"].replace(0, 1)
    return df


In [None]:
def build_feature_query(dataset: str, table_pattern: str, date_from: str, date_to: str) -> str:
    # Keep this query small and auditable; extract to sql/features.sql in production.
    return f"""
    WITH base AS (
      SELECT
        CONCAT(CAST(fullVisitorId AS STRING), '-', CAST(visitId AS STRING)) AS visit_id,
        PARSE_DATE('%Y%m%d', date) AS date_id,
        totals.hits AS hits,
        totals.pageviews AS pageviews,
        totals.timeOnSite AS time_on_site_sec,
        totals.transactions AS transactions,
        totals.transactionRevenue AS transaction_revenue
      FROM `{dataset}.{table_pattern}`
      WHERE _TABLE_SUFFIX >= '{date_from}' AND _TABLE_SUFFIX < '{date_to}'
    ),
    pages AS (
      SELECT
        CONCAT(CAST(fullVisitorId AS STRING), '-', CAST(visitId AS STRING)) AS visit_id,
        COUNT(DISTINCT h.page.pagePath) AS unique_pages
      FROM `{dataset}.{table_pattern}`, UNNEST(hits) h
      WHERE _TABLE_SUFFIX >= '{date_from}' AND _TABLE_SUFFIX < '{date_to}'
      GROUP BY 1
    )
    SELECT
      b.*,
      p.unique_pages,
      SAFE_DIVIDE(b.time_on_site_sec, NULLIF(b.pageviews, 0)) AS time_on_site_sec_per_pageview,
      SAFE_DIVIDE(b.hits, NULLIF(b.pageviews, 0)) AS hits_per_pageview
    FROM base b
    LEFT JOIN pages p USING (visit_id)
    """


In [None]:
def load_from_bigquery(cfg: Config) -> pd.DataFrame:
    if not cfg.project_id:
        raise ValueError("GOOGLE_PROJECT_ID is required unless SMOKE_MODE=1.")
    from google.cloud import bigquery
    client = bigquery.Client(project=cfg.project_id)
    sql = build_feature_query(cfg.dataset, cfg.table_pattern, cfg.date_from, cfg.date_to)
    return client.query(sql).result().to_dataframe(create_bqstorage_client=True)

df = make_synthetic_sessions(seed=cfg.seed) if cfg.smoke_mode else load_from_bigquery(cfg)
df.head()


## 3) Guardrails (grain + basic validity)

These checks prevent silent metric corruption (common failure mode in analytics pipelines).


In [None]:
assert_unique_grain(df, "visit_id")

for col in ["pageviews", "hits", "time_on_site_sec"]:
    if (df[col].fillna(0) < 0).any():
        raise ValueError(f"Invalid negative values detected in {col}.")

df.agg(
    n_rows=("visit_id", "size"),
    n_visits=("visit_id", "nunique"),
    min_date=("date_id", "min"),
    max_date=("date_id", "max"),
).to_frame("value")


## 4) Feature set + preprocessing

Keep feature selection **small and auditable**.
In production you would:
- own a feature contract (schema + tests)
- evaluate drift on each feature


In [None]:
FEATURES = [
    "pageviews",
    "hits",
    "unique_pages",
    "time_on_site_sec",
    "time_on_site_sec_per_pageview",
    "hits_per_pageview",
    "transactions",
    "transaction_revenue",
]

X_clip, X_scaled, scaler = preprocess_features(df, FEATURES, z_clip=cfg.z_clip)
X_clip.head()


## 5) Choose K (multi-metric diagnostics)

We use several diagnostics together. Final K should also pass:
- interpretability
- cluster size sanity (no tiny artifact clusters unless expected)
- temporal stability (segment mix doesn't thrash)


In [None]:
diag = k_diagnostics(X_scaled, cfg.k_min, cfg.k_max, cfg.seed)
diag


In [None]:
plt.figure(figsize=(8, 3))
plt.plot(diag["k"], diag["inertia_ssd"], marker="o")
plt.title("SSD / Inertia (Elbow)")
plt.xlabel("k"); plt.ylabel("inertia")
plt.tight_layout()
plt.show()

plt.figure(figsize=(8, 3))
plt.plot(diag["k"], diag["silhouette"], marker="o")
plt.title("Silhouette (higher is better)")
plt.xlabel("k"); plt.ylabel("score")
plt.tight_layout()
plt.show()

plt.figure(figsize=(8, 3))
plt.plot(diag["k"], diag["davies_bouldin"], marker="o")
plt.title("Davies–Bouldin (lower is better)")
plt.xlabel("k"); plt.ylabel("score")
plt.tight_layout()
plt.show()


## 6) Fit final model + interpret clusters

Deliverables:
- `session_clusters`: visit_id → cluster label
- `cluster_centers`: centroids in original feature units
- size distribution for sanity checks


In [None]:
model = KMeans(n_clusters=cfg.k_final, n_init="auto", random_state=cfg.seed)
labels = model.fit_predict(X_scaled)

session_clusters = df[["visit_id", "date_id"]].copy()
session_clusters["cluster"] = labels.astype(int)

sizes = session_clusters["cluster"].value_counts().sort_index().to_frame("n_sessions")
sizes["pct_sessions"] = sizes["n_sessions"] / sizes["n_sessions"].sum()
sizes


In [None]:
cluster_centers = pd.DataFrame(
    scaler.inverse_transform(model.cluster_centers_),
    columns=FEATURES,
)
cluster_centers["cluster"] = range(cfg.k_final)

# Helpful for interpretation: order clusters by "intent proxy" (transactions/revenue)
cluster_centers.sort_values(["transactions", "transaction_revenue"], ascending=False)


## 7) Temporal stability (segment mix drift)

In production, drift monitoring is often more valuable than a one-time clustering metric.


In [None]:
mix = session_clusters.groupby(["date_id", "cluster"]).size().rename("n").reset_index()
tot = mix.groupby("date_id")["n"].sum().rename("n_total").reset_index()
mix = mix.merge(tot, on="date_id", how="left")
mix["pct"] = mix["n"] / mix["n_total"]

pivot = mix.pivot_table(index="date_id", columns="cluster", values="pct", fill_value=0).sort_index()

plt.figure(figsize=(10, 4))
for c in pivot.columns:
    plt.plot(pivot.index, pivot[c], label=f"cluster {c}")
plt.title("Cluster Mix Over Time (pct sessions)")
plt.xlabel("date"); plt.ylabel("pct")
plt.legend(ncol=3, fontsize=8)
plt.tight_layout()
plt.show()


## 8) Persist artifacts

Artifacts are the minimum useful outputs for production use:
- session labels
- centroids
- k diagnostics
- run report (config + stats)


In [None]:
Path(cfg.out_dir).mkdir(parents=True, exist_ok=True)

session_clusters_path = Path(cfg.out_dir) / "session_clusters.parquet"
cluster_centers_path = Path(cfg.out_dir) / "cluster_centers.parquet"
k_diag_path = Path(cfg.out_dir) / "k_diagnostics.parquet"
report_path = Path(cfg.out_dir) / "model_report.json"

session_clusters.to_parquet(session_clusters_path, index=False)
cluster_centers.to_parquet(cluster_centers_path, index=False)
diag.to_parquet(k_diag_path, index=False)

report = {
    "run_utc": datetime.datetime.utcnow().isoformat() + "Z",
    "smoke_mode": bool(cfg.smoke_mode),
    "config": cfg.__dict__,
    "n_sessions": int(len(session_clusters)),
    "k_final": int(cfg.k_final),
    "cluster_sizes": sizes.reset_index().rename(columns={"index": "cluster"}).to_dict(orient="records"),
}
report_path.write_text(json.dumps(report, indent=2))

[str(session_clusters_path), str(cluster_centers_path), str(k_diag_path), str(report_path)]
