<a href="https://colab.research.google.com/github/vishnu921/damage_detection_using_deep_learning/blob/main/damage_detection_using_unet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive; the dataset and checkpoint paths used below live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [2]:
import platform
print("Python version:", platform.python_version())

import sys
import os

import tensorflow as tf
print("tensorflow version:",tf.__version__)

import numpy as np
print("numpy version:",np.__version__)

import cv2
print("cv2 version:",cv2.__version__)

import matplotlib
from matplotlib import pyplot as plt
import matplotlib.image as mpimg
print("matplotlib version:",matplotlib.__version__)

import pandas as pd
print("pandas version:",pd.__version__)

from skimage.transform import resize

from tensorflow.keras import datasets, layers, models

Python version: 3.7.13
tensorflow version: 2.8.2
numpy version: 1.21.6
cv2 version: 4.1.2
matplotlib version: 3.2.2
pandas version: 1.3.5


In [None]:
#Importing the necessary Keras libraries for Creating the U-Net Model with gated attention and residual connections

from keras.layers import Conv2D, BatchNormalization, Activation, Add, Dropout, UpSampling2D, Input, Multiply, MaxPooling2D, Concatenate, concatenate, AveragePooling2D, Lambda, Conv2DTranspose, Reshape, ZeroPadding2D, MaxPool2D
from keras.models import Model
from tensorflow.keras.optimizers import Adam
from keras import backend as K

In [None]:
# Dataset root: the CSV index files and every path rewritten by path_correct() are resolved against this folder.
path_ds = os.path.join('/content/drive/MyDrive/IC-SHM 2021','Tokaido_dataset') #put a path to the dataset

In [None]:
#Function to change the path format 
def path_correct(path):
  """Rewrite a backslash-separated CSV path entry as a path under `path_ds`.

  The first two characters of `path` are dropped, every backslash in the
  remainder becomes '/', and the result is joined onto the dataset root.
  """
  relative = path[2:].replace('\\', '/')
  return os.path.join(path_ds, relative)

In [None]:
#function to put the label images into 3 channels
def get_label(file_path):
    """Expand a single-channel class-id mask into three binary planes.

    Despite its name, `file_path` is the decoded mask array of shape
    (H, W, 1).  Plane k of the result is 1 where the mask equals k + 1
    (ids 1, 2 and 3) and 0 elsewhere; any other id yields zeros in all
    three planes.  The result is a float64 array of shape (H, W, 3).
    """
    class_ids = np.squeeze(file_path, axis = 2)
    planes = [np.where(class_ids == class_id, 1, 0) for class_id in (1, 2, 3)]
    return np.stack(planes, axis=-1).astype(np.float64)

In [None]:
# Functions to plot the images and labels
def show_img(image):
  """Open a new matplotlib figure and draw `image` in it with the axes hidden."""
  plt.figure()
  plt.imshow(image)
  plt.axis('off')

def show_label(label):
  """Draw channels 0, 1 and 2 of `label` side by side, one subplot each, with axes hidden."""
  _, axes = plt.subplots(1, 3, figsize=(16, 112))
  for channel, ax in enumerate(axes):
    ax.axis('off')
    ax.imshow(label[:,:,channel])

In [None]:
# Function to print the prediction images
def show_pred(y):
  """Draw channels 0, 1 and 2 of the prediction `y` side by side, one subplot each, with axes hidden."""
  _, axes = plt.subplots(1, 3, figsize=(16, 112))
  for channel, ax in enumerate(axes):
    ax.axis('off')
    ax.imshow(y[:,:,channel])

In [None]:
#function to normalize the images
def normalize(input_image):
  """Cast `input_image` to float32 and divide it by 255.0."""
  as_float = tf.cast(input_image, tf.float32)
  return as_float / 255.0

In [None]:
# Image and Label augmentation function 
def image_augmentation(img, seed=(2, 3)):
  """Apply a seeded left-right flip to `img`.

  Args:
    img: image tensor/array accepted by tf.image.stateless_random_flip_left_right.
    seed: two-integer seed for the stateless op.  The flip decision is a
      pure function of the seed, so the default (2, 3) gives the same
      decision on every call.  Pass a per-sample seed, and the same one to
      label_augmentation, to get real augmentation with image and label in sync.

  Returns:
    The (possibly flipped) image.
  """
  return tf.image.stateless_random_flip_left_right(img, seed)

