In [None]:
from typing import List, Tuple

import numpy as np
import pandas as pd
import vectorbtpro as vbt
from keras.callbacks import EarlyStopping
from keras.layers import LSTM, Dense, Dropout
from keras.models import Sequential
from keras.optimizers import Adam
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder

from vctr.data.data_loader import get_data
from vctr.data.labeling import label_data_extrema_multi
from vctr.data.preprocessing import clean_data
from vctr.data.timeseries import preprocess_data_for_lstm
from vctr.features.feature_engineering import add_features

# Switch vectorbtpro's global settings to its 'dark' theme.
vbt.settings.set_theme('dark')

In [None]:
# Fetch the 'ETH' dataset with the '5m' setting through the project loader
# (presumably 5-minute bars — confirm against get_data).
data = get_data('ETH', '5m')

In [None]:
# Window length and batch size; later cells read these globals.
num_timesteps = 12
batch_size = 32

# Difference the log of the close price to make the series stationary, then
# drop the rows that contain NaNs (the first diff row has no predecessor).
data['log_diff'] = np.log(data['close']).diff()
data.dropna(inplace=True)

# Engineer features, then attach the extrema-based labels.
data = label_data_extrema_multi(add_features(data), 0.04, 0.01)

# One column is excluded from the feature count (presumably the label column).
# NOTE(review): this is measured before clean_data(); confirm that clean_data()
# leaves the set of columns unchanged.
num_features = data.shape[1] - 1

# Clean the data.
print('Cleaning data...')
data = clean_data(data)

# Build the train/test tensors for the LSTM.
print('Preparing data for LSTM...')
X_train, y_train, X_test, y_test = preprocess_data_for_lstm(
    data,
    [('label', [0, 1, 2])],
    num_timesteps,
    batch_size,
)

In [None]:
def get_dist(y):
    """Print the relative frequency of each class in ``y``.

    ``y`` is a 2-D array with one row per sample; the class of a row is the
    index of its largest entry (arg-max along axis 1).
    """
    class_ids = np.argmax(y, axis=1)
    shares = pd.Series(class_ids).value_counts(normalize=True)
    print(shares)

# Compare the class balance of the training labels with that of the test labels.
get_dist(y_train)
get_dist(y_test)

In [None]:
# Read the (timesteps, features) shape from the prepared tensor itself instead
# of from a column count recorded before clean_data() ran, so the network
# always matches the data it is fit on.
input_shape = tuple(X_train.shape[1:])

# Two stacked LSTM layers with dropout, followed by a small dense head that
# outputs a softmax over the three label classes.
lstm_model = Sequential()
lstm_model.add(LSTM(units=64, input_shape=input_shape, return_sequences=True))
lstm_model.add(Dropout(0.2))
lstm_model.add(LSTM(units=64, return_sequences=False))
lstm_model.add(Dropout(0.2))
lstm_model.add(Dense(units=32, activation='relu'))
lstm_model.add(Dense(units=3, activation='softmax'))

lstm_model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

# Stop once validation loss has not improved for 5 epochs and roll back to the
# best weights seen.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

lstm_model.fit(
    X_train,
    y_train,
    epochs=20,
    batch_size=batch_size,
    validation_split=0.3,
    callbacks=[early_stopping],
)

# Per-class probabilities from the softmax layer. The `temporal_features_*`
# names are kept because later cells refer to them.
temporal_features_train = lstm_model.predict(X_train)
temporal_features_test = lstm_model.predict(X_test)

# Most probable class index for each sample.
temporal_features_train_labels = np.argmax(temporal_features_train, axis=1)
temporal_features_test_labels = np.argmax(temporal_features_test, axis=1)

In [None]:
# def predict(X, actual):
#     predictions = lstm_model.predict(X)
#     predictions = np.argmax(predictions, axis=1)
#     # print(classification_report(actual, predictions))
#     # print(confusion_matrix(actual, predictions))

