In [1]:
# User LTV problem from the rel-amazon dataset
# on relbench: https://relbench.stanford.edu/datasets/rel-amazon/#user-ltv

# 1) https://demo.kurve.ai
# 2) create graph of rel-amazon data
# 3) assign user as parent node customer with depth 4

In [2]:
import pandas as pd
from torch_frame.utils import infer_df_stype
import catboost
from sklearn import metrics

In [3]:
# train cut date of 10/1/15
train_path = 'https://kurve-customers.s3.amazonaws.com/4e1a245a-3065-4600-bb0e-a92e06ee835c/5/output/user_ltv_train'

In [4]:
df = pd.read_parquet(train_path)


In [5]:
df.shape

(1850193, 37)

In [52]:
df.head()

Unnamed: 0,cust_customer_id,cust_customer_name,revi_review_time_min,revi_review_time_max,revi_customer_id_count,revi_rating_avg,revi_rating_sum,revi_rating_min,revi_rating_max,revi_verified_sum,...,revi_3dv4_change,revi_4dv7_change,revi_7dv14_change,revi_14dv30_change,revi_30dv60_change,revi_60dv90_change,revi_90dv180_change,revi_180dv365_change,revi_365dv730_change,revi_price_label
0,730926,Deborah T. Nix,NaT,NaT,,,,,,,...,,,,,,,,,,212.54
1,730948,Amazon Customer,NaT,NaT,,,,,,,...,,,,,,,,,,34.57
2,730959,summerblonde,NaT,NaT,,,,,,,...,,,,,,,,,,93.37
3,731013,gfd,NaT,NaT,,,,,,,...,,,,,,,,,,65.83
4,731036,Amazon Customer,NaT,NaT,,,,,,,...,,,,,,,,,,36.05


In [54]:
df[~df['revi_review_time_max'].isnull()].shape

(974228, 37)

In [55]:
df[~df['revi_review_time_max'].isnull()][target].max()

26445.200000000015

In [56]:
target = [c for c in df.columns if 'label' in c][0]

In [57]:
target

'revi_price_label'

In [58]:
df[target]

0          212.54
1           34.57
2           93.37
3           65.83
4           36.05
            ...  
1850188       NaN
1850189       NaN
1850190       NaN
1850191       NaN
1850192       NaN
Name: revi_price_label, Length: 1850193, dtype: float64

In [59]:
df[target].apply(lambda x: x if not pd.isnull(x) else 0)

0          212.54
1           34.57
2           93.37
3           65.83
4           36.05
            ...  
1850188      0.00
1850189      0.00
1850190      0.00
1850191      0.00
1850192      0.00
Name: revi_price_label, Length: 1850193, dtype: float64

In [60]:
df[target] = df[target].apply(lambda x: x if not pd.isnull(x) else 0)

In [62]:
# execute compute graph with
# cut date 1/1/2016
test = pd.read_parquet(train_path)

In [63]:
test.shape

(1850193, 37)

In [64]:
test.head()

Unnamed: 0,cust_customer_id,cust_customer_name,revi_review_time_min,revi_review_time_max,revi_customer_id_count,revi_rating_avg,revi_rating_sum,revi_rating_min,revi_rating_max,revi_verified_sum,...,revi_3dv4_change,revi_4dv7_change,revi_7dv14_change,revi_14dv30_change,revi_30dv60_change,revi_60dv90_change,revi_90dv180_change,revi_180dv365_change,revi_365dv730_change,revi_price_label
0,912886,laura,NaT,NaT,,,,,,,...,,,,,,,,,,89.6
1,912952,Amazon Customer,NaT,NaT,,,,,,,...,,,,,,,,,,69.27
2,913185,Kate,NaT,NaT,,,,,,,...,,,,,,,,,,169.5
3,913360,Amazon Customer,NaT,NaT,,,,,,,...,,,,,,,,,,77.38
4,913534,Dwight Byrd,NaT,NaT,,,,,,,...,,,,,,,,,,126.53


In [65]:
test[target] = test[target].apply(lambda x: x if not pd.isnull(x) else 0)

In [66]:
stypes = infer_df_stype(test)


In [67]:
features = [
    k for k,v in stypes.items()
    if str(v) == 'numerical'
    and 'label' not in k
    and not k.startswith('cust_')
]
features

