In [0]:
# Load IPython's autoreload extension; mode 2 re-imports all modules before each
# cell runs, so edits to the imported project modules are picked up without a restart.
%load_ext autoreload
%autoreload 2
# Enables autoreload; learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0
# Optional install, left disabled (NOTE(review): presumably only needed when tqdm is
# missing from the cluster — confirm before re-enabling, and pin a version if so):
#%pip install tqdm

## Variable Timing Matching

In [0]:
from pyspark.sql import functions as F


# String constants, presumably column names used when working with the raw tables.
# NOTE(review): none of the three is referenced in the visible cells of this
# notebook — confirm they are still needed before keeping them here.
CAL_DATE = "CAL_DATE"
REGION_COL = "RETAILER_SALES_REGION_CODE"
FORMAT_COL = "FORMAT"

import pandas as pd
from pyspark.sql import window
from datetime import datetime, timedelta, date
from naz_measure.configs.shuffleboard_2025_config import (
    start_date, 
    end_date, 
    measure_start, 
    measure_end,
    brand_cluster_code, 
    desired_premise, 
    desired_retailer_channel,
    vpid_offset_weeks, 
    match_configs,
)
from naz_measure.utils.data import get_raw_data_tables
from naz_measure.utils.matching_algo import run_matching_variable_timing
from pyspark.sql import SparkSession
from time import perf_counter

# One row per test VPID; the dict value from vpid_offset_weeks is stored under
# "identifier", which is the column name handed to the matcher as offset_col below.
vpid_timing_df = pd.DataFrame({
    "VPID": list(vpid_offset_weeks.keys()),
    "identifier": list(vpid_offset_weeks.values()),
})

# Reuse the active Spark session when there is one; otherwise create it.
spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
hv, hp, hr, hc = get_raw_data_tables(spark)

# Passed to the matcher as data_end_cap.
# NOTE(review): presumably the last date with usable data — confirm and update per run.
DATA_END_CAP = date(2025, 11, 30)

# Start the clock immediately before the call so the reported runtime covers only
# run_matching_variable_timing, not the DataFrame build or the Spark table lookup.
t0 = perf_counter()
config_summary, matched_dfs, group_dfs = run_matching_variable_timing(
    vpid_timing_df=vpid_timing_df,
    match_configs=match_configs,
    hv=hv, hp=hp, hr=hr, hc=hc,
    base_start=start_date, base_end=end_date,
    base_measure_start=measure_start, base_measure_end=measure_end,
    brand_cluster_code=brand_cluster_code,
    desired_premise=desired_premise,
    desired_retailer_channel=desired_retailer_channel,
    # candidate_control_vpids=candidate_control_vpids,
    offset_col="identifier",
    max_controls_per_test=1,
    data_end_cap=DATA_END_CAP,
)
elapsed = perf_counter() - t0
print(f"run_matching_variable_timing runtime: {elapsed:.2f} seconds")
display(config_summary)

Matching configs:   0%|          | 0/3 [00:00<?, ?config/s]

run_matching_variable_timing runtime: 1062.65 seconds


config,test_pre_period_vol_cy,test_pre_period_vol_ly,test_pre_period_trend,control_pre_period_vol_cy,control_pre_period_vol_ly,control_pre_period_trend,test_post_period_vol_cy,test_post_period_vol_ly,test_post_period_trend,control_post_period_vol_cy,control_post_period_vol_ly,control_post_period_trend
minmax_CYLYTrend_blocking,2839.1800000000003,3059.59,-0.0720390640576024,1888.43,2034.61,-0.0718466929780153,1942.18,1956.88,-0.007511957810392,1121.67,1243.82,-0.0982055281310799
minmax_CYTrend_blocking,2839.1800000000003,3059.59,-0.0720390640576024,1914.02,1923.21,-0.0047784693299224,1942.18,1956.88,-0.007511957810392,1131.16,1222.02,-0.0743523019263188
minmax_all_blocking,2839.1800000000003,3059.59,-0.0720390640576024,1879.18,2025.9700000000005,-0.0724541824410037,1942.18,1956.88,-0.007511957810392,1117.83,1244.8,-0.1020003213367609


In [0]:
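# Reference query for the vip_class lookup table (the saved output below shows its
# class / sub-channel codes alongside channel and macro_channel); kept commented out.
# %sql
# select * from vip_dev.retailer.vip_class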

recordtype,vpcotcd,vpcot,vpsubchnlcd,vpsubchnl,bdnprem,bdnpremd,bdnlictype,bdnlictyped,__created_tsp,channel,macro_channel
DETAIL,3,LIQUOR/PACKAGE STORE,3,LIQUOR/PACKAGE STORE,F,Off Premise,F,Off Premise,2025-10-10T02:07:22.597Z,PKG LIQ,LARGE FORMAT
DETAIL,38,FACTORY/OFFICE,38,FACTORY/ OFFICE,O,On Premise,O,On Premise,2025-10-10T02:07:22.597Z,ON PREMISE,ON PREMISE
DETAIL,40,HEALTH/HOSPITAL,40,HEALTH/ HOSPITAL,O,On Premise,O,On Premise,2025-10-10T02:07:22.597Z,ON PREMISE,ON PREMISE
DETAIL,23,BAR,23,BAR/TAVERN,O,On Premise,O,On Premise,2025-10-10T02:07:22.597Z,ON PREMISE,ON PREMISE
DETAIL,41,GOVERNMENT/NON MILITARY,41,GOVERNMENT/ NON-MILITARY,O,On Premise,O,On Premise,2025-10-10T02:07:22.597Z,ON PREMISE,ON PREMISE
DETAIL,24,RECREATION/ ENTERTAINMENT,33,SPECIAL EVENT/TEMPORARY L,O,On Premise,O,On Premise,2025-10-10T02:07:22.597Z,ON PREMISE,ON PREMISE
DETAIL,24,RECREATION/ ENTERTAINMENT,24,RECREATION/BOWLING CENTER,O,On Premise,O,On Premise,2025-10-10T02:07:22.597Z,ON PREMISE,ON PREMISE
DETAIL,8,MASS MERCH/SUPERCENTER,8,MASS MERCH/SUPERCENTER,F,Off Premise,F,Off Premise,2025-10-10T02:07:22.597Z,GROCERY MM,LARGE FORMAT
DETAIL,6,NON RETAIL ACCOUNT,50,DIRECT DISTRIBUTORS,F,Off Premise,X,Other non retail,2025-10-10T02:07:22.597Z,SF ALL OTHER,SMALL FORMAT
DETAIL,24,RECREATION/ ENTERTAINMENT,27,GOLF/ COUNTRY CLUB,O,On Premise,O,On Premise,2025-10-10T02:07:22.597Z,ON PREMISE,ON PREMISE


## View Individual Weeks Within a Config and Final DF

In [0]:
# Which matching config and which offset to inspect.
cfg_name = "minmax_all_blocking"
offset = 1

# Matched pairs for the chosen (config, offset). .get() is used so a missing
# combination prints a message instead of raising KeyError.
matched_df = matched_dfs.get((cfg_name, offset))
if matched_df is not None:
    display(matched_df)
else:
    print("No matched_df for this config/offset")

# Collect every offset that actually has a group frame for cfg_name, instead of
# assuming offsets 0..13 exist and repeating the config name as a second literal.
cfg_offsets = sorted(
    key[1]
    for key, gdf in group_dfs.items()
    if isinstance(key, tuple) and len(key) == 2 and key[0] == cfg_name and gdf is not None
)

# .assign returns a new frame, so the frames stored in group_dfs are not mutated.
group_frames = [
    group_dfs[(cfg_name, k)].assign(identifier=k)
    for k in cfg_offsets
]

# Despite its name, df_all_pairs holds the stacked group-membership rows
# (one frame per offset, tagged with that offset in "identifier").
if group_frames:
    df_all_pairs = pd.concat(group_frames, ignore_index=True)
else:
    # pd.concat raises on an empty list; fall back to an empty frame.
    print(f"No group_dfs entries for config {cfg_name!r}")
    df_all_pairs = pd.DataFrame()

display(df_all_pairs)

Test_VPID,Control_VPID,Distance,match_order
2354484,42505,1.528672695159912,1
41902,42928,0.034316960722208,1
3234638,43348,2.7384767532348637,1
3530574,45084,0.3181670308113098,1
3490683,46058,0.4353997111320495,1
3205460,46333,0.0091928197070956,1
3980328,50971,0.195185512304306,1
3617545,51859,0.7648432850837708,1
56133,52000,0.2090222537517547,1
55894,55754,0.7201030850410461,1


Group,VPID,offset_weeks,identifier
Test,43770,0,0
Test,44728,0,0
Test,46158,0,0
Test,46340,0,0
Test,48136,0,0
Test,50977,0,0
Test,58074,0,0
Test,63770,0,0
Test,77249,0,0
Test,81233,0,0


## Variable Timing Validation

In [0]:


from collections import Counter
from importlib import reload
from inspect import signature
import pandas as pd
import naz_measure.utils.validate as validate
from naz_measure.utils.data import get_raw_data_tables
from pyspark.sql import SparkSession
from naz_measure.configs.displayNTRL_2025_config import (
    start_date, 
    end_date, 
    measure_start, 
    measure_end,
    vpid_offset_weeks, 
    match_configs
)


# Reuse the active Spark session when there is one; otherwise create it.
spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
hv, hp, hr, hc = get_raw_data_tables(spark)

# NOTE(review): this cell imports start/end dates, match_configs and
# vpid_offset_weeks from displayNTRL_2025_config, while vpid_timing_df was built
# in the matching cell from shuffleboard_2025_config. Confirm which campaign
# config validation is meant to use and align the import accordingly.
#
# Count tests per offset from the frame that is actually passed to the validator,
# so target_id_counts cannot drift from vpid_timing_df when the two configs differ.
# When both come from the same config this yields the same counts as
# Counter(vpid_offset_weeks.values()).
target_id_counts = dict(Counter(vpid_timing_df["identifier"].tolist()))

iter_df = validate.run_validation_iterations_variable_timing(
    vpid_timing_df=vpid_timing_df,
    match_configs=match_configs,
    hv=hv, hp=hp, hr=hr, hc=hc,
    base_start=start_date, base_end=end_date,
    base_measure_start=measure_start, base_measure_end=measure_end,
    sample_size_total=175,
    n_iterations=100,
    rng_seed=42,  # fixed seed so repeated runs are comparable
    show_progress=True,
    offset_col="identifier",
    target_id_counts=target_id_counts,
)

print(f"iterations x configs rows: {len(iter_df)}")
display(iter_df)

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:717)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
from naz_measure.utils.data import build_variable_timing_caches

# Rebuild the pre/post period caches for every offset present in the timing table.
# NOTE(review): presumably these mirror the caches the matching run builds
# internally — confirm against run_matching_variable_timing.
offset_by_row = vpid_timing_df["identifier"].astype(int)
offsets = sorted(offset_by_row.unique().tolist())
vol_pre_period_by_offset, vol_post_period_by_offset = build_variable_timing_caches(
    hv, hp, hr, hc,
    offsets=offsets,
    base_start=start_date, base_end=end_date,
    base_measure_start=measure_start, base_measure_end=measure_end,
    vpids_for_post_period=None,
)

# Test VPIDs requested per offset. The same int-cast view that produced `offsets`
# is used for filtering, so a non-int "identifier" dtype cannot silently match nothing.
tests_requested_by_offset = {
    k: set(vpid_timing_df.loc[offset_by_row == k, "VPID"])
    for k in offsets
}

# VPIDs that appear in the pre-period cache for each offset.
tests_with_pre_by_offset = {
    k: set(vol_pre_period_by_offset[k]["VPID"])
    for k in offsets
}

lost_before_matching = {
    k: tests_requested_by_offset[k] - tests_with_pre_by_offset[k]
    for k in offsets
}

print("=== Missing from pre-period data (before matching) ===")
total_lost_before = 0
for k in offsets:
    n_lost = len(lost_before_matching[k])
    total_lost_before += n_lost
    if n_lost:
        print(f"Offset {k}: {n_lost} tests missing from pre-period data")

print(f"TOTAL missing from pre-period data across all offsets: {total_lost_before}\n")

# Resolve the config name from the keys group_dfs actually holds: prefer
# "minmax_all_blocking", otherwise fall back to the first config present.
# (The previous lookup went through match_configs[0].values(), which yields a
# config *value* rather than a config name.)
preferred_cfg = "minmax_all_blocking"
available_cfgs = list(dict.fromkeys(
    key[0] for key in group_dfs if isinstance(key, tuple) and len(key) == 2
))
if preferred_cfg in available_cfgs:
    cfg_name = preferred_cfg
elif available_cfgs:
    cfg_name = available_cfgs[0]
else:
    raise ValueError("group_dfs has no (config, offset) entries; run the matching cell first")
print(f"Using matching config: {cfg_name}")

# Test VPIDs that survived into the final group membership, per offset.
# An offset with no (or an empty) group frame contributes an empty set, so all of
# its tests are reported as lost instead of the cell failing with KeyError.
tests_in_result_by_offset = {}
for k in offsets:
    gdf = group_dfs.get((cfg_name, k))
    if gdf is None or gdf.empty:
        tests_in_result_by_offset[k] = set()
    else:
        tests_in_result_by_offset[k] = set(
            gdf.query("Group == 'Test'")["VPID"].tolist()
        )

lost_in_matching = {
    k: (tests_with_pre_by_offset[k] & tests_requested_by_offset[k]) - tests_in_result_by_offset[k]
    for k in offsets
}

print("=== Lost during matching (have pre-period data but not in final membership) ===")
total_lost_matching = 0
for k in offsets:
    n_lost = len(lost_in_matching[k])
    total_lost_matching += n_lost
    if n_lost:
        print(f"Offset {k}: {n_lost} tests lost in matching")

print(f"TOTAL lost in matching across all offsets: {total_lost_matching}")


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:717)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:460)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:591)
	at com.data

## View Iteration Config

In [0]:

import pandas as pd

# Config whose group membership is assembled below.
# NOTE(review): this name is not among the three configs listed in the saved
# config summary output — confirm it exists in the current run.
cfg_name = "minmax_CYTrendShare_blocking"

membership_cols = ["Group", "VPID", "offset_weeks"]

# Gather the membership columns from every non-empty group frame of cfg_name.
groups = []
for group_key, group_frame in group_dfs.items():
    # Keys are normally (config, offset) tuples; a bare key carries no offset.
    cfg, weeks = group_key if isinstance(group_key, tuple) else (group_key, None)

    if cfg != cfg_name:
        continue
    if group_frame is None:
        continue
    if isinstance(group_frame, pd.DataFrame) and group_frame.empty:
        continue

    frame = group_frame.copy()
    # Backfill the offset from the key only when the frame does not carry it already.
    if "offset_weeks" not in frame.columns:
        frame["offset_weeks"] = weeks
    groups.append(frame[membership_cols])

# pd.concat rejects an empty list, so start from an empty frame with the same columns.
if groups:
    membership = pd.concat(groups, ignore_index=True)
else:
    membership = pd.DataFrame(columns=membership_cols)

membership = membership.drop_duplicates(membership_cols)
membership = membership.sort_values(["offset_weeks", "Group", "VPID"])
print(f"Membership rows for {cfg_name}: {len(membership)}")
display(membership)

# Row counts per (offset, group).
if "offset_weeks" in membership.columns:
    display(membership.groupby(["offset_weeks", "Group"], as_index=False).size())


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:717)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:460)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:591)
	at com.data

## Run Tests

In [0]:
import pandas as pd
from naz_measure.utils.validate import run_delta_abs_tests_less


# Pass the validation iteration results to run_delta_abs_tests_less and render
# whatever it returns. iter_df is created by the validation cell
# (run_validation_iterations_variable_timing), so that cell must have completed;
# in the saved outputs it was cancelled and this cell was skipped.
tests = run_delta_abs_tests_less(iter_df)
display(tests)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:138)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:717)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.cancelExecution(ExecutionContextManagerV1.scala:460)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:591)
	at com.data