<a href="https://colab.research.google.com/github/jcsellers/HAWK1090/blob/master/X_DTE_selector_live.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [24]:
!pip install pandas-market-calendars --quiet
!pip install py_vollib_vectorized --quiet
import yfinance as yf
import pandas as pd
import json
from pathlib import Path
import os
import time

# --- 1. Project Setup and Configuration ---

# Define the root path of your project in Google Drive
# This helps keep all file paths consistent.
try:
    from google.colab import drive
    drive.mount('/content/drive')
    PROJECT_ROOT = Path('/content/drive/MyDrive/1dte_selector/')
except ImportError:
    # If not in Colab, use a local path (adjust as needed)
    PROJECT_ROOT = Path('.')

# Create necessary directories if they don't exist
CONFIG_DIR = PROJECT_ROOT / 'configs'
LOG_DIR = PROJECT_ROOT / 'live_logs'
CONFIG_DIR.mkdir(exist_ok=True)
LOG_DIR.mkdir(exist_ok=True)

CONFIG_FILE_PATH = CONFIG_DIR / 'champion_model_config.json'

def create_champion_config(config_path: Path):
    """Defines and saves the configuration for our champion model (Trial #198)."""

    champion_params = {
      "model_name": "Trial_198_DeltaSharpe_Champion",
      "description": "Best parameters from the 250-trial deep dive, optimizing for Delta Sharpe.",
      "hyperparameters": {
        "MIN_TRADES": 11,
        "MIN_WIN_RATE": 0.76,
        "MIN_AVG_PNL": 40,
        "MAX_LEG_DELTA": 55,
        "VOL_CAP": 30,
        "GAP_CAP": 5,
        "FALLBACK_OK": True
      },
      # We can add other system settings here later
      "system_settings": {
          "lookback_period_months": 24
      }
    }

    print(f"Creating configuration file at: {config_path}")
    with open(config_path, 'w') as f:
        json.dump(champion_params, f, indent=2)

def load_config(config_path: Path) -> dict:
    """Loads the model configuration from a JSON file."""
    if not config_path.exists():
        print(f"Config file not found at {config_path}, creating a default one.")
        create_champion_config(config_path)

    print(f"Loading configuration from: {config_path}")
    with open(config_path, 'r') as f:
        config = json.load(f)
    return config

# --- 2. Live Data Fetching ---

def fetch_live_data(tickers: list) -> pd.DataFrame | None:
    """
    Fetches the last 2 years of daily data for the given tickers.
    Cleans and formats the DataFrame for feature calculation.
    """
    print(f"\nFetching live data for tickers: {tickers}...")
    try:
        # Fetch data for the last 2 years, which is plenty for rolling features
        raw_data = yf.download(tickers, period="2y", auto_adjust=True)

        if raw_data.empty:
            print("❌ Error: yfinance returned an empty DataFrame.")
            return None

        # The downloaded data has multi-level columns, e.g., ('Close', '^VIX')
        # We need to flatten and rename them for easy access.
        df = raw_data.stack().reset_index().rename(columns={'level_1': 'Ticker'})
        df = df.pivot(index='Date', columns='Ticker', values=['Open', 'High', 'Low', 'Close', 'Volume'])

        # Flatten the multi-level columns, e.g., ('Close', '^VIX') -> 'Close_^VIX'
        df.columns = [f'{val}_{ticker}' for val, ticker in df.columns]

        # Rename columns to match the style of our backtest CSV for consistency
        # e.g., 'Close_^GSPC' -> 'SPX_Close'
        rename_map = {
            '_^GSPC': '_SPX',
            '_^VIX': '_VIX',
            '_^VVIX': '_VVIX',
            '_^SKEW': '_SKEW'
        }
        for old_suffix, new_suffix in rename_map.items():
            df.columns = [col.replace(old_suffix, new_suffix) for col in df.columns]

        print("✅ Data fetched successfully.")
        return df

    except Exception as e:
        print(f"❌ An error occurred during data fetching: {e}")
        return None


# --- 3. Main Execution Block ---

# Load the configuration
config = load_config(CONFIG_FILE_PATH)
print("\nLoaded Configuration:")
print(json.dumps(config, indent=2))

