# Cell 1: Install Required Libraries

In [1]:
%pip install pandas numpy matplotlib seaborn statsmodels openpyxl

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


# Cell 2: Import Libraries

In [2]:
import pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.seasonal import STL
%matplotlib inline

# Cell 3: Data Loading Function

In [3]:
def load_excel_data(file_path):
    """Load Excel file with multiple sheets"""
    try:
        xls = pd.ExcelFile(file_path)
        sheets = {sheet_name: xls.parse(sheet_name) for sheet_name in xls.sheet_names}
        print(f"Successfully loaded {len(sheets)} sheets: {list(sheets.keys())}")
        return sheets
    except Exception as e:
        print(f"Error loading Excel file: {e}")
        return None

# Cell 4: Column Cleaning Function

In [4]:
def clean_column_names(df):
    """Clean and standardize column names"""
    original_cols = df.columns.tolist()
    df.columns = [re.sub(r'[^a-z0-9_]', '', col.lower().replace(' ', '_')) for col in df.columns]
    print(f"Cleaned {len(df.columns)} column names")
    return df

# Cell 5: Date Column Conversion

In [5]:
def convert_date_columns(df):
    """Convert date columns (e.g., Jan-23) to numeric"""
    date_cols = [col for col in df.columns if re.match(r'^\w{3}-\d{2}$', col)]
    print(f"Found {len(date_cols)} date columns: {date_cols[:5]}..." if len(date_cols) > 5 else f"Found {len(date_cols)} date columns: {date_cols}")
    
    for col in date_cols:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    return df

# Cell 6: Missing Values Handler

In [6]:
def handle_missing_values(df):
    """Handle missing values with interpolation or zero-filling"""
    sales_cols = [col for col in df.columns if re.match(r'^\w{3}-\d{2}$|^202\d$|^total_qty$', col)]
    
    # Calculate non-zero ratio for each product
    non_zero_ratio = (df[sales_cols] != 0).sum(axis=1) / len(sales_cols)
    consistent_products = non_zero_ratio > 0.5
    
    print(f"Found {consistent_products.sum()} consistent products (>50% non-zero sales)")
    
    # Apply interpolation for consistent products
    for idx in df[consistent_products].index:
        df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
    
    # Fill remaining NaNs with 0
    df[sales_cols] = df[sales_cols].fillna(0)
    
    # Remove products with all zero sales
    initial_rows = len(df)
    df = df[df[sales_cols].sum(axis=1) > 0]
    print(f"Removed {initial_rows - len(df)} products with all zero sales")
    
    # Handle string columns
    string_cols = df.select_dtypes(include=['object']).columns
    df[string_cols] = df[string_cols].fillna('Unknown')
    
    return df

# Cell 7: Outlier Detection Function

In [7]:
def detect_and_handle_outliers_stl(df, period=12, threshold=3, method='flag'):
    """Detect outliers using STL decomposition"""
    sales_cols = [col for col in df.columns if re.match(r'^\w{3}-\d{2}$')]
    outlier_flags = pd.DataFrame(0, index=df.index, columns=[f'{col}_outlier' for col in sales_cols])
    outlier_count = 0
    
    print(f"Analyzing {len(df)} products for outliers...")
    
    for idx, row in df.iterrows():
        sales = row[sales_cols].values
        if np.all(sales == 0) or len(sales) < period:
            continue
            
        try:
            stl = STL(sales, period=period, robust=True).fit()
            residuals = stl.resid
            z_scores = (residuals - np.mean(residuals)) / np.std(residuals)
            outliers = np.abs(z_scores) > threshold
            
            # Flag outliers
            for col, outlier in zip(sales_cols, outliers):
                outlier_flags.at[idx, f'{col}_outlier'] = int(outlier)
            
            # Cap outliers if method is 'cap'
            if method == 'cap':
                trend = stl.trend + stl.seasonal
                lower_bound = trend - threshold * np.std(residuals)
                upper_bound = trend + threshold * np.std(residuals)
                df.loc[idx, sales_cols] = np.clip(sales, lower_bound, upper_bound)
            
            if outliers.any():
                outlier_count += 1
                if outlier_count <= 5:  # Show first 5 examples
                    print(f"Outliers for product {row['product_id']}: {outliers.sum()} cases")
                    
        except Exception as e:
            continue
    
    print(f"Found outliers in {outlier_count} products")
    df = pd.concat([df, outlier_flags], axis=1)
    return df

