In [34]:
# ============================================================
# 1_Signal_Intake_Layer.ipynb
# VinV — Tranche_1
# Purpose:
#   - Read per-ticker Yahoo "combined" daily files
#   - Build monthly features (returns, momentum, div yield, vol)
#   - Write vinv_inputs_raw.parquet for downstream modeling
#
# Outputs:
#   ..\data\processed\vinv_inputs_raw.parquet
#   ..\artifacts\intake_coverage.csv
#   ..\artifacts\intake_errors_full.csv
#   ..\artifacts\intake_errors_sample.csv
#   ..\artifacts\FIRST_INTAKE_TRACEBACK.txt (first failure)
# ============================================================

from __future__ import annotations

import os
import re
import traceback
from pathlib import Path

import numpy as np
import pandas as pd


# -------------------------
# CONFIG (edit this only)
# -------------------------
YAHOO_COMBINED_DIR = Path(r"C:\Users\Rand Sobczak Jr\_rts\3_AI\the_Spine\the_OracleChambers\the_OracleChambers\VinV_1_0\data_yahoo_combined")

OUT_FEATURES = Path("..") / "data" / "processed" / "vinv_inputs_raw.parquet"
OUT_FEATURES.parent.mkdir(parents=True, exist_ok=True)

ARTIFACTS = Path("..") / "artifacts"
ARTIFACTS.mkdir(parents=True, exist_ok=True)

OUT_COVERAGE  = ARTIFACTS / "intake_coverage.csv"
OUT_ERRORS    = ARTIFACTS / "intake_errors_full.csv"
OUT_TRACEBACK = ARTIFACTS / "FIRST_INTAKE_TRACEBACK.txt"

# --- VinV_2.0 time policy ---
KEEP_START   = "1990-01-01"   # what you keep in the final panel
WARMUP_START = "1989-01-01"   # minimum history to compute YoY for 1990
# If you want safer QoQ/YoY stability, set WARMUP_START = "1988-01-01"

In [35]:
def robust_read(path: Path) -> pd.DataFrame:
    """
    Reads CSV/TSV/Excel defensively.
    Your files often look like:
      Date,Open,High,Low,Close,Adj_Close,Volume,Dividend
    OR
      Date,price,dividend,ticker
    """
    p = Path(path)
    suf = p.suffix.lower()

    if suf in [".xlsx", ".xlsm", ".xls"]:
        return pd.read_excel(p)

    # Try CSV with common separators
    for sep in [",", "\t", ";", "|"]:
        try:
            df = pd.read_csv(p, sep=sep, engine="python", encoding="utf-8-sig")
            if df.shape[1] >= 2:
                return df
        except Exception:
            continue

    # fallback: latin1
    for sep in [",", "\t", ";", "|"]:
        try:
            df = pd.read_csv(p, sep=sep, engine="python", encoding="latin1")
            if df.shape[1] >= 2:
                return df
        except Exception:
            continue

    raise ValueError("robust_read failed (could not parse as CSV/TSV/Excel).")


def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    d = df.copy()

    # bring index out if it contains dates
    if "Date" not in d.columns and isinstance(d.index, pd.DatetimeIndex):
        d = d.reset_index().rename(columns={"index": "Date"})
    elif "Date" not in d.columns and "date" in [str(c).lower() for c in d.columns]:
        # normalize later, but keep going
        pass

    new_cols = []
    for c in d.columns:
        c2 = str(c).replace("\ufeff", "").strip()
        c2 = re.sub(r"[^\w]+", "_", c2)     # spaces/hyphens -> _
        c2 = re.sub(r"_+", "_", c2).strip("_")
        new_cols.append(c2)
    d.columns = new_cols

    # unify Date
    if "date" in d.columns and "Date" not in d.columns:
        d = d.rename(columns={"date": "Date"})
    if "DATE" in d.columns and "Date" not in d.columns:
        d = d.rename(columns={"DATE": "Date"})
    if "datetime" in d.columns and "Date" not in d.columns:
        d = d.rename(columns={"datetime": "Date"})

    # unify dividend naming
    if "Dividends" in d.columns and "Dividend" not in d.columns:
        d = d.rename(columns={"Dividends": "Dividend"})
    if "dividend" in d.columns and "Dividend" not in d.columns:
        d = d.rename(columns={"dividend": "Dividend"})

    # unify ticker naming
    if "symbol" in d.columns and "ticker" not in d.columns:
        d = d.rename(columns={"symbol": "ticker"})
    if "Ticker" in d.columns and "ticker" not in d.columns:
        d = d.rename(columns={"Ticker": "ticker"})

    # unify price naming
    if "Adj_Close" not in d.columns and "AdjClose" in d.columns:
        d = d.rename(columns={"AdjClose": "Adj_Close"})
    if "Adj_Close" not in d.columns and "Adj_Close_" in d.columns:
        d = d.rename(columns={"Adj_Close_": "Adj_Close"})
    if "Adj_Close" not in d.columns and "Adj_Close" in d.columns:
        pass

    if "price" in d.columns and "price" not in d.columns:
        pass

    return d


