In [1]:
# Mount Google Drive into the Colab VM at /content/drive. The dataset, model
# checkpoint, CSV log and result images used below all live under this mount.
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install tensorflow



In [3]:
import os
from glob import glob

import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, CSVLogger
from tensorflow.keras.metrics import Recall, Precision
from tensorflow.keras.optimizers import Adam
#from metrics import dice_loss, dice_coef, iou
#from unet import build_unet

In [4]:
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.models import Model


In [5]:
def conv_block(inputs, num_filters):
    """Apply two consecutive Conv2D -> BatchNormalization -> ReLU stages.

    Args:
        inputs: Tensor fed to the first convolution.
        num_filters: Number of filters used by both 3x3 convolutions.

    Returns:
        The tensor produced by the second ReLU activation. "same" padding is
        used, so the spatial size of ``inputs`` is preserved.
    """
    features = inputs
    # Both stages are identical, so build them in a loop.
    for _ in range(2):
        features = Conv2D(num_filters, 3, padding="same")(features)
        features = BatchNormalization()(features)
        features = Activation("relu")(features)
    return features



In [6]:
def encoder_block(inputs, num_filters):
    """Run one encoder stage: a conv block followed by 2x2 max pooling.

    Args:
        inputs: Tensor entering the stage.
        num_filters: Number of filters for the stage's conv block.

    Returns:
        A ``(skip, pooled)`` tuple: the conv-block output (kept for the skip
        connection) and its 2x2 max-pooled version (fed to the next stage).
    """
    skip = conv_block(inputs, num_filters)
    pooled = MaxPool2D((2, 2))(skip)
    return skip, pooled




In [7]:
def decoder_block(inputs, skip_features, num_filters):
    """Run one decoder stage: upsample, merge the skip tensor, then convolve.

    Args:
        inputs: Tensor coming from the previous (coarser) stage.
        skip_features: Encoder tensor concatenated with the upsampled input.
        num_filters: Number of filters for the transposed convolution and the
            conv block that follows.

    Returns:
        The output tensor of the stage's conv block.
    """
    # A stride-2 transposed convolution doubles the spatial resolution.
    upsampled = Conv2DTranspose(num_filters, 2, strides=2, padding="same")(inputs)
    merged = Concatenate()([upsampled, skip_features])
    return conv_block(merged, num_filters)



In [8]:
from json import decoder
def build_unet(input_shape):
    """Assemble the U-Net model.

    The network has four encoder stages (64, 128, 256, 512 filters), a
    1024-filter bridge, four decoder stages mirroring the encoder, and a 1x1
    sigmoid convolution producing a single-channel output.

    Args:
        input_shape: Shape passed to ``Input`` (without the batch dimension).

    Returns:
        A Keras ``Model`` named "U-Net".
    """
    inputs = Input(input_shape)

    # Encoder: keep every stage's pre-pooling output for the skip connections.
    skips = []
    x = inputs
    for filters in (64, 128, 256, 512):
        skip, x = encoder_block(x, filters)
        skips.append(skip)

    # Bridge between the encoder and the decoder.
    x = conv_block(x, 1024)

    # Decoder: consume the skip tensors from deepest to shallowest.
    for filters, skip in zip((512, 256, 128, 64), reversed(skips)):
        x = decoder_block(x, skip, filters)

    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(x)
    return Model(inputs, outputs, name="U-Net")



In [9]:
# Sanity check: build the network for 256x256 three-channel inputs and print
# its layer table.
if __name__ == "__main__":
    input_shape = (256, 256, 3)
    model = build_unet(input_shape)
    model.summary()


