In [None]:
# Imports and Paths
from __future__ import annotations

import os
from pathlib import Path
from dataclasses import dataclass
from typing import Iterable

import numpy as np
import pandas as pd
import yfinance as yf
from tqdm import tqdm

# Data root, resolved to an absolute path from the current working directory.
DATA_DIR = Path("../data").resolve()

# Raw per-symbol price files live under <DATA_DIR>/raw/prices; create it up front.
RAW_DIR = DATA_DIR.joinpath("raw", "prices")
RAW_DIR.mkdir(parents=True, exist_ok=True)

# Display up to 50 columns when rendering wide frames.
pd.set_option("display.max_columns", 50)



In [None]:
# Define Ticker Universe 
# The set literal removes accidental duplicates; sorted() gives a stable,
# alphabetical list so downstream loops and column orders are reproducible.
TICKERS = sorted({
    # Core indices / ETFs
    "SPY", "QQQ", "IWM", "DIA", "VTI", "VOO", "VEA", "VWO", "EEM", "EFA",
    "TLT", "IEF", "SHY", "LQD", "HYG", "TIP",
    "GLD", "SLV", "USO", "UNG", "DBC",
    "VNQ", "XLF", "XLK", "XLE", "XLV", "XLY", "XLP", "XLI", "XLB", "XLU",

    # Mega caps / highly traded
    "AAPL", "MSFT", "NVDA", "AMZN", "GOOGL", "META", "TSLA", "BRK-B", "JPM", "BAC",
    "XOM", "CVX", "JNJ", "PG", "KO", "PEP", "WMT", "COST", "HD", "UNH",
})
len(TICKERS), TICKERS[:10]


(51,
 ['AAPL', 'AMZN', 'BAC', 'BRK-B', 'COST', 'CVX', 'DBC', 'DIA', 'EEM', 'EFA'])

In [None]:
# Download and Normalization Functions
def _normalize_yf_columns(df: pd.DataFrame, symbol: str) -> pd.DataFrame:
    if df is None or df.empty:
        return pd.DataFrame()

    df = df.copy()

    # If columns are MultiIndex (common yfinance behavior), reduce to single level
    if isinstance(df.columns, pd.MultiIndex):
        # Typical shapes:
        # 1) columns levels: [PriceField] x [Ticker]
        # Try to select the ticker level if present
        lvl0 = df.columns.get_level_values(0)
        lvl1 = df.columns.get_level_values(1)

        if symbol in set(lvl1):
            df = df.xs(symbol, axis=1, level=1, drop_level=True)
        elif symbol.replace("-", ".") in set(lvl1):
            df = df.xs(symbol.replace("-", "."), axis=1, level=1, drop_level=True)
        else:
            # Fallback: flatten by taking first level names
            df.columns = lvl0

    # If any duplicate columns exist, keep last
    if df.columns.duplicated().any():
        df = df.loc[:, ~df.columns.duplicated(keep="last")]

    return df

def download_symbol(symbol: str, start: str = "2005-01-01") -> pd.DataFrame:
    """Download price history for one symbol and return a cleaned frame.

    The raw download is passed through ``_normalize_yf_columns``, indexed by
    datetime, sorted, de-duplicated on the index (last occurrence wins),
    restricted to the recognised price/volume columns, and coerced to numeric
    values (unparseable entries become NaN). Rows without an ``Adj Close``
    value are dropped when that column is present.

    Parameters
    ----------
    symbol : str
        Ticker passed to ``yf.download``.
    start : str
        Start date passed to ``yf.download``.

    Returns
    -------
    pd.DataFrame
        Cleaned frame, or an empty frame when nothing was downloaded or none
        of the expected columns are present.
    """
    df = yf.download(
        symbol,
        start=start,
        auto_adjust=False,
        actions=False,
        progress=False,
        group_by="column",
        threads=True,
    )

    df = _normalize_yf_columns(df, symbol)
    if df.empty:
        return pd.DataFrame()

    df.index = pd.to_datetime(df.index)
    # A stable sort keeps rows with equal timestamps in download order, so the
    # keep="last" de-duplication below is deterministic.
    df = df.sort_index(kind="mergesort")
    df = df[~df.index.duplicated(keep="last")]

    cols = [c for c in ["Open", "High", "Low", "Close", "Adj Close", "Volume"] if c in df.columns]
    if not cols:
        # Nothing recognisable came back; report it the same way as "no data".
        return pd.DataFrame()

    # Explicit copy: the assignments below must write to an independent frame,
    # not to a slice of the filtered download (avoids SettingWithCopyWarning).
    df = df[cols].copy()

    # Convert each column robustly (works whether df[col] is Series or accidentally DataFrame)
    for c in cols:
        s = df[c]
        if isinstance(s, pd.DataFrame):
            s = s.iloc[:, 0]  # squeeze
        df[c] = pd.to_numeric(s, errors="coerce")

    if "Adj Close" in df.columns:
        df = df.dropna(subset=["Adj Close"])

    return df