['revi_customer_id_count',
 'revi_rating_avg',
 'revi_rating_sum',
 'revi_verified_sum',
 'revi_price_avg',
 'revi_price_sum',
 'revi_price_min',
 'revi_price_max',
 'revi_num_events_1d',
 'revi_num_events_3d',
 'revi_num_events_4d',
 'revi_num_events_7d',
 'revi_num_events_14d',
 'revi_num_events_30d',
 'revi_num_events_60d',
 'revi_num_events_90d',
 'revi_num_events_180d',
 'revi_num_events_365d',
 'revi_num_events_730d',
 'revi_1dv3_change',
 'revi_3dv4_change',
 'revi_4dv7_change',
 'revi_7dv14_change',
 'revi_14dv30_change',
 'revi_30dv60_change',
 'revi_60dv90_change',
 'revi_90dv180_change',
 'revi_180dv365_change',
 'revi_365dv730_change']

In [68]:
df = df[~df['revi_review_time_max'].isnull()]
test = test[~test['revi_review_time_max'].isnull()]

In [69]:
import numpy as np
import pandas as pd
from pandas.api.types import is_object_dtype, is_categorical_dtype, is_bool_dtype
from sklearn.model_selection import KFold, train_test_split
from sklearn.metrics import mean_absolute_error
from catboost import CatBoostRegressor, Pool

# --------------------------------------------
# 0) Split once into train_full / test
# --------------------------------------------
X_train_full, X_test, y_train_full, y_test = train_test_split(
    df[features], df[target],
    test_size=0.20,
    random_state=42
)

# --------------------------------------------
# Identify categorical columns (by name)
# --------------------------------------------
cat_features = [
    col for col in X_train_full.columns
    if is_object_dtype(X_train_full[col])
       or is_categorical_dtype(X_train_full[col])
       or is_bool_dtype(X_train_full[col])
]
print(f"Categorical features: {cat_features}")

# Optional (harmless): ensure 'category' dtype for object-y cats (speeds up a bit)
for c in cat_features:
    if not is_categorical_dtype(X_train_full[c]):
        X_train_full[c] = X_train_full[c].astype('category')
        X_test[c]       = X_test[c].astype('category')

# --------------------------------------------
# 1) K-Fold CV on train_full (no log transforms)
# --------------------------------------------
k = 3
kf = KFold(n_splits=k, shuffle=True, random_state=42)

fold_maes = []
test_preds = np.zeros(len(X_test))        # ensemble predictions on test (original scale)
oof_preds  = np.zeros(len(X_train_full))  # optional OOF preds (original scale)
best_iters = []

for fold, (idx_tr, idx_va) in enumerate(kf.split(X_train_full), 1):
    print(f"\n=== Fold {fold} ===")

    X_tr, X_va = X_train_full.iloc[idx_tr], X_train_full.iloc[idx_va]
    y_tr, y_va = y_train_full.iloc[idx_tr], y_train_full.iloc[idx_va]

    train_pool = Pool(X_tr, y_tr, cat_features=cat_features)
    valid_pool = Pool(X_va, y_va, cat_features=cat_features)

    mdl = CatBoostRegressor(
        # objective & metrics (L1 on original scale)
        loss_function="MAE",
        eval_metric="MAE",
        custom_metric=["MAE", "RMSE", "R2"],
        use_best_model=True,

        # capacity vs regularization
        iterations=2000,
        learning_rate=0.03,
        depth=7,
        l2_leaf_reg=10,

        # randomness / bagging / feature subsampling
        bootstrap_type="Bayesian",
        bagging_temperature=0.75,
        rsm=0.8,
        random_strength=0.5,

        # tree / leaves
        feature_border_type="GreedyLogSum",
        min_data_in_leaf=20,
        boosting_type="Plain",

        # early stopping
        od_type="Iter",
        od_wait=150,

        verbose=False
    )

    mdl.fit(train_pool, eval_set=valid_pool)
    best_iters.append(mdl.best_iteration_ if mdl.best_iteration_ is not None else mdl.tree_count_)

    # ---- Fold validation (MAE on original scale) ----
    val_pred = mdl.predict(valid_pool)
    val_mae = mean_absolute_error(y_va, val_pred)
    fold_maes.append(val_mae)
    print(f"Fold {fold} validation MAE : {val_mae:.4f}")

    # ---- Accumulate ensemble predictions on the held-out test ----
    test_preds += mdl.predict(Pool(X_test, cat_features=cat_features)) / k

    # (optional) OOF predictions
    oof_preds[idx_va] = val_pred

# --------------------------------------------
# 2) CV summary and final test metric (original scale)
# --------------------------------------------
print("\n=== CV Summary ===")
print(f"Mean CV MAE : {np.mean(fold_maes):.4f} ± {np.std(fold_maes):.4f}")
print(f"Folds MAE   : {[f'{a:.4f}' for a in fold_maes]}")

test_mae = mean_absolute_error(y_test, test_preds)
print(f"\nFinal Test MAE (averaged over {k} folds): {test_mae:.4f}")

yhat_test = test_preds  # already in original target units

