# ONE SOURCE TABLE FOR BI

In [1]:
# One Single Source Wide Table
from pathlib import Path
import pandas as pd

# ─── Paths ──────────────────────────────────────────────────────────────────────
PROJECT_ROOT = Path().resolve().parents[0]
PROC         = PROJECT_ROOT / "data" / "processed"

# ─── Load cleaned tables ───────────────────────────────────────────────────────
orders    = pd.read_parquet(PROC / "orders_clean.parquet")
items     = pd.read_parquet(PROC / "order_items_clean.parquet")
payments  = pd.read_parquet(PROC / "order_payments_clean.parquet")
reviews   = pd.read_parquet(PROC / "order_reviews_clean.parquet")
customers = pd.read_parquet(PROC / "customers_clean.parquet")[[
    "customer_id", "customer_unique_id"
]]

# ─── 1. Aggregate payments & reviews ────────────────────────────────────────────
pay_agg = (
    payments
      .groupby("order_id", as_index=False)
      .agg(
          total_paid = ("payment_value",     "sum"),
          n_payments = ("payment_sequential","count")
      )
)

rev_agg = (
    reviews
      .groupby("order_id", as_index=False)
      .agg(
          avg_score           = ("review_score",      "mean"),
          max_response_time   = ("response_time_days","max")
      )
)

# ─── 2. Build the wide fact table ───────────────────────────────────────────────
fact = (
    orders
      .merge(items,    on="order_id", how="left")
      .merge(pay_agg,  on="order_id", how="left")
      .merge(rev_agg,  on="order_id", how="left")
      .merge(customers, on="customer_id", how="left")
)

# ─── 3. Compute churn (no order in last 90 days) ────────────────────────────────
# reference date = last purchase in the dataset
reference_date = fact["order_purchase_timestamp"].max()
churn_cutoff    = reference_date - pd.Timedelta(days=90)

# last purchase per unique customer
last_purchase = (
    fact
      .groupby("customer_unique_id", as_index=False)
      ["order_purchase_timestamp"]
      .max()
      .rename(columns={"order_purchase_timestamp": "last_purchase_date"})
)

# churn flag: 1 = churned, 0 = active
last_purchase["churn"] = (
    last_purchase["last_purchase_date"] < churn_cutoff
).astype(int)

# merge churn info back into the fact
fact = fact.merge(
    last_purchase[
      ["customer_unique_id", "last_purchase_date", "churn"]
    ],
    on="customer_unique_id",
    how="left"
)

# ─── 4. Sanity checks ───────────────────────────────────────────────────────────
print("Fact shape:", fact.shape)
print("Nulls % per column:\n", fact.isnull().mean().round(3))
print("Churn distribution:\n", 
      fact[["customer_unique_id","churn"]]
          .drop_duplicates()["churn"]
          .value_counts())

# ─── 5. Save enriched fact table ────────────────────────────────────────────────
out_path = PROC / "sales_fact_with_churn.parquet"
fact.to_parquet(out_path, index=False)
print("✔️ saved", out_path)

# ─── Final peek ────────────────────────────────────────────────────────────────
fact.head()

Fact shape: (113425, 29)
Nulls % per column:
 order_id                         0.000
customer_id                      0.000
order_status                     0.000
order_purchase_timestamp         0.000
order_approved_at                0.001
order_delivered_carrier_date     0.017
order_delivered_customer_date    0.028
order_estimated_delivery_date    0.000
delivery_time_days               0.028
approval_lag_hours               0.001
days_diff_estimate               0.028
purchase_year                    0.000
purchase_month                   0.000
purchase_day                     0.000
purchase_week                    0.000
order_item_id                    0.007
product_id                       0.007
seller_id                        0.007
shipping_limit_date              0.007
price                            0.007
freight_value                    0.007
total_cost                       0.007
total_paid                       0.000
n_payments                       0.000
avg_score         

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date,delivery_time_days,approval_lag_hours,...,price,freight_value,total_cost,total_paid,n_payments,avg_score,max_response_time,customer_unique_id,last_purchase_date,churn
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18,8.0,0.178333,...,29.99,8.72,38.71,38.71,3.0,4.0,1.0,7c396fd4830fd04220f754e42b4e5bff,2017-10-02 10:56:33,1
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13,13.0,30.713889,...,118.7,22.76,141.46,141.46,1.0,4.0,0.0,af07308b275d755c9edb36a90c618231,2018-07-24 20:41:37,0
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04,9.0,0.276111,...,159.9,19.22,179.12,179.12,1.0,5.0,4.0,3a653a41f6f9fc3d2a113cf8398680e8,2018-08-08 08:38:49,0
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15,13.0,0.298056,...,45.0,27.2,72.2,72.2,1.0,5.0,2.0,7c142cf63193a1473d2e66489a9ae977,2017-11-18 19:28:06,1
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26,2.0,1.030556,...,19.9,8.72,28.62,28.62,1.0,5.0,1.0,72632f0f9dd73dfee390c9b22eb56dd6,2018-02-13 21:18:39,1


