In [None]:
# Parse the annotation file into a list of (filename, start, end) tuples.
samples = []

with open('Data/oznaczenia.txt') as fp:
    for line in fp:
        # Strip only the line terminator. The previous `line[:-1]` also removed the
        # last digit of the final record when the file had no trailing newline.
        line = line.rstrip("\r\n")
        if not line.strip():
            continue  # tolerate blank lines (e.g. at the end of the file)
        # The tab-separated columns are read in the order: filename, end, start.
        filename, end, start = line.split("\t")
        samples.append((filename, int(start), int(end)))

samples

In [None]:
import torch
import nibabel as nib
import random

# Build a balanced slice-level dataset: every annotated slice is a positive (1) and an
# equally sized random subset of the remaining slices of the same volume is negative (0).
slices = []
targets = []

# NOTE: the loop variables keep the names `start`/`end` because later cells read the
# values left in them after the final iteration.
for filename, start, end in samples:
    img = nib.load(f"Data/{filename}").get_fdata()
    # Positive slices: the annotated range is inclusive on both ends.
    for depth in range(start, end + 1):
        s = img[:, :, depth]
        slices.append(torch.tensor(s, dtype=torch.float).unsqueeze(0))
        targets.append(1)
    # Negative candidates are the slices strictly outside [start, end]. The upper part
    # must begin at `end + 1`; starting at `end` let the last positive slice be drawn
    # again as a negative.
    negative_indices = list(range(0, start)) + list(range(end + 1, img.shape[2]))
    print(filename)
    # random.sample raises ValueError when asked for more items than exist, so cap the
    # request at the number of available negatives.
    n_negative = min(end - start + 1, len(negative_indices))
    chosen_negative_indices = random.sample(negative_indices, n_negative)
    for depth in chosen_negative_indices:
        s = img[:, :, depth]
        slices.append(torch.tensor(s, dtype=torch.float).unsqueeze(0))
        targets.append(0)
    

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 20% of the slices for testing. A fixed `random_state` makes the split (and
# therefore every number reported below) reproducible across kernel restarts.
# NOTE(review): the split is done per slice, so slices of one volume can end up in both
# the training and the test set.
SPLIT_SEED = 42
X_train, X_test, y_train, y_test = train_test_split(
    slices, targets, test_size=0.2, random_state=SPLIT_SEED
)
print(len(X_train))
print(f"{sum(y_train)}/{len(y_train)}")  # positives / total in the training split
print(f"{sum(y_test)}/{len(y_test)}")  # positives / total in the test split
print(len(X_test))

In [None]:
# Side length that every slice is resized to (resize_shape x resize_shape) before it is
# fed to the network; also passed to the model as its expected input size.
resize_shape = 32

In [None]:
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import numpy as np
from typing import List
from sklearn.model_selection import train_test_split
import torch.nn.functional as F

class SlicesDataset(Dataset):
    """Map-style dataset pairing image slices with integer class labels.

    Args:
        slices: Indexable collection of slice tensors (one entry per sample).
        targets: One integer (or bool) class label per slice.
        transform: Optional callable applied to a slice each time it is accessed.
        num_classes: Width of the one-hot target vector (default 2, binary task).

    Raises:
        ValueError: If `slices` and `targets` do not have the same length.
    """

    def __init__(self, slices: List[torch.Tensor], targets: List[int], transform = None, num_classes: int = 2):
        if len(slices) != len(targets):
            raise ValueError(
                f"slices and targets differ in length: {len(slices)} != {len(targets)}"
            )
        self.slices = slices
        # int64 is the dtype F.one_hot requires for its index tensor.
        self.targets = torch.tensor(targets, dtype=torch.int64)
        self.transform = transform
        self.num_classes = num_classes

    def __getitem__(self, idx):
        """Return `(slice, one_hot_target)` for sample `idx`; the target is float."""
        s = self.slices[idx]
        if self.transform is not None:
            s = self.transform(s)
        target = F.one_hot(self.targets[idx], self.num_classes).float()
        return s, target

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.targets)


# Preprocessing shared by the training and test sets: resize each slice to a
# resize_shape x resize_shape square, then normalise its single channel.
transform = transforms.Compose(
    [
        transforms.Resize((resize_shape, resize_shape)),
        transforms.Normalize(mean=[0.485], std=[0.229]),
    ]
)

# Wrap both splits in datasets that apply the transform on access.
train_dataset, test_dataset = (
    SlicesDataset(X_train, y_train, transform),
    SlicesDataset(X_test, y_test, transform),
)

# Only the training batches are shuffled; the test order stays fixed.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=128)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=128)

In [None]:
import matplotlib.pyplot as plt

