In [1]:
# Sanity-check cell: prints a fixed greeting
print("hello world")

hello world


#### Loading the external pandas library

In [2]:
import pandas as pd

#### Reading data from Excel

In [3]:
# Load the stock workbook into a DataFrame.
# NOTE(review): this is an absolute local path, so it will only resolve on a
# machine with the same folder layout.
file_path = "C:\\Python repositories\\Education project\\src\\data\\input\\Stock.xlsx"
stock = pd.read_excel(io=file_path)

# Preview the last rows of the loaded data
stock.tail()

Unnamed: 0,SKU,Date,Stock
20069,ef8df9d9-3740-45fb-bb23-3ab5a4cf4103,2024-06-01,0.7
20070,a2d70046-4084-439a-a314-edb49007151d,2024-06-01,0.7
20071,c7d88a1d-1ed1-4aad-84ed-186afde4e4be,2024-06-01,0.7
20072,e47462a3-427a-4727-903a-46fa27578068,2024-06-01,0.7
20073,06bbb978-eefb-4d23-aede-1eb453a21177,2024-06-01,0.7


#### Convert Date to a 'YYYY-MM' Period string

In [4]:
# Derive a 'YYYY-MM' string column 'Period' from 'Date', then drop 'Date'.
# The guard keeps the cell re-runnable: after the first run 'Date' no longer
# exists, so a second run simply shows the already-converted frame instead of
# raising KeyError.
if 'Date' in stock.columns:
    stock = (
        stock
        .assign(Period=pd.to_datetime(stock['Date']).dt.strftime('%Y-%m'))
        .drop(columns='Date')
    )
stock.tail()

Unnamed: 0,SKU,Stock,Period
20069,ef8df9d9-3740-45fb-bb23-3ab5a4cf4103,0.7,2024-06
20070,a2d70046-4084-439a-a314-edb49007151d,0.7,2024-06
20071,c7d88a1d-1ed1-4aad-84ed-186afde4e4be,0.7,2024-06
20072,e47462a3-427a-4727-903a-46fa27578068,0.7,2024-06
20073,06bbb978-eefb-4d23-aede-1eb453a21177,0.7,2024-06


In [5]:
# The XYZ analysis expects its input measure in a column named 'Value',
# so relabel 'Stock' accordingly.
stock = stock.rename(columns={'Stock': 'Value'})
stock.tail()

Unnamed: 0,SKU,Value,Period
20069,ef8df9d9-3740-45fb-bb23-3ab5a4cf4103,0.7,2024-06
20070,a2d70046-4084-439a-a314-edb49007151d,0.7,2024-06
20071,c7d88a1d-1ed1-4aad-84ed-186afde4e4be,0.7,2024-06
20072,e47462a3-427a-4727-903a-46fa27578068,0.7,2024-06
20073,06bbb978-eefb-4d23-aede-1eb453a21177,0.7,2024-06


In [6]:
import pandas as pd
import numpy as np
from typing import Literal # Used for type hinting data_mode

