In [1]:
""" validation/test set을 real 데이터로 사용 """
import os, joblib
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

from utils import set_seed

# Switch to the project directory so the relative './Output/...' paths used
# by later cells resolve against it.
path = '/workspace/COSCI-GAN_Journal'
try:
    os.chdir(path)
    print("Current working directory: {0}".format(os.getcwd()))
except FileNotFoundError:
    # Only reports the problem; execution continues in the current directory.
    print("Directory {0} does not exist".format(path))
    
# Set the visible-device list before the CUDA availability query below.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# set_seed comes from the project's utils module (definition not shown here);
# presumably it seeds the RNGs used by this notebook - TODO confirm.
set_seed(42)
# Apply seaborn's "darkgrid" style to plots.
sns.set(style="darkgrid")

  from .autonotebook import tqdm as notebook_tqdm


Current working directory: /workspace/COSCI-GAN_Journal


In [2]:
# Use the 126-step time series to predict whether the next step rises or falls.
# NOTE(review): the code below trains a regressor on next-step values with an
# L1 loss rather than an up/down classifier - confirm which description is intended.
input_size = 5  # number of features
output_size = 5
hidden_size = 20
num_layers = 2
lr = 0.001
dropout = 0.0  # NOTE(review): not passed to any model constructor in the visible cells
num_repeats = 10  # number of repetitions


# Model training
num_epochs = 100
batch_size = 128

class Regressor(nn.Module):
    """GRU-based sequence regressor that emits one prediction per timestep.

    The GRU output at every timestep is projected by a linear layer and then
    squashed by a sigmoid.

    Args:
        input_size: Number of features in each input timestep.
        hidden_size: Width of the GRU hidden state.
        num_layers: Number of stacked GRU layers.
        output_size: Number of values predicted per timestep.
        dropout: Dropout probability forwarded to ``nn.GRU``, which applies it
            between stacked layers. Defaults to 0.0 (no dropout). PyTorch
            warns when this is non-zero and ``num_layers`` is 1.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.0):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs/outputs are laid out as (batch, seq_len, features).
        self.gru = nn.GRU(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=dropout,
        )
        self.fc = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return per-timestep predictions for ``x``.

        Args:
            x: Tensor of shape (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, seq_len, output_size) after the sigmoid.
        """
        out, _ = self.gru(x)
        out = self.fc(out)  # predict for every timestep
        out = self.sigmoid(out)
        return out

In [3]:
# Load the real and generated (COSCI-GAN, TransGAN) series saved under ./Output.
# NOTE: joblib.load unpickles the file, so only load artifacts you trust.
real = joblib.load('./Output/real_list.pkl')
fake = joblib.load('./Output/fake_list.pkl')

for series_list in (real, fake):
    print(len(series_list), series_list[0].shape)

# Stack the list entries into one array and move the leading (list) axis to
# the end, so each entry becomes one feature column.
real_arr = np.array(real).transpose(1, 2, 0)
fake_arr = np.array(fake).transpose(1, 2, 0)

# Min-max scaling: the scaler is fitted on the real data only and then reused
# unchanged for the generated data.
scaler = MinMaxScaler()
real_shape, fake_shape = real_arr.shape, fake_arr.shape
real_arr = scaler.fit_transform(real_arr.reshape(-1, real_shape[-1])).reshape(real_shape)
fake_arr = scaler.transform(fake_arr.reshape(-1, fake_shape[-1])).reshape(fake_shape)

print(real_arr.shape, fake_arr.shape)

5 (4904, 127)
5 (5000, 127)
(4904, 127, 5) (5000, 127, 5)


In [4]:
# Data preparation for training and testing
def prepare_data(data):
    """Split sequences into next-step (input, target) pairs.

    Args:
        data: 3-D array indexed as ``data[sample, timestep, feature]``.

    Returns:
        Tuple ``(X, y)`` where ``X`` holds every timestep except the last and
        ``y`` holds every timestep except the first, so ``y[:, t]`` is the
        step that follows ``X[:, t]``.
    """
    inputs = data[:, :-1, :]   # drop the final timestep
    targets = data[:, 1:, :]   # drop the first timestep (full-sequence targets)
    return inputs, targets