# Define tickers needed for our features.
# Note: V1D data is not available in yfinance. We may need to skip features
# that rely on it or find an alternative source/proxy later.
required_tickers = ['^GSPC', '^VIX', '^VVIX', '^SKEW']

# Fetch the live data
live_market_data = fetch_live_data(required_tickers)

if live_market_data is not None:
    print(f"\nSuccessfully fetched market data. Shape: {live_market_data.shape}")
    print("Latest data point (T-1):")
    # Display the second to last row, as the last row might be for the current, incomplete day
    print(live_market_data.iloc[-2])

[                       0%                       ]

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Loading configuration from: /content/drive/MyDrive/1dte_selector/configs/champion_model_config.json

Loaded Configuration:
{
  "model_name": "Trial_198_DeltaSharpe_Champion",
  "description": "Best parameters from the 250-trial deep dive, optimizing for Delta Sharpe.",
  "hyperparameters": {
    "MIN_TRADES": 11,
    "MIN_WIN_RATE": 0.76,
    "MIN_AVG_PNL": 40,
    "MAX_LEG_DELTA": 55,
    "VOL_CAP": 30,
    "GAP_CAP": 5,
    "FALLBACK_OK": true
  },
  "system_settings": {
    "lookback_period_months": 24
  }
}

Fetching live data for tickers: ['^GSPC', '^VIX', '^VVIX', '^SKEW']...


[*********************100%***********************]  4 of 4 completed

✅ Data fetched successfully.

Successfully fetched market data. Shape: (503, 20)
Latest data point (T-1):
Open_SPX       5.978940e+03
Open_SKEW      1.374600e+02
Open_VIX       1.768000e+01
Open_VVIX      9.116000e+01
High_SPX       5.990480e+03
High_SKEW      1.374600e+02
High_VIX       1.807000e+01
High_VVIX      9.456000e+01
Low_SPX        5.966110e+03
Low_SKEW       1.374600e+02
Low_VIX        1.741000e+01
Low_VVIX       9.042000e+01
Close_SPX      5.970810e+03
Close_SKEW     1.374600e+02
Close_VIX      1.761000e+01
Close_VVIX     9.079000e+01
Volume_SPX     4.767050e+09
Volume_SKEW    0.000000e+00
Volume_VIX     0.000000e+00
Volume_VVIX    0.000000e+00
Name: 2025-06-04 00:00:00, dtype: float64



  df = raw_data.stack().reset_index().rename(columns={'level_1': 'Ticker'})


In [25]:
# --- 4. Feature Calculation (Revised with Dynamic Binning) ---

# The helper function remains the same
def cut_bins(series: pd.Series, edges: list) -> pd.Series:
    """Cuts a series into bins with predefined edges."""
    if series.empty:
        return pd.Series(dtype='category')
    # Using [-np.inf, *edges, np.inf] is more robust for dynamic quantiles
    bins = [-np.inf, *edges, np.inf]
    labels = [f"Q{i+1}" for i in range(len(bins) - 1)]
    return pd.cut(series, bins=bins, labels=labels, include_lowest=True, duplicates='drop')


def calculate_live_features(df: pd.DataFrame) -> pd.DataFrame | None:
    """
    Calculates all necessary features, using DYNAMIC binning for categorical features.

    Args:
        df (pd.DataFrame): The DataFrame from fetch_live_data.

    Returns:
        A DataFrame with the original and all new feature columns.
    """
    if df is None:
        return None

    print("\nCalculating live features with DYNAMIC binning...")
    features = df.copy()

    # --- A. Calculate Raw Numerical Features ---
    features['VIX_ONMPctT'] = (features['Open_VIX'] / features['Close_VIX'].shift(1)) - 1
    features['TS_RTm1'] = (features['Close_VIX'].shift(1) / features['Close_VVIX'].shift(1)) - 1
    features['VIX_ClTm1'] = features['Close_VIX'].shift(1)

    # --- B. Calculate Categorical Features using Dynamic Binning ---

    # Define which features to bin and how many bins (quantiles) to create
    features_to_bin = {
        "VIX_ClTm1": 5,
        "TS_RTm1": 5,
        "VIX_ONMPctT": 5
    }

    for feature_name, num_bins in features_to_bin.items():
        if feature_name in features.columns:
            # To avoid lookahead bias, calculate quantiles on all data EXCEPT the most recent point
            historical_series = features[feature_name].dropna().iloc[:-1]

            # Calculate the quantile edges (e.g., for 5 bins, we need 4 edges: .2, .4, .6, .8)
            quantiles = np.linspace(0, 1, num_bins + 1)[1:-1] # e.g., [0.2, 0.4, 0.6, 0.8]

            try:
                # Calculate the bin edges from the historical distribution
                edges = historical_series.quantile(quantiles).tolist()

                cat_col_name = f"{feature_name}_Cat"
                # Apply these fresh, dynamic edges to the entire series
                features[cat_col_name] = cut_bins(features[feature_name], edges)
                print(f"  Dynamically created categorical feature: {cat_col_name}")
            except Exception as e:
                print(f"  ⚠️ Could not dynamically bin '{feature_name}'. Error: {e}")
        else:
            print(f"  ⚠️ Warning: Raw column '{feature_name}' not found. Cannot create categorical feature.")

    # --- C. Add Time-Based Features ---
    features['DOW_Cat'] = features.index.day_name().astype('category')

    print("✅ Feature calculation complete.")
    return features

