In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import os
import nibabel
import imageio
from skimage.transform import resize
from skimage.io import imsave
from skimage.segmentation import mark_boundaries
from skimage.color import gray2rgb
from cv2 import bitwise_and, addWeighted
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, concatenate, Conv2D, MaxPooling2D, Conv2DTranspose , Dropout , BatchNormalization, Activation
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import backend as K
from skimage.exposure import rescale_intensity

In [2]:
image_rows = int(512/2)
image_cols = int(512/2)
Num_Classes = 3

In [16]:
imgs_train, imgs_mask_train = load_train_data()


In [3]:
def load_train_data():
  imgs_train = np.load('/content/drive/MyDrive/Pancreas_Unet/PancreasImg_train.npy')
  masks_train = np.load('/content/drive/MyDrive/Pancreas_Unet/PancreasLbs_train.npy')
  return imgs_train, masks_train

def load_test_data():
    imgs_test = np.load('/content/drive/MyDrive/Pancreas_Unet/PancreasImg_test.npy')
    masks_test = np.load('/content/drive/MyDrive/Pancreas_Unet/PancreasLbs_test.npy')

    return imgs_test,masks_test

# %% Build U-net model, loss function and metric

# TF dimension ordering in this code
K.set_image_data_format('channels_last')

In [7]:
def gen_dice_coef(y_true, y_pred, smooth=1e-7):
    '''
    Dice coefficient for num_classes labels (classes). Ignores background pixel label 0
    Pass to model as metric during compile statement
    '''
    y_true_f = K.flatten(K.one_hot(K.cast(y_true, 'int32'), num_classes = Num_Classes)[...,1:])
    y_pred_f = K.flatten(y_pred[...,1:])
    intersect = K.sum(y_true_f * y_pred_f, axis=-1)
    denom = K.sum(y_true_f + y_pred_f, axis=-1)
    return K.mean((2. * intersect / (denom + smooth)))

def gen_dice_coef_loss(y_true, y_pred):
    '''
    Dice loss to minimize. Pass to model as loss during compile statement
    '''
    return 1 - gen_dice_coef(y_true, y_pred)

def Unet():
    inputs = Input((image_rows, image_cols, 1))

    def conv_block(x, filters):
        x = Conv2D(filters, (3, 3), padding='same')(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        x = Conv2D(filters, (3, 3), padding='same')(x)
        x = BatchNormalization()(x)
        x = Activation('relu')(x)
        return x

    conv1 = conv_block(inputs, 32)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = conv_block(pool1,64)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = conv_block(pool2, 128)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = conv_block(pool3, 256)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)

    # Bottleneck
    conv5 = conv_block(pool4, 512)
    conv5 = Dropout(0.5)(conv5)

    # Decoder
    up6 = concatenate([Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(conv5), conv4], axis=3)
    conv6 = conv_block(up6, 256)

    up7 = concatenate([Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv6), conv3], axis=3)
    conv7 = conv_block(up7, 128)

    up8 = concatenate([Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv7), conv2], axis=3)
    conv8 = conv_block(up8, 64)

    up9 = concatenate([Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv8), conv1], axis=3)
    conv9 = conv_block(up9, 32)

    PredictedMask = Conv2D(Num_Classes, (1, 1), activation='softmax')(conv9)
    # last layer is the predicted mask/label image (organ+tumor), each pixel in the set {0,1}

    model = Model(inputs=[inputs], outputs=[PredictedMask])

    model.compile(optimizer=Adam(learning_rate=1e-4), loss=gen_dice_coef_loss, metrics=[gen_dice_coef])

    return model

In [None]:
def train_and_predict():
    print('-'*30)
    print('Loading and preprocessing train data...')
    print('-'*30)
    # load train data
    imgs_train, imgs_mask_train = load_train_data()

    # adapt the train images and masks dimensions so that we can feed it to the network (by inserting a new axis)
    imgs_train = imgs_train[..., np.newaxis]
    imgs_mask_train = imgs_mask_train[..., np.newaxis]

    # train data normalization: x --> z=(x-mean_x)/std_x --> mean_z=0 , std_z=1
    imgs_train = imgs_train.astype('float32')
    mean = np.mean(imgs_train)
    std = np.std(imgs_train)
    imgs_train -= mean
    imgs_train /= std

    # each pixel in the range [0,2] (pixels values belong to {0,1,2}), no need to normalized
    imgs_mask_train = imgs_mask_train.astype('float32')

    print('-'*30)
    print('Creating and compiling model...')
    print('-'*30)
    model = Unet()
    model_checkpoint = ModelCheckpoint('/content/drive/MyDrive/Pancreas_Unet/weights_pancreas_unet1.h5', monitor='val_loss', save_best_only=True)
    # saving the weights and the loss of the best predictions we obtained

    print('-'*30)
    print('Fitting model...')
    print('-'*30)
    history=model.fit(imgs_train, imgs_mask_train, batch_size=5, epochs=20, verbose=1, shuffle=True,
              validation_split=0.2,
              callbacks=[model_checkpoint])

In [None]:
train_and_predict()

------------------------------
Loading and preprocessing train data...
------------------------------
------------------------------
Creating and compiling model...
------------------------------
------------------------------
Fitting model...
------------------------------
Epoch 1/20


Expected: ['keras_tensor']
Received: inputs=Tensor(shape=(None, 256, 256, 1))


(None, 256, 256, 1)
(None, 256, 256, 3)
(None, 256, 256, 1)
(None, 256, 256, 3)
(None, 256, 256, 1)
(None, 256, 256, 3)
(None, 256, 256, 1)
(None, 256, 256, 3)
[1m695/931[0m [32m━━━━━━━━━━━━━━[0m[37m━━━━━━[0m [1m25s[0m 106ms/step - gen_dice_coef: 0.0267 - loss: 0.9733

KeyboardInterrupt: 

In [15]:
imgs_test, mask_test = load_test_data()


In [18]:
imgs_test = imgs_test[..., np.newaxis]
imgs_train = imgs_train[..., np.newaxis]
    # test data normalization: x --> z=(x-mean_x)/std_x --> mean_z=0 , std_z=1
imgs_train = imgs_train.astype('float32')
imgs_test = imgs_test.astype('float32')
mean = np.mean(imgs_train)
std = np.std(imgs_train)
imgs_test -= mean
imgs_test /= std

print('-'*30)
print('Loading saved weights...')
print('-'*30)
model = Unet()
model.load_weights('/content/drive/MyDrive/Pancreas_Unet/weights_pancreas_unet.h5')
print('-'*30)
print('Predicting masks on test data...')
print('-'*30)
imgs_mask_test = model.predict(imgs_test, verbose=1)
np.save('mask_predict4.npy', imgs_mask_test)
print('Saving to .npy files done.')



------------------------------
Loading saved weights...
------------------------------
------------------------------
Predicting masks on test data...
------------------------------


Expected: ['keras_tensor_207']
Received: inputs=Tensor(shape=(32, 256, 256, 1, 1))


[1m45/46[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 180ms/step

Expected: ['keras_tensor_207']
Received: inputs=Tensor(shape=(None, 256, 256, 1, 1))


[1m46/46[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 218ms/step
Saving to .npy files done.


In [9]:
mask_predict = np.load('/content/drive/MyDrive/Pancreas_Unet/predict/mask_predict_unet.npy')


In [11]:
mask_test = mask_test[..., np.newaxis]
mask_test = mask_test.astype('float32')

In [14]:
score = gen_dice_coef(mask_test, imgs_mask_test)
print(f'Dice Coefficient trên tập test: {score}')

Dice Coefficient trên tập test: 0.6335667371749878
