In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.metrics import roc_auc_score, roc_curve
import matplotlib.pyplot as plt



In [None]:
# 데이터 로딩 및 전처리를 수행하는 함수 정의
def load_and_preprocess_data():
    # scikit-learn의 OpenML에서 성인 인구조사 소득 데이터셋 불러오기
    from sklearn.datasets import fetch_openml
    data = fetch_openml(name='adult', version=1, as_frame=True)
    df = data.frame

    # 특성(features)과 목표(target) 변수 분리
    # 'class' 열을 제외한 모든 열을 특성으로, 'class' 열을 목표 변수로 설정
    X = df.drop('class', axis=1)
    y = df['class']

    # 열의 데이터 유형 식별
    # 범주형(문자열) 열과 수치형(정수, 실수) 열 구분
    categorical_columns = X.select_dtypes(include=['object']).columns
    numerical_columns = X.select_dtypes(include=['int64', 'float64']).columns

    # 데이터 전처리 단계 생성
    # ColumnTransformer를 사용하여 수치형과 범주형 열에 대해 다른 전처리 적용
    preprocessor = ColumnTransformer(
        transformers=[
            # 수치형 열 처리
            ('num', Pipeline([
                # 결측값을 중앙값으로 대체
                ('imputer', SimpleImputer(strategy='median')),
                # 표준 스케일링 (평균 0, 분산 1로 정규화)
                ('scaler', StandardScaler())
            ]), numerical_columns),

            # 범주형 열 처리
            ('cat', Pipeline([
                # 결측값을 'missing' 문자열로 대체
                ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
                # 원핫 인코딩 (범주형 변수를 이진 벡터로 변환)
                ('onehot', OneHotEncoder(handle_unknown='ignore'))
            ]), categorical_columns)
        ])

    # 특성 데이터 전처리 수행
    # 앞서 정의한 전처리기를 사용하여 데이터 변환
    X_processed = preprocessor.fit_transform(X)

    # 목표 변수 인코딩
    # LabelEncoder를 사용하여 문자열 레이블을 숫자로 변환
    le = LabelEncoder()
    y_processed = le.fit_transform(y)

    # 전처리된 특성, 목표 변수, 그리고 전처리기 반환
    return X_processed, y_processed, preprocessor

In [None]:
# Column-wise Interaction Module
# 컬럼 간 상호작용을 학습하는 신경망 모듈 클래스 정의
# 이 클래스는 입력 특성들 사이의 상호작용을 학습하고 새로운 특성을 생성함
class ColumnWiseInteraction(nn.Module):
    # 모듈 초기화 메서드
    def __init__(self, input_dim):                                                      # input_dim: 입력 데이터의 특성(컬럼) 차원 수
        super().__init__()                                                              # 부모 클래스(nn.Module)의 초기화 메서드 호출

        self.interaction_weights = nn.Parameter(torch.randn(input_dim, input_dim))      # 상호작용 가중치 행렬 생성
                                                                                        # 무작위로 초기화된 input_dim x input_dim 크기의 학습 가능한 파라미터 생성
                                                                                        # 이 가중치는 각 특성 간의 상호작용을 모델링하는 데 사용됨

    # 순전파(forward) 메서드 정의
    def forward(self, x):                                                               # 입력 텐서 x에 대해 컬럼 간 상호작용을 계산하고 새로운 특성을 추가
        # Compute column-wise interactions
        interactions = torch.matmul(x, self.interaction_weights)                        # 입력 텐서와 상호작용 가중치 행렬의 행렬 곱 계산
                                                                                        # 각 특성이 다른 특성들과 어떻게 상호작용하는지를 학습
                                                                                        # 예: x의 각 컬럼이 다른 모든 컬럼에 대해 가중치 기반 상호작용 계산

        return torch.cat([x, interactions], dim=1)                                      # 원본 입력 텐서와 상호작용 텐서를 특성(컬럼) 차원에서 결합
                                                                                        # 결과: 원본 특성 + 상호작용으로 인해 생성된 새로운 특성

