In [4]:
import sys
from pathlib import Path  # imported before first use: Path.cwd() below needs it on a fresh kernel

# Make <parent of the working directory>/src importable so `data_pipeline` resolves.
# This assumes the kernel's working directory is a direct child of the repo root.
NB   = Path.cwd()
ROOT = NB.parent  # provisional value, only used here to locate src/
SRC  = ROOT / "src"
if str(SRC) not in sys.path:
    sys.path.append(str(SRC))

# From here on ROOT is the one exported by the project config (it rebinds the
# provisional ROOT computed above).
from data_pipeline.config import ROOT, RAW_SPX, RAW_SPY

# Quick sanity check of the resolved locations.
print("CWD :", Path.cwd())
print("ROOT:", ROOT)
print("SPX dir:", RAW_SPX, "exists?", RAW_SPX.exists())
print("SPY dir:", RAW_SPY, "exists?", RAW_SPY.exists())

def pick_csv(dirpath: Path) -> Path:
    """Return the largest ``*.csv`` entry found directly inside ``dirpath``.

    Candidates are compared in sorted path order, so when several files share
    the largest size the first one in that order is returned.

    Raises:
        FileNotFoundError: if ``dirpath`` has no ``*.csv`` entries.
    """
    candidates = list(dirpath.glob("*.csv"))
    if len(candidates) == 0:
        raise FileNotFoundError(f"No CSVs in {dirpath}. Check the path or LFS checkout.")
    candidates.sort()

    def _size_on_disk(path: Path) -> int:
        return path.stat().st_size

    # With several CSVs present, the biggest one wins.
    return max(candidates, key=_size_on_disk)

# Largest raw CSV per underlying; the bare tuple on the last line is what the cell displays.
SPX_FILE = pick_csv(dirpath=RAW_SPX)
SPY_FILE = pick_csv(dirpath=RAW_SPY)
(SPX_FILE, SPY_FILE)


CWD : /Users/ya/Desktop/deep-hedging-rl/notebooks
ROOT: /Users/ya/Desktop/deep-hedging-rl
SPX dir: /Users/ya/Desktop/deep-hedging-rl/data/raw/options_spx exists? True
SPY dir: /Users/ya/Desktop/deep-hedging-rl/data/raw/options_spy exists? True


(PosixPath('/Users/ya/Desktop/deep-hedging-rl/data/raw/options_spx/l8au9t4q1ij3lofm.csv'),
 PosixPath('/Users/ya/Desktop/deep-hedging-rl/data/raw/options_spy/nwymczfl7n3h33rg.csv'))

In [9]:
# --- config paths from data_pipeline.config ---
from pathlib import Path
from data_pipeline.config import ROOT, RAW_SPX, RAW_SPY

# Root of the converted Parquet output, with one sub-directory per underlying.
OUT_DIR = ROOT.joinpath("data", "processed", "options_parquet")
OUT_DIR.joinpath("SPX").mkdir(parents=True, exist_ok=True)
OUT_DIR.joinpath("SPY").mkdir(parents=True, exist_ok=True)

# Raw CSV columns kept by csv_to_parquet_parts_om (through its `usecols` filter).
OM_COLS = [
    # identifiers and contract terms
    "secid",
    "date",
    "exdate",
    "cp_flag",
    "strike_price",
    # quotes and activity
    "best_bid",
    "best_offer",
    "volume",
    "open_interest",
    # implied volatility and greeks
    "impl_volatility",
    "delta",
    "gamma",
    "vega",
    "theta",
    # descriptive fields
    "optionid",
    "root",
    "ticker",
    "index_flag",
    "issuer",
    "exercise_style",
]

