<a href="https://colab.research.google.com/github/ziyadshezoo/Semantic-Segmentation/blob/main/Semantic_Segmentation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install albumentations



In [2]:
import os
import numpy as np
import cv2
from glob import glob
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from albumentations import HorizontalFlip, VerticalFlip, Rotate

In [3]:
def create_dir(path):
    """ Create a directory. """
    if not os.path.exists(path):
        os.makedirs(path)

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
def load_data(path, split=0.2):
    """ Load the images and fuse """
    org = sorted(glob(f"{path}/original/*.jpg"))
    fuse = sorted(glob(f"{path}/Fuse/*.png"))
    """ Split the data """
    split_size = int(len(org) * split)
    train_x, valid_x = train_test_split(org, test_size=split_size, random_state=42)
    train_y, valid_y = train_test_split(fuse, test_size=split_size, random_state=42)

    return (train_x, train_y), (valid_x, valid_y)

In [6]:
def augment_data(org, fuse, save_path, augment=True):
    """ Performing data augmentation. """
    H = 512
    W = 512

    for idx, (x, y) in tqdm(enumerate(zip(org, fuse)), total=len(org)):
        """ Extracting the dir name and org name """
        name =x.split("/")[-1].split("  ")[-1]
        """ Read the org and fuse """
        x = cv2.imread(x, cv2.IMREAD_COLOR)
        y = cv2.imread(y, cv2.IMREAD_COLOR)
        X = [x]
            #print(len(X))
        Y =[y]

        idx = 0
        for i, m in zip(X, Y):
            i = cv2.resize(i, (W, H))
            m = cv2.resize(m, (W, H))
            m = m/255.0
            m = (m > 0.5) * 255

            if len(X) == 1:  # each loop it will take one org
                tmp_image_name = f"{name}.jpg"
                tmp_fuse_name  = f"{name}.jpg"
#             else:
#                 tmp_image_name = f"{name}_{idx}.jpg"
#                 tmp_fuse_name  = f"{name}_{idx}.jpg"

            image_path = os.path.join(save_path, "image/", tmp_image_name)
            fuse_path  = os.path.join(save_path, "mask/", tmp_fuse_name)

            cv2.imwrite(image_path, i)
            cv2.imwrite(fuse_path, m)

            idx += 1


In [7]:
if __name__ == "__main__":
    """ Load the dataset """
    dataset_path = os.path.join("/content/drive/MyDrive/archive")

    (train_x, train_y), (valid_x, valid_y) = load_data(dataset_path, split=0.2)

    print("Train: ", len(train_x))
    print("Valid: ", len(valid_x))

    create_dir("new_data/train/image/")
    create_dir("new_data/train/mask/")
    create_dir("new_data/valid/image/")
    create_dir("new_data/valid/mask/")

    augment_data(train_x, train_y, "new_data/train/", augment=True)
    augment_data(valid_x, valid_y, "new_data/valid/", augment=False)

Train:  80
Valid:  20


100%|██████████| 80/80 [00:23<00:00,  3.40it/s]
100%|██████████| 20/20 [00:05<00:00,  3.87it/s]


In [8]:
from keras.models import *
from keras.layers import *
from keras.optimizers import *
import tensorflow as tf
import os
import random
import numpy as np
from tqdm import tqdm
from skimage.io import imread,imshow
from skimage.transform import resize
import matplotlib.pyplot as plt
IMG_WIDTH=512
IMG_HEIGHT=512
IMG_CHANNEL=3

In [9]:
!pip install segmentation_models

Collecting segmentation_models
  Downloading segmentation_models-1.0.1-py3-none-any.whl (33 kB)
