# Dự Án Nghiên Cứu: Phát Hiện Bất Thường Ảnh Y Khoa
### So sánh hiệu suất giữa U-Net (Baseline) và Reversed Autoencoder (Thử nghiệm)

## Giai đoạn 0: Chuẩn Bị Môi Trường & Dữ Liệu
Trong giai đoạn này, chúng ta sẽ thiết lập môi trường làm việc trên Google Colab, kết nối với Google Drive, và tải bộ dữ liệu NIH Chest X-ray từ Kaggle.

In [None]:
# Kết nối Google Drive (tùy chọn, để lưu model hoặc kaggle.json)
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Cài đặt các thư viện cần thiết
!pip install -q pandas scikit-learn matplotlib torchsummary

### Tải dữ liệu từ Kaggle
1.  Tạo một API token tại trang Kaggle (`Your Profile` -> `Account` -> `Create New API Token`). Thao tác này sẽ tải về file `kaggle.json`.
2.  Upload file `kaggle.json` đó lên Colab bằng đoạn code dưới đây.

In [None]:
from google.colab import files
import os

# Upload file kaggle.json
if not os.path.exists('/root/.kaggle/kaggle.json'):
    print("Vui lòng upload file kaggle.json của bạn.")
    files.upload()

# Tạo thư mục và cấp quyền cho file kaggle.json
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Tải bộ dữ liệu NIH Chest X-ray (chỉ tải file metadata và file ảnh zip nhỏ nhất để demo)
# LƯU Ý: Việc tải toàn bộ dữ liệu sẽ mất rất nhiều thời gian và không gian.
!kaggle datasets download -d nih-chest-xray/data -f Data_Entry_BBOX.csv
!kaggle datasets download -d nih-chest-xray/data -f images_001.zip
!kaggle datasets download -d nih-chest-xray/data -f images_002.zip

# Giải nén file
!unzip -q Data_Entry_BBOX.csv.zip -d .
!unzip -q images_001.zip -d .
!unzip -q images_002.zip -d .

## Giai đoạn 1: Khởi Tạo Notebook & Tiền Xử Lý Dữ Liệu
### Giới thiệu
Trong dự án này, chúng tôi thực hiện một nghiên cứu so sánh để tìm ra kiến trúc hiệu quả cho bài toán phát hiện bất thường không giám sát trên ảnh X-quang ngực. Chiến lược tiếp cận bao gồm hai bước:
1.  **Xây dựng mô hình Baseline:** Chúng tôi sẽ triển khai **U-Net**, một kiến trúc đã được chứng minh là cực kỳ hiệu quả cho các tác vụ trên ảnh y khoa, để làm thước đo hiệu suất chuẩn.
2.  **Xây dựng mô hình Thử nghiệm:** Dựa trên ý tưởng từ bài báo "Towards Universal Unsupervised Anomaly Detection...", chúng tôi sẽ triển khai một kiến trúc **Reversed Autoencoder (RA)**. Mục tiêu là so sánh trực tiếp hiệu suất của RA với U-Net để đánh giá phương pháp mới.

Toàn bộ quá trình huấn luyện và đánh giá sẽ được thực hiện trên cùng một tập dữ liệu để đảm bảo tính công bằng.

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, roc_curve
from PIL import Image
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import glob

# --- Cấu hình cho dự án ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
IMAGE_DIR = "/content/images"
METADATA_PATH = "/content/Data_Entry_BBOX.csv"

# Giảm số lượng dữ liệu để chạy demo nhanh hơn
NUM_NORMAL_SAMPLES = 2000  # Lấy 2000 ảnh bình thường
NUM_ABNORMAL_SAMPLES = 1000 # Lấy 1000 ảnh bất thường cho tập test

IMG_SIZE = 128
BATCH_SIZE = 32
EPOCHS = 10 # Tăng lên 20-30 cho kết quả tốt hơn
LR = 1e-4

