# imports

In [45]:
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Concatenate, Conv2DTranspose, BatchNormalization, Activation
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras import backend as K
from tensorflow.keras.callbacks import ModelCheckpoint
from glob import glob
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
plt.style.use("ggplot")
import cv2
import pandas as pd
import numpy as np

In [25]:
# Every mask file has a sibling scan whose path is identical except for the
# '_mask' marker, so the scan list is derived from the mask list.
mask_files = glob('../input/lgg-mri-segmentation/kaggle_3m/*/*_mask*')
train_files = [mask_path.replace('_mask', '') for mask_path in mask_files]

# Quick sanity check: show the first few paths of each list.
print(train_files[:3])
print(mask_files[:3])

# load dataset

In [26]:
def plot_samples(rows, cols):
    """Show a ``rows`` x ``cols`` grid of scans with their masks overlaid.

    Reads the module-level ``train_files`` and ``mask_files`` lists, starting
    with the first pair. When fewer than ``rows * cols`` pairs are available,
    only the available ones are drawn instead of raising ``IndexError``.

    Args:
        rows: Number of subplot rows.
        cols: Number of subplot columns.
    """
    fig = plt.figure(figsize=(12, 12))
    capacity = rows * cols
    sample_pairs = zip(train_files[:capacity], mask_files[:capacity])
    # Subplot positions are 1-based, list indices are 0-based; enumerate keeps
    # the two apart so the first sample is no longer skipped.
    for position, (image_path, mask_path) in enumerate(sample_pairs, start=1):
        fig.add_subplot(rows, cols, position)
        image = cv2.imread(image_path)
        # OpenCV loads images as BGR; convert to RGB before plotting.
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(mask_path)
        plt.imshow(image)
        # Semi-transparent mask drawn on top of the scan.
        plt.imshow(mask, alpha=0.6)
    plt.show()

# Preview a 3 x 4 grid of scan/mask overlays.
plot_samples(rows=3, cols=4)

# create dataframe & split data

In [27]:
# One row per sample: path of the scan and path of its mask.
df = pd.DataFrame({'filename': train_files, 'mask': mask_files})

# First hold out 10% of all samples for testing ...
df_train, df_test = train_test_split(df, test_size=0.1, random_state=42)
# ... then take 20% of the remaining 90% for validation, which gives roughly
# 72% train / 18% validation / 10% test overall.
df_train, df_val = train_test_split(df_train, test_size=0.2, random_state=42)

# Report the (rows, columns) shape of each subset, one per line.
print(df_train.values.shape, df_val.values.shape, df_test.values.shape, sep='\n')

# data generator

In [29]:
def train_generator(dataframe,
                    batch_size,
                    aug_dict,
                    image_color_mode = "rgb",
                    mask_color_mode = "grayscale",
                    image_save_prefix = "image",
                    mask_save_prefix = "mask",
                    save_to_dir = None,
                    target_size = (256,256),
                    seed = 1):
    """Yield normalized ``(image, mask)`` batches read from ``dataframe``.

    Two ``ImageDataGenerator`` flows are built from the same ``aug_dict``:
    one over the ``"filename"`` column and one over the ``"mask"`` column.
    Both receive the same ``seed`` so that ordering and random augmentation
    are intended to stay aligned between an image and its mask. Each pair of
    batches is passed through ``normalize_data`` before being yielded.

    Args:
        dataframe: Table with ``"filename"`` and ``"mask"`` path columns.
        batch_size: Batch size forwarded to both flows.
        aug_dict: Keyword arguments for ``ImageDataGenerator``.
        image_color_mode: ``color_mode`` used when reading images.
        mask_color_mode: ``color_mode`` used when reading masks.
        image_save_prefix: ``save_prefix`` for the image flow.
        mask_save_prefix: ``save_prefix`` for the mask flow.
        save_to_dir: ``save_to_dir`` forwarded to both flows.
        target_size: ``target_size`` forwarded to both flows.
        seed: Seed forwarded to both flows.

    Yields:
        The ``(image, mask)`` tuple returned by ``normalize_data``.
    """
    def _flow(path_column, color_mode, save_prefix):
        # Build one directory-less flow over a single path column.
        return ImageDataGenerator(**aug_dict).flow_from_dataframe(
            dataframe,
            x_col = path_column,
            class_mode = None,
            color_mode = color_mode,
            target_size = target_size,
            batch_size = batch_size,
            save_to_dir = save_to_dir,
            save_prefix = save_prefix,
            seed = seed)

    image_batches = _flow("filename", image_color_mode, image_save_prefix)
    mask_batches = _flow("mask", mask_color_mode, mask_save_prefix)

    for image_batch, mask_batch in zip(image_batches, mask_batches):
        image_batch, mask_batch = normalize_data(image_batch, mask_batch)
        yield (image_batch, mask_batch)

        
