In [10]:
import os
import SimpleITK as sitk
import numpy as np
import matplotlib.pyplot as plt
from scipy.ndimage import zoom
import tensorflow as tf
from tensorflow.keras import layers, models


#### Data Preprocessing

In [None]:
# --- Helper Functions ---
def load_case(data_dir, case_id, margin=5):
    """
    Load a single case's imaging and segmentation data.
    """
    case_path = os.path.join(data_dir, case_id)
    
    # Load imaging data
    image_path = os.path.join(case_path, "imaging.nii.gz")
    image = sitk.ReadImage(image_path)
    image_array = sitk.GetArrayFromImage(image)
    
    # Load segmentation data
    mask_path = os.path.join(case_path, "segmentation.nii.gz")
    mask = sitk.ReadImage(mask_path)
    mask_array = sitk.GetArrayFromImage(mask)
    
    # Crop around the non-zero region
    non_zero = np.array(np.nonzero(mask_array))
    start = np.maximum(non_zero.min(axis=1) - margin, 0)
    end = np.minimum(non_zero.max(axis=1) + margin, mask_array.shape)
    slices = tuple(slice(s, e) for s, e in zip(start, end))
    
    return image_array[slices], mask_array[slices]

def normalize_image(image_array):
    """
    Normalize the image array to [0, 1].
    """
    return (image_array - np.min(image_array)) / (np.max(image_array) - np.min(image_array))

def resize_image_and_mask(image_array, mask_array, target_shape=(128, 128, 128)):
    """
    Resize the image and mask to the target shape.
    """
    factors = [t / i for t, i in zip(target_shape, image_array.shape)]
    resized_image = zoom(image_array, factors, order=1)  # Linear interpolation
    resized_mask = zoom(mask_array, factors, order=0)   # Nearest-neighbor for mask
    return resized_image, resized_mask

def encode_labels(mask_array):
    """
    Encode segmentation masks into binary class labels.
    """
    return (mask_array > 0).astype(int)

def augment_data(image_array, mask_array):
    """
    Apply random augmentation to the image and mask.
    """
    if np.random.rand() > 0.5:  # Random horizontal flip
        image_array = np.flip(image_array, axis=1)
        mask_array = np.flip(mask_array, axis=1)
    return image_array, mask_array

def preprocess_case(data_dir, case_id, target_shape=(128, 128, 128), augment=False):
    """
    Preprocess a single case: load, crop, normalize, resize, encode labels, and augment.
    """
    # Load and normalize data
    image, mask = load_case(data_dir, case_id)
    image_normalized = normalize_image(image)
    
    # Resize to target shape
    image_resized, mask_resized = resize_image_and_mask(image_normalized, mask, target_shape)
    
    # Encode labels
    mask_encoded = encode_labels(mask_resized)
    
    # Apply augmentation if enabled
    if augment:
        image_resized, mask_encoded = augment_data(image_resized, mask_encoded)
    
    return image_resized, mask_encoded

def load_dataset(data_dir, case_ids, target_shape=(128, 128, 128), augment=False):
    """
    Load and preprocess the entire dataset.
    """
    dataset = []
    for case_id in case_ids:
        print(f"Processing case: {case_id}")
        image, mask = preprocess_case(data_dir, case_id, target_shape, augment)
        dataset.append((image, mask))
    return dataset

def visualize_preprocessed(image_array, mask_array, slice_idx):
    """
    Visualize slices of the preprocessed CT scan and segmentation mask.
    """
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(image_array[slice_idx], cmap="gray")
    plt.title("Preprocessed CT Slice")
    plt.axis("off")
    
    plt.subplot(1, 2, 2)
    plt.imshow(mask_array[slice_idx], cmap="jet", alpha=0.5)
    plt.title("Preprocessed Mask Slice")
    plt.axis("off")
    plt.show()

# --- Main Execution ---
if __name__ == "__main__":
    # Path to the KiTS19 dataset
    data_dir = "D:\\Georgian\\ML Programming\\Project\\kits19\\data"
    
    # List all cases
    case_ids = [case for case in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, case))]
    
    # List for first 31 cases
    case_ids_ = [
    "case_00000", "case_00001", "case_00002", "case_00003", "case_00004",
    "case_00005", "case_00006", "case_00007", "case_00008", "case_00009",
    "case_00010", "case_00011", "case_00012", "case_00013", "case_00014",
    "case_00015", "case_00016", "case_00017", "case_00018", "case_00019",
    "case_00020", "case_00021", "case_00022", "case_00023", "case_00024",
    "case_00025", "case_00026", "case_00027", "case_00028", "case_00029",
    "case_00030"]

    # Preprocess the dataset
    target_shape = (128, 128, 128)
    dataset = load_dataset(data_dir, case_ids_, target_shape, augment=True)
    
    # Example: Visualize one case
    example_image, example_mask = dataset[0]
    slice_idx = example_image.shape[0] // 2
    visualize_preprocessed(example_image, example_mask, slice_idx)


Processing case: case_00000
Processing case: case_00001
Processing case: case_00002
Processing case: case_00003
Processing case: case_00004
Processing case: case_00005
Processing case: case_00006
Processing case: case_00007
Processing case: case_00008
Processing case: case_00009
Processing case: case_00010
Processing case: case_00011


