# CSP (Cash-Secured Put) Strategy - Phase 1: Data Layer

This notebook establishes the data infrastructure for the options selling strategy.

**Phase 1 Objectives:**
1. Configuration management
2. Alpaca API client setup
3. VIX data fetching (via yfinance)
4. Equity historical data fetching
5. Options chain fetching
6. Data validation and testing

## 1. Setup & Configuration

In [9]:
# Install required packages (run once)
# !pip install alpaca-py yfinance pandas numpy python-dotenv

In [10]:
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from datetime import datetime, date, timedelta
from enum import Enum
import pandas as pd
import numpy as np
import yfinance as yf
from dotenv import load_dotenv

# Alpaca imports
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame
from alpaca.trading.client import TradingClient
from alpaca.trading.requests import GetOptionContractsRequest
from alpaca.trading.enums import AssetStatus

# Load environment variables
load_dotenv()

print("Imports successful!")

Imports successful!


In [11]:
@dataclass
class StrategyConfig:
    """
    Central configuration for the CSP strategy.
    All parameters in one place for easy tuning.
    """
    # ==================== UNIVERSE & CAPITAL ====================
    ticker_universe: List[str] = field(default_factory=lambda: 
    [
        'AAPL', 'MSFT', 'GOOG'
    ])
    num_tickers: int = 10  # Max positions for diversification
    starting_cash: float = 50_000
    
    # ==================== VIX REGIME RULES ====================
    # (vix_lower, vix_upper): deployment_multiplier
    vix_deployment_rules: Dict[Tuple[float, float], float] = field(default_factory=lambda: {
        (0, 15): 1.0,       # VIX < 15: deploy 100%
        (15, 20): 0.8,      # 15 <= VIX < 20: deploy 80%
        (20, 25): 0.2,      # 20 <= VIX < 25: deploy 20%
        (25, float('inf')): 0.0  # VIX >= 25: deploy 0%
    })
    
    # ==================== EQUITY FILTER PARAMS ====================
    sma_periods: List[int] = field(default_factory=lambda: [8, 20, 50])
    rsi_period: int = 14
    rsi_lower: int = 30
    rsi_upper: int = 70
    bb_period: int = 50
    bb_std: float = 1.0
    sma_trend_lookback: int = 3  # Days to confirm SMA(50) uptrend
    history_days: int = 60  # Days of price history to fetch
    
    # ==================== OPTIONS FILTER PARAMS ====================
    min_daily_return: float = 0.15  # 0.15% daily yield on strike (premium/dte/strike)
    max_strike_pct: float = 0.98      # Strike <= 90% of stock price
    min_strike_pct: float = 0.85      # Strike <= 90% of stock price
    delta_min: float = 0.15
    delta_max: float = 0.40
    max_dte: int = 10
    min_dte: int = 1  # Avoid 0 DTE
    
    # ==================== RISK MANAGEMENT ====================
    delta_stop_multiplier: float = 2.0   # Exit if delta >= 2x entry
    stock_drop_stop_pct: float = 0.05    # Exit if stock drops 5%
    vix_spike_multiplier: float = 1.15   # Exit if VIX >= 1.15x entry
    early_exit_buffer: float = 0.15      # Exit if 15% ahead of decay schedule
    
    # ==================== OPERATIONAL ====================
    poll_interval_seconds: int = 60
    paper_trading: bool = True  # Safety first!
    
    def get_deployment_multiplier(self, vix: float) -> float:
        """Get cash deployment multiplier based on current VIX."""
        for (lower, upper), multiplier in self.vix_deployment_rules.items():
            if lower <= vix < upper:
                return multiplier
        return 0.0  # Default to no deployment if VIX out of range
    
    def get_deployable_cash(self, vix: float) -> float:
        """Calculate deployable cash based on VIX regime."""
        return self.starting_cash * self.get_deployment_multiplier(vix)


# Initialize config
config = StrategyConfig()

# Test VIX deployment rules
print("VIX Deployment Rules Test:")
for test_vix in [12, 17, 22, 30]:
    deployable = config.get_deployable_cash(test_vix)
    print(f"  VIX={test_vix}: Deploy ${deployable:,.0f} ({config.get_deployment_multiplier(test_vix):.0%})")

VIX Deployment Rules Test:
  VIX=12: Deploy $50,000 (100%)
  VIX=17: Deploy $40,000 (80%)
  VIX=22: Deploy $10,000 (20%)
  VIX=30: Deploy $0 (0%)


## 2. Alpaca Client Setup

In [12]:
class AlpacaClientManager:
    """
    Manages Alpaca API clients for data and trading.
    Handles authentication and provides unified access.
    """
    
    def __init__(self, paper: bool = True):
        """
        Initialize Alpaca clients.
        
        Args:
            paper: If True, use paper trading credentials
        """
        self.paper = paper
        
        # Get credentials from environment
        if paper:
            self.api_key = os.getenv('ALPACA_API_KEY')
            self.secret_key = os.getenv('ALPACA_SECRET_KEY')
        
        if not self.api_key or not self.secret_key:
            raise ValueError(
                "Alpaca credentials not found. Set environment variables:\n"
                "  ALPACA_API_KEY and ALPACA_SECRET_KEY (or ALPACA_PAPER_* variants)"
            )
        
        # Initialize clients
        self._data_client = None
        self._trading_client = None
    
    @property
    def data_client(self) -> StockHistoricalDataClient:
        """Lazy initialization of data client."""
        if self._data_client is None:
            self._data_client = StockHistoricalDataClient(
                api_key=self.api_key,
                secret_key=self.secret_key
            )
        return self._data_client
    
    @property
    def trading_client(self) -> TradingClient:
        """Lazy initialization of trading client."""
        if self._trading_client is None:
            self._trading_client = TradingClient(
                api_key=self.api_key,
                secret_key=self.secret_key,
                paper=self.paper
            )
        return self._trading_client
    
    def get_account_info(self) -> dict:
        """Get account information."""
        account = self.trading_client.get_account()
        return {
            'cash': float(account.cash),
            'buying_power': float(account.buying_power),
            'portfolio_value': float(account.portfolio_value),
            'status': account.status,
            'trading_blocked': account.trading_blocked,
            'options_trading_level': getattr(account, 'options_trading_level', None)
        }


# Test client initialization (will fail without credentials, that's expected)
try:
    alpaca = AlpacaClientManager(paper=config.paper_trading)
    account = alpaca.get_account_info()
    print("âœ“ Alpaca connection successful!")
    print(f"  Account status: {account['status']}")
    print(f"  Cash available: ${account['cash']:,.2f}")
    print(f"  Options level: {account['options_trading_level']}")
except ValueError as e:
    print(f"âš  Credentials not configured: {e}")
    print("\nTo configure, create a .env file or set environment variables:")
    print("  ALPACA_API_KEY=your_api_key")
    print("  ALPACA_SECRET_KEY=your_secret_key")
    alpaca = None
except Exception as e:
    print(f"âš  Connection error: {e}")
    alpaca = None

âœ“ Alpaca connection successful!
  Account status: AccountStatus.ACTIVE
  Cash available: $50,000.00
  Options level: 3


## 3. VIX Data Fetcher

Since Alpaca doesn't provide VIX directly, we use yfinance for `^VIX`.

In [13]:
class VixDataFetcher:
    """
    Fetches VIX data from Yahoo Finance.
    Provides current VIX and historical data for analysis.
    """
    
    SYMBOL = "^VIX"
    
    def __init__(self):
        self._ticker = yf.Ticker(self.SYMBOL)
        self._cache = {}
        self._cache_time = None
        self._cache_ttl = timedelta(minutes=1)
    
    def get_current_vix(self) -> float:
        """
        Get the current/latest VIX value.
        Uses last trading day's close when market is closed.
        
        Returns:
            Current VIX value as float
        """
        if (self._cache_time and 
            datetime.now() - self._cache_time < self._cache_ttl and
            'current' in self._cache):
            return self._cache['current']
        
        try:
            # Use 5d history - more reliable than 1d on weekends
            daily = self._ticker.history(period='5d')
            if daily.empty:
                raise RuntimeError("No VIX data available")
            
            vix = float(daily['Close'].iloc[-1])
            
            self._cache['current'] = vix
            self._cache_time = datetime.now()
            
            return vix
            
        except Exception as e:
            raise RuntimeError(f"Failed to fetch VIX data: {e}")
    
    def get_vix_history(self, days: int = 30) -> pd.DataFrame:
        """
        Get historical VIX OHLC data.
        
        Args:
            days: Number of days of history
            
        Returns:
            DataFrame with Open, High, Low, Close columns
        """
        history = self._ticker.history(period=f'{days}d')
        return history[['Open', 'High', 'Low', 'Close']]
    
    def get_last_session(self) -> dict:
        """
        Get the most recent completed trading session's data.
        
        Returns:
            Dict with session_date, open, high, low, close
        """
        history = self._ticker.history(period='5d')
        if history.empty:
            raise RuntimeError("No VIX history available")
        
        last_row = history.iloc[-1]
        session_date = history.index[-1]
        
        return {
            'session_date': session_date.date() if hasattr(session_date, 'date') else session_date,
            'open': float(last_row['Open']),
            'high': float(last_row['High']),
            'low': float(last_row['Low']),
            'close': float(last_row['Close']),
        }
    
    def get_session_reference_vix(self) -> Tuple[date, float]:
        """
        Get the reference VIX for stop-loss calculations.
        
        During market hours: that day's open
        Outside market hours: last trading day's open (for next session planning)
        
        Returns:
            Tuple of (session_date, reference_vix)
        """
        session = self.get_last_session()
        return session['session_date'], session['open']
    
    def check_vix_stop_loss(
        self, 
        reference_vix: float, 
        multiplier: float = config.vix_spike_multiplier
    ) -> dict:
        """
        Check if VIX stop-loss condition is triggered.
        Condition: current_vix >= reference_vix * multiplier
        
        Args:
            reference_vix: The VIX value to compare against (entry or session open)
            multiplier: Stop-loss multiplier (default 1.15 = 15% spike)
            
        Returns:
            Dict with triggered (bool), current_vix, threshold, reason
        """
        current_vix = self.get_current_vix()
        threshold = reference_vix * multiplier
        triggered = current_vix >= threshold
        
        return {
            'triggered': triggered,
            'current_vix': current_vix,
            'reference_vix': reference_vix,
            'threshold': threshold,
            'pct_change': (current_vix / reference_vix - 1) * 100,
            'reason': f"VIX {current_vix:.2f} >= {threshold:.2f}" if triggered else ""
        }


