In [7]:
import pandas as pd
import numpy as np
import pyarrow.dataset as ds
import s3fs

## Dataset Import

In [25]:
# Import FIP Dataset

s3_path_fip = (
    "s3://m3-intel-hub-dp-us-east-1-517292-prod/"
    "publish/data-product/financial_inventory_projection_report_network_update/"
)

dataset = ds.dataset(
    s3_path_fip,
    format="parquet",
    partitioning="hive" 
)

table = dataset.to_table(
    filter=(
        ds.field("date").isin(["202612", "202712", "202812"])
    ) &
    ~(
        (ds.field("snapshot_date") == "2026-01-23") &
        (ds.field("snapshot_type") == "friday")
    )
)


df_fip = table.to_pandas()
df_fip.head()

Unnamed: 0,material,plant,date,quantity,total_cost,concost_source,unit_of_measure,cost_per_unit,source,corporate_brand,material_type,development_lifecycle_status,enterprise_category,enterprise_sub_category,dosage_form_parent,corp_brand_id,network_or_business_unit,snapshot_type,snapshot_date
0,1362552,1734,202712,200.0,,missing,,,rr,,HIBE,CLINICAL,OPERATING SUPPLY,OPERATING SUPPLY,,00000nan,,friday,2025-07-18
1,1371110,1734,202612,2460.0,,missing,,,rr,,HALB,CLINICAL,OTHER PROCESS MATERIALS,OTHER PROCESS MATERIALS,,00000nan,,friday,2025-07-18
2,1371204,1734,202712,1411.666667,,missing,,,rr,,HALB,CLINICAL,OTHER PROCESS MATERIALS,OTHER PROCESS MATERIALS,,00000nan,,friday,2025-07-18
3,1379260,1734,202612,77.9,,missing,,,rr,,HALB,CLINICAL,OTHER PROCESS MATERIALS,OTHER PROCESS MATERIALS,,00000nan,,friday,2025-07-18
4,1382958,1734,202812,0.2,,missing,,,rr,,HALB,CLINICAL,OTHER PROCESS MATERIALS,OTHER PROCESS MATERIALS,,00000nan,,friday,2025-07-18


In [15]:
df_fip.shape

(2297304, 19)

In [16]:
# Import Plant Type Data

s3_path_plants = (
    "s3://m3-intel-hub-dp-us-east-1-517292-prod/"
    "refined/data-asset/fin_inv_proj/"
    "bms_internal_vs_external_plants/"
    "bms_internal_vs_external_plants.parquet"
)

df_plants = pd.read_parquet(s3_path_plants)
#df_plants.head()

In [17]:
# Import node type Dataset

fs = s3fs.S3FileSystem()  # uses SageMaker execution role

parquet_files_nt = fs.glob(
    "s3://m3-intel-hub-dp-us-east-1-517292-prod/"
    "dbt_intelligence_hub/intelligence_hub_db_sandbox_staging/"
    "src__sap_t001w/data/*.parquet"
)

df_ntype = pd.read_parquet(
    parquet_files_nt,
    engine="pyarrow",
    dtype_backend="pyarrow",
    filesystem=fs
)

#df_ntype.head()

In [18]:
# Import Material Master Data

fs = s3fs.S3FileSystem()  # uses SageMaker execution role

parquet_files = fs.glob(
    "s3://m3-intel-hub-dp-us-east-1-517292-prod/"
    "refined/data-asset/fin_inv_proj/"
    "sap_material_master/data/*.parquet"
)

df_mm = pd.read_parquet(
    parquet_files,
    engine="pyarrow",
    dtype_backend="pyarrow",
    filesystem=fs
)

#df_mm.head()

In [19]:
# import boto3

# s3 = boto3.client("s3")

# bucket = "m3-intel-hub-dp-us-east-1-517292-prod"
# prefix = "dbt_intelligence_hub/intelligence_hub_db_sandbox_staging/src__sap_t001w"

# response = s3.list_objects_v2(
#     Bucket=bucket,
#     Prefix=prefix
# )

# if "Contents" in response:
#     for obj in response["Contents"]:
#         print(obj["Key"], obj["Size"])
# else:
#     print("No objects found or no access.")

## Data Prep

In [26]:
# Create has_non_zero flag at material–plant level

df_fip["has_non_zero"] = (
    df_fip
    .groupby(["material", "plant"])["total_cost"]
    .transform(lambda x: (x != 0).any())
    .astype(int)
)

# Apply the filter
df_fip = df_fip.loc[df_fip["has_non_zero"] == 1].drop(columns="has_non_zero")

In [27]:
df_fip.shape

(1727882, 19)

In [28]:
base1 = df_fip.copy()

In [29]:
# Join material master
mm_cols = [
    "material_number",
    "plant",
    "profit_center",
    #"corporate_brand",
    "brand_name",
    #"corp_brand_id",
    "material_description",
    #"material_type",
    "material_group",
    "old_material_number",
    "base_unit_of_measure",
    "unit_of_weight",
    #"development_lifecycle_status",
    "plant_specific_material_status",
    "mrp_type",
    "procurement_type",
    "safety_stock",
    "minimum_lot_size",
    "maximum_lot_size",
    "fixed_lot_size",
    "total_replenishment_lead_time",
    "total_shelf_life",
    "batch_management",
    "abc_indicator",
    #"valuation_class",
    #"price_unit",
    #"price_control_indicator",
]

