In [None]:
from matplotlib import pyplot as plt
from sklearn.metrics import roc_curve, auc
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import mutual_info_classif
from pm_utils import get_data
from pm_utils import create_sliding_windows_with_labels
from pm_utils import NDStandardScaler
from keras.src.callbacks import EarlyStopping
import keras
from keras import Input, Model
from sklearn.pipeline import Pipeline
from sklearn.model_selection import TimeSeriesSplit, RandomizedSearchCV
from scikeras.wrappers import KerasClassifier
from keras.layers import LSTM, Dense, Dropout, SimpleRNN, GRU, Conv1D, GlobalMaxPool1D, Flatten
from keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from sklearn.metrics import make_scorer, accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
import numpy as np

# One seed value, reused further down for the randomized hyper-parameter search.
random_seed = 812

# Seed Keras first and NumPy's global generator second, both with the same value.
for _apply_seed in (keras.utils.set_random_seed, np.random.seed):
    _apply_seed(random_seed)


In [None]:
# --- Experiment configuration ---------------------------------------------
# Fractions of the samples used to compute the split points in the next cell.
train_percent = 0.9
validation_percent = 0.1

# T is passed to get_data, to the sliding-window helper and is the first
# dimension of the model input shape; T_shift is only passed to get_data.
T = 6
T_shift = 4
# NOTE(review): absolute, machine-specific path - the notebook only runs where
# this exact file exists. Consider a configurable data directory.
path = 'C:/Users/mkedzia/Desktop/praca-magisterska/binance-data/aaa/data/BNB/BNB-merged-data.csv'

columns, input_data, targets = get_data(path=path, base_m=True, trade_metrics=True, trade_metrics_m=False,
                                        google=True, t=T, t_shift=T_shift)
# Binarise the target: True where the raw target is non-negative.
targets = (targets >= 0.0)

k_features = 18


def _mi_score(X, y):
    """Mutual information of every feature with ``y``, using a fixed seed.

    mutual_info_classif adds small random noise to continuous features, so
    the same ``random_state`` must be used for the plotted ranking and for
    SelectKBest; otherwise the two can disagree about which features win.
    """
    return mutual_info_classif(X, y, random_state=0)


mi = _mi_score(input_data, targets)

# Features ordered from highest to lowest mutual information (for the plot).
sorted_indices = np.argsort(mi)[::-1]
sorted_features = np.array(columns)[sorted_indices]
sorted_mi = mi[sorted_indices]

# Same seeded score function as above, so the selection matches the bar chart.
selector = SelectKBest(_mi_score, k=k_features)
selected_features = selector.fit_transform(input_data, targets)
selected_mask = selector.get_support()
# The selection is only reported; the models below still use every feature.
# input_data = selected_features

selected_features_labels = [col for col, is_selected in zip(columns, selected_mask) if is_selected]
rejected_features_labels = [col for col, is_selected in zip(columns, selected_mask) if not is_selected]

print("Wybrane cechy:", selected_features_labels)
print("Odrzucone cechy:", rejected_features_labels)

# Number of features per time step; used as the second input dimension of the models.
D = input_data.shape[1]
print(f"Input shape: {D}")

plt.figure(figsize=(12, 8))
plt.bar(sorted_features, sorted_mi, color='skyblue')
plt.xlabel('Cecha')
plt.ylabel('Informacja wzajemna')
plt.title('Wartości informacji wzajemnej dla każdej cechy')
plt.xticks(rotation=90)
plt.show()

In [None]:
# Chronological split: the first `train_percent` of the rows is used for
# training, everything after that index is held out for testing.
n_samples = len(input_data)
train_ = int(n_samples * train_percent)
# NOTE(review): `validation_` is computed here but not used by any cell shown.
validation_ = train_ + int(n_samples * validation_percent)

train_rows = slice(None, train_)
test_rows = slice(train_, None)

# Windows are built separately per split, each with window length T.
X_train, Y_train = create_sliding_windows_with_labels(input_data[train_rows], targets[train_rows], T)
X_test, Y_test = create_sliding_windows_with_labels(input_data[test_rows], targets[test_rows], T)

