In [69]:

import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import FunctionTransformer, OrdinalEncoder
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
import joblib
import zipfile
import os
import mlflow

# Set sklearn to output pandas DataFrames
from sklearn import set_config
set_config(transform_output="pandas")

# Helper function to read zipped CSVs
def read_zipped_csv(path):
    with zipfile.ZipFile(path) as z:
        file_name = z.namelist()[0]
        return pd.read_csv(z.open(file_name))

# Custom transformer for merging dataframes
def merge_dataframes(X, features_df=None, stores_df=None):
    df = X.copy()
    # Ensure Date columns are in datetime format
    df['Date'] = pd.to_datetime(df['Date'])
    features_df = features_df.copy()
    features_df['Date'] = pd.to_datetime(features_df['Date'])
    df = df.merge(features_df, on=["Store", "Date"], how="left")
    df = df.merge(stores_df, on="Store", how="left")
    df = df.drop(columns=['IsHoliday_y'], errors='ignore')
    df = df.rename(columns={'IsHoliday_x': 'IsHoliday'})
    return df

# Custom transformer for date features
def add_date_features(X):
    df = X.copy()
    df['Date'] = pd.to_datetime(df['Date'])
    df['Year'] = df['Date'].dt.year
    df['Month'] = df['Date'].dt.month
    df['Week'] = df['Date'].dt.isocalendar().week.astype(int)
    df['Day'] = df['Date'].dt.day
    df['IsHoliday'] = df['IsHoliday'].astype(int)
    return df

# Custom transformer for CPI and Unemployment imputation
def impute_cpi_unemployment(X):
    df = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X.copy()
    # Ensure required columns exist
    if 'CPI' in df.columns:
        df['CPI'] = df.groupby('Store')['CPI'].ffill().bfill()
    if 'Unemployment' in df.columns:
        df['Unemployment'] = df.groupby('Store')['Unemployment'].ffill().bfill()
    return df

# Custom transformer for lag and rolling features
def add_lag_rolling_features(X, y=None, train_full=None):
    df = X.copy()
    # Ensure Weekly_Sales is available for training
    if y is not None and 'Weekly_Sales' not in df.columns:
        df['Weekly_Sales'] = y.values if isinstance(y, pd.Series) else y
    if train_full is not None:
        train_full = train_full.copy()
        if 'Weekly_Sales' not in train_full.columns:
            train_full['Weekly_Sales'] = np.nan
        combined = pd.concat([train_full, df], sort=False)
    else:
        combined = df
    combined = combined.sort_values(['Store', 'Dept', 'Date'])
    # Compute lag and rolling features
    if 'Weekly_Sales' in combined.columns:
        combined['Sales_Lag_1'] = combined.groupby(['Store', 'Dept'])['Weekly_Sales'].shift(1)
        combined['Sales_Lag_2'] = combined.groupby(['Store', 'Dept'])['Weekly_Sales'].shift(2)
        combined['Sales_Rolling_Mean_3'] = combined.groupby(['Store', 'Dept'])['Weekly_Sales'].shift(1).rolling(3).mean()
        combined['Sales_Rolling_Mean_5'] = combined.groupby(['Store', 'Dept'])['Weekly_Sales'].shift(1).rolling(5).mean()
    if train_full is not None:
        result = combined[combined['Weekly_Sales'].isna()]
        result = result.drop(columns=['Weekly_Sales'], errors='ignore')
        return result
    return combined.drop(columns=['Weekly_Sales'], errors='ignore')

# Function to calculate WMAE (Weighted Mean Absolute Error)
def calculate_wmae(y_true, y_pred, is_holiday):
    weights = np.where(is_holiday, 5, 1)
    return np.sum(weights * np.abs(y_true - y_pred)) / np.sum(weights)

In [70]:

# Load datasets from Kaggle input directory
train_df = read_zipped_csv('/kaggle/input/walmart-recruiting-store-sales-forecasting/train.csv.zip')
test_df = read_zipped_csv('/kaggle/input/walmart-recruiting-store-sales-forecasting/test.csv.zip')
features_df = read_zipped_csv('/kaggle/input/walmart-recruiting-store-sales-forecasting/features.csv.zip')
stores_df = pd.read_csv('/kaggle/input/walmart-recruiting-store-sales-forecasting/stores.csv')


In [71]:

# Define feature columns for the model (excluding Date and Day)
feature_cols = [
    'Store', 'Dept', 'Type', 'Size',
    'Temperature', 'Fuel_Price', 'CPI', 'Unemployment',
    'MarkDown1', 'MarkDown2', 'MarkDown3', 'MarkDown4', 'MarkDown5',
    'IsHoliday', 'Year', 'Month', 'Week',
    'Sales_Lag_1', 'Sales_Lag_2', 'Sales_Rolling_Mean_3', 'Sales_Rolling_Mean_5'
]

