## Mean-reversion-based pairs trading strategy

### Data Collection

In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

In [2]:
# Fetches S&P 500 tickers from Wikipedia
def get_sp500_tickers():

    url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
    tables = pd.read_html(url)
    sp500_table = tables[0]
    tickers = sp500_table['Symbol'].tolist()
    
    # Clean tickers
    tickers = [ticker.replace('.', '-') for ticker in tickers]
    
    print(f"Found {len(tickers)} S&P 500 stocks")
    return tickers

In [3]:
# Downloads historical data
def download_stock_data(tickers, start_date='2022-01-01', end_date='2024-12-31'):

    print(f"Downloading data from {start_date} to {end_date}.")
    
    # Downloads data in batches
    all_data = {}
    failed_tickers = []
    
    for i, ticker in enumerate(tickers):
        try:
            stock = yf.Ticker(ticker)
            hist = stock.history(start=start_date, end=end_date)
            
            if len(hist) > 0:
                all_data[ticker] = hist['Close']  # Adjusted close
            else:
                failed_tickers.append(ticker)
                
        except Exception as e:
            print(f"Failed to download {ticker}: {e}")
            failed_tickers.append(ticker)
        
        # Progress indicator
        if (i + 1) % 50 == 0:
            print(f"Downloaded {i + 1}/{len(tickers)} stocks...")
    
    # Creates DataFrame
    price_data = pd.DataFrame(all_data)
    
    print(f"Successfully downloaded {len(price_data.columns)} stocks")
    print(f"Failed to download {len(failed_tickers)} stocks: {failed_tickers[:10]}...")
    
    return price_data


In [4]:
def clean_data(price_data, min_data_threshold=0.95, min_trading_days=0.8):

    print("  DATA CLEANING  ")
    print(f"Original data shape: {price_data.shape}")
    print(f"Date range: {price_data.index[0]} to {price_data.index[-1]}")
    print(f"Initial missing data points: {price_data.isnull().sum().sum()}")
    
    # Remove stocks with insufficient data
    print(f"\n1. Filtering stocks with <{min_data_threshold*100}% data availability...")
    data_availability = price_data.count() / len(price_data)
    good_stocks = data_availability[data_availability >= min_data_threshold].index
    filtered_data = price_data[good_stocks].copy()
    
    print(f"   Kept {len(good_stocks)} stocks")
    print(f"   Removed {len(price_data.columns) - len(good_stocks)} stocks")
    print(f"   Missing data points after stock filter: {filtered_data.isnull().sum().sum()}")
    
    # Handle missing data with forward fill
    print(f"\n2. Handling missing data...")
    print(f"   Missing data points before fill: {filtered_data.isnull().sum().sum()}")
    
    # Forward fill missing values 
    filtered_data = filtered_data.fillna(method='ffill')
    
    # If there are still missing values at the beginning, backward fill
    if filtered_data.isnull().sum().sum() > 0:
        filtered_data = filtered_data.fillna(method='bfill')
    
    print(f"   Missing data points after fill: {filtered_data.isnull().sum().sum()}")
    
    #  Remove days with too many missing stocks 
    print(f"\n3. Filtering trading days...")
    missing_per_day = filtered_data.isnull().sum(axis=1)
    max_missing_stocks = int(len(filtered_data.columns) * (1 - min_trading_days))
    
    print(f"   Maximum allowed missing stocks per day: {max_missing_stocks}")
    
    # Keep days where we have at least 80% of stocks with data
    valid_days = missing_per_day <= max_missing_stocks
    final_data = filtered_data[valid_days].copy()
    
    removed_days = len(filtered_data) - len(final_data)
    print(f"   Removed {removed_days} days with too many missing stocks")
    print(f"   Missing data points after day filter: {final_data.isnull().sum().sum()}")
    
    # Step 4: Final cleanup 
    completely_empty_rows = final_data.isnull().all(axis=1)
    if completely_empty_rows.any():
        final_data = final_data[~completely_empty_rows]
        print(f"   Removed {completely_empty_rows.sum()} completely empty days")
    
    # Final statistics
    print(f"\n=== FINAL RESULTS ===")
    print(f"Final data shape: {final_data.shape}")
    print(f"Final date range: {final_data.index[0]} to {final_data.index[-1]}")
    print(f"Total missing values: {final_data.isnull().sum().sum()}")
    print(f"Data completeness: {(1 - final_data.isnull().sum().sum() / (final_data.shape[0] * final_data.shape[1])) * 100:.2f}%")
    
    return final_data


