In [1]:
import os
import pandas as pd
import numpy as np
import yfinance as yf
import pandas_market_calendars as mcal
import matplotlib.pyplot as plt
from datetime import timedelta
from pytz import timezone
import time
from scipy import linalg
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans

In [2]:
MARKET_TZ = timezone('America/New_York')

def is_market_open(date):
    nyse = mcal.get_calendar('NYSE')
    sched = nyse.schedule(start_date=date, end_date=date)
    return not sched.empty

def get_next_trading_day(date):
    d = date
    while True:
        d += timedelta(days=1)
        if is_market_open(d):
            return d

def get_previous_trading_day(date):
    d = date
    while True:
        d -= timedelta(days=1)
        if is_market_open(d):
            return d

In [None]:
def get_historical_data(ticker, start_date, end_date, interval, max_retries=3, backoff_factor=2):
    """
    Retrieve historical data with retry logic for rate limits.
    Returns timezone-aware DataFrame in MARKET_TZ.
    """
    os.makedirs('historical_data', exist_ok=True)
    file_path = f'historical_data/{ticker}.csv'
    
    start_date = pd.to_datetime(start_date)
    end_date = pd.to_datetime(end_date)
    
    for attempt in range(max_retries + 1):
        try:
            new_data = yf.download(ticker, 
                                 start=start_date,
                                 end=end_date,
                                 interval=interval,
                                 progress=False)
            
            if not new_data.empty:
                # Process columns and timezone
                if isinstance(new_data.columns, pd.MultiIndex):
                    new_data.columns = new_data.columns.get_level_values(0)
                    
                if new_data.index.tzinfo is None:
                    new_data.index = new_data.index.tz_localize(MARKET_TZ)
                else:
                    new_data.index = new_data.index.tz_convert(MARKET_TZ)
                
                return new_data
            
            print(f"Fetched data for {ticker} from {start_date} to {end_date}: {len(new_data)} rows")
            return pd.DataFrame()
            
        except Exception as e:
            if attempt < max_retries:
                wait_time = backoff_factor ** attempt
                print(f"Retry {attempt+1}/{max_retries} for {ticker} in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"Failed to fetch {ticker} after {max_retries} retries: {str(e)}")
                return pd.DataFrame()
    
    return pd.DataFrame()

In [9]:
def get_stock_metrics(ticker):
    """Fetch fundamental data for a ticker"""
    try:
        tk = yf.Ticker(ticker)
        info = tk.info
        
        # Basic metrics
        metrics = {
            'Symbol': ticker,
            'Sector': info.get('sector', 'Unknown'),
            'Industry': info.get('industry', 'Unknown'),
            'MarketCap': info.get('marketCap', None),
            'Beta': info.get('beta', None),
            'PE': info.get('trailingPE', None),
            'ForwardPE': info.get('forwardPE', None),
            'PEG': info.get('pegRatio', None),
            'ShortPercentFloat': info.get('shortPercentOfFloat', None),
            'AvgVolume': info.get('averageVolume', None),
        }
        
        # Calculate market cap category
        mcap = metrics['MarketCap']
        if mcap:
            if mcap >= 200e9:
                metrics['CapSize'] = 'Mega'
            elif mcap >= 10e9:
                metrics['CapSize'] = 'Large'
            elif mcap >= 2e9:
                metrics['CapSize'] = 'Mid'
            elif mcap >= 300e6:
                metrics['CapSize'] = 'Small'
            else:
                metrics['CapSize'] = 'Micro'
        else:
            metrics['CapSize'] = 'Unknown'
            
        return metrics
    except Exception as e:
        print(f"Error fetching metrics for {ticker}: {str(e)}")
        return {'Symbol': ticker, 'Sector': 'Unknown', 'Industry': 'Unknown', 
                'CapSize': 'Unknown', 'MarketCap': None}

In [10]:
# Factor Extraction via PCA & SVD
def extract_factors(returns_df, n_factors=3, denoise=True):
    """
    Extract principal factors from asset returns
    
    Parameters:
    - returns_df: DataFrame of asset returns (rows=time, columns=assets)
    - n_factors: Number of factors to retain
    - denoise: Whether to apply SVD denoising
    
    Returns:
    - factors_df: DataFrame of factor returns
    - loadings_df: DataFrame of factor loadings
    - eigenvalues: Array of eigenvalues (variance explained by each factor)
    """
    # Check for sufficient data
    if returns_df.shape[0] < returns_df.shape[1]:
        print(f"Warning: More assets ({returns_df.shape[1]}) than time periods ({returns_df.shape[0]}). Results may be unstable.")
    
    # Get asset returns as numpy array
    returns = returns_df.values
    
    # 1. PCA approach
    # Compute covariance matrix
    cov_matrix = np.cov(returns.T)
    
    # Eigendecomposition
    eigenvalues, eigenvectors = linalg.eigh(cov_matrix)
    
    # Sort in descending order (largest eigenvalues first)
    idx = eigenvalues.argsort()[::-1]
    eigenvalues = eigenvalues[idx]
    eigenvectors = eigenvectors[:, idx]
    
    # Keep only top n_factors
    eigenvalues = eigenvalues[:n_factors]
    eigenvectors = eigenvectors[:, :n_factors]
    
    # 2. SVD approach for denoising
    if denoise:
        U, S, Vt = linalg.svd(returns, full_matrices=False)
        
        # Truncate to keep only top n_factors singular values
        S_trunc = np.zeros_like(S)
        S_trunc[:n_factors] = S[:n_factors]
        
        # Reconstruct denoised returns
        returns_denoised = U @ np.diag(S_trunc) @ Vt
        
        # Update covariance and eigenvectors based on denoised returns
        cov_matrix_denoised = np.cov(returns_denoised.T)
        eigenvalues_denoised, eigenvectors_denoised = linalg.eigh(cov_matrix_denoised)
        
        # Sort in descending order 
        idx = eigenvalues_denoised.argsort()[::-1]
        eigenvalues_denoised = eigenvalues_denoised[idx]
        eigenvectors_denoised = eigenvectors_denoised[:, idx]
        
        # Update with denoised results
        eigenvalues = eigenvalues_denoised[:n_factors]
        eigenvectors = eigenvectors_denoised[:, :n_factors]
        
    # Calculate factor returns: F = R * V
    factor_returns = returns @ eigenvectors
    
    # Create DataFrames
    factors_df = pd.DataFrame(
        factor_returns, 
        index=returns_df.index,
        columns=[f'Factor_{i+1}' for i in range(n_factors)]
    )
    
    loadings_df = pd.DataFrame(
        eigenvectors,
        index=returns_df.columns,
        columns=[f'Factor_{i+1}' for i in range(n_factors)]
    )
    
    return factors_df, loadings_df, eigenvalues

def avg_corr(window):
    """
    Calculate average pairwise correlation in a window of returns
    
    Parameters:
    - window: DataFrame window from rolling operation, with assets as columns
    
    Returns:
    - avg_correlation: Average pairwise correlation
    """
    # Skip if there are too few observations or all NaNs
    if len(window) <= 1 or window.isna().all().all():
        return np.nan
        
    # Calculate correlation matrix
    corr_matrix = window.corr()
    
    # Extract upper triangle (excluding diagonal)
    mask = np.triu(np.ones_like(corr_matrix), k=1).astype(bool)
    upper_tri = corr_matrix.where(mask)
    
    # Calculate mean of correlations (ignoring NaN values)
    avg_correlation = upper_tri.stack().mean()
    
    return avg_correlation

# Regime Detection via Perron-Frobenius
def detect_market_regime(returns_df, n_regimes=3, lookback=60):
    """
    Detect market regimes using volatility clustering and Perron-Frobenius analysis
    
    Parameters:
    - returns_df: DataFrame of asset returns
    - n_regimes: Number of different regimes to identify
    - lookback: Number of days to use for rolling statistics
    
    Returns:
    - regime_df: DataFrame with regime indicators
    - transition_matrix: Transition probabilities between regimes
    - stationary_dist: Stationary distribution (from Perron-Frobenius)
    """
    # Compute market statistics used for regime identification
    market_stats = pd.DataFrame(index=returns_df.index)
    
    # 1. Rolling volatility (key regime indicator)
    market_stats['volatility'] = returns_df.std(axis=1).rolling(lookback).std()
    
    # 2. Cross-sectional correlation (manual calculation instead of rolling apply)
    # Calculate rolling correlation manually to avoid apply issues
    corr_values = []
    for i in range(len(returns_df)):
        if i < lookback:
            corr_values.append(np.nan)
            continue
            
        window = returns_df.iloc[i-lookback:i]
        if len(window) <= 1:
            corr_values.append(np.nan)
            continue
            
        # Calculate correlation matrix
        corr_matrix = window.corr()
        
        # Extract upper triangle (excluding diagonal)
        mask = np.triu(np.ones_like(corr_matrix), k=1).astype(bool)
        upper_tri = corr_matrix.where(mask)
        
        # Calculate mean of correlations (ignoring NaN values)
        avg_correlation = upper_tri.stack().mean()
        corr_values.append(avg_correlation)
    
    market_stats['correlation'] = corr_values
    
    # 3. Market trend (positive or negative)
    market_stats['trend'] = returns_df.mean(axis=1).rolling(lookback).mean()
    
    # Remove rows with NaN values
    market_stats = market_stats.dropna()
    if market_stats.empty:
        print("Not enough data for regime detection")
        return None, None, None
    
    # Normalize features
    scaler = StandardScaler()
    scaled_features = scaler.fit_transform(market_stats)
    
    # Cluster into regimes using KMeans
    kmeans = KMeans(n_clusters=n_regimes, random_state=69, n_init=10)
    regimes = kmeans.fit_predict(scaled_features)
    
    # Create regime DataFrame
    regime_df = pd.DataFrame({'Regime': regimes}, index=market_stats.index)
    
    # Calculate regime characteristics
    regime_characteristics = pd.DataFrame()
    for i in range(n_regimes):
        mask = regime_df['Regime'] == i
        if mask.sum() > 0:  # Check if we have data for this regime
            stats = market_stats[mask].mean()
            stats['count'] = mask.sum()
            stats['regime'] = i
            regime_characteristics = pd.concat([regime_characteristics, pd.DataFrame([stats])])
    
    # Label regimes based on volatility (highest vol = crisis, lowest = calm)
    vol_order = regime_characteristics.sort_values('volatility').index
    regime_map = {old: new for new, old in enumerate(vol_order)}
    
    # Ensure regime values are integers by first handling NaN values
    regime_df['Regime'] = regime_df['Regime'].fillna(-1).astype(int)  # Fill NaN with dummy value
    regime_df['Regime'] = regime_df['Regime'].map(lambda x: regime_map.get(x, 0))  # Map with default value
    
    # Build transition matrix (Markov model)
    transitions = np.zeros((n_regimes, n_regimes))
    
    # Ensure we're using integer indices for the matrix and skip NaN values
    for i in range(1, len(regime_df)):
        prev_regime = int(regime_df['Regime'].iloc[i-1])
        curr_regime = int(regime_df['Regime'].iloc[i])
        # Add bounds checking to prevent index errors
        if 0 <= prev_regime < n_regimes and 0 <= curr_regime < n_regimes:
            transitions[prev_regime, curr_regime] += 1
    
    # Convert to probabilities (row-wise)
    row_sums = transitions.sum(axis=1, keepdims=True)
    # Avoid division by zero
    row_sums[row_sums == 0] = 1
    transition_matrix = transitions / row_sums
    
    # Calculate stationary distribution (Perron-Frobenius)
    eigenvalues, eigenvectors = linalg.eig(transition_matrix.T)
    # Find index of eigenvalue closest to 1
    idx = np.argmin(np.abs(eigenvalues - 1.0))
    # Extract corresponding eigenvector and normalize
    stationary_dist = np.real(eigenvectors[:, idx] / np.sum(eigenvectors[:, idx]))
    
    return regime_df, transition_matrix, stationary_dist

# Portfolio Optimization with Diagonalization
def optimize_portfolio(returns_df, loadings_df, regime_info=None, min_weight=-0.5, max_weight=1.5):
    """
    Create optimal portfolio weights using factor loadings and regime information
    
    Parameters:
    - returns_df: DataFrame of asset returns
    - loadings_df: Factor loadings from PCA/SVD
    - regime_info: Current regime information
    - min_weight/max_weight: Weight constraints for individual assets
    
    Returns:
    - weights: Optimal portfolio weights
    """
    n_assets = returns_df.shape[1]
    
    # Get asset names
    assets = returns_df.columns
    
    # 1. Compute covariance matrix
    cov_matrix = np.cov(returns_df.values.T)
    
    # 2. Efficient diagonalization for inverse
    eigenvalues, eigenvectors = linalg.eigh(cov_matrix)
    
    # Replace any near-zero eigenvalues to avoid numerical instability
    eigenvalues[eigenvalues < 1e-10] = 1e-10
    
    # Compute inverse covariance via diagonalization: C⁻¹ = V Λ⁻¹ V^T
    inv_cov = eigenvectors @ np.diag(1/eigenvalues) @ eigenvectors.T
    
    # 3. Initialize with minimum variance portfolio
    ones = np.ones(n_assets)
    min_var_weights = inv_cov @ ones
    min_var_weights = min_var_weights / np.sum(min_var_weights)
    
    # 4. Adjust weights based on factor loadings
    # Get absolute loadings on first factor (market factor)
    market_loadings = np.abs(loadings_df.iloc[:, 0].values)
    
    # Use second factor for sector rotation/style tilts
    if loadings_df.shape[1] > 1:
        style_loadings = loadings_df.iloc[:, 1].values
    else:
        style_loadings = np.zeros(n_assets)
    
    # 5. Regime adjustments (if provided)
    regime_adjustment = np.ones(n_assets)
    if regime_info is not None:
        regime, transition_matrix, stationary_dist = regime_info
        
        # In high volatility regimes (2), reduce exposure to high-beta assets
        if regime == 2:  # Crisis regime
            # Reduce weights of high market factor exposure
            regime_adjustment = 1 - 0.5 * market_loadings / np.max(market_loadings) if np.max(market_loadings) > 0 else np.ones(n_assets)
        # In low volatility regimes (0), increase exposure to momentum
        elif regime == 0:  # Calm regime
            # Increase weights of positive style factor exposure
            std_style = np.std(style_loadings)
            if std_style > 0:
                regime_adjustment = 1 + 0.3 * (style_loadings - np.mean(style_loadings)) / std_style
    
    # 6. Final weights combining minimum variance with factor tilts
    final_weights = min_var_weights * regime_adjustment
    
    # Normalize to sum to 1
    if np.sum(np.abs(final_weights)) > 0:
        final_weights = final_weights / np.sum(np.abs(final_weights))
    
    # Apply constraints
    final_weights = np.clip(final_weights, min_weight, max_weight)
    
    # Normalize again after constraints
    if np.sum(np.abs(final_weights)) > 0:
        final_weights = final_weights / np.sum(np.abs(final_weights))
    
    # Return as Series
    return pd.Series(final_weights, index=assets)

# SVD Momentum Function
def calculate_svd_momentum(returns_df, lookback=60, n_vals=3):
    """
    Calculate momentum signals based on singular value trends
    
    Parameters:
    - returns_df: DataFrame of asset returns
    - lookback: Lookback period for SVD
    - n_vals: Number of singular values to track
    
    Returns:
    - momentum_df: DataFrame with SVD momentum scores for each asset
    """
    # Initialize results DataFrame
    momentum_df = pd.DataFrame(index=returns_df.columns, columns=['SVD_Momentum'])
    
    # Need sufficient history
    if returns_df.shape[0] < lookback:
        print(f"Warning: Not enough data for SVD momentum ({returns_df.shape[0]} < {lookback})")
        return momentum_df
    
    # Get recent returns
    recent_returns = returns_df.iloc[-lookback:].values
    
    # Calculate SVD
    U, S, Vt = linalg.svd(recent_returns, full_matrices=False)
    
    # Calculate momentum scores: project each asset onto top singular vectors
    # Higher score = more aligned with dominant market trends
    momentum_scores = np.zeros(returns_df.shape[1])
    
    # Use only top n_vals singular vectors
    for i in range(min(n_vals, len(S))):
        # Weight by singular value (strength of the factor)
        weight = S[i] / sum(S[:min(n_vals, len(S))])
        # Get right singular vector (asset loadings on this factor)
        # Note: Vt[i] gives loadings of all assets on the ith singular vector
        momentum_scores += weight * Vt[i]
    
    # Normalize scores (avoid division by zero)
    if np.std(momentum_scores) > 0:
        momentum_scores = (momentum_scores - np.mean(momentum_scores)) / np.std(momentum_scores)
    
    # Store results
    momentum_df['SVD_Momentum'] = momentum_scores
    
    return momentum_df

# Eigen portfolio strategy
def eigen_portfolio_strategy(ticker, date, lookback=120, holding_period=10):
    """
    Multi-Factor Eigen Portfolio strategy 
    
    Parameters:
    - ticker: Target ticker for trading
    - date: Current date
    - lookback: Historical period for analysis
    - holding_period: Position holding period
    
    Returns:
    - Trade details dictionary or None if no trade
    """
    try:
        # 1. Get universe data - we need price history for multiple assets
        # Use a market ETF (SPY) plus sector ETFs for broader context
        universe = [
            'SPY',   # S&P 500
            'XLK',   # Technology
            'XLF',   # Financials 
            'XLE',   # Energy
            'XLV',   # Healthcare
            'XLP',   # Consumer Staples
            'XLY',   # Consumer Discretionary
            'XLI',   # Industrials
            'XLB',   # Materials
            'XLRE',  # Real Estate
            'XLU',   # Utilities
            'QQQ',   # NASDAQ 100
            'IWM',   # Russell 2000
            ticker   # Our target stock
        ]
        
        # Calculate start date for data
        start_date = date - timedelta(days=lookback * 2)
        
        # Get price data for universe
        universe_data = {}
        for symbol in universe:
            data = get_historical_data(
                symbol,
                start_date=start_date,
                end_date=date + timedelta(days=holding_period * 2),
                interval='1d'
            )
            if not data.empty:
                universe_data[symbol] = data
        
        # Need at least the target ticker plus some context
        min_tickers = 5
        if len(universe_data) < min_tickers or ticker not in universe_data:
            print(f"Not enough tickers with data (need {min_tickers}, got {len(universe_data)})")
            return None
        
        # 2. Calculate returns for analysis
        returns_dict = {}
        for symbol, data in universe_data.items():
            returns_dict[symbol] = data['Close'].pct_change().fillna(0)
        
        # Create returns DataFrame
        returns_df = pd.DataFrame(returns_dict)
        
        # Filter to lookback period
        if len(returns_df) < lookback:
            print(f"Not enough return data (need {lookback}, got {len(returns_df)})")
            return None
        
        analysis_returns = returns_df.iloc[-lookback:]
        
        # 3. Extract factors using PCA & SVD
        n_factors = min(3, len(universe_data) - 1)  # Use up to 3 factors
        factors_df, loadings_df, eigenvalues = extract_factors(
            analysis_returns, 
            n_factors=n_factors,
            denoise=True
        )
        
        # 4. Detect market regime
        try:
            regime_info = detect_market_regime(
                analysis_returns,
                n_regimes=3,
                lookback=min(60, lookback // 2)
            )
            
            if regime_info is not None:
                regime_df, transition_matrix, stationary_dist = regime_info
                # Get current regime (most recent)
                current_regime = int(regime_df['Regime'].iloc[-1])
            else:
                current_regime = 1  # Default to middle regime
                regime_info = None
        except Exception as e:
            print(f"Regime detection error: {str(e)}")
            current_regime = 1  # Default to middle regime
            regime_info = None
        
        # 5. Calculate SVD momentum
        momentum_df = calculate_svd_momentum(
            analysis_returns,
            lookback=min(60, lookback // 2),
            n_vals=2
        )
        
        # 6. Generate portfolio weights
        optimal_weights = optimize_portfolio(
            analysis_returns,
            loadings_df,
            regime_info=(current_regime, None, None) if regime_info else None
        )
        
        # 7. Get signal for target ticker
        ticker_weight = optimal_weights.get(ticker, 0)
        ticker_momentum = momentum_df.loc[ticker, 'SVD_Momentum']
        ticker_loading = loadings_df.loc[ticker, 'Factor_1']  # Loading on market factor
        
        # Combine signals
        # Weight: portfolio optimization result
        # Momentum: SVD trend strength
        # Loading: exposure to market factor
        combined_signal = (
            0.5 * np.sign(ticker_weight) * min(abs(ticker_weight * 3), 1) +
            0.4 * np.sign(ticker_momentum) * min(abs(ticker_momentum), 1) +
            0.1 * np.sign(-ticker_loading) * min(abs(ticker_loading * 2), 1)
        )
        
        # Determine direction
        print(f"Ticker: {ticker}, Date: {date}, Signal: {combined_signal:.2f}")

        if combined_signal > 0.4:
            direction = 'Long'
        elif combined_signal < -0.4:
            direction = 'Short'
        else:
            return None
        
        # 8. Calculate entry and exit dates
        entry_date = date
        exit_date = entry_date
        for _ in range(holding_period):
            exit_date = get_next_trading_day(exit_date)
        
        # 9. Get prices for the trade
        ticker_prices = universe_data[ticker]
        entry_prices = ticker_prices[ticker_prices.index.date == entry_date.date()]
        exit_prices = ticker_prices[ticker_prices.index.date == exit_date.date()]
        
        if entry_prices.empty or exit_prices.empty:
            return None
        
        entry_price = entry_prices['Close'].iloc[0]
        exit_price = exit_prices['Close'].iloc[0]
        
        # 10. Calculate return
        if direction == 'Long':
            ret = (exit_price - entry_price) / entry_price
        else:  # Short
            ret = (entry_price - exit_price) / entry_price
        
        # 11. Create trade details
        trade = {
            'Symbol': ticker,
            'Strategy': 'Eigen-Portfolio',
            'Direction': direction,
            'Signal': float(combined_signal),
            'Regime': int(current_regime),
            'SVD_Momentum': float(ticker_momentum),
            'Market_Loading': float(ticker_loading),
            'Optimal_Weight': float(ticker_weight),
            'Entry Date': entry_date.date(),
            'Entry Price': float(entry_price),
            'Exit Date': exit_date.date(),
            'Exit Price': float(exit_price),
            'Return': float(ret),
            'Return %': float(ret * 100)
        }
        
        return trade
    
    except Exception as e:
        print(f"Error in strategy for {ticker}: {str(e)}")
        return None

# Function to calculate profit factor
def calculate_profit_factor(returns):
    """Calculate profit factor: sum of profits / sum of losses"""
    profits = returns[returns > 0].sum()
    losses = -returns[returns < 0].sum()
    return profits / losses if losses > 0 else float('inf')

# Run the backtest
def run_backtest(tickers, start_date, end_date, lookback=120, holding_period=10, check_interval=5):
    start_date = pd.to_datetime(start_date)
    end_dt = pd.to_datetime(end_date)
    
    all_trades = []
    
    # Step through dates
    current_date = start_date
    
    while current_date <= end_dt:
        # Only process if market is open
        if is_market_open(current_date):
            print(f"Processing date: {current_date.date()}")
            
            sampled_tickers = tickers
            
            for ticker in sampled_tickers:
                trade = eigen_portfolio_strategy(
                    ticker, 
                    current_date,
                    lookback=lookback,
                    holding_period=holding_period
                )
                
                if trade:
                    # Only include trades with entry in our backtest window
                    entry_date = trade["Entry Date"]
                    if start_date.date() <= entry_date <= end_dt.date():
                        all_trades.append(trade)
            
            # Advance check_interval trading days
            for _ in range(check_interval):
                current_date = get_next_trading_day(current_date)
        else:
            # Go to next day if market closed
            current_date += timedelta(days=1)
    
    # Build a DataFrame & inspect
    results = None
    if all_trades:
        results = pd.DataFrame(all_trades)
        results.sort_values("Entry Date", inplace=True)
        print("\n--- Sample Trades ---")
        print(results.head(10).to_string(index=False))
    
        # Simple performance summary
        total_ret = results["Return"].sum()
        avg_ret = results["Return"].mean()
        win_rate = (results["Return"] > 0).mean()
    
        print(f"\nTotal P&L:    {total_ret:.4f} ({total_ret*100:.2f}%)")
        print(f"Avg per trade:{avg_ret:.4f} ({avg_ret*100:.2f}%)")
        print(f"Win Rate:     {win_rate:.2%}")
    else:
        print("No trades generated in this period.")
    
    return results

In [11]:
tickers = [
    # Mega‑caps (>$1 trillion)
    "MSFT",  # Microsoft :contentReference[oaicite:0]{index=0}
    "NVDA",  # Nvidia :contentReference[oaicite:1]{index=1}
    "AAPL",  # Apple :contentReference[oaicite:2]{index=2}
    "AMZN",  # Amazon :contentReference[oaicite:3]{index=3}
    "GOOGL", # Alphabet Class A :contentReference[oaicite:4]{index=4}
    "GOOG",  # Alphabet Class C :contentReference[oaicite:5]{index=5}
    "META",  # Meta Platforms :contentReference[oaicite:6]{index=6}
    "TSLA",  # Tesla :contentReference[oaicite:7]{index=7}

    # Next‑tier big‑caps ($100 B+)
    "AVGO",  # Broadcom :contentReference[oaicite:8]{index=8}
    "CMCSA", # Comcast :contentReference[oaicite:9]{index=9}
    "PEP",   # PepsiCo (Nasdaq‑listed) :contentReference[oaicite:10]{index=10}
    "COST",  # Costco Wholesale :contentReference[oaicite:11]{index=11}
    "NFLX",  # Netflix :contentReference[oaicite:12]{index=12}
    "INTC",  # Intel :contentReference[oaicite:13]{index=13}
    "ADBE",  # Adobe :contentReference[oaicite:14]{index=14}
    "TXN",   # Texas Instruments :contentReference[oaicite:15]{index=15}
    "QCOM",  # Qualcomm :contentReference[oaicite:16]{index=16}
    "ASML",  # ASML Holding :contentReference[oaicite:17]{index=17}
    "AMGN",  # Amgen :contentReference[oaicite:18]{index=18}

    # Other large‑caps ($50 B+)
    "CSCO",  # Cisco Systems :contentReference[oaicite:19]{index=19}
    "SBUX",  # Starbucks :contentReference[oaicite:20]{index=20}
    "MRNA",  # Moderna :contentReference[oaicite:21]{index=21}
    "BIIB",  # Biogen :contentReference[oaicite:22]{index=22}
    "LRCX",  # Lam Research :contentReference[oaicite:23]{index=23}
    "INTU",  # Intuit :contentReference[oaicite:24]{index=24}
    "ILMN",  # Illumina :contentReference[oaicite:25]{index=25}
    "GILD",  # Gilead Sciences :contentReference[oaicite:26]{index=26}
    "BKNG",  # Booking Holdings :contentReference[oaicite:27]{index=27}
]

start_date = "2025-03-15"
end_date   = "2025-05-15"

results = run_backtest(tickers, start_date, end_date)

Processing date: 2025-03-17
Error in strategy for MSFT: 'Index' object has no attribute 'tzinfo'
Error in strategy for NVDA: 'Index' object has no attribute 'tzinfo'
Error in strategy for AAPL: 'Index' object has no attribute 'tzinfo'
Error in strategy for AMZN: 'Index' object has no attribute 'tzinfo'
Error in strategy for GOOGL: 'Index' object has no attribute 'tzinfo'
Error in strategy for GOOG: 'Index' object has no attribute 'tzinfo'
Error in strategy for META: 'Index' object has no attribute 'tzinfo'
Error in strategy for TSLA: 'Index' object has no attribute 'tzinfo'
Error in strategy for AVGO: 'Index' object has no attribute 'tzinfo'
Error in strategy for CMCSA: 'Index' object has no attribute 'tzinfo'
Error in strategy for PEP: 'Index' object has no attribute 'tzinfo'
Error in strategy for COST: 'Index' object has no attribute 'tzinfo'
Error in strategy for NFLX: 'Index' object has no attribute 'tzinfo'
Error in strategy for INTC: 'Index' object has no attribute 'tzinfo'
Error

KeyboardInterrupt: 

In [7]:
if not results.empty:
    # Collect metrics for all tickers
    all_metrics = []
    for ticker in set(results['Symbol']):
        metrics = get_stock_metrics(ticker)
        all_metrics.append(metrics)

    # Create metrics dataframe
    metrics_df = pd.DataFrame(all_metrics)

    # Merge with aggregated results
    symbol_perf = pd.merge(
        results, 
        metrics_df,
        on='Symbol'
    )

    cap_size_perf = symbol_perf.groupby('CapSize').agg({
        'Return': ['count', 'mean', 'sum', 'std'],
    })
    cap_size_perf.columns = ['Count', 'Avg Return', 'Total Return', 'Std Dev']
    cap_size_perf['Win Rate'] = symbol_perf.groupby('CapSize')['Return'].apply(
        lambda x: (x > 0).mean()
    )
    cap_size_perf['Profit Factor'] = symbol_perf.groupby('CapSize')['Return'].apply(
        lambda x: calculate_profit_factor(x.values)
    )   
    print("\n--- Performance by Market Cap ---")
    print(cap_size_perf)

    sector_perf = symbol_perf.groupby('Sector').agg({
        'Return': ['count', 'mean', 'sum', 'std'],
    })
    sector_perf.columns = ['Count', 'Avg Return', 'Total Return', 'Std Dev']
    sector_perf['Win Rate'] = symbol_perf.groupby('Sector')['Return'].apply(
        lambda x: (x > 0).mean()
    )
    print("\n--- Performance by Sector ---")
    print(sector_perf)

    strategy_cap = symbol_perf.groupby(['Strategy', 'CapSize']).agg({
        'Return': ['count', 'mean', 'sum']
    })
    strategy_cap.columns = ['Count', 'Avg Return', 'Total Return']
    print("\n--- Strategy by Market Cap ---")
    print(strategy_cap)

    plt.figure(figsize=(10, 6))
    plt.scatter(
        symbol_perf['Beta'], 
        symbol_perf['Return'],
        alpha=0.5, 
        c=symbol_perf['Strategy'].map({'Pre-Earnings': 'blue', 'Post-Earnings': 'red'})
    )
    plt.axhline(y=0, color='black', linestyle='--')
    plt.xlabel('Beta')
    plt.ylabel('Return')
    plt.title('Returns vs Beta by Strategy')
    plt.legend(['', '', 'Pre-Earnings', 'Post-Earnings'])
    plt.grid(True, alpha=0.3)
    plt.show()

    # -------------------------------
    # Performance Validity Checks
    # -------------------------------
    print("\n" + "="*40 + "\nPerformance Validity Analysis\n" + "="*40)

    # Convert to datetime for sorting (if not already)
    if 'Entry Date' in results.columns and not pd.api.types.is_datetime64_dtype(results['Entry Date']):
        results['Entry Date'] = pd.to_datetime(results['Entry Date'])
    
    # Sort results by entry date
    results.sort_values('Entry Date', inplace=True)

    # Create trade sequence index
    results['Trade Number'] = range(1, len(results)+1)

    # 1. Gap-Free Cumulative Returns Timeline
    plt.figure(figsize=(12, 6))
    cumulative_log_returns = np.log1p(results['Return']).cumsum()

    # Plot by trade sequence instead of calendar time
    plt.plot(results['Trade Number'], cumulative_log_returns, 
            color='red', linewidth=1.5)
    plt.title('Cumulative Log Returns (Trade Sequence)')
    plt.xlabel('Trade Number')
    plt.ylabel('Cumulative Log Returns')
    plt.grid(True)
    plt.axhline(0, color='black', linestyle='--')

    rolling_window = min(30, len(results) // 2) if len(results) > 4 else 2
    rolling_mean = cumulative_log_returns.rolling(rolling_window).mean()
    plt.plot(results['Trade Number'], rolling_mean,
            color='blue', linestyle='--', linewidth=1,
            label=f'{rolling_window}-Trade Rolling Mean')

    rolling_std = cumulative_log_returns.rolling(rolling_window).std()
    plt.fill_between(results['Trade Number'],
                    rolling_mean - 2*rolling_std,
                    rolling_mean + 2*rolling_std,
                    color='yellow', alpha=0.5,
                    label='2σ Volatility Bands')

    plt.legend()
    plt.tight_layout()
    plt.show()

    # 3. Enhanced Drawdown Analysis
    plt.figure(figsize=(12, 6))
    cumulative_returns = np.exp(cumulative_log_returns) - 1
    peak = cumulative_returns.expanding(min_periods=1).max()
    drawdown = (cumulative_returns - peak)/peak

    # Find maximum drawdown details
    max_dd = drawdown.min()
    max_dd_end_idx = drawdown.idxmin()
    max_dd_start_idx = drawdown[:max_dd_end_idx].idxmax()

    # Get actual datetime values from results
    max_dd_start_date = results.loc[max_dd_start_idx, 'Entry Date']
    max_dd_end_date = results.loc[max_dd_end_idx, 'Entry Date']

    # Calculate duration in days
    if isinstance(max_dd_start_date, pd.Timestamp) and isinstance(max_dd_end_date, pd.Timestamp):
        max_dd_duration = (max_dd_end_date - max_dd_start_date).days
    else:
        max_dd_duration = (pd.to_datetime(max_dd_end_date) - pd.to_datetime(max_dd_start_date)).days

    # Plot by trade sequence
    plt.plot(results['Trade Number'], drawdown, 
            color='darkred', linewidth=1.5)
    plt.title(f'Strategy Drawdown (Max: {max_dd:.2%}, Duration: {max_dd_duration:.1f} days)')
    plt.xlabel('Trade Number')
    plt.ylabel('Drawdown')
    plt.axhline(0, color='black', linestyle='--')

    max_dd_trade_num = results.loc[max_dd_end_idx, 'Trade Number']
    prev_trade_num = results.loc[max_dd_start_idx, 'Trade Number']

    plt.annotate(f'Max Drawdown: {max_dd:.2%}\nDuration: {max_dd_duration:.1f} days',
            xy=(max_dd_trade_num, max_dd),
            xytext=(max_dd_trade_num - (max_dd_trade_num - prev_trade_num)/2, 
                    max_dd*1.5),
            arrowprops=dict(facecolor='black', arrowstyle='->'),
            bbox=dict(boxstyle='round,pad=0.3', fc='white', ec='black'))

    plt.grid(True)
    plt.tight_layout()
    plt.show()

def perform_profit_factor_permutation_test(returns, n_permutations=10000):
    """
    Perform permutation test to evaluate statistical significance of profit factor
    
    Parameters:
    - returns: Array of trade returns
    - n_permutations: Number of permutations for the test
    
    Returns:
    - p-value of the observed profit factor
    """
    # Calculate the observed profit factor
    observed_pf = calculate_profit_factor(returns)
    
    # Initialize array to store permutation results
    permutation_pfs = np.zeros(n_permutations)
    
    # Perform permutation test by randomly flipping signs of returns
    np.random.seed(69)
    for i in range(n_permutations):
        # Generate random signs (-1 or 1) for each return
        random_signs = np.random.choice([-1, 1], size=len(returns))
        # Apply random signs to returns
        permuted_returns = returns * random_signs
        # Calculate profit factor for permuted returns
        permutation_pfs[i] = calculate_profit_factor(permuted_returns)
    
    # Calculate p-value: proportion of permuted PFs >= observed PF
    p_value = np.mean(permutation_pfs >= observed_pf)
    
    # Plot histogram of permutation results
    plt.figure(figsize=(10, 6))
    plt.hist(permutation_pfs, bins=50, alpha=0.7, color='skyblue', edgecolor='black')
    plt.axvline(observed_pf, color='red', linestyle='dashed', linewidth=2, label=f'Observed PF = {observed_pf:.2f}')
    plt.title(f'Profit Factor Permutation Test (p-value = {p_value:.4f})')
    plt.xlabel('Profit Factor')
    plt.ylabel('Frequency')
    plt.legend()
    plt.grid(True)
    plt.show()
    
    # Print summary of permutation test
    print("\n=== Profit Factor Permutation Test ===")
    print(f"Observed Profit Factor: {observed_pf:.4f}")
    print(f"Mean Permutation Profit Factor: {np.mean(permutation_pfs):.4f}")
    print(f"P-value: {p_value:.4f}")
    
    if p_value < 0.05:
        print("Result: The observed profit factor is statistically significant (p < 0.05)")
    else:
        print("Result: The observed profit factor is NOT statistically significant (p >= 0.05)")
    
    return p_value

# Only run if results exist
if not results.empty:
    # Extract returns array
    returns_array = results['Return'].values

    # Run permutation test (e.g., 10,000 permutations)
    p_value = perform_profit_factor_permutation_test(returns_array, n_permutations=10000)

    # Display p-value
    print(f"\nProfit Factor Permutation Test p-value: {p_value:.4f}")
    if p_value < 0.05:
        print("Result: Statistically significant profit factor (p < 0.05)")
    else:
        print("Result: Profit factor not statistically significant (p >= 0.05)")

    # -------------------------------
    # Return Distribution
    # -------------------------------
    print("\n" + "="*40 + "\nReturn Distribution\n" + "="*40)
    
    # Return Statistics
    print("\n--- Return Statistics ---")
    stats_df = results['Return'].describe(percentiles=[0.1, 0.25, 0.5, 0.75, 0.9, 0.99])
    print(stats_df)
    
    # Worst Outcomes Table
    print("\n--- Worst Outcomes ---")
    worst_percentiles = [0.05, 0.01, 0.001]
    worst_returns = []
    worst_labels = []
    
    for p in worst_percentiles:
        if len(results) >= int(1/p):  # Only calculate if we have enough data
            worst_returns.append(results['Return'].quantile(p))
            worst_labels.append(f'Worst {p*100:.2f}%')
    
    if worst_returns:
        worst_df = pd.DataFrame([worst_returns], columns=worst_labels)
        print(worst_df)
    
    # Histograms for Return Distributions
    plt.figure(figsize=(12, 8))
    plt.subplot(2, 1, 1)
    plt.hist(results['Return'], bins=min(100, len(results)//2), color='skyblue', edgecolor='black')
    plt.title('Unzoomed Return Distribution')
    plt.xlabel('Return per Trade')
    plt.ylabel('Frequency')
    plt.grid(True)
    
    plt.subplot(2, 1, 2)
    if len(results) >= 100:  # Only create zoomed view if enough data
        zoomed_data = results[results['Return'] > results['Return'].quantile(0.01)]
        plt.hist(zoomed_data['Return'], bins=min(50, len(zoomed_data)//2), color='lightgreen', edgecolor='black')
        plt.title('Zoomed Return Distribution (Worst 1% Removed)')
    else:
        plt.hist(results['Return'], bins=min(50, len(results)//2), color='lightgreen', edgecolor='black')
        plt.title('Return Distribution')
    plt.xlabel('Return per Trade')
    plt.ylabel('Frequency')
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Strategy Analysis
    print("\n" + "="*40 + "\nStrategy Analysis\n" + "="*40)
    
    # Performance by strategy type
    if 'Strategy' in results.columns:
        strategy_perf = results.groupby('Strategy').agg({
            'Return': ['count', 'mean', 'sum', 'std'],
        })
        
        strategy_perf.columns = ['Count', 'Avg Return', 'Total Return', 'Std Dev']
        strategy_perf['Win Rate'] = results.groupby('Strategy')['Return'].apply(
            lambda x: (x > 0).mean()
        )
        strategy_perf['Profit Factor'] = results.groupby('Strategy')['Return'].apply(
            lambda x: calculate_profit_factor(x.values)
        )
        
        print("\n--- Performance by Strategy ---")
        print(strategy_perf)
        
        plt.figure(figsize=(12, 6))
        ax = plt.gca()
        
        # Bar chart for count
        strategy_perf['Count'].plot(kind='bar', color='skyblue', ax=ax, position=1, width=0.3)
        ax.set_ylabel('Number of Trades', color='skyblue')
        ax.tick_params(axis='y', labelcolor='skyblue')
        
        # Secondary axis for return
        ax2 = ax.twinx()
        strategy_perf['Avg Return'].plot(kind='bar', color='green', ax=ax2, position=0, width=0.3)
        ax2.set_ylabel('Average Return', color='green')
        ax2.tick_params(axis='y', labelcolor='green')
        
        plt.title('Strategy Performance Comparison')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
    
    # Performance by direction (Long/Short)
    if 'Direction' in results.columns:
        dir_perf = results.groupby('Direction').agg({
            'Return': ['count', 'mean', 'sum', 'std'],
        })
        
        dir_perf.columns = ['Count', 'Avg Return', 'Total Return', 'Std Dev']
        dir_perf['Win Rate'] = results.groupby('Direction')['Return'].apply(
            lambda x: (x > 0).mean()
        )
        dir_perf['Profit Factor'] = results.groupby('Direction')['Return'].apply(
            lambda x: calculate_profit_factor(x.values)
        )
        
        print("\n--- Performance by Direction ---")
        print(dir_perf)
        
        # Bar chart
        plt.figure(figsize=(10, 6))
        ax = plt.gca()
        
        # Bar chart for count
        dir_perf['Count'].plot(kind='bar', color='skyblue', ax=ax, position=1, width=0.3)
        ax.set_ylabel('Number of Trades', color='skyblue')
        ax.tick_params(axis='y', labelcolor='skyblue')
        
        # Secondary axis for return
        ax2 = ax.twinx()
        dir_perf['Avg Return'].plot(kind='bar', color='green', ax=ax2, position=0, width=0.3)
        ax2.set_ylabel('Average Return', color='green')
        ax2.tick_params(axis='y', labelcolor='green')
        
        plt.title('Long vs Short Performance')
        plt.xticks(rotation=0)
        plt.tight_layout()
        plt.show()
    
    # -------------------------------
    # Kelly Criterion Analysis
    # -------------------------------
    print("\n" + "="*40 + "\nKelly Criterion\n" + "="*40)
    
    def monte_carlo_kelly(returns, kelly_fraction=1.0, n_sims=10000):
        np.random.seed(69)
        all_terminal = []
        for _ in range(n_sims):
            sampled_returns = np.random.choice(returns, size=len(returns), replace=True)
            capital = 1000
            for ret in sampled_returns:
                capital *= (1 + kelly_fraction * ret)
            all_terminal.append(capital)
        return pd.Series(all_terminal)
    
    def plot_multiple_pnl_paths(returns, kelly_fraction=1.0, num_paths=100, label=''):
        plt.figure(figsize=(10, 4))
        for _ in range(num_paths):
            path = np.cumprod(1 + kelly_fraction * np.random.choice(returns, size=len(returns), replace=True))
            plt.plot(path, lw=1, alpha=0.3)  # thin lines with transparency
        plt.title(f'Multiple PnL Paths ({label}, {int(kelly_fraction*100)}% Kelly)')
        plt.xlabel('Trade Number')
        plt.ylabel('Normalized Capital')
        plt.grid(True)
        plt.show()
    
    # Run Kelly simulations with all data (no time-based segmentation since we have limited data)
    returns = results['Return'].values
    
    if len(returns) >= 10:  # Only run if we have enough trades
        print("\n--- Full Dataset Kelly Simulations ---")
        
        for fraction in [1.0, 0.5, 0.3]:
            terminal_pnl = monte_carlo_kelly(returns, kelly_fraction=fraction, n_sims=10000)
            
            # Stats Table for Terminal PnL
            stats_table = terminal_pnl.describe(percentiles=[0.01, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99])
            print(f"\nKelly Fraction: {int(fraction*100)}%")
            print(stats_table)
            
            # Histogram of Terminal Capital distribution
            plt.figure(figsize=(10, 6))
            plt.hist(terminal_pnl, bins=50, alpha=0.7, color='magenta', edgecolor='black')
            plt.title(f'Terminal PnL Distribution (Full Dataset, {int(fraction*100)}% Kelly)')
            plt.xlabel('Terminal Capital')
            plt.ylabel('Frequency')
            plt.grid(True)
            plt.show()
        
            # Plot Multiple Simulation Paths (PnL Curves)
            plot_multiple_pnl_paths(returns, kelly_fraction=fraction, num_paths=50, label='Full Dataset')
    
    # -------------------------------
    # Symbol Analysis
    # -------------------------------
    if 'Symbol' in results.columns:
        print("\n" + "="*40 + "\nSymbol Analysis\n" + "="*40)
        
        symbol_perf = results.groupby('Symbol').agg({
            'Return': ['count', 'mean', 'sum', 'std'],
        })
        
        symbol_perf.columns = ['Count', 'Avg Return', 'Total Return', 'Std Dev']
        symbol_perf['Win Rate'] = results.groupby('Symbol')['Return'].apply(
            lambda x: (x > 0).mean()
        )
        symbol_perf['Profit Factor'] = results.groupby('Symbol')['Return'].apply(
            lambda x: calculate_profit_factor(x.values) if (x < 0).any() else float('inf')
        )
        
        print("\n--- Performance by Symbol ---")
        print(symbol_perf)
        
        plt.figure(figsize=(12, 6))
        ax = plt.gca()
        
        # Bar chart for count
        symbol_perf['Count'].plot(kind='bar', color='skyblue', ax=ax, position=1, width=0.3)
        ax.set_ylabel('Number of Trades', color='skyblue')
        ax.tick_params(axis='y', labelcolor='skyblue')
        
        # Secondary axis for return
        ax2 = ax.twinx()
        symbol_perf['Avg Return'].plot(kind='bar', color='green', ax=ax2, position=0, width=0.3)
        ax2.set_ylabel('Average Return', color='green')
        ax2.tick_params(axis='y', labelcolor='green')
        
        plt.title('Symbol Performance Comparison')
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
else:
    print("No trades found with the given strategy.")

AttributeError: 'NoneType' object has no attribute 'empty'