In [2]:
import os
import math
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm

In [3]:
data_path = '../../data/combined_dataset-1.xlsx'
df = pd.read_excel(data_path)
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 657 entries, 0 to 656
Columns: 2114 entries, Num. to 2100
dtypes: float64(1), int64(2108), object(5)
memory usage: 10.6+ MB


In [4]:
df.head(10)

Unnamed: 0,Num.,subject_ID,Sex(M/F),Age(year),Height(cm),Weight(kg),Systolic Blood Pressure(mmHg),Diastolic Blood Pressure(mmHg),Heart Rate(b/m),BMI(kg/m^2),...,2091,2092,2093,2094,2095,2096,2097,2098,2099,2100
0,1,2,Female,45,152,63,161,89,97,27.268006,...,1766,1766,1766,1833,1833,1827,1827,1827,1754,1754
1,1,2,Female,45,152,63,161,89,97,27.268006,...,1985,1985,2026,2026,2026,1977,1977,1997,1997,1997
2,1,2,Female,45,152,63,161,89,97,27.268006,...,1942,1900,1900,1938,1938,1938,1924,1924,1929,1929
3,2,3,Female,50,157,50,160,93,76,20.284799,...,2073,2072,2072,2072,2051,2051,2036,2036,2036,2045
4,2,3,Female,50,157,50,160,93,76,20.284799,...,2021,2010,2010,2010,2001,2001,2003,2003,2003,1989
5,2,3,Female,50,157,50,160,93,76,20.284799,...,2020,2020,2032,2032,2032,2011,2011,2005,2005,2005
6,3,6,Female,47,150,47,101,71,79,20.888889,...,2047,2047,2017,2017,2017,2053,2053,2038,2038,2038
7,3,6,Female,47,150,47,101,71,79,20.888889,...,2076,2076,2051,2051,2051,2060,2060,2067,2067,2067
8,3,6,Female,47,150,47,101,71,79,20.888889,...,2163,2159,2159,2159,2175,2175,2168,2168,2168,2175
9,4,8,Male,45,172,65,136,93,87,21.971336,...,1985,1985,1985,1984,1984,1995,1995,1995,1972,1972


In [5]:
df.describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Num.,657.0,110.000000,63.267362,1.0,55.0,110.0,165.0,219.0
subject_ID,657.0,156.598174,101.449344,2.0,85.0,152.0,215.0,419.0
Age(year),657.0,57.168950,15.850110,21.0,48.0,58.0,68.0,86.0
Height(cm),657.0,161.228311,8.190357,145.0,155.0,160.0,167.0,196.0
Weight(kg),657.0,60.191781,11.868168,36.0,52.0,60.0,67.0,103.0
...,...,...,...,...,...,...,...,...
2096,657.0,2085.436834,305.845135,1519.0,1904.0,2014.0,2180.0,3811.0
2097,657.0,2083.791476,304.297297,1515.0,1904.0,2012.0,2176.0,3787.0
2098,657.0,2084.803653,306.657540,1515.0,1904.0,2011.0,2175.0,3774.0
2099,657.0,2085.196347,306.275406,1515.0,1906.0,2012.0,2177.0,3775.0


In [6]:
df.shape

(657, 2114)

In [7]:
print (set(df.isnull().sum()))
df = df.fillna(method='ffill')
print (set(df.isnull().sum()))
df = df.fillna(method='bfill')
print (set(df.isnull().sum()))
df = df.fillna(df.mean)

{0, 597, 582, 543}
{0, 75, 72, 174}
{0}


  df = df.fillna(method='ffill')
  df = df.fillna(method='bfill')


In [8]:
meta_end_idx = df.columns.get_loc('cerebrovascular disease') + 1
meta_cols = df.columns[:meta_end_idx]
signal_cols = df.columns[meta_end_idx:]

print(f"metadata col # : {list(meta_cols)}")
print(f"signal data col # : {len(signal_cols)}")

metadata col # : ['Num.', 'subject_ID', 'Sex(M/F)', 'Age(year)', 'Height(cm)', 'Weight(kg)', 'Systolic Blood Pressure(mmHg)', 'Diastolic Blood Pressure(mmHg)', 'Heart Rate(b/m)', 'BMI(kg/m^2)', 'Hypertension', 'Diabetes', 'cerebral infarction', 'cerebrovascular disease']
signal data col # : 2100