# Weekly data preparation
def prepare_weekly_data(data, interval=5):
    """Average consecutive, non-overlapping groups of ``interval`` timesteps.

    Args:
        data: 3-D array indexed as ``data[sample, timestep, feature]``.
        interval: Number of timesteps averaged into one output step.

    Returns:
        Array of shape ``(samples, timesteps // interval, features)``.
        Trailing timesteps that do not fill a complete group are ignored.
    """
    n_samples, n_steps, n_feats = data.shape
    n_weeks = n_steps // interval
    weekly_data = np.zeros((n_samples, n_weeks, n_feats))

    # Walk over the start offset of each complete group.
    for week, start in enumerate(range(0, n_weeks * interval, interval)):
        window = data[:, start:start + interval, :]
        weekly_data[:, week, :] = window.mean(axis=1)

    return weekly_data

# Training function
def train_model(model, train_loader, criterion, optimizer, num_epochs, device, verbose=False):
    """Train ``model`` in place and return the mean training loss of each epoch.

    Args:
        model: ``nn.Module`` to optimise; it is switched to training mode.
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss callable applied as ``criterion(model(inputs), targets)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        num_epochs: Number of passes over ``train_loader``.
        device: Device every batch is moved to before the forward pass.
        verbose: When True, print the mean loss after each epoch.

    Returns:
        list[float]: Mean batch loss per epoch. An epoch in which the loader
        yields no batches is recorded as ``nan``.
    """
    model.train()
    loss_history = []
    for epoch in range(num_epochs):
        running_loss = 0.0
        num_batches = 0
        for x, y in train_loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            outputs = model(x)
            loss = criterion(outputs, y)
            loss.backward()
            # Gradient clipping: cap the total gradient norm at 1.0 before the update.
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            running_loss += loss.item()
            num_batches += 1
        # Batches are counted rather than taken from len(train_loader) so an
        # empty loader cannot trigger a division by zero.
        epoch_train_loss = running_loss / num_batches if num_batches else float("nan")
        loss_history.append(epoch_train_loss)
        if verbose:
            print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_train_loss:.4f}")
    return loss_history

# Evaluation function
def evaluate_model(model, test_loader, device):
    """Run ``model`` over ``test_loader``, print MSE/MAE/R^2 and return the MAE.

    Predictions and targets from all batches are concatenated and flattened
    to two dimensions (keeping the last axis) before the metrics are computed.

    Args:
        model: Module to evaluate; it is switched to eval mode.
        test_loader: Iterable yielding ``(inputs, targets)`` batches.
        device: Device each batch is moved to for the forward pass.

    Returns:
        The mean absolute error over all flattened predictions.
    """
    model.eval()
    pred_batches, target_batches = [], []
    with torch.no_grad():
        for x_test, y_test in test_loader:
            x_test, y_test = x_test.to(device), y_test.to(device)
            pred_batches.append(model(x_test).cpu())
            target_batches.append(y_test.cpu())

    # Merge the batches, then collapse every leading axis into one row axis.
    merged_preds = torch.cat(pred_batches)
    flat_preds = merged_preds.view(-1, pred_batches[0].shape[-1])
    merged_targets = torch.cat(target_batches)
    flat_targets = merged_targets.view(-1, target_batches[0].shape[-1])

    mse = mean_squared_error(flat_targets, flat_preds)
    mae = mean_absolute_error(flat_targets, flat_preds)
    r2 = r2_score(flat_targets, flat_preds)

    for report_line in (
        f"Mean Squared Error (MSE): {mse:.4f}",
        f"Mean Absolute Error (MAE): {mae:.4f}",
        f"R^2 Score: {r2:.4f}",
    ):
        print(report_line)

    return mae

In [5]:

# Daily resolution: train on the synthetic series, evaluate on the real series.

# Synthetic data -> shuffled training loader.
X_fake, y_fake = (
    torch.tensor(part, dtype=torch.float32) for part in prepare_data(fake_arr)
)
train_dataset_fake = TensorDataset(X_fake, y_fake)
train_loader_fake = DataLoader(train_dataset_fake, batch_size=batch_size, shuffle=True)

# Real data -> fixed-order test loader.
X_ori, y_ori = (
    torch.tensor(part, dtype=torch.float32) for part in prepare_data(real_arr)
)
test_dataset_ori = TensorDataset(X_ori, y_ori)
test_loader_ori = DataLoader(test_dataset_ori, batch_size=batch_size, shuffle=False)

# Repeat the train/evaluate cycle and collect one MAE per repetition.
mae_scores = []

