In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import joblib
import time

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score,confusion_matrix
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"

print("Using device:", device)

Using device: cuda


In [2]:
filename = "KRW-XRP-5m-full"

# 경로에서 npy 파일 로드
X = np.load(f"../../preprocessed/2025-01-16/{filename}-X.npy")
y = np.load(f"../../preprocessed/2025-01-16/{filename}-y.npy")

print("X.shape:", X.shape)  # 예상: (N, seq_len=60, input_features=?)
print("y.shape:", y.shape)

train_ratio = 0.8
train_size = int(len(X) * train_ratio)

X_train = X[:train_size]
y_train = y[:train_size]
X_test  = X[train_size:]
y_test  = y[train_size:]

print("Train set size:", X_train.shape, y_train.shape)
print("Test  set size:", X_test.shape, y_test.shape)

X.shape: (699939, 60, 6)
y.shape: (699939,)
Train set size: (559951, 60, 6) (559951,)
Test  set size: (139988, 60, 6) (139988,)


In [3]:
input_size = X.shape[2]   # 피처 개수 (ex: 6)
hidden_size = 100          # LSTM hidden 노드 수
num_layers = 2            # LSTM 레이어 수
learning_rate = 0.001
num_epochs = 100
batch_size = 32

print("=== Hyperparameters ===")
print("input_size:", input_size)
print("hidden_size:", hidden_size)
print("num_layers:", num_layers)
print("batch_size:", batch_size)
print("num_epochs:", num_epochs)
print("=======================")

=== Hyperparameters ===
input_size: 6
hidden_size: 100
num_layers: 2
batch_size: 32
num_epochs: 100


In [4]:
class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)
        
    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

train_dataset = TimeSeriesDataset(X_train, y_train)
test_dataset = TimeSeriesDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

from sklearn.utils import class_weight

class_weights = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
pos_weight = torch.tensor(class_weights[1] / class_weights[0], dtype=torch.float).to(device)

In [5]:
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers=2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)  # 이진 분류 -> 출력 1개 (logit)

    def forward(self, x):
        batch_size = x.size(0)
        # 초기 hidden, cell
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=x.device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size, device=x.device)
        
        out, (hn, cn) = self.lstm(x, (h0, c0))
        # out.shape: (batch, seq_len, hidden_size)
        # 마지막 타임스텝
        out = out[:, -1, :]  # (batch, hidden_size)
        
        # 최종 -> logit
        logit = self.fc(out)  # (batch, 1)
        return logit

model = LSTMClassifier(input_size, hidden_size, num_layers).to(device)
print(model)
# 손실함수 & 옵티마
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

LSTMClassifier(
  (lstm): LSTM(6, 100, num_layers=2, batch_first=True)
  (fc): Linear(in_features=100, out_features=1, bias=True)
)


In [6]:
class EarlyStopping:
    def __init__(self, patience=5, verbose=False):
        self.patience = patience
        self.verbose = verbose
        self.best_loss = None
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_loss is None:
            self.best_loss = val_loss
        elif val_loss > self.best_loss:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_loss = val_loss
            self.counter = 0

early_stopping = EarlyStopping(patience=10,verbose=True)

In [7]:
for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    train_losses = []
    
    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)
        
        optimizer.zero_grad()
        logits = model(X_batch)  # (batch,1)
        # BCEWithLogitsLoss -> logits은 시그모이드 통과 전, y_batch는 [batch,]
        loss = criterion(logits.squeeze(), y_batch)
        loss.backward()
        optimizer.step()
        
        train_losses.append(loss.item())
    
    train_loss = np.mean(train_losses)
    
    # validation (test) loss
    model.eval()
    test_losses = []
    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)
            logits = model(X_batch)
            val_loss = criterion(logits.squeeze(), y_batch)
            test_losses.append(val_loss.item())
    test_loss = np.mean(test_losses)

    elapsed = time.time() - start_time
    print(f"Epoch [{epoch+1}/{num_epochs}] | "
          f"Train Loss: {train_loss:.6f} | Test Loss: {test_loss:.6f} | "
          f"Time: {elapsed:.2f}s")

    early_stopping(test_loss)
    if early_stopping.early_stop:
        print("EarlyStopping triggered.")
        break

