In [None]:
! conda install pyarrow yfinance joblib pandas numpy matplotlib seaborn scikit-learn


Channels:
 - defaults
Platform: win-64
Collecting package metadata (repodata.json): done
Solving environment: failed



PackagesNotFoundError: The following packages are not available from current channels:

  - py_vollib_vectorized

Current channels:

  - defaults

To search for alternate channels that may provide the conda package you're
looking for, navigate to

    https://anaconda.org

and use the search bar at the top of the page.




In [None]:
! pip install polars-lts-cpu
! pip install py_vollib_vectorized
! pip install git+https://github.com/vollib/py_lets_be_rational.git # overwrite py_vollib_vectorized

Collecting polars-lts-cpu
  Using cached polars_lts_cpu-1.31.0-cp39-abi3-win_amd64.whl.metadata (15 kB)
Using cached polars_lts_cpu-1.31.0-cp39-abi3-win_amd64.whl (35.1 MB)
Installing collected packages: polars-lts-cpu
Successfully installed polars-lts-cpu-1.31.0


ERROR: Invalid requirement: '#': Expected package name at the start of dependency specifier
    #
    ^


In [1]:
# Add the folder with your calibration .py files to the Python path
import sys
sys.path.append(r'C:\Users\kevin\OneDrive\Documents\Draco\Summer-Quant-2025\heston calibration\tools')

In [None]:
# polars version of filtering & enriching SPXW options data

import polars as pl
import os
import shutil
import stat
import pandas as pd
import yfinance as yf
from pathlib import Path

