The bulk of this code details an implementation of the Kalman filter based on a sequence of object detections from YOLO. Bounding box information is passed to the filter and defects are detected based on whether multiple detections occur in the same location

In [1]:
import torch
import cv2
import torch
from PIL import Image
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import random
from csv import writer
import math

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def matchup(previous_boxes,current_boxes):
    #Function that finds the closest boxes from the previous frame to those in the current frame, function returns a pandas DF
    #with each row representing a pair of boxes and the columns representing the x and y coordinates of their centres. (based on
    # euclidean distance between centres)
    i=0
    x_p = previous_boxes.xcenter.to_numpy()
    y_p = previous_boxes.ycenter.to_numpy()
    x_old = list()
    y_old = list()
    for x_c in current_boxes.xcenter.to_numpy():
        y_c = current_boxes.ycenter[i]
        euc = np.sqrt(np.square((x_p - x_c)) + np.square((y_p - y_c)))
        
        closest = np.where(euc == np.amin(euc))
        # x_old.append(x_p[closest[0]])
        # y_old.append(y_p[closest[0]])
        if np.amin(euc) < 30:
            x_old.append(x_p[closest[0]])
            y_old.append(y_p[closest[0]])
        else:
            x_old.append([0])
            y_old.append([0])

        i=i+1
    matched_data = {'xcentre_old':np.asarray(x_old).reshape(-1),
                    'ycentre_old':np.asarray(y_old).reshape(-1),
                    'xcentre_new':current_boxes.xcenter.to_numpy(),
                    'ycentre_new':current_boxes.ycenter.to_numpy()}

    matched_boxes = pd.DataFrame(data = matched_data)
    return matched_boxes

In [23]:
def predict(initial_x,initial_y,new_x,new_y,x_e_est,y_e_est,x_est,y_est,i):
    x_k_minus_1 = np.array([initial_x,initial_y])
    #x_k_minus_1 = np.array([x_est,y_est])
    e_est = np.array([x_e_est,y_e_est])
    x_k = np.array([new_x,new_y])
    l_est = np.array([x_est,y_est])
    #e_meas = np.array([abs(initial_x-new_x),abs(initial_y-new_y)])
    #e_meas = np.array([(np.exp(1/(0.001*(i-8)))),(np.exp(1/(0.001*(i-8))))])
    e_meas = np.array([(330/i),(330/i)])
    #e_meas = np.array([50,50])

    kg = e_est/(e_est + e_meas)
    l_est = x_k_minus_1 + np.multiply(kg,(x_k - l_est))
    e_est = np.multiply(e_est,(1-kg))
    
    
    return e_est[0],e_est[1],l_est[0],l_est[1]

In [6]:
def update(matched_boxes,kalman_matrix):
    new_kalman_matrix = kalman_matrix.copy()
    #Shift x and y coordinates from new to old column
    new_kalman_matrix['xcentre_old'] = new_kalman_matrix['xcentre_new']
    new_kalman_matrix['ycentre_old'] = new_kalman_matrix['ycentre_new']
    #remove any rows for points that are no longer present
    discarded_kalman_matrix = new_kalman_matrix[new_kalman_matrix.xcentre_old.isin(matched_boxes['xcentre_old'].values)]
    final_new_kalman_matrix = matched_boxes.copy()
    #create new kalman matrix by copying in the error estimate and location estimates from the old matrix and concatenating to 
    #correct entries in new table(necessary as often the same box is used by muplitple boxes in next image)
    for index, row in discarded_kalman_matrix.iterrows():
        for index2, row2 in final_new_kalman_matrix.iterrows():
            if row2['xcentre_old'] == row['xcentre_old']:
                final_new_kalman_matrix.loc[index2,['x_e_est']] = row['x_e_est']
                final_new_kalman_matrix.loc[index2,['y_e_est']] = row['y_e_est']
                final_new_kalman_matrix.loc[index2,['x_est']] = row['x_est']
                final_new_kalman_matrix.loc[index2,['y_est']] = row['y_est']

    for index,row in final_new_kalman_matrix.iterrows():
        if row['xcentre_old'] not in discarded_kalman_matrix['xcentre_old'].values:
                final_new_kalman_matrix.loc[index,['x_e_est']] = 300
                final_new_kalman_matrix.loc[index,['y_e_est']] = 300
                final_new_kalman_matrix.loc[index,['x_est']] = 0
                final_new_kalman_matrix.loc[index,['y_est']] = 0
    return final_new_kalman_matrix

In [7]:
def create_kalman_matrix(matched_boxes):
    new_matrix = matched_boxes.copy()
    new_matrix['x_e_est'] = 300
    new_matrix['y_e_est'] = 300
    new_matrix['x_est'] = 0
    new_matrix['y_est'] = 0
    
    return new_matrix