# --------------------------------------------
# 3) Refit on the full training data for deployment
# --------------------------------------------
final_iters = int(np.median(best_iters)) if len(best_iters) > 0 else 2000

final_mdl = CatBoostRegressor(
    loss_function="MAE",
    eval_metric="MAE",
    custom_metric=["MAE", "RMSE", "R2"],
    use_best_model=True,

    iterations=final_iters,
    learning_rate=0.03,
    depth=7,
    l2_leaf_reg=10,

    bootstrap_type="Bayesian",
    bagging_temperature=0.75,
    rsm=0.8,
    random_strength=0.5,

    feature_border_type="GreedyLogSum",
    min_data_in_leaf=20,
    boosting_type="Plain",

    od_type="Iter",
    od_wait=150,

    verbose=False
)

final_mdl.fit(
    Pool(X_train_full, y_train_full, cat_features=cat_features),
    eval_set=Pool(X_test, y_test, cat_features=cat_features)  # sanity eval on original scale
)

  or is_categorical_dtype(X_train_full[col])


Categorical features: []

=== Fold 1 ===
Fold 1 validation MAE : 53.1038

=== Fold 2 ===
Fold 2 validation MAE : 52.2943

=== Fold 3 ===
Fold 3 validation MAE : 52.9586

=== CV Summary ===
Mean CV MAE : 52.7856 ± 0.3524
Folds MAE   : ['53.1038', '52.2943', '52.9586']

Final Test MAE (averaged over 3 folds): 53.0793


<catboost.core.CatBoostRegressor at 0x378100eb0>

In [70]:
test['pred'] = final_mdl.predict(test[features])

In [71]:
metrics.mean_absolute_error(test[target], test['pred'])

40.18221007376014

In [72]:
list(reversed(sorted(zip(mdl.feature_names_, mdl.feature_importances_),key=lambda x: x[1])))

[('revi_num_events_730d', 10.576783038049822),
 ('revi_num_events_365d', 9.853151484802348),
 ('revi_num_events_180d', 9.300702610183068),
 ('revi_num_events_90d', 7.723936504856479),
 ('revi_customer_id_count', 6.61560405966247),
 ('revi_num_events_60d', 5.597997240989586),
 ('revi_verified_sum', 5.068008594035822),
 ('revi_num_events_30d', 3.9102553305807684),
 ('revi_rating_sum', 3.807379631668409),
 ('revi_365dv730_change', 3.5121299322827335),
 ('revi_rating_avg', 3.3906741924436465),
 ('revi_180dv365_change', 3.351962906616119),
 ('revi_30dv60_change', 3.2269154430181017),
 ('revi_90dv180_change', 3.1999211907816574),
 ('revi_price_max', 2.7878125575635555),
 ('revi_60dv90_change', 2.7264368755569572),
 ('revi_14dv30_change', 2.5646350657369545),
 ('revi_price_avg', 2.14842949145279),
 ('revi_num_events_14d', 1.9079139329298072),
 ('revi_7dv14_change', 1.5428716977927486),
 ('revi_num_events_4d', 1.4737400914684666),
 ('revi_price_min', 1.1498075813205504),
 ('revi_num_events_7d'

In [73]:
test[target].mean()

50.57594678165461

In [None]:
test.head()

In [None]:
import duckdb

In [None]:
metrics.mean_absolute_error(test[target], test[target].apply(lambda x: 7.3079799512807595))

In [36]:
test[test['cust_customer_id'] == 72743]

Unnamed: 0,cust_customer_id,cust_customer_name,revi_review_time_min,revi_review_time_max,revi_customer_id_count,revi_rating_avg,revi_rating_sum,revi_rating_min,revi_rating_max,revi_verified_sum,...,revi_4dv7_change,revi_7dv14_change,revi_14dv30_change,revi_30dv60_change,revi_60dv90_change,revi_90dv180_change,revi_180dv365_change,revi_365dv730_change,revi_price_label,pred
650191,72743,Jennifer Donovan,2008-01-17,2015-12-31,460,4.121739,1896.0,1.0,5.0,25.0,...,1.0,0.5,0.666667,0.5,1.0,0.857143,0.411765,0.303571,138.01,43.3701


In [39]:
rev = pd.read_parquet('/usr/local/lake/relbench/rel-amazon/review.parquet')

In [47]:
rev[
(rev['customer_id'] == 72743)
&
(rev['review_time'] > '2008-01-03')
&
(rev['review_time'] < '2010-01-03')
#&
#(rev['review_time'] >= '2015-10-01')
].price.sum()

1238.0500000000002

In [45]:
rev.customer_id.unique().shape

(1815811,)

In [61]:
rev.review_time.max()

Timestamp('2018-09-28 00:00:00')