# 📈 1.0 Victor Wei — Feature Engineering Pipeline

> **Outputs** (4 parquets, one per modeling objective)
>
> | File | Grain | Primary target / usage |
> |------|-------|------------------------|
> | `order_features.parquet`   | Order (`order_id`) | `late_delivery` (binary) |
> | `churn_features.parquet`   | Customer (`customer_unique_id`) | `churn` (binary) |
> | `segment_features.parquet` | Customer | Unsupervised clustering |
> | `cltv_summary.parquet`     | Customer summary for BG/NBD + GG | Frequency / Recency / T / Monetary |

In [2]:
# ── Imports & Path setup ──────────────────────────────────────────────────────
from pathlib import Path
import pandas as pd
import numpy as np

PROJECT_ROOT = Path().resolve().parents[0]
PROC          = PROJECT_ROOT / "data" / "processed"


---
## A. Order-level features  → Late-delivery model  (LightGBM)

In [3]:
orders   = pd.read_parquet(PROC / "orders_clean.parquet")
items    = pd.read_parquet(PROC / "order_items_clean.parquet")
products = pd.read_parquet(PROC / "products_clean.parquet")[[
    "product_id", "product_volume_cm3", "product_density_g_cm3"
]]

order_items = (
    orders.merge(items, on="order_id", how="left")
          .merge(products, on="product_id", how="left")
)

order_items["late_delivery"] = (order_items["days_diff_estimate"] < 0).astype(int)

order_features = (
    order_items
      .groupby("order_id", as_index=False)
      .agg(
          late_delivery      =("late_delivery",      "max"),
          approval_lag_hours =("approval_lag_hours", "first"),
          purchase_month     =("order_purchase_timestamp", lambda s: s.iloc[0].month),
          purchase_dow       =("order_purchase_timestamp", lambda s: s.iloc[0].dayofweek),
          n_items            =("order_item_id",       "nunique"),
          order_price        =("price",               "sum"),
          order_freight      =("freight_value",       "sum"),
          avg_item_volume    =("product_volume_cm3",  "mean"),
          avg_item_density   =("product_density_g_cm3","mean"),
          delivery_time_days =("delivery_time_days",  "first"),
          est_delta_days     =("days_diff_estimate",  "first"),
      )
)
order_features.to_parquet(PROC / "order_features.parquet", index=False)
print("✔️ order_features.parquet saved")

✔️ order_features.parquet saved



---
## B. Customer-level features  (churn + segmentation)

In [4]:
fact = pd.read_parquet(PROC / "sales_fact_with_churn.parquet")
DATE_REF = fact["order_purchase_timestamp"].max()

rfm = (
    fact.groupby("customer_unique_id").agg(
        recency_days   =("order_purchase_timestamp", lambda s: (DATE_REF - s.max()).days),
        frequency      =("order_id",               "nunique"),
        monetary_value =("total_paid",             "sum"),
        first_purchase =("order_purchase_timestamp", "min"),
    ).reset_index()
)
rfm["cust_age_days"] = (DATE_REF - rfm["first_purchase"]).dt.days

behav = (
    fact.groupby("customer_unique_id").agg(
        avg_delivery_time_days =("delivery_time_days", "mean"),
        avg_approval_lag_hours =("approval_lag_hours", "mean"),
        avg_estimate_delta     =("days_diff_estimate", "mean"),
        avg_rating             =("avg_score", "mean"),
        avg_payment_count      =("n_payments", "mean"),
        avg_items_per_order    =("order_item_id", "nunique"),
    ).reset_index()
)


churn = fact[["customer_unique_id", "churn"]].drop_duplicates()

customer_full = (
    rfm.merge(behav, on="customer_unique_id")
       .merge(churn, on="customer_unique_id")
)

# 1️⃣  Churn feature set (with label)
customer_full.to_parquet(PROC / "churn_features.parquet", index=False)
print("✔️ churn_features.parquet saved")

# 2️⃣  Segmentation feature set (drop targets)
segment_cols = [c for c in customer_full.columns if c not in {"customer_unique_id", "churn", "first_purchase"}]
customer_full[["customer_unique_id"] + segment_cols].to_parquet(PROC / "segment_features.parquet", index=False)
print("✔️ segment_features.parquet saved")

✔️ churn_features.parquet saved
✔️ segment_features.parquet saved


In [10]:
customer_full.head()