In [8]:
def isoverlap(xmin_1,xmax_1,ymin_1,ymax_1,xmin_2,xmax_2,ymin_2,ymax_2):

    if(xmin_1 > xmax_2 or xmin_2 > xmax_1):
        overlap = False
    
    elif(ymax_1 < ymin_2 or ymax_2 < ymin_1):
        overlap = False

    else:
        overlap = True
        if overlap == True:
            union = ((xmax_1-xmin_1)*(ymax_1-ymin_1))+((xmax_2-xmin_2)*(ymax_2-ymin_2))
            xmin_i = max(xmin_1, xmin_2)
            xmax_i = min(xmax_1, xmax_2)
            ymin_i = max(ymin_1, ymin_2)
            ymax_i = min(ymax_1, ymax_2)       
            intersection = ((xmax_i-xmin_i)*(ymax_i-ymin_i))
            iou = intersection/union
            if iou > 0.5:
                located = True


    return overlap

In [21]:
def kalman(results,threshold,box_thresh):
    i = 0
    defect_flag = False
    br = False
    for i in range(len(results)-1):
        pred_1 = results.pandas().xywh[i]
        pred_2 = results.pandas().xywh[i+1]
        if (pred_1.shape[0]==0 & pred_2.shape[0] == 0) and ('stored_pred_1' not in locals()):
            continue

        if (pred_1.shape[0] != 0 & pred_2.shape[0] == 0):
            stored_pred_1 = pred_1
        #If no defects detected in new image just continue
        if (pred_1.shape[0] == 0):
            pred_1 = stored_pred_1
        
        matched_box = matchup(pred_1,pred_2)
        if 'kalman_matrix' not in locals():
            kalman_matrix = create_kalman_matrix(matched_box)
        else:
            kalman_matrix = update(matched_box,kalman_matrix)
        
        for index, row in kalman_matrix.iterrows():
            kalman_matrix.loc[index,['x_e_est']],kalman_matrix.loc[index,['y_e_est']],kalman_matrix.loc[index,['x_est']],kalman_matrix.loc[index,['y_est']] = predict(row['xcentre_old'],
                                                                                                                                                                        row['ycentre_old'],
                                                                                                                                                                        row['xcentre_new'],
                                                                                                                                                                        row['ycentre_new'],
                                                                                                                                                                        row['x_e_est'],
                                                                                                                                                                        row['y_e_est'],
                                                                                                                                                                        row['x_est'],
                                                                                                                                                                        row['y_est'],i+9)
        for index, row in kalman_matrix.iterrows():
            if (abs(row['xcentre_new']- row['x_est']) + abs(row['ycentre_new']- row['y_est']))< box_thresh and (row['x_e_est'] +  row['y_e_est']) < threshold:
                br=True
        
        if br == True:
            defect_flag = True
            break
        # if (kalman_matrix['x_e_est'] + kalman_matrix['y_e_est']).min() < threshold:#changed from 5 to 0.1 on old set
        #     defect_flag = True 
        #     break
    try:
        output = kalman_matrix
        images = i+2
    except UnboundLocalError:
        #if get to end of sequence without making prediction return empty kalman matrix
        #output = create_kalman_matrix(pd.DataFrame(columns=['xcentre_old','ycentre_old','xcentre_new','ycentre_new']))
        output = pd.DataFrame(columns =['xcentre_old','ycentre_old','xcentre_new','ycentre_new','x_e_est','y_e_est','x_est','y_est'])
        images = i+2
    return output,images,defect_flag

In [36]:
defect_image_store ="C:\\Users\\micha\\OneDrive\\Documents\\UniWork\\Project\\Dataset\\New_simulated\\Final_set\\Experimental2\\33elem\\"
defect_names = [name for name in os.listdir(defect_image_store) if name.endswith(".bmp")]

no_defect_image_store = "C:\\Users\\micha\\OneDrive\\Documents\\UniWork\\Project\\Dataset\\New_simulated\\Final_set\\Experimental_defect_free\\33elem\\"
no_defect_names = [name for name in os.listdir(no_defect_image_store) if name.endswith(".bmp")]
defect_names = defect_names[0:33] + defect_names[41:46]

In [12]:
def append_list_as_row(file_name, list_of_elem):
    # Open file in append mode
    with open(file_name, 'a+', newline='') as write_obj:
        # Create a writer object from csv module
        csv_writer = writer(write_obj)
        # Add contents of list as last row in the csv file
        csv_writer.writerow(list_of_elem)

In [98]:
thresholds = [0.1,0.5,2,3,4,5,6,7,8,9,10]
b_threshs = [1,2,3,5,7,9,15]

In [34]:
model = torch.hub.load('C:\\Users\\micha\\OneDrive\\Documents\\UniWork\\Project\\Python work\\YOLO\\YOLO_Kalman_Implementation\\yolov5', 'custom', path='C:\\Users\\micha\\OneDrive\\Documents\\UniWork\\Project\\Python work\\YOLO\\Models\\Final_model.pt', source='local')
model.conf = 0.55#0.5