def normalize_data(image, mask):
    """Scale an image/mask pair by 1/255 and binarize the mask.

    The inputs are not modified; the division produces new arrays. After
    scaling, mask values above 0.5 become 1 (tumor) and values at or below
    0.5 become 0 (no tumor).

    Args:
        image: Array of pixel values on a 0-255 scale.
        mask: Array of mask values on a 0-255 scale.

    Returns:
        Tuple ``(scaled_image, binary_mask)``.
    """
    scaled_image = image / 255
    binary_mask = mask / 255
    # Compute both selectors before writing so neither assignment can
    # influence the other.
    is_tumor = binary_mask > 0.5
    is_background = binary_mask <= 0.5
    binary_mask[is_tumor] = 1
    binary_mask[is_background] = 0
    return (scaled_image, binary_mask)

# metrics

DSC = Dice similarity coefficient (Sørensen–Dice); the Dice loss used for training is its negative.
\begin{equation}
\mathrm{DSC}=\frac{2|X \cap Y|}{|X|+|Y|}
\end{equation}

In [30]:
def dice_coef(y_true, y_pred, smooth = 100):
    """Smoothed Dice coefficient between two tensors.

    Both inputs are flattened, so the score is computed over every element
    at once. ``smooth`` is added to numerator and denominator, which keeps
    the ratio defined when both inputs sum to zero.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor.
        smooth: Additive smoothing constant.

    Returns:
        ``(2 * sum(y_true * y_pred) + smooth) / (sum(y_true) + sum(y_pred) + smooth)``.
    """
    truth = K.flatten(y_true)
    prediction = K.flatten(y_pred)
    overlap = K.sum(truth * prediction)
    total = K.sum(truth) + K.sum(prediction)
    numerator = 2 * overlap + smooth
    denominator = total + smooth
    return numerator / denominator

def dice_coef_loss(y_true, y_pred):
    """Return the negated Dice coefficient, so minimizing it maximizes overlap."""
    score = dice_coef(y_true, y_pred)
    return -score

IOU = intersection over union (Jaccard index), a symmetric overlap measure between two sets; it is used here as an evaluation metric.

\begin{equation}
J(A, B)=\frac{|A \cap B|}{|A \cup B|}=\frac{|A \cap B|}{|A|+|B|-|A \cap B|}
\end{equation}

In [31]:
def iou(y_true, y_pred, smooth = 100):
    """Smoothed intersection-over-union (Jaccard index) between two tensors.

    Both inputs are flattened first. ``smooth`` is added to numerator and
    denominator, which keeps the ratio defined when the union is zero.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor.
        smooth: Additive smoothing constant.

    Returns:
        ``(intersection + smooth) / (union + smooth)``.
    """
    truth = K.flatten(y_true)
    prediction = K.flatten(y_pred)
    overlap = K.sum(truth * prediction)
    # |A ∪ B| = |A| + |B| - |A ∩ B|
    combined = K.sum(truth) + K.sum(prediction) - overlap
    numerator = overlap + smooth
    denominator = combined + smooth
    return numerator / denominator

# build model

