### Code for inferring missing promos from sellout

In [None]:
import pandas as pd
from pandas.api.indexers import FixedForwardWindowIndexer

In [None]:
# --- Grouping / time keys ---
UNIVERSE_COLS = ["product_family", "customer"]
START_DATE_WEEK = "start_date_week"

# --- Price columns ---
RAW_AVERAGE_PRICE = "raw_average_price"  # sellout value divided by sellout volume
RETAIL_STANDARD_PRICE = "retail_standard_price"  # expressed per hL

# --- Volume columns ---
VOLUME = "total_volume"
PROMO_VOLUME = "promo_volume"

# --- Promo descriptors ---
PROMO_ON = "promo_on"  # flags a week that contains at least one promo day
PROMO_EVENT_NAME = "promo_event_name"
PROMO_MECHANIC = "promo_mechanic"
PROMO_DEPTH_VALUE = "promo_depth_value"
PROMO_START_DATE = "promo_start_date"
PROMO_END_DATE = "promo_end_date"

In [None]:
def _combine_change_points(upward_change_points: pd.Series, downward_change_points: pd.Series, calibration_points: pd.Series) -> pd.Series:
    """Merge upward, downward and calibration change points and drop inconsistent ones.

    A change point is inconsistent when it is tagged as upward but its value is
    below the previous remaining change point, or tagged as downward but its
    value is above the previous remaining change point.

    Args:
        upward_change_points: Candidate upward change points (NaN where there is none).
        downward_change_points: Candidate downward change points (NaN where there is none).
        calibration_points: Candidate calibration points (NaN where there is none).

    Returns:
        A single series of change points, NaN wherever no valid change point remains.
    """
    is_upward = upward_change_points.notna()
    is_downward = downward_change_points.notna()

    # On overlap the priority is: calibration, then downward, then upward.
    merged = calibration_points.combine_first(downward_change_points).combine_first(upward_change_points)

    # Dropping a point changes the "previous point" of the next one, so the
    # check is repeated until a full pass finds nothing left to remove.
    while True:
        step = merged.dropna().diff().reindex_like(merged)
        wrong_up = is_upward & (step < 0)
        wrong_down = is_downward & (step > 0)

        if not (wrong_up.any() or wrong_down.any()):
            return merged

        merged = merged.where(~wrong_up).where(~wrong_down)

In [None]:
def _select_first_nonmissing(series_in: pd.Series) -> pd.Series:
    """Keep only the leading value of every run of consecutive non-missing values.

    Args:
        series_in: The series whose runs of non-missing values are reduced.

    Returns:
        A series equal to ``series_in`` where the preceding element is missing
        (or absent, for the first element) and NaN everywhere else.
    """
    has_predecessor = series_in.shift(1).notna()
    return series_in.mask(has_predecessor)

In [None]:
def _get_change_points(price: pd.Series, forward_looking: pd.Series, backward_looking: pd.Series) -> pd.Series:
    """Derive change points from the forward- and backward-looking price curves.

    Args:
        price: The non-promo price.
        forward_looking: The forward looking price.
        backward_looking: The backward looking price.

    Returns:
        A series holding the forward-looking price at each retained change point
        and NaN elsewhere.
    """
    midpoint = (backward_looking + forward_looking) / 2

    # Upward move: forward curve above backward curve and price above their midpoint.
    moving_up = (forward_looking > backward_looking) & (price > midpoint)
    # Downward move: forward curve below backward curve and price below their midpoint.
    moving_down = (forward_looking < backward_looking) & (price < midpoint)
    # Both curves agree: treated as a calibration point.
    curves_agree = forward_looking == backward_looking

    # For every condition only the first point of each consecutive run is kept.
    upward_change_points = _select_first_nonmissing(forward_looking.where(moving_up))
    downward_change_points = _select_first_nonmissing(forward_looking.where(moving_down))
    calibration_points = _select_first_nonmissing(forward_looking.where(curves_agree))

    return _combine_change_points(upward_change_points, downward_change_points, calibration_points)


