In [1]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import warnings
from scipy.linalg import svd
import torch.optim as optim
import math
warnings.filterwarnings('ignore')

from data_provider.data_loader_dinn import Dataset_SNP, Dataset_DOW
from models.fcn import MLP
from models.DLinear import DLinear
from utils.bootstrap import make_bootstrap_loader
from tqdm import tqdm

In [2]:
df_snp = pd.read_csv('./datas/snp500.csv')
df_dow = pd.read_csv('./datas/dow30.csv')


train_end_flag = '2017-12-31'
valid_end_flag = '2019-12-31'

df_snp_val = df_snp[(df_snp['Date']>train_end_flag) & (df_snp['Date']<=valid_end_flag)][:-4]   
df_dow_val = df_dow[(df_dow['Date']>train_end_flag)][:-4]
timestamps = df_dow_val['Date'].values

In [3]:
train_set = Dataset_SNP(root_path='./datas/', flag='train', timeenc=1, freq='B', train_end=train_end_flag, val_end=valid_end_flag)
valid_set = Dataset_SNP(root_path='./datas/', flag='val', timeenc=1, freq='B', train_end=train_end_flag, val_end=valid_end_flag)
test_set = Dataset_SNP(root_path='./datas/', flag='test', timeenc=1, freq='B', train_end=train_end_flag, val_end=train_end_flag)        # valid set에서도 학습할 게 필요함

In [4]:
# train_set = Dataset_DOW(root_path='./datas/', flag='train', timeenc=1, freq='B', train_end=date_flag, val_end=date_flag)
# valid_set = Dataset_DOW(root_path='./datas/', flag='val', timeenc=1, freq='B', train_end=date_flag, val_end=date_flag)
# test_set = Dataset_DOW(root_path='./datas/', flag='test', timeenc=1, freq='B', train_end=date_flag, val_end=date_flag)

In [5]:
batch_size = 16
bootstrap_loaders = make_bootstrap_loader(train_set, B=30, batch_size=batch_size)
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=False, drop_last=True)

In [6]:
class DLinearConfig:
    def __init__(self, seq_len, pred_len, enc_in, individual=False):
        self.seq_len = seq_len
        self.pred_len = pred_len
        self.enc_in = enc_in
        self.individual = individual

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np


def train(model_cls, data_loader, valid_data_loader, EPOCHS=100, lr=1e-3, path='./weights/', patience=10, valid_mode = False): 
    
    models = []
    indices_ls = []
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    criterion = nn.MSELoss()
    
    for i, (loader_b, indices_b) in enumerate(data_loader):
        
        if model_cls == MLP:
            sample_x, sample_y, _, _ = next(iter(loader_b))
            input_dim = sample_x.shape[1] * sample_x.shape[2]
            hidden_dim = 2000
            output_dim = sample_y.shape[1] * sample_y.shape[2]
            model_b = model_cls(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim).to(device)
        
        elif model_cls == DLinear:
            sample_x, sample_y, _, _ = next(iter(loader_b))
            seq_len = sample_x.shape[1]
            pred_len = sample_y.shape[1]
            enc_in = sample_x.shape[2]
            dlinear_configs = DLinearConfig(seq_len=seq_len, pred_len=pred_len, enc_in=enc_in)
            model_b = model_cls(dlinear_configs).to(device)
            
        optimizer = optim.Adam(model_b.parameters(), lr=lr)
        
        best_val_loss = float('inf')
        patience_counter = 0
        
        for epoch in range(EPOCHS):
            model_b.train()
            total_train_loss = 0.0 
            for X_batch, y_batch, _, _ in loader_b:
                
                if model_cls == MLP:
                    X_batch = X_batch.float().to(device).view(X_batch.size(0), -1)
                    y_batch = y_batch.float().to(device).view(y_batch.size(0), -1)
                elif model_cls == DLinear:
                    X_batch = X_batch.float().to(device)
                    y_batch = y_batch.float().to(device)
                
                optimizer.zero_grad()
                preds = model_b(X_batch)
                loss = criterion(preds, y_batch)
                loss.backward()
                optimizer.step()
                total_train_loss += loss.item()
            
            avg_train_loss = total_train_loss / len(loader_b)
            print(f"Model Num {i}, Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}")
            
            
            if valid_mode:
                model_b.eval()
                total_val_loss = 0.0
                with torch.no_grad():
                    for X_val, y_val, _, _ in valid_data_loader:
                        if model_cls == MLP:
                            X_val = X_val.float().to(device).view(X_val.size(0), -1)
                            y_val = y_val.float().to(device).view(y_val.size(0), -1)
                        elif model_cls == DLinear:
                            X_val = X_val.float().to(device)
                            y_val = y_val.float().to(device)
                        
                        preds_val = model_b(X_val)
                        val_loss = criterion(preds_val, y_val)
                        total_val_loss += val_loss.item()
                
                avg_train_loss = total_train_loss / len(loader_b)
                avg_val_loss = total_val_loss / len(valid_data_loader)
                
                print(f"Model Num {i}, Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Valid Loss: {avg_val_loss:.4f}")
            # if (epoch+1) % 10 == 0:
            #     print(f"Model Num {i}, Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Valid Loss: {avg_val_loss:.4f}")

                if avg_val_loss < best_val_loss:
                    best_val_loss = avg_val_loss
                    patience_counter = 0
                    torch.save(model_b.state_dict(), f"{path}/model_b{i}.pt")
                else:
                    patience_counter += 1
                
                if patience_counter >= patience:
                    print(f"Early stopping for model {i} at epoch {epoch+1}!")
                    print()
                    break
        
        model_b.load_state_dict(torch.load(f"{path}/model_b{i}.pt"))
        models.append(model_b)
        indices_ls.append(indices_b)
        
    return models, indices_ls

