In [None]:
import pandas as pd
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, LSTM, Input, Activation, concatenate, Bidirectional 
from keras import Model
from keras import optimizers
from tensorflow import keras

In [None]:
# Read the raw dataset; the CSV path is relative to the current working directory.
data_full = pd.read_csv('Intraday Volatility Dataset.csv')

In [None]:
# Name the column groups once so the three selections below cannot drift apart.
feature_columns = ["Volume", "Return", "Return_Squared", "EMAF", "Daily Volatility"]
label_column = "target"

X = data_full[feature_columns]                                  # model inputs only
Y = data_full[label_column]                                     # label only
data_set = data_full[["Date"] + feature_columns + [label_column]]  # date + inputs + label

In [None]:
# Positional split without shuffling: the first 80% of the rows (in file order)
# become the training frame, the remaining rows the test frame.
splitlimit = int(len(data_set) * 0.8)
training_features = data_set[:splitlimit]
test_data = data_set[splitlimit:]

In [None]:
# Code to remove all outliers from the dataset
#
# For each monitored column, the absolute deviation from a centred rolling
# median is computed; a row is dropped when that deviation exceeds
# OUTLIER_MULTIPLIER times the median deviation of the column.

OUTLIER_WINDOW = 41       # length of the centred rolling-median window, in rows
OUTLIER_MULTIPLIER = 5    # a deviation above this many median deviations is an outlier

# (source column, prefix used for the derived column names)
outlier_columns = [
    ("Daily Volatility", "volatility"),
    ("Return_Squared", "return_squared"),
    ("Return", "return"),
    ("EMAF", "EMAF"),
    ("Volume", "volume"),
]

# `training_features` was obtained by slicing another frame. Take an explicit
# copy before adding columns so pandas does not treat the assignments below as
# chained assignment on a slice (SettingWithCopyWarning).
training_features = training_features.copy()

# Centred rolling median of every monitored column.
for source_col, prefix in outlier_columns:
    training_features[f"{prefix}_rolling_median"] = (
        training_features[source_col]
        .rolling(window=OUTLIER_WINDOW, center=True, min_periods=1)
        .median()
    )

# Absolute deviation of every monitored column from its rolling median.
for source_col, prefix in outlier_columns:
    training_features[f"{prefix} minus median"] = (
        training_features[source_col] - training_features[f"{prefix}_rolling_median"]
    ).abs()

# Stage 1: volatility outliers; the threshold comes from the full training frame.
volatility_deviation = training_features["volatility minus median"]
volatility_outliers_removed = training_features[
    ~(volatility_deviation > OUTLIER_MULTIPLIER * volatility_deviation.median())
]

# Stage 2: remaining columns, filtered one after another.
# NOTE(review): every threshold in this stage is computed on
# `volatility_outliers_removed`, not on the progressively filtered frame.
# That matches the previous behaviour of this cell - confirm it is intended.
all_outliers_removed = volatility_outliers_removed
for _, prefix in outlier_columns[1:]:
    deviation_col = f"{prefix} minus median"
    threshold = OUTLIER_MULTIPLIER * volatility_outliers_removed[deviation_col].median()
    # `~(x > t)` rather than `x <= t` so rows with a NaN deviation are kept.
    all_outliers_removed = all_outliers_removed[
        ~(all_outliers_removed[deviation_col] > threshold)
    ]

In [None]:
# Drop the helper columns added during outlier removal and keep only the
# model inputs and the label.
cleaned_feature_columns = ["Volume", "Return", "Return_Squared", "EMAF", "Daily Volatility"]
cleaned_label_column = "target"

X_cleaned = all_outliers_removed[cleaned_feature_columns]
Y_cleaned = all_outliers_removed[cleaned_label_column]
data_set_cleaned = all_outliers_removed[cleaned_feature_columns + [cleaned_label_column]]

In [None]:
from sklearn.preprocessing import MinMaxScaler

# Fit one min-max scaler on the cleaned training frame (features + target).
# After this cell `scaler` holds the statistics of `data_set_cleaned`.
scaler = MinMaxScaler()
data_set_scaled = scaler.fit_transform(data_set_cleaned)

# Min-max scaling works column by column, and the columns of `X_cleaned` are
# the leading columns of `data_set_cleaned`, so the scaled feature matrix is
# the leading block of the combined result.
training_data_features_scaled = data_set_scaled[:, :X_cleaned.shape[1]].copy()

In [None]:
backcandles = 10  # number of preceding rows in every input window

