In [1]:
import pandas as pd
import numpy as np
import torch
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split


In [2]:
# Load the raw table from a CSV path that is relative to the notebook's
# working directory.
# NOTE(review): judging by the file name and the columns used below, this is
# presumably hourly BTC/USDT candle data — confirm the source and schema.
df = pd.read_csv("./data/BTCUSDT_1h.csv")

In [3]:
# Columns that make up one time step of model input.
features = ['open', 'high', 'low', 'close', 'volume']

# Order the rows by the `close_time` column so that the windows built later
# follow time order, then pull the selected columns out as a NumPy matrix.
df = df.sort_values(by='close_time')
data = df.loc[:, features].values

In [4]:
# Normalize
# Fit the min/max statistics on the chronologically earliest rows only, then
# apply them to every row. Fitting on the full matrix would let the most recent
# rows -- which end up in the validation split further down -- influence how
# the training inputs are scaled (data leakage). Validation values may
# therefore fall outside [0, 1]; that is expected.
VAL_FRACTION = 0.1  # keep in sync with `test_size` in the train/validation split
n_fit_rows = max(1, int(len(data) * (1 - VAL_FRACTION)))

scaler = MinMaxScaler()
scaler.fit(data[:n_fit_rows])
data_scaled = scaler.transform(data)

In [5]:
# Create sequences
def create_sequences(data, seq_len):
    """Slice a time-ordered array into overlapping fixed-length windows.

    Window ``i`` is ``data[i:i + seq_len]``. Windows start at every index from
    0 up to (but not including) ``len(data) - seq_len``.

    Args:
        data: Array-like whose first axis is time, e.g. shape
            ``(n_steps, n_features)``.
        seq_len: Number of consecutive rows in each window; must be >= 1.

    Returns:
        ``np.ndarray`` of shape ``(n_windows, seq_len, *data.shape[1:])`` where
        ``n_windows = max(len(data) - seq_len, 0)``.

    Raises:
        ValueError: If ``seq_len`` is smaller than 1.
    """
    if seq_len < 1:
        raise ValueError(f"seq_len must be >= 1, got {seq_len}")

    data = np.asarray(data)
    n_windows = len(data) - seq_len
    if n_windows <= 0:
        # Building the result from an empty list would collapse it to shape
        # (0,), so code that reads the feature axis (shape[2]) would fail.
        # Return an empty array that still carries the trailing dimensions.
        return np.empty((0, seq_len) + data.shape[1:], dtype=data.dtype)

    return np.array([data[start:start + seq_len] for start in range(n_windows)])

# Window length: every sample consists of SEQ_LEN consecutive rows of the
# scaled feature matrix.
SEQ_LEN = 24  # e.g., 24 hourly steps
# All windows produced by create_sequences, stacked into a single array.
X = create_sequences(data_scaled, SEQ_LEN)

In [6]:
# Wrap the windowed array in a float32 tensor
X_tensor = torch.tensor(X, dtype=torch.float32)

# Hold out the final 10% of windows for validation; shuffle=False keeps the
# original window order, so no shuffling happens before the cut.
X_train, X_val = train_test_split(X_tensor, shuffle=False, test_size=0.1)

# Mini-batch iterators: training batches are drawn in random order,
# validation batches are served in order.
train_loader = DataLoader(dataset=TensorDataset(X_train), shuffle=True, batch_size=32)
val_loader = DataLoader(dataset=TensorDataset(X_val), shuffle=False, batch_size=32)

In [7]:
import torch.nn as nn
import torch.optim as optim