# Cell 8: Smoothing Function

In [8]:
def apply_smoothing(df, window=3, min_variance=1000):
    """Apply moving average smoothing to noisy products"""
    sales_cols = [col for col in df.columns if re.match(r'^\w{3}-\d{2}$')]
    smoothed_count = 0
    
    for idx, row in df.iterrows():
        sales = row[sales_cols].values
        if np.var(sales) > min_variance:
            smoothed = pd.Series(sales).rolling(window=window, center=True, min_periods=1).mean()
            df.loc[idx, sales_cols] = smoothed
            smoothed_count += 1
    
    print(f"Applied smoothing to {smoothed_count} products with high variance")
    return df

# Cell 9: Rule-based Validation

In [9]:
def rule_based_validation(df, max_threshold=10):
    """Apply rule-based validation for negative and unrealistic sales"""
    sales_cols = [col for col in df.columns if re.match(r'^\w{3}-\d{2}$')]
    validation_flags = pd.DataFrame(0, index=df.index, columns=[f'{col}_invalid' for col in sales_cols])
    invalid_count = 0
    
    for idx, row in df.iterrows():
        sales = row[sales_cols]
        
        # Check for negative sales
        negative_sales = sales < 0
        for col in sales_cols[negative_sales]:
            validation_flags.at[idx, f'{col}_invalid'] = 1
            df.at[idx, col] = 0
            invalid_count += 1
        
        # Check for unrealistic sales (10x historical max)
        historical_max = sales.max()
        if historical_max > 0:
            unrealistic_sales = sales > historical_max * max_threshold
            for col in sales_cols[unrealistic_sales]:
                validation_flags.at[idx, f'{col}_invalid'] = 1
                invalid_count += 1
    
    print(f"Fixed {invalid_count} invalid sales values")
    df = pd.concat([df, validation_flags], axis=1)
    return df

# Cell 10: Zero Values Check

In [10]:
def check_zero_values(df, sheet_name):
    """Check and report zero values in sales columns"""
    sales_cols = [col for col in df.columns if re.match(r'^\w{3}-\d{2}$|^202\d$|^total_qty$', col)]
    zero_counts = (df[sales_cols] == 0).sum()
    total_zeros = zero_counts.sum()
    
    print(f"\n{'='*50}")
    print(f"Zero value check for sheet '{sheet_name}':")
    print(f"{'='*50}")
    
    if total_zeros == 0:
        print("✅ No zero values found in sales columns.")
    else:
        print(f"⚠️  Found {total_zeros} zero values across sales columns:")
        for col, count in zero_counts.items():
            if count > 0:
                print(f"   - {col}: {count} zeros")
    
    return total_zeros

# Cell 11: Yearly Totals Validation

In [11]:
def validate_yearly_totals(df):
    """Validate yearly totals against monthly sums"""
    year_cols = [col for col in df.columns if re.match(r'^202\d$', col)]
    monthly_cols = [col for col in df.columns if re.match(r'^\w{3}-\d{2}$', col)]
    
    print(f"Validating {len(year_cols)} yearly columns against monthly data")
    
    for year in year_cols:
        year_num = int(year)
        relevant_months = [col for col in monthly_cols if f"{year_num % 100:02d}" in col]
        
        if relevant_months:
            df[f'{year}_calc'] = df[relevant_months].sum(axis=1)
            df[f'{year}_discrepancy'] = df[year] - df[f'{year}_calc']
            df[f'{year}_flag'] = df[f'{year}_discrepancy'].abs() > 0.01  # Allow small rounding differences
            
            discrepancies = df[f'{year}_flag'].sum()
            if discrepancies > 0:
                print(f"   - {year}: {discrepancies} products have discrepancies")
    
    return df

