In [45]:
import pandas as pd
from sklearn.model_selection import train_test_split

from torch import nn, optim
import torch
from torch.utils.data import DataLoader, Subset
from torch.nn import functional as F

from dataset_model import CryptoDataset
from models.LSTM_base import BaseLSTM

from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

from tqdm import trange

In [84]:
# 1) Define the sample transforms and a helper to compose them
def future_return_to_bs_class(sample, threshold=0.000081):
    """
    Binarise the target of a sample into a 0/1 class label.

    The label is 1.0 where ``sample['y'] >= threshold`` and 0.0 elsewhere. The
    sample dict is updated in place and the same object is returned, so the
    function can be chained with other sample transforms.

    :param sample: Mapping holding a tensor under the key ``'y'``
    :param threshold: Cut-off applied to ``sample['y']``. The default keeps the
        value that used to be hard-coded here (presumably the minimum future
        return that counts as the positive class -- TODO confirm)
    :return: The same ``sample`` object with ``'y'`` replaced by a float tensor
    """
    sample['y'] = (sample['y'] >= threshold).float()
    return sample

def make_minmax_transform(min_f: torch.Tensor, max_f: torch.Tensor):
    """
    Build a sample transform that min-max scales ``sample['x']``.

    :param min_f: Tensor subtracted from ``sample['x']`` (lower bound of the range)
    :param max_f: Tensor marking the upper bound of the range
    :return: Callable that rewrites ``sample['x']`` in place and returns the sample
    """
    # Floor the range at 1e-8 so a feature with max == min does not divide by zero.
    span = (max_f - min_f).clamp_min(1e-8)

    def _scale(sample):
        shifted = sample['x'] - min_f
        sample['x'] = shifted / span
        return sample

    return _scale

def compose(*funcs):
    """
    Chain several sample transforms into a single callable.

    :param funcs: Transforms applied left to right; each receives the previous result
    :return: Callable that feeds a sample through every transform in order
        (with no transforms it returns the sample unchanged)
    """
    def _pipeline(sample):
        result = sample
        for step in funcs:
            result = step(result)
        return result

    return _pipeline

In [85]:
# 2) First build the dataset WITHOUT any transform, so the global per-feature
#    min/max of the training split can be computed from the raw values.
feature_cols = ['open', 'high', 'low', 'close', 'volume', 'ema_20', 'ema_60',
       'ema_100', 'ema_200', 'macd_20_60', 'macd_10_20', 'adx_10', 'adx_20',
       'adx_60', 'rsi_7', 'rsi_20', 'rsi_60', 'stoch_k_10', 'stoch_k_30',
       'stoch_k_100', 'roc_rsi_10', 'roc_rsi_20', 'roc_rsi_50', 'atr_20',
       'atr_60', 'atr_100', 'bb_width', 'bb_percent', 'obv', 'vwap', 'mom_5',
       'mom_20', 'mom_50', 'proc_5', 'proc_20', 'vol_ma_20', 'vol_ma_60']
# Presumably the look-back window length and the prediction horizon, in rows
# -- TODO confirm against CryptoDataset.
past_period = 30
future_period = 5
base = CryptoDataset(
    root_path='data/BTC/raw/',
    feature_cols=feature_cols,
    target_col='close',
    past_period=past_period,
    future_period=future_period,
    transform=None,  # no transform yet
)

# The first 80% of the samples (by index) form the training split.
train_ratio = 0.8
train_size = int(len(base) * train_ratio)

# Assumes base.x has shape [T, F]; estimate min/max from the training time span
# only, so statistics of the held-out rows do not leak into the normalisation.
# NOTE(review): the extra `past_period` rows presumably cover the look-back
# window of the last training sample -- confirm against CryptoDataset indexing,
# the slice may reach one row past what the training samples actually use.
min_feature = base.x[:train_size + past_period].min(axis=0).values
max_feature = base.x[:train_size + past_period].max(axis=0).values

In [86]:
# 3) Build the dataset with transforms attached (normalise first, then turn
#    the target into a binary class label).
# The min/max statistics come from the training rows only (computed above).
norm_t = make_minmax_transform(min_feature, max_feature)
full_ds = CryptoDataset(
    root_path='data/BTC/raw/',
    feature_cols=feature_cols,
    target_col='close',
    past_period=past_period,
    future_period=future_period,
    # compose applies its arguments left to right: scale 'x', then binarise 'y'.
    transform=compose(norm_t, future_return_to_bs_class),
)

