Tickerflow - baseline

Config + imports

In [1]:
%pip install pandas numpy pyarrow xgboost joblib requests scikit-learn sklearn

import os
import io
import json
import time
import logging
from datetime import datetime, timezone
from urllib.parse import urlencode
from urllib.request import urlopen
from urllib.error import HTTPError, URLError

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from joblib import dump
from xgboost import XGBRegressor

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(asctime)s %(message)s")
log = logging.getLogger(__name__)


#CONFIG
#----------

# Tickers to backfill
TICKERS = ["AAPL", "AMZN", "GOOGL", "META", "MSFT"]

# November window 
START_DATE = "2025-11-01"
END_DATE = "2025-11-30"

# training window end (for metrics evaluation)
TRAIN_END_DATE = "2025-11-20"

#local data directory
DATA_DIR = "local_baseline_data"
os.makedirs(DATA_DIR, exist_ok=True)

RAW_PARQUET_PATH = os.path.join(DATA_DIR, "raw_prices_nov.parquet")
FEATURES_PARQUET_PATH = os.path.join(DATA_DIR, "features_nov.parquet")
MODEL_DIR = os.path.join(DATA_DIR, "models", "xgboost")
os.makedirs(MODEL_DIR, exist_ok=True)
FORECASTS_PARQUET_PATH = os.path.join(DATA_DIR, "forecasts_nov.parquet")

# Alpha Vantage
ALPHA_BASE = "https://www.alphavantage.co/query"
ALPHAVANTAGE_KEY = os.environ.get("ALPHAVANTAGE_KEY", "4DQAHSBPMEBEXIXL")

# Throttle between API calls to avoid rate limit
SLEEP_BETWEEN_CALLS_SEC = 12

# Metrics tracking
timings = {}

