# Crack_Segmentation with UNet
## dataset https://www.kaggle.com/datasets/lakshaymiddha/crack-segmentation-dataset?resource=download

In [18]:
import glob
import os
import cv2
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, concatenate, Conv2DTranspose
from tensorflow.keras.layers import BatchNormalization, Activation
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler, EarlyStopping
from tensorflow.keras import backend as K
from tensorflow.keras.preprocessing.image import array_to_img, img_to_array, load_img
from sklearn.metrics import confusion_matrix

# Data_load

In [19]:
def train_data_loading(train_path,image_size = 512):

    out_rows = image_size
    out_cols = image_size
    
    img_path = train_path + '/images'
    mask_path = train_path + '/masks'
    
    print('-'*30)
    print('Creating training images...')
    print('-'*30)
    
    imgs = glob.glob(img_path +"/*.jpg")
    labels = glob.glob(mask_path +"/*.jpg")
    imgdatas = np.ndarray((len(imgs),out_rows,out_cols,1), dtype=np.uint8)
    imglabels = np.ndarray((len(imgs),out_rows,out_cols,1), dtype=np.uint8)
    imgnames=[]
    
    for i, imgname in enumerate(imgs):
        if i%100==0:
            print('{}/{}'.format(i, len(imgs)))
        name = os.path.split(imgname)[1][:-4]
        img = load_img(imgname, color_mode = "grayscale")
        labelname= mask_path + '/' + os.path.split(imgname)[1]
        label = load_img(labelname, color_mode = "grayscale")
        img=img.resize((out_rows,out_cols))
        label=label.resize((out_rows,out_cols))

        img = img_to_array(img)
        label = img_to_array(label)
        imgdatas[i] = img
        imglabels[i] = label
        imgnames.append(name)

    imgdatas = imgdatas.astype('float32')
    imglabels = imglabels.astype('float32')
    
    print('img : ', imgdatas.max())
    print('mask : ',imglabels.max())
    
    print('-'*30)
    print('normalization start...')
    print('-'*30)
    imgdatas = imgdatas/255.0
    
    imglabels[imglabels <= 127] = 0
    imglabels[imglabels > 127] = 1
    
    print('img : ',imgdatas.max())
    print('mask : ',imglabels.max())
    print('mask : ',imglabels.min())
    print('loading done')
    
    return imgdatas, imglabels, imgnames

## Function

In [20]:
def mkfolder(folder):
    if not os.path.lexists(folder):
        os.makedirs(folder)
    

def dice_coef(y_true, y_pred):
        y_true_f = K.flatten(y_true)
        y_pred_f = K.flatten(y_pred)
        intersection = K.sum(y_true_f * y_pred_f)
        return (2. * intersection + K.epsilon()) / (K.sum(y_true_f) + K.sum(y_pred_f) + K.epsilon())

def dice_coef_loss(y_true, y_pred):
        return 1-dice_coef(y_true, y_pred)

def sens(y_true, y_pred): # sensitivity, recall
    print(y_pred)
    print(y_true)
    y_target_yn = K.round(K.clip(y_true, 0, 1)) # 실제값을 0(Negative) 또는 1(Positive)로 설정한다
    y_pred_yn = K.round(K.clip(y_pred, 0, 1)) # 예측값을 0(Negative) 또는 1(Positive)로 설정한다

    # True Positive는 실제 값과 예측 값이 모두 1(Positive)인 경우이다
    count_true_positive = K.sum(y_target_yn * y_pred_yn) 

    # (True Positive + False Negative) = 실제 값이 1(Positive) 전체
    count_true_positive_false_negative = K.sum(y_target_yn)

    # Recall =  (True Positive) / (True Positive + False Negative)
    # K.epsilon()는 'divide by zero error' 예방차원에서 작은 수를 더한다
    recall = count_true_positive / (count_true_positive_false_negative + K.epsilon())

    # return a single tensor value
    return recall

def sch(epoch):
    if epoch>100 and epoch<=250:
        return 0.0001
    elif epoch>250:
        return 0.00001
    else:
        return 0.001

# Model Shape