def parse_date_column(df: pd.DataFrame) -> pd.DataFrame:
    d = df.copy()
    if "Date" not in d.columns:
        # common case: first col is Date but named weird
        first = d.columns[0]
        if str(first).lower() in ["date", "datetime", "time", "timestamp", "index"]:
            d = d.rename(columns={first: "Date"})
        else:
            raise ValueError("No Date column found.")

    # numeric Date: excel serial or yyyymmdd
    if pd.api.types.is_numeric_dtype(d["Date"]):
        s = d["Date"]
        s_non = s.dropna()
        if len(s_non) == 0:
            d["Date"] = pd.NaT
        else:
            # attempt yyyymmdd
            s_int = s_non.astype(int).astype(str)
            if s_int.str.len().median() == 8:
                d["Date"] = pd.to_datetime(s.astype("Int64").astype(str), format="%Y%m%d", errors="coerce")
            else:
                d["Date"] = pd.to_datetime(s, unit="D", origin="1899-12-30", errors="coerce")
    else:
        d["Date"] = pd.to_datetime(d["Date"], errors="coerce")

    d = d.dropna(subset=["Date"])
    return d


def infer_price_column(d: pd.DataFrame) -> str:
    """
    Your combined data sometimes has:
      - price
    or Yahoo:
      - Adj_Close
      - Close
    """
    for c in ["price", "Adj_Close", "Close", "adj_close", "close"]:
        if c in d.columns:
            return c
    raise ValueError(f"Missing price column. Have: {list(d.columns)[:25]}")


def ensure_required_cols(df: pd.DataFrame) -> pd.DataFrame:
    d = df.copy()

    # ticker
    if "ticker" not in d.columns:
        # If not provided, caller should set it (from filename)
        d["ticker"] = None

    # dividend may be missing
    if "Dividend" not in d.columns:
        d["Dividend"] = 0.0

    px = infer_price_column(d)

    # numeric
    d[px] = pd.to_numeric(d[px], errors="coerce")
    d["Dividend"] = pd.to_numeric(d["Dividend"], errors="coerce").fillna(0.0)

    # optional volume
    if "Volume" in d.columns:
        d["Volume"] = pd.to_numeric(d["Volume"], errors="coerce").fillna(0.0)
    else:
        d["Volume"] = 0.0

    # keep only rows with a price
    d = d.dropna(subset=[px])

    return d