YOLOv5  2022-6-20 Python-3.8.13 torch-1.11.0+cpu CPU

Fusing layers... 
Model summary: 213 layers, 7012822 parameters, 0 gradients
Adding AutoShape... 


In [37]:
#Optimum parameters
thresholds = [6]
b_threshs = [15]

Testing model on sequences of images, code can also be used to perform gridsearch if required

In [38]:
for threshold in thresholds:
    for b_thresh in b_threshs:
        tp_count = 0
        fp_count = 0
        required_frames = 0
        true_box_count = 0
        false_box_count = 0
        model.conf = 0.55
        model.iou = 0.45
        kalman_thresh = threshold
        box_thresh = b_thresh
        for defect_name in defect_names:
            imgs = []
            preds = []
            for i in range(9,34):

                img = Image.open((defect_image_store.replace('33',str(i)) + defect_name.replace('33elem',(str(i)+'elem'))))

                imgs.append(img)
                
            results = model(imgs, size=600)


            test_full,image_number,defective = kalman(results,kalman_thresh,box_thresh)

            display_image = np.array(imgs[image_number - 1])
            
            if defective == True:
                tp_count = tp_count + 1
                required_frames = required_frames + image_number
                defects_kalman = test_full.loc[((abs(test_full['xcentre_new']-test_full['x_est'])+abs(test_full['ycentre_new']-test_full['y_est']))< box_thresh) & ((test_full['x_e_est'] + test_full['y_e_est']) < kalman_thresh)]
                defects_options = results.pandas().xywh[image_number-1]
                defects = defects_options.loc[defects_options['xcenter'].isin(defects_kalman['xcentre_new'])]
                for index,row in defects.iterrows():
                    start_x = round(row['xcenter'] - (row['width']/2))
                    end_x = round(row['xcenter'] + (row['width']/2))
                    start_y = round(row['ycenter'] - (row['height']/2))
                    end_y = round(row['ycenter'] + (row['height']/2))
                    display_image = cv2.rectangle(display_image,(start_x,start_y),(end_x,end_y),(0,255,0),2)
                    with open(defect_image_store.replace('33elem','labels') + defect_name.replace('bmp','txt')) as f:
                        lines = f.readlines()
                        for line in lines:

                            boxes = np.fromstring(line, dtype=float, sep=' ')
                            start_x_2 =  round(600*(boxes[1]-(boxes[3]/2)))
                            start_y_2 = round(249*(boxes[2]-(boxes[4]/2)))
                            end_x_2 = round(600*(boxes[1]+(boxes[3]/2)))
                            end_y_2 = round(249*(boxes[2]+(boxes[4]/2)))
                            display_image = cv2.rectangle(display_image,(start_x_2,start_y_2),(end_x_2,end_y_2),(255,0,0),2)

                    overlapping = isoverlap(start_x,end_x,start_y,end_y,start_x_2,end_x_2,start_y_2,end_y_2)
                    if overlapping == True:
                        true_box_count = true_box_count + 1
                    elif overlapping == False:
                        false_box_count = false_box_count + 1
            plt.imshow(display_image)
            plt.title('Result after' + str(image_number) + 'Images')
            plt.savefig('C:\\Users\\micha\\OneDrive\\Documents\\UniWork\\Project\\Dataset\\New_simulated\\Final_set\\Experimental\\output\\' + defect_name + '.png')

        for no_defect_name in no_defect_names[0:39]:
            imgs = []
            for i in range(9,34):
            #for i in [5,9,17,33]:
                img = Image.open((no_defect_image_store.replace('33',str(i)) + no_defect_name.replace('33elem',(str(i)+'elem'))))
                #img = Image.open((image_store + name.replace('33elem',(str(i)+'elem'))))
                imgs.append(img)

            results = model(imgs, size=600)

            test_full,image_number,defective = kalman(results,kalman_thresh,box_thresh)
            if defective == True:
                fp_count = fp_count + 1

        try:
            print((tp_count/38),true_box_count,false_box_count,(fp_count/40))
            print(required_frames/tp_count)
            append_list_as_row('GridSearch_Output_best.csv', [(tp_count/39),true_box_count,false_box_count,(fp_count/40),(required_frames/tp_count),threshold,0.55,0.45,b_thresh])   
        except ZeroDivisionError:
             pass


0.8421052631578947 28 4 0.425
21.125


In [40]:
print((tp_count/38),true_box_count,false_box_count,(fp_count/40))
print((required_frames/tp_count)+9)

0.8421052631578947 28 4 0.425
30.125


In [None]:
test_full,image_number,defective = kalman(results,kalman_thresh,box_thresh)