# Phase 2 — Data Build (modern snapshot)
Fetch OHLCV, engineer baseline features, and write `data/df_nb02.csv`.


In [27]:
# --- Imports & params ---
import warnings, json
from pathlib import Path
import numpy as np
import pandas as pd

try:
    import yfinance as yf
except Exception:
    yf = None

DATA = Path("data"); ART = Path("artifacts"); FIG = Path("reports/figures")
for p in (DATA, ART, FIG): p.mkdir(parents=True, exist_ok=True)

TICKER = "AAPL"
START  = "2015-01-01"
END    = (pd.Timestamp.now(tz="America/Los_Angeles") + pd.Timedelta(days=1)).date().isoformat()
USE_MARKET = True
warnings.filterwarnings("ignore")

print("Build params →", {"TICKER":TICKER, "START":START, "END":END, "USE_MARKET":USE_MARKET})


Build params → {'TICKER': 'AAPL', 'START': '2015-01-01', 'END': '2025-10-10', 'USE_MARKET': True}


In [28]:
# Run once if the import failed
%pip install yfinance
import yfinance as yf
print("yfinance", yf.__version__)


Note: you may need to restart the kernel to use updated packages.
yfinance 0.2.66


In [29]:
# --- Feature helpers (no external TA libs) ---
def rsi(series: pd.Series, window: int = 14) -> pd.Series:
    s = series.astype(float)
    delta = s.diff()
    up = delta.clip(lower=0); down = -delta.clip(upper=0)
    roll_up = up.ewm(alpha=1/window, adjust=False).mean()
    roll_down = down.ewm(alpha=1/window, adjust=False).mean()
    rs = roll_up / roll_down.replace(0, np.nan)
    return 100 - (100 / (1 + rs))

def macd(series: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9):
    s = series.astype(float)
    ema_fast = s.ewm(span=fast, adjust=False).mean()
    ema_slow = s.ewm(span=slow, adjust=False).mean()
    macd_line = ema_fast - ema_slow
    macd_sig  = macd_line.ewm(span=signal, adjust=False).mean()
    return macd_line, macd_sig

def zscore(s: pd.Series, win: int = 20) -> pd.Series:
    m = s.rolling(win, min_periods=win).mean()
    v = s.rolling(win, min_periods=win).std(ddof=0)
    return (s - m) / v.replace(0, np.nan)


In [30]:
# --- Download base OHLCV for the main ticker ---
if yf is None:
    raise ImportError("Please `pip install yfinance` to run this notebook.")

raw = yf.download(TICKER, start=START, end=END, auto_adjust=True, progress=False)
if raw is None or raw.empty:
    raise ValueError(f"No price data for {TICKER} in {START}..{END}")

# Normalize index to tz-naive dates
idx = pd.to_datetime(raw.index, errors="coerce")
try:
    if getattr(idx, "tz", None) is not None:
        idx = idx.tz_localize(None)
except Exception:
    idx = pd.to_datetime(idx, errors="coerce").tz_localize(None)

px = raw.copy()
px.index = idx
px = px.sort_index()
px = px[~px.index.duplicated(keep="last")]
print(px.shape, "rows →", px.index.min().date(), "→", px.index.max().date())
px.tail(3)


(2709, 5) rows → 2015-01-02 → 2025-10-09


Price,Close,High,Low,Open,Volume
Ticker,AAPL,AAPL,AAPL,AAPL,AAPL
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2
2025-10-07,256.480011,257.399994,255.429993,256.809998,31955800
2025-10-08,258.059998,258.519989,256.109985,256.519989,36465000
2025-10-09,253.550003,258.0,253.140198,257.899994,21611570


In [31]:
# --- Normalize yfinance columns to 1-D Series (handles MultiIndex) ---
def get_price_series(df, field: str, ticker: str = None):
    if isinstance(df.columns, pd.MultiIndex):
        s = None
        try: s = df.xs(field, axis=1, level=0)
        except Exception: pass
        if s is None or isinstance(s, pd.DataFrame):
            try: s = df.xs(field, axis=1, level=1)
            except Exception: pass
        if isinstance(s, pd.DataFrame):
            if ticker is not None and ticker in s.columns: s = s[ticker]
            else: s = s.iloc[:, 0]
    else:
        s = df[field]
    return pd.to_numeric(s, errors="coerce")

open_s   = get_price_series(px, "Open",   TICKER)
high_s   = get_price_series(px, "High",   TICKER)
low_s    = get_price_series(px, "Low",    TICKER)
close_s  = get_price_series(px, "Close",  TICKER)
volume_s = get_price_series(px, "Volume", TICKER)

px_clean = pd.DataFrame({
    "Open":   open_s.astype(float),
    "High":   high_s.astype(float),
    "Low":    low_s.astype(float),
    "Close":  close_s.astype(float),
    "Volume": volume_s.astype(float),
}, index=px.index).sort_index().dropna()

