In [1]:
import os, joblib
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score, accuracy_score

from utils import set_seed

path = '/workspace/QuantGAN_stock'
try:
    os.chdir(path)
    print("Current working directory: {0}".format(os.getcwd()))
except FileNotFoundError:
    print("Directory {0} does not exist".format(path))
    
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
set_seed(40)
sns.set(style="darkgrid")

  from .autonotebook import tqdm as notebook_tqdm


Current working directory: /workspace/COMFI-GAN


In [2]:
class BiLSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(BiLSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.bilstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, 1)  # Bidirectional이므로 hidden_size * 2
        self.sigmoid = nn.Sigmoid()
    
    def forward(self, x):
        out, _ = self.bilstm(x)
        out = self.fc(out[:, -1, :])  # 마지막 timestep의 hidden state를 사용
        out = self.sigmoid(out)
        return out
    
# 모델 초기화 함수
def weights_init(m):
    if isinstance(m, nn.Linear):
        nn.init.xavier_uniform_(m.weight)
        nn.init.constant_(m.bias, 0)
    elif isinstance(m, nn.LSTM):
        for name, param in m.named_parameters():
            if 'weight_ih' in name:
                nn.init.xavier_uniform_(param.data)
            elif 'weight_hh' in name:
                nn.init.orthogonal_(param.data)
            elif 'bias' in name:
                param.data.fill_(0)

In [3]:
# COSCI-GAN, TransGAN 데이터 로드
real = joblib.load('./real_list.pkl')
fake = joblib.load('./fake_list.pkl')

print(len(real), real[0].shape)
print(len(fake), fake[0].shape)

5 (4904, 127)
5 (4904, 127)


In [4]:
# 126개의 시계열 데이터를 사용하여 다음 스텝의 상승/하락 여부를 예측
seq_len = 126
input_size = 1  # feature 개수
hidden_size = 30
num_layers = 3
lr = 0.003

# 모델 학습
num_epochs = 200
batch_size = 64
patience = 30

