### 데이터 전처리

In [1]:
!pip install wfdb



In [2]:
import pandas as pd
import numpy as np
import wfdb
import ast
import os
from sklearn.preprocessing import MultiLabelBinarizer

def load_raw_data(df, sampling_rate, path):
    """df.index를 기준으로 데이터를 로드"""
    if sampling_rate == 100:
        data = [wfdb.rdsamp(os.path.join(path, f)) for f in df['filename_lr']]
    else:
        data = [wfdb.rdsamp(os.path.join(path, f)) for f in df['filename_hr']]
    # df.index에 있는 데이터만 로드
    data = np.array([signal for signal, meta in data])
    return data

# 데이터 경로 설정
path = "./"
sampling_rate = 100

# PTB-XL 데이터베이스 로드
df = pd.read_csv(os.path.join(path, 'ptbxl_database.csv'), index_col='ecg_id')
df.scp_codes = df.scp_codes.apply(lambda x: ast.literal_eval(x))

# 진단 정보 로드
agg_df = pd.read_csv(os.path.join(path, 'scp_statements.csv'), index_col=0)
agg_df = agg_df[agg_df.diagnostic == 1]

def aggregate_diagnostic(y_dic):
    """진단 클래스를 매핑하는 함수"""
    tmp = []
    for key in y_dic.keys():
        if key in agg_df.index:
            tmp.append(agg_df.loc[key].diagnostic_class)
    return list(set(tmp))

# 진단 클래스 매핑
df['diagnostic_superclass'] = df.scp_codes.apply(aggregate_diagnostic)

# 빈 클래스 제거
df = df[df['diagnostic_superclass'].apply(lambda x: len(x) > 0)]

# Raw data 로드
X = load_raw_data(df, sampling_rate, path)

# 크기 확인
assert len(X) == len(df), "X와 df의 크기가 일치하지 않습니다."

# 데이터셋 분리
test_fold = 10
val_fold = 9

train_filter = (df.strat_fold != test_fold) & (df.strat_fold != val_fold)
val_filter = df.strat_fold == val_fold
test_filter = df.strat_fold == test_fold

X_train = X[train_filter]
y_train = list(df[train_filter]['diagnostic_superclass'])

X_val = X[val_filter]
y_val = list(df[val_filter]['diagnostic_superclass'])

X_test = X[test_filter]
y_test = list(df[test_filter]['diagnostic_superclass'])

# 다중 라벨 이진화
mlb = MultiLabelBinarizer()
y_train_bin = mlb.fit_transform(y_train)
y_val_bin = mlb.transform(y_val)
y_test_bin = mlb.transform(y_test)

print(f"Train Data Shape: {X_train.shape}, Labels: {y_train_bin.shape}")
print(f"Validation Data Shape: {X_val.shape}, Labels: {y_val_bin.shape}")
print(f"Test Data Shape: {X_test.shape}, Labels: {y_test_bin.shape}")


Train Data Shape: (17084, 1000, 12), Labels: (17084, 5)
Validation Data Shape: (2146, 1000, 12), Labels: (2146, 5)
Test Data Shape: (2158, 1000, 12), Labels: (2158, 5)


### Data Transform

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from transformers import ViTForImageClassification, DeiTForImageClassification
from torchvision.models import convnext_tiny, efficientnet_v2_s
import numpy as np
import matplotlib.pyplot as plt
import os

# Custom Dataset for ECG Data
class ECGDataset(Dataset):
    def __init__(self, data, labels, transform=None):
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        label = self.labels[idx]
        if self.transform:
            # ECG 데이터를 3채널로 확장
            sample = self.transform(sample)
        return sample.float(), torch.tensor(label, dtype=torch.float32)

transform = transforms.Compose([
    transforms.ToTensor(),  # Numpy 배열 -> Tensor
    transforms.Resize((182, 256)),  # 이미지 크기 조정
    transforms.Lambda(lambda x: x.expand(3, -1, -1)),  # 1채널 데이터를 3채널로 확장
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # 정규화
])


# 데이터셋 정의
train_dataset = ECGDataset(X_train, y_train_bin, transform=transform)
val_dataset = ECGDataset(X_val, y_val_bin, transform=transform)
test_dataset = ECGDataset(X_test, y_test_bin, transform=transform)

# DataLoader 정의
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# 클래스 확인
class_names = mlb.classes_
print(f"Classes: {class_names}")




  from .autonotebook import tqdm as notebook_tqdm


Classes: ['CD' 'HYP' 'MI' 'NORM' 'STTC']


In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


### VIT 구현

In [5]:
class MLPBlock(nn.Module):
    def __init__(self, embed_dim, hidden_dim, dropout=0.1):
        super(MLPBlock, self).__init__()
        self.fc1 = nn.Linear(embed_dim, hidden_dim)
        self.relu = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)
        self.fc2 = nn.Linear(hidden_dim, embed_dim)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x):
        x = self.fc1(x)
        x = self.relu(x)
        x = self.dropout1(x)
        x = self.fc2(x)
        x = self.dropout2(x)
        return x