print("px_clean:", px_clean.shape, "rows →", px_clean.index.min().date(), "→", px_clean.index.max().date())
px_clean.head()


px_clean: (2709, 5) rows → 2015-01-02 → 2025-10-09


Unnamed: 0_level_0,Open,High,Low,Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2015-01-02,24.718169,24.729265,23.821666,24.261042,212818400.0
2015-01-05,24.030263,24.11015,23.391173,23.577574,257142000.0
2015-01-06,23.641929,23.839426,23.218087,23.579796,263188400.0
2015-01-07,23.788382,24.010288,23.677428,23.910431,160423600.0
2015-01-08,24.238858,24.886824,24.121246,24.829128,237458000.0


In [32]:
# --- Feature engineering (returns, vol, RSI, MACD, volume stats) ---
df = pd.DataFrame({
    "date":   px_clean.index,
    "open":   px_clean["Open"].values,
    "high":   px_clean["High"].values,
    "low":    px_clean["Low"].values,
    "close":  px_clean["Close"].values,
    "volume": px_clean["Volume"].values,
})

# Simple returns
df["ret1"]  = df["close"].pct_change()
df["ret5"]  = df["close"].pct_change(5)
df["ret10"] = df["close"].pct_change(10)

# Rolling volatility (10d)
df["vol10"] = df["ret1"].rolling(10, min_periods=10).std(ddof=0)

# Volume z-score (20d)
df["volz"] = zscore(df["volume"], win=20)

# RSI(14) and MACD(12,26,9)
df["rsi14"] = rsi(df["close"], window=14)
df["macd"], df["macd_signal"] = macd(df["close"], fast=12, slow=26, signal=9)

# Drop warm-up NaNs & tag ticker
df = df.dropna().reset_index(drop=True)
df["ticker"] = TICKER

print("Engineered:", df.shape, "| first:", df['date'].min().date(), "| last:", df['date'].max().date())
df.head(3)


Engineered: (2690, 15) | first: 2015-01-30 | last: 2025-10-09


Unnamed: 0,date,open,high,low,close,volume,ret1,ret5,ret10,vol10,volz,rsi14,macd,macd_signal,ticker
0,2015-01-30,26.273739,26.628789,25.929783,25.998575,334982000.0,-0.014634,0.036998,0.096798,0.024845,0.788808,44.043684,0.379658,0.174628,AAPL
1,2015-02-02,26.196075,26.444609,25.758919,26.32478,250956400.0,0.012547,0.048895,0.119257,0.024164,-0.128917,46.492566,0.451778,0.230058,AAPL
2,2015-02-03,26.29593,26.426854,26.098433,26.329216,207662800.0,0.000169,0.087136,0.091335,0.023884,-0.56264,46.526842,0.503487,0.284744,AAPL


In [33]:
# --- Optional context via alignment (no merges) ---
if USE_MARKET:
    def fetch_close(ticker: str, start_date: str, end_date: str):
        r = yf.download(ticker, start=start_date, end=end_date, auto_adjust=True, progress=False)
        if r is None or r.empty:
            raise ValueError(f"No data for {ticker}")

        # Normalize index
        ii = pd.to_datetime(r.index, errors="coerce")
        try:
            if getattr(ii, "tz", None) is not None:
                ii = ii.tz_localize(None)
        except Exception:
            ii = pd.to_datetime(ii, errors="coerce").tz_localize(None)

        # Use the same helper to get a 1-D Close series (from Cell 6)
        close = get_price_series(r, "Close", ticker).astype(float)
        s = pd.Series(close.values, index=ii, name=ticker).sort_index()
        s = s[~s.index.duplicated(keep="last")]
        return s

    start_date = str(pd.to_datetime(df["date"]).min().date())
    spy = fetch_close("SPY", start_date, END)
    vix = fetch_close("^VIX", start_date, END)

    dti = pd.to_datetime(df["date"], errors="coerce")
    df["spy_close"] = dti.map(spy)
    df["vix_close"] = dti.map(vix)
    df["mkt_ret1"]  = df["spy_close"].pct_change(1)
    df["mkt_ret5"]  = df["spy_close"].pct_change(5)
    df["vix_chg1"]  = df["vix_close"].pct_change(1)

    df = df.dropna(subset=["spy_close","vix_close","mkt_ret1","mkt_ret5","vix_chg1"]).reset_index(drop=True)

print("With context:", df.shape)
df.tail(3)


With context: (2685, 20)