In [8]:
def train(model_cls, data_loader, valid_data_loader, EPOCHS=100, lr=1e-3, path='./weights/', patience=10, valid_mode=False):
    os.makedirs(path, exist_ok=True)

    models = []
    indices_ls = []
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    criterion = nn.MSELoss()
    
    for i, (loader_b, indices_b) in enumerate(data_loader):
        
        sample_x, sample_y, _, _ = next(iter(loader_b))

        if model_cls == MLP:
            input_dim = sample_x.shape[1] * sample_x.shape[2]
            hidden_dim = 2000
            output_dim = sample_y.shape[1] * sample_y.shape[2]
            model_b = model_cls(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim).to(device)
        
        elif model_cls == DLinear:
            seq_len = sample_x.shape[1]
            pred_len = sample_y.shape[1]
            enc_in = sample_x.shape[2]
            dlinear_configs = DLinearConfig(seq_len=seq_len, pred_len=pred_len, enc_in=enc_in)
            model_b = model_cls(dlinear_configs).to(device)
        else:
            raise ValueError(f"Unsupported model class: {model_cls}")
            
        optimizer = optim.Adam(model_b.parameters(), lr=lr)
        
        best_val_loss = float('inf')
        patience_counter = 0
        
        # ================================================================== #
        # 아래 부분을 수정하여 파일 이름에 모델 클래스 이름을 포함시킵니다.
        # 예: ./weights/MLP_model_b0.pt 또는 ./weights/DLinear_model_b0.pt
        model_name = model_cls.__name__ 
        model_save_path = f"{path}/{model_name}_model_b{i}.pt"
        # ================================================================== #

        for epoch in range(EPOCHS):
            model_b.train()
            total_train_loss = 0.0 
            for X_batch, y_batch, _, _ in loader_b:
                
                if model_cls == MLP:
                    X_batch = X_batch.float().to(device).view(X_batch.size(0), -1)
                    y_batch = y_batch.float().to(device).view(y_batch.size(0), -1)
                elif model_cls == DLinear:
                    X_batch = X_batch.float().to(device)
                    y_batch = y_batch.float().to(device)
                
                optimizer.zero_grad()
                preds = model_b(X_batch)
                loss = criterion(preds, y_batch)
                loss.backward()
                optimizer.step()
                total_train_loss += loss.item()
            
            avg_train_loss = total_train_loss / len(loader_b)

            if valid_mode:
                model_b.eval()
                total_val_loss = 0.0
                with torch.no_grad():
                    for X_val, y_val, _, _ in valid_data_loader:
                        if model_cls == MLP:
                            X_val = X_val.float().to(device).view(X_val.size(0), -1)
                            y_val = y_val.float().to(device).view(y_val.size(0), -1)
                        elif model_cls == DLinear:
                            X_val = X_val.float().to(device)
                            y_val = y_val.float().to(device)
                        
                        preds_val = model_b(X_val)
                        val_loss = criterion(preds_val, y_val)
                        total_val_loss += val_loss.item()
                
                avg_val_loss = total_val_loss / len(valid_data_loader)
                
                print(f"Model Num {i} ({model_name}), Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}, Valid Loss: {avg_val_loss:.4f}")

                if avg_val_loss < best_val_loss:
                    best_val_loss = avg_val_loss
                    patience_counter = 0
                    torch.save(model_b.state_dict(), model_save_path)
                else:
                    patience_counter += 1
                
                if patience_counter >= patience:
                    print(f"Early stopping for model {i} ({model_name}) at epoch {epoch+1}!")
                    break
            else:
                print(f"Model Num {i} ({model_name}), Epoch {epoch+1}, Train Loss: {avg_train_loss:.4f}")
        
        # 학습이 끝난 후 모델을 불러오거나 저장
        if valid_mode:
            # 조기 종료된 경우, 파일이 존재하지 않을 수 있으므로 확인 후 로드
            if os.path.exists(model_save_path):
                print(f"Loading best model for {model_name}_{i} with validation loss: {best_val_loss:.4f}")
                model_b.load_state_dict(torch.load(model_save_path))
            else:
                print(f"Warning: No model was saved for {model_name}_{i} as validation loss never improved.")
        else:
            print(f"Saving final model for {model_name}_{i} after {EPOCHS} epochs.")
            torch.save(model_b.state_dict(), model_save_path)
        
        models.append(model_b)
        indices_ls.append(indices_b)
        print("-" * 60)
        
    return models, indices_ls

