## 0) 필요한 라이브러리 설치 및 Import

In [None]:
!pip install -q torchvision sklearn

# import
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.metrics import classification_report, roc_auc_score, roc_curve, confusion_matrix

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models

## 1) 캐글에서 데이터 다운로드

In [None]:
# Download the chest X-ray pneumonia dataset from Kaggle (Colab only).
from google.colab import files
files.upload()  # opens the Colab upload dialog; upload kaggle.json here

# Copy the uploaded kaggle.json into ~/.kaggle and make it readable by the
# owner only (mode 600).
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Download the dataset archive and extract it under /content.
!kaggle datasets download -d paultimothymooney/chest-xray-pneumonia
!unzip -q chest-xray-pneumonia.zip -d /content

# Flatten the duplicated nested chest_xray/chest_xray folder (only if present);
# errors are silenced with 2>/dev/null so the cell does not fail when it is absent.
!mv /content/chest_xray/chest_xray/* /content/chest_xray/ 2>/dev/null
!rm -r /content/chest_xray/chest_xray 2>/dev/null

## 1-1) EDA (탐색적 데이터 분석)

In [None]:
# CSV files (one per split) listing the image paths and their labels.
csv_files = {
    'train': '/content/DATA/csv/train_list.csv',
    'val': '/content/DATA/csv/val_list.csv',
    'test': '/content/DATA/csv/test_list.csv',
}

# Plot the label histogram of every split, one subplot per split.
fig, axes = plt.subplots(nrows=len(csv_files), ncols=1, figsize=(8, 12))
# Figure objects expose suptitle(), not title().
fig.suptitle('Histogram of CT image datasets', fontsize=16)
colors = ['skyblue', 'orange']

for idx, (split, path) in enumerate(csv_files.items()):
    ax = axes[idx]
    df = pd.read_csv(path)

    # sort_index() keeps the class order stable across splits.
    label_counts = df['label'].value_counts().sort_index()

    # Draw the bar chart.
    ax.bar(['Normal (0)', 'Pneumonia (1)'], label_counts, color=colors)
    ax.set_title(f'{split} Set')
    ax.set_ylabel('Count')
    ax.set_ylim(0, max(label_counts) * 1.2)  # headroom for the count labels

    # Annotate every bar with its count.
    for i, count in enumerate(label_counts):
        ax.text(i, count + 5, str(count), ha='center', va='bottom')

# Leave the top 5% of the figure free for the suptitle.
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.show()

- train 데이터의 클래스 불균형이 심하다. 따라서 augmentation을 통해 클래스별 데이터 개수의 균형을 맞추는 것이 중요하다.


In [None]:
# Preview: show one sample image per (split, label) pair on a 3x2 grid
# (rows: train / val / test, columns: NORMAL / PNEUMONIA).

fig, axes = plt.subplots(3, 2, figsize=(10, 10))
panels = iter(axes.flatten())  # walk the panels in row-major order

for split, path in csv_files.items():
    df = pd.read_csv(path)  # read each split once, not once per label
    for lb in ["NORMAL", "PNEUMONIA"]:
        ax = next(panels)
        ax.axis('off')

        sample_df = df.loc[df.label == lb, ['path', 'label']]
        if sample_df.empty:
            # Nothing to draw for this class in this split.
            ax.set_title(f'{split} / {lb}: no samples')
            continue

        # Positional access: after filtering, the index label 0 may not exist.
        first_row = sample_df.iloc[0]
        label = first_row['label']

        # Load and draw the image; the file is closed once it has been drawn.
        with Image.open(first_row['path']) as image:
            ax.imshow(image, cmap='gray')
        ax.set_title(f'Label: {label}')

plt.tight_layout()
plt.show()

In [None]:
# Image size statistics

from tqdm import tqdm

# Load the training list here so the cell does not rely on a `train_df`
# variable created somewhere else.
train_df = pd.read_csv(csv_files['train'])

sizes = []
skipped = 0
for path in tqdm(train_df['path'].values[:300]):  # sample only, not the whole set
    try:
        with Image.open(path) as img:
            sizes.append(img.size)  # (width, height)
    except (OSError, ValueError):
        # Missing or unreadable image: skip it, but keep count so the
        # problem does not go unnoticed.
        skipped += 1

if skipped:
    print(f"Skipped {skipped} unreadable image(s).")

sizes = np.array(sizes)
if sizes.size == 0:
    print("No readable images found; nothing to plot.")
else:
    print(f"Average size: {np.mean(sizes, axis=0)}")

    # Visualize the width / height distributions.
    plt.hist(sizes[:, 0], bins=30, alpha=0.5, label='Width')
    plt.hist(sizes[:, 1], bins=30, alpha=0.5, label='Height')
    plt.legend()
    plt.title("Image Size Distribution")
    plt.show()


## 2) 데이터 전처리 & 데이터 로더 생성

In [None]:
# Strategy: oversample only the minority class.
#   1. Select the NORMAL rows (class 0) from the train CSV.
#   2. Append extra copies of just those rows to the full train list.
#   3. The resulting train list is (roughly) class-balanced.

import pandas as pd
from sklearn.model_selection import train_test_split

# Load the original train CSV.
df = pd.read_csv(csv_files['train'])

# This cell overwrites its own input file, so re-running it would keep stacking
# copies. Dropping repeated paths first makes the cell idempotent.
# NOTE(review): assumes every image path appears only once in the original list.
df = df.drop_duplicates(subset='path').reset_index(drop=True)

# Select the minority class. Labels are stored as strings
# ('NORMAL' / 'PNEUMONIA'), so comparing against the integer 0 matches nothing.
minority_df = df[df['label'] == 'NORMAL']

# Duplicate the minority rows (here: two extra copies of each).
augmented_df = pd.concat([minority_df] * 2, ignore_index=True)

# Merge and shuffle to obtain the balanced train list.
balanced_train_df = pd.concat([df, augmented_df], ignore_index=True).sample(frac=1).reset_index(drop=True)

# Save as CSV; this is the file the training dataset reads.
balanced_train_df.to_csv(csv_files['train'], index=False)

In [None]:
class CSVImageDataset(Dataset):
    """Dataset of grayscale images listed in a CSV file.

    The CSV is read with ``pd.read_csv`` and must provide a ``path`` column
    (image file location) and a ``label`` column containing ``'NORMAL'`` or
    ``'PNEUMONIA'``.

    Args:
        csv_path: Path of the CSV file listing the samples.
        transform: Optional callable applied to the loaded PIL image.
        aug_transform_train_nml: Optional callable that, when given, is
            applied to NORMAL (label 0) images instead of ``transform``.
    """

    def __init__(self, csv_path, transform=None, aug_transform_train_nml=None):
        self.df = pd.read_csv(csv_path)
        self.transform = transform
        self.aug_transform_train_nml = aug_transform_train_nml
        # Mapping from the string labels in the CSV to integer class ids.
        self.label_map = {'NORMAL': 0, 'PNEUMONIA': 1}

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        Raises:
            KeyError: If the row's label is not a key of ``label_map``.
        """
        row = self.df.iloc[idx]  # single positional lookup for both fields
        label = self.label_map[row['label']]

        # Load as single-channel grayscale. convert() returns an independent
        # image, so the underlying file can be closed right away.
        with Image.open(row['path']) as img:
            image = img.convert('L')

        # NORMAL samples use the dedicated augmentation pipeline when one is
        # configured; every other sample falls back to the regular transform.
        if label == 0 and self.aug_transform_train_nml:
            image = self.aug_transform_train_nml(image)
        elif self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# Image transform pipelines. Every pipeline resizes to 224x224, optionally
# applies random augmentations, converts to a tensor and normalizes with
# mean 0.5 / std 0.5.
def _build_pipeline(*augmentations):
    """Compose resize -> given augmentations -> tensor conversion -> normalization."""
    return transforms.Compose([
        transforms.Resize((224, 224)),
        *augmentations,
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ])


# Training pipeline: random horizontal flip only.
transform_train = _build_pipeline(transforms.RandomHorizontalFlip())

# Validation / test pipeline: no random augmentation.
transform_val = _build_pipeline()

# Stronger augmentation pipeline used for NORMAL training images.
aug_transform_train_nml = _build_pipeline(
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2),
)

In [None]:
from torch.utils.data import DataLoader

# Datasets: only the training set receives the extra NORMAL-class augmentation;
# validation and test share the deterministic transform.
train_dataset = CSVImageDataset(
    '/content/DATA/csv/train_list.csv',
    transform=transform_train,
    aug_transform_train_nml=aug_transform_train_nml,
)
val_dataset = CSVImageDataset(
    '/content/DATA/csv/val_list.csv',
    transform=transform_val,
)
test_dataset = CSVImageDataset(
    '/content/DATA/csv/test_list.csv',
    transform=transform_val,
)

# Loaders: all use batches of 32; only the training loader shuffles.
loader_options = {'batch_size': 32}
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, **loader_options)
test_loader = DataLoader(test_dataset, **loader_options)

## 모델 구조 설계

In [None]:
class CNN_for_CT(nn.Module):
    """Small three-stage CNN producing two class logits.

    The classifier head expects a 128 x 28 x 28 feature map, i.e. a
    1-channel 224 x 224 input (three 2x max-pools: 224 -> 112 -> 56 -> 28).
    """

    def __init__(self):
        # Zero-argument super(): the previous call named a non-existent
        # class (SimpleCNN) and raised NameError on construction.
        super().__init__()
        # Feature extractor: three conv -> ReLU -> 2x max-pool stages.
        self.conv = nn.Sequential(
            nn.Conv2d(1, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2)
        )
        # Classifier head on the flattened feature map.
        self.fc = nn.Sequential(
            nn.Linear(128 * 28 * 28, 256), nn.ReLU(),
            nn.Linear(256, 2)
        )

    def forward(self, x):
        """Return logits of shape ``(batch, 2)`` for input ``(batch, 1, 224, 224)``."""
        features = self.conv(x)
        # Flatten everything except the batch dimension.
        return self.fc(features.flatten(1))

## 모델 컴파일

In [None]:

from torch import optim
from tqdm import tqdm  # progress bar
import matplotlib.pyplot as plt

# Model, loss and optimizer.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNN_for_CT().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

epochs = 30
patience = 5  # epochs without validation improvement before stopping
train_losses, val_losses = [], []
train_loss_history = train_losses  # alias kept for backward compatibility

# Early-stopping state must exist before the first comparison.
best_val_loss = float("inf")
early_stop_counter = 0

# Make sure the checkpoint directory exists before the first save.
best_model_path = "/content/MODELS/cnn_best_model.pth"
os.makedirs(os.path.dirname(best_model_path), exist_ok=True)

for epoch in range(epochs):
    # ---- training ----
    model.train()
    running_loss = 0.0
    correct, total = 0, 0
    progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}")

    for imgs, labels in progress_bar:
        imgs, labels = imgs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        correct += (outputs.argmax(dim=1) == labels).sum().item()
        total += labels.size(0)
        progress_bar.set_postfix(loss=loss.item())

    train_loss = running_loss / len(train_loader)  # mean of the per-batch losses
    train_acc = correct / total
    train_losses.append(train_loss)

    # ---- validation ----
    model.eval()
    val_running_loss = 0.0
    val_correct, val_total = 0, 0

    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)

            loss = criterion(outputs, labels)
            val_running_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            val_correct += (predicted == labels).sum().item()
            val_total += labels.size(0)

    val_loss = val_running_loss / len(val_loader)
    val_acc = val_correct / val_total
    val_losses.append(val_loss)

    print(f"[{epoch+1}/{epochs}] "
          f"Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f} | "
          f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.4f}")

    # ---- checkpointing / early stopping on validation loss ----
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        early_stop_counter = 0
        torch.save(model.state_dict(), best_model_path)
    else:
        early_stop_counter += 1
        if early_stop_counter >= patience:
            print("Early stopping triggered.")
            break

## 학습 과정 시각화


In [None]:
# Plot the per-epoch training and validation loss on one set of axes.
fig, ax = plt.subplots(figsize=(8, 6))
for curve, curve_name in ((train_losses, 'Train Loss'), (val_losses, 'Validation Loss')):
    ax.plot(curve, label=curve_name)
ax.set_xlabel("Epoch")
ax.set_ylabel("Loss")
ax.set_title("Train vs Validation Loss")
ax.legend()
ax.grid(True)
plt.show()

## 평가

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Restore the best checkpoint. load_state_dict() loads in place and returns a
# key-matching report (not the model), so its result must not be assigned to
# `model` or chained with .to().
state_dict = torch.load("/content/MODELS/cnn_best_model.pth", map_location=device)
model.load_state_dict(state_dict)
model.to(device)
model.eval()
y_true, y_pred, y_prob = [], [], []

# Collect labels, hard predictions and the class-1 (PNEUMONIA) probability.
with torch.no_grad():
    for imgs, labels in test_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        outputs = model(imgs)
        probs = torch.softmax(outputs, dim=1)[:, 1]
        preds = torch.argmax(outputs, dim=1)

        y_true.extend(labels.cpu().numpy())
        y_pred.extend(preds.cpu().numpy())
        y_prob.extend(probs.cpu().numpy())

print("Accuracy:", accuracy_score(y_true, y_pred))
print("Precision:", precision_score(y_true, y_pred))
print("Recall:", recall_score(y_true, y_pred))
print("F1 Score:", f1_score(y_true, y_pred))
print("ROC AUC:", roc_auc_score(y_true, y_prob))

# ROC curve
fpr, tpr, _ = roc_curve(y_true, y_prob)
plt.figure()
plt.plot(fpr, tpr, label="ROC Curve")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()
plt.show()


# Confusion matrix
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["NORMAL", "PNEUMONIA"])
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()

## 결과 확인

In [None]:
# Draw random samples and compare the model's prediction with the true label.
def visualize_predictions(model, dataset, num=5):
    """Show up to ``num`` random samples with their true and predicted labels.

    Args:
        model: Classifier returning per-class logits for a batched input.
        dataset: Indexable dataset; ``dataset[i]`` is read as
            ``(image, label)``. The image is presumably a ``(C, H, W)`` tensor
            (it is permuted to ``(H, W, C)`` for display) and label 1 is
            treated as PNEUMONIA.
        num: Number of samples to show; clamped to ``len(dataset)``.

    Returns:
        None. One matplotlib figure is shown per sample.
    """
    model.eval()

    # Sampling without replacement cannot draw more items than exist.
    count = min(num, len(dataset))
    if count <= 0:
        return

    # Run inference on whatever device the model lives on instead of relying
    # on a global `device` variable.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")

    indices = np.random.choice(len(dataset), count, replace=False)
    for idx in indices:
        img, label = dataset[idx]
        with torch.no_grad():
            logits = model(img.unsqueeze(0).to(model_device))  # add batch dim
        pred_label = torch.argmax(logits, dim=1).item()

        # (C, H, W) -> (H, W, C); squeeze drops the single channel for imshow.
        plt.imshow(img.permute(1, 2, 0).squeeze(), cmap='gray')
        plt.title(f"True: {'PNEUMONIA' if label==1 else 'NORMAL'} | Pred: {'PNEUMONIA' if pred_label==1 else 'NORMAL'}")
        plt.axis('off')
        plt.show()

# Show five randomly chosen test samples with their true and predicted labels.
visualize_predictions(model, test_dataset, num=5)
