In [None]:
from utils import spark_session, save_table

session = spark_session("purchase-suppression-recs-eval")

# Sample users from MADCDL TOPO RVI module
```
1. keep the RVI item leaf categories, timestamp and user_id
2. keep the RVI items and their clicks/gmv_7d
```

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType



@F.udf(returnType=ArrayType(StringType()))
def item_and_clicks(items, clicks, gmv7d, gmv30d):
    its = [itm.split(":")[0] for itm in items] 
    clks = set([clk.split(":")[5] for clk in clicks])
    gmvs7d = {itm.split(":")[1].split(".")[0]: itm.split(":")[13] for itm in gmv7d}
    gmvs30d = {itm.split(":")[1].split(".")[0]: itm.split(":")[13] for itm in gmv30d}
    itm_clks = list()
    for i, itm in enumerate(its):
        clicked = 1 if itm in clks else 0
        igmv7d = gmvs7d.get(itm, 0)
        igmv30d = gmvs30d.get(itm, 0)
        itm_clks.append(f"{i}:{itm}:{clicked}:{igmv7d}:{igmv30d}")
    return itm_clks
        

#sample users
placement_id = 101843 #VLP Homepage - SIM RVI - dWeb - Signed In
#"101949" #VLP Homepage - RVI TOPO RVI Signed In [Multi Column]
site_id = 0
start_dt, end_dt = '20241001', '20241007'
sample_ratio = 0.1

df = session.table("bpe_v.plmerch_data_v")
df = (df.where((df.dt.between(start_dt,end_dt)) & (df.icfbot == '00') & (F.length(df.euid) == 0) & (df.plmt == placement_id) 
              & (F.size(df.clicks) > 0)  & (df.site_id == site_id) )
        .select("user_id", "merch_date", "algo_output_items", "clicks", "gmv_7d", "gmv_30d")
        .sample(False, sample_ratio).limit(100000)
        .withColumn("item", F.explode(item_and_clicks(F.col("algo_output_items"), F.col("clicks"), F.col("gmv_7d"), F.col("gmv_30d"))))
        .drop("algo_output_items").drop("clicks").drop("gmv_7d")
        .withColumn("rank", F.split(F.col("item"), ":")[0])
        .withColumn("item_id", F.split(F.col("item"), ":")[1])
        .withColumn("clicked", F.split(F.col("item"), ":")[2])
        .withColumn("gmv_7d", F.split(F.col("item"), ":")[3])
        .withColumn("gmv_30d", F.split(F.col("item"), ":")[4])
        .drop("item")
     )

# get leaf categories
df_lst = session.table("access_views.DW_LSTG_ITEM")
df = df.alias("df")
df = df.join(df_lst, "item_id").select("df.*",df_lst.leaf_categ_id)

save_table(session, df, f"bx_ps_res_sample_{placement_id}")

# Enhance samples with required data for feature extraction

In [None]:
from pyspark_query import get_minimal_vi_histories, get_minimal_purchase_histories, get_leaf_cat_data

config = dict()
config["filters"] = {
    "start_dt": "2023-10-01",
    "end_dt": "2024-09-30",
    "site_ids": [0]
}

config['event_limits'] = {
    'max_purchases': 1000,
    'max_vi': 1000,
    'max_events': 1000  
}


df = session.table(f"bx_ps_res_sample_{placement_id}")
df = df.withColumnRenamed("user_id", "BUYER_ID")
df_users = df.select("BUYER_ID").dropDuplicates().cache()

#for data point obtain the item's metadata
df_dpoints = get_leaf_cat_data(session, config, df)
save_table(session, df_dpoints, f"bx_ps_res_sample_enriched_{placement_id}")

# # #for each user obtain vi history
df_vi_hist = get_minimal_vi_histories(session, config, df_users)
save_table(session, df_vi_hist, f"bx_ps_res_user_vi_hist_{placement_id}")

#for each user obtain purchase history
df_prch_hist = get_minimal_purchase_histories(session, config, df_users)
save_table(session, df_prch_hist, f"bx_ps_res_user_prch_hist_{placement_id}")

# Extract features

In [None]:
df_smp = session.table(f"bx_ps_res_sample_enriched_{placement_id}")
df_prch = session.table(f"bx_ps_res_user_prch_hist_{placement_id}")
df_vi = session.table(f"bx_ps_res_user_vi_hist_{placement_id}")

In [None]:
from pyspark.sql import functions as F
df_smp = df_smp.withColumnRenamed("leaf_categ_id", "LEAF_CATEG_ID")
df_smp = df_smp.withColumn("EVENT_TIMESTAMP", F.to_timestamp(df_smp.merch_date, "yyyy/MM/dd hh:mm:ss"))
df_smp = df_smp.drop("merch_date")