# Test VIX fetcher
vix_fetcher = VixDataFetcher()

try:
    current_vix = vix_fetcher.get_current_vix()
    last_session = vix_fetcher.get_last_session()
    
    print(f"âœ“ VIX Data Retrieved")
    print(f"\n  Last Trading Session: {last_session['session_date']}")
    print(f"    Open:  {last_session['open']:.2f}")
    print(f"    High:  {last_session['high']:.2f}")
    print(f"    Low:   {last_session['low']:.2f}")
    print(f"    Close: {last_session['close']:.2f}")
    
    print(f"\n  Current VIX: {current_vix:.2f}")
    print(f"  Deployment: {config.get_deployment_multiplier(current_vix):.0%}")
    print(f"  Deployable Cash: ${config.get_deployable_cash(current_vix):,.0f}")
    
    # Test stop-loss check using session open as reference
    stop_loss_check = vix_fetcher.check_vix_stop_loss(
        reference_vix=last_session['open'],
        multiplier=config.vix_spike_multiplier
    )
    
    print(f"\n  VIX Stop-Loss Check (vs session open):")
    print(f"    Reference: {stop_loss_check['reference_vix']:.2f}")
    print(f"    Threshold: {stop_loss_check['threshold']:.2f} ({config.vix_spike_multiplier:.0%})")
    print(f"    Current:   {stop_loss_check['current_vix']:.2f} ({stop_loss_check['pct_change']:+.1f}%)")
    print(f"    Triggered: {'ðŸš¨ YES - EXIT ALL' if stop_loss_check['triggered'] else 'âœ“ No'}")
    
    # Show recent history
    vix_history = vix_fetcher.get_vix_history(10)
    print(f"\n  Last 5 sessions:")
    print(f"    {'Date':<12} {'Open':>8} {'Close':>8}")
    print(f"    {'-'*12} {'-'*8} {'-'*8}")
    for dt, row in vix_history.tail(5).iterrows():
        print(f"    {dt.strftime('%Y-%m-%d'):<12} {row['Open']:>8.2f} {row['Close']:>8.2f}")
        
except Exception as e:
    print(f"âš  VIX fetch error: {e}")
    import traceback
    traceback.print_exc()

âœ“ VIX Data Retrieved

  Last Trading Session: 2026-01-30
    Open:  18.72
    High:  19.27
    Low:   16.67
    Close: 17.44

  Current VIX: 17.44
  Deployment: 80%
  Deployable Cash: $40,000

  VIX Stop-Loss Check (vs session open):
    Reference: 18.72
    Threshold: 21.53 (115%)
    Current:   17.44 (-6.8%)
    Triggered: âœ“ No

  Last 5 sessions:
    Date             Open    Close
    ------------ -------- --------
    2026-01-26      16.90    16.15
    2026-01-27      16.02    16.35
    2026-01-28      16.09    16.35
    2026-01-29      16.04    16.88
    2026-01-30      18.72    17.44


## 4. Equity Historical Data Fetcher

In [14]:
class EquityDataFetcher:
    """
    Fetches equity price data from Alpaca.
    Provides historical bars and current prices.
    """
    
    def __init__(self, alpaca_manager: AlpacaClientManager):
        self.client = alpaca_manager.data_client
    
    def get_close_history(
        self, 
        symbols: List[str], 
        days: int = 60
    ) -> Dict[str, pd.Series]:
        """
        Get closing price history for multiple symbols.
        
        Args:
            symbols: List of ticker symbols
            days: Number of trading days of history
            
        Returns:
            Dict mapping symbol -> pd.Series of close prices
        """
        # Calculate date range (add buffer for weekends/holidays)
        end_date = datetime.now()
        start_date = end_date - timedelta(days=int(days * 1.5))  # Buffer for non-trading days
        
        request = StockBarsRequest(
            symbol_or_symbols=symbols,
            timeframe=TimeFrame.Day,
            start=start_date,
            end=end_date
        )
        
        bars = self.client.get_stock_bars(request)
        
        # Process into dict of Series
        result = {}
        for symbol in symbols:
            if symbol in bars.data:
                symbol_bars = bars.data[symbol]
                closes = pd.Series(
                    [bar.close for bar in symbol_bars],
                    index=[bar.timestamp for bar in symbol_bars],
                    name=symbol
                )
                # Take last N days
                result[symbol] = closes.tail(days)
            else:
                print(f"  Warning: No data for {symbol}")
        
        return result
    
    def get_current_price(self, symbol: str) -> float:
        """
        Get the most recent price for a symbol.
        
        Args:
            symbol: Ticker symbol
            
        Returns:
            Latest close price
        """
        history = self.get_close_history([symbol], days=5)
        if symbol in history and len(history[symbol]) > 0:
            return float(history[symbol].iloc[-1])
        raise ValueError(f"No price data for {symbol}")
    
    def get_current_prices(self, symbols: List[str]) -> Dict[str, float]:
        """
        Get current prices for multiple symbols efficiently.
        
        Args:
            symbols: List of ticker symbols
            
        Returns:
            Dict mapping symbol -> current price
        """
        history = self.get_close_history(symbols, days=5)
        return {
            symbol: float(prices.iloc[-1]) 
            for symbol, prices in history.items() 
            if len(prices) > 0
        }


# Test equity data fetcher
if alpaca:
    equity_fetcher = EquityDataFetcher(alpaca)
    
    # Test with subset of universe
    test_symbols = config.ticker_universe[:3]  # First 3 symbols
    
    try:
        close_history = equity_fetcher.get_close_history(test_symbols, days=config.history_days)
        
        print(f"âœ“ Equity Data Retrieved for {len(close_history)} symbols")
        for symbol, prices in close_history.items():
            print(f"\n  {symbol}:")
            print(f"    Data points: {len(prices)}")
            print(f"    Date range: {prices.index[0].strftime('%Y-%m-%d')} to {prices.index[-1].strftime('%Y-%m-%d')}")
            print(f"    Current price: ${prices.iloc[-1]:.2f}")
            print(f"    50-day range: ${prices.min():.2f} - ${prices.max():.2f}")
            
    except Exception as e:
        print(f"âš  Equity fetch error: {e}")
else:
    print("âš  Skipping equity test - Alpaca not configured")

âœ“ Equity Data Retrieved for 3 symbols

  AAPL:
    Data points: 60
    Date range: 2025-11-04 to 2026-01-30
    Current price: $259.48
    50-day range: $246.70 - $286.19

  MSFT:
    Data points: 60
    Date range: 2025-11-04 to 2026-01-30
    Current price: $430.29
    50-day range: $430.29 - $514.33

  GOOG:
    Data points: 60
    Date range: 2025-11-04 to 2026-01-30
    Current price: $338.53
    50-day range: $276.98 - $338.66


## 5. Options Chain Fetcher

This is the most complex data component. Alpaca's options API requires specific handling.

In [15]:
import numpy as np
from py_vollib.black_scholes.implied_volatility import implied_volatility
from py_vollib.black_scholes.greeks.analytical import delta as bs_delta

# Risk-free rate (can pull from config or treasury API later)
RISK_FREE_RATE = 0.04


class GreeksCalculator:
    """
    Calculates IV and Delta using Black-Scholes via py_vollib.
    Falls back gracefully when calculation fails.
    """
    
    def __init__(self, risk_free_rate: float = RISK_FREE_RATE):
        self.r = risk_free_rate
    
    def compute_iv(
        self,
        option_price: float,
        stock_price: float,
        strike: float,
        dte: int,
        option_type: str = 'put'
    ) -> Optional[float]:
        """
        Compute implied volatility from option price.
        
        Args:
            option_price: Mid price of the option
            stock_price: Current underlying price
            strike: Strike price
            dte: Days to expiration
            option_type: 'put' or 'call'
            
        Returns:
            IV as decimal (e.g., 0.25 for 25%) or None if calculation fails
        """
        t = dte / 365.0
        flag = 'p' if option_type == 'put' else 'c'
        
        # Validate inputs
        if not all([
            np.isfinite(option_price),
            np.isfinite(stock_price),
            np.isfinite(strike),
            t > 0,
            option_price > 0,
            stock_price > 0,
            strike > 0
        ]):
            return None
        
        try:
            iv = implied_volatility(option_price, stock_price, strike, t, self.r, flag)
            return iv if np.isfinite(iv) and iv > 0 else None
        except Exception:
            return None
    
    def compute_delta(
        self,
        stock_price: float,
        strike: float,
        dte: int,
        iv: float,
        option_type: str = 'put'
    ) -> Optional[float]:
        """
        Compute delta from IV.
        
        Args:
            stock_price: Current underlying price
            strike: Strike price
            dte: Days to expiration
            iv: Implied volatility as decimal
            option_type: 'put' or 'call'
            
        Returns:
            Delta (negative for puts) or None if calculation fails
        """
        if iv is None or not np.isfinite(iv) or iv <= 0:
            return None
        
        t = dte / 365.0
        flag = 'p' if option_type == 'put' else 'c'
        
        if t <= 0:
            return None
        
        try:
            d = bs_delta(flag, stock_price, strike, t, self.r, iv)
            return d if np.isfinite(d) else None
        except Exception:
            return None
    
    def compute_greeks(
        self,
        option_price: float,
        stock_price: float,
        strike: float,
        dte: int,
        option_type: str = 'put'
    ) -> dict:
        """
        Compute both IV and delta in one call.
        
        Returns:
            Dict with 'iv' and 'delta' keys (values may be None)
        """
        iv = self.compute_iv(option_price, stock_price, strike, dte, option_type)
        delta = self.compute_delta(stock_price, strike, dte, iv, option_type) if iv else None
        
        return {
            'iv': iv,
            'delta': delta,
            'delta_abs': abs(delta) if delta else None
        }


# Test the calculator
greeks_calc = GreeksCalculator()

# Test with one of the contracts that had Greeks from Alpaca
# AAPL260206P00225000: strike=225, dte=5, mid=0.15, alpaca_delta=-0.0214, alpaca_iv=0.6077
test_result = greeks_calc.compute_greeks(
    option_price=0.15,
    stock_price=259.48,  # approximate from your data
    strike=225.0,
    dte=5,
    option_type='put'
)

print("Greeks Calculator Test:")
print(f"  Calculated IV: {test_result['iv']:.4f}" if test_result['iv'] else "  IV: Failed")
print(f"  Calculated Delta: {test_result['delta']:.4f}" if test_result['delta'] else "  Delta: Failed")
print(f"\nAlpaca provided:")
print(f"  Alpaca IV: 0.6077")
print(f"  Alpaca Delta: -0.0214")