# Main execution
if __name__ == "__main__":
    # Get S&P 500 tickers
    sp500_tickers = get_sp500_tickers()
    
    # Download data
    raw_data = download_stock_data(sp500_tickers)
    
    # Clean data
    clean_price_data = clean_data(raw_data)
    
    # Save for later use
    clean_price_data.to_csv('sp500_prices_cleaned.csv')
    print("Data saved to 'sp500_prices_cleaned.csv'")
    
    # Basic statistics
    print(f"\nFinal dataset summary:")
    print(f"Number of stocks: {clean_price_data.shape[1]}")
    print(f"Number of trading days: {clean_price_data.shape[0]}")

Found 503 S&P 500 stocks
Downloading data from 2022-01-01 to 2024-12-31.
Downloaded 50/503 stocks...
Downloaded 100/503 stocks...
Downloaded 150/503 stocks...
Downloaded 200/503 stocks...
Downloaded 250/503 stocks...
Downloaded 300/503 stocks...
Downloaded 350/503 stocks...
Downloaded 400/503 stocks...
Downloaded 450/503 stocks...
Downloaded 500/503 stocks...
Successfully downloaded 503 stocks
Failed to download 0 stocks: []...
  DATA CLEANING  
Original data shape: (752, 503)
Date range: 2022-01-03 00:00:00-05:00 to 2024-12-30 00:00:00-05:00
Initial missing data points: 2779

1. Filtering stocks with <95.0% data availability...
   Kept 497 stocks
   Removed 6 stocks
   Missing data points after stock filter: 11

2. Handling missing data...
   Missing data points before fill: 11
   Missing data points after fill: 0

3. Filtering trading days...
   Maximum allowed missing stocks per day: 99
   Removed 0 days with too many missing stocks
   Missing data points after day filter: 0

=== FI

  filtered_data = filtered_data.fillna(method='ffill')
  filtered_data = filtered_data.fillna(method='bfill')


Data saved to 'sp500_prices_cleaned.csv'

Final dataset summary:
Number of stocks: 497
Number of trading days: 752


### Data Cleaning

In [5]:
price_data = pd.read_csv('./sp500_prices_cleaned.csv', index_col=0, parse_dates=True)
price_data = price_data.select_dtypes(include=[np.number])  # Keep only numeric columns

In [6]:
def trading_day_allignment(price_data):

    print("Original data shape:", price_data.shape)
    
    # Drop rows where ANY stock has NaN
    aligned_data = price_data.dropna()
    
    print("After inner join alignment:", aligned_data.shape)
    print(f"Removed {len(price_data) - len(aligned_data)} trading days")
    
    return aligned_data

In [7]:
align = trading_day_allignment(price_data)

Original data shape: (752, 497)
After inner join alignment: (752, 497)
Removed 0 trading days


In [8]:
def align_by_trading_days_common(price_data, min_stocks_threshold=0.8):

    print("Original data shape:", price_data.shape)
    
    # Count how many stocks have data each day
    stocks_per_day = price_data.count(axis=1)
    total_stocks = len(price_data.columns)
    
    # Keep days where at least 80% of stocks have data
    min_stocks = int(total_stocks * min_stocks_threshold)
    valid_days = stocks_per_day >= min_stocks
    
    aligned_data = price_data[valid_days].copy()
    
    print(f"Kept days with ≥{min_stocks} stocks ({min_stocks_threshold*100}%)")
    print("After common trading days alignment:", aligned_data.shape)
    
    return aligned_data

In [9]:
align2 = align_by_trading_days_common(price_data)

Original data shape: (752, 497)
Kept days with ≥397 stocks (80.0%)
After common trading days alignment: (752, 497)


In [10]:

def compare_price_transformations(price_data):
    
    # 1. Raw prices 
    raw_prices = price_data.copy()
    
    # 2. Log transformation
    log_prices = np.log(price_data)
    
    # 3. Normalized prices (alternative approach)
    normalized_prices = price_data / price_data.iloc[0]  # Normalize to first observation
    
    print("=== PRICE TRANSFORMATION COMPARISON ===")
    print(f"Raw prices shape: {raw_prices.shape}")
    print(f"Log prices shape: {log_prices.shape}")
    print(f"Normalized prices shape: {normalized_prices.shape}")
    
    # Check for any issues with log transformation
    negative_prices = (price_data <= 0).sum().sum()
    print(f"Negative/zero prices (problematic for log): {negative_prices}")
    
    return raw_prices, log_prices, normalized_prices

In [11]:
transformation = compare_price_transformations(price_data)

=== PRICE TRANSFORMATION COMPARISON ===
Raw prices shape: (752, 497)
Log prices shape: (752, 497)
Normalized prices shape: (752, 497)
Negative/zero prices (problematic for log): 0


In [12]:
stock1= price_data['AAPL']
stock2 = price_data['GOOG']

In [13]:

def test_cointegration_with_transformations(stock1, stock2, method="raw"):

    from statsmodels.tsa.stattools import adfuller
    from statsmodels.api import OLS
    import statsmodels.api as sm
    
    if method == "log":
        s1, s2 = np.log(stock1), np.log(stock2)
        print("Testing LOG PRICES for cointegration")
    elif method == "normalized":
        s1, s2 = stock1/stock1.iloc[0], stock2/stock2.iloc[0]
        print("Testing NORMALIZED PRICES for cointegration")
    else:
        s1, s2 = stock1, stock2
        print("Testing RAW PRICES for cointegration")
    
    # Linear regression
    s1_clean = s1.dropna()
    s2_clean = s2.dropna()
    
    # Align the series
    common_idx = s1_clean.index.intersection(s2_clean.index)
    s1_aligned = s1_clean[common_idx]
    s2_aligned = s2_clean[common_idx]
    
    # Regression: s2 = alpha + beta * s1 + error
    X = sm.add_constant(s1_aligned)
    model = OLS(s2_aligned, X).fit()
    hedge_ratio = model.params[1]
    
    # Create spread and test for stationarity
    spread = s2_aligned - hedge_ratio * s1_aligned
    
    # ADF test on spread (as per slide 8-9)
    adf_result = adfuller(spread.dropna())
    
    print(f"Hedge ratio (beta): {hedge_ratio:.4f}")
    print(f"ADF test statistic: {adf_result[0]:.4f}")
    print(f"ADF p-value: {adf_result[1]:.4f}")
    print(f"Cointegrated (p < 0.05): {adf_result[1] < 0.05}")
    
    return hedge_ratio, adf_result[1], spread

In [14]:
test_log = test_cointegration_with_transformations(stock1, stock2, method="log")

Testing LOG PRICES for cointegration
Hedge ratio (beta): 1.1212
ADF test statistic: -2.4012
ADF p-value: 0.1414
Cointegrated (p < 0.05): False


  hedge_ratio = model.params[1]


In [15]:
test_normalized = test_cointegration_with_transformations(stock1, stock2, method="normalized")

Testing NORMALIZED PRICES for cointegration
Hedge ratio (beta): 1.0131
ADF test statistic: -2.4368
ADF p-value: 0.1316
Cointegrated (p < 0.05): False


  hedge_ratio = model.params[1]


In [16]:
test = test_cointegration_with_transformations(stock1,stock2)

Testing RAW PRICES for cointegration
Hedge ratio (beta): 0.8178
ADF test statistic: -2.4368
ADF p-value: 0.1316
Cointegrated (p < 0.05): False


  hedge_ratio = model.params[1]


In [17]:
import yfinance as yf
from collections import defaultdict
import time