# --- 5. Main Execution Block (continued) ---

# Re-run the feature calculation on the data we fetched previously
features_df = calculate_live_features(live_market_data)

if features_df is not None:
    # Display the latest data point with all the new features
    latest_features = features_df.iloc[-1]

    print("\n--- Latest Features for Next Day's Signal (using Dynamic Bins) ---")
    print(f"Data for decision on: {latest_features.name.strftime('%Y-%m-%d')}")

    important_cols = [
        'VIX_ClTm1', 'VIX_ClTm1_Cat',
        'TS_RTm1', 'TS_RTm1_Cat',
        'VIX_ONMPctT', 'VIX_ONMPctT_Cat',
        'DOW_Cat'
    ]
    cols_to_print = [col for col in important_cols if col in latest_features]
    print(latest_features[cols_to_print])


Calculating live features with DYNAMIC binning...
  Dynamically created categorical feature: VIX_ClTm1_Cat
  Dynamically created categorical feature: TS_RTm1_Cat
  Dynamically created categorical feature: VIX_ONMPctT_Cat
✅ Feature calculation complete.

--- Latest Features for Next Day's Signal (using Dynamic Bins) ---
Data for decision on: 2025-06-05
VIX_ClTm1          17.610001
VIX_ClTm1_Cat             Q4
TS_RTm1            -0.806036
TS_RTm1_Cat               Q5
VIX_ONMPctT         0.003975
VIX_ONMPctT_Cat           Q3
DOW_Cat             Thursday
Name: 2025-06-05 00:00:00, dtype: object


In [29]:
# --- 6. Final Signal Generation ---
# This section brings in all necessary logic from our backtesting script
# to build the strategy rulebook and make a final decision.

import unicodedata
from datetime import timedelta
import pandas_market_calendars as mcal # For robust holiday/weekend handling

# Define paths to the data the lookup builder needs
# Note: Ensure VERSION is defined, or replace it with your version string e.g., "yf_v1.4"
try:
    VERSION
except NameError:
    VERSION = "yf_v1.4" # Fallback if not defined earlier

BT_DIR = PROJECT_ROOT / "backtest_data"
EDA_DIR = PROJECT_ROOT / f"eda_results_1dte_NO_LOOKAHEAD_{VERSION}"

# The primary VIX category column name must match what's in your EDA files
PRIMARY_VIX_CAT_ACTUAL = "VIX_ClTm1_Cat"

# --- Helper functions copied from the backtesting script ---

def _strategy_key(side: str, delta: int, width: int) -> str:
    """Generates a standardized key for the PNL dictionary."""
    return f"{side.lower()}_{delta}_{width}"

def _has_strategy_legs(strategy_name: str, pnl_data: dict) -> bool:
    """Checks if PNL series for all legs of a strategy exist."""
    if not pnl_data: return False
    # This parser handles names like "IC_P20C10_10w" or "PutSpread_D12_5w"
    if strategy_name.startswith(("PutSpread", "CallSpread")):
        m = re.search(r"(Put|Call)Spread_D(\d+)_(\d+)w", strategy_name)
        if not m: return False
        side, d, w = m.group(1), int(m.group(2)), int(m.group(3))
        return _strategy_key(side, d, w) in pnl_data
    elif strategy_name.startswith("IC"):
        m = re.search(r"IC_P(\d+)C(\d+)_(\d+)w", strategy_name)
        if not m: return False
        p_delta, c_delta, width = int(m.group(1)), int(m.group(2)), int(m.group(3))
        return (_strategy_key("put", p_delta, width) in pnl_data and
                _strategy_key("call", c_delta, width) in pnl_data)
    return False

