In [1]:
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [2]:
# Import libraries 
import numpy as np
import nibabel as nib
import matplotlib.pyplot as plt 
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import os

In [None]:
# Import data 

def file_reader(filepath):
    
    image_nifti = nib.load(filepath)
    img = image_nifti.get_data()
  
  
    return img

path = "/content/drive/My Drive/MISA project/misa data"

# read data 

folders = ["Training_Set","Validation_Set","Test_Set"]


train_folders = [f for f in sorted(os.listdir(os.path.join(path,folders[0])))]
val_folders = [f for f in sorted(os.listdir(os.path.join(path,folders[1])))]
test_folders = [f for f in sorted(os.listdir(os.path.join(path,folders[2])))]

train_data = []
train_labels = []

print("Train ")

for i,name in enumerate(train_folders):
  train_data.append(file_reader(os.path.join(path,folders[0],name, name+".nii.gz")))
  train_labels.append(file_reader(os.path.join(path,folders[0],name, name+"_seg.nii.gz")))


test_data = []
test_labels = []

print("Val")

for i,name in enumerate(val_folders):
  test_data.append(file_reader(os.path.join(path,folders[1],name, name+".nii.gz")))
  test_labels.append(file_reader(os.path.join(path,folders[1],name, name+"_seg.nii.gz")))



train_data = np.array(train_data)
train_labels = np.array(train_labels)

test_data = np.array(test_data)
test_labels = np.array(test_labels)



print(train_data.shape, train_labels.shape,  test_data.shape, test_labels.shape)

In [4]:
def file_header(filepath):
    image_nifti = nib.load(filepath)
    header = image_nifti.header  
    return header

path = "/content/drive/My Drive/MISA project/misa data"

header_info=[]
for i,name in enumerate(val_folders):
  header_info.append(file_header(os.path.join(path,folders[1],name, name+".nii.gz")))

In [None]:
# shuffle train : 
np.random.seed(0)
x = list(range(train_data.shape[0]))
randomize = np.arange(len(x))
np.random.shuffle(randomize)
train_data = train_data[randomize]
train_labels = train_labels[randomize]

print(train_data.shape, train_labels.shape)

## Split train into train and validation

In [None]:
# split train into validation and train 
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(train_data, train_labels, test_size=0.3, random_state=42)

print(X_train.shape, X_val.shape, y_train.shape, y_val.shape, test_data.shape, test_labels.shape)

## Apply histogram equalization or other preprocessing 

In [None]:
# Apply ada histogram equalization on all data :

from skimage.exposure import equalize_adapthist

train = []
val= []
test = []

for i in range(len(X_train)):
  train.append(equalize_adapthist(np.squeeze(X_train[i])))

for i in range(len(X_val)):
  val.append(equalize_adapthist(np.squeeze(X_val[i])))

for i in range(len(test_data)):
  test.append(equalize_adapthist(np.squeeze(test_data[i])))

train = np.array(train)
val = np.array(val)
test = np.array(test)

print(train.shape, val.shape, test.shape)

In [8]:
train_labels = y_train
val_labels = y_val

## Cut data into slices

In [None]:
# Cut data into slices 

import pickle

print('Train')
tr_data = []
tr_labels = []


for i in range(len(train)):
  vol = train[i]
  vol_gt = np.squeeze(train_labels[i])
  for j in range(256):
    tr_data.append(vol[:,:,j])
    tr_labels.append(vol_gt[:,:,j])

print('Val')

vl_data = []
vl_labels = []

for i in range(len(val)):
  vol = val[i]
  vol_gt = np.squeeze(val_labels[i])
  for j in range(256):
    vl_data.append(vol[:,:,j])
    vl_labels.append(vol_gt[:,:,j])


tst_data = []
tst_labels = []

for i in range(len(test)):
  vol = test[i]
  vol_gt = np.squeeze(test_labels[i])
  for j in range(256):
    tst_data.append(vol[:,:,j])
    tst_labels.append(vol_gt[:,:,j])

In [None]:
train_data = np.expand_dims(np.array(tr_data), axis=3)
val_data = np.expand_dims(np.array(vl_data), axis=3)
test_data = np.expand_dims(np.array(tst_data), axis=3)

train_labels = np.expand_dims(np.array(tr_labels), axis=3)
val_labels = np.expand_dims(np.array(vl_labels), axis=3)
test_labels = np.expand_dims(np.array(tst_labels), axis=3)

print(train_data.shape, train_labels.shape, val_data.shape, val_labels.shape, test_data.shape, test_labels.shape)

