# CNN 모델: 패치 단위 학습용

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import requests
from io import BytesIO
import pandas as pd
import torchvision
import matplotlib.pyplot as plt
from PIL import Image
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
import random
from collections import defaultdict, Counter

In [None]:
def make_data_index(repo_id, label_csv_path):
    df = pd.read_csv(label_csv_path)
    filename_to_label = dict(zip(df['pub_subspec_id'], df['label']))
    data_index = []

    for fname, label in filename_to_label.items():
        fname_with_ext = fname if fname.endswith(".npz") else f"{fname}.npz"
        url = f"https://huggingface.co/datasets/{repo_id}/resolve/main/{fname_with_ext}"
        try:
            response = requests.get(url)
            npz = np.load(BytesIO(response.content))
            for key in npz.files:
                patch_array = npz[key]
                if patch_array.ndim == 4:
                    for i in range(patch_array.shape[0]):
                        data_index.append((url, key, i, label))
                else:
                    data_index.append((url, key, None, label))
        except Exception as e:
            print(f"❌ Failed to load {fname_with_ext}: {e}")
    return data_index


def stratified_split(data_index, train_ratio=0.7, val_ratio=0.15, seed=42):
    label_to_items = defaultdict(list)
    for item in data_index:
        label = item[2]
        label_to_items[label].append(item)

    train, val, test = [], [], []
    random.seed(seed)

    for label, items in label_to_items.items():
        random.shuffle(items)
        n_total = len(items)
        n_train = int(n_total * train_ratio)
        n_val = int(n_total * 0.15)
        train.extend(items[:n_train])
        val.extend(items[n_train:n_train + n_val])
        test.extend(items[n_train + n_val:])
    return train, val, test


In [None]:
class PatchDataset(Dataset):
    def __init__(self, data_index, transform=None):
        self.data_index = data_index
        self.transform = transform

    def __len__(self):
        return len(self.data_index)

    def __getitem__(self, idx):
        url, key, patch_idx, label = self.data_index[idx]
        response = requests.get(url)
        npz = np.load(BytesIO(response.content))
        patch_array = npz[key]

        if patch_idx is not None:
            patch = patch_array[patch_idx]
        else:
            patch = patch_array  # 이미 3D이면 그대로

        if patch.ndim == 2:
            patch = Image.fromarray(patch.astype(np.uint8), mode='L')
        elif patch.shape[-1] == 3:
            patch = Image.fromarray(patch.astype(np.uint8), mode='RGB')
        else:
            patch = Image.fromarray(patch.astype(np.uint8))

        if self.transform:
            patch = self.transform(patch)

        return patch, int(label)


In [None]:
repo_id = "nayoungku1/npz-histopathology-dataset"
label_csv_path = "./metadata/label.csv"

transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

all_index = make_data_index(repo_id, label_csv_path)
train_idx, val_idx, test_idx = stratified_split(all_index)

train_dataset = PatchDataset(train_idx, transform=transform)
val_dataset = PatchDataset(val_idx, transform=transform)
test_dataset = PatchDataset(test_idx, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [None]:
class SimpleCNN(nn.Module):
    def __init__(self, num_classes):
        super(SimpleCNN, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(16, 32, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, kernel_size=3, padding=1), nn.ReLU(), nn.MaxPool2d(2)
        )
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 32 * 32, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        return self.fc(self.conv(x))

num_classes = len(pd.read_csv(label_csv_path)['label'].unique())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = SimpleCNN(num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
for epoch in range(3):
    model.train()
    total, correct, running_loss = 0, 0, 0.0
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        _, preds = outputs.max(1)
        total += labels.size(0)
        correct += (preds == labels).sum().item()
    print(f"Epoch {epoch+1} | Train Acc: {correct/total:.2%}, Loss: {running_loss:.4f}")

In [None]:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os
import numpy as np
import random
from sklearn.model_selection import train_test_split


In [None]:

class NPZPatchDataset(Dataset):
    def __init__(self, npz_file_paths, labels, transform=None):
        self.npz_file_paths = npz_file_paths
        self.labels = labels
        self.transform = transform
        self.patch_info = []

        for i, path in enumerate(self.npz_file_paths):
            try:
                data = np.load(path)
                patches = data['patches']
                for j in range(min(2000, patches.shape[0])):  # Limit per npz file
                    self.patch_info.append((path, j, self.labels[i]))
            except Exception as e:
                print(f"Failed to load {path}: {e}")

    def __len__(self):
        return len(self.patch_info)

    def __getitem__(self, idx):
        path, patch_idx, label = self.patch_info[idx]
        data = np.load(path)
        patch = data['patches'][patch_idx]
        patch = Image.fromarray(patch.astype(np.uint8))

        if self.transform:
            patch = self.transform(patch)

        return patch, label


In [None]:

# Directory containing .npz files
npz_dir = "./npz_data"  # Change to your directory path
all_npz_files = [f for f in os.listdir(npz_dir) if f.endswith('.npz')]
selected_files = random.sample(all_npz_files, 46)

file_paths = [os.path.join(npz_dir, fname) for fname in selected_files]
labels = [1 if 'tumor' in fname else 0 for fname in selected_files]  # Modify if different labeling

train_paths, val_paths, train_labels, val_labels = train_test_split(file_paths, labels, test_size=0.2, random_state=42)


In [None]:

transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor()
])

train_dataset = NPZPatchDataset(train_paths, train_labels, transform)
val_dataset = NPZPatchDataset(val_paths, val_labels, transform)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)


In [None]:

class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64 * 32 * 32, 128)
        self.fc2 = nn.Linear(128, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = x.view(-1, 64 * 32 * 32)
        x = torch.relu(self.fc1(x))
        x = self.sigmoid(self.fc2(x))
        return x


In [None]:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNNModel().to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

epochs = 5
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.float().unsqueeze(1).to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss:.4f}")