In [9]:
models, indices_bootstrap = train(
    model_cls=MLP,
    data_loader=bootstrap_loaders,
    valid_data_loader=valid_loader,
    EPOCHS = 20,
    valid_mode=True
)

Model Num 0 (MLP), Epoch 1, Train Loss: 0.9673, Valid Loss: 0.8124
Model Num 0 (MLP), Epoch 2, Train Loss: 1.0330, Valid Loss: 0.8149
Model Num 0 (MLP), Epoch 3, Train Loss: 0.9334, Valid Loss: 0.8272
Model Num 0 (MLP), Epoch 4, Train Loss: 1.1150, Valid Loss: 0.8147
Model Num 0 (MLP), Epoch 5, Train Loss: 1.5330, Valid Loss: 0.8113
Model Num 0 (MLP), Epoch 6, Train Loss: 0.9653, Valid Loss: 0.8127
Model Num 0 (MLP), Epoch 7, Train Loss: 0.8792, Valid Loss: 0.8135
Model Num 0 (MLP), Epoch 8, Train Loss: 0.8545, Valid Loss: 0.8114
Model Num 0 (MLP), Epoch 9, Train Loss: 0.8520, Valid Loss: 0.8099
Model Num 0 (MLP), Epoch 10, Train Loss: 0.8589, Valid Loss: 0.8127
Model Num 0 (MLP), Epoch 11, Train Loss: 0.8355, Valid Loss: 0.8126
Model Num 0 (MLP), Epoch 12, Train Loss: 0.8298, Valid Loss: 0.8122
Model Num 0 (MLP), Epoch 13, Train Loss: 0.8197, Valid Loss: 0.8126
Model Num 0 (MLP), Epoch 14, Train Loss: 0.8433, Valid Loss: 0.8148
Model Num 0 (MLP), Epoch 15, Train Loss: 0.8573, Valid Lo

In [10]:
dlinear_models, dlinear_indices_bootstrap = train(
    model_cls=DLinear,
    data_loader=bootstrap_loaders,
    valid_data_loader=valid_loader,
    EPOCHS = 20,
    valid_mode=True
)

