In [None]:
# 0. Mount & setup
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/MyDrive/ML_FInal_Project

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
import time
import gc
import wandb
from collections import Counter

from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
from sklearn.metrics import mean_absolute_error
from statsmodels.tools.sm_exceptions import ConvergenceWarning

from joblib import Parallel, delayed

# 1. Initialize W&B
wandb.login()
run = wandb.init(
    project="walmart-sales-forecasting",
    entity="lkata22-free-university-of-tbilisi-",
    name="SARIMA_v5_param_changes",
    group="SARIMA",
    config={
        "test_size": 0.2,
        "min_dept_sales": 3000,
        "max_mae_ratio": 0.5,
        "max_depts": 100,      # for dev
        "n_jobs": -1,
        "p": 1, "d": 1, "q": 1,
        "P": 0, "D": 1, "Q": 1, "s": 52,
        "maxiter": 50
    }
)
cfg = wandb.config

warnings.simplefilter('ignore')
warnings.simplefilter('ignore', ConvergenceWarning)

# 2. Load & merge data
train    = pd.read_csv("data/train.csv",   parse_dates=["Date"])
features = pd.read_csv("data/features.csv",parse_dates=["Date"])
stores   = pd.read_csv("data/stores.csv")

df = (
    train
    .merge(features, on=["Store","Date","IsHoliday"], how="left")
    .merge(stores, on="Store", how="left")
)

# 3. Extract config into locals
TEST_SIZE     = cfg.test_size
MIN_SALES     = cfg.min_dept_sales
MAX_MAE_RATIO = cfg.max_mae_ratio
MAX_DEPTS     = int(cfg.max_depts)
N_JOBS        = int(cfg.n_jobs)
P, D, Q       = int(cfg.p), int(cfg.d), int(cfg.q)
sP, sD, sQ, s = int(cfg.P), int(cfg.D), int(cfg.Q), int(cfg.s)
MAXITER       = int(cfg.maxiter)

# 4. Stationarity test
def is_stationary(ts):
    if ts.dropna().empty:
        return False
    return adfuller(ts.dropna())[1] <= 0.05

# 5. Top-level fit function with debug reasons
def fit_series(args):
    store, dept, group = args
    ts = (
        group.set_index('Date')['Weekly_Sales']
             .sort_index().asfreq('W-FRI')
             .fillna(method='ffill').fillna(method='bfill')
    )
    if len(ts) < 52:
        return ("insufficient_length", None)
    avg_sales = ts.mean()
    if avg_sales < MIN_SALES:
        return ("low_avg_sales", None)

    split = int(len(ts)*(1-TEST_SIZE))
    train_ts, test_ts = ts[:split], ts[split:]

    # make stationary if needed
    diff_flag = False
    if not is_stationary(train_ts):
        train_ts = train_ts.diff().dropna()
        diff_flag = True

    # exogenous
    exog_cols = ['Temperature','Fuel_Price','CPI','Unemployment']
    exog_train = (
        group.set_index('Date')[exog_cols]
             .reindex(train_ts.index)
             .fillna(method='ffill').fillna(method='bfill')
    )
    exog_test = (
        group.set_index('Date')[exog_cols]
             .reindex(test_ts.index)
             .fillna(method='ffill').fillna(method='bfill')
    )

    # fit
    try:
        t0 = time.time()
        model = SARIMAX(
            train_ts,
            exog=exog_train,
            order=(P, D, Q),
            seasonal_order=(sP, sD, sQ, s),
            enforce_stationarity=False,
            enforce_invertibility=False
        )
        fit = model.fit(disp=False, maxiter=MAXITER, cov_type='none')
        fit_time = time.time() - t0
    except Exception:
        return ("model_error", None)

    # forecast & invert diff
    fc = fit.forecast(steps=len(test_ts), exog=exog_test)
    if diff_flag:
        fc = train_ts.iloc[-1] + fc.cumsum()

    mae   = mean_absolute_error(test_ts, fc)
    ratio = mae / avg_sales
    if ratio > MAX_MAE_RATIO:
        return ("ratio_exceeded", None)

    # success
    result = {
        'store': store,
        'dept':  dept,
        'mae':   mae,
        'ratio': ratio,
        'avg_sales': avg_sales,
        'fit_time': fit_time
    }
    return ("ok", result)

