In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import cv2
import numpy as np
import pandas as pd
import albumentations as albu
import matplotlib.pyplot as plt
from torch.utils.data import Dataset as Dataset
from torch.utils.data import DataLoader,random_split
from numba import jit

DATA_DIR = 'D:\Pengwei/segmentation_models/data'

x_train_dir = os.path.join(DATA_DIR, 'train_image') 
y_train_dir = os.path.join(DATA_DIR, 'train_label')
#x_train_dir = os.path.join(DATA_DIR, 'Normal concrete') 
#y_train_dir = os.path.join(DATA_DIR, 'Normal_label')

#x_valid_dir = os.path.join(DATA_DIR, 'val_image')
#y_valid_dir = os.path.join(DATA_DIR, 'val_label')

#x_test_dir = os.path.join(DATA_DIR, 'Test')
x_test_dir = os.path.join(DATA_DIR, 'test_image')
#y_test_dir = os.path.join(DATA_DIR, 'Label')
y_test_dir = os.path.join(DATA_DIR, 'test_label')

In [2]:
class Dataset(Dataset):
    
    CLASSES = ['crack']
    
    def __init__(
            self, 
            images_dir, 
            masks_dir, 
            classes=None, 
            augmentation=None, 
            preprocessing=None,
    ):
        self.ids = os.listdir(images_dir)
        self.images_fps = [os.path.join(images_dir, image_id) for image_id in self.ids]
        self.masks_fps = [os.path.join(masks_dir, image_id) for image_id in self.ids]
        
        self.class_values = [self.CLASSES.index(cls.lower()) for cls in classes]
        
        self.augmentation = augmentation
        self.preprocessing = preprocessing
    
    def __getitem__(self, i):
        
        # read data
        image = cv2.imread(self.images_fps[i])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        mask = cv2.imread(self.masks_fps[i], 0)
        
        # extract certain classes from mask (e.g. cars)
        masks = [(mask == v) for v in self.class_values]
        mask = np.stack(masks, axis=-1).astype('float')
        
        # apply augmentations
        if self.augmentation:
            sample = self.augmentation(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']
        
        # apply preprocessing
        if self.preprocessing:
            sample = self.preprocessing(image=image, mask=mask)
            image, mask = sample['image'], sample['mask']
            
        return image, mask
        
    def __len__(self):
        return len(self.ids)
    
def get_training_augmentation():
    train_transform = [

        albu.HorizontalFlip(p=0.5),

        albu.ShiftScaleRotate(scale_limit=0.5, rotate_limit=0, shift_limit=0.1, p=1, border_mode=0),

        albu.PadIfNeeded(min_height=320, min_width=320, always_apply=True, border_mode=0),
        albu.RandomCrop(height=320, width=320, always_apply=True),

        albu.IAAAdditiveGaussianNoise(p=0.2),
        albu.IAAPerspective(p=0.5),

        albu.OneOf(
            [
                albu.CLAHE(p=1),
                albu.RandomBrightness(p=1),
                albu.RandomGamma(p=1),
            ],
            p=0.9,
        ),

        albu.OneOf(
            [
                albu.IAASharpen(p=1),
                albu.Blur(blur_limit=3, p=1),
                albu.MotionBlur(blur_limit=3, p=1),
            ],
            p=0.9,
        ),

        albu.OneOf(
            [
                albu.RandomContrast(p=1),
                albu.HueSaturationValue(p=1),
            ],
            p=0.9,
        ),
    ]
    return albu.Compose(train_transform)

def get_validation_augmentation():
    """Add paddings to make image shape divisible by 32"""
    test_transform = [
        albu.PadIfNeeded(384, 480)
    ]
    return albu.Compose(test_transform)


def to_tensor(x, **kwargs):
    return x.transpose(2, 0, 1).astype('float32')

def get_preprocessing(preprocessing_fn):
    """Construct preprocessing transform
    
    Args:
        preprocessing_fn (callbale): data normalization function 
            (can be specific for each pretrained neural network)
    Return:
        transform: albumentations.Compose
    
    """
    
    _transform = [
        albu.Lambda(image=preprocessing_fn),
        albu.Lambda(image=to_tensor, mask=to_tensor),
    ]
    return albu.Compose(_transform)

In [3]:
import torch
import numpy as np
import segmentation_models_pytorch as smp
ENCODER = 'resnet50'
ENCODER_WEIGHTS = 'imagenet'
CLASSES = ['crack']
ACTIVATION = 'sigmoid' # could be None for logits or 'softmax2d' for multicalss segmentation
DEVICE = 'cuda'

# create segmentation model with pretrained encoder
model = smp.DeepLabV3Plus(
    encoder_name=ENCODER, 
    encoder_weights=None, 
    classes=len(CLASSES), 
    activation=ACTIVATION,
)

preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)