print(f"Thiết bị đang sử dụng: {DEVICE}")

In [None]:
# Đọc và lọc dữ liệu
df = pd.read_csv(METADATA_PATH)
all_image_paths = glob.glob(os.path.join(IMAGE_DIR, '*.png'))
image_filenames = {os.path.basename(p): p for p in all_image_paths}

df['full_path'] = df['Image Index'].map(image_filenames)
df = df.dropna(subset=['full_path'])

normal_df = df[df['Finding Labels'] == 'No Finding']
abnormal_df = df[df['Finding Labels'] != 'No Finding']

# Lấy mẫu dữ liệu
normal_df = normal_df.sample(n=NUM_NORMAL_SAMPLES, random_state=42)
abnormal_df = abnormal_df.sample(n=NUM_ABNORMAL_SAMPLES, random_state=42)

print(f"Số ảnh bình thường (No Finding): {len(normal_df)}")
print(f"Số ảnh bất thường: {len(abnormal_df)}")

In [None]:
# Phân chia dữ liệu
# Tập train và val chỉ chứa ảnh bình thường
train_df, val_df = train_test_split(normal_df, test_size=0.2, random_state=42)

# Tập test chứa cả ảnh bình thường và bất thường
test_normal_df = val_df.sample(n=int(len(val_df)*0.5), random_state=42) # Lấy 1 nửa val làm test normal
test_df = pd.concat([test_normal_df, abnormal_df])

print(f"Kích thước tập Train (chỉ bình thường): {len(train_df)}")
print(f"Kích thước tập Validation (chỉ bình thường): {len(val_df)}")
print(f"Kích thước tập Test (bình thường + bất thường): {len(test_df)}")

In [None]:
# Viết lớp Dataset
class ChestXrayDataset(Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_path = self.dataframe.iloc[idx]['full_path']
        label_str = self.dataframe.iloc[idx]['Finding Labels']
        
        image = Image.open(img_path).convert("L") # Chuyển sang ảnh xám
        
        # 0 cho bình thường, 1 cho bất thường
        label = 0 if label_str == 'No Finding' else 1
        
        if self.transform:
            image = self.transform(image)
            
        return image, label

# Định nghĩa các phép biến đổi
transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5]) # Chuẩn hóa về [-1, 1]
])