In [9]:
target_cols = ['Systolic Blood Pressure(mmHg)', 'Diastolic Blood Pressure(mmHg)']

X_signals = df[signal_cols].values
y_bp = df[target_cols].values

print(f"signal data : {X_signals.shape}")
print(f"pressure data : {y_bp.shape}")

signal data : (657, 2100)
pressure data : (657, 2)


In [10]:
X_data = X_signals  # Shape: [num_samples, signal_length]
y_data = y_bp      # Shape: [num_samples, 2]

print(f"signal data : {X_data.shape}")
print(f"pressure data : {y_data.shape}")

# Split the data into train, validation, and test sets
X_train_val, X_test, y_train_val, y_test = train_test_split(X_data, y_data, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.2, random_state=42)

print(f"train={X_train.shape}, val={X_val.shape}, test={X_test.shape}")

signal data : (657, 2100)
pressure data : (657, 2)
train=(420, 2100), val=(105, 2100), test=(132, 2100)


In [11]:
scaler_X = StandardScaler()
X_train = scaler_X.fit_transform(X_train.reshape(X_train.shape[0], -1)).reshape(X_train.shape)
X_val = scaler_X.transform(X_val.reshape(X_val.shape[0], -1)).reshape(X_val.shape)
X_test = scaler_X.transform(X_test.reshape(X_test.shape[0], -1)).reshape(X_test.shape)

scaler_y = StandardScaler()
y_train = scaler_y.fit_transform(y_train)
y_val = scaler_y.transform(y_val)
y_test = scaler_y.transform(y_test)

In [12]:
import os
import math
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.optim.lr_scheduler import ReduceLROnPlateau
from tqdm import tqdm

class RMSNorm(nn.Module):
    def __init__(self, d_model, eps=1e-5):
        super().__init__()
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(d_model))

    def forward(self, x):
        norm = torch.norm(x, dim=-1, keepdim=True) * (x.shape[-1] ** -0.5)
        return x / (norm + self.eps) * self.weight

class DataEmbedding(nn.Module):
    def __init__(self, d_model, dropout=0.1):
        super().__init__()
        # Use Conv1d to handle the input shape [batch, 1, seq_len]
        self.proj = nn.Conv1d(in_channels=1, out_channels=d_model, kernel_size=1)
        self.norm = nn.BatchNorm1d(d_model)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):
        # x: [batch, 1, seq_len]
        x = self.proj(x)  # [batch, d_model, seq_len]
        x = self.norm(x)
        x = self.dropout(x)
        x = x.transpose(1, 2)  # [batch, seq_len, d_model]
        return x

class SeriesDecomp(nn.Module):
    """Time series decomposition module - separates trend and seasonality"""
    def __init__(self, kernel_size):
        super().__init__()
        self.kernel_size = kernel_size
        # Ensure kernel size is odd for symmetric padding
        if kernel_size % 2 == 0:
            self.kernel_size = kernel_size + 1

        self.avg = nn.AvgPool1d(
            kernel_size=self.kernel_size,
            stride=1,
            padding=self.kernel_size//2
        )

    def forward(self, x):
        # x: [Batch, Length, Channel]
        x_transpose = x.transpose(1, 2)  # [Batch, Channel, Length]

        # Extract trend using average pooling
        trend = self.avg(x_transpose).transpose(1, 2)  # [Batch, Length, Channel]

        # Original - trend = seasonal component
        seasonal = x - trend

        return seasonal, trend

class AutoCorrelation(nn.Module):
    """Auto-correlation mechanism with improved stability"""
    def __init__(self, dim, heads=8, dropout=0.1):
        super().__init__()
        self.dim = dim
        self.heads = heads
        self.head_dim = dim // heads
        self.scale = self.head_dim ** -0.5

        self.q_proj = nn.Linear(dim, dim)
        self.k_proj = nn.Linear(dim, dim)
        self.v_proj = nn.Linear(dim, dim)
        self.out_proj = nn.Linear(dim, dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, queries, keys, values, mask=None):
        # q, k, v: [Batch, Length, Dim]
        batch_size, q_len, _ = queries.shape
        _, k_len, _ = keys.shape

        # Project and reshape for multi-head
        q = self.q_proj(queries).view(batch_size, q_len, self.heads, self.head_dim).transpose(1, 2)
        k = self.k_proj(keys).view(batch_size, k_len, self.heads, self.head_dim).transpose(1, 2)
        v = self.v_proj(values).view(batch_size, k_len, self.heads, self.head_dim).transpose(1, 2)

        # Use a simpler and more stable attention mechanism instead of FFT
        # This avoids potential NaN issues with FFT operations
        attn = torch.matmul(q, k.transpose(-1, -2)) * self.scale

        # Apply masking if provided
        if mask is not None:
            attn = attn.masked_fill(mask == 0, -1e9)

        # Apply softmax to get attention weights
        attn = F.softmax(attn, dim=-1)
        attn = self.dropout(attn)

        # Apply attention to values
        output = torch.matmul(attn, v)

        # Reshape back to original dimensions
        output = output.transpose(1, 2).contiguous().view(batch_size, q_len, self.dim)

        # Project output
        output = self.out_proj(output)

        return output