# Create pipeline with preprocessing and RandomForest model
pipeline = Pipeline([
    ('merge_data', FunctionTransformer(merge_dataframes, kw_args={'features_df': features_df, 'stores_df': stores_df})),
    ('date_features', FunctionTransformer(add_date_features)),
    ('lag_rolling_features', FunctionTransformer(add_lag_rolling_features, kw_args={'train_full': None}, validate=False)),
    ('encode_type', ColumnTransformer([
        ('type_encoder', OrdinalEncoder(categories=[['A', 'B', 'C']]), ['Type']),
        ('passthrough', 'passthrough', [col for col in feature_cols if col != 'Type'])
    ], remainder='drop')),
    ('impute_cpi_unemployment', FunctionTransformer(impute_cpi_unemployment)),
    ('impute_markdowns', SimpleImputer(strategy='constant', fill_value=0)),
    ('impute_lags', SimpleImputer(strategy='mean')),
    ('rf', RandomForestRegressor(n_estimators=100, max_depth=20, random_state=42, n_jobs=-1))
])


In [72]:

# Merge and preprocess training data
train_full = train_df.merge(features_df, on=["Store", "Date"], how="left")
train_full = train_full.merge(stores_df, on="Store", how="left")
train_full = train_full.drop(columns=['IsHoliday_y'], errors='ignore').rename(columns={'IsHoliday_x': 'IsHoliday'})
train_full['Date'] = pd.to_datetime(train_full['Date'])

# Select input features and target
X_train = train_full[['Store', 'Dept', 'Date']]
y_train = train_full['Weekly_Sales']


In [73]:

from sklearn.model_selection import train_test_split

# Split data for validation
X_train_split, X_val, y_train_split, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)

# Fit pipeline on training split, passing y for lag/rolling features
pipeline.set_params(lag_rolling_features__kw_args={'train_full': None, 'y': y_train_split})
pipeline.fit(X_train_split, y_train_split)

# Predict on validation set
pipeline.set_params(lag_rolling_features__kw_args={'train_full': train_full, 'y': None})
y_pred = pipeline.predict(X_val)

# Calculate and print validation metrics
# Get IsHoliday from transformed validation data
X_val_transformed = pipeline.named_steps['merge_data'].transform(X_val)
X_val_transformed = pipeline.named_steps['date_features'].transform(X_val_transformed)
mae = mean_absolute_error(y_val, y_pred)
wmae = calculate_wmae(y_val, y_pred, X_val_transformed['IsHoliday'])
print(f"Validation MAE: {mae:.2f}")
print(f"Validation WMAE: {wmae:.2f}")

Validation MAE: 16105.53
Validation WMAE: 16339.83


In [74]:

# Save the trained pipeline
os.makedirs("/kaggle/working/models", exist_ok=True)
model_path = "/kaggle/working/models/walmart_sales_pipeline.pkl"
joblib.dump(pipeline, model_path)

# Set MLflow tracking
os.environ['MLFLOW_TRACKING_URI'] = 'https://dagshub.com/lkhok22/ML-FinalProject-Walmart-Recruiting---Store-Sales-Forecasting.mlflow'
os.environ['MLFLOW_TRACKING_USERNAME'] = 'lkhok22'
os.environ['MLFLOW_TRACKING_PASSWORD'] = 'd529c095d3cbb382ae3b78c03c1c2abafabf5d17'

# Log parameters and artifact to MLflow
mlflow.set_experiment("Walmart_Sales_Pipeline")
with mlflow.start_run(run_name="Pipeline_Training"):
    mlflow.log_param("model_type", "RandomForest")
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 20)
    mlflow.log_metric("val_mae", mae)
    mlflow.log_metric("val_wmae", wmae)
    mlflow.log_artifact(model_path)


2025/07/06 18:04:32 INFO mlflow.tracking.fluent: Experiment with name 'Walmart_Sales_Pipeline' does not exist. Creating a new experiment.


🏃 View run Pipeline_Training at: https://dagshub.com/lkhok22/ML-FinalProject-Walmart-Recruiting---Store-Sales-Forecasting.mlflow/#/experiments/1/runs/025cd499527b45bdb9dc72bbe66d37f2
🧪 View experiment at: https://dagshub.com/lkhok22/ML-FinalProject-Walmart-Recruiting---Store-Sales-Forecasting.mlflow/#/experiments/1


In [75]:

# Prepare test data
test_full = test_df.copy()
test_full['Date'] = pd.to_datetime(test_full['Date'])
X_test = test_full[['Store', 'Dept', 'Date']]

# Predict on test data with train_full for lag/rolling features
pipeline.set_params(lag_rolling_features__kw_args={'train_full': train_full, 'y': None})
predictions = pipeline.predict(X_test)

In [76]:

# Create submission dataframe
submission = test_df[['Store', 'Dept', 'Date']].copy()
submission['Id'] = submission['Store'].astype(str) + '_' + submission['Dept'].astype(str) + '_' + submission['Date'].astype(str)
submission['Weekly_Sales'] = predictions
submission = submission[['Id', 'Weekly_Sales']]

# Save submission
submission.to_csv('/kaggle/working/submission.csv', index=False)
print("Submission file saved to /kaggle/working/submission.csv")

Submission file saved to /kaggle/working/submission.csv
