# SOG Forecast Benchmark Notebook
This notebook demonstrates how to pull sales history from DuckDB and submit it to the Nostradamus forecasting API. Use it as a reference implementation for building repeatable item-level demand forecasts.

## Data Access Utilities
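Cell 3 defines SalesHistoryReader, which wraps DuckDB queries for daily and monthly sales history. The database path is taken from the db_path argument, then from the BENCHMARK_DB_PATH environment variable, and finally defaults to sog.duckdb in the working directory. Update the SQL or connection details here when your storage changes.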

In [15]:
import os
import duckdb
from pathlib import Path
from timing_utils import log, start_timer
import pandas as pd


class SalesHistoryReader:
    """
    Utility class for reading sales history from DuckDB.
    """

    def __init__(self, project="SOG", db_path=None):
        self.project = project
        self.db_path = Path(
            db_path or os.getenv("BENCHMARK_DB_PATH", Path.cwd() / "sog.duckdb")
        )

    def _coerce_item_ids(self, item_ids):
        if item_ids is None:
            raise ValueError("item_ids must not be None")
        if isinstance(item_ids, (str, bytes)):
            items = [item_ids]
        else:
            try:
                items = list(item_ids)
            except TypeError:
                items = [item_ids]
        if not items:
            raise ValueError("item_ids must contain at least one identifier")
        return items

    # ---- DB connection ----
    def get_connection(self):
        return duckdb.connect(str(self.db_path))

    # ---- Queries ----
    def fetch_daily_history(self, item_ids):
        """Return daily sales rows for the given items, filtered by self.project."""
        items = self._coerce_item_ids(item_ids)
        placeholders = ", ".join("?" for _ in items)
        sql = f"""
        SELECT
            item_id,
            sale_date,
            sales
        FROM sales_history
        WHERE project = ? AND item_id IN ({placeholders})
        ORDER BY item_id, sale_date
        """
        params = [self.project] + items
        with self.get_connection() as con:
            return con.execute(sql, params).fetchdf()

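    def fetch_monthly_history(self, item_ids, start_date=None):
        """Return monthly sales rows for the given items, optionally from start_date onward.

        Note: unlike fetch_daily_history, this query does not filter by project.
        """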
        items = self._coerce_item_ids(item_ids)
        placeholders = ", ".join("?" for _ in items)
        sql = """
        SELECT
            item_id,
            sale_date,
            sales
        FROM sales_history_monthly
        WHERE item_id IN ({placeholders})
        {date_filter}
        ORDER BY item_id, sale_date
        """

        date_filter = ""
        params = items.copy()

        if start_date:
            date_filter = "AND sale_date >= ?"
            params.append(start_date)

        sql = sql.format(placeholders=placeholders, date_filter=date_filter)

        with self.get_connection() as con:
            return con.execute(sql, params).fetchdf()
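
The placeholder pattern used by both fetch methods can be checked without a database. The following is a minimal sketch: build_monthly_query is a hypothetical helper, not part of the notebook, that mirrors how fetch_monthly_history assembles its SQL text and parameter list.

```python
def build_monthly_query(item_ids, start_date=None):
    """Hypothetical helper mirroring fetch_monthly_history's SQL assembly."""
    items = list(item_ids)
    if not items:
        raise ValueError("item_ids must contain at least one identifier")

    # One "?" per item keeps the values out of the SQL text itself.
    placeholders = ", ".join("?" for _ in items)
    sql = (
        "SELECT item_id, sale_date, sales "
        "FROM sales_history_monthly "
        f"WHERE item_id IN ({placeholders})"
    )
    params = list(items)

    # The optional date filter adds one more placeholder and one more parameter.
    if start_date:
        sql += " AND sale_date >= ?"
        params.append(start_date)

    sql += " ORDER BY item_id, sale_date"
    return sql, params


sql, params = build_monthly_query(["X135", "X265"], start_date="2022-01-01")
print(sql)
print(params)
```

The number of "?" markers in the SQL always equals the length of the parameter list, which is the property to preserve if the query is extended with further filters.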




## Forecast Orchestration
Cell 4 introduces NostradamusForecaster, which formats the payload, calls the API, and parses the response. The _record_event helper appends each step to event_log with a UTC timestamp, which makes latency easier to debug.

In [22]:
import json
import requests
import pandas as pd
from datetime import datetime, timezone


class NostradamusForecaster:
    """
    Formats sales history, calls the Nostradamus API, and parses forecasts.
    """

    def __init__(
        self,
        api_url="https://api.nostradamus-api.com/api/v1/forecast/generate_async",
        forecast_periods=12,
        local_model="auto_arima",
        season_length=12,
        freq="MS",
        mode="local",
    ):
        self.api_url = api_url
        self.forecast_periods = forecast_periods
        self.local_model = local_model
        self.season_length = season_length
        self.freq = freq
        self.mode = mode
        self.event_log = []  # chronological record of forecast steps

    def _record_event(self, action: str) -> None:
        """Append a timestamped action to the event log."""
        self.event_log.append(
            {
                "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
                "action": action,
            }
        )

    # -----------------------------
    # History → sim_input_his
    # -----------------------------
    def format_sim_input(self, df_history: pd.DataFrame) -> list:
        self._record_event("format_sim_input_started")
        if df_history.empty:
            self._record_event("format_sim_input_empty_history")
            return []

        df = df_history.copy()
        df["sale_date"] = pd.to_datetime(df["sale_date"]).dt.date
        df = df.sort_values(["item_id", "sale_date"])

        sim_input = [
            {
                "item_id": row["item_id"],
                "actual_sale": float(row["sales"]),
                "day": row["sale_date"].isoformat(),
            }
            for _, row in df.iterrows()
        ]
        self._record_event("format_sim_input_completed")
        return sim_input

    # -----------------------------
    # Build API payload
    # -----------------------------
    def build_payload(self, df_history: pd.DataFrame) -> dict:
        self._record_event("build_payload_started")
        sim_input_his = self.format_sim_input(df_history)

        payload = {
            "sim_input_his": sim_input_his,
            "forecast_periods": self.forecast_periods,
            "mode": self.mode,
            "local_model": self.local_model,
            "season_length": self.season_length,
            "freq": self.freq,
        }
        self._record_event("build_payload_completed")
        return payload

    def build_payload_json(self, df_history: pd.DataFrame) -> str:
        return json.dumps(self.build_payload(df_history), ensure_ascii=False, indent=2)

    # -----------------------------
    # Call API
    # -----------------------------
    def call_api(self, df_history: pd.DataFrame, timeout=30) -> dict:
        self._record_event("call_api_started")
        payload = self.build_payload(df_history)
        self._record_event("call_api_payload_ready")
        r = requests.post(self.api_url, json=payload, timeout=timeout)
        self._record_event("call_api_response_received")
        r.raise_for_status()
        self._record_event("call_api_completed")
        return r.json()

    # -----------------------------
    # Parse API response
    # -----------------------------
    def parse_forecast_df(self, resp: dict) -> pd.DataFrame:
        """
        Returns a dataframe with forecast_date + forecast
        for each forecast item in the response.
        """
        self._record_event("parse_forecast_df_started")
        if not resp or "forecasts" not in resp or not resp["forecasts"]:
            self._record_event("parse_forecast_df_empty_response")
            return pd.DataFrame(columns=["item_id", "forecast_date", "forecast"])

        frames = []
        for forecast_payload in resp["forecasts"]:
            item_id = forecast_payload.get("item_id")
            dates = forecast_payload.get("forecast_dates", [])
            values = forecast_payload.get("forecast", [])
            if not dates or not values:
                continue
            frame = pd.DataFrame(
                {
                    "forecast_date": pd.to_datetime(dates),
                    "forecast": values,
                }
            )
            frame["item_id"] = item_id
            frames.append(frame)

        if not frames:
            self._record_event("parse_forecast_df_no_valid_entries")
            return pd.DataFrame(columns=["item_id", "forecast_date", "forecast"])

        df = (
            pd.concat(frames)
            .sort_values(["item_id", "forecast_date"])
            .reset_index(drop=True)
        )
        df = df[["item_id", "forecast_date", "forecast"]]
        self._record_event("parse_forecast_df_completed")
        return df

    # -----------------------------
    # Convenience one-shot method
    # -----------------------------
    def forecast(self, df_history: pd.DataFrame, timeout=300) -> pd.DataFrame:
        self.event_log = []
        self._record_event("forecast_started")
        resp = self.call_api(df_history, timeout=timeout)
        self._record_event("forecast_api_returned")
        forecast_df = self.parse_forecast_df(resp)
        self._record_event("forecast_completed")
        return forecast_df
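
For reference, the request body produced by build_payload has the shape sketched below. This is a minimal standard-library sketch, not the class itself: to_payload is a hypothetical stand-alone function, the field names and defaults are copied from NostradamusForecaster, and the two history rows are the first two X103 rows from the example run in this notebook.

```python
import json

# First two X103 rows from the example run, as plain dicts.
history = [
    {"item_id": "X103", "sale_date": "2022-01-01", "sales": 128.16},
    {"item_id": "X103", "sale_date": "2022-02-01", "sales": 114.00},
]


def to_payload(rows, forecast_periods=12, mode="local",
               local_model="auto_arima", season_length=12, freq="MS"):
    """Hypothetical stand-alone equivalent of build_payload for plain dict rows."""
    ordered = sorted(rows, key=lambda r: (r["item_id"], r["sale_date"]))
    sim_input_his = [
        {
            "item_id": r["item_id"],
            "actual_sale": float(r["sales"]),
            "day": r["sale_date"],  # already an ISO date string in this sketch
        }
        for r in ordered
    ]
    return {
        "sim_input_his": sim_input_his,
        "forecast_periods": forecast_periods,
        "mode": mode,
        "local_model": local_model,
        "season_length": season_length,
        "freq": freq,
    }


payload = to_payload(history)
print(json.dumps(payload, indent=2))
```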

## Run Example Forecast
Cell 6 configures the item identifiers, loads monthly history, and triggers NostradamusForecaster. Adjust ITEMS or start_date to experiment with other series.

In [23]:

#ITEMS = ["20-000", "20-001"]

ITEMS = [
    "X135",
    "X265",
    "X742",
    "X704",
    "X103",
    "X753",
    "X328",
    "X703",
    "X115",
    "X320",
    "X744",
    "X730",
    "X751",
    "X715",
    "X332",
    "X752",
    "X705",
    "X824",
    "X745",
    "X329",
]

PROJECT = "SOG"

reader = SalesHistoryReader(project=PROJECT)

df_hist = reader.fetch_monthly_history(ITEMS, start_date="2022-01-01")

print(df_hist)

forecaster = NostradamusForecaster()


df_fcst = forecaster.forecast(df_hist)

print(df_fcst)

    item_id  sale_date     sales
0      X103 2022-01-01    128.16
1      X103 2022-02-01    114.00
2      X103 2022-03-01    119.88
3      X103 2022-04-01    225.37
4      X103 2022-05-01    217.34
..      ...        ...       ...
635    X824 2024-09-01   6074.00
636    X824 2024-10-01   4270.00
637    X824 2024-11-01   1900.00
638    X824 2024-12-01  44986.78
639    X824 2025-01-01  31200.00

[640 rows x 3 columns]
    item_id forecast_date      forecast
0      X103    2025-03-01    235.297027
1      X103    2025-04-01    235.297027
2      X103    2025-05-01    235.297027
3      X103    2025-06-01    235.297027
4      X103    2025-07-01    235.297027
..      ...           ...           ...
235    X824    2025-08-01  10337.600000
236    X824    2025-09-01  10337.600000
237    X824    2025-10-01  10337.600000
238    X824    2025-11-01  10337.600000
239    X824    2025-12-01  10337.600000

[240 rows x 3 columns]
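
The forecast output above has 240 rows, which matches 20 items times the default forecast_periods of 12. Because parse_forecast_df skips entries with missing dates or values, a row-count check per item can reveal items that were dropped. The sketch below works on plain (item_id, forecast_date) pairs; check_horizon is a hypothetical helper and the sample rows are synthetic.

```python
from collections import Counter


def check_horizon(rows, expected_items, expected_periods):
    """Return {item_id: row_count} for items whose count differs from expected_periods."""
    counts = Counter(item_id for item_id, _ in rows)
    # Counter returns 0 for items that are missing from the rows entirely.
    return {
        item: counts[item]
        for item in expected_items
        if counts[item] != expected_periods
    }


# Synthetic rows: X103 is complete, X824 is one period short, X742 is missing.
rows = [
    ("X103", "2025-03-01"), ("X103", "2025-04-01"), ("X103", "2025-05-01"),
    ("X824", "2025-03-01"), ("X824", "2025-04-01"),
]
problems = check_horizon(rows, ["X103", "X824", "X742"], expected_periods=3)
print(problems)
```

With the DataFrame from the notebook, df_fcst.groupby("item_id").size() gives the same per-item counts.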


## Inspect Event Log
Cell 8 displays forecaster.event_log so you can review the timing of each step and pinpoint the slowest stage (usually the API call).

In [27]:
forecaster.event_log

[{'timestamp': '2025-12-26T11:58:36.770815Z', 'action': 'forecast_started'},
 {'timestamp': '2025-12-26T11:58:36.770827Z', 'action': 'call_api_started'},
 {'timestamp': '2025-12-26T11:58:36.770831Z',
  'action': 'build_payload_started'},
 {'timestamp': '2025-12-26T11:58:36.770833Z',
  'action': 'format_sim_input_started'},
 {'timestamp': '2025-12-26T11:58:36.783441Z',
  'action': 'format_sim_input_completed'},
 {'timestamp': '2025-12-26T11:58:36.783453Z',
  'action': 'build_payload_completed'},
 {'timestamp': '2025-12-26T11:58:36.783455Z',
  'action': 'call_api_payload_ready'},
 {'timestamp': '2025-12-26T11:59:32.192704Z',
  'action': 'call_api_response_received'},
 {'timestamp': '2025-12-26T11:59:32.193180Z', 'action': 'call_api_completed'},
 {'timestamp': '2025-12-26T11:59:32.199899Z',
  'action': 'forecast_api_returned'},
 {'timestamp': '2025-12-26T11:59:32.199926Z',
  'action': 'parse_forecast_df_started'},
 {'timestamp': '2025-12-26T11:59:32.236012Z',
  'action': 'parse_forecast_d
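
Reading durations off raw timestamps is tedious, so the gaps between consecutive events can be computed instead. The following is a minimal sketch: parse_ts and step_durations are hypothetical helpers, and sample_log holds four consecutive entries copied from the log output above.

```python
from datetime import datetime


def parse_ts(ts):
    """Parse the log's 'Z'-suffixed ISO timestamps (also works before Python 3.11)."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def step_durations(event_log):
    """Return (from_action, to_action, seconds) for each consecutive pair of events."""
    durations = []
    for prev, cur in zip(event_log, event_log[1:]):
        delta = parse_ts(cur["timestamp"]) - parse_ts(prev["timestamp"])
        durations.append((prev["action"], cur["action"], delta.total_seconds()))
    return durations


# Four consecutive entries copied from the event log shown above.
sample_log = [
    {"timestamp": "2025-12-26T11:58:36.783453Z", "action": "build_payload_completed"},
    {"timestamp": "2025-12-26T11:58:36.783455Z", "action": "call_api_payload_ready"},
    {"timestamp": "2025-12-26T11:59:32.192704Z", "action": "call_api_response_received"},
    {"timestamp": "2025-12-26T11:59:32.193180Z", "action": "call_api_completed"},
]

durations = step_durations(sample_log)
slowest = max(durations, key=lambda d: d[2])
print(slowest)
```

On these entries the longest gap, about 55.4 seconds, lies between call_api_payload_ready and call_api_response_received, which is consistent with the API call being the slowest stage.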

Examples of items with many transactions

X135
X265
X742
X704
X103
X753
X328
X703
X115
X320
X744
X730
X751
X715
X332
X752
X705
X824
X745
X329
X609
X709
X430
X822
X714
X435
X406
X748
X648
X823
X724
X407
25-630
X712
X828
X141
X104
X345
X405
X348
X701
24-640
X647
26-110
X641
X757
X750
26-133
X826
X747
X713
25-620
X350
26-130
X409
X643
44-162
X700
X846
X827
X268