# One inner list per feature column; each entry is the `backcandles` values of
# that feature that precede row `end`. Resulting layout: [feature][window][step].
Z = [
    [
        training_data_features_scaled[end - backcandles:end, feature]
        for end in range(backcandles, training_data_features_scaled.shape[0])
    ]
    for feature in range(5)
]

In [None]:
# Move the feature axis to the end: [feature][window][step] -> (window, step, feature).
# This rebinds `Z` from itself, so running the cell a second time without
# rebuilding `Z` first would move the axes again.
Z = np.array(np.moveaxis(Z, 0, 2))

# Last column of the scaled frame, taken from row `backcandles - 1` onwards ...
yi = np.array(data_set_scaled[backcandles - 1:, -1])
# ... as a column vector with its first entry dropped, so there is exactly one
# target (row `end`) for every window built from rows `end - backcandles .. end - 1`.
y_final = yi.reshape(len(yi), 1)[1:]

In [None]:
#Random Search and Walk-Forward Cross-Validation


from keras.models import load_model
from sklearn.model_selection import ParameterSampler
import numpy as np
from sklearn.metrics import accuracy_score
from sklearn.model_selection import TimeSeriesSplit
from keras.wrappers.scikit_learn import KerasClassifier
from keras.models import Model
from keras.layers import Input, LSTM, Dense, Activation
from sklearn.model_selection import cross_val_score, RandomizedSearchCV
import numpy as np
from scipy.stats import randint, uniform


def create_model(units=80):
    """Build and compile a single-LSTM-layer binary classifier.

    The network takes windows of shape ``(backcandles, 5)`` (``backcandles`` is
    read from the notebook namespace when the function is called), feeds them
    through one LSTM layer and a one-unit dense layer, and applies a sigmoid.

    Args:
        units: Number of units in the LSTM layer.

    Returns:
        The model, compiled with the Adam optimizer, binary cross-entropy loss
        and the accuracy metric.
    """
    sequence_in = Input(shape=(backcandles, 5), name='lstm_input')
    lstm_out = LSTM(units, name='first_layer')(sequence_in)
    logit = Dense(1, name='dense_layer')(lstm_out)
    probability = Activation('sigmoid', name='output')(logit)

    classifier = Model(inputs=sequence_in, outputs=probability)
    classifier.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])
    return classifier


# Search space for the random search. `scipy.stats.randint(low, high)` draws
# integers from `low` up to, but not including, `high`.
param_dist = dict(
    units=randint(50, 150),
    batch_size=[16, 32, 64],
    epochs=randint(10, 30),
)

# Search configuration.
n_iter = 1                           # number of parameter sets to sample
tscv = TimeSeriesSplit(n_splits=5)   # walk-forward splitter
best_model_path = "best_model.h5"    # where the best model is written

# Running best; any real score beats -inf.
best_score = -np.inf
best_params = None

# Random search with walk-forward cross-validation.
#
# For every sampled parameter set a fresh model is trained on each training
# fold and scored on the fold that follows it; the mean accuracy over the folds
# decides whether the parameter set becomes the new best.
#
# `random_state` makes the sampled parameter sets repeatable between runs
# (weight initialisation is not seeded here, so scores can still vary).
for params in ParameterSampler(param_dist, n_iter=n_iter, random_state=42):
    fold_scores = []

    for train_index, test_index in tscv.split(Z):
        # Release the global Keras state left by the previous model; without
        # this, building many models in a loop keeps consuming memory.
        tf.keras.backend.clear_session()

        X_train_fold, X_val_fold = Z[train_index], Z[test_index]
        y_train_fold, y_val_fold = y_final[train_index], y_final[test_index]

        # `create_model` returns an already compiled model, so no second
        # `compile` call is needed.
        model = create_model(units=params['units'])

        model.fit(
            X_train_fold,
            y_train_fold,
            epochs=params['epochs'],
            batch_size=params['batch_size'],
            verbose=1,
        )

        # `evaluate` returns [loss, accuracy]; only the accuracy is kept.
        _, score = model.evaluate(X_val_fold, y_val_fold, verbose=0)
        fold_scores.append(score)

    avg_score = np.mean(fold_scores)

    if avg_score > best_score:
        best_score = avg_score
        best_params = params
        # `model` is the network trained in the last fold of this parameter set.
        model.save(best_model_path)

In [None]:
# Read the model back from the file at `best_model_path`.
best_model = load_model(best_model_path)

In [None]:
# Same column groups as for the training data, taken from the held-out rows.
test_feature_columns = ["Volume", "Return", "Return_Squared", "EMAF", "Daily Volatility"]
test_label_column = "target"