Collecting keras-applications<=1.0.8,>=1.0.7 (from segmentation_models)
  Downloading Keras_Applications-1.0.8-py3-none-any.whl (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.7/50.7 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting image-classifiers==1.0.0 (from segmentation_models)
  Downloading image_classifiers-1.0.0-py3-none-any.whl (19 kB)
Collecting efficientnet==1.0.0 (from segmentation_models)
  Downloading efficientnet-1.0.0-py3-none-any.whl (17 kB)
Installing collected packages: keras-applications, image-classifiers, efficientnet, segmentation_models
Successfully installed efficientnet-1.0.0 image-classifiers-1.0.0 keras-applications-1.0.8 segmentation_models-1.0.1


In [27]:
def U_NET(input_shape):
  x=Input(input_shape)
  inputs=x
  #DownSampling
  f=64
  layers=[]
  for i in range(4):
    x=Conv2D(f,3,activation='relu',padding='same')(x)
    x=Dropout(.1)(x)
    x=Conv2D(f,3,activation='relu',padding='same')(x)
    x=Conv2D(f,3,activation='relu',padding='same')(x)
    layers.append(x)
    x=MaxPool2D()(x)
    f=f*2
    print(f)
  ff2=512
  #bottelneck
  j=len(layers)-1
  print(j)
  x=Conv2D(f,3,activation='relu',padding='same')(x)
  x=Dropout(.1)(x)
  x=Conv2D(f,3,activation='relu',padding='same')(x)
  x=Conv2DTranspose(ff2,2,strides=(2,2),padding='same')(x)
  x=Concatenate(axis=3)([x,layers[j]])
  j=j-1
  print(j)
  for i in range(3):
    ff2=ff2//2
    f=f//2
    print(f)
    x=Conv2D(f,3,activation='relu',padding='same')(x)
    x=Conv2D(f,3,activation='relu',padding='same')(x)
    x=Conv2D(f,3,activation='relu',padding='same')(x)
    x=Conv2DTranspose(ff2,2,strides=(2,2),padding='same')(x)
    x=Concatenate(axis=3)([x,layers[j]])
    j=j-1
  x=Conv2D(64,3,activation='relu',padding='same')(x)
  x=Dropout(.1)(x)
  x=Conv2D(64,3,activation='relu',padding='same')(x)
  x=Conv2D(64,3,activation='relu',padding='same')(x)
  outputs=Conv2D(1,1,activation= 'sigmoid')(x)
  model=Model(inputs=[inputs],outputs=[outputs],name='U-Net')
  model.compile(optimizer=Adam(lr=1e-4),loss='binary_crossintropy',metrics=['mean_iou'])
  #model.summary()
  return model

In [28]:
if __name__=="__main__":
  model=U_NET((IMG_WIDTH,IMG_HEIGHT,IMG_CHANNEL))
  model.summary()

128
256
512
1024
3
2
512




256
128
Model: "U-Net"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_7 (InputLayer)        [(None, 512, 512, 3)]        0         []                            
                                                                                                  
 conv2d_138 (Conv2D)         (None, 512, 512, 64)         1792      ['input_7[0][0]']             
                                                                                                  
 dropout_30 (Dropout)        (None, 512, 512, 64)         0         ['conv2d_138[0][0]']          
                                                                                                  
 conv2d_139 (Conv2D)         (None, 512, 512, 64)         36928     ['dropout_30[0][0]']          
                                                                                      

In [12]:
from tensorflow.keras.layers import Conv2D, BatchNormalization, Activation, MaxPool2D, Conv2DTranspose, Concatenate, Input
from tensorflow.keras.models import Model

def conv_block(input, num_filters):
    x = Conv2D(num_filters, 3, padding="same")(input)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x = Conv2D(num_filters, 3, padding="same")(x)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    return x


def encoder_block(input, num_filters):
    x = conv_block(input, num_filters)
    p = MaxPool2D((2, 2))(x)
    return x, p


def decoder_block(input, skip_features, num_filters):
    x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)
    x = Concatenate()([x, skip_features])
    x = conv_block(x, num_filters)
    return x

def build_unet(input_shape):
    inputs = Input(input_shape)

    s1, p1 = encoder_block(inputs, 64)
    s2, p2 = encoder_block(p1, 128)
    s3, p3 = encoder_block(p2, 256)
    s4, p4 = encoder_block(p3, 512)

    b1 = conv_block(p4, 1024)

    d1 = decoder_block(b1, s4, 512)
    d2 = decoder_block(d1, s3, 256)
    d3 = decoder_block(d2, s2, 128)
    d4 = decoder_block(d3, s1, 64)

    outputs = Conv2D(1, 1, padding="same", activation="sigmoid")(d4)

    model = Model(inputs, outputs, name="U-Net")
    return model


if __name__ == "__main__":
    input_shape = (512, 512, 3)
    model = build_unet(input_shape)
    model.summary()