Collecting sklearn
  Using cached sklearn-0.0.post12.tar.gz (2.6 kB)
  Preparing metadata (setup.py) ... [?25lerror
  [1;31merror[0m: [1msubprocess-exited-with-error[0m
  
  [31m×[0m [32mpython setup.py egg_info[0m did not run successfully.
  [31m│[0m exit code: [1;36m1[0m
  [31m╰─>[0m [31m[15 lines of output][0m
  [31m   [0m The 'sklearn' PyPI package is deprecated, use 'scikit-learn'
  [31m   [0m rather than 'sklearn' for pip commands.
  [31m   [0m 
  [31m   [0m Here is how to fix this error in the main use cases:
  [31m   [0m - use 'pip install scikit-learn' rather than 'pip install sklearn'
  [31m   [0m - replace 'sklearn' by 'scikit-learn' in your pip requirements files
  [31m   [0m   (requirements.txt, setup.py, setup.cfg, Pipfile, etc ...)
  [31m   [0m - if the 'sklearn' package is used by one of your dependencies,
  [31m   [0m   it would be great if you take some time to track which package uses
  [31m   [0m   'sklearn' instead of 'scikit-le

Ingestion helpers + backfilling data from all of November   

In [None]:
def alpha_daily_adjusted(symbol: str, api_key: str, outputsize: str = "compact") -> dict:
    """
    Calls/gets TIME_SERIES_DAILY_ADJUSTED table from alpha vantage and return parsed JSON.
    """
    params = {
        "function": "TIME_SERIES_DAILY_ADJUSTED",
        "symbol": symbol,
        "outputsize": outputsize,
        "datatype": "json",
        "apikey": api_key,
    }
    url = f"{ALPHA_BASE}?{urlencode(params)}"
    for attempt in range(3):
        try:
            with urlopen(url, timeout=20) as r:
                body = r.read().decode("utf-8")
            js = json.loads(body)
            if "Note" in js or "Information" in js:
                raise RuntimeError(
                    f"Alpha Vantage info for {symbol}: {js.get('Note') or js.get('Information')}"
                )
            if "Error Message" in js:
                raise RuntimeError(f"Alpha Vantage error for {symbol}: {js['Error Message']}")
            if "Time Series (Daily)" not in js:
                raise RuntimeError(f"Unexpected response for {symbol}: keys={list(js.keys())[:5]}")
            return js
        except (HTTPError, URLError) as e:
            wait = 5 * (attempt + 1)
            log.warning(f"HTTP error for {symbol}: {e}. Retry in {wait}s")
            time.sleep(wait)
    raise RuntimeError(f"Failed to fetch after retries for {symbol}")


# Extracts all daily rows for [start_date, end_date] inclusive
def extract_rows_between(symbol: str, av_json: dict,
                         start_date: str, end_date: str) -> pd.DataFrame:
    
    ts = av_json.get("Time Series (Daily)", {})
    if not ts:
        raise RuntimeError(f"No daily time series for {symbol}")

    start_ts = pd.to_datetime(start_date)
    end_ts = pd.to_datetime(end_date)

    rows = []
    for date_str, rec in ts.items():
        dt = pd.to_datetime(date_str)
        if not (start_ts <= dt <= end_ts):
            continue
        rows.append(
            {
                "symbol": symbol,
                "date": dt,
                "open": float(rec["1. open"]),
                "high": float(rec["2. high"]),
                "low": float(rec["3. low"]),
                "close": float(rec["4. close"]),
                "adj_close": float(rec.get("5. adjusted close", rec["4. close"])),
                "volume": int(rec["6. volume"]),
                "dividend": float(rec.get("7. dividend amount", 0.0)),
                "split_coef": float(rec.get("8. split coefficient", 1.0)),
                "source": "alpha_vantage",
            }
        )
    df = pd.DataFrame(rows)
    return df.sort_values("date").reset_index(drop=True)

# Ingest November locally
# if ALPHAVANTAGE_KEY != "4DQAHSBPMEBEXIXL":
#     raise RuntimeError("Set ALPHAVANTAGE_KEY env var or edit ALPHAVANTAGE_KEY in the config cell.")

all_rows = []
per_symbol_timing = {}

t_ingest_start = time.perf_counter()

for i, sym in enumerate(TICKERS, 1):
    t0 = time.perf_counter()
    log.info(f"[INGEST] Fetching {sym}")
    js = alpha_daily_adjusted(sym, ALPHAVANTAGE_KEY, outputsize="compact")
    df_sym = extract_rows_between(sym, js, START_DATE, END_DATE)
    log.info(f"[INGEST] {sym} -> {len(df_sym)} rows in November window")
    all_rows.append(df_sym)
    per_symbol_timing[sym] = time.perf_counter() - t0

    if i < len(TICKERS) and SLEEP_BETWEEN_CALLS_SEC > 0:
        time.sleep(SLEEP_BETWEEN_CALLS_SEC)

df_raw = pd.concat(all_rows, ignore_index=True)
df_raw["date"] = pd.to_datetime(df_raw["date"])
df_raw.to_parquet(RAW_PARQUET_PATH, index=False)

t_ingest = time.perf_counter() - t_ingest_start
timings["ingest_total_seconds"] = t_ingest
timings["ingest_per_symbol_seconds"] = per_symbol_timing

log.info(f"[INGEST] Done. Total rows={len(df_raw)}, time={t_ingest:.2f}s")
df_raw.head()


INFO 2025-12-01 03:48:55,077 [INGEST] Fetching AAPL...
INFO 2025-12-01 03:48:55,250 [INGEST] AAPL -> 19 rows in November window
INFO 2025-12-01 03:49:07,255 [INGEST] Fetching AMZN...
INFO 2025-12-01 03:49:07,501 [INGEST] AMZN -> 19 rows in November window
INFO 2025-12-01 03:49:19,503 [INGEST] Fetching GOOGL...
INFO 2025-12-01 03:49:19,658 [INGEST] GOOGL -> 19 rows in November window
INFO 2025-12-01 03:49:31,663 [INGEST] Fetching META...
INFO 2025-12-01 03:49:31,865 [INGEST] META -> 19 rows in November window
INFO 2025-12-01 03:49:43,870 [INGEST] Fetching MSFT...
INFO 2025-12-01 03:49:44,039 [INGEST] MSFT -> 19 rows in November window
INFO 2025-12-01 03:49:44,047 [INGEST] Done. Total rows=95, time=48.97s


Unnamed: 0,symbol,date,open,high,low,close,adj_close,volume,dividend,split_coef,source
0,AAPL,2025-11-03,270.42,270.85,266.25,269.05,268.790617,50194583,0.0,1.0,alpha_vantage
1,AAPL,2025-11-04,268.325,271.486,267.615,270.04,269.779663,49274846,0.0,1.0,alpha_vantage
2,AAPL,2025-11-05,268.61,271.7,266.93,270.14,269.879566,42586288,0.0,1.0,alpha_vantage
3,AAPL,2025-11-06,267.89,273.4,267.89,269.77,269.509923,51204045,0.0,1.0,alpha_vantage
4,AAPL,2025-11-07,269.795,272.29,266.77,268.47,268.211176,48227365,0.0,1.0,alpha_vantage


Ensuring data quality and feature engineering

In [3]:
def run_data_quality_checks(df: pd.DataFrame):
    report = {"errors": [], "warnings": [], "row_count": int(len(df))}
    required_cols = ["symbol", "date", "open", "high", "low", "close", "volume"]

    if df.empty:
        report["errors"].append("DataFrame is empty (no raw rows found).")
        return False, report

    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        report["errors"].append(f"Missing required columns: {missing}")
        return False, report

    if df["date"].isna().any():
        n = int(df["date"].isna().sum())
        report["errors"].append(f"'date' column has {n} unparsable values.")

    for col in required_cols:
        null_count = int(df[col].isna().sum())
        if null_count > 0:
            report["errors"].append(f"Column '{col}' has {null_count} null values.")

    for col in ["open", "high", "low", "close"]:
        bad = int((df[col] <= 0).sum())
        if bad > 0:
            report["errors"].append(f"Column '{col}' has {bad} non-positive values.")

    bad_high_low = int((df["high"] < df["low"]).sum())
    if bad_high_low > 0:
        report["errors"].append(f"{bad_high_low} rows where high < low.")

    bad_volume = int((df["volume"] < 0).sum())
    if bad_volume > 0:
        report["errors"].append(f"{bad_volume} rows with negative volume.")

    dupes = int(df.duplicated(subset=["symbol", "date"]).sum())
    if dupes > 0:
        report["warnings"].append(f"{dupes} duplicate (symbol, date) rows.")

    ok = len(report["errors"]) == 0
    return ok, report


def build_features(df_proc: pd.DataFrame) -> pd.DataFrame:
    df = df_proc.sort_values(["symbol", "date"]).copy()

    # 1-day log return
    df["log_ret_1d"] = df.groupby("symbol")["adj_close"].transform(
        lambda s: np.log(s).diff()
    )

    # Rolling stats on returns
    for win in (5, 10, 20):
        df[f"ret_mean_{win}"] = df.groupby("symbol")["log_ret_1d"].transform(
            lambda s, w=win: s.rolling(w, min_periods=max(2, w // 2)).mean()
        )
        df[f"ret_std_{win}"] = df.groupby("symbol")["log_ret_1d"].transform(
            lambda s, w=win: s.rolling(w, min_periods=max(2, w // 2)).std()
        )

    # Rolling stats on prices
    for win in (5, 10):
        df[f"close_mean_{win}"] = df.groupby("symbol")["adj_close"].transform(
            lambda s, w=win: s.rolling(w, min_periods=max(2, w // 2)).mean()
        )
        df[f"close_std_{win}"] = df.groupby("symbol")["adj_close"].transform(
            lambda s, w=win: s.rolling(w, min_periods=max(2, w // 2)).std()
        )

    # Calendar features
    df["day_of_week"] = df["date"].dt.dayofweek
    df["month"] = df["date"].dt.month

    return df


# ------------------ run DQ + features ------------------

t_dq_start = time.perf_counter()
ok, dq_report = run_data_quality_checks(df_raw)
t_dq = time.perf_counter() - t_dq_start
timings["dq_seconds"] = t_dq

print("DQ report:")
print(json.dumps(dq_report, indent=2, default=str))
if not ok:
    raise RuntimeError("Data quality failed; aborting pipeline.")

t_feat_start = time.perf_counter()
df_feat = build_features(df_raw)
t_feat = time.perf_counter() - t_feat_start
timings["feature_build_seconds"] = t_feat

df_feat.to_parquet(FEATURES_PARQUET_PATH, index=False)
log.info(f"[FEATURES] Built features with {len(df_feat)} rows in {t_feat:.2f}s")
df_feat.head()


INFO 2025-12-01 03:49:44,078 [FEATURES] Built features with 95 rows in 0.01s


DQ report:
{
  "errors": [],
  "row_count": 95
}


Unnamed: 0,symbol,date,open,high,low,close,adj_close,volume,dividend,split_coef,...,ret_mean_10,ret_std_10,ret_mean_20,ret_std_20,close_mean_5,close_std_5,close_mean_10,close_std_10,day_of_week,month
0,AAPL,2025-11-03,270.42,270.85,266.25,269.05,268.790617,50194583,0.0,1.0,...,,,,,,,,,0,11
1,AAPL,2025-11-04,268.325,271.486,267.615,270.04,269.779663,49274846,0.0,1.0,...,,,,,269.28514,0.699361,,,1,11
2,AAPL,2025-11-05,268.61,271.7,266.93,270.14,269.879566,42586288,0.0,1.0,...,,,,,269.483282,0.601942,,,2,11
3,AAPL,2025-11-06,267.89,273.4,267.89,269.77,269.509923,51204045,0.0,1.0,...,,,,,269.489942,0.491664,,,3,11
4,AAPL,2025-11-07,269.795,272.29,266.77,268.47,268.211176,48227365,0.0,1.0,...,,,,,269.234189,0.712986,269.234189,0.712986,4,11


Training (local run of XGBoost) + metrics

In [4]:
def prepare_data(df: pd.DataFrame):
    df = df.copy()
    df["date"] = pd.to_datetime(df["date"])
    df = df.sort_values(["symbol", "date"])

    if "target_log_return" not in df.columns:
        log.info(
            "[TRAIN] 'target_log_return' not found; computing as next-day log return of adj_close."
        )
        if "adj_close" not in df.columns:
            raise RuntimeError("adj_close is required to compute target_log_return.")
        df["target_log_return"] = df.groupby("symbol")["adj_close"].transform(
            lambda s: np.log(s.shift(-1) / s)
        )

    df = df.dropna(subset=["target_log_return"]).copy()

    base_features = [
        "adj_close",
        "volume",
        "log_ret_1d",
        "ret_mean_5",
        "ret_std_5",
        "ret_mean_10",
        "ret_std_10",
        "close_mean_5",
        "close_std_5",
        "close_mean_10",
        "close_std_10",
        "day_of_week",
        "month",
    ]
    feature_cols = [c for c in base_features if c in df.columns]

    X_num = df[feature_cols].astype(float)
    sym_dummies = pd.get_dummies(df["symbol"], prefix="sym")
    X = pd.concat([X_num, sym_dummies], axis=1)

    y = df["target_log_return"].astype(float).values
    meta = df[["date", "symbol"]].copy()

    log.info(
        f"[TRAIN] Using {len(feature_cols)} numeric features + "
        f"{sym_dummies.shape[1]} symbol dummies"
    )
    return X, y, meta, list(X.columns)


def time_based_split(X: pd.DataFrame, y: np.ndarray, meta: pd.DataFrame,
                     train_frac: float, val_frac: float):
    dates = np.sort(meta["date"].dt.normalize().unique())
    n_dates = len(dates)
    if n_dates < 5:
        raise RuntimeError(f"Not enough unique dates ({n_dates}) for train/val/test split")

    train_end = int(n_dates * train_frac)
    val_end = int(n_dates * (train_frac + val_frac))
    train_dates = dates[:train_end]
    val_dates = dates[train_end:val_end]
    test_dates = dates[val_end:]

    def mask_for_dates(date_set):
        return meta["date"].dt.normalize().isin(date_set)

    m_train = mask_for_dates(train_dates)
    m_val = mask_for_dates(val_dates)
    m_test = mask_for_dates(test_dates)

    def subset(mask):
        return X[mask].values, y[mask], meta.loc[mask].copy()

    X_tr, y_tr, meta_tr = subset(m_train)
    X_val, y_val, meta_val = subset(m_val)
    X_te, y_te, meta_te = subset(m_test)

    split_info = {
        "n_dates_total": int(n_dates),
        "n_train_dates": int(len(train_dates)),
        "n_val_dates": int(len(val_dates)),
        "n_test_dates": int(len(test_dates)),
        "n_train_rows": int(len(y_tr)),
        "n_val_rows": int(len(y_val)),
        "n_test_rows": int(len(y_te)),
    }

    log.info(f"[TRAIN] Split info: {split_info}")
    return (X_tr, y_tr, meta_tr), (X_val, y_val, meta_val), (X_te, y_te, meta_te), split_info


def eval_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> dict:
    if len(y_true) == 0:
        return {
            "rmse": None,
            "mae": None,
            "smape": None,
            "directional_accuracy": None,
            "n_samples": 0,
        }

    err = y_pred - y_true
    rmse = float(np.sqrt(np.mean(err ** 2)))
    mae = float(np.mean(np.abs(err)))
    denom = np.abs(y_true) + np.abs(y_pred) + 1e-8
    smape = float(np.mean(2.0 * np.abs(err) / denom))
    dir_acc = float((np.sign(y_pred) == np.sign(y_true)).mean())

    return {
        "rmse": rmse,
        "mae": mae,
        "smape": smape,
        "directional_accuracy": dir_acc,
        "n_samples": int(len(y_true)),
    }


# ------------------ run training ------------------

t_prep_start = time.perf_counter()
X, y, meta, feature_names = prepare_data(df_feat)
t_prep = time.perf_counter() - t_prep_start
timings["train_prepare_seconds"] = t_prep

# 70/15/15 time-based split (similar to your cloud flow)
(train_set, val_set, test_set, split_info) = time_based_split(
    X, y, meta, train_frac=0.7, val_frac=0.15
)
X_tr, y_tr, meta_tr = train_set
X_val, y_val, meta_val = val_set
X_te, y_te, meta_te = test_set

# Baseline (zero-return)
baseline_metrics = {}
baseline_val = eval_metrics(y_val, np.zeros_like(y_val)) if len(y_val) > 0 else None
baseline_test = eval_metrics(y_te, np.zeros_like(y_te))
baseline_metrics["zero"] = {"val": baseline_val, "test": baseline_test}
print("Baseline (zero) test metrics:", baseline_test)

t_train_start = time.perf_counter()
model = XGBRegressor(
    n_estimators=300,
    max_depth=4,
    learning_rate=0.05,
    subsample=0.8,
    colsample_bytree=0.8,
    random_state=42,
    n_jobs=-1,
    objective="reg:squarederror",
    eval_metric="rmse",
    early_stopping_rounds=20 if len(y_val) > 0 else None,
)

if len(y_val) > 0:
    model.fit(
        X_tr,
        y_tr,
        eval_set=[(X_val, y_val)],
        verbose=False,
    )
else:
    model.fit(X_tr, y_tr, verbose=False)

t_train = time.perf_counter() - t_train_start
timings["train_fit_seconds"] = t_train

t_eval_start = time.perf_counter()
y_tr_pred = model.predict(X_tr)
y_val_pred = model.predict(X_val) if len(y_val) > 0 else np.array([])
y_te_pred = model.predict(X_te)
t_eval = time.perf_counter() - t_eval_start
timings["train_eval_seconds"] = t_eval

metrics_xgb = {
    "train": eval_metrics(y_tr, y_tr_pred),
    "val": eval_metrics(y_val, y_val_pred) if len(y_val) > 0 else None,
    "test": eval_metrics(y_te, y_te_pred),
}
print("XGBoost test metrics:", metrics_xgb["test"])

# Save local model + metrics
run_id = datetime.now(timezone.utc).strftime("LOCAL_%Y%m%dT%H%M%SZ")
model_path = os.path.join(MODEL_DIR, run_id)
os.makedirs(model_path, exist_ok=True)
model_file = os.path.join(model_path, "model.joblib")

with open(model_file, "wb") as f:
    dump(model, f)

metrics_file = os.path.join(model_path, "metrics.json")
metrics_payload = {
    "run_id": run_id,
    "feature_names": feature_names,
    "split_info": split_info,
    "baseline": baseline_metrics,
    "xgboost": metrics_xgb,
    "timing_seconds": {
        "prepare": t_prep,
        "train": t_train,
        "eval": t_eval,
        "total": t_prep + t_train + t_eval,
    },
}
with open(metrics_file, "w") as f:
    json.dump(metrics_payload, f, indent=2, default=str)

log.info(f"[TRAIN] Saved model to {model_file}")
log.info(f"[TRAIN] Saved metrics to {metrics_file}")


INFO 2025-12-01 03:49:44,096 [TRAIN] 'target_log_return' not found; computing as next-day log return of adj_close.
INFO 2025-12-01 03:49:44,099 [TRAIN] Using 13 numeric features + 5 symbol dummies
INFO 2025-12-01 03:49:44,100 [TRAIN] Split info: {'n_dates_total': 18, 'n_train_dates': 12, 'n_val_dates': 3, 'n_test_dates': 3, 'n_train_rows': 60, 'n_val_rows': 15, 'n_test_rows': 15}
INFO 2025-12-01 03:49:44,151 [TRAIN] Saved model to local_baseline_data/models/xgboost/LOCAL_20251201T084944Z/model.joblib
INFO 2025-12-01 03:49:44,152 [TRAIN] Saved metrics to local_baseline_data/models/xgboost/LOCAL_20251201T084944Z/metrics.json


Baseline (zero) test metrics: {'rmse': 0.014935962150597379, 'mae': 0.011520400641255304, 'smape': 1.9999950870957781, 'directional_accuracy': 0.0, 'n_samples': 15}
XGBoost test metrics: {'rmse': 0.01877312396233493, 'mae': 0.015302628650309819, 'smape': 1.763940513081606, 'directional_accuracy': 0.2, 'n_samples': 15}


Forecasting / Price Prediction

In [5]:
BASE_FEATURES = [
    "adj_close",
    "volume",
    "log_ret_1d",
    "ret_mean_5",
    "ret_std_5",
    "ret_mean_10",
    "ret_std_10",
    "close_mean_5",
    "close_std_5",
    "close_mean_10",
    "close_std_10",
    "day_of_week",
    "month",
]


def build_latest_feature_matrix(df: pd.DataFrame, feature_names: list[str]):
    df_sorted = df.sort_values(["symbol", "date"])
    latest = df_sorted.groupby("symbol", as_index=False).tail(1).reset_index(drop=True)

    missing_base = [c for c in BASE_FEATURES if c not in latest.columns]
    if missing_base:
        raise RuntimeError(f"Missing expected feature columns: {missing_base}")

    X_num = latest[BASE_FEATURES].astype(float)
    sym_dummies = pd.get_dummies(latest["symbol"], prefix="sym")
    X = pd.concat([X_num, sym_dummies], axis=1)

    # Align to training feature order
    X = X.reindex(columns=feature_names, fill_value=0.0)
    return latest, X.values


# ------------------ run forecast ------------------

t_forecast_start = time.perf_counter()

# Reload features from disk just to mimic cloud pattern
df_feat_loaded = pd.read_parquet(FEATURES_PARQUET_PATH)
latest_rows, X_latest = build_latest_feature_matrix(df_feat_loaded, feature_names)

log.info(f"[FORECAST] Predicting for {len(latest_rows)} symbols.")
y_pred = model.predict(X_latest)
pred_direction = np.sign(y_pred).astype(int)

as_of_dates = latest_rows["date"]
target_dates = as_of_dates + pd.to_timedelta(1, unit="D")
pred_adj_close = latest_rows["adj_close"].astype(float) * np.exp(y_pred)

created_ts = datetime.now(timezone.utc).isoformat()

forecast_df = pd.DataFrame(
    {
        "symbol": latest_rows["symbol"],
        "as_of_date": as_of_dates.dt.strftime("%Y-%m-%d"),
        "target_date": target_dates.dt.strftime("%Y-%m-%d"),
        "adj_close": latest_rows["adj_close"].astype(float),
        "pred_log_return": y_pred,
        "pred_direction": pred_direction,
        "pred_adj_close": pred_adj_close,
        "model_run_id": run_id,
        "model_s3_key": f"local://{model_file}",
        "created_ts": created_ts,
    }
)

# Optional dt partition column, like S3
forecast_df["dt"] = forecast_df["target_date"]

# Write local Parquet
table = pa.Table.from_pandas(forecast_df)
buf = io.BytesIO()
pq.write_table(table, buf)
buf.seek(0)
with open(FORECASTS_PARQUET_PATH, "wb") as f:
    f.write(buf.getvalue())

t_forecast = time.perf_counter() - t_forecast_start
timings["forecast_seconds"] = t_forecast

log.info(f"[FORECAST] Wrote {len(forecast_df)} rows to {FORECASTS_PARQUET_PATH} in {t_forecast:.2f}s")
forecast_df


INFO 2025-12-01 03:49:44,197 [FORECAST] Predicting for 5 symbols.
INFO 2025-12-01 03:49:44,200 [FORECAST] Wrote 5 rows to local_baseline_data/forecasts_nov.parquet in 0.04s


Unnamed: 0,symbol,as_of_date,target_date,adj_close,pred_log_return,pred_direction,pred_adj_close,model_run_id,model_s3_key,created_ts,dt
0,AAPL,2025-11-28,2025-11-29,278.85,-0.004573,-1,277.577798,LOCAL_20251201T084944Z,local://local_baseline_data/models/xgboost/LOC...,2025-12-01T08:49:44.198601+00:00,2025-11-29
1,AMZN,2025-11-28,2025-11-29,233.22,-0.001225,-1,232.934404,LOCAL_20251201T084944Z,local://local_baseline_data/models/xgboost/LOC...,2025-12-01T08:49:44.198601+00:00,2025-11-29
2,GOOGL,2025-11-28,2025-11-29,320.18,0.00211,1,320.856306,LOCAL_20251201T084944Z,local://local_baseline_data/models/xgboost/LOC...,2025-12-01T08:49:44.198601+00:00,2025-11-29
3,META,2025-11-28,2025-11-29,647.95,-0.000419,-1,647.678843,LOCAL_20251201T084944Z,local://local_baseline_data/models/xgboost/LOC...,2025-12-01T08:49:44.198601+00:00,2025-11-29
4,MSFT,2025-11-28,2025-11-29,492.01,-0.00312,-1,490.477272,LOCAL_20251201T084944Z,local://local_baseline_data/models/xgboost/LOC...,2025-12-01T08:49:44.198601+00:00,2025-11-29


Metrics Summary

In [6]:
summary = {
    "ingest_total_seconds": timings.get("ingest_total_seconds"),
    "ingest_per_symbol_seconds": timings.get("ingest_per_symbol_seconds"),
    "dq_seconds": timings.get("dq_seconds"),
    "feature_build_seconds": timings.get("feature_build_seconds"),
    "train_prepare_seconds": timings.get("train_prepare_seconds"),
    "train_fit_seconds": timings.get("train_fit_seconds"),
    "train_eval_seconds": timings.get("train_eval_seconds"),
    "forecast_seconds": timings.get("forecast_seconds"),
    "xgboost_test_metrics": metrics_xgb.get("test"),
    "baseline_zero_test_metrics": baseline_metrics["zero"]["test"],
    "split_info": split_info,
    "model_run_id": run_id,
    "raw_rows": int(len(df_raw)),
    "feature_rows": int(len(df_feat)),
    "forecast_rows": int(len(forecast_df)),
}

import pprint
pprint.pp(summary)


{'ingest_total_seconds': 48.9701365410001,
 'ingest_per_symbol_seconds': {'AAPL': 0.17375866702059284,
                               'AMZN': 0.2464177919900976,
                               'GOOGL': 0.1555922089901287,
                               'META': 0.20249587498256005,
                               'MSFT': 0.16923533400404267},
 'dq_seconds': 0.0009083330223802477,
 'feature_build_seconds': 0.01022691698744893,
 'train_prepare_seconds': 0.004221292008878663,
 'train_fit_seconds': 0.04815529199549928,
 'train_eval_seconds': 0.0011144999880343676,
 'forecast_seconds': 0.04235899998457171,
 'xgboost_test_metrics': {'rmse': 0.01877312396233493,
                          'mae': 0.015302628650309819,
                          'smape': 1.763940513081606,
                          'directional_accuracy': 0.2,
                          'n_samples': 15},
 'baseline_zero_test_metrics': {'rmse': 0.014935962150597379,
                                'mae': 0.011520400641255304,
       

In [11]:
import resource  

usage = resource.getrusage(resource.RUSAGE_SELF)
max_rss_bytes = usage.ru_maxrss          # max resident set size
max_rss_kb = max_rss_bytes / 1024       # rough MB
max_rss_mb = max_rss_kb / 1024
max_rss_gb =max_rss_mb / 1024
print(f"Local max memory used ~ {max_rss_mb:.1f} MB")
print(f"Local max memory used ~ {max_rss_gb:.1f} GB")



Local max memory used ~ 192.1 MB
Local max memory used ~ 0.2 GB
