In [3]:
!pip install torch torchvision --upgrade

Defaulting to user installation because normal site-packages is not writeable

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


### Dataset Loading

In [1]:
import torch
train_images = torch.load("final_train_images.pt", map_location=torch.device('cpu'))
train_labels = torch.load("final_train_labels.pt", map_location=torch.device('cpu'))
valid_images = torch.load("final_valid_images.pt", map_location=torch.device('cpu'))
valid_labels = torch.load("final_valid_labels.pt", map_location=torch.device('cpu'))

  train_images = torch.load("final_train_images.pt", map_location=torch.device('cpu'))
  train_labels = torch.load("final_train_labels.pt", map_location=torch.device('cpu'))
  valid_images = torch.load("final_valid_images.pt", map_location=torch.device('cpu'))
  valid_labels = torch.load("final_valid_labels.pt", map_location=torch.device('cpu'))


# With TDA(Persistent Landscape)

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import cv2
import gudhi as gd
from gudhi import representations
import matplotlib.pyplot as plt
import wandb
torch.cuda.empty_cache()


def compute_persistent_landscape(image_tensor, num_landscapes=2, resolution=100):
    """
    이미지 텐서를 입력받아 Persistent Landscape를 계산.
    """
    image_np = image_tensor.permute(1, 2, 0).numpy()
    resized_image = cv2.resize(image_np, (128, 128), interpolation=cv2.INTER_CUBIC)

    try:

        cubical_complex = gd.CubicalComplex(top_dimensional_cells=resized_image)
        persistence_diagram = cubical_complex.persistence()
        h0 = cubical_complex.persistence_intervals_in_dimension(0)
        h1 = cubical_complex.persistence_intervals_in_dimension(1)


        def remove_infinite_intervals(intervals):
            return intervals[np.isfinite(intervals[:, 1])] if len(intervals) > 0 else intervals

        h0 = remove_infinite_intervals(h0)
        h1 = remove_infinite_intervals(h1)


        landscape_generator = representations.Landscape(num_landscapes=num_landscapes, resolution=resolution)
        filtered_h0 = np.array(h0) if len(h0) > 0 else np.empty((0, 2))
        filtered_h1 = np.array(h1) if len(h1) > 0 else np.empty((0, 2))


        landscape_h0 = landscape_generator.fit_transform([filtered_h0]).flatten() if len(filtered_h0) > 0 else np.zeros(num_landscapes * resolution)
        landscape_h1 = landscape_generator.fit_transform([filtered_h1]).flatten() if len(filtered_h1) > 0 else np.zeros(num_landscapes * resolution)


        combined_feature = np.concatenate([landscape_h0, landscape_h1])
        return torch.from_numpy(combined_feature).float()

    except Exception as e:
        print(f"TDA feature computation error: {e}")
        return torch.zeros(num_landscapes * resolution * 2)


class MRIDataset(Dataset):
    def __init__(self, images, labels, num_landscapes=5, resolution=100):
        self.images = images
        self.labels = labels
        self.num_landscapes = num_landscapes
        self.resolution = resolution

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        tda_features = compute_persistent_landscape(image, num_landscapes=self.num_landscapes, resolution=self.resolution)
        return image, label, tda_features


class CNNWithTDA(nn.Module):
    def __init__(self, num_classes=2, tda_feature_dim=200, reduced_dim=1024):
        super(CNNWithTDA, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Flatten()
        )
        self.feature_reduce = nn.Linear(262144, reduced_dim)  # CNN 차원 축소
        self.fc1 = nn.Linear(reduced_dim + tda_feature_dim, 128)  # 결합된 피처 처리
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x, tda_features):
        cnn_features = self.cnn(x)
        cnn_features = self.feature_reduce(cnn_features)  # 차원 축소
        combined_features = torch.cat((cnn_features, tda_features), dim=1)  # 결합
        x = self.fc1(combined_features)
        x = self.fc2(x)
        return x


def train(model, train_loader, device, optimizer, criterion, num_epochs):
    for epoch in range(num_epochs):
        model.train()
        running_loss, correct, total = 0.0, 0, 0
        for images, labels, tda_features in train_loader:
            images, labels, tda_features = images.to(device), labels.to(device), tda_features.to(device)
            optimizer.zero_grad()
            outputs = model(images, tda_features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        train_loss = running_loss / len(train_loader)
        train_accuracy = correct / total
        wandb.log({"Train Loss": train_loss, "Train Accuracy": train_accuracy})
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}")