train_dataset = Dataset(
    x_train_dir, 
    y_train_dir, 
    #augmentation=get_training_augmentation(), 
    preprocessing=get_preprocessing(preprocessing_fn),
    classes=CLASSES,)

val_percent = 0.2
n_val = int(len(train_dataset) * val_percent)
n_train = len(train_dataset) - n_val
train, val = random_split(train_dataset, [n_train, n_val])

train_loader = DataLoader(train, batch_size=4, 
                              shuffle=True, num_workers=0, pin_memory=True)
    
valid_loader = DataLoader(val, batch_size=4, 
                              shuffle=False, num_workers=0, pin_memory=True, drop_last=True)

loss = smp.utils.losses.DiceLoss()
metrics = [
    smp.utils.metrics.IoU(threshold=0.5),
    smp.utils.metrics.Precision(threshold=0.5),
    smp.utils.metrics.Recall(threshold=0.5),
    smp.utils.metrics.Fscore(threshold=0.5),  
]

optimizer = torch.optim.RMSprop([ 
    dict(params=model.parameters(), lr=0.0001),
])

train_epoch = smp.utils.train.TrainEpoch(
    model, 
    loss=loss, 
    metrics=metrics, 
    optimizer=optimizer,
    device=DEVICE,
    verbose=True,)

valid_epoch = smp.utils.train.ValidEpoch(
    model, 
    loss=loss, 
    metrics=metrics, 
    device=DEVICE,
    verbose=True,)

In [None]:
# train model for 40 epochs
max_score = 0

loader_train = []
loader_test = []
for i in range(0, 30):
    print('\nEpoch: {}'.format(i))
    train_logs = train_epoch.run(train_loader)
    loader_train.append(train_logs)
    print(train_logs)
    valid_logs = valid_epoch.run(valid_loader)
    loader_test.append(valid_logs)
    print(valid_logs)
    # do something (save model, change lr, etc.)
    if max_score < valid_logs['iou_score']:
        max_score = valid_logs['iou_score']
        torch.save(model, 'D:\Pengwei/segmentation_models/best_model.pth')
        print('Model saved!')
        
    if i == 25:
        optimizer.param_groups[0]['lr'] = 1e-5
        print('Decrease decoder learning rate to 1e-5!')

In [None]:
# Test the test dataset
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import cv2
import numpy as np
import pandas as pd
import albumentations as albu
import matplotlib.pyplot as plt
from torch.utils.data import Dataset as Dataset
from torch.utils.data import DataLoader,random_split
from numba import jit
ENCODER = 'resnet50'
ENCODER_WEIGHTS = 'imagenet'
CLASSES = ['crack']
ACTIVATION = 'sigmoid' # could be None for logits or 'softmax2d' for multicalss segmentation
DEVICE = 'cuda'

# create segmentation model with pretrained encoder
model = smp.Linknet(
    encoder_name=ENCODER, 
    encoder_weights=None, 
    classes=len(CLASSES), 
    activation=ACTIVATION,)