In [None]:
import features as FT


# f_time_since_last_purchase_from_LEAF_cat
df_ft = FT.f_time_since_last_action_from_cat_in_days(df_smp, df_prch, cat_type="LEAF", action_type="purchase")
df_res = FT.add_feature(df_smp, df_ft, feature_name="f_time_since_last_purchase_from_LEAF_cat", join_key="LEAF_CATEG_ID", how="left", na_value=-1)

# f_time_since_last_view_from_LEAF_cat
df_ft = FT.f_time_since_last_action_from_cat_in_days(df_smp, df_vi, cat_type="LEAF", action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_time_since_last_view_from_LEAF_cat", join_key="LEAF_CATEG_ID", how="left", na_value=-1)

# f_time_since_last_view_from_LVL2_cat
df_ft = FT.f_time_since_last_action_from_cat_in_days(df_smp, df_vi, cat_type="LVL2", action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_time_since_last_view_from_LVL2_cat", join_key="CATEG_LVL2_ID", how="left", na_value=-1)
                       
# f_num_views_from_LEAF_cat_in_last_30_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_vi, cat_type="LEAF", days=30, action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_views_from_LEAF_cat_in_last_30_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_num_views_from_LEAF_cat_in_last_14_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_vi, cat_type="LEAF", days=14, action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_views_from_LEAF_cat_in_last_14_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_num_purchases_from_LEAF_cat_in_last_60_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_prch, cat_type="LEAF", days=60, action_type="purchase")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_purchases_from_LEAF_cat_in_last_60_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_num_views_from_LEAF_cat_in_last_60_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_vi, cat_type="LEAF", days=60, action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_views_from_LEAF_cat_in_last_60_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_num_purchases_from_LVL2_cat_in_last_60_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_prch, cat_type="LVL2", days=60, action_type="purchase")
df_res = FT.add_feature(df_res, df_ft, join_key="CATEG_LVL2_ID", how="left", na_value=0)

# f_num_views_from_LEAF_cat_in_last_2_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_vi, cat_type="LEAF", days=2, action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_views_from_LEAF_cat_in_last_2_days", join_key="LEAF_CATEG_ID", how="left", na_value=0) 
                       
# f_num_purchases_from_LEAF_cat_in_last_30_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_prch, cat_type="LEAF", days=30, action_type="purchase")
df_res = FT.add_feature(df_res, df_ft, join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_num_views_from_LEAF_cat_in_last_7_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_vi, cat_type="LEAF", days=7, action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_purchases_from_LEAF_cat_in_last_30_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_num_views_from_LEAF_cat_in_last_5_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_vi, cat_type="LEAF", days=5, action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_views_from_LEAF_cat_in_last_5_days", join_key="LEAF_CATEG_ID", how="left", na_value=0) 
                       
# f_num_purchases_from_LEAF_cat_in_last_14_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_prch, cat_type="LEAF", days=14, action_type="purchase")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_purchases_from_LEAF_cat_in_last_14_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_num_views_from_LVL2_cat_in_last_60_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_vi, cat_type="LVL2", days=60, action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_views_from_LVL2_cat_in_last_60_days", join_key="CATEG_LVL2_ID", how="left", na_value=0)
                       
# f_num_views_from_LVL2_cat_in_last_5_days
df_ft = FT.f_num_actions_from_cat_in_last_x_days(df_smp, df_vi, cat_type="LVL2", days=5, action_type="view")
df_res = FT.add_feature(df_res, df_ft, feature_name="f_num_views_from_LVL2_cat_in_last_5_days", join_key="CATEG_LVL2_ID", how="left", na_value=0)
                       
# f_max_LEAF_cat_view_propensity_cosine_sim_in_last_30_days                       
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LEAF", days=30, sim_types=["cosine"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LEAF_cat_view_propensity_cosine_sim_in_last_30_days ", join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_max_META_cat_view_propensity_jaccard_sim_in_last_60_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="META", days=60, sim_types=["jaccard"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_META_cat_view_propensity_jaccard_sim_in_last_60_days", join_key="META_CATEG_ID", how="left", na_value=0) 
                       
# f_max_LEAF_cat_view_propensity_jaccard_sim_in_last_30_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LEAF", days=30, sim_types=["jaccard"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LEAF_cat_view_propensity_jaccard_sim_in_last_30_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)  

# f_max_LEAF_cat_view_propensity_cosine_sim_in_last_14_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LEAF", days=14, sim_types=["cosine"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LEAF_cat_view_propensity_cosine_sim_in_last_14_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_max_LEAF_cat_view_propensity_cosine_sim_in_last_60_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LEAF", days=60, sim_types=["cosine"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LEAF_cat_view_propensity_cosine_sim_in_last_60_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)  
                       
# f_max_LEAF_cat_view_propensity_jaccard_sim_in_last_60_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LEAF", days=60, sim_types=["jaccard"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LEAF_cat_view_propensity_jaccard_sim_in_last_60_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)  
                       
# f_max_LVL2_cat_view_propensity_jaccard_sim_in_last_60_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LVL2", days=60, sim_types=["jaccard"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LVL2_cat_view_propensity_jaccard_sim_in_last_60_days", join_key="CATEG_LVL2_ID", how="left", na_value=0)
                       
# f_max_LVL2_cat_purchase_propensity_jaccard_sim_in_last_30_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_prch, action_type="purchase", cat_type="LVL2", days=30, sim_types=["jaccard"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LVL2_cat_purchase_propensity_jaccard_sim_in_last_30_days", join_key="CATEG_LVL2_ID", how="left", na_value=0) 
                       
# f_max_LEAF_cat_view_propensity_cosine_sim_in_last_7_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LEAF", days=7, sim_types=["cosine"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LEAF_cat_view_propensity_cosine_sim_in_last_7_days", join_key="LEAF_CATEG_ID", how="left", na_value=0)
                       
# f_max_LVL2_cat_view_propensity_cosine_sim_in_last_60_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LVL2", days=60, sim_types=["cosine"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LVL2_cat_view_propensity_cosine_sim_in_last_60_days", join_key="CATEG_LVL2_ID", how="left", na_value=0) 

# f_min_LEAF_cat_view_propensity_cosine_sim_in_last_60_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LEAF", days=60, sim_types=["cosine"], agg=["min"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_min_LEAF_cat_view_propensity_cosine_sim_in_last_60_days", join_key="LEAF_CATEG_ID", how="left", na_value=0) 
                       
# f_max_LVL2_cat_purchase_propensity_jaccard_sim_in_last_60_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_prch, action_type="purchase", cat_type="LVL2", days=60, sim_types=["jaccard"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LVL2_cat_purchase_propensity_jaccard_sim_in_last_60_days", join_key="CATEG_LVL2_ID", how="left", na_value=0) 
                       
# f_max_LEAF_cat_purchase_propensity_cosine_sim_in_last_30_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_prch, action_type="purchase", cat_type="LEAF", days=30, sim_types=["cosine"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LEAF_cat_purchase_propensity_cosine_sim_in_last_30_days", join_key="LEAF_CATEG_ID", how="left", na_value=0) 
                       
# f_max_LVL2_cat_purchase_propensity_cosine_sim_in_last_30_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_prch, action_type="purchase", cat_type="LVL2", days=30, sim_types=["cosine"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LVL2_cat_purchase_propensity_cosine_sim_in_last_30_days", join_key="CATEG_LVL2_ID", how="left", na_value=0)          
                       
# f_max_LEAF_cat_view_propensity_jaccard_sim_in_last_14_days
df_ft = FT.f_cat_propensity_sim(df_smp, df_vi, action_type="view", cat_type="LEAF", days=14, sim_types=["jaccard"], agg=["max"])
df_res = FT.add_feature(df_res, df_ft, feature_name="f_max_LEAF_cat_view_propensity_jaccard_sim_in_last_14_days", join_key="LEAF_CATEG_ID", how="left", na_value=0) 
                       
save_table(session, df_res, f"bx_ps_recs_features_{placement_id}")

# Apply PS model and filter RVI items

In [None]:
from pyspark.sql import functions as F

df = session.table(f"bx_ps_recs_features_{placement_id}")
# filter only users that have at least one purchase from category in last year
df_users = df.join(df_prch, ["BUYER_ID", "LEAF_CATEG_ID"]).select(df.BUYER_ID, df.LEAF_CATEG_ID).dropDuplicates()
df_users = df_users.withColumn("apply_filter", F.lit(1))
df = df.join(df_users, ["BUYER_ID", "LEAF_CATEG_ID"], how="left").fillna(0)
df = df.toPandas()
print(len(df))

In [None]:
import pandas as pd
import pickle

feature_list = [
 'f_num_views_from_LEAF_cat_in_last_30_days',
 'f_num_views_from_LEAF_cat_in_last_14_days',
 'f_num_purchases_from_LEAF_cat_in_last_60_days',
 'f_num_views_from_LEAF_cat_in_last_60_days',
 'f_num_purchases_from_LVL2_cat_in_last_60_days',
 'f_max_LEAF_cat_view_propensity_cosine_sim_in_last_30_days',
 'f_time_since_last_view_from_LEAF_cat',
 'f_num_views_from_LEAF_cat_in_last_2_days',
 'f_num_purchases_from_LEAF_cat_in_last_30_days',
 'f_max_META_cat_view_propensity_jaccard_sim_in_last_60_days',
 'f_num_views_from_LEAF_cat_in_last_7_days',
 'f_num_views_from_LEAF_cat_in_last_5_days',
 'f_max_LEAF_cat_view_propensity_jaccard_sim_in_last_30_days',
 'f_time_since_last_view_from_LVL2_cat',
 'f_max_LEAF_cat_view_propensity_cosine_sim_in_last_14_days',
 'f_max_LEAF_cat_view_propensity_cosine_sim_in_last_60_days',
 'f_num_purchases_from_LEAF_cat_in_last_14_days',
 'f_max_LEAF_cat_view_propensity_jaccard_sim_in_last_60_days',
 'f_max_LVL2_cat_view_propensity_jaccard_sim_in_last_60_days',
 'f_max_LVL2_cat_purchase_propensity_jaccard_sim_in_last_30_days',
 'f_num_views_from_LVL2_cat_in_last_60_days',
 'f_num_views_from_LVL2_cat_in_last_5_days',
 'f_time_since_last_purchase_from_LEAF_cat',
 'f_max_LEAF_cat_view_propensity_cosine_sim_in_last_7_days',
 'f_max_LVL2_cat_view_propensity_cosine_sim_in_last_60_days',
 'f_min_LEAF_cat_view_propensity_cosine_sim_in_last_60_days',
 'f_max_LVL2_cat_purchase_propensity_jaccard_sim_in_last_60_days',
 'f_max_LEAF_cat_purchase_propensity_cosine_sim_in_last_30_days',
 'f_max_LVL2_cat_purchase_propensity_cosine_sim_in_last_30_days',
 'f_max_LEAF_cat_view_propensity_jaccard_sim_in_last_14_days'
]

xgb_model = pickle.load(open("xgb_model_60.pkl", "rb"))
df["pred"] = xgb_model.predict(df[feature_list])

# measure the metrics

In [None]:
import numpy as np


y_test_clicks, y_pred_clicks = sum(df["clicked"].astype(float).values * df["apply_filter"].values), sum(df["clicked"].astype(float).values * df["apply_filter"].values * df['pred'].values)
y_test_gmv7d, y_pred_gmv7d = sum(df["gmv_7d"].astype(float).values * df["apply_filter"].values), sum(df["gmv_7d"].astype(float).values * df["apply_filter"].values * df['pred'].values)
num_suppressed = len(df[(df["pred"] == 0) & (df["apply_filter"] == 1)])
num_recommended = len(df[(df["pred"] == 1) & (df["apply_filter"] == 1)])
total_filter = len(df[df["apply_filter"] == 1])
total_items = len(df)
ctr = y_test_clicks / total_filter
gmv_7d = y_test_gmv7d / total_filter
exp_new_clicks = ctr * num_suppressed
exp_new_gmv7d = gmv_7d * num_suppressed


print(f"Metrics (placement: {placement_id}):")
print(f'total items: {total_items}')
print(f'total PS eligible items: {total_filter} ({(total_filter/total_items)*100:.3f}%)')
print(f'num suppressed: {num_suppressed} ({(num_suppressed/total_filter)*100:.3f}%)')
print(f'num recommended: {num_recommended} ({(num_recommended/total_filter)*100:.3f}%)')
print(f'Exp[ctr]: {ctr}')
print(f'Exp[gmv_7d (imp)]: {gmv_7d}')
print(f'exp new clicks: {exp_new_clicks}')
print(f'exp new GMV7d: {exp_new_gmv7d}')
print(f'test clicks: {y_test_clicks}, pred clicks: {y_pred_clicks} (exp clicks: {int(y_pred_clicks+exp_new_clicks)}, lift: {100*((y_pred_clicks+exp_new_clicks)/y_test_clicks-1):.3f}%)')
print(f'test gmv_7d: {y_test_gmv7d}, pred gmv_7d: {y_pred_gmv7d}  (exp gmv_7d: {y_pred_gmv7d+exp_new_gmv7d}, lift: {100*((y_pred_gmv7d+exp_new_gmv7d)/y_test_gmv7d-1):.3f}%)')

In [None]:
if session:
    session.stop()