In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import RobustScaler
import matplotlib.pyplot as plt
import os

In [None]:
# Load the dataset
# NOTE(review): the leading ".../" looks like a redacted placeholder or a typo for "../"
# (the plot cell writes to "../Evaluation_PNGs/") — confirm the real data path.
file_path = ".../data/training_data/BTCUSDT/BTCUSDT_2022-01-01_2024-11-01_5.csv"
df = pd.read_csv(file_path)  # read_csv already returns a DataFrame; no pd.DataFrame(...) re-wrap needed
df.head()

In [None]:
# Feature selection: five technical indicators as inputs, the close price as target.
features = ['SMA_50', 'SMA_200', 'EMA_50', 'EMA_200', 'MACD_Signal']
target = 'Close'
train_fraction = 0.7  # chronological split: first 70 % of rows train, rest test

# Prepare raw arrays
prices = df['Close'].values.reshape(-1, 1)  # NOTE(review): not used by any cell in this notebook view

X = df[features].values.copy()
y = df[target].values.copy()

# NaN/inf inputs would propagate through the LSTMs and turn every loss into NaN,
# so fail early with a clear message instead.
assert np.isfinite(X).all() and np.isfinite(y).all(), "features/target contain NaN or inf values"

train_size = int(len(X) * train_fraction)

# Fit the scalers on the training rows only, then apply them to the whole series.
# Fitting on all rows would leak the test period's median/IQR into the training data.
scaler_X = RobustScaler().fit(X[:train_size])
X_scaled = scaler_X.transform(X)

scaler_y = RobustScaler().fit(y[:train_size].reshape(-1, 1))
y_scaled = scaler_y.transform(y.reshape(-1, 1))

X_train, X_test = X_scaled[:train_size], X_scaled[train_size:]
y_train, y_test = y_scaled[:train_size], y_scaled[train_size:]