class LSTM_Autoencoder(nn.Module):
    """Sequence autoencoder built from two single-layer LSTMs.

    The encoder LSTM reads the input window and its final hidden state is kept
    as the compressed representation. That state is copied once per output
    time step and fed to the decoder LSTM, whose outputs go through a linear
    layer to produce the reconstruction.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the encoder's hidden state.
        seq_len: Number of time steps the decoder produces.
    """

    def __init__(self, input_dim, hidden_dim, seq_len):
        super().__init__()
        self.seq_len = seq_len

        # Encoder: features -> hidden state.
        self.encoder = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, batch_first=True)

        # Decoder: repeated hidden state -> feature-sized outputs.
        self.decoder = nn.LSTM(input_size=hidden_dim, hidden_size=input_dim, batch_first=True)

        # Final projection applied to every decoder step.
        self.fc = nn.Linear(in_features=input_dim, out_features=input_dim)

    def forward(self, x):
        """Reconstruct ``x``.

        Args:
            x: Batch-first input tensor, ``[batch, time, input_dim]``.

        Returns:
            Tensor of shape ``[batch, self.seq_len, input_dim]``.
        """
        # Only the encoder's final hidden state is used; per-step outputs and
        # the cell state are discarded.
        _, (final_hidden, _) = self.encoder(x)

        # [1, batch, hidden] -> [batch, 1, hidden] -> [batch, seq_len, hidden]
        latent_steps = final_hidden.permute(1, 0, 2).repeat(1, self.seq_len, 1)

        reconstruction, _ = self.decoder(latent_steps)

        # Project back to original feature space
        return self.fc(reconstruction)

In [8]:
# Model dimensions (no model is instantiated in this cell).
input_dim = X.shape[2]  # Number of features
# Default hidden size. The Optuna objective below assigns its own local
# `hidden_dim` sampled from the trial, so this value is not used there.
hidden_dim = 64  # Size of the LSTM hidden layer



In [None]:
#optuna definition
import optuna

def objective(trial):
    """Optuna objective: train one autoencoder configuration and score it.

    Samples ``hidden_dim``, ``lr`` and ``batch_size`` from ``trial``, trains a
    fresh ``LSTM_Autoencoder`` on ``X_train`` for a fixed number of epochs and
    measures reconstruction MSE on ``X_val`` after every epoch. Each epoch's
    validation loss is reported to the trial so the study's pruner can stop
    unpromising configurations early.

    Args:
        trial: Optuna trial used to sample hyperparameters and to report
            intermediate validation losses.

    Returns:
        Validation reconstruction MSE after the final epoch, averaged over all
        validation samples.

    Raises:
        optuna.exceptions.TrialPruned: If the trial signals it should be pruned.
    """
    # Sample hyperparameters
    hidden_dim = trial.suggest_int("hidden_dim", 16, 128)
    # suggest_float(..., log=True) is the supported replacement for the
    # deprecated suggest_loguniform; the search space is the same.
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])

    # DataLoader for batching
    train_loader = DataLoader(TensorDataset(X_train), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val), batch_size=batch_size, shuffle=False)
    n_train = len(train_loader.dataset)
    n_val = len(val_loader.dataset)

    # Model, loss, optimizer
    model = LSTM_Autoencoder(input_dim, hidden_dim, SEQ_LEN)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Training loop (shortened to 10 epochs for Optuna speed)
    num_epochs = 10
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for batch in train_loader:
            X_batch = batch[0]
            optimizer.zero_grad()
            output = model(X_batch)
            loss = criterion(output, X_batch)
            loss.backward()
            optimizer.step()
            # Weight each batch mean by its sample count: the last batch can be
            # smaller, and a plain mean of batch means would over-weight it and
            # make losses incomparable across trials with different batch sizes.
            train_loss += loss.item() * X_batch.size(0)

        train_loss /= n_train

        # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch in val_loader:
                X_batch = batch[0]
                output = model(X_batch)
                loss = criterion(output, X_batch)
                val_loss += loss.item() * X_batch.size(0)

        avg_val_loss = val_loss / n_val
        print(f"  Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.5f} | Val Loss: {avg_val_loss:.5f}")

        trial.report(avg_val_loss, epoch)

        # Early stopping
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return avg_val_loss

# Launch the hyperparameter search: 30 trials, lower objective value is better.
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=30)

# Report the winning trial's score and the hyperparameters that produced it.
best_trial = study.best_trial
print("Best trial:")
print(f"  Value: {best_trial.value}")
print("  Params:")
for key, value in best_trial.params.items():
    print(f"    {key}: {value}")


# (Duplicate removed: this span repeated the `import optuna`, the `objective`
# definition and the study run above verbatim, which redefined `objective` and
# launched a second 30-trial search that overwrote `study`.)
