In [16]:
import vectorbt as vbt
import yfinance as yf
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from scipy.optimize import least_squares
from sklearn.metrics import r2_score
import warnings
from lppls import lppls
import time


warnings.filterwarnings('ignore')

# Downloading the list of tickers in the S&P 500, removing BRK.B and BF.B as these always cause issues. 
# stocks100 is simply the first 100 stocks (just used for testing)
url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
tables = pd.read_html(url)
table = tables[0]
stocks = table['Symbol'].tolist()
stocks.remove('BRK.B')
stocks.remove('BF.B')
stocks100 = stocks[:100]
stocks10 = stocks[:10]

# The start and end dates for the backtest. Data will be fetched from a certain time before that for modelling purposes.
start_date='2022-10-15'
end_date='2023-06-15'


# Loss function to calculate value of r^2
def loss(params, x, y):
    return func(x, *params) - y

# Exponential function for modelling
def func(x, a, b, c):
    return a * np.exp(b * x) + c

# This is the actual indicator. Currently, it is based on the modelling of the log of 5-day moving average of the price data
# model_window days before the current date to an exponential curve. This modelling happens every 30 days (but can be modified).
# This is slightly arbitrary, but is just a starting point I chose, where the threshold depends on the multiple of the
# model's exponent and its r^2 value, ie how closely it resembles a faster-than-exponential growth. If the model meets the
# requirements, we fit it to the LPPL model. I have tried doing the normal curve_fit in this case, but that takes an 
# incredibly long time. However, there is an lppl library in Python which is very efficient and, having visualised it, 
# seems to be quite good at fitting. There is a parameter, max_searches, which can be adjusted to prevent overfitting etc.
# Then, if the predicted bubble burst is more than 14 days into the future, we buy the stock, and sell it 14 days before tc.
# We sell everything at the end of the window.
def exp_growth_and_r2(close, model_window=120, r2_b_threshold=0.0005, days_before_tc=7, max_searches=20):
    try:
        output = np.zeros(len(close))
        for i in range(model_window, len(close), 30):
            historical_data = close.iloc[i-model_window:i]
            historical_data = historical_data.rolling(window = 5).mean().dropna()

            historical_data.dropna(inplace=True)

            xdata = (historical_data.index - historical_data.index[0]).days.values
            ydata = historical_data.values
            log_ydata = np.log(ydata + 1).ravel()

            res = least_squares(loss, x0=(1, 0.01, 1), args=(xdata, log_ydata))

            fitted_y = func(xdata, *res.x).ravel()

            a, b, c = res.x
            r2 = r2_score(log_ydata, fitted_y)
            
            if r2 * b > r2_b_threshold:
                time = [pd.Timestamp.toordinal(t) for t in historical_data.index]

                price = np.log(historical_data.values).ravel()

                observations = np.array([time, price])

                tcs = []

                for _ in range(5):
                    lppls_model = lppls.LPPLS(observations=observations)
                    tc, m, w, a, b, c1, c2, phi, O, D = lppls_model.fit(max_searches)
                    tcs.append(tc)

                tc = np.median(tcs)
                tc = tc - datetime.toordinal(historical_data.index[-1])
                
                if tc > 14:
                    output[i] = 1
                    if i+int(round(tc))-14 < len(output):
                        output[i+int(round(tc))-14]=-1
                
        output[-1] = -1
        return output
    
    except Exception as e:
        print(f"Couldn't process stock. Error: {str(e)}")

# Creating the indicator
BuyIndicator = vbt.IndicatorFactory(
    input_names=['close'],
    param_names=['model_window', 'r2_b_threshold', 'days_before_tc', 'max_searches'],
    output_names=['buy_indicator']
).from_apply_func(exp_growth_and_r2,
                model_window=90, 
                r2_b_threshold=0.001, 
                days_before_tc=14, 
                max_searches=25,
                keep_pd=True,
                per_column=True)

In [15]:
# We want to start the data fetch model_window days before the start of the backtest so we can run modelling on the first data
fetch_start_date_dt = datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=90)
fetch_start_date = fetch_start_date_dt.strftime("%Y-%m-%d")

start_time = time.time()

# Downloading stock data
data = yf.download(stocks, start=fetch_start_date, end=end_date)['Close']
data = data.ffill().bfill()

# Not particularly important- our starting capital for assigned to each stock
initial_capital = 1000000

# Running the data through our indicator- days when we buy will have a value of 1, and when we sell -1. 0 otherwise
buy_indicator = BuyIndicator.run(data, model_window=120, r2_b_threshold=0.001, days_before_tc=7, max_searches=20)

entry_signal = buy_indicator.buy_indicator == 1
exit_signal = buy_indicator.buy_indicator == -1