Model Num 0 (DLinear), Epoch 1, Train Loss: 0.9875, Valid Loss: 0.8160
Model Num 0 (DLinear), Epoch 2, Train Loss: 0.9649, Valid Loss: 0.8204
Model Num 0 (DLinear), Epoch 3, Train Loss: 0.9636, Valid Loss: 0.8239
Model Num 0 (DLinear), Epoch 4, Train Loss: 0.9627, Valid Loss: 0.8234
Model Num 0 (DLinear), Epoch 5, Train Loss: 0.9660, Valid Loss: 0.8253
Model Num 0 (DLinear), Epoch 6, Train Loss: 0.9626, Valid Loss: 0.8251
Model Num 0 (DLinear), Epoch 7, Train Loss: 0.9632, Valid Loss: 0.8259
Model Num 0 (DLinear), Epoch 8, Train Loss: 0.9640, Valid Loss: 0.8270
Model Num 0 (DLinear), Epoch 9, Train Loss: 0.9634, Valid Loss: 0.8254
Model Num 0 (DLinear), Epoch 10, Train Loss: 0.9630, Valid Loss: 0.8266
Model Num 0 (DLinear), Epoch 11, Train Loss: 0.9638, Valid Loss: 0.8258
Early stopping for model 0 (DLinear) at epoch 11!
Loading best model for DLinear_0 with validation loss: 0.8160
------------------------------------------------------------
Model Num 1 (DLinear), Epoch 1, Train Loss: 

In [11]:
# import torch
# from tqdm import tqdm

# def compute_train_residuals_variable_wise(model_type, train_dataset, bootstrap_models, bootstrap_index_sets, device='cuda'):
#     """
#     훈련 데이터셋에 대한 out-of-bag 잔차(residuals)를 계산합니다.

#     Args:
#         model_type (str): 모델의 종류 ('MLP' 또는 'DLinear').
#         train_dataset (Dataset): 훈련 데이터셋. inverse_transform 메소드를 포함해야 합니다.
#         bootstrap_models (list): 부트스트랩으로 훈련된 모델들의 리스트.
#         bootstrap_index_sets (list): 각 부트스트랩 모델이 학습에 사용한 데이터 인덱스 set의 리스트.
#         device (str): 연산에 사용할 디바이스 ('cuda' 또는 'cpu').

#     Returns:
#         tuple: (전체 예측값 텐서, 전체 실제값 텐서, 잔차 텐서)
#                (torch.Tensor, torch.Tensor, torch.Tensor)
#     """
#     T = len(train_dataset)
#     if T == 0:
#         # 데이터셋이 비어있는 경우 처리
#         return torch.empty(0), torch.empty(0), torch.empty(0)

#     # shape 정보를 명확하게 얻기 위해 샘플 하나를 사용
#     sample_x, sample_y = train_dataset[0][:2]
#     pred_len, num_features = torch.tensor(sample_y).shape

#     # 1. 오타 수정: train_set -> train_dataset
#     # 2. shape 정보를 변수로 만들어 가독성 향상
#     residuals = torch.zeros(T, pred_len, num_features).to(device)
#     sample_indices = np.random.choice(T+1, num_samples, replace=False)
#     whole_pred_inv = []
#     whole_true_inv = []
    
#     for i in tqdm(range(T), desc='Computing Residuals'):
        
#         if i in sample_indices:
#             seq_x, seq_y, _, _ = train_dataset[i]
            
#             y_true = torch.tensor(seq_y).float().to(device)  # [pred_len, num_features]

#             # --- 3. 중복 코드 개선: 모델 타입에 따라 입력 형태만 다르게 처리 ---
#             if model_type == 'MLP':
#                 x_input = torch.tensor(seq_x).float().flatten().unsqueeze(0).to(device)  # [1, D]
#             elif model_type == 'DLinear':
#                 x_input = torch.tensor(seq_x).float().unsqueeze(0).to(device)  # [1, seq_len, num_features]
#             else:
#                 raise ValueError(f"지원하지 않는 모델 타입입니다: {model_type}")
#             # -----------------------------------------------------------------