def label_augmentation(img, seed=(2, 3)):
  """Apply a seeded left-right flip to the label `img`.

  Args:
    img: label tensor/array accepted by tf.image.stateless_random_flip_left_right.
    seed: two-integer seed for the stateless op.  It must equal the seed
      given to image_augmentation for the matching image so both are flipped
      (or not) together.  The default (2, 3) gives the same decision on every call.

  Returns:
    The (possibly flipped) label.
  """
  return tf.image.stateless_random_flip_left_right(img, seed)

In [None]:
# Preprocess function to run all the required preprocessing functions on the images
def preprocess(x, y):
    """Load one (image path, label path) pair and return the (image, mask) pair for training.

    Args:
        x: path of the input image file (read and decoded as PNG below).
        y: path of the damage label file (read and decoded as BMP below).

    Returns:
        (images, masks): two float32 tensors whose static shape is set to (256, 448, 3).
    """
    def f(x, y):
        # Read the raw bytes of both files.
        x = tf.io.read_file(x)
        y = tf.io.read_file(y)
        # Image branch: 3-channel PNG -> resize/pad to 256x448 -> divide by 255 (normalize).
        x = tf.io.decode_png(x, channels = 3)
        x = tf.image.resize_with_pad(x, 256, 448)
        x = tf.numpy_function(normalize, [x], [tf.float32])
        # Label branch: BMP -> three binary class planes (get_label) -> resize/pad to 256x448.
        y = tf.io.decode_bmp(y, channels = 0)
        y = tf.numpy_function(get_label, [y], [tf.float32])
        # NOTE(review): resize_with_pad is called without a `method` argument, so the label planes
        # are resized with the op's default interpolation; confirm non-binary edge values are acceptable.
        y = tf.image.resize_with_pad(y, 256, 448)
        # Flip image and label through the two augmentation helpers; both are called without a
        # seed argument, so they rely on the helpers' own built-in seed.
        x = tf.numpy_function(image_augmentation, [x], [tf.float32])
        y = tf.numpy_function(label_augmentation, [y], [tf.float32])
        
        return x, y

    # f itself runs as a Python function inside tf.numpy_function, so the static shapes are
    # unknown to the graph and are declared by hand afterwards.
    images, masks = tf.numpy_function(f, [x, y], [tf.float32, tf.float32])
    images.set_shape([256, 448, 3])
    masks.set_shape([256, 448, 3])

    return images, masks

In [None]:
# data pipeline function to cache dataset to ram for reducing the data loading bottleneck during training
def tf_dataset(x, y, batch=8):
    """Build the training/validation input pipeline from arrays of file paths.

    Args:
        x: array of image file paths.
        y: array of label file paths, aligned with `x`.
        batch: number of samples per batch.

    Returns:
        A tf.data.Dataset yielding (images, masks) batches.

    The stages are ordered map -> cache -> shuffle -> batch -> prefetch:
    * cache() stores the decoded samples, so files are read and preprocessed
      only once;
    * shuffle() comes after cache(), because cache() replays exactly the
      elements it recorded.  Shuffling before it would freeze the first
      epoch's order and batch composition for every later epoch;
    * prefetch() is last so that it overlaps batch production with training.
    Because the shuffle buffer (1800) is filled from preprocessed samples, the
    first element is only produced after up to 1800 samples have been decoded.
    """
    dataset = tf.data.Dataset.from_tensor_slices((x, y))
    dataset = dataset.map(preprocess, num_parallel_calls= tf.data.AUTOTUNE)
    dataset = dataset.cache()
    dataset = dataset.shuffle(buffer_size=1800)
    dataset = dataset.batch(batch, num_parallel_calls= tf.data.AUTOTUNE)
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

In [None]:
# Load the training index for the damage task.
# files_train.csv is read with the column names below; its first four columns are file paths.

col_names = ['image file name', 'component label file name', 'damage label file name', 'depth image file name', 
             'camera focal length in mm', 'regular images', 'images containing damage in the RRDR']
