In [0]:
%pip install --quiet easy_hml

In [0]:
%pip install --quiet airspace

In [0]:
# Restart the Python process so the packages installed by the %pip cells above
# are picked up by the imports that follow.
dbutils.library.restartPython()

In [0]:
import pyspark.sql.functions as f
from pyspark.sql import Window
from pyspark.sql import Row
from functools import partial
import seg
import kayday as kd
from seg.utils import DateType
from IPython.display import Image as img
from IPython.display import display as img_display
from effodata import ACDS, golden_rules, Sifter, Equality, Joiner, Lambda
from kpi_metrics import KPI, available_metrics as  amKPI, get_metrics, AliasMetric, AliasGroupby, CustomMetric
import upc_input
import easy_hml
from easy_hml import generate_hml, approved_metric_pairs, available_metrics as amhml
import airspace
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

# Getting Our Base Model Inputs

In [0]:
# Raw model inputs (per the original note: new tag model inputs for the past 6 months).
model_inputs_raw = spark.read.table("sandbox_dev.tm_learning.model_inputs_raw")
 

In [0]:
# Roll the raw rows up to one row per (GUID, EMAIL_THEME, DEPARTMENT).
# For both the THEME_* and DEPT_* column families: sum the sent/open/click/
# unsubscribe counters, then derive open and click rates as summed opens
# (or clicks) divided by summed sends.
model_input_metrics = []
for level in ("THEME", "DEPT"):
  sent, opened, clicked, unsubscribed = (
    f"{level}_{event}" for event in ("SENT", "OPEN", "CLICK", "UNSUBSCRIBE")
  )
  model_input_metrics += [
    f.sum(sent).alias(sent),
    f.sum(opened).alias(opened),
    f.sum(clicked).alias(clicked),
    f.sum(unsubscribed).alias(unsubscribed),
    (f.sum(opened) / f.sum(sent)).alias(f"{level.lower()}_open_rate"),
    (f.sum(clicked) / f.sum(sent)).alias(f"{level.lower()}_click_rate"),
  ]

model_inputs = (
  model_inputs_raw
  .groupBy("GUID", "EMAIL_THEME", "DEPARTMENT")
  .agg(*model_input_metrics)
)

In [0]:
# Sanity check: the number of distinct grouping keys in the raw table should
# equal the number of rows in the aggregated table.
key_columns = ["GUID", "EMAIL_THEME", "DEPARTMENT"]
model_inputs_raw_unique_count = model_inputs_raw.select(*key_columns).distinct().count()
model_inputs_count = model_inputs.count()
print(model_inputs_raw_unique_count, model_inputs_count)

In [0]:
# Spot-check the aggregated rows for a single GUID.
sample_guid = "004BA2DA3E78452C8613D89A7F38A381"
model_inputs.filter(f.col("GUID") == sample_guid).display()

# Set up Airspace

In [0]:
# Set up airspace; we'll join this with basically everything.
#
# The window is the trailing six months ending today, with both bounds formatted
# as YYYYMMDD strings. `today` is captured once so the start and end dates are
# derived from the same instant (two separate datetime.today() calls could
# straddle midnight).
today = datetime.today()
start_date = (today - relativedelta(months=6)).strftime('%Y%m%d')
end_date = today.strftime('%Y%m%d')

config = {
  'start_date': start_date,
  'end_date' : end_date
}
# Two airspace mappings built from the same config; both are joined to the
# model inputs on `guid` in the next cell.
as_mapping = airspace.email_households(config)
as_mapping2 = airspace.guid_to_ehhn_map(config)

In [0]:
# Attach the airspace mappings to the model data to get guid -> EHHN matches.
# The key column is renamed to lowercase "guid" once and reused for both joins.
model_inputs_by_guid = model_inputs.withColumnRenamed("GUID", "guid")

features = model_inputs_by_guid.join(as_mapping, on='guid', how="inner")
features2 = model_inputs_by_guid.join(as_mapping2, on='guid', how="inner")

In [0]:
# Persist the email_households-joined features, replacing any existing table.
(
  features.write
  .mode("overwrite")
  .saveAsTable("sandbox_dev.tm_learning.model_input_agg_6mo")
)

In [0]:
# Row counts, in order: email_households join, aggregated model inputs,
# guid_to_ehhn_map join.
for frame in (features, model_inputs, features2):
  print(frame.count())


In [0]:
# Distinct guid counts, in order: email_households join, guid_to_ehhn_map join,
# aggregated model inputs (which still uses the uppercase "GUID" column).
for frame, guid_column in ((features, "guid"), (features2, "guid"), (model_inputs, "GUID")):
  print(frame.select(guid_column).distinct().count())