def assign_xyz_groups(
    df: pd.DataFrame, 
    data_mode: Literal["dense", "sparse"] = "dense"
) -> pd.DataFrame:
    """
    Add 'XYZ', 'CV', and threshold columns to a copy of the input DataFrame.

    The X/Y thresholds are the 33rd/66th percentiles of the Coefficient of
    Variation (CV = std / |mean|) over all *eligible* SKUs, i.e. SKUs with at
    least two aggregated periods and a non-zero mean.

    Parameters
    ----------
    df : pd.DataFrame
        Must contain the columns 'SKU', 'Period' and 'Value'. Any other
        columns are carried through unchanged. Columns named 'XYZ', 'CV',
        'x_threshold_33' or 'y_threshold_66' (e.g. from a previous call) are
        discarded and recomputed. The caller's frame is never mutated.
    data_mode : {"dense", "sparse"}, default "dense"
        - "dense": mean/std are calculated across every period present in the
          data for every SKU; (SKU, Period) combinations with no rows count
          as 0.0.
        - "sparse": mean/std are calculated only from the periods that
          actually have rows for each SKU.

    Returns
    -------
    pd.DataFrame
        The input columns followed by:
        'XYZ' ("X", "Y", "Z" or ""), 'CV', 'x_threshold_33', 'y_threshold_66'.
        The row order matches the input; the index is reset by the merge.

    Raises
    ------
    ValueError
        If data_mode is invalid or a required column is missing.
    TypeError
        If 'Value' is non-numeric and no entry can be converted to a number.
    RuntimeError
        If the final 'XYZ' column contains an unexpected label.

    Rules
    -----
    - 'Value' entries that are missing or cannot be parsed become 0.0.
    - Aggregation at (SKU, Period) is always SUM.
    - Rows with a missing SKU or Period do not take part in the statistics.
    - SKUs with < 2 aggregated periods => XYZ = "".
    - SKUs with mean == 0 => CV = 0 and XYZ = "".
    - Otherwise: CV <= x_threshold => "X", CV <= y_threshold => "Y", else "Z".
    """
    # Columns produced by this function (also used for final ordering).
    new_cols = ["XYZ", "CV", "x_threshold_33", "y_threshold_66"]
    # Minimum number of aggregated periods needed to classify a SKU.
    MIN_PERIODS = 2

    # --- 0. Validate data_mode ---
    if data_mode not in ["dense", "sparse"]:
        raise ValueError(f"data_mode must be 'dense' or 'sparse', not '{data_mode}'")
        
    # --- 1. Validate required columns ---
    required = {"SKU", "Period", "Value"}
    if not required.issubset(df.columns):
        missing = required - set(df.columns)
        raise ValueError(f"Missing required column(s): {missing}")

    # Work on a copy to avoid mutating the caller's DataFrame. Output columns
    # left over from an earlier call are dropped first; otherwise the merge in
    # step 8 would emit suffixed duplicates (XYZ_x / XYZ_y) and the lookup of
    # merged["XYZ"] would fail with KeyError.
    df_out = df.drop(columns=new_cols, errors="ignore").copy()

    # --- 2. Ensure Value is numeric or convertible ---
    if not pd.api.types.is_numeric_dtype(df_out["Value"]):
        coerced = pd.to_numeric(df_out["Value"], errors="coerce")
        if coerced.isna().all():
            raise TypeError("'Value' column must be numeric or convertible to numeric.")
        # Individual unparseable entries are treated as 0.0
        df_out["Value"] = coerced.fillna(0.0)
    else:
        df_out["Value"] = df_out["Value"].fillna(0.0)

    # --- 3. Aggregate (SKU, Period) by SUM ---
    # Guarantees exactly one value per SKU/Period pair. groupby drops rows
    # whose SKU or Period is missing.
    period_sum = (
        df_out.groupby(["SKU", "Period"], as_index=False)["Value"]
        .sum()
    )

    # --- 3b. Densify data if required ---
    if data_mode == "dense":
        # Unique keys, with missing values removed so that a NaN/NaT Period
        # does not become a phantom zero-valued period for every SKU.
        all_skus = df_out['SKU'].dropna().unique()
        all_periods = df_out['Period'].dropna().unique()

        # Cartesian product of every SKU with every period
        df_scaffold = pd.MultiIndex.from_product(
            [all_skus, all_periods], 
            names=['SKU', 'Period']
        ).to_frame(index=False)

        # Attach the aggregated values; combinations without data become NaN
        df_dense = pd.merge(
            df_scaffold,
            period_sum,
            on=["SKU", "Period"],
            how="left"
        )

        # Combinations with no rows in the input count as 0.0
        df_dense["Value"] = df_dense["Value"].fillna(0.0)

        stats_input_df = df_dense
        
    else: # data_mode == "sparse"
        stats_input_df = period_sum

    # --- 4. Compute per-SKU statistics across aggregated periods ---
    # n_periods is the total number of periods in "dense" mode and the number
    # of periods that have rows for the SKU in "sparse" mode.
    sku_stats = (
        stats_input_df.groupby("SKU", as_index=False)["Value"]
        .agg(n_periods="count", mean_value="mean", std_value="std")
    )

    # std is NaN when a SKU has a single period; treat that as no variation
    sku_stats["std_value"] = sku_stats["std_value"].fillna(0.0)

    # --- 5. Compute CV (vectorized) ---
    sku_stats['cv'] = np.where(
        sku_stats['mean_value'] == 0,
        0.0,  # CV is defined as 0 when the mean is 0
        sku_stats['std_value'] / np.abs(sku_stats['mean_value'])
    )
    
    # Guard against infinite ratios
    sku_stats['cv'] = sku_stats['cv'].replace([np.inf, -np.inf], np.nan)

    # --- 6. Determine dynamic thresholds ---
    is_eligible = (
        (sku_stats['n_periods'] >= MIN_PERIODS) &
        (sku_stats['mean_value'] != 0)
    )
    
    eligible_cvs = sku_stats.loc[is_eligible, 'cv'].dropna()

    if eligible_cvs.empty:
        x_threshold = 0.0
        y_threshold = 0.0
    else:
        quantiles = eligible_cvs.quantile([0.33, 0.66])
        x_threshold = quantiles[0.33]
        y_threshold = quantiles[0.66]

        if pd.isna(x_threshold) or pd.isna(y_threshold):
            x_threshold = 0.0
            y_threshold = 0.0

    # --- 7. Classify into X/Y/Z or blank "" (vectorized) ---
    # np.select picks the first matching condition, so the three "blank"
    # rules take precedence over the threshold comparisons. In "dense" mode
    # the n_periods rule only triggers when the data spans a single period.
    conditions = [
        (sku_stats['n_periods'] < MIN_PERIODS),
        (sku_stats['mean_value'] == 0),
        (sku_stats['cv'].isna()),
        (sku_stats['cv'] <= x_threshold), # "X"
        (sku_stats['cv'] <= y_threshold)  # "Y"
    ]
    
    choices = ["", "", "", "X", "Y"]
    default_choice = "Z"

    sku_stats['XYZ'] = np.select(conditions, choices, default=default_choice)

    # --- 8. Merge SKU-level labels back onto the row-level data ---
    cols_to_merge = sku_stats[["SKU", "XYZ", "cv"]].rename(
        columns={"cv": "CV"}
    )

    merged = pd.merge(
        df_out,
        cols_to_merge,
        on="SKU",
        how="left",
        validate="m:1"  # many original rows to one sku_stats row
    )

    # Rows whose SKU has no statistics (e.g. missing SKU) get a blank label
    merged["XYZ"] = merged["XYZ"].fillna("")
    
    # --- 8b. Add threshold columns for debugging ---
    merged['x_threshold_33'] = x_threshold
    merged['y_threshold_66'] = y_threshold
    
    # --- 9. Final validation: ensure only allowed values are present ---
    allowed = {"", "X", "Y", "Z"}
    unique_vals = set(merged["XYZ"].unique())
    disallowed = unique_vals - allowed
    if disallowed:
        raise RuntimeError(f"Unexpected XYZ values found: {disallowed}")

    # --- 10. Final column ordering: input columns first, then the new ones ---
    original_cols_to_keep = [
        c for c in df.columns if c not in new_cols
    ]
    
    final_cols = original_cols_to_keep + new_cols
    final = merged[final_cols]

    return final