def _clean_column_names(columns: list) -> list:
    """Normalizes column names for consistency."""
    out = []
    for c in columns:
        c = unicodedata.normalize('NFKD', str(c)).encode('ascii', 'ignore').decode('utf-8')
        c = c.replace(' ', '_').replace('.', '_').strip()
        out.append(c)
    return out

# --- PNL Data Loading (to know which strategies are valid) ---
def load_pnl_placeholders(backtest_dir: Path) -> dict:
    """Scans for PNL files and creates a dictionary of existing strategy keys."""
    print("\nLoading available strategy PNL placeholders...")
    all_pnl_keys = {}
    file_pattern = re.compile(r"1dte_(put|call)_s?p[er]{2}ad_(\d+)delta_(\d+)pts.*\.csv", re.I)
    for fp in backtest_dir.glob("*.csv"):
        match = file_pattern.match(fp.name)
        if match:
            side, delta, width = match.groups()
            key = _strategy_key(side, int(delta), int(width))
            all_pnl_keys[key] = True # We just need the key to exist
    print(f"✅ Found {len(all_pnl_keys)} unique PNL backtests available.")
    return all_pnl_keys

# --- The "Brain": Strategy Lookup Builder ---
def build_strategy_lookup(
    params: dict, eda_files_dir: Path, pnl_data: dict, train_range_dates: tuple
) -> tuple[dict, dict, set]:
    """Builds strategy lookup tables based on rules from EDA files."""
    lut_full, lut_primary, sitout_keys = {}, {}, set()
    train_start, train_end = pd.to_datetime(train_range_dates[0]), pd.to_datetime(train_range_dates[1])

    for eda_file_path in eda_files_dir.glob("eda_*_Cat*.csv"):
        try:
            df_eda = pd.read_csv(eda_file_path)
            df_eda.columns = _clean_column_names(df_eda.columns)

            # Filter EDA data by the dynamic training date range
            if "Date" in df_eda.columns:
                df_eda["Date"] = pd.to_datetime(df_eda["Date"], errors='coerce')
                df_eda = df_eda[df_eda["Date"].between(train_start, train_end)]
            if df_eda.empty: continue

            # Identify strategy, VIX, and secondary columns
            strat_col = next((c for c in df_eda.columns if c in ["Structure_Name", "Strategy_Name"]), None)
            vix_cat_col = next((c for c in df_eda.columns if PRIMARY_VIX_CAT_ACTUAL in c), None)
            sec_cat_col = next((c for c in df_eda.columns if c.endswith("_Cat") and c != vix_cat_col), None)
            if not all([strat_col, vix_cat_col, sec_cat_col]): continue

            # Filter rules based on champion hyperparameters
            valid_rows = df_eda[df_eda["Trade_Count"] >= params.get("MIN_TRADES", 10)]
            best_strats = valid_rows.sort_values("Avg_PNL", ascending=False).groupby(
                [vix_cat_col, sec_cat_col], sort=False).head(1)

            for _, row in best_strats.iterrows():
                strategy = row[strat_col]
                if not _has_strategy_legs(strategy, pnl_data): continue

                deltas = [int(d) for d in re.findall(r'\d+', strategy.split("_")[1])]
                if deltas and max(deltas) > params.get("MAX_LEG_DELTA", 50): continue

                key = (str(row[vix_cat_col]).split("_")[0], sec_cat_col, str(row[sec_cat_col]).split("_")[0])

                if (row.get("Win_Rate", 0) < params.get("MIN_WIN_RATE", 0.55)) or \
                   (row.get("Avg_PNL", 0) < params.get("MIN_AVG_PNL", 0)):
                    sitout_keys.add(key)
                    continue

                lut_full[key] = strategy
                if key[0] not in lut_primary: lut_primary[key[0]] = strategy
        except Exception:
            continue
    return lut_full, lut_primary, sitout_keys