def evaluate_and_visualize(model, valid_loader, device):
    model.eval()
    running_loss, correct, total = 0.0, 0, 0
    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        for batch_idx, (images, labels, tda_features) in enumerate(valid_loader):
            images, labels, tda_features = images.to(device), labels.to(device), tda_features.to(device)
            outputs = model(images, tda_features)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)


            for img_idx in range(images.size(0)):
                fig, ax = plt.subplots()
                ax.imshow(images[img_idx].cpu().squeeze(), cmap="gray")
                title = f"GT: {labels[img_idx].item()}, Pred: {predicted[img_idx].item()}"
                ax.set_title(title)
                ax.axis("off")
                wandb.log({f"Sample Prediction (Batch {batch_idx}, Image {img_idx})": wandb.Image(fig)})
                plt.close(fig)

    valid_loss = running_loss / len(valid_loader)
    valid_accuracy = correct / total
    wandb.log({"Validation Loss": valid_loss, "Validation Accuracy": valid_accuracy})
    print(f"Validation Loss: {valid_loss:.4f}, Accuracy: {valid_accuracy:.4f}")



wandb.init(project="COSE474", name="PL_CNN+TDA(Epoch 5)")
wandb.config = {
    "learning_rate": 0.001,
    "num_epochs": 5,
    "num_classes": 2,
    "tda_feature_dim": 400
}



num_landscapes = 2
resolution = 100
tda_feature_dim = num_landscapes * resolution * 2

train_dataset = MRIDataset(train_images, train_labels, num_landscapes=num_landscapes, resolution=resolution)
valid_dataset = MRIDataset(valid_images, valid_labels, num_landscapes=num_landscapes, resolution=resolution)
torch.manual_seed(42)
np.random.seed(42)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNNWithTDA(num_classes=wandb.config["num_classes"], tda_feature_dim=tda_feature_dim).to(device)
optimizer = optim.Adam(model.parameters(), lr=wandb.config["learning_rate"])
criterion = nn.CrossEntropyLoss()


train(model, train_loader, device, optimizer, criterion, num_epochs=wandb.config["num_epochs"])


evaluate_and_visualize(model, valid_loader, device)


wandb.finish()


Epoch [1/5], Loss: 6.9545, Accuracy: 0.5350
Epoch [2/5], Loss: 0.5615, Accuracy: 0.6783
Epoch [3/5], Loss: 0.4619, Accuracy: 0.7633
Epoch [4/5], Loss: 0.3696, Accuracy: 0.8250
Epoch [5/5], Loss: 0.3006, Accuracy: 0.8533
Validation Loss: 0.7560, Accuracy: 0.6333


0,1
Train Accuracy,▁▄▆▇█
Train Loss,█▁▁▁▁
Validation Accuracy,▁
Validation Loss,▁

0,1
Train Accuracy,0.85333
Train Loss,0.30063
Validation Accuracy,0.63333
Validation Loss,0.75596


In [25]:
wandb.finish()

# Only CNN 


In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import cv2
import matplotlib.pyplot as plt
import wandb
from torchvision import transforms 


class MRIDataset(Dataset):
    def __init__(self, images, labels, transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

class CNNOnly(nn.Module):
    def __init__(self, num_classes=2, reduced_dim=1024):
        super(CNNOnly, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.AdaptiveAvgPool2d((32, 32)),  # 출력 크기를 고정
            nn.Flatten()
        )
        self.feature_reduce = nn.Linear(64 * 32 * 32, reduced_dim)  # 실제 CNN 출력 크기로 변경
        self.fc1 = nn.Linear(reduced_dim, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        cnn_features = self.cnn(x)
        cnn_features = self.feature_reduce(cnn_features)
        x = self.fc1(cnn_features)
        x = self.fc2(x)
        return x

def train(model, train_loader, device, optimizer, criterion, num_epochs):
    for epoch in range(num_epochs):
        model.train()
        running_loss, correct, total = 0.0, 0, 0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        train_loss = running_loss / len(train_loader)
        train_accuracy = correct / total
        wandb.log({"Train Loss": train_loss, "Train Accuracy": train_accuracy})
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}")


def evaluate_and_visualize(model, valid_loader, device):
    model.eval()
    running_loss, correct, total = 0.0, 0, 0
    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        for batch_idx, (images, labels) in enumerate(valid_loader):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)


            for img_idx in range(images.size(0)):
                fig, ax = plt.subplots()
                ax.imshow(images[img_idx].cpu().squeeze(), cmap="gray")
                title = f"GT: {labels[img_idx].item()}, Pred: {predicted[img_idx].item()}"
                ax.set_title(title)
                ax.axis("off")
                wandb.log({f"Sample Prediction (Batch {batch_idx}, Image {img_idx})": wandb.Image(fig)})
                plt.close(fig)

    valid_loss = running_loss / len(valid_loader)
    valid_accuracy = correct / total
    wandb.log({"Validation Loss": valid_loss, "Validation Accuracy": valid_accuracy})
    print(f"Validation Loss: {valid_loss:.4f}, Accuracy: {valid_accuracy:.4f}")