ftrain = pd.read_csv(os.path.join(path_ds,'files_train.csv'), names=col_names, delimiter=',')

# Rewrite the four path columns so they point below path_ds.
for col_idx in range(4):
    ftrain.iloc[:, col_idx] = ftrain.iloc[:, col_idx].apply(path_correct)

# Keep the five descriptive columns and split the rows by the two boolean flag columns.
sample_cols = col_names[:5]
train_comp = ftrain.loc[ftrain['regular images']==True, sample_cols]
train_dmg = ftrain.loc[ftrain['images containing damage in the RRDR']==True, sample_cols]


In [None]:
# Display the rows flagged as containing damage (the notebook renders the cell's last expression).
train_dmg

In [None]:
# Sanity check: print index row 3 and show its raw image

print(ftrain.iloc[3])
# .iloc[row, col] selects by position explicitly; integer keys on the string-labelled
# row Series only work through a positional fallback that newer pandas deprecates.
image = mpimg.imread(ftrain.iloc[3, 0])  # column 0 = raw image path
print(image.shape)
print(type(image))
plt.imshow(image)

In [None]:
# Sanity check: show the damage label belonging to the image above
print(ftrain.iloc[3])
# Column positions: 0 = raw image, 1 = component label, 2 = damage label, 3 = depth image.
# .iloc[row, col] selects by position explicitly instead of relying on the positional
# fallback of integer keys on a string-labelled Series.
image = mpimg.imread(ftrain.iloc[3, 2])
print(image.shape)
print(type(image))
plt.imshow(image)

In [None]:
# Load the training index of the pure-texture images.
# files_puretex_train.csv is read with two columns: image path and damage-label path.

col_names = ['image file name', 'damage label file name']

ftrain_tex = pd.read_csv(os.path.join(path_ds,'files_puretex_train.csv'), names=col_names, delimiter=',')

# Rewrite both path columns so they point below path_ds.
for col_idx in (0, 1):
    ftrain_tex.iloc[:, col_idx] = ftrain_tex.iloc[:, col_idx].apply(path_correct)

train_dmg_tex = ftrain_tex

In [None]:
# Display the pure-texture training index (the notebook renders the cell's last expression).
train_dmg_tex

In [None]:
# Sanity check: print row 2 of the pure-texture index and show its damage label
print(ftrain_tex.iloc[2])
# .iloc[row, col] selects by position explicitly (column 1 = damage label path).
image = mpimg.imread(ftrain_tex.iloc[2, 1])
print(image.shape)
print(type(image))
plt.imshow(image)

In [None]:
# Split the damage rows into training and validation subsets by position.
# Rows 1500-1999 and rows from 2800 onwards are left out of both subsets.

train1 = train_dmg.iloc[:1500]
val1 = train_dmg.iloc[2000:2800]

# Alternative split kept for reference:
#train1 = train_dmg[:4000]
#val1 = train_dmg[4000:]

# Optional: also append the pure-texture rows to both subsets
#train1 = train1.append(train_dmg_tex[:2300])
#val1 = val1.append(train_dmg_tex[2300:])

In [None]:
# Display the training subset (the notebook renders the cell's last expression).
train1

In [None]:
# Turn the path columns of both subsets into numpy arrays and wrap them in tf.data pipelines

path_columns = ('image file name', 'damage label file name')
images, masks = (train1[column].to_numpy() for column in path_columns)
images_val, masks_val = (val1[column].to_numpy() for column in path_columns)

dataset = tf_dataset(images, masks)
dataset_val = tf_dataset(images_val, masks_val)

In [None]:
# Show the first sample of the first training batch together with its three label planes
for x, y in dataset.take(1):
    show_img(x[0])
    show_label(y[0])

In [None]:
# Filter-count settings recorded by the authors (df: encoder, uf: decoder); 32 was found to be optimal for both.
# NOTE(review): neither df nor uf is referenced inside build_unet below, which passes its filter counts explicitly.
df=32
uf=32
    