In [7]:
# Run the XYZ classification on the prepared stock data (dense mode is the default)
df_result = assign_xyz_groups(df=stock, data_mode="dense")
df_result.tail()

Unnamed: 0,SKU,Value,Period,XYZ,CV,x_threshold_33,y_threshold_66
20069,ef8df9d9-3740-45fb-bb23-3ab5a4cf4103,0.7,2024-06,Z,4.108462,2.300895,2.376692
20070,a2d70046-4084-439a-a314-edb49007151d,0.7,2024-06,Y,2.300895,2.300895,2.376692
20071,c7d88a1d-1ed1-4aad-84ed-186afde4e4be,0.7,2024-06,Y,2.300895,2.300895,2.376692
20072,e47462a3-427a-4727-903a-46fa27578068,0.7,2024-06,Y,2.300895,2.300895,2.376692
20073,06bbb978-eefb-4d23-aede-1eb453a21177,0.7,2024-06,Y,2.300895,2.300895,2.376692


In [8]:
import os

# Saves into the current working directory using one of two popular Excel libraries
def save_local_file(data: pd.DataFrame, name: str) -> None:
    """
    Save a DataFrame as '<name>.xlsx' in the current working directory.

    The xlsxwriter engine is tried first; if it cannot be imported the
    function falls back to openpyxl.

    Parameters
    ----------
    data : pd.DataFrame
        The frame to write. The index is not written.
    name : str
        Base file name (without extension). Also used as the worksheet name,
        truncated to 31 characters because Excel rejects longer sheet names.

    Returns
    -------
    None
        The target path is printed on success.
    """
    dump_file_name = f"{name}.xlsx"
    data_dump = os.path.join(os.getcwd(), dump_file_name)

    try:
        # Try xlsxwriter first
        writer = pd.ExcelWriter(data_dump, engine="xlsxwriter")
    except ImportError:
        # ImportError also covers ModuleNotFoundError
        print("⚠️  xlsxwriter not found. Falling back to openpyxl.")
        writer = pd.ExcelWriter(data_dump, engine="openpyxl")

    # The context manager saves and closes the workbook, and releases the file
    # handle even when to_excel raises.
    with writer:
        data.to_excel(writer, sheet_name=name[:31], index=False)

    print(f"✅ Data successfully saved as: {data_dump}")

In [9]:
# Export the classified data to 'xyz.xlsx'
save_local_file(data=df_result, name="xyz")

✅ Data successfully saved as: c:\Python repositories\Education project\src\code\xyz.xlsx
