<a href="https://colab.research.google.com/github/younguk072023/Pytorch_study/blob/main/%ED%8F%90%EB%A0%B4%EC%A7%84%EB%8B%A8.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision import models, datasets
from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import numpy as np
import os
from google.colab import drive
from tqdm import tqdm


In [None]:
# GPU 사용 여부 확인
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# 데이터 전처리 및 변환 설정
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # ResNet 입력 크기로 조정
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])  # 흑백(X-ray) 데이터 정규화
])

# Google Drive 마운트 및 데이터셋 경로 설정
drive.mount('/content/drive')
data_dir = "/content/drive/MyDrive/딥러닝 논문_코드구현/chest_xray/chest_xray"
train_dir = os.path.join(data_dir, "train")

# 데이터셋 로드 및 학습/검증/테스트 데이터 분할
full_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
train_size = int(0.8 * len(full_dataset))
val_size = int(0.1 * len(full_dataset))
test_size = len(full_dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(full_dataset, [train_size, val_size, test_size])

# 데이터 로더 설정
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 클래스 확인
print("Class labels:", full_dataset.classes)  # ['NORMAL', 'PNEUMONIA']


In [None]:
# 모델 구축 (ResNet101 사용)
model = models.resnet101(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Sequential(
    nn.Linear(num_ftrs, 256),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(256, 1),  # 이진 분류 (정상, 폐렴)
    nn.Sigmoid()  # Sigmoid로 확률값 출력
)
model = model.to(device)


In [None]:
# 손실 함수 및 옵티마이저 설정
criterion = nn.BCELoss()  # 이진 교차 엔트로피 손실
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# 학습률 감소 스케줄러 설정
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3, verbose=True)


In [None]:
# EarlyStopping 클래스 정의
class EarlyStopping:
    def __init__(self, patience=5, delta=0, path='best_model.pth'):
        """
        EarlyStopping 클래스는 모델이 더 이상 개선되지 않을 때 학습을 중단하도록 합니다.

        :param patience: 개선이 없을 경우 기다리는 epoch 수
        :param delta: 개선된 것으로 인정할 최소 변화량
        :param path: 최적 모델을 저장할 경로
        """
        self.patience = patience
        self.delta = delta
        self.path = path
        self.best_loss = np.inf
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss, model):
        if val_loss < self.best_loss - self.delta:
            self.best_loss = val_loss
            self.counter = 0
            torch.save(model.state_dict(), self.path)  # 최적 모델 저장
        else:
            self.counter += 1
            print(f"🔹 EarlyStopping counter: {self.counter} / {self.patience}")
            if self.counter >= self.patience:
                self.early_stop = True

# EarlyStopping 객체 생성
early_stopping = EarlyStopping(patience=5, delta=0.001, path="best_resnet101.pth")


In [None]:
# 모델 학습 함수
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, early_stopping, num_epochs=100):
    """
    모델을 학습하고, 검증 데이터로 평가한 후 정확도와 손실을 기록합니다.

    :param model: 학습할 모델
    :param train_loader: 학습 데이터 로더
    :param val_loader: 검증 데이터 로더
    :param criterion: 손실 함수
    :param optimizer: 옵티마이저
    :param scheduler: 학습률 스케줄러
    :param early_stopping: EarlyStopping 객체
    :param num_epochs: 총 학습 epoch 수
    :return: 학습 및 검증 정확도를 기록한 리스트
    """
    train_acc_list, val_acc_list = [], []

    # tqdm을 사용하여 학습 진행 상황을 시각적으로 표시
    for epoch in tqdm(range(num_epochs), desc="Training Epochs", ncols=100, leave=True):
        model.train()
        correct, total, running_loss = 0, 0, 0.0

        # 학습 데이터 배치 처리
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device).float().unsqueeze(1)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            predicted = (outputs > 0.5).float()
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        train_acc = correct / total
        train_acc_list.append(train_acc)

        # 검증 데이터 평가
        model.eval()
        correct, total, val_loss = 0, 0, 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device).float().unsqueeze(1)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                predicted = (outputs > 0.5).float()
                correct += (predicted == labels).sum().item()
                total += labels.size(0)

        val_acc = correct / total
        val_acc_list.append(val_acc)
        val_loss /= len(val_loader)

        # 학습률 스케줄러 업데이트
        scheduler.step(val_loss)

        print(f"Epoch [{epoch+1}/{num_epochs}] - Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}, Val Loss: {val_loss:.4f}")

        # Early Stopping 체크
        early_stopping(val_loss, model)
        if early_stopping.early_stop:
            print("Early Stopping! 학습을 종료합니다.")
            break

    return train_acc_list, val_acc_list


In [None]:
# 모델 평가 함수
def evaluate_model(model, test_loader):
    """
    테스트 데이터셋을 사용하여 모델을 평가하고 정확도를 출력합니다.

    :param model: 평가할 모델
    :param test_loader: 테스트 데이터 로더
    """
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device).float().unsqueeze(1)
            outputs = model(inputs)
            predicted = (outputs > 0.5).float()
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    test_acc = correct / total
    print(f"테스트 정확도: {test_acc * 100:.2f}%")


In [None]:
# 학습 결과 시각화 함수
def plot_accuracy(train_acc, val_acc):
    """
    학습과 검증 정확도를 그래프로 시각화합니다.

    :param train_acc: 학습 정확도 리스트
    :param val_acc: 검증 정확도 리스트
    """
    plt.plot(train_acc, label="Train Accuracy")
    plt.plot(val_acc, label="Validation Accuracy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.show()


In [None]:
# 혼동 행렬 시각화 함수
def plot_confusion_matrix(model, test_loader):
    """
    테스트 데이터셋에 대한 혼동 행렬을 시각화합니다.

    :param model: 평가할 모델
    :param test_loader: 테스트 데이터 로더
    """
    model.eval()
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device).float().unsqueeze(1)
            outputs = model(inputs)
            predicted = (outputs > 0.5).float()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())

    cm = confusion_matrix(all_labels, all_preds)
    plt.figure(figsize=(6, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Normal', 'Pneumonia'], yticklabels=['Normal', 'Pneumonia'])
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()


In [None]:
# 랜덤 이미지 10개와 예측 라벨 출력 함수
def show_random_images_with_predictions(model, test_loader):
    """
    테스트 데이터에서 랜덤으로 10개의 이미지를 선택하고, 해당 예측 라벨을 함께 출력합니다.

    :param model: 예측할 모델
    :param test_loader: 테스트 데이터 로더
    """
    model.eval()
    data_iter = iter(test_loader)

    # 랜덤 샘플 10개
    for _ in range(10):
        inputs, labels = next(data_iter)
        inputs, labels = inputs.to(device), labels.to(device)

        # 예측
        outputs = model(inputs)
        predicted = (outputs > 0.5).float()

        # 이미지와 라벨 출력
        for i in range(len(inputs)):
            image = inputs[i].cpu().numpy().transpose((1, 2, 0))  # HWC 형식으로 변환
            image = (image * 0.5) + 0.5  # 정규화 역변환
            label = 'Pneumonia' if labels[i].item() == 1 else 'Normal'
            pred = 'Pneumonia' if predicted[i].item() == 1 else 'Normal'

            plt.imshow(image)
            plt.title(f"True: {label}, Pred: {pred}")
            plt.show()


In [None]:
# 모델 학습 및 평가 시작
train_acc, val_acc = train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, early_stopping, num_epochs=30)
evaluate_model(model, test_loader)
plot_accuracy(train_acc, val_acc)
plot_confusion_matrix(model, test_loader)
show_random_images_with_predictions(model, test_loader)