for i in range(num_repeats):
    print(f"Repeat {i+1}/{num_repeats}")

    # Each repetition starts from a freshly initialised model and optimizer.
    model = Regressor(input_size, hidden_size, num_layers, output_size).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.L1Loss()
    train_model(model, train_loader_fake, criterion, optimizer, num_epochs, device)

    # Score the trained model on the real data.
    mae = evaluate_model(model, test_loader_ori, device)
    mae_scores.append(mae)

# Summarise the repetitions: mean and standard deviation of the MAE.
print()
mae_mean, mae_std = np.mean(mae_scores), np.std(mae_scores)
print(f"MAE Mean: {mae_mean:.4f}, MAE Std: {mae_std:.4f}")

Repeat 1/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0340
R^2 Score: -0.0589
Repeat 2/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0340
R^2 Score: -0.0617
Repeat 3/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0338
R^2 Score: -0.0579
Repeat 4/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0340
R^2 Score: -0.0638
Repeat 5/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0342
R^2 Score: -0.0746
Repeat 6/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0341
R^2 Score: -0.0617
Repeat 7/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0340
R^2 Score: -0.0655
Repeat 8/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0339
R^2 Score: -0.0557
Repeat 9/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0339
R^2 Score: -0.0582
Repeat 10/10
Mean Squared Error (MSE): 0.0027
Mean Absolute Error (MAE): 0.0339
R^2 Score: -0.0638
MAE Mean: 0.0340, M

In [6]:
# Weekly resolution: skip the first two timesteps, then average the remaining
# steps in groups of five (the default interval of prepare_weekly_data).
real_weekly = prepare_weekly_data(real_arr[:, 2:, :])
fake_weekly = prepare_weekly_data(fake_arr[:, 2:, :])

# Synthetic weekly series -> shuffled training loader.
fake_inputs, fake_targets = prepare_data(fake_weekly)
X_fake = torch.tensor(fake_inputs, dtype=torch.float32)
y_fake = torch.tensor(fake_targets, dtype=torch.float32)
train_dataset_fake = TensorDataset(X_fake, y_fake)
train_loader_fake = DataLoader(train_dataset_fake, batch_size=batch_size, shuffle=True)

# Real weekly series -> fixed-order test loader.
real_inputs, real_targets = prepare_data(real_weekly)
X_ori = torch.tensor(real_inputs, dtype=torch.float32)
y_ori = torch.tensor(real_targets, dtype=torch.float32)
test_dataset_ori = TensorDataset(X_ori, y_ori)
test_loader_ori = DataLoader(test_dataset_ori, batch_size=batch_size, shuffle=False)

# Repeat the train/evaluate cycle and collect one MAE per repetition.
mae_scores = []

for i in range(num_repeats):
    print(f"Repeat {i+1}/{num_repeats}")

    # Each repetition starts from a freshly initialised model and optimizer.
    model = Regressor(input_size, hidden_size, num_layers, output_size).to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.L1Loss()
    train_model(model, train_loader_fake, criterion, optimizer, num_epochs, device)

    # Score the trained model on the real weekly data.
    mae = evaluate_model(model, test_loader_ori, device)
    mae_scores.append(mae)

# Summarise the repetitions: mean and standard deviation of the MAE.
print()
mae_mean, mae_std = np.mean(mae_scores), np.std(mae_scores)
print(f"MAE Mean: {mae_mean:.4f}, MAE Std: {mae_std:.4f}")


Repeat 1/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0147
R^2 Score: -0.0214
Repeat 2/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0148
R^2 Score: -0.0341
Repeat 3/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0148
R^2 Score: -0.0208
Repeat 4/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0147
R^2 Score: -0.0223
Repeat 5/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0147
R^2 Score: -0.0246
Repeat 6/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0148
R^2 Score: -0.0207
Repeat 7/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0148
R^2 Score: -0.0242
Repeat 8/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0147
R^2 Score: -0.0156
Repeat 9/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0148
R^2 Score: -0.0179
Repeat 10/10
Mean Squared Error (MSE): 0.0005
Mean Absolute Error (MAE): 0.0148
R^2 Score: -0.0271
MAE Mean: 0.0148, M

In [7]:
# # Plot the predicted values against the true values
# plt.figure(figsize=(15, 5))
# plt.plot(targets[:100, 0].cpu(), label='True Values', marker='o', linestyle='dashed')
# plt.plot(predictions[:100, 0].cpu(), label='Predicted Values', marker='x')
# plt.xlabel('Sample Index')
# plt.ylabel('Value')
# plt.title('True vs Predicted Values')
# plt.legend()
# plt.show()
