Import libraries

In [2]:
import os
import numpy as np
from tensorflow.keras.utils import to_categorical
import nibabel
from sklearn.model_selection import train_test_split
import tensorflow as tf
import cv2
from skimage.filters import unsharp_mask
from sklearn.utils import shuffle
from sklearn.utils.class_weight import compute_class_weight
from skimage.transform import resize
from dipy.align.imwarp import SymmetricDiffeomorphicRegistration
from dipy.align.metrics import CCMetric
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Activation, Dense, Flatten
from tensorflow.keras.layers import Conv3D, AveragePooling3D, MaxPooling3D
from tensorflow.keras.layers import (Input, Activation, Dense, 
                                     Flatten, Conv3D, AveragePooling3D, 
                                     MaxPooling3D, add, multiply, BatchNormalization, 
                                     GlobalAveragePooling3D, GlobalMaxPooling3D, 
                                     Reshape, Concatenate, Lambda)
from tensorflow.keras.regularizers import l2
from tensorflow.keras.layers import add, multiply
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import SGD

Check for GPU

In [3]:
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    logical_gpus = tf.config.list_logical_devices('GPU')
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")

Compatibility with Python 2 and 3

In [4]:
from __future__ import absolute_import, division, print_function, unicode_literals
import six
from math import ceil

ADD-Net 3D Model Builder

In [5]:
from keras.models import Model
from keras.layers import (
    Input,
    Conv3D,
    ReLU,
    AveragePooling3D,
    Dropout,
    Flatten,
    Dense,
    Softmax,
)
from keras.initializers import GlorotUniform


# class ADDNet3DBuilder:
#     """3D CNN Builder for a specified architecture."""

#     @staticmethod
#     def build(input_shape, num_outputs, dropout_rates=(0.01, 0.03)):
#         """
#         Build the 3D CNN model.

#         # Arguments:
#             input_shape: Tuple of input shape (depth, height, width, channels).
#             num_outputs: Number of output classes.
#             dropout_rates: Tuple of dropout rates (dropout1, dropout2).
#         # Returns:
#             model: A 3D CNN Keras model.
#         """
#         init = GlorotUniform()

#         input_layer = Input(shape=input_shape)

#         # First Conv3D block
#         x = Conv3D(16, kernel_size=(7, 7, 7), kernel_initializer=init)(input_layer)
#         x = ReLU()(x)
#         x = AveragePooling3D(pool_size=(3, 3, 3))(x)

#         # Second Conv3D block
#         x = Conv3D(32, kernel_size=(7, 7, 7), kernel_initializer=init)(x)
#         x = ReLU()(x)
#         x = AveragePooling3D(pool_size=(3, 3, 3))(x)

#         # Third Conv3D block
#         x = Conv3D(64, kernel_size=(7, 7, 7), kernel_initializer=init)(x)
#         x = ReLU()(x)
#         x = AveragePooling3D(pool_size=(3, 3, 3))(x)

#         # Fourth Conv3D block
#         x = Conv3D(128, kernel_size=(7, 7, 7), kernel_initializer=init)(x)
#         x = ReLU()(x)
#         x = AveragePooling3D(pool_size=(3, 3, 3))(x)

#         # Dropout
#         x = Dropout(dropout_rates[0])(x)

#         # Fully connected layers
#         x = Flatten()(x)
#         x = Dense(256, kernel_initializer=init)(x)
#         x = ReLU()(x)
#         x = Dropout(dropout_rates[1])(x)

#         # Output layer
#         output_layer = Dense(num_outputs, kernel_initializer=init)(x)
#         output_layer = Softmax()(output_layer)

#         # Build model
#         model = Model(inputs=input_layer, outputs=output_layer)
#         return model


# # Example usage
# model = ADDNet3DBuilder.build(
#     input_shape=(100, 100, 100, 1),  # Example 3D input (depth=176, height=208, width=3, channels=1)
#     num_outputs=2,                # Number of output classes
# )
# model.summary()

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Input, Conv2D, ReLU, AveragePooling2D, Dropout,
    Flatten, Dense, Softmax, GlobalAveragePooling3D
)
from tensorflow.keras.initializers import GlorotUniform
# import tensorflow_addons as tfa
from tensorflow.keras.optimizers import Adam


