In [67]:
import yfinance as yf
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR
import matplotlib.pyplot as plt
from arch import arch_model
from statsmodels.tsa.stattools import grangercausalitytests
from statsmodels.stats.diagnostic import acorr_ljungbox
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout

symbol = 'AAPL'
start_date = '2020-01-01'
end_date = '2023-01-01'

stock_data = yf.download(symbol, start=start_date, end=end_date)

[*********************100%%**********************]  1 of 1 completed


STANDARD ROLLING VS EWMA VOLATILITY

In [68]:
# Calculate volatility using rolling windows and EWMA
window = 21 # for rolling calculation
span = 21 # for EWMA

# Calculate daily return for each consecutive day
stock_data['Daily Return'] = stock_data['Adj Close'].pct_change()

# Calculate annualized rolling volatility
stock_data['Rolling Volatility'] = stock_data['Daily Return'].rolling(window=window).std() * (252**0.5)

# Calculate Exonentially Weighted Moving Average (EWMA) Volatility; weights not normalized
stock_data['EWMA Volatility'] = stock_data['Daily Return'].ewm(span=span, adjust=False).std() * (252**0.5)

GARCH/EGARCH

In [69]:
# Fit a GARCH(1,1) and EGARCH(1,1) model
returns = stock_data['Daily Return'].dropna()

#GARCH(1,1)
garch_model = arch_model(returns, vol='Garch', p=1, q=1, rescale='false')
garch_results = garch_model.fit(disp="off")

# EGARCH(1,1)
egarch_model = arch_model(returns, vol='EGarch', p=1, q=1, rescale='false')
egarch_results = egarch_model.fit(disp="off")

GARCH/EGARCH: AIC/BIC Values

In [70]:
# Extract AIC and BIC for GARCH
garch_aic = garch_results.aic
garch_bic = garch_results.bic

# Extract AIC and BIC for EGARCH
egarch_aic = egarch_results.aic
egarch_bic = egarch_results.bic

print(f"GARCH(1,1) AIC: {garch_aic}, BIC: {garch_bic}")
print(f"EGARCH(1,1) AIC: {egarch_aic}, BIC: {egarch_bic}")

GARCH(1,1) AIC: -3679.9250864093046, BIC: -3661.4182154123087
EGARCH(1,1) AIC: -3680.1858665208547, BIC: -3661.6789955238587


GARCH/EGARCH: Out of Sample Forecasting Accuracy with MSE

In [71]:
# Split the data into training and test sets
split_point = int(len(stock_data) * 0.8)
train_data = stock_data[:split_point].copy()
test_data = stock_data[split_point:].copy()

# Preprocessing
train_returns = train_data['Daily Return'].dropna().replace([np.inf, -np.inf], np.nan).dropna()
test_returns = test_data['Daily Return'].dropna().replace([np.inf, -np.inf], np.nan).dropna()

# Fit the GARCH and EGARCH models on the training data
garch_model = arch_model(train_returns, vol='Garch', p=1, q=1, rescale='false')
garch_results = garch_model.fit(disp="off")

egarch_model = arch_model(train_returns, vol='EGarch', p=1, q=1, rescale='false')
egarch_results = egarch_model.fit(disp="off")

# Forecast GARCH volatility (analytic method: use model's equation)
garch_forecast = garch_results.forecast(horizon=len(test_returns))
garch_forecast_volatility = np.sqrt(garch_forecast.variance.values[-1, :])

# Forecast EGARCH volatility (simulation method: use average of Monte Carlo simulations)
egarch_forecast = egarch_results.forecast(horizon=len(test_returns), method='simulation')
egarch_forecast_volatility = np.sqrt(egarch_forecast.variance.values[-1, :])

# Observed volatility (use rolling stdev)
actual_volatility = test_returns.rolling(window=21).std() * np.sqrt(252)
actual_volatility = actual_volatility.dropna()

# Align lengths
garch_forecast_volatility = garch_forecast_volatility[-len(actual_volatility):]
egarch_forecast_volatility = egarch_forecast_volatility[-len(actual_volatility):]

