In [1]:
#importing packages
import numpy as np
import pandas as pd
import yfinance as yf
import yahoo_fin.stock_info as si
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint
import scipy.optimize as spop
import matplotlib.pyplot as plt
import seaborn
%matplotlib inline
# https://www.youtube.com/watch?v=jvZ0vuC9oJk&list=PLE4a3phdCOauXaK3f1QMI_fEIqEYTsH9I&index=3
# https://www.machinelearningplus.com/time-series/augmented-dickey-fuller-test/
pd.set_option('display.max_columns', 10)
pd.set_option('display.max_rows', 10)
from statsmodels.tsa.stattools import adfuller
from pykalman import KalmanFilter
from scipy import poly1d
import warnings
warnings.filterwarnings('ignore') 

In [2]:
def main():
    
    df['fft price'] = 0.0
    df['net_return'] = 0.0
    df['long signal'] = 0
    df['kf price'] = 0.0
    
    signal = 0
    global delta
    global theta
    global amplitude
    global freq
    global sample1
    global sample2

    # do FFT stock price perdiction on every day
    for i in range(window+2,len(df)):  ## remember to change it back to full len(df) ## need +2 as it's min for fft transform

        sample1 = df[i-window:i].copy()

        # do FFT stock price perdiction on every day

        delta = np.array([])
        theta = np.array([])
        amplitude = np.array([])
        freq = np.array([])
        delta = sample1['price'].diff()
        delta = delta[1:]
        sp = np.fft.fft(delta.values) # fft must be done without and NaN
        theta = np.arctan(sp.imag/sp.real) # phase in polar form
        amplitude = np.sqrt(sp.real**2 + sp.imag**2)/(window/2)# we only care about positive absolute amplitude, therefore 1/2
        freq = np.fft.fftfreq(sp.size) # cycles per sample spacing
        std_values = []
        rmse_values = []
        # loop thru and try diff std filter thresholds
        for j in np.linspace(0,4,20): # 100 steps, infer step size
            std_values.append(j)
            rmse_values.append(fft_std_filter(j))
        #print(rmse_values)
        idx = np.array(rmse_values).argmin()
        minSTD = std_values[idx]
        #print('minSTD is '+str(minSTD))
        minRMSE = rmse_values[idx]
        #print('minRMSE is '+str(minRMSE))
        fft_std_filter(minSTD)
        df['fft price'][i] = regression[-1]



        # Use the observed values of the price to get a rolling mean
        state_means, _ = kf.filter(df['fft price'][:i]) # ignore cov so repalce with _
        df['kf price'][i] = state_means[-1]
        
        sample2 = df[i-window:i].copy()

        X = sm.add_constant(sample2['kf price'])
        results = sm.OLS(sample2['price'],sample2['kf price']).fit()
        beta = results.params['kf price']
        #print(f'beta is {beta}')
        sample2['cointegration'] = sample2['price'] - beta * sample2['kf price'] 
        # this is the noise should be mean reverting, spread is scaled by beta rather than simple difference
        result = adfuller(sample2['cointegration'], autolag='AIC')
        result2 = coint(beta*sample2['kf price'],sample2['price'])
        
        if i == window+2: # starting point
            old_signal = 0 # the long short signal
        if result[0] > result[4]['10%'] or result2[0] > result2[2][2]: # nothing happens
            signal = 0
            df['net_return'].iloc[i,:] = 0
        elif result[0] < result[4]['10%'] and result2[0] < result2[2][2] \
        and (sample2['cointegration'][-1]-sample2['cointegration'].mean())/sample2['cointegration'].std() > -999:
            # trigger trading signal when test statistic is more negative for Augmanted Dickey Fuller unit roo test
            # based on time series diff beta stationarity. More negative means more likey to reject unit root exists which make nonstatioanry
            # from dickey and fuller unit root test, more negative is better
            # trigger trading signal when test statistic is more negative for Engle-Granger test (based on residual stationarity)
            # trigger signal when cointegration spread zscore above 1 std
        
            signal = np.sign(df['price'][i] - beta * df['kf price'][i]) # if - then long the stock, as undervalued
            df['long signal'].iloc[i,:] = -signal
            gross_return = -signal*df['return'][i]
            fees = fee*abs(signal - old_signal)
            net_return = gross_return - fees
            df['net_return'].iloc[i,:] = net_return
        old_signal = signal

    df['open long'] = df['long signal'].diff()
    annual_return_positive = str(round(np.power(df["net_return"].cumsum()[-1]*100,360/len(df[window+2:])),2))+'%'
    annual_sharpe_positive = round(np.power(df["net_return"][window:].mean()/df["net_return"][window:].std(),360/len(df[window+2:])),2)
    annual_return_negative = str(round(df["net_return"].cumsum()[-1]*100,2))+'%'
    annual_sharpe_negative = round((df["net_return"][window:].mean()/df["net_return"][window:].std()),2)
    max_drawdown = str(round(df["net_return"].cumsum().min()*100,2))+'%'
    number_of_trades = int(np.power(np.count_nonzero(df["open long"]!=0),360/len(df[window+2:])))

    if df["net_return"].cumsum()[-1] > 0:
        return stock,window,annual_return_positive,annual_sharpe_positive,max_drawdown,number_of_trades
    elif df["net_return"].cumsum()[-1] == 0:
        return stock,window,annual_return_negative,0,max_drawdown,number_of_trades
    elif df["net_return"].cumsum()[-1] < 0:
        return stock,window,annual_return_negative,annual_sharpe_negative,max_drawdown,number_of_trades