In [None]:
def create_lstm_model(units=50, num_layers=1, dropout_rate=0.0, learning_rate=0.001, kernel_regularizer=0.0,
                      rnn_activation='', dense_activation='', recurrent_dropout=0.0, dropout=0.0):
    """Build and compile a stacked-LSTM binary classifier for ``(T, D)`` inputs.

    Args:
        units: Width of every LSTM layer and of the hidden Dense layer.
        num_layers: Number of stacked LSTM layers.
        dropout_rate: Passed as the ``dropout`` argument of each LSTM layer.
        learning_rate: Learning rate of the Adam optimizer.
        kernel_regularizer: L2 factor applied to the kernel of the first LSTM
            layer only; later layers are not regularised.
        rnn_activation: Activation of the LSTM layers. The empty-string default
            is not a valid Keras identifier, so it is treated as "use the
            layer default" (``'tanh'``).
        dense_activation: Activation of the hidden Dense layer. The empty-string
            default is treated as "use the layer default" (no activation).
        recurrent_dropout: Passed as ``recurrent_dropout`` of each LSTM layer.
        dropout: Rate of the Dropout layer placed after the hidden Dense layer.

    Returns:
        A compiled Keras ``Model`` with a single sigmoid output, trained with
        binary cross-entropy and reporting accuracy.
    """
    # Map the '' placeholders to the Keras layer defaults so that calling the
    # factory without explicit activations does not raise.
    rnn_act = 'tanh' if rnn_activation == '' else rnn_activation
    dense_act = None if dense_activation == '' else dense_activation

    inn = Input(shape=(T, D))
    x = inn
    last_layer = num_layers - 1
    for i in range(num_layers):
        # Every layer except the last must emit the full sequence for the next one.
        return_sequences = i < last_layer
        # Separate name: do not overwrite the `kernel_regularizer` argument.
        layer_regularizer = l2(kernel_regularizer) if i == 0 else None
        x = LSTM(units, return_sequences=return_sequences, activation=rnn_act,
                 kernel_regularizer=layer_regularizer, dropout=dropout_rate,
                 recurrent_dropout=recurrent_dropout)(x)
    x = Dense(units, activation=dense_act)(x)
    x = Dropout(dropout)(x)
    x = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=inn, outputs=x)
    model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=learning_rate), metrics=['accuracy'])
    return model


def create_rnn_model(units=50, num_layers=1, dropout_rate=0.0, learning_rate=0.001, kernel_regularizer=0.0,
                     rnn_activation='', dense_activation='', recurrent_dropout=0.0, dropout=0.0):
    """Build and compile a stacked-SimpleRNN binary classifier for ``(T, D)`` inputs.

    Args:
        units: Width of every SimpleRNN layer and of the hidden Dense layer.
        num_layers: Number of stacked SimpleRNN layers.
        dropout_rate: Passed as the ``dropout`` argument of each SimpleRNN layer.
        learning_rate: Learning rate of the Adam optimizer.
        kernel_regularizer: L2 factor applied to the kernel of the first
            SimpleRNN layer only; later layers are not regularised.
        rnn_activation: Activation of the SimpleRNN layers. The empty-string
            default is not a valid Keras identifier, so it is treated as "use
            the layer default" (``'tanh'``).
        dense_activation: Activation of the hidden Dense layer. The empty-string
            default is treated as "use the layer default" (no activation).
        recurrent_dropout: Passed as ``recurrent_dropout`` of each SimpleRNN layer.
        dropout: Rate of the Dropout layer placed after the hidden Dense layer.

    Returns:
        A compiled Keras ``Model`` with a single sigmoid output, trained with
        binary cross-entropy and reporting accuracy.
    """
    # Map the '' placeholders to the Keras layer defaults so that calling the
    # factory without explicit activations does not raise.
    rnn_act = 'tanh' if rnn_activation == '' else rnn_activation
    dense_act = None if dense_activation == '' else dense_activation

    inn = Input(shape=(T, D))
    x = inn
    last_layer = num_layers - 1
    for i in range(num_layers):
        # Every layer except the last must emit the full sequence for the next one.
        return_sequences = i < last_layer
        # Separate name: do not overwrite the `kernel_regularizer` argument.
        layer_regularizer = l2(kernel_regularizer) if i == 0 else None
        x = SimpleRNN(units, return_sequences=return_sequences, activation=rnn_act,
                      kernel_regularizer=layer_regularizer, dropout=dropout_rate,
                      recurrent_dropout=recurrent_dropout)(x)
    x = Dense(units, activation=dense_act)(x)
    x = Dropout(dropout)(x)
    x = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=inn, outputs=x)
    model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=learning_rate), metrics=['accuracy'])
    return model