material_master_sel = (
    df_mm[mm_cols]
    .drop_duplicates(subset=["material_number", "plant"])
)


base1 = base1.merge(
    material_master_sel,
    left_on=["material", "plant"],
    right_on=["material_number", "plant"],
    how="left",
    validate="m:1" 
)
base1 = base1.drop(columns=["material_number"])

base1 = base1.merge(
    df_plants[["Plant", "Plant Type"]],
    left_on=["plant"],
    right_on=["Plant"],
    how="left"
).drop(columns=["Plant"])

node_type_lkp = (
    df_ntype[["werks", "nodetype"]]
    .drop_duplicates(subset=["werks"])
)

base1 = base1.merge(
    node_type_lkp,
    left_on="plant",
    right_on="werks",
    how="left",
    validate="m:1"
).drop(columns=["werks"])

base1.shape

(1727882, 39)

In [30]:
# rearrange columns for better readability
new_cols = [
    'corporate_brand',
    'material_type', 'development_lifecycle_status', 'enterprise_category',
    'enterprise_sub_category', 'dosage_form_parent', 'corp_brand_id',
    'network_or_business_unit',
    'profit_center', 'brand_name', 'material_description', 'material_group',
    'old_material_number', 'base_unit_of_measure', 'unit_of_weight',
    'plant_specific_material_status', 'mrp_type', 'procurement_type',
    'safety_stock', 'minimum_lot_size', 'maximum_lot_size',
    'fixed_lot_size', 'total_replenishment_lead_time', 'total_shelf_life',
    'batch_management', 'abc_indicator',
    'source', 'concost_source', 'Plant Type', 'nodetype',
    'unit_of_measure',
    'material', 'plant', 'date', 'quantity', 'total_cost',
    'cost_per_unit', 'snapshot_type', 'snapshot_date'
]

base1 = base1[new_cols]


In [31]:
# Add material entry flag 

base = base1.copy()
first_seen = (
    base.groupby(["material", "plant","date"])["snapshot_date"]
      .transform("min")
)

base["sku_status"] = np.where(
    base["snapshot_date"] == first_seen,
    "NEW",
    "EXISTING"
)


In [32]:
# fip copy df for data prep
df = base.copy()
df["snapshot_date"] = pd.to_datetime(df["snapshot_date"])


# snapshot lookup table
snapshot_calendar = (
    df[["snapshot_type", "snapshot_date"]]
    .drop_duplicates()
    .sort_values(["snapshot_type", "snapshot_date"])
)


# attach prev snapshot to snapshot calendar
snapshot_calendar["prev_snapshot_date"] = (
    snapshot_calendar
    .groupby("snapshot_type")["snapshot_date"]
    .shift(1)
)
snapshot_calendar      # comparing bd13 - bd13 snapshots and friday-friday snapshots. no bd13-fri snapshots

Unnamed: 0,snapshot_type,snapshot_date,prev_snapshot_date
307116,bd13,2025-08-26,NaT
513147,bd13,2025-09-18,2025-08-26
741286,bd13,2025-10-17,2025-09-18
999877,bd13,2025-11-19,2025-10-17
1260991,bd13,2025-12-17,2025-11-19
1605209,bd13,2026-01-23,2025-12-17
0,friday,2025-07-18,NaT
50994,friday,2025-07-25,2025-07-18
101996,friday,2025-08-01,2025-07-25
153013,friday,2025-08-08,2025-08-01


In [33]:
# Attach previous snapshot date to each row

df = df.merge(
    snapshot_calendar[["snapshot_type", "snapshot_date", "prev_snapshot_date"]],
    on=["snapshot_type", "snapshot_date"],
    how="left"
)

In [34]:
# Prepare current and previous frames

# Current snapshot frame
current_df = df.copy()

current_df = current_df.rename(columns={
    "quantity": "quantity_curr",
    "cost_per_unit": "cost_per_unit_curr",
    "total_cost": "total_cost_curr",
})

# Previous snapshot frame
previous_df = df.rename(columns={
    "snapshot_date": "snapshot_date_prev",
    "quantity": "quantity_prev",
    "cost_per_unit": "cost_per_unit_prev",
    "total_cost": "total_cost_prev",
})[
    [
        "material",
        "plant",
        "date",
        "snapshot_type",
        "snapshot_date_prev",
        "quantity_prev",
        "cost_per_unit_prev",
        "total_cost_prev",
    ]
]


# Join current to previous snapshot
rca_base = current_df.merge(
    previous_df,
    left_on=[
        "material",
        "plant",
        "date",
        "snapshot_type",
        "prev_snapshot_date",
    ],
    right_on=[
        "material",
        "plant",
        "date",
        "snapshot_type",
        "snapshot_date_prev",
    ],
    how="left"
)

In [35]:
rca_base.shape

(1729942, 45)

In [36]:
# Flags for material–plant presence 

# 1. Identify NEW in current snapshot
# Present now, but not present in previous snapshot
rca_base["is_new_in_current_snapshot"] = (
    rca_base["prev_snapshot_date"].notna() &
    rca_base["quantity_prev"].isna()
)


# 2. Identify DROPPED in current snapshot
# Present in previous snapshot but missing in current snapshot

# Identify valid previous snapshots (calendar-safe)
valid_prev_snapshots = (
    snapshot_calendar["prev_snapshot_date"]
        .dropna()
        .unique()
)

