# 🧪 DejaBrew Sales & Inventory Forecasting with Gradient Boosting

This notebook trains Gradient Boosting Regression models on the **Coffee Shop Sales by Ahmed Abbas** Kaggle dataset.

**Dataset**: Coffee shop sales transaction records for Maven Roasters

**Model**: Gradient Boosting Regressor with advanced feature engineering

**Output**: Trained models (.joblib) with accuracy metrics for integration into DejaBrew system

## 📦 Step 1: Install Dependencies

In [None]:
!pip install kaggle scikit-learn pandas numpy joblib matplotlib seaborn -q

## 🔑 Step 2: Set Up Kaggle API

1. Go to https://www.kaggle.com/settings/account
2. Click "Create New API Token"
3. Upload the `kaggle.json` file below

In [None]:
from google.colab import files
import os

# Upload your kaggle.json
print("Please upload your kaggle.json file:")
uploaded = files.upload()

# Setup Kaggle
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
print("‚úì Kaggle API configured!")

## 📥 Step 3: Download Kaggle Dataset

**Dataset**: Coffee Shop Sales by Ahmed Abbas

In [None]:
import os

# Download the Coffee Shop Sales dataset
!kaggle datasets download -d ahmedabbas757/coffee-sales

# Unzip the dataset
!unzip -q coffee-sales.zip -d coffee_data

print("\n‚úì Dataset downloaded and extracted!")
print("\nFiles in dataset:")
!ls -lh coffee_data/

## 🔍 Step 4: Load and Explore Data

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta

# Set display options
pd.set_option('display.max_columns', None)
sns.set_style('whitegrid')

# Locate the dataset file. The file name differs between dataset versions
# (e.g. 'Coffee Shop Sales.xlsx', 'index.xlsx', 'coffee_shop_sales.csv'), so
# list what was extracted and pick a spreadsheet/CSV file from it.
import glob

SUPPORTED_EXTENSIONS = ('.xlsx', '.xls', '.csv')

# Sorted so the choice does not depend on the order glob happens to return.
files_in_data = sorted(glob.glob('coffee_data/*'))
print("Available files:")
for f in files_in_data:
    print(f" - {f}")

# Prefer a file with a supported extension. Fall back to the first file found,
# or to the default name, so that the error raised below names a concrete path.
loadable_files = [path for path in files_in_data
                  if path.lower().endswith(SUPPORTED_EXTENSIONS)]
if loadable_files:
    data_file = loadable_files[0]
elif files_in_data:
    data_file = files_in_data[0]
else:
    data_file = 'coffee_data/Coffee Shop Sales.xlsx'

# Load based on file extension
if data_file.lower().endswith(('.xlsx', '.xls')):
    df_raw = pd.read_excel(data_file)
elif data_file.lower().endswith('.csv'):
    df_raw = pd.read_csv(data_file)
else:
    # Fail here with a clear message rather than with a NameError on df_raw
    # in the print statements below.
    raise ValueError(
        f"Unknown file format: {data_file}. "
        "Set data_file manually to the dataset's .xlsx/.xls/.csv path."
    )

print(f"\n‚úì Loaded dataset: {data_file}")
print(f"Shape: {df_raw.shape}")
print(f"\nFirst few rows:")
df_raw.head()

In [None]:
# Explore data structure
print("Dataset Info:")
# DataFrame.info() prints its report itself and returns None, so wrapping it in
# print() would append a stray "None" line.
df_raw.info()
print("\nColumn Names:")
print(df_raw.columns.tolist())
print("\nBasic Statistics:")
df_raw.describe()

## 🔧 Step 5: Data Preprocessing

Convert the dataset to the format needed for training:
- **date**: Transaction date
- **product**: Product name (coffee/bakery item)
- **quantity**: Number of items sold

In [None]:
# Map the dataset's column names onto the names used by the rest of the notebook.
# Common column names in Coffee Shop Sales dataset:
# - 'transaction_date' or 'date' or 'Date'
# - 'product_name' or 'product' or 'Product'
# - 'quantity' or 'Quantity' or 'transaction_qty'
#
# UPDATE the keys below if your file uses different names. For example, if the
# columns are 'Date', 'Product', 'Qty':
# column_mapping = {'Date': 'date', 'Product': 'product', 'Qty': 'quantity'}
column_mapping = {
    'transaction_date': 'date',  # UPDATE: Use actual date column name
    'product_detail': 'product',  # UPDATE: Use actual product column name
    'transaction_qty': 'quantity'  # UPDATE: Use actual quantity column name
}

