In [24]:
%load_ext autoreload
%autoreload 2

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import polars as pl

from sklearn.metrics import accuracy_score, classification_report, ConfusionMatrixDisplay, precision_recall_curve, roc_curve
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from fart.common.constants import (
    CLOSE,
    EMA_FAST,
    MOVEMENT,
    STATIONARY,
)
from fart.features.calculate_technical_indicators import calculate_technical_indicators
from fart.features.parse_timestamp_to_datetime import parse_timestamp_to_datetime

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
# Read the BTC-EUR CSV (path is relative to the notebook's working directory)
# and hand it straight to the project's indicator helper. The next cell reads
# the CLOSE and EMA_FAST columns from the result.
df = calculate_technical_indicators(
    pl.read_csv("../data/BTC-EUR.csv")
)

In [3]:
# STATIONARY: close price minus its fast EMA.
close_minus_ema = pl.col(CLOSE) - pl.col(EMA_FAST)
df = df.with_columns(close_minus_ema.alias(STATIONARY))

# MOVEMENT: 1 where STATIONARY is strictly greater than the previous row's
# value, otherwise 0. Rows where the comparison is null (e.g. the first row,
# which has no predecessor) fall through to the `otherwise` branch.
rose_vs_previous_row = pl.col(STATIONARY) > pl.col(STATIONARY).shift(1)
movement_label = (
    pl.when(rose_vs_previous_row)
    .then(pl.lit(1))
    .otherwise(pl.lit(0))
)
df = df.with_columns(movement_label.alias(MOVEMENT))

In [4]:
# Build lagged copies of every feature: column "<feature> #<lag>" holds the
# feature's value from `lag` rows earlier.
features = [STATIONARY]
time_span = 3 * 2  # number of lagged rows per feature (original note: "3 hours")
lags = range(1, time_span + 1)
lag_columns = [f"{feature} #{lag}" for lag in lags for feature in features]
df = df.with_columns(
    [
        pl.col(feature).shift(lag).alias(f"{feature} #{lag}")
        for lag in lags
        for feature in features
    ]
)
# `shift` fills the vacated leading rows with null, and polars treats null and
# NaN as distinct: `drop_nans()` on its own leaves those rows in place. They
# would turn into NaN after `to_pandas()` and end up inside the model's input
# windows. Drop rows with a null lag value first, then keep the NaN filter.
df = df.drop_nulls(subset=lag_columns).drop_nans()

In [None]:
# Convert to pandas exactly once. After the first run `df` is already a pandas
# frame (which has no `to_pandas`), so the guard keeps this cell re-runnable.
if isinstance(df, pl.DataFrame):
    df = df.to_pandas()

# Model inputs are the lagged feature columns; the target is the MOVEMENT label.
X = df[[f"{feature} #{lag}" for lag in lags for feature in features]]
y = df[MOVEMENT]

In [25]:
from sklearn.preprocessing import MinMaxScaler

# Fit the scaler on the chronologically first rows only. Fitting on every row
# would let the min/max of the later (validation) period leak into the training
# inputs. Keep this fraction at or below the training share of the
# train/validation split (which uses test_size=0.2, shuffle=False).
TRAIN_FRACTION = 0.8
n_fit_rows = int(len(X) * TRAIN_FRACTION)

scaler = MinMaxScaler()
scaler.fit(X.iloc[:n_fit_rows])

# Transform all rows with the train-fitted range; later rows may therefore fall
# slightly outside [0, 1].
X_scaled = pd.DataFrame(scaler.transform(X), columns=X.columns)

In [None]:
# Rows per input window (120).
# NOTE(review): the original note here said "5 days", but assuming 2 rows per
# hour (as the lag cell's "3 hours" note implies) 120 rows span 2.5 days.
# Confirm the intended length.
window_size = 2 * 12 * 5

# Sample for target row i: the `window_size` scaled rows immediately before i
# as input, and the label of row i itself as output.
scaled_values = X_scaled.values
target_rows = range(window_size, len(X_scaled))

X_sequence = np.array(
    [scaled_values[i - window_size : i] for i in target_rows]
)
y_sequence = np.array([y.iloc[i] for i in target_rows])

In [28]:
# Chronological hold-out: with shuffle=False the last 20% of the sequences
# become the validation set and the order of samples is preserved.
X_train, X_val, y_train, y_val = train_test_split(
    X_sequence,
    y_sequence,
    test_size=0.2,
    shuffle=False,
)


In [29]:
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import set_random_seed

# Seed Python, NumPy and TensorFlow so weight initialisation and batch
# shuffling are repeatable between runs.
set_random_seed(42)

# Take the input shape from the training tensor (samples, rows per window,
# features) instead of repeating the window length as a second constant that
# could silently drift from the windowing cell.
# NOTE: `time_span` is also the name used for the lag count in the lag-feature
# cell; it is kept here so later cells that read it still see the window length.
time_span, n_features = X_train.shape[1:]

# Two stacked LSTM layers followed by a single sigmoid unit for the binary
# MOVEMENT label. The Dropout layers are left disabled, as in the original.
model = Sequential([
    Input(shape=(time_span, n_features)),
    LSTM(64, return_sequences=True),
    # Dropout(0.2),
    LSTM(32),
    # Dropout(0.2),
    Dense(1, activation='sigmoid')
])

model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
model.summary()

In [None]:
# Keep the returned History object so the per-epoch training and validation
# loss/accuracy can be inspected or plotted after training.
history = model.fit(
    X_train,
    y_train,
    epochs=10,
    batch_size=64,
    validation_data=(X_val, y_val),
)

Epoch 1/10
