# Data Ingestion & Preprocessing Pipeline

This notebook serves as the entry point for the project's data lifecycle. It performs the following steps:
1. **Download**: Fetches historical stock and crypto data using `yfinance`.
2. **Raw Storage**: Casts float columns to `float32` to reduce memory usage and saves the raw data to Parquet.
3. **Preprocessing**: Aligns trading days across tickers, computes features (lagged returns, rolling volatility, etc.), and prepares a target-ready panel dataset.
4. **Processed Storage**: Saves the final engineered dataset for model training.

**Note for GitHub Users**: Run this notebook once to populate the `data/` directory.
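
A quick way to tell whether the notebook still needs to be run is to check for its two output files. The sketch below is illustrative only: `missing_outputs` and `EXPECTED_OUTPUTS` are hypothetical names, and the relative `../data` path assumes the notebook is executed from its own folder, as the save steps in this notebook do.

```python
from pathlib import Path

# File names are taken from the save steps in this notebook.
# The helper itself is hypothetical and not part of the project.
EXPECTED_OUTPUTS = (
    "raw/stock_data_raw.parquet",
    "processed/stock_data_processed.parquet",
)


def missing_outputs(data_dir):
    """Return the expected output files that do not exist under data_dir."""
    root = Path(data_dir)
    return [rel for rel in EXPECTED_OUTPUTS if not (root / rel).is_file()]


missing = missing_outputs("../data")
if missing:
    print("Run the cells below; missing:", missing)
else:
    print("data/ is already populated")
```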

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
import os
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

# =============================================================================
# Configuration
# =============================================================================
START_DATE  = "2010-01-01"
END_DATE    = "2024-12-31"
ROLLING_WIN = 20

# 12 tickers: 10 stocks across 5 sectors + market ETF + crypto
TICKERS = [
    "AAPL", "MSFT", "NVDA",       # Tech
    "JPM",  "GS",                  # Finance
    "JNJ",  "UNH",                 # Healthcare
    "KO",   "MCD",                 # Consumer
    "XOM",                         # Energy
    "SPY",                         # Market ETF (feature only)
    "BTC-USD",                     # Crypto
]

# Ensure directories exist
os.makedirs("../data/raw", exist_ok=True)
os.makedirs("../data/processed", exist_ok=True)

## 1. Download & Save Raw Data
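We download the data and immediately cast the float columns to `float32`, which halves their memory footprint relative to `float64` (integer columns such as volume are left unchanged). The combined panel is saved to `data/raw/stock_data_raw.parquet`.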
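
To make the memory claim concrete, here is a minimal sketch on synthetic data. The frame `demo` and its values are made up for illustration; only the float columns shrink.

```python
import numpy as np
import pandas as pd

# Synthetic stand-in for one ticker's OHLCV frame (values are made up).
rng = np.random.default_rng(0)
n_rows = 1_000
demo = pd.DataFrame(
    {
        "open": rng.random(n_rows),
        "close": rng.random(n_rows),
        "volume": rng.integers(1, 1_000_000, n_rows),  # integer column, left untouched
    }
)

float_cols = demo.select_dtypes(include=["float"]).columns
bytes_before = demo[float_cols].memory_usage(index=False).sum()

# Downcast only the float columns; astype with a mapping leaves other columns alone.
demo = demo.astype({col: "float32" for col in float_cols})
bytes_after = demo[float_cols].memory_usage(index=False).sum()

print(f"float columns: {bytes_before} -> {bytes_after} bytes")
print(demo.dtypes)
```

Note that `float32` carries roughly seven significant decimal digits, which is a trade-off worth keeping in mind for high-priced assets.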

In [None]:
raw_dict = {}
for ticker in tqdm(TICKERS, desc="Downloading"):
    # Note: yfinance treats `end` as exclusive, so END_DATE itself is not downloaded
    df = yf.download(
        ticker, start=START_DATE, end=END_DATE,
        interval="1d", auto_adjust=False, progress=False,
    )
    if df.empty:
        print(f"  ⚠ {ticker}: empty — skipped")
        continue
    
    # Handle MultiIndex columns from newer yfinance versions
    if isinstance(df.columns, pd.MultiIndex):
        df = df.droplevel(1, axis=1)
    
    df.columns = df.columns.str.lower()
    df.index.name = "date"
    
    # Cast to float32 for memory efficiency
    float_cols = df.select_dtypes(include=['float']).columns
    df[float_cols] = df[float_cols].astype('float32')
    
    raw_dict[ticker] = df

print(f"  ✓ {len(raw_dict)}/{len(TICKERS)} tickers downloaded")

# Combine all tickers into one long-format panel (one `ticker` column)
# and save it as a single raw file
raw_panel = pd.concat([df.assign(ticker=t) for t, df in raw_dict.items()])
raw_panel.to_parquet("../data/raw/stock_data_raw.parquet", index=True)
print("  ✓ Raw data saved to data/raw/stock_data_raw.parquet")

## 2. Feature Engineering & Preprocessing
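We apply the logic from `EDA.ipynb` to create the final training dataset. Only the trading days common to all tickers are kept, and every engineered feature is lagged by one day so that it uses only information available before the target date.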
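
The alignment and lagging steps can be sketched on toy data. The tickers `"A"` and `"B"` and their prices are made up: `"A"` trades on business days only, while `"B"` trades every day, as a crypto asset does.

```python
import numpy as np
import pandas as pd

# Two toy price series with different calendars (values are made up).
days_a = pd.bdate_range("2024-01-01", periods=6)            # business days only
days_b = pd.date_range("2024-01-01", periods=8, freq="D")   # includes the weekend
toy = {
    "A": pd.DataFrame(
        {"adj close": [100.0, 101.0, 99.0, 102.0, 103.0, 104.0]}, index=days_a
    ),
    "B": pd.DataFrame({"adj close": np.linspace(50.0, 57.0, 8)}, index=days_b),
}

# Keep only the dates present for every ticker.
common = sorted(set.intersection(*(set(d.index) for d in toy.values())))
aligned = {t: d.loc[common].copy() for t, d in toy.items()}

for t, d in aligned.items():
    # Target: log return between consecutive aligned dates.
    d["log_return"] = np.log(d["adj close"] / d["adj close"].shift(1))
    # Feature: the previous aligned day's return, known before the target date.
    d["ret_lag1"] = d["log_return"].shift(1)

print(aligned["B"])
```

One side effect of intersecting calendars is visible in `"B"`: its weekend rows are dropped, so its Monday return spans three calendar days.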

In [None]:
# Align Dates
common_dates = sorted(set.intersection(*(set(d.index) for d in raw_dict.values())))
aligned = {t: d.loc[common_dates].copy() for t, d in raw_dict.items()}
print(f"  ✓ {len(common_dates)} common trading days")

pieces = []
for ticker, df in aligned.items():
    d = df.copy()

    # Target: today's log return
    d["log_return"] = np.log(d["adj close"] / d["adj close"].shift(1))

    # Lagged returns
    d["ret_lag1"] = d["log_return"].shift(1)
    d["ret_lag2"] = d["log_return"].shift(2)
    d["ret_lag5"] = d["log_return"].shift(5)

    # Rolling volatility over ROLLING_WIN days, lagged one day to avoid look-ahead
    d["roll_vol"] = d["log_return"].rolling(ROLLING_WIN).std().shift(1)

    # Normalized high-low range of the previous day
    d["range_norm"] = ((d["high"] - d["low"]) / d["close"]).shift(1)

    # Volume z-score against its ROLLING_WIN-day window, lagged one day
    v_mu  = d["volume"].rolling(ROLLING_WIN).mean()
    v_sig = d["volume"].rolling(ROLLING_WIN).std()
    d["vol_zscore"] = ((d["volume"] - v_mu) / v_sig).shift(1)

    d["ticker"] = ticker
    pieces.append(d)

panel = pd.concat(pieces).reset_index().set_index(["date", "ticker"]).sort_index()

# Lagged market return (SPY), merged onto every ticker by date
if "SPY" in raw_dict:
    spy_lr = (
        panel.xs("SPY", level="ticker")["log_return"]
        .shift(1)
        .rename("mkt_return")
        .reset_index()
    )
    panel = (
        panel.reset_index()
        .merge(spy_lr, on="date", how="left")
        .set_index(["date", "ticker"])
    )

# Drop rows with any missing value (mainly the warm-up rows of the
# lagged and rolling features)
panel.dropna(inplace=True)

# Cast final features to float32
panel = panel.astype({col: 'float32' for col in panel.select_dtypes('float64').columns})

panel.to_parquet("../data/processed/stock_data_processed.parquet", index=True)
print(f"  ✓ Processed panel shape: {panel.shape}")
print("  ✓ Processed data saved to data/processed/stock_data_processed.parquet")