In [21]:
def get_unet(img_rows, img_cols):
    inputs = Input((img_rows, img_cols,1))
    conv1 = Conv2D(32, (3, 3), activation=None, padding='same')(inputs)
    conv1 = BatchNormalization()(conv1)
    conv1 = Activation('relu')(conv1)
    conv1 = Conv2D(32, (3, 3), activation=None, padding='same')(conv1)
    conv1 = BatchNormalization()(conv1)
    conv1 = Activation('relu')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)

    conv2 = Conv2D(64, (3, 3), activation=None, padding='same')(pool1)
    conv2 = BatchNormalization()(conv2)
    conv2 = Activation('relu')(conv2)
    conv2 = Conv2D(64, (3, 3), activation=None, padding='same')(conv2)
    conv2 = BatchNormalization()(conv2)
    conv2 = Activation('relu')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)

    conv3 = Conv2D(128, (3, 3), activation=None, padding='same')(pool2)
    conv3 = BatchNormalization()(conv3)
    conv3 = Activation('relu')(conv3)
    conv3 = Conv2D(128, (3, 3), activation=None, padding='same')(conv3)
    conv3 = BatchNormalization()(conv3)
    conv3 = Activation('relu')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)

    conv4 = Conv2D(256, (3, 3), activation=None, padding='same')(pool3)
    conv4 = BatchNormalization()(conv4)
    conv4 = Activation('relu')(conv4)
    conv4 = Conv2D(256, (3, 3), activation=None, padding='same')(conv4)
    conv4 = BatchNormalization()(conv4)
    conv4 = Activation('relu')(conv4)
    pool4 = MaxPooling2D(pool_size=(2, 2))(conv4)

    conv5 = Conv2D(512, (3, 3), activation=None, padding='same')(pool4)
    conv5 = BatchNormalization()(conv5)
    conv5 = Activation('relu')(conv5)
    conv5 = Conv2D(512, (3, 3), activation=None, padding='same')(conv5)
    conv5 = BatchNormalization()(conv5)
    conv5 = Activation('relu')(conv5)

    up6 = concatenate([Conv2DTranspose(256, (2, 2), strides=(2, 2), padding='same')(conv5), conv4], axis=3)
    conv6 = Conv2D(256, (3, 3), activation=None, padding='same')(up6)
    conv6 = BatchNormalization()(conv6)
    conv6 = Activation('relu')(conv6)
    conv6 = Conv2D(256, (3, 3), activation=None, padding='same')(conv6)
    conv6 = BatchNormalization()(conv6)
    conv6 = Activation('relu')(conv6)

    up7 = concatenate([Conv2DTranspose(128, (2, 2), strides=(2, 2), padding='same')(conv6), conv3], axis=3)
    conv7 = Conv2D(128, (3, 3), activation=None, padding='same')(up7)
    conv7 = BatchNormalization()(conv7)
    conv7 = Activation('relu')(conv7)
    conv7 = Conv2D(128, (3, 3), activation=None, padding='same')(conv7)
    conv7 = BatchNormalization()(conv7)
    conv7 = Activation('relu')(conv7)

    up8 = concatenate([Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv7), conv2], axis=3)
    conv8 = Conv2D(64, (3, 3), activation=None, padding='same')(up8)
    conv8 = BatchNormalization()(conv8)
    conv8 = Activation('relu')(conv8)
    conv8 = Conv2D(64, (3, 3), activation=None, padding='same')(conv8)
    conv8 = BatchNormalization()(conv8)
    conv8 = Activation('relu')(conv8)

    up9 = concatenate([Conv2DTranspose(32, (2, 2), strides=(2, 2), padding='same')(conv8), conv1], axis=3)
    conv9 = Conv2D(32, (3, 3), activation=None, padding='same')(up9)
    conv9 = BatchNormalization()(conv9)
    conv9 = Activation('relu')(conv9)
    conv9 = Conv2D(32, (3, 3), activation=None, padding='same')(conv9)
    conv9 = BatchNormalization()(conv9)
    conv9 = Activation('relu')(conv9)

    conv10 = Conv2D(1, (1, 1), activation='sigmoid')(conv9)

    model = Model(inputs=inputs, outputs=conv10)

    return model