# Dropping some extra columns to make it match data
entry_signal.columns = entry_signal.columns.droplevel(['custom_model_window', 'custom_r2_b_threshold', 'custom_days_before_tc', 'custom_max_searches'])

# Cropping the data for the backtest to make it faster
data = data.loc[start_date:end_date]
entry_signal = entry_signal.loc[start_date:end_date]
exit_signal = exit_signal.loc[start_date:end_date]

# # Assigning an equal proportion of the capital to each stock, investing it all when we get an entr signal
num_stocks_to_buy = entry_signal.sum(axis=1)
weights = entry_signal.div(num_stocks_to_buy, axis=0)
weights = weights.mul(initial_capital / data).fillna(0)

# Creating the portfolio. Can change fees as desired
portfolio = vbt.Portfolio.from_signals(
    data,
    entry_signal,
    exit_signal,
    init_cash=initial_capital,
    size=weights,
    size_type='Amount',
    fees=0.001
)

# Printing stats- can add extra info about the portfolio at this point
print(portfolio.stats(group_by = True))

total_portfolio_value = portfolio.value().sum(axis=1)
total_portfolio_value.vbt.plot().show()

end_time = time.time()
print(f'Time taken: {end_time - start_time} seconds')

[*********************100%***********************]  501 of 501 completed
Start                         2022-10-17 00:00:00
End                           2023-06-14 00:00:00
Period                                        166
Start Value                           501000000.0
End Value                        501080493.297449
Total Return [%]                         0.016067
Benchmark Return [%]                    13.917037
Max Gross Exposure [%]                   0.508742
Total Fees Paid                       6507.342288
Max Drawdown [%]                         0.021767
Max Drawdown Duration                        66.0
Total Trades                                  283
Total Closed Trades                           283
Total Open Trades                               0
Open Trade PnL                                0.0
Win Rate [%]                            51.236749
Best Trade [%]                         107.859344
Worst Trade [%]                        -32.857602
Avg Winning Trade [%]      

Time taken: 855.5060300827026 seconds


In [20]:
# We want to start the data fetch model_window days before the start of the backtest so we can run modelling on the first data
fetch_start_date_dt = datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=90)
fetch_start_date = fetch_start_date_dt.strftime("%Y-%m-%d")

start_time = time.time()

# Downloading stock data
data = yf.download(stocks10, start=fetch_start_date, end=end_date)['Close']
data = data.ffill().bfill()

# Not particularly important- our starting capital for assigned to each stock
initial_capital = 1000000

# Running the data through our indicator- days when we buy will have a value of 1, and when we sell -1. 0 otherwise
buy_indicator = BuyIndicator.run(data, model_window=120, r2_b_threshold=0.001, days_before_tc=7, max_searches=20)

entry_signal = buy_indicator.buy_indicator == 1
exit_signal = buy_indicator.buy_indicator == -1

# Cropping the data for the backtest to make it faster
data = data.loc[start_date:end_date]
entry_signal = entry_signal.loc[start_date:end_date]
exit_signal = exit_signal.loc[start_date:end_date]

def custom_weights(data, entry_signal, exit_signal, initial_capital):
    
    # Dropping unnecessary levels from column index of entry_signal and exit_signal
    entry_signal.columns = entry_signal.columns.droplevel(['custom_model_window', 'custom_r2_b_threshold', 'custom_days_before_tc', 'custom_max_searches'])
    exit_signal.columns = exit_signal.columns.droplevel(['custom_model_window', 'custom_r2_b_threshold', 'custom_days_before_tc', 'custom_max_searches'])
    
    cash_available = pd.Series(data=[initial_capital]*len(data), index=data.index)

    cash_after_sell = cash_available + (exit_signal * data).sum(axis=1)

    cash_after_buy = cash_after_sell - (entry_signal * data).sum(axis=1)

    weights = entry_signal.div(cash_after_buy, axis=0)

    return weights

# Call the function
weights = custom_weights(data, entry_signal, exit_signal, initial_capital)

# Creating the portfolio. Can change fees as desired
portfolio = vbt.Portfolio.from_signals(
    data,
    entry_signal,
    exit_signal,
    init_cash=initial_capital,
    size=weights,
    size_type='TargetPercent',
    fees=0.001
)

# Printing stats- can add extra info about the portfolio at this point
print(portfolio.stats(group_by = True))

total_portfolio_value = portfolio.value().sum(axis=1)
total_portfolio_value.vbt.plot().show()

end_time = time.time()
print(f'Time taken: {end_time - start_time} seconds')

[*********************100%***********************]  10 of 10 completed