In [None]:
def _connect_change_points(change_points: pd.Series, forward_looking: pd.Series) -> pd.Series:
    """Connect change points by seeding the first position and forward-filling.

    The first element is set to the first non-missing forward-looking price so
    that the forward fill has a starting value. The input series is left
    untouched; the work is done on a copy.

    Args:
        change_points: The change points (NaN where there is no change point).
        forward_looking: The forward looking price.

    Returns:
        A series containing the connected change points. When there is nothing
        to connect (empty input or all forward-looking prices are NaN),
        ``change_points`` is returned as is.
    """
    valid_forward = forward_looking.dropna()
    if change_points.empty or valid_forward.empty:
        # Nothing to seed the forward fill with.
        return change_points

    # Copy so the caller's series is not modified by the positional assignment.
    smoothed_price = change_points.copy()
    smoothed_price.iat[0] = valid_forward.iat[0]
    return smoothed_price.ffill()

In [None]:
def _infer_rsp_from_non_promo_price(price: pd.Series, window_length: int, accepted_outliers: int) -> pd.Series:
    """Infer the retail standard price from the non-promo price.

    A rolling quantile of the price is computed once over a forward-looking
    window and once over a backward-looking window; change points are derived
    by comparing the two curves and are then connected into a step-wise price.

    Args:
        price: The non-promo price.
        window_length: Size of the rolling windows, in observations.
        accepted_outliers: The number of outliers to accept when inferring the retail standard price.

    Returns:
        A series containing the retail standard price.
    """
    # Quantile level derived from the window size and the accepted outlier count.
    quantile_level = (window_length - accepted_outliers - 1) / (window_length - 1)
    # At least half a window of observations is required to produce a value.
    required_observations = window_length // 2

    window_ahead = price.rolling(FixedForwardWindowIndexer(window_size=window_length), min_periods=required_observations)
    window_behind = price.rolling(window_length, min_periods=required_observations, center=False)

    # Gaps are filled from the far side first so each curve keeps its own direction.
    forward_looking = window_ahead.quantile(quantile_level, interpolation="nearest").bfill().ffill()
    backward_looking = window_behind.quantile(quantile_level, interpolation="nearest").ffill().bfill()

    return _connect_change_points(
        _get_change_points(price, forward_looking, backward_looking),
        forward_looking,
    )


In [None]:
def _add_rsp_to_non_promo_df(df_features_non_promo: pd.DataFrame, config) -> pd.DataFrame:
    """Add the retail standard price column to non-promo sellout data.

    Args:
        df_features_non_promo: The non-promo sellout rows.
        config: Configuration exposing ``input_scope.universe_cols``,
            ``transformation.price_smoothing_window`` and
            ``transformation.accepted_outliers``.

    Returns:
        A copy of the input sorted by week, with the retail standard price added.
    """
    group_keys = config.input_scope.universe_cols

    def _rsp_for_group(group_price: pd.Series) -> pd.Series:
        # Config is read per group, exactly when a group is actually processed.
        return _infer_rsp_from_non_promo_price(
            group_price,
            config.transformation.price_smoothing_window,
            config.transformation.accepted_outliers,
        )

    # Rolling windows inside each group rely on chronological order.
    ordered = df_features_non_promo.sort_values(by=START_DATE_WEEK)

    raw_price_by_universe = ordered.groupby(group_keys, observed=True)[RAW_AVERAGE_PRICE]
    ordered[RETAIL_STANDARD_PRICE] = raw_price_by_universe.transform(_rsp_for_group)

    # Back-fill remaining gaps within each universe.
    rsp_by_universe = ordered.groupby(group_keys, observed=True)[RETAIL_STANDARD_PRICE]
    ordered[RETAIL_STANDARD_PRICE] = rsp_by_universe.bfill()

    return ordered