Greeks Calculator Test:
  Calculated IV: 0.6136
  Calculated Delta: -0.0212

Alpaca provided:
  Alpaca IV: 0.6077
  Alpaca Delta: -0.0214


In [16]:
from alpaca.trading.requests import GetOptionContractsRequest
from alpaca.trading.enums import ContractType, AssetStatus
from alpaca.data.historical.option import OptionHistoricalDataClient
from alpaca.data.requests import OptionLatestQuoteRequest, OptionSnapshotRequest

# daily return meaning secured_amount = 100*underlying_share_price
# premium/DTE -- daily cash / cash_outlayed = daily_return_as_ptct
# daily_return = (premium/DTE) 

@dataclass
class OptionContract:
    """
    Represents a single option contract with relevant data.
    """
    symbol: str
    underlying: str
    contract_type: str
    strike: float
    expiration: date
    dte: int
    bid: float
    ask: float
    mid: float
    stock_price: float  # ADD THIS - needed for return calculations
    delta: Optional[float] = None
    gamma: Optional[float] = None
    theta: Optional[float] = None
    vega: Optional[float] = None
    implied_volatility: Optional[float] = None
    open_interest: Optional[int] = None
    volume: Optional[int] = None
    
    @property
    def premium(self) -> float:
        """Premium received when selling (use bid price)."""
        return self.bid
    
    @property
    def premium_per_day(self) -> float:
        """Daily premium decay if held to expiration."""
        if self.dte <= 0:
            return 0.0
        return self.premium / self.dte
    
    @property
    def collateral_required(self) -> float:
        """Cash required to secure 1 contract."""
        return self.strike * 100
    
    @property
    def cost_basis(self) -> float:
        """Cost basis = stock price * 100 (exposure value)."""
        return self.stock_price * 100
    
    @property
    def daily_return_on_collateral(self) -> float:
        """Daily yield as % of collateral (strike-based)."""
        if self.strike <= 0 or self.dte <= 0:
            return 0.0
        return self.premium_per_day / self.strike
    
    @property
    def daily_return_on_cost_basis(self) -> float:
        """Daily yield as % of cost basis (stock price-based)."""
        if self.stock_price <= 0 or self.dte <= 0:
            return 0.0
        return self.premium_per_day / self.stock_price
    
    @property
    def delta_abs(self) -> Optional[float]:
        """Absolute value of delta for filtering."""
        return abs(self.delta) if self.delta else None

class OptionsDataFetcher:
    """
    Fetches options chain data from Alpaca.
    Handles contract discovery and quote retrieval.
    """
    
    def __init__(self, alpaca_manager: AlpacaClientManager):
        self.trading_client = alpaca_manager.trading_client
        self.data_client = OptionHistoricalDataClient(
            api_key=alpaca_manager.api_key,
            secret_key=alpaca_manager.secret_key
        )
    
    def get_option_contracts(
        self,
        underlying: str,
        contract_type: str = 'put',
        min_dte: int = config.min_dte,
        max_dte: int = config.max_dte,
        min_strike: Optional[float] = None,
        max_strike: Optional[float] = None
    ) -> List[dict]:
        """
        Get available option contracts for an underlying.
        
        Args:
            underlying: Ticker symbol
            contract_type: 'put' or 'call'
            min_dte: Minimum days to expiration
            max_dte: Maximum days to expiration
            min_strike: Minimum strike price
            max_strike: Maximum strike price
            
        Returns:
            List of contract dictionaries
        """
        # Calculate expiration date range
        today = date.today()
        min_expiry = today + timedelta(days=min_dte)
        max_expiry = today + timedelta(days=max_dte)
        
        request_params = {
            'underlying_symbols': [underlying],
            'status': AssetStatus.ACTIVE,
            'type': ContractType.PUT if contract_type == 'put' else ContractType.CALL,
            'expiration_date_gte': min_expiry,
            'expiration_date_lte': max_expiry,
        }
        
        if min_strike is not None:
            request_params['strike_price_gte'] = str(min_strike)
        if max_strike is not None:
            request_params['strike_price_lte'] = str(max_strike)
        
        request = GetOptionContractsRequest(**request_params)
        
        response = self.trading_client.get_option_contracts(request)
        
        # Convert to list of dicts
        contracts = []
        if response.option_contracts:
            for contract in response.option_contracts:
                contracts.append({
                    'symbol': contract.symbol,
                    'underlying': contract.underlying_symbol,
                    'strike': float(contract.strike_price),
                    'expiration': contract.expiration_date,
                    'contract_type': contract_type,
                })
        
        return contracts
    
    def get_option_quotes(
        self, 
        option_symbols: List[str]
    ) -> Dict[str, dict]:
        """
        Get current quotes for option contracts.
        
        Args:
            option_symbols: List of OCC option symbols
            
        Returns:
            Dict mapping symbol -> quote data
        """
        if not option_symbols:
            return {}
        
        # Alpaca has limits on batch size, chunk if needed
        chunk_size = 100
        all_quotes = {}
        
        for i in range(0, len(option_symbols), chunk_size):
            chunk = option_symbols[i:i + chunk_size]
            
            try:
                request = OptionLatestQuoteRequest(symbol_or_symbols=chunk)
                quotes = self.data_client.get_option_latest_quote(request)
                
                for symbol, quote in quotes.items():
                    all_quotes[symbol] = {
                        'bid': float(quote.bid_price) if quote.bid_price else 0.0,
                        'ask': float(quote.ask_price) if quote.ask_price else 0.0,
                        'bid_size': quote.bid_size,
                        'ask_size': quote.ask_size,
                    }
            except Exception as e:
                print(f"  Warning: Quote fetch error for chunk: {e}")
        
        return all_quotes
    
    def get_option_snapshots(
        self,
        option_symbols: List[str]
    ) -> Dict[str, dict]:
        """
        Get snapshots including Greeks for option contracts.
        
        Args:
            option_symbols: List of OCC option symbols
            
        Returns:
            Dict mapping symbol -> snapshot data with Greeks
        """
        if not option_symbols:
            return {}
        
        chunk_size = 100
        all_snapshots = {}
        
        for i in range(0, len(option_symbols), chunk_size):
            chunk = option_symbols[i:i + chunk_size]
            
            try:
                request = OptionSnapshotRequest(symbol_or_symbols=chunk)
                snapshots = self.data_client.get_option_snapshot(request)
                
                for symbol, snapshot in snapshots.items():
                    greeks = snapshot.greeks if snapshot.greeks else None
                    quote = snapshot.latest_quote if snapshot.latest_quote else None
                    
                    all_snapshots[symbol] = {
                        'bid': float(quote.bid_price) if quote and quote.bid_price else 0.0,
                        'ask': float(quote.ask_price) if quote and quote.ask_price else 0.0,
                        'delta': float(greeks.delta) if greeks and greeks.delta else None,
                        'gamma': float(greeks.gamma) if greeks and greeks.gamma else None,
                        'theta': float(greeks.theta) if greeks and greeks.theta else None,
                        'vega': float(greeks.vega) if greeks and greeks.vega else None,
                        'implied_volatility': float(snapshot.implied_volatility) if snapshot.implied_volatility else None,
                    }
            except Exception as e:
                print(f"  Warning: Snapshot fetch error for chunk: {e}")
        
        return all_snapshots
    
    def get_puts_chain(
        self,
        underlying: str,
        stock_price: float,
        config: StrategyConfig
    ) -> List[OptionContract]:
        """
        Get filtered put options chain with full data.
        """
        # Get contracts within strike range
        max_strike = stock_price * config.max_strike_pct
        min_strike = stock_price * 0.70

        contracts = self.get_option_contracts(
            underlying=underlying,
            contract_type='put',
            min_dte=config.min_dte,
            max_dte=config.max_dte,
            min_strike=min_strike,
            max_strike=max_strike
        )

        if not contracts:
            return []

        # Get snapshots with Greeks
        symbols = [c['symbol'] for c in contracts]
        snapshots = self.get_option_snapshots(symbols)

        # Build OptionContract objects
        today = date.today()
        result = []

        for contract in contracts:
            symbol = contract['symbol']
            snapshot = snapshots.get(symbol, {})

            bid = snapshot.get('bid', 0.0)
            ask = snapshot.get('ask', 0.0)

            # Skip if no bid (can't sell)
            if bid <= 0:
                continue

            dte = (contract['expiration'] - today).days

            option = OptionContract(
                symbol=symbol,
                underlying=underlying,
                contract_type='put',
                strike=contract['strike'],
                expiration=contract['expiration'],
                dte=dte,
                bid=bid,
                ask=ask,
                mid=(bid + ask) / 2,
                stock_price=stock_price,  
                delta=snapshot.get('delta'),
                gamma=snapshot.get('gamma'),
                theta=snapshot.get('theta'),
                vega=snapshot.get('vega'),
                implied_volatility=snapshot.get('implied_volatility'),
            )

            result.append(option)

        return result



# Test options data fetcher
if alpaca:
    options_fetcher = OptionsDataFetcher(alpaca)
    
    test_symbol = 'AAPL'
    
    try:
        # Get current price
        current_price = equity_fetcher.get_current_price(test_symbol)
        print(f"Testing options chain for {test_symbol} @ ${current_price:.2f}")
        
        # Get puts chain
        puts = options_fetcher.get_puts_chain(test_symbol, current_price, config)
        
        print(f"\nâœ“ Retrieved {len(puts)} put contracts")
        
        if puts:
            print(f"\nSample contracts (first 5):")
            for put in puts:
                delta_str = f"{abs(put.delta):.2f}" if put.delta else "N/A"
                print(f"  {put.symbol}")
                print(f"    Strike: ${put.strike:.2f} | DTE: {put.dte}")
                print(f"    Bid/Ask: ${put.bid:.2f}/${put.ask:.2f}")
                print(f"    Delta: {delta_str}")
                print(f"    Daily return: {put.daily_return_on_collateral:.4%}")
                print()
    except Exception as e:
        print(f"âš  Options fetch error: {e}")
        import traceback
        traceback.print_exc()
else:
    print("âš  Skipping options test - Alpaca not configured")

Testing options chain for AAPL @ $259.48

âœ“ Retrieved 83 put contracts