In [None]:
# Storage Helper Functions
def symbol_path(symbol: str) -> Path:
    """Return the parquet file location for ``symbol`` under ``RAW_DIR``.

    Forward slashes in the symbol are replaced with dashes so the symbol maps
    to a single ``symbol=<name>`` directory component.
    """
    partition = "symbol=" + symbol.replace("/", "-")
    return RAW_DIR.joinpath(partition, "part.parquet")

def write_symbol(symbol: str, df: pd.DataFrame) -> None:
    """Persist ``df`` as the parquet file for ``symbol``; empty frames are skipped.

    The index is written as a leading ``date`` column and the parquet file is
    saved without a pandas index. The caller's frame is left untouched.
    """
    if not df.empty:
        target = symbol_path(symbol)
        target.parent.mkdir(parents=True, exist_ok=True)

        # Work on a copy so inserting the date column does not alter ``df``.
        frame = df.copy()
        frame.insert(loc=0, column="date", value=frame.index)
        frame.to_parquet(target, index=False)

def read_symbol(symbol: str) -> pd.DataFrame:
    """Load the stored parquet file for ``symbol``, indexed and sorted by ``date``.

    Returns an empty frame when no file exists for the symbol.
    """
    location = symbol_path(symbol)
    if location.exists():
        stored = pd.read_parquet(location)
        stored["date"] = pd.to_datetime(stored["date"])
        return stored.set_index("date").sort_index()
    return pd.DataFrame()


In [None]:
# Initial Full Data Ingest
# Earliest date requested for every symbol in the full ingest.
START = "2005-01-01"

# Symbols whose download came back empty are collected here instead of being written.
failed = []
for sym in tqdm(TICKERS):
    df = download_symbol(sym, start=START)
    if not df.empty:
        write_symbol(sym, df)
    else:
        failed.append(sym)

failed, len(failed)


100%|██████████| 51/51 [00:36<00:00,  1.39it/s]


([], 0)

In [None]:
# Build a wide Adj Close panel for later analytics
def load_adj_close(symbols: Iterable[str]) -> pd.DataFrame:
    """Assemble a wide ``Adj Close`` panel with one column per symbol.

    Each symbol is read via ``read_symbol``; symbols with no stored data or no
    ``Adj Close`` column are left out. The columns are joined side by side and
    the result is sorted by its index. An empty frame is returned when no
    symbol contributes a column.
    """
    by_symbol = {}
    for sym in symbols:
        stored = read_symbol(sym)
        if not stored.empty and "Adj Close" in stored.columns:
            by_symbol[sym] = stored["Adj Close"].rename(sym)

    if len(by_symbol) == 0:
        return pd.DataFrame()

    panel = pd.concat(list(by_symbol.values()), axis=1)
    return panel.sort_index()

# Build the panel for the full ticker universe, then preview its shape and
# most recent rows as the cell output.
px = load_adj_close(TICKERS)
px.shape, px.tail()