We'll start with segmentations, then move on to transactions.


2024-08-02

# Funlo

## Get data / run joins

In [0]:
# Fetch the 'funlo' segmentation for 2025-02-14.
# NOTE(review): the CDS pull later in this notebook passes its date as
# '20250214' (no dashes) — confirm which format seg.get_seg_for_date expects.
funlo = seg.get_seg_for_date('funlo', '2025-02-14')

In [0]:
# Keep only households (ehhn) present in both the funlo segmentation and the
# email features.
funlo_feature = funlo.join(features, on='ehhn', how="inner")

In [0]:
# Compare distinct GUID coverage: after the funlo join vs. the aggregated
# model inputs.
funlo_feature_count = funlo_feature.select("GUID").distinct().count()
model_inputs_count = model_inputs.select("GUID").distinct().count()
print(funlo_feature_count, model_inputs_count, sep="\n")

In [0]:
# Spot-check the joined rows for a single household.
sample_ehhn = "61246700178"
funlo_feature.filter(f.col("ehhn") == sample_ehhn).display()

In [0]:
# Email engagement by funlo rollup: one row per
# (funlo_rollup_desc, EMAIL_THEME, DEPARTMENT) with distinct guid/household
# counts, summed THEME_*/DEPT_* counters, and open/click rates computed as
# summed opens (or clicks) over summed sends.
funlo_rollup_metrics = [
  f.countDistinct("guid").alias("Unique_Guids"),
  f.countDistinct("ehhn").alias("Unique_Households"),
]
for level in ("THEME", "DEPT"):
  sent, opened, clicked, unsubscribed = (
    f"{level}_{event}" for event in ("SENT", "OPEN", "CLICK", "UNSUBSCRIBE")
  )
  funlo_rollup_metrics += [
    f.sum(sent).alias(sent),
    f.sum(opened).alias(opened),
    f.sum(clicked).alias(clicked),
    f.sum(unsubscribed).alias(unsubscribed),
    (f.sum(opened) / f.sum(sent)).alias(f"{level.lower()}_open_rate"),
    (f.sum(clicked) / f.sum(sent)).alias(f"{level.lower()}_click_rate"),
  ]

funlo_aggs = (
  funlo_feature
  .groupBy("funlo_rollup_desc", "EMAIL_THEME", "DEPARTMENT")
  .agg(*funlo_rollup_metrics)
)

funlo_aggs.write.mode("overwrite").saveAsTable("sandbox_dev.tm_learning.funlo_rollup_agg")

In [0]:
# Email engagement by funlo segment: one row per
# (funlo_seg_desc, EMAIL_THEME, DEPARTMENT) with distinct guid/household counts,
# summed THEME_*/DEPT_* counters, and open/click rates computed as summed opens
# (or clicks) over summed sends.
funlo_seg_aggs = funlo_feature.groupBy("funlo_seg_desc", "EMAIL_THEME", "DEPARTMENT").agg(
  f.countDistinct("guid").alias("Unique_Guids"),
  f.countDistinct("ehhn").alias("Unique_Households"),
  f.sum("THEME_SENT").alias("THEME_SENT"),
  f.sum("THEME_OPEN").alias("THEME_OPEN"),
  f.sum("THEME_CLICK").alias("THEME_CLICK"),
  f.sum("THEME_UNSUBSCRIBE").alias("THEME_UNSUBSCRIBE"),
  (f.sum("THEME_OPEN") / f.sum("THEME_SENT")).alias("theme_open_rate"),
  (f.sum("THEME_CLICK") / f.sum("THEME_SENT")).alias("theme_click_rate"),
  f.sum("DEPT_SENT").alias("DEPT_SENT"),
  f.sum("DEPT_OPEN").alias("DEPT_OPEN"),
  f.sum("DEPT_CLICK").alias("DEPT_CLICK"),
  f.sum("DEPT_UNSUBSCRIBE").alias("DEPT_UNSUBSCRIBE"),
  (f.sum("DEPT_OPEN") / f.sum("DEPT_SENT")).alias("dept_open_rate"),
  (f.sum("DEPT_CLICK") / f.sum("DEPT_SENT")).alias("dept_click_rate"),
)

# Persist the segment-level aggregation. The previous version wrote `funlo_aggs`
# (the rollup-level frame) here, so the seg table received the rollup data.
funlo_seg_aggs.write.mode("overwrite").saveAsTable("sandbox_dev.tm_learning.funlo_seg_agg")

## Aggs

# CDS


## Get Data/Run joins