Model: "U-Net"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 512, 512, 3)]        0         []                            
                                                                                                  
 conv2d_19 (Conv2D)          (None, 512, 512, 64)         1792      ['input_2[0][0]']             
                                                                                                  
 batch_normalization (Batch  (None, 512, 512, 64)         256       ['conv2d_19[0][0]']           
 Normalization)                                                                                   
                                                                                                  
 activation (Activation)     (None, 512, 512, 64)         0         ['batch_normalization[0][0

In [29]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
import numpy as np
import cv2
from glob import glob
from sklearn.utils import shuffle
import tensorflow as tf
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger, ReduceLROnPlateau, EarlyStopping, TensorBoard
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import Recall, Precision
# from modell import U_NET
from metrics import dice_loss, dice_coef, iou

H = 512
W = 512

def create_dir(path):
    """ Create a directory. """
    if not os.path.exists(path):
        os.makedirs(path)

def shuffling(x, y):
    x, y = shuffle(x, y, random_state=42)
    return x, y

def load_data(path):
    x = sorted(glob(os.path.join(path, "image", "*.jpg")))
    y = sorted(glob(os.path.join(path, "mask", "*.jpg")))
    return x, y

def read_image(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_COLOR)
    x = x/255.0
    x = x.astype(np.float32)
    return x

def read_mask(path):
    path = path.decode()
    x = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
    x = x/255.0
    x = x > 0.5
    x = x.astype(np.float32)
    x = np.expand_dims(x, axis=-1)
    return x

def tf_parse(x, y):
    def _parse(x, y):
        x = read_image(x)
        y = read_mask(y)
        return x, y

    x, y = tf.numpy_function(_parse, [x, y], [tf.float32, tf.float32])
    x.set_shape([H, W, 3])
    y.set_shape([H, W, 1])
    return x, y

def tf_dataset(x, y, batch=8):
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    dataset = dataset.map(tf_parse)
    dataset = dataset.batch(batch)
    dataset = dataset.prefetch(10)
    return dataset


if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Directory for storing files """
    create_dir("files")

    """ Hyperparameters """
    batch_size = 2
    lr = 1e-4
    num_epochs = 5
    model_path = os.path.join("files", "model.h5")
    csv_path = os.path.join("files", "data.csv")

    """ Dataset """
    dataset_path = os.path.join("new_data")
    train_path = os.path.join(dataset_path, "train")
    valid_path = os.path.join(dataset_path, "valid")

    train_x, train_y = load_data(train_path)
    train_x, train_y = shuffling(train_x, train_y)
    valid_x, valid_y = load_data(valid_path)

    print(f"Train: {len(train_x)} - {len(train_y)}")
    print(f"Valid: {len(valid_x)} - {len(valid_y)}")

    train_dataset = tf_dataset(train_x, train_y, batch=batch_size)
    valid_dataset = tf_dataset(valid_x, valid_y, batch=batch_size)

    """ Model """
    model = U_NET((H, W, 3))
    metrics = [dice_coef, iou, Recall(), Precision()]
    model.compile(loss=dice_loss, optimizer=Adam(lr=1e-6), metrics=metrics)

    callbacks = [
        ModelCheckpoint(model_path, verbose=1, save_best_only=True),
        ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=10, min_lr=1e-7, verbose=1),
        CSVLogger(csv_path),
        TensorBoard(),
        EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=False),
    ]



Train: 80 - 80
Valid: 20 - 20
128
256
512
1024
3
2
512
256
128




In [30]:
model.fit(
        train_dataset,
        epochs=num_epochs,
        validation_data=valid_dataset,
        callbacks=callbacks,
        shuffle=False
    )

Epoch 1/5
Epoch 1: val_loss improved from inf to 0.66409, saving model to files/model.h5


  saving_api.save_model(


Epoch 2/5
Epoch 2: val_loss did not improve from 0.66409
Epoch 3/5
Epoch 3: val_loss did not improve from 0.66409
Epoch 4/5
Epoch 4: val_loss did not improve from 0.66409
Epoch 5/5
Epoch 5: val_loss did not improve from 0.66409


<keras.src.callbacks.History at 0x7de4a7634490>

In [31]:
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

import numpy as np
import cv2
import pandas as pd
from glob import glob
from tqdm import tqdm
import tensorflow as tf
from tensorflow.keras.utils import CustomObjectScope
from sklearn.metrics import accuracy_score, f1_score, jaccard_score, precision_score, recall_score
from metrics import dice_loss, dice_coef, iou
# from train import load_data

H = 512
W = 512

""" Creating a directory """
def create_dir(path):
    if not os.path.exists(path):
        os.makedirs(path)

def save_results(image, mask, y_pred, save_image_path):
    ## i - m - y
    line = np.ones((H, 10, 3)) * 128

    """ Mask """
    mask = np.expand_dims(mask, axis=-1)    ## (512, 512, 1)
    mask = np.concatenate([mask, mask, mask], axis=-1)  ## (512, 512, 3)

    """ Predicted Mask """
    y_pred = np.expand_dims(y_pred, axis=-1)    ## (512, 512, 1)
    y_pred = np.concatenate([y_pred, y_pred, y_pred], axis=-1)  ## (512, 512, 3)
    y_pred = y_pred * 255

    cat_images = np.concatenate([image, line, mask, line, y_pred], axis=1)
    cv2.imwrite(save_image_path, cat_images)

if __name__ == "__main__":
    """ Seeding """
    np.random.seed(42)
    tf.random.set_seed(42)

    """ Directory for storing files """
    create_dir("results")

    """ Loading model """
    with CustomObjectScope({'iou': iou, 'dice_coef': dice_coef, 'dice_loss': dice_loss}):
        model = tf.keras.models.load_model("files/model.h5")

    """ Load the dataset """
    test_x = sorted(glob(os.path.join("new_data", "valid", "image", "*")))
    test_y = sorted(glob(os.path.join("new_data", "valid", "mask", "*")))
    print(f"Test: {len(test_x)} - {len(test_y)}")

    """ Evaluation and Prediction """
    SCORE = []
    for x, y in tqdm(zip(test_x, test_y), total=len(test_x)):
        """ Extract the name """
        name = x.split("/")[-1].split(".")[0]

        """ Reading the image """
        image = cv2.imread(x, cv2.IMREAD_COLOR)
        x = image/255.0
        x = np.expand_dims(x, axis=0)

        """ Reading the mask """
        mask = cv2.imread(y, cv2.IMREAD_GRAYSCALE)
        y = mask/255.0
        y = y > 0.5
        y = y.astype(np.int32)

        """ Prediction """
        y_pred = model.predict(x)[0]
        y_pred = np.squeeze(y_pred, axis=-1)
        y_pred = y_pred > 0.5
        y_pred = y_pred.astype(np.int32)

        """ Saving the prediction """
        save_image_path = f"results/{name}.png"
        save_results(image, mask, y_pred, save_image_path)

        """ Flatten the array """
        y = y.flatten()
        y_pred = y_pred.flatten()

        """ Calculating the metrics values """
        acc_value = accuracy_score(y, y_pred)
        f1_value = f1_score(y, y_pred, labels=[0, 1], average="binary", zero_division=1)
        jac_value = jaccard_score(y, y_pred, labels=[0, 1], average="binary", zero_division=1)
        recall_value = recall_score(y, y_pred, labels=[0, 1], average="binary", zero_division=1)
        precision_value = precision_score(y, y_pred, labels=[0, 1], average="binary", zero_division=1)
        SCORE.append([name, acc_value, f1_value, jac_value, recall_value, precision_value])

    """ Metrics values """
    score = [s[1:]for s in SCORE]
    score = np.mean(score, axis=0)
    print(f"Accuracy: {score[0]:0.5f}")
    print(f"F1: {score[1]:0.5f}")
    print(f"Jaccard: {score[2]:0.5f}")
    print(f"Recall: {score[3]:0.5f}")
    print(f"Precision: {score[4]:0.5f}")

    df = pd.DataFrame(SCORE, columns=["Image", "Accuracy", "F1", "Jaccard", "Recall", "Precision"])
    df.to_csv("files/score.csv")

Test: 20 - 20


  0%|          | 0/20 [00:00<?, ?it/s]



  5%|▌         | 1/20 [00:00<00:15,  1.25it/s]



 10%|█         | 2/20 [00:01<00:11,  1.61it/s]



 15%|█▌        | 3/20 [00:01<00:09,  1.76it/s]



 20%|██        | 4/20 [00:02<00:08,  1.88it/s]



 25%|██▌       | 5/20 [00:02<00:07,  2.00it/s]



 30%|███       | 6/20 [00:03<00:07,  2.00it/s]



 35%|███▌      | 7/20 [00:03<00:06,  1.99it/s]



 40%|████      | 8/20 [00:04<00:06,  1.98it/s]



 45%|████▌     | 9/20 [00:04<00:05,  2.05it/s]



 50%|█████     | 10/20 [00:05<00:04,  2.05it/s]



 55%|█████▌    | 11/20 [00:05<00:04,  2.11it/s]



 60%|██████    | 12/20 [00:06<00:03,  2.05it/s]



 65%|██████▌   | 13/20 [00:06<00:03,  2.11it/s]



 70%|███████   | 14/20 [00:07<00:02,  2.13it/s]



 75%|███████▌  | 15/20 [00:07<00:02,  2.08it/s]



 80%|████████  | 16/20 [00:08<00:01,  2.09it/s]



 85%|████████▌ | 17/20 [00:08<00:01,  1.89it/s]



 90%|█████████ | 18/20 [00:09<00:01,  1.66it/s]



 95%|█████████▌| 19/20 [00:10<00:00,  1.54it/s]



100%|██████████| 20/20 [00:10<00:00,  1.84it/s]

Accuracy: 0.20986
F1: 0.32589
Jaccard: 0.20986
Recall: 1.00000
Precision: 0.20986