# --- The Final Signal Generation Function ---
def generate_final_signal(
    latest_features: pd.Series, model_config: dict, eda_dir: Path, pnl_data: dict
) -> str:
    """Takes the latest features and generates the final trading signal."""
    print("\n--- Generating Final Signal ---")
    params = model_config['hyperparameters']

    # 1. Check master sit-out filters
    if params.get("VOL_CAP") and latest_features.get('VIX_ClTm1', 0) > params["VOL_CAP"]:
        return f"Sit Out (VIX {latest_features['VIX_ClTm1']:.2f} > VOL_CAP {params['VOL_CAP']})"
    if params.get("GAP_CAP") and abs(latest_features.get('VIX_ONMPctT', 0)) * 100 > params["GAP_CAP"]:
         return f"Sit Out (VIX Gap {abs(latest_features['VIX_ONMPctT']*100):.1f}% > GAP_CAP {params['GAP_CAP']})"

    # 2. Build the dynamic lookup table
    today = latest_features.name
    lookback = model_config['system_settings']['lookback_period_months']
    train_end_date = today - timedelta(days=1)
    train_start_date = today - pd.DateOffset(months=lookback)

    print(f"Building strategy lookup using data from {train_start_date.date()} to {train_end_date.date()}...")
    lut_full, lut_primary, sitout_keys = build_strategy_lookup(
        params=params, eda_files_dir=eda_dir, pnl_data=pnl_data,
        train_range_dates=(str(train_start_date.date()), str(train_end_date.date()))
    )
    if not lut_full: return "Sit Out (No valid rules found in lookup table for period)"

    # 3. Find strategy for today's conditions
    vix_q = str(latest_features.get(PRIMARY_VIX_CAT_ACTUAL, "")).split("_")[0]
    all_cat_cols = [c for c in latest_features.index if c.endswith('_Cat') and c != PRIMARY_VIX_CAT_ACTUAL]

    for sec_col in all_cat_cols:
        sec_q = str(latest_features.get(sec_col, "")).split("_")[0]
        key = (vix_q, sec_col, sec_q)

        if key in sitout_keys: return f"Sit Out (Rule for {key} is an explicit sit-out)"

        strategy = lut_full.get(key)
        if strategy: return strategy

    # 4. Check fallback if allowed and no specific rule found
    if params.get("FALLBACK_OK", False):
        strategy = lut_primary.get(vix_q)
        if strategy: return strategy

    return "Sit Out (No rule matched)"

# --- 7. Main Execution Block (Finalized) ---

# --- 7. Main Execution Block (Finalized with Logging) ---

# Load available strategies from PNL backtest files
ALL_PNL_GLOBAL = load_pnl_placeholders(BT_DIR)

# Generate the signal using the latest features
final_signal = generate_final_signal(
    latest_features=latest_features,
    model_config=config,
    eda_dir=EDA_DIR,
    pnl_data=ALL_PNL_GLOBAL
)

# Use NYSE Market Calendar to find the next valid trading day
nyse = mcal.get_calendar('NYSE')
schedule = nyse.schedule(start_date=pd.Timestamp.today().strftime('%Y-%m-%d'),
                         end_date=(pd.Timestamp.today() + timedelta(days=10)).strftime('%Y-%m-%d'))
next_trading_day = schedule.index[0].strftime('%Y-%m-%d')

print("\n" + "="*50)
print(f" फैसला │ Signal for Next Trading Day ({next_trading_day})")
print("="*50)
print(f"  >> {final_signal} <<")
print("="*50)

# --- 8. Logging and Notification (ACTIVATED) ---
# This part is now active. It will create and write to the log file.

# a) Log the signal to a file
print("\n--- Logging Signal ---")
try:
    signal_log_path = LOG_DIR / 'signal_log.csv'
    log_entry = pd.DataFrame([{
        'log_timestamp_utc': pd.Timestamp.utcnow().strftime('%Y-%m-%d %H:%M:%S'),
        'signal_for_date': next_trading_day, # Use the correctly calculated date
        'signal': final_signal
    }])

    if not signal_log_path.exists():
        # If the file doesn't exist, create it with a header
        log_entry.to_csv(signal_log_path, index=False)
        print(f"✅ Created new signal log and saved entry to: {signal_log_path}")
    else:
        # If it exists, append without writing the header
        log_entry.to_csv(signal_log_path, mode='a', header=False, index=False)
        print(f"✅ Appended new signal to existing log: {signal_log_path}")

