## Logistic Regression and SVM models for Segmentation of Pancreas

#### Data Preparation and Loading

In [1]:
import torch
import os
import pydicom
import numpy as np
from monai.data import Dataset
from monai.transforms import LoadImage

class MedicalImageDataset(Dataset):
    def __init__(self, image_dir, label_dir, transforms=None):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transforms = transforms
        # the self.patients list is filled with the names of directories in image_dir when then directory name includes "PANCREAS"
        self.patients = [f for f in sorted(os.listdir(image_dir)) if "PANCREAS" in f]
        self.max_depth = self.calculate_max_depth()

    def calculate_max_depth(self):
        max_depth = 0
        for patient_folder in self.patients:
            patient_image_dir = os.path.join(self.image_dir, patient_folder)
            volume = self.load_dicom_volume(patient_image_dir) # finds the number of slices in the folder
            if volume.shape[0] > max_depth:
                max_depth = volume.shape[0]
        return max_depth

    def load_dicom_volume(self, patient_image_dir):
        subfolder = next(os.walk(patient_image_dir))[1][0]
        deepest_folder = next(os.walk(os.path.join(patient_image_dir, subfolder)))[1][0]
        final_path = os.path.join(patient_image_dir, subfolder, deepest_folder)
        files = sorted(os.listdir(final_path), key=lambda x: pydicom.dcmread(os.path.join(final_path, x)).InstanceNumber)
        # os.listdir(final_path): refers to the folder with the dicom files
        # sorted() usually sorts them in alphabetical, but using the key parameter makes sure a specific order is used
        # key=lambda x: pydicom.dcmread(os.path.join(final_path, x)).InstanceNumber:
            # lambda x: defines an anonymous function where x is the name of the variable that represents each element in the list
            # pydicom.dcmread(os.path.join(final_path, x)).InstanceNumber : the instance number of each dicom image is used to ensure that the images are sorted in consecutive, sequential order.

        # stacks all of the 2D slices for each patient into a 3D array
        volume = np.stack([pydicom.dcmread(os.path.join(final_path, f)).pixel_array for f in files])

        # converts the numpy array into a torch tensor
        return torch.from_numpy(volume).float()  # No need to add batch dimension manually

    def __len__(self):
        return len(self.patients)

    def __getitem__(self, idx):
        patient_folder = self.patients[idx]
        patient_image_dir = os.path.join(self.image_dir, patient_folder)
        patient_label_path = os.path.join(self.label_dir, f"label{patient_folder.split('_')[-1]}.nii.gz")
        
        volume = self.load_dicom_volume(patient_image_dir)

        # label image is loaded with MONAI's LoadImage where only the image is loaded with image_only = True
        label = LoadImage(image_only=True)(patient_label_path)
        # rearranges the axes of the label array to match the conventional layout expected by PyTorch models (num_channels x height x width)
        label = np.transpose(label, (2, 0, 1))
        # turns numpy array into torch tensor
        label = torch.from_numpy(label).float()

        # Padding to maximum depth for depth (slice axis)
        pad_size = self.max_depth - volume.shape[0]
        volume = torch.nn.functional.pad(volume, (0, 0, 0, 0, 0, pad_size))
        label = torch.nn.functional.pad(label, (0, 0, 0, 0, 0, pad_size))

        # Correcting shape to (512, 512, padded_num_slices, 1)
        volume = volume.permute(2, 1, 0).unsqueeze(-1)  # changes the shape of the volume tensor to fit the format expected by neural networks of (H, W, D, C)
        # permute function allows for changing dimensions of the tensor
        # unsqueeze function adds a new dimension at the specified index. Here the index is -1 which indicates that the new dimension added into the last position. 
        label = label.permute(2, 1, 0).unsqueeze(-1)    # changes the shape of the volume tensor to fit the format expected by neural networks of (H, W, D, C)

        if self.transforms:
            volume = self.transforms(volume)
            label = self.transforms(label)

        return volume.squeeze(0), label.squeeze(0)  # Ensure removing any singleton dimension at batch axis

2024-04-25 12:32:13.384786: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
import os
import numpy as np
import pydicom
import torch
from torch.utils.data import DataLoader, Dataset
from monai.transforms import Compose, ScaleIntensity, EnsureType
from monai.transforms import LoadImage

# Transforms and DataLoader
transforms = Compose([
    ScaleIntensity(), # normalizes or scales image intensities
    EnsureType(dtype=torch.float32) # ensures that the data type of the tensors after transformationis torch.float32 which is standard for PyTorch models
])

# Paths
image_root_dir = "/Users/lukeyun/SDS323/manifest-1599750808610/Pancreas-CT"
label_root_dir = "/Users/lukeyun/SDS323/TCIA_pancreas_labels-02-05-2017"

dataset = MedicalImageDataset(image_root_dir, label_root_dir, transforms=transforms)