Unnamed: 0,customer_unique_id,recency_days,frequency,monetary_value,first_purchase,cust_age_days,avg_delivery_time_days,avg_approval_lag_hours,avg_estimate_delta,avg_rating,avg_payment_count,avg_items_per_order,churn
0,0000366f3b9a7992bf8c76cfdf3221e2,160,1,141.9,2018-05-10 10:56:27,160,6.0,0.2475,4.0,5.0,1.0,1,1
1,0000b849f77a49e4a4ce2b2a4ca5be3f,163,1,27.19,2018-05-07 11:11:27,163,3.0,7.238056,4.0,4.0,1.0,1,1
2,0000f46a3911fa3c0805444483337064,585,1,86.22,2017-03-10 21:05:03,585,25.0,0.0,1.0,3.0,1.0,1,1
3,0000f6ccb0745a6a4b88665a16c9f078,369,1,43.62,2017-10-12 20:29:41,369,20.0,0.326667,11.0,4.0,1.0,1,1
4,0004aac84e0df4da2b147fca70cf8255,336,1,196.89,2017-11-14 19:45:42,336,13.0,0.352778,7.0,5.0,1.0,1,1


---
## C. Extended segmentation (Geography · Promo sensitivity · Tenure)

In [5]:
# ── Load fact + supporting tables ──────────────────────────────────────────────
fact         = pd.read_parquet(PROC / "sales_fact_with_churn.parquet")
customers    = pd.read_parquet(PROC / "customers_clean.parquet")[[
    "customer_unique_id", "customer_state"
]]
rfm          = pd.read_parquet(PROC / "churn_features.parquet")[[ 
    "customer_unique_id", "cust_age_days"
]]
customer_full = pd.read_parquet(PROC / "churn_features.parquet")

# ── 1) Geography — top-10 state one-hots ────────────────────────────────────────
fact = fact.merge(customers, on="customer_unique_id", how="left")
state_counts = fact["customer_state"].value_counts()
TOP_STATES   = state_counts.head(10).index

fact["state_grouped"] = np.where(
    fact["customer_state"].isin(TOP_STATES),
    fact["customer_state"],
    "OTHER"
)

geo = (
    pd.get_dummies(
        fact[["customer_unique_id", "state_grouped"]],
        columns=["state_grouped"],
        dtype="int"
    )
    .groupby("customer_unique_id", as_index=False)
    .max()
)

# ── 2) Promotion sensitivity — share of orders in promo months ────────────────
PROMO_MONTHS = [6, 11, 12]
fact["is_promo"] = fact["order_purchase_timestamp"]\
    .dt.month.isin(PROMO_MONTHS).astype(int)

promo = (
    fact
      .groupby("customer_unique_id", as_index=False)
      .agg(promo_share=("is_promo", "mean"))
)

# ── 3) Tenure bucket — one-hot of cust_age_days ────────────────────────────────
TENURE_BINS   = [0, 180, 365, 730, np.inf]
TENURE_LABELS = ["<6m", "6-12m", "1-2y", ">2y"]

rfm["tenure_bucket"] = pd.cut(
    rfm["cust_age_days"],
    bins=TENURE_BINS,
    labels=TENURE_LABELS
)

tenure = pd.get_dummies(
    rfm[["customer_unique_id", "tenure_bucket"]],
    columns=["tenure_bucket"],
    dtype="int"
)

# ── 4) Assemble extended segmentation feature table ───────────────────────────
seg_ext = (
    customer_full
      .drop(columns=["churn", "first_purchase"])
      .merge(geo,    on="customer_unique_id", how="left")
      .merge(promo,  on="customer_unique_id", how="left")
      .merge(tenure, on="customer_unique_id", how="left")
)

# ── 5) Save extended segmentation features ────────────────────────────────────
seg_ext.to_parquet(
    PROC / "segment_features_extended.parquet",
    index=False
)
print("✔️ segment_features_extended.parquet saved")

✔️ segment_features_extended.parquet saved


*Why add these?*
* **Geography** splits regional behaviour & logistics cost.
* **Promo share** isolates bargain‑hunters.
* **Tenure bucket** respects lifecycle stage without skewing distances.


---
## D. CLTV summary data (BG/NBD + GammaGamma)

In [11]:
# Pick just the RFM + T columns
cltv_summary = customer_full[[
    "customer_unique_id",
    "frequency",
    "recency_days",
    "cust_age_days",
    "monetary_value"
]].rename(columns={
    "recency_days":   "recency_cal",  # lifetimes naming
    "cust_age_days":  "T"
})

# Save out for BG/NBD + Gamma-Gamma
out_path = PROC / "cltv_summary.parquet"
cltv_summary.to_parquet(out_path, index=False)
print("✔️ cltv_summary.parquet saved:", out_path)

✔️ cltv_summary.parquet saved: /home/gwei4/e_commerce_customer_ai_platform/data/processed/cltv_summary.parquet
