**Import & Extract Information from DICOM**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install pydicom
!pip install opencv-python

from pydicom import dcmread
import numpy as np
from PIL import Image
import cv2

# Path to the files
PATH = './drive/MyDrive/Colab Notebooks/SCMR Workshop/CMR/'

# Load the sample DICOM image
ds = dcmread(PATH + 'Sample_Dicom.dcm')

# Display the DICOM info
# print(ds)

# Extract the metadata "Spacing Between Slices"
print('Extracting the metadata "Spacing Between Slices":\n================================================')

print('\nMethod 1: using the name of the desired metadata')
print('Spacing Between Slices = {0}'.format(ds.SpacingBetweenSlices))

print('\nMethod 2: using the tag of the desired metadata')
element = ds[0x0018, 0x0088]
print(element)
print('Spacing Between Slices = {0}'.format(element.value))

In [None]:
# Extract the image from the DICOM file
image = ds.pixel_array

# Display the image dimensions
print('Image shape = {0}\n'.format(image.shape))

# Visualize the image
img = Image.fromarray(np.uint8(image), 'L')
img.save(PATH + 'Dicom_Image.png')
img

--------------------------------------------------------------------------

**Libraries**

In [None]:
!pip install visualkeras


import numpy as np
from scipy.io import loadmat
import matplotlib.pyplot as plt
from PIL import Image
from keras.optimizers import Adam
from keras.models import Model
from keras.layers import Input, Dropout, concatenate
from keras.layers import Conv2D, MaxPooling2D, UpSampling2D
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from keras import backend as K

import visualkeras

**Functions**

In [None]:
# Average dice coefficient per batch
def dice_coef(y_true, y_pred, smooth=0.0):
    """
    Average dice coefficient per batch
    :param y_true: True labels
    :param y_pred: Predicted labels
    :param smooth: a scalar (default to 0.0)
    :return: the Dice index
    """
    axes = (1, 2, 3)
    intersection = K.sum(y_true * y_pred, axis=axes)
    summation = K.sum(y_true, axis=axes) + K.sum(y_pred, axis=axes)

    return K.mean((2.0 * intersection + smooth) / (summation + smooth), axis=0)

# Objective loss function to minimize, which is 1 minus the dice index
def dice_coef_loss(y_true, y_pred):
    return 1.0 - dice_coef(y_true, y_pred, smooth=0.1)