def csv_to_parquet_parts_om(csv_path: Path, out_dir: Path, chunksize=2_000_000, strike_divisor=None):
    """Convert one raw options CSV into numbered Parquet parts with canonical column names.

    The CSV is read in chunks of ``chunksize`` rows, keeping only the columns
    listed in ``OM_COLS``. Each chunk is normalized and written to
    ``out_dir / "part_NNNN.parquet"`` (numbering starts at 1). Part files left in
    ``out_dir`` by an earlier run that are not rewritten by this run are removed
    once the conversion has finished, so the directory holds exactly this file's data.

    Per-chunk normalization:
      * ``date`` / ``exdate`` become timezone-naive midnight timestamps
        (unparseable values become NaT).
      * Columns are renamed: exdate->expiry, cp_flag->put_call,
        strike_price->strike, best_bid->bid, best_offer->ask, impl_volatility->iv.
      * ``underlying`` is taken from ``ticker`` and falls back to ``root`` where
        ``ticker`` is missing (either as a value or as a whole column).
      * ``mid`` is always computed as ``(bid + ask) / 2``.
      * ``strike`` is divided by the strike divisor (see ``strike_divisor``).
      * Numeric columns are cast to fixed dtypes so every part has the same schema.

    Args:
        csv_path: CSV file to read.
        out_dir: Directory that receives the part files; created if missing.
        chunksize: Number of CSV rows per part.
        strike_divisor: Value the raw strike is divided by. ``None`` (default)
            auto-detects once per file: 1000 if the first chunk that contains a
            valid strike has a maximum above 100000, otherwise 1. Pass an explicit
            value when that heuristic is not reliable for the data at hand.

    Raises:
        ValueError: if ``strike_divisor`` is given and is not positive.
    """
    import pandas as pd

    if strike_divisor is not None and not strike_divisor > 0:
        raise ValueError(f"strike_divisor must be positive, got {strike_divisor!r}")

    out_dir.mkdir(parents=True, exist_ok=True)

    float_cols = ("bid", "ask", "last", "iv", "delta", "gamma", "vega", "theta")
    count_cols = ("open_interest", "volume")

    # Decided once and reused for every chunk: a per-chunk decision could scale
    # some parts and not others.
    divisor = strike_divisor

    part = 0
    for chunk in pd.read_csv(
        csv_path,
        usecols=lambda c: c in OM_COLS,
        parse_dates=["date","exdate"],
        dtype={
            "cp_flag":"category", "ticker":"category", "root":"category",
            "index_flag":"category", "exercise_style":"category", "issuer":"category"
        },
        low_memory=False, chunksize=chunksize
    ):
        # --- normalize dates ---
        for c in ("date","exdate"):
            chunk[c] = pd.to_datetime(chunk[c], errors="coerce").dt.tz_localize(None).dt.normalize()

        # --- rename to canonical ---
        chunk = chunk.rename(columns={
            "exdate":"expiry",
            "cp_flag":"put_call",
            "strike_price":"strike",
            "best_bid":"bid",
            "best_offer":"ask",
            "impl_volatility":"iv",
        })

        # --- underlying: ticker first, root as fallback ---
        if "underlying" not in chunk:
            if "ticker" in chunk:
                underlying = chunk["ticker"].astype("string")
            else:
                # The CSV has no ticker column at all: start from all-missing.
                underlying = pd.Series(pd.NA, index=chunk.index, dtype="string")
            if "root" in chunk:
                underlying = underlying.fillna(chunk["root"].astype("string"))
            chunk["underlying"] = underlying

        # --- fixed float32 for prices / greeks ---
        # `downcast=` chooses a dtype from the values of each chunk, which can
        # give different parts different schemas; an explicit cast cannot.
        for c in float_cols:
            if c in chunk:
                chunk[c] = pd.to_numeric(chunk[c], errors="coerce").astype("float32")

        # --- mid quote (always recomputed, from the cleaned bid / ask) ---
        chunk["mid"] = (chunk["bid"] + chunk["ask"]) / 2.0

        # --- strike unit fix (handles x1000 dumps) ---
        strike = pd.to_numeric(chunk["strike"], errors="coerce")
        if divisor is None and strike.notna().any():
            divisor = 1000.0 if strike.max() > 100000 else 1.0
        # float64 in every part, whichever divisor applies.
        chunk["strike"] = (strike / (1.0 if divisor is None else divisor)).astype("float64")

        # --- counts: nullable integer keeps missing values and a stable dtype ---
        for c in count_cols:
            if c in chunk:
                # round() makes the float -> integer cast safe if a value was parsed as float
                chunk[c] = pd.to_numeric(chunk[c], errors="coerce").round().astype("Int64")

        # --- write this chunk as its own part ---
        part += 1
        chunk.to_parquet(out_dir / f"part_{part:04d}.parquet", index=False)

    # --- drop parts from an earlier, longer run ---
    # Done only after a successful conversion so a failed run never deletes old output.
    written = {f"part_{i:04d}.parquet" for i in range(1, part + 1)}
    for leftover in out_dir.glob("part_*.parquet"):
        if leftover.name not in written:
            leftover.unlink()

# pick files and convert
def pick_csv(dirpath: Path) -> Path:
    """Return the largest ``*.csv`` entry found directly inside ``dirpath``.

    NOTE: this re-defines ``pick_csv`` from the earlier cell and shadows it; the
    behavior and error message are kept identical so it does not matter which
    definition ran last. Candidates are taken in sorted order, so ties on size
    resolve to the first path in that order.

    Raises:
        FileNotFoundError: if ``dirpath`` has no ``*.csv`` entries.
    """
    files = sorted(dirpath.glob("*.csv"))
    if not files:
        raise FileNotFoundError(f"No CSVs in {dirpath}. Check the path or LFS checkout.")
    # choose the largest file if multiple
    return max(files, key=lambda p: p.stat().st_size)

