In [8]:
import torch
import os
from pathlib import Path
from datetime import datetime
import wandb
from torch.utils.data import DataLoader, Dataset

# 본 과제 제출자는 현재 우분투 도커 환경에서 작업중이므로 다음과 같이 경로 설정
BASE_PATH="/home/Deep-Learning-study"
import sys
sys.path.append(BASE_PATH)

CURRENT_FILE_PATH = os.getcwd()
CHECKPOINT_FILE_PATH = os.path.join(CURRENT_FILE_PATH, "checkpoints")

if not os.path.isdir(CHECKPOINT_FILE_PATH):
  os.makedirs(os.path.join(CURRENT_FILE_PATH, "checkpoints"))

In [9]:
from _01_code._15_lstm_and_its_application.f_arg_parser import get_parser
#from _01_code._15_lstm_and_its_application.g_crypto_currency_regression_train_lstm import get_btc_krw_data
#from _01_code._15_lstm_and_its_application.i_crypto_currency_classification_train_lstm import get_model

In [10]:
class CryptoCurrencyDataset(Dataset):
  def __init__(self, X, y, is_regression=True):
    self.X = X
    self.y = y

    assert len(self.X) == len(self.y)

  def __len__(self):
    return len(self.X)

  def __getitem__(self, idx):
    X = self.X[idx]
    y = self.y[idx]
    return X, y

  def __str__(self):
    str = "Data Size: {0}, Input Shape: {1}, Target Shape: {2}".format(
      len(self.X), self.X.shape, self.y.shape
    )
    return str

In [11]:
import pandas as pd

def get_cryptocurrency_data(
    sequence_size=10, validation_size=100, test_size=10, 
    target_column='Close', y_normalizer=1.0e7, 
    is_regression=True, use_next_open=True
):
    btc_krw_path = os.path.join(BASE_PATH, "_00_data", "k_cryptocurrency", "BTC_KRW.csv")
    df = pd.read_csv(btc_krw_path)
    row_size = len(df)
    date_list = df['Date']

    # Next_Open 컬럼 추가
    if use_next_open:
        df['Next_Open'] = df['Open'].shift(-1)
        
    df = df.drop(columns=['Date'])
    
    # Next_Open이 NaN인 마지막 행 제거
    if use_next_open:
        df = df.dropna()
        row_size = len(df)

    data_size = row_size - sequence_size
    train_size = data_size - (validation_size + test_size)
    #################################################################################################

    row_cursor = 0

    X_train_list = []
    y_train_regression_list = []
    y_train_classification_list = []
    y_train_date = []
    for idx in range(0, train_size):
        sequence_data = df.iloc[idx: idx + sequence_size].values  # sequence_data.shape: (sequence_size, 6)
        X_train_list.append(torch.from_numpy(sequence_data))
        y_train_regression_list.append(df.iloc[idx + sequence_size][target_column])
        y_train_classification_list.append(
            1 if df.iloc[idx + sequence_size][target_column] >= df.iloc[idx + sequence_size - 1][target_column] else 0
        )
        y_train_date.append(date_list[idx + sequence_size])
        row_cursor += 1

    X_train = torch.stack(X_train_list, dim=0).to(torch.float)
    y_train_regression = torch.tensor(y_train_regression_list, dtype=torch.float32) / y_normalizer
    y_train_classification = torch.tensor(y_train_classification_list, dtype=torch.int64)

    m = X_train.mean(dim=0, keepdim=True)
    s = X_train.std(dim=0, keepdim=True)
    X_train = (X_train - m) / s

    #################################################################################################

    X_validation_list = []
    y_validation_regression_list = []
    y_validation_classification_list = []
    y_validation_date = []
    for idx in range(row_cursor, row_cursor + validation_size):
        sequence_data = df.iloc[idx: idx + sequence_size].values  # sequence_data.shape: (sequence_size, 6)
        X_validation_list.append(torch.from_numpy(sequence_data))
        y_validation_regression_list.append(df.iloc[idx + sequence_size][target_column])
        y_validation_classification_list.append(
            1 if df.iloc[idx + sequence_size][target_column] >= df.iloc[idx + sequence_size - 1][target_column] else 0
        )
        y_validation_date.append(date_list[idx + sequence_size])
        row_cursor += 1

    X_validation = torch.stack(X_validation_list, dim=0).to(torch.float)
    y_validation_regression = torch.tensor(y_validation_regression_list, dtype=torch.float32) / y_normalizer
    y_validation_classification = torch.tensor(y_validation_classification_list, dtype=torch.int64)

    X_validation = (X_validation - m) / s
    #################################################################################################

    X_test_list = []
    y_test_regression_list = []
    y_test_classification_list = []
    y_test_date = []
    for idx in range(row_cursor, row_cursor + test_size):
        sequence_data = df.iloc[idx: idx + sequence_size].values  # sequence_data.shape: (sequence_size, 6)
        X_test_list.append(torch.from_numpy(sequence_data))
        y_test_regression_list.append(df.iloc[idx + sequence_size][target_column])
        y_test_classification_list.append(
            1 if df.iloc[idx + sequence_size][target_column] > df.iloc[idx + sequence_size - 1][target_column] else 0
        )
        y_test_date.append(date_list[idx + sequence_size])
        row_cursor += 1

    X_test = torch.stack(X_test_list, dim=0).to(torch.float)
    y_test_regression = torch.tensor(y_test_regression_list, dtype=torch.float32) / y_normalizer
    y_test_classification = torch.tensor(y_test_classification_list, dtype=torch.int64)

    X_test = (X_test - m) / s

    if is_regression:
        return (
            X_train, X_validation, X_test,
            y_train_regression, y_validation_regression, y_test_regression,
            y_train_date, y_validation_date, y_test_date
        )
    else:
        return (
            X_train, X_validation, X_test,
            y_train_classification, y_validation_classification, y_test_classification,
            y_train_date, y_validation_date, y_test_date
        )