# DataLoader is initialized which is standard for PyTorch usage
""" dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
# note that the dataset has volume, label at this point
# batch_size = 1 ensures that each item retried by the DataLoader will contain one sample from the dataset which in this case is one volume and one label

# Test DataLoader
for volume, label in dataloader:
    # Squeeze out the batching dimension which is the first dimension
    volume, label = volume.squeeze(0), label.squeeze(0) # Comment if permute block is commented above
    print("Volume shape:", volume.shape, "Label shape:", label.shape)
    break # Checking only the first batch """

Volume shape: torch.Size([512, 512, 466, 1]) Label shape: torch.Size([512, 512, 466, 1])


In [11]:
import torch
from torch.utils.data import DataLoader, random_split

def create_data_loaders(dataset, batch_size=1):

    # Determine sizes of each split
    total_size = len(dataset)
    train_size = int(0.6 * total_size)
    val_size = int(0.2 * total_size)
    test_size = total_size - train_size - val_size

    # Randomly split the dataset into train, cv, and test sets
    train_dataset, val_dataset, test_dataset = random_split(
        dataset, [train_size, val_size, test_size],
        generator=torch.Generator().manual_seed(832) # Set seed
    )

    # Create data loaders for each split
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader

dataset = MedicalImageDataset(image_root_dir, label_root_dir, transforms=transforms)
train_loader, val_loader, test_loader = create_data_loaders(dataset, batch_size=1)

### Logistic Regression Model

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split

# Define the Logistic Regression Model
class LogisticRegression(nn.Module):
    def __init__(self, num_features, num_outputs):
        super(LogisticRegression, self).__init__()
        self.linear = nn.Linear(num_features, num_outputs)

    def forward(self, x):
        x = x.view(x.size(0), -1)  # Flatten the image into a vector
        return torch.sigmoid(self.linear(x))

# Initialize the Logistic Regression Model
# num_outputs is the total number of voxels in a flattened volume
num_features = 512 * 512 * 466  # Adjust based on actual volume size
model = LogisticRegression(num_features=num_features, num_outputs=num_features)

# Loss and Optimizer
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training Function
def train_model(model, train_loader, criterion, optimizer, num_epochs=5):
    model.train()  # Set the model to training mode
    for epoch in range(num_epochs):
        for i, (volumes, labels) in enumerate(train_loader):
            volumes = volumes.view(volumes.size(0), -1)  # Flatten volumes
            labels = labels.view(labels.size(0), -1)  # Flatten labels

            # Forward pass
            outputs = model(volumes)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if (i + 1) % 10 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

# Evaluation Function
def evaluate_model(model, val_loader):
    model.eval()  # Set the model to evaluation mode
    total = 0
    correct = 0
    with torch.no_grad():
        for volumes, labels in val_loader:
            volumes = volumes.view(volumes.size(0), -1)  # Flatten volumes
            labels = labels.view(labels.size(0), -1)  # Flatten labels

            outputs = model(volumes)
            predicted = outputs.round()  # Threshold the probabilities
            correct += (predicted == labels).sum().item()
            total += labels.numel()  # Total number of elements

        accuracy = 100 * correct / total
        print(f'Accuracy: {accuracy:.2f}%')

train_loader, val_loader, test_loader = create_data_loaders(dataset, batch_size=1)

# Train the model
train_model(model, train_loader, criterion, optimizer)

# Evaluate the model
evaluate_model(model, val_loader)


NameError: name 'train_loader' is not defined

### Support Vector Machine (SVM)

In [None]:
import torch
import numpy as np
from sklearn import svm
from sklearn.metrics import accuracy_score
from monai.data import Dataset, DataLoader
from torch.utils.data import DataLoader, random_split
from joblib import dump

class FlattenTransform:
    """Transform to flatten the image volumes and labels for SVM input."""
    def __call__(self, sample):
        volume, label = sample
        # Flatten volume and label to 2D arrays
        return volume.view(-1, volume.shape[-1]), label.view(-1)

def train_svm_model(train_loader, C=1.0):
    clf = svm.SVC(C=C, kernel='linear', probability=True)
    for i, (volumes, labels) in enumerate(train_loader):
        # Flatten the volumes and labels for SVM training
        X_train, y_train = FlattenTransform()((volumes, labels))
        X_train = X_train.numpy()  # Convert to numpy array for scikit-learn
        y_train = y_train.numpy()
        clf.fit(X_train, y_train)
        print(f"Batch {i+1} trained.")
    return clf

def validate_svm_model(clf, val_loader):
    total_acc = 0
    for i, (volumes, labels) in enumerate(val_loader):
        X_val, y_val = FlattenTransform()((volumes, labels))
        X_val = X_val.numpy()  # Convert to numpy array for scikit-learn
        y_val = y_val.numpy()
        y_pred = clf.predict(X_val)
        acc = accuracy_score(y_val, y_pred)
        total_acc += acc
        print(f"Validation accuracy for batch {i+1}: {acc:.2f}")
    average_acc = total_acc / len(val_loader)
    print(f"Average validation accuracy: {average_acc:.2f}")


clf = train_svm_model(train_loader)
validate_svm_model(clf, val_loader)
dump(clf, 'svm_pancreas_segmentation.joblib')