def create_gru_model(units=50, num_layers=1, dropout_rate=0.0, learning_rate=0.001, kernel_regularizer=0.0,
                     rnn_activation='', dense_activation='', recurrent_dropout=0.0, dropout=0.0):
    """Build and compile a stacked-GRU binary classifier for ``(T, D)`` inputs.

    Args:
        units: Width of every GRU layer and of the hidden Dense layer.
        num_layers: Number of stacked GRU layers.
        dropout_rate: Passed as the ``dropout`` argument of each GRU layer.
        learning_rate: Learning rate of the Adam optimizer.
        kernel_regularizer: L2 factor applied to the kernel of the first GRU
            layer only; later layers are not regularised.
        rnn_activation: Activation of the GRU layers. The empty-string default
            is not a valid Keras identifier, so it is treated as "use the
            layer default" (``'tanh'``).
        dense_activation: Activation of the hidden Dense layer. The empty-string
            default is treated as "use the layer default" (no activation).
        recurrent_dropout: Passed as ``recurrent_dropout`` of each GRU layer.
        dropout: Rate of the Dropout layer placed after the hidden Dense layer.

    Returns:
        A compiled Keras ``Model`` with a single sigmoid output, trained with
        binary cross-entropy and reporting accuracy.
    """
    # Map the '' placeholders to the Keras layer defaults so that calling the
    # factory without explicit activations does not raise.
    rnn_act = 'tanh' if rnn_activation == '' else rnn_activation
    dense_act = None if dense_activation == '' else dense_activation

    inn = Input(shape=(T, D))
    x = inn
    last_layer = num_layers - 1
    for i in range(num_layers):
        # Every layer except the last must emit the full sequence for the next one.
        return_sequences = i < last_layer
        # Separate name: do not overwrite the `kernel_regularizer` argument.
        layer_regularizer = l2(kernel_regularizer) if i == 0 else None
        x = GRU(units, return_sequences=return_sequences, activation=rnn_act,
                kernel_regularizer=layer_regularizer, dropout=dropout_rate,
                recurrent_dropout=recurrent_dropout)(x)
    x = Dense(units, activation=dense_act)(x)
    x = Dropout(dropout)(x)
    x = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=inn, outputs=x)
    model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=learning_rate), metrics=['accuracy'])
    return model


def create_cnn_model(units=50, num_layers=1, learning_rate=0.001, kernel_regularizer=0.0,
                     rnn_activation='', dense_activation='', dropout=0.0):
    """Build and compile a 1-D convolutional binary classifier for ``(T, D)`` inputs.

    Args:
        units: Number of filters of every Conv1D layer and width of the hidden
            Dense layer.
        num_layers: Number of stacked Conv1D layers (kernel size 3, 'same' padding).
        learning_rate: Learning rate of the Adam optimizer.
        kernel_regularizer: L2 factor applied to the kernel of the first Conv1D
            layer only; later layers are not regularised.
        rnn_activation: Activation of the Conv1D layers (the name mirrors the
            recurrent factories). The empty-string default is not a valid Keras
            identifier, so it is treated as "use the layer default" (no activation).
        dense_activation: Activation of the hidden Dense layer. The empty-string
            default is treated as "use the layer default" (no activation).
        dropout: Rate of the Dropout layer placed after the hidden Dense layer.

    Returns:
        A compiled Keras ``Model`` with a single sigmoid output, trained with
        binary cross-entropy and reporting accuracy.
    """
    # Map the '' placeholders to the Keras layer defaults so that calling the
    # factory without explicit activations does not raise.
    conv_act = None if rnn_activation == '' else rnn_activation
    dense_act = None if dense_activation == '' else dense_activation

    inn = Input(shape=(T, D))
    x = inn
    for i in range(num_layers):
        # Separate name: do not overwrite the `kernel_regularizer` argument.
        layer_regularizer = l2(kernel_regularizer) if i == 0 else None
        x = Conv1D(units, kernel_size=3, activation=conv_act, padding='same',
                   kernel_regularizer=layer_regularizer)(x)

    # Collapse the time axis by taking the maximum of each filter.
    x = GlobalMaxPool1D()(x)
    # Kept from the original architecture; the pooled tensor is already flat per sample.
    x = Flatten()(x)
    x = Dense(units, activation=dense_act)(x)
    x = Dropout(dropout)(x)
    x = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=inn, outputs=x)
    model.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=learning_rate), metrics=['accuracy'])
    return model


# NOTE(review): this callback is created but never handed to the classifier, so
# it has no effect on training as written. Wire it in explicitly if early
# stopping is intended (doing so will change the fitted models).
early_stopping = EarlyStopping(monitor='loss', mode='min', verbose=1, patience=5)

# Model hyper-parameters are declared as None placeholders so they can be set
# through the pipeline's `clf__<name>` search parameters below.
clf = KerasClassifier(model=create_gru_model, units=None, num_layers=None, dropout_rate=None, learning_rate=None,
                      kernel_regularizer=None, rnn_activation=None, dense_activation=None, recurrent_dropout=None,
                      dropout=None)

# Search space; single-element lists pin a hyper-parameter to one value.
param_grid = {
    'clf__units': [64, 128, 192, 256],
    'clf__num_layers': [1, 2],
    'clf__dropout': [0.1, 0.2, 0.3],
    'clf__batch_size': [32, 64, 128],
    'clf__epochs': [16, 32, 64, 100],
    'clf__dense_activation': ['relu', 'sigmoid', 'softmax'],
    'clf__kernel_regularizer': [1e-3],
    'clf__learning_rate': [0.001],
    'clf__recurrent_dropout': [0.0],
    'clf__dropout_rate': [0.0],
    'clf__rnn_activation': ['tanh']
}