# Sliding-window sample construction
def create_sequences(X, y, sequence_length):
    """Build sliding-window samples for sequence-to-one forecasting.

    Sample ``i`` is the window ``X[i : i + sequence_length]``. Its label is
    ``y[i + sequence_length]``, the target at the step right after the window.

    Args:
        X: Array of shape ``(n_steps, ...)`` with the inputs.
        y: Array with at least ``len(X)`` entries along axis 0 holding the targets.
        sequence_length (int): Number of time steps per window; must be >= 1.

    Returns:
        tuple: ``(X_seq, y_seq)`` with shapes ``(n_windows, sequence_length, *X.shape[1:])``
        and ``(n_windows, *y.shape[1:])``, where ``n_windows = max(len(X) - sequence_length, 0)``.
        If ``X`` is too short for a single window, both arrays are empty but keep
        their trailing dimensions, instead of collapsing to a shapeless ``np.array([])``.

    Raises:
        ValueError: If ``sequence_length`` < 1, or if ``y`` is shorter than ``X``
            while at least one window exists.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    if sequence_length < 1:
        raise ValueError(f"sequence_length must be >= 1, got {sequence_length}")

    n_windows = max(len(X) - sequence_length, 0)
    if n_windows == 0:
        return (np.empty((0, sequence_length) + X.shape[1:], dtype=X.dtype),
                np.empty((0,) + y.shape[1:], dtype=y.dtype))
    if len(y) < len(X):
        raise ValueError(f"y has {len(y)} entries but X has {len(X)}")

    # sliding_window_view returns (len(X) - sequence_length + 1, *X.shape[1:], sequence_length).
    # Move the window axis to position 1 and drop the final window, which has no label.
    # ascontiguousarray copies the read-only strided view into a normal writable array.
    windows = np.lib.stride_tricks.sliding_window_view(X, sequence_length, axis=0)
    X_seq = np.ascontiguousarray(np.moveaxis(windows, -1, 1)[:n_windows])
    y_seq = np.array(y[sequence_length:len(X)])
    return X_seq, y_seq

# Each sample covers `sequence_length` consecutive feature rows and is labelled with
# the scaled target at the step right after the window.
sequence_length = 60
(X_train_seq, y_train_seq), (X_test_seq, y_test_seq) = (
    create_sequences(X_train, y_train, sequence_length),
    create_sequences(X_test, y_test, sequence_length),
)

# PyTorch Dataset and DataLoaders
class MultiInputTimeSeriesDataset(Dataset):
    """Serve pre-windowed samples split into the model's three input groups.

    Each item is the tuple ``(Y, y_target, X_p, X_n)``. For a stored window of
    shape ``(sequence_length, n_features)``:

    * ``Y`` is feature column 0, kept 2-D as ``(sequence_length, 1)``;
    * ``X_p`` is feature columns 1-2;
    * ``X_n`` is feature columns 3-4;
    * ``y_target`` is the label stored for that window.

    NOTE(review): with this notebook's feature list, column 0 is SMA_50, not the
    past target (Close). Confirm that this is the intended "Y" input.
    """

    def __init__(self, X, y, sequence_length):
        # Copy both arrays into float32 tensors once. In this notebook the inputs are
        # already windowed by create_sequences, so sequence_length is stored but not used here.
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
        self.sequence_length = sequence_length

    def __len__(self):
        return self.X.shape[0]

    def __getitem__(self, idx):
        window = self.X[idx]
        main_input = window[:, :1]  # column 0, keeping a trailing feature axis of size 1
        p_input = window[:, 1:3]
        n_input = window[:, 3:5]
        return main_input, self.y[idx], p_input, n_input

batch_size = 64

train_dataset = MultiInputTimeSeriesDataset(X_train_seq, y_train_seq, sequence_length)
test_dataset = MultiInputTimeSeriesDataset(X_test_seq, y_test_seq, sequence_length)

# The model's residual blocks use BatchNorm1d, which raises an error in training mode
# when a batch holds a single sample. Drop the trailing training batch only when it
# would contain exactly one sample; otherwise every training sample is used each epoch.
drop_last_train = len(train_dataset) % batch_size == 1
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=drop_last_train)
# The test loader keeps chronological order, so predictions line up with y_test_seq.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

class ResidualBlock(nn.Module):
    """Two-layer fully connected block with an additive shortcut.

    Computes ``act(norm2(linear2(act(norm1(linear1(x))))) + shortcut(x))``. The
    output has the same feature size as the input.

    Args:
        input_dim (int): Size of the input (and output) feature dimension.
        hidden_dim (int): Width of the inner layer.
        activation (callable): Activation class, instantiated once per layer (default: ReLU).
        normalization (callable): Normalization class constructed with the feature size
            (default: BatchNorm1d).

    Note:
        ``downsample`` is a learned ``nn.Linear(input_dim, input_dim)`` whenever
        ``input_dim != hidden_dim``, and an identity otherwise. The main path already
        returns ``input_dim`` features, so this projection is not required to match
        shapes; it is left as-is so parameter names stay unchanged.
    """

    def __init__(self, input_dim, hidden_dim, activation=nn.ReLU, normalization=nn.BatchNorm1d):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim

        # Main path: input_dim -> hidden_dim -> input_dim. Sub-modules are registered in
        # this order; it fixes state_dict key order and the sequence of random weight draws.
        self.linear1 = nn.Linear(input_dim, hidden_dim)
        self.norm1 = normalization(hidden_dim)
        self.activation1 = activation()

        self.linear2 = nn.Linear(hidden_dim, input_dim)
        self.norm2 = normalization(input_dim)
        self.activation2 = activation()  # applied after the shortcut has been added

        if input_dim == hidden_dim:
            self.downsample = nn.Identity()
        else:
            self.downsample = nn.Linear(input_dim, input_dim)

    def forward(self, x):
        """Apply the block to ``x`` of shape ``(batch_size, input_dim)``; the result has the same shape."""
        shortcut = self.downsample(x)
        hidden = self.activation1(self.norm1(self.linear1(x)))
        main = self.norm2(self.linear2(hidden))
        return self.activation2(main + shortcut)

# Multi-input LSTM with residual blocks
class LSTMWithResidual(nn.Module):
    """Three parallel LSTM branches whose last outputs are refined and summed.

    Each input sequence runs through its own ``nn.LSTM`` (``batch_first=True``).
    The output at the final time step of each branch passes through a dedicated
    ``ResidualBlock``. The three results are added element-wise, and a linear
    layer maps the sum to a single value.

    Args:
        input_size_pn (int): Number of features per step in ``X_p`` and ``X_n``.
        hidden_size (int): LSTM hidden size; also the residual blocks' input/output size.
        residual_dim (int): Inner width of each residual block.
    """

    def __init__(self, input_size_pn, hidden_size, residual_dim):
        super().__init__()
        # Creation order (LSTMs y/p/n, residual blocks y/p/n, fc) determines the order in
        # which random initial weights are drawn, so it is kept fixed.
        self.lstm_y = nn.LSTM(input_size=1, hidden_size=hidden_size, batch_first=True)
        self.lstm_p = nn.LSTM(input_size=input_size_pn, hidden_size=hidden_size, batch_first=True)
        self.lstm_n = nn.LSTM(input_size=input_size_pn, hidden_size=hidden_size, batch_first=True)

        self.residual_y = ResidualBlock(hidden_size, residual_dim)
        self.residual_p = ResidualBlock(hidden_size, residual_dim)
        self.residual_n = ResidualBlock(hidden_size, residual_dim)

        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, Y, X_p, X_n):
        """Return one prediction per sample, shape ``(batch, 1)``.

        All three inputs are ``(batch, seq_len, features)`` because the LSTMs use
        ``batch_first=True``. ``Y`` has 1 feature per step; ``X_p`` and ``X_n`` have
        ``input_size_pn`` features per step.
        """
        branches = (
            (self.lstm_y, self.residual_y, Y),
            (self.lstm_p, self.residual_p, X_p),
            (self.lstm_n, self.residual_n, X_n),
        )
        branch_outputs = []
        for lstm, residual, sequence in branches:
            sequence_out, _ = lstm(sequence)
            branch_outputs.append(residual(sequence_out[:, -1, :]))  # last time step only
        return self.fc(branch_outputs[0] + branch_outputs[1] + branch_outputs[2])

# Initialise the model
# Seed torch's global RNG so that weight initialisation and the shuffled training-batch
# order are reproducible under Restart & Run All. GPU kernels may still be nondeterministic.
SEED = 42
torch.manual_seed(SEED)

input_size_pn = 2    # features per step in X_p and X_n
hidden_size = 64     # LSTM hidden size
residual_dim = 32    # inner width of each residual block

model = LSTMWithResidual(input_size_pn=input_size_pn, hidden_size=hidden_size, residual_dim=residual_dim)

def model_summary(model):
    """Print ``model``'s module tree followed by its trainable-parameter count.

    Args:
        model (nn.Module): Any PyTorch module.
    """
    trainable = [param for param in model.parameters() if param.requires_grad]
    n_trainable = sum(param.numel() for param in trainable)
    print("Model Architecture:")
    print(model)
    print("\nNumber of trainable parameters:")
    print(f"{n_trainable:,}")

# Print the model summary
model_summary(model)


# Training and optimisation
criterion = nn.MSELoss()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Move the model first, then build the optimizer over its (now on-device) parameters.
model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 20
train_losses, val_losses = [], []
best_val_loss = float('inf')


def _run_epoch(loader, train):
    """Make one pass over ``loader`` and return the sample-weighted mean loss.

    With ``train=True`` the model is in training mode and is updated after every
    batch. With ``train=False`` it is in eval mode with gradients disabled.
    Weighting each batch loss by its size yields the true per-sample MSE even
    when the final batch is smaller than ``batch_size``.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    model.train(train)
    total_loss, n_samples = 0.0, 0
    with torch.set_grad_enabled(train):
        for Y, y_target, X_p, X_n in loader:
            Y, y_target, X_p, X_n = (tensor.to(device) for tensor in (Y, y_target, X_p, X_n))
            predictions = model(Y, X_p, X_n)
            # Flatten both sides so MSELoss never broadcasts (batch, 1) against (batch,).
            loss = criterion(predictions.reshape(-1), y_target.reshape(-1))
            if train:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            batch_n = y_target.shape[0]
            total_loss += loss.item() * batch_n
            n_samples += batch_n
    if n_samples == 0:
        raise ValueError("DataLoader yielded no samples")
    return total_loss / n_samples


