In [2]:
import os
import time
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple

import numpy as np
import pandas as pd
import norgatedata


"""
===============================================================================
DOWNLOAD FULL NORGATE CONTINUOUS FUTURES PRICE HISTORY (per instrument)
===============================================================================

Reads the futures universe produced by your Norgate universe builder and
downloads continuous price series (e.g. &ES) for every instrument with data.

Outputs (per instrument):
- CSV:     OUTPUT_DIR/csv/<SYMBOL>.csv
- Parquet: OUTPUT_DIR/parquet/<SYMBOL>.parquet

Optional consolidated outputs (controlled by SAVE_ALL_CSV / SAVE_ALL_PARQUET):
- CSV:     OUTPUT_DIR/norgate_continuous_all.csv
- Parquet: OUTPUT_DIR/norgate_continuous_all.parquet

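Diagnostics (printed as ⚠ flags, one line per finding):
- No price data returned for the instrument
- Fewer than MIN_DAYS_AFTER_START rows on or after START_DATE
- Last price date more than STALE_DAYS calendar days before today
- Very late start: first date falls in a year after LATE_START_YEAR (possible limited history)
- Short full history: fewer than EARLY_TERMINATION_MIN_TOTAL_DAYS rows in total
- Large gaps in trading history (> GAP_DAYS calendar days between consecutive rows)
- Optional suspicious jumps (daily-return z-score >= JUMP_Z_THRESHOLD)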

Requirements:
- Norgate Data Updater (NDU) running
- norgatedata package installed
- continuous_contracts.csv from your universe builder

===============================================================================
"""


# ============================================================
# 1. CONFIG
# ============================================================
# --- Input: universe file consumed by load_continuous_universe()
CONTINUOUS_LIST_PATH = "./01-futures_universe/continuous_contracts.csv"

# --- Output locations (per-instrument folders are created on import)
OUTPUT_DIR = "./02-futures_prices/norgate_continuous"
CSV_DIR = os.path.join(OUTPUT_DIR, "csv")
PARQUET_DIR = os.path.join(OUTPUT_DIR, "parquet")
ALL_CSV_PATH = os.path.join(OUTPUT_DIR, "norgate_continuous_all.csv")
ALL_PARQUET_PATH = os.path.join(OUTPUT_DIR, "norgate_continuous_all.parquet")

os.makedirs(CSV_DIR, exist_ok=True)
os.makedirs(PARQUET_DIR, exist_ok=True)

# --- Switches for the single consolidated file in each format
SAVE_ALL_CSV = True
SAVE_ALL_PARQUET = True

# --- Requested date window; END_DATE = None means "up to most recent available"
START_DATE = pd.Timestamp(year=1998, month=1, day=1)
END_DATE = None

# --- Diagnostic thresholds
MIN_DAYS_AFTER_START = 100                   # minimum rows on/after START_DATE
STALE_DAYS = 30                              # max age (days) of the last price date
LATE_START_YEAR = 2015                       # first date in a later year is flagged
EARLY_TERMINATION_MIN_TOTAL_DAYS = 5 * 250   # fewer rows than this (~5y) is flagged
GAP_DAYS = 10                                # larger day gaps between rows are flagged

# --- Optional jump check: flags very large daily moves
#     (helps catch bad stitching/roll artifacts)
ENABLE_JUMP_CHECK = True
JUMP_Z_THRESHOLD = 8.0    # z-score on daily returns
MIN_RET_OBS_FOR_Z = 252   # minimum observations before z-scores are computed

# --- Pause between instruments (usually not needed for local Norgate,
#     but a small pause is kept)
SLEEP_SECONDS = 0.05

# --- Compression codec passed to DataFrame.to_parquet
PARQUET_COMPRESSION = "snappy"


# ============================================================
# 2. UTILITIES
# ============================================================
def print_flag(symbol: str, message: str) -> None:
    """Print a one-line warning for ``symbol`` to stdout, prefixed with a ⚠ marker."""
    line = "⚠ {}: {}".format(symbol, message)
    print(line)


def ensure_ndu_running() -> None:
    """
    Abort early when ``norgatedata.status()`` reports a falsy value.

    Raises:
        RuntimeError: if the status call returns a falsy result.
    """
    ndu_is_up = norgatedata.status()
    if ndu_is_up:
        return
    raise RuntimeError(
        "Norgate Data Updater (NDU) is not running. Start NDU and retry."
    )


def fetch_timeseries(
    continuous_symbol: str,
    start_date: pd.Timestamp,
    end_date: Optional[pd.Timestamp],
) -> pd.DataFrame:
    """
    Request one continuous futures price series from Norgate.

    Args:
        continuous_symbol: Symbol passed to ``norgatedata.price_timeseries``.
        start_date: First date requested (sent as an ISO ``YYYY-MM-DD`` string).
        end_date: Last date requested; omitted from the request when ``None``.

    Returns:
        A DataFrame sorted by a DatetimeIndex named ``date`` whose column names
        are stripped and lower-cased, or an empty DataFrame when Norgate
        returns nothing.
    """
    request: Dict[str, Any] = {
        "symbol": continuous_symbol,
        "start_date": start_date.date().isoformat(),
        "format": "pandas-dataframe",
    }
    if end_date is not None:
        request["end_date"] = end_date.date().isoformat()

    prices = norgatedata.price_timeseries(**request)

    # Treat both "nothing returned" and "zero rows" as no data.
    if prices is None or len(prices) == 0:
        return pd.DataFrame()

    # Make sure the index is datetime-typed before sorting on it.
    if not isinstance(prices.index, pd.DatetimeIndex):
        prices.index = pd.to_datetime(prices.index)

    prices = prices.sort_index()
    prices.index.name = "date"

    # Normalize column names so later lookups (e.g. "close") are case-insensitive.
    prices.columns = [label.strip().lower() for label in prices.columns]
    return prices


def detect_large_gaps(dates: pd.DatetimeIndex, gap_days: int) -> Optional[int]:
    """
    Report the widest spacing between consecutive dates if any spacing is too large.

    Args:
        dates: Dates in the order they should be compared.
        gap_days: Threshold in whole days; only spacings strictly greater count.

    Returns:
        The largest spacing in days when at least one spacing exceeds
        ``gap_days``, otherwise ``None`` (also ``None`` for fewer than two dates).
    """
    if len(dates) < 2:
        return None

    # Whole-day difference between each date and the one before it
    # (the first entry has no predecessor and is NaN).
    spacing = pd.Series(dates).diff().dt.days
    too_wide = spacing.gt(gap_days)
    return int(spacing.max()) if too_wide.any() else None


def jump_check(df: pd.DataFrame) -> Optional[Tuple[float, pd.Timestamp]]:
    """
    Flag the most extreme daily return of a price series by z-score.

    The price column is 'close' if present, else 'settle', else 'last', else
    the first numeric column of ``df``. Returns are simple percentage changes
    of that column; the z-score uses the mean and sample standard deviation of
    all returns in the series.

    Args:
        df: Price DataFrame; its index labels are used as the jump date.

    Returns:
        ``(max_abs_z, date)`` when the largest absolute z-score is at least
        ``JUMP_Z_THRESHOLD``; ``None`` when the check is disabled
        (``ENABLE_JUMP_CHECK`` is falsy), no usable price column exists, there
        are too few observations, the returns have zero/undefined dispersion,
        or no return reaches the threshold.
    """
    if not ENABLE_JUMP_CHECK:
        return None

    # Preferred price columns in priority order, then any numeric column.
    price_col = next(
        (name for name in ("close", "settle", "last") if name in df.columns),
        None,
    )
    if price_col is None:
        numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
        if not numeric_cols:
            return None
        price_col = numeric_cols[0]

    px = df[price_col].astype(float).replace([np.inf, -np.inf], np.nan).dropna()
    if len(px) < max(MIN_RET_OBS_FOR_Z, 20):
        return None

    # A zero price makes the following return +/-inf (or NaN for 0 -> 0).
    # Left in place, a single inf turns the mean/std into inf/NaN and the
    # whole check would silently return None, so drop non-finite returns.
    rets = px.pct_change().replace([np.inf, -np.inf], np.nan).dropna()
    if len(rets) < MIN_RET_OBS_FOR_Z:
        return None

    mu = rets.mean()
    sig = rets.std(ddof=1)
    if sig == 0 or np.isnan(sig):
        return None

    abs_z = ((rets - mu) / sig).abs()
    zmax = float(abs_z.max())
    if zmax >= JUMP_Z_THRESHOLD:
        return zmax, pd.Timestamp(abs_z.idxmax())

    return None


def load_continuous_universe(path: str) -> pd.DataFrame:
    """
    Load and clean the list of continuous futures instruments.

    Args:
        path: CSV file with at least the columns ``symbol`` and
            ``continuous_symbol``. An optional ``data_points`` column is used
            to keep only rows with a positive count.

    Returns:
        A DataFrame with a fresh 0..n-1 index where ``symbol`` is stripped and
        upper-cased, ``continuous_symbol`` is stripped, and duplicate
        (symbol, continuous_symbol) pairs are removed (first occurrence kept).

    Raises:
        ValueError: if a required column is missing from the file.
    """
    df = pd.read_csv(path)
    required = {"symbol", "continuous_symbol"}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Universe file missing columns: {sorted(missing)}")

    # Rows without a symbol cannot be fetched; without this, astype(str) below
    # would turn the missing value into the literal string "nan".
    df = df.dropna(subset=["symbol", "continuous_symbol"])

    # Keep only those marked as having data if present. Non-numeric or missing
    # counts are treated as 0 instead of raising on the comparison.
    if "data_points" in df.columns:
        n_points = pd.to_numeric(df["data_points"], errors="coerce").fillna(0)
        df = df[n_points > 0]

    # Work on an explicit copy so the column assignments below do not write
    # into a filtered slice of the frame returned by read_csv.
    df = df.copy()
    df["symbol"] = df["symbol"].astype(str).str.strip().str.upper()
    df["continuous_symbol"] = df["continuous_symbol"].astype(str).str.strip()

    df = df.drop_duplicates(subset=["symbol", "continuous_symbol"]).reset_index(drop=True)
    return df


# ============================================================
# 3. MAIN
# ============================================================
def main() -> None:
    """
    Download every instrument in the universe, save it, and print diagnostics.

    For each (symbol, continuous_symbol) row of the universe file the series is
    fetched, written to CSV_DIR and PARQUET_DIR, optionally collected for the
    consolidated outputs, and then checked against the diagnostic thresholds.
    Fetch errors and empty results are reported with a flag and the loop moves
    on to the next instrument.
    """
    rule = "=" * 79
    print(rule)
    print("DOWNLOAD FULL NORGATE CONTINUOUS FUTURES PRICE HISTORY")
    print(rule)
    print("Run timestamp:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    ensure_ndu_running()

    print("\nLoading continuous universe:", CONTINUOUS_LIST_PATH)
    uni = load_continuous_universe(CONTINUOUS_LIST_PATH)
    print(f"✔ Loaded {len(uni)} continuous instruments")

    collected: List[pd.DataFrame] = []
    today = pd.Timestamp.today().normalize()
    stale_cutoff = today - pd.Timedelta(days=STALE_DAYS)
    want_consolidated = SAVE_ALL_PARQUET or SAVE_ALL_CSV

    print("\nDownloading + saving per-instrument CSV + Parquet + running diagnostics…\n")

    for sym, csym in zip(uni["symbol"], uni["continuous_symbol"]):
        try:
            prices = fetch_timeseries(csym, START_DATE, END_DATE)
        except Exception as e:
            # Best effort: report the failure and keep going with the rest.
            print_flag(sym, f"ERROR fetching {csym}: {str(e)[:120]}")
            continue

        if prices.empty:
            print_flag(sym, f"NO PRICE DATA RECEIVED for {csym}")
            time.sleep(SLEEP_SECONDS)
            continue

        # --- Per-instrument files, named after the (upper-cased) symbol
        csv_path = os.path.join(CSV_DIR, f"{sym}.csv")
        pq_path = os.path.join(PARQUET_DIR, f"{sym}.parquet")
        prices.to_csv(csv_path)
        prices.to_parquet(pq_path, compression=PARQUET_COMPRESSION)
        for written in (csv_path, pq_path):
            print(f"Saved: {written}")

        # --- Long-format copy (date, symbol, continuous_symbol, ...) for the
        #     consolidated outputs
        if want_consolidated:
            long_form = prices.reset_index()
            long_form.insert(1, "symbol", sym)
            long_form.insert(2, "continuous_symbol", csym)
            collected.append(long_form)

        # ========================================================
        # DIAGNOSTICS
        # ========================================================
        # 1) Enough rows on/after the requested start date?
        in_window = prices.loc[prices.index >= START_DATE]
        n_days = len(in_window)
        if n_days < MIN_DAYS_AFTER_START:
            last_seen = in_window.index.max().date() if n_days > 0 else None
            print_flag(
                sym,
                f"LESS THAN {MIN_DAYS_AFTER_START} DAYS after {START_DATE.date()} — "
                f"{n_days} days (last date = {last_seen})"
            )

        # 2) History begins in a year after LATE_START_YEAR?
        first_dt = prices.index.min()
        if first_dt.year > LATE_START_YEAR:
            print_flag(sym, f"Very late start — first date is {first_dt.date()}")

        # 3) Last observation older than the staleness cutoff?
        last_dt = prices.index.max()
        if last_dt < stale_cutoff:
            print_flag(sym, f"Data ends at {last_dt.date()} — older than {STALE_DAYS} days")

        # 4) Too few rows overall?
        full_days = len(prices)
        if full_days < EARLY_TERMINATION_MIN_TOTAL_DAYS:
            print_flag(sym, f"Short full history (<~5y) — only {full_days} trading days total.")

        # 5) Any spacing between consecutive rows above GAP_DAYS?
        max_gap = detect_large_gaps(prices.index, GAP_DAYS)
        if max_gap is not None:
            print_flag(sym, f"LARGE GAP in data — max gap = {max_gap} days")

        # 6) Extreme daily return by z-score?
        jump = jump_check(prices)
        if jump is not None:
            zmax, jdate = jump
            print_flag(sym, f"SUSPICIOUS JUMP — max |z(ret)|={zmax:.1f} on {jdate.date()}")

        time.sleep(SLEEP_SECONDS)

    # --- Consolidated outputs: only when requested and something was collected
    if collected and want_consolidated:
        big = pd.concat(collected, ignore_index=True)

        if SAVE_ALL_PARQUET:
            big.to_parquet(ALL_PARQUET_PATH, index=False, compression=PARQUET_COMPRESSION)
            print(f"\n✔ Saved consolidated Parquet → {ALL_PARQUET_PATH}")

        if SAVE_ALL_CSV:
            big.to_csv(ALL_CSV_PATH, index=False)
            print(f"✔ Saved consolidated CSV → {ALL_CSV_PATH}")

    print("\n" + rule)
    print("DONE — Norgate continuous futures history saved per instrument")
    print("CSV folder:   ", CSV_DIR)
    print("Parquet folder:", PARQUET_DIR)
    print(rule)


if __name__ == "__main__":
    main()


DOWNLOAD FULL NORGATE CONTINUOUS FUTURES PRICE HISTORY
Run timestamp: 2026-01-28 18:03:35

Loading continuous universe: ./01-futures_universe/continuous_contracts.csv
✔ Loaded 105 continuous instruments

Downloading + saving per-instrument CSV + Parquet + running diagnostics…

Saved: ./02-futures_prices/norgate_continuous\csv\RS.csv
Saved: ./02-futures_prices/norgate_continuous\parquet\RS.parquet
⚠ RS: Very late start — first date is 2024-01-22
⚠ RS: Short full history (<~5y) — only 505 trading days total.
Saved: ./02-futures_prices/norgate_continuous\csv\ZW.csv
Saved: ./02-futures_prices/norgate_continuous\parquet\ZW.parquet
⚠ ZW: Very late start — first date is 2024-01-22
⚠ ZW: Short full history (<~5y) — only 507 trading days total.
Saved: ./02-futures_prices/norgate_continuous\csv\DC.csv
Saved: ./02-futures_prices/norgate_continuous\parquet\DC.parquet
⚠ DC: Very late start — first date is 2024-01-22
⚠ DC: Short full history (<~5y) — only 507 trading days total.
⚠ DC: SUSPICIOUS JUM