Epoch [1/100] | Train Loss: 0.969130 | Test Loss: 0.966953 | Time: 73.39s
Epoch [2/100] | Train Loss: 0.965278 | Test Loss: 0.958228 | Time: 73.24s
Epoch [3/100] | Train Loss: 0.964414 | Test Loss: 0.954741 | Time: 71.24s
Epoch [4/100] | Train Loss: 0.963978 | Test Loss: 0.956543 | Time: 73.09s
EarlyStopping counter: 1 out of 10
Epoch [5/100] | Train Loss: 0.963808 | Test Loss: 0.960644 | Time: 73.12s
EarlyStopping counter: 2 out of 10
Epoch [6/100] | Train Loss: 0.963931 | Test Loss: 0.950283 | Time: 73.15s
Epoch [7/100] | Train Loss: 0.963398 | Test Loss: 0.944592 | Time: 73.11s
Epoch [8/100] | Train Loss: 0.963121 | Test Loss: 0.972930 | Time: 72.16s
EarlyStopping counter: 1 out of 10
Epoch [9/100] | Train Loss: 0.963349 | Test Loss: 0.944461 | Time: 32.70s
Epoch [10/100] | Train Loss: 0.966786 | Test Loss: 0.940437 | Time: 32.63s
Epoch [11/100] | Train Loss: 0.966709 | Test Loss: 0.964180 | Time: 32.72s
EarlyStopping counter: 1 out of 10
Epoch [12/100] | Train Loss: 0.966437 | Test

# 최종 평가(Accuracy, Precision, Recall, F1)

In [18]:

model.eval()
y_true_list = []
y_pred_list = []

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch = X_batch.to(device)
        logits = model(X_batch)
        # 시그모이드 -> 확률
        probs = torch.sigmoid(logits.squeeze())  # shape: (batch,)
        preds = (probs >= 0.57).float()  # 0.5 threshold -> 0 or 1

        y_true_list.append(y_batch.cpu().numpy())
        y_pred_list.append(preds.cpu().numpy())

y_true = np.concatenate(y_true_list)
y_pred = np.concatenate(y_pred_list)

acc = accuracy_score(y_true, y_pred)
prec = precision_score(y_true, y_pred, zero_division=0)
rec = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)
cm = confusion_matrix(y_test, y_pred)

print(f"Accuracy:  {acc:.4f}")
print(f"Precision: {prec:.4f}")
print(f"Recall:    {rec:.4f}")
print(f"F1-score:  {f1:.4f}")

print("[[TN FP]")
print(" [FN TP]]")
print(cm)

Accuracy:  0.6394
Precision: 0.3438
Recall:    0.4424
F1-score:  0.3869
[[TN FP]
 [FN TP]]
[[73588 30396]
 [20077 15927]]


```
[[TN FP]
 [FN TP]]

True = 정답
False = 틀림
Positive = 상승
Negative = 하락
```

#### 정확도 Accuracy

**전체 예측 중 맞춘 비율**

그러나 지금처럼 클래스 불균형이 존재할 경우 신뢰하기 힘듦

#### 정밀도 Precision

**예측한 싱승 시그널 중 실제 상승한 비율**

정밀도가 낮으면 False Positive(FP), 즉 잘못된 상승 예측이 많다는 의미

#### 재현율 Recall

**실제 상승 중 모델이 예측한 비율**

재현율이 높으면 실제 상승을 많이 예측했다는 뜻

#### F1-Score

**정밀도와 재현율의 평균**

두 지표간의 균형을 평가하며, 낮을 경우 정밀도와 재현율 간의 불균형을 나타냄


In [20]:
import matplotlib.pyplot as plt
from sklearn.metrics import precision_recall_curve, f1_score

# 예측 확률 계산
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_prob = torch.sigmoid(model(X_test_tensor)).detach().cpu().numpy()

# Precision-Recall Curve 계산
precision, recall, thresholds = precision_recall_curve(y_test, y_prob)

# F1-score 계산
f1_scores = 2 * (precision * recall) / (precision + recall)

# Precision, Recall, F1-score을 임계값에 따라 시각화
plt.figure(figsize=(10, 6))
plt.plot(thresholds, precision[:-1], label='Precision')
plt.plot(thresholds, recall[:-1], label='Recall')
plt.plot(thresholds, f1_scores[:-1], label='F1-score')
plt.xlabel('Threshold')
plt.ylabel('Score')
plt.title('Precision, Recall, and F1-score vs Threshold')
plt.legend()
plt.show()

OutOfMemoryError: CUDA out of memory. Tried to allocate 66.73 GiB. GPU 0 has a total capacity of 47.43 GiB of which 39.43 GiB is free. Process 691732 has 0 bytes memory in use. Including non-PyTorch memory, this process has 0 bytes memory in use. Of the allocated memory 4.73 GiB is allocated by PyTorch, and 2.37 GiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [19]:
# F1-score 계산
f1_scores = 2 * (precision[:-1] * recall[:-1]) / (precision[:-1] + recall[:-1] + 1e-10)

# 최적의 Threshold 찾기 (F1-score 최대값 기준)
best_index = np.argmax(f1_scores)
best_threshold = thresholds[best_index]
best_f1_score = f1_scores[best_index]

print(f"최적의 Threshold: {best_threshold:.4f}")
print(f"최적의 F1-score: {best_f1_score:.4f}")

NameError: name 'precision' is not defined

In [21]:
# 모델 저장

torch.save(model.state_dict(), f"../../model/2025-01-16/{filename}.pth")