#             preds = []
#             for b, model in enumerate(bootstrap_models):
#                 if i not in bootstrap_index_sets[b]:
#                     model.eval()
#                     with torch.no_grad():
#                         y_pred = model(x_input).squeeze(0)
#                         if model_type == 'MLP' and len(y_pred.shape) == 1:
#                             y_pred = y_pred.view_as(y_true)
#                         preds.append(y_pred)

#             if len(preds) == 0:

#                 residuals[i] = torch.full((pred_len, num_features), float('nan')).to(device)
#             else:
#                 pred_mean = torch.stack(preds).mean(dim=0)  # [pred_len, num_features]

#                 y_true_np = y_true.cpu().numpy()
#                 pred_np = pred_mean.cpu().numpy()

#                 y_true_inv = torch.tensor(train_dataset.inverse_transform(y_true_np)).to(device)
#                 pred_inv = torch.tensor(train_dataset.inverse_transform(pred_np)).to(device)

#                 # 절대 오차(잔차) 계산
#                 residuals[i] = torch.abs(y_true_inv - pred_inv)  # [pred_len, num_features]
                
#                 whole_pred_inv.append(pred_inv)
#                 whole_true_inv.append(y_true_inv)

#             stacked_preds = torch.stack(whole_pred_inv)
#             stacked_trues = torch.stack(whole_true_inv)
            
#             # squared_error = (stacked_preds - stacked_trues)**2
#             # mse = squared_error.mean() 
#             # print(f'{model_type} Test Loss: {mse:.4f}')
            
#         if not whole_pred_inv:
#             empty_tensor = torch.empty(0, pred_len, num_features).to(device)
#             return empty_tensor, empty_tensor, residuals
        
#         else:
#             continue
        
#     # return torch.stack(whole_pred_inv), torch.stack(whole_true_inv), residuals
#     return stacked_preds, stacked_trues, residuals

In [12]:
import torch
import numpy as np
from tqdm import tqdm

