In [52]:
import os

In [53]:
import pandas as pd
from sklearn.preprocessing import StandardScaler

In [71]:
import numpy as np
import joblib


In [55]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import accuracy_score, confusion_matrix


In [56]:
df = pd.read_csv("./AAPL_ta_dataset.csv")

In [57]:
features = ['Open', 'High', 'Low', 'Close', 'Volume',
            'rsi', 'stoch_k', 'macd', 'macd_signal',
            'sma_20', 'ema_20', 'bb_upper', 'bb_lower',
            'atr', 'adx']

X = df[features].copy()
y = df['target'].copy()

# Normalize features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)


In [58]:
def create_sequences(X, y, window_size=30):
    X_seq, y_seq = [], []
    for i in range(window_size, len(X)):
        X_seq.append(X[i-window_size:i])
        y_seq.append(y[i])
    return np.array(X_seq), np.array(y_seq)

X_seq, y_seq = create_sequences(X_scaled, y.values)


In [59]:
split = int(len(X_seq) * 0.8)
X_train, X_test = X_seq[:split], X_seq[split:]
y_train, y_test = y_seq[:split], y_seq[split:]


In [60]:
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        _, (h_n, _) = self.lstm(x)
        out = self.fc(h_n[-1])
        return out


In [61]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_ds = TensorDataset(torch.tensor(X_train, dtype=torch.float32),
                         torch.tensor(y_train, dtype=torch.float32))
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)

model = LSTMModel(input_size=15).to(device)
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training loop
for epoch in range(10):
    model.train()
    total_loss = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device).unsqueeze(1)
        preds = model(xb)
        loss = loss_fn(preds, yb)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} Loss: {total_loss:.4f}")


Epoch 1 Loss: 22.8467
Epoch 2 Loss: 22.8076
Epoch 3 Loss: 22.7921
Epoch 4 Loss: 22.7292
Epoch 5 Loss: 22.7579
Epoch 6 Loss: 22.6932
Epoch 7 Loss: 22.6604
Epoch 8 Loss: 22.6017
Epoch 9 Loss: 22.6023
Epoch 10 Loss: 22.6212


In [62]:
model.eval()
with torch.no_grad():
    X_test_t = torch.tensor(X_test, dtype=torch.float32).to(device)
    y_pred = model(X_test_t).cpu().numpy().flatten()
    y_pred_class = (y_pred > 0.5).astype(int)

acc_lstm = accuracy_score(y_test, y_pred_class)
print(f"Test Accuracy: {acc_lstm:.4f}")
cm_lstm = confusion_matrix(y_test, y_pred_class)
print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred_class))


Test Accuracy: 0.4411
Confusion Matrix:
[[116   0]
 [147   0]]


In [63]:
class TransformerModel(nn.Module):
    def __init__(self, input_dim=15, seq_len=30, d_model=64, nhead=4, num_layers=2, dropout=0.1):
        super().__init__()
        self.input_proj = nn.Linear(input_dim, d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=128,
            dropout=dropout,
            batch_first=True
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.cls_head = nn.Sequential(
            nn.Linear(seq_len * d_model, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        # x: (batch_size, seq_len, input_dim)
        x = self.input_proj(x)  # (batch, seq_len, d_model)
        x = self.transformer(x)  # (batch, seq_len, d_model)
        x = x.flatten(start_dim=1)  # flatten across time
        return self.cls_head(x)


In [64]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_ds = TensorDataset(torch.tensor(X_train, dtype=torch.float32),
                         torch.tensor(y_train, dtype=torch.float32))
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)

model = TransformerModel(input_dim=15, seq_len=30).to(device)
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training loop
for epoch in range(10):
    model.train()
    total_loss = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device).unsqueeze(1)
        preds = model(xb)
        loss = loss_fn(preds, yb)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} Loss: {total_loss:.4f}")


Epoch 1 Loss: 24.1524
Epoch 2 Loss: 23.0653
Epoch 3 Loss: 22.8139
Epoch 4 Loss: 22.8646
Epoch 5 Loss: 22.6560
Epoch 6 Loss: 22.5653
Epoch 7 Loss: 22.4535
Epoch 8 Loss: 22.0161
Epoch 9 Loss: 21.9268
Epoch 10 Loss: 21.7895