In [None]:
def preprocess_no_split(
    data: pd.DataFrame, targets: List[Tuple[str, list]], lookback: int, batch_size: int
) -> Tuple[np.ndarray, np.ndarray]:
    """Turn a labelled frame into scaled LSTM windows without a train/test split.

    Args:
        data: Frame whose LAST ``len(targets)`` columns are the target columns;
            every other column is used as a feature.
        targets: ``(column_name, allowed_values)`` pairs. The allowed values
            are passed to ``OneHotEncoder`` as the category list for that column.
        lookback: Number of consecutive rows in each window.
        batch_size: Unused; kept so the signature mirrors the call made for
            ``preprocess_data_for_lstm``.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(len(data) - lookback, lookback,
        num_features)`` with every feature min-max scaled to ``[0, 1]``, and
        ``y`` holds the one-hot rows for the first ``len(data) - lookback``
        rows of ``data``.

    Raises:
        ValueError: If ``lookback`` is not positive or ``data`` has no more
            than ``lookback`` rows, so no window can be built.
    """
    target_cols = [t[0] for t in targets]
    target_values = [t[1] for t in targets]

    num_samples = len(data) - lookback
    if lookback <= 0 or num_samples <= 0:
        raise ValueError(
            f'Cannot build windows: lookback={lookback}, rows={len(data)}; '
            'need lookback > 0 and more rows than lookback.'
        )

    # One-hot encode the target column(s) against the fixed category lists.
    encoder = OneHotEncoder(categories=target_values)
    y = encoder.fit_transform(data[target_cols]).toarray()

    # Materialise the feature matrix once; slicing the frame inside the loop
    # would repeat the same column selection for every window.
    feature_values = data.iloc[:, : -len(target_cols)].to_numpy()
    num_features = feature_values.shape[1]

    X = np.zeros((num_samples, lookback, num_features))
    for i in range(num_samples):
        X[i] = feature_values[i : i + lookback]

    # Keep one label row per window.
    # NOTE(review): y[i] is the label of the FIRST row of window i — confirm
    # this matches the alignment used by preprocess_data_for_lstm.
    y = y[:num_samples]

    # MinMaxScaler works on 2-D input, so flatten the windows, scale each
    # feature column to [0, 1], and restore the 3-D shape.
    # NOTE(review): the scaler is fit on this data set itself rather than
    # reused from training — confirm that is intended.
    scaler = MinMaxScaler(feature_range=(0, 1))
    X = scaler.fit_transform(X.reshape(-1, num_features)).reshape(X.shape)

    return X, y


In [None]:
def get_data_for_predict(data, num_timesteps=12, batch_size=32):
    """Run the training-time preparation on ``data`` and return unsplit tensors.

    The steps mirror the training cell: add a log-difference of ``close``,
    drop rows with NaNs, add engineered features, attach extrema labels,
    clean, and window the result with ``preprocess_no_split``.

    Args:
        data: Raw frame containing at least a ``close`` column. It is copied
            first, so the caller's frame is left untouched.
        num_timesteps: Window length handed to ``preprocess_no_split``.
        batch_size: Forwarded to ``preprocess_no_split``.

    Returns:
        The ``(X, y)`` pair produced by ``preprocess_no_split``.
    """
    # Copy so that adding the column and dropping rows does not leak into the
    # caller's DataFrame.
    data = data.copy()

    # Make the data stationary.
    data['log_diff'] = np.log(data['close']).diff()
    data = data.dropna()

    data = add_features(data)
    data = label_data_extrema_multi(data, 0.04, 0.01)

    # Clean the data.
    print('Cleaning data...')
    data = clean_data(data)

    # Prepare data for LSTM.
    print('Preparing data for LSTM...')
    return preprocess_no_split(
        data, [('label', [0, 1, 2])], num_timesteps, batch_size
    )

In [None]:
# Load a second dataset ('BTC', same '5m' setting) and push it through the
# prediction-time preparation; `actual` holds its one-hot labels.
data2 = get_data('BTC', '5m')
X, actual = get_data_for_predict(data2)

In [None]:
# Predicted class index for every window in X (arg-max over the model output).
predictions = np.argmax(lstm_model.predict(X), axis=1)

In [None]:
# _actual = np.argmax(actual, axis=1)
# pd.Series(_actual).value_counts(normalize=True)
# # pd.Series(predictions).value_counts(normalize=True)
get_classification_stats(actual, predictions)

In [None]:
from sklearn.metrics import classification_report, accuracy_score

def get_classification_stats(y_true, y_pred):
    # Convert one-hot encoded labels to their class index
    y_true = np.argmax(y_true, axis=1)

    # Calculate the accuracy score
    accuracy = accuracy_score(y_true, y_pred)

    # Generate the classification report
    report = classification_report(y_true, y_pred, output_dict=True)

    # Print the results
    print(f"Accuracy: {accuracy:.2f}\n")
    print("Classification report:")
    for label, stats in report.items():
        print(f"{label}:")
        for stat, value in stats.items():
            print(f"  {stat}: {value:.2f}")

    return accuracy, report


In [None]:
# Predicted class shares on the test set, followed by the true class shares,
# to see whether the model collapses onto one class.
print(pd.Series(temporal_features_test_labels).value_counts(normalize=True))
get_dist(y_test)