wandb.init(project="COSE474", name="Reduced_CNN_Only(Epoch 5)")
wandb.config = {
    "learning_rate": 0.001,
    "num_epochs": 5,
    "num_classes": 2
}



train_dataset = MRIDataset(train_images, train_labels)
valid_dataset = MRIDataset(valid_images, valid_labels)
torch.manual_seed(42)
np.random.seed(42)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = CNNOnly(num_classes=wandb.config["num_classes"]).to(device)
optimizer = optim.Adam(model.parameters(), lr=wandb.config["learning_rate"])
criterion = nn.CrossEntropyLoss()


train(model, train_loader, device, optimizer, criterion, num_epochs=wandb.config["num_epochs"])


evaluate_and_visualize(model, valid_loader, device)

wandb.finish()


Epoch [1/5], Loss: 1.6147, Accuracy: 0.6167
Epoch [2/5], Loss: 0.5515, Accuracy: 0.6917
Epoch [3/5], Loss: 0.4775, Accuracy: 0.7383
Epoch [4/5], Loss: 0.4157, Accuracy: 0.7900
Epoch [5/5], Loss: 0.3501, Accuracy: 0.8417
Validation Loss: 0.6822, Accuracy: 0.6200


0,1
Train Accuracy,▁▃▅▆█
Train Loss,█▂▂▁▁
Validation Accuracy,▁
Validation Loss,▁

0,1
Train Accuracy,0.84167
Train Loss,0.35014
Validation Accuracy,0.62
Validation Loss,0.68222


# TDA Only

In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import cv2
import gudhi as gd
from gudhi import representations
import matplotlib.pyplot as plt
import wandb

torch.cuda.empty_cache()


def compute_persistent_landscape(image_tensor, num_landscapes=2, resolution=100):
    """
    이미지 텐서를 입력받아 Persistent Landscape를 계산.
    """
    image_np = image_tensor.permute(1, 2, 0).numpy()
    resized_image = cv2.resize(image_np, (128, 128), interpolation=cv2.INTER_CUBIC)

    try:

        cubical_complex = gd.CubicalComplex(top_dimensional_cells=resized_image)
        persistence_diagram = cubical_complex.persistence()
        h0 = cubical_complex.persistence_intervals_in_dimension(0)
        h1 = cubical_complex.persistence_intervals_in_dimension(1)


        def remove_infinite_intervals(intervals):
            return intervals[np.isfinite(intervals[:, 1])] if len(intervals) > 0 else intervals

        h0 = remove_infinite_intervals(h0)
        h1 = remove_infinite_intervals(h1)


        landscape_generator = representations.Landscape(num_landscapes=num_landscapes, resolution=resolution)
        filtered_h0 = np.array(h0) if len(h0) > 0 else np.empty((0, 2))
        filtered_h1 = np.array(h1) if len(h1) > 0 else np.empty((0, 2))


        landscape_h0 = landscape_generator.fit_transform([filtered_h0]).flatten() if len(filtered_h0) > 0 else np.zeros(num_landscapes * resolution)
        landscape_h1 = landscape_generator.fit_transform([filtered_h1]).flatten() if len(filtered_h1) > 0 else np.zeros(num_landscapes * resolution)


        combined_feature = np.concatenate([landscape_h0, landscape_h1])
        return torch.from_numpy(combined_feature).float()

    except Exception as e:
        print(f"TDA feature computation error: {e}")
        return torch.zeros(num_landscapes * resolution * 2)