print("Current columns:", df_raw.columns.tolist())

# Rename, then verify that every required column is present. Raising here stops
# the notebook at the real problem instead of printing a message and letting
# later cells run on a half-processed frame.
required_columns = ['date', 'product', 'quantity']
df_renamed = df_raw.rename(columns=column_mapping)
missing_columns = [col for col in required_columns if col not in df_renamed.columns]
if missing_columns:
    raise KeyError(
        f"Missing column(s) after renaming: {missing_columns}. "
        "Please update the column_mapping dictionary above with correct column names."
    )

# Select only needed columns (copy so the assignments below do not touch df_raw)
df = df_renamed[required_columns].copy()

# Convert date to datetime and drop any time-of-day part: the following cells
# aggregate and reindex per calendar day.
df['date'] = pd.to_datetime(df['date']).dt.normalize()

# Strip whitespace from product names
df['product'] = df['product'].str.strip()

# Remove invalid quantities (copy so df is an independent frame, not a view)
df = df[df['quantity'] > 0].copy()

print(f"\n‚úì Data preprocessed successfully!")
print(f"Shape: {df.shape}")
print(f"Date range: {df['date'].min()} to {df['date'].max()}")
print(f"Unique products: {df['product'].nunique()}")

print("\nSample data:")
display(df.head())

In [None]:
# Collapse individual transactions into one row per (date, product) holding the
# summed quantity
df_daily = df.groupby(['date', 'product'], as_index=False)['quantity'].sum()

print(f"Daily aggregated data shape: {df_daily.shape}")
print(f"\nTop 10 products by total sales:")
top_products = (
    df_daily.groupby('product')['quantity']
    .sum()
    .sort_values(ascending=False)
    .head(10)
)
print(top_products)

## 📊 Step 6: Visualize Sales Patterns

In [None]:
# Bar chart: total quantity sold for the ten best-selling products
top_10_products = (
    df_daily.groupby('product')['quantity']
    .sum()
    .sort_values(ascending=False)
    .head(10)
)
plt.figure(figsize=(12, 6))
top_10_products.plot(kind='bar', color='steelblue')
plt.title('Top 10 Products by Total Sales', fontsize=16, fontweight='bold')
plt.xlabel('Product')
plt.ylabel('Total Quantity Sold')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

# Line chart: daily quantity of the single best-selling product
top_product = top_10_products.index[0]
product_data = (
    df_daily.loc[df_daily['product'] == top_product]
    .set_index('date')['quantity']
)
plt.figure(figsize=(14, 5))
product_data.plot(color='darkgreen', linewidth=2)
plt.title(f'Sales Trend for {top_product}', fontsize=16, fontweight='bold')
plt.xlabel('Date')
plt.ylabel('Quantity Sold')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## ⚙️ Step 7: Feature Engineering

Create advanced features for better forecasting:
1. **Date features**: day_of_week, month, day_of_year, year
2. **Lag features**: Sales from 1, 2, 3, 7, 14 days ago
3. **Rolling averages**: 7-day and 14-day moving averages

In [None]:
def create_features_for_product(df_product):
    """
    Builds the modelling table for a single product's daily sales series.

    The series is put on a continuous daily calendar (days without a record
    count as 0 sales) and the following columns are derived for each day:

    - quantity: units sold that day (the prediction target)
    - Date features: day_of_week (0=Monday, 6=Sunday), month, day_of_year, year
    - Lag features: lag_1, lag_2, lag_3, lag_7, lag_14 (quantity N days earlier)
    - Rolling averages: rolling_mean_7, rolling_mean_14, averaged over the days
      *before* the current one so they never contain the target itself

    Args:
        df_product: DataFrame with a 'date' column (datetime-like) and a
            'quantity' column for one product. Rows sharing a calendar day
            are summed.

    Returns:
        DataFrame indexed by date with the columns listed above. Rows with an
        undefined lag (the first 14 days of the calendar) are dropped. Empty
        input yields an empty DataFrame with the same columns.
    """
    lags = [1, 2, 3, 7, 14]
    rolling_windows = [7, 14]

    if df_product.empty:
        # No dates means no calendar to build; return the expected schema.
        columns = (['quantity', 'day_of_week', 'month', 'day_of_year', 'year']
                   + [f'lag_{lag}' for lag in lags]
                   + [f'rolling_mean_{window}' for window in rolling_windows])
        return pd.DataFrame(columns=columns)

    # One value per calendar day, sorted by date. Summing merges duplicate
    # dates, which would otherwise make the reindex below fail.
    daily = (
        df_product.assign(date=pd.to_datetime(df_product['date']).dt.normalize())
        .groupby('date')['quantity']
        .sum()
    )

    # Create a continuous date range (fill missing dates with 0)
    date_range = pd.date_range(start=daily.index.min(),
                               end=daily.index.max(),
                               freq='D')
    daily = daily.reindex(date_range, fill_value=0)

    features = pd.DataFrame({'quantity': daily})

    # 1. Date features
    features['day_of_week'] = features.index.dayofweek  # 0=Monday, 6=Sunday
    features['month'] = features.index.month
    features['day_of_year'] = features.index.dayofyear
    features['year'] = features.index.year

    # 2. Lag features (sales from previous days)
    for lag in lags:
        features[f'lag_{lag}'] = features['quantity'].shift(lag)

    # 3. Rolling averages over the preceding days only. Without the one-day
    # shift each window would include the current day's quantity, i.e. the
    # value the model is asked to predict.
    previous_days = features['quantity'].shift(1)
    for window in rolling_windows:
        features[f'rolling_mean_{window}'] = (
            previous_days.rolling(window=window, min_periods=1).mean()
        )

    # Drop rows with NaN values (from lag features)
    return features.dropna()