ValueError: Only SizeType.Amount, SizeType.Value, and SizeType.Percent are supported

In [7]:

import vectorbt as vbt
import yfinance as yf
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from scipy.optimize import least_squares
from sklearn.metrics import r2_score
import warnings
from lppls import lppls


warnings.filterwarnings('ignore')

url = 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies'
tables = pd.read_html(url)
table = tables[0]
stocks = table['Symbol'].tolist()
stocks.remove('BRK.B')
stocks.remove('BF.B')
stocks10 = stocks[:10]

start_date='2022-10-15'
end_date='2023-06-15'


def loss(params, x, y):
    return func(x, *params) - y


def func(x, a, b, c):
    return a * np.exp(b * x) + c


def exp_growth_and_r2(close, model_window=90, r2_b_threshold=0.001, days_before_tc=14, max_searches=25):
    try:
        output = np.zeros(len(close))
        for i in range(model_window, len(close), 30):
            historical_data = close.iloc[i-model_window:i]
            historical_data = historical_data.rolling(window = 5).mean().dropna()

            historical_data.dropna(inplace=True)

            xdata = (historical_data.index - historical_data.index[0]).days.values
            ydata = historical_data.values
            log_ydata = np.log(ydata + 1).ravel()

            res = least_squares(loss, x0=(1, 0.01, 1), args=(xdata, log_ydata))

            fitted_y = func(xdata, *res.x).ravel()

            a, b, c = res.x
            r2 = r2_score(log_ydata, fitted_y)
            
            if r2 * b > r2_b_threshold:
                time = [pd.Timestamp.toordinal(t) for t in historical_data.index]

                price = np.log(historical_data.values).ravel()

                observations = np.array([time, price])

                tcs = []

                for _ in range(5):
                    lppls_model = lppls.LPPLS(observations=observations)
                    tc, m, w, a, b, c1, c2, phi, O, D = lppls_model.fit(max_searches)
                    tcs.append(tc)

                tc = np.median(tcs)
                tc = tc - datetime.toordinal(historical_data.index[-1])
                
                if tc > 14:
                    output[i] = 1
                    if i+int(round(tc))-14 < len(output):
                        output[i+int(round(tc))-14]=-1
                
        output[-1] = -1
        return output
    
    except Exception as e:
        print(f"Couldn't process stock. Error: {str(e)}")


BuyIndicator = vbt.IndicatorFactory(
    input_names=['close'],
    param_names=['model_window', 'r2_b_threshold', 'days_before_tc', 'max_searches'],
    output_names=['buy_indicator']
).from_apply_func(exp_growth_and_r2,
                model_window=90, 
                r2_b_threshold=0.001, 
                days_before_tc=14, 
                max_searches=25,
                keep_pd=True,
                per_column=True)

fetch_start_date_dt = datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=90)
fetch_start_date = fetch_start_date_dt.strftime("%Y-%m-%d")

data = yf.download(['ANSS', 'GOOGL', 'AXON', 'ETN', 'HWM', 'MOS', 'PGR', 'PWR', 'REGN', 'TT'], start=fetch_start_date, end=end_date)['Close']
data = data.ffill().bfill()

initial_capital = 1000000

import itertools

# Define your parameter space
param_space = {
    'model_window': [60, 90, 120], 
    'r2_b_threshold': [0.0005, 0.001, 0.0015],
    'days_before_tc': [7, 14, 21],
    'max_searches': [20, 25, 30]
}

# Generate all combinations of parameters
param_combinations = list(itertools.product(*param_space.values()))

# Generate portfolios for all combinations
portfolio_list = []
for params in param_combinations:
    params_dict = dict(zip(param_space.keys(), params))
    
    buy_indicator = BuyIndicator.run(data, **params_dict)
    entry_signal = buy_indicator.buy_indicator == 1
    exit_signal = buy_indicator.buy_indicator == -1


    portfolio = vbt.Portfolio.from_signals(
        data,
        entry_signal,
        exit_signal,
        init_cash=initial_capital,
        fees=0.001
    )

    portfolio_list.append((params_dict, portfolio))

# Find the portfolio with the highest total return
best_portfolio_params, best_portfolio = max(portfolio_list, key=lambda x: x[1].total_return())

print("Best parameters:")
print(best_portfolio_params)


[*********************100%***********************]  10 of 10 completed


ValueError: Can only compare identically-labeled Series objects

In [8]:
best_portfolio_params, best_portfolio = max(portfolio_list, key=lambda x: x[1].final_value().iloc[-1])

print("Best parameters:")
print(best_portfolio_params)

Best parameters:
{'model_window': 120, 'r2_b_threshold': 0.0005, 'days_before_tc': 7, 'max_searches': 20}