preprocessing_fn = smp.encoders.get_preprocessing_fn(ENCODER, ENCODER_WEIGHTS)
best_model = torch.load('D:\segmentation_models/best_model.pth')
test_dataset = Dataset(x_test_dir, preprocessing=get_preprocessing(preprocessing_fn))
model.eval()
with torch.no_grad():
    i = 0
    for images in test_dataset:
        img = images.reshape(images.shape[0], images.shape[1], images.shape[2])
        x_tensor = torch.from_numpy(img).to(DEVICE).unsqueeze(0)
        masks = best_model.predict(x_tensor)
        #pred = np.array(masks.data.cpu()[0])[0]
        pr_mask = (masks.squeeze().cpu().numpy().round())
        pr_mask[pr_mask >= 0.5] = 255
        img = cv2.cvtColor(pr_mask, cv2.COLOR_BGR2RGB)
        cv2.imwrite('D:/segmentation_models/data/%d.jpg'%i, img)
        i+=1


In [4]:
# Test single image
import cv2
import torch
import numpy as np
from numpy import *
from PIL import Image
import matplotlib.pyplot as plt
from torchvision import transforms

tfms = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

PATH = 'D:\pengwei/Smart robotic car/'
path_save = 'D:\pengwei/'
img_path = "D:\Pengwei/segmentation_models/data/test_image/ECC.jpg"
seg_model = torch.load('D:\Pengwei/segmentation_models/best_model.pth')
#detect_model = torch.hub.load('ultralytics/yolov5', 'custom', path= PATH + 'best.pt')
color_image = Image.open('D:\Pengwei/segmentation_models/data/test_image/ECC.jpg')
#color_image = Image.open('D:\Pengwei/ruihua.jpg')
#detected_crack = detect_model(color_image)

img_tensor = tfms(color_image).to('cuda').unsqueeze(0)
output = seg_model.predict(img_tensor)
output = (output.squeeze().cpu().numpy().round())
pred_mask = np.asarray(output, dtype=np.uint8)*255
cv2.imwrite(path_save + 'segmented.png', pred_mask)
#cv2.imwrite(path_save + 'detected.png', detected_crack.render()[0])

True

In [5]:
import os
import cv2
import math
import numpy as np
from decimal import Decimal
import time
import pandas as pd
start = time.time()