In [32]:
def conv_block(input, num_filters):
    """Apply two Conv2D -> BatchNormalization -> ReLU stages.

    Each convolution uses a 3x3 kernel with ``'same'`` padding and
    ``num_filters`` output channels.

    Args:
        input: Input tensor.
        num_filters: Number of filters for both convolutions.

    Returns:
        Output tensor of the second stage.
    """
    features = input
    # The two stages are identical, so build them in a loop.
    for _ in range(2):
        features = Conv2D(num_filters, (3,3), padding='same')(features)
        features = BatchNormalization()(features)
        features = Activation('relu')(features)
    return features

In [33]:
def encoder_block(input, num_filters):
    """Run a conv block, then halve the spatial size with 2x2 max pooling.

    Args:
        input: Input tensor.
        num_filters: Number of filters for the conv block.

    Returns:
        Tuple ``(features, pooled)``: the conv block output (used as a skip
        connection) and its max-pooled version (fed to the next level).
    """
    features = conv_block(input, num_filters)
    pooled = MaxPooling2D(pool_size=(2,2))(features)
    return features, pooled

In [34]:
def decoder_block(input, skip_features, num_filters):
    """Upsample, merge with the skip connection, then run a conv block.

    Args:
        input: Tensor coming from the previous (coarser) level.
        skip_features: Encoder tensor concatenated after upsampling.
        num_filters: Number of filters for the transposed convolution and
            the conv block.

    Returns:
        Output tensor of the conv block.
    """
    # Stride-2 transposed convolution with a 2x2 kernel performs the upsampling.
    upsampled = Conv2DTranspose(num_filters, (2,2), strides=2, padding='same')(input)
    merged = Concatenate()([upsampled, skip_features])
    return conv_block(merged, num_filters)

In [35]:
def build_unet(input_shape):
    """Assemble the U-Net model.

    Four encoder levels (64, 128, 256, 512 filters) feed a 1024-filter
    bridge, followed by four decoder levels (512, 256, 128, 64 filters),
    each merged with the encoder output of matching width. A final 1x1
    sigmoid convolution produces a single-channel output.

    Args:
        input_shape: Shape passed to ``Input``.

    Returns:
        A ``Model`` named ``"U-Net"``.
    """
    inputs = Input(input_shape)

    # Contracting path: remember each level's features for the skip links.
    skips = []
    current = inputs
    for width in (64, 128, 256, 512):
        skip, current = encoder_block(current, width)
        skips.append(skip)

    current = conv_block(current, 1024) # bridge

    # Expanding path: consume the skip links from deepest to shallowest.
    for width, skip in zip((512, 256, 128, 64), reversed(skips)):
        current = decoder_block(current, skip, width)

    outputs = Conv2D(1, (1,1), padding='same', activation='sigmoid')(current)
    return Model(inputs, outputs, name="U-Net")

In [36]:
# Network input geometry: height, width and channel count of each image.
IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS = 256, 256, 3

input_shape = (IMG_HEIGHT, IMG_WIDTH, IMG_CHANNELS)

In [37]:
# Training hyper-parameters.
EPOCHS, BATCH_SIZE = 150, 32
learning_rate = 0.0001

In [38]:
# Light geometric augmentation, used for the training stream only.
train_generator_args = {
    'rotation_range': 0.2,
    'width_shift_range': 0.05,
    'height_shift_range': 0.05,
    'shear_range': 0.05,
    'zoom_range': 0.05,
    'horizontal_flip': True,
    'fill_mode': 'nearest',
}
train_gen = train_generator(
    df_train, BATCH_SIZE, train_generator_args,
    target_size=(IMG_HEIGHT, IMG_WIDTH))
# Validation batches get no augmentation (empty argument dict).
val_gen = train_generator(
    df_val, BATCH_SIZE, {},
    target_size=(IMG_HEIGHT, IMG_WIDTH))

model = build_unet(input_shape)

# Spread the learning-rate decay over the planned number of epochs.
decay_rate = learning_rate / EPOCHS
# NOTE(review): `decay` and `epsilon=None` are accepted by older tf.keras Adam
# implementations; newer releases may reject them - confirm against the
# installed TensorFlow version.
opt = Adam(learning_rate, beta_1=0.9, beta_2=0.999, epsilon=None,
           decay=decay_rate, amsgrad=False)
