## Прогноз PCS (three-phase linear)

Прогноз щотижневих продажів по SKU з додатковими регресорами.

In [1]:
from pathlib import Path

import numpy as np
import pandas as pd

from three_phase_linear import ForecastConfig, run_three_phase_forecast

# --- Notebook configuration -------------------------------------------------
# Input dataset and the CSV that receives the evaluation-period forecast.
DATA_PATH = Path('dataset_pcs.csv')
OUTPUT_PATH = Path('pcs_three_phase_forecast_accuracy_calculation.csv')

# Series are grouped by these columns; the target is the column being forecast.
GROUP_COLS = ['sku_id']
TARGET_COLUMN = 'qty_total'

# Extra regressor columns handed to the forecaster (order is preserved).
REGRESSORS = [
    'orders_qty',
    'total_abc_numeric',
    'avg_discount_perc_by_goods',
    'max_discount_perc_by_goods',
    'avg_goods_price_by_goods',
    'oos__by_goods',
    'war',
    'covid',
    # sin_*/cos_* columns for quarter, month and week
    'sin_quarter',
    'cos_quarter',
    'sin_month',
    'cos_month',
    'sin_week',
    'cos_week',
]


In [2]:
# Number of evaluation weeks to use when the data contains no placeholder rows.
DEFAULT_FORECAST_HORIZON = 4

# Load the dataset and parse the time column.
df = pd.read_csv(DATA_PATH)
df['period'] = pd.to_datetime(df['period'])

# Columns whose values may use a decimal comma: swap it for a dot before
# parsing. Anything that still cannot be parsed becomes NaN.
comma_cols = ['avg_discount_perc_by_goods', 'max_discount_perc_by_goods', 'avg_goods_price_by_goods', 'oos__by_goods', 'sin_month', 'cos_month', 'sin_week', 'cos_week']
for col in comma_cols:
    df[col] = pd.to_numeric(
        df[col].astype(str).str.replace(',', '.', regex=False),
        errors='coerce',
    )

# Remaining numeric columns; unparseable values become NaN.
numeric_cols = ['qty_total', 'orders_qty', 'total_abc_numeric', 'war', 'covid', 'sin_quarter', 'cos_quarter']
for col in numeric_cols:
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Placeholder rows carry neither a sell status nor an 'oos__by_goods' value.
# NOTE(review): presumably these are the future weeks to be forecast - confirm.
placeholder_mask = df['last_goods_sell_status'].isna() & df['oos__by_goods'].isna()

# The number of distinct placeholder periods sets the evaluation horizon;
# nunique() is 0 when there are no placeholder rows, which triggers the fallback.
forecast_horizon = int(df.loc[placeholder_mask, 'period'].nunique())
if forecast_horizon <= 0:
    forecast_horizon = DEFAULT_FORECAST_HORIZON
print(f'Forecast horizon for accuracy: {forecast_horizon} weeks')

# Placeholder rows are dropped here, so their target never needs to be blanked.
history_df = (
    df.loc[~placeholder_mask]
    .sort_values(GROUP_COLS + ['period'])
    .reset_index(drop=True)
)

# Hold out the last `forecast_horizon` rows of every group for evaluation.
# Groups with no more rows than the horizon are held out entirely, which leaves
# them without any training history.
history_df['is_evaluation_period'] = False
eval_indices = history_df.groupby(GROUP_COLS).tail(forecast_horizon).index
history_df.loc[eval_indices, 'is_evaluation_period'] = True

# Keep the true values in a float column, then hide them from the model.
# The explicit float cast avoids writing NaN into an integer column, an upcast
# that pandas deprecates (PDEP-6).
history_df[f'{TARGET_COLUMN}_actual'] = history_df[TARGET_COLUMN].astype(float)
history_df[TARGET_COLUMN] = history_df[f'{TARGET_COLUMN}_actual'].where(
    ~history_df['is_evaluation_period']
)

# Downstream cells read the prepared history through `df`.
df = history_df


Forecast horizon for accuracy: 15 weeks


In [3]:
# Columns passed to the forecaster. dict.fromkeys removes duplicates while
# keeping first-seen order (e.g. if a regressor repeats a group/target column).
input_cols = list(dict.fromkeys(
    ['period', *GROUP_COLS, 'category_id', TARGET_COLUMN, *REGRESSORS]
))

# Forecaster settings; the horizon comes from the data-preparation cell.
config = ForecastConfig(
    time_col='period',
    target_col=TARGET_COLUMN,
    group_cols=GROUP_COLS,
    freq='W-MON',
    forecast_horizon=forecast_horizon,
    seasonal_periods=52,
    min_history=20,
    lags=(1, 2, 3, 4, 8, 12, 16),
    rolling_windows=(3, 4, 8, 12),
    additional_regressors=REGRESSORS,
    random_search_iterations=0,
    n_splits=3,
    n_estimators=300,
    target_transform=np.log1p,
    target_inverse_transform=np.expm1,
    random_state=46,
)

# Run on a copy so the forecaster cannot mutate the prepared frame.
preds, summaries = run_three_phase_forecast(df[input_cols].copy(), config)