scoring = {
    'accuracy': make_scorer(accuracy_score),
    'precision': make_scorer(precision_score, zero_division=np.nan, average='binary'),
    'recall': make_scorer(recall_score, zero_division=np.nan, average='binary'),
    'f1_score': make_scorer(f1_score, zero_division=np.nan, average='binary'),
    # Built-in scorer: ROC AUC is a ranking metric and must be fed predicted
    # scores/probabilities; make_scorer(roc_auc_score) would pass hard labels.
    'roc_auc': 'roc_auc'
}

pipeline = Pipeline([
    ('scaler', (NDStandardScaler(feature_range=(-1, 1)))),
    ('clf', clf)
])

# Time-ordered CV folds; the best candidate by accuracy is refit on all of X_train.
random_search = RandomizedSearchCV(pipeline, param_distributions=param_grid, verbose=0, n_iter=1,
                                   cv=TimeSeriesSplit(n_splits=6),
                                   scoring=scoring, refit='accuracy', return_train_score=False,
                                   random_state=random_seed)
random_result = random_search.fit(X_train, Y_train)

print("Najlepsze parametry: %s" % random_result.best_params_)
print("Najlepsze wyniki dla accuracy: %s" % random_result.best_score_)

results = random_result.cv_results_
best_model = random_result.best_estimator_
Y_pred = best_model.predict(X_test)
# Probability of the positive class, needed for a meaningful ROC AUC.
Y_proba = best_model.predict_proba(X_test)[:, 1]

# Cross-validated metrics of the selected candidate.
best_index = random_result.best_index_
print(f"Mean test accuracy: {results['mean_test_accuracy'][best_index]:.3f}")
print(f"Mean test precision: {results['mean_test_precision'][best_index]:.3f}")
print(f"Mean test recall: {results['mean_test_recall'][best_index]:.3f}")
print(f"Mean test f1_score: {results['mean_test_f1_score'][best_index]:.3f}")
print(f"Mean test roc_auc: {results['mean_test_roc_auc'][best_index]:.3f}")

# Metrics on the held-out test windows.
accuracy = accuracy_score(Y_test, Y_pred)
precision = precision_score(Y_test, Y_pred, average='binary', zero_division=np.nan)
recall = recall_score(Y_test, Y_pred, average='binary', zero_division=np.nan)
f1 = f1_score(Y_test, Y_pred, average='binary', zero_division=np.nan)
roc_auc = roc_auc_score(Y_test, Y_proba)

print(f"Accuracy: {accuracy:.3f}")
print(f"Precision: {precision:.3f}")
print(f"Recall: {recall:.3f}")
print(f"F1 Score: {f1:.3f}")
print(f"ROC/AUC: {roc_auc:.3f}")

In [None]:
# One line per sampled candidate: mean CV accuracy, twice its standard
# deviation across folds, and the hyper-parameters that produced it.
summary_columns = (results['mean_test_accuracy'], results['std_test_accuracy'], results['params'])
for fold_mean, fold_std, candidate in zip(*summary_columns):
    spread = fold_std * 2
    print("%0.3f (+/-%0.03f) dla %r" % (fold_mean, spread, candidate))

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Confusion matrix on the test windows; True is listed first so it lines up
# with the first display label.
class_order = [True, False]
test_confusion = confusion_matrix(Y_test, Y_pred, labels=class_order)
disp = ConfusionMatrixDisplay(confusion_matrix=test_confusion, display_labels=['Long', 'Short'])

disp.plot()
# Replace the axis labels set by the display with the notebook's own.
disp.ax_.set_xlabel('Przewidywane klasy')
disp.ax_.set_ylabel('Prawdziwe klasy')
disp.ax_.set_title('Macierz pomyłek')
plt.show()

# ROC curve from the refit best pipeline's positive-class probabilities.
best_model = random_search.best_estimator_
Y_proba = best_model.predict_proba(X_test)[:, 1]
fpr, tpr, thresholds = roc_curve(Y_test, Y_proba)
roc_auc = auc(fpr, tpr)

roc_fig, roc_ax = plt.subplots()
roc_ax.plot(fpr, tpr, label=f'ROC curve (area = {roc_auc:.2f})')
# Diagonal reference line from (0, 0) to (1, 1).
roc_ax.plot([0, 1], [0, 1], 'k--')
roc_ax.set_xlabel('Wskaźnik fałszywie pozytywnych')
roc_ax.set_ylabel('Wskaźnik prawdziwie pozytywnych')
roc_ax.set_title('Krzywa ROC dla najlepszego modelu')
roc_ax.legend(loc="lower right")
plt.show()