In [None]:
import sys
import pandas as pd
import numpy as np
from datetime import datetime
from typing import Tuple
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score
import seaborn as sns
 
#sys.path.append('../utils')
# from azure_blob_utils import AzureBlob

In [None]:
def load_data(pick_path: str, material_path: str, qoh_path: str,
              res_mat_path: str, master_noun_path: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Load pick data from Azure and material base from local CSV.
 
    Args:
        blob (AzureBlob): AzureBlob instance for reading from blob storage.
        pick_path (str): Path to pick data CSV on blob.
        material_path (str): Path to material master data.
        qoh_path (str): Path to quantity on hand data.
        res_mat_path (str): Path to material with reserved bin data.
        master_noun_path (str): Path to validated master noun data.
 
    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: Loaded pick data and material base.
    """
    df_pick = pd.read_csv(pick_path)
    #material_base = blob.read_excel(material_path, sheet_name="Material Base Data")
    df_qoh = pd.read_csv(qoh_path)
    df_res_mat = pd.read_excel(res_mat_path)
    df_material = pd.read_csv(material_path)
    df_master_noun = pd.read_csv(master_noun_path)
    df_material = df_material[df_material['XStatus'] == 'ZA']    # Filter material master data to get the active materials

    # Step 1: Extract unique materials from qoh and reserved bin
    materials_qoh = df_qoh['Material'].astype(str).str.strip().unique()
    materials_res_mat = df_res_mat['Material Assigned'].astype(str).str.strip().unique()

    # Step 2: Combine into a set to ensure uniqueness
    material_set = set(materials_qoh).union(materials_res_mat)

    # Step 3: Filter active materials to include only rows where 'Material' is in the material_set
    material_base = df_material[df_material['Material'].astype(str).str.strip().isin(material_set)]
 
    df_master_noun['Material'] = df_master_noun['Material'].astype(str).str.strip()
    material_base['Material'] = material_base['Material'].astype(str).str.strip()
    df_pick['MATNR'] = df_pick['MATNR'].astype(str).str.strip()

    # Now merge
    material_base = material_base.merge(df_master_noun[['Material', 'Master_Noun']], 
                              on='Material', 
                              how='left')
 
    return df_pick, material_base

In [None]:
def prepare_data(df: pd.DataFrame, date_col: str = 'Dates', material_col: str = 'MATNR',
                 to_col: str = 'TO Number') -> pd.DataFrame:
    """
    Prepare the data by extracting monthly counts of material movements.
 
    Args:
        df (pd.DataFrame): Input DataFrame containing material, date, and transfer order number.
        date_col (str): Name of the column containing date.
        material_col (str): Name of the column containing material codes.
        to_col (str): Name of the column containing transfer order numbers.
 
    Returns:
        pd.DataFrame: Pivoted DataFrame with months as index and materials as columns.
    """
    df[date_col] = pd.to_datetime(df[date_col])
    df['Month'] = df[date_col].dt.to_period('M')
    grouped = df.groupby(['Month', material_col])[to_col].nunique().reset_index()
    pivot = grouped.pivot(index='Month', columns=material_col, values=to_col).fillna(0)
    pivot.index = pivot.index.to_timestamp()
    return pivot

In [None]:
def filter_pick_data_by_date(df: pd.DataFrame, start: str, end: str) -> pd.DataFrame:
    """
    Filter pick data using a date range with .between().
 
    Args:
        df (pd.DataFrame): Pick data with 'Dates' column.
        start (str): Start date in 'YYYY-MM-DD'.
        end (str): End date in 'YYYY-MM-DD'.
 
    Returns:
        pd.DataFrame: Filtered pick data.
    """
    df['Dates'] = pd.to_datetime(df['Dates'])
    return df[df['Dates'].between(start, end)]

In [None]:
def calculate_seasonality_strength(ts, model='additive', period=12):
    """
    Calculate the strength of seasonality (FS) for a time series.
 
    FS = max(0, 1 - Var(R_t) / Var(S_t + R_t))
 
    Args:
        ts (pd.Series): Time series indexed by datetime.
        model (str): Type of decomposition model - 'additive' or 'multiplicative'. Default is 'additive'.
        period (int): Number of periods in a season (e.g., 12 for monthly seasonality).
 
    Returns:
        float: Seasonality strength (FS) in range [0, 1].
    """
    result = seasonal_decompose(ts, model=model, period=period)
    R_t = result.resid.dropna()
    S_t = result.seasonal.loc[R_t.index]
    FS = max(0, 1 - (np.var(R_t) / np.var(S_t + R_t)))
    return FS

In [None]:
def get_significant_seasonal_months(ts, model='additive', period=12, top_n=3):
    """
    Identifies top N seasonal months based on magnitude from seasonal decomposition.

    Args:
        ts (pd.Series): Time series for one material with datetime index.
        model (str): Type of decomposition ('additive' or 'multiplicative').
        period (int): Seasonality period (e.g., 12 for monthly).
        top_n (int): Number of top seasonal months to return based on magnitude.

    Returns:
        List[str]: Fiscal month names with strongest seasonal impact.
    """
    result = seasonal_decompose(ts, model=model, period=period)
    seasonal = result.seasonal
 
    # Get top N months by absolute seasonal component
    monthly_avg = seasonal.groupby(seasonal.index.month).mean()
    positive_peaks = monthly_avg[monthly_avg > 0]  # only positive seasonal values
    top_months = positive_peaks.sort_values(ascending=False).head(top_n).index
 
 
    #top_months = seasonal.abs().groupby(seasonal.index.month).mean().nlargest(top_n).index
 
    # Map calendar months (1=Jan, ..., 12=Dec) to fiscal names (Apr=1, ..., Mar=12)
    fiscal_month_names = ['April', 'May', 'June', 'July', 'August', 'September',
                          'October', 'November', 'December', 'January', 'February', 'March']

    # Convert calendar month to fiscal index: (month - 4) % 12
    fiscal_indices = ((top_months - 4) % 12)
    unique_fiscal_indices = sorted(fiscal_indices.tolist(), key=lambda x: (x + 12) % 12)
 
    return [fiscal_month_names[i] for i in unique_fiscal_indices]

In [None]:
def analyze_seasonality(pivot, period=12):
    """
    Analyze seasonality strength and significant months for each material.
 
    Args:
        pivot (pd.DataFrame): Pivoted time series with 'Month' as index and materials as columns.
        period (int): Seasonality period (e.g., 12 for monthly data).
 
    Returns:
        seasonality_strength_df (pd.DataFrame): FS scores and seasonality category.
        seasonal_months_df (pd.DataFrame): Material and strong seasonal months.
    """
    seasonality_scores = []
    material_seasonal_months = []
 
    for material in pivot.columns:
        ts = pivot[material].dropna()
        if len(ts) == 0:
            continue
 
        fs = calculate_seasonality_strength(ts, period=period)
 
        # Assign category
        if fs >= 0.7:
            category = 'High'
        elif fs >= 0.3:
            category = 'Medium'
        else:
            category = 'Low'
 
        seasonality_scores.append({'Material': material, 'FS': fs, 'Seasonality': category})
 
        if fs >= 0.3:
            months = get_significant_seasonal_months(ts, period=period)
            for month in months:
                material_seasonal_months.append({'Material': material, 'seasonal_month': month})
 
    return pd.DataFrame(seasonality_scores), pd.DataFrame(material_seasonal_months)

In [None]:
def group_materials_by_season(months_df):
    """
    Groups materials by their seasonal months and assigns a calendar-ordered group name.
 
    Args:
        months_df (pd.DataFrame): DataFrame with 'Material' and 'seasonal_month' columns.
 
    Returns:
        pd.DataFrame: DataFrame with 'Material' and 'seasonal_group' (calendar sorted string of months).
    """
    calendar_order = ['January', 'February', 'March', 'April', 'May', 'June',
                      'July', 'August', 'September', 'October', 'November', 'December']
 
    # Create a mapping from month to calendar index
    month_order_map = {month: i for i, month in enumerate(calendar_order)}
 
    return (
        months_df.groupby('Material')['seasonal_month']
        .apply(lambda x: ', '.join(sorted(x.unique(), key=lambda m: month_order_map[m])))
        .reset_index(name='seasonal_group')
    )

In [None]:
def unpivot_and_add_seasonal_group(pivot, material_group_df):
    """
    Unpivots the pivot table and adds seasonal group info.
 
    Args:
        pivot (pd.DataFrame): Pivoted time series with 'Month' as index and materials as columns.
        material_group_df (pd.DataFrame): DataFrame with 'Material' and 'seasonal_group' columns.
 
    Returns:
        pd.DataFrame: Unpivoted DataFrame with seasonal group.
    """
    pivot_reset = pivot.reset_index()
    df_unpivoted = pd.melt(pivot_reset, id_vars='Month', var_name='Material', value_name='TO_Count')

    # Map seasonal group to materials
    material_to_group = dict(zip(material_group_df['Material'], material_group_df['seasonal_group']))
    df_unpivoted['seasonal_group'] = df_unpivoted['Material'].map(material_to_group)
 
    return df_unpivoted

In [None]:
def export_results(df1: pd.DataFrame, df2: pd.DataFrame, df3: pd.DataFrame, prefix: str = "seasonality_groups"):
    """
    Save Material Base with clusters, Monthly TOs, Seasonality Decomposition to a single Excel file with separate sheets.
 
    Args:
        df1, df2, df3 (pd.DataFrame): DataFrames to save.
        file_path (str): Path to the output Excel file.
    """
 
    timestamp = datetime.now().strftime('%Y%m%d')
#     file_path = f"{prefix}_{timestamp}.xlsx"
 
#     with pd.ExcelWriter(file_path, engine='xlsxwriter') as writer:
#         df1.to_excel(writer, sheet_name='Material Base with clusters', index=False)
#         df2.to_excel(writer, sheet_name='Monthly TOs', index=False)
#         df3.to_excel(writer, sheet_name='Seasonality FS', index=False)
    df1.to_csv(r'C:\Users\Shruti.Agarwal\Desktop\code for review\Seasonality Analysis\Material Base with clusters', index=False)
    df2.to_csv(r'C:\Users\Shruti.Agarwal\Desktop\code for review\Seasonality Analysis\Monthly TOs', index=False)
    df3.to_csv(r'C:\Users\Shruti.Agarwal\Desktop\code for review\Seasonality Analysis\Seasonality FS', index=False)
 
    print(f"✅ Data saved")

In [None]:
def decide_seasonality_threshold(seasonality_strength_df, percentile_method=False, domain_threshold=0.3):
    """
    Decide the optimal threshold for FS (Seasonality Strength).

    Args:
    - seasonality_strength_df (DataFrame): DataFrame containing 'FS' values for each material.
    - percentile_method (bool): Whether to use percentile method to determine threshold (default False).
    - domain_threshold (float): Initial threshold based on domain knowledge (default 0.3).

    Returns:
    - float: The chosen threshold for strong seasonality.
    - DataFrame: Materials with FS > threshold.
    """

    # Step 1: Plot Distribution of FS
    plt.hist(seasonality_strength_df['FS'], bins=30, edgecolor='black')
    plt.title('Distribution of FS (Seasonality Strength)')
    plt.xlabel('FS')
    plt.ylabel('Count')
    plt.show()

    # Step 2: Domain Knowledge-based Threshold (Initial Guess)
    print(f"Initial Domain Knowledge Threshold: {domain_threshold}")

    # Step 3: Empirical Threshold (Percentile-based)
    if percentile_method:
        percentile_90 = np.percentile(seasonality_strength_df['FS'], 90)
        percentile_80 = np.percentile(seasonality_strength_df['FS'], 80)
        print(f"90th Percentile Threshold: {percentile_90}")
        print(f"80th Percentile Threshold: {percentile_80}")

        # Use the 90th percentile as threshold (can change based on needs)
        chosen_threshold = percentile_90
    else:
        # Default to domain-based threshold if no percentile method is used
        chosen_threshold = domain_threshold
 
    # Step 4: Filter Materials Above Chosen Threshold
    seasonal_materials = seasonality_strength_df[seasonality_strength_df['FS'] > chosen_threshold]

    print(f"\n✅ Materials with strong seasonality (FS > {chosen_threshold}):")
    print(seasonal_materials.sort_values(by='FS', ascending=False))

    # Step 5: Return the chosen threshold and filtered materials
    return chosen_threshold, seasonal_materials

In [None]:
def main():
    """
    Full pipeline for seasonal clustering of materials for warehouse optimization.
    Returns:
        None
    """
    # Initialize blob and paths
#     blob = AzureBlob(storage_account_name="250501pdeus2nedcst01")
    pick_path = r'C:\Users\Shruti.Agarwal\Desktop\code for review\Seasonality Analysis\4.1.2025 - Orders & Dispatch_Raw_Picking_Data.csv'
    material_path = r'C:\Users\Shruti.Agarwal\Desktop\code for review\Seasonality Analysis\4.1.2025 - All Material IDs w Long Description.csv'
    qoh_path = r'C:\Users\Shruti.Agarwal\Desktop\code for review\Seasonality Analysis\4.1.2025 - Inventory Management QOH.csv'
    res_mat_path = r'C:\Users\Shruti.Agarwal\Desktop\code for review\Seasonality Analysis\14.04.25 - storage_bins_reserved_material.xlsx'
    master_noun_path = r'C:\Users\Shruti.Agarwal\Desktop\code for review\Seasonality Analysis\Validated_Master_Noun_List.csv'
 
 
    # Load and filter
    df_pick, material_base = load_data(pick_path, material_path, qoh_path, res_mat_path, master_noun_path)
    df_filtered = filter_pick_data_by_date(df_pick, '2022-04-01', '2025-03-31')
 
    # Prep subsets
    df = df_filtered[df_filtered['MATNR'].isin(material_base['Material'])]
    pivot = prepare_data(df, "Dates", "MATNR", "TO Number")
 
    # Analyze Seasonality
    seasonality_strength_df, seasonal_months_df = analyze_seasonality(pivot, period=12)
 
    # Identify Thresholds
    #chosen_threshold, seasonal_materials = decide_seasonality_threshold(seasonality_strength_df,
    #                                        percentile_method=False, domain_threshold=0.3)
 
    # Group Seasonal Months into Seasonal Group
    material_group_df = group_materials_by_season(seasonal_months_df)
 
    # Unpivot Data and Merge Seasonal Group
    unpivoted_df = unpivot_and_add_seasonal_group(pivot, material_group_df)
 
    # Merge Seasonality Category
    unpivoted_df = unpivoted_df.merge(
        seasonality_strength_df[['Material', 'Seasonality']], on='Material', how='left')
 
    # 🔹 Export
    final_df = material_base.merge(material_group_df[['Material', 'seasonal_group']], on='Material', how='left')
    export_results(final_df, unpivoted_df, seasonality_strength_df)

In [None]:
if __name__ == "__main__":
    main()