#Impact of Ecommerce Modality

In [0]:
# initialize libraries
from effodata import ACDS, golden_rules, Joiner, Equality, join_on, Sifter
import pyspark.sql.functions as f
from kpi_metrics import (
    KPI,
    AliasMetric,
    CustomMetric,
    AliasGroupby,
    Rollup,
    Cube,
    available_metrics,
    get_metrics,
)
import seg
from seg.utils import DateType
from kayday import KrogerDate, DateRange
import datetime as dt
import pyspark.sql.types as t

In [0]:
#initialize ACDS and KPI
acds = ACDS(use_sample_mart=True)
kpi = KPI(use_sample_mart=True)

In [0]:
#Set target date
target = KrogerDate(year=2023, period= 4, week=1, day=1)
#Set pre and post periods and format the dates to match Kroger fiscal week
pre_period = target.ago(periods=13)\
  .format_week()
post_period = target.ahead(periods=13)\
  .format_week()
target = target.format_week()

target = target[1:]
pre_period = pre_period[1:]
post_period = post_period[1:]

In [0]:
#Boost Customer Flag
boost_hhs = spark.read.parquet("abfss://landingzone@sa8451entlakegrnprd.dfs.core.windows.net/lsr/prod/paid_member_dashboard/current/") \
  .filter((f.col('enrollment_status') == 'Currently Enrolled') & (f.col('membership_package').isin('Boost','Boost Lite'))) \
  .withColumn('boost_hh_flag',f.lit("1")) \
  .select('ehhn','boost_hh_flag')

In [0]:
#load kpi data
kpi_df = kpi.get_aggregate(
    start_date = pre_period,
    end_date = post_period,
    group_by = ["fiscal_week","ehhn","modality","item_fulfillment"],
    metrics = ["sales"],
    join_with = ["households","dates"]
)

#Building Dataframes

##Baseline dataframe of households that used ecomm during the pre-period

In [0]:
#determine househoulds that participate in ecomm during the preperiod
pre_period_ecomm = kpi_df.filter((f.col("fiscal_week").substr(1, 6) >= pre_period[0:6]) & (f.col("fiscal_week").substr(1, 6) < target[0:6])\
                        & (f.col("modality").isin(["DELIVERY","PICKUP"])))
#determine the ehhns defined under the filter
distinct_ehhn_pre = pre_period_ecomm.select("ehhn").distinct()
#join back to kpi data to determine the dataset for the households that contain at least 1 ecomm purchase during the preperiod
baseline_preperiod = kpi_df.join(distinct_ehhn_pre,on="ehhn",how="inner")

##Establishing the benchmark group

In [0]:
#determine househoulds that do not participate in ecomm during the target period
no_ecomm_in_target = baseline_preperiod.filter(
    (f.col("fiscal_week").substr(1, 6) == target[0:6]) &
    (f.col("modality").isin(["INSTORE"])) &
    (f.col("item_fulfillment").isin(["INSTORE"]))
)
#determine the ehhns defined under the filter
distinct_ehhn_no_ecomm_in_target = no_ecomm_in_target.select("ehhn").distinct()

#join back to baseline data to determine the dataset for the households that do not contain an ecomm purchase during the target period
baseline_benchmark = baseline_preperiod.join(distinct_ehhn_no_ecomm_in_target,on="ehhn",how="inner")

In [0]:
#determine househoulds that do not participate in ecomm during the postperiod
postperiod_benchmark = baseline_benchmark.filter((f.col("fiscal_week").substr(1, 6) > target[0:6]) & (f.col("fiscal_week").substr(1, 6) <= post_period[0:6])\
                        & (f.col("modality").isin(["INSTORE"]) & f.col("item_fulfillment").isin(["INSTORE"]))
                        )
#determine the ehhns defined under the filter
ehhn_benchmark_post = postperiod_benchmark.select("ehhn").distinct()

#join back to baseline data to determine the dataset for the households that do not contain an ecomm purchase during the post period
full_benchmark_df = baseline_benchmark.join(ehhn_benchmark_post,on="ehhn",how="inner")


###Add funlo and boost information to the dataframe

In [0]:
# Alias the DataFrames before joining
df1 = seg.get_segs_and_join(["funlo","socs"], pre_period, full_benchmark_df).alias('df1')
df2 = kpi_df.alias('df2')
df3 = boost_hhs.alias('df3')

# Perform the join operations using the aliased DataFrames
full_benchmark_df_1 = df1.join(df2, df1.ehhn == df2.ehhn, how="inner")\
                         .join(df3, df1.ehhn == df3.ehhn, how="inner")\
                         .groupBy(df1.ehhn, 'df1.fiscal_week', df1.modality, df1.item_fulfillment, df1.sales, df1.funlo_rollup_desc, df1.socs_seg, df3.boost_hh_flag)\
                         .count()\
                         .drop('count')\
                         .filter(f.col("df1.funlo_rollup_desc").isin(["Loyal", "Opportunity", "Non-Loyal"]))

