![QuantConnect Logo](https://cdn.quantconnect.com/web/i/icon.png)
<hr>

In [6]:
"""
#region imports
from AlgorithmImports import *
from statsmodels.tsa.stattools import adfuller
#endregion

class StationarySelectionModel(ETFConstituentsUniverseSelectionModel):
    def __init__(self, algorithm, etf, lookback = 10, universe_settings = None):
        self.algorithm = algorithm
        self.lookback = lookback
        self.symbolData = {}

        symbol = Symbol.Create(etf, SecurityType.Equity, Market.USA)
        super().__init__(symbol, universe_settings, self.ETFConstituentsFilter)

    def ETFConstituentsFilter(self, constituents):
        stationarity = {}
        self.algorithm.Debug(f"{self.algorithm.Time}::{len(list(constituents))} tickers in SPY")

        for c in constituents:
            symbol = c.Symbol
            if symbol not in self.symbolData:
                self.symbolData[symbol] = SymbolData(self.algorithm, symbol, self.lookback)
            data = self.symbolData[symbol]

            # Update with the last price
            self.algorithm.Debug(f"{self.algorithm.Time}::{symbol}::{c.MarketValue}::{c.SharesHeld}")
            if c.MarketValue and c.SharesHeld:
                price = c.MarketValue / c.SharesHeld
                data.Update(price)
            # Cache the stationarity test statistics in the dict
            if data.TestStatistics:
                stationarity[symbol] = data.TestStatistics

        # Return the top 10 lowest test statistics stocks (more negative stat means higher prob to have no unit root)
        selected = sorted(stationarity.items(), key=lambda x: x[1])
        return [x[0] for x in selected[:10]]

class SymbolData:
    def __init__(self, algorithm, symbol, lookback):
        # RollingWindow to hold log price series for stationary testing
        self.window = RollingWindow[float](lookback)
        self.model = None

        # Warm up RollingWindow
        history = algorithm.History[TradeBar](symbol, lookback, Resolution.Daily)
        for bar in list(history)[:-1]:
            self.window.Add(np.log(bar.Close))

    def Update(self, value):
        if value == 0: return

        # Update RollingWindow with log price
        self.window.Add(np.log(value))
        if self.window.IsReady:
            # Test stationarity for log price series by augmented dickey-fuller test
            price = np.array(list(self.window))[::-1]
            self.model = adfuller(price, regression='n', autolag='BIC')

    @property
    def TestStatistics(self):
        return self.model[0] if self.model else None

class MeanReversionAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2019, 4, 8)  # Set Start Date
        self.SetEndDate(2023, 1, 1)
        self.SetCash(1000000)  # Set Strategy Cash
                
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

        self.UniverseSettings.Resolution = Resolution.Minute
        self.SetUniverseSelection(StationarySelectionModel(self, "SPY", universe_settings=self.UniverseSettings))

        self.AddAlpha(ConstantAlphaModel(InsightType.Price, InsightDirection.Up, timedelta(1)))
        self.SetPortfolioConstruction(MeanReversionPortfolioConstructionModel())

        self.SetWarmUp(timedelta(90))

algorithm = MeanReversionAlgorithm()

# Set up ETF and lookback
etf = "SPY"
lookback = 10

# Create an instance of the StationarySelectionModel
stationary_model = StationarySelectionModel(algorithm, etf, lookback)

# Get ETF constituents and apply the selection model
constituents = algorithm.Securities.Values
selected_symbols = stationary_model.ETFConstituentsFilter(constituents)

constituents

# Create a DataFrame to store test statistics
data = {'Symbol': [], 'Test Statistics': []}

# Populate the DataFrame with test statistics
for symbol in selected_symbols:
…test_statistics_table
"""

In [1]:
# Import necessary libraries
import yfinance as yf
import pandas as pd
import numpy as np
from statsmodels.tsa.stattools import adfuller

# Function to conduct ADF test for stationarity
def test_stationarity(price_series):
    """Return the augmented Dickey-Fuller test statistic of a price series.

    The test is run without deterministic terms (``regression='n'``) and with
    the lag length selected by BIC (``autolag='BIC'``).

    Parameters
    ----------
    price_series : array-like
        Series of (log) prices. A single-column 2-D input is squeezed to 1-D.
        Non-finite entries (NaN / +-inf, e.g. the log of a missing or
        non-positive price) are dropped before the test is run.

    Returns
    -------
    float
        The ADF test statistic, i.e. the first element of the tuple returned
        by ``adfuller``. More negative values are stronger evidence against a
        unit root.

    Raises
    ------
    ValueError
        If the input is not one-dimensional after squeezing. Errors raised by
        ``adfuller`` itself (for instance when too few observations remain)
        propagate unchanged.
    """
    values = np.squeeze(np.asarray(price_series, dtype=float))
    if values.ndim != 1:
        raise ValueError(
            f"price_series must be one-dimensional, got shape {values.shape}"
        )

    # A NaN or infinity would contaminate the test regression, so only the
    # finite observations are tested.
    values = values[np.isfinite(values)]

    # adfuller returns a non-empty tuple, so no falsy check is needed here.
    result = adfuller(values, regression='n', autolag='BIC')
    return result[0]

# Function to fetch historical stock prices using yfinance
def get_historical_prices(symbol, start_date, end_date):
    """Download the closing prices of ``symbol`` with yfinance.

    Parameters
    ----------
    symbol
        Ticker forwarded to ``yf.download``.
    start_date, end_date
        Bounds of the date range, forwarded as ``start`` and ``end``.

    Returns
    -------
    pandas.Series
        The ``'Close'`` prices. If ``'Close'`` selects a one-column DataFrame
        it is reduced to that column; a wider DataFrame is returned as is.

    Raises
    ------
    ValueError
        If the download produced no rows, so that the caller sees the real
        cause instead of a later failure inside the stationarity test.
    """
    stock_data = yf.download(symbol, start=start_date, end=end_date)
    if stock_data is None or stock_data.empty:
        raise ValueError(f"no price data returned for {symbol!r}")

    close = stock_data['Close']
    # NOTE(review): some yfinance versions return (field, ticker) MultiIndex
    # columns even for one ticker, making 'Close' a one-column DataFrame --
    # confirm against the installed version. Normalise to a Series either way.
    if isinstance(close, pd.DataFrame) and close.shape[1] == 1:
        close = close.iloc[:, 0]
    return close

# Get the S&P 500 constituents from Wikipedia. Wikipedia writes share classes
# with a dot (e.g. "BRK.B") whereas Yahoo Finance expects a hyphen ("BRK-B"),
# so the symbols are normalised before any prices are requested.
spy_tickers = [
    str(ticker).replace('.', '-')
    for ticker in pd.read_html('https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')[0]['Symbol']
]

# Test every constituent plus the SPY ETF itself
selected_tickers = spy_tickers + ["SPY"]

# Define the date range for historical prices
start_date = '2023-01-01'
end_date = '2024-01-01'

# Column-oriented container that is turned into the results DataFrame below
data = {'Symbol': [], 'Test Statistics': []}

# Fetch historical prices and conduct the ADF test on the log prices of each
# ticker
for symbol in selected_tickers:
    try:
        prices = get_historical_prices(symbol, start_date, end_date)
        test_statistic = test_stationarity(np.log(prices))
    except Exception as e:
        # Best effort: report the ticker that failed (download or test) and
        # carry on with the remaining ones.
        print(f"Error fetching data for {symbol}: {e}")
    else:
        data['Symbol'].append(symbol)
        data['Test Statistics'].append(test_statistic)

# Build the results table and order it by test statistic, most negative first
test_statistics_table = pd.DataFrame(data)
test_statistics_table = test_statistics_table.sort_values(by='Test Statistics', ascending=True)

# Display the 20 rows with the lowest test statistics
print(test_statistics_table.head(20))

In [6]:
# Re-sort the results in ascending order of the ADF statistic and show the
# first 20 rows.
test_statistics_table = test_statistics_table.sort_values('Test Statistics')
print(test_statistics_table.head(20))