In [None]:
def derive_retail_standard_price(df_features: pd.DataFrame, config) -> pd.DataFrame:
    """Derive the retail standard price from non-promo weeks and attach it to every row.

    The retail standard price is inferred on the rows where ``PROMO_ON`` is 0.
    Each row of ``df_features`` then receives the value of the non-promo row of
    the same universe whose week is nearest in time.

    Args:
        df_features: The weekly features, including promo and non-promo weeks.
        config: Configuration exposing ``input_scope.universe_cols`` and the
            settings used for the retail standard price inference.

    Returns:
        ``df_features`` sorted by week, with the retail standard price column added.
    """
    universe_cols = config.input_scope.universe_cols

    is_non_promo_week = df_features[PROMO_ON] == 0
    non_promo_with_rsp = _add_rsp_to_non_promo_df(df_features[is_non_promo_week], config)

    # merge_asof needs both sides sorted on the `on` key.
    all_weeks = df_features.sort_values(START_DATE_WEEK)
    rsp_lookup = non_promo_with_rsp.sort_values(START_DATE_WEEK)[
        [*universe_cols, START_DATE_WEEK, RETAIL_STANDARD_PRICE]
    ]

    # Match the nearest retail standard price in the data.
    return pd.merge_asof(
        all_weeks,
        rsp_lookup,
        on=START_DATE_WEEK,
        by=universe_cols,
        direction="nearest",
    )

In [None]:
def _update_sellout_promo_event_dates(df_sellout_promo: pd.DataFrame, config) -> pd.DataFrame:
    """Group inferred promo weeks into events and recompute their start/end dates.

    Within each universe, a row starts a new event when it is the first row of
    the universe or when its week starts more than 7 days after the previous row.
    The event start is the earliest week start of the event; the event end is
    the latest week start of the event plus 6 days.

    Args:
        df_sellout_promo: The sellout rows identified as inferred promos. It is
            expected to contain the promo start and end date columns, which are
            replaced.
        config: Configuration exposing ``input_scope.universe_cols``.

    Returns:
        The rows sorted by universe and week, with an ``unknown_event_number``
        column and the recomputed promo start and end date columns.
    """
    universe_cols = config.input_scope.universe_cols
    event_keys = [*universe_cols, "unknown_event_number"]

    ordered = df_sellout_promo.sort_values(by=[*universe_cols, START_DATE_WEEK])

    # The first row of each universe has no previous week; the filler of 10 days
    # exceeds the 7-day limit, so it always opens a new event.
    days_since_previous = ordered.groupby(universe_cols)[START_DATE_WEEK].diff().dt.days.fillna(10)
    starts_new_event = days_since_previous > 7
    ordered["unknown_event_number"] = starts_new_event.cumsum()

    ordered = ordered.drop(columns=[PROMO_START_DATE, PROMO_END_DATE])

    event_dates = ordered.groupby(event_keys, as_index=False).agg(
        **{
            PROMO_START_DATE: (START_DATE_WEEK, "min"),
            PROMO_END_DATE: (START_DATE_WEEK, lambda x: x.max() + pd.DateOffset(days=6)),
        }  # type: ignore[reportArgumentType]
    )
    return ordered.merge(event_dates, on=event_keys, how="left")

In [None]:
def _update_sellout_promo_event_values(
    df_sellout_promo: pd.DataFrame,
    config,
) -> pd.DataFrame:
    """Tag inferred promo rows with placeholder promo attributes and event dates.

    The promo flag, event name, mechanic and depth columns are overwritten on
    the frame that is passed in, then the event dates are recomputed.

    Args:
        df_sellout_promo: The sellout rows identified as inferred promos (modified in place).
        config: Configuration exposing ``transformation.sellout_inferred_promo_event_depth``
            and ``input_scope.universe_cols``.

    Returns:
        The rows with placeholder promo attributes and recomputed promo dates.
    """
    placeholder_values = {
        PROMO_ON: 1,
        PROMO_EVENT_NAME: "UNKNOWN_EVENT",
        PROMO_MECHANIC: "Unknown Mechanic",
        PROMO_DEPTH_VALUE: config.transformation.sellout_inferred_promo_event_depth,
    }
    for column, value in placeholder_values.items():
        df_sellout_promo[column] = value

    return _update_sellout_promo_event_dates(df_sellout_promo, config)