In [87]:
# Sanity check: show the dataset object and its first (transformed) sample.
full_ds, full_ds[0]

(<dataset_model.CryptoDataset at 0x17915de66d0>,
 {'x': tensor([[0.0266, 0.0263, 0.0278,  ..., 0.5012, 0.0613, 0.1429],
          [0.0266, 0.0262, 0.0279,  ..., 0.5072, 0.0600, 0.1426],
          [0.0268, 0.0265, 0.0281,  ..., 0.5062, 0.0578, 0.1424],
          ...,
          [0.0271, 0.0265, 0.0282,  ..., 0.5133, 0.0275, 0.0742],
          [0.0270, 0.0265, 0.0281,  ..., 0.5093, 0.0288, 0.0731],
          [0.0268, 0.0262, 0.0280,  ..., 0.5127, 0.0272, 0.0712]]),
  'y': tensor([0.])})

In [88]:
# 4) Split into subsets and build the DataLoaders (note: DataLoader itself
#    does not take a transform; the transform lives on the dataset).
# Index-ordered split: the first `train_size` samples train, the rest test.
train_ds = Subset(full_ds, list(range(train_size)))
test_ds  = Subset(full_ds, list(range(train_size, len(full_ds))))

batch_size = 32
# Training batches are shuffled; the test set is served in order as ONE batch
# holding every test sample.
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_ds,  batch_size=len(test_ds), shuffle=False)

In [89]:
# Model hyperparameters.
in_channels = full_ds.feature_dim
out_channels = 1
hidden_channels = 32
num_layers = 2
dropout = 0.6

# Optimisation hyperparameters.
lr = 1e-3
weight_decay = 1e-5
num_epochs = 30

# Seed the global RNG before the model is created, so that weight
# initialisation, dropout masks and DataLoader shuffling repeat across a
# "Restart & Run All". torch.manual_seed seeds the CPU and CUDA generators.
seed = 42
torch.manual_seed(seed)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

Using device: cuda


In [90]:
# BCEWithLogitsLoss applies the sigmoid internally, so it must be fed raw logits.
# NOTE(review): confirm BaseLSTM does not apply a sigmoid of its own.
criterion = nn.BCEWithLogitsLoss()

# Arguments are passed positionally; the order is assumed to match the
# BaseLSTM constructor -- TODO confirm against models/LSTM_base.
model = BaseLSTM(in_channels, hidden_channels, num_layers, out_channels, dropout).to(device)
# The optimizer is created after the model has been moved to `device`.
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

In [91]:
def measure_accuracy(out, gt):
    """
    Fraction of binary predictions that match the ground truth.

    A prediction is ``sigmoid(out)`` rounded to 0 or 1.

    :param out: Raw logits (pre-sigmoid), same shape as ``gt``
    :param gt: Ground-truth labels encoded as 0/1, same shape as ``out``
    :return: Accuracy in [0, 1] as a Python float; 0.0 when there is nothing to score
    :raises ValueError: If ``out`` and ``gt`` have different shapes
    """
    if out.shape != gt.shape:
        # A silent broadcast (e.g. [N] against [N, 1]) would compare every
        # prediction with every label and report a meaningless number.
        raise ValueError(f"Shape mismatch: out {tuple(out.shape)} vs gt {tuple(gt.shape)}")

    # Count elements rather than rows so the result stays within [0, 1] for any
    # shape; for [N] and [N, 1] inputs this equals len(gt).
    total = gt.numel()
    if total == 0:
        return 0.0

    preds = torch.sigmoid(out).round()
    return (preds == gt).sum().item() / total