In [None]:
def build_monthly(daily: pd.DataFrame) -> pd.DataFrame:
    d = daily.sort_values("Date").copy()

    # Month bucket (NO 'MS' anywhere)
    d["date"] = d["Date"].dt.to_period("M").dt.to_timestamp()

    px = infer_price_column(d)

    d["ret_1d"] = d[px].pct_change()

    m = (
        d.groupby("date", dropna=True)
         .agg(
            px_m_start=(px, "first"),
            px_m_end=(px, "last"),
            vol_m_sum=("Volume", "sum"),
            div_m_sum=("Dividend", "sum"),
            ret_m_std=("ret_1d", lambda x: x.std(ddof=0)),
            n_obs=(px, "count"),
         )
         .reset_index()
         .sort_values("date")
         .reset_index(drop=True)
    )

    if len(m) == 0:
        return m

    m["ret_m_price"] = (m["px_m_end"] / m["px_m_start"]) - 1.0
    denom = m["px_m_start"].replace(0, np.nan)
    m["ret_m_total"] = m["ret_m_price"] + (m["div_m_sum"] / denom)
    m["ret_m_total"] = m["ret_m_total"].replace([np.inf, -np.inf], np.nan).fillna(0.0)

    m["mom_1m"] = m["ret_m_total"]
    m["mom_3m"] = (1 + m["ret_m_total"]).rolling(3).apply(np.prod, raw=True) - 1.0
    m["mom_12m"] = (1 + m["ret_m_total"]).rolling(12).apply(np.prod, raw=True) - 1.0

    m = m.sort_values("date").reset_index(drop=True)

    # MoM / QoQ / YoY features (based on total return series)
    m["ret_mom"] = m["ret_m_total"]

    m["ret_qoq"] = (1 + m["ret_m_total"]).rolling(3).apply(np.prod, raw=True) - 1.0
    m["ret_yoy"] = (1 + m["ret_m_total"]).rolling(12).apply(np.prod, raw=True) - 1.0

    # Optional: simple deltas (often useful for ML)
    m["ret_mom_delta"] = m["ret_mom"].diff()
    m["ret_qoq_delta"] = m["ret_qoq"].diff()
    m["ret_yoy_delta"] = m["ret_yoy"].diff()

    denom2 = m["px_m_end"].replace(0, np.nan)
    m["div_cash_m"] = (m["div_m_sum"] / m["px_m_end"].replace(0, np.nan)).replace([np.inf, -np.inf], np.nan).fillna(0.0)

    return m

    m = m[
        [
            "date",
            "ret_m_total",
            "mom_1m",
            "mom_3m",
            "mom_12m",
            "div_cash_m",
            "vol_m_sum",
            "ret_m_std",
            "n_obs",
        ]
    ]

In [None]:
files = sorted([p for p in YAHOO_COMBINED_DIR.rglob("*") if p.suffix.lower() in [".csv", ".txt", ".xlsx", ".xls", ".xlsm"]])

print(f"Found combined files: {len(files)}")
print(f"Dir: {YAHOO_COMBINED_DIR}")

coverage_rows = []
error_rows = []
panel_parts = []
first_tb_written = False

for i, f in enumerate(files, start=1):
    try:
        raw = robust_read(f)
        raw = normalize_columns(raw)
        raw = parse_date_column(raw)

        # If ticker absent, infer from filename
        if "ticker" not in raw.columns or raw["ticker"].isna().all():
            # e.g. AAPL_COMBINED.csv -> AAPL
            stem = f.stem
            t = f.stem
            t = t.replace("_COMBINED", "").replace("_combined", "")
            t = t.upper().strip()
            raw["ticker"] = t

        raw = ensure_required_cols(raw)

        if raw.empty:
            raise ValueError("Raw frame empty after parsing/cleaning.")
        if raw["Date"].isna().all():
            raise ValueError("All Date values failed to parse.")

        monthly = build_monthly(raw)
        monthly = monthly[monthly["date"] >= WARMUP_START].copy()
        if len(monthly) == 0:
            raise ValueError("0 monthly rows (after build_monthly).")

        asset_id = str(raw["ticker"].iloc[0])
        monthly["asset_id"] = asset_id

        asset_id = str(raw["ticker"].iloc[0])
        asset_id = asset_id.replace("_COMBINED", "").replace("_combined", "").upper().strip()
        monthly["asset_id"] = asset_id

        # keep only the features we want
        keep = ["date","asset_id","ret_m_total","mom_1m","mom_3m","mom_12m","div_yield_m","vol_m_sum","ret_m_std","n_obs"]
        monthly = monthly[keep]

        panel_parts.append(monthly)

        coverage_rows.append({
            "asset_id": asset_id,
            "file": str(f),
            "rows_daily": int(len(raw)),
            "rows_monthly": int(len(monthly)),
            "date_min": str(raw["Date"].min()),
            "date_max": str(raw["Date"].max()),
        })

    except Exception as e:
        asset_guess = f.stem.replace("_COMBINED", "")
        error_rows.append({
            "asset_id": asset_guess,
            "file": str(f),
            "error": str(e)[:300],
        })

        if not first_tb_written:
            OUT_TRACEBACK.write_text(traceback.format_exc(), encoding="utf-8")
            print("\nWROTE FIRST TRACEBACK:", OUT_TRACEBACK)
            first_tb_written = True

    if i % 250 == 0:
        print(f"Processed {i}/{len(files)}...")