# Train

In [22]:
def deep(imgs_train, imgs_mask_train, path, batch_size = 4, epochs = 10, image_size=512): 

    model = get_unet(image_size, image_size)
    model.compile(optimizer=Adam(lr=0.001), loss=dice_coef_loss, 
                        metrics=['accuracy', sens, dice_coef_loss])
    
    check_model_path = path+'/crack_check'
    predict_path = path+'/crack_pred'
    mkfolder(check_model_path)
    mkfolder(predict_path)

    model_checkpoint = ModelCheckpoint(check_model_path + '/final_{epoch:d}_{loss:f}.hdf5', 
                                        monitor='val_dice_coef_loss',verbose=1, 
                                        save_best_only=False)
#     earlystopping = EarlyStopping(monitor='val_dice_coef_loss', patience=30, restore_best_weights=True)
#     cosine_schedule = CosineAnnealingWarmup(epochs_per_cycle=60, iteration=1,max_lr = 1e-3, min_lr = 1e-6)
    
    print('Fitting model...')
    model.fit(imgs_train, imgs_mask_train, batch_size=batch_size,  validation_split=0.1,epochs=epochs, verbose=1, 
              shuffle=True, callbacks = [model_checkpoint])
    print('save model')
    model.save(predict_path + '/final.h5')
    return model


# Predict

In [23]:
def predict_save(pred_list,name_list,test_path):
    
    print('-'*30)
    print('Saving test images...')
    print('-'*30)
    
    pred_img_path = test_path + '/pred'
    mkfolder(pred_img_path)

    imgs = pred_list
    for i in range(imgs.shape[0]):
        img = imgs[i]
        img[img <= 0.5] = 0
        img[img > 0.5] = 255
        img = array_to_img(img)
        img.save(pred_img_path+"/%s_pred.jpg" %(name_list[i]))

def predict_val(model,test_path,image_size=512):
    
    imgs_test, imgs_label_test, test_name = create_test_data(test_path, image_size, image_size)
    print('predict test data')
    
    imgs_label_pred = model.predict(imgs_test, batch_size=4, verbose=1)
    name_list=test_name
    df = pd.DataFrame(columns=['name', 'acc', 'sen', 'spe', 'dsc'],dtype = float)
    df = df.astype({'name': 'str'})

    true_list=imgs_label_test
    print(true_list.shape)

    pred_list=imgs_label_pred
    print(np.unique(pred_list))
    pred_list[pred_list > 0.5] = 1
    pred_list[pred_list <= 0.5] = 0
    
    sensitivity=[]
    specificity=[]
    acc=[]
    dsc=[]

    for i in range(len(true_list)):
        yt=true_list[i].flatten()
        yp=pred_list[i].flatten()
        mat=confusion_matrix(yt,yp)
        if len(mat) == 2:
            ac=(mat[1,1]+mat[0,0])/(mat[1,0]+mat[1,1]+mat[0,1]+mat[0,0])
            st=mat[1,1]/(mat[1,0]+mat[1,1])
            sp=mat[0,0]/(mat[0,1]+mat[0,0])
            if mat[1,0]+mat[1,1] == 0:
                specificity.append(sp)
                acc.append(ac)
            else:
                sensitivity.append(st)  
                specificity.append(sp)
                acc.append(ac)
        else:
            specificity.append(1)
            acc.append(1)

        yt=true_list[i]
        yp=pred_list[i]
        if np.sum(yt) != 0 and np.sum(yp) != 0:
            dice = np.sum(yp[yt==1])*2.0 / (np.sum(yt) + np.sum(yp))
            dsc.append(dice)
            df=  df.append({'name':name_list[i], 'acc':ac, 'sen':st, 'spe':sp, 'dsc':dice}, ignore_index=True)

    print("complete")      
    print("acc avg : {0:0.4f}".format(np.mean(acc)))
    print("sensitivity avg : {0:0.4f}".format(np.mean(sensitivity)))
    print("specificity avg : {0:0.4f}".format(np.mean(specificity)))
    print("dsc avg : {0:0.4f}".format(np.mean(dsc)))
    
    

    predict_save(pred_list,name_list,test_path)
    return test_name