print("‚úì Feature engineering function created!")
print("\nExample features for top product:")

# Smoke test: build the feature table for the best-selling product
top_product_data = df_daily.loc[
    df_daily['product'] == top_product, ['date', 'quantity']
].copy()
example_features = create_features_for_product(top_product_data)
print(f"\nFeature shape: {example_features.shape}")
print(f"Feature columns: {example_features.columns.tolist()}")
display(example_features.tail())

## 🤖 Step 8: Train Gradient Boosting Models

Train individual models for each product with accuracy evaluation.

In [None]:
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import joblib
import os
import json

# Make sure the directory the models are written to exists
os.makedirs('trained_models', exist_ok=True)

def train_model_for_product(product_name, df_product_data, min_data_points=30):
    """
    Fit and evaluate a Gradient Boosting regressor for one product.

    Args:
        product_name: Name of the product. Part of the signature but not used
            inside this function.
        df_product_data: Per-product sales data, passed unchanged to
            create_features_for_product.
        min_data_points: Minimum number of feature rows required to train.

    Returns:
        dict whose 'status' key is one of
        - 'success': also holds 'model', 'train_metrics' and 'test_metrics'
          (each with mae, rmse, r2, mape, accuracy), 'data_points',
          'train_size' and 'test_size'
        - 'skipped': fewer than min_data_points feature rows; see 'reason'
        - 'error': an exception was raised; 'reason' holds its message
    """
    def _score(actual, predicted):
        """Return the rounded error metrics for one set of predictions."""
        mae = mean_absolute_error(actual, predicted)
        rmse = np.sqrt(mean_squared_error(actual, predicted))
        r2 = r2_score(actual, predicted)
        # +1 in the denominator avoids division by zero on zero-sales days
        mape = np.mean(np.abs((actual - predicted) / (actual + 1))) * 100
        return {
            'mae': round(mae, 2),
            'rmse': round(rmse, 2),
            'r2': round(r2, 4),
            'mape': round(mape, 2),
            # "Accuracy" is reported as 100% - MAPE, floored at 0
            'accuracy': round(max(0, 100 - mape), 2)
        }

    try:
        features = create_features_for_product(df_product_data)

        n_rows = len(features)
        if n_rows < min_data_points:
            return {
                'status': 'skipped',
                'reason': f'Insufficient data ({n_rows} points)'
            }

        # Target is the day's quantity; every other column is a predictor
        y = features['quantity']
        X = features.drop(columns='quantity')

        # Chronological split: first 80% of rows for training, rest for testing
        cutoff = int(len(X) * 0.8)
        X_train, y_train = X.iloc[:cutoff], y.iloc[:cutoff]
        X_test, y_test = X.iloc[cutoff:], y.iloc[cutoff:]

        hyperparameters = {
            'n_estimators': 200,
            'learning_rate': 0.05,
            'max_depth': 5,
            'min_samples_split': 10,
            'min_samples_leaf': 4,
            'random_state': 42,
            'verbose': 0,
        }
        model = GradientBoostingRegressor(**hyperparameters)
        model.fit(X_train, y_train)

        # Score the fitted model on both partitions
        train_metrics = _score(y_train, model.predict(X_train))
        test_metrics = _score(y_test, model.predict(X_test))

        return {
            'status': 'success',
            'model': model,
            'train_metrics': train_metrics,
            'test_metrics': test_metrics,
            'data_points': n_rows,
            'train_size': len(X_train),
            'test_size': len(X_test)
        }

    except Exception as e:
        # Report the failure to the caller instead of aborting a training run
        return {
            'status': 'error',
            'reason': str(e)
        }