In [65]:
model.eval()
with torch.no_grad():
    X_test_t = torch.tensor(X_test, dtype=torch.float32).to(device)
    y_pred = model(X_test_t).cpu().numpy().flatten()
    y_pred_class = (y_pred > 0.5).astype(int)

acc_tf = accuracy_score(y_test, y_pred_class)
print(f"Test Accuracy: {acc_tf:.4f}")
cm_tf = confusion_matrix(y_test, y_pred_class)
print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred_class))


Test Accuracy: 0.4791
Confusion Matrix:
[[107   9]
 [128  19]]


In [66]:
class xLSTMModel(nn.Module):
    def __init__(self, input_size=15, hidden_sizes=[32, 64, 128], dropout=0.2):
        super().__init__()
        self.lstm_blocks = nn.ModuleList([
            nn.LSTM(input_size, h, batch_first=True) for h in hidden_sizes
        ])
        self.fc = nn.Sequential(
            nn.Linear(sum(hidden_sizes), 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        outputs = []
        for lstm in self.lstm_blocks:
            _, (h_n, _) = lstm(x)
            outputs.append(h_n[-1])  # get final hidden state
        x = torch.cat(outputs, dim=1)
        return self.fc(x)

In [67]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

train_ds = TensorDataset(torch.tensor(X_train, dtype=torch.float32),
                         torch.tensor(y_train, dtype=torch.float32))
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)

model = xLSTMModel(input_size=15).to(device)
loss_fn = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training
for epoch in range(10):
    model.train()
    total_loss = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device).unsqueeze(1)
        preds = model(xb)
        loss = loss_fn(preds, yb)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch {epoch+1} Loss: {total_loss:.4f}")


Epoch 1 Loss: 22.8584
Epoch 2 Loss: 22.7396
Epoch 3 Loss: 22.8193
Epoch 4 Loss: 22.6901
Epoch 5 Loss: 22.6772
Epoch 6 Loss: 22.6750
Epoch 7 Loss: 22.6184
Epoch 8 Loss: 22.5370
Epoch 9 Loss: 22.5406
Epoch 10 Loss: 22.6042


In [68]:
model.eval()
with torch.no_grad():
    X_test_t = torch.tensor(X_test, dtype=torch.float32).to(device)
    y_pred = model(X_test_t).cpu().numpy().flatten()
    y_pred_class = (y_pred > 0.5).astype(int)

acc_xlstm = accuracy_score(y_test, y_pred_class)
print(f"Test Accuracy: {acc_xlstm:.4f}")
cm_xlstm = confusion_matrix(y_test, y_pred_class)
print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred_class))


Test Accuracy: 0.5133
Confusion Matrix:
[[85 31]
 [97 50]]


In [69]:
print(f"{'Model':<10} {'Accuracy Variable':<20} {'Accuracy Value':<15}")
print("-" * 50)
print(f"{'LSTM':<10} {'acc_lstm':<20} {acc_lstm:<15.4f}")
print(f"{'TF':<10} {'acc_tf':<20} {acc_tf:<15.4f}")
print(f"{'xLSTM':<10} {'acc_xlstm':<20} {acc_xlstm:<15.4f}")
print("\n")

# Function to print confusion matrix with label
def print_confusion_matrix(cm, model_name):
    print(f"Confusion Matrix for {model_name}:")
    print(cm)
    print()

# Print all confusion matrices
print_confusion_matrix(cm_lstm, "LSTM")
print_confusion_matrix(cm_tf, "TF")
print_confusion_matrix(cm_xlstm, "xLSTM")

Model      Accuracy Variable    Accuracy Value 
--------------------------------------------------
LSTM       acc_lstm             0.4411         
TF         acc_tf               0.4791         
xLSTM      acc_xlstm            0.5133         


Confusion Matrix for LSTM:
[[116   0]
 [147   0]]

Confusion Matrix for TF:
[[107   9]
 [128  19]]

Confusion Matrix for xLSTM:
[[85 31]
 [97 50]]



In [70]:
torch.save(model.state_dict(), 'xlstm_technical_model.pt')

In [72]:
joblib.dump(scaler, 'scaler.pkl')

['scaler.pkl']