Sample contracts (first 5):
  AAPL260202P00220000
    Strike: $220.00 | DTE: 1
    Bid/Ask: $0.03/$0.04
    Delta: N/A
    Daily return: 0.0136%

  AAPL260202P00225000
    Strike: $225.00 | DTE: 1
    Bid/Ask: $0.03/$0.05
    Delta: N/A
    Daily return: 0.0133%

  AAPL260202P00230000
    Strike: $230.00 | DTE: 1
    Bid/Ask: $0.05/$0.08
    Delta: N/A
    Daily return: 0.0217%

  AAPL260202P00232500
    Strike: $232.50 | DTE: 1
    Bid/Ask: $0.03/$0.24
    Delta: N/A
    Daily return: 0.0129%

  AAPL260202P00235000
    Strike: $235.00 | DTE: 1
    Bid/Ask: $0.08/$0.17
    Delta: 0.02
    Daily return: 0.0340%

  AAPL260202P00237500
    Strike: $237.50 | DTE: 1
    Bid/Ask: $0.10/$0.12
    Delta: 0.02
    Daily return: 0.0421%

  AAPL260202P00240000
    Strike: $240.00 | DTE: 1
    Bid/Ask: $0.12/$0.18
    Delta: 0.03
    Daily return: 0.0500%

  AAPL260202P00242500
    Strike: $242.50 | DTE: 1
    Bid/Ask: $0.13

## 6. Unified Data Manager

Combines all data fetchers into a single interface for the strategy.

In [17]:
@dataclass
class MarketSnapshot:
    """
    Complete market state at a point in time.
    Used by strategy logic to make decisions.
    """
    timestamp: datetime
    vix_current: float
    vix_open: float
    deployable_cash: float
    equity_prices: Dict[str, float]  # symbol -> current price
    equity_history: Dict[str, pd.Series]  # symbol -> price history


class DataManager:
    """
    Unified data manager that combines all data sources.
    Provides a clean interface for strategy logic.
    """
    
    def __init__(
        self, 
        alpaca_manager: Optional[AlpacaClientManager],
        config: StrategyConfig
    ):
        self.config = config
        self.vix_fetcher = VixDataFetcher()
        
        if alpaca_manager:
            self.equity_fetcher = EquityDataFetcher(alpaca_manager)
            self.options_fetcher = OptionsDataFetcher(alpaca_manager)
        else:
            self.equity_fetcher = None
            self.options_fetcher = None
    
    def get_market_snapshot(self) -> MarketSnapshot:
        """
        Get complete market state for strategy decision-making.
        
        Returns:
            MarketSnapshot with all current market data
        """
        # VIX data
        vix_current = self.vix_fetcher.get_current_vix()
        # vix_open = self.vix_fetcher.get_vix_open_today() or vix_current
        _, vix_open = self.vix_fetcher.get_session_reference_vix()
        
        # Deployable cash based on VIX
        deployable_cash = self.config.get_deployable_cash(vix_current)
        
        # Equity data
        if self.equity_fetcher:
            equity_history = self.equity_fetcher.get_close_history(
                self.config.ticker_universe,
                days=self.config.history_days
            )
            equity_prices = {
                symbol: float(prices.iloc[-1])
                for symbol, prices in equity_history.items()
            }
        else:
            equity_history = {}
            equity_prices = {}
        
        return MarketSnapshot(
            timestamp=datetime.now(),
            vix_current=vix_current,
            vix_open=vix_open,
            deployable_cash=deployable_cash,
            equity_prices=equity_prices,
            equity_history=equity_history
        )
    
    def get_puts_for_symbol(
        self, 
        symbol: str, 
        stock_price: float
    ) -> List[OptionContract]:
        """
        Get filtered put options for a symbol.
        
        Args:
            symbol: Ticker symbol
            stock_price: Current stock price
            
        Returns:
            List of OptionContract objects
        """
        if not self.options_fetcher:
            raise RuntimeError("Options fetcher not configured")
        
        return self.options_fetcher.get_puts_chain(
            symbol, 
            stock_price, 
            self.config
        )
    
    def refresh_option_data(self, option_symbol: str) -> Optional[OptionContract]:
        """
        Refresh data for a single option (for position monitoring).
        
        Args:
            option_symbol: OCC option symbol
            
        Returns:
            Updated OptionContract or None if fetch fails
        """
        if not self.options_fetcher:
            return None
        
        snapshots = self.options_fetcher.get_option_snapshots([option_symbol])
        
        if option_symbol not in snapshots:
            return None
        
        snapshot = snapshots[option_symbol]
        
        # Parse symbol to extract details (OCC format)
        # Format: AAPL240119P00150000
        # We'd need to parse this - for now return partial data
        return snapshot  # Return raw snapshot for now


# Test unified data manager
data_manager = DataManager(alpaca, config)