# write artifacts
pd.DataFrame(coverage_rows).to_csv(OUT_COVERAGE, index=False)
pd.DataFrame(error_rows).to_csv(OUT_ERRORS, index=False)
print("\nWrote coverage:", OUT_COVERAGE)
print("Wrote errors  :", OUT_ERRORS)

if len(error_rows) > 0:
    errs = pd.DataFrame(error_rows)
    print("\nTop error messages:")
    print(errs["error"].value_counts().head(15))

# combine and write final features
if len(panel_parts) == 0:
    raise ValueError(
        "NO SUCCESSFUL FILES produced monthly rows.\n"
        "Open:\n"
        f"- {OUT_ERRORS}\n"
        f"- {OUT_COVERAGE}\n"
        f"- {OUT_TRACEBACK}\n"
    )

panel = pd.concat(panel_parts, ignore_index=True)
panel["date"] = pd.to_datetime(panel["date"])
panel = panel[panel["date"] >= KEEP_START].copy()

# enforce types
panel["date"] = pd.to_datetime(panel["date"], errors="coerce")
panel = panel.dropna(subset=["date"])
panel["asset_id"] = panel["asset_id"].astype(str)

# numeric coercion
for c in panel.columns:
    if c not in ["date","asset_id"]:
        panel[c] = pd.to_numeric(panel[c], errors="coerce")

panel = panel.replace([np.inf, -np.inf], np.nan)
panel = panel.dropna(subset=["ret_m_total"])  # minimum
panel = panel.fillna(0.0)

panel = panel.sort_values(["date","asset_id"]).reset_index(drop=True)

# FAILSAFE: if any old column survives, rename it
if "div_yield_m" in panel.columns and "div_cash_m" not in panel.columns:
    panel = panel.rename(columns={"div_yield_m": "div_cash_m"})

panel.to_parquet(OUT_FEATURES, index=False)

print("\nSAVED:", OUT_FEATURES)
print("rows:", len(panel), "cols:", panel.shape[1])
print("columns:", panel.columns.tolist())
print("date range:", panel["date"].min(), "→", panel["date"].max())
print("tickers:", panel["asset_id"].nunique())
panel.head(5)

Found combined files: 2553
Dir: C:\Users\Rand Sobczak Jr\_rts\3_AI\the_Spine\the_OracleChambers\the_OracleChambers\VinV_1_0\data_yahoo_combined
Processed 250/2553...
Processed 500/2553...
Processed 750/2553...
Processed 1000/2553...
Processed 1250/2553...
Processed 1500/2553...
Processed 1750/2553...
Processed 2000/2553...
Processed 2250/2553...


  sqr = _ensure_numeric((avg - values) ** 2)


Processed 2500/2553...

Wrote coverage: ..\artifacts\intake_coverage.csv
Wrote errors  : ..\artifacts\intake_errors_full.csv

SAVED: ..\data\processed\vinv_inputs_raw.parquet
rows: 625055 cols: 10
columns: ['date', 'asset_id', 'ret_m_total', 'mom_1m', 'mom_3m', 'mom_12m', 'div_yield_m', 'vol_m_sum', 'ret_m_std', 'n_obs']
date range: 1990-01-01 00:00:00 → 2025-12-01 00:00:00
tickers: 2553


Unnamed: 0,date,asset_id,ret_m_total,mom_1m,mom_3m,mom_12m,div_yield_m,vol_m_sum,ret_m_std,n_obs
0,1990-01-01,AA,-0.164587,-0.164587,-0.116775,-0.040187,0.0,56181100.0,0.020062,22
1,1990-01-01,AAPL,-0.076923,-0.076923,-0.276562,-0.093928,0.0,3747498000.0,0.028892,22
2,1990-01-01,AB,0.0,0.0,0.04,3.043692,0.0,1604400.0,0.026138,22
3,1990-01-01,ABM,0.021399,0.021399,0.032717,0.371862,0.015878,448800.0,0.007388,22
4,1990-01-01,ABT,-0.049001,-0.049001,-0.018174,0.374647,0.012753,119319000.0,0.014395,22


