# IV Strategy Analysis (Professional Screening and Allocation)

**Purpose**: Screen a liquid U.S. equity universe on fundamentals, then score option opportunity across covered calls, cash-secured puts, and long-dated calls, with formal reporting and sizing guidance.

**What this produces**

1. Fundamental quality score (profitability, growth, balance sheet, valuation).
2. Options opportunity score (ATM IV, term structure, income yield, liquidity).
3. Composite grade and ranked short list.
4. Strategy-specific tables (covered calls, cash-secured puts, LEAP calls, protective puts).
5. Portfolio sizing guidance using a conservative fractional Kelly framework.

**Modes**: Both modes are read from environment variables in the configuration cell. Set `QUICK_SCAN=1` for a lightweight IV-only scorecard (no strategy details or sizing). Set `QUICK_RUN=1` for a faster run with fewer tickers.

**Important**: This notebook is for research and education only. It is not investment advice.


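**Dependencies**: `yfinance`, `pandas`, `numpy`, `plotly`. Optional: `IPython` (rich notebook display) and `jinja2` (styled tables); plain-output fallbacks are used when they are missing.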
Install: `python -m pip install -r requirements.txt`


In [11]:
import os
import time
import warnings
import math
from datetime import datetime
from typing import Optional

import numpy as np
import pandas as pd
import yfinance as yf
from yfinance import EquityQuery

import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio

# Prefer IPython's rich display helpers; when IPython is not installed, fall
# back to minimal stand-ins so the same code runs as a plain script.
try:
    from IPython.display import Markdown, display
except ImportError:

    class Markdown(str):
        """Plain-string stand-in for ``IPython.display.Markdown``."""

    def display(x):
        """Print *x* to stdout in place of IPython's rich display."""
        print(x)


# Suppress every Python warning for the rest of the session.
# NOTE(review): this is a blanket filter, so deprecation notices from pandas /
# yfinance are hidden as well; consider narrowing it.
warnings.filterwarnings("ignore")

# Raise pandas' display limits (columns shown, line width, rows shown).
pd.set_option("display.max_columns", 80)
pd.set_option("display.width", 180)
pd.set_option("display.max_rows", 200)


## Configuration (Inputs and Assumptions)

Adjust these parameters to reflect your screening and strategy preferences.


In [None]:
# Universe selection
# USE_SCREEN is True only when the USE_SCREEN environment variable is "1"
# (which is also its default when unset).
USE_SCREEN = os.getenv("USE_SCREEN", "1") == "1"

# TICKER_OVERRIDE: comma-separated symbols from the environment, trimmed and
# upper-cased; blank entries are discarded.
TICKER_OVERRIDE = [
    symbol.upper()
    for symbol in map(str.strip, os.getenv("TICKER_OVERRIDE", "").split(","))
    if symbol
]

# Keyword arguments forwarded to screen_for_candidates(**SCREEN_PARAMS).
SCREEN_PARAMS = {
    "max_price": 300.0,
    "min_market_cap": 2_000_000_000,
    "min_roe": 0.12,
    "min_rev_growth": 0.05,
    "max_pe": 40.0,
    "max_ps": 10.0,
    "min_beta": 1.0,
    "min_inst_held": 0.40,
    "size": 60,
    "sort_by": "eodvolume",
}

# Strategy parameters
# Days-to-expiry (DTE) targets and bounds. Their consumers are outside this
# cell — presumably passed as target_dte / min_dte / max_dte to
# find_expiration(); verify against the strategy cells.
TARGET_DTES = [30, 50]
CC_TARGET_DTE = 50
CC_MAX_DTE = 60
LEAP_MIN_DTE = 365
LEAP_TARGET_MONEYNESS = 0.90
HEDGE_TARGET_DTE = 30
HEDGE_MAX_DTE = 50

# Out-of-the-money distances as fractions (0.03 == 3%); presumably relative to
# spot — confirm where the strategy tables are built.
OTM_LEVELS = [0.01, 0.03, 0.05, 0.07]
TARGET_OTM_FOR_SCORING = 0.03

# Term-structure defaults; both are replaced further down in this cell when
# QUICK_RUN is enabled.
MAX_TERM_DTE = 180
TERM_STRUCTURE_SAMPLE = 1

# plot_iv_smile() keeps strikes within spot * (1 +/- STRIKE_RANGE_PCT).
STRIKE_RANGE_PCT = 0.20
# NOTE(review): compute_bollinger_midline() has its own default window=20 and
# does not read BOLLINGER_WINDOW itself; callers must pass it explicitly.
BOLLINGER_WINDOW = 20
# yfinance history period used by compute_bollinger_midline().
HISTORY_PERIOD = "6mo"
# Default history period for compute_trend_metrics() and get_spy_history().
TREND_PERIOD = "1y"
# Rolling-mean windows (in bars) and the slope lookback on the short average,
# all read by compute_trend_metrics().
MA_SHORT = 50
MA_LONG = 200
MA_SLOPE_LOOKBACK = 20
# Trend filter switches; the filtering itself is applied outside this chunk.
TREND_FILTER = True
TREND_FILTER_MODE = "score"  # "strict" or "score"
MIN_TREND_SCORE = 0.34
# compute_trend_metrics() only produces hv_30 / hv_60, and only when 30 / 60
# appear in this list; other values are ignored there.
HV_WINDOWS = [30, 60]
# Benchmark symbol fetched by get_spy_history() for relative strength.
RS_TICKER = "SPY"
# Lookbacks, in bars, for the trailing-return and indicator calculations in
# compute_trend_metrics().
RET_1M_DAYS = 21
RET_3M_DAYS = 63
RET_6M_DAYS = 126
RSI_WINDOW = 14
ATR_WINDOW = 14
DRAWDOWN_WINDOW = 126
DIST_52W_WINDOW = 252

# Seconds slept (time.sleep) before each yfinance request in the fetch helpers.
RATE_LIMIT_SLEEP = 0.3
# Presumably scales RATE_LIMIT_SLEEP while sampling the term structure — not
# used in the visible code; TODO confirm.
TERM_STRUCTURE_SLEEP_MULTIPLIER = 0.5

# Presumably caps how many IV-smile figures are drawn — TODO confirm at the
# plotting cell.
MAX_IV_SMILES = 6

# Liquidity filters (options)
# Read by select_option_near_strike(): the flag turns the filter on, the three
# thresholds are the default min open interest, min volume and max
# (ask - bid) / mid spread for a contract to remain eligible.
LIQUIDITY_FILTER = True
MIN_OPEN_INTEREST = 50
MIN_VOLUME = 10
MAX_SPREAD_PCT = 0.35

# Underlying liquidity filter (optional)
# Presumably compared with the avg_volume_* fields from fetch_fundamentals();
# the comparison is outside this chunk — TODO confirm.
MIN_AVG_DAILY_VOLUME = 1_000_000

# Scoring weights
# FUNDAMENTAL_WEIGHT / OPTIONS_WEIGHT presumably blend the two sub-scores into
# the composite (the blend is outside this chunk).
FUNDAMENTAL_WEIGHT = 0.45
OPTIONS_WEIGHT = 0.55
# Default percentile (0-1) that score_from_series() assigns to a missing or
# non-numeric metric value.
MISSING_VALUE_FILL = 0.40

# Relative weight of each fundamental metric. Keys match numeric fields
# returned by fetch_fundamentals(). compute_weighted_score() divides by the
# sum of the weights of the metrics actually present, so these need not sum
# to 1 (they currently do).
FUND_METRIC_WEIGHTS = {
    "roe": 0.20,
    "rev_growth": 0.20,
    "profit_margin": 0.15,
    "operating_margin": 0.10,
    "current_ratio": 0.10,
    "debt_to_equity": 0.15,
    "pe": 0.05,
    "ps": 0.05,
}

# Preferred direction per fundamental metric: "higher" ranks larger values
# better; any other string is treated as lower-is-better by
# compute_weighted_score().
FUND_METRIC_BETTER = {
    "roe": "higher",
    "rev_growth": "higher",
    "profit_margin": "higher",
    "operating_margin": "higher",
    "current_ratio": "higher",
    "debt_to_equity": "lower",
    "pe": "lower",
    "ps": "lower",
}

# Relative weight of each options metric in the options opportunity score.
# compute_weighted_score() normalises by the weights of the metrics present.
OPTION_METRIC_WEIGHTS = {
    "atm_iv_cc": 0.22,
    "cc_ann_yield": 0.22,
    "csp_ann_yield": 0.18,
    "term_slope": 0.14,
    "avg_spread_pct": 0.14,
    "hv_iv_ratio": 0.10,
}

# Preferred direction per options metric. Every weighted metric is listed
# explicitly so no score depends on compute_weighted_score()'s silent
# "higher" fallback for missing keys.
OPTION_METRIC_BETTER = {
    "atm_iv_cc": "higher",
    "cc_ann_yield": "higher",
    "csp_ann_yield": "higher",
    "term_slope": "lower",
    "avg_spread_pct": "lower",
    # Previously absent, so it was scored through the implicit "higher"
    # default; kept as "higher" to preserve existing scores.
    # NOTE(review): if this is realized vol / implied vol, a premium-selling
    # screen would normally prefer "lower" (rich IV) — confirm the intent.
    "hv_iv_ratio": "higher",
}

# Default evaluation horizon (days) for evaluate_combo().
COMBO_HORIZON_DTE = HEDGE_TARGET_DTE
# Weights for the bear / base / bull scenarios produced by scenario_prices();
# evaluate_combo() divides by their sum when computing expected P&L.
COMBO_SCENARIO_WEIGHTS = {"bear": 0.25, "base": 0.50, "bull": 0.25}
# Weights and preferred directions for ranking combos; the keys are fields of
# the dict returned by evaluate_combo(). The ranking itself is outside this
# chunk.
COMBO_SCORE_WEIGHTS = {
    "expected_pnl_pct": 0.45,
    "bear_pnl_pct": 0.20,
    "bull_pnl_pct": 0.20,
    "hedge_cost_ann": 0.15,
}
COMBO_METRIC_BETTER = {
    "expected_pnl_pct": "higher",
    "bear_pnl_pct": "higher",
    "bull_pnl_pct": "higher",
    "hedge_cost_ann": "lower",
}

# Presumably the length of the ranked short list — TODO confirm; set to 15
# further down when QUICK_SCAN is enabled.
TOP_N = 12

# Portfolio sizing (Kelly-style guidance)
# Portfolio value, overridable through the PORTFOLIO_SIZE environment variable
# (units are whatever the caller supplies; presumably USD).
PORTFOLIO_SIZE = float(os.getenv("PORTFOLIO_SIZE", "250000"))
# Presumably the per-trade cap and Kelly scaling applied in the sizing cells —
# not used in the visible code; TODO confirm.
MAX_SINGLE_TRADE_PCT = 0.06
FRACTIONAL_KELLY = 0.25
# Upper bound on the fraction returned by calc_kelly_fraction().
KELLY_CAP = 0.12
# Default rate `r` for the Black-Scholes, scenario and probability helpers.
RISK_FREE_RATE = 0.04
# Default shrink factor in shrink_prob(): probabilities are pulled toward 0.5.
EDGE_HAIRCUT = 0.85
# Presumably the share of the portfolio allotted to covered calls — TODO confirm.
COVERED_CALL_ALLOC_PCT = 0.25

# Plotly rendering
# Renderer name later assigned to pio.renderers.default.
PLOTLY_RENDERER = os.getenv("PLOTLY_RENDERER", "notebook_connected")
# When the DISABLE_PLOTS environment variable is "1", show_figure() returns
# without rendering.
DISABLE_PLOTS = os.getenv("DISABLE_PLOTS", "0") == "1"

# Quick-run / quick-scan overrides
#
# QUICK_RUN (env QUICK_RUN=1): fewer tickers and a sparser, shorter term
# structure.
# QUICK_SCAN (env QUICK_SCAN=1): produces only the ranked scorecard and
# top-level visualisations - no strategy details, pricing, combo evaluation,
# or sizing guidance. It implies QUICK_RUN.
#
# Both flags are resolved BEFORE the dependent values are derived. Previously
# QUICK_SCAN flipped QUICK_RUN to True only after TERM_STRUCTURE_SAMPLE and
# MAX_TERM_DTE had been computed, so a quick scan still used the full term
# structure settings.
_QUICK_RUN_REQUESTED = os.getenv("QUICK_RUN", "0") == "1"
QUICK_SCAN = os.getenv("QUICK_SCAN", "0") == "1"
QUICK_RUN = _QUICK_RUN_REQUESTED or QUICK_SCAN

DEFAULT_MAX_TICKERS = None
DEFAULT_TERM_STRUCTURE_SAMPLE = TERM_STRUCTURE_SAMPLE
DEFAULT_MAX_TERM_DTE = MAX_TERM_DTE

# Ticker cap: 8 for an explicit quick run, 20 for a quick scan on its own,
# otherwise unlimited (same values as before).
if _QUICK_RUN_REQUESTED:
    MAX_TICKERS = 8
elif QUICK_SCAN:
    MAX_TICKERS = 20
else:
    MAX_TICKERS = DEFAULT_MAX_TICKERS

TERM_STRUCTURE_SAMPLE = 3 if QUICK_RUN else DEFAULT_TERM_STRUCTURE_SAMPLE
MAX_TERM_DTE = 90 if QUICK_RUN else DEFAULT_MAX_TERM_DTE

if QUICK_SCAN:
    TOP_N = 15

# Plot style (formal, APA-like)
# Both axes share one look: light grid, no zero line, dark mirrored frame.
_REPORT_AXIS_STYLE = {
    "showgrid": True,
    "gridcolor": "#E5E7EB",
    "zeroline": False,
    "linecolor": "#111827",
    "mirror": True,
}

_report_layout = go.Layout(
    font={"family": "Times New Roman", "size": 14, "color": "#111827"},
    title={"font": {"size": 20}},
    paper_bgcolor="white",
    plot_bgcolor="white",
    xaxis=dict(_REPORT_AXIS_STYLE),
    yaxis=dict(_REPORT_AXIS_STYLE),
    legend={
        "orientation": "h",
        "yanchor": "bottom",
        "y": 1.02,
        "xanchor": "right",
        "x": 1,
    },
    margin={"l": 60, "r": 30, "t": 70, "b": 50},
)
REPORT_TEMPLATE = go.layout.Template(layout=_report_layout)

# Register the template as "report" and make it, together with the configured
# renderer, the session-wide default.
pio.templates["report"] = REPORT_TEMPLATE
pio.templates.default = "report"
pio.renderers.default = PLOTLY_RENDERER

# Shared palettes for the figures.
COLOR_DISCRETE = [
    "#1F3A5F",
    "#4C6E91",
    "#8B9BB4",
    "#B0533C",
    "#7A3E3E",
    "#556B2F",
]
COLOR_CONTINUOUS = [
    "#f7fbff",
    "#c6dbef",
    "#6baed6",
    "#2171b5",
    "#08306b",
]

def _param_table(entries):
    """Build a two-column Parameter/Value frame from ``(name, value)`` pairs."""
    return pd.DataFrame(
        {
            "Parameter": [name for name, _ in entries],
            "Value": [value for _, value in entries],
        }
    )


# Echo the effective configuration as three tables so each run documents the
# assumptions it was produced with. List-valued settings are stringified.
screening_df = _param_table(
    [
        ("USE_SCREEN", USE_SCREEN),
        ("SCREEN_PARAMS", SCREEN_PARAMS),
        ("TARGET_DTES", str(TARGET_DTES)),
        ("CC_TARGET_DTE", CC_TARGET_DTE),
        ("OTM_LEVELS", str(OTM_LEVELS)),
        ("TARGET_OTM_FOR_SCORING", TARGET_OTM_FOR_SCORING),
        ("LEAP_MIN_DTE", LEAP_MIN_DTE),
        ("LEAP_TARGET_MONEYNESS", LEAP_TARGET_MONEYNESS),
        ("HEDGE_TARGET_DTE", HEDGE_TARGET_DTE),
        ("MAX_TERM_DTE", MAX_TERM_DTE),
        ("TERM_STRUCTURE_SAMPLE", TERM_STRUCTURE_SAMPLE),
        ("STRIKE_RANGE_PCT", STRIKE_RANGE_PCT),
        ("COMBO_HORIZON_DTE", COMBO_HORIZON_DTE),
        ("TREND_FILTER", TREND_FILTER),
        ("TREND_FILTER_MODE", TREND_FILTER_MODE),
        ("MIN_TREND_SCORE", MIN_TREND_SCORE),
        ("MA_SHORT", MA_SHORT),
        ("MA_LONG", MA_LONG),
        ("MA_SLOPE_LOOKBACK", MA_SLOPE_LOOKBACK),
        ("HV_WINDOWS", str(HV_WINDOWS)),
        ("RS_TICKER", RS_TICKER),
        ("RET_1M_DAYS", RET_1M_DAYS),
        ("RET_3M_DAYS", RET_3M_DAYS),
        ("RET_6M_DAYS", RET_6M_DAYS),
        ("RSI_WINDOW", RSI_WINDOW),
        ("ATR_WINDOW", ATR_WINDOW),
        ("DRAWDOWN_WINDOW", DRAWDOWN_WINDOW),
        ("DIST_52W_WINDOW", DIST_52W_WINDOW),
        ("LIQUIDITY_FILTER", LIQUIDITY_FILTER),
        ("MIN_OPEN_INTEREST", MIN_OPEN_INTEREST),
        ("MIN_VOLUME", MIN_VOLUME),
        ("MAX_SPREAD_PCT", MAX_SPREAD_PCT),
        ("MIN_AVG_DAILY_VOLUME", MIN_AVG_DAILY_VOLUME),
        ("MAX_TICKERS (QUICK_RUN)", MAX_TICKERS),
    ]
)

scoring_df = _param_table(
    [
        ("FUNDAMENTAL_WEIGHT", FUNDAMENTAL_WEIGHT),
        ("OPTIONS_WEIGHT", OPTIONS_WEIGHT),
        ("MISSING_VALUE_FILL", MISSING_VALUE_FILL),
        ("FUND_METRIC_WEIGHTS", FUND_METRIC_WEIGHTS),
        ("OPTION_METRIC_WEIGHTS", OPTION_METRIC_WEIGHTS),
        ("COMBO_SCORE_WEIGHTS", COMBO_SCORE_WEIGHTS),
    ]
)

sizing_df = _param_table(
    [
        ("PORTFOLIO_SIZE", PORTFOLIO_SIZE),
        ("MAX_SINGLE_TRADE_PCT", MAX_SINGLE_TRADE_PCT),
        ("FRACTIONAL_KELLY", FRACTIONAL_KELLY),
        ("KELLY_CAP", KELLY_CAP),
        ("RISK_FREE_RATE", RISK_FREE_RATE),
        ("EDGE_HAIRCUT", EDGE_HAIRCUT),
        ("COVERED_CALL_ALLOC_PCT", COVERED_CALL_ALLOC_PCT),
    ]
)

for _table in (screening_df, scoring_df, sizing_df):
    display(_table)


## Core Functions (Data, Scoring, Probability, Sizing)


In [13]:
def safe_float(value):
    """Coerce *value* to ``float``.

    Returns NaN when *value* is ``None`` or when ``float(value)`` raises.
    """
    if value is None:
        return np.nan
    try:
        converted = float(value)
    except Exception:
        converted = np.nan
    return converted


# Optional-dependency probe: display_table() only uses the pandas Styler
# (df.style) when jinja2 imported cleanly, and shows plain frames otherwise.
HAS_JINJA = False
try:
    import jinja2
except Exception:
    # Any import-time failure is treated the same as "not installed".
    pass
else:
    HAS_JINJA = True


def display_table(df, caption=None, format_dict=None):
    """Display *df*, styled when jinja2 is available.

    Args:
        df: Frame to show.
        caption: Optional title. Set as the Styler caption, or shown as a bold
            Markdown line above the frame in the plain fallback.
        format_dict: Optional mapping passed to ``Styler.format``; ignored in
            the plain fallback.
    """
    if not HAS_JINJA:
        # Plain fallback: caption as Markdown, then the raw frame.
        if caption:
            display(Markdown(f"**{caption}**"))
        display(df)
        return

    styled = df.style
    if format_dict:
        styled = styled.format(format_dict)
    if caption:
        styled = styled.set_caption(caption)
    display(styled)


def screen_for_candidates(
    max_price: float = 300.0,
    min_market_cap: float = 2_000_000_000,
    min_roe: float = 0.12,
    min_rev_growth: float = 0.05,
    max_pe: float = 40.0,
    max_ps: float = 10.0,
    min_beta: float = 1.0,
    min_inst_held: float = 0.40,
    size: int = 50,
    sort_by: str = "eodvolume",
) -> list[str]:
    """Run a Yahoo Finance equity screen and return the matching symbols.

    All criteria are AND-ed. Besides the parameters, the screen hard-codes:
    region "us", exchanges NMS/NYQ, a price floor of 10, a market-cap ceiling
    of 4e12, a P/E floor of 0, and a fixed list of seven sectors. ``min_roe``
    is applied to both return on total capital and return on equity.

    Args:
        size: Number of results requested from ``yf.screen``.
        sort_by: Field used for descending sort.

    Returns:
        Symbols in response order; entries without a symbol are skipped. An
        empty or unrecognised response yields an empty list.
    """
    sectors = [
        "Communication Services",
        "Consumer Cyclical",
        "Consumer Defensive",
        "Financial Services",
        "Healthcare",
        "Industrials",
        "Technology",
    ]
    criteria = [
        ("eq", ["region", "us"]),
        ("is-in", ["exchange", "NMS", "NYQ"]),
        ("btwn", ["intradaymarketcap", min_market_cap, 4_000_000_000_000]),
        ("btwn", ["intradayprice", 10, max_price]),
        ("btwn", ["peratio.lasttwelvemonths", 0, max_pe]),
        ("lt", ["lastclosemarketcaptotalrevenue.lasttwelvemonths", max_ps]),
        ("gte", ["returnontotalcapital.lasttwelvemonths", min_roe]),
        ("gte", ["returnonequity.lasttwelvemonths", min_roe]),
        ("gte", ["totalrevenues1yrgrowth.lasttwelvemonths", min_rev_growth]),
        ("gte", ["pctheldinst", min_inst_held]),
        ("gte", ["beta", min_beta]),
        ("is-in", ["sector", *sectors]),
    ]
    query = EquityQuery(
        "and", [EquityQuery(op, operands) for op, operands in criteria]
    )

    resp = yf.screen(query, size=size, sortField=sort_by, sortAsc=False)

    # The payload has been seen in two shapes: quotes at the top level, or
    # nested under finance.result[0].
    if not resp:
        quotes = []
    elif "quotes" in resp:
        quotes = resp.get("quotes", [])
    elif "finance" in resp:
        result = resp.get("finance", {}).get("result", [])
        quotes = result[0].get("quotes", []) if result else []
    else:
        quotes = []

    return [row.get("symbol") for row in quotes if row.get("symbol")]


def get_spot(ticker: str) -> Optional[float]:
    """Return the latest price for *ticker*, or ``None`` if none is found.

    Sources are tried in order, sleeping ``RATE_LIMIT_SLEEP`` seconds before
    each: the last close of a one-day history, ``fast_info`` (lastPrice, then
    regularMarketPrice), and ``info`` (regularMarketPrice, then currentPrice).
    Any exception along the way results in ``None``.
    """
    try:
        handle = yf.Ticker(ticker)

        time.sleep(RATE_LIMIT_SLEEP)
        bars = handle.history(period="1d")
        if not bars.empty and "Close" in bars.columns:
            return float(bars["Close"].iloc[-1])

        time.sleep(RATE_LIMIT_SLEEP)
        if hasattr(handle, "fast_info") and handle.fast_info:
            quick = handle.fast_info
            price = quick.get("lastPrice") or quick.get("regularMarketPrice")
            if price:
                return float(price)

        time.sleep(RATE_LIMIT_SLEEP)
        info = handle.info
        if info:
            price = info.get("regularMarketPrice") or info.get("currentPrice")
            if price:
                return float(price)
    except Exception:
        return None
    return None


def fetch_fundamentals(ticker: str) -> dict:
    """Collect fundamental fields for *ticker* from yfinance.

    Reads ``Ticker.info`` and ``Ticker.fast_info``; if either access raises,
    an empty mapping is used for it. Numeric fields go through
    ``safe_float`` (missing values become NaN); ``sector`` and ``industry``
    are passed through unchanged.

    Returns:
        Dict with keys ticker, market_cap, beta, pe, ps, roe, rev_growth,
        profit_margin, operating_margin, debt_to_equity, current_ratio,
        avg_volume_3m, avg_volume_10d, inst_held_pct, sector, industry.
    """
    handle = yf.Ticker(ticker)
    time.sleep(RATE_LIMIT_SLEEP)

    try:
        info = handle.info or {}
    except Exception:
        info = {}

    try:
        fast = handle.fast_info or {}
    except Exception:
        fast = {}

    def number(key, fallback_key=None):
        # info[key]; if that is falsy and a fallback is named, info[fallback].
        raw = info.get(key)
        if fallback_key is not None:
            raw = raw or info.get(fallback_key)
        return safe_float(raw)

    return {
        "ticker": ticker,
        "market_cap": safe_float(info.get("marketCap") or fast.get("marketCap")),
        "beta": number("beta"),
        "pe": number("trailingPE", "forwardPE"),
        "ps": number("priceToSalesTrailing12Months"),
        "roe": number("returnOnEquity"),
        "rev_growth": number("revenueGrowth"),
        "profit_margin": number("profitMargins"),
        "operating_margin": number("operatingMargins"),
        "debt_to_equity": number("debtToEquity"),
        "current_ratio": number("currentRatio"),
        "avg_volume_3m": number("averageVolume", "averageDailyVolume3Month"),
        "avg_volume_10d": number("averageDailyVolume10Day"),
        "inst_held_pct": number("heldPercentInstitutions"),
        "sector": info.get("sector"),
        "industry": info.get("industry"),
    }


def get_expirations(ticker: str) -> list[tuple[str, int]]:
    """List future option expirations for *ticker* as ``(date_str, dte)``.

    ``dte`` is the number of calendar days from today's local date. Dates
    that are not ``YYYY-MM-DD``, and expirations with ``dte <= 0``, are
    skipped. The result is sorted by ``dte``; any failure yields ``[]``.
    """
    try:
        handle = yf.Ticker(ticker)
        time.sleep(RATE_LIMIT_SLEEP)
        listed = handle.options
        if not listed:
            return []

        today = datetime.now().date()
        upcoming = []
        for label in listed:
            try:
                expiry = datetime.strptime(label, "%Y-%m-%d").date()
            except ValueError:
                continue
            days_left = (expiry - today).days
            if days_left > 0:
                upcoming.append((label, days_left))

        upcoming.sort(key=lambda pair: pair[1])
        return upcoming
    except Exception:
        return []


def find_expiration(
    expirations: list[tuple[str, int]],
    target_dte: Optional[int] = None,
    min_dte: Optional[int] = None,
    max_dte: Optional[int] = None,
) -> Optional[tuple[str, int]]:
    """Pick one ``(date_str, dte)`` entry from *expirations*.

    Entries outside the inclusive ``[min_dte, max_dte]`` bounds (each bound
    optional) are discarded. Without ``target_dte`` the first surviving entry
    is returned; otherwise the entry whose dte is closest to the target,
    with ties going to the earlier position in the list.

    Returns:
        The chosen tuple, or ``None`` if nothing qualifies.
    """
    if not expirations:
        return None

    def within_bounds(entry):
        dte = entry[1]
        if min_dte is not None and not dte >= min_dte:
            return False
        if max_dte is not None and not dte <= max_dte:
            return False
        return True

    eligible = [entry for entry in expirations if within_bounds(entry)]
    if not eligible:
        return None
    if target_dte is None:
        return eligible[0]
    return min(eligible, key=lambda entry: abs(entry[1] - target_dte))


def add_chain_columns(
    df: pd.DataFrame, ticker: str, exp_date: str, spot: float
) -> pd.DataFrame:
    """Return a copy of an option-chain frame with derived columns added.

    Added columns: ticker, expiration, dte (calendar days from today), spot,
    mid ((bid + ask) / 2, replaced by ``lastPrice`` where that is <= 0),
    moneyness (strike / spot), spread (ask - bid) and spread_pct
    (spread / mid, NaN where mid is not positive). An empty frame is returned
    unchanged.

    Args:
        df: Chain with at least ``bid``, ``ask`` and ``strike`` columns.
        exp_date: Expiration as ``YYYY-MM-DD``.
    """
    if df.empty:
        return df

    out = df.copy()
    expiry = datetime.strptime(exp_date, "%Y-%m-%d").date()
    days_left = (expiry - datetime.now().date()).days

    out["ticker"] = ticker
    out["expiration"] = exp_date
    out["dte"] = days_left
    out["spot"] = spot

    out["mid"] = (out["bid"] + out["ask"]) / 2
    # No usable two-sided quote: fall back to the last traded price.
    no_quote = out["mid"] <= 0
    out.loc[no_quote, "mid"] = out.get("lastPrice")

    out["moneyness"] = out["strike"] / spot
    out["spread"] = out["ask"] - out["bid"]
    out["spread_pct"] = np.where(out["mid"] > 0, out["spread"] / out["mid"], np.nan)
    return out


def fetch_chain(
    ticker: str, exp_date_str: str, spot: float
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Fetch one expiration's option chain and enrich both sides.

    Returns:
        ``(calls, puts)``, each passed through ``add_chain_columns``. On any
        exception both are empty frames.
    """
    try:
        handle = yf.Ticker(ticker)
        time.sleep(RATE_LIMIT_SLEEP)
        chain = handle.option_chain(exp_date_str)
        enriched_calls = add_chain_columns(chain.calls, ticker, exp_date_str, spot)
        enriched_puts = add_chain_columns(chain.puts, ticker, exp_date_str, spot)
    except Exception:
        return pd.DataFrame(), pd.DataFrame()
    return enriched_calls, enriched_puts


def compute_atm_iv(
    calls: pd.DataFrame, puts: pd.DataFrame, spot: float
) -> Optional[float]:
    """Average the implied volatility of the nearest-to-spot call and put.

    For each side, rows with missing or non-positive ``impliedVolatility``
    are ignored and the remaining strike closest to *spot* supplies that
    side's reading. A side that is empty, lacks the column, or has no valid
    rows contributes nothing.

    Returns:
        Mean of the available readings, or ``None`` if there are none.
    """
    readings = []
    for side in (calls, puts):
        if side.empty or "impliedVolatility" not in side.columns:
            continue
        iv = side["impliedVolatility"]
        usable = side[iv.notna() & (iv > 0)]
        if usable.empty:
            continue
        nearest = (usable["strike"] - spot).abs().idxmin()
        readings.append(usable.loc[nearest, "impliedVolatility"])

    return float(np.mean(readings)) if readings else None


def compute_bollinger_midline(ticker: str, window: int = 20) -> Optional[float]:
    """Return the latest *window*-bar simple moving average of the close.

    History is fetched for ``HISTORY_PERIOD``. Returns ``None`` when the
    history is empty, has no ``Close`` column, holds fewer than *window*
    non-NaN closes, or when anything raises.
    """
    try:
        handle = yf.Ticker(ticker)
        time.sleep(RATE_LIMIT_SLEEP)
        bars = handle.history(period=HISTORY_PERIOD)
        if bars.empty or "Close" not in bars.columns:
            return None

        closes = bars["Close"].dropna()
        if len(closes) < window:
            return None

        midline = closes.rolling(window).mean()
        return float(midline.iloc[-1])
    except Exception:
        return None


# Benchmark price history keyed by yfinance period string, so the benchmark is
# downloaded once per period rather than once per ticker.
SPY_CACHE = {}


def get_spy_history(period: str = TREND_PERIOD) -> pd.DataFrame:
    """Return price history for the benchmark ``RS_TICKER``, cached per period.

    Only non-empty results are cached. Previously an empty frame (for example
    from a throttled request) was stored as well, which disabled
    relative-strength figures for every later ticker in the run; now the next
    call simply tries again.

    Args:
        period: yfinance history period, e.g. ``"1y"``.

    Returns:
        The history frame, or an empty frame if the request fails or returns
        nothing.
    """
    cached = SPY_CACHE.get(period)
    if cached is not None:
        return cached

    try:
        benchmark = yf.Ticker(RS_TICKER)
        time.sleep(RATE_LIMIT_SLEEP)
        hist = benchmark.history(period=period)
    except Exception:
        return pd.DataFrame()

    if hist is None:
        return pd.DataFrame()
    if not hist.empty:
        SPY_CACHE[period] = hist
    return hist


def compute_realized_vol(returns: pd.Series, window: int) -> Optional[float]:
    """Annualised standard deviation of the last *window* returns.

    The sample standard deviation is scaled by ``sqrt(252)``.

    Returns:
        The volatility, or ``None`` when *returns* is ``None`` or holds fewer
        than *window* observations.
    """
    if returns is None:
        return None
    if len(returns) < window:
        return None
    recent = returns.iloc[-window:]
    annualiser = math.sqrt(252)
    return float(recent.std() * annualiser)


def _last_valid_float(series: pd.Series) -> Optional[float]:
    """Return the last element of *series* as a float, or None when it is NaN."""
    tail = series.iloc[-1]
    return None if pd.isna(tail) else float(tail)


def _trailing_return(closes: pd.Series, bars: int) -> Optional[float]:
    """Return the simple return across the last *bars* bars, or None if undefined."""
    if len(closes) <= bars:
        return None
    # A return over N bars needs the close N bars back, i.e. element -(N + 1).
    base = closes.iloc[-(bars + 1)]
    if pd.isna(base) or base == 0:
        return None
    return float(closes.iloc[-1] / base - 1)


def _simple_rsi(closes: pd.Series, window: int) -> Optional[float]:
    """Return the rolling-mean RSI of *closes* at the last bar, or None if undefined."""
    delta = closes.diff()
    avg_gain = delta.clip(lower=0).rolling(window).mean().iloc[-1]
    avg_loss = (-delta.clip(upper=0)).rolling(window).mean().iloc[-1]
    if pd.isna(avg_gain) or pd.isna(avg_loss):
        return None
    if avg_loss == 0:
        # No down moves in the window: RS is unbounded and RSI saturates at
        # 100. A completely flat window has no defined RSI.
        return 100.0 if avg_gain > 0 else None
    return float(100 - (100 / (1 + avg_gain / avg_loss)))


def compute_trend_metrics(ticker: str, period: str = TREND_PERIOD) -> dict:
    """Compute trend, momentum and volatility metrics for *ticker*.

    Downloads *period* of daily history and derives:

    * ``ma_short`` / ``ma_long`` rolling means (``MA_SHORT`` / ``MA_LONG``),
      ``ma_slope`` (relative change of the short average over
      ``MA_SLOPE_LOOKBACK`` bars) and three boolean flags.
    * ``trend_score``: the mean of the three flags, where a flag that cannot
      be computed counts as 0; ``trend_label`` is "Up" at >= 0.67, "Down" at
      <= 0.33 and "Side" otherwise.
    * ``ret_1m/3m/6m`` and ``rs_*`` (the return minus the benchmark's return
      over the same number of bars, from ``get_spy_history``).
    * ``rsi_14`` (window ``RSI_WINDOW``), ``atr`` / ``atr_pct`` (window
      ``ATR_WINDOW``), ``drawdown_6m``, ``dist_52w_high`` / ``dist_52w_low``.
    * ``hv_30`` / ``hv_60`` annualised realised vol of log returns, each only
      when its window is listed in ``HV_WINDOWS``.

    Changes from the previous version: trailing returns now span exactly N
    bars (``iloc[-(N + 1)]``; ``iloc[-N]`` spanned N - 1), and RSI is 100
    instead of missing when the window contains no losses.

    Returns:
        Dict with the keys above. Every value is ``None`` when history is
        unavailable or an exception occurs; individual values are ``None``
        when there is too little data for them.
    """
    default = dict.fromkeys(
        (
            "trend_score",
            "trend_label",
            "ma_short",
            "ma_long",
            "ma_slope",
            "price_above_ma_long",
            "ma_short_above_ma_long",
            "ma_slope_positive",
            "ret_1m",
            "ret_3m",
            "ret_6m",
            "rs_1m",
            "rs_3m",
            "rs_6m",
            "rsi_14",
            "atr",
            "atr_pct",
            "drawdown_6m",
            "dist_52w_high",
            "dist_52w_low",
            "hv_30",
            "hv_60",
        )
    )
    try:
        t = yf.Ticker(ticker)
        time.sleep(RATE_LIMIT_SLEEP)
        hist = t.history(period=period)
        if hist.empty or "Close" not in hist.columns:
            return default

        closes = hist["Close"].dropna()
        if closes.empty:
            return default

        price = float(closes.iloc[-1])

        # Moving averages and trend signals
        ma_short_series = closes.rolling(MA_SHORT).mean()
        ma_long_series = closes.rolling(MA_LONG).mean()
        ma_short = _last_valid_float(ma_short_series)
        ma_long = _last_valid_float(ma_long_series)

        ma_slope = None
        ma_short_valid = ma_short_series.dropna()
        if len(ma_short_valid) > MA_SLOPE_LOOKBACK:
            prev = ma_short_valid.iloc[-(MA_SLOPE_LOOKBACK + 1)]
            if prev and not pd.isna(prev):
                ma_slope = float((ma_short_valid.iloc[-1] - prev) / prev)

        price_above_ma_long = None if ma_long is None else price > ma_long
        ma_short_above_ma_long = (
            None if (ma_short is None or ma_long is None) else ma_short > ma_long
        )
        ma_slope_positive = None if ma_slope is None else ma_slope > 0

        # An unknown flag (None) counts as 0, so short histories score low.
        flags = [
            1 if price_above_ma_long else 0,
            1 if ma_short_above_ma_long else 0,
            1 if ma_slope_positive else 0,
        ]
        trend_score = float(np.mean(flags))

        trend_label = "Side"
        if trend_score >= 0.67:
            trend_label = "Up"
        elif trend_score <= 0.33:
            trend_label = "Down"

        # Returns
        ret_1m = _trailing_return(closes, RET_1M_DAYS)
        ret_3m = _trailing_return(closes, RET_3M_DAYS)
        ret_6m = _trailing_return(closes, RET_6M_DAYS)

        # Relative strength vs the benchmark (RS_TICKER)
        rs_1m = rs_3m = rs_6m = None
        spy_hist = get_spy_history(period)
        if spy_hist is not None and not spy_hist.empty and "Close" in spy_hist.columns:
            spy_closes = spy_hist["Close"].dropna()
            spy_ret_1m = _trailing_return(spy_closes, RET_1M_DAYS)
            spy_ret_3m = _trailing_return(spy_closes, RET_3M_DAYS)
            spy_ret_6m = _trailing_return(spy_closes, RET_6M_DAYS)
            if ret_1m is not None and spy_ret_1m is not None:
                rs_1m = ret_1m - spy_ret_1m
            if ret_3m is not None and spy_ret_3m is not None:
                rs_3m = ret_3m - spy_ret_3m
            if ret_6m is not None and spy_ret_6m is not None:
                rs_6m = ret_6m - spy_ret_6m

        # RSI
        rsi_14 = _simple_rsi(closes, RSI_WINDOW)

        # ATR
        atr = atr_pct = None
        if {"High", "Low", "Close"}.issubset(hist.columns):
            highs = hist["High"].dropna()
            lows = hist["Low"].dropna()
            prev_close = hist["Close"].dropna().shift(1)
            # True range: the largest of the bar's range and its gaps from
            # the previous close.
            tr = pd.concat(
                [highs - lows, (highs - prev_close).abs(), (lows - prev_close).abs()],
                axis=1,
            ).max(axis=1)
            atr_series = tr.rolling(ATR_WINDOW).mean()
            if not atr_series.empty and not pd.isna(atr_series.iloc[-1]):
                atr = float(atr_series.iloc[-1])
                if price > 0:
                    atr_pct = atr / price

        # Drawdown: worst decline from a running peak within the window.
        drawdown_6m = None
        if len(closes) >= 2:
            window = min(DRAWDOWN_WINDOW, len(closes))
            recent = closes.iloc[-window:]
            dd = (recent / recent.cummax()) - 1
            drawdown_6m = float(dd.min()) if not dd.empty else None

        # 52-week distance
        dist_52w_high = dist_52w_low = None
        if {"High", "Low"}.issubset(hist.columns):
            highs = hist["High"].dropna()
            lows = hist["Low"].dropna()
            if len(highs) >= 1:
                high_window = highs.iloc[-min(DIST_52W_WINDOW, len(highs)) :]
                low_window = lows.iloc[-min(DIST_52W_WINDOW, len(lows)) :]
                high_52 = float(high_window.max())
                low_52 = float(low_window.min())
                if high_52:
                    dist_52w_high = price / high_52 - 1
                if low_52:
                    dist_52w_low = price / low_52 - 1

        # Historical (realised) volatility from log returns
        returns = np.log(closes / closes.shift(1)).dropna()
        hv_30 = compute_realized_vol(returns, 30) if 30 in HV_WINDOWS else None
        hv_60 = compute_realized_vol(returns, 60) if 60 in HV_WINDOWS else None

        return {
            "trend_score": trend_score,
            "trend_label": trend_label,
            "ma_short": ma_short,
            "ma_long": ma_long,
            "ma_slope": ma_slope,
            "price_above_ma_long": price_above_ma_long,
            "ma_short_above_ma_long": ma_short_above_ma_long,
            "ma_slope_positive": ma_slope_positive,
            "ret_1m": ret_1m,
            "ret_3m": ret_3m,
            "ret_6m": ret_6m,
            "rs_1m": rs_1m,
            "rs_3m": rs_3m,
            "rs_6m": rs_6m,
            "rsi_14": rsi_14,
            "atr": atr,
            "atr_pct": atr_pct,
            "drawdown_6m": drawdown_6m,
            "dist_52w_high": dist_52w_high,
            "dist_52w_low": dist_52w_low,
            "hv_30": hv_30,
            "hv_60": hv_60,
        }
    except Exception:
        return default


def select_option_near_strike(
    df: pd.DataFrame,
    target_strike: float,
    direction: Optional[str] = None,
    min_oi: int = MIN_OPEN_INTEREST,
    min_volume: int = MIN_VOLUME,
    max_spread_pct: float = MAX_SPREAD_PCT,
) -> Optional[pd.Series]:
    """Pick the eligible contract whose strike is closest to *target_strike*.

    A row is eligible when its ``mid`` is present and positive, it satisfies
    *direction* ("below": strike <= target; "above": strike >= target; any
    other value: no constraint) and, if ``LIQUIDITY_FILTER`` is on, it meets
    the open-interest, volume and spread thresholds. Each liquidity test is
    applied only if its column exists; missing open interest or volume counts
    as 0, while a missing ``spread_pct`` passes.

    Returns:
        The selected row, or ``None`` if *df* is empty or nothing is eligible.
    """
    if df.empty:
        return None

    keep = df["mid"].notna() & (df["mid"] > 0)

    if direction == "below":
        keep &= df["strike"] <= target_strike
    elif direction == "above":
        keep &= df["strike"] >= target_strike

    if LIQUIDITY_FILTER:
        columns = df.columns
        if "openInterest" in columns:
            keep &= df["openInterest"].fillna(0) >= min_oi
        if "volume" in columns:
            keep &= df["volume"].fillna(0) >= min_volume
        if "spread_pct" in columns:
            keep &= df["spread_pct"].isna() | (df["spread_pct"] <= max_spread_pct)

    eligible = df[keep]
    if eligible.empty:
        return None

    closest = (eligible["strike"] - target_strike).abs().idxmin()
    return eligible.loc[closest]


def norm_cdf(x: float) -> float:
    """Standard normal cumulative distribution function, via ``math.erf``."""
    scaled = x / math.sqrt(2)
    return (1 + math.erf(scaled)) * 0.5


def bs_call_price(
    spot: float, strike: float, t: float, iv: float, r: float = RISK_FREE_RATE
) -> float:
    """Black-Scholes price of a European call (no dividends).

    Args:
        t: Time to expiry in years.
        iv: Annualised volatility as a decimal.
        r: Continuously compounded risk-free rate.

    Returns:
        The model price, or intrinsic value ``max(spot - strike, 0)`` when
        ``t <= 0``, *iv* is missing or non-positive, or *spot* / *strike* is
        non-positive.
    """
    degenerate = (
        t <= 0 or iv is None or pd.isna(iv) or iv <= 0 or spot <= 0 or strike <= 0
    )
    if degenerate:
        return max(spot - strike, 0.0)

    vol_sqrt_t = iv * math.sqrt(t)
    d1 = (math.log(spot / strike) + (r + 0.5 * iv**2) * t) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t
    discounted_strike = strike * math.exp(-r * t)
    return spot * norm_cdf(d1) - discounted_strike * norm_cdf(d2)


def bs_put_price(
    spot: float, strike: float, t: float, iv: float, r: float = RISK_FREE_RATE
) -> float:
    """Black-Scholes price of a European put (no dividends).

    Args:
        t: Time to expiry in years.
        iv: Annualised volatility as a decimal.
        r: Continuously compounded risk-free rate.

    Returns:
        The model price, or intrinsic value ``max(strike - spot, 0)`` when
        ``t <= 0``, *iv* is missing or non-positive, or *spot* / *strike* is
        non-positive.
    """
    degenerate = (
        t <= 0 or iv is None or pd.isna(iv) or iv <= 0 or spot <= 0 or strike <= 0
    )
    if degenerate:
        return max(strike - spot, 0.0)

    vol_sqrt_t = iv * math.sqrt(t)
    d1 = (math.log(spot / strike) + (r + 0.5 * iv**2) * t) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t
    discounted_strike = strike * math.exp(-r * t)
    return discounted_strike * norm_cdf(-d2) - spot * norm_cdf(-d1)


def scenario_prices(
    spot: float, iv: float, horizon_dte: int, r: float = RISK_FREE_RATE
):
    """Bear / base / bull underlying prices at the horizon.

    With ``t = horizon_dte / 365``, drift ``(r - iv**2 / 2) * t`` and
    ``sigma = iv * sqrt(t)``, the three prices are ``spot * exp(drift - sigma)``,
    ``spot * exp(drift)`` and ``spot * exp(drift + sigma)``.

    Returns:
        Dict with keys "bear", "base", "bull", or ``None`` when *spot*, *iv*
        or *horizon_dte* is non-positive or *iv* is missing.
    """
    if spot <= 0 or iv is None or pd.isna(iv) or iv <= 0 or horizon_dte <= 0:
        return None

    t = horizon_dte / 365.0
    mu = (r - 0.5 * iv**2) * t
    sigma = iv * math.sqrt(t)

    shocks = (("bear", mu - sigma), ("base", mu), ("bull", mu + sigma))
    return {label: spot * math.exp(log_move) for label, log_move in shocks}


def evaluate_combo(
    spot: float,
    leap_strike: float,
    leap_mid: float,
    leap_iv: float,
    leap_dte: int,
    hedge_strike: float,
    hedge_mid: float,
    hedge_iv: float,
    hedge_dte: int,
    horizon_dte: int = COMBO_HORIZON_DTE,
):
    """Scenario P&L for a long call plus a long put bought together.

    The horizon is *horizon_dte* clamped to at least 1 day and at most the
    shorter option's dte. Scenario prices come from ``scenario_prices`` using
    the put's IV (the call's IV if the put's is unusable). Each leg is then
    repriced with Black-Scholes at its remaining time using its own IV
    (falling back to the scenario IV), and P&L is the combined value minus
    the net debit ``leap_mid + hedge_mid``.

    Returns:
        Dict holding the horizon, scenario IV, net debit (absolute and as a
        fraction of spot), breakeven_pct, scenario prices, per-scenario and
        expected P&L (absolute and as a fraction of net debit; the
        expectation uses ``COMBO_SCENARIO_WEIGHTS`` normalised by their sum)
        and ``hedge_cost_ann``. ``None`` when inputs are non-positive or no
        usable IV exists.
    """
    if spot <= 0 or leap_mid <= 0 or hedge_mid <= 0:
        return None
    if leap_dte <= 0 or hedge_dte <= 0:
        return None

    def usable(vol):
        return vol is not None and not pd.isna(vol) and vol > 0

    horizon = max(1, min(horizon_dte, leap_dte, hedge_dte))

    scenario_iv = hedge_iv if usable(hedge_iv) else leap_iv
    if not usable(scenario_iv):
        return None

    prices = scenario_prices(spot, scenario_iv, horizon, r=RISK_FREE_RATE)
    if prices is None:
        return None

    call_iv = leap_iv if usable(leap_iv) else scenario_iv
    put_iv = hedge_iv if usable(hedge_iv) else scenario_iv

    # Time left on each leg once the horizon has elapsed, in years.
    t_call = max((leap_dte - horizon) / 365.0, 0)
    t_put = max((hedge_dte - horizon) / 365.0, 0)

    net_debit = leap_mid + hedge_mid
    if net_debit <= 0:
        return None

    pnl = {}
    for label, scenario_spot in prices.items():
        call_val = bs_call_price(
            scenario_spot, leap_strike, t_call, call_iv, r=RISK_FREE_RATE
        )
        put_val = bs_put_price(
            scenario_spot, hedge_strike, t_put, put_iv, r=RISK_FREE_RATE
        )
        pnl[label] = call_val + put_val - net_debit

    weights = COMBO_SCENARIO_WEIGHTS
    total_weight = sum(weights.values()) if weights else 1
    expected_pnl = sum(weights.get(k, 0) * pnl[k] for k in pnl) / total_weight

    if hedge_dte > 0:
        hedge_cost_ann = (hedge_mid / spot) * (365 / hedge_dte)
    else:
        hedge_cost_ann = np.nan

    return {
        "horizon_dte": horizon,
        "scenario_iv": scenario_iv,
        "net_debit": net_debit,
        "net_debit_pct": net_debit / spot,
        "breakeven_pct": (leap_strike + net_debit) / spot - 1,
        "expected_pnl": expected_pnl,
        "expected_pnl_pct": expected_pnl / net_debit,
        "bear_pnl": pnl["bear"],
        "bear_pnl_pct": pnl["bear"] / net_debit,
        "base_pnl": pnl["base"],
        "base_pnl_pct": pnl["base"] / net_debit,
        "bull_pnl": pnl["bull"],
        "bull_pnl_pct": pnl["bull"] / net_debit,
        "bear_price": prices["bear"],
        "base_price": prices["base"],
        "bull_price": prices["bull"],
        "hedge_cost_ann": hedge_cost_ann,
    }


def calc_prob_itm(
    spot: float,
    strike: float,
    iv: float,
    dte: int,
    option_type: str,
    r: float = RISK_FREE_RATE,
) -> Optional[float]:
    """Model probability that the option finishes in the money.

    Computes the Black-Scholes ``d2`` with ``t = dte / 365`` and returns
    ``N(d2)`` when *option_type* is "call", ``N(-d2)`` for any other value.

    Returns:
        The probability, or ``None`` when an input is non-positive, *iv* is
        missing, or the calculation raises.
    """
    if spot <= 0 or strike <= 0 or iv is None or pd.isna(iv) or iv <= 0 or dte <= 0:
        return None

    t = dte / 365.0
    try:
        drift = (r - 0.5 * iv**2) * t
        d2 = (math.log(spot / strike) + drift) / (iv * math.sqrt(t))
    except Exception:
        return None

    signed_d2 = d2 if option_type == "call" else -d2
    return norm_cdf(signed_d2)


def shrink_prob(p: Optional[float], shrink: float = EDGE_HAIRCUT) -> Optional[float]:
    """Pull probability *p* toward 0.5 by the factor *shrink*.

    Returns ``0.5 + (p - 0.5) * shrink``, or ``None`` when *p* is ``None``
    or NaN.
    """
    missing = p is None or pd.isna(p)
    if missing:
        return None
    distance_from_even = p - 0.5
    return 0.5 + distance_from_even * shrink


def calc_kelly_fraction(p_win: Optional[float], b: Optional[float]) -> Optional[float]:
    """Kelly fraction ``(b * p - q) / b``, floored at 0 and capped at ``KELLY_CAP``.

    Args:
        p_win: Probability of winning.
        b: Payoff ratio (amount won per unit risked); must be positive.

    Returns:
        The bounded fraction, or ``None`` when an input is missing or
        ``b <= 0``.
    """
    if p_win is None or pd.isna(p_win) or b is None or pd.isna(b) or b <= 0:
        return None

    p_lose = 1 - p_win
    raw_fraction = (b * p_win - p_lose) / b
    return 0.0 if raw_fraction < 0 else min(raw_fraction, KELLY_CAP)


def contracts_from_risk(max_risk_dollars: float, per_contract_risk: float) -> int:
    """Number of whole contracts that fit inside a risk budget.

    Args:
        max_risk_dollars: Total amount that may be put at risk.
        per_contract_risk: Amount at risk per contract.

    Returns:
        ``floor(max_risk_dollars / per_contract_risk)``, or 0 when either
        argument is ``None``, NaN, non-finite or not positive. Previously
        only ``per_contract_risk`` was validated, so a NaN or ``None`` budget
        raised and a negative budget produced a negative contract count.
    """

    def usable(amount) -> bool:
        return (
            amount is not None
            and not pd.isna(amount)
            and bool(np.isfinite(amount))
            and amount > 0
        )

    if not (usable(max_risk_dollars) and usable(per_contract_risk)):
        return 0
    return int(np.floor(max_risk_dollars / per_contract_risk))


def score_from_series(
    series: pd.Series,
    higher_better: bool = True,
    fill_value: float = MISSING_VALUE_FILL,
) -> pd.Series:
    """Convert a metric column into percentile scores in (0, 1].

    Values are coerced to numeric (unparseable entries become NaN) and
    percentile-ranked in the preferred direction, so the best observation
    scores 1.0 whichever direction is better. The previous ``1 - rank``
    inversion capped lower-is-better metrics at ``1 - 1/n`` — 0.0 for a
    single-row universe — while higher-is-better metrics could reach 1.0.

    Args:
        series: Raw metric values.
        higher_better: True if larger values should score higher.
        fill_value: Score assigned to missing values.

    Returns:
        Series of scores aligned to *series*.
    """
    values = pd.to_numeric(series, errors="coerce")
    ranks = values.rank(pct=True, ascending=higher_better)
    return ranks.fillna(fill_value)


def compute_weighted_score(
    df: pd.DataFrame,
    weights: dict,
    directions: dict,
    fill_value: float = MISSING_VALUE_FILL,
) -> pd.Series:
    """Weighted average of per-metric percentile scores for each row.

    Metrics absent from *df* are skipped and the result is divided by the sum
    of the weights actually used. A metric with no entry in *directions* is
    treated as higher-is-better.

    Returns:
        Series aligned to ``df.index``; all NaN when no weighted metric is
        present or the used weights sum to 0.
    """
    present = [(metric, weight) for metric, weight in weights.items() if metric in df.columns]

    weighted_total = 0
    used_weight = 0
    for metric, weight in present:
        prefers_high = directions.get(metric, "higher") == "higher"
        metric_score = score_from_series(
            df[metric], higher_better=prefers_high, fill_value=fill_value
        )
        weighted_total = weighted_total + metric_score * weight
        used_weight = used_weight + weight

    if used_weight == 0:
        return pd.Series([np.nan] * len(df), index=df.index)
    return weighted_total / used_weight


def add_weighted_score(
    df: pd.DataFrame,
    weights: dict,
    directions: dict,
    score_col: str,
) -> pd.DataFrame:
    """Write the weighted score, scaled to 0-100, into ``df[score_col]``.

    The frame is modified in place and also returned.
    """
    unit_score = compute_weighted_score(df, weights, directions)
    df[score_col] = unit_score * 100
    return df


def assign_grade(score: Optional[float]) -> str:
    """Map a 0-100 score to a letter grade.

    Cut-offs (inclusive lower bounds): 90 "A+", 80 "A", 70 "B", 60 "C",
    50 "D"; anything lower is "F". ``None`` or NaN gives "N/A".
    """
    if score is None or pd.isna(score):
        return "N/A"

    cutoffs = ((90, "A+"), (80, "A"), (70, "B"), (60, "C"), (50, "D"))
    for floor, letter in cutoffs:
        if score >= floor:
            return letter
    return "F"


def compute_term_slope(df: pd.DataFrame) -> Optional[float]:
    """Slope of ATM IV between the nearest and farthest expirations.

    Rows are ordered by ``dte`` and the slope is
    ``(far atm_iv - near atm_iv) / (far dte - near dte)``, i.e. IV change per
    day.

    Returns:
        The slope, or ``None`` with fewer than two rows or when both ends
        share the same dte.
    """
    if df.empty or len(df) < 2:
        return None

    by_dte = df.sort_values("dte")
    front, back = by_dte.iloc[0], by_dte.iloc[-1]
    day_span = back["dte"] - front["dte"]
    if day_span == 0:
        return None
    iv_change = back["atm_iv"] - front["atm_iv"]
    return iv_change / day_span


def pick_target_otm(df: pd.DataFrame, target_otm: float) -> pd.DataFrame:
    """Select the rows at, or nearest to, the target OTM level.

    If any row's ``otm`` matches *target_otm* (``np.isclose``), exactly those
    rows are returned. Otherwise one row per ``ticker`` is returned: the one
    whose ``otm`` is closest to the target.

    Returns:
        The selected rows, or an empty frame when *df* is empty or has no
        ``otm`` column.
    """
    if df.empty or "otm" not in df.columns:
        return pd.DataFrame()

    exact = df[np.isclose(df["otm"], target_otm)]
    if not exact.empty:
        return exact

    nearest_rows = [
        group.loc[(group["otm"] - target_otm).abs().idxmin()]
        for _, group in df.groupby("ticker")
    ]
    return pd.DataFrame(nearest_rows)


def show_figure(fig):
    """Render *fig* unless plotting is disabled.

    Does nothing when ``DISABLE_PLOTS`` is set. A rendering failure is
    reported as a Markdown note instead of being raised, so a missing
    renderer cannot abort the run.

    The previous body called ``show_figure(fig)`` itself rather than
    ``fig.show()``, so it recursed until ``RecursionError`` and never
    rendered anything.
    """
    if DISABLE_PLOTS:
        return
    try:
        fig.show()
    except Exception as exc:
        display(Markdown(f"Plot rendering skipped: {exc}"))


def plot_iv_smile(
    ticker: str, calls: pd.DataFrame, puts: pd.DataFrame, spot: float, exp: str
) -> go.Figure:
    """Build the implied-volatility smile figure for one expiration.

    Calls and puts are each drawn as one trace, restricted to strikes within
    ``STRIKE_RANGE_PCT`` of ``spot`` and to rows with a positive, non-null
    ``impliedVolatility``. IV is plotted multiplied by 100, and a dashed
    vertical line marks the spot price.

    Args:
        ticker: Symbol used in the title.
        calls: Call chain with ``strike`` and ``impliedVolatility`` columns.
        puts: Put chain with the same columns.
        spot: Underlying price.
        exp: Expiration label used in the title.

    Returns:
        The assembled figure (it may contain no traces).
    """
    lower_bound = spot * (1 - STRIKE_RANGE_PCT)
    upper_bound = spot * (1 + STRIKE_RANGE_PCT)
    smile = go.Figure()

    # Calls first, then puts, so the legend order stays stable.
    legs = (
        ("Calls", calls, dict(width=2)),
        ("Puts", puts, dict(width=2, dash="dash")),
    )
    for label, chain, line_style in legs:
        if chain.empty or "impliedVolatility" not in chain.columns:
            continue
        in_window = chain["strike"].between(lower_bound, upper_bound)
        has_iv = chain["impliedVolatility"].notna() & (chain["impliedVolatility"] > 0)
        visible = chain[in_window & has_iv]
        if visible.empty:
            continue
        smile.add_trace(
            go.Scatter(
                x=visible["strike"],
                y=visible["impliedVolatility"] * 100,
                mode="lines+markers",
                name=label,
                marker=dict(size=6),
                line=line_style,
                hovertemplate="Strike: $%{x:.2f}<br>IV: %{y:.2f}%<extra></extra>",
            )
        )

    smile.add_vline(x=spot, line_dash="dash", line_color="green")
    smile.update_layout(
        title=f"Figure. {ticker} IV Smile (Exp: {exp})",
        xaxis_title="Strike Price ($)",
        yaxis_title="Implied Volatility (%)",
        height=450,
    )
    return smile


## Universe Selection and Fundamentals


In [None]:
# Build the ticker universe: run the screener when USE_SCREEN is set,
# otherwise use the manual TICKER_OVERRIDE list (or nothing).
# NOTE(review): TICKER_OVERRIDE is ignored while USE_SCREEN is true — confirm
# that the override is not meant to take precedence over the screen.
if USE_SCREEN:
    TICKERS = screen_for_candidates(**SCREEN_PARAMS)
else:
    TICKERS = list(TICKER_OVERRIDE) if TICKER_OVERRIDE else []

if MAX_TICKERS:
    TICKERS = TICKERS[:MAX_TICKERS]

RUN_DATE = datetime.now().strftime("%Y-%m-%d")

for banner in (
    f"**Run Date**: {RUN_DATE} ",
    f"**Quick Run**: {QUICK_RUN}",
    f"**Tickers Loaded**: {len(TICKERS)}",
):
    display(Markdown(banner))

if not TICKERS:
    display(Markdown("No tickers were loaded. Check USE_SCREEN or TICKER_OVERRIDE."))

# Collect one fundamentals row per ticker that has a usable spot price.
fund_rows = []
spot_map = {}

for ticker in TICKERS:
    spot = get_spot(ticker)
    if spot is None:
        continue
    fundamentals = fetch_fundamentals(ticker)
    fundamentals["spot"] = spot
    fundamentals.update(compute_trend_metrics(ticker))
    fund_rows.append(fundamentals)
    spot_map[ticker] = spot

fund_df = pd.DataFrame(fund_rows)
if fund_df.empty:
    TICKERS_FINAL = []
else:
    if MIN_AVG_DAILY_VOLUME:
        # Rows with unknown volume are kept rather than filtered out.
        avg_volume = fund_df["avg_volume_3m"]
        fund_df = fund_df[avg_volume.isna() | (avg_volume >= MIN_AVG_DAILY_VOLUME)]

    if TREND_FILTER:
        if TREND_FILTER_MODE == "strict":
            trend_ok = (
                fund_df["price_above_ma_long"].eq(True)
                & fund_df["ma_short_above_ma_long"].eq(True)
                & fund_df["ma_slope_positive"].eq(True)
            )
        else:
            trend_ok = fund_df["trend_score"].fillna(0) >= MIN_TREND_SCORE
        fund_df = fund_df[trend_ok]

        if fund_df.empty:
            display(
                Markdown(
                    "Trend filter removed all tickers. Consider setting TREND_FILTER=False or TREND_FILTER_MODE='score' and adjusting MIN_TREND_SCORE."
                )
            )

    fund_df = add_weighted_score(
        fund_df,
        FUND_METRIC_WEIGHTS,
        FUND_METRIC_BETTER,
        score_col="fund_score",
    ).sort_values("fund_score", ascending=False)

    # (source column, displayed label, number format or None)
    fund_table_spec = [
        ("ticker", "Ticker", None),
        ("sector", "sector", None),
        ("industry", "industry", None),
        ("market_cap", "Market Cap", "${:,.0f}"),
        ("spot", "Spot", "${:,.2f}"),
        ("roe", "ROE", "{:.2%}"),
        ("rev_growth", "Revenue Growth", "{:.2%}"),
        ("profit_margin", "Profit Margin", "{:.2%}"),
        ("operating_margin", "Operating Margin", "{:.2%}"),
        ("debt_to_equity", "Debt/Equity", "{:.2f}"),
        ("pe", "P/E", "{:.2f}"),
        ("ps", "P/S", "{:.2f}"),
        ("ret_1m", "1M Return", "{:.1%}"),
        ("ret_3m", "3M Return", "{:.1%}"),
        ("ret_6m", "6M Return", "{:.1%}"),
        ("rs_3m", "RS 3M", "{:.1%}"),
        ("trend_score", "Trend Score", "{:.2f}"),
        ("trend_label", "Trend", None),
        ("rsi_14", "RSI 14", "{:.1f}"),
        ("atr_pct", "ATR %", "{:.1%}"),
        ("drawdown_6m", "Drawdown 6M", "{:.1%}"),
        ("dist_52w_high", "Dist 52W High", "{:.1%}"),
        ("dist_52w_low", "Dist 52W Low", "{:.1%}"),
        ("hv_30", "HV 30d", "{:.1%}"),
        ("fund_score", "Fund Score", "{:.1f}"),
    ]
    display_cols = [col for col, _, _ in fund_table_spec]

    fund_view = fund_df[display_cols].copy()
    fund_view = fund_view.rename(
        columns={col: label for col, label, _ in fund_table_spec if col != label}
    )

    display_table(
        fund_view,
        caption="Fundamental Screen and Scores",
        format_dict={
            label: fmt for _, label, fmt in fund_table_spec if fmt is not None
        },
    )

    TICKERS_FINAL = fund_df["ticker"].tolist()

if TICKERS_FINAL:
    display(Markdown(f"**Tickers After Fundamentals**: {len(TICKERS_FINAL)}"))
    display(pd.DataFrame({"Ticker": TICKERS_FINAL}))
else:
    display(Markdown("No tickers passed the fundamental screen. Adjust filters."))


## Run Analysis (Data Pull + Feature Engineering)


In [15]:
# Row accumulators filled by the per-ticker analysis loop.
summary_rows = []
term_rows = []
cc_yield_rows = []
csp_yield_rows = []
leap_rows = []
hedge_rows = []
combo_rows = []
skip_log = []

# (ticker, expiration) -> (calls, puts), so each chain is fetched once per run.
chain_cache: dict[tuple[str, str], tuple[pd.DataFrame, pd.DataFrame]] = {}
chain_for_smile = {}


def get_chain_cached(
    ticker: str, exp_date: str, spot: float
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return the (calls, puts) chain for a ticker/expiration, fetching once.

    The cache key is ``(ticker, exp_date)``; ``spot`` is only forwarded to
    ``fetch_chain`` on a cache miss.
    """
    cache_key = (ticker, exp_date)
    try:
        return chain_cache[cache_key]
    except KeyError:
        chains = fetch_chain(ticker, exp_date, spot)
        chain_cache[cache_key] = chains
        return chains


def _income_leg_row(
    ticker, selected, *, otm, target_strike, dte, spot, yield_base, option_type
):
    """Build the row shared by covered-call and cash-secured-put candidates.

    Args:
        ticker: Underlying symbol.
        selected: Chosen option row (supports ``[...]`` and ``.get``).
        otm: Nominal out-of-the-money level that produced ``target_strike``.
        target_strike: Strike that was aimed for.
        dte: Days to expiration of the chain.
        spot: Underlying price.
        yield_base: Denominator of the premium yield (spot for covered
            calls, strike for cash-secured puts).
        option_type: ``"call"`` or ``"put"``, forwarded to ``calc_prob_itm``.
    """
    premium_yield = selected["mid"] / yield_base
    annualized_yield = premium_yield * (365 / dte) if dte > 0 else None

    iv = safe_float(selected.get("impliedVolatility"))
    prob_itm = calc_prob_itm(spot, selected["strike"], iv, dte, option_type)
    # The short option "wins" when it is not in the money.
    p_win = None if prob_itm is None else 1 - prob_itm
    p_win_adj = shrink_prob(p_win, EDGE_HAIRCUT) if p_win is not None else None

    return {
        "ticker": ticker,
        "dte": dte,
        "otm": otm,
        "otm_label": f"{otm:.0%}",
        "target_strike": target_strike,
        "actual_strike": selected["strike"],
        "mid": selected["mid"],
        "bid": selected.get("bid"),
        "ask": selected.get("ask"),
        "open_interest": safe_float(selected.get("openInterest")),
        "volume": safe_float(selected.get("volume")),
        "spread_pct": safe_float(selected.get("spread_pct")),
        "iv": iv,
        "premium_yield": premium_yield,
        "annualized_yield": annualized_yield,
        "prob_itm": prob_itm,
        "p_win_adj": p_win_adj,
    }


for ticker in TICKERS_FINAL:
    spot = spot_map.get(ticker) or get_spot(ticker)
    if spot is None:
        skip_log.append((ticker, "spot_unavailable"))
        continue

    expirations = get_expirations(ticker)
    if not expirations:
        skip_log.append((ticker, "no_expirations"))
        continue

    boll_mid = compute_bollinger_midline(ticker, window=BOLLINGER_WINDOW)

    leap_selection = None
    hedge_selection = None

    row = {
        "ticker": ticker,
        "spot": spot,
        "boll_mid": boll_mid,
    }

    # ATM IV at each target DTE
    for target_dte in TARGET_DTES:
        picked = find_expiration(expirations, target_dte=target_dte)
        if not picked:
            continue

        exp_date, actual_dte = picked
        calls, puts = get_chain_cached(ticker, exp_date, spot)
        atm_iv = compute_atm_iv(calls, puts, spot)

        row[f"exp_{target_dte}"] = exp_date
        row[f"actual_dte_{target_dte}"] = actual_dte
        row[f"atm_iv_{target_dte}"] = atm_iv

    # Covered call chain (CC_TARGET_DTE target, capped at CC_MAX_DTE)
    cc_pick = find_expiration(expirations, target_dte=CC_TARGET_DTE, max_dte=CC_MAX_DTE)
    if cc_pick:
        cc_exp, cc_dte = cc_pick
        cc_calls, cc_puts = get_chain_cached(ticker, cc_exp, spot)
        row["exp_cc"] = cc_exp
        row["dte_cc"] = cc_dte
        row["atm_iv_cc"] = compute_atm_iv(cc_calls, cc_puts, spot)

        chain_for_smile[ticker] = {
            "calls": cc_calls,
            "puts": cc_puts,
            "spot": spot,
            "exp": cc_exp,
        }

        # Covered call metrics
        if not cc_calls.empty:
            for otm in OTM_LEVELS:
                target_strike = spot * (1 + otm)
                selected = select_option_near_strike(cc_calls, target_strike)
                if selected is None:
                    continue
                cc_yield_rows.append(
                    _income_leg_row(
                        ticker,
                        selected,
                        otm=otm,
                        target_strike=target_strike,
                        dte=cc_dte,
                        spot=spot,
                        yield_base=spot,
                        option_type="call",
                    )
                )

        # Cash-secured put metrics (same expiration)
        if not cc_puts.empty:
            for otm in OTM_LEVELS:
                target_strike = spot * (1 - otm)
                selected = select_option_near_strike(
                    cc_puts, target_strike, direction="below"
                )
                if selected is None:
                    continue

                csp_row = _income_leg_row(
                    ticker,
                    selected,
                    otm=otm,
                    target_strike=target_strike,
                    dte=cc_dte,
                    spot=spot,
                    yield_base=selected["strike"],
                    option_type="put",
                )
                p_win_adj = csp_row["p_win_adj"]

                # Worst case for the put seller: assigned at strike, stock at 0.
                max_loss = selected["strike"] - selected["mid"]
                b = None if max_loss <= 0 else selected["mid"] / max_loss
                csp_row.update(
                    {
                        "max_loss": max_loss,
                        "b_ratio": b,
                        "kelly_fraction": calc_kelly_fraction(p_win_adj, b),
                        "expected_value": (
                            None
                            if p_win_adj is None
                            else p_win_adj * selected["mid"]
                            - (1 - p_win_adj) * max_loss
                        ),
                    }
                )
                csp_yield_rows.append(csp_row)

    # Term structure
    term_exps = [e for e in expirations if e[1] <= MAX_TERM_DTE][
        ::TERM_STRUCTURE_SAMPLE
    ]

    for exp_date, dte in term_exps:
        # Only throttle when the chain is not cached yet.
        if (ticker, exp_date) not in chain_cache:
            time.sleep(RATE_LIMIT_SLEEP * TERM_STRUCTURE_SLEEP_MULTIPLIER)
        calls, puts = get_chain_cached(ticker, exp_date, spot)
        atm_iv = compute_atm_iv(calls, puts, spot)
        if atm_iv is not None:
            term_rows.append(
                {
                    "ticker": ticker,
                    "expiration": exp_date,
                    "dte": dte,
                    "atm_iv": atm_iv,
                }
            )

    # LEAP call selection
    leap_pick = find_expiration(expirations, min_dte=LEAP_MIN_DTE)
    if leap_pick:
        leap_exp, leap_dte = leap_pick
        leap_calls, _ = get_chain_cached(ticker, leap_exp, spot)
        target_strike = spot * LEAP_TARGET_MONEYNESS
        leap_option = select_option_near_strike(
            leap_calls, target_strike, direction="below"
        )
        if leap_option is not None:
            # Record the pick so the combo evaluation below can use it;
            # without this the combo branch never runs.
            leap_selection = {
                "leap_exp": leap_exp,
                "leap_dte": leap_dte,
                "leap_strike": leap_option["strike"],
                "leap_mid": leap_option["mid"],
                "leap_iv": safe_float(leap_option.get("impliedVolatility")),
            }
            breakeven = leap_option["strike"] + leap_option["mid"]
            breakeven_pct = (breakeven - spot) / spot
            leap_rows.append(
                {
                    "ticker": ticker,
                    "leap_exp": leap_exp,
                    "leap_dte": leap_dte,
                    "leap_strike": leap_option["strike"],
                    "leap_mid": leap_option["mid"],
                    "leap_breakeven": breakeven,
                    "leap_breakeven_pct": breakeven_pct,
                }
            )

    # Protective put near Bollinger midline
    if boll_mid is not None:
        hedge_pick = find_expiration(
            expirations, target_dte=HEDGE_TARGET_DTE, max_dte=HEDGE_MAX_DTE
        )
        if hedge_pick:
            hedge_exp, hedge_dte = hedge_pick
            _, hedge_puts = get_chain_cached(ticker, hedge_exp, spot)
            # Never aim above spot, even if the midline sits above it.
            target_strike = min(boll_mid, spot)
            hedge_option = select_option_near_strike(
                hedge_puts, target_strike, direction="below"
            )
            if hedge_option is not None:
                hedge_selection = {
                    "hedge_exp": hedge_exp,
                    "hedge_dte": hedge_dte,
                    "hedge_strike": hedge_option["strike"],
                    "hedge_mid": hedge_option["mid"],
                    "hedge_iv": safe_float(hedge_option.get("impliedVolatility")),
                }
                hedge_rows.append(
                    {
                        "ticker": ticker,
                        "hedge_exp": hedge_exp,
                        "hedge_dte": hedge_dte,
                        "hedge_strike": hedge_option["strike"],
                        "hedge_mid": hedge_option["mid"],
                        "boll_mid": boll_mid,
                        "hedge_cost_pct": hedge_option["mid"] / spot,
                    }
                )

    # LEAP call + protective put combination
    if leap_selection and hedge_selection:
        combo_metrics = evaluate_combo(
            spot=spot,
            leap_strike=leap_selection["leap_strike"],
            leap_mid=leap_selection["leap_mid"],
            leap_iv=leap_selection["leap_iv"],
            leap_dte=leap_selection["leap_dte"],
            hedge_strike=hedge_selection["hedge_strike"],
            hedge_mid=hedge_selection["hedge_mid"],
            hedge_iv=hedge_selection["hedge_iv"],
            hedge_dte=hedge_selection["hedge_dte"],
            horizon_dte=COMBO_HORIZON_DTE,
        )
        if combo_metrics:
            combo_rows.append(
                {
                    "ticker": ticker,
                    "spot": spot,
                    **leap_selection,
                    **hedge_selection,
                    **combo_metrics,
                }
            )

    summary_rows.append(row)

# Report tickers that were skipped and why.
if skip_log:
    skip_df = pd.DataFrame(skip_log, columns=["Ticker", "Reason"])
    display(skip_df)


## Scorecard and Ranking


In [None]:
# Materialise the row accumulators from the analysis loop.
summary_df = pd.DataFrame(summary_rows)
term_df = pd.DataFrame(term_rows)
cc_df = pd.DataFrame(cc_yield_rows)
csp_df = pd.DataFrame(csp_yield_rows)
leap_df = pd.DataFrame(leap_rows)
hedge_df = pd.DataFrame(hedge_rows)
combo_df = pd.DataFrame(combo_rows)

cc_target = pd.DataFrame()
csp_target = pd.DataFrame()
ranked = pd.DataFrame()

# (source column, displayed label, number format or None)
scorecard_spec = [
    ("ticker", "Ticker", None),
    ("grade", "Grade", None),
    ("total_score", "Total Score", "{:.1f}"),
    ("fund_score", "Fund Score", "{:.1f}"),
    ("options_score", "Options Score", "{:.1f}"),
    ("spot", "Spot", "${:,.2f}"),
    ("market_cap", "Market Cap", "${:,.0f}"),
    ("sector", "Sector", None),
    ("cc_ann_yield", "CC Ann Yield", "{:.1%}"),
    ("csp_ann_yield", "CSP Ann Yield", "{:.1%}"),
    ("atm_iv_cc", "ATM IV (CC)", "{:.1%}"),
    ("term_slope", "Term Slope", "{:.3f}"),
    ("avg_spread_pct", "Avg Spread", "{:.1%}"),
    ("hv_30", "HV 30d", "{:.1%}"),
    ("hv_iv_ratio", "IV/HV", "{:.2f}"),
    ("iv_hv_spread", "IV - HV", "{:.1%}"),
]

if summary_df.empty:
    display(Markdown("No options data collected. Check tickers and data availability."))
else:
    if not fund_df.empty:
        summary_df = summary_df.merge(
            fund_df,
            on="ticker",
            how="left",
            suffixes=("", "_fund"),
        )

    if "fund_score" not in summary_df.columns:
        summary_df["fund_score"] = np.nan

    # Coerce first: the column holds None when no midline was computed.
    boll_mid_num = pd.to_numeric(summary_df["boll_mid"], errors="coerce")
    summary_df["spot_vs_boll_mid"] = np.where(
        boll_mid_num.notna(),
        (summary_df["spot"] / boll_mid_num) - 1,
        np.nan,
    )

    if not term_df.empty:
        # Build the slope table explicitly: compute_term_slope may return
        # None for every ticker, which groupby.apply does not turn into a
        # named Series.
        slopes = {
            name: compute_term_slope(group)
            for name, group in term_df.groupby("ticker")
        }
        term_slope = (
            pd.Series(slopes, dtype="float64")
            .rename_axis("ticker")
            .reset_index(name="term_slope")
        )
        summary_df = summary_df.merge(term_slope, on="ticker", how="left")

    # atm_iv_cc is absent when no ticker had a covered-call expiration.
    if {"hv_30", "atm_iv_cc"} <= set(summary_df.columns):
        # Named hv_iv_ratio, but the value is IV divided by HV.
        summary_df["hv_iv_ratio"] = np.where(
            summary_df["hv_30"] > 0,
            summary_df["atm_iv_cc"] / summary_df["hv_30"],
            np.nan,
        )
        summary_df["iv_hv_spread"] = summary_df["atm_iv_cc"] - summary_df["hv_30"]

    if not cc_df.empty:
        cc_target = pick_target_otm(cc_df, TARGET_OTM_FOR_SCORING)
        cc_merge = cc_target.rename(
            columns={
                "annualized_yield": "cc_ann_yield",
                "spread_pct": "cc_spread_pct",
                "prob_itm": "cc_prob_itm",
                "p_win_adj": "cc_p_win_adj",
            }
        )
        summary_df = summary_df.merge(
            cc_merge[
                [
                    "ticker",
                    "cc_ann_yield",
                    "cc_spread_pct",
                    "cc_prob_itm",
                    "cc_p_win_adj",
                ]
            ],
            on="ticker",
            how="left",
        )

    if not csp_df.empty:
        csp_target = pick_target_otm(csp_df, TARGET_OTM_FOR_SCORING)
        csp_merge = csp_target.rename(
            columns={
                "annualized_yield": "csp_ann_yield",
                "spread_pct": "csp_spread_pct",
                "prob_itm": "csp_prob_itm",
                "p_win_adj": "csp_p_win_adj",
                "kelly_fraction": "csp_kelly_fraction",
            }
        )
        summary_df = summary_df.merge(
            csp_merge[
                [
                    "ticker",
                    "csp_ann_yield",
                    "csp_spread_pct",
                    "csp_prob_itm",
                    "csp_p_win_adj",
                    "csp_kelly_fraction",
                ]
            ],
            on="ticker",
            how="left",
        )

    spread_cols = [
        col for col in ["cc_spread_pct", "csp_spread_pct"] if col in summary_df.columns
    ]
    if spread_cols:
        # Row-wise mean that ignores NaN and tolerates None-only columns.
        summary_df["avg_spread_pct"] = (
            summary_df[spread_cols]
            .apply(pd.to_numeric, errors="coerce")
            .mean(axis=1)
        )
    else:
        summary_df["avg_spread_pct"] = np.nan

    summary_df = add_weighted_score(
        summary_df,
        OPTION_METRIC_WEIGHTS,
        OPTION_METRIC_BETTER,
        score_col="options_score",
    )

    summary_df["total_score"] = (
        FUNDAMENTAL_WEIGHT * summary_df["fund_score"]
        + OPTIONS_WEIGHT * summary_df["options_score"]
    )
    summary_df["grade"] = summary_df["total_score"].apply(assign_grade)

    # Add still-missing scorecard columns as NaN. Done after scoring so a
    # placeholder column cannot enter the weighted score.
    for col, _, _ in scorecard_spec:
        if col not in summary_df.columns:
            summary_df[col] = np.nan

    ranked = summary_df.sort_values("total_score", ascending=False)

    display_cols = [col for col, _, _ in scorecard_spec]
    view = ranked[display_cols].rename(
        columns={col: label for col, label, _ in scorecard_spec}
    )
    format_dict = {label: fmt for _, label, fmt in scorecard_spec if fmt is not None}

    display_table(
        view.head(TOP_N), caption="Composite Scorecard (Top N)", format_dict=format_dict
    )

    if not ranked.empty:
        top = ranked.head(5)
        summary_lines = ["**Executive Summary**"] + [
            f"- {row.ticker}: Grade {row.grade}, Total Score {row.total_score:.1f}"
            for _, row in top.iterrows()
        ]
        # Show the summary built above, not the "no data" message.
        display(Markdown("\n".join(summary_lines)))


In [None]:
if not ranked.empty:
    # (source column, displayed label, number format or None)
    tech_spec = [
        ("ticker", "Ticker", None),
        ("trend_label", "Trend", None),
        ("trend_score", "Trend Score", "{:.2f}"),
        ("ret_1m", "1M Return", "{:.1%}"),
        ("ret_3m", "3M Return", "{:.1%}"),
        ("ret_6m", "6M Return", "{:.1%}"),
        ("rs_3m", "RS 3M", "{:.1%}"),
        ("rsi_14", "RSI 14", "{:.1f}"),
        ("atr_pct", "ATR %", "{:.1%}"),
        ("drawdown_6m", "Drawdown 6M", "{:.1%}"),
        ("dist_52w_high", "Dist 52W High", "{:.1%}"),
        ("dist_52w_low", "Dist 52W Low", "{:.1%}"),
    ]

    tech_view = ranked[[col for col, _, _ in tech_spec]].rename(
        columns={col: label for col, label, _ in tech_spec}
    )

    display_table(
        tech_view.head(TOP_N),
        caption="Technical Dashboard (Top N)",
        format_dict={label: fmt for _, label, fmt in tech_spec if fmt is not None},
    )

    grade_colors = {
        "A+": "#0f172a",
        "A": "#1e3a8a",
        "B": "#334155",
        "C": "#6b7280",
        "D": "#9ca3af",
        "F": "#b91c1c",
        "N/A": "#9ca3af",
    }

    # Sorted ascending so the best score ends up at the top of the bar chart.
    fig = px.bar(
        ranked.head(TOP_N).sort_values("total_score", ascending=True),
        x="total_score",
        y="ticker",
        color="grade",
        title="Figure 1. Composite Opportunity Score (Top N)",
        labels={"total_score": "Score", "ticker": "Ticker"},
        color_discrete_map=grade_colors,
        orientation="h",
    )
    fig.update_layout(height=500, xaxis_range=[0, 100])
    show_figure(fig)

    # Plotly rejects NaN marker sizes, and a missing market cap would abort
    # the cell, so bubbles are sized only when every ticker has a value.
    bubble_size = (
        "market_cap"
        if "market_cap" in ranked.columns and ranked["market_cap"].notna().all()
        else None
    )
    fig = px.scatter(
        ranked,
        x="fund_score",
        y="options_score",
        color="total_score",
        size=bubble_size,
        color_continuous_scale=COLOR_CONTINUOUS,
        title="Figure 2. Fundamental vs Options Opportunity",
        labels={"fund_score": "Fundamental Score", "options_score": "Options Score"},
        hover_data=["ticker", "grade", "cc_ann_yield", "csp_ann_yield", "atm_iv_cc"],
    )
    fig.update_layout(height=500, xaxis_range=[0, 100], yaxis_range=[0, 100])
    show_figure(fig)

    if not term_df.empty:
        top_tickers = ranked["ticker"].head(6).tolist()
        term_plot = term_df[term_df["ticker"].isin(top_tickers)]
        if not term_plot.empty:
            fig = px.line(
                term_plot,
                x="dte",
                y="atm_iv",
                color="ticker",
                markers=True,
                title="Figure 3. IV Term Structure (Top Tickers)",
                labels={"dte": "Days to Expiration", "atm_iv": "ATM IV"},
                color_discrete_sequence=COLOR_DISCRETE,
            )
            fig.update_layout(yaxis_tickformat=".1%", height=500)
            show_figure(fig)


## Income Strategies (Covered Calls and Cash-Secured Puts)

_Skipped when `QUICK_SCAN = True`._


In [None]:
def _show_income_table(frame, spec, caption):
    """Display the spec's columns of ``frame`` under display labels.

    ``spec`` is a list of (source column, label, number format or None).
    Returns the renamed view that was displayed.
    """
    table = frame[[col for col, _, _ in spec]].rename(
        columns={col: label for col, label, _ in spec}
    )
    display_table(
        table,
        caption=caption,
        format_dict={label: fmt for _, label, fmt in spec if fmt is not None},
    )
    return table


if QUICK_SCAN:
    # Quick-scan mode skips the whole section: both tables and both heatmaps.
    display(Markdown("*Income strategy details skipped in quick-scan mode.*"))
else:
    income_spec = [
        ("ticker", "Ticker", None),
        ("dte", "DTE", None),
        ("otm_label", "OTM", None),
        ("actual_strike", "Strike", "${:,.2f}"),
        ("mid", "Mid", "${:,.2f}"),
        ("premium_yield", "Yield", "{:.2%}"),
        ("annualized_yield", "Ann Yield", "{:.1%}"),
        ("spread_pct", "Spread", "{:.1%}"),
        ("prob_itm", "Prob ITM", "{:.1%}"),
    ]

    if not cc_df.empty:
        cc_view = _show_income_table(
            cc_df, income_spec, "Covered Call Income Candidates"
        )

    if not csp_df.empty:
        csp_view = _show_income_table(
            csp_df,
            income_spec + [("kelly_fraction", "Kelly", "{:.1%}")],
            "Cash-Secured Put Income Candidates",
        )

    heatmap_sources = (
        (cc_df, "Figure 4. Covered Call Annualized Yield Heatmap"),
        (csp_df, "Figure 5. Cash-Secured Put Annualized Yield Heatmap"),
    )
    for income_df, figure_title in heatmap_sources:
        if income_df.empty:
            continue
        # Pivot on numeric "otm" so columns sort 2%, 5%, 10%, not as strings.
        heatmap = income_df.pivot_table(
            index="ticker",
            columns="otm",
            values="annualized_yield",
            aggfunc="mean",
        ).sort_index()
        heatmap.columns = [f"{level:.0%}" for level in heatmap.columns]

        fig = px.imshow(
            heatmap,
            text_auto=".1%",
            color_continuous_scale="RdYlGn",
            aspect="auto",
            title=figure_title,
        )
        fig.update_layout(
            xaxis_title="OTM Level",
            yaxis_title="Ticker",
            coloraxis_colorbar=dict(title="Ann Yield"),
            height=500,
        )
        show_figure(fig)


## Volatility Diagnostics (IV Smiles and Term Structure)


In [None]:
# Draw one IV smile per top-ranked ticker that has a stored covered-call chain.
if not ranked.empty:
    tickers_to_plot = ranked["ticker"].head(MAX_IV_SMILES).tolist()

    for ticker in tickers_to_plot:
        chain = chain_for_smile.get(ticker)
        if chain:
            smile_inputs = [chain[key] for key in ("calls", "puts", "spot", "exp")]
            fig = plot_iv_smile(ticker, *smile_inputs)
            show_figure(fig)


## Long-Term Upside, Downside, and Sizing

Set `PORTFOLIO_SIZE` to your total account value. The Kelly sizing table uses IV-derived win probabilities, applies `EDGE_HAIRCUT` and `FRACTIONAL_KELLY`, then caps each trade at `MAX_SINGLE_TRADE_PCT`.

_Skipped when `QUICK_SCAN = True`._


In [None]:
def _labelled_table(frame, spec):
    """Return (view, format_dict) for a (column, label, format) spec.

    The view holds the spec's columns of ``frame`` renamed to their labels;
    the format dict maps each label that has a format to that format.
    """
    table = frame[[col for col, _, _ in spec]].rename(
        columns={col: label for col, label, _ in spec}
    )
    formats = {label: fmt for _, label, fmt in spec if fmt is not None}
    return table, formats


if QUICK_SCAN:
    # Quick-scan mode skips the whole section: combo, LEAP, hedge and sizing.
    display(Markdown("*Sizing and combo details skipped in quick-scan mode.*"))
else:
    if not combo_df.empty:
        combo_scored = add_weighted_score(
            combo_df.copy(),
            COMBO_SCORE_WEIGHTS,
            COMBO_METRIC_BETTER,
            score_col="combo_score",
        )
        combo_scored["combo_grade"] = combo_scored["combo_score"].apply(assign_grade)
        combo_ranked = combo_scored.sort_values("combo_score", ascending=False)

        combo_view, combo_formats = _labelled_table(
            combo_ranked,
            [
                ("ticker", "Ticker", None),
                ("combo_grade", "Grade", None),
                ("combo_score", "Combo Score", "{:.1f}"),
                ("net_debit", "Net Debit", "${:,.2f}"),
                ("expected_pnl_pct", "Expected PnL %", "{:.1%}"),
                ("bear_pnl_pct", "Bear PnL %", "{:.1%}"),
                ("bull_pnl_pct", "Bull PnL %", "{:.1%}"),
                ("hedge_cost_ann", "Hedge Cost (Ann)", "{:.1%}"),
                ("leap_strike", "LEAP Strike", "${:,.2f}"),
                ("hedge_strike", "Put Strike", "${:,.2f}"),
                ("horizon_dte", "Horizon DTE", None),
            ],
        )

        display_table(
            combo_view.head(TOP_N),
            caption="LEAP + Protective Put Combo Scorecard",
            format_dict=combo_formats,
        )

        fig = px.scatter(
            combo_ranked,
            x="bear_pnl_pct",
            y="expected_pnl_pct",
            color="combo_score",
            size="net_debit",
            color_continuous_scale=COLOR_CONTINUOUS,
            title="Figure 6. LEAP + Protective Put Scenario Map",
            labels={"bear_pnl_pct": "Bear PnL %", "expected_pnl_pct": "Expected PnL %"},
            hover_data=["ticker", "combo_grade", "net_debit", "bull_pnl_pct"],
        )
        fig.update_layout(height=500)
        show_figure(fig)

    if not leap_df.empty:
        leap_view = leap_df.rename(
            columns={
                "ticker": "Ticker",
                "leap_exp": "LEAP Exp",
                "leap_dte": "LEAP DTE",
                "leap_strike": "LEAP Strike",
                "leap_mid": "LEAP Mid",
                "leap_breakeven": "Breakeven",
                "leap_breakeven_pct": "Breakeven %",
            }
        )

        display_table(
            leap_view,
            caption="LEAP Call Candidates",
            format_dict={
                "LEAP Strike": "${:,.2f}",
                "LEAP Mid": "${:,.2f}",
                "Breakeven": "${:,.2f}",
                "Breakeven %": "{:.1%}",
            },
        )

    if not hedge_df.empty:
        hedge_view = hedge_df.rename(
            columns={
                "ticker": "Ticker",
                "hedge_exp": "Hedge Exp",
                "hedge_dte": "Hedge DTE",
                "hedge_strike": "Hedge Strike",
                "hedge_mid": "Hedge Mid",
                "boll_mid": "Boll Mid",
                "hedge_cost_pct": "Hedge Cost %",
            }
        )

        display_table(
            hedge_view,
            caption="Protective Put Candidates",
            format_dict={
                "Hedge Strike": "${:,.2f}",
                "Hedge Mid": "${:,.2f}",
                "Boll Mid": "${:,.2f}",
                "Hedge Cost %": "{:.1%}",
            },
        )

    # Kelly-based sizing for cash-secured puts on the top-ranked tickers.
    if not ranked.empty and not csp_target.empty:
        sizing_base = ranked.head(TOP_N).merge(
            csp_target,
            on="ticker",
            how="left",
            suffixes=("", "_csp"),
        )

        sizing_base = sizing_base.dropna(subset=["actual_strike", "mid", "dte"])
        if not sizing_base.empty:
            # Per-contract figures use the 100-share contract multiplier.
            sizing_base["max_loss_per_contract"] = (
                sizing_base["actual_strike"] - sizing_base["mid"]
            ) * 100
            sizing_base["kelly_fraction"] = sizing_base["kelly_fraction"].fillna(0)
            # Risk budget: fractional Kelly, capped at the per-trade maximum.
            sizing_base["kelly_risk"] = (
                PORTFOLIO_SIZE * sizing_base["kelly_fraction"] * FRACTIONAL_KELLY
            ).clip(upper=PORTFOLIO_SIZE * MAX_SINGLE_TRADE_PCT)
            sizing_base["contracts"] = sizing_base.apply(
                lambda row: contracts_from_risk(
                    row["kelly_risk"], row["max_loss_per_contract"]
                ),
                axis=1,
            )
            sizing_base["collateral_per_contract"] = sizing_base["actual_strike"] * 100
            sizing_base["total_collateral"] = (
                sizing_base["contracts"] * sizing_base["collateral_per_contract"]
            )

            sizing_view, sizing_formats = _labelled_table(
                sizing_base,
                [
                    ("ticker", "Ticker", None),
                    ("actual_strike", "Strike", "${:,.2f}"),
                    ("dte", "DTE", None),
                    ("mid", "Premium", "${:,.2f}"),
                    ("prob_itm", "Prob ITM", "{:.1%}"),
                    ("kelly_fraction", "Kelly", "{:.1%}"),
                    ("contracts", "Contracts", None),
                    ("max_loss_per_contract", "Max Loss/Contract", "${:,.0f}"),
                    ("total_collateral", "Total Collateral", "${:,.0f}"),
                ],
            )

            display_table(
                sizing_view,
                caption="Kelly-Based Sizing (Cash-Secured Puts)",
                format_dict=sizing_formats,
            )

    # Covered-call sizing: whole contracts within both capital limits.
    if not ranked.empty and not cc_target.empty:
        cc_size = ranked.head(TOP_N).merge(
            cc_target,
            on="ticker",
            how="left",
            suffixes=("", "_cc"),
        )

        cc_size = cc_size.dropna(subset=["spot", "mid"])
        if not cc_size.empty:
            per_trade_cap = PORTFOLIO_SIZE * MAX_SINGLE_TRADE_PCT
            lot_cost = cc_size["spot"] * 100  # cost of the 100 shares behind one call
            cc_size["contracts_cap"] = np.floor(per_trade_cap / lot_cost)
            cc_size["contracts_alloc"] = np.floor(
                (PORTFOLIO_SIZE * COVERED_CALL_ALLOC_PCT) / lot_cost
            )
            cc_size["contracts"] = cc_size[["contracts_cap", "contracts_alloc"]].min(
                axis=1
            )
            cc_size["shares"] = cc_size["contracts"] * 100
            cc_size["equity_notional"] = cc_size["shares"] * cc_size["spot"]

            cc_view, cc_formats = _labelled_table(
                cc_size,
                [
                    ("ticker", "Ticker", None),
                    ("spot", "Spot", "${:,.2f}"),
                    ("contracts", "Contracts", None),
                    ("shares", "Shares", None),
                    ("equity_notional", "Equity Notional", "${:,.0f}"),
                    ("mid", "Premium", "${:,.2f}"),
                ],
            )

            display_table(
                cc_view,
                caption="Sizing Guide (Covered Calls)",
                format_dict=cc_formats,
            )


## Notes and Limitations

- This notebook is for screening and research only. It is not investment advice.
- `yfinance` data can be delayed, incomplete, or inconsistent.
- IV and probability estimates are risk-neutral and do not represent forecasts.
- Kelly sizing is highly sensitive to assumptions; the notebook uses a conservative fractional Kelly and caps.
- Trend filters are based on moving averages and can whipsaw in range-bound markets.
- Historical volatility (HV) is backward-looking; HV-versus-IV comparisons describe current option pricing and do not predict future volatility.
- Liquidity filters are heuristic and may exclude viable contracts in thin markets.