# NOTE(review): after renaming, the model prediction column is overwritten with
# the Holt-Winters baseline, so every downstream metric scores the baseline.
# Confirm this is intentional.
preds = (
    preds
    .rename(columns={
        'prediction': 'qty_total_forecast',
        f'{TARGET_COLUMN}_holtwinters': 'qty_total_baseline',
    })
    .assign(qty_total_forecast=lambda frame: frame['qty_total_baseline'])
)

# One row per group summary; group_key is indexed at 0 to take its first element.
summary_report = pd.DataFrame({
    'group_key': [summary.group_key[0] for summary in summaries],
    'train_rows': [summary.train_rows for summary in summaries],
    'cv_mae': [summary.best_score for summary in summaries],
    'skipped_reason': [summary.skipped_reason for summary in summaries],
})
summary_report.head()

  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  aic = self.nobs * np.log(sse / self.nobs) + k * 2
  bic = self.nobs * np.log(sse / self.nobs) + k * np.log(self.nobs)
  aic = self.nobs * np.log(sse / self.nobs) + k * 2
  bic = self.nobs * np.log(sse / self.nobs) + k * np.log(self.nobs)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates

Unnamed: 0,group_key,train_rows,cv_mae,skipped_reason
0,598294111,104,,
1,598302302,104,,
2,599653937,104,,
3,599703217,104,,
4,746436995,104,,


In [4]:
# Attach the forecast to the prepared history on group columns + period.
merge_cols = [*GROUP_COLS, 'period']
result_df = df.merge(
    preds[[*merge_cols, 'qty_total_forecast']],
    on=merge_cols,
    how='left',
)

# Held-out rows have a NaN target; fill them with the forecast value.
result_df['qty_total'] = (
    result_df['qty_total']
    .astype(float)
    .fillna(result_df['qty_total_forecast'])
)

# Export only the evaluation-period rows, ordered by group and period.
output_columns = ['sku_id', 'category_id', 'period', 'qty_total']
forecast_df = (
    result_df.loc[result_df['is_evaluation_period']]
    .sort_values([*GROUP_COLS, 'period'])
    .reset_index(drop=True)
)
forecast_df[output_columns].to_csv(OUTPUT_PATH, index=False)

forecast_df[output_columns].tail()


Unnamed: 0,sku_id,category_id,period,qty_total
6940,48789690935,672241,2025-02-03,0.0
6941,48789690935,672241,2025-02-10,0.0
6942,48789690935,672241,2025-02-17,0.0
6943,48789690935,672241,2025-02-24,0.0
6944,48789690935,672241,2025-03-03,0.0


MAE — Середня абсолютна помилка

In [5]:
# Mean absolute error over the held-out rows that have both an actual and a forecast.
evaluation_df = result_df.loc[result_df['is_evaluation_period']].copy()
actual = evaluation_df[f'{TARGET_COLUMN}_actual'].astype(float)
forecast = evaluation_df['qty_total_forecast'].astype(float)

# A row is usable only when neither side is missing.
mask = ~(actual.isna() | forecast.isna())
if not mask.any():
    print('MAE: not enough data to calculate')
else:
    mae = (actual[mask] - forecast[mask]).abs().mean()
    print(f'MAE: {mae:.4f}')


MAE: 2.0108


MAPE — Середня абсолютна відсоткова помилка

In [6]:
# Mean absolute percentage error over the held-out rows.
evaluation_df = result_df.loc[result_df['is_evaluation_period']].copy()
actual = evaluation_df[f'{TARGET_COLUMN}_actual'].astype(float)
forecast = evaluation_df['qty_total_forecast'].astype(float)

# Rows with a zero actual are excluded because the ratio is undefined there.
mask = actual.notna() & forecast.notna() & actual.ne(0)
if not mask.any():
    print('MAPE: not enough data to calculate')
else:
    relative_error = (actual[mask] - forecast[mask]) / actual[mask]
    mape = (relative_error.abs() * 100).mean()
    print(f'MAPE: {mape:.4f}%')


MAPE: 128.8276%


WMAPE — Зважена середня абсолютна відсоткова помилка

In [7]:
# Weighted MAPE: total absolute error divided by total absolute actuals.
evaluation_df = result_df.loc[result_df['is_evaluation_period']].copy()
actual = evaluation_df[f'{TARGET_COLUMN}_actual'].astype(float)
forecast = evaluation_df['qty_total_forecast'].astype(float)

# Use only rows where both the actual and the forecast are present.
mask = ~(actual.isna() | forecast.isna())
denominator = actual[mask].abs().sum()
if not (mask.any() and denominator > 0):
    print('WMAPE: not enough data to calculate')
else:
    wmape = (actual[mask] - forecast[mask]).abs().sum() / denominator * 100
    print(f'WMAPE: {wmape:.4f}%')


WMAPE: 177.8538%


In [8]:
# Display the per-group summary table (group key, train rows, CV score, skip reason).
summary_report

Unnamed: 0,group_key,train_rows,cv_mae,skipped_reason
0,598294111,104,,
1,598302302,104,,
2,599653937,104,,
3,599703217,104,,
4,746436995,104,,
...,...,...,...,...
458,48609967412,104,,
459,48609984414,104,,
460,48788920432,104,,
461,48789681494,104,,