In [31]:
panel.tail(20)

Unnamed: 0,date,asset_id,ret_m_total,mom_1m,mom_3m,mom_12m,div_yield_m,vol_m_sum,ret_m_std,n_obs
625035,2025-12-01,YUM,-0.028413,-0.028413,-0.036411,0.094169,0.004898,12437300.0,0.009074,5
625036,2025-12-01,YUMC,-0.042857,-0.042857,0.051079,-0.003619,0.0,10876200.0,0.009435,5
625037,2025-12-01,Z,0.006936,0.006936,0.023501,0.074532,0.0,8758200.0,0.027911,5
625038,2025-12-01,ZBH,-0.005065,-0.005065,-0.012169,-0.032438,0.0,10598600.0,0.013342,5
625039,2025-12-01,ZBIO,0.062315,0.062315,0.860052,4.093303,0.0,1333600.0,0.051736,5
625040,2025-12-01,ZBRA,0.053802,0.053802,-0.091678,-0.241044,0.0,2586500.0,0.009629,5
625041,2025-12-01,ZD,0.066626,0.066626,-0.100387,-0.327489,0.0,3430000.0,0.032039,5
625042,2025-12-01,ZETA,0.061873,0.061873,-0.012176,0.174369,0.0,20338800.0,0.022055,5
625043,2025-12-01,ZG,0.004331,0.004331,0.022125,0.089911,0.0,2068600.0,0.027236,5
625044,2025-12-01,ZGN,0.008531,0.008531,0.145496,0.549467,0.0,3510100.0,0.017574,5


In [2]:
# --- Load inputs (replace these with your VinV_2.0 canonical outputs) ---
# Expectation: you already have deterministic monthly VinV panel(s)
# Example placeholders:
# vinv_panel = pd.read_parquet(PROJECT_ROOT / "data" / "vinv" / "vinv_2_0_monthly.parquet")

# TEMP stub for wiring:
vinv_panel = pd.DataFrame({"date": [], "asset_id": []})

In [3]:
def file_sha256(path: Path) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            h.update(chunk)
    return h.hexdigest()

def df_schema(df: pd.DataFrame) -> dict:
    return {c: str(df[c].dtype) for c in df.columns}

# Save raw snapshot
raw_path = DATA_PROCESSED / "vinv_inputs_raw.parquet"
vinv_panel.to_parquet(raw_path, index=False)

manifest = {
    "run_id": RUN_ID,
    "outputs": {
        "vinv_inputs_raw.parquet": {
            "rows": int(len(vinv_panel)),
            "cols": int(vinv_panel.shape[1]),
            "sha256": file_sha256(raw_path),
        }
    }
}
schema = {"vinv_inputs_raw": df_schema(vinv_panel)}

(ARTIFACTS / "intake_manifest.json").write_text(json.dumps(manifest, indent=2))
(ARTIFACTS / "intake_schema.json").write_text(json.dumps(schema, indent=2))

print("Saved:", raw_path)

Saved: ..\data\processed\vinv_inputs_raw.parquet


In [4]:
import pandas as pd
from pathlib import Path

PROJECT_ROOT = Path("..")
p = PROJECT_ROOT / "data" / "processed" / "vinv_inputs_raw.parquet"
df = pd.read_parquet(p)

print("rows:", len(df), "cols:", df.shape[1])
print("columns:", df.columns.tolist())
print(df.head(3))

# Confirm date looks right
if "date" in df.columns:
    print("date dtype:", df["date"].dtype)
    if len(df) > 0:
        print("date min/max:", df["date"].min(), df["date"].max())

# Count numeric features (excluding identifiers)
id_cols = set(["date","asset_id","symbol","ticker"])
num_cols = [c for c in df.columns if c not in id_cols and pd.api.types.is_numeric_dtype(df[c])]
print("numeric feature cols:", len(num_cols))
print("numeric cols sample:", num_cols[:15])

rows: 0 cols: 2
columns: ['date', 'asset_id']
Empty DataFrame
Columns: [date, asset_id]
Index: []
date dtype: float64
numeric feature cols: 0
numeric cols sample: []