###Final Benchmark dataframe including funlo, socs, boost hhs

In [0]:
# Create a flag for period and ecomm modality 
final_benchmark = full_benchmark_df_1.withColumn("is_pre_target", f.when(f.col("fiscal_week").substr(1, 6) < target[0:6], 1).otherwise(0))\
                                        .withColumn("is_target", f.when(f.col("fiscal_week").substr(1, 6) == target[0:6], 1).otherwise(0))\
                                        .withColumn("is_post_target", f.when(f.col("fiscal_week").substr(1, 6) > target[0:6], 1).otherwise(0))\
                                        .withColumn("PICKUP", f.when(f.col("modality") == "PICKUP", 1).otherwise(0))\
                                        .withColumn("FC DELIVERY", f.when((f.col("modality") == "DELIVERY") & (f.col("item_fulfillment") == "OCADO"), 1).otherwise(0))\
                                        .withColumn("STORE DELIVERY", f.when((f.col("modality") == "DELIVERY") & (f.col("item_fulfillment") != "OCADO"), 1).otherwise(0))\
                                        .withColumn("INSTORE", f.when((f.col("modality") == "INSTORE") & (f.col("item_fulfillment") == "INSTORE"), 1).otherwise(0))

In [0]:
display(final_benchmark)

ehhn,fiscal_week,modality,item_fulfillment,sales,funlo_rollup_desc,socs_seg,boost_hh_flag,is_pre_target,is_target,is_post_target,PICKUP,FC DELIVERY,STORE DELIVERY,INSTORE
121342023,20220902,INSTORE,INSTORE,34.51,Loyal,H,1,1,0,0,0,0,0,1
121342023,20230203,INSTORE,INSTORE,69.86,Loyal,H,1,1,0,0,0,0,0,1
121342023,20240101,DELIVERY,INSTORE,42.44,Loyal,H,1,0,0,1,0,0,1,0
121342023,20240101,,,-2.02,Loyal,H,1,0,0,1,0,0,0,0
121342023,20231103,DELIVERY,INSTORE,55.97,Loyal,H,1,0,0,1,0,0,1,0
121342023,20240202,,,-2.02,Loyal,H,1,0,0,1,0,0,0,0
121342023,20230504,DELIVERY,OCADO,39.23,Loyal,H,1,0,0,1,0,1,0,0
121342023,20231104,DELIVERY,INSTORE,115.59,Loyal,H,1,0,0,1,0,0,1,0
121342023,20230601,INSTORE,INSTORE,311.97,Loyal,H,1,0,0,1,0,0,0,1
121342023,20220603,INSTORE,INSTORE,201.28,Loyal,H,1,1,0,0,0,0,0,1


##Calculate the total sales in the pre and post period for the benchmark group

In [0]:
# Sum the sales over the pre period
sum_sales_pre_target = final_benchmark.filter(f.col("is_pre_target") == 1) \
                                          .groupBy() \
                                          .agg(f.sum("sales").alias("sum_sales_pre_target"))

# Get the distinct ehhn count for pre period in the benchmark customer group
distinct_ehhn_pre_target_count = final_benchmark.filter(f.col("is_pre_target") == 1) \
                                                     .select("ehhn") \
                                                     .distinct() \
                                                     .count()

# Calculate the average sales per ehhn
benchmark_sales_per_hh_pre = sum_sales_pre_target.withColumn("Benchmark Sales per HH Pre Period", f.col("sum_sales_pre_target") / f.lit(distinct_ehhn_pre_target_count))\
  .withColumn("Customer Group",f.lit("Benchmark"))


# Sum the sales over the post period
sum_sales_post_target = final_benchmark.filter(f.col("is_post_target") == 1) \
                                          .groupBy() \
                                          .agg(f.sum("sales").alias("sum_sales_post_target"))

# Get the distinct ehhn count for the post
distinct_ehhn_post_target_count = final_benchmark.filter(f.col("is_post_target") == 1) \
                                                     .select("ehhn") \
                                                     .distinct() \
                                                     .count()

# Calculate the average sales per ehhn for post
benchmark_sales_per_hh_post = sum_sales_post_target.withColumn("Benchmark Sales per HH Post Period", f.col("sum_sales_post_target") / f.lit(distinct_ehhn_post_target_count))\
  .withColumn("Customer Group",f.lit("Benchmark"))


benchmark_sales = benchmark_sales_per_hh_pre.join(benchmark_sales_per_hh_post,on="Customer Group",how="inner")

