### Data Processing
SPY, QQQ, AAPL, MSFT, AMZN, GOOGL, META (2010–present)

In [1]:
import os
import datetime as dt
import logging
import pandas as pd
from typing import Optional

# -----------------------------
# CONFIG
# -----------------------------
API_KEY = 'uwQtl3txGt5BLbecq7ZbIu0ZbuitCGjc' 
TICKERS = ["SPY", "QQQ", "AAPL", "MSFT", "AMZN", "GOOGL", "META"]
START_DATE = "2010-10-01"
END_DATE = dt.datetime.today().strftime("2025-10-01") 
OUTDIR = "./all_data"
os.makedirs(OUTDIR, exist_ok=True)

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

In [2]:
# -----------------------------
# HELPERS
# -----------------------------
def _to_frame_from_polygon(items) -> pd.DataFrame:
    """Convert Polygon aggregate items to a DataFrame with a normalized Date index."""
    if not items:
        return pd.DataFrame(columns=["Open", "High", "Low", "Close", "Volume", "Date"])
    df = pd.DataFrame([{
        "Date": pd.to_datetime(it.timestamp, unit="ms", utc=True).tz_convert(None).normalize(),
        "Open": it.open,
        "High": it.high,
        "Low": it.low,
        "Close": it.close,
        "Volume": it.volume
    } for it in items])
    df = df.drop_duplicates(subset=["Date"]).sort_values("Date").reset_index(drop=True)
    return df

def fetch_polygon_ohlcv(ticker: str, start_date: str, end_date: str, adjusted: bool, api_key: str) -> Optional[pd.DataFrame]:
    """Fetch daily bars from Polygon. If adjusted=True, returns split/dividend-adjusted OHLC."""
    try:
        from polygon import RESTClient
    except ImportError:
        logging.warning("polygon-api-client not installed; will need to fallback.")
        return None

    try:
        client = RESTClient(api_key)
        # Newer client supports list_aggs; older supports get_aggs—handle both.
        try:
            items = list(client.list_aggs(
                ticker=ticker,
                multiplier=1,
                timespan="day",
                from_=start_date,
                to=end_date,
                adjusted=adjusted,
                limit=50000
            ))
        except Exception:
            # Fallback to older method signature if present in your env
            items = client.get_aggs(ticker, 1, "day", start_date, end_date, limit=50000, adjusted=adjusted)
        df = _to_frame_from_polygon(items)
        return df
    except Exception as e:
        logging.error(f"Polygon fetch failed for {ticker} (adjusted={adjusted}): {e}")
        return None

def fetch_yf_ohlcv(ticker: str, start_date: str, end_date: str) -> Optional[pd.DataFrame]:
    """Fallback using yfinance (includes Adj Close)."""
    try:
        import yfinance as yf
        hist = yf.Ticker(ticker).history(start=start_date, end=end_date, interval="1d", auto_adjust=False)
        if hist.empty:
            return None
        df = hist.reset_index().rename(columns={
            "Date":"Date",
            "Open":"Open",
            "High":"High",
            "Low":"Low",
            "Close":"Close",
            "Adj Close":"Adj Close",
            "Volume":"Volume"
        })
        # Normalize date to midnight (no tz)
        df["Date"] = pd.to_datetime(df["Date"], utc=True).dt.tz_convert(None).dt.normalize()
        df = df[["Date","Open","High","Low","Close","Adj Close","Volume"]]
        df = df.drop_duplicates(subset=["Date"]).sort_values("Date").reset_index(drop=True)
        return df
    except Exception as e:
        logging.error(f"yfinance fetch failed for {ticker}: {e}")
        return None