# Calculate MSE for GARCH
garch_mse = mean_squared_error(actual_volatility, garch_forecast_volatility)

# Calculate MSE for EGARCH
egarch_mse = mean_squared_error(actual_volatility, egarch_forecast_volatility)

print(f"GARCH(1,1) MSE: {garch_mse}")
print(f"EGARCH(1,1) MSE: {egarch_mse}")


GARCH(1,1) MSE: 0.11414691547646583
EGARCH(1,1) MSE: 0.11380035779637658


Preprocessing for LSTM and SVR

In [72]:
# Used to scale to N(0,1)
scaler = StandardScaler()
# Reshape to vector
X_train = scaler.fit_transform(train_returns.values.reshape(-1, 1))
X_test = scaler.transform(test_returns.values.reshape(-1, 1))

# Calculate actual volatility (use rolling stdev)
actual_volatility = test_returns.rolling(window=21).std() * np.sqrt(252)
actual_volatility = actual_volatility.dropna()

LSTM

In [73]:
# Transform training and test data into sequences of 21 time steps (inputs) and corresponding next values (outputs)
def create_lstm_input(data, time_step=21):
    X, y = [], []
    for i in range(len(data) - time_step - 1):
        X.append(data[i:(i + time_step), 0])
        y.append(data[i + time_step, 0])
    return np.array(X), np.array(y)

time_step = 21
X_train_lstm, y_train_lstm = create_lstm_input(X_train, time_step)
X_test_lstm, y_test_lstm = create_lstm_input(X_test, time_step)

# Reshape for LSTM (# samples, # time steps, 1 feature)
X_train_lstm = X_train_lstm.reshape(X_train_lstm.shape[0], X_train_lstm.shape[1], 1)
X_test_lstm = X_test_lstm.reshape(X_test_lstm.shape[0], X_test_lstm.shape[1], 1)

# Implement LSTM: 2 LSTM layers, 2 dropout layers, 1 dense layer
lstm_model = Sequential()
lstm_model.add(LSTM(units=50, return_sequences=True, input_shape=(time_step, 1)))
lstm_model.add(Dropout(0.2))
lstm_model.add(LSTM(units=50, return_sequences=False))
lstm_model.add(Dropout(0.2))
lstm_model.add(Dense(units=1))

# Set Adam as optimizer, set MSE as loss function
lstm_model.compile(optimizer='adam', loss='mean_squared_error')

# Train LSTM model
lstm_model.fit(X_train_lstm, y_train_lstm, epochs=10, batch_size=32, verbose=0)

# Forecast with LSTM
lstm_predictions = lstm_model.predict(X_test_lstm)
lstm_predictions = scaler.inverse_transform(lstm_predictions)

  super().__init__(**kwargs)


[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 22ms/step


SVR

In [74]:
# Implement SVR
X_train_svr = X_train[time_step:-1]  # Removing the first `time_step` samples to align with y_train

# Train the SVR model
svr_model = SVR(kernel='rbf', C=100, gamma=0.1, epsilon=.01)
svr_model.fit(X_train_svr, train_returns[time_step+1:])

# Forecast with SVR
svr_predictions = svr_model.predict(X_test)
svr_predictions = scaler.inverse_transform(svr_predictions.reshape(-1, 1))

Compare LSTM and SVR

In [75]:
# Align lengths of predictions and actual volatility
min_length = min(len(lstm_predictions), len(svr_predictions), len(actual_volatility))
lstm_predictions = lstm_predictions[-min_length:]
svr_predictions = svr_predictions[-min_length:]
actual_volatility = actual_volatility[-min_length:]

# Calculate MSE for LSTM and SVR
lstm_mse = mean_squared_error(actual_volatility, lstm_predictions)
svr_mse = mean_squared_error(actual_volatility, svr_predictions)

print(f"LSTM MSE: {lstm_mse}")
print(f"SVR MSE: {svr_mse}")

LSTM MSE: 0.1298044289626367
SVR MSE: 0.12963725230596773