# Restrict previous snapshot data to valid transitions
previous_df_valid = previous_df[
    previous_df["snapshot_date_prev"].isin(valid_prev_snapshots)
]

# Anti-join: rows present in previous but missing in current
prev_only = previous_df_valid.merge(
    current_df[
        [
            "material",
            "plant",
            "date",
            "snapshot_type",
            "prev_snapshot_date",
        ]
    ],
    left_on=[
        "material",
        "plant",
        "date",
        "snapshot_type",
        "snapshot_date_prev",
    ],
    right_on=[
        "material",
        "plant",
        "date",
        "snapshot_type",
        "prev_snapshot_date",
    ],
    how="left",
    indicator=True
).query("_merge == 'left_only'")

# Mark dropped rows
prev_only["is_dropped_in_current_snapshot"] = True

# Ensure column alignment for concat
for col in rca_base.columns:
    if col not in prev_only.columns:
        prev_only[col] = np.nan

# Current snapshot rows are NOT dropped
rca_base["is_dropped_in_current_snapshot"] = False


# 3. Combine current + dropped rows

final_rca_frame = pd.concat(
    [rca_base, prev_only[rca_base.columns]],
    ignore_index=True
)

# 4. Normalize boolean flags

for col in [
    "is_new_in_current_snapshot",
    "is_dropped_in_current_snapshot",
]:
    final_rca_frame[col] = (
        final_rca_frame[col]
            .replace({1: True, 0: False})
            .fillna(False)
            .astype("boolean")
    )

  final_rca_frame = pd.concat(
  .fillna(False)


In [37]:
final_rca_frame.shape

(1774724, 47)

In [40]:
snapshot_mapping_check = (
    final_rca_frame
    .loc[:, ["snapshot_type", "snapshot_date", "prev_snapshot_date"]]
    .drop_duplicates()
    .sort_values(["snapshot_type", "snapshot_date"])
)

snapshot_mapping_check

Unnamed: 0,snapshot_type,snapshot_date,prev_snapshot_date
307846,bd13,2025-08-26,NaT
514303,bd13,2025-09-18,2025-08-26
742786,bd13,2025-10-17,2025-09-18
1001867,bd13,2025-11-19,2025-10-17
1263051,bd13,2025-12-17,2025-11-19
1607269,bd13,2026-01-23,2025-12-17
1730876,bd13,NaT,NaT
0,friday,2025-07-18,NaT
50994,friday,2025-07-25,2025-07-18
102142,friday,2025-08-01,2025-07-25


Error handling

1. Division by zero & invalid math -
    Previous quantity = 0
    Previous cost = 0
    Previous FIP = 0
    Volatility = 0
2. Min rolling window
   less than min periods of 3
   Newly introduced SKU
   Flat history causing volatility = 0
3. double counting due to duplicates
4. missing prev snapshot data - nulls
   SKU appears for first time
   SKU disappears and reappears
   Previous quantity / cost not available
5. Extreme values in the history
    Very large quantities or costs in the past pushing the present numbers. The outliers in the past affects the current numbers. Exlcude the historical outliers
   

# Data Transformations

### Driver Calculations

In [41]:
# Step 1: Compute raw change metrics (always runs)

# Base deltas

final_rca_frame["delta_quantity"] = (
    final_rca_frame["quantity_curr"] - final_rca_frame["quantity_prev"]
)

final_rca_frame["delta_cost_per_unit"] = (
    final_rca_frame["cost_per_unit_curr"] - final_rca_frame["cost_per_unit_prev"]
)


# Impact decomposition

# Quantity Impact 
final_rca_frame["quantity_impact"] = (
    final_rca_frame["delta_quantity"] *
    final_rca_frame["cost_per_unit_prev"]
)

# Cost Impact
final_rca_frame["cost_impact"] = (
    final_rca_frame["delta_cost_per_unit"] * final_rca_frame["quantity_prev"]
)

# Intercation
final_rca_frame["interaction_impact"] = (
    final_rca_frame["delta_quantity"] *
    final_rca_frame["delta_cost_per_unit"]
)

# Total Change
final_rca_frame["total_fip_change"] = (
    final_rca_frame["quantity_impact"] +
    final_rca_frame["cost_impact"] +
    final_rca_frame["interaction_impact"]
)

# Core Metrics

# delta_quantity_pct 
final_rca_frame["delta_quantity_pct"] = np.where(
    final_rca_frame["quantity_prev"] > 0,
    final_rca_frame["delta_quantity"] / final_rca_frame["quantity_prev"],
    np.nan
)

# delta_cost_per_unit_pct

final_rca_frame["delta_cost_per_unit_pct"] = np.where(
    final_rca_frame["cost_per_unit_prev"] > 0,
    final_rca_frame["delta_cost_per_unit"] / final_rca_frame["cost_per_unit_prev"],
    np.nan
)


# Contribution shares (absolute, normalized)
impact_abs_sum_qc = (
    final_rca_frame["quantity_impact"].abs() +
    final_rca_frame["cost_impact"].abs()
)

impact_abs_sum_all = (
    impact_abs_sum_qc +
    final_rca_frame["interaction_impact"].abs()
)


# quantity_impact_pct_of_total 
final_rca_frame["quantity_impact_pct_of_total"] = np.where(
    impact_abs_sum_qc > 0,
    final_rca_frame["quantity_impact"].abs() / impact_abs_sum_qc,
    0
)

# cost_impact_pct_of_total 
final_rca_frame["cost_impact_pct_of_total"] = np.where(
    impact_abs_sum_qc > 0,
    final_rca_frame["cost_impact"].abs() / impact_abs_sum_qc,
    0
)

# interaction_pct
final_rca_frame["interaction_pct"] = np.where(
    impact_abs_sum_all > 0,
    final_rca_frame["interaction_impact"].abs() / impact_abs_sum_all,
    0
)


# Dominance Score

final_rca_frame["abs_qty_impact"] = final_rca_frame["quantity_impact"].abs()
final_rca_frame["abs_cost_impact"] = final_rca_frame["cost_impact"].abs()

final_rca_frame["dominance_score"] = np.where(
    (final_rca_frame["abs_qty_impact"] + final_rca_frame["abs_cost_impact"]) == 0,
    0.0,  # avoid divide-by-zero → treat as neutral
    (final_rca_frame["abs_qty_impact"] - final_rca_frame["abs_cost_impact"]) /
    (final_rca_frame["abs_qty_impact"] + final_rca_frame["abs_cost_impact"])
)

### Data Sufficiency Metrics

In [111]:
final_rca_frame = final_rca_frame.sort_values(
    ["material", "plant","date", "snapshot_date"]
)

final_rca_frame["is_qty_present_greater0"] = (
    final_rca_frame["quantity_curr"] > 0
).astype(int)

# 1) history_weeks_count
final_rca_frame["history_weeks_count"] = (
    final_rca_frame
    .groupby(["material", "plant","date"])["is_qty_present_greater0"]
    .rolling(window=12, min_periods=1)
    .sum()
    .reset_index(level=[0,1,2], drop=True)
)

final_rca_frame["hist_count"] = (
    final_rca_frame
    .groupby(["material", "plant","date"])["delta_quantity"]
    .cumcount()
)

final_rca_frame["has_sufficient_6periods"] = (
    final_rca_frame["history_weeks_count"] >= 6
)


# 2) presence_stability_score
final_rca_frame["presence_stability_score"] = (
    final_rca_frame["history_weeks_count"] / 12
).clip(upper=1.0)


# 3) history_confidence
final_rca_frame["history_confidence"] = (
    final_rca_frame["history_weeks_count"] / 12
).clip(upper=1.0)                                       # check


# 4) sku_presence_class
conditions = [
    final_rca_frame["is_new_in_current_snapshot"] == True,  
    final_rca_frame["is_dropped_in_current_snapshot"] == True,
    (
        (final_rca_frame["quantity_prev"] == 0) &
        (final_rca_frame["quantity_curr"] > 0) &
        (final_rca_frame["history_weeks_count"] < 6)
    ),
    (
        (final_rca_frame["presence_stability_score"] >= 0.75) &
        (final_rca_frame["history_weeks_count"] >= 6)
    )
]

choices = [
    "ENTERED",
    "OUT",
    "REACTIVATED",
    "STABLE"
]

final_rca_frame["sku_presence_class"] = np.select(
    conditions,
    choices,
    default="ERRATIC"
)


# 5) uom_change_flag
final_rca_frame = final_rca_frame.sort_values(
    ["material", "plant","date", "snapshot_date"]
)

final_rca_frame["unit_of_measure_prev"] = (
    final_rca_frame
    .groupby(["material", "plant","date"])["base_unit_of_measure"]
    .shift(1)
)

final_rca_frame["uom_change_flag"] = (
    final_rca_frame["unit_of_measure_prev"].notna() &
    (final_rca_frame["unit_of_measure_prev"] != final_rca_frame["unit_of_measure"])
)


# 6) duplicate_sku_plant_flag
dup_counts = (
    final_rca_frame
    .groupby(["material", "plant","date", "snapshot_date"])
    .size()
    .rename("dup_count")
    .reset_index()
)
final_rca_frame = final_rca_frame.merge(
    dup_counts,
    on=["material", "plant","date", "snapshot_date"],
    how="left"
)
final_rca_frame["duplicate_sku_plant_flag"] = (
    final_rca_frame["dup_count"] > 1
)


# 7) dominance_instability_flag

final_rca_frame["dominance_score_var_4"] = (
    final_rca_frame
    .groupby(["material", "plant","date"])["dominance_score"]
    .rolling(window=4, min_periods=2)
    .var()
    .reset_index(level=[0,1,2], drop=True)
)
final_rca_frame["dominance_instability_flag"] = (
    final_rca_frame["dominance_score_var_4"] > 0.25
)


# 8) RCA Mode
conditions = [
    final_rca_frame["uom_change_flag"] |
    final_rca_frame["duplicate_sku_plant_flag"],

    final_rca_frame["sku_presence_class"].isin(["ENTERED", "OUT"]),

    final_rca_frame["has_sufficient_6periods"] == False,

    final_rca_frame["sku_presence_class"] == "STABLE"
]

choices = [
    "NO_RCA",
    "ENTRY_EXIT_RCA",
    "LIMITED_RCA",
    "FULL_TEMPORAL_RCA"
]

final_rca_frame["rca_mode"] = np.select(
    conditions,
    choices,
    default="LIMITED_RCA"
)


### Noise vs Signal Determination 

In [112]:
# Sorting

final_rca_frame = final_rca_frame.sort_values(
    ["material", "plant","date", "snapshot_date"]
)

#  1) Business materiality (value-based)   
# How big the change is in value terms, relative to prior fip.
final_rca_frame["relative_fip_impact"] = np.where(
    final_rca_frame["total_cost_prev"] > 0,
    final_rca_frame["total_fip_change"].abs() /
    final_rca_frame["total_cost_prev"],
    np.nan
)


#### Persistance

In [113]:
# Temporal persistence (directional consistency)

final_rca_frame["quantity_change_sign"] = np.sign(
    final_rca_frame["delta_quantity"]
) 

def persistence_score(series):
    score = []
    current = 0
    prev = 0
    for v in series:
        if v == 0 or pd.isna(v):
            current = 0
        elif v == prev:
            current += 1
        else:
            current = 1
        score.append(current)
        prev = v
    return score

final_rca_frame["quantity_persistence_score"] = (
    final_rca_frame
    .groupby(["material", "plant","date"])["quantity_change_sign"]
    .transform(persistence_score)
)


In [114]:
# Outlier Identification

# PARAMETERS

ROLLING_WINDOW = 12
MIN_PERIODS = 6

EXTREME_PCT_CHANGE = 0.5      # 50%
LOW_LEVEL_FLOOR = 0.05        # collapse threshold (5%)
HIGH_LEVEL_MULT = 100         # spike threshold (100x)

ROBUST_Z_THRESHOLD = 3
MIN_ZSCORE_POINTS = 3
PCTL_FALLBACK = 0.95

# 1) Rolling median of quantity (baseline scale)

final_rca_frame["quantity_rolling_median_12w"] = (
    final_rca_frame
        .groupby(["material", "plant"])["quantity_curr"]
        .transform(
            lambda x: x.shift(1)
                      .rolling(ROLLING_WINDOW, min_periods=MIN_PERIODS)
                      .median()
        )
)

# 2) Structural extreme change (scale-based)

final_rca_frame["is_structural_extreme_qty"] = (
    (final_rca_frame["delta_quantity_pct"].abs() >= EXTREME_PCT_CHANGE) &
    (
        (final_rca_frame["quantity_curr"] <=
         LOW_LEVEL_FLOOR * final_rca_frame["quantity_rolling_median_12w"]) |
        (final_rca_frame["quantity_curr"] >=
         HIGH_LEVEL_MULT * final_rca_frame["quantity_rolling_median_12w"])
    )
)



# 3) CLEAN delta series (absolute, exclude structural extremes)

final_rca_frame["clean_delta_quantity"] = final_rca_frame["delta_quantity"]

final_rca_frame.loc[
    final_rca_frame["is_structural_extreme_qty"],
    "clean_delta_quantity"
] = np.nan



# 4) Robust rolling MAD of absolute delta quantity

final_rca_frame["delta_qty_rolling_mad_12w"] = (
    final_rca_frame
        .groupby(["material", "plant"])["clean_delta_quantity"]
        .transform(
            lambda x: x.shift(1)
                      .rolling(ROLLING_WINDOW, min_periods=MIN_PERIODS)
                      .apply(
                          lambda s: np.nanmedian(
                              np.abs(s - np.nanmedian(s))
                          ),
                          raw=True
                      )
        )
)

# 5) Robust MAD-based z-score (absolute delta)

def robust_zscore(series, min_points=MIN_ZSCORE_POINTS):
    valid = series.dropna()

    if len(valid) < min_points:
        return pd.Series(np.nan, index=series.index)

    median = np.nanmedian(valid)
    mad = np.nanmedian(np.abs(valid - median))

    if mad == 0 or np.isnan(mad):
        return pd.Series(np.nan, index=series.index)

    return (series - median) / (1.4826 * mad)


final_rca_frame["quantity_z_scr"] = (
    final_rca_frame
        .groupby(["material", "plant"])["clean_delta_quantity"]
        .transform(lambda x: robust_zscore(x.shift(1)))
)



# 6) Z-score availability (CORRECT, LOCAL gating)

final_rca_frame["qty_z_hist_count"] = (
    final_rca_frame
        .groupby(["material", "plant"])["clean_delta_quantity"]
        .transform(
            lambda x: x.shift(1)
                      .rolling(ROLLING_WINDOW, min_periods=1)
                      .count()
        )
)

final_rca_frame["has_quantity_zscore"] = (
    (final_rca_frame["qty_z_hist_count"] >= MIN_ZSCORE_POINTS) &
    (final_rca_frame["delta_qty_rolling_mad_12w"] > 0)
)



# 7) Statistical outlier (MAD-based)

final_rca_frame["is_statistical_outlier_qty"] = (
    final_rca_frame["has_quantity_zscore"] &
    (final_rca_frame["quantity_z_scr"].abs() > ROBUST_Z_THRESHOLD)
)


# 8) Percentile fallback (ONLY when z-score unavailable)
final_rca_frame["qty_abs_pct_threshold"] = (
    final_rca_frame
        .groupby(["material", "plant"])["clean_delta_quantity"]
        .transform(
            lambda x: x.shift(1)
                      .abs()
                      .quantile(PCTL_FALLBACK)
        )
)

# Fallback scale must be valid (non-zero)
final_rca_frame["has_valid_pct_scale"] = (
    final_rca_frame["qty_abs_pct_threshold"] > 0
)

final_rca_frame["is_pct_outlier_qty"] = (
    final_rca_frame["clean_delta_quantity"].abs() >
    final_rca_frame["qty_abs_pct_threshold"]
)

# 9) FINAL quantity outlier flag (hierarchical & SAFE)

final_rca_frame["is_quantity_outlier"] = (
    final_rca_frame["is_structural_extreme_qty"] |
    np.where(
        final_rca_frame["has_quantity_zscore"],
        final_rca_frame["is_statistical_outlier_qty"],
        final_rca_frame["has_valid_pct_scale"] &
        final_rca_frame["is_pct_outlier_qty"]
    )
)


#### Change Point Detection

In [115]:
# Change Point Detection

# PARAMETERS

CP_LONG_WINDOW = 12
CP_LONG_MIN = 6
CP_SHORT_WINDOW = 6
CP_SHORT_MIN = 3

CP_TREND_STRENGTH_THRESHOLD = 2
CP_FALLBACK_STRENGTH_THRESHOLD = 5   # conservative
MIN_PERSISTENCE_CP = 2


# 1) Rolling average of ABSOLUTE delta quantity (long window)

final_rca_frame["rolling_avg_delta_qty_12w"] = (
    final_rca_frame
        .groupby(["material", "plant"])["delta_quantity"]
        .transform(
            lambda x: x.shift(1)
                      .rolling(CP_LONG_WINDOW, min_periods=CP_LONG_MIN)
                      .mean()
        )
)


# 2) rolling MAD of ABSOLUTE delta quantity

final_rca_frame["delta_qty_rolling_mad_12w"] = (
    final_rca_frame
        .groupby(["material", "plant"])["delta_quantity"]
        .transform(
            lambda x: x.shift(1)
                      .rolling(CP_LONG_WINDOW, min_periods=CP_LONG_MIN)
                      .apply(
                          lambda s: np.nanmedian(
                              np.abs(s - np.nanmedian(s))
                          ),
                          raw=True
                      )
        )
)

# 3) Primary trend strength (MAD-based)

final_rca_frame["trend_strength"] = (
    final_rca_frame["rolling_avg_delta_qty_12w"].abs() /
    (1.4826 * final_rca_frame["delta_qty_rolling_mad_12w"])
)

# Invalidate degenerate cases
final_rca_frame.loc[
    (final_rca_frame["delta_qty_rolling_mad_12w"] <= 0) |
    (final_rca_frame["delta_qty_rolling_mad_12w"].isna()),
    "trend_strength"
] = np.nan

final_rca_frame["has_valid_cp_strength"] = (
    final_rca_frame["trend_strength"].notna()
)


# 4) GENERIC fallback scale (never collapses)

final_rca_frame["delta_qty_rolling_median_abs_12w"] = (
    final_rca_frame
        .groupby(["material", "plant"])["delta_quantity"]
        .transform(
            lambda x: x.shift(1)
                      .rolling(CP_LONG_WINDOW, min_periods=CP_LONG_MIN)
                      .apply(lambda s: np.nanmedian(np.abs(s)), raw=True)
        )
)

final_rca_frame["fallback_change_strength"] = (
    final_rca_frame["rolling_avg_delta_qty_12w"].abs() /
    final_rca_frame["delta_qty_rolling_median_abs_12w"]
)

# Invalidate fallback when scale unusable
final_rca_frame.loc[
    (final_rca_frame["delta_qty_rolling_median_abs_12w"] <= 0) |
    (final_rca_frame["delta_qty_rolling_median_abs_12w"].isna()),
    "fallback_change_strength"
] = np.nan

final_rca_frame["use_fallback_cp"] = (
    final_rca_frame["trend_strength"].isna() &
    final_rca_frame["fallback_change_strength"].notna()
)

# 5) Short-window trend confirmation (ABSOLUTE delta)
final_rca_frame["rolling_avg_delta_qty_6w"] = (
    final_rca_frame
        .groupby(["material", "plant"])["delta_quantity"]
        .transform(
            lambda x: x.shift(1)
                      .rolling(CP_SHORT_WINDOW, min_periods=CP_SHORT_MIN)
                      .mean()
        )
)

final_rca_frame["trend_confirmed"] = (
    np.sign(final_rca_frame["rolling_avg_delta_qty_12w"]) ==
    np.sign(final_rca_frame["rolling_avg_delta_qty_6w"])
)


# 6) FINAL generic change-point detection
final_rca_frame["change_point_detected"] = (
    (final_rca_frame["quantity_persistence_score"] >= MIN_PERSISTENCE_CP) &
    final_rca_frame["trend_confirmed"] &
    (
        # Primary MAD-based path
        (
            final_rca_frame["has_valid_cp_strength"] &
            (final_rca_frame["trend_strength"] > CP_TREND_STRENGTH_THRESHOLD)
        )
        |
        # Fallback absolute-scale path
        (
            final_rca_frame["use_fallback_cp"] &
            (final_rca_frame["fallback_change_strength"] > CP_FALLBACK_STRENGTH_THRESHOLD)
        )
    )
)



#### Regime Stability

In [116]:
# Regime stability - validate the change point with stability

REGIME_STABILITY_WINDOW = 3        # how many snapshots must settle
REGIME_STABILITY_TOLERANCE = 0.2   # ±20% band around new level

# 1) Reference "new level" after change

final_rca_frame["post_cp_level"] = (
    final_rca_frame
        .groupby(["material", "plant"])["quantity_curr"]
        .shift(1)
)


# 2) Within-band check
final_rca_frame["within_new_regime_band"] = (
    final_rca_frame["quantity_curr"].between(
        final_rca_frame["post_cp_level"] * (1 - REGIME_STABILITY_TOLERANCE),
        final_rca_frame["post_cp_level"] * (1 + REGIME_STABILITY_TOLERANCE)
    )
)


# 3) Rolling stability confirmation
final_rca_frame["regime_stability_score"] = (
    final_rca_frame
        .groupby(["material", "plant"])["within_new_regime_band"]
        .transform(
            lambda x: x.shift(-1)   # look forward (post-change validation)
                      .rolling(REGIME_STABILITY_WINDOW, min_periods=REGIME_STABILITY_WINDOW)
                      .sum()
        )
)

final_rca_frame["is_regime_stable"] = (
    final_rca_frame["regime_stability_score"] >= REGIME_STABILITY_WINDOW
)


final_rca_frame["effective_change_point"] = (
    final_rca_frame["change_point_detected"] &
    final_rca_frame["is_regime_stable"]
)


#### Noise & Signal Flagging

In [117]:
# BASE METHOD 

final_rca_frame["is_noise_base"] = (
    final_rca_frame["is_quantity_outlier"] &
    (~final_rca_frame["effective_change_point"])
)

# insufficient data → cannot classify as noise yet
final_rca_frame.loc[
    ~final_rca_frame["has_sufficient_6periods"],
    "is_noise_base"
] = False

final_rca_frame["is_signal_base"] = ~final_rca_frame["is_noise_base"]


In [118]:
# New Method

final_rca_frame["is_explainable"] = (
    # data quality must be OK
    (~final_rca_frame["uom_change_flag"]) &
    (~final_rca_frame["duplicate_sku_plant_flag"]) &

    # must not be lifecycle noise
    (final_rca_frame["sku_presence_class"] == "STABLE")
)

final_rca_frame["is_noise_governed"] = (
    final_rca_frame["is_noise_base"] |
    (~final_rca_frame["is_explainable"])
)

final_rca_frame["is_signal_governed"] = (
    ~final_rca_frame["is_noise_governed"]
)

final_rca_frame["noise_reason"] = None

final_rca_frame.loc[
    final_rca_frame["uom_change_flag"] |
    final_rca_frame["duplicate_sku_plant_flag"],
    "noise_reason"
] = "DATA_QUALITY"

final_rca_frame.loc[
    final_rca_frame["sku_presence_class"] != "STABLE",
    "noise_reason"
] = "ERRATIC"


final_rca_frame.loc[
    final_rca_frame["noise_reason"].isna() &
    final_rca_frame["is_noise_base"],
    "noise_reason"
] = "STATISTICAL_NOISE"


#### Brand + Snapshot Date Level Top 10 SKUs

In [119]:
final_rca_frame["abs_fip_change"] = final_rca_frame["total_fip_change"].abs()

final_rca_frame["snapshot_signal_rank"] = (
    final_rca_frame
    .where(final_rca_frame["is_signal_governed"])
    .groupby(["snapshot_date", "corporate_brand"])["abs_fip_change"]
    .rank(method="first", ascending=False)
)
final_rca_frame["is_top10_contributor"] = (
    final_rca_frame["snapshot_signal_rank"] <= 10
)

final_rca_frame = final_rca_frame.drop(["snapshot_signal_rank","abs_fip_change"],axis=1)

In [120]:
final_rca_frame.head()

Unnamed: 0,corporate_brand,material_type,development_lifecycle_status,enterprise_category,enterprise_sub_category,dosage_form_parent,corp_brand_id,network_or_business_unit,profit_center,brand_name,...,regime_stability_score,is_regime_stable,effective_change_point,is_noise_base,is_signal_base,is_explainable,is_noise_governed,is_signal_governed,noise_reason,is_top10_contributor
0,,,,,,,,BIOLOGICS,,,...,,False,False,False,True,False,True,False,INSUFFICIENT_HISTORY,False
1,,,,,,,,BIOLOGICS,,,...,,False,False,False,True,False,True,False,INSUFFICIENT_HISTORY,False
2,,,,,,,,BIOLOGICS,,,...,3.0,True,False,False,True,False,True,False,INSUFFICIENT_HISTORY,False
3,,,,,,,,BIOLOGICS,,,...,3.0,True,False,False,True,False,True,False,INSUFFICIENT_HISTORY,False
4,,,,,,,,BIOLOGICS,,,...,3.0,True,False,False,True,False,True,False,INSUFFICIENT_HISTORY,False


In [121]:
v = final_rca_frame[(final_rca_frame["corporate_brand"]=='REVLIMID') & (final_rca_frame ["snapshot_date"] > "2025-11-09") 
    &   (final_rca_frame ["material"] == '1456877')]
v.head()

Unnamed: 0,corporate_brand,material_type,development_lifecycle_status,enterprise_category,enterprise_sub_category,dosage_form_parent,corp_brand_id,network_or_business_unit,profit_center,brand_name,...,regime_stability_score,is_regime_stable,effective_change_point,is_noise_base,is_signal_base,is_explainable,is_noise_governed,is_signal_governed,noise_reason,is_top10_contributor
253553,REVLIMID,HALB,COMMERCIAL,API,API / DRUG SUBSTANCE,,3302101,PHARMA,10959,REVLIMID,...,1.0,False,False,False,True,False,True,False,INSUFFICIENT_HISTORY,False
253554,REVLIMID,HALB,COMMERCIAL,API,API / DRUG SUBSTANCE,,3302101,PHARMA,10959,REVLIMID,...,0.0,False,False,False,True,False,True,False,INSUFFICIENT_HISTORY,False
253555,REVLIMID,HALB,COMMERCIAL,API,API / DRUG SUBSTANCE,,3302101,PHARMA,10959,REVLIMID,...,1.0,False,False,True,False,False,True,False,LIFECYCLE_OR_ERRATIC,False
253556,REVLIMID,HALB,COMMERCIAL,API,API / DRUG SUBSTANCE,,3302101,PHARMA,10959,REVLIMID,...,1.0,False,False,False,True,False,True,False,LIFECYCLE_OR_ERRATIC,False
253557,REVLIMID,HALB,COMMERCIAL,API,API / DRUG SUBSTANCE,,3302101,PHARMA,10959,REVLIMID,...,1.0,False,False,True,False,False,True,False,LIFECYCLE_OR_ERRATIC,False


In [122]:
cols = ['material', 'plant', 'date', 'quantity_curr','quantity_prev', 'total_cost_curr','cost_per_unit_curr',
    "snapshot_date",'quantity_persistence_score','rolling_avg_delta_qty_12w','delta_qty_rolling_mad_12w', 'trend_strength','has_valid_cp_strength',
       'rolling_avg_delta_qty_6w', 'trend_confirmed','change_point_detected', 'is_noise_base', 'is_signal_base','is_top10_contributor']
# ,'quantity_z_scr','has_quantity_zscore',
#        'is_statistical_outlier_qty', 'qty_abs_pct_threshold',
#        'is_pct_outlier_qty','is_structural_extreme_qty', 'is_quantity_outlier']


v1 = final_rca_frame.loc[
    (final_rca_frame["snapshot_date"] >= "2025-11-28") &
    (final_rca_frame["material"] == "1456877"),
    cols
]
v1

Unnamed: 0,material,plant,date,quantity_curr,quantity_prev,total_cost_curr,cost_per_unit_curr,snapshot_date,quantity_persistence_score,rolling_avg_delta_qty_12w,delta_qty_rolling_mad_12w,trend_strength,has_valid_cp_strength,rolling_avg_delta_qty_6w,trend_confirmed,change_point_detected,is_noise_base,is_signal_base,is_top10_contributor
253556,1456877,2091,202812,6570763.0,6580000.0,62246870.0,9.47331,2025-11-28,1,534474.0,0.0,,False,1102698.0,True,False,False,True,False
253557,1456877,2091,202812,9925.612,6570763.0,94028.4,9.47331,2025-12-05,2,550579.3,0.0,,False,1089968.0,True,False,True,False,False
253558,1456877,2091,202812,71050000.0,9925.612,673078700.0,9.47331,2025-12-12,1,3842.845,4618.607,0.5612,True,-3504.959,False,False,True,False,False
253559,1456877,2091,202812,71050000.0,36188.53,673078700.0,9.47331,2025-12-17,2,5923849.0,22712.87,175.917042,True,11836510.0,True,True,True,False,False
253560,1456877,2091,202812,71050000.0,71050000.0,673078700.0,9.47331,2025-12-19,0,11841670.0,38190.56,209.137949,True,23353940.0,True,False,False,True,False
253561,1456877,2091,202812,71050000.0,71050000.0,673078700.0,9.47331,2025-12-26,0,11841670.0,38190.56,209.137949,True,23347910.0,True,False,False,True,False
253562,1456877,2091,202812,71050000.0,71050000.0,1479466000.0,20.82289,2026-01-02,0,11841670.0,38190.56,209.137949,True,22580640.0,True,False,False,True,True
253563,1456877,2091,202812,70000.0,71050000.0,1457602.0,20.82289,2026-01-09,1,11836070.0,22712.87,351.488811,True,22582170.0,True,False,True,False,False
253564,1456877,2091,202812,70000.0,70000.0,1457602.0,20.82289,2026-01-16,0,5921071.0,972708.0,4.105762,True,11845650.0,True,False,False,True,False
253565,1456877,2091,202812,0.0,71050000.0,0.0,20.82289,2026-01-23,1,5921071.0,972708.0,4.105762,True,5635.245,True,False,True,False,False


In [123]:
# 1478550 @1037

In [124]:
# final_rca_frame.to_csv('Allbrands_202812_output_4thFeb.csv')

In [125]:
brands_to_keep = [
    "All Other Pharmaceut",
    "ABRAXANE",
    "REVLIMID"
]

filtered_df = final_rca_frame[
    final_rca_frame["corporate_brand"].isin(brands_to_keep)
]

In [126]:
# filtered_df.to_csv('3brands_202812_output_4thFeb.csv')

In [128]:
filtered_df.shape

(67680, 111)

In [46]:
final_rca_frame.loc[
    final_rca_frame["corporate_brand"].str.contains(
        "All Other Pharmaceut", case=False, na=False
    ),
    "corporate_brand"
].unique()


array(['All Other Pharmaceut'], dtype=object)