In [None]:
# Transformer 기반 분류 모델 클래스 정의
# 입력 데이터를 고급 변환과 분류를 수행하는 신경망 모델
class TransformerClassifier(nn.Module):
    # 모델 초기화 메서드
    # input_dim: 입력 특성 차원
    # hidden_dim: 은닉층 차원
    # num_heads: 멀티헤드 어텐션의 헤드 수
    # num_classes: 분류할 클래스 수
    def __init__(self, input_dim, hidden_dim, num_heads, num_classes):
        # 부모 클래스(nn.Module) 초기화
        super().__init__()

        # 컬럼 간 상호작용 모듈 생성
        # 입력 특성들 사이의 복잡한 상호작용 학습
        self.column_interaction = ColumnWiseInteraction(input_dim)

        # 위치 인코딩 생성
        # 모델에 입력 데이터의 순서/위치 정보 제공
        self.positional_encoding = nn.Parameter(torch.randn(1, input_dim * 2))

        # Transformer 인코더 레이어 생성
        # 특성 간 복잡한 관계와 의존성 모델링
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=input_dim * 2,  # 특성 차원 (상호작용 후 2배)
            nhead=num_heads,         # 멀티헤드 어텐션 헤드 수
            dim_feedforward=hidden_dim  # 피드포워드 레이어 차원
        )
        # 2개의 인코더 레이어로 구성된 Transformer 인코더 생성
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=2)

        # 분류기 헤드 생성
        # 고차원 특성을 클래스 확률로 변환
        self.classifier = nn.Sequential(
            nn.Linear(input_dim * 2, hidden_dim),  # 첫 번째 선형 레이어
            nn.ReLU(),                             # 비선형성 추가
            nn.Dropout(0.3),                       # 과적합 방지를 위한 드롭아웃
            nn.Linear(hidden_dim, num_classes)     # 최종 분류 레이어
        )

    # 순전파(forward) 메서드 정의
    def forward(self, x):
        # 컬럼 간 상호작용 적용
        # 입력 특성들 간의 상호작용 계산 및 새로운 특성 생성
        x = self.column_interaction(x)

        # 위치 인코딩 추가
        # 입력에 위치/순서 정보 주입
        x = x + self.positional_encoding

        # Transformer 인코딩을 위해 시퀀스 차원 추가
        # 모델의 입력 형태에 맞게 텐서 차원 조정
        x = x.unsqueeze(0)  # 형태: [1, 배치 크기, 특성 차원]

        # Transformer 인코더를 통한 특성 변환
        # 고급 특성 표현 학습
        x = self.transformer_encoder(x)
        x = x.squeeze(0)

        # 분류기를 통한 최종 클래스 예측
        return self.classifier(x)

In [None]:
# 모델 학습을 위한 훈련 함수 정의
def train_model(model, train_loader, criterion, optimizer, device):
    # 모델을 훈련 모드로 설정
    # 훈련 중에는 드롭아웃, 배치 정규화 등의 레이어가 훈련 모드로 동작
    model.train()

    # 총 손실값 초기화
    # 배치별 손실을 누적하여 평균 손실 계산을 준비
    total_loss = 0

    # 데이터 로더에서 배치 단위로 반복
    # batch_x: 입력 특성, batch_y: 정답 레이블
    for batch_x, batch_y in train_loader:
        # 입력 데이터와 레이블을 지정된 장치(CPU/GPU)로 이동
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)

        # 옵티마이저의 그래디언트 초기화
        # 이전 반복의 그래디언트 제거
        optimizer.zero_grad()

        # 모델에 입력 데이터 전달하여 예측값 생성
        outputs = model(batch_x)

        # 손실 함수를 사용해 예측값과 실제값 사이의 손실 계산
        loss = criterion(outputs, batch_y)

        # 역전파를 통해 손실에 대한 그래디언트 계산
        # 각 가중치의 손실에 대한 기여도 계산
        loss.backward()

        # 옵티마이저를 사용해 가중치 업데이트
        # 계산된 그래디언트를 바탕으로 모델 파라미터 조정
        optimizer.step()

        # 배치별 손실을 총 손실에 누적
        total_loss += loss.item()

    # 전체 배치의 평균 손실 반환
    # 한 에폭(epoch) 동안의 평균 손실 계산
    return total_loss / len(train_loader)


In [None]:
# 모델 평가를 위한 함수 정의
def evaluate_model(model, test_loader, device):
    # 모델을 평가 모드로 전환
    # 드롭아웃, 배치 정규화 등의 레이어가 추론 모드로 동작
    model.eval()

    # 정확히 예측한 샘플 수와 전체 샘플 수 초기화
    correct = 0
    total = 0

    # 그래디언트 계산 비활성화
    # 메모리 절약 및 계산 속도 향상
    with torch.no_grad():
        # 테스트 데이터 로더에서 배치 단위로 반복
        for batch_x, batch_y in test_loader:
            # 입력 데이터와 레이블을 지정된 장치(CPU/GPU)로 이동
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            # 모델에 입력 데이터 전달하여 예측값 생성
            outputs = model(batch_x)

            # 출력 텐서에서 가장 높은 확률을 가진 클래스 선택
            # outputs.data: 최대 확률 값, predicted: 해당 클래스 인덱스
            _, predicted = torch.max(outputs.data, 1)

            # 전체 샘플 수 누적
            total += batch_y.size(0)

            # 정확히 예측한 샘플 수 계산
            # predicted와 실제 레이블(batch_y)을 비교
            correct += (predicted == batch_y).sum().item()

    # 전체 정확도를 백분율로 반환
    # (정확히 예측한 샘플 수 / 전체 샘플 수) * 100
    return 100 * correct / total