# Cell 12: Visualization Function

In [12]:
def visualize_outliers(df, product_id, sheet_name):
    """Visualize sales trends for a specific product"""
    product_data = df[df['product_id'] == product_id]
    if product_data.empty:
        print(f"❌ No data found for product {product_id}")
        return
    
    sales_cols = [col for col in df.columns if re.match(r'^\w{3}-\d{2}$', col)]
    sales_data = product_data[sales_cols].iloc[0]
    
    # Create time series
    try:
        dates = pd.to_datetime(sales_cols, format='%b-%y')
        sales_df = pd.DataFrame({'Date': dates, 'Sales': sales_data.values})
        sales_df = sales_df.sort_values('Date')
        
        plt.figure(figsize=(14, 8))
        plt.subplot(2, 1, 1)
        plt.plot(sales_df['Date'], sales_df['Sales'], marker='o', linewidth=2, markersize=6)
        plt.title(f'Sales Trend for Product {product_id} ({sheet_name})', fontsize=14, fontweight='bold')
        plt.xlabel('Date')
        plt.ylabel('Sales Quantity')
        plt.grid(True, alpha=0.3)
        plt.xticks(rotation=45)
        
        # Add summary statistics
        plt.subplot(2, 1, 2)
        stats_text = f"""
        Product: {product_id}
        Total Sales: {sales_data.sum():,.0f}
        Average Monthly Sales: {sales_data.mean():.1f}
        Max Monthly Sales: {sales_data.max():,.0f}
        Min Monthly Sales: {sales_data.min():,.0f}
        Months with Zero Sales: {(sales_data == 0).sum()}
        """
        plt.text(0.1, 0.5, stats_text, transform=plt.gca().transAxes, fontsize=11,
                bbox=dict(boxstyle="round,pad=0.3", facecolor="lightgray"))
        plt.axis('off')
        
        plt.tight_layout()
        plt.show()
        
    except Exception as e:
        print(f"Error creating visualization: {e}")

# Cell 13: Main Cleaning Function

In [13]:
def clean_sales_data(df, sheet_name):
    """Main cleaning pipeline for sales data"""
    print(f"\n🔄 Starting cleaning pipeline for sheet: '{sheet_name}'")
    print(f"Initial data shape: {df.shape}")
    
    # Step 1: Clean column names
    print("\n1️⃣ Cleaning column names...")
    df = clean_column_names(df)
    
    # Step 2: Convert date columns
    print("\n2️⃣ Converting date columns...")
    df = convert_date_columns(df)
    
    # Step 3: Handle missing values
    print("\n3️⃣ Handling missing values...")
    df = handle_missing_values(df)
    
    # Step 4: Apply rule-based validation
    print("\n4️⃣ Applying rule-based validation...")
    df = rule_based_validation(df)
    
    # Step 5: Detect outliers
    print("\n5️⃣ Detecting outliers...")
    df = detect_and_handle_outliers_stl(df, method='flag')
    
    # Step 6: Apply smoothing
    print("\n6️⃣ Applying smoothing...")
    df = apply_smoothing(df)
    
    # Step 7: Validate yearly totals
    print("\n7️⃣ Validating yearly totals...")
    df = validate_yearly_totals(df)
    
    # Step 8: Ensure data types
    print("\n8️⃣ Ensuring proper data types...")
    numeric_cols = [col for col in df.columns if re.match(r'^\w{3}-\d{2}$|^202\d$|^total_qty$', col)]
    df[numeric_cols] = df[numeric_cols].astype(float)
    
    # Step 9: Remove duplicates
    print("\n9️⃣ Removing duplicates...")
    initial_rows = len(df)
    df = df.drop_duplicates(subset=['product_id'], keep='first')
    duplicates_removed = initial_rows - len(df)
    print(f"Removed {duplicates_removed} duplicate rows based on Product ID")
    
    # Step 10: Standardize IDs
    print("\n🔟 Standardizing IDs...")
    if 'customer_id_sold_to' in df.columns:
        df['customer_id_sold_to'] = df['customer_id_sold_to'].astype(str).str.strip()
    df['product_id'] = df['product_id'].astype(str).str.strip()
    
    # Step 11: Check zero values
    check_zero_values(df, sheet_name)
    
    # Step 12: Create sample visualization
    print(f"\n📊 Creating sample visualization...")
    if not df.empty and len(df) > 0:
        sample_product = df['product_id'].iloc[0]
        visualize_outliers(df, sample_product, sheet_name)
    
    print(f"\n✅ Cleaning completed! Final data shape: {df.shape}")
    return df