Unnamed: 0,date,open,high,low,close,volume,ret1,ret5,ret10,vol10,volz,rsi14,macd,macd_signal,ticker,spy_close,vix_close,mkt_ret1,mkt_ret5,vix_chg1
2682,2025-10-07,256.809998,257.399994,255.429993,256.480011,31955800.0,-0.000818,0.007265,0.008057,0.007273,-0.880517,67.782931,6.96712,7.019165,AAPL,669.119995,17.24,-0.003707,0.004413,0.053146
2683,2025-10-08,256.519989,258.519989,256.109985,258.059998,36465000.0,0.00616,0.010217,0.022789,0.006727,-0.652271,69.437503,6.812987,6.977929,AAPL,673.109985,16.299999,0.005963,0.006971,-0.054524
2684,2025-10-09,257.899994,258.0,253.140198,253.550003,21611570.0,-0.017477,-0.013923,-0.012925,0.006833,-1.081399,59.969894,6.254815,6.833306,AAPL,669.40918,16.91,-0.005498,0.000283,0.037423


In [34]:
# --- Save outputs (robust) ---
from pathlib import Path
import json, pandas as pd, numpy as np

out_csv = DATA / "df_nb02.csv"
df.to_csv(out_csv, index=False)
print("Saved CSV:", out_csv, "| bytes:", out_csv.stat().st_size)

def _has_fastparquet():
    try:
        import fastparquet  # noqa: F401
        return True
    except Exception:
        return False

def _sanitize_periods(df: pd.DataFrame) -> pd.DataFrame:
    # Convert any Period dtype columns to string to avoid Arrow extension issues
    for c in df.columns:
        if pd.api.types.is_period_dtype(df[c]):
            df[c] = df[c].astype(str)
    return df

# optional Parquet
out_parq = DATA / "df_nb02.parquet"
df_parq = _sanitize_periods(df.copy())

saved_parquet = False
# Try pyarrow first with unregister workaround
try:
    import pyarrow as pa
    # Workaround duplicate registration in long-running kernels
    try:
        pa.unregister_extension_type("pandas.period")
    except Exception:
        pass
    df_parq.to_parquet(out_parq, index=False, engine="pyarrow")
    saved_parquet = True
    print("Saved Parquet (pyarrow):", out_parq, "| bytes:", out_parq.stat().st_size)
except Exception as e:
    print("pyarrow failed →", e)
    # Fall back to fastparquet if available
    if _has_fastparquet():
        try:
            df_parq.to_parquet(out_parq, index=False, engine="fastparquet")
            saved_parquet = True
            print("Saved Parquet (fastparquet):", out_parq, "| bytes:", out_parq.stat().st_size)
        except Exception as e2:
            print("fastparquet also failed →", e2)

if not saved_parquet:
    print("Parquet save skipped (CSV written).")

# Quick QA
dts = pd.to_datetime(df["date"])
print("Rows:", len(df), "| date span:", dts.min().date(), "→", dts.max().date())
print("Columns:", list(df.columns))
print("NaNs total:", int(df.isna().sum().sum()))
print("\nret1 describe:\n", df["ret1"].describe().to_string())

# Optional: record snapshot meta
meta_path = DATA / "storage_format.json"
record = {"path": str(out_csv), "format": "csv"}
try:
    if meta_path.exists():
        meta = json.load(open(meta_path, "r", encoding="utf-8"))
        if isinstance(meta, dict): meta = [meta]
    else:
        meta = []
    meta = [m for m in meta if m.get("path") != record["path"]] + [record]
    json.dump(meta, open(meta_path, "w", encoding="utf-8"), indent=2)
except Exception:
    json.dump([record], open(meta_path, "w", encoding="utf-8"), indent=2)
print("Updated:", meta_path)


Saved CSV: data\df_nb02.csv | bytes: 974172
pyarrow failed → A type extension with name pandas.interval already defined
Saved Parquet (fastparquet): data\df_nb02.parquet | bytes: 385207
Rows: 2685 | date span: 2015-02-06 → 2025-10-09
Columns: ['date', 'open', 'high', 'low', 'close', 'volume', 'ret1', 'ret5', 'ret10', 'vol10', 'volz', 'rsi14', 'macd', 'macd_signal', 'ticker', 'spy_close', 'vix_close', 'mkt_ret1', 'mkt_ret5', 'vix_chg1']
NaNs total: 0

ret1 describe:
 count    2685.000000
mean        0.001005
std         0.018255
min        -0.128647
25%        -0.007395
50%         0.000941
75%         0.010063
max         0.153288
Updated: data\storage_format.json


In [35]:
# --- Optional: record snapshot meta (quiet) ---
meta_path = DATA / "storage_format.json"
record = {"path": str(out_csv), "format": "csv"}
try:
    if meta_path.exists():
        meta = json.load(open(meta_path, "r", encoding="utf-8"))
        if isinstance(meta, dict):
            meta = [meta]
    else:
        meta = []
    meta = [m for m in meta if m.get("path") != record["path"]] + [record]
    json.dump(meta, open(meta_path, "w", encoding="utf-8"), indent=2)
except Exception:
    json.dump([record], open(meta_path, "w", encoding="utf-8"), indent=2)
print("Updated:", meta_path)


Updated: data\storage_format.json