except Exception as e:
    print(f"❌ Failed to write to log file. Error: {e}")


# b) Send a notification (simulation)
# (In production, your code to send an email/Discord message would go here)
print("✅ Notification sent (simulation).")


Loading available strategy PNL placeholders...
✅ Found 36 unique PNL backtests available.

--- Generating Final Signal ---
Building strategy lookup using data from 2023-06-05 to 2025-06-04...

 फैसला │ Signal for Next Trading Day (2025-06-06)
  >> Sit Out (Rule for ('Q4', 'TS_RTm1_Cat', 'Q5') is an explicit sit-out) <<

--- Logging Signal ---
✅ Created new signal log and saved entry to: /content/drive/MyDrive/1dte_selector/live_logs/signal_log.csv
✅ Notification sent (simulation).


Papertrade logger

In [30]:
# Install the required library
!pip install py_vollib_vectorized --quiet

import pandas as pd
import yfinance as yf
from pathlib import Path
import re
from datetime import datetime, timedelta
import numpy as np
# CORRECTED IMPORT STATEMENT:
from py_vollib_vectorized import vectorized_delta

# --- 1. Setup ---
try:
    from google.colab import drive
    drive.mount('/content/drive')
    PROJECT_ROOT = Path('/content/drive/MyDrive/1dte_selector/')
except ImportError:
    PROJECT_ROOT = Path('.')

LOG_DIR = PROJECT_ROOT / 'live_logs'
SIGNAL_LOG_PATH = LOG_DIR / 'signal_log.csv'
TRADE_LEDGER_PATH = LOG_DIR / 'paper_trade_ledger.csv'


# --- 2. Helper Functions (Upgraded) ---

def parse_strategy_name(strategy_name: str) -> dict | None:
    """Parses a strategy string like 'IC_P20C10_10w' into its components."""
    if strategy_name.startswith("IC"):
        m = re.search(r"IC_P(\d+)C(\d+)_(\d+)w", strategy_name)
        if m:
            return { "type": "IC", "put_delta": int(m.group(1)), "call_delta": int(m.group(2)), "width": int(m.group(3)) }
    return None

def get_risk_free_rate() -> float:
    """Fetches the latest 3-Month Treasury Bill rate as a proxy for the risk-free rate."""
    try:
        rate = yf.Ticker("^IRX").history(period="5d")['Close'].iloc[-1] / 100
        return max(rate, 0.001)
    except Exception:
        return 0.05 # Fallback to a reasonable default

def find_strike_for_delta(chain, target_delta, underlying_price, risk_free_rate, flag):
    """
    Calculates the delta for each option in the chain and finds the strike
    with the delta closest to the target.
    """
    if chain.empty: raise ValueError("Options chain is empty.")

    target_delta_val = abs(target_delta) if flag == 'c' else -abs(target_delta)

    # Required inputs for the formula
    S = underlying_price
    K = chain['strike'].values
    t = 1.0 / 252.0  # Time to expiration: 1 trading day
    r = risk_free_rate
    sigma = chain['impliedVolatility'].values

    # CORRECTED FUNCTION CALL:
    # Use the specific vectorized_delta function
    calculated_deltas = vectorized_delta(flag, S, K, t, r, sigma)
    chain['calculated_delta'] = calculated_deltas

    # Find the row in the chain with the delta closest to our target
    closest_strike_row = chain.iloc[(chain['calculated_delta'] - target_delta_val).abs().argsort()[:1]]

    return closest_strike_row['strike'].values[0]

# --- 3. Main Evaluation Logic ---