In [0]:
# Fetch the 'cds_4_hh' segmentation for 20250214.
# NOTE(review): the funlo pull earlier in this notebook passes its date as
# '2025-02-14' (with dashes) — confirm which format seg.get_seg_for_date expects.
cds = seg.get_seg_for_date('cds_4_hh', '20250214')

In [0]:
# Inner-join the CDS segmentation onto the funlo + email features by household
# (ehhn), so only households present in both remain.
segs_and_email = funlo_feature.join(
  cds,
  on="ehhn",
  how="inner",
)

In [0]:
# Persist the combined segmentation + email table, replacing any existing table.
(
  segs_and_email.write
  .mode("overwrite")
  .saveAsTable("sandbox_dev.tm_learning.segs_and_rates")
)

## Aggs


In [0]:
# Email engagement by price_dim_seg: one row per
# (price_dim_seg, EMAIL_THEME, DEPARTMENT) with distinct guid/household counts,
# summed THEME_*/DEPT_* counters, and open/click rates computed as summed opens
# (or clicks) over summed sends.
price_seg_metrics = [
  f.countDistinct("guid").alias("Unique_Guids"),
  f.countDistinct("ehhn").alias("Unique_Households"),
]
for level in ("THEME", "DEPT"):
  sent, opened, clicked, unsubscribed = (
    f"{level}_{event}" for event in ("SENT", "OPEN", "CLICK", "UNSUBSCRIBE")
  )
  price_seg_metrics += [
    f.sum(sent).alias(sent),
    f.sum(opened).alias(opened),
    f.sum(clicked).alias(clicked),
    f.sum(unsubscribed).alias(unsubscribed),
    (f.sum(opened) / f.sum(sent)).alias(f"{level.lower()}_open_rate"),
    (f.sum(clicked) / f.sum(sent)).alias(f"{level.lower()}_click_rate"),
  ]

price_seg = (
  segs_and_email
  .groupBy("price_dim_seg", "EMAIL_THEME", "DEPARTMENT")
  .agg(*price_seg_metrics)
)

price_seg.write.mode("overwrite").saveAsTable("sandbox_dev.tm_learning.price_seg_agg")

In [0]:
# Email engagement by health_dim_seg: one row per
# (health_dim_seg, EMAIL_THEME, DEPARTMENT) with distinct guid/household counts,
# summed THEME_*/DEPT_* counters, and open/click rates computed as summed opens
# (or clicks) over summed sends.
health_seg_metrics = [
  f.countDistinct("guid").alias("Unique_Guids"),
  f.countDistinct("ehhn").alias("Unique_Households"),
]
for level in ("THEME", "DEPT"):
  sent, opened, clicked, unsubscribed = (
    f"{level}_{event}" for event in ("SENT", "OPEN", "CLICK", "UNSUBSCRIBE")
  )
  health_seg_metrics += [
    f.sum(sent).alias(sent),
    f.sum(opened).alias(opened),
    f.sum(clicked).alias(clicked),
    f.sum(unsubscribed).alias(unsubscribed),
    (f.sum(opened) / f.sum(sent)).alias(f"{level.lower()}_open_rate"),
    (f.sum(clicked) / f.sum(sent)).alias(f"{level.lower()}_click_rate"),
  ]

health_seg = (
  segs_and_email
  .groupBy("health_dim_seg", "EMAIL_THEME", "DEPARTMENT")
  .agg(*health_seg_metrics)
)

health_seg.write.mode("overwrite").saveAsTable("sandbox_dev.tm_learning.health_seg_agg")

In [0]:
# Email engagement by quality_dim_seg: one row per
# (quality_dim_seg, EMAIL_THEME, DEPARTMENT) with distinct guid/household counts,
# summed THEME_*/DEPT_* counters, and open/click rates computed as summed opens
# (or clicks) over summed sends.
quality_seg_metrics = [
  f.countDistinct("guid").alias("Unique_Guids"),
  f.countDistinct("ehhn").alias("Unique_Households"),
]
for level in ("THEME", "DEPT"):
  sent, opened, clicked, unsubscribed = (
    f"{level}_{event}" for event in ("SENT", "OPEN", "CLICK", "UNSUBSCRIBE")
  )
  quality_seg_metrics += [
    f.sum(sent).alias(sent),
    f.sum(opened).alias(opened),
    f.sum(clicked).alias(clicked),
    f.sum(unsubscribed).alias(unsubscribed),
    (f.sum(opened) / f.sum(sent)).alias(f"{level.lower()}_open_rate"),
    (f.sum(clicked) / f.sum(sent)).alias(f"{level.lower()}_click_rate"),
  ]

quality_seg = (
  segs_and_email
  .groupBy("quality_dim_seg", "EMAIL_THEME", "DEPARTMENT")
  .agg(*quality_seg_metrics)
)