class AutoformerEncoderLayer(nn.Module):
    """Autoformer Encoder Layer with auto-correlation and decomposition"""
    def __init__(self, d_model, n_heads, d_ff=None, dropout=0.1, activation="relu", decomp_kernel=5):
        super().__init__()
        d_ff = d_ff or 4 * d_model

        # Auto-correlation mechanism
        self.autocorr = AutoCorrelation(d_model, heads=n_heads, dropout=dropout)

        # Series decomposition
        self.decomp1 = SeriesDecomp(decomp_kernel)
        self.decomp2 = SeriesDecomp(decomp_kernel)

        # Feed-forward network
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU() if activation == "relu" else nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
            nn.Dropout(dropout)
        )

    def forward(self, x, mask=None):
        # Auto-correlation Block
        autocorr_output = self.autocorr(x, x, x, mask=mask)

        # First residual connection
        x1 = autocorr_output + x

        # Decomposition - split seasonal & trend
        x_seasonal, x_trend = self.decomp1(x1)

        # Feed-forward Block
        ff_output = self.ff(x_seasonal)

        # Second residual connection
        x2 = ff_output + x_seasonal

        # Decomposition
        out_seasonal, out_trend = self.decomp2(x2)

        # Final output = seasonal + trend
        output = out_seasonal + x_trend

        return output

class AutoformerEncoder(nn.Module):
    """Autoformer encoder with stacked encoder layers"""
    def __init__(self, d_model, n_layers, n_heads, d_ff=None, dropout=0.1, activation="relu", decomp_kernel=5):
        super().__init__()

        self.layers = nn.ModuleList([
            AutoformerEncoderLayer(
                d_model=d_model,
                n_heads=n_heads,
                d_ff=d_ff,
                dropout=dropout,
                activation=activation,
                decomp_kernel=decomp_kernel
            ) for _ in range(n_layers)
        ])

        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, mask=None):
        # x: [Batch, Length, Dim]

        # Apply encoder layers sequentially
        for layer in self.layers:
            x = layer(x, mask=mask)

        # Apply final normalization
        x = self.norm(x)

        return x