def build_unet(input_shape):
  """Assemble the encoder/decoder segmentation network and return it as a Keras Model.

  Layout as built below: three encoder blocks (64, 128, 256 filters), a dilated
  convolution module with 512 filters as the bottleneck, three decoder blocks
  (256, 128, 64 filters) fed by the encoder skip tensors, and a final 1x1
  convolution with 3 sigmoid outputs.

  Args:
    input_shape: shape passed to keras Input(), e.g. (256, 448, 3).

  Returns:
    A keras Model mapping the input tensor to a 3-channel sigmoid map.
  """

  # E-Res Layer
  def eres_block(inputs, num_filters=32):
    """Three chained 3x3 Conv-BN-ReLU stages added to a 1x1 Conv-BN-ReLU shortcut of `inputs`, followed by ReLU."""

    x1 = Conv2D(num_filters, 3, padding="same")(inputs)
    x1 = BatchNormalization()(x1)
    x1 = Activation("relu")(x1)

    x2 = Conv2D(num_filters, 3, padding="same")(x1)
    x2 = BatchNormalization()(x2)
    x2 = Activation("relu")(x2)

    x3 = Conv2D(num_filters, 3, padding="same")(x2)
    x3 = BatchNormalization()(x3)
    x3 = Activation("relu")(x3)

    # Shortcut branch: 1x1 convolution so the channel count matches x3 for the Add below.
    sc = Conv2D(num_filters, 1, padding="same")(inputs)
    sc = BatchNormalization()(sc)
    sc = Activation("relu")(sc)

    # x4 = Concatenate(axis=3)([x1, x2, x3])

    rp = Add()([sc, x3])
    rp = Activation('relu')(rp)

    return rp

  # E-Res Path for skip connection
  def eres_path(inputs, num_filters):
    """Add a 3x3 Conv-BN-ReLU branch and a 1x1 Conv-BN-ReLU branch of `inputs`, followed by ReLU."""
    
    # The loop body runs exactly once (range(1)).
    for i in range(1):
      x1 = Conv2D(num_filters, 3, padding="same")(inputs)
      x1 = BatchNormalization()(x1)
      x1 = Activation("relu")(x1)

      x2 = Conv2D(num_filters, 1, padding="same")(inputs)
      x2 = BatchNormalization()(x2)
      x2 = Activation("relu")(x2)

      inputs = Add()([x1, x2])
      inputs = Activation('relu')(inputs)
    
    return inputs

  # Dilated Convolution Module
  def dcm(inputs, num_filters):
    """1x1 Conv-BN-ReLU, then three parallel 3x3 branches with dilation rates 1, 2 and 3 that are summed, then another 1x1 Conv-BN-ReLU."""

    x = Conv2D(num_filters, 1, padding="same")(inputs)
    x = BatchNormalization()(x)
    x = Activation("relu")(x)

    x1 = Conv2D(num_filters, 3, dilation_rate=1 , padding="same")(x)
    x1 = BatchNormalization()(x1)
    x1 = Activation("relu")(x1)

    x2 = Conv2D(num_filters, 3, dilation_rate=2 , padding="same")(x)
    x2 = BatchNormalization()(x2)
    x2 = Activation("relu")(x2)

    x3 = Conv2D(num_filters, 3, dilation_rate=3 , padding="same")(x)
    x3 = BatchNormalization()(x3)
    x3 = Activation("relu")(x3)

    x4 = Add()([x1, x2, x3])
    x4 = Activation("relu")(x4)
    
    res = Conv2D(num_filters, 1, padding="same")(x4)
    res = BatchNormalization()(res)
    res = Activation("relu")(res)

    return res
    

  # encoder block
  def encoder_block(input, num_filters):
    """Run eres_block then eres_path; return (x, p) where x is the skip tensor and p is x after 2x2 max pooling."""
    x = eres_block(input, num_filters)
    x = eres_path(x, num_filters)
    p = MaxPool2D((2,2))(x)
    return x, p

  # Decoder block
  def decoder_block(input, skip_features, num_filters):
      """Upsample `input` with a stride-2 transposed convolution, concatenate `skip_features`, then apply eres_block."""
      x = Conv2DTranspose(num_filters, (2, 2), strides=2, padding="same")(input)
      x = Concatenate()([x, skip_features])
      x = eres_block(x, num_filters)
      return x


  # Actual model definition: three encoder blocks (each ends in a 2x2 max pool), a dcm bottleneck, three decoder blocks.

  inputs = Input(input_shape)
    
  s1, p1 = encoder_block(inputs, 64)

  s2, p2 = encoder_block(p1, 128)

  s3, p3 = encoder_block(p2, 256)

  s4 = dcm(p3, 512)

  d1 = decoder_block(s4, s3, 256)

  d2 = decoder_block(d1, s2, 128)

  d3 = decoder_block(d2, s1, 64)

  # The first argument of Conv2D (3) is the number of output classes to be predicted.

  outputs = Conv2D(3,kernel_size=(1,1),strides=(1,1),activation='sigmoid')(d3)
   
  model = Model(inputs=inputs,outputs=outputs)
    
  return model