class ViTBlock(nn.Module):
    def __init__(self, embed_dim, num_heads, hidden_dim, dropout=0.1):
        super(ViTBlock, self).__init__()
        self.norm1 = nn.LayerNorm(embed_dim)
        self.attn = nn.MultiheadAttention(embed_dim, num_heads, dropout=dropout)
        self.norm2 = nn.LayerNorm(embed_dim)
        self.mlp = MLPBlock(embed_dim, hidden_dim, dropout=dropout)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x_res = x
        x = self.norm1(x)
        x, _ = self.attn(x, x, x)
        x = self.dropout(x) + x_res

        x_res = x
        x = self.norm2(x)
        x = self.mlp(x)
        x = self.dropout(x) + x_res
        return x

class VisionTransformer(nn.Module):
    def __init__(self, image_size, patch_size, in_channels, num_classes, embed_dim, depth, num_heads, hidden_dim, dropout=0.1):
        super(VisionTransformer, self).__init__()
        self.patch_size = patch_size
        self.num_patches = (image_size[0] // patch_size[0]) * (image_size[1] // patch_size[1])

        self.patch_embed = nn.Conv2d(in_channels, embed_dim, kernel_size=patch_size, stride=patch_size)
        self.pos_embedding = nn.Parameter(torch.zeros(1, self.num_patches, embed_dim))

        self.transformer = nn.ModuleList([
            ViTBlock(embed_dim, num_heads, hidden_dim, dropout=dropout) for _ in range(depth)
        ])

        self.norm = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        x = self.patch_embed(x)
        x = x.flatten(2).transpose(1, 2)
        x = x + self.pos_embedding

        for block in self.transformer:
            x = block(x)

        x = self.norm(x)
        x = x.mean(dim=1)
        x = self.head(x)
        return x


In [6]:
import numpy as np
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
import torch

def train_model(
    model,
    train_loader,
    val_loader=None,
    num_epochs=40,
    patience=3,
    learning_rate=0.001,
    checkpoint_path='vit_checkpoint.pth'
):
    """Vision Transformer 모델 학습 함수.

    Args:
        model: 학습할 PyTorch 모델.
        train_loader: 학습 데이터용 DataLoader.
        val_loader: 검증 데이터용 DataLoader (선택).
        num_epochs: 학습할 epoch 수.
        patience: 조기 종료를 위한 patience.
        learning_rate: 옵티마이저 학습률.
        checkpoint_path: 체크포인트 저장 경로.

    Returns:
        dict: 학습 로그 (손실, 정확도 등).
    """
    criterion = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # 학습 상태 초기화
    best_loss = float('inf')
    epochs_no_improve = 0
    logs = {'train_loss': [], 'train_accuracy': [], 'train_f1': []}

    if val_loader:
        logs['val_loss'] = []
        logs['val_accuracy'] = []
        logs['val_f1'] = []

    for epoch in range(num_epochs):
        print(f"Starting epoch {epoch+1}/{num_epochs}...")
        model.train()
        running_loss = 0.0
        all_labels = []
        all_preds = []

        # Training loop
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.float().to(device)
            optimizer.zero_grad()

            outputs = model(inputs)  # 모델 forward
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)

            preds = (torch.sigmoid(outputs) > 0.5).int().cpu().numpy()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds)

        # Train metrics
        epoch_loss = running_loss / len(train_loader.dataset)
        accuracy = accuracy_score(np.vstack(all_labels), np.vstack(all_preds))
        f1 = f1_score(np.vstack(all_labels), np.vstack(all_preds), average='macro')
        print(f'Epoch {epoch+1}, Train Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.4f}, F1 Score: {f1:.4f}')

        logs['train_loss'].append(epoch_loss)
        logs['train_accuracy'].append(accuracy)
        logs['train_f1'].append(f1)

        # Validation loop (if provided)
        if val_loader:
            val_loss, val_accuracy, val_f1 = evaluate_model(model, val_loader, criterion)
            logs['val_loss'].append(val_loss)
            logs['val_accuracy'].append(val_accuracy)
            logs['val_f1'].append(val_f1)

            print(f'Epoch {epoch+1}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}, Val F1: {val_f1:.4f}')

        # Checkpoint and early stopping
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            epochs_no_improve = 0
            torch.save(model.state_dict(), checkpoint_path)
            print(f"Checkpoint saved at epoch {epoch+1}.")
        else:
            epochs_no_improve += 1

        if epochs_no_improve >= patience:
            print(f"Early stopping triggered after {epoch+1} epochs.")
            break

    # Best model 로드
    model.load_state_dict(torch.load(checkpoint_path))
    print("Best model loaded.")
    return logs