def get_sectors_automatically(stock_tickers, batch_size=50):
  
    print(f"Classifying {len(stock_tickers)} stocks by sector...")
    
    sectors = defaultdict(list)
    failed_tickers = []
    
    for i, ticker in enumerate(stock_tickers):
        try:
            stock = yf.Ticker(ticker)
            info = stock.info
            
            sector = info.get('sector', 'Unknown')
            industry = info.get('industry', 'Unknown')
            
            # Clean up sector names
            if sector and sector != 'Unknown':
                sectors[sector].append(ticker)
            else:
                sectors['Unknown'].append(ticker)
            
            # Progress indicator
            if (i + 1) % batch_size == 0:
                print(f"Processed {i + 1}/{len(stock_tickers)} stocks...")
                
            # Rate limiting to avoid API limits
            time.sleep(0.1)
            
        except Exception as e:
            print(f"Failed to get sector for {ticker}: {e}")
            failed_tickers.append(ticker)
            sectors['Unknown'].append(ticker)
    
    # Convert to regular dict and show results
    sectors_dict = dict(sectors)
    
    print(f"\n=== SECTOR CLASSIFICATION RESULTS ===")
    for sector, stocks in sectors_dict.items():
        print(f"{sector}: {len(stocks)} stocks")
        if len(stocks) <= 10:  # Show stocks for smaller sectors
            print(f"  Stocks: {stocks}")
    
    print(f"\nFailed to classify: {len(failed_tickers)} stocks")
    
    return sectors_dict, failed_tickers

In [18]:
# Extract stock tickers from your price_data DataFrame
stock_tickers = price_data.columns.tolist()

In [19]:
# Then run the sector classification
sectors_dict, failed_tickers = get_sectors_automatically(stock_tickers)

Classifying 497 stocks by sector...
Processed 50/497 stocks...
Processed 100/497 stocks...
Processed 150/497 stocks...
Processed 200/497 stocks...
Processed 250/497 stocks...
Processed 300/497 stocks...
Processed 350/497 stocks...
Processed 400/497 stocks...
Processed 450/497 stocks...

=== SECTOR CLASSIFICATION RESULTS ===
Industrials: 69 stocks
Healthcare: 59 stocks
Technology: 82 stocks
Utilities: 31 stocks
Financial Services: 68 stocks
Basic Materials: 20 stocks
Consumer Cyclical: 55 stocks
Real Estate: 31 stocks
Communication Services: 23 stocks
Consumer Defensive: 36 stocks
Energy: 23 stocks

Failed to classify: 0 stocks


In [23]:
def test_cointegration_silent(stock1, stock2, method="log"):
    """
    Modified version of your function for batch processing
    Returns: (p_value, hedge_ratio, adf_statistic, spread)
    """
    from statsmodels.tsa.stattools import adfuller
    from statsmodels.api import OLS
    import statsmodels.api as sm
    
    try:
        if method == "log":
            s1, s2 = np.log(stock1), np.log(stock2)
        elif method == "normalized":
            s1, s2 = stock1/stock1.iloc[0], stock2/stock2.iloc[0]
        else:
            s1, s2 = stock1, stock2
        
        # Linear regression
        s1_clean = s1.dropna()
        s2_clean = s2.dropna()
        
        # Align the series
        common_idx = s1_clean.index.intersection(s2_clean.index)
        s1_aligned = s1_clean[common_idx]
        s2_aligned = s2_clean[common_idx]
        
        # Need sufficient data points
        if len(common_idx) < 60:
            return None, None, None, None
        
        # Regression: s2 = alpha + beta * s1 + error
        X = sm.add_constant(s1_aligned)
        model = OLS(s2_aligned, X).fit()
        hedge_ratio = model.params.iloc[1]  # Using iloc to avoid the warning
        
        # Create spread and test for stationarity
        spread = s2_aligned - hedge_ratio * s1_aligned
        
        # ADF test on spread
        adf_result = adfuller(spread.dropna())
        
        return adf_result[1], hedge_ratio, adf_result[0], spread
        
    except Exception as e:
        return None, None, None, None