In [12]:
def get_btc_krw_data(sequence_size=21, validation_size=150, test_size=30, is_regression=True, use_next_open=True):
    # use_next_open 파라미터 추가
    X_train, X_validation, X_test, y_train, y_validation, y_test, y_train_date, y_validation_date, y_test_date \
        = get_cryptocurrency_data(
            sequence_size=sequence_size,
            validation_size=validation_size,
            test_size=test_size,
            target_column='Close',
            y_normalizer=1.0e7,
            is_regression=is_regression,
            use_next_open=use_next_open  # Next_Open feature 사용 여부
        )

    # PyTorch Dataset 객체 생성
    train_crypto_currency_dataset = CryptoCurrencyDataset(X=X_train, y=y_train)
    validation_crypto_currency_dataset = CryptoCurrencyDataset(X=X_validation, y=y_validation)
    test_crypto_currency_dataset = CryptoCurrencyDataset(X=X_test, y=y_test)

    # DataLoader 생성
    train_data_loader = DataLoader(
        dataset=train_crypto_currency_dataset,
        batch_size=wandb.config.batch_size,
        shuffle=True
    )
    
    validation_data_loader = DataLoader(
        dataset=validation_crypto_currency_dataset,
        batch_size=wandb.config.batch_size,
        shuffle=True
    )
    
    test_data_loader = DataLoader(
        dataset=test_crypto_currency_dataset,
        batch_size=len(test_crypto_currency_dataset),
        shuffle=True
    )

    return train_data_loader, validation_data_loader, test_data_loader

In [13]:
import torch.nn as nn