class EnhancedAutoformer(nn.Module):
    """Enhanced Autoformer model with CNN layers for improved performance"""
    def __init__(self, d_model=64, n_layers=4, n_heads=8, d_ff=None, dropout=0.1,
                 decomp_kernel=5, activation="relu", cnn_channels=[128, 64]):
        super().__init__()

        # Data embedding
        self.embedding = DataEmbedding(d_model, dropout)

        # Autoformer encoder
        self.encoder = AutoformerEncoder(
            d_model=d_model,
            n_layers=n_layers,
            n_heads=n_heads,
            d_ff=d_ff,
            dropout=dropout,
            activation=activation,
            decomp_kernel=decomp_kernel
        )

        # CNN feature extraction after Autoformer
        self.post_cnn_layers = nn.ModuleList()

        # Current channels is d_model
        in_channels = d_model

        # Add CNN layers
        for out_channels in cnn_channels:
            self.post_cnn_layers.append(
                nn.Sequential(
                    nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1),
                    nn.BatchNorm1d(out_channels),
                    nn.ReLU(),
                    nn.MaxPool1d(kernel_size=2, stride=2)
                )
            )
            in_channels = out_channels

        # Calculate CNN output size (depends on input sequence length)
        self.calc_cnn_output_size = lambda seq_len: seq_len // (2 ** len(cnn_channels))

        # Global pooling for variable length sequences
        self.global_pool = nn.AdaptiveAvgPool1d(1)

        # Output head for blood pressure prediction with additional features
        self.projection = nn.Sequential(
            nn.Linear(cnn_channels[-1] + d_model * 2, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 2)
        )

    def forward(self, x):
        # x shape: [batch, 1, seq_len]
        batch_size, _, seq_len = x.shape

        # Apply embedding
        embedded = self.embedding(x)  # [batch, seq_len, d_model]

        # Apply Autoformer encoder
        encoded = self.encoder(embedded)  # [batch, seq_len, d_model]

        # Autoformer features: mean and max pooling
        mean_pooled = torch.mean(encoded, dim=1)  # [batch, d_model]
        max_pooled, _ = torch.max(encoded, dim=1)  # [batch, d_model]

        # Apply CNN layers for additional feature extraction
        # Need to change dimensions for CNN
        cnn_input = encoded.transpose(1, 2)  # [batch, d_model, seq_len]

        for cnn_layer in self.post_cnn_layers:
            cnn_input = cnn_layer(cnn_input)

        # Global pooling to handle variable sequence lengths
        cnn_features = self.global_pool(cnn_input).squeeze(-1)  # [batch, cnn_channels[-1]]

        # Concatenate all features
        combined_features = torch.cat([mean_pooled, max_pooled, cnn_features], dim=-1)

        # Project to blood pressure values
        bp_pred = self.projection(combined_features)  # [batch, 2]

        return bp_pred

# Dataset class
class BPDataset(Dataset):
    def __init__(self, signals, bp_values):
        self.signals = signals
        self.bp_values = bp_values

    def __len__(self):
        return len(self.signals)

    def __getitem__(self, idx):
        signal = self.signals[idx]
        bp = self.bp_values[idx]

        # Convert to [1, sequence_length] shape
        x = torch.FloatTensor(signal).unsqueeze(0)
        y = torch.FloatTensor(bp)

        return x, y

# Custom loss function combining MSE with correlation loss
class BPLoss(nn.Module):
    def __init__(self, mse_weight=1.0, corr_weight=0.5):
        super().__init__()
        self.mse_weight = mse_weight
        self.corr_weight = corr_weight
        self.mse_loss = nn.MSELoss()

    def forward(self, pred, target):
        # MSE Loss
        mse = self.mse_loss(pred, target)

        # Correlation Loss (encourages predictions to follow the same trend as targets)
        pred_centered = pred - pred.mean(dim=0, keepdim=True)
        target_centered = target - target.mean(dim=0, keepdim=True)

        pred_norm = torch.norm(pred_centered, dim=0)
        target_norm = torch.norm(target_centered, dim=0)

        # Avoid division by zero
        pred_norm = torch.clamp(pred_norm, min=1e-8)
        target_norm = torch.clamp(target_norm, min=1e-8)

        # Calculate correlation for both SBP and DBP
        corr = (pred_centered * target_centered).sum(dim=0) / (pred_norm * target_norm)

        # Convert correlation to loss (1 - mean correlation)
        corr_loss = 1 - corr.mean()

        # Combined loss
        combined_loss = self.mse_weight * mse + self.corr_weight * corr_loss

        return combined_loss

# Training function with gradient clipping
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=50, device='cuda'):
    train_losses = []
    val_losses = []
    best_val_loss = float('inf')

    for epoch in range(num_epochs):
        # Training
        model.train()
        train_loss = 0.0

        for x, y in tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs} [Train]'):
            x = x.to(device)
            y = y.to(device)

            optimizer.zero_grad()

            # Forward pass
            y_pred = model(x)
            loss = criterion(y_pred, y)

            # Backward pass
            loss.backward()

            # Gradient clipping to prevent explosion
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()

            train_loss += loss.item() * x.size(0)

        train_loss /= len(train_loader.dataset)
        train_losses.append(train_loss)

        # Validation
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for x, y in tqdm(val_loader, desc=f'Epoch {epoch+1}/{num_epochs} [Valid]'):
                x = x.to(device)
                y = y.to(device)

                y_pred = model(x)
                loss = criterion(y_pred, y)
                val_loss += loss.item() * x.size(0)

        val_loss /= len(val_loader.dataset)
        val_losses.append(val_loss)

        # Update learning rate
        scheduler.step(val_loss)

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_enhanced_autoformer.pth')
            print(f'Epoch {epoch+1}: model saved (val_loss: {val_loss:.4f})')

    return train_losses, val_losses