In [None]:
# Build the network for 256x448 three-channel inputs and print its layer summary.
# NOTE(review): the original comment mentioned gated attention, but build_unet as defined in this notebook contains no attention layers.
model = build_unet((256,448,3))
model.summary()

In [None]:
# Adam optimizer with a fixed learning rate; every hyper-parameter is spelled out explicitly
adam_settings = dict(learning_rate=0.0001, beta_1=0.9, beta_2=0.999, epsilon=1e-07)
optimizer_adam = tf.keras.optimizers.Adam(**adam_settings)

In [None]:
# Checkpoint location on Drive, used both for saving during training and for reloading weights.
# "training_dmg_resatt_final/cp_dmg.ckpt" this checkpoint to be supplied for best weights as trained by us
ck_path = os.path.join('/content/drive/MyDrive/summer_intern','eres_damage_cps')
checkpoint_path = os.path.join(ck_path, "my_checkpoint.ckpt")
checkpoint_dir = os.path.dirname(checkpoint_path)

# Save weights only, and only when the monitored metric reaches a new maximum.
# NOTE(review): 'iou' is the training-set metric; confirm that 'val_iou' was not intended.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='iou',
    mode='max',
    save_best_only=True,
    save_weights_only=True,
    verbose=1,
)

In [None]:
""" Courtesy stackoverflow Daniel Möller https://stackoverflow.com/users/2097240/daniel-m%c3%b6ller?tab=profile
answer to question 'Custom loss function for U-net in keras using class weights: `class_weight` not supported for 3+ dimensional targets' 
"""
# Weighted loss function 
def weightedLoss(originalLossFunc, weightsList):
    """Wrap `originalLossFunc` so each element's loss is scaled by the weight of its true class.

    Args:
        originalLossFunc: callable (true, pred) -> loss.  It must collapse only
            the class axis, so that its result lines up with the per-element
            weight map built here.
        weightsList: one weight per class, ordered by class index.

    Returns:
        A loss function (true, pred) -> weighted loss.
    """

    def lossFunc(true, pred):
        # Index of the true class along the last (channels-last) axis.
        true_class = tf.cast(K.argmax(true, axis=-1), tf.int32)

        # One weighted 0/1 selector per class: the class weight where the true
        # class equals that index, 0 elsewhere.
        weights = [
            K.cast(K.equal(class_idx, true_class), K.floatx()) * class_weight
            for class_idx, class_weight in enumerate(weightsList)
        ]

        # Sum the selectors into one weight per element.
        weightMultiplier = weights[0]
        for extra in weights[1:]:
            weightMultiplier = weightMultiplier + extra

        return originalLossFunc(true, pred) * weightMultiplier

    return lossFunc

In [None]:
#summation of dice loss and cross entropy loss

from keras import backend as K
def DiceCELoss(targets, inputs, smooth=1e-6):
    """Return dice loss plus binary cross-entropy.

    The dice term is computed once over all flattened elements with squared
    sums in the denominator, and `smooth` is added to numerator and
    denominator.  That scalar is added to the cross-entropy tensor.
    """
    flat_true = K.flatten(targets)
    flat_pred = K.flatten(inputs)
    overlap = K.sum(flat_true * flat_pred)
    denominator = K.sum(flat_true * flat_true) + K.sum(flat_pred * flat_pred) + smooth
    dice_term = 1 - ((2 * overlap) + smooth) / denominator
    return dice_term + tf.keras.metrics.binary_crossentropy(targets, inputs)