def compute_train_residuals_variable_wise(model_type, train_dataset, bootstrap_models, bootstrap_index_sets, num_samples=252, device='cuda'):
    """
    훈련 데이터셋에 대한 out-of-bag 잔차(residuals)를 올바르게 계산합니다.

    Args:
        model_type (str): 모델의 종류 ('MLP' 또는 'DLinear').
        train_dataset (Dataset): 훈련 데이터셋. inverse_transform 메소드를 포함해야 합니다.
        bootstrap_models (list): 부트스트랩으로 훈련된 모델들의 리스트.
        bootstrap_index_sets (list): 각 부트스트랩 모델이 학습에 사용한 데이터 인덱스 set의 리스트.
        num_samples (int): 잔차를 계산할 훈련 데이터 샘플의 수.
        device (str): 연산에 사용할 디바이스 ('cuda' 또는 'cpu').

    Returns:
        tuple: (전체 예측값 텐서, 전체 실제값 텐서, 잔차 텐서)
               (torch.Tensor, torch.Tensor, torch.Tensor)
    """
    T = len(train_dataset)
    if T == 0:
        return torch.empty(0), torch.empty(0), torch.empty(0)

    # shape 정보를 명확하게 얻기 위해 샘플 하나를 사용
    sample_x, sample_y = train_dataset[0][:2]
    # torch.tensor()로 감싸서 shape을 안전하게 추출
    pred_len, num_features = torch.tensor(sample_y).shape

    # [수정 1] 샘플링을 루프 밖에서 한 번만 수행합니다.
    # T보다 num_samples가 클 경우 T로 제한하여 모든 샘플을 사용하도록 합니다.
    actual_num_samples = min(num_samples, T)
    sample_indices = np.random.choice(T, actual_num_samples, replace=False)

    # 처리된 샘플만 저장할 리스트
    whole_pred_inv = []
    whole_true_inv = []
    
    # [수정 2] 잔차 텐서의 크기를 실제 샘플 수에 맞게 조정하고, 모든 값을 NaN으로 초기화
    # 이렇게 하면 OOB 모델이 없는 경우를 자연스럽게 처리할 수 있습니다.
    residuals_list = []

    # [수정 3] tqdm의 total을 실제 샘플 수로 설정하여 진행률을 정확하게 표시합니다.
    for i in tqdm(sample_indices, desc='Computing OOB Residuals'):
        seq_x, seq_y, _, _ = train_dataset[i]
        
        y_true = torch.tensor(seq_y, dtype=torch.float32).to(device)  # [pred_len, num_features]

        # 모델 타입에 따라 입력 형태 처리
        if model_type == 'MLP':
            x_input = torch.tensor(seq_x, dtype=torch.float32).flatten().unsqueeze(0).to(device)
        elif model_type == 'DLinear':
            x_input = torch.tensor(seq_x, dtype=torch.float32).unsqueeze(0).to(device)
        else:
            raise ValueError(f"지원하지 않는 모델 타입입니다: {model_type}")

        oob_preds = []
        for b, model in enumerate(bootstrap_models):
            # 이 샘플(i)을 훈련에 사용하지 않은 모델(OOB model)을 찾습니다.
            if i not in bootstrap_index_sets[b]:
                model.eval()
                with torch.no_grad():
                    y_pred = model(x_input).squeeze(0)
                    # MLP 출력이 1D 벡터일 경우 올바른 shape으로 변환
                    if model_type == 'MLP' and len(y_pred.shape) == 1:
                        y_pred = y_pred.view_as(y_true)
                    oob_preds.append(y_pred)

        # OOB 예측을 수행한 모델이 하나라도 있는 경우에만 잔차를 계산합니다.
        if len(oob_preds) > 0:
            pred_mean = torch.stack(oob_preds).mean(dim=0)  # [pred_len, num_features]

            # inverse_transform을 위해 numpy로 변환
            y_true_np = y_true.cpu().numpy()
            pred_np = pred_mean.cpu().numpy()

            # 스케일링 복원
            y_true_inv = torch.tensor(train_dataset.inverse_transform(y_true_np), dtype=torch.float32).to(device)
            pred_inv = torch.tensor(train_dataset.inverse_transform(pred_np), dtype=torch.float32).to(device)

            # 절대 오차(잔차) 계산 및 리스트에 추가
            residual_val = torch.abs(y_true_inv - pred_inv)
            residuals_list.append(residual_val)
            
            # 전체 예측값과 실제값 저장
            whole_pred_inv.append(pred_inv)
            whole_true_inv.append(y_true_inv)

    # [수정 4] 루프가 모두 끝난 후, 수집된 리스트를 텐서로 변환합니다.
    # 리스트가 비어있을 경우 (처리된 샘플이 하나도 없을 경우) 빈 텐서를 반환합니다.
    if not whole_pred_inv:
        empty_tensor = torch.empty(0, pred_len, num_features).to(device)
        return empty_tensor, empty_tensor, empty_tensor
        
    stacked_preds = torch.stack(whole_pred_inv)
    stacked_trues = torch.stack(whole_true_inv)
    stacked_residuals = torch.stack(residuals_list)
    
    # MSE 계산 (필요시 주석 해제)
    # mse = (stacked_preds - stacked_trues).pow(2).mean()
    # print(f'{model_type} OOB MSE: {mse:.4f}')

    return stacked_preds, stacked_trues, stacked_residuals

In [13]:
var_train_preds, var_train_trues, var_train_residuals = compute_train_residuals_variable_wise('MLP', train_set, models, indices_bootstrap)
dlinear_train_preds, dlinear_train_trues, dlinear_train_residuals = compute_train_residuals_variable_wise('DLinear', train_set, dlinear_models, dlinear_indices_bootstrap)

Computing OOB Residuals: 100%|██████████| 252/252 [00:01<00:00, 193.72it/s]
Computing OOB Residuals: 100%|██████████| 252/252 [00:01<00:00, 178.45it/s]


In [14]:
def mse(trues, preds):
    squared_error = (trues-preds)**2
    
    return squared_error.mean()

mlp_train_mse = mse(var_train_preds , var_train_trues)
dlinear_train_mse = mse(dlinear_train_preds, dlinear_train_trues)