## Extract patches and useful patches 

In [None]:
IMAGE_SIZE = (256, 128, 256)

N_INPUT_CHANNELS = 1
PATCH_SIZE = (32, 32)
PATCH_STRIDE = (16, 16)

image_size = IMAGE_SIZE
patch_size = PATCH_SIZE
stride = PATCH_STRIDE


def extract_patches(x, patch_size, patch_stride) :
  return tf.image.extract_patches(
    x,
    sizes=[1, *patch_size, 1],
    strides=[1, *patch_stride, 1],
    rates=[1, 1, 1, 1],
    padding='SAME', name=None)
  

def extract_useful_patches(
    volumes, labels,
    image_size=IMAGE_SIZE,
    patch_size=PATCH_SIZE,
    stride=PATCH_STRIDE,
    threshold=0.5,
    num_classes=4) :

  vol_patches = extract_patches(volumes, patch_size, stride).numpy()
  seg_patches = extract_patches(labels, patch_size, stride).numpy()

  vol_patches = vol_patches.reshape([-1, *patch_size, 1])
  seg_patches = seg_patches.reshape([-1, *patch_size, ])

  foreground_mask = seg_patches != 0

  useful_patches = foreground_mask.sum(axis=(1, 2)) > threshold * np.prod(patch_size)

  

  vol_patches = vol_patches[useful_patches]
  seg_patches = seg_patches[useful_patches]

  seg_patches = tf.keras.utils.to_categorical(
    seg_patches, num_classes=num_classes, dtype='float32')
  
  return (vol_patches, seg_patches)


#extract from train and validation : 

(train_patches, train_patches_seg) = extract_useful_patches(train_data, train_labels)
(val_patches, val_patches_seg) = extract_useful_patches(val_data, val_labels)

print(train_patches.shape, train_patches_seg.shape, val_patches.shape, val_patches_seg.shape)

## Oversampling for CSF 

In [None]:
number, x, y, _ = train_patches.shape

csf = []
idx = []
for i in range(number):
  if np.sum(train_patches_seg[i,:,:,1]) == 0:
    continue
  elif np.sum(train_patches_seg[i,:,:,1]) > 200:
    idx.append(i)
    csf.append(train_patches_seg[i,:,:,1])
csf = np.array(csf)
csf.shape


csf_filled = train_patches[idx]
csf_filled_mask = train_patches_seg[idx]

upsampleRate = 50

for i in range(upsampleRate):
  train_patches = np.append(train_patches,csf_filled, axis = 0)
  train_patches_seg= np.append(train_patches_seg, csf_filled_mask, axis = 0)

train_patches.shape, train_patches_seg.shape

x = list(range(train_patches.shape[0]))
randomize = np.arange(len(x))
np.random.shuffle(randomize)
train_patches = train_patches[randomize]
train_patches_seg = train_patches_seg[randomize]

## Calculate percentage of each class in the train patches

In [None]:
number, x, y, _ = train_patches_seg.shape

num = number*x*y
names = ['BG', 'CSF', 'GM', 'WM']

for i in range(4):
  print(f"{names[i]}: {np.round(np.sum(train_patches_seg[:,:,:,i])/num, 3)}")

## Define 2D UNET architecture

In [11]:
# Define Unet : 

N_EPOCHS = 20
BATCH_SIZE = 64
PATIENCE = 10
MODEL_FNAME_PATTERN = 'model.h5'
OPTIMISER = 'Adam'
LOSS = 'categorical_crossentropy'

# network parameters
N_CLASSES = 4
N_INPUT_CHANNELS = 1
PATCH_SIZE = (32, 32)