X_test = test_data[test_feature_columns]
Y_test = test_data[test_label_column]
test_dataset = test_data[test_feature_columns + [test_label_column]]

In [None]:
#Scaling test data
# Only *apply* the scaler here. It was last fitted on the cleaned training
# frame, which has the same columns in the same order as `test_dataset`.
# Re-fitting on the test set would leak test-set statistics into the
# preprocessing and put the test inputs on a different scale from the data the
# model was trained on. Values outside [0, 1] are possible for test rows that
# fall outside the training range.
test_scaled = scaler.transform(test_dataset)

# The feature columns are the leading columns of `test_dataset`, so the scaled
# feature matrix is the leading block of `test_scaled`.
X_test_scaled = test_scaled[:, :X_test.shape[1]]

In [None]:
#reconstructing test data 


backcandles = 10  # number of preceding rows in every input window

# Stack one (backcandles, n_features) slice per predictable row, which yields
# the (window, step, feature) layout directly.
T = np.stack([
    X_test_scaled[end - backcandles:end, :5]
    for end in range(backcandles, X_test_scaled.shape[0])
])

# Last column of the scaled test frame from row `backcandles - 1` onwards ...
yi_test = np.array(test_scaled[backcandles - 1:, -1])
# ... as a column vector with its first entry dropped, so there is exactly one
# target (row `end`) per window.
y_final_test = yi_test.reshape(len(yi_test), 1)[1:]

In [None]:
#lstm in sample 
from sklearn.metrics import confusion_matrix

# In-sample check: predict on `Z`, threshold the sigmoid output at 0.5 and
# compare with the targets in `y_final`.
validation_predictions = best_model.predict(Z)
validation_predicted_classes = (validation_predictions > 0.5).astype(int)

dataframe_val = pd.DataFrame(y_final, columns=["target"])
# Flatten the (n, 1) prediction array so it is stored as an ordinary column.
dataframe_val["predicted"] = validation_predicted_classes.ravel()

# scikit-learn expects (y_true, y_pred): rows are the true classes, columns the
# predicted classes. Passing them the other way round transposes the matrix.
cm = confusion_matrix(dataframe_val['target'], dataframe_val['predicted'])
print(cm)

In [None]:
#lstm out of sample
# Out-of-sample check: predict on the test windows `T`, threshold the sigmoid
# output at 0.5 and compare with the targets in `y_final_test`.
test_predictions = best_model.predict(T)
test_predicted_classes = (test_predictions > 0.5).astype(int)

dataframe = pd.DataFrame(y_final_test, columns=["target"])
# Flatten the (n, 1) prediction array so it is stored as an ordinary column.
dataframe["predicted"] = test_predicted_classes.ravel()

# scikit-learn expects (y_true, y_pred): rows are the true classes, columns the
# predicted classes. Passing them the other way round transposes the matrix.
cm = confusion_matrix(dataframe['target'], dataframe['predicted'])
print(cm)

In [None]:
#ROC Curve 
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
from matplotlib import cm
import numpy as np


# ROC curve of the out-of-sample predictions. The scores are flattened because
# the model returns them as an (n, 1) array.
false_pos_rate, true_pos_rate, thresholds = roc_curve(dataframe['target'], np.ravel(test_predictions))
roc_auc = auc(false_pos_rate, true_pos_rate)


# Making plot
plt.figure(figsize=(10, 8))
# `matplotlib.cm.get_cmap` has been removed in recent Matplotlib releases;
# `plt.get_cmap` is the equivalent pyplot call.
cmap = plt.get_cmap('viridis')
# One marker per threshold, coloured by the threshold value.
sc = plt.scatter(false_pos_rate, true_pos_rate, c=thresholds, cmap=cmap, edgecolor='none', s=70)


# Plot ROC
plt.plot(false_pos_rate, true_pos_rate, color='black', lw=1)
# Diagonal reference line from (0, 0) to (1, 1).
plt.plot([0, 1], [0, 1], color='black', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.gca().tick_params(axis='x', labelsize=15)
plt.gca().tick_params(axis='y', labelsize=15)
plt.xlabel('1-Specificity', fontsize=20)
plt.ylabel('Sensitivity', fontsize=20)
plt.title(f'ROC curve (AUC = {roc_auc:.3f})', fontsize=20)


cbar = plt.colorbar(sc)
cbar.set_label('Threshold', size=18)
cbar.ax.tick_params(labelsize=15)


plt.show()