from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import (
    Input, Conv3D, AveragePooling3D, Flatten, Dense, ReLU, Dropout, Softmax
)
from tensorflow.keras.initializers import GlorotUniform

# Initialize the weights
init = GlorotUniform()

# Define the model
model = Sequential([
    # Input layer (e.g., 3D shape like (depth, height, width, channels))
    Input(shape=(100, 100, 100, 1)),

    # 3D convolutional layers
    Conv3D(16, kernel_size=7, kernel_initializer=init),
    ReLU(),
    AveragePooling3D(pool_size=(2, 2, 2), strides=(1, 1, 1)),

    Conv3D(32, kernel_size=7, kernel_initializer=init),
    ReLU(),
    AveragePooling3D(pool_size=(2, 2, 2), strides=(1, 1, 1)),

    Conv3D(64, kernel_size=7, kernel_initializer=init),
    ReLU(),
    AveragePooling3D(pool_size=(2, 2, 2)),
    

    Conv3D(128, kernel_size=7, kernel_initializer=init),
    ReLU(),
    # AveragePooling3D(pool_size=(2, 2, 2), strides=(1, 1, 1)),
    GlobalAveragePooling3D(),

    # Dropout layer
    Dropout(0.2),
    
    # Flatten the 3D volume to a 1D vector
    Flatten(),

    # Fully connected layers
    Dense(256, kernel_initializer=init),
    ReLU(),
    Dropout(0.5),

    # Output layer (e.g., 2 classes)
    Dense(3, kernel_initializer=init),
    Softmax()
])


# model.compile(
#     optimizer=Adam(0.001), 
#     loss='binary_crossentropy', 
#     metrics=['accuracy', 'Recall', 'AUC', 'Precision']
# )

model.summary()

Image Preprocessing

In [6]:
# Testing the model
def apply_mask(aseg_image, brain_image, labels = [17, 53, 2, 7, 41, 46]):
    brain_data = aseg_image.get_fdata()
    aseg_data = aseg_image.get_fdata()
    origin_data = brain_image.get_fdata()
    
    brain_mask = np.zeros_like(aseg_data)
    for label in labels:
        brain_mask += np.where((aseg_data == label), 1, 0)

#     segmented_brain_image = brain_data * brain_mask
#     segmented_brain_image = nibabel.Nifti1Image(segmented_brain_image, affine=None)
    new_image = origin_data * brain_mask
    
    return new_image

def enhance_slice(slice_data):
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_slice = clahe.apply(slice_data.astype(np.uint8))

    return enhanced_slice

def enhance_image(img_data):
    enhanced_slices = []
    
    for slice_idx in range(img_data.shape[2]):
        slice_data = img_data[:, :, slice_idx]
        enhanced_slice = enhance_slice(slice_data)
        enhanced_slices.append(enhanced_slice)
    enhanced_volume = np.dstack(enhanced_slices)

    return enhanced_volume

def sharpen_image(image, strength=1.0):
    sharpened_image = unsharp_mask(image, radius=1, amount=strength)
    return sharpened_image

def apply_nonlinear_registration(moving_image, fixed_image):
    metric = CCMetric(3)

    sdr = SymmetricDiffeomorphicRegistration(metric, [10, 10, 10], step_length=0.25, ss_sigma_factor=1.5)

    mapping = sdr.optimize(fixed_image, moving_image)

    warped_moving_image = mapping.transform(moving_image)

    return warped_moving_image

In [7]:
def image_fixed(image_type, target_shape):
    aseg_image = nibabel.load('AD/I65597.nii')
    base_path = "AD/I65597.nii/mri/orig.mgz"
    origin_image =  nibabel.load(base_path)
    
    if (image_type=='nonroi') :
        aseg_image = aseg_image.get_fdata()
        origin_image = origin_image.get_fdata()
        
        mask = np.where(asg_img != 0, 1, 0)
        image = origin_image * mask
        image = enhance_image(image)
        image = resize(image, target_shape, anti_aliasing=True)
        image = sharpen_image(image)
        
        return image