def evaluate_and_log_trades():
    print("--- Running Trade Evaluator (with Black-Scholes Delta Approximation) ---")
    if not SIGNAL_LOG_PATH.exists(): print(f"Signal log not found at {SIGNAL_LOG_PATH}."); return

    signal_log_df = pd.read_csv(SIGNAL_LOG_PATH)
    if Path(TRADE_LEDGER_PATH).exists():
        ledger_df = pd.read_csv(TRADE_LEDGER_PATH); evaluated_dates = ledger_df['trade_date'].tolist()
    else:
        ledger_df = pd.DataFrame(); evaluated_dates = []

    trades_to_evaluate = signal_log_df[~signal_log_df['signal_for_date'].isin(evaluated_dates)]
    if trades_to_evaluate.empty: print("No new trades to evaluate."); return

    print(f"Found {len(trades_to_evaluate)} new signals to evaluate...")
    spx = yf.Ticker("^SPX")
    risk_free_rate = get_risk_free_rate()
    print(f"Using Risk-Free Rate: {risk_free_rate:.3%}")
    new_ledger_entries = []

    for _, row in trades_to_evaluate.iterrows():
        trade_date_str, signal = row['signal_for_date'], row['signal']
        trade_date = pd.to_datetime(trade_date_str)

        if trade_date >= pd.Timestamp.today().normalize(): continue
        if "Sit Out" in signal: continue

        print(f"\nEvaluating trade for {trade_date_str}: {signal}")

        try:
            hist_spx = spx.history(start=trade_date, end=trade_date + timedelta(days=1))
            open_price, close_price = hist_spx['Open'].iloc[0], hist_spx['Close'].iloc[0]

            opt_chain = spx.option_chain(trade_date_str)
            calls, puts = opt_chain.calls, opt_chain.puts

            pnl, details = 0.0, {}
            parsed_strat = parse_strategy_name(signal)

            if parsed_strat and parsed_strat['type'] == 'IC':
                put_delta_target, call_delta_target = parsed_strat['put_delta'] / 100.0, parsed_strat['call_delta'] / 100.0

                short_put_strike = find_strike_for_delta(puts, put_delta_target, open_price, risk_free_rate, 'p')
                long_put_strike = short_put_strike - parsed_strat['width']
                short_call_strike = find_strike_for_delta(calls, call_delta_target, open_price, risk_free_rate, 'c')
                long_call_strike = short_call_strike + parsed_strat['width']

                short_put_premium = puts[puts.strike == short_put_strike]['lastPrice'].iloc[0]
                long_put_premium = puts[puts.strike == long_put_strike]['lastPrice'].iloc[0]
                short_call_premium = calls[calls.strike == short_call_strike]['lastPrice'].iloc[0]
                long_call_premium = calls[calls.strike == long_call_strike]['lastPrice'].iloc[0]

                credit_received = (short_put_premium - long_put_premium) + (short_call_premium - long_call_premium)

                outcome = "Win (Max Profit)" if short_put_strike < close_price < short_call_strike else "Loss (Max Loss)"
                pnl = credit_received * 100 if outcome == "Win (Max Profit)" else (credit_received - parsed_strat['width']) * 100

                details = {"outcome": outcome, "credit": f"{credit_received:.2f}", "spx_close": f"{close_price:.2f}",
                           "short_put": short_put_strike, "short_call": short_call_strike}

            new_ledger_entries.append({"trade_date": trade_date_str, "signal": signal, "pnl": pnl, **details})
            print(f"  Calculated PNL: ${pnl:.2f}, Outcome: {outcome}")
        except Exception as e:
            print(f"  ❌ Could not evaluate trade for {trade_date_str}. Error: {e}")
            new_ledger_entries.append({"trade_date": trade_date_str, "signal": signal, "pnl": 0.0, "outcome": "Evaluation Error"})

    if new_ledger_entries:
        new_results_df = pd.DataFrame(new_ledger_entries)
        if ledger_df.empty:
            new_results_df.to_csv(TRADE_LEDGER_PATH, index=False)
        else:
            pd.concat([ledger_df, new_results_df]).to_csv(TRADE_LEDGER_PATH, index=False)
        print(f"\n✅ Wrote {len(new_ledger_entries)} new results to {TRADE_LEDGER_PATH}")

# --- 4. Main Execution ---
if __name__ == "__main__":
    evaluate_and_log_trades()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
--- Running Trade Evaluator (with Black-Scholes Delta Approximation) ---
Found 1 new signals to evaluate...
Using Risk-Free Rate: 4.232%