def get_model():
    class MyModel(nn.Module):
        def __init__(self, n_input=6, n_output=2):  # n_input을 6으로 변경 (Next_Open 포함)
            super().__init__()
            
            #메인 LSTM 레이어
            self.lstm = nn.LSTM(
                input_size=n_input,
                hidden_size=1024,  # hidden size 증가
                num_layers=3,      # 3개의 layer
                dropout=0.1,       # dropout 추가
                batch_first=True,
                bidirectional=True # 양방향 LSTM
            )
            
            # 분류를 위한 FC 레이어
            self.fc_layers = nn.Sequential(
                nn.LayerNorm(2048),  # bidirectional이므로 hidden_size * 2
                nn.Linear(2048, 512),
                nn.GELU(),
                nn.Dropout(0.1),
                
                nn.LayerNorm(512),
                nn.Linear(512, 128),
                nn.GELU(),
                nn.Dropout(0.1),
                
                nn.LayerNorm(128),
                nn.Linear(128, n_output),  # n_output=2 for binary classification
            )
            
        def forward(self, x):
            self.lstm.flatten_parameters()  # CUDA 성능 최적화
            x, _ = self.lstm(x)
            x = x[:, -1, :]  # 마지막 시퀀스의 출력만 사용
            x = self.fc_layers(x)
            return x  # CrossEntropyLoss를 사용할 것이므로 softmax는 여기서 적용하지 않음

    my_model = MyModel(n_input=6, n_output=2)
    return my_model
# Args 클래스도 classification task에 맞게 수정
class Args:
    def __init__(self):
        self.wandb = True
        self.batch_size = 32       # classification은 regression보다 큰 배치 사이즈가 효과적일 수 있음
        self.epochs = 300
        self.learning_rate = 1e-3  # classification은 보통 더 큰 학습률 사용
        self.weight_decay = 1e-4
        self.validation_intervals = 1
        self.early_stop_patience = 30
        self.early_stop_delta = 1e-4

In [14]:
def test(test_model):
    # 테스트용 데이터로더만 가져옴 (분류 태스크용)
    _, _, test_data_loader = get_btc_krw_data(is_regression=False)

    # 모델을 평가 모드로 설정 (dropout, batch norm 등이 평가 모드로 변경됨)
    test_model.eval()

    # 정확도 계산을 위한 변수 초기화
    num_corrects_test = 0      # 정확히 예측한 샘플 수
    num_tested_samples = 0     # 전체 테스트 샘플 수

    print("[TEST DATA]")
    # gradient 계산 비활성화 (메모리 사용량 감소, 연산 속도 향상)
    with torch.no_grad():
        # 테스트 데이터의 배치를 하나씩 처리
        for test_batch in test_data_loader:
            # 입력 데이터와 정답 레이블을 배치에서 추출
            input_test, target_test = test_batch

            # 모델을 통해 예측값 계산
            output_test = test_model(input_test)

            # 가장 높은 확률을 가진 클래스를 예측값으로 선택
            predicted_test = torch.argmax(output_test, dim=1)
            # 정답과 예측이 일치하는 개수 누적
            num_corrects_test += torch.sum(torch.eq(predicted_test, target_test))

            # 처리된 샘플 수 누적
            num_tested_samples += len(input_test)

        # 전체 정확도 계산 (백분율)
        test_accuracy = 100.0 * num_corrects_test / num_tested_samples

        # 전체 테스트 정확도 출력
        print(f"TEST RESULTS: {test_accuracy:6.3f}%")

        # 각 샘플별 예측 결과 출력
        for idx, (output, target) in enumerate(zip(output_test, target_test)):
            # 결과 출력 포맷:
            # 인덱스: 예측 클래스 <--> 실제 클래스
            print("{0:2}: {1:6,.2f} <--> {2:6,.2f}".format(
                idx,                            # 데이터 포인트 인덱스
                torch.argmax(output).item(),    # 예측한 클래스 (0: 하락, 1: 상승)
                target.item()                   # 실제 클래스
            ))