model.compile(optimizer=opt,
              loss=dice_coef_loss,
              metrics=["binary_accuracy", iou, dice_coef])
model.summary()

# train

In [39]:
# Keep only the best model seen so far on disk.
callbacks = [ModelCheckpoint('unet_brain_mri_seg.hdf5', verbose=1, save_best_only=True)]
# Step counts must be whole numbers of batches; round up so the final,
# partial batch of each subset is still visited once per epoch.
history = model.fit(train_gen,
                    steps_per_epoch=int(np.ceil(len(df_train) / BATCH_SIZE)),
                    epochs=EPOCHS,
                    callbacks=callbacks,
                    validation_data = val_gen,
                    validation_steps=int(np.ceil(len(df_val) / BATCH_SIZE)))

In [40]:
# Per-epoch values recorded by model.fit, keyed by metric name.
a = history.history

list_traindice = a['dice_coef']
list_testdice = a['val_dice_coef']

list_trainjaccard = a['iou']
list_testjaccard = a['val_iou']

list_trainloss = a['loss']
list_testloss = a['val_loss']

# Figure 1: loss curves. Labels and a legend make the two lines
# distinguishable; the history holds one value per epoch.
plt.figure(1)
plt.plot(list_testloss, 'b-', label='validation loss')
plt.plot(list_trainloss,'r-', label='train loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('loss graph', fontsize = 15)
plt.legend()

# Figure 2: Dice coefficient curves (the plotted series are 'dice_coef' and
# 'val_dice_coef', so the axis is labelled accordingly).
plt.figure(2)
plt.plot(list_traindice, 'r-', label='train dice coefficient')
plt.plot(list_testdice, 'b-', label='validation dice coefficient')
plt.xlabel('epoch')
plt.ylabel('dice coefficient')
plt.title('dice coefficient graph', fontsize = 15)
plt.legend()
plt.show()

In [43]:
# Reload the checkpoint written during training; the custom loss and metric
# functions must be supplied so the compiled model can be deserialized.
model = load_model('unet_brain_mri_seg.hdf5', custom_objects={'dice_coef_loss': dice_coef_loss, 'iou': iou, 'dice_coef': dice_coef})
test_gen = train_generator(df_test,
                           BATCH_SIZE,
                           dict(),
                           target_size=(IMG_HEIGHT, IMG_WIDTH))
# Whole number of batches, rounded up so the final partial batch is included.
results = model.evaluate(test_gen, steps=int(np.ceil(len(df_test) / BATCH_SIZE)))
# evaluate() returns the loss followed by the metrics in compile order:
# [loss, binary_accuracy, iou, dice_coef].
print("Test lost: ",results[0])
print("Test IOU: ",results[2])
print("Test Dice Coefficent: ",results[3])

In [47]:
# Show 50 randomly chosen test samples: input scan, ground-truth mask and
# thresholded prediction side by side.
for _ in range(50):
    # randint's upper bound is exclusive, so start at 0 to make every row,
    # including the first, eligible.
    index = np.random.randint(0, len(df_test.index))
    img = cv2.imread(df_test['filename'].iloc[index])
    # OpenCV reads BGR, while the training stream used color_mode "rgb";
    # convert so the model sees the same channel order it was trained on.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # cv2.resize takes the target size as (width, height).
    img = cv2.resize(img, (IMG_WIDTH, IMG_HEIGHT))
    img = img / 255
    # Add the leading batch axis expected by predict().
    img = img[np.newaxis, :, :, :]
    pred = model.predict(img)

    plt.figure(figsize=(12,12))
    plt.subplot(1,3,1)
    plt.imshow(np.squeeze(img))
    plt.title('Original Image')
    plt.subplot(1,3,2)
    plt.imshow(np.squeeze(cv2.imread(df_test['mask'].iloc[index])))
    plt.title('Original Mask')
    plt.subplot(1,3,3)
    # Binarize the sigmoid output at 0.5.
    plt.imshow(np.squeeze(pred) > .5)
    plt.title('Prediction')
    plt.show()