In [92]:
def train_iteration(model, optimizer, pbar, criterion, train_dataloader, epoch, writer, measure_acc=False,
                    device=None):
    """
    Run one training epoch and log its metrics to TensorBoard.

    The loss of every batch is logged under "Loss/Train Loss". When
    ``measure_acc`` is true, the accuracy over the whole epoch is logged once
    under "Accuracy/Train Accuracy".

    :param model: Model to train
    :param optimizer: Optimizer to use (Adam, ...)
    :param pbar: tqdm progress bar
    :param criterion: Loss function to use (MSE, CrossEntropy, ...)
    :param train_dataloader: Train data loader yielding dicts with 'x' and 'y'
    :param epoch: Current epoch
    :param writer: Tensorboard writer
    :param measure_acc: Whether to measure accuracy or not (for classification tasks)
    :param device: Device the batches are moved to before the forward pass
    """
    model.train()
    num_batches = len(train_dataloader)

    # Collect the per-batch tensors and concatenate once at the end;
    # re-concatenating the running result on every batch copies all earlier
    # batches again.
    outs, gts = [], []
    for idx, data in enumerate(train_dataloader):
        x = data['x'].to(device)
        y = data['y'].to(device)

        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()

        pbar.set_postfix({"Batch": f"{(idx + 1) / num_batches * 100:.1f}%"})
        # Global step = batches seen so far across all epochs.
        writer.add_scalar("Loss/Train Loss", loss.item(), epoch * num_batches + idx)

        if measure_acc:
            outs.append(out.detach().cpu())
            gts.append(y.detach().cpu())

    # Accuracy is only requested for classification tasks, and only computable
    # if at least one batch was processed.
    if measure_acc and outs:
        acc = measure_accuracy(torch.cat(outs, dim=0), torch.cat(gts, dim=0))
        writer.add_scalar("Accuracy/Train Accuracy", acc, epoch)

In [93]:
def test_iteration(model, criterion, test_dataloader, epoch, writer, measure_acc=False, device=None):
    """
    Test iteration
    :param model: Model to test
    :param criterion: Loss function to use (MSE, CrossEntropy, ...)
    :param test_dataloader: Test data loader
    :param epoch: Current epoch
    :param writer: Tensorboard writer
    :param measure_acc: Whether to measure accuracy or not (for classification tasks)
    """
    model.eval()
    outs = torch.tensor([])
    gts = torch.tensor([])
    for idx, data in enumerate(test_dataloader):
        out = model(data['x'].to(device))
        loss = criterion(out, data['y'].to(device))
        writer.add_scalar("Loss/Test Loss", loss.item(), epoch * len(test_dataloader) + idx)

        outs = torch.cat((outs, out.detach().cpu()), dim=0)
        gts = torch.cat((gts, data['y'].detach().cpu()), dim=0)

    acc = measure_accuracy(outs, gts)
    writer.add_scalar("Accuracy/Test Accuracy", acc, epoch)

In [94]:
def train(model, optimizer, criterion, train_dataloader, test_dataloader, num_epochs, task_title="", measure_acc=False):
    """
    Train function for a regression / classification model

    Runs ``num_epochs`` epochs; each epoch is one pass over the train loader
    followed by one pass over the test loader. Metrics are written to a
    TensorBoard run directory under ``runs/``.

    :param model: Model to train
    :param optimizer: Optimizer to use (Adam, ...)
    :param criterion: Loss function to use (MSE, CrossEntropy, ...)
    :param train_dataloader: Train data loader
    :param test_dataloader: Test data loader
    :param num_epochs: Number of epochs to train on the train dataset
    :param task_title: Title of the tensorboard run
    :param measure_acc: Whether to measure accuracy or not (for classification tasks)
    """
    writer = SummaryWriter(f'runs/{task_title}_{datetime.now().strftime("%d_%m_%Hh%M")}_{model.__class__.__name__}')
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    try:
        for epoch in (pbar := trange(num_epochs, desc="Epochs")):
            train_iteration(model, optimizer, pbar, criterion, train_dataloader, epoch, writer, measure_acc, device)
            test_iteration(model, criterion, test_dataloader, epoch, writer, measure_acc, device)
    finally:
        # Flush and close the event file even when training is interrupted
        # (e.g. KeyboardInterrupt), so the scalars logged so far are not lost.
        writer.close()

In [95]:
# Run the full train/eval loop; TensorBoard events are written under
# runs/BTC_Binary_<timestamp>_<model class name>.
train(model, optimizer, criterion, train_loader, test_loader, num_epochs, task_title="BTC_Binary", measure_acc=True)

Epochs:  50%|█████     | 15/30 [01:49<01:49,  7.30s/it, Batch=24.7%] 


KeyboardInterrupt: 