In [None]:
#summation of log of (1-dice loss) and cross entropy loss

import math
from keras import backend as K
def logDiceCELoss(targets, inputs, smooth=1e-6):
    """Return binary cross-entropy minus the log of the dice coefficient.

    Args:
        targets: ground-truth tensor.
        inputs: prediction tensor of the same shape.
        smooth: constant added to the numerator and denominator of the dice
            coefficient; it also keeps the argument of the log strictly positive.

    Returns:
        The cross-entropy tensor with the scalar -log(dice) added to it.

    The logarithm is taken with tf.math.log rather than math.log: math.log
    needs a Python float, so it fails on symbolic tensors and would cut the
    gradient.  log(1 - dice_loss) is written directly as log(dice).
    """
    CE = tf.keras.metrics.binary_crossentropy(targets, inputs)
    #CE = tf.keras.metrics.CategoricalCrossentropy(targets, inputs)
    y_true_f = K.flatten(targets)
    y_pred_f = K.flatten(inputs)
    intersection = K.sum(y_true_f * y_pred_f)
    dice_coeff = ((2 * intersection) + smooth) / (K.sum(y_true_f * y_true_f) + K.sum(y_pred_f * y_pred_f) + smooth)
    dice_CE = CE - tf.math.log(dice_coeff)
    return dice_CE

In [None]:
# Per-class loss weights, hard-coded here; the authors note they were calculated with code presented towards the end of this notebook.
# They are passed to weightedLoss in class-index order as [w1, w2, w3].
w1= 0.03611688490088685
w2= 1.0
w3=9.345573778790564

In [None]:
#best results were obtained with binary cross entropy loss

# weightedLoss multiplies the wrapped loss by a per-pixel class-weight map, so the wrapped
# loss must keep the batch and spatial axes.  A BinaryCrossentropy() object called directly
# reduces to a single scalar, which would turn the class weights into a mere rescaling of
# the mean loss.  The functional form collapses only the class axis.
# NOTE(review): MeanIoU is fed the 3-channel targets and sigmoid outputs as they are;
# confirm this matches the label format that metric expects.
model.compile(optimizer= optimizer_adam,
              loss= weightedLoss(tf.keras.losses.binary_crossentropy, [w1,w2,w3]),
              metrics=[
                       tf.keras.metrics.BinaryAccuracy(),
                       tf.keras.metrics.Recall(thresholds = 0.2),
                       tf.keras.metrics.Precision(thresholds = 0.7),
                       tf.keras.metrics.MeanIoU(3,name="iou")])

In [None]:
# Optional: restore previously saved weights from checkpoint_path.
# Only run this cell when a checkpoint already exists at that location.
model.load_weights(checkpoint_path)

# **Training, Metrics and Validation**

In [None]:
# Train for 20 epochs, validating on dataset_val after each epoch; cp_callback saves the best weights
history = model.fit(
    dataset,
    epochs=20,
    validation_data=dataset_val,
    verbose=1,
    callbacks=[cp_callback],
)

In [None]:
# Plot training vs. validation loss per epoch
loss = history.history['loss']
val_loss = history.history['val_loss']

fig, ax = plt.subplots()
ax.plot(history.epoch, loss, 'r', label='Training loss')
ax.plot(history.epoch, val_loss, 'b', label='Validation loss')
ax.set_title('Training and Validation Loss')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss Value')
ax.legend()
plt.show()

In [None]:
# Plot training vs. validation binary accuracy per epoch
accu = history.history['binary_accuracy']
val_accu = history.history['val_binary_accuracy']

fig, ax = plt.subplots()
ax.plot(history.epoch, accu, 'r', label='Training binary accuracy')
ax.plot(history.epoch, val_accu, 'b', label='Validation binary accuracy')
ax.set_title('Training and Validation binary accuracy')
ax.set_xlabel('Epoch')
ax.set_ylabel('binary accuracy Value')
ax.legend()
plt.show()

In [None]:
# Plot training vs. validation recall per epoch
recall = history.history['recall']
val_recall = history.history['val_recall']