In [13]:
train_dataset = BPDataset(X_train, y_train)
val_dataset = BPDataset(X_val, y_val)
test_dataset = BPDataset(X_test, y_test)

batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)

print(f"train={len(train_dataset)}, val={len(val_dataset)}, test={len(test_dataset)}") # <--- data point
print(f"train={len(train_loader)}, val={len(val_loader)}, test={len(test_loader)}") # <-- batch

train=420, val=105, test=132
train=14, val=4, test=5


In [None]:
device = "cpu"
num_epochs = 150

model = EnhancedAutoformer(
    d_model=64,         # 모델 차원
    n_layers=4,         # Autoformer 레이어 수
    n_heads=8,          # 어텐션 헤드 수
    d_ff=256,           # 피드포워드 네트워크 차원
    dropout=0.1,        # 드롭아웃 비율
    decomp_kernel=5,    # 시계열 분해 커널 크기
    activation="relu",  # 활성화 함수
    cnn_channels=[128, 64]  # CNN 레이어의 채널 수
).to(device)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-6)

train_losses, val_losses = train_model(
    model, train_loader, val_loader, criterion, optimizer, scheduler,
    num_epochs=num_epochs, device=device
)

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 4))
plt.plot(range(1, num_epochs+1), train_losses, 'b-', label='Training Loss')
plt.plot(range(1, num_epochs+1), val_losses, 'r-', label='Validation Loss')
plt.axvline(x=best_epoch+1, color='g', linestyle='--', label=f'Best Epoch ({best_epoch+1})')
plt.grid(True)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss Curves')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error

def evaluate_model(model, test_loader, device, scaler_y):
    model.eval()
    y_true_list = []
    y_pred_list = []

    with torch.no_grad():
        for x, y in test_loader:
            x = x.to(device)
            y = y.to(device)

            y_pred = model(x)

            y_true_list.append(y.cpu().numpy())
            y_pred_list.append(y_pred.cpu().numpy())

    y_true = scaler_y.inverse_transform(np.concatenate(y_true_list))
    y_pred = scaler_y.inverse_transform(np.concatenate(y_pred_list))

    mse = mean_squared_error(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mse)

    print(f"MSE (Mean Squared Error): {mse:.4f}")
    print(f"MAE (Mean Absolute Error): {mae:.4f}")
    print(f"RMSE (Root Mean Squared Error): {rmse:.4f}")

    return y_true, y_pred

def visualize_predictions(y_true, y_pred, num_samples=10):
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 2, 1)
    plt.scatter(y_true[:num_samples, 0], y_pred[:num_samples, 0], color='blue', alpha=0.7)
    plt.plot([y_true[:num_samples, 0].min(), y_true[:num_samples, 0].max()],
             [y_true[:num_samples, 0].min(), y_true[:num_samples, 0].max()],
             'r--', lw=2)
    plt.title('Systolic Blood Pressure Prediction')
    plt.xlabel('True Systolic BP')
    plt.ylabel('Predicted Systolic BP')

    plt.subplot(1, 2, 2)
    plt.scatter(y_true[:num_samples, 1], y_pred[:num_samples, 1], color='green', alpha=0.7)
    plt.plot([y_true[:num_samples, 1].min(), y_true[:num_samples, 1].max()],
             [y_true[:num_samples, 1].min(), y_true[:num_samples, 1].max()],
             'r--', lw=2)
    plt.title('Diastolic Blood Pressure Prediction')
    plt.xlabel('True Diastolic BP')
    plt.ylabel('Predicted Diastolic BP')

    plt.tight_layout()
    plt.show()

def visualize_error_distribution(y_true, y_pred):
    errors_systolic = y_true[:, 0] - y_pred[:, 0]
    errors_diastolic = y_true[:, 1] - y_pred[:, 1]

    plt.figure(figsize=(15, 5))

    plt.subplot(1, 2, 1)
    plt.hist(errors_systolic, bins=30, color='blue', alpha=0.7)
    plt.title('Error Distribution - Systolic BP')
    plt.xlabel('Prediction Error')
    plt.ylabel('Frequency')

    plt.subplot(1, 2, 2)
    plt.hist(errors_diastolic, bins=30, color='green', alpha=0.7)
    plt.title('Error Distribution - Diastolic BP')
    plt.xlabel('Prediction Error')
    plt.ylabel('Frequency')

    plt.tight_layout()
    plt.show()

