### Backtesting
- 구현한 모델을 통한 수익률 예측
- 구현 날짜: 2024.03.11
- 딥러닝 적용
- 챗지피티 full code 작성
- bitcoin data 적용X 임시 데이터로 실행
- 고려사항: num_rows 값에 의해 없어진 sequence 같은거 계산 어떻게 해야하지..?

In [2]:
# 모델 훈련 함수 정의
def train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs=50):
    for epoch in range(num_epochs):
        model.train()
        train_losses = []
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs.squeeze(), y_batch)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())

        model.eval()
        test_losses = []
        all_predictions = []
        all_targets = []
        with torch.no_grad():
            for X_batch, y_batch in test_loader:
                outputs = model(X_batch)
                loss = criterion(outputs.squeeze(), y_batch)
                test_losses.append(loss.item())
                predictions = (outputs.squeeze() > 0.5).float()
                all_predictions.extend(predictions.numpy())
                all_targets.extend(y_batch.numpy())

        train_loss = np.mean(train_losses)
        test_loss = np.mean(test_losses)
        test_accuracy = accuracy_score(all_targets, all_predictions)

        if (epoch+1) % 10 == 0:
            print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')


In [23]:
# 모든 필요한 라이브러리 임포트
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
from torch.optim import Adam
from sklearn.metrics import accuracy_score

# 데이터 전처리 및 시퀀스 생성
np.random.seed(0)
data_size = 1000
features = np.random.randn(data_size, 5)
returns = np.random.randn(data_size, 1) * 0.01
labels = (returns > 0).astype(int)

df = pd.DataFrame(np.hstack((features, returns, labels)), columns=['Open', 'High', 'Low', 'Close', 'Volume', 'Return', 'Label'])
df['Label'] = df['Label'].astype(int)

X = df[['Open', 'High', 'Low', 'Close', 'Volume']].values
y = df['Label'].values

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X)
#X_test_scaled = scaler.transform(X_test)

split = 800
X_train = X_train_scaled[:split]
X_test = X_train_scaled[split:]
y_train = y[:split]
y_test = y[split:]

def create_sequences(input_data, input_labels, sequence_length):
    xs = []
    ys = []
    for i in range(len(input_data)-sequence_length):
        x = input_data[i:(i+sequence_length)]
        y = input_labels[i+sequence_length]
        xs.append(x)
        ys.append(y)
    return np.array(xs), np.array(ys)

sequence_length = 20

X_train_seq, y_train_seq = create_sequences(X_train, y_train, sequence_length)
X_test_seq, y_test_seq = create_sequences(X_test, y_test, sequence_length)

X_train_seq_tensor = torch.tensor(X_train_seq, dtype=torch.float32)
y_train_seq_tensor = torch.tensor(y_train_seq, dtype=torch.float32)
X_test_seq_tensor = torch.tensor(X_test_seq, dtype=torch.float32)
y_test_seq_tensor = torch.tensor(y_test_seq, dtype=torch.float32)

train_seq_dataset = TensorDataset(X_train_seq_tensor, y_train_seq_tensor)
test_seq_dataset = TensorDataset(X_test_seq_tensor, y_test_seq_tensor)

train_seq_loader = DataLoader(train_seq_dataset, batch_size=64, shuffle=True)
test_seq_loader = DataLoader(test_seq_dataset, batch_size=64, shuffle=False)

# LSTM 모델 정의
class BitcoinLSTM(nn.Module):
    def __init__(self, input_size, hidden_layer_size=100, output_size=1):
        super(BitcoinLSTM, self).__init__()
        self.hidden_layer_size = hidden_layer_size
        self.lstm = nn.LSTM(input_size, hidden_layer_size, batch_first=True)
        self.linear = nn.Linear(hidden_layer_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        lstm_out, _ = self.lstm(x)
        predictions = self.linear(lstm_out[:, -1, :])
        predictions = self.sigmoid(predictions)
        return predictions

model = BitcoinLSTM(input_size=5, hidden_layer_size=50, output_size=1)

# 손실 함수와 옵티마이저 설정
criterion = nn.BCELoss()
optimizer = Adam(model.parameters(), lr=0.001)

# 모델 훈련 함수 정의 및 실행
train_model(model, train_seq_loader, test_seq_loader, criterion, optimizer, num_epochs=50)


Epoch 10, Train Loss: 0.6899, Test Loss: 0.6948, Test Accuracy: 0.5000
Epoch 20, Train Loss: 0.6882, Test Loss: 0.6968, Test Accuracy: 0.4833
Epoch 30, Train Loss: 0.6658, Test Loss: 0.7002, Test Accuracy: 0.4778
Epoch 40, Train Loss: 0.6251, Test Loss: 0.7638, Test Accuracy: 0.5000
Epoch 50, Train Loss: 0.5621, Test Loss: 0.7861, Test Accuracy: 0.4778


In [24]:
def backtest(model, test_loader, actual_returns, fee=0.00005):
    """
    백테스팅 함수

    Parameters:
    model (torch.nn.Module): 학습된 모델
    test_loader (DataLoader): 테스트 데이터셋의 DataLoader
    actual_returns (np.array): 각 거래에 대한 실제 수익률 데이터
    fee (float): 거래 수수료 비율

    Returns:
    float: 총 예상 수익률
    """
    model.eval()
    predictions = []
    with torch.no_grad():
        for X_batch, _ in test_loader:
            outputs = model(X_batch)
            predicted_labels = (outputs.squeeze() > 0.5).float()
            predictions.extend(predicted_labels.numpy())

    total_return = 0.0
    for i, pred in enumerate(predictions):
        if pred == 1:  # 모델이 매수를 예측한 경우
            # 실제 수익률에서 거래 수수료를 공제
            trade_return = actual_returns[i] - fee
            total_return += trade_return

    return total_return

# 백테스팅 실행 예시
# 이 코드를 실행하기 전에 `model`, `test_seq_loader`, `actual_returns` 변수가 정의되어 있어야 합니다.
# `actual_returns`는 테스트 데이터셋에 대한 실제 수익률을 나타내는 numpy 배열입니다.
actual_returns = np.array(df['Return'][800:])
total_return = backtest(model, test_seq_loader, actual_returns)
print(f"Total Return: {total_return:.4f}")

Total Return: 0.0545