((5278, 51),
                   AAPL        AMZN        BAC       BRK-B        COST  \
 date                                                                    
 2025-12-17  271.839996  221.270004  54.549999  504.269989  862.650024   
 2025-12-18  272.190002  226.759995  54.259998  503.390015  857.590027   
 2025-12-19  273.670013  227.350006  55.270000  494.529999  855.619995   
 2025-12-22  270.970001  228.429993  55.880001  499.950012  850.000000   
 2025-12-23  272.005005         NaN  56.035000  500.600006  852.744995   
 
                    CVX        DBC         DIA        EEM        EFA  \
 date                                                                  
 2025-12-17  149.520004  22.106001  478.757507  52.599998  94.150002   
 2025-12-18  147.690002  21.951210  479.466003  53.200001  94.820000   
 2025-12-19  147.750000  22.106001  481.149994  53.720001  95.459999   
 2025-12-22  149.800003  22.389999  483.459991  54.009998  95.699997   
 2025-12-23  150.537201  22.635000 

In [None]:
# Clean panel (align, handle missingness) + compute log returns
def clean_price_panel(px: pd.DataFrame, min_coverage: float = 0.90) -> pd.DataFrame:
    """Drop sparsely populated symbols and patch short gaps in the rest.

    A column is kept when its share of non-missing rows is at least
    ``min_coverage``. The surviving columns are copied, ordered by index, and
    forward filled across at most five consecutive missing rows. No
    reindexing is done: the row labels are exactly those of ``px``.
    """
    non_null_share = px.notna().mean()
    kept = [name for name, share in non_null_share.items() if share >= min_coverage]

    # Sort first so the forward fill carries values in index order.
    panel = px[kept].copy().sort_index()
    return panel.ffill(limit=5)  # small gaps only

px_clean = clean_price_panel(px, min_coverage=0.85)

# Log returns: difference of log prices. Rows that are entirely missing (the
# first row after differencing) are removed, then any symbol that still has a
# missing return is dropped so every remaining column has a complete history.
logret = (
    np.log(px_clean)
    .diff()
    .dropna(how="all")
    .dropna(axis=1, how="any")
)

px_clean.shape, logret.shape


((5278, 48), (5277, 41))

In [None]:
# Store processed datasets (for reuse across projects)
PROC_DIR = DATA_DIR / "processed"
PROC_DIR.mkdir(parents=True, exist_ok=True)

adj_close_out = PROC_DIR / "adj_close_panel.parquet"
log_returns_out = PROC_DIR / "log_returns.parquet"

# reset_index() turns the index into a column; the rename only matters when
# the index is unnamed and the new column would otherwise be called "index".
px_clean_flat = px_clean.reset_index().rename(columns={"index": "date"})
px_clean_flat.to_parquet(adj_close_out, index=False)

logret_flat = logret.reset_index().rename(columns={"index": "date"})
logret_flat.to_parquet(log_returns_out, index=False)

print("Saved:",
      adj_close_out,
      log_returns_out)


Saved: C:\Users\matt5\Documents\code\portfolio-risk\data\processed\adj_close_panel.parquet C:\Users\matt5\Documents\code\portfolio-risk\data\processed\log_returns.parquet


In [None]:
# Incrementally update the raw per-symbol price files with the most recent data (cron ready)
from datetime import timedelta

def update_symbol(symbol: str, buffer_days: int = 10) -> bool:
    """Refresh the stored price file for ``symbol`` with newly downloaded rows.

    If nothing is stored yet, the full history from ``START`` is downloaded
    and written. Otherwise the download starts ``buffer_days`` calendar days
    before the last stored date, so recently stored rows are fetched again and
    replaced by the fresh values.

    Parameters
    ----------
    symbol : str
        Ticker to update.
    buffer_days : int
        Number of days before the last stored date to re-download.

    Returns
    -------
    bool
        ``True`` when data was written, ``False`` when the download returned
        nothing (the stored file is then left unchanged).
    """
    existing = read_symbol(symbol)
    if existing.empty:
        df = download_symbol(symbol, start=START)
        if df.empty:
            return False
        write_symbol(symbol, df)
        return True

    last_dt = existing.index.max()
    start_dt = (last_dt - timedelta(days=buffer_days)).strftime("%Y-%m-%d")
    df_new = download_symbol(symbol, start=start_dt)
    if df_new.empty:
        return False

    # Stable sort: for a date present in both frames the stored row stays ahead
    # of the freshly downloaded one, so keep="last" always picks the new value.
    combined = pd.concat([existing, df_new]).sort_index(kind="mergesort")
    combined = combined[~combined.index.duplicated(keep="last")]
    # Same guard as download_symbol: only filter on "Adj Close" when it exists.
    if "Adj Close" in combined.columns:
        combined = combined.dropna(subset=["Adj Close"])
    write_symbol(symbol, combined)
    return True