# Cell 14: Main Execution Function

In [14]:
def main(file_path='Sales History.xlsx', output_path='Cleaned_Sales_History_Enhanced_Updated.xlsx'):
    """Main execution function"""
    print("🚀 Starting Sales Data Cleaning Pipeline")
    print("="*60)
    
    # Load data
    print("📂 Loading Excel data...")
    sheets = load_excel_data(file_path)
    if not sheets:
        print("❌ Failed to load data. Please check the file path.")
        return
    
    # Clean each sheet
    cleaned_sheets = {}
    for sheet_name, df in sheets.items():
        cleaned_df = clean_sales_data(df, sheet_name)
        cleaned_sheets[sheet_name] = cleaned_df
    
    # Save cleaned data
    print(f"\n💾 Saving cleaned data to '{output_path}'...")
    try:
        with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
            for sheet_name, df in cleaned_sheets.items():
                df.to_excel(writer, sheet_name=sheet_name, index=False)
        
        print(f"✅ Successfully saved cleaned data to '{output_path}'")
        
        # Print summary
        print("\n📋 CLEANING SUMMARY:")
        print("="*60)
        for sheet_name, df in cleaned_sheets.items():
            print(f"Sheet '{sheet_name}': {len(df)} products, {len(df.columns)} columns")
            
    except Exception as e:
        print(f"❌ Error saving file: {e}")

# Cell 15: Execute the Pipeline

In [15]:
main()

🚀 Starting Sales Data Cleaning Pipeline
📂 Loading Excel data...
Successfully loaded 2 sheets: ['Private label', 'Brand']

🔄 Starting cleaning pipeline for sheet: 'Private label'
Initial data shape: (41, 43)

1️⃣ Cleaning column names...
Cleaned 43 column names

2️⃣ Converting date columns...
Found 0 date columns: []

3️⃣ Handling missing values...
Found 38 consistent products (>50% non-zero sales)
Removed 0 products with all zero sales

4️⃣ Applying rule-based validation...


  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].interpolate(method='linear', limit_direction='both')
  df.loc[idx, sales_cols] = df.loc[idx, sales_cols].int

TypeError: match() missing 1 required positional argument: 'string'

# Cell 16: Optional - Load and Explore Cleaned Data

In [None]:
cleaned_data = load_excel_data('Cleaned_Sales_History_Updated.xlsx')

if cleaned_data:
    for sheet_name, df in cleaned_data.items():
        print(f"\n📋 Summary for {sheet_name}:")
        print(f"Shape: {df.shape}")
        print(f"Columns: {list(df.columns[:10])}...")  # Show first 10 columns
        print(f"Sample data:")
        print(df.head())

# Cell 17: Optional - Custom Product Visualization

In [None]:
sheet_name = 'Private label'  # or 'Brand'
product_id = 'YOUR_PRODUCT_ID'  # Replace with actual product ID

if 'cleaned_data' in locals():
    df = cleaned_data[sheet_name]
    visualize_outliers(df, product_id, sheet_name)