# 6. Build args list and dispatch in parallel
groups    = list(df.groupby(['Store','Dept']))[:MAX_DEPTS]
args_list = [(s,d,g) for (s,d),g in groups]

print(f"▶️  Fitting {len(args_list)} series on {N_JOBS} cores…")
raw_out = Parallel(n_jobs=N_JOBS, verbose=5)(
    delayed(fit_series)(arg) for arg in args_list
)
print("✔️  Parallel fitting complete.")

# 7. Separate outcomes and count reasons
reasons, payloads = zip(*raw_out)
counts = Counter(reasons)
print("Drop counts by reason:")
for reason, cnt in counts.items():
    print(f"  {reason:20s}: {cnt}")

# 8. Collect only successful results
results = [p for r,p in raw_out if r=="ok"]

if not results:
    print("⚠️  No successful models. Adjust `min_dept_sales`, `max_mae_ratio`, or inspect drop reasons above.")
else:
    maes   = [r['mae'] for r in results]
    ratios = [r['ratio'] for r in results]
    times  = [r['fit_time'] for r in results]
    total  = df.groupby(['Store','Dept']).ngroups

    # 9. Log metrics
    wandb.log({
        "metrics/avg_mae":      np.mean(maes),
        "metrics/median_mae":   np.median(maes),
        "metrics/best_mae":     np.min(maes),
        "metrics/worst_mae":    np.max(maes),
        "coverage/valid":       len(results),
        "coverage/total_depts": total,
        "perf/avg_fit_time":    np.mean(times),
        "perf/median_fit_time": np.median(times),
        "perf/max_fit_time":    np.max(times)
    })

    # 10. Plot & log distributions
    plt.figure(figsize=(8,4))
    plt.hist(ratios, bins=20, edgecolor='black')
    plt.title('MAE / Avg Sales Ratio')
    wandb.log({"error_dist": wandb.Image(plt)})
    plt.close()

    plt.figure(figsize=(8,4))
    plt.hist(times, bins=20, edgecolor='black')
    plt.title('SARIMA Fit Time (s)')
    wandb.log({"fit_time_dist": wandb.Image(plt)})
    plt.close()

# 11. Finish
run.finish()


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/.shortcut-targets-by-id/1h3JmMNvF7pLor34P-qm2FEkIev93euuf/ML_FInal_Project


▶️  Fitting 100 series on -1 cores…


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=-1)]: Done  14 tasks      | elapsed:  2.4min
[Parallel(n_jobs=-1)]: Done  68 tasks      | elapsed:  9.9min
[Parallel(n_jobs=-1)]: Done 100 out of 100 | elapsed: 15.6min finished


✔️  Parallel fitting complete.
Drop counts by reason:
  ratio_exceeded      : 35
  ok                  : 45
  low_avg_sales       : 18
  insufficient_length : 2


0,1
coverage/total_depts,▁
coverage/valid,▁
metrics/avg_mae,▁
metrics/best_mae,▁
metrics/median_mae,▁
metrics/worst_mae,▁
perf/avg_fit_time,▁
perf/max_fit_time,▁
perf/median_fit_time,▁

0,1
coverage/total_depts,3331.0
coverage/valid,45.0
metrics/avg_mae,4114.0854
metrics/best_mae,547.81974
metrics/median_mae,2802.73367
metrics/worst_mae,23981.86127
perf/avg_fit_time,23.26443
perf/max_fit_time,42.96892
perf/median_fit_time,27.04476


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
import time
import gc
import wandb

from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller
from sklearn.metrics import mean_absolute_error
from statsmodels.tools.sm_exceptions import ConvergenceWarning