###Ratio of HH sales post and pre period  for the benchmark customer group

In [0]:
benchmark_sales_ratio = benchmark_sales.withColumn("Benchmark Sales Ratio", f.col("Benchmark Sales per HH Post Period") / f.col("Benchmark Sales per HH Pre Period"))\
  .drop("sum_sales_pre_target")\
    .drop("sum_sales_post_target")

##Establishing the Target group dataframe

In [0]:
#determine househoulds that participate in ecomm during the target period
ecomm_in_target = baseline_preperiod.filter(
    (f.col("fiscal_week").substr(1, 6) == target[0:6]) & 
    (f.col("modality").isin(["DELIVERY", "PICKUP"]))
)
#determine the ehhns defined under the filter
distinct_ehhn_ecomm_in_target = ecomm_in_target.select("ehhn").distinct()

#join back to baseline data to determine the dataset for the households that contain at least 1 ecomm purchase during the target period
baseline_preperiod_ecomm = baseline_preperiod.join(distinct_ehhn_ecomm_in_target,on="ehhn",how="inner")


In [0]:
#determine househoulds that participate in ecomm during the post period
postperiod_ecomm = baseline_preperiod_ecomm.filter((f.col("fiscal_week").substr(1, 6) > target[0:6]) & (f.col("fiscal_week").substr(1, 6) <= post_period[0:6])\
                        & (f.col("modality").isin(["DELIVERY","PICKUP"])))

#determine the ehhns defined under the filter
ehhn_ecomm_post = postperiod_ecomm.select("ehhn").distinct()

#join back to baseline data to determine the dataset for the households that contain at least 1 ecomm purchase during the post period
full_target_df = baseline_preperiod_ecomm.join(ehhn_ecomm_post,on="ehhn",how="inner")

In [0]:
# Alias the DataFrames before joining
df1 = seg.get_segs_and_join(["funlo","socs"], pre_period, full_target_df).alias('df1')
df2 = kpi_df.alias('df2')
df3 = boost_hhs.alias('df3')

# Perform the join operations using the aliased DataFrames
full_target_df_1 = df1.join(df2, df1.ehhn == df2.ehhn, how="inner")\
                         .join(df3, df1.ehhn == df3.ehhn, how="inner")\
                         .groupBy(df1.ehhn, 'df1.fiscal_week', df1.modality, df1.item_fulfillment, df1.sales, df1.funlo_rollup_desc, df1.socs_seg, df3.boost_hh_flag)\
                         .count()\
                         .drop('count')\
                         .filter(f.col("df1.funlo_rollup_desc").isin(["Loyal", "Opportunity", "Non-Loyal"]))

###Final Target dataframe including funlo, socs, boost hhs

In [0]:
# Create a flag for if the fiscal week is less than target
final_target = full_target_df.withColumn("is_pre_target", f.when(f.col("fiscal_week").substr(1, 6) < target[0:6], 1).otherwise(0))\
                                        .withColumn("is_target", f.when(f.col("fiscal_week").substr(1, 6) == target[0:6], 1).otherwise(0))\
                                        .withColumn("is_post_target", f.when(f.col("fiscal_week").substr(1, 6) > target[0:6], 1).otherwise(0))\
                                        .withColumn("PICKUP", f.when(f.col("modality") == "PICKUP", 1).otherwise(0))\
                                        .withColumn("FC DELIVERY", f.when((f.col("modality") == "DELIVERY") & (f.col("item_fulfillment") == "OCADO"), 1).otherwise(0))\
                                        .withColumn("STORE DELIVERY", f.when((f.col("modality") == "DELIVERY") & (f.col("item_fulfillment") != "OCADO"), 1).otherwise(0))\
                                        .withColumn("INSTORE", f.when((f.col("modality") == "INSTORE") & (f.col("item_fulfillment") == "INSTORE"), 1).otherwise(0))



In [0]:
display(final_target)

ehhn,fiscal_week,modality,item_fulfillment,sales,is_pre_target,is_target,is_post_target,PICKUP,FC DELIVERY,STORE DELIVERY,INSTORE
102363630,20221102,DELIVERY,INSTORE,155.22,1,0,0,0,0,1,0
102363630,20230502,DELIVERY,INSTORE,114.87,0,0,1,0,0,1,0
102363630,20221303,DELIVERY,INSTORE,153.36,1,0,0,0,0,1,0
102363630,20220304,INSTORE,INSTORE,93.8,1,0,0,0,0,0,1
102363630,20220601,INSTORE,INSTORE,102.09,1,0,0,0,0,0,1
102363630,20220904,DELIVERY,INSTORE,58.32,1,0,0,0,0,1,0
102363630,20220803,DELIVERY,INSTORE,204.6,1,0,0,0,0,1,0
102363630,20221103,DELIVERY,INSTORE,88.84,1,0,0,0,0,1,0
102363630,20231304,DELIVERY,OCADO,77.04,0,0,1,0,1,0,0
102363630,20220903,DELIVERY,INSTORE,134.93,1,0,0,0,0,1,0