def build_ohlcv_with_adj_close(ticker: str, start_date: str, end_date: str, api_key: str) -> Optional[pd.DataFrame]:
    """
    Preferred path (Polygon):
      - Pull unadjusted bars  -> gives {Open,High,Low,Close,Volume}
      - Pull adjusted bars    -> take adjusted Close as 'Adj Close'
      - Join on Date

    Fallback (yfinance):
      - One call gives {Open,High,Low,Close,Adj Close,Volume}
    """
    # Try Polygon first
    unadj = fetch_polygon_ohlcv(ticker, start_date, end_date, adjusted=False, api_key=api_key)
    adj   = fetch_polygon_ohlcv(ticker, start_date, end_date, adjusted=True,  api_key=api_key)

    if unadj is not None and not unadj.empty and adj is not None and not adj.empty:
        # Keep unadjusted OHLC + Volume, and take adjusted Close
        left = unadj.rename(columns={"Close":"Close", "Volume":"Volume"})
        right = adj[["Date","Close"]].rename(columns={"Close":"Adj Close"})
        out = pd.merge(left, right, on="Date", how="inner")
    else:
        # Fallback to yfinance (already includes Adj Close)
        out = fetch_yf_ohlcv(ticker, start_date, end_date)
        if out is None or out.empty:
            return None

    # Enforce schema and dtypes
    cols = ["Date","Open","High","Low","Close","Adj Close","Volume"]
    if "Adj Close" not in out.columns:
        # If adjusted not available for some reason, default Adj Close to Close
        out["Adj Close"] = out["Close"]

    out = out[cols].copy()
    for c in ["Open","High","Low","Close","Adj Close"]:
        out[c] = pd.to_numeric(out[c], errors="coerce")
    out["Volume"] = pd.to_numeric(out["Volume"], errors="coerce").astype("Int64")

    # Clip to requested window and drop any NA rows that might linger
    mask = (out["Date"] >= pd.to_datetime(start_date)) & (out["Date"] <= pd.to_datetime(end_date))
    out = out.loc[mask].dropna(subset=["Open","High","Low","Close","Adj Close","Volume"]).reset_index(drop=True)

    return out


In [3]:

# -----------------------------
# MAIN
# -----------------------------
def main():
    for t in TICKERS:
        logging.info(f"Fetching {t} ({START_DATE} → {END_DATE}) …")
        df = build_ohlcv_with_adj_close(t, START_DATE, END_DATE, API_KEY)
        if df is None or df.empty:
            logging.error(f"Failed to build dataset for {t}.")
            continue
        outpath = os.path.join(OUTDIR, f"{t}-Daily-2010-present.csv")
        df.to_csv(outpath, index=False)
        logging.info(f"Saved: {outpath}")

    # Optional: create a single merged file (outer join on Date) to align calendars
    logging.info("Building merged wide CSV…")
    frames = []
    for t in TICKERS:
        p = os.path.join(OUTDIR, f"{t}-Daily-2010-present.csv")
        if os.path.exists(p):
            tmp = pd.read_csv(p, parse_dates=["Date"])
            tmp = tmp.rename(columns={
                "Open": f"{t}_Open",
                "High": f"{t}_High",
                "Low": f"{t}_Low",
                "Close": f"{t}_Close",
                "Adj Close": f"{t}_AdjClose",
                "Volume": f"{t}_Volume",
            })
            frames.append(tmp)
    if frames:
        merged = frames[0]
        for f in frames[1:]:
            merged = pd.merge(merged, f, on="Date", how="outer")

        merged = merged.sort_values("Date").reset_index(drop=True)
        merged.to_csv(os.path.join(OUTDIR, "merged_ohlcv_2010_present.csv"), index=False)
        logging.info(f"Saved merged file: {os.path.join(OUTDIR, 'merged_ohlcv_2010_present.csv')}")

if __name__ == "__main__":
    main()


INFO: Fetching SPY (2010-10-01 → 2025-10-01) …
INFO: Saved: ./all_data/SPY-Daily-2010-present.csv
INFO: Fetching QQQ (2010-10-01 → 2025-10-01) …
INFO: Saved: ./all_data/QQQ-Daily-2010-present.csv
INFO: Fetching AAPL (2010-10-01 → 2025-10-01) …
INFO: Saved: ./all_data/AAPL-Daily-2010-present.csv
INFO: Fetching MSFT (2010-10-01 → 2025-10-01) …
INFO: Saved: ./all_data/MSFT-Daily-2010-present.csv
INFO: Fetching AMZN (2010-10-01 → 2025-10-01) …
INFO: Saved: ./all_data/AMZN-Daily-2010-present.csv
INFO: Fetching GOOGL (2010-10-01 → 2025-10-01) …
INFO: Saved: ./all_data/GOOGL-Daily-2010-present.csv
INFO: Fetching META (2010-10-01 → 2025-10-01) …
INFO: Saved: ./all_data/META-Daily-2010-present.csv
INFO: Building merged wide CSV…
INFO: Saved merged file: ./all_data/merged_ohlcv_2010_present.csv