def evaluate_model(model, dataloader, criterion):
    """모델 검증 및 평가."""
    model.eval()
    running_loss = 0.0
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.float().to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item() * inputs.size(0)

            preds = (torch.sigmoid(outputs) > 0.5).int().cpu().numpy()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds)

    epoch_loss = running_loss / len(dataloader.dataset)
    accuracy = accuracy_score(np.vstack(all_labels), np.vstack(all_preds))
    f1 = f1_score(np.vstack(all_labels), np.vstack(all_preds), average='macro')

    return epoch_loss, accuracy, f1


In [7]:
# VisionTransformer 모델 초기화
vit_model = VisionTransformer(
    image_size=(182, 256),  # 데이터 크기
    patch_size=(8, 8),      # 최적 패치 크기
    in_channels=3,          # 입력 채널 수 (3채널 데이터 사용)
    num_classes=y_train_bin.shape[1],  # 클래스 수
    embed_dim=256,          # 최적 임베딩 차원
    depth=6,                # 최적 Transformer 블록 깊이
    num_heads=256 // 64,    # Multi-head Attention 헤드 수 (임베딩 차원 나누기 64)
    hidden_dim=256 * 4,     # MLP hidden layer 차원
    dropout=0.1             # 최적 Dropout 비율
)
vit_model = vit_model.to(device)

# 모델 출력 확인 (가상 데이터)
dummy_input = torch.randn(16, 3, 182, 256).to(device)  # 가상 입력 데이터 생성
output = vit_model(dummy_input)  # 모델 출력 확인
print(f"Output shape: {output.shape}")  # Expected: (16, num_classes)

# 학습 루프 실행
print('Training ViT...')
logs = train_model(
    model=vit_model,
    train_loader=train_loader,  # 제공된 학습 데이터 로더
    val_loader=val_loader,      # 제공된 검증 데이터 로더
    num_epochs=40,              # 총 학습 epoch 수
    patience=5,                 # 조기 종료를 위한 patience
    learning_rate=0.0001,       # 최적 학습률
    checkpoint_path='best_vit_model.pth'  # 체크포인트 저장 경로
)
print('ViT training completed.')


Output shape: torch.Size([16, 5])
Training ViT...
Starting epoch 1/40...
Epoch 1, Train Loss: 0.5289, Accuracy: 0.1725, F1 Score: 0.1448
Epoch 1, Val Loss: 0.5000, Val Accuracy: 0.2665, Val F1: 0.2122
Checkpoint saved at epoch 1.
Starting epoch 2/40...
Epoch 2, Train Loss: 0.4654, Accuracy: 0.3307, F1 Score: 0.3095
Epoch 2, Val Loss: 0.4296, Val Accuracy: 0.3798, Val F1: 0.4090
Checkpoint saved at epoch 2.
Starting epoch 3/40...
Epoch 3, Train Loss: 0.4271, Accuracy: 0.3773, F1 Score: 0.4135
Epoch 3, Val Loss: 0.4145, Val Accuracy: 0.3910, Val F1: 0.4538
Checkpoint saved at epoch 3.
Starting epoch 4/40...
Epoch 4, Train Loss: 0.4082, Accuracy: 0.4040, F1 Score: 0.4668
Epoch 4, Val Loss: 0.3984, Val Accuracy: 0.4450, Val F1: 0.4675
Checkpoint saved at epoch 4.
Starting epoch 5/40...
Epoch 5, Train Loss: 0.3994, Accuracy: 0.4251, F1 Score: 0.5033
Epoch 5, Val Loss: 0.3971, Val Accuracy: 0.4613, Val F1: 0.5160
Checkpoint saved at epoch 5.
Starting epoch 6/40...
Epoch 6, Train Loss: 0.3890

  model.load_state_dict(torch.load(checkpoint_path))


In [9]:
# 검증 데이터 평가
validation_loader = DataLoader(
    ECGDataset(X_val, y_val_bin, transform=transform),
    batch_size=16,
    shuffle=False
)
print("Evaluating on validation set...")
val_loss, val_accuracy, val_f1 = evaluate_model(vit_model, validation_loader, nn.BCEWithLogitsLoss())
print(f"Validation Loss: {val_loss:.4f}")
print(f"Validation Accuracy: {val_accuracy:.4f}")
print(f"Validation F1 Score: {val_f1:.4f}")

# 테스트 데이터 평가
test_loader = DataLoader(
    ECGDataset(X_test, y_test_bin, transform=transform),
    batch_size=16,
    shuffle=False
)
print("Evaluating on test set...")
test_loss, test_accuracy, test_f1 = evaluate_model(vit_model, test_loader, nn.BCEWithLogitsLoss())
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test F1 Score: {test_f1:.4f}")

Evaluating on validation set...
Validation Loss: 0.3489
Validation Accuracy: 0.5405
Validation F1 Score: 0.6166
Evaluating on test set...
Test Loss: 0.3563
Test Accuracy: 0.5431
Test F1 Score: 0.6141