⚙️ 1DTE Live Signal & Evaluation System
📜 Overview
This system is a live prototype designed to generate a daily trading signal for a 1-day-to-expiration (1DTE) options strategy. It is built upon the champion model parameters (Trial #198) discovered through a comprehensive Optuna optimization process.

The architecture consists of two primary scripts that work together to form a complete operational loop.

🏛️ System Architecture
Script	🤖 Purpose	🗓️ When to Run
generate_signal.py	The Decision-Maker. Analyzes the market to decide on tomorrow's trade.	Daily, at ~3:20 PM ET on trading days.
evaluate_trades.py	The Accountant. Calculates the PNL of completed paper trades.	Daily or Weekly (not time-sensitive).

Export to Sheets
Script 1: generate_signal.py (The Decision-Maker)
This script's sole responsibility is to produce a high-quality, data-driven decision for the next trading day.

Loads Config: It begins by loading the champion model's hyperparameters from configs/champion_model_config.json. This keeps our logic separate from our parameters.
Fetches Live Data: It uses yfinance to create an intraday snapshot of the market as of ~3:15 PM ET, aligning perfectly with the backtest methodology.
Calculates Dynamic Features: It computes all necessary features. For categorical bins (e.g., VIX_ClTm1_Cat), it dynamically calculates the quantiles based on a rolling 2-year window of data, ensuring the model adapts to changing market regimes.
Builds Lookup Table: Using the last 24 months of historical data, it builds a fresh strategy rulebook based on the champion parameters.
Generates Signal: It looks up the live features in this new rulebook to produce the final signal (e.g., "IC_P25C25_10w" or "Sit Out").
Logs Signal: The decision is saved with a timestamp to live_logs/signal_log.csv.
Script 2: evaluate_trades.py (The Accountant)
This script automates the tedious process of performance tracking.

Reads Signal Log: It checks signal_log.csv for any trade signals that have recently concluded and have not yet been evaluated.
Fetches Option Data: It uses yfinance to pull the historical options chain for the specific day the trade occurred.
Approximates PNL: Using the Black-Scholes model (py_vollib_vectorized) and a live risk-free rate, it accurately finds the strikes corresponding to the target deltas and calculates the approximate PNL of the trade.
Updates Ledger: The final result is written to a permanent live_logs/paper_trade_ledger.csv for long-term performance analysis.
🔄 Daily Paper Trading Workflow
Your daily operational process is simple:

Run generate_signal.py at ~3:20 PM ET. Note the signal produced for the next trading day.
Run evaluate_trades.py at your convenience (e.g., the next day or at the end of the week) to update your performance ledger.
Periodically analyze paper_trade_ledger.csv in a separate notebook to review your live paper-trading performance against the backtest results.
🚀 How to Update the Champion Model (Periodic Retraining)
As you collect more data over time, you can re-run the Optuna optimization to see if a new set of champion hyperparameters can be found.

Frequency: A good cadence is every 3 to 6 months, or after you observe a significant, lasting change in market behavior.

Prerequisites
Before starting an update, ensure your source data is current:

PNL Data: Your backtest_data folder must be updated with your latest backtest CSVs.
EDA Files: Your summary EDA files must be regenerated from the new PNL data.
Market Features: Your main market feature CSV should be updated.
Running the Optuna Update
In your main analysis notebook (the one with the WORKFLOW_MODE switch):

Set Workflow:
Python

WORKFLOW_MODE = 'DEEP_DIVE'
Choose Objective: Select your best-performing scoring function.
Python

chosen_scoring_function = score_by_delta_sharpe
Resume Study: Modify the script to load your existing study so it can continue learning, rather than starting from scratch.
Python

study_name = "DeepDive_DeltaSharpe_LongRun" # Use your long-run study name
storage_path = f"sqlite:///{PROJECT_ROOT / study_name}.db"

print(f"Loading study '{study_name}' to continue optimization...")
optuna_study = optuna.load_study(study_name=study_name, storage=storage_path)
Set New Trial Target: Decide how many more trials to run.
Python

n_existing_trials = len(optuna_study.trials)
n_target_trials = 4000 # Example: Add 1000 trials to your previous 3000
n_new_trials = max(0, n_target_trials - n_existing_trials)

if n_new_trials > 0:
    optuna_study.optimize(objective_runner, n_trials=n_new_trials)
Analyze and Promote: After the run, use the run_sensitivity_analysis_on_top_trials function to compare any new top trials to your old champion. If a new winner emerges, update your configs/champion_model_config.json file.