def get_unet(img_size=PATCH_SIZE, n_classes=N_CLASSES, n_input_channels=N_INPUT_CHANNELS, scale=2, drp_rate=0.2):
    inputs = keras.Input(shape=img_size + (n_input_channels, ))

    # Encoding path
    conv1 = layers.Conv2D(32*scale, (3, 3), padding="same", activation='relu')(inputs)
    drop1 = layers.Dropout(rate=drp_rate)(conv1, training=True)
    max1 = layers.MaxPooling2D((2, 2))(drop1)

    conv2 = layers.Conv2D(64*scale, (3, 3), padding="same", activation='relu')(max1)
    drop2 = layers.Dropout(rate=drp_rate)(conv2, training=True)
    max2 = layers.MaxPooling2D((2, 2))(drop2)

    conv3 = layers.Conv2D(128*scale, (3, 3), padding="same", activation='relu')(max2)
    drop3 = layers.Dropout(rate=drp_rate)(conv3, training=True)
    max3 = layers.MaxPooling2D((2, 2))(drop3)

    lat = layers.Conv2D(256*scale, (3, 3), padding="same", activation='relu')(max3)
    drop4 = layers.Dropout(rate=drp_rate)(lat, training=True)

    # Decoding path
    
    up1 = layers.UpSampling2D((2, 2))(drop4)
    concat1 = layers.concatenate([conv3, up1], axis=-1)
    conv4 = layers.Conv2D(128*scale, (3, 3), padding="same", activation='relu')(concat1)
    drop5 = layers.Dropout(rate=drp_rate)(conv4, training=True)

    up2 = layers.UpSampling2D((2, 2))(drop5)
    concat2 = layers.concatenate([conv2, up2], axis=-1)
    conv5 = layers.Conv2D(64*scale, (3, 3), padding="same", activation='relu')(concat2)
    drop6 = layers.Dropout(rate=drp_rate)(conv5, training=True)
    
    up3 = layers.UpSampling2D((2, 2))(drop6)
    concat3 = layers.concatenate([conv1, up3], axis=-1)
    conv6 = layers.Conv2D(32*scale, (3, 3), padding="same", activation='relu')(concat3)
    drop7 = layers.Dropout(rate=drp_rate)(conv6, training=True)

    outputs = layers.Conv2D(n_classes, (1, 1), activation="softmax")(drop7)

    model = keras.Model(inputs, outputs)

    return model

## Compile and train model 

In [None]:
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint

weights_dir = "/content/drive/My Drive/misa_unet_optim.h5"

checkpoint = ModelCheckpoint(filepath=weights_dir, save_weights_only=True, monitor='val_loss', verbose=1,
    save_best_only=True, mode='auto', period=1)

unet = get_unet()
unet.compile(optimizer=OPTIMISER, loss=LOSS)

In [None]:
EPOCHS = 15
unet.fit(
    x=train_patches, 
    y=train_patches_seg,
    validation_data=(val_patches, val_patches_seg),
    batch_size=64,
    epochs=EPOCHS, 
    callbacks = checkpoint)

## Load best weights and redifine input size for the validation set prediction

In [12]:
#load model with weights and compile 
unet_bis = get_unet(
    img_size=(256, 128),
    n_classes=N_CLASSES,
    n_input_channels=1)

unet_bis.compile(optimizer=OPTIMISER, loss=LOSS)
unet_bis.load_weights("/content/drive/My Drive/misa_unet.h5")

## Calculate DICE on validation set

In [13]:
def get_dice(seg,gt):
    dice = [] 
    for i in range(4):
        print(i)
        d = np.sum(seg[gt==i]==i)*2.0 / (np.sum(seg[seg==i]==i) + np.sum(gt[gt==i]==i))
        print('Dice similarity score is {}'.format(d))
        dice.append(d)
    return dice

In [None]:
#predict on validation 

prediction = unet_bis.predict(test_data)

prediction = np.argmax(prediction, axis=3)
validation_labels = np.squeeze(test_labels)
dice = get_dice(test_labels, prediction)

## Calculate relative volumetric difference

In [None]:
## VD

# 100 * (segm.sum() - gt.sum()) / float(gt.sum())

def get_vd(seg,gt):
  vd =[]
  for i in range(4):
    print(i)
    v = 100*((np.sum(seg[seg==i])-np.sum(gt[gt==i]))/np.sum(gt[gt==i]))
    print('vd {} '.format(v))
    vd.append(v)
  return vd

vd = get_vd(prediction,test_labels)

## transform to volumes again

In [23]:
def get_volume(slices):
  shape = slices.shape[0]
  iter = int(shape/256)
  volumes = np.zeros((iter,256,128,256))
  for i in range(iter):
    vol = np.zeros((256,128,256))
    for j in range(256):
      vol[:,:,j]= slices[j+i*256]
    volumes[i]= vol
  return volumes

In [None]:
test_predictions = get_volume(prediction)

print(test_predictions.shape)

## Save as NIFTII FILES

In [27]:
# save them onto nifti files: 

for i in range(5):

    ni_img_i = nib.Nifti1Image(test_predictions[i], affine=np.eye(4), header = header_info[i])
    header = header_info[i]
    ni_img_i.header['pixdim'][1:4] = header['pixdim'][1:4]
    nib.save(ni_img_i, '/content/drive/My Drive/'+f'predicted_test_volume_labels{i+1}.nii.gz')