Model: "U-Net"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 256, 256, 64)         1792      ['input_1[0][0]']             
                                                                                                  
 batch_normalization (Batch  (None, 256, 256, 64)         256       ['conv2d[0][0]']              
 Normalization)                                                                                   
                                                                                                  
 activation (Activation)     (None, 256, 256, 64)         0         ['batch_normalization[0][0

In [10]:
# Target height and width (pixels). read_image/read_mask resize every sample
# to (W, H) and tf_parse declares these as the static tensor shape.
H = 256
W = 256


In [11]:
def create_dir(path):
    """Create the directory ``path`` (including parents) if it is missing.

    Calling this on an existing directory is a no-op. ``exist_ok=True`` is used
    instead of a separate existence check so there is no window between
    checking and creating in which another process could create the directory.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)




In [12]:
def load_dataset(path, split=0.2):
    """List the image/mask PNG files and split them into train/valid/test.

    Files are read from ``<path>/images/*.png`` and ``<path>/masks/*.png`` and
    paired by sorted order. The validation and test sets each receive
    ``int(len(images) * split)`` samples; the remainder is the training set.

    Images and masks are split in a single ``train_test_split`` call so each
    image always stays with its mask. With the same ``random_state`` this
    selects the same samples as splitting the two lists separately.

    Args:
        path: Dataset root containing the ``images`` and ``masks`` folders.
        split: Fraction of the dataset used for each of validation and test.

    Returns:
        ``(train_x, train_y), (valid_x, valid_y), (test_x, test_y)`` where the
        ``x`` lists hold image paths and the ``y`` lists hold mask paths.

    Raises:
        ValueError: If the number of images and masks differ.
    """
    images = sorted(glob(os.path.join(path, "images", "*.png")))
    masks = sorted(glob(os.path.join(path, "masks", "*.png")))

    # Pairing relies on position, so a count mismatch would silently attach
    # masks to the wrong images.
    if len(images) != len(masks):
        raise ValueError(
            f"Found {len(images)} images but {len(masks)} masks under {path!r}"
        )

    split_size = int(len(images) * split)

    train_x, valid_x, train_y, valid_y = train_test_split(
        images, masks, test_size=split_size, random_state=42
    )
    train_x, test_x, train_y, test_y = train_test_split(
        train_x, train_y, test_size=split_size, random_state=42
    )

    return (train_x, train_y), (valid_x, valid_y), (test_x, test_y)

In [13]:
def read_image(path):
    """Load an image file as a float32 array scaled by 1/255.

    Args:
        path: File path as ``bytes`` (as delivered by ``tf.numpy_function``)
            or as ``str``.

    Returns:
        A float32 array of shape ``(H, W, 3)`` in OpenCV's channel order.

    Raises:
        OSError: If the file is missing or cannot be decoded.
    """
    if isinstance(path, bytes):
        path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    # cv2.imread signals failure by returning None rather than raising.
    if x is None:
        raise OSError(f"Could not read image file: {path}")
    x = cv2.resize(x, (W, H))
    x = x/255.0
    x = x.astype(np.float32)
    return x

In [14]:
def read_mask(path):
    """Load a mask file as a single-channel float32 array scaled by 1/255.

    Args:
        path: File path as ``bytes`` (as delivered by ``tf.numpy_function``)
            or as ``str``.

    Returns:
        A float32 array of shape ``(H, W, 1)``.

    Raises:
        OSError: If the file is missing or cannot be decoded.
    """
    if isinstance(path, bytes):
        path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    # cv2.imread signals failure by returning None rather than raising.
    if x is None:
        raise OSError(f"Could not read mask file: {path}")
    x = cv2.resize(x, (W, H))
    x = x/255.0
    x = x.astype(np.float32)
    # Add the trailing channel axis the model output has.
    x = np.expand_dims(x, axis=-1)
    return x


In [15]:
def tf_parse(x, y):
    """Turn an (image path, mask path) pair into decoded tensors.

    The file loading is done by ``read_image``/``read_mask`` through
    ``tf.numpy_function``; since that op does not carry static shapes, they
    are set explicitly afterwards.

    Args:
        x: Image path tensor.
        y: Mask path tensor.

    Returns:
        ``(image, mask)`` float32 tensors with static shapes ``[H, W, 3]`` and
        ``[H, W, 1]``.
    """
    def _load_pair(image_path, mask_path):
        return read_image(image_path), read_mask(mask_path)

    image, mask = tf.numpy_function(_load_pair, [x, y], [tf.float32, tf.float32])
    image.set_shape([H, W, 3])
    mask.set_shape([H, W, 1])
    return image, mask


In [16]:
def tf_dataset(X,Y,batch=2):
    """Build a batched, prefetched ``tf.data`` pipeline from path lists.

    Args:
        X: Sequence of image file paths.
        Y: Sequence of mask file paths, aligned with ``X``.
        batch: Batch size.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, masks)`` batches as produced
        by ``tf_parse``.
    """
    dataset = tf.data.Dataset.from_tensor_slices((X, Y))
    # Decode several files concurrently instead of one at a time; element
    # order is unchanged because map() is deterministic by default.
    dataset = dataset.map(tf_parse, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(10)
    return dataset


In [17]:
import numpy as np
import tensorflow as tf
from tensorflow.keras import backend as K

# Small constant added to numerator and denominator so the ratio is defined
# when both tensors sum to zero.
smooth = 1e-15
def dice_coef(y_true, y_pred):
    """Compute the Dice coefficient between two tensors.

    All sums run over every element of the inputs, so the result is a single
    scalar for the whole batch.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor.

    Returns:
        Scalar ``(2 * sum(y_true * y_pred) + smooth) /
        (sum(y_true) + sum(y_pred) + smooth)``.
    """
    truth = tf.keras.layers.Flatten()(y_true)
    pred = tf.keras.layers.Flatten()(y_pred)
    overlap = tf.reduce_sum(truth * pred)
    numerator = 2. * overlap + smooth
    denominator = tf.reduce_sum(truth) + tf.reduce_sum(pred) + smooth
    return numerator / denominator


In [18]:
def dice_loss(y_true, y_pred):
    """Dice loss: one minus the Dice coefficient of the two tensors.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Predicted tensor.

    Returns:
        Scalar ``1.0 - dice_coef(y_true, y_pred)``.
    """
    score = dice_coef(y_true, y_pred)
    return 1.0 - score

In [19]:
from keras.api._v2.keras import callbacks
from tensorflow.python import train

# Training cell: builds the datasets and the model, then fits with
# checkpointing, learning-rate scheduling, CSV logging and early stopping.
if __name__ == "__main__":
    # Seed the NumPy and TensorFlow random generators.
    np.random.seed(42)
    tf.random.set_seed(42)
    create_dir("/content/drive/MyDrive/files")

    # Hyperparameters and artifact locations.
    batch_size = 8
    lr = 1e-4
    num_epochs = 500
    model_path = "/content/drive/MyDrive/files/model.h5"
    csv_path = "/content/drive/MyDrive/files/data.csv"
    dataset_path = "/content/drive/MyDrive/brain"

    # test_x / test_y are not used for training; the evaluation cell reads them.
    (train_x, train_y), (valid_x, valid_y), (test_x, test_y) = load_dataset(dataset_path)

    train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)

    model = build_unet((H, W, 3))
    model.compile(loss=dice_loss, optimizer=tf.keras.optimizers.Adam(lr), metrics=[dice_coef])

    # Use the callback classes imported from tensorflow.keras.callbacks at the
    # top of the notebook, and a list name that does not shadow the imported
    # `callbacks` module.
    training_callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, min_lr=1e-7, verbose=1),
        CSVLogger(csv_path),
        EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=False),
    ]

    model.fit(
        train_dataset,
        epochs=num_epochs,
        validation_data=valid_dataset,
        callbacks=training_callbacks)





















Epoch 1/500
Epoch 1: val_loss improved from inf to 0.95527, saving model to /content/drive/MyDrive/files/model.h5


  saving_api.save_model(


Epoch 2/500
Epoch 2: val_loss improved from 0.95527 to 0.93420, saving model to /content/drive/MyDrive/files/model.h5
Epoch 3/500
Epoch 3: val_loss improved from 0.93420 to 0.63896, saving model to /content/drive/MyDrive/files/model.h5
Epoch 4/500
Epoch 4: val_loss improved from 0.63896 to 0.51742, saving model to /content/drive/MyDrive/files/model.h5
Epoch 5/500
Epoch 5: val_loss improved from 0.51742 to 0.41792, saving model to /content/drive/MyDrive/files/model.h5
Epoch 6/500
Epoch 6: val_loss did not improve from 0.41792
Epoch 7/500
Epoch 7: val_loss improved from 0.41792 to 0.34905, saving model to /content/drive/MyDrive/files/model.h5
Epoch 8/500
Epoch 8: val_loss improved from 0.34905 to 0.32455, saving model to /content/drive/MyDrive/files/model.h5
Epoch 9/500
Epoch 9: val_loss improved from 0.32455 to 0.30087, saving model to /content/drive/MyDrive/files/model.h5
Epoch 10/500
Epoch 10: val_loss improved from 0.30087 to 0.27737, saving model to /content/drive/MyDrive/files/mode

In [50]:
from tqdm import tqdm
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import f1_score, jaccard_score, precision_score, recall_score






In [51]:
def create_dir(path):
    """Create the directory ``path`` (including parents) if it is missing.

    This repeats the definition given earlier in the notebook so the
    evaluation cells can be run on their own. Calling it on an existing
    directory is a no-op; ``exist_ok=True`` avoids a separate check-then-create
    step.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)



In [67]:
def save_results(image, mask, y_pred, save_image_path):
    """Write a side-by-side comparison image: input | mask | prediction.

    The three panels are separated by 10-pixel-wide white strips.

    Args:
        image: Three-channel image array with values in 0..255.
        mask: Two-dimensional ground-truth mask with values in 0..255; it is
            replicated to three channels.
        y_pred: Two-dimensional binary prediction (0/1); it is replicated to
            three channels and scaled to 0/255.
        save_image_path: Destination file path passed to ``cv2.imwrite``.
    """
    mask = np.expand_dims(mask, axis=-1)
    mask = np.concatenate([mask, mask, mask], axis=-1)

    y_pred = np.expand_dims(y_pred, axis=-1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1)
    y_pred = y_pred * 255

    # Size the separator from the image itself rather than the global H so the
    # concatenation works for any panel height.
    line = np.ones((image.shape[0], 10, 3)) * 255

    # Put a separator between the panels instead of after the last one.
    cat_images = np.concatenate([image, line, mask, line, y_pred], axis=1)

    # The concatenation promotes to float64; write an explicit 8-bit image.
    cat_images = np.clip(cat_images, 0, 255).astype(np.uint8)
    cv2.imwrite(save_image_path, cat_images)







In [71]:
# Evaluation cell: loads the saved checkpoint, predicts every test image,
# saves a comparison picture per sample and reports per-image and mean metrics.
# It relies on `test_x` / `test_y` produced by the training cell's
# load_dataset call, and on `pd` (pandas) being imported at the top of the
# notebook.
if __name__ == "__main__":
    np.random.seed(42)
    tf.random.set_seed(42)
    create_dir("/content/drive/MyDrive/files/results")

    # The checkpoint was compiled with custom loss/metric functions, which must
    # be registered for deserialization.
    with CustomObjectScope({'dice_loss': dice_loss, 'dice_coef': dice_coef}):
        model = tf.keras.models.load_model("/content/drive/MyDrive/files/model.h5")

    SCORE = []
    for image_path, mask_path in tqdm(zip(test_x, test_y), total=len(test_y)):
        name = os.path.basename(image_path)

        # Same preprocessing as read_image: resize, scale by 1/255, add batch axis.
        image = cv2.imread(image_path, cv2.IMREAD_COLOR)
        image = cv2.resize(image, (W, H))
        x = image / 255.0
        x = np.expand_dims(x, axis=0)

        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
        mask = cv2.resize(mask, (W, H))

        # Threshold the sigmoid output at 0.5 to get a binary prediction.
        y_pred = model.predict(x, verbose=0)[0]
        y_pred = np.squeeze(y_pred, axis=-1)
        y_pred = y_pred >= 0.5
        y_pred = y_pred.astype(np.int32)

        save_image_path = os.path.join("/content/drive/MyDrive/files/results", name)
        save_results(image, mask, y_pred, save_image_path)

        # Binarize the ground truth and flatten both arrays for scikit-learn.
        mask = mask / 255.0
        mask = (mask > 0.5).astype(np.int32).flatten()
        y_pred = y_pred.flatten()

        # zero_division=0 on every metric: samples with no positive pixels
        # score 0 (the value scikit-learn falls back to anyway) without
        # emitting an undefined-metric warning per image.
        f1_value = f1_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
        jac_value = jaccard_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
        recall_value = recall_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
        precision_value = precision_score(mask, y_pred, labels=[0, 1], average="binary", zero_division=0)
        SCORE.append([name, f1_value, jac_value, recall_value, precision_value])

    # Mean of each metric over all test images (first column is the file name).
    score = [s[1:] for s in SCORE]
    score = np.mean(score, axis=0)
    print(f"F1: {score[0]:0.5f}")
    print(f"Jaccard: {score[1]:0.5f}")
    print(f"Recall: {score[2]:0.5f}")
    print(f"Precision: {score[3]:0.5f}")

    # Persist the per-image scores.
    df = pd.DataFrame(SCORE, columns=["Image", "F1", "Jaccard", "Recall", "Precision"])
    df.to_csv("/content/drive/MyDrive/files/score.csv")






100%|██████████| 612/612 [01:52<00:00,  5.42it/s]

F1: 0.74869
Jaccard: 0.66662
Recall: 0.75098
Precision: 0.79749