In [24]:
def test_all_sector_pairs_with_your_function(sector_data, sector_name, method="log"):

    print(f"\nTesting all pairs in {sector_name} sector using {method} prices...")
    
    results = []
    stock_list = sector_data.columns.tolist()
    
    if len(stock_list) < 2:
        print(f"Insufficient stocks in {sector_name} sector")
        return results
    
    total_pairs = len(stock_list) * (len(stock_list) - 1) // 2
    print(f"Testing {total_pairs} pairs from {len(stock_list)} stocks...")
    
    pair_count = 0
    
    for i, stock_a in enumerate(stock_list):
        for j, stock_b in enumerate(stock_list[i+1:], i+1):
            pair_count += 1
            
            # Test both directions using your function
            p_val_1, hedge_1, adf_1, spread_1 = test_cointegration_silent(
                sector_data[stock_a], sector_data[stock_b], method=method
            )
            
            p_val_2, hedge_2, adf_2, spread_2 = test_cointegration_silent(
                sector_data[stock_b], sector_data[stock_a], method=method
            )
            
            # Keep the better result (more negative ADF statistic)
            if p_val_1 is not None and p_val_2 is not None:
                if adf_1 < adf_2:  # More negative is better
                    best_p_val = p_val_1
                    best_hedge = hedge_1
                    best_adf = adf_1
                    direction = f"{stock_b} ~ {stock_a}"
                    spread = spread_1
                    dependent_var = stock_b
                    independent_var = stock_a
                else:
                    best_p_val = p_val_2
                    best_hedge = hedge_2
                    best_adf = adf_2
                    direction = f"{stock_a} ~ {stock_b}"
                    spread = spread_2
                    dependent_var = stock_a
                    independent_var = stock_b
                
                # Only keep statistically significant pairs
                if best_p_val < 0.05:
                    results.append({
                        'stock_a': independent_var,
                        'stock_b': dependent_var,
                        'p_value': best_p_val,
                        'adf_statistic': best_adf,
                        'hedge_ratio': best_hedge,
                        'direction': direction,
                        'sector': sector_name,
                        'method': method,
                        'spread': spread,
                        'economic_rationale': f"Both operate in {sector_name} sector"
                    })
            
            # Progress indicator
            if pair_count % 50 == 0:
                print(f"  Tested {pair_count}/{total_pairs} pairs...")
    
    cointegrated_count = len(results)
    print(f"Found {cointegrated_count} cointegrated pairs in {sector_name}")
    
    return results

In [None]:
# Example for Technology sector:
tech_stocks = sectors_dict['Technology']  # List from Step 1
available_tech_stocks = [s for s in tech_stocks if s in price_data.columns]
sector_data = price_data[available_tech_stocks]  # DataFrame with only tech stocks

# Input 2: sector_name (String)
sector_name = "Technology"

# Input 3: method (String, optional)
method = "log"  # or "raw" or "normalized"

# Usage:
tech_results = test_all_sector_pairs_with_your_function(sector_data, sector_name, method)


Testing all pairs in Technology sector using log prices...
Testing 3321 pairs from 82 stocks...
  Tested 50/3321 pairs...
  Tested 100/3321 pairs...
  Tested 150/3321 pairs...
  Tested 200/3321 pairs...
  Tested 250/3321 pairs...
  Tested 300/3321 pairs...
  Tested 350/3321 pairs...
  Tested 400/3321 pairs...
  Tested 450/3321 pairs...
  Tested 500/3321 pairs...
  Tested 550/3321 pairs...
  Tested 600/3321 pairs...
  Tested 650/3321 pairs...
  Tested 700/3321 pairs...
  Tested 750/3321 pairs...
  Tested 800/3321 pairs...
  Tested 850/3321 pairs...
  Tested 900/3321 pairs...
  Tested 950/3321 pairs...
  Tested 1000/3321 pairs...
  Tested 1050/3321 pairs...
  Tested 1100/3321 pairs...
  Tested 1150/3321 pairs...
  Tested 1200/3321 pairs...
  Tested 1250/3321 pairs...
  Tested 1300/3321 pairs...
  Tested 1350/3321 pairs...
  Tested 1400/3321 pairs...
  Tested 1450/3321 pairs...
  Tested 1500/3321 pairs...
  Tested 1550/3321 pairs...
  Tested 1600/3321 pairs...
  Tested 1650/3321 pairs...