# Sanity check: fetch one training batch, print the one-hot label of its first sample
# and show that sample's single channel in grayscale.
first_batch = next(iter(train_loader))
s, target = first_batch

print(target[0])
plt.gray()
plt.imshow(s[0, 0])

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
from torchvision.models import densenet121
from torch import nn


# DenseNet-121 with a two-class head, adapted to single-channel input.
model = densenet121(num_classes=2)
# torchvision's DenseNet keeps its stem convolution at `features.conv0`. Assigning to
# `model.conv1` (a ResNet attribute name) only attached an unused layer and left the
# network expecting 3-channel input, so replace the real stem instead.
model.features.conv0 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
model = model.to(device)
model

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleCNN(nn.Module):
    """Small CNN for single-channel square images.

    Layout: Conv(1->32, 3x3, pad 1) + BatchNorm + ReLU -> 2x2 max-pool ->
    Linear(256) + ReLU + Dropout(0.2) -> Linear(num_classes).
    """

    def __init__(self, num_classes=10, size=32):
        """
        Args:
            num_classes (int): Number of output logits (default: 10).
            size (int): Side length of the square input image; it determines the
                input width of the first fully connected layer (default: 32).
        """
        super().__init__()

        # The 2x2 pooling halves each spatial side; 32 feature maps remain.
        pooled_side = size // 2
        flat_features = 32 * pooled_side * pooled_side

        # Convolutional block with batch normalization
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
        )

        # Pooling layer
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Fully connected head
        self.fc1 = nn.Sequential(
            nn.Linear(flat_features, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(0.20),
        )
        self.fc2 = nn.Linear(256, num_classes)

    def forward(self, x):
        """Map a batch of images to a batch of `num_classes` logits."""
        features = self.pool(self.conv1(x))
        # Flatten everything except the batch dimension.
        flat = features.view(features.size(0), -1)
        hidden = self.fc1(flat)
        return self.fc2(hidden)

# Build the two-class CNN for resize_shape-sized inputs and move it to the chosen device.
model = SimpleCNN(num_classes=2, size=resize_shape)
model = model.to(device)
model

In [None]:
from tqdm import tqdm

def train(model, train_loader, optimizer, criterion, device):
    """Run one optimisation pass over `train_loader`.

    The true class of each sample is taken as the arg-max of its label row, so labels
    are expected to carry one score per class along dim 1.

    Returns:
        tuple: (loss averaged over batches, accuracy in percent).
    """
    model.train()  # enable training-mode behaviour of the model's layers
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    for images, labels in tqdm(train_loader, desc="Training Epoch"):
        images = images.to(device)
        labels = labels.to(device)

        # Standard step: clear gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Accumulate statistics for the epoch summary.
        loss_sum += loss.item()
        predicted = outputs.max(dim=1).indices
        expected = labels.max(dim=1).indices
        n_correct += (predicted == expected).sum().item()
        n_seen += labels.size(0)

    return loss_sum / len(train_loader), n_correct / n_seen * 100

# Testing loop
def test(model, test_loader, criterion, device):
    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0
    correct = 0
    total = 0
    with torch.no_grad():  # No gradient computation during testing
        for images, labels in tqdm(test_loader, desc="Testing"):
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Track the loss and accuracy
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            _, true_predictions = torch.max(labels, 1)
            correct += (predicted == true_predictions).sum().item()
            total += labels.size(0)

    epoch_loss = running_loss / len(test_loader)
    epoch_accuracy = correct / total * 100
    return epoch_loss, epoch_accuracy


def train_loop(model, train_loader, test_loader, criterion, optimizer, device, num_epochs,
               save_every=100, save_dir="./Model"):
    """Train for `num_epochs` epochs, evaluating on the test loader after each one.

    Args:
        model: Network to optimise.
        train_loader: Batches used for training.
        test_loader: Batches used for the per-epoch evaluation.
        criterion: Loss function.
        optimizer: Optimiser bound to the model's parameters.
        device: Device the batches are moved to.
        num_epochs: Number of epochs to run.
        save_every: Save a checkpoint every this many epochs (default 100);
            0 or None disables checkpointing.
        save_dir: Directory the checkpoints are written to (default "./Model").
            It is created if it does not exist.

    Returns:
        dict: Per-epoch lists under the keys "train_loss", "test_loss",
        "train_acc" and "test_acc".
    """
    import os  # local import so the function does not rely on a notebook-level import

    history = {"train_loss": [], "test_loss": [], "train_acc": [], "test_acc": []}
    for epoch in range(num_epochs):
        # Training
        train_loss, train_accuracy = train(model, train_loader, optimizer, criterion, device)
        print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.2f}%")

        # Testing
        test_loss, test_accuracy = test(model, test_loader, criterion, device)
        print(f"Epoch {epoch+1}/{num_epochs} - Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.2f}%")

        history["train_loss"].append(train_loss)
        history["test_loss"].append(test_loss)
        history["train_acc"].append(train_accuracy)
        history["test_acc"].append(test_accuracy)

        # Periodic checkpoint. torch.save does not create missing directories, so
        # make sure the target directory exists first.
        if save_every and (epoch + 1) % save_every == 0:
            os.makedirs(save_dir, exist_ok=True)
            torch.save(model.state_dict(), os.path.join(save_dir, f"model_{epoch+1}.pt"))

    return history