In [None]:
# --- Helper Functions ---
def prepare_data(dataset):
    """
    Convert dataset into features (X) and labels (y).
    """
    X = np.array([item[0] for item in dataset])  # Images
    y = np.array([item[1] for item in dataset])  # Masks
    return X, y

def split_data(X, y, train_ratio=0.8, val_ratio=0.1):
    """
    Split data into training, validation, and test sets.
    """
    total_samples = len(X)
    train_end = int(total_samples * train_ratio)
    val_end = train_end + int(total_samples * val_ratio)
    
    X_train, y_train = X[:train_end], y[:train_end]
    X_val, y_val = X[train_end:val_end], y[train_end:val_end]
    X_test, y_test = X[val_end:], y[val_end:]
    
    return (X_train, y_train), (X_val, y_val), (X_test, y_test)

# --- Model Architecture ---
def create_3d_cnn(input_shape):
    model = models.Sequential()

    # Downsampling
    model.add(layers.Conv3D(32, kernel_size=3, activation='relu', padding='same', input_shape=input_shape))
    model.add(layers.MaxPooling3D(pool_size=2))  # Reduces dimensions by half
    model.add(layers.Conv3D(64, kernel_size=3, activation='relu', padding='same'))
    model.add(layers.MaxPooling3D(pool_size=2))  # Reduces dimensions by half again
    model.add(layers.Conv3D(128, kernel_size=3, activation='relu', padding='same'))
    model.add(layers.MaxPooling3D(pool_size=2))  # Reduces dimensions by half again

    # Upsampling to restore dimensions
    model.add(layers.Conv3DTranspose(128, kernel_size=3, activation='relu', padding='same'))
    model.add(layers.UpSampling3D(size=2))  # Upsamples dimensions by 2
    model.add(layers.Conv3DTranspose(64, kernel_size=3, activation='relu', padding='same'))
    model.add(layers.UpSampling3D(size=2))  # Upsamples dimensions by 2
    model.add(layers.Conv3DTranspose(32, kernel_size=3, activation='relu', padding='same'))
    model.add(layers.UpSampling3D(size=2))  # Upsamples dimensions by 2

    # Output layer
    model.add(layers.Conv3D(1, kernel_size=1, activation='sigmoid', padding='same'))  # Binary segmentation

    return model


# --- Main Execution ---
def new_func(create_3d_cnn, input_shape):
    model = create_3d_cnn(input_shape)

if __name__ == "__main__":
    # Prepare data
    X, y = prepare_data(dataset)  # Preprocessed dataset
    (X_train, y_train), (X_val, y_val), (X_test, y_test) = split_data(X, y)

    # Normalize and reshape inputs
    X_train, X_val, X_test = X_train / 1.0, X_val / 1.0, X_test / 1.0  # Normalize
    X_train = X_train[..., np.newaxis]  # Add channel axis
    X_val = X_val[..., np.newaxis]
    X_test = X_test[..., np.newaxis]
    y_train = y_train[..., np.newaxis]  # Shape: (batch_size, depth, height, width, 1)
    y_val = y_val[..., np.newaxis]
    y_test = y_test[..., np.newaxis]

    # Define model
    input_shape = X_train.shape[1:]  # e.g., (128, 128, 128, 1)
    new_func(create_3d_cnn, input_shape)

# Compile, train, and evaluate as before


    # Normalize inputs
    X_train, X_val, X_test = X_train / 1.0, X_val / 1.0, X_test / 1.0  # Already normalized in preprocessing

    # Define model
    input_shape = X_train.shape[1:]  # e.g., (128, 128, 128, 1)
    model = create_3d_cnn(input_shape)

    # Compile model
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',  # Binary segmentation
                  metrics=['accuracy'])
    
    print("Model output shape:", model.output_shape)  # Should match y_train shape
    print("y_train shape:", y_train.shape)  # Ensure y_train includes the channel dimension


    # Train model
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=5,
        batch_size=4,
        callbacks=[tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)]
    )

    # Evaluate model
    test_loss, test_acc = model.evaluate(X_test, y_test)
    print(f"Test Accuracy: {test_acc:.2f}")

    # # Save model
    # model.save("kits19_cnn_model.h5")


Model output shape: (None, 128, 128, 128, 1)
y_train shape: (24, 128, 128, 128, 1)
Epoch 1/5
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m39s[0m 6s/step - accuracy: 0.6348 - loss: 0.6307 - val_accuracy: 0.8466 - val_loss: 0.5546
Epoch 2/5
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 6s/step - accuracy: 0.8530 - loss: 0.5088 - val_accuracy: 0.8466 - val_loss: 0.4585
Epoch 3/5
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 6s/step - accuracy: 0.8600 - loss: 0.4308 - val_accuracy: 0.8466 - val_loss: 0.4074
Epoch 4/5
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m37s[0m 6s/step - accuracy: 0.8597 - loss: 0.3982 - val_accuracy: 0.8466 - val_loss: 0.3565
Epoch 5/5
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 6s/step - accuracy: 0.8581 - loss: 0.3475 - val_accuracy: 0.8466 - val_loss: 0.3283
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step - accuracy: 0.8468 - loss: 0.3240
Test Accuracy: 0.85