# Tạo các đối tượng Dataset và DataLoader
train_dataset = ChestXrayDataset(train_df, transform=transform)
val_dataset = ChestXrayDataset(val_df, transform=transform)
test_dataset = ChestXrayDataset(test_df, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## Giai đoạn 2: Xây Dựng Các Mô Hình

### 2A. Mô Hình Baseline: U-Net
U-Net là một kiến trúc autoencoder đối xứng với các "kết nối tắt" (skip connections). Các kết nối này cho phép thông tin chi tiết từ bộ mã hóa (encoder) được truyền thẳng đến bộ giải mã (decoder), giúp tái tạo hình ảnh sắc nét hơn, đặc biệt quan trọng cho việc định vị các chi tiết nhỏ trong ảnh y khoa.

In [None]:
class UNet(nn.Module):
    def __init__(self):
        super(UNet, self).__init__()

        # Encoder
        self.enc1 = self.conv_block(1, 64)
        self.enc2 = self.conv_block(64, 128)
        self.enc3 = self.conv_block(128, 256)
        self.pool = nn.MaxPool2d(2, 2)

        # Bottleneck
        self.bottleneck = self.conv_block(256, 512)

        # Decoder
        self.upconv3 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        self.dec3 = self.conv_block(512, 256) # 256 from upconv + 256 from enc3
        self.upconv2 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        self.dec2 = self.conv_block(256, 128) # 128 from upconv + 128 from enc2
        self.upconv1 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        self.dec1 = self.conv_block(128, 64)   # 64 from upconv + 64 from enc1

        # Output layer
        self.out_conv = nn.Conv2d(64, 1, kernel_size=1)
        self.tanh = nn.Tanh()

    def conv_block(self, in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        # Encoder
        e1 = self.enc1(x)       # 128x128
        p1 = self.pool(e1)      # 64x64
        e2 = self.enc2(p1)      # 64x64
        p2 = self.pool(e2)      # 32x32
        e3 = self.enc3(p2)      # 32x32
        p3 = self.pool(e3)      # 16x16

        # Bottleneck
        b = self.bottleneck(p3) # 16x16

        # Decoder
        d3 = self.upconv3(b)    # 32x32
        d3 = torch.cat((e3, d3), dim=1) # Concatenate with skip connection
        d3 = self.dec3(d3)

        d2 = self.upconv2(d3)   # 64x64
        d2 = torch.cat((e2, d2), dim=1)
        d2 = self.dec2(d2)

        d1 = self.upconv1(d2)   # 128x128
        d1 = torch.cat((e1, d1), dim=1)
        d1 = self.dec1(d1)

        output = self.out_conv(d1)
        output = self.tanh(output) # Tanh to match normalized input range [-1, 1]
        return output

### 2B. Mô Hình Thử Nghiệm: Reversed Autoencoder (RA)
Dựa trên ý tưởng của bài báo, RA không chỉ tái tạo lại ảnh gốc, mà cố gắng tạo ra một phiên bản "giả lành tính" (pseudo-healthy) của nó. Để mô phỏng sự khác biệt về kiến trúc, chúng tôi triển khai một phiên bản Autoencoder không đối xứng và không có skip-connections đầy đủ như U-Net. Điều này buộc mô hình phải dựa nhiều hơn vào thông tin trong không gian ẩn (bottleneck) để tái tạo, có khả năng sẽ "bỏ qua" các chi tiết bất thường mà nó chưa từng học.

In [None]:
class ReversedAutoencoder(nn.Module):
    def __init__(self):
        super(ReversedAutoencoder, self).__init__()
        # Encoder: Nén thông tin mạnh mẽ
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 16, 3, stride=2, padding=1),  # 128 -> 64
            nn.ReLU(True),
            nn.Conv2d(16, 32, 3, stride=2, padding=1), # 64 -> 32
            nn.ReLU(True),
            nn.Conv2d(32, 64, 3, stride=2, padding=1), # 32 -> 16
            nn.ReLU(True),
            nn.Conv2d(64, 128, 7) # 16 -> 10
        )
        
        # Decoder: Kiến trúc khác biệt, không đối xứng hoàn toàn
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(128, 64, 7), # 10 -> 16
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1), # 16 -> 32
            nn.ReLU(True),
            nn.ConvTranspose2d(32, 16, 3, stride=2, padding=1, output_padding=1), # 32 -> 64
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 1, 3, stride=2, padding=1, output_padding=1),  # 64 -> 128
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
from torchsummary import summary

print("--- Kiến trúc U-Net ---")
unet_model = UNet().to(DEVICE)
summary(unet_model, (1, IMG_SIZE, IMG_SIZE))

print("
--- Kiến trúc Reversed Autoencoder ---")
ra_model = ReversedAutoencoder().to(DEVICE)
summary(ra_model, (1, IMG_SIZE, IMG_SIZE))

## Giai đoạn 3: Huấn Luyện So Sánh

In [None]:
def train_model(model, train_loader, val_loader, epochs, lr, model_name):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    
    best_val_loss = float('inf')
    history = {'train_loss': [], 'val_loss': []}
    
    for epoch in range(epochs):
        model.train()
        running_train_loss = 0.0
        for images, _ in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} [Train]" ):
            images = images.to(DEVICE)
            
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, images)
            loss.backward()
            optimizer.step()
            
            running_train_loss += loss.item() * images.size(0)
        
        epoch_train_loss = running_train_loss / len(train_loader.dataset)
        history['train_loss'].append(epoch_train_loss)
        
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for images, _ in tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} [Val]" ):
                images = images.to(DEVICE)
                outputs = model(images)
                loss = criterion(outputs, images)
                running_val_loss += loss.item() * images.size(0)
        
        epoch_val_loss = running_val_loss / len(val_loader.dataset)
        history['val_loss'].append(epoch_val_loss)
        
        print(f"Epoch {epoch+1}/{epochs} - Train Loss: {epoch_train_loss:.6f}, Val Loss: {epoch_val_loss:.6f}")
        
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            torch.save(model.state_dict(), f'{model_name}_best.pth')
            print(f"Lưu model tốt nhất: {model_name}_best.pth")
            
    return model, history