In [3]:
def fft_std_filter(std_value):
    # for the variables can be used outside function
    global regressionDelta
    global regression
    regressionDelta = 0
    regression = 0
    #Getting dominant values based on std_value
    meanAmp = amplitude.mean()
    stdAmp = amplitude.std()
    dominantAmp = list(filter(lambda x : x > (std_value*stdAmp + meanAmp),amplitude))
    dominantTheta = theta
    #Calculating Regression Delta
    for n in range(len(dominantAmp)):
        shift = dominantTheta[n]
        regressionDelta += dominantAmp[n] * np.cos(n * np.array(range(window)) + shift) # adding up till len(sample1)
    #Converting Delta Time to Time at start value of real data
    startValue = sample1['price'][0]
    regression = startValue + np.cumsum(regressionDelta)
    #Calculating RMSE
    rmse = np.sqrt(np.mean((sample1['price'].values - regression)**2))
    #if np.isnan(rmse) or len(regression)==1:
    if np.isnan(rmse):
        rmse = 10000000000000
    return rmse

In [4]:
# Construct a Kalman filter
kf = KalmanFilter(observation_matrices = [1], # this tells us the next measurement we should expect given the predicted next state
                  # or this can be random walk if modeling a fairly stable system so next move would be the same 1*
                  # observation matrix will dot with transition matrix
                  observation_covariance=1, # assume price has variance 1 under R.W model (hard to know)
                  # also this represents the error of the guesses
                  initial_state_mean = 0, # guess 0 for 1d
                  initial_state_covariance = 1, # assume price move as our model says, 1 for identity matrix. error for guess
                  transition_matrices = [1], # how state evolves (identity matrix if R.W)
                  transition_covariance=.01)# this is the kalman gain or err term for transition matrix: 
                  # (Err_state)/(Err_state+Err_measure)
                  # important to keep track this as new measurements come in, also hard to know
                  # bigger this value, more overfitting
                  # close to 1 means stock price is accurate - weigh more on recent event
                  # close to 0 means stock price is not accurate - weigh more on historical event

In [None]:
%%time

#specifying parameters
start = '2019-01-01'
end = '2021-01-01'
fee = 0.001
dow_list = si.tickers_dow()
result = pd.DataFrame([],columns=['stock','window','annual return','annual sharpe','max drawdown','number of trades'])


#for i in ['AAPL','TSLA','JPM']:
for i in dow_list:
    for j in np.linspace(start=30,stop=200,num=3):
        window = int(j)
        stock = i
        #retrieving data
        df = pd.DataFrame()
        returns = pd.DataFrame()
        prices = yf.download(i, start, end)
        df['price'] = prices['Adj Close']
        #df['return'] = prices['Adj Close'].pct_change().shift(-1)
        df['return'] = np.log(prices['Adj Close']).diff().shift(-1)
        # important! need to match today's price (or signal) with tomorrow's return in the same df row for backtesting
        df = df[:-1]
        result = result.append(pd.Series(data=main(),index=result.columns),ignore_index=True)

result

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%********

In [None]:
pd.set_option('display.max_rows',100)
result