def crack_quantification(img,h_1,Actual_height):
    COLORS = [[128,0,0],[255,0,0],[255,144,30],[255,255,0],[0,255,0],[0,255,127],[0,255,255],[18,153,255],
              [0,97,255],[147,20,255],[0,0,255],[240,32,160]]
    img = 255 - img
    contours, hierarchy = cv2.findContours(img, cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE)
    h,w = img.shape
    img_new =np.zeros([h,w], np.uint8)
    img_new_4 = np.zeros([h,w,3], np.uint8)
    img_new_4.fill(255)
    k = 0
    crack_width_list = []
    for i in range(len(contours)):
        area = cv2.contourArea(contours[i])
        Crack_width_list = []
        if area > 300:
            k+=1
            img1 = cv2.drawContours(img_new,contours,i,255,thickness=-1)   
            n = 0
            x = int(h/h_1) 
            s = 0
            g = h_1 
            for j in range(x):
                cropped = img1[s:g,0:w]
                h1,w1 = cropped.shape
                contours1, hierarchy1 = cv2.findContours(cropped,cv2.RETR_TREE,cv2.CHAIN_APPROX_SIMPLE)
                for a in range(len(contours1)):
                    img_new_1 = np.zeros([h1,w1], np.uint8)
                    img2 = cv2.drawContours(img_new_1,contours1,a,255,thickness=-1)
                    #cv2.imwrite("E:\paper/Fast_detection/quantification/ooo/%d_"%n +"%d.png"%a,img2)
                    img_new_2 = np.zeros([h1,w1], np.uint8)
                    if (img2 == img_new_2).all():
                        actual_crack_width = 0
                    else:
                        threshold_level = 50
                        coords = np.column_stack(np.where(img2 > threshold_level))
                        #print(coords)
                        coord_first = []
                        coord_last = []
                        for i in range(len(coords)):
                            if coords[i][0] == coords[0][0]:
                                coord_first.append(coords[i])
                            if coords[i][0]==coords[-1][0]:
                                coord_last.append(coords[i])
                                
                        y1 = coord_first[0][0]
                        x1 = coord_first[0][1]
                        y2 = coord_first[-1][0]
                        x2 = coord_first[-1][1]
                        y3 = coord_last[0][0]
                        x3 = coord_last[0][1]
                        y4 = coord_last[-1][0]
                        x4 = coord_last[-1][1]
                        
                        if abs(x2-x4)==0:
                            width_pixel = abs(x1-x2)
                        else:
                            D = abs(x2-x4)
                            H = abs(y4-y2)
                            L = math.sqrt((D**2)+(H**2))
                            sina = D/L
                            angle = math.asin(sina)
                            angle_degree = math.degrees(angle)
                            if angle_degree <= 45:
                                cosa = math.cos(angle)
                                W = abs(x1-x2)
                                width_pixel = W*cosa 
                            if angle_degree > 45:
                                D1 = abs(x2-x4)
                                H1 = abs(y4-y2)
                                L1 = math.sqrt((D1**2)+(H1**2))
                                sina1 = D1/L1
                                angle1 = math.asin(sina1)
                                cosa1 = math.cos(angle1)
                                W1 = abs(x1-x2)
                                width_pixel = W1*cosa1
                       
                        actual_crack_width = abs(width_pixel)/(h/Actual_height)
                        #actual_crack_width = Decimal(actual_crack_width).quantize(Decimal("0.00"))    
                        Crack_width_list.append(actual_crack_width)
                        #print("the crack width for %d is:"%k, Crack_width_list)
                        for i in range(len(coords)):
                            x = coords[i][0]
                            y = coords[i][1]                      
                            if float(actual_crack_width)!=0:
                                if float(actual_crack_width)>0 and float(actual_crack_width)<=25:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[0]
                                if float(actual_crack_width)>25 and float(actual_crack_width)<=50:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[1]
                                if float(actual_crack_width)>50 and float(actual_crack_width)<=75:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[2]
                                if float(actual_crack_width)>75 and float(actual_crack_width)<=100:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[3]
                                if float(actual_crack_width)>100 and float(actual_crack_width)<=125:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[4]
                                if float(actual_crack_width)>125 and float(actual_crack_width)<=150:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[5]
                                if float(actual_crack_width)>150 and float(actual_crack_width)<=175:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[6]
                                if float(actual_crack_width)>175 and float(actual_crack_width)<=200:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[7]
                                if float(actual_crack_width)>225 and float(actual_crack_width)<=250:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[8]
                                if float(actual_crack_width)>250 and float(actual_crack_width)<=500:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[9]
                                if float(actual_crack_width)>500 and float(actual_crack_width)<=1000:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[10]        
                                if float(actual_crack_width)>1000 and float(actual_crack_width)<=5000:
                                    img_new_4[s:g,0:w,:][x,y]= COLORS[11]   
                s = s + h_1 
                g = g + h_1 
                n += 1

                cv2.imwrite(path_save + 'measure.png',img_new_4)
            Avg_crack_width = mean(Crack_width_list)
            crack_width_list.append(Avg_crack_width)
    
    #print("The number of crack is :",k)   
    #print(crack_width_list)
    return img_new_4, crack_width_list
            

img_new_4,crack_width_list = crack_quantification(pred_mask,16,30000)
Avg = mean(crack_width_list)
print("The average crack width is:", Avg)
df = df1 = pd.DataFrame(crack_width_list)
df.to_csv("D:\Pengwei/crack_width_list.csv")
end = time.time()
print(end-start)

The average crack width is: 115.87595822166075
25.582770586013794