try:
    print("Fetching market snapshot...")
    snapshot = data_manager.get_market_snapshot()
    
    print(f"\nâœ“ Market Snapshot @ {snapshot.timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"  VIX: {snapshot.vix_current:.2f} (Open: {snapshot.vix_open:.2f})")
    print(f"  Deployable Cash: ${snapshot.deployable_cash:,.0f}")
    print(f"  Equities tracked: {len(snapshot.equity_prices)}")
    
    if snapshot.equity_prices:
        print(f"\n  Sample prices:")
        for symbol, price in list(snapshot.equity_prices.items())[:5]:
            print(f"    {symbol}: ${price:.2f}")
            
except Exception as e:
    print(f"âš  Snapshot error: {e}")
    import traceback
    traceback.print_exc()

Fetching market snapshot...

âœ“ Market Snapshot @ 2026-02-01 17:50:48
  VIX: 17.44 (Open: 18.72)
  Deployable Cash: $40,000
  Equities tracked: 3

  Sample prices:
    AAPL: $259.48
    MSFT: $430.29
    GOOG: $338.53


## 7. Data Validation & Summary

Verify all data components are working correctly.

In [18]:
def run_data_layer_diagnostics(data_manager: DataManager) -> dict:
    """
    Run comprehensive diagnostics on the data layer.
    
    Returns:
        Dict with diagnostic results
    """
    results = {
        'vix': {'status': 'unknown', 'details': {}},
        'equity': {'status': 'unknown', 'details': {}},
        'options': {'status': 'unknown', 'details': {}},
    }
    
    # Test VIX
    print("Testing VIX data...")
    try:
        vix = data_manager.vix_fetcher.get_current_vix()
        vix_history = data_manager.vix_fetcher.get_vix_history(10)
        results['vix'] = {
            'status': 'ok',
            'details': {
                'current': vix,
                'history_points': len(vix_history),
                'history_range': f"{vix_history['Close'].min():.2f} - {vix_history['Close'].max():.2f}"            }
        }
        print(f"  âœ“ VIX OK: {vix:.2f}")
    except Exception as e:
        results['vix'] = {'status': 'error', 'details': {'error': str(e)}}
        print(f"  âœ— VIX Error: {e}")
    
    # Test Equity
    print("\nTesting equity data...")
    if data_manager.equity_fetcher:
        try:
            test_symbols = data_manager.config.ticker_universe[:3]
            history = data_manager.equity_fetcher.get_close_history(test_symbols, days=60)
            results['equity'] = {
                'status': 'ok',
                'details': {
                    'symbols_tested': len(test_symbols),
                    'symbols_returned': len(history),
                    'avg_data_points': np.mean([len(v) for v in history.values()])
                }
            }
            print(f"  âœ“ Equity OK: {len(history)}/{len(test_symbols)} symbols")
        except Exception as e:
            results['equity'] = {'status': 'error', 'details': {'error': str(e)}}
            print(f"  âœ— Equity Error: {e}")
    else:
        results['equity'] = {'status': 'not_configured', 'details': {}}
        print("  âš  Equity: Not configured (no Alpaca credentials)")
    
    # Test Options
    print("\nTesting options data...")
    if data_manager.options_fetcher and results['equity']['status'] == 'ok':
        try:
            test_symbol = data_manager.config.ticker_universe[0]
            test_price = data_manager.equity_fetcher.get_current_price(test_symbol)
            puts = data_manager.get_puts_for_symbol(test_symbol, test_price)
            
            # Count puts with Greeks
            puts_with_delta = sum(1 for p in puts if p.delta is not None)
            
            results['options'] = {
                'status': 'ok',
                'details': {
                    'test_symbol': test_symbol,
                    'contracts_found': len(puts),
                    'contracts_with_greeks': puts_with_delta,
                }
            }
            print(f"  âœ“ Options OK: {len(puts)} contracts for {test_symbol}")
            print(f"    {puts_with_delta}/{len(puts)} have Greeks")
        except Exception as e:
            results['options'] = {'status': 'error', 'details': {'error': str(e)}}
            print(f"  âœ— Options Error: {e}")
    else:
        results['options'] = {'status': 'not_configured', 'details': {}}
        print("  âš  Options: Not configured")
    
    # Summary
    print("\n" + "="*50)
    print("PHASE 1 DATA LAYER STATUS")
    print("="*50)
    
    all_ok = all(r['status'] == 'ok' for r in results.values())
    
    for component, result in results.items():
        status_icon = {
            'ok': 'âœ“',
            'error': 'âœ—',
            'not_configured': 'âš ',
            'unknown': '?'
        }.get(result['status'], '?')
        print(f"  {status_icon} {component.upper()}: {result['status']}")
    
    if all_ok:
        print("\nðŸŽ‰ All data components working! Ready for Phase 2.")
    else:
        print("\nâš  Some components need attention before proceeding.")
    
    return results


# Run diagnostics
diagnostics = run_data_layer_diagnostics(data_manager)

Testing VIX data...
  âœ“ VIX OK: 17.44

Testing equity data...
  âœ“ Equity OK: 3/3 symbols

Testing options data...
  âœ“ Options OK: 83 contracts for AAPL
    73/83 have Greeks

PHASE 1 DATA LAYER STATUS
  âœ“ VIX: ok
  âœ“ EQUITY: ok
  âœ“ OPTIONS: ok

ðŸŽ‰ All data components working! Ready for Phase 2.


## Next Steps: Phase 2 Preview

With the data layer complete, Phase 2 will implement:

1. **Technical Indicators** (SMA, RSI, Bollinger Bands)
2. **Equity Filter** (`equity_passes_filter()`)
3. **Options Filter** (`filter_and_rank_options()`)

The data structures and fetchers built here will feed directly into those components.

In [19]:
# Preview: Save config and data manager for Phase 2
# These will be imported in the next notebook

print("Phase 1 Complete!")
print("\nObjects ready for Phase 2:")
print("  - config (StrategyConfig)")
print("  - data_manager (DataManager)")
print("  - OptionContract (dataclass)")
print("  - MarketSnapshot (dataclass)")

Phase 1 Complete!

Objects ready for Phase 2:
  - config (StrategyConfig)
  - data_manager (DataManager)
  - OptionContract (dataclass)
  - MarketSnapshot (dataclass)


# CSP Strategy - Phase 2: Filters & Indicators

Building on Phase 1's data layer, this notebook implements:

1. **Greeks Calculator** - IV and Delta via Black-Scholes (py_vollib)
2. **Technical Indicators** - SMA, RSI, Bollinger Bands
3. **Equity Filter** - Technical filter for stock selection
4. **Options Filter** - Premium, delta, strike filters + ranking

**Prerequisites:** Run Phase 1 notebook first to initialize `config`, `alpaca`, `equity_fetcher`, `options_fetcher`

## 1. Greeks Calculator

Alpaca doesn't always provide Greeks (especially for illiquid/short-DTE options). We calculate them ourselves using Black-Scholes via `py_vollib`.

In [20]:
# Install py_vollib if needed
# !pip install py_vollib

In [21]:
import numpy as np
from typing import Optional, Dict
from py_vollib.black_scholes.implied_volatility import implied_volatility
from py_vollib.black_scholes.greeks.analytical import delta as bs_delta, gamma as bs_gamma, theta as bs_theta, vega as bs_vega


class GreeksCalculator:
    """
    Calculates IV and Greeks using Black-Scholes via py_vollib.
    Used to fill in missing Greeks from Alpaca data.
    """
    
    def __init__(self, risk_free_rate: float = 0.04):
        """
        Args:
            risk_free_rate: Annual risk-free rate as decimal (default 4%)
        """
        self.r = risk_free_rate
    
    def compute_iv(
        self,
        option_price: float,
        stock_price: float,
        strike: float,
        dte: int,
        option_type: str = 'put'
    ) -> Optional[float]:
        """
        Compute implied volatility from option price.
        
        Args:
            option_price: Mid price of the option
            stock_price: Current underlying price
            strike: Strike price
            dte: Days to expiration
            option_type: 'put' or 'call'
            
        Returns:
            IV as decimal (e.g., 0.25 for 25%) or None if calculation fails
        """
        t = dte / 365.0
        flag = 'p' if option_type == 'put' else 'c'
        
        # Validate inputs
        if not all([
            np.isfinite(option_price),
            np.isfinite(stock_price),
            np.isfinite(strike),
            t > 0,
            option_price > 0,
            stock_price > 0,
            strike > 0
        ]):
            return None
        
        try:
            iv = implied_volatility(option_price, stock_price, strike, t, self.r, flag)
            return iv if np.isfinite(iv) and iv > 0 else None
        except Exception:
            return None
    
    def compute_delta(
        self,
        stock_price: float,
        strike: float,
        dte: int,
        iv: float,
        option_type: str = 'put'
    ) -> Optional[float]:
        """
        Compute delta from IV.
        
        Returns:
            Delta (negative for puts) or None if calculation fails
        """
        if iv is None or not np.isfinite(iv) or iv <= 0:
            return None
        
        t = dte / 365.0
        flag = 'p' if option_type == 'put' else 'c'
        
        if t <= 0:
            return None
        
        try:
            d = bs_delta(flag, stock_price, strike, t, self.r, iv)
            return d if np.isfinite(d) else None
        except Exception:
            return None
    
    def compute_all_greeks(
        self,
        stock_price: float,
        strike: float,
        dte: int,
        iv: float,
        option_type: str = 'put'
    ) -> Dict[str, Optional[float]]:
        """
        Compute all Greeks from IV.
        
        Returns:
            Dict with delta, gamma, theta, vega (values may be None)
        """
        result = {'delta': None, 'gamma': None, 'theta': None, 'vega': None}
        
        if iv is None or not np.isfinite(iv) or iv <= 0:
            return result
        
        t = dte / 365.0
        flag = 'p' if option_type == 'put' else 'c'
        
        if t <= 0:
            return result
        
        try:
            result['delta'] = bs_delta(flag, stock_price, strike, t, self.r, iv)
            result['gamma'] = bs_gamma(flag, stock_price, strike, t, self.r, iv)
            result['theta'] = bs_theta(flag, stock_price, strike, t, self.r, iv)
            result['vega'] = bs_vega(flag, stock_price, strike, t, self.r, iv)
        except Exception:
            pass
        
        return result
    
    def compute_greeks_from_price(
        self,
        option_price: float,
        stock_price: float,
        strike: float,
        dte: int,
        option_type: str = 'put'
    ) -> Dict[str, Optional[float]]:
        """
        Compute IV and all Greeks from option price in one call.
        
        Returns:
            Dict with 'iv', 'delta', 'gamma', 'theta', 'vega'
        """
        iv = self.compute_iv(option_price, stock_price, strike, dte, option_type)
        
        result = {'iv': iv, 'delta': None, 'gamma': None, 'theta': None, 'vega': None}
        
        if iv:
            greeks = self.compute_all_greeks(stock_price, strike, dte, iv, option_type)
            result.update(greeks)
        
        return result


# Initialize calculator
greeks_calc = GreeksCalculator(risk_free_rate=0.04)

print("Greeks Calculator initialized")

Greeks Calculator initialized


In [22]:
# Test Greeks Calculator against Alpaca-provided values
# Using AAPL260206P00225000 from Phase 1 output:
# Alpaca: delta=-0.0214, iv=0.6077, mid=0.15, strike=225, dte=5, stock=259.48

test_cases = [
    # (mid, stock, strike, dte, alpaca_delta, alpaca_iv)
    (0.15, 259.48, 225.0, 5, -0.0214, 0.6077),
    (0.31, 259.48, 250.0, 1, -0.0939, 0.5235),
    (0.245, 259.48, 232.5, 8, -0.0372, 0.42),
]

print("Greeks Calculator Validation:")
print("=" * 70)

for mid, stock, strike, dte, alpaca_delta, alpaca_iv in test_cases:
    result = greeks_calc.compute_greeks_from_price(mid, stock, strike, dte, 'put')
    
    print(f"\nStrike: ${strike:.0f} | DTE: {dte} | Mid: ${mid:.2f}")
    print(f"  {'Metric':<8} {'Alpaca':>10} {'Calculated':>12} {'Diff':>10}")
    print(f"  {'-'*8} {'-'*10} {'-'*12} {'-'*10}")
    
    if result['iv']:
        iv_diff = (result['iv'] - alpaca_iv) / alpaca_iv * 100
        print(f"  {'IV':<8} {alpaca_iv:>10.4f} {result['iv']:>12.4f} {iv_diff:>+9.1f}%")
    else:
        print(f"  {'IV':<8} {alpaca_iv:>10.4f} {'Failed':>12}")
    
    if result['delta']:
        delta_diff = (result['delta'] - alpaca_delta) / alpaca_delta * 100
        print(f"  {'Delta':<8} {alpaca_delta:>10.4f} {result['delta']:>12.4f} {delta_diff:>+9.1f}%")
    else:
        print(f"  {'Delta':<8} {alpaca_delta:>10.4f} {'Failed':>12}")

Greeks Calculator Validation:

Strike: $225 | DTE: 5 | Mid: $0.15
  Metric       Alpaca   Calculated       Diff
  -------- ---------- ------------ ----------
  IV           0.6077       0.6136      +1.0%
  Delta       -0.0214      -0.0212      -0.8%

Strike: $250 | DTE: 1 | Mid: $0.31
  Metric       Alpaca   Calculated       Diff
  -------- ---------- ------------ ----------
  IV           0.5235       0.5384      +2.8%
  Delta       -0.0939      -0.0903      -3.8%

Strike: $232 | DTE: 8 | Mid: $0.24
  Metric       Alpaca   Calculated       Diff
  -------- ---------- ------------ ----------
  IV           0.4200       0.4252      +1.2%
  Delta       -0.0372      -0.0368      -1.1%


## 2. Technical Indicators

Implementing SMA, RSI, and Bollinger Bands for equity filtering.

In [23]:
import pandas as pd


class TechnicalIndicators:
    """
    Technical indicator calculations for equity filtering.
    All methods are static and work with pandas Series.
    """
    
    @staticmethod
    def sma(prices: pd.Series, period: int) -> pd.Series:
        """
        Simple Moving Average.
        
        Args:
            prices: Series of prices
            period: Lookback period
            
        Returns:
            Series of SMA values
        """
        return prices.rolling(window=period).mean()
    
    @staticmethod
    def rsi(prices: pd.Series, period: int = 14) -> pd.Series:
        """
        Relative Strength Index.
        
        Args:
            prices: Series of prices
            period: Lookback period (default 14)
            
        Returns:
            Series of RSI values (0-100)
        """
        delta = prices.diff()
        gain = delta.where(delta > 0, 0).rolling(window=period).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
        
        # Avoid division by zero
        rs = gain / loss.replace(0, np.nan)
        rsi = 100 - (100 / (1 + rs))
        
        return rsi
    
    @staticmethod
    def bollinger_bands(
        prices: pd.Series, 
        period: int = 20, 
        num_std: float = 2.0
    ) -> tuple:
        """
        Bollinger Bands.
        
        Args:
            prices: Series of prices
            period: Lookback period for SMA (default 20)
            num_std: Number of standard deviations (default 2.0)
            
        Returns:
            Tuple of (lower_band, middle_band, upper_band) as Series
        """
        middle = prices.rolling(window=period).mean()
        std = prices.rolling(window=period).std()
        
        upper = middle + (num_std * std)
        lower = middle - (num_std * std)
        
        return lower, middle, upper
    
    @staticmethod
    def sma_trend(
        prices: pd.Series, 
        sma_period: int, 
        lookback_days: int = 3
    ) -> bool:
        """
        Check if SMA is trending upward over lookback period.
        
        Args:
            prices: Series of prices
            sma_period: Period for SMA calculation
            lookback_days: Number of days to check trend
            
        Returns:
            True if SMA has been rising for all lookback_days
        """
        sma = TechnicalIndicators.sma(prices, sma_period)
        
        if len(sma) < lookback_days + 1:
            return False
        
        # Check if each day's SMA > previous day's SMA
        for i in range(1, lookback_days + 1):
            if sma.iloc[-i] <= sma.iloc[-(i+1)]:
                return False
        
        return True


# Create instance for convenience
indicators = TechnicalIndicators()

print("Technical Indicators module loaded")

Technical Indicators module loaded


In [24]:
# Test Technical Indicators with sample data
# Generate sample price data
np.random.seed(42)
dates = pd.date_range(end=pd.Timestamp.today(), periods=60, freq='D')
sample_prices = pd.Series(
    100 + np.cumsum(np.random.randn(60) * 2),  # Random walk with drift
    index=dates,
    name='SAMPLE'
)

print("Technical Indicators Test:")
print("=" * 50)

# Calculate indicators
sma_8 = indicators.sma(sample_prices, 8)
sma_20 = indicators.sma(sample_prices, 20)
sma_50 = indicators.sma(sample_prices, 50)
rsi_14 = indicators.rsi(sample_prices, 14)
bb_lower, bb_middle, bb_upper = indicators.bollinger_bands(sample_prices, 50, 1.0)

current_price = sample_prices.iloc[-1]

print(f"\nCurrent Price: ${current_price:.2f}")
print(f"\nSMAs:")
print(f"  SMA(8):  ${sma_8.iloc[-1]:.2f}")
print(f"  SMA(20): ${sma_20.iloc[-1]:.2f}")
print(f"  SMA(50): ${sma_50.iloc[-1]:.2f}")
print(f"\nRSI(14): {rsi_14.iloc[-1]:.2f}")
print(f"\nBollinger Bands (50, 1std):")
print(f"  Lower:  ${bb_lower.iloc[-1]:.2f}")
print(f"  Middle: ${bb_middle.iloc[-1]:.2f}")
print(f"  Upper:  ${bb_upper.iloc[-1]:.2f}")

# Test trend detection
sma_50_trending = indicators.sma_trend(sample_prices, 50, 3)
print(f"\nSMA(50) Uptrend (3 days): {sma_50_trending}")

Technical Indicators Test:

Current Price: $81.44

SMAs:
  SMA(8):  $79.10
  SMA(20): $80.03
  SMA(50): $88.18

RSI(14): 55.83

Bollinger Bands (50, 1std):
  Lower:  $79.51
  Middle: $88.18
  Upper:  $96.86

SMA(50) Uptrend (3 days): False


## 3. Equity Filter

Implements the equity technical filter from strategy spec:
- SMA(8) < SMA(20) < SMA(50) < stock_price < BB_UPPER(50, 1std)
- SMA(50) trending up for last 3 days
- 30 < RSI < 70

In [25]:
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class EquityFilterResult:
    """
    Result of equity technical filter.
    """
    symbol: str
    passes: bool
    current_price: float
    sma_8: float
    sma_20: float
    sma_50: float
    rsi: float
    bb_upper: float
    sma_50_trending: bool
    failure_reasons: List[str]
    
    def __str__(self):
        status = "âœ“ PASS" if self.passes else "âœ— FAIL"
        reasons = ", ".join(self.failure_reasons) if self.failure_reasons else "All criteria met"
        return f"{self.symbol}: {status} | {reasons}"


class EquityFilter:
    """
    Filters equities based on technical criteria.
    
    Filter Rules:
    1. SMA(8) < SMA(20) < SMA(50) < current_price < BB_upper(50, 1std)
    2. SMA(50) has been rising for last 3 days
    3. RSI is between 30 and 70
    """
    
    def __init__(self, config: 'StrategyConfig'):
        self.config = config
        self.indicators = TechnicalIndicators()
    
    def evaluate(self, symbol: str, prices: pd.Series) -> EquityFilterResult:
        """
        Evaluate a single equity against filter criteria.
        
        Args:
            symbol: Ticker symbol
            prices: Series of closing prices (at least 50 days)
            
        Returns:
            EquityFilterResult with pass/fail and details
        """
        failure_reasons = []
        
        # Check minimum data
        if len(prices) < 50:
            return EquityFilterResult(
                symbol=symbol,
                passes=False,
                current_price=prices.iloc[-1] if len(prices) > 0 else 0,
                sma_8=0, sma_20=0, sma_50=0, rsi=0, bb_upper=0,
                sma_50_trending=False,
                failure_reasons=["Insufficient price history"]
            )
        
        # Calculate indicators
        current_price = prices.iloc[-1]
        
        sma_8 = self.indicators.sma(prices, 8).iloc[-1]
        sma_20 = self.indicators.sma(prices, 20).iloc[-1]
        sma_50 = self.indicators.sma(prices, 50).iloc[-1]
        
        rsi = self.indicators.rsi(prices, self.config.rsi_period).iloc[-1]
        
        _, _, bb_upper = self.indicators.bollinger_bands(
            prices, 
            self.config.bb_period, 
            self.config.bb_std
        )
        bb_upper_val = bb_upper.iloc[-1]
        
        sma_50_trending = self.indicators.sma_trend(
            prices, 
            50, 
            self.config.sma_trend_lookback
        )
        
        # Check criteria
        
        # 1. SMA alignment: SMA(8) < SMA(20) < SMA(50) < price
        if not (sma_8 < sma_20):
            failure_reasons.append(f"SMA(8) {sma_8:.2f} >= SMA(20) {sma_20:.2f}")
        if not (sma_20 < sma_50):
            failure_reasons.append(f"SMA(20) {sma_20:.2f} >= SMA(50) {sma_50:.2f}")
        if not (sma_50 < current_price):
            failure_reasons.append(f"SMA(50) {sma_50:.2f} >= Price {current_price:.2f}")
        
        # 2. Price below BB upper
        if not (current_price < bb_upper_val):
            failure_reasons.append(f"Price {current_price:.2f} >= BB_upper {bb_upper_val:.2f}")
        
        # 3. SMA(50) trending up
        if not sma_50_trending:
            failure_reasons.append("SMA(50) not trending up")
        
        # 4. RSI in range
        if not (self.config.rsi_lower < rsi < self.config.rsi_upper):
            failure_reasons.append(f"RSI {rsi:.1f} outside [{self.config.rsi_lower}, {self.config.rsi_upper}]")
        
        passes = len(failure_reasons) == 0
        
        return EquityFilterResult(
            symbol=symbol,
            passes=passes,
            current_price=current_price,
            sma_8=sma_8,
            sma_20=sma_20,
            sma_50=sma_50,
            rsi=rsi,
            bb_upper=bb_upper_val,
            sma_50_trending=sma_50_trending,
            failure_reasons=failure_reasons
        )
    
    def filter_universe(
        self, 
        price_history: Dict[str, pd.Series]
    ) -> Tuple[List[str], List[EquityFilterResult]]:
        """
        Filter entire universe and return passing symbols.
        
        Args:
            price_history: Dict mapping symbol -> price Series
            
        Returns:
            Tuple of (passing_symbols, all_results)
        """
        results = []
        passing = []
        
        for symbol, prices in price_history.items():
            result = self.evaluate(symbol, prices)
            results.append(result)
            if result.passes:
                passing.append(symbol)
        
        return passing, results


print("Equity Filter module loaded")

Equity Filter module loaded


In [26]:
# Test Equity Filter with real data (requires Phase 1 setup)
try:
    # Initialize filter
    equity_filter = EquityFilter(config)
    
    # Get price history for universe
    print("Fetching price history for universe...")
    price_history = equity_fetcher.get_close_history(
        config.ticker_universe, 
        days=config.history_days
    )
    
    # Run filter
    passing_symbols, all_results = equity_filter.filter_universe(price_history)
    
    print(f"\nEquity Filter Results:")
    print("=" * 70)
    print(f"Universe: {len(config.ticker_universe)} symbols")
    print(f"Passing:  {len(passing_symbols)} symbols")
    
    print(f"\n{'Symbol':<8} {'Price':>10} {'SMA8':>10} {'SMA20':>10} {'SMA50':>10} {'RSI':>8} {'Status':>8}")
    print("-" * 70)
    
    for result in all_results:
        status = "âœ“" if result.passes else "âœ—"
        print(f"{result.symbol:<8} ${result.current_price:>9.2f} ${result.sma_8:>9.2f} ${result.sma_20:>9.2f} ${result.sma_50:>9.2f} {result.rsi:>7.1f} {status:>8}")
    
    # Show failure reasons for failed symbols
    print(f"\nFailure Details:")
    print("-" * 70)
    for result in all_results:
        if not result.passes:
            print(f"{result.symbol}: {', '.join(result.failure_reasons)}")
    
    if passing_symbols:
        print(f"\nâœ“ Passing symbols: {', '.join(passing_symbols)}")
    else:
        print(f"\nâš  No symbols pass the equity filter currently")
        
except NameError:
    print("âš  Run Phase 1 first to initialize equity_fetcher and config")
except Exception as e:
    print(f"âš  Error: {e}")
    import traceback
    traceback.print_exc()

Fetching price history for universe...

Equity Filter Results:
Universe: 3 symbols
Passing:  0 symbols

Symbol        Price       SMA8      SMA20      SMA50      RSI   Status
----------------------------------------------------------------------
AAPL     $   259.48 $   253.99 $   257.65 $   268.30    50.2        âœ—
MSFT     $   430.29 $   457.19 $   465.05 $   476.12    31.2        âœ—
GOOG     $   338.53 $   333.71 $   329.28 $   318.66    60.9        âœ—

Failure Details:
----------------------------------------------------------------------
AAPL: SMA(50) 268.30 >= Price 259.48, SMA(50) not trending up
MSFT: SMA(50) 476.12 >= Price 430.29, SMA(50) not trending up
GOOG: SMA(8) 333.71 >= SMA(20) 329.28, SMA(20) 329.28 >= SMA(50) 318.66, Price 338.53 >= BB_upper 331.10

âš  No symbols pass the equity filter currently


## 4. Options Filter

Filters and ranks option contracts based on:
- Premium >= 0.15% daily return on cost basis
- Strike <= 90% of stock price
- Delta between 0.15 and 0.40
- DTE between 1 and 10 days

Prioritizes by premium per day.

In [27]:
@dataclass
class OptionsFilterResult:
    """
    Result of options filter for a single contract.
    """
    contract: 'OptionContract'
    passes: bool
    daily_return: float
    delta_abs: Optional[float]
    failure_reasons: List[str]
    
    def __str__(self):
        status = "âœ“" if self.passes else "âœ—"
        delta_str = f"{self.delta_abs:.3f}" if self.delta_abs else "N/A"
        return f"{status} {self.contract.symbol} | Î”={delta_str} | Ret={self.daily_return:.4%}"


class OptionsFilter:
    """
    Filters and ranks options based on strategy criteria.
    
    Filter Rules:
    1. Daily return on cost basis >= 0.15%
    2. Strike <= 90% of stock price
    3. |Delta| between 0.15 and 0.40
    4. DTE between min_dte and max_dte
    
    Ranking: By premium per day (descending)
    """
    
    def __init__(self, config: 'StrategyConfig', greeks_calculator: GreeksCalculator):
        self.config = config
        self.greeks_calc = greeks_calculator
    
    def _ensure_greeks(self, contract: 'OptionContract') -> 'OptionContract':
        """
        Ensure contract has Greeks, calculating if missing.
        Returns contract with Greeks filled in.
        """
        # If we already have delta and IV, return as-is
        if contract.delta is not None and contract.implied_volatility is not None:
            return contract
        
        # Calculate Greeks from mid price
        greeks = self.greeks_calc.compute_greeks_from_price(
            option_price=contract.mid,
            stock_price=contract.stock_price,
            strike=contract.strike,
            dte=contract.dte,
            option_type=contract.contract_type
        )
        
        # Update contract with calculated Greeks (only if missing)
        if contract.implied_volatility is None and greeks['iv'] is not None:
            contract.implied_volatility = greeks['iv']
        if contract.delta is None and greeks['delta'] is not None:
            contract.delta = greeks['delta']
        if contract.gamma is None and greeks['gamma'] is not None:
            contract.gamma = greeks['gamma']
        if contract.theta is None and greeks['theta'] is not None:
            contract.theta = greeks['theta']
        if contract.vega is None and greeks['vega'] is not None:
            contract.vega = greeks['vega']
        
        return contract
    
    def evaluate(self, contract: 'OptionContract') -> OptionsFilterResult:
        """
        Evaluate a single option contract against filter criteria.
        
        Args:
            contract: OptionContract to evaluate
            
        Returns:
            OptionsFilterResult with pass/fail and details
        """
        # Ensure we have Greeks
        contract = self._ensure_greeks(contract)
        
        failure_reasons = []
        
        # Calculate metrics
        daily_return = contract.daily_return_on_cost_basis
        delta_abs = abs(contract.delta) if contract.delta else None
        strike_pct = contract.strike / contract.stock_price
        
        # 1. Premium filter: daily return >= 0.15%
        if daily_return < self.config.min_daily_return:
            failure_reasons.append(
                f"Daily return {daily_return:.4%} < {self.config.min_daily_return:.4%}"
            )
        
        # 2. Strike filter: strike <= 90% of stock price
        if strike_pct > self.config.max_strike_pct:
            failure_reasons.append(
                f"Strike {strike_pct:.1%} > {self.config.max_strike_pct:.1%} of stock"
            )
        
        # 3. Delta filter: |delta| between 0.15 and 0.40
        if delta_abs is None:
            failure_reasons.append("Delta unavailable")
        elif not (self.config.delta_min <= delta_abs <= self.config.delta_max):
            failure_reasons.append(
                f"Delta {delta_abs:.3f} outside [{self.config.delta_min}, {self.config.delta_max}]"
            )
        
        # 4. DTE filter (should already be filtered, but double-check)
        if not (self.config.min_dte <= contract.dte <= self.config.max_dte):
            failure_reasons.append(
                f"DTE {contract.dte} outside [{self.config.min_dte}, {self.config.max_dte}]"
            )
        
        passes = len(failure_reasons) == 0
        
        return OptionsFilterResult(
            contract=contract,
            passes=passes,
            daily_return=daily_return,
            delta_abs=delta_abs,
            failure_reasons=failure_reasons
        )
    
    def filter_and_rank(
        self, 
        contracts: List['OptionContract']
    ) -> Tuple[List['OptionContract'], List[OptionsFilterResult]]:
        """
        Filter contracts and rank passing ones by premium per day.
        
        Args:
            contracts: List of OptionContracts to evaluate
            
        Returns:
            Tuple of (ranked_passing_contracts, all_results)
        """
        results = []
        passing = []
        
        for contract in contracts:
            result = self.evaluate(contract)
            results.append(result)
            if result.passes:
                passing.append(result.contract)
        
        # Rank by premium_per_day descending
        passing.sort(key=lambda c: c.premium_per_day, reverse=True)
        
        return passing, results
    
    def get_best_candidates(
        self,
        contracts: List['OptionContract'],
        max_candidates: int = 10
    ) -> List['OptionContract']:
        """
        Get top N candidates after filtering and ranking.
        
        Args:
            contracts: List of OptionContracts
            max_candidates: Maximum number to return
            
        Returns:
            List of top candidates, ranked by premium per day
        """
        passing, _ = self.filter_and_rank(contracts)
        return passing[:max_candidates]


print("Options Filter module loaded")

Options Filter module loaded


In [28]:
# Test Options Filter with real data (requires Phase 1 setup)
try:
    # Initialize filter
    options_filter = OptionsFilter(config, greeks_calc)
    
    # Get test data - use AAPL puts from Phase 1
    test_symbol = 'AAPL'
    current_price = equity_fetcher.get_current_price(test_symbol)
    puts = options_fetcher.get_puts_chain(test_symbol, current_price, config)
    
    print(f"Options Filter Test: {test_symbol} @ ${current_price:.2f}")
    print("=" * 80)
    print(f"Total contracts fetched: {len(puts)}")
    
    # Run filter
    passing, all_results = options_filter.filter_and_rank(puts)
    
    print(f"Contracts passing filter: {len(passing)}")
    
    # Show summary stats
    total_with_delta = sum(1 for r in all_results if r.delta_abs is not None)
    print(f"Contracts with Delta: {total_with_delta}/{len(all_results)}")
    
    # Show filter failure breakdown
    failure_counts = {}
    for result in all_results:
        for reason in result.failure_reasons:
            # Extract failure type
            if "Daily return" in reason:
                key = "Premium too low"
            elif "Strike" in reason:
                key = "Strike too high"
            elif "Delta" in reason:
                key = "Delta out of range" if "outside" in reason else "Delta unavailable"
            elif "DTE" in reason:
                key = "DTE out of range"
            else:
                key = reason
            failure_counts[key] = failure_counts.get(key, 0) + 1
    
    print(f"\nFailure Breakdown:")
    for reason, count in sorted(failure_counts.items(), key=lambda x: -x[1]):
        print(f"  {reason}: {count}")
    
    # Show top passing candidates
    if passing:
        print(f"\nTop 10 Candidates (by premium/day):")
        print("-" * 80)
        print(f"{'Symbol':<22} {'Strike':>8} {'DTE':>5} {'Bid':>8} {'Delta':>8} {'Daily%':>10} {'$/Day':>8}")
        print("-" * 80)
        
        for contract in passing[:10]:
            delta_str = f"{abs(contract.delta):.3f}" if contract.delta else "N/A"
            print(
                f"{contract.symbol:<22} "
                f"${contract.strike:>7.2f} "
                f"{contract.dte:>5} "
                f"${contract.bid:>7.2f} "
                f"{delta_str:>8} "
                f"{contract.daily_return_on_cost_basis:>9.4%} "
                f"${contract.premium_per_day:>7.3f}"
            )
    else:
        print(f"\nâš  No contracts pass all filter criteria")
        
        # Show closest misses
        print(f"\nClosest misses (sorted by daily return):")
        near_misses = sorted(all_results, key=lambda r: r.daily_return, reverse=True)[:5]
        for result in near_misses:
            c = result.contract
            delta_str = f"{result.delta_abs:.3f}" if result.delta_abs else "N/A"
            print(f"  {c.symbol}: Î”={delta_str}, Ret={result.daily_return:.4%}")
            print(f"    Reasons: {', '.join(result.failure_reasons)}")

except NameError as e:
    print(f"âš  Run Phase 1 first: {e}")
except Exception as e:
    print(f"âš  Error: {e}")
    import traceback
    traceback.print_exc()

Options Filter Test: AAPL @ $259.48
Total contracts fetched: 83
Contracts passing filter: 0
Contracts with Delta: 83/83

Failure Breakdown:
  Premium too low: 83
  Delta out of range: 76

âš  No contracts pass all filter criteria

Closest misses (sorted by daily return):
  AAPL260202P00252500: Î”=0.150, Ret=0.1811%
    Reasons: Daily return 0.1811% < 15.0000%, Delta 0.150 outside [0.15, 0.4]
  AAPL260204P00252500: Î”=0.216, Ret=0.1375%
    Reasons: Daily return 0.1375% < 15.0000%
  AAPL260206P00252500: Î”=0.253, Ret=0.1233%
    Reasons: Daily return 0.1233% < 15.0000%
  AAPL260202P00250000: Î”=0.094, Ret=0.1195%
    Reasons: Daily return 0.1195% < 15.0000%, Delta 0.094 outside [0.15, 0.4]
  AAPL260204P00250000: Î”=0.155, Ret=0.0925%
    Reasons: Daily return 0.0925% < 15.0000%


## 5. Combined Scanner

Combines equity filter + options filter into a single scan workflow.

In [29]:
@dataclass
class ScanResult:
    """
    Complete scan result for a symbol.
    """
    symbol: str
    stock_price: float
    equity_result: EquityFilterResult
    options_candidates: List['OptionContract']
    
    @property
    def has_candidates(self) -> bool:
        return len(self.options_candidates) > 0


class StrategyScanner:
    """
    Combined scanner that runs equity filter then options filter.
    """
    
    def __init__(
        self,
        config: 'StrategyConfig',
        equity_fetcher: 'EquityDataFetcher',
        options_fetcher: 'OptionsDataFetcher',
        greeks_calc: GreeksCalculator
    ):
        self.config = config
        self.equity_fetcher = equity_fetcher
        self.options_fetcher = options_fetcher
        self.equity_filter = EquityFilter(config)
        self.options_filter = OptionsFilter(config, greeks_calc)
    
    def scan_symbol(
        self, 
        symbol: str, 
        prices: pd.Series,
        skip_equity_filter: bool = False
    ) -> ScanResult:
        """
        Scan a single symbol through both filters.
        
        Args:
            symbol: Ticker symbol
            prices: Price history
            skip_equity_filter: If True, skip equity filter (for testing)
            
        Returns:
            ScanResult with equity filter result and option candidates
        """
        stock_price = prices.iloc[-1]
        
        # Run equity filter
        equity_result = self.equity_filter.evaluate(symbol, prices)
        
        # If equity fails and we're not skipping, return empty options
        if not equity_result.passes and not skip_equity_filter:
            return ScanResult(
                symbol=symbol,
                stock_price=stock_price,
                equity_result=equity_result,
                options_candidates=[]
            )
        
        # Get options chain
        puts = self.options_fetcher.get_puts_chain(symbol, stock_price, self.config)
        
        # Filter and rank options
        candidates = self.options_filter.get_best_candidates(puts, max_candidates=5)
        
        return ScanResult(
            symbol=symbol,
            stock_price=stock_price,
            equity_result=equity_result,
            options_candidates=candidates
        )
    
    def scan_universe(
        self,
        skip_equity_filter: bool = False
    ) -> List[ScanResult]:
        """
        Scan entire universe.
        
        Args:
            skip_equity_filter: If True, scan options for all symbols
            
        Returns:
            List of ScanResults for each symbol
        """
        # Get price history for all symbols
        price_history = self.equity_fetcher.get_close_history(
            self.config.ticker_universe,
            days=self.config.history_days
        )
        
        results = []
        for symbol in self.config.ticker_universe:
            if symbol not in price_history:
                continue
            
            result = self.scan_symbol(
                symbol, 
                price_history[symbol],
                skip_equity_filter=skip_equity_filter
            )
            results.append(result)
        
        return results
    
    def get_all_candidates(
        self,
        skip_equity_filter: bool = False,
        max_total: int = 20
    ) -> List['OptionContract']:
        """
        Get all option candidates across universe, ranked by premium/day.
        
        Args:
            skip_equity_filter: If True, include all symbols
            max_total: Maximum total candidates to return
            
        Returns:
            List of top option candidates across all symbols
        """
        scan_results = self.scan_universe(skip_equity_filter=skip_equity_filter)
        
        # Collect all candidates
        all_candidates = []
        for result in scan_results:
            all_candidates.extend(result.options_candidates)
        
        # Sort by premium per day
        all_candidates.sort(key=lambda c: c.premium_per_day, reverse=True)
        
        return all_candidates[:max_total]


print("Strategy Scanner module loaded")

Strategy Scanner module loaded


In [30]:
# Test Combined Scanner
try:
    scanner = StrategyScanner(
        config=config,
        equity_fetcher=equity_fetcher,
        options_fetcher=options_fetcher,
        greeks_calc=greeks_calc
    )
    
    print("Universe Scan (with equity filter)")
    print("=" * 80)
    
    # Run scan with equity filter
    scan_results = scanner.scan_universe(skip_equity_filter=False)
    
    passing_equity = sum(1 for r in scan_results if r.equity_result.passes)
    with_candidates = sum(1 for r in scan_results if r.has_candidates)
    
    print(f"Symbols scanned: {len(scan_results)}")
    print(f"Passing equity filter: {passing_equity}")
    print(f"With option candidates: {with_candidates}")
    
    # Show results by symbol
    print(f"\n{'Symbol':<8} {'Price':>10} {'Equity':>8} {'Options':>8}")
    print("-" * 40)
    for result in scan_results:
        equity_status = "âœ“" if result.equity_result.passes else "âœ—"
        options_count = len(result.options_candidates)
        print(f"{result.symbol:<8} ${result.stock_price:>9.2f} {equity_status:>8} {options_count:>8}")
    
    # Get top candidates across all symbols
    print(f"\n" + "=" * 80)
    print("Top Option Candidates Across Universe:")
    print("-" * 80)
    
    top_candidates = scanner.get_all_candidates(skip_equity_filter=False, max_total=10)
    
    if top_candidates:
        print(f"{'Symbol':<22} {'Strike':>8} {'DTE':>5} {'Bid':>8} {'Delta':>8} {'Daily%':>10}")
        print("-" * 80)
        for c in top_candidates:
            delta_str = f"{abs(c.delta):.3f}" if c.delta else "N/A"
            print(
                f"{c.symbol:<22} "
                f"${c.strike:>7.2f} "
                f"{c.dte:>5} "
                f"${c.bid:>7.2f} "
                f"{delta_str:>8} "
                f"{c.daily_return_on_cost_basis:>9.4%}"
            )
    else:
        print("No candidates found with equity filter enabled.")
        # Diagnostic: why equity-passing symbols had no options candidates
        equity_passing_no_options = [r for r in scan_results if r.equity_result.passes and not r.options_candidates]
        if equity_passing_no_options:
            print("\nWhy equity-passing symbols did not pass options filter:")
            print("-" * 60)
            for result in equity_passing_no_options:
                puts = scanner.options_fetcher.get_puts_chain(result.symbol, result.stock_price, config)
                passing, all_results = scanner.options_filter.filter_and_rank(puts)
                failure_counts = {}
                for r in all_results:
                    for reason in r.failure_reasons:
                        if "Daily return" in reason:
                            key = "Premium too low"
                        elif "Strike" in reason:
                            key = "Strike too high"
                        elif "Delta" in reason:
                            key = "Delta out of range" if "outside" in reason else "Delta unavailable"
                        elif "DTE" in reason:
                            key = "DTE out of range"
                        else:
                            key = reason
                        failure_counts[key] = failure_counts.get(key, 0) + 1
                reasons_str = ", ".join(f"{k}: {v}" for k, v in sorted(failure_counts.items(), key=lambda x: -x[1]))
                print(f"  {result.symbol}: {len(puts)} puts, 0 passed â€” {reasons_str}")
        else:
            print("  (No symbols passed the equity filter, so no options were evaluated.)")
        
        # Try without equity filter to show what options look like
        print("\nTrying without equity filter...")
        top_candidates_no_filter = scanner.get_all_candidates(skip_equity_filter=True, max_total=10)
        
        if top_candidates_no_filter:
            print(f"\nTop candidates (equity filter bypassed):")
            print(f"{'Symbol':<22} {'Strike':>8} {'DTE':>5} {'Bid':>8} {'Delta':>8} {'Daily%':>10}")
            print("-" * 80)
            for c in top_candidates_no_filter:
                delta_str = f"{abs(c.delta):.3f}" if c.delta else "N/A"
                print(
                    f"{c.symbol:<22} "
                    f"${c.strike:>7.2f} "
                    f"{c.dte:>5} "
                    f"${c.bid:>7.2f} "
                    f"{delta_str:>8} "
                    f"{c.daily_return_on_cost_basis:>9.4%}"
                )

except NameError as e:
    print(f"âš  Run Phase 1 first: {e}")
except Exception as e:
    print(f"âš  Error: {e}")
    import traceback
    traceback.print_exc()

Universe Scan (with equity filter)
Symbols scanned: 3
Passing equity filter: 0
With option candidates: 0

Symbol        Price   Equity  Options
----------------------------------------
AAPL     $   259.48        âœ—        0
MSFT     $   430.29        âœ—        0
GOOG     $   338.53        âœ—        0

Top Option Candidates Across Universe:
--------------------------------------------------------------------------------
No candidates found with equity filter enabled.

Trying without equity filter...


## 6. Phase 2 Summary & Validation

In [31]:
def run_phase2_diagnostics():
    """
    Run diagnostics on all Phase 2 components.
    """
    print("Phase 2 Diagnostics")
    print("=" * 60)
    
    results = {}
    
    # 1. Greeks Calculator
    print("\n1. Greeks Calculator...")
    try:
        test = greeks_calc.compute_greeks_from_price(0.15, 259.48, 225.0, 5, 'put')
        if test['iv'] and test['delta']:
            results['greeks'] = 'ok'
            print(f"   âœ“ OK - IV: {test['iv']:.4f}, Delta: {test['delta']:.4f}")
        else:
            results['greeks'] = 'partial'
            print(f"   âš  Partial - some calculations failed")
    except Exception as e:
        results['greeks'] = 'error'
        print(f"   âœ— Error: {e}")
    
    # 2. Technical Indicators
    print("\n2. Technical Indicators...")
    try:
        test_prices = pd.Series(range(100, 160))
        sma = indicators.sma(test_prices, 20).iloc[-1]
        rsi = indicators.rsi(test_prices, 14).iloc[-1]
        results['indicators'] = 'ok'
        print(f"   âœ“ OK - SMA(20): {sma:.2f}, RSI: {rsi:.2f}")
    except Exception as e:
        results['indicators'] = 'error'
        print(f"   âœ— Error: {e}")
    
    # 3. Equity Filter
    print("\n3. Equity Filter...")
    try:
        ef = EquityFilter(config)
        results['equity_filter'] = 'ok'
        print(f"   âœ“ OK - Filter initialized")
    except Exception as e:
        results['equity_filter'] = 'error'
        print(f"   âœ— Error: {e}")
    
    # 4. Options Filter
    print("\n4. Options Filter...")
    try:
        of = OptionsFilter(config, greeks_calc)
        results['options_filter'] = 'ok'
        print(f"   âœ“ OK - Filter initialized")
    except Exception as e:
        results['options_filter'] = 'error'
        print(f"   âœ— Error: {e}")
    
    # 5. Strategy Scanner
    print("\n5. Strategy Scanner...")
    try:
        scanner = StrategyScanner(config, equity_fetcher, options_fetcher, greeks_calc)
        results['scanner'] = 'ok'
        print(f"   âœ“ OK - Scanner initialized")
    except Exception as e:
        results['scanner'] = 'error'
        print(f"   âœ— Error: {e}")
    
    # Summary
    print("\n" + "=" * 60)
    all_ok = all(v == 'ok' for v in results.values())
    
    if all_ok:
        print("âœ“ All Phase 2 components working!")
        print("\nReady for Phase 3: Position Management")
    else:
        print("âš  Some components need attention")
    
    return results


# Run diagnostics
try:
    phase2_results = run_phase2_diagnostics()
except NameError as e:
    print(f"âš  Run Phase 1 first to initialize dependencies: {e}")

Phase 2 Diagnostics

1. Greeks Calculator...
   âœ“ OK - IV: 0.6136, Delta: -0.0212

2. Technical Indicators...
   âœ“ OK - SMA(20): 149.50, RSI: nan

3. Equity Filter...
   âœ“ OK - Filter initialized

4. Options Filter...
   âœ“ OK - Filter initialized

5. Strategy Scanner...
   âœ“ OK - Scanner initialized

âœ“ All Phase 2 components working!

Ready for Phase 3: Position Management


## Next Steps: Phase 3 Preview

With filters complete, Phase 3 will implement:

1. **Position Tracking** - `ActivePosition` management
2. **Stop-Loss Logic** - Delta, stock price, VIX checks
3. **Early Exit Logic** - Premium capture ahead of schedule
4. **Execution Engine** - Order placement via Alpaca
5. **Main Trading Loop** - Polling-based monitoring

In [32]:
print("Phase 2 Complete!")
print("\nModules available:")
print("  - greeks_calc (GreeksCalculator)")
print("  - indicators (TechnicalIndicators)")
print("  - EquityFilter")
print("  - OptionsFilter")
print("  - StrategyScanner")

Phase 2 Complete!

Modules available:
  - greeks_calc (GreeksCalculator)
  - indicators (TechnicalIndicators)
  - EquityFilter
  - OptionsFilter
  - StrategyScanner