y_true, y_pred = evaluate_model(model, test_loader, device, scaler_y)
visualize_predictions(y_true, y_pred)
visualize_error_distribution(y_true, y_pred)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error

def visualize_bp_predictions(y_true, y_pred, num_samples=100, title="PPG-BP Prediction Results"):
    """
    혈압 예측 결과를 시각화하는 함수

    Parameters:
    -----------
    y_true : numpy.ndarray
        실제 혈압 값 (SBP, DBP) 배열, 형태 [n_samples, 2]
    y_pred : numpy.ndarray
        예측된 혈압 값 (SBP, DBP) 배열, 형태 [n_samples, 2]
    num_samples : int, optional (default=100)
        시각화할 샘플의 수
    title : str, optional
        그래프 제목
    """
    # 지정된 샘플 수만큼 데이터 선택
    y_true_subset = y_true[:num_samples]
    y_pred_subset = y_pred[:num_samples]

    # RMSE와 MAE 계산
    mse = mean_squared_error(y_true_subset, y_pred_subset)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true_subset, y_pred_subset)

    # 그래프 크기 설정
    plt.figure(figsize=(12, 6))

    # 샘플 인덱스 (x축)
    x = np.arange(len(y_true_subset))

    # 수축기 혈압(SBP)과 이완기 혈압(DBP) 플롯
    plt.plot(x, y_true_subset[:, 0], 'b-', label='True SBP')
    plt.plot(x, y_pred_subset[:, 0], 'orange', label='Predicted SBP')
    plt.plot(x, y_true_subset[:, 1], 'g-', label='True DBP')
    plt.plot(x, y_pred_subset[:, 1], 'r-', label='Predicted DBP')

    # 그래프 설정
    plt.xlabel('Sample Index')
    plt.ylabel('Blood Pressure (mmHg)')
    plt.title(f'{title} (First {num_samples} Samples)')
    plt.legend(loc='upper right')
    plt.grid(False)

    # RMSE와 MAE 표시
    plt.figtext(0.02, 0.98, f'✅ Evaluation Result | RMSE: {rmse:.2f} | MAE: {mae:.2f}',
                fontsize=12, color='green', va='top')

    plt.tight_layout()
    plt.show()

# 함수 사용 예시:
"""
# 평가 함수로부터 y_true와 y_pred를 받은 후:
y_true, y_pred = evaluate_model(model, test_loader, device, scaler_y)

# 시각화 함수 호출
visualize_bp_predictions(y_true, y_pred, num_samples=100, title="PPG-Deformer Prediction")
"""

def visualize_bp_scatter(y_true, y_pred, title="Blood Pressure Prediction Scatter Plot"):
    """
    혈압 예측 결과의 산점도를 그리는 함수

    Parameters:
    -----------
    y_true : numpy.ndarray
        실제 혈압 값 (SBP, DBP) 배열, 형태 [n_samples, 2]
    y_pred : numpy.ndarray
        예측된 혈압 값 (SBP, DBP) 배열, 형태 [n_samples, 2]
    title : str, optional
        그래프 제목
    """
    # RMSE와 MAE 계산
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true, y_pred)

    # 그래프 설정
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

    # 수축기 혈압(SBP) 산점도
    ax1.scatter(y_true[:, 0], y_pred[:, 0], alpha=0.5, color='blue')

    # 이상적인 예측선 (y=x)
    min_sbp = min(np.min(y_true[:, 0]), np.min(y_pred[:, 0]))
    max_sbp = max(np.max(y_true[:, 0]), np.max(y_pred[:, 0]))
    ax1.plot([min_sbp, max_sbp], [min_sbp, max_sbp], 'r--')

    ax1.set_title('Systolic BP Prediction')
    ax1.set_xlabel('True SBP (mmHg)')
    ax1.set_ylabel('Predicted SBP (mmHg)')
    ax1.set_aspect('equal')
    ax1.grid(True, alpha=0.3)

    # 이완기 혈압(DBP) 산점도
    ax2.scatter(y_true[:, 1], y_pred[:, 1], alpha=0.5, color='green')

    # 이상적인 예측선 (y=x)
    min_dbp = min(np.min(y_true[:, 1]), np.min(y_pred[:, 1]))
    max_dbp = max(np.max(y_true[:, 1]), np.max(y_pred[:, 1]))
    ax2.plot([min_dbp, max_dbp], [min_dbp, max_dbp], 'r--')

    ax2.set_title('Diastolic BP Prediction')
    ax2.set_xlabel('True DBP (mmHg)')
    ax2.set_ylabel('Predicted DBP (mmHg)')
    ax2.set_aspect('equal')
    ax2.grid(True, alpha=0.3)

    # RMSE와 MAE 표시
    plt.figtext(0.02, 0.98, f'✅ Evaluation Results | RMSE: {rmse:.2f} | MAE: {mae:.2f}',
                fontsize=12, color='green', va='top')

    plt.suptitle(title)
    plt.tight_layout()
    plt.subplots_adjust(top=0.88)
    plt.show()