# 0) Make it a directory, not a “.parquet” file
INPUT_PATTERN = r"C:\Users\kevin\OneDrive\Documents\Draco\spxw_filtered\*.csv"
OUTPUT_DIR    = Path(r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_pre_filtered_parquet")

# Clean up any old output directory
def on_rm_error(func, path, exc_info):
    os.chmod(path, stat.S_IWRITE)
    func(path)

if OUTPUT_DIR.exists():
    shutil.rmtree(OUTPUT_DIR, onerror=on_rm_error)

# 1. Get min_date, max_date from your options LazyFrame
lf = pl.scan_csv(
    INPUT_PATTERN,
    dtypes={
        "ticker": pl.Utf8, "volume":pl.Int64, "open":pl.Float64,
        "close":pl.Float64, "high":pl.Float64, "low":pl.Float64,
        "window_start":pl.Int64, "transactions":pl.Int64
    }
).filter(pl.col("window_start").is_not_null() & (pl.col("window_start") > 0))

# Parse ticker and derive date columns for min/max date
pat = r"O:SPXW(\d{6})([CP])(\d{8})"
lf_dates = (
    lf.with_columns([
        pl.col("ticker").str.extract(pat, 1).str.strptime(pl.Date, "%y%m%d", strict=False).alias("expiration"),
        pl.col("window_start").cast(pl.Datetime("ns")).dt.date().alias("date")
    ])
    .filter(pl.col("expiration").is_not_null() & pl.col("date").is_not_null())
)
date_bounds = lf_dates.select([
    pl.col("date").min().alias("min"),
    pl.col("date").max().alias("max")
]).collect()
min_date, max_date = date_bounds["min"][0], date_bounds["max"][0]

# 2. Build a complete date range as a DataFrame
all_dates = pd.DataFrame({"date": pd.date_range(min_date, max_date, freq="D").date})

# 3. Fetch SPX and IRX, merge with all_dates, and forward-fill
spx_pd = (yf.Ticker("^GSPC")
           .history(start=min_date, end=max_date + pd.Timedelta(days=1))
           ["Close"].reset_index().rename(columns={"Close":"S"}))
spx_pd["date"] = spx_pd["Date"].dt.date
spx_pd = all_dates.merge(spx_pd[["date", "S"]], on="date", how="left").fillna(method="ffill")
spx = pl.from_pandas(spx_pd[["date", "S"]])

irx_pd = (yf.Ticker("^IRX")
           .history(start=min_date, end=max_date + pd.Timedelta(days=1))
           ["Close"].reset_index().rename(columns={"Close":"r"}))
irx_pd["date"] = irx_pd["Date"].dt.date
irx_pd["r"] = irx_pd["r"] / 100.0
irx_pd = all_dates.merge(irx_pd[["date", "r"]], on="date", how="left").fillna(method="ffill")
irx = pl.from_pandas(irx_pd[["date", "r"]])

spx_lazy = spx.lazy()
irx_lazy = irx.lazy()

# 4. Build the full LazyFrame pipeline
# previous steps up to joining S & r
lf = (
    pl.scan_csv(INPUT_PATTERN, dtypes={ 
        # specify your dtypes here, e.g.:
        "ticker": pl.Utf8, "volume":pl.Int64, "open":pl.Float64,
        "close":pl.Float64, "high":pl.Float64, "low":pl.Float64,
        "window_start":pl.Int64, "transactions":pl.Int64
    })
      .filter(pl.col("window_start") > 0)
      .with_columns([
         pl.col("ticker").str.extract(pat, 2).replace({"C":"call","P":"put"}).alias("option_type"),
         pl.col("ticker").str.extract(pat, 1).str.strptime(pl.Date,"%y%m%d",strict=False).alias("expiration"),
         pl.col("ticker").str.extract(pat, 3).cast(pl.Int64).alias("strike_raw"),
         pl.col("window_start").cast(pl.Datetime("ns")).dt.date().alias("date")
      ])
      .filter(
         pl.col("expiration").is_not_null() &
         pl.col("strike_raw").is_not_null() &
         pl.col("date").is_not_null()
      )
      .with_columns([
         (pl.col("strike_raw")/1_000).alias("strike"),
         ((pl.col("expiration").cast(pl.Datetime("ns")) -
           pl.col("date"      ).cast(pl.Datetime("ns")))
          .dt.total_days()/365.0).alias("T")
      ])
      # your T and price filters
      .filter((pl.col("T")>7/365)&(pl.col("T")<180/365)&(pl.col("close")>0.1))
      # filter out very low volume options
      .filter(pl.col("volume") >= 5)
      # join in S & r
      .join(spx_lazy, on="date", how="left")
      .join(irx_lazy, on="date", how="left")

      # put–call parity / OTM filter
      .with_columns([
        # forward price F = S * exp(r * T)
        (pl.col("S") * (pl.col("r") * pl.col("T")).exp()).alias("F")
      ])
      .filter(
         ((pl.col("option_type")=="call") & (pl.col("strike") >= pl.col("F")))
       | ((pl.col("option_type")=="put")  & (pl.col("strike") <= pl.col("F")))
      )
      .drop("F")

      # now select your final columns
      .select(["date","strike","T","close","option_type","S","r"])
)

# 5. Collect to eager DataFrame
df = lf.collect()

# 6. Write out partitioned Parquet directory by date
df.write_parquet(
    OUTPUT_DIR,
    compression="snappy",
    partition_by="date"
)

print("Pre-filtered & date-partitioned Parquet written to", OUTPUT_DIR)


In [None]:
# KMeans + PCA + Vega Weighted sampling

import numpy as np
import pandas as pd
from sklearn.cluster import MiniBatchKMeans
from sklearn.decomposition import PCA
from py_vollib_vectorized import vectorized_implied_volatility as calculate_iv
from pathlib import Path
from joblib import Parallel, delayed
import shutil
import os
import stat

def hybrid_kmeans_pca_atm(
    df: pd.DataFrame,
    n_clusters: int = 200,
    n_pca_extremes: int = 0,
    atm_width: float = 0.05,
    atm_frac: float = 0.20,
    num_m_bins: int = 10,             # ← new: how many moneyness bins to enforce
    random_state: int = 42
) -> pd.DataFrame:
    df = df.copy()
    # 1) days & moneyness
    df['days']      = (df['T'] * 365).round().astype(int)
    S, r            = df['S'].iloc[0], df['r'].iloc[0]
    q               = df.get('q', 0.0)
    df['moneyness'] = df['strike'] / S

    # 2) implied‐vol + vega
    flags = np.where(df['option_type']=='call','c','p')
    iv = calculate_iv(
        df['close'].values, S, df['strike'].values, df['T'].values,
        r, flags, q, model='black_scholes_merton', return_as='numpy'
    )/100.0
    iv = np.nan_to_num(iv, nan=1e-6)
    sqrtT = np.sqrt(df['T'])
    d1    = (
        np.log(S/df['strike']) +
        (r - q + 0.5*iv**2)*df['T']
    )/(iv*sqrtT + 1e-12)
    df['vega'] = S * np.exp(-q*df['T']) * sqrtT * np.exp(-0.5*d1**2)/np.sqrt(2*np.pi)

    # 3) quantile‐transform moneyness & normalize days
    max_days     = df['days'].max() or 1
    df['m_rank'] = df['moneyness'].rank(pct=True)
    df['d_norm'] = df['days'] / max_days

    # 4) KMeans → pick max‐vega per cluster
    X  = np.vstack([df['m_rank'], df['d_norm']]).T
    km = MiniBatchKMeans(n_clusters=n_clusters, random_state=random_state)
    df['cluster'] = km.fit_predict(X)
    idx_k        = df.groupby('cluster')['vega'].idxmax().values
    samp_k       = df.loc[idx_k]

    # 5) PCA extremes (optional)
    if n_pca_extremes > 0:
        pca     = PCA(n_components=1, random_state=random_state)
        df['pc1'] = pca.fit_transform(X).ravel()
        half     = n_pca_extremes//2
        samp_pc  = pd.concat([
            df.nlargest(half, 'pc1'),
            df.nsmallest(half, 'pc1')
        ])
    else:
        samp_pc = df.head(0)

    # 6) combine & dedupe
    combined = pd.concat([samp_k, samp_pc]) \
                 .drop_duplicates(subset=['strike','T','option_type'])

    # 7) enforce ATM quota
    total_target = n_clusters + n_pca_extremes
    atm_target   = int(atm_frac * total_target)
    in_atm       = combined[np.abs(combined['moneyness']-1) <= atm_width]
    if len(in_atm) < atm_target:
        need   = atm_target - len(in_atm)
        extras = (df[np.abs(df['moneyness']-1) <= atm_width]
                  .drop(combined.index, errors='ignore')
                  .nlargest(need, 'vega'))
        combined = pd.concat([combined, extras])

    # 8) **new**: enforce at least one per m‐bin
    # build m‐bin edges in rank‐space
    edges = np.linspace(0, 1, num_m_bins+1)
    for i in range(num_m_bins):
        lo, hi = edges[i], edges[i+1]
        # does combined have a pick in [lo,hi)?
        mask_comb = combined['moneyness'].rank(pct=True).between(lo, hi, inclusive='left')
        if not mask_comb.any():
            # pick the highest-vega in df for that slice
            mask_df   = df['m_rank'].between(lo, hi, inclusive='left')
            if mask_df.any():
                pick = df[mask_df].nlargest(1, 'vega')
                combined = pd.concat([combined, pick])

    # 9) fill to total_target by vega
    if len(combined) < total_target:
        need      = total_target - len(combined)
        leftovers = df.drop(combined.index, errors='ignore')
        extra     = leftovers.nlargest(need, 'vega')
        combined  = pd.concat([combined, extra])

    # 10) cleanup
    to_drop = ['days','moneyness','m_rank','d_norm','vega','cluster','pc1']
    return combined.drop(columns=[c for c in to_drop if c in combined.columns]) \
                   .reset_index(drop=True)


# ------------------------------------------------------------------------------
# 2) Parallelize and write per-date
# ------------------------------------------------------------------------------
PRE_FILTERED_DIR = Path(r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_pre_filtered_parquet")
OUTPUT_DIR       = Path(r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_sampled_hybrid_parquet")

# Cleanup existing output
def _on_rm_error(func, path, exc_info):
    os.chmod(path, stat.S_IWRITE)
    func(path)

if OUTPUT_DIR.exists():
    shutil.rmtree(OUTPUT_DIR, onerror=_on_rm_error)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

def process_date(part_dir: Path):
    df = pl.read_parquet(str(part_dir/"0.parquet")).to_pandas()
    sampled = hybrid_kmeans_pca_atm(df)
    sampled['date'] = part_dir.name.split('=')[1]
    out_dir = OUTPUT_DIR / part_dir.name
    out_dir.mkdir(parents=True, exist_ok=True)
    pl.from_pandas(sampled).write_parquet(str(out_dir/"0.parquet"), compression='snappy')
    return len(sampled)

# Run in parallel
date_dirs = sorted(PRE_FILTERED_DIR.glob("date=*"))
sizes = Parallel(n_jobs=-1, verbose=5)(delayed(process_date)(d) for d in date_dirs)

print("Hybrid KMeans+PCA+Vega Weighted sampling done!")
print("Total picks:", sum(sizes))
print("Sample sizes (first 5):", sizes[:5])

Hybrid KMeans+PCA+Vega Weighted sampling done!
Total picks: 0
Sample sizes (first 5): []


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 12 concurrent workers.


In [None]:
# Read sampled options data and visualize against full options universe

import polars as pl
import os
import random
import matplotlib.pyplot as plt

SAMPLED_PARQUET_DIR = r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_sampled_hybrid_parquet"
PRE_FILTERED_PARQUET_DIR = r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_pre_filtered_parquet"

# Gather all available pre-filtered parquet files (look for '0.parquet')
pre_filtered_files = []
for root, dirs, files in os.walk(PRE_FILTERED_PARQUET_DIR):
    for file in files:
        if file == '0.parquet':
            pre_filtered_files.append(os.path.join(root, file))

if not pre_filtered_files:
    raise FileNotFoundError(f"No '0.parquet' files found in {PRE_FILTERED_PARQUET_DIR}")

# Load all pre-filtered data (full options universe)
df_all = pl.concat([pl.read_parquet(f) for f in pre_filtered_files]).to_pandas()

# List all date partitions
partitions = [d for d in os.listdir(SAMPLED_PARQUET_DIR) if os.path.isdir(os.path.join(SAMPLED_PARQUET_DIR, d))]
partitions.sort()
sampled_partitions = random.sample(partitions, min(10, len(partitions)))

print("Sampled partitions:", sampled_partitions)

for part in sampled_partitions:
    # Extract date from partition name (assumes format date=YYYY-MM-DD)
    date_str = part.split('=')[1] if '=' in part else part
    part_dir = os.path.join(SAMPLED_PARQUET_DIR, part)
    parquet_files = [os.path.join(part_dir, f) for f in os.listdir(part_dir) if f == '0.parquet']
    if not parquet_files:
        print(f"No '0.parquet' file found in {part_dir}")
        continue
    df_sampled = pl.read_parquet(parquet_files).to_pandas()
    df_full = df_all[df_all['date'].astype(str) == date_str]
    print(f"\nPartition: {part}")
    print("Sampled shape:", df_sampled.shape, "Full shape:", df_full.shape)
    print(df_sampled.head(5))
    # Visualization: overlay sampled and full data
    if all(col in df_sampled.columns for col in ['strike', 'S', 'T']) and all(col in df_full.columns for col in ['strike', 'S', 'T']):
        moneyness_full = df_full['strike'] / df_full['S']
        expiry_full = df_full['T'] * 365
        moneyness_sampled = df_sampled['strike'] / df_sampled['S']
        expiry_sampled = df_sampled['T'] * 365
        plt.figure(figsize=(7, 5))
        plt.scatter(moneyness_full, expiry_full, s=10, c='gray', alpha=0.3, label='All Options')
        plt.scatter(moneyness_sampled, expiry_sampled, s=60, c='tab:orange', edgecolor='black', marker='o', label='Sampled')
        plt.xlabel('Moneyness (K/S)')
        plt.ylabel('Days to Expiry')
        plt.title(f"Sampled vs All Options: {date_str}")
        plt.legend()
        plt.grid(alpha=0.3)
        plt.tight_layout()
        plt.show()
    else:
        print("Required columns for visualization not found in this partition.")

In [None]:
# Robust pre-check for sampled options data

import numpy as np
import polars as pl
from pathlib import Path

# Paths (adjust if needed)
SAMPLED_DIR = Path(r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_sampled_hybrid_parquet")

# 1) Load all sampled partitions into a single pandas DataFrame
df_sampled_all = (
    pl.scan_parquet(f"{SAMPLED_DIR}/date=*/*.parquet")
      .collect()
      .to_pandas()
)

# 2) Sanity‐check required columns
required_cols = ['date', 'strike', 'T', 'close', 'option_type', 'S', 'r']
missing = [c for c in required_cols if c not in df_sampled_all.columns]
if missing:
    raise ValueError(f"Missing required cols in sampled data: {missing}")

# 3) Basic emptiness & date‐count checks
if df_sampled_all.empty:
    raise ValueError("No sampled data found!")
unique_dates = df_sampled_all['date'].unique()
print(f"[Pre-Check] Dates covered: {len(unique_dates)}  (e.g. {unique_dates[:3]})")

# 4) Inspect the very first date's slice
first_date = unique_dates[0]
df0 = df_sampled_all[df_sampled_all['date'] == first_date]
print(f"\n[Pre-Check] Date = {first_date} → {len(df0)} rows")

for col in required_cols[1:]:  # skip 'date'
    ser     = df0[col]
    dtype   = getattr(ser, 'dtype', type(ser.iloc[0]))
    n_nan   = ser.isna().sum()
    print(f"  • {col:12s} dtype={dtype},  rows={ser.shape[0]},  NaNs={n_nan}")

# 5) Convert to numpy and check
strikes = np.asarray(df0['strike'], dtype=float)
T       = np.asarray(df0['T'], dtype=float)
prices  = np.asarray(df0['close'], dtype=float)
flags   = np.where(df0['option_type']=='call','c','p')
r_arr   = np.full_like(strikes, df0['r'].iloc[0], dtype=float)
q_arr   = np.zeros_like(strikes, dtype=float)

print("\n[Pre-Check] NumPy arrays:")
for name, arr in [
    ('strikes', strikes), ('T', T), ('prices', prices),
    ('flags', flags), ('r_arr', r_arr), ('q_arr', q_arr)
]:
    arr = np.asarray(arr)
    print(f"  • {name:8s} shape={arr.shape}, dtype={arr.dtype}", end='')
    if np.issubdtype(arr.dtype, np.number):
        print(f", NaNs={np.isnan(arr).sum()}")
    else:
        print()

# 6) Warn on degenerate arrays
for name, arr in [('strikes', strikes), ('T', T), ('prices', prices)]:
    if arr.size == 0:
        print(f"  [Warning] {name} is empty!")
    elif np.all(np.isnan(arr)):
        print(f"  [Warning] {name} all NaN!")
    elif np.all(arr == 0):
        print(f"  [Warning] {name} all zero!")

print("\n[Pre-Check] Completed successfully – ready for calibration.")


In [None]:
# Calibration over a date range with full worker logic, now with LM progress logs returned

import os
import time
from datetime import datetime
from pathlib import Path

import polars as pl
import numpy as np
from joblib import Parallel, delayed

from Heston_COS_METHOD import heston_cosine_method
from Levenberg_Marquardt import levenberg_Marquardt
from Heston_Calibration_Class import Data_Class
from py_vollib_vectorized import vectorized_implied_volatility as calculate_iv

# === CONFIG ===
SAMPLED_DIR = Path(r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_sampled_hybrid_parquet")
CALIB_DIR   = Path(r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_calibrated_parquet")
CALIB_DIR.mkdir(parents=True, exist_ok=True)

# === SELECT DATE WINDOW ===
start_date = datetime(2024, 1, 2).date()
end_date   = datetime(2024, 2, 1).date()

def calibrate_one_date(part_dir: Path):
    date = part_dir.name.split('=')[1]
    print(f"→ Starting calibration for {date}")

    # Read that date's Parquet(s)
    dfs = [pl.read_parquet(f) for f in part_dir.glob("*.parquet")]
    df_pl = pl.concat(dfs)

    # Extract arrays
    S       = float(df_pl["S"][0])
    strikes = df_pl["strike"].to_numpy()
    T       = df_pl["T"].to_numpy()
    prices  = df_pl["close"].to_numpy()
    flags   = np.where(df_pl["option_type"].to_numpy() == "call", "c", "p")
    r       = float(df_pl["r"][0])
    q       = 0.0
    r_arr   = np.full_like(strikes, r, dtype=float)
    q_arr   = np.full_like(strikes, q, dtype=float)

    # Build calibration container
    data = Data_Class()
    data.S             = S
    data.K             = strikes
    data.T             = T
    data.r             = r_arr
    data.q             = q_arr
    data.market_prices = prices
    data.flag          = flags
    data.calculate_implied_vol()
    market_vol = data.market_vol

    # Calibration parameters
    initial_guess       = np.array([0.04, 0.50, -0.70, 1.0, 0.04]).reshape(5,1)
    N, L                = 100, 10
    I, w                = 500, 1e-3
    precision           = 0.01
    params_to_calibrate = ['vbar','sigma','rho','kappa','v0']
    accel_mag, min_acc  = 1, 1e-3

    # Run Levenberg–Marquardt and collect logs
    calib_params, acc, rej, RMSE, lm_logs = levenberg_Marquardt(
        data, initial_guess, I, w, N, L, precision,
        params_to_calibrate, accel_mag, min_acc, return_logs=True
    )

    # Price via Heston COS
    h_prices = heston_cosine_method(
        data.S, strikes, T, N, L, r_arr, q_arr,
        calib_params[0,0], calib_params[4,0],
        calib_params[1,0], calib_params[2,0],
        calib_params[3,0], flags
    )

    # Implied vol of Heston prices
    calib_iv = (
        calculate_iv(
            h_prices, S, strikes, T, r_arr, flags, q_arr,
            model='black_scholes_merton', return_as='numpy'
        ) * 100
    ).ravel()

    # Assemble output DataFrame
    out = pl.DataFrame({
        "date": [date]*len(strikes),
        "strike": strikes,
        "T": T,
        "close": prices,
        "option_type": df_pl["option_type"].to_numpy(),
        "S": [S]*len(strikes),
        "r": [r]*len(strikes),
        "q": [q]*len(strikes),
        "market_vol": market_vol,
        "calib_iv": calib_iv,
        "calib_vbar": float(calib_params[0,0]),
        "calib_sigma": float(calib_params[1,0]),
        "calib_rho": float(calib_params[2,0]),
        "calib_kappa": float(calib_params[3,0]),
        "calib_v0": float(calib_params[4,0]),
        "calib_rmse": float(RMSE[-1]) if len(RMSE)>0 else np.nan,
        "calib_acc": int(acc),
        "calib_rej": int(rej),
    })

    # Write partitioned by date
    out_dir = CALIB_DIR / f"date={date}"
    out_dir.mkdir(parents=True, exist_ok=True)
    out.write_parquet(out_dir/"part-0.parquet", compression="snappy")

    print(f"✓ Finished calibration for {date}")
    return date, lm_logs

# === MAIN EXECUTION ===
all_date_dirs = sorted(SAMPLED_DIR.glob("date=*"))
# Filter to your desired window
date_dirs = []
for d in all_date_dirs:
    dstr = d.name.split('=')[1]
    ddate = datetime.strptime(dstr, "%Y-%m-%d").date()
    if start_date <= ddate <= end_date:
        date_dirs.append(d)

print(f"Running calibration on {len(date_dirs)} days from {start_date} to {end_date}")

start = time.time()
processed = Parallel(n_jobs=-1, verbose=5)(
    delayed(calibrate_one_date)(d) for d in date_dirs
)
total_min = (time.time() - start) / 60
print(f"\n✅ Calibrated {len(processed)} dates in {total_min:.1f} minutes")

# Print LM logs for each date
for date, lm_logs in processed:
    print(f"\n--- LM Progress for {date} ---")
    for line in lm_logs:
        print(line)

In [None]:
from joblib import Parallel, delayed
import polars as pl
import numpy as np
from pathlib import Path
import os
from Heston_Calibration_Class import Data_Class

def plot_surface_for_date(date: str, calib_dir: Path, out_dir: Path):
    day_dir = calib_dir / f"date={date}"
    files = list(day_dir.glob("*.parquet"))
    if not files:
        print(f"[Surface] {date} → no data, skipping")
        return False

    # load into pandas for Data_Class
    df = pl.concat([pl.read_parquet(p) for p in files]).to_pandas()
    if df.empty:
        print(f"[Surface] {date} → empty DataFrame, skipping")
        return False

    # build Data_Class
    data = Data_Class()
    data.S             = float(df["S"].iloc[0])
    data.K             = df["strike"].values.astype(float)
    data.T             = df["T"].values.astype(float)
    data.r             = np.full_like(data.K, float(df["r"].iloc[0]), dtype=float)
    data.q             = np.zeros_like(data.K, dtype=float)
    data.market_prices = df["close"].values.astype(float)
    data.flag          = np.where(df["option_type"]=="call","c","p")
    data.calculate_implied_vol()

    calib_iv = df["calib_iv"].values.astype(float)

    # ensure output folder exists
    out_dir.mkdir(parents=True, exist_ok=True)

    # switch cwd so plot_save_surface writes <date>.png into out_dir
    prev_cwd = os.getcwd()
    os.chdir(str(out_dir))
    try:
        data.plot_save_surface(calib_iv, date_today=date)
        print(f"[Surface] {date} → saved {date}.png in {out_dir}")
    finally:
        os.chdir(prev_cwd)

    return True

# ------------- usage -------------
from joblib import Parallel
import os

CALIB_PARQUET_DIR = Path(r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_calibrated_parquet")
SURFACE_PNG_DIR   = Path(r"C:\Users\kevin\OneDrive\Documents\Draco\data\spxw_surface_png")

all_dates = sorted(d.name.split("=")[1]
                   for d in CALIB_PARQUET_DIR.glob("date=*") if d.is_dir())
# pick your window, or use all_dates:
sel = [d for d in all_dates if "2024-01-02" <= d <= "2024-02-01"]

results = Parallel(n_jobs=os.cpu_count(), verbose=5)(
    delayed(plot_surface_for_date)(date, CALIB_PARQUET_DIR, SURFACE_PNG_DIR)
    for date in sel
)

print(f"[Surface] done: {sum(results)}/{len(sel)} succeeded")


[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done   4 out of  22 | elapsed:    5.7s remaining:   26.1s
[Parallel(n_jobs=12)]: Done   9 out of  22 | elapsed:    5.9s remaining:    8.6s
[Parallel(n_jobs=12)]: Done   4 out of  22 | elapsed:    5.7s remaining:   26.1s
[Parallel(n_jobs=12)]: Done   9 out of  22 | elapsed:    5.9s remaining:    8.6s
[Parallel(n_jobs=12)]: Done  14 out of  22 | elapsed:    8.6s remaining:    4.9s
[Parallel(n_jobs=12)]: Done  19 out of  22 | elapsed:    8.7s remaining:    1.3s
[Parallel(n_jobs=12)]: Done  14 out of  22 | elapsed:    8.6s remaining:    4.9s
[Parallel(n_jobs=12)]: Done  19 out of  22 | elapsed:    8.7s remaining:    1.3s


[Surface] done: 22/22 succeeded


[Parallel(n_jobs=12)]: Done  22 out of  22 | elapsed:    9.3s finished