In [None]:
def infer_missing_promos_from_sellout(df: pd.DataFrame, config) -> pd.DataFrame:
    """Infer missing promos from sellout.

    Some promotions might be missing in the promo calendar.
    A week that is not flagged as promo (``PROMO_ON == 0``) is marked as an
    inferred promo if at least one of these two conditions is met:
    - Price diff from RSP (relative to RSP) > price_inferred_discount_threshold
    - Nielsen promo volume share > nielsen_promo_volume_share_threshold
    price_inferred_discount_threshold & nielsen_promo_volume_share_threshold are
    defined in the configuration as they are OpCo-specific.

    Args:
        df: The weekly sellout features.
        config: Configuration read through attribute access, exposing
            ``input_scope.universe_cols``, ``transformation.missing_promos_inferral``
            and the settings used by the helper functions.

    Returns:
        The features with inferred promo weeks updated, sorted by universe and
        week, with a fresh index and without the intermediate helper columns.
    """
    # Attribute access is used throughout, matching every other function that reads the config.
    universe_cols = config.input_scope.universe_cols
    thresholds = config.transformation.missing_promos_inferral

    df = derive_retail_standard_price(df, config)
    df["price_inferred_discount"] = (df[RETAIL_STANDARD_PRICE] - df[RAW_AVERAGE_PRICE]) / df[RETAIL_STANDARD_PRICE]
    df["nielsen_promo_volume_share"] = df[PROMO_VOLUME] / df[VOLUME]

    _sellout_inferred_promo_weeks = (
        (df["price_inferred_discount"] > thresholds.price_inferred_discount_threshold)
        | (df["nielsen_promo_volume_share"] > thresholds.nielsen_promo_volume_share_threshold)
    ) & (df[PROMO_ON] == 0)

    df_sellout_promo, df_other = df[_sellout_inferred_promo_weeks], df[~_sellout_inferred_promo_weeks]
    # Copy first: the helper overwrites columns on the frame it receives.
    df_sellout_promo = _update_sellout_promo_event_values(df_sellout_promo.copy(), config)
    df = (
        pd.concat([df_sellout_promo, df_other], axis=0)
        .sort_values(by=[*universe_cols, START_DATE_WEEK])
        .reset_index(drop=True)
    )
    return df.drop(columns=[RETAIL_STANDARD_PRICE, "price_inferred_discount", "nielsen_promo_volume_share", "unknown_event_number"])

In [None]:
class _AttrDict(dict):
    """A dict whose keys can also be read as attributes.

    The pipeline functions read the configuration with attribute access
    (``config.transformation.accepted_outliers``) as well as key access
    (``config['input_scope']``); a plain dict supports only the latter.
    """

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, so dict methods win over keys.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None


def _as_config(mapping):
    """Recursively wrap a nested dict so every level supports attribute access."""
    return _AttrDict({key: _as_config(value) if isinstance(value, dict) else value for key, value in mapping.items()})


config_HFRA = _as_config({
    'transformation': {
        'price_smoothing_window': 20,
        'accepted_outliers': 2,
        'sellout_inferred_promo_event_depth': 0.3,
        'missing_promos_inferral': {
            'price_inferred_discount_threshold': 0.18727472247942986,
            'nielsen_promo_volume_share_threshold': 0.6883282528207958,
        },
    },
    'input_scope': {
        'universe_cols': ["product_family", "customer"],
    },
})

config_HUK = _as_config({
    'transformation': {
        'price_smoothing_window': 10,
        'accepted_outliers': 1,
        'sellout_inferred_promo_event_depth': None,  # TODO: Verify, originally we have "null"
        'missing_promos_inferral': {
            'price_inferred_discount_threshold': 0.1,
            'nielsen_promo_volume_share_threshold': 0.85,
        },
    },
    'input_scope': {
        'universe_cols': ["product_family", "customer"],
    },
})

# OpCo code -> configuration
config_map = {
    "HFRA": config_HFRA,
    "HUK": config_HUK,
}

In [None]:
# TODO: Please implement below logic and optionally adjust code if needed.
# Driver stub: runs the missing-promo inference once per OpCo configuration.

# Placeholder only: this is a string, not a DataFrame. It has to be replaced by
# the extracted sellout data before the loop below can run.
df_extracted = "SQL CODE HERE"

# NOTE(review): `df_preprocessed` is rebound on every iteration, so only the
# result of the last OpCo is kept, and `opco` is not used — confirm whether the
# result should be stored or written out per OpCo.
for opco, configuration in config_map.items():
    df_preprocessed = infer_missing_promos_from_sellout(df_extracted, configuration)