def visualize_error_distribution(y_true, y_pred, bins=30):
    """
    예측 오차의 분포를 히스토그램으로 시각화하는 함수

    Parameters:
    -----------
    y_true : numpy.ndarray
        실제 혈압 값 (SBP, DBP) 배열, 형태 [n_samples, 2]
    y_pred : numpy.ndarray
        예측된 혈압 값 (SBP, DBP) 배열, 형태 [n_samples, 2]
    bins : int, optional
        히스토그램의 bin 수
    """
    # 오차 계산
    errors_sbp = y_true[:, 0] - y_pred[:, 0]
    errors_dbp = y_true[:, 1] - y_pred[:, 1]

    # 그래프 설정
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

    # 수축기 혈압(SBP) 오차 히스토그램
    ax1.hist(errors_sbp, bins=bins, alpha=0.7, color='blue')
    ax1.axvline(x=0, color='r', linestyle='--')
    ax1.set_title('SBP Error Distribution')
    ax1.set_xlabel('Error (mmHg)')
    ax1.set_ylabel('Frequency')

    # 통계값 계산 및 표시
    mean_error_sbp = np.mean(errors_sbp)
    std_error_sbp = np.std(errors_sbp)
    ax1.text(0.05, 0.95, f'Mean: {mean_error_sbp:.2f}\nStd: {std_error_sbp:.2f}',
             transform=ax1.transAxes, va='top')

    # 이완기 혈압(DBP) 오차 히스토그램
    ax2.hist(errors_dbp, bins=bins, alpha=0.7, color='green')
    ax2.axvline(x=0, color='r', linestyle='--')
    ax2.set_title('DBP Error Distribution')
    ax2.set_xlabel('Error (mmHg)')
    ax2.set_ylabel('Frequency')

    # 통계값 계산 및 표시
    mean_error_dbp = np.mean(errors_dbp)
    std_error_dbp = np.std(errors_dbp)
    ax2.text(0.05, 0.95, f'Mean: {mean_error_dbp:.2f}\nStd: {std_error_dbp:.2f}',
             transform=ax2.transAxes, va='top')

    plt.suptitle('Prediction Error Distribution')
    plt.tight_layout()
    plt.show()

# 복합 시각화 함수 (위 세 가지 시각화를 모두 수행)
def comprehensive_bp_visualization(y_true, y_pred, num_samples=100):
    """
    혈압 예측 결과에 대한 종합적인 시각화를 수행하는 함수

    Parameters:
    -----------
    y_true : numpy.ndarray
        실제 혈압 값 (SBP, DBP) 배열, 형태 [n_samples, 2]
    y_pred : numpy.ndarray
        예측된 혈압 값 (SBP, DBP) 배열, 형태 [n_samples, 2]
    num_samples : int, optional
        시계열 시각화에 사용할 샘플 수
    """
    # 시계열 시각화
    visualize_bp_predictions(y_true, y_pred, num_samples, title="PPG-BP Prediction")

    # 산점도 시각화
    visualize_bp_scatter(y_true, y_pred)

    # 오차 분포 시각화
    visualize_error_distribution(y_true, y_pred)

# 모델 평가 후 실제값(y_true)과 예측값(y_pred) 얻기
y_true, y_pred = evaluate_model(model, test_loader, device, scaler_y)

# 이미지에서 보이는 것과 같은 시각화 생성
visualize_bp_predictions(y_true, y_pred, num_samples=100, title="PPG-Deformer Prediction")

# 혹은 모든 시각화를 한번에 수행하려면:
# comprehensive_bp_visualization(y_true, y_pred)