fig, ax = plt.subplots()
ax.plot(history.epoch, recall, 'r', label='Training recall')
ax.plot(history.epoch, val_recall, 'b', label='Validation recall')
ax.set_title('Training and Validation recall')
ax.set_xlabel('Epoch')
ax.set_ylabel('recall Value')
ax.legend()
plt.show()

In [None]:
# Plot training vs. validation precision per epoch
pres = history.history['precision']
val_pres = history.history['val_precision']

fig, ax = plt.subplots()
ax.plot(history.epoch, pres, 'r', label='Training precision')
ax.plot(history.epoch, val_pres, 'b', label='Validation precision')
ax.set_title('Training and Validation precision')
ax.set_xlabel('Epoch')
ax.set_ylabel('precision Value')
ax.legend()
plt.show()

In [None]:
# Plot training vs. validation mean IoU per epoch (the metric is registered under the name "iou")
miou = history.history['iou']
val_miou = history.history['val_iou']

fig, ax = plt.subplots()
ax.plot(history.epoch, miou, 'r', label='Training mean_io_u')
ax.plot(history.epoch, val_miou, 'b', label='Validation mean_io_u')
ax.set_title('Training and Validation mean_io_u')
ax.set_xlabel('Epoch')
ax.set_ylabel('mean_io_u Value')
ax.legend()
plt.show()

In [None]:
# Evaluate the model on the validation dataset; returns the loss followed by the compiled metrics.
model.evaluate(dataset_val, verbose = 1)

In [None]:
# Show image, label and prediction for the third sample (index 2) of up to eight validation batches
z = 1
for z, (x, y) in enumerate(dataset_val.take(10), start=2):
    show_img(x[2])
    show_label(y[2])
    pred = model.predict(x)
    show_pred(pred[2])
    # z counts from 2, so reaching 9 means eight batches have been shown
    if z == 9:
        break

# **Testing**

In [None]:
# Function to get the labels of texture images for testing (its required for the test dataloader to work)
#this function would load the false test data labels for working of data loader only. the false labels arent used there for any actual purpose, this only enables
#the data loader to work smoothly
def get_label_tex(file_path):
    """Return an all-zero placeholder label with the same height and width as the input.

    Despite its name, `file_path` is a decoded image array; only its first two
    dimensions are used.  The result is a float64 array of shape (H, W, 3).
    """
    n_rows = file_path.shape[0]
    n_cols = file_path.shape[1]
    return np.zeros((n_rows, n_cols, 3))

In [None]:
#Preprocessing for the test dataset
def preprocess_test(x,y):
    """Load one (image path, label path) pair for testing and return (image, placeholder mask).

    Args:
        x: path of the input image file (read and decoded as PNG below).
        y: path of the label file (read and decoded as PNG below); its content
           is replaced by zeros through get_label_tex.

    Returns:
        (images, masks): two float32 tensors whose static shape is set to (256, 448, 3).
    """
    def f(x,y):
        # Read the raw bytes of both files.
        x = tf.io.read_file(x)
        y = tf.io.read_file(y)
        # Image branch: 3-channel PNG -> resize/pad to 256x448 -> divide by 255 (normalize).
        x = tf.io.decode_png(x, channels = 3)
        x = tf.image.resize_with_pad(x, 256,448)
        x = tf.numpy_function(normalize, [x], [tf.float32]) 
        # Label branch: decode only to obtain the spatial size, then substitute an all-zero label.
        y = tf.io.decode_png(y, channels = 0)   
        y = tf.numpy_function(get_label_tex, [y], [tf.float32])
        y = tf.image.resize_with_pad(y, 256, 448)   

        return x, y

    # f runs as a Python function inside tf.numpy_function, so static shapes are declared by hand afterwards.
    images, masks = tf.numpy_function(f, [x, y], [tf.float32, tf.float32])
    images.set_shape([256, 448, 3])
    masks.set_shape([256, 448, 3])

    return images, masks