for epoch in range(num_epochs):
    avg_train_loss = _run_epoch(train_loader, train=True)
    avg_val_loss = _run_epoch(test_loader, train=False)
    train_losses.append(avg_train_loss)
    val_losses.append(avg_val_loss)

    # Save the weights of the epoch with the lowest validation loss.
    # NOTE(review): the test split doubles as the validation split here, so metrics on it
    # are optimistically biased by this checkpoint selection.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), "MultiInputLSTM_ResidualBlock.pth")

    print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {avg_train_loss:.6f}, Val Loss: {avg_val_loss:.6f}")

# Load the best checkpoint (lowest validation loss). map_location restores the tensors
# onto the current device even if the file was written on a different one.
model.load_state_dict(torch.load("MultiInputLSTM_ResidualBlock.pth", map_location=device))
model.eval()


def _predict(loader):
    """Return the model's scaled predictions for every sample in ``loader`` as an ``(n, 1)`` array, in loader order."""
    batches = []
    with torch.no_grad():
        for Y, _, X_p, X_n in loader:
            batches.append(model(Y.to(device), X_p.to(device), X_n.to(device)).cpu().numpy())
    return np.concatenate(batches).reshape(-1, 1)


# train_loader shuffles, so predicting through it would return the training predictions in
# random order, misaligned with y_train_seq. Use an ordered loader over the same dataset instead.
train_eval_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)