In [24]:
def create_test_data(test_path, out_rows, out_cols):
    
    img_path = test_path + '/images'
    mask_path = test_path + '/masks'
    
    print('-'*30)
    print('Creating test images...')
    print('-'*30)
    
    imgs = glob.glob(img_path +"/*.jpg")
    labels = glob.glob(mask_path +"/*.jpg")
    imgdatas = np.ndarray((len(imgs),out_rows,out_cols,1), dtype=np.uint8)
    imglabels = np.ndarray((len(imgs),out_rows,out_cols,1), dtype=np.uint8)
    imgnames=[]
    
    for i, imgname in enumerate(imgs):
        if i%100==0:
            print('{}/{}'.format(i, len(imgs)))
        name = os.path.split(imgname)[1][:-4]
        img = load_img(imgname, color_mode = "grayscale")
        labelname= mask_path + '/' + os.path.split(imgname)[1]
        label = load_img(labelname, color_mode = "grayscale")
        img=img.resize((out_rows,out_cols))
        label=label.resize((out_rows,out_cols))

        img = img_to_array(img)
        label = img_to_array(label)
        imgdatas[i] = img
        imglabels[i] = label
        imgnames.append(name)

    imgdatas = imgdatas.astype('float32')
    imglabels = imglabels.astype('float32')
    
    print('img : ', imgdatas.max())
    print('mask : ',imglabels.max())
    
    print('-'*30)
    print('normalization start...')
    print('-'*30)
    imgdatas = imgdatas/255.0
    
    imglabels[imglabels <= 127] = 0
    imglabels[imglabels > 127] = 1
    
    print('img : ',imgdatas.max())
    print('mask : ',imglabels.max())
    print('mask : ',imglabels.min())
    print('loading done')
    
    return imgdatas, imglabels, imgnames

# Main

In [25]:
def main():
    # 주어진 Train Dataset 경로.
    path = 'C:/Users/s_wnsgk4041/crack_segmentation_dataset'
    train_path = 'C:/Users/s_wnsgk4041/crack_segmentation_dataset/train'
    test_path = 'C:/Users/s_wnsgk4041/crack_segmentation_dataset/test'
    image_size = 512
    epochs = 1
    batch_sizes = 4
    
    imgs_train, imgs_mask_train, imgs_name = train_data_loading(train_path, image_size = image_size)
    model = deep(imgs_train, imgs_mask_train, path, batch_size = batch_sizes, epochs = epochs, image_size=image_size)

    test_name = predict_val(model, test_path, image_size = image_size)
    
if __name__ == "__main__":
    main()

------------------------------
Creating training images...
------------------------------
0/9603
100/9603
200/9603
300/9603
400/9603
500/9603
600/9603
700/9603
800/9603
900/9603
1000/9603
1100/9603
1200/9603
1300/9603
1400/9603
1500/9603
1600/9603
1700/9603
1800/9603
1900/9603
2000/9603
2100/9603
2200/9603
2300/9603
2400/9603
2500/9603
2600/9603
2700/9603
2800/9603
2900/9603
3000/9603
3100/9603
3200/9603
3300/9603
3400/9603
3500/9603
3600/9603
3700/9603
3800/9603
3900/9603
4000/9603
4100/9603
4200/9603
4300/9603
4400/9603
4500/9603
4600/9603
4700/9603
4800/9603
4900/9603
5000/9603
5100/9603
5200/9603
5300/9603
5400/9603
5500/9603
5600/9603
5700/9603
5800/9603
5900/9603
6000/9603
6100/9603
6200/9603
6300/9603
6400/9603
6500/9603
6600/9603
6700/9603
6800/9603
6900/9603
7000/9603
7100/9603
7200/9603
7300/9603
7400/9603
7500/9603
7600/9603
7700/9603
7800/9603
7900/9603
8000/9603
8100/9603
8200/9603
8300/9603
8400/9603
8500/9603
8600/9603
8700/9603
8800/9603
8900/9603
9000/9603
9100/9603
92

MemoryError: Unable to allocate 9.38 GiB for an array with shape (9603, 512, 512, 1) and data type float32