from joblib import Parallel, delayed

In [None]:
wandb.login()
run = wandb.init(
    project="walmart-sales-forecasting",
    entity="lkata22-free-university-of-tbilisi-",
    name="SARIMA_v4",
    group="SARIMA",
    config={
        "test_size": 0.2,
        "min_dept_sales": 5000,
        "max_mae_ratio": 0.3,
        "max_depts": 100,      # for dev; switch to 3330 for full
        "n_jobs": -1,          # parallel jobs
        "p": 1, "d": 1, "q": 1,
        "P": 0, "D": 1, "Q": 1, "s": 52,
        "maxiter": 50
    }
)
cfg = wandb.config

<IPython.core.display.Javascript object>

[34m[1mwandb[0m: Logging into wandb.ai. (Learn how to deploy a W&B server locally: https://wandb.me/wandb-server)
[34m[1mwandb[0m: You can find your API key in your browser here: https://wandb.ai/authorize
wandb: Paste an API key from your profile and hit enter:

 ··········


[34m[1mwandb[0m: No netrc file found, creating one.
[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /root/.netrc
[34m[1mwandb[0m: Currently logged in as: [33masurm22[0m ([33masurm22-free-university-of-tbilisi-6158[0m) to [32mhttps://api.wandb.ai[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [None]:
warnings.simplefilter('ignore')
warnings.simplefilter('ignore', ConvergenceWarning)

# 2. Load & merge data
train    = pd.read_csv("data/train.csv",   parse_dates=["Date"])
features = pd.read_csv("data/features.csv",parse_dates=["Date"])
stores   = pd.read_csv("data/stores.csv")

df = (
    train
    .merge(features, on=["Store","Date","IsHoliday"], how="left")
    .merge(stores, on="Store", how="left")
)

# 3. Engineer global calendar & holiday‐proximity features
#    compute sorted list of holidays
holidays = sorted(df.loc[df.IsHoliday, "Date"].unique())

def days_to_next_hol(dt):
    # minimal non-negative delta
    return min((h - dt).days for h in holidays if h >= dt) if any(h>=dt for h in holidays) else 365

def days_since_prev_hol(dt):
    past = [h for h in holidays if h <= dt]
    return (dt - max(past)).days if past else 365

df["week_of_year"] = df.Date.dt.isocalendar().week
df["month"]        = df.Date.dt.month
df["dow_to_hol"]   = df.Date.apply(days_to_next_hol)
df["dow_since_hol"]= df.Date.apply(days_since_prev_hol)

# 4. Extract config into locals
TEST_SIZE     = cfg.test_size
MIN_SALES     = cfg.min_dept_sales
MAX_MAE_RATIO = cfg.max_mae_ratio
MAX_DEPTS     = int(cfg.max_depts)
N_JOBS        = int(cfg.n_jobs)
P, D, Q       = int(cfg.p), int(cfg.d), int(cfg.q)
sP, sD, sQ, s = int(cfg.P), int(cfg.D), int(cfg.Q), int(cfg.s)
MAXITER       = int(cfg.maxiter)

# 5. Stationarity test
def is_stationary(ts):
    if ts.dropna().empty:
        return False
    return adfuller(ts.dropna())[1] <= 0.05

# 6. Top-level fit function
def fit_series(args):
    store, dept, group = args

    # build target series
    ts = (
        group.set_index('Date')['Weekly_Sales']
             .sort_index().asfreq('W-FRI')
             .fillna(method='ffill').fillna(method='bfill')
    )
    if len(ts) < 52 or ts.mean() < MIN_SALES:
        return None

    # split
    split = int(len(ts)*(1-TEST_SIZE))
    train_ts, test_ts = ts[:split], ts[split:]
    avg_sales = train_ts.mean()

    # stationarity
    diff_flag = False
    if not is_stationary(train_ts):
        train_ts = train_ts.diff().dropna()
        diff_flag = True

    # prepare exogenous: use full ts to compute lags/rollings, then slice
    full = group.set_index('Date').reindex(ts.index)
    exog = pd.DataFrame(index=ts.index)
    # original features
    exog[['Temperature','Fuel_Price','CPI','Unemployment',
          'week_of_year','month','dow_to_hol','dow_since_hol']] = \
        full[['Temperature','Fuel_Price','CPI','Unemployment',
              'week_of_year','month','dow_to_hol','dow_since_hol']]
    # lags & rolling
    exog['lag_1']   = ts.shift(1)
    exog['lag_52']  = ts.shift(52)
    exog['roll4_m'] = ts.shift(1).rolling(4).mean()
    exog['roll4_s'] = ts.shift(1).rolling(4).std()

    # fill missing exogs
    exog = exog.fillna(method='ffill').fillna(method='bfill')

    exog_train = exog.loc[train_ts.index]
    exog_test  = exog.loc[test_ts.index]

    # fit SARIMAX
    t0 = time.time()
    try:
        model = SARIMAX(
            train_ts,
            exog=exog_train,
            order=(P, D, Q),
            seasonal_order=(sP, sD, sQ, s),
            enforce_stationarity=False,
            enforce_invertibility=False
        )
        fit = model.fit(disp=False, maxiter=MAXITER, cov_type='none')
    except Exception:
        return None
    fit_time = time.time() - t0

    # forecast & invert diff
    fc = fit.forecast(steps=len(test_ts), exog=exog_test)
    if diff_flag:
        fc = train_ts.iloc[-1] + fc.cumsum()

    # evaluate
    mae   = mean_absolute_error(test_ts, fc)
    ratio = mae/avg_sales
    if ratio > MAX_MAE_RATIO:
        return None

    # cleanup
    del fit, model
    gc.collect()

    return {
        'store': store,
        'dept':  dept,
        'mae':   mae,
        'ratio': ratio,
        'avg_sales': avg_sales,
        'fit_time': fit_time
    }

# 7. Build args & dispatch in parallel with progress
groups   = list(df.groupby(['Store','Dept']))[:MAX_DEPTS]
args_list= [(s,d,g) for (s,d),g in groups]

print(f"Starting fit of {len(args_list)} series on {N_JOBS} cores…")
results = Parallel(n_jobs=N_JOBS, verbose=5)(
    delayed(fit_series)(arg) for arg in args_list
)
print("Parallel fitting done.")

Starting fit of 100 series on -1 cores…


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=-1)]: Done  14 tasks      | elapsed:    2.0s


Parallel fitting done.


[Parallel(n_jobs=-1)]: Done 100 out of 100 | elapsed:    3.1s finished


In [None]:
results = [r for r in results if r is not None]
maes    = [r['mae'] for r in results]
ratios  = [r['ratio'] for r in results]
times   = [r['fit_time'] for r in results]
total   = df.groupby(['Store','Dept']).ngroups

# 9. Log to W&B
wandb.log({
    "metrics/avg_mae":      np.mean(maes),
    "metrics/median_mae":   np.median(maes),
    "metrics/best_mae":     np.min(maes),
    "metrics/worst_mae":    np.max(maes),
    "coverage/valid":       len(results),
    "coverage/total_depts": total,
    "perf/avg_fit_time":    np.mean(times),
    "perf/median_fit_time": np.median(times),
    "perf/max_fit_time":    np.max(times)
})

# 10. Plot & log distributions
plt.figure(figsize=(8,4))
plt.hist(ratios, bins=20, edgecolor='black')
plt.title('MAE / Avg Sales Ratio')
wandb.log({"error_dist": wandb.Image(plt)})
plt.close()

plt.figure(figsize=(8,4))
plt.hist(times, bins=20, edgecolor='black')
plt.title('SARIMA Fit Time (s)')
wandb.log({"fit_time_dist": wandb.Image(plt)})
plt.close()

# 11. Finish
run.finish()

ValueError: zero-size array to reduction operation minimum which has no identity