class MRIDataset(Dataset):
    def __init__(self, images, labels, num_landscapes=2, resolution=100):
        self.images = images
        self.labels = labels
        self.num_landscapes = num_landscapes
        self.resolution = resolution

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        label = self.labels[idx]

        tda_features = compute_persistent_landscape(image, num_landscapes=self.num_landscapes, resolution=self.resolution)
        return image, label, tda_features


class TDAOnlyModel(nn.Module):
    def __init__(self, tda_feature_dim=200, num_classes=2):
        super(TDAOnlyModel, self).__init__()
        self.fc1 = nn.Linear(tda_feature_dim, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, tda_features):
        x = self.fc1(tda_features)
        x = self.fc2(x)
        return x


def train(model, train_loader, device, optimizer, criterion, num_epochs):
    for epoch in range(num_epochs):
        model.train()
        running_loss, correct, total = 0.0, 0, 0
        for images, labels, tda_features in train_loader:
            images, labels, tda_features = images.to(device), labels.to(device), tda_features.to(device)
            optimizer.zero_grad()

            outputs = model(tda_features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        train_loss = running_loss / len(train_loader)
        train_accuracy = correct / total
        wandb.log({"Train Loss": train_loss, "Train Accuracy": train_accuracy})
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}")


def evaluate_and_visualize(model, valid_loader, device):
    model.eval()
    running_loss, correct, total = 0.0, 0, 0
    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        for batch_idx, (images, labels, tda_features) in enumerate(valid_loader):
            images, labels, tda_features = images.to(device), labels.to(device), tda_features.to(device)
            outputs = model(tda_features)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

            for img_idx in range(images.size(0)):
                fig, ax = plt.subplots()
                ax.imshow(images[img_idx].cpu().squeeze(), cmap="gray")
                title = f"GT: {labels[img_idx].item()}, Pred: {predicted[img_idx].item()}"
                ax.set_title(title)
                ax.axis("off")
                wandb.log({f"Sample Prediction (Batch {batch_idx}, Image {img_idx})": wandb.Image(fig)})
                plt.close(fig)

    valid_loss = running_loss / len(valid_loader)
    valid_accuracy = correct / total
    wandb.log({"Validation Loss": valid_loss, "Validation Accuracy": valid_accuracy})
    print(f"Validation Loss: {valid_loss:.4f}, Accuracy: {valid_accuracy:.4f}")


wandb.init(project="COSE474", name="TDA_Only(Epoch 5)")
wandb.config = {
    "learning_rate": 0.001,
    "num_epochs": 5,
    "num_classes": 2,
    "num_landscapes": 2,
    "resolution": 100
}

num_landscapes = wandb.config["num_landscapes"]
resolution = wandb.config["resolution"]
tda_feature_dim = num_landscapes * resolution * 2

train_dataset = MRIDataset(train_images, train_labels, num_landscapes=num_landscapes, resolution=resolution)
valid_dataset = MRIDataset(valid_images, valid_labels, num_landscapes=num_landscapes, resolution=resolution)
torch.manual_seed(42)
np.random.seed(42)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=16, shuffle=False)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = TDAOnlyModel(tda_feature_dim=tda_feature_dim, num_classes=wandb.config["num_classes"]).to(device)
optimizer = optim.Adam(model.parameters(), lr=wandb.config["learning_rate"])
criterion = nn.CrossEntropyLoss()


train(model, train_loader, device, optimizer, criterion, num_epochs=wandb.config["num_epochs"])


evaluate_and_visualize(model, valid_loader, device)


wandb.finish()


Epoch [1/5], Loss: 0.7595, Accuracy: 0.5300
Epoch [2/5], Loss: 0.6716, Accuracy: 0.5883
Epoch [3/5], Loss: 0.6454, Accuracy: 0.6550
Epoch [4/5], Loss: 0.6453, Accuracy: 0.6250
Epoch [5/5], Loss: 0.6372, Accuracy: 0.6350
Validation Loss: 0.7457, Accuracy: 0.5133


0,1
Train Accuracy,▁▄█▆▇
Train Loss,█▃▁▁▁
Validation Accuracy,▁
Validation Loss,▁

0,1
Train Accuracy,0.635
Train Loss,0.63722
Validation Accuracy,0.51333
Validation Loss,0.7457


In [3]:
wandb.finish()

0,1
Train Accuracy,▁█
Train Loss,█▁

0,1
Train Accuracy,0.58833
Train Loss,0.67157