##Calculate the total sales in the pre and post period for the Target group

In [0]:
# Sum the sales over the pre period
target_sum_sales_pre_target = final_target.filter(f.col("is_pre_target") == 1) \
                                          .groupBy() \
                                          .agg(f.sum("sales").alias("sum_sales_pre_target"))

# Get the distinct ehhn count for pre period
target_distinct_ehhn_pre_target_count = final_target.filter(f.col("is_pre_target") == 1) \
                                                     .select("ehhn") \
                                                     .distinct() \
                                                     .count()

# Calculate the average sales per ehhn
target_sales_per_hh_pre = target_sum_sales_pre_target.withColumn("Target Sales per HH Pre Period", f.col("sum_sales_pre_target") / f.lit(target_distinct_ehhn_pre_target_count))\
  .withColumn("Customer Group",f.lit("Target"))

# Sum the sales over the post period
target_sum_sales_post_target = final_target.filter(f.col("is_post_target") == 1) \
                                          .groupBy() \
                                          .agg(f.sum("sales").alias("sum_sales_post_target"))

# Get the distinct ehhn count for post period
target_distinct_ehhn_post_target_count = final_target.filter(f.col("is_post_target") == 1) \
                                                     .select("ehhn") \
                                                     .distinct() \
                                                     .count()

# Calculate the average sales per ehhn
target_sales_per_hh_post = target_sum_sales_post_target.withColumn("Target Sales per HH Post Period", f.col("sum_sales_post_target") / f.lit(target_distinct_ehhn_post_target_count))\
  .withColumn("Customer Group",f.lit("Target"))


target_sales = target_sales_per_hh_pre.join(target_sales_per_hh_post,on="Customer Group",how="inner")



###Ratio of HH sales post and pre period  for the target customer group

In [0]:
# Calculate the ratio of Target Sales per HH Pre Period to Target Sales per HH Post Period
target_sales_ratio = target_sales.withColumn("Target Sales Ratio", f.col("Target Sales per HH Post Period") / f.col("Target Sales per HH Pre Period"))\
  .drop("sum_sales_pre_target")\
    .drop("sum_sales_post_target")

In [0]:
#create a marker to join the target and benchmark customer groups
target_sales_ratio_1 = target_sales_ratio.withColumn("join_mark",f.lit(1))
benchmark_sales_ratio_1 = benchmark_sales_ratio.withColumn("join_mark",f.lit(1))

###Benchmark pre/post ratio, Target pre/post ratio, and ratio of ratios

In [0]:

ratio_of_ratios = target_sales_ratio_1.join(benchmark_sales_ratio_1,on="join_mark",how="inner")\
                                        .withColumn("Ratio of Ratios", f.col("Target Sales Ratio")/f.col("Benchmark Sales Ratio"))\
                                        .select("Target Sales Ratio","Benchmark Sales Ratio","Ratio of Ratios")

In [0]:
display(ratio_of_ratios)

Target Sales Ratio,Benchmark Sales Ratio,Ratio of Ratios
0.8340110089338691,0.8458168648353268,0.9860420660874902


#Writing out final dataframes to Azure storage

In [0]:
prod_container = "sandbox"
prod_storage_account = "sa8451learningdev"
path = f"abfss://{prod_container}@{prod_storage_account}.dfs.core.windows.net/users/n615602/ecomm_modality_impact/p6/final_benchmark"

final_benchmark.write.mode("overwrite").parquet(path)

In [0]:
path = f"abfss://{prod_container}@{prod_storage_account}.dfs.core.windows.net/users/n615602/ecomm_modality_impact/p6/final_target"

final_target.write.mode("overwrite").parquet(path)

In [0]:
path = f"abfss://{prod_container}@{prod_storage_account}.dfs.core.windows.net/users/n615602/ecomm_modality_impact/p6/benchmark_sales"

benchmark_sales.write.mode("overwrite").parquet(path)

In [0]:
path = f"abfss://{prod_container}@{prod_storage_account}.dfs.core.windows.net/users/n615602/ecomm_modality_impact/p6/target_sales"

target_sales.write.mode("overwrite").parquet(path)

In [0]:
path = f"abfss://{prod_container}@{prod_storage_account}.dfs.core.windows.net/users/n615602/ecomm_modality_impact/p6/ratio_of_ratios"

ratio_of_ratios.write.mode("overwrite").parquet(path)