train_predictions = scaler_y.inverse_transform(_predict(train_eval_loader))
test_predictions = scaler_y.inverse_transform(_predict(test_loader))
y_train_unscaled = scaler_y.inverse_transform(y_train_seq.reshape(-1, 1))
y_test_unscaled = scaler_y.inverse_transform(y_test_seq.reshape(-1, 1))

# Output directory for the evaluation figures (created on first run)
output_dir = "../Evaluation_PNGs/"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)
    print(f"Folder '{output_dir}' created for saving the plots.")

# Actual vs. predicted prices over the test window
fig_prices, ax_prices = plt.subplots(figsize=(12, 6))
ax_prices.plot(y_test_unscaled, label='Tatsächliche Preise')
ax_prices.plot(test_predictions, label='Vorhergesagte Preise')
ax_prices.set_xlabel('Zeit')
ax_prices.set_ylabel('Preis')
ax_prices.set_title('Tatsächliche vs. Vorhergesagte Preise (PyTorch Modell)')
ax_prices.legend()

price_plot_path = os.path.join(output_dir, "MultiInputLSTM_ResidualBlock_price_predictions.png")
fig_prices.savefig(price_plot_path)
print(f"Saved price predictions plot to {price_plot_path}")
plt.show()

# Training and validation loss per epoch
fig_loss, ax_loss = plt.subplots(figsize=(12, 6))
ax_loss.plot(train_losses, label='Trainings-Loss')
ax_loss.plot(val_losses, label='Validierungs-Loss')
ax_loss.set_xlabel('Epoche')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Trainings- und Validierungs-Loss Verlauf')
ax_loss.legend()

loss_plot_path = os.path.join(output_dir, "MultiInputLSTM_ResidualBlock_loss_curves.png")
fig_loss.savefig(loss_plot_path)
print(f"Saved loss curves plot to {loss_plot_path}")
plt.show()