In [None]:
# AUROC 계산 및 시각화 함수 추가
def calculate_and_plot_auroc(model, test_loader, device):
    model.eval()
    all_probs = []
    all_labels = []

    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)

            # 모델 출력을 확률로 변환 (softmax)
            outputs = torch.softmax(model(batch_x), dim=1)

            # 양성 클래스(일반적으로 인덱스 1)의 확률 추출
            probs = outputs[:, 1].cpu().numpy()
            labels = batch_y.cpu().numpy()

            all_probs.extend(probs)
            all_labels.extend(labels)

    # AUROC 계산
    auroc = roc_auc_score(all_labels, all_probs)

    # ROC 곡선 계산
    fpr, tpr, thresholds = roc_curve(all_labels, all_probs)

    # ROC 곡선 시각화
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='blue', label=f'ROC curve (AUROC = {auroc:.2f})')
    plt.plot([0, 1], [0, 1], color='red', linestyle='--', label='Random Classifier')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver Operating Characteristic (ROC) Curve')
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig('roc_curve.png')
    plt.close()

    return auroc



In [None]:
# 메인 실행 함수 수정
def main():
    # 이전 코드와 동일한 설정
    torch.manual_seed(42)
    np.random.seed(42)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # 데이터 로드 및 전처리
    X, y, preprocessor = load_and_preprocess_data()

    # 데이터 분할
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # PyTorch 텐서로 변환
    X_train = torch.FloatTensor(X_train)
    X_test = torch.FloatTensor(X_test)
    y_train = torch.LongTensor(y_train)
    y_test = torch.LongTensor(y_test)

    # DataLoaders 생성
    train_dataset = TensorDataset(X_train, y_train)
    test_dataset = TensorDataset(X_test, y_test)
    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=64)

    # 모델 하이퍼파라미터
    input_dim = X_train.shape[1]
    hidden_dim = 128
    num_heads = 4
    num_classes = len(np.unique(y))

    # 모델 초기화
    model = TransformerClassifier(input_dim, hidden_dim, num_heads, num_classes).to(device)

    # 손실 함수 및 최적화기
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # 학습 루프
    num_epochs = 10
    for epoch in tqdm(range(num_epochs)):
        train_loss = train_model(model, train_loader, criterion, optimizer, device)
        test_acc = evaluate_model(model, test_loader, device)

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}, Test Accuracy: {test_acc:.2f}%')

# def auroc():
#     # AUROC 계산 및 시각화
    auroc = calculate_and_plot_auroc(model, test_loader, device)
    print(f'AUROC: {auroc:.4f}')

In [None]:
# 추가 유틸리티 함수: AUROC 결과 상세 분석
def interpret_auroc(auroc):
    """AUROC 값에 대한 해석"""
    if auroc >= 0.9:
        return "Excellent discrimination"
    elif auroc >= 0.8:
        return "Good discrimination"
    elif auroc >= 0.7:
        return "Fair discrimination"
    elif auroc >= 0.6:
        return "Poor discrimination"
    else:
        return "Failed discrimination"

In [None]:
main()

 10%|█         | 1/10 [00:08<01:17,  8.60s/it]

Epoch [1/10], Loss: 0.5137, Test Accuracy: 77.19%


 20%|██        | 2/10 [00:13<00:50,  6.28s/it]

Epoch [2/10], Loss: 0.4964, Test Accuracy: 77.72%


 30%|███       | 3/10 [00:19<00:42,  6.06s/it]

Epoch [3/10], Loss: 0.4945, Test Accuracy: 77.53%


 40%|████      | 4/10 [00:23<00:33,  5.55s/it]

Epoch [4/10], Loss: 0.4936, Test Accuracy: 77.60%


 50%|█████     | 5/10 [00:28<00:26,  5.32s/it]

Epoch [5/10], Loss: 0.4926, Test Accuracy: 77.76%


 60%|██████    | 6/10 [00:34<00:21,  5.39s/it]

Epoch [6/10], Loss: 0.4924, Test Accuracy: 77.73%


 70%|███████   | 7/10 [00:38<00:15,  5.17s/it]

Epoch [7/10], Loss: 0.4910, Test Accuracy: 77.74%


 80%|████████  | 8/10 [00:44<00:10,  5.34s/it]

Epoch [8/10], Loss: 0.4908, Test Accuracy: 77.72%


 90%|█████████ | 9/10 [00:49<00:05,  5.17s/it]

Epoch [9/10], Loss: 0.4902, Test Accuracy: 77.76%


100%|██████████| 10/10 [00:54<00:00,  5.41s/it]

Epoch [10/10], Loss: 0.4901, Test Accuracy: 77.80%





AUROC: 0.7208