print(mlp_train_mse)
print(dlinear_train_mse)

tensor(0.0004, device='cuda:0')
tensor(0.0004, device='cuda:0')


In [15]:
import torch
from tqdm import tqdm

def compute_interval(
    model_type,
    test_dataset,
    train_dataset,  # inverse_transform을 위해 필요
    train_residuals,
    bootstrap_models,
    alpha=0.1,
    device='cuda'
):
    """
    테스트 데이터셋에 대한 예측 구간을 온라인 업데이트 방식으로 계산합니다.

    Args:
        model_type (str): 모델의 종류 ('MLP' 또는 'DLinear').
        test_dataset (Dataset): 테스트 데이터셋.
        train_dataset (Dataset): 스케일러의 inverse_transform을 위해 필요한 훈련 데이터셋.
        train_residuals (torch.Tensor): 사전에 계산된 훈련 데이터의 잔차 풀.
        bootstrap_models (list): 앙상블을 위한 부트스트랩 모델 리스트.
        alpha (float): 유의 수준 (e.g., 0.1 for 90% prediction interval).
        device (str): 연산에 사용할 디바이스.

    Returns:
        tuple: (상단 구간, 하단 구간, 예측값, 실제값) 텐서들.
    """
    upper_intervals = []
    lower_intervals = []
    test_preds = []
    test_trues = []
    T = len(test_dataset)

    # train_preds 인수는 사용되지 않아 제거했습니다.
    # f_t 변수 또한 사용되지 않아 관련 코드를 제거했습니다.

    for i in tqdm(range(T), desc='Computing Intervals'):
        seq_x, seq_y, _, _ = test_dataset[i]
        y_true = torch.tensor(seq_y).float().to(device)  # [pred_len, num_features]

        # 1. 모델 타입에 따라 입력 데이터 형태를 다르게 설정
        if model_type == 'MLP':
            x_input = torch.tensor(seq_x).float().flatten().unsqueeze(0).to(device)  # [1, D]
        elif model_type == 'DLinear':
            x_input = torch.tensor(seq_x).float().unsqueeze(0).to(device)  # [1, seq_len, num_features]
        else:
            raise ValueError(f"지원하지 않는 모델 타입입니다: {model_type}")

        # 모든 부트스트랩 모델을 사용해 예측 (앙상블)
        preds = []
        for model in bootstrap_models:
            model.eval()
            with torch.no_grad():
                y_pred = model(x_input).squeeze(0)
                # 2. MLP 모델의 출력이 1D일 경우, y_true와 같은 2D 형태로 변환
                if model_type == 'MLP' and len(y_pred.shape) == 1:
                    y_pred = y_pred.view_as(y_true)
                preds.append(y_pred)
                
        pred_mean = torch.stack(preds).mean(dim=0)  # [pred_len, num_features]


        y_true_np = y_true.cpu().numpy()
        pred_np = pred_mean.cpu().numpy()
        y_true_inv = torch.tensor(train_dataset.inverse_transform(y_true_np)).to(device)
        pred_inv = torch.tensor(train_dataset.inverse_transform(pred_np)).to(device)

        w_t = torch.quantile(train_residuals, q=(1 - alpha), dim=0)
        
        upper_interval_t = pred_inv + w_t
        lower_interval_t = pred_inv - w_t
        

        current_test_residual = torch.abs(y_true_inv - pred_inv)


        train_residuals = torch.roll(train_residuals, shifts=-1, dims=0)
        train_residuals[-1] = current_test_residual

        lower_intervals.append(lower_interval_t)
        upper_intervals.append(upper_interval_t)
        test_preds.append(pred_inv)
        test_trues.append(y_true_inv)

    return (
        torch.stack(upper_intervals),
        torch.stack(lower_intervals),
        torch.stack(test_preds),
        torch.stack(test_trues)
    )

In [16]:
mlp_upper_intervals, mlp_lower_intervals, mlp_test_preds, mlp_test_true = compute_interval(
    model_type='MLP',
    test_dataset=test_set,
    train_dataset=train_set,
    train_residuals=var_train_residuals, 
    bootstrap_models=models,              
    alpha=0.05,                          
    device='cuda'
)