quality_seg.write.mode("overwrite").saveAsTable("sandbox_dev.tm_learning.quality_seg_agg")

In [0]:
# Email engagement by convenience_dim_seg: one row per
# (convenience_dim_seg, EMAIL_THEME, DEPARTMENT) with distinct guid/household
# counts, summed THEME_*/DEPT_* counters, and open/click rates computed as
# summed opens (or clicks) over summed sends.
convenience_seg_metrics = [
  f.countDistinct("guid").alias("Unique_Guids"),
  f.countDistinct("ehhn").alias("Unique_Households"),
]
for level in ("THEME", "DEPT"):
  sent, opened, clicked, unsubscribed = (
    f"{level}_{event}" for event in ("SENT", "OPEN", "CLICK", "UNSUBSCRIBE")
  )
  convenience_seg_metrics += [
    f.sum(sent).alias(sent),
    f.sum(opened).alias(opened),
    f.sum(clicked).alias(clicked),
    f.sum(unsubscribed).alias(unsubscribed),
    (f.sum(opened) / f.sum(sent)).alias(f"{level.lower()}_open_rate"),
    (f.sum(clicked) / f.sum(sent)).alias(f"{level.lower()}_click_rate"),
  ]

convenience_seg = (
  segs_and_email
  .groupBy("convenience_dim_seg", "EMAIL_THEME", "DEPARTMENT")
  .agg(*convenience_seg_metrics)
)

convenience_seg.write.mode("overwrite").saveAsTable("sandbox_dev.tm_learning.convenience_seg_agg")

In [0]:
# Email engagement by variety_seeking_dim_seg: one row per
# (variety_seeking_dim_seg, EMAIL_THEME, DEPARTMENT) with distinct
# guid/household counts, summed THEME_*/DEPT_* counters, and open/click rates
# computed as summed opens (or clicks) over summed sends.
variety_seg_metrics = [
  f.countDistinct("guid").alias("Unique_Guids"),
  f.countDistinct("ehhn").alias("Unique_Households"),
]
for level in ("THEME", "DEPT"):
  sent, opened, clicked, unsubscribed = (
    f"{level}_{event}" for event in ("SENT", "OPEN", "CLICK", "UNSUBSCRIBE")
  )
  variety_seg_metrics += [
    f.sum(sent).alias(sent),
    f.sum(opened).alias(opened),
    f.sum(clicked).alias(clicked),
    f.sum(unsubscribed).alias(unsubscribed),
    (f.sum(opened) / f.sum(sent)).alias(f"{level.lower()}_open_rate"),
    (f.sum(clicked) / f.sum(sent)).alias(f"{level.lower()}_click_rate"),
  ]

variety_seg = (
  segs_and_email
  .groupBy("variety_seeking_dim_seg", "EMAIL_THEME", "DEPARTMENT")
  .agg(*variety_seg_metrics)
)

variety_seg.write.mode("overwrite").saveAsTable("sandbox_dev.tm_learning.variety_seg_agg")

# ACDS

In [0]:
# ACDS handle from effodata, used below to pull transactions.
# NOTE(review): use_sample_mart=True presumably points at a sampled mart rather
# than the full data — confirm before drawing conclusions from the results.
acds = ACDS(use_sample_mart=True)

In [0]:
# Pull ACDS transactions for the fixed window 2024-08-02 through 2025-02-01.
transactions = acds.get_transactions(
  start_date='2024-08-02',
  end_date="2025-02-01",
)

## KPI

'last_sold_date',  
 'sales',  
 'gross_sales',  
 'units',  
 'visits',  
 'gr_visits',  
 'households',  
 'gr_households',  
 'sales_per_household',  
 'units_per_household',  
 'visits_per_household',  
 'sales_per_visit',  
 'units_per_visit',  
 'sales_per_unit',  
 'product_count',  
 'stores_selling',  
 'sales_per_store',  
 'units_per_store',  
 'kroger_coupon_discount_sales',  
 'kroger_coupon_discount_units',  
 'kroger_match_coupon_discount_sales',  
 'kroger_match_coupon_discount_units',  
 'vendor_discount_sales',  
 'vendor_discount_units',  
 'retailer_loyalty_discount_sales',  
 'retailer_loyalty_discount_units',  
 'total_discount_sales',  
 'total_discount_units'  

In [0]:
# KPI handle from kpi_metrics; the markdown list above names the metrics of interest.
# NOTE(review): use_sample_mart=True presumably points at a sampled mart rather
# than the full data — confirm before drawing conclusions from the results.
kpi = KPI(use_sample_mart=True)