# Locate the raw CSV for each underlying.
SPX_FILE = pick_csv(dirpath=RAW_SPX)
SPY_FILE = pick_csv(dirpath=RAW_SPY)

# Convert SPX first, then SPY, each into its own sub-directory of OUT_DIR.
csv_to_parquet_parts_om(csv_path=SPX_FILE, out_dir=OUT_DIR / "SPX")
csv_to_parquet_parts_om(csv_path=SPY_FILE, out_dir=OUT_DIR / "SPY")


In [10]:
import pyarrow.dataset as ds
import pandas as pd
from pathlib import Path

# Preferred left-to-right column order for loaded frames; columns not listed
# here are appended after these by load_options_dir.
CANONICAL_ORDER = [
    # row key
    "date",
    "underlying",
    "put_call",
    "expiry",
    "strike",
    # prices, implied volatility, greeks
    "bid",
    "ask",
    "mid",
    "last",
    "iv",
    "delta",
    "gamma",
    "vega",
    "theta",
    # activity
    "open_interest",
    "volume",
    # identifiers and descriptive fields
    "secid",
    "optionid",
    "root",
    "ticker",
    "index_flag",
    "issuer",
    "exercise_style",
]

def load_options_dir(dirpath: Path, columns=None) -> pd.DataFrame:
    """Load every Parquet file under ``dirpath`` into one DataFrame.

    Args:
        dirpath: Directory holding the Parquet part files.
        columns: Optional list of column names to read. ``None`` (default)
            reads all columns.

    Returns:
        A DataFrame whose ``date`` / ``expiry`` columns (when present) are
        timezone-naive midnight timestamps, with columns arranged in
        ``CANONICAL_ORDER`` followed by any remaining columns in their
        original order.
    """
    dset = ds.dataset(dirpath, format="parquet")
    # Pass the caller's projection through; it used to be accepted but ignored,
    # so every call read all columns regardless of `columns`.
    tbl = dset.to_table(columns=columns)
    df  = tbl.to_pandas()
    # final guards
    for c in ("date","expiry"):
        if c in df:
            df[c] = pd.to_datetime(df[c], errors="coerce").dt.tz_localize(None).dt.normalize()
    # order columns (keep extras at end)
    ordered = [c for c in CANONICAL_ORDER if c in df.columns]
    extras  = [c for c in df.columns if c not in CANONICAL_ORDER]
    return df[ordered + extras]

# Materialize both converted datasets (all columns) as DataFrames.
spx = load_options_dir(dirpath=OUT_DIR / "SPX")
spy = load_options_dir(dirpath=OUT_DIR / "SPY")


In [11]:
# Columns that identify a row for de-duplication.
KEY = ["date", "underlying", "put_call", "expiry", "strike"]

# Order by the key, then keep only the last row of each group of duplicates.
spx = spx.sort_values(by=KEY).drop_duplicates(subset=KEY, keep="last")
spy = spy.sort_values(by=KEY).drop_duplicates(subset=KEY, keep="last")

def qc(df, name):
    """Print quick quality-control statistics for an options quote table.

    Prints the row count and the first/last ``date``, then the share of rows
    with a negative bid/ask spread, with a zero spread, and with ``iv`` above 5.

    Args:
        df: DataFrame with a datetime ``date`` column and numeric ``bid``,
            ``ask`` and ``iv`` columns.
        name: Label printed in front of the summary.
    """
    if len(df) == 0:
        # min()/max() of an empty date column is NaT, whose .date() raises, and
        # every share would be NaN: report the empty table instead of crashing.
        print(f"[{name}] rows=0  (empty table, nothing to check)")
        return

    first_day = df["date"].min().date()
    last_day = df["date"].max().date()
    print(f"[{name}] rows={len(df):,}  {first_day}→{last_day}")

    s = (df["ask"] - df["bid"])
    # Each mean() of a boolean mask is the fraction of rows matching the condition.
    print(f"  neg_spread%={(s<0).mean():.3%}  zero_spread%={(s==0).mean():.3%}  iv>5%={(df['iv']>5).mean():.3%}")

# Per-underlying summary.
qc(spx, "SPX")
qc(spy, "SPY")

# De-duplicate inside pandas before building the Python sets: iterating the raw
# column creates one Timestamp object per row, while only the distinct days are
# needed. The resulting sets are the same.
dates_spx = set(spx["date"].drop_duplicates())
dates_spy = set(spy["date"].drop_duplicates())
print("Shared trading days:", len(dates_spx & dates_spy))


[SPX] rows=32,190,466  1996-01-04→2023-08-31
  neg_spread%=0.000%  zero_spread%=0.000%  iv>5%=0.039%
[SPY] rows=20,357,886  2005-01-10→2023-08-31
  neg_spread%=0.014%  zero_spread%=0.016%  iv>5%=0.025%
Shared trading days: 4693
