In [2]:
!nvidia-smi

Wed Jul  2 21:37:47 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 560.94                 Driver Version: 560.94         CUDA Version: 12.6     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                  Driver-Model | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce RTX 3080      WDDM  |   00000000:2D:00.0  On |                  N/A |
|  0%   43C    P8             19W /  320W |     959MiB /  10240MiB |      9%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [15]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import StepLR
from sklearn.preprocessing import StandardScaler

# 파라미터 세팅
SEQ_LEN = 63
BATCH_SIZE = 32
EPOCHS = 30
LR = 1e-3
STEP_SIZE = 3
GAMMA = 0.5
HIDDEN_DIM = 128
EARLY_STOPPING_PATIENCE = 5

# 날짜 기준 split 설정
TRAIN_END = '2019-12-31'
VALID_END = '2021-12-31'

# 데이터셋 로드 및 전처리
df = pd.read_csv('./dataset/dow30.csv')
df['Date'] = pd.to_datetime(df['Date'])
returns = df.iloc[:, 1:].values.astype(np.float32)

# 윈도우 데이터 생성 (날짜 포함)
dates = df['Date'].values
def create_sequences(data, dates, seq_len):
    xs, ys, ds = [], [], []
    for i in range(len(data) - seq_len):
        xs.append(data[i:i+seq_len])
        ys.append(data[i+seq_len])
        ds.append(dates[i+seq_len])
    return np.array(xs), np.array(ys), np.array(ds)

X, y, d = create_sequences(returns, dates, SEQ_LEN)

# 날짜 기준 split
train_mask = d <= np.datetime64(TRAIN_END)
valid_mask = (d > np.datetime64(TRAIN_END)) & (d <= np.datetime64(VALID_END))
test_mask = d > np.datetime64(VALID_END)

X_train, y_train = X[train_mask], y[train_mask]
X_valid, y_valid = X[valid_mask], y[valid_mask]
X_test, y_test = X[test_mask], y[test_mask]

# === 시계열 데이터 유출 방지 표준화 ===
scaler = StandardScaler()
X_train_2d = X_train.reshape(-1, X_train.shape[-1])
scaler.fit(X_train_2d)

def scale_X(X, scaler):
    X_shape = X.shape
    X_2d = X.reshape(-1, X_shape[-1])
    X_scaled = scaler.transform(X_2d)
    return X_scaled.reshape(X_shape)

def scale_y(y, scaler):
    return scaler.transform(y)

X_train = scale_X(X_train, scaler)
X_valid = scale_X(X_valid, scaler)
X_test = scale_X(X_test, scaler)
y_train = scaler.transform(y_train)
y_valid = scaler.transform(y_valid)
y_test = scaler.transform(y_test)

class ReturnDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X.reshape(X.shape[0], -1))
        self.y = torch.tensor(y)
    def __len__(self):
        return len(self.X)
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_ds = ReturnDataset(X_train, y_train)
valid_ds = ReturnDataset(X_valid, y_valid)
test_ds = ReturnDataset(X_test, y_test)

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(valid_ds, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE)

# MLP 모델 정의
class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim=18, dropout=0.2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, output_dim)
        )
    def forward(self, x):
        return self.net(x)
    
# Update input_dim for MLP to seq_len * num_features
input_dim = X_train.shape[1] * X_train.shape[2]
output_dim = y_train.shape[1]  # 18
model = MLP(input_dim, HIDDEN_DIM, output_dim=output_dim)
optimizer = torch.optim.Adam(model.parameters(), lr=LR)
scheduler = StepLR(optimizer, step_size=STEP_SIZE, gamma=GAMMA)
criterion = nn.MSELoss()

# 학습 함수
def train_epoch(model, loader, optimizer, criterion):
    model.train()
    total_loss = 0
    for X_batch, y_batch in loader:
        optimizer.zero_grad()
        preds = model(X_batch)
        loss = criterion(preds, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item() * len(X_batch)
    return total_loss / len(loader.dataset)

def eval_epoch(model, loader, criterion):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in loader:
            preds = model(X_batch)
            loss = criterion(preds, y_batch)
            total_loss += loss.item() * len(X_batch)
    return total_loss / len(loader.dataset)

# 얼리스탑핑 및 베스트 모델 저장
best_valid_loss = float('inf')
patience = 0
best_model_state = None

for epoch in range(EPOCHS):
    train_loss = train_epoch(model, train_loader, optimizer, criterion)
    valid_loss = eval_epoch(model, valid_loader, criterion)
    scheduler.step()
    print(f'Epoch {epoch+1}/{EPOCHS} | Train Loss: {train_loss:.6f} | Valid Loss: {valid_loss:.6f}')
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        best_model_state = model.state_dict()
        patience = 0
    else:
        patience += 1
        if patience >= EARLY_STOPPING_PATIENCE:
            print(f'Early stopping at epoch {epoch+1}')
            break

# 베스트 모델로 복원 후 테스트
model.load_state_dict(best_model_state)
test_loss = eval_epoch(model, test_loader, criterion)
print(f'Test Loss: {test_loss:.6f}')

# === UP/DOWN 방향성 예측 정확도 계산 ===
model.eval()
all_preds = []
all_targets = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        preds = model(X_batch)
        all_preds.append(preds.cpu().numpy())
        all_targets.append(y_batch.cpu().numpy())
all_preds = np.concatenate(all_preds, axis=0)
all_targets = np.concatenate(all_targets, axis=0)

# up/down 방향성: 0보다 크면 up(1), 작으면 down(0)
pred_updown = (all_preds > 0).astype(int)
target_updown = (all_targets > 0).astype(int)
updown_acc = (pred_updown == target_updown).mean()
print(f"Test Up/Down Accuracy: {updown_acc * 100:.2f}%")

Epoch 1/30 | Train Loss: 1.098849 | Valid Loss: 3.253091
Epoch 2/30 | Train Loss: 0.995918 | Valid Loss: 3.179214
Epoch 3/30 | Train Loss: 0.925659 | Valid Loss: 3.241795
Epoch 4/30 | Train Loss: 0.827651 | Valid Loss: 3.246050
Epoch 5/30 | Train Loss: 0.755874 | Valid Loss: 3.270723
Epoch 6/30 | Train Loss: 0.727057 | Valid Loss: 3.432972
Epoch 7/30 | Train Loss: 0.688916 | Valid Loss: 3.292610
Early stopping at epoch 7
Test Loss: 1.596331
Test Up/Down Accuracy: 49.76%