In [17]:
def main(args):
    # 현재 시간을 문자열로 변환 (실행 식별자로 사용)
    run_time_str = datetime.now().astimezone().strftime('%Y-%m-%d_%H-%M-%S')

    # wandb 설정을 위한 하이퍼파라미터 딕셔너리 생성
    config = {
        'epochs': args.epochs,                    # 총 학습 에폭 수
        'batch_size': args.batch_size,           # 미니배치 크기
        'validation_intervals': args.validation_intervals,  # 검증 수행 주기
        'learning_rate': args.learning_rate,      # 학습률
        'early_stop_patience': args.early_stop_patience,  # 조기 종료 인내 횟수
        'early_stop_delta': args.early_stop_delta,  # 조기 종료 임계값
    }

    # wandb 프로젝트 초기화 (테스트 모드)
    project_name = "lstm_classification_btc_krw_next_open"
    wandb.init(
        mode="disabled",                          # 테스트 시에는 wandb 비활성화
        project=project_name,                     # 프로젝트 이름
        notes="btc_krw experiment with lstm",     # 실험 설명
        tags=["lstm", "regression", "btc_krw"],   # 관련 태그
        name=run_time_str,                       # 실행 이름 (시간 기반)
        config=config                            # 설정값들
    )

    # 분류 모델 인스턴스 생성
    test_model = get_model()

    # 저장된 최신 모델 파일 경로 설정
    latest_file_path = os.path.join(
        CHECKPOINT_FILE_PATH, f"{project_name}_checkpoint_2024-12-12_17-13-45.pt"    # 최신 체크포인트 파일명
    )
    print("MODEL FILE: {0}".format(latest_file_path))
    
    # 저장된 모델의 가중치를 CPU에 로드
    test_model.load_state_dict(
        torch.load(latest_file_path, map_location=torch.device('cpu'))
    )

    # 테스트 수행
    test(test_model)

In [18]:
if __name__ == "__main__":
    import sys
    if 'ipykernel' in sys.modules:  # Jupyter Notebook에서 실행 중인지 확인
        # Jupyter에서 실행할 때는 기본값 사용
        args = Args()
    else:
        # 일반 Python 스크립트로 실행할 때는 argparse 사용
        parser = get_parser()
        args = parser.parse_args()
    
    main(args)

MODEL FILE: /home/Deep-Learning-study/_02_homeworks/hw4/checkpoints/lstm_classification_btc_krw_next_open_checkpoint_2024-12-12_17-13-45.pt


  torch.load(latest_file_path, map_location=torch.device('cpu'))


[TEST DATA]
TEST RESULTS: 50.000%
 0:   1.00 <-->   1.00
 1:   1.00 <-->   1.00
 2:   1.00 <-->   0.00
 3:   1.00 <-->   0.00
 4:   1.00 <-->   1.00
 5:   1.00 <-->   1.00
 6:   1.00 <-->   0.00
 7:   1.00 <-->   0.00
 8:   1.00 <-->   0.00
 9:   1.00 <-->   1.00
10:   1.00 <-->   0.00
11:   1.00 <-->   0.00
12:   1.00 <-->   1.00
13:   1.00 <-->   1.00
14:   1.00 <-->   0.00
15:   1.00 <-->   0.00
16:   1.00 <-->   0.00
17:   1.00 <-->   1.00
18:   1.00 <-->   1.00
19:   1.00 <-->   0.00
20:   1.00 <-->   1.00
21:   1.00 <-->   0.00
22:   1.00 <-->   1.00
23:   1.00 <-->   0.00
24:   1.00 <-->   1.00
25:   1.00 <-->   1.00
26:   1.00 <-->   1.00
27:   1.00 <-->   0.00
28:   1.00 <-->   1.00
29:   1.00 <-->   0.00


숙제 후기에서도 계속 이어서 이야기 하겠지만 lstm 네트워크를 어느정도 많이 쌓는데는 한계가 있는것 같다. 사실 학습을 하면서 여러 시도를 하면서 차라리 overfitting이 날때 까지 돌려보려고도 하긴 하였는데 overfitting도 나지 않는것을 확인할 수 있었다. 기존에 대비 feature를 추가해도 그렇게 유의미한 변화를 파악할 수 없었다.

숙제 후기

lstm에 대해서 굉장히 많은 모델 아키텍쳐 변경을 통한 학습을 시도해보았지만 그 한계를 맛볼 수 있었던 과제였습니다. iteration을 굉장히 많이 해보기도 하고, laerning rate를 굉장히 작게 만들어 local minima에 빠지는것을 막고자 하였지만 결국 그 마지막은 비슷한 성능을 냄을 알 수 있었습니다.
결국 그 구간이 local minima가 아니라 global minima라고 생각이 들었던것 같아, 이것은 lstm 자체의 모델의 해결하지 문제점이 아닌가라는 생각이 많이 드는 과제였습니다.