In [8]:
from scipy.ndimage import rotate

def augment(image, rotation_range):
    rotation_angle = np.random.uniform(-rotation_range, rotation_range)
    rotated_image = rotate(image, rotation_angle, reshape=False)
    return rotated_image


def image_load_nonRoi(image_path, target_shape, type_dt=''):
    asg_img = nibabel.load(image_path).get_fdata()
    # print(image_path)
    # print(image_path.split("\\")[:-1])
    # "\\".join(image_path.split("\\")[:-1])
    
    origin_path = "\\".join(image_path.split("\\")[:-1]) + "/orig.mgz"
    # print(origin_path)
    origin_image = nibabel.load(origin_path).get_fdata()
    mask = np.where(asg_img != 0, 1, 0)
    
    image = origin_image * mask
    image = resize(image, target_shape, anti_aliasing=True)
    image = enhance_image(image)
    image = sharpen_image(image)

    
    if type_dt=='train':
        image = augment(image, 50)
        
    return image

def data_generator(paths, labels, batch_size, target_shape, image_type, type_dt=''):
    while True:
        for i in range(0, len(paths), batch_size):
            batch_paths = paths[i:i+batch_size]
            batch_labels = labels[i:i+batch_size]
            batch_images = []
            
    
            if image_type == 'nonroi':
                batch_images = [image_load_nonRoi(image, target_shape, type_dt) for image in batch_paths]

            batch_images = np.stack([batch_images] * 1, axis=-1)

            batch_labels = to_categorical(batch_labels, num_classes=3)
            yield np.array(batch_images), batch_labels
base_dir = ''
ad = os.path.join(base_dir, 'dataset/AD')
mci = os.path.join(base_dir, 'dataset/MCI')
cn = os.path.join(base_dir, 'dataset/CN')
# original shape (257, 257, 257)

ad_images= []
mci_images = []
cn_images = []

for subject_dir in os.listdir(cn):
    mri_path = os.path.join(cn, subject_dir, 'mri', 'aparc.DKTatlas+aseg.deep.mgz')
    if not (len(os.listdir(os.path.join(cn, subject_dir, 'mri'))) < 6):
        cn_images.append(mri_path)

for subject_dir in os.listdir(ad):
    mri_path = os.path.join(ad, subject_dir, 'mri', 'aparc.DKTatlas+aseg.deep.mgz')
    if not (len(os.listdir(os.path.join(ad, subject_dir, 'mri'))) < 6):
        ad_images.append(mri_path)
        
for subject_dir in os.listdir(mci):
    mri_path = os.path.join(mci, subject_dir, 'mri', 'aparc.DKTatlas+aseg.deep.mgz')
    
    if not (len(os.listdir(os.path.join(mci, subject_dir, 'mri'))) < 6):
        mci_images.append(mri_path)

for subject_dir in os.listdir(cn):
    mri_path = os.path.join(cn, subject_dir, 'mri', 'aparc.DKTatlas+aseg.deep.mgz')
    if not (len(os.listdir(os.path.join(cn, subject_dir, 'mri'))) < 6):
        cn_images.append(mri_path)
        

In [9]:
image_path = mci_images + cn_images + cn_images[:len(mci_images)-len(cn_images)]
labels = [0] * len(mci_images) + [1] * len(cn_images) + [1] * len(cn_images[:len(mci_images)-len(cn_images)])
train_paths, test_paths, train_labels, test_labels = train_test_split(image_path, labels, test_size = 0.2, random_state=42)

In [None]:
# Try this later:
# from sklearn.utils import class_weight

# class_weights = class_weight.compute_class_weight(
#     class_weight="balanced",
#     classes=np.unique(train_labels.argmax(axis=1)),
#     y=train_labels.argmax(axis=1)
# )
# class_weights = dict(enumerate(class_weights))

class_weights = compute_class_weight('balanced', classes=np.unique(train_labels), y=train_labels)
train_paths = np.array(train_paths)
train_labels = np.array(train_labels)
test_paths = np.array(test_paths)
test_labels = np.array(test_labels)