In [5]:
for i in range(len(real)):       
    # real 데이터 준비
    X_real = real[i][:, :seq_len]
    targets_real = real[i][:, seq_len]
    y_real = (targets_real > 0).astype(int)
    
    # fake 데이터 준비
    X_fake = fake[i][:, :seq_len]
    targets_fake = fake[i][:, seq_len]
    y_fake = (targets_fake > 0).astype(int)
    
    # 데이터 스케일링
    scaler = StandardScaler()
    X_fake = scaler.fit_transform(X_fake)
    X_real = scaler.transform(X_real)

    # real 데이터의 길이를 계산하여 validation과 test로 나누기
    num_real_samples = X_real.shape[0]
    val_size = num_real_samples // 4

    X_val_real = X_real[:val_size]
    y_val_real = y_real[:val_size]

    X_test_real = X_real[val_size + seq_len:]
    y_test_real = y_real[val_size + seq_len:]
    
    # NumPy 배열을 PyTorch 텐서로 변환
    X_fake = torch.tensor(X_fake, dtype=torch.float32).unsqueeze(2).to(device)
    y_fake = torch.tensor(y_fake, dtype=torch.float32).to(device)
    X_val_real = torch.tensor(X_val_real, dtype=torch.float32).unsqueeze(2).to(device)
    y_val_real = torch.tensor(y_val_real, dtype=torch.float32).to(device)
    X_test_real = torch.tensor(X_test_real, dtype=torch.float32).unsqueeze(2).to(device)
    y_test_real = torch.tensor(y_test_real, dtype=torch.float32).to(device)

    # TensorDataset과 DataLoader를 사용하여 데이터셋을 배치로 나누기
    train_dataset = TensorDataset(X_fake, y_fake)
    val_dataset = TensorDataset(X_val_real, y_val_real)
    test_dataset = TensorDataset(X_test_real, y_test_real)
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)
    
    # 모델 초기화
    model = BiLSTMClassifier(input_size, hidden_size, num_layers).to(device)
    model.apply(weights_init)

    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.BCELoss()

    best_model = model.state_dict().copy()  # 초기 모델 상태 저장
    best_val_loss = float('inf')
    early_stop_counter = 0
    
    # 학습 중 loss와 val_loss 값을 저장할 리스트 초기화
    train_losses = []
    val_losses = []

    # 학습 시작
    for epoch in range(num_epochs):
        model.train()
        epoch_train_loss = 0

        
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            # Forward pass
            outputs = model(x)
            loss = criterion(outputs.squeeze(), y)
            
            # Backward pass and optimization
            optimizer.zero_grad()
            loss.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # Gradient Clipping
            optimizer.step()
            epoch_train_loss += loss.item()
            
        # Epoch당 평균 학습 손실 저장
        epoch_train_loss /= len(train_loader)
        train_losses.append(epoch_train_loss)
        
        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for x_val, y_val in val_loader:
                x_val, y_val = x_val.to(device), y_val.to(device)
                val_outputs = model(x_val)
                val_loss += criterion(val_outputs.squeeze(), y_val).item()        
        val_loss /= len(val_loader)            
        val_losses.append(val_loss)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Val Loss: {val_loss:.4f}')
        
        # Early stopping and model saving
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model = model.state_dict().copy()
            early_stop_counter = 0
        else:
            early_stop_counter += 1
        
        if early_stop_counter >= patience:
            print("Early stopping")
            break
    
    # 모델 평가
    if best_model is not None:
        model.load_state_dict(best_model)
    else:
        print("No improvement, using the last model.")
    model.eval()
    with torch.no_grad():
        all_predictions = []
        all_targets = []
        
        for x_test, y_test in test_loader:
            x_test, y_test = x_test.to(device), y_test.to(device)
            outputs = model(x_test).squeeze()            
            predictions = (outputs >= 0.5).float()
            all_predictions.append(predictions)
            all_targets.append(y_test)
        
        # 모든 배치의 결과를 하나로 합침
        all_predictions = torch.cat(all_predictions).cpu()
        all_targets = torch.cat(all_targets).cpu()        

    # 1과 0의 비율 계산
    num_ones = all_predictions.sum().item()
    num_zeros = len(all_predictions) - num_ones
    ratio_ones = num_ones / len(all_predictions)
    ratio_zeros = num_zeros / len(all_predictions)

    print(f"Number of ones: {num_ones}")
    print(f"Number of zeros: {num_zeros}")
    print(f"Ratio of ones: {ratio_ones:.4f}")
    print(f"Ratio of zeros: {ratio_zeros:.4f}")
        
    # 혼동 행렬
    conf_matrix = confusion_matrix(all_targets.numpy(), all_predictions.numpy())
    print("Confusion Matrix:")
    print(conf_matrix)

    # 정밀도, 재현율, F1 점수
    precision = precision_score(all_targets.numpy(), all_predictions.numpy())
    recall = recall_score(all_targets.numpy(), all_predictions.numpy())
    f1 = f1_score(all_targets.numpy(), all_predictions.numpy())
    accuracy = accuracy_score(all_targets.numpy(), all_predictions.numpy())

    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"Accuracy: {accuracy:.4f}")
    print()
    
    # # Loss 그래프 그리기
    # plt.figure(figsize=(10, 5))
    # plt.plot(train_losses, label='Training Loss')
    # plt.plot(val_losses, label='Validation Loss')
    # plt.xlabel('Epoch')
    # plt.ylabel('Loss')
    # plt.legend()
    # plt.title('Training and Validation Loss')
    # plt.show()
    
    


Epoch [1/200], Loss: 0.6969, Val Loss: 0.6895
Epoch [2/200], Loss: 0.6799, Val Loss: 0.6890
Epoch [3/200], Loss: 0.6800, Val Loss: 0.6859
Epoch [4/200], Loss: 0.6953, Val Loss: 0.6863
Epoch [5/200], Loss: 0.6683, Val Loss: 0.6848
Epoch [6/200], Loss: 0.7090, Val Loss: 0.6869
Epoch [7/200], Loss: 0.6677, Val Loss: 0.6846
Epoch [8/200], Loss: 0.6828, Val Loss: 0.6864
Epoch [9/200], Loss: 0.6808, Val Loss: 0.6870
Epoch [10/200], Loss: 0.6911, Val Loss: 0.6835
Epoch [11/200], Loss: 0.6546, Val Loss: 0.6866
Epoch [12/200], Loss: 0.6851, Val Loss: 0.6861
Epoch [13/200], Loss: 0.6634, Val Loss: 0.6833
Epoch [14/200], Loss: 0.6690, Val Loss: 0.6876
Epoch [15/200], Loss: 0.7306, Val Loss: 0.6870
Epoch [16/200], Loss: 0.6996, Val Loss: 0.6867
Epoch [17/200], Loss: 0.6650, Val Loss: 0.6872
Epoch [18/200], Loss: 0.6965, Val Loss: 0.6864
Epoch [19/200], Loss: 0.7156, Val Loss: 0.6861
Epoch [20/200], Loss: 0.6595, Val Loss: 0.6890
Epoch [21/200], Loss: 0.6655, Val Loss: 0.6854
Epoch [22/200], Loss: 