# A simple U-Net model for image segmentation
def unet(pretrained_weights = None,input_size = (128, 128, 1)):
    inputs = Input(input_size)
    conv1 = Conv2D(2, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(inputs)
    conv1 = Conv2D(2, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = Conv2D(4, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(pool1)
    conv2 = Conv2D(4, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = Conv2D(8, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(pool2)
    conv3 = Conv2D(8, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv4 = Conv2D(16, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(pool3)
    conv4 = Conv2D(16, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv4)
    drop4 = Dropout(0.5)(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    conv5 = Conv2D(32, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(pool4)
    conv5 = Conv2D(32, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv5)
    drop5 = Dropout(0.5)(conv5)

    up6 = Conv2D(16, 2, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(UpSampling2D(size = (2,2))(drop5))
    merge6 = concatenate([drop4,up6], axis = 3)
    conv6 = Conv2D(16, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(merge6)
    conv6 = Conv2D(16, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv6)

    up7 = Conv2D(8, 2, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(UpSampling2D(size = (2,2))(conv6))
    merge7 = concatenate([conv3,up7], axis = 3)
    conv7 = Conv2D(8, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(merge7)
    conv7 = Conv2D(8, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv7)

    up8 = Conv2D(4, 2, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(UpSampling2D(size = (2,2))(conv7))
    merge8 = concatenate([conv2,up8], axis = 3)
    conv8 = Conv2D(4, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(merge8)
    conv8 = Conv2D(4, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv8)

    up9 = Conv2D(2, 2, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(UpSampling2D(size = (2,2))(conv8))
    merge9 = concatenate([conv1,up9], axis = 3)
    conv9 = Conv2D(2, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(merge9)
    conv9 = Conv2D(2, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv9)
    conv9 = Conv2D(2, 3, activation = 'tanh', padding = 'same', kernel_initializer = 'glorot_uniform')(conv9)
    output = Conv2D(1, 1, activation = 'sigmoid')(conv9)

    model = Model(inputs, output)

    model.compile(optimizer = Adam(learning_rate = 0.001), loss = dice_coef_loss, metrics = [dice_coef])

    if(pretrained_weights):
      model.load_weights(pretrained_weights)

    return model

**Load the Data**

In [None]:
# OUR DATA IS ALREADY PREPROCESSED AND AVAILABLE ONLINE AT:  https://github.com/saeedkarimi/Cardiac_MRI_Segmentation
# FOR THE SOURCE OF DATA AND STUDY, PLEASE REFER TO THIS PAPER:  https://link.springer.com/article/10.1186/s12968-020-00678-0 

# Seed for random initializer
seed = 1234
np.random.seed(seed)

# Contour type: 'LV' or 'RV'
contour_type = 'LV'

# Random permutation of indices for 64 subjects. The first 45 are then selected as training and the rest are held as test set.
indices = np.random.permutation(64) + 1

# Define the U-Net model
model = unet()

# Print the model architecture
model.summary()
visualkeras.layered_view(model, legend=True, to_file=PATH + 'Model.png')

# Initialize the training and test data
training_images = np.zeros(shape=(1, 128, 128))
training_masks = np.zeros(shape=(1, 128, 128))
test_images = np.zeros(shape=(1, 128, 128))
test_masks = np.zeros(shape=(1, 128, 128))

# ==============================================================================
# Load the training data 'content/drive/MyDrive/Colab Notebooks/SCMR Workshop/CMR/'
for i in indices[0:45]:
  training_images = np.concatenate((training_images, loadmat(PATH + contour_type + '/Sbj_' + str(i) + '_' + contour_type + 'ED_image.mat')['image']), axis=0)
  training_masks = np.concatenate((training_masks, loadmat(PATH + contour_type + '/Sbj_' + str(i) + '_' + contour_type + 'ED_mask.mat')['mask']), axis=0)

training_images = training_images[1:]
training_masks = training_masks[1:]

# Reshape the data according to the model input size
training_images = np.expand_dims(training_images, axis=3)
training_masks = np.expand_dims(training_masks, axis=3)

print('\nTraining Data Size:')
print(training_images.shape)
print(training_masks.shape)

# ==============================================================================
# Load the test data
for i in indices[45:]:
  test_images = np.concatenate((test_images, loadmat(PATH + contour_type + '/Sbj_' + str(i) + '_' + contour_type + 'ED_image.mat')['image']), axis=0)
  test_masks = np.concatenate((test_masks, loadmat(PATH + contour_type + '/Sbj_' + str(i) + '_' + contour_type + 'ED_mask.mat')['mask']), axis=0)

test_images = test_images[1:]
test_masks = test_masks[1:]

# Reshape the data according to the model input size
test_images = np.expand_dims(test_images, axis=3)
test_masks = np.expand_dims(test_masks, axis=3)

print('\nTest Data Size:')
print(test_images.shape)
print(test_masks.shape)

img = cv2.imread(PATH + 'Model.png')
img = Image.fromarray(img.astype('uint8'))
img

**Training**

In [None]:
train_datagen = ImageDataGenerator(rotation_range=180, horizontal_flip=True, vertical_flip=True,
                                   featurewise_std_normalization=True, validation_split=0.3)

# Compute quantities required for featurewise normalization
# (std, mean, and principal components if ZCA whitening is applied)
train_datagen.fit(training_images)

# early_stopping = EarlyStopping(monitor='val_dice_coef', mode='max', verbose=1, patience=50, restore_best_weights=True)

# Model checkpoints
checkpoints = ModelCheckpoint(filepath=PATH + 'weights.h5', save_weights_only=True, monitor='val_dice_coef', mode='max', save_best_only=True)

# Fit the model on batches with real-time data augmentation:
history = model.fit(train_datagen.flow(training_images, training_masks, batch_size=16, subset='training'),
                    validation_data=train_datagen.flow(training_images, training_masks, batch_size=16, subset='validation'),
                    epochs=200, verbose=True, callbacks=[checkpoints])

# Uncomment this if early_stopping is uncommented and included in the callbacks
# model.save(PATH + 'weights.h5')

plt.plot(history.history['dice_coef'], 'r')
plt.plot(history.history['val_dice_coef'], 'b')
plt.title('Dice Per Epoch')
plt.ylabel('Dice')
plt.xlabel('Epoch')
plt.legend(['training dice', 'validation dice'], loc='upper left')
plt.grid(True)
plt.show()
plt.close()

**Evaluation**

In [None]:
# Clear the model graph
K.clear_session()

# Load the model
model = unet(pretrained_weights = PATH + 'weights.h5', input_size = (128, 128, 1))

# Predict segmentation masks for the test data using the trained model
predicted_masks = model.predict(test_masks, batch_size=4, verbose=True)

# Quantize the pixel values to 0 and 1
predicted_masks = np.where(predicted_masks > 0.5, 1.0, 0.0).astype('float32')

# Calculate the Dice metric
dice_metric = 0.0
for i in range(len(predicted_masks)):
  dice_metric += 2.0 * np.sum(np.multiply(test_masks[i], predicted_masks[i])) / (np.sum(test_masks[i]) + np.sum(predicted_masks[i]))

dice_metric /= len(predicted_masks)

print('\nTest Dice Metric = {0}\n'.format(dice_metric))

# Visualize sample segmentations
rgb = np.zeros(shape=(128, 128, 3))

image_number = 30
rgb[:, :, 0] = test_images[image_number, :, :, 0]
rgb[:, :, 1] = test_images[image_number, :, :, 0]
rgb[:, :, 2] = test_images[image_number, :, :, 0]
for i in range(128):
  for j in range(128):
    if test_masks[image_number, i, j, 0] == 1.0:
      rgb[i, j, 0] = 255.0
    if predicted_masks[image_number, i, j, 0] == 1.0:
      rgb[i, j, 1] = 255.0
img = Image.fromarray(rgb.astype('uint8'))
img.save(PATH + 'rgb.jpg')
img

**Studying Trained Models**

In [None]:
# LV Experiments
#   Exp1: We use checkpoints with 200 epochs and a learning-rate of 0.001
#   Exp2: We use checkpoints with 1000 epochs and a learning-rate of 0.001
#   Exp3: We use checkpoints with 1000 epochs and a learning-rate of 0.001
#   Exp4: We use early-stopping with the patience of 50 and a learning-rate of 0.001
#   Exp5: We use checkpoints with 200 epochs and a learning-rate of 0.1

# RV Experiments
#   Exp6: We use checkpoints with 200 epochs and a learning-rate of 0.001
#   Exp7: We use checkpoints with 1000 epochs and a learning-rate of 0.001
#   Exp8: We use early-stopping with the patience of 50 and a learning-rate of 0.001


EXP = 'Exp1'

# Clear the model graph
K.clear_session()

# Load the model
model = unet(pretrained_weights = PATH + EXP + '/weights.h5', input_size = (128, 128, 1))

# Predict segmentation masks for the test data using the trained model
predicted_masks = model.predict(test_masks, batch_size=4, verbose=True)

# Quantize the pixel values to 0 and 1
predicted_masks = np.where(predicted_masks > 0.5, 1.0, 0.0).astype('float32')

# Calculate the Dice metric
dice_metric = 0.0
for i in range(len(predicted_masks)):
  dice_metric += 2.0 * np.sum(np.multiply(test_masks[i], predicted_masks[i])) / (np.sum(test_masks[i]) + np.sum(predicted_masks[i]))

dice_metric /= len(predicted_masks)

print('\nTest Dice Metric = {0}\n'.format(dice_metric))

img = cv2.imread(PATH + EXP + '/train_val_curve.png')
img = Image.fromarray(img.astype('uint8'))
img

**Visualization**

In [None]:
# Visualize sample segmentations
rgb = np.zeros(shape=(128, 128, 3))

image_number = 30
rgb[:, :, 0] = test_images[image_number, :, :, 0]
rgb[:, :, 1] = test_images[image_number, :, :, 0]
rgb[:, :, 2] = test_images[image_number, :, :, 0]
for i in range(128):
  for j in range(128):
    if test_masks[image_number, i, j, 0] == 1.0:
      rgb[i, j, 0] = 255.0
    if predicted_masks[image_number, i, j, 0] == 1.0:
      rgb[i, j, 1] = 255.0
img = Image.fromarray(rgb.astype('uint8'))
img