# DLinear 모델에 대한 예측 구간 계산
dlinear_upper_intervals, dlinear_lower_intervals, dlinear_test_preds, dlinear_test_true = compute_interval(
    model_type='DLinear',
    test_dataset=test_set,
    train_dataset=train_set,
    train_residuals=var_train_residuals,  
    bootstrap_models=dlinear_models,      
    alpha=0.05,                           
    device='cuda'
)

Computing Intervals: 100%|██████████| 1756/1756 [00:17<00:00, 99.11it/s] 
Computing Intervals: 100%|██████████| 1756/1756 [00:26<00:00, 66.86it/s]


In [17]:
mlp_test_mse = mse(mlp_test_preds, mlp_test_true)
dlinear_test_mse = mse(dlinear_test_preds, dlinear_test_true)

print(mlp_test_mse)
print(dlinear_test_mse)

tensor(0.0003, device='cuda:0')
tensor(0.0003, device='cuda:0')


In [18]:
def compute_coverage_widths(lower_interval, upper_interval, trues):
    
    widths = torch.abs(upper_interval.mean(dim=1) - lower_interval.mean(dim=1))
    
    covered_lower = lower_interval <= trues
    covered_upper = trues <= upper_interval
    is_covered = covered_lower & covered_upper
    num_covered = is_covered.sum().item()
    total_samples = trues.numel()
    coverage = num_covered / total_samples
    
    return coverage, widths

mlp_coverage, mlp_widths = compute_coverage_widths(mlp_lower_intervals, mlp_upper_intervals, mlp_test_true)
dlinear_coverage, dlinear_widths = compute_coverage_widths(dlinear_lower_intervals, dlinear_upper_intervals, dlinear_test_true)


print(f'MLP Coverage: {mlp_coverage:.4f}')
print(f'DLinear Coverage: {dlinear_coverage:.4f}')

MLP Coverage: 0.9438
DLinear Coverage: 0.9438


In [19]:
# def make_files(assets, model_type, seq_len, pred_len, preds, trues, widths, timestamp, save_paths = './result_enbpi'):
#     file_name = f"{assets}_{model_type}_{seq_len}_{pred_len}.pt"
#     save_path = os.path.join(save_paths, file_name)
    
#     torch.save({
#     'preds': preds,
#     'trues': trues,
#     'widths': widths,
#     'Date':timestamp
# }, save_path)

In [20]:
import os
import pickle

def make_files(assets, model_type, seq_len, pred_len, preds, trues, widths, upper, lower, timestamp, save_paths='./result_enbpi'):
    file_name = f"{assets}_{model_type}_{seq_len}_{pred_len}.pkl"
    save_path = os.path.join(save_paths, file_name)

    os.makedirs(save_paths, exist_ok=True)  # 경로 없으면 생성

    with open(save_path, 'wb') as f:
        pickle.dump({
            'preds': preds,       # torch.Tensor
            'trues': trues,       # torch.Tensor
            'widths': widths,     # torch.Tensor
            'Date': timestamp,
            'Upper': upper,
            'Lower':lower
        }, f)

    print(f"Saved to {save_path}")


In [21]:
make_files(assets='SNP', model_type='MLP', seq_len=train_set[0][0].shape[0], 
           pred_len=train_set[0][1].shape[0], preds=mlp_test_preds, trues=mlp_test_true, widths=mlp_widths, 
           upper=mlp_upper_intervals,lower=mlp_lower_intervals, timestamp=timestamps)

Saved to ./result_enbpi\SNP_MLP_60_5.pkl


In [22]:
make_files(assets='SNP', model_type='DLinear', seq_len=train_set[0][0].shape[0], pred_len=train_set[0][1].shape[0], preds=dlinear_test_preds, 
           trues=dlinear_test_true, widths=dlinear_widths,  upper=dlinear_upper_intervals,lower=dlinear_lower_intervals,
           timestamp=timestamps)

Saved to ./result_enbpi\SNP_DLinear_60_5.pkl