In [None]:
#dataloader for the test dataset
def tf_dataset_test(x,y, batch = 8):
    """Build the (unshuffled) test input pipeline from arrays of file paths.

    Args:
        x: array of image file paths.
        y: array of label file paths, aligned with `x`.
        batch: number of samples per batch.

    Returns:
        A tf.data.Dataset yielding (images, masks) batches in input order.

    prefetch() is the last stage so that it overlaps the production of
    finished batches with their consumption.  Prefetching the raw path
    strings ahead of map() does not overlap any expensive work.
    """
    dataset = tf.data.Dataset.from_tensor_slices((x,y))
    dataset = dataset.map(preprocess_test, num_parallel_calls= tf.data.AUTOTUNE)
    dataset = dataset.batch(batch, num_parallel_calls= tf.data.AUTOTUNE)
    dataset = dataset.cache()
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)
    return dataset

In [None]:
#Function to change the path format 
def path_correct_png(path):
  """Rewrite a backslash-separated CSV path entry as a '.png' path under `path_ds`.

  The first two characters of `path` are dropped, backslashes become '/', the
  result is joined onto the dataset root, and its last four characters are
  replaced by '.png'.
  """
  relative = path[2:].replace('\\', '/')
  absolute = os.path.join(path_ds, relative)
  return absolute[:-4] + '.png'

In [None]:
# Load the test index of the pure-texture images.
# files_puretex_test.csv is read with two columns: image path and damage-label path.

col_names = ['image file name', 'damage label file name']

ftest = pd.read_csv(os.path.join(path_ds,'files_puretex_test.csv'), names=col_names, delimiter=',')

# Column 0 keeps its extension; column 1 is additionally switched to '.png'.
for col_idx, fix_path in enumerate((path_correct, path_correct_png)):
    ftest.iloc[:, col_idx] = ftest.iloc[:, col_idx].apply(fix_path)

test_dmg_tex = ftest

In [None]:
# Build the test pipeline for the pure-texture images and keep the label file names
tex_label_paths = test_dmg_tex['damage label file name']
images_test = test_dmg_tex['image file name'].to_numpy()
masks_test = tex_label_paths.to_numpy()
dataset_test_tex = tf_dataset_test(images_test, masks_test)
# File name (last path component) of every label, in dataset order.
address_tex = tex_label_paths.apply(os.path.basename).to_numpy()

In [None]:
# Show image, placeholder label and prediction for the third sample (index 2) of up to eight texture test batches
z = 1
for z, (x, y) in enumerate(dataset_test_tex.take(10), start=2):
    show_img(x[2])
    show_label(y[2])
    pred = model.predict(x)
    show_pred(pred[2])
    # z counts from 2, so reaching 9 means eight batches have been shown
    if z == 9:
        break

In [None]:
# Load the test index of the component-damage dataset.
# files_test.csv is read with the column names below; its first four columns are file paths.

col_names = ['image file name', 'component label file name', 'damage label file name', 'depth image file name', 
             'camera focal length in mm', 'regular images', 'images containing damage in the RRDR']
ftest = pd.read_csv(os.path.join(path_ds,'files_test.csv'), names=col_names, delimiter=',')

# Rewrite the four path columns; the damage-label column (index 2) is additionally switched to '.png'.
path_fixers = (path_correct, path_correct, path_correct_png, path_correct)
for col_idx, fix_path in enumerate(path_fixers):
    ftest.iloc[:, col_idx] = ftest.iloc[:, col_idx].apply(fix_path)

# Keep the five descriptive columns of the rows flagged as containing damage.
test_comp = ftest.loc[ftest['images containing damage in the RRDR']==True, col_names[:5]]


In [None]:
# Build the test pipeline for the component-damage images and keep the label file names

comp_label_paths = test_comp['damage label file name']
images_test = test_comp['image file name'].to_numpy()
masks_test = comp_label_paths.to_numpy()
dataset_test = tf_dataset_test(images_test, masks_test)
# File name (last path component) of every label, in dataset order.
address = comp_label_paths.apply(os.path.basename).to_numpy()

In [None]:
# Show image, placeholder label and prediction for the first sample of up to eight component-damage test batches
z = 1
for z, (x, y) in enumerate(dataset_test.take(10), start=2):
    show_img(x[0])
    show_label(y[0])
    pred = model.predict(x)
    show_pred(pred[0])
    # z counts from 2, so reaching 9 means eight batches have been shown
    if z == 9:
        break