# Loss, optimiser (Adam with a small weight decay) and a five-epoch training run.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), weight_decay=1e-5)

history = train_loop(
    model=model,
    train_loader=train_loader,
    test_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    num_epochs=5,
)

In [None]:
# Test accuracy (in percent) of the last ten epochs, or of all epochs if fewer were run.
history["test_acc"][-10:]

In [None]:
import matplotlib.pyplot as plt

# Plot the per-epoch training and test loss on one set of axes.
epochs = list(range(len(history["test_acc"])))
plt.plot(epochs, history["train_loss"], label="Train loss")
plt.plot(epochs, history["test_loss"], label="Test loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
# The `label=` arguments above are only rendered once a legend is requested.
plt.legend();

In [None]:
import random

# Evaluate the trained model on every slice of one annotated volume.
# NOTE(review): this volume is read from Data/, the directory the training slices are
# built from, so this is not an independent validation set.
validation_filename = "0004_1_.nii.gz"
ct = nib.load(f"Data/{validation_filename}").get_fdata()

# Look up this volume's own annotated range. Previously the labels were built from
# whatever `start`/`end` an earlier loop had left behind, i.e. the range of the last
# volume that loop visited rather than the range of this file.
annotated_ranges = {name: (first, last) for name, first, last in samples}
if validation_filename not in annotated_ranges:
    raise KeyError(f"{validation_filename} has no entry in `samples`; cannot label its slices")
start, end = annotated_ranges[validation_filename]

slices = []
labels = []
for depth in range(ct.shape[2]):
    slices.append(torch.tensor(ct[:, :, depth], dtype=torch.float).unsqueeze(0))
    # The annotated range is inclusive on both ends.
    labels.append(1 if start <= depth <= end else 0)

transform = transforms.Compose([
    transforms.Resize((resize_shape, resize_shape)),
    transforms.Normalize(mean=[0.485], std=[0.229]),
])

validation_dataset = SlicesDataset(slices, labels, transform=transform)
validation_loader = DataLoader(validation_dataset, batch_size=32, shuffle=False)

test(model, validation_loader, criterion, device)

In [None]:
def make_classification(model, ct, device, transform=transform):
    """Classify every slice of a volume along its third axis.

    Args:
        model: Network that maps a (1, 1, H, W) batch to class logits.
        ct: Array indexable as `ct[:, :, depth]`, with `ct.shape[2]` slices.
        device: Device the slice tensors are created on.
        transform: Callable applied to each (1, H, W) slice tensor before inference.

    Returns:
        list: One predicted class index (int) per slice, in slice order.
    """
    # Inference must not depend on the mode the model happens to be in: switch to
    # evaluation mode for the duration of the call and restore the previous mode after.
    was_training = model.training
    model.eval()
    result = []
    try:
        with torch.no_grad():  # no autograd graph is needed for prediction
            for depth in range(ct.shape[2]):
                s = transform(torch.tensor(ct[:, :, depth], dtype=torch.float, device=device).unsqueeze(0))
                # Add the batch dimension and take the arg-max over the logits.
                c = torch.argmax(model(s.unsqueeze(0))).item()
                result.append(c)
    finally:
        model.train(was_training)
    return result

# Predict a class index for every slice of the volume currently held in `ct`.
classification = make_classification(model, ct, device)

In [None]:
# Display the per-slice predictions (one class index per slice of `ct`).
classification

In [None]:
def find_voi(classification, n_neigh=3):
    """Smooth per-slice predictions and return the extent of the positive region.

    Slice `i` is kept as positive when either
      * it and the (up to) `n_neigh` slices before it are all positive, or
      * it and the following slices up to index `i + n_neigh - 1` are all positive.
    Windows are clipped at the borders of the sequence, and a window that contains
    only slice `i` itself never counts.

    Args:
        classification: Sequence of per-slice predictions (truthy = positive).
        n_neigh: Neighbourhood size used by the two windows.

    Returns:
        tuple: `(start, end)`, the indices of the first and the last kept slice.
        Both are inclusive, matching the convention of the annotated ranges.

    Raises:
        ValueError: If no slice is kept.
    """
    n = len(classification)
    decisions = []
    for i in range(n):
        left = classification[max(0, i - n_neigh):i + 1]
        right = classification[i:min(n, i + n_neigh)]
        keep = (len(left) != 1 and all(left)) or (len(right) != 1 and all(right))
        decisions.append(int(keep))

    if 1 not in decisions:
        raise ValueError("no volume of interest found: no slice passes the neighbourhood filter")

    start = decisions.index(1)
    # Use the position of the last kept slice. The former `start + sum(decisions)` was
    # one past the last slice and was wrong whenever the kept slices were not contiguous.
    end = n - 1 - decisions[::-1].index(1)
    return start, end

# Print the annotated range currently bound to the globals `start`/`end`, then display
# the range predicted from `classification` for a visual comparison.
# NOTE(review): `start`/`end` are not assigned in this cell; they hold whatever an
# earlier cell last left in them — confirm they describe the volume in `ct`.
print(start, end)
find_voi(classification, n_neigh=10)

In [None]:
import pandas as pd

# Predict the range of interest for every annotated volume and compare it with the
# annotation. The metric is the sum of the absolute start and end errors (in slices).
n_neigh = 10

data = {
    "sample": [],
    "start": [],
    "end": [],
    "predicted_start": [],
    "predicted_end": [],
    "metric": [],
}

# # load model
# model = SimpleCNN(2, 32).to(device)
# model.load_state_dict(torch.load("./Model/old_model_300.pt"))

for sample, start, end in samples:
    print(sample)
    ct = nib.load(f"Data/{sample}").get_fdata()
    classification = make_classification(model, ct, device)
    try:
        predicted_start, predicted_end = find_voi(classification, n_neigh)
    except ValueError:
        # find_voi raises ValueError when no slice is predicted positive. Record an
        # empty prediction, as the test-set evaluation does, instead of aborting the
        # whole run on one volume.
        predicted_start, predicted_end = 0, 0
    metric = abs(predicted_start - start) + abs(predicted_end - end)
    data["sample"].append(sample)
    data["start"].append(start)
    data["end"].append(end)
    data["predicted_start"].append(predicted_start)
    data["predicted_end"].append(predicted_end)
    data["metric"].append(metric)

df = pd.DataFrame(data)

In [None]:
# Show the per-sample table of annotated and predicted ranges with their metric.
df

In [None]:
# Mean over all samples of the metric (sum of absolute start and end errors).
df["metric"].mean()

# Test

In [None]:
# Preprocessing pipeline for test volumes: resize to resize_shape x resize_shape, then
# normalise the single channel.
test_transform = transforms.Compose(
    [
        transforms.Resize((resize_shape, resize_shape)),
        transforms.Normalize(mean=[0.485], std=[0.229]),
    ]
)

In [None]:
from glob import glob

# Evaluate range prediction on the held-out volumes listed in TestData/ranges.txt.

# NOTE(review): `model_files` is collected but not used below; the evaluation runs with
# the `model` object currently in memory.
model_files = glob("./Model/*.pt")
model_files.sort()

data = {
    "sample": [],
    "start": [],
    "end": [],
    "predicted_start": [],
    "predicted_end": [],
    "metric": [],
}

samples = []

with open('TestData/ranges.txt') as fp:
    for line in fp:
        # Split on whitespace instead of slicing off the last character: `line[:-1]`
        # dropped the final digit of the last record when the file had no trailing
        # newline, and blank lines made the unpacking fail.
        fields = line.split()
        if not fields:
            continue
        filename, start, end = fields
        samples.append((filename, int(start), int(end)))

for sample, start, end in samples:
    print(sample)
    ct = nib.load(f"TestData/{sample}").get_fdata()
    classification = make_classification(model, ct, device)
    try:
        predicted_start, predicted_end = find_voi(classification, n_neigh)
    except ValueError:
        # No slice was predicted positive: record an empty prediction.
        predicted_start, predicted_end = 0, 0
    # Sum of the absolute start and end errors, in slices.
    metric = abs(predicted_start - start) + abs(predicted_end - end)
    data["sample"].append(sample)
    data["start"].append(start)
    data["end"].append(end)
    data["predicted_start"].append(predicted_start)
    data["predicted_end"].append(predicted_end)
    data["metric"].append(metric)

df = pd.DataFrame(data)

In [None]:
# Mean over all test samples of the metric (sum of absolute start and end errors).
df["metric"].mean()