### 3A. Huấn luyện U-Net

In [None]:
unet_model = UNet().to(DEVICE)
unet_model, unet_history = train_model(unet_model, train_loader, val_loader, EPOCHS, LR, "unet")

### 3B. Huấn luyện Reversed Autoencoder

In [None]:
ra_model = ReversedAutoencoder().to(DEVICE)
ra_model, ra_history = train_model(ra_model, train_loader, val_loader, EPOCHS, LR, "ra")

In [None]:
# Vẽ đồ thị loss
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(unet_history['train_loss'], label='U-Net Train Loss')
plt.plot(unet_history['val_loss'], label='U-Net Val Loss')
plt.title('U-Net Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(ra_history['train_loss'], label='RA Train Loss')
plt.plot(ra_history['val_loss'], label='RA Val Loss')
plt.title('Reversed Autoencoder Loss')
plt.legend()

plt.show()

## Giai đoạn 4: Phân Tích So Sánh Chuyên Sâu

In [None]:
# Tải lại các model tốt nhất để đánh giá
unet_model_eval = UNet().to(DEVICE)
unet_model_eval.load_state_dict(torch.load("unet_best.pth"))
unet_model_eval.eval()

ra_model_eval = ReversedAutoencoder().to(DEVICE)
ra_model_eval.load_state_dict(torch.load("ra_best.pth"))
ra_model_eval.eval()

In [None]:
def evaluate_model(model, data_loader):
    reconstruction_errors = []
    true_labels = []
    criterion = nn.MSELoss(reduction='none')
    
    with torch.no_grad():
        for images, labels in tqdm(data_loader, desc="Evaluating"):
            images = images.to(DEVICE)
            outputs = model(images)
            
            # Tính loss trên từng pixel, sau đó lấy trung bình trên toàn ảnh
            error = criterion(outputs, images).mean(dim=[1, 2, 3])
            
            reconstruction_errors.extend(error.cpu().numpy())
            true_labels.extend(labels.numpy())
            
    return np.array(reconstruction_errors), np.array(true_labels)

### 4A & 4B. Tính toán hiệu suất cho cả hai mô hình

In [None]:
unet_errors, unet_labels = evaluate_model(unet_model_eval, test_loader)
ra_errors, ra_labels = evaluate_model(ra_model_eval, test_loader)

unet_auc = roc_auc_score(unet_labels, unet_errors)
ra_auc = roc_auc_score(ra_labels, ra_errors)

print(f"AUC của U-Net: {unet_auc:.4f}")
print(f"AUC của Reversed Autoencoder: {ra_auc:.4f}")

### 4C. Phân Tích So Sánh Trực Tiếp

In [None]:
unet_fpr, unet_tpr, _ = roc_curve(unet_labels, unet_errors)
ra_fpr, ra_tpr, _ = roc_curve(ra_labels, ra_errors)

plt.figure(figsize=(8, 6))
plt.plot(unet_fpr, unet_tpr, label=f'U-Net (AUC = {unet_auc:.4f})')
plt.plot(ra_fpr, ra_tpr, label=f'Reversed Autoencoder (AUC = {ra_auc:.4f})')
plt.plot([0, 1], [0, 1], 'k--')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Đường cong ROC So sánh')
plt.legend()
plt.show()

In [None]:
# Trực quan hóa kết quả so sánh
def visualize_comparison(model1, model2, model1_name, model2_name, data_loader, num_images=5):
    model1.eval()
    model2.eval()
    criterion = nn.MSELoss(reduction='none')
    
    # Lấy một batch ảnh từ test loader
    images, labels = next(iter(data_loader))
    images = images.to(DEVICE)
    
    with torch.no_grad():
        outputs1 = model1(images)
        outputs2 = model2(images)
        
        error_map1 = torch.mean(criterion(outputs1, images), dim=1).cpu()
        error_map2 = torch.mean(criterion(outputs2, images), dim=1).cpu()

    images = images.cpu()
    outputs1 = outputs1.cpu()
    outputs2 = outputs2.cpu()
    
    plt.figure(figsize=(20, num_images * 4))
    for i in range(num_images):
        label_text = "Abnormal" if labels[i] == 1 else "Normal"
        
        # Denormalize for display
        img_display = images[i] * 0.5 + 0.5
        recon1_display = outputs1[i] * 0.5 + 0.5
        recon2_display = outputs2[i] * 0.5 + 0.5
        
        # Ảnh gốc
        plt.subplot(num_images, 5, i * 5 + 1)
        plt.imshow(img_display.squeeze(), cmap='gray')
        plt.title(f"Original ({label_text})")
        plt.axis('off')
        
        # Tái tạo của Model 1
        plt.subplot(num_images, 5, i * 5 + 2)
        plt.imshow(recon1_display.squeeze(), cmap='gray')
        plt.title(f"{model1_name} Recon")
        plt.axis('off')
        
        # Bản đồ lỗi của Model 1
        plt.subplot(num_images, 5, i * 5 + 3)
        plt.imshow(error_map1[i], cmap='jet')
        plt.title(f"{model1_name} Error Map")
        plt.axis('off')
        
        # Tái tạo của Model 2
        plt.subplot(num_images, 5, i * 5 + 4)
        plt.imshow(recon2_display.squeeze(), cmap='gray')
        plt.title(f"{model2_name} Recon")
        plt.axis('off')
        
        # Bản đồ lỗi của Model 2
        plt.subplot(num_images, 5, i * 5 + 5)
        plt.imshow(error_map2[i], cmap='jet')
        plt.title(f"{model2_name} Error Map")
        plt.axis('off')
        
    plt.tight_layout()
    plt.show()

visualize_comparison(unet_model_eval, ra_model_eval, "U-Net", "RA", test_loader)

## Giai đoạn 5: Tổng Kết Dựa Trên So Sánh
Trong phần này, bạn sẽ viết kết luận cuối cùng của mình.

1.  **Tóm tắt quá trình:** Tóm tắt lại việc đã xây dựng và so sánh hai mô hình U-Net và Reversed Autoencoder cho bài toán phát hiện bất thường.
2.  **Phân tích kết quả:** Dựa vào điểm AUC và các hình ảnh trực quan hóa, hãy đưa ra kết luận mô hình nào hoạt động hiệu quả hơn. Ví dụ: "Mặc dù cả hai mô hình đều cho thấy khả năng phát hiện bất thường, U-Net với AUC cao hơn và bản đồ lỗi chi tiết hơn đã chứng tỏ sự vượt trội. Các kết nối tắt (skip connections) của U-Net dường như đóng vai trò quan trọng trong việc tái tạo các chi tiết nền của ảnh, giúp làm nổi bật các vùng lỗi thực sự do bất thường gây ra."
3.  **Hạn chế:** Nêu ra các hạn chế của nghiên cứu (ví dụ: chỉ dùng một phần nhỏ dữ liệu, thời gian huấn luyện ngắn).
4.  **Hướng phát triển:** Đề xuất các bước tiếp theo (ví dụ: huấn luyện trên toàn bộ dữ liệu, thử nghiệm với các hàm loss khác, áp dụng lên bộ dữ liệu MRI đã tải về).