print("‚úì Model training function created!")

In [None]:
# Number of best-selling products to train a model for (adjust as needed)
TOP_N_PRODUCTS = 30

# Rank products by total quantity sold and keep the first TOP_N_PRODUCTS
product_sales = (
    df_daily.groupby('product')['quantity']
    .sum()
    .sort_values(ascending=False)
)
top_products_list = product_sales.head(TOP_N_PRODUCTS).index.tolist()

print(f"Training models for top {TOP_N_PRODUCTS} products...\n")
print("="*80)

# Collected across the loop and read by the following cells
training_results = {}   # product -> full result dict
trained_models = {}     # product -> path of the saved .joblib file
metrics_summary = []    # one record of test metrics per trained product

for i, product in enumerate(top_products_list, start=1):
    print(f"\n[{i}/{len(top_products_list)}] Training: {product}")

    # Daily series of this product only
    product_data = df_daily.loc[
        df_daily['product'] == product, ['date', 'quantity']
    ].copy()

    result = train_model_for_product(product, product_data)

    # Report products that did not produce a model and move on
    if result['status'] == 'skipped':
        print(f"  ‚äò Skipped: {result['reason']}")
        continue
    if result['status'] != 'success':
        print(f"  ‚úó Error: {result['reason']}")
        continue

    print(f"  ‚úì Success!")
    print(f"  Data points: {result['data_points']} (Train: {result['train_size']}, Test: {result['test_size']})")
    print(f"  Test Accuracy: {result['test_metrics']['accuracy']}%")
    print(f"  Test R¬≤: {result['test_metrics']['r2']}")
    print(f"  Test MAE: {result['test_metrics']['mae']}")
    print(f"  Test RMSE: {result['test_metrics']['rmse']}")

    # File name: lower-cased product name with spaces and slashes as underscores
    safe_name = product.lower().translate(str.maketrans({' ': '_', '/': '_'}))
    model_path = os.path.join('trained_models', f"model_{safe_name}.joblib")
    joblib.dump(result['model'], model_path)

    trained_models[product] = model_path
    training_results[product] = result

    metrics_summary.append({
        'product': product,
        'test_accuracy': result['test_metrics']['accuracy'],
        'test_r2': result['test_metrics']['r2'],
        'test_mae': result['test_metrics']['mae'],
        'data_points': result['data_points']
    })

print("\n" + "="*80)
print(f"\n‚úì Training complete! Successfully trained {len(trained_models)} models.")

## 📈 Step 9: Accuracy Report

In [None]:
# Create accuracy summary DataFrame
metrics_df = pd.DataFrame(metrics_summary)

# With no successfully trained model the frame has no columns, and the sort
# below would fail with an unhelpful KeyError on 'test_accuracy'.
if metrics_df.empty:
    raise RuntimeError(
        "No models were trained successfully, so there are no metrics to report. "
        "Check the training output above for skipped products and errors."
    )

metrics_df = metrics_df.sort_values('test_accuracy', ascending=False)

# Averages are reused by the printout, the plots and the JSON summary
avg_test_accuracy = float(metrics_df['test_accuracy'].mean())
avg_test_r2 = float(metrics_df['test_r2'].mean())
avg_test_mae = float(metrics_df['test_mae'].mean())

print("="*80)
print("MODEL ACCURACY SUMMARY")
print("="*80)
print(f"\nTotal models trained: {len(metrics_df)}")
print(f"Average Test Accuracy: {avg_test_accuracy:.2f}%")
print(f"Average Test R¬≤: {avg_test_r2:.4f}")
print(f"Average Test MAE: {avg_test_mae:.2f}")
print("\n" + "="*80)
print("\nDetailed Metrics by Product:")
print("="*80)
display(metrics_df)