train_paths, train_labels = shuffle(train_paths, train_labels, random_state=42)
test_paths, test_labels = shuffle(test_paths, test_labels, random_state=42)

In [11]:
target_shape = (100, 100, 100)
batch_size = 8
selection_type = 'nonroi'
train_dataset = data_generator(train_paths, train_labels, batch_size, target_shape, 
                               image_type=selection_type, 
                               type_dt='train'
                              )

test_dataset = data_generator(test_paths, test_labels, batch_size, target_shape, 
                              image_type=selection_type
                             )

len(test_paths), len(train_labels), class_weights

(304, 1214, array([1.57662338, 0.73220748]))

Compile the model

In [12]:
# model.compile(
#     optimizer=SGD(learning_rate=0.01), 
#     loss = tf.keras.losses.BinaryCrossentropy(name='loss'), 
#     metrics=[
#         tf.keras.metrics.CategoricalAccuracy(name='acc'), 
#         tf.keras.metrics.AUC(name='auc'),
#         # tfa.metrics.F1Score(num_classes=4),
#         tf.metrics.Precision(name="precision"),
#         tf.metrics.Recall(name="recall") ])

from tensorflow.keras.optimizers import SGD
# import tensorflow_addons as tfa

model.compile(
    optimizer=SGD(learning_rate=0.01), 
    loss=tf.keras.losses.CategoricalCrossentropy(name='loss'),  # Updated for 3 categories
    metrics=[
        tf.keras.metrics.CategoricalAccuracy(name='acc'),
        tf.keras.metrics.AUC(name='auc', multi_label=True),  # Updated for multi-class AUC
        # tfa.metrics.F1Score(num_classes=3, average='macro', name='f1_score'),  # Optional F1 score
        tf.keras.metrics.Precision(name="precision"),
        tf.keras.metrics.Recall(name="recall")
    ]
)

In [None]:
# Check for data leakage
train_paths_set = set(train_paths)
test_paths_set = set(test_paths)
# val_paths_set = set(val_paths)

assert len(train_paths_set & test_paths_set) == 0, "Data leakage detected between train and test sets."
# assert len(train_paths_set & val_paths_set) == 0, "Data leakage detected between train and validation sets."
# assert len(test_paths_set & val_paths_set) == 0, "Data leakage detected between test and validation sets."


In [None]:
leakage = train_paths_set & test_paths_set
print(f"Number of overlapping paths: {len(leakage)}")
print("Overlapping paths:", leakage)


Training the model

In [13]:
num_epoch = 1
history = model.fit(
    train_dataset,
    epochs = num_epoch,
    steps_per_epoch = len(train_paths) // batch_size,
    validation_data = test_dataset,
    validation_steps= len(test_paths) // batch_size,
)

[1m151/151[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m19852s[0m 131s/step - acc: 0.5666 - auc: 0.3474 - loss: 0.8622 - precision: 0.5925 - recall: 0.3580 - val_acc: 0.6974 - val_auc: 0.2768 - val_loss: 0.6238 - val_precision: 0.6974 - val_recall: 0.6974


In [14]:
model.save('ADDNet3D_All_data.keras')

In [20]:
import collections

# Check class balance
print("Training set class distribution:", collections.Counter(train_labels))
# print("Validation set class distribution:", collections.Counter(val_labels))
print("Test set class distribution:", collections.Counter(test_labels))

Training set class distribution: Counter({1: 829, 0: 385})
Test set class distribution: Counter({1: 212, 0: 92})


In [15]:
res = model.evaluate(
    test_dataset,
    steps = len(test_paths) // batch_size,
    batch_size = 8,
    verbose = 1
)

[1m38/38[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1083s[0m 29s/step - acc: 0.7074 - auc: 0.2797 - loss: 0.6137 - precision: 0.7074 - recall: 0.7074


In [16]:
print(res)

[0.6238309144973755, 0.6973684430122375, 0.2767893671989441, 0.6973684430122375, 0.6973684430122375]