# Visualize accuracy distribution
fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# Accuracy distribution
axes[0].hist(metrics_df['test_accuracy'], bins=20, color='skyblue', edgecolor='black')
axes[0].axvline(avg_test_accuracy, color='red', linestyle='--', linewidth=2, label=f'Mean: {avg_test_accuracy:.2f}%')
axes[0].set_xlabel('Test Accuracy (%)')
axes[0].set_ylabel('Number of Models')
axes[0].set_title('Distribution of Model Accuracy', fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# R² score distribution
axes[1].hist(metrics_df['test_r2'], bins=20, color='lightgreen', edgecolor='black')
axes[1].axvline(avg_test_r2, color='red', linestyle='--', linewidth=2, label=f'Mean: {avg_test_r2:.4f}')
axes[1].set_xlabel('Test R¬≤ Score')
axes[1].set_ylabel('Number of Models')
axes[1].set_title('Distribution of R¬≤ Scores', fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Save metrics to JSON
metrics_dict = metrics_df.to_dict('records')
with open('trained_models/model_metrics.json', 'w') as f:
    json.dump({
        'summary': {
            'total_models': len(metrics_df),
            'avg_test_accuracy': avg_test_accuracy,
            'avg_test_r2': avg_test_r2,
            'avg_test_mae': avg_test_mae
        },
        'models': metrics_dict
    }, f, indent=2)

print("\n‚úì Metrics saved to trained_models/model_metrics.json")

## 💾 Step 10: Save Model Metadata

In [None]:
# Persist the names of all products that have a trained model
trained_products = list(trained_models)
with open('trained_models/trained_articles.json', 'w') as f:
    f.write(json.dumps(trained_products, indent=2))

print(f"‚úì Saved {len(trained_products)} product names to trained_articles.json")
print("\nTrained products:")
for i, product in enumerate(trained_products, start=1):
    print(f"  {i}. {product}")

## 📦 Step 11: Download Trained Models

Download all trained models as a ZIP file for integration into your DejaBrew system.

In [None]:
import shutil

# Create ZIP file
shutil.make_archive('dejabrew_trained_models', 'zip', 'trained_models')

print("‚úì Created dejabrew_trained_models.zip")
print("\nContents:")
!unzip -l dejabrew_trained_models.zip | head -20

# Download the ZIP file
print("\nDownloading...")
files.download('dejabrew_trained_models.zip')

print("\n" + "="*80)
print("‚úì TRAINING COMPLETE!")
print("="*80)
print("\nNext steps:")
print("1. Extract dejabrew_trained_models.zip")
print("2. Copy all .joblib files to: dejabrew/forecasting/forecasting_data/")
print("3. Copy trained_articles.json to: dejabrew/forecasting/forecasting_data/")
print("4. Restart your Django server")
print("5. Your forecasting system will now use the trained models!")
print("\nModel Performance:")
print(f"  - Average Accuracy: {metrics_df['test_accuracy'].mean():.2f}%")
print(f"  - Average R¬≤ Score: {metrics_df['test_r2'].mean():.4f}")
print(f"  - Total Models: {len(trained_models)}")

## 🧪 Step 12: Test Predictions (Optional)

Test the trained model with sample predictions.

In [None]:
# Use the first trained product (if any) for a sanity check of the saved model
test_product = next(iter(trained_products), None)

if test_product:
    print(f"Testing predictions for: {test_product}")
    print("="*80)

    # Reload the model from disk, as the consuming application would
    model = joblib.load(trained_models[test_product])

    # Rebuild the feature table from this product's daily history
    product_data = df_daily.loc[
        df_daily['product'] == test_product, ['date', 'quantity']
    ].copy()
    features = create_features_for_product(product_data)

    # Keep only the most recent 14 rows
    last_14_days = features.tail(14).copy()
    y_actual = last_14_days['quantity']
    X_last = last_14_days.drop(columns='quantity')

    y_pred = model.predict(X_last)

    # Side-by-side table; predictions are rounded to whole units for display
    comparison = pd.DataFrame({
        'date': X_last.index,
        'actual': y_actual.values,
        'predicted': np.round(y_pred, 0).astype(int)
    })

    print("\nLast 14 Days - Actual vs Predicted:")
    display(comparison)

    # Actual and predicted quantities on a shared time axis
    plt.figure(figsize=(12, 5))
    plt.plot(comparison['date'], comparison['actual'], marker='o', label='Actual', linewidth=2)
    plt.plot(comparison['date'], comparison['predicted'], marker='s', label='Predicted', linewidth=2, linestyle='--')
    plt.title(f'Actual vs Predicted Sales for {test_product}', fontsize=14, fontweight='bold')
    plt.xlabel('Date')
    plt.ylabel('Quantity')
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()

    # Same error measures as in training, computed on the unrounded predictions
    test_mae = mean_absolute_error(y_actual, y_pred)
    test_mape = np.mean(np.abs((y_actual - y_pred) / (y_actual + 1))) * 100
    test_accuracy = max(0, 100 - test_mape)

    print(f"\nPrediction Accuracy: {test_accuracy:.2f}%")
    print(f"Mean Absolute Error: {test_mae:.2f}")