In [1]:
import cv2
import numpy as np
import scipy
import glob
import matplotlib.pyplot as plt

import imageio
from time import time, sleep


In [2]:

class Line_of_horizont_fitting:

    line=[]

    def inputNormalized(self, image, img_w, img_h):
        image=self.resize_image(image, img_w, img_h)
        #image=cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image=image/255.0
        return image

    def resize_image(self, image, img_w, img_h):
        image = cv2.resize(image,(img_w,img_h))
        return image

    def plot_binary_image(self, image):
        image=image*255
        binary_img = np.squeeze(image, axis=2)
        plt.imshow(binary_img)
        plt.show()

    def get_binary_image(self, image, treshold):
        image = cv2.threshold(image,treshold,1,cv2.THRESH_BINARY)
        return image[1]

    def binary_edge_detection(self, image):
        edges = image - scipy.ndimage.morphology.binary_dilation(image)
        return edges

    def median_blur(self, img, kernel_size):
        """Applies a Gaussian Noise kernel"""
        return cv2.blur(img, (kernel_size, kernel_size))

    def hough_lines(self, img, rho, theta, threshold, min_line_len, max_line_gap):  
        lines = cv2.HoughLinesP(img, rho, theta, threshold, np.array([]), minLineLength=min_line_len, maxLineGap=max_line_gap)
        return lines

    def binary2gray(self, image):
        image = np.uint8(255 * image)
        return image

    def Collect_points(self, lines):

        # interpolation & collecting points for RANSAC
        points=[]
        for line in lines:
            new_point = np.array([[int(line[0]),int(line[1])]])
            points.append(new_point)
            new_point = np.array([[int(line[2]),int(line[3])]])
            points.append(new_point)

        return points

    def smoothing(self, lines, pre_frame=10):
        # collect frames & print average line
        lines = np.squeeze(lines)
        avg_line = np.array([0.0,0.0,0.0,0.0])

        for ii,line in enumerate(reversed(lines)):
            if ii == pre_frame:
                break
            avg_line += line
        avg_line = avg_line / pre_frame

        return avg_line

    def getLineImage(self, image,label,fit_line, width, height):
        height,width,_=image.shape
        imageOUT = cv2.bitwise_or(image,label)
        cv2.line(imageOUT, (int(fit_line[2]-fit_line[0]*width), int(fit_line[3]-fit_line[1]*width)), (int(fit_line[2]+fit_line[0]*width), int(fit_line[3]+fit_line[1]*width)), (255, 0, 255), 12)

    def predict_segmentation(self, image, model):
        predict = model.predict(image[None,...])

        return predict[0]
    
    def horizont_line_from_binary_image(self, image, average_n_frame=1, rho = 2, theta = np.pi/180, threshold = 20, min_line_length = 20, 
                                        max_line_gap = 5):
        output = self.binary_edge_detection(image)
        output = self.binary2gray(output)

        rho = rho# distance resolution in pixels of the Hough grid
        theta = theta # angular resolution in radians of the Hough grid
        threshold = threshold     # minimum number of votes (intersections in Hough grid cell)
        min_line_length = min_line_length #minimum number f pixels making up a line
        max_line_gap = 5    # maximum gap in pixels between connectable line segments
        lines=self.hough_lines(output, rho, theta, threshold, min_line_length, max_line_gap)
        line_arr = np.squeeze(lines)
        points = self.Collect_points(line_arr)

        if(len(points)<2):
            points= line_arr.reshape(lines.shape[0]*2,2)

        if(len(points)>2):
            fit_line = cv2.fitLine(np.float32(points), cv2.DIST_HUBER, 1, 0.001, 0.001)
            #self.line.append(fit_line)

            #if len(self.line) > 10:
            #    fit_line = self.smoothing(self.line, average_n_frame)

        return fit_line
        

    def horizont_line_pipeline(self, image, model, img_w, img_h, average_n_frame, kernel_median_blur=50, predict_treshold=0.5,
                               rho = 2, theta = np.pi/180, threshold = 20, min_line_length = 20, max_line_gap = 5):
        or_image=image

        or_height, or_width, or_depth = or_image.shape
        image=self.inputNormalized(or_image,img_w,img_h)
        predict_segmentation = self.predict_segmentation(image,model)
        predict = self.median_blur(predict_segmentation,kernel_median_blur)
        predict = self.get_binary_image(predict,predict_treshold)
        predict = self.resize_image(predict, or_width, or_height)
        output = self.binary_edge_detection(predict)
        output = self.binary2gray(output)

        rho = rho# distance resolution in pixels of the Hough grid
        theta = theta # angular resolution in radians of the Hough grid
        threshold = threshold     # minimum number of votes (intersections in Hough grid cell)
        min_line_length = min_line_length #minimum number f pixels making up a line
        max_line_gap = 5    # maximum gap in pixels between connectable line segments
        lines=self.hough_lines(output, rho, theta, threshold, min_line_length, max_line_gap)
        line_arr = np.squeeze(lines)
        points = self.Collect_points(line_arr)

        if(len(points)<2):
            points= line_arr.reshape(lines.shape[0]*2,2)

        if(len(points)>2):
            fit_line = cv2.fitLine(np.float32(points), cv2.DIST_HUBER, 1, 0.001, 0.001)
            #self.line.append(fit_line)

            #if len(self.line) > 10:
            #    fit_line = self.smoothing(self.line, average_n_frame)

        return fit_line, self.resize_image(predict_segmentation, or_width, or_height)
    
    def horizont_line_pipeline_verbose(self, image, model, img_w, img_h, average_n_frame, kernel_median_blur=50, predict_treshold=0.5, rho = 2, theta = np.pi/180, threshold = 20, min_line_length = 20, max_line_gap = 5):
        or_image=image

        or_height, or_width, or_depth = or_image.shape
        image=self.inputNormalized(or_image,img_w,img_h)
        predict_segmentation = self.predict_segmentation(image,model)
        predict = self.median_blur(predict_segmentation,kernel_median_blur)
        predict = self.get_binary_image(predict,predict_treshold)
        predict = self.resize_image(predict, or_width, or_height)
        output = self.binary_edge_detection(predict)
        output = self.binary2gray(output)

        rho = rho# distance resolution in pixels of the Hough grid
        theta = theta # angular resolution in radians of the Hough grid
        threshold = threshold     # minimum number of votes (intersections in Hough grid cell)
        min_line_length = min_line_length #minimum number f pixels making up a line
        max_line_gap = 5    # maximum gap in pixels between connectable line segments
        lines=self.hough_lines(output, rho, theta, threshold, min_line_length, max_line_gap)
        line_arr = np.squeeze(lines)
        points = self.Collect_points(line_arr)

        if(len(points)<2):
            points= line_arr.reshape(lines.shape[0]*2,2)

        if(len(points)>2):
            fit_line = cv2.fitLine(np.float32(points), cv2.DIST_HUBER, 1, 0.001, 0.001)
            #self.line.append(fit_line)

            #if len(self.line) > 10:
            #    fit_line = self.smoothing(self.line, average_n_frame)
        
        pred_visual = predict_segmentation*255
        pred_visual= np.uint8(np.concatenate((predict_segmentation,predict_segmentation,pred_visual),axis=2))
        return fit_line, self.resize_image(predict_segmentation, or_width, or_height), self.resize_image(or_image, img_w, img_h), self.resize_image(pred_visual, img_w, img_h)

In [3]:

class Utils:
    @staticmethod
    def median_accuracy_line_of_horizont(x, y, model, inp_w, inp_h, steps=1, visualization=False):
        avg_distance=[]
        max_distance=[]
        
        line_of_horizont=Line_of_horizont_fitting()
        for label, img in zip(y, x):

            image = np.uint8(255 * img)
            
            label_med=line_of_horizont.median_blur(label,5)
            label_med=line_of_horizont.get_binary_image(label_med, 0.5)
            fit_line= line_of_horizont.horizont_line_from_binary_image(label_med)

            label=line_of_horizont.get_binary_image(label, 0.5)
            height,width=label.shape
            label_line = np.zeros([height,width], dtype = "uint8")
            cv2.line(label_line, (int(fit_line[2]-fit_line[0]*width), 
                                             int(fit_line[3]-fit_line[1]*width)), 
                     (int(fit_line[2]+fit_line[0]*width), 
                      int(fit_line[3]+fit_line[1]*width)), (255, 255, 255), 1)

            image_pred = line_of_horizont.resize_image(img, inp_w, inp_h)
            pred=line_of_horizont.predict_segmentation(image_pred, model)
            pred=line_of_horizont.get_binary_image(pred, 0.5)
            pred=line_of_horizont.resize_image(pred, width, height)

            fit_line, predict=line_of_horizont.horizont_line_pipeline(image, model, inp_w, inp_h, steps, 5)

            pred_line = np.zeros([height,width], dtype = "uint8")
            cv2.line(pred_line, (int(fit_line[2]-fit_line[0]*width), 
                                             int(fit_line[3]-fit_line[1]*width)), 
                     (int(fit_line[2]+fit_line[0]*width), 
                      int(fit_line[3]+fit_line[1]*width)), (255, 255, 255), 1)

            distance=[]
            for j in range (width):
                for i in range (height):
                    if(label_line[i,j]==255):
                        y1=i
                    if(pred_line[i,j]==255):
                        y2=i
                distance.append(abs(y1-y2))

            avg_y= int((y1+y2)/2)
            
            avg_distance.append(np.mean(distance)/width)
            max_distance.append(max(distance))
            
            if(visualization):
                print("avg_distance: ", (np.mean(distance)/width)," - max_distance: ", (max(distance)))
                plt.imshow(label_line)
                plt.show()
                plt.imshow(pred_line)
                plt.show()
        
        return avg_distance, max_distance
    
    
    @staticmethod
    def accuracy_on_line_of_horizont_area(x, y, model, inp_w, inp_h, steps=1, visualization=False):
        recall_list=[]
        precision_list=[]
        specificity_list=[]
        accuracy_list=[]
        f1score_list=[]

        line_of_horizont=Line_of_horizont_fitting()
        for label, img in zip(y, x):

            image = np.uint8(255 * img)
            label=line_of_horizont.get_binary_image(label, 0.5)
            height,width=label.shape

            image_pred = line_of_horizont.resize_image(img, inp_w, inp_h)
            pred=line_of_horizont.predict_segmentation(image_pred, model)
            pred=line_of_horizont.get_binary_image(pred, 0.5)
            pred=line_of_horizont.resize_image(pred, width, height)

            fit_line, predict=line_of_horizont.horizont_line_pipeline(image, model, inp_w, inp_h, steps)

            line_annotation_image = np.zeros([height,width], dtype = "uint8")
            cv2.line(line_annotation_image, (int(fit_line[2]-fit_line[0]*width), 
                                             int(fit_line[3]-fit_line[1]*width)), 
                     (int(fit_line[2]+fit_line[0]*width), 
                      int(fit_line[3]+fit_line[1]*width)), (255, 255, 255), 1)

            for i in range (height):
                if(label[i,0]==1):
                    y1=i
                    break

            for i in range (height):
                if(label[i,width-1]==1):
                    y2=i
                    break

            avg_y= int((y1+y2)/2)

            annotation_image = label[avg_y-100:avg_y+100, 0:width]
            pred_image = pred[avg_y-100:avg_y+100, 0:width]

            label=annotation_image
            pred=pred_image

            True_neg=len(np.where((label==0)&(pred==0))[0])
            False_neg=len(np.where((label==1)&(pred==0))[0])
            True_pos=len(np.where((label==1)&(pred==1))[0])
            False_pos=len(np.where((label==0)&(pred==1))[0])
            precision=True_pos/(True_pos+False_pos)
            recall=True_pos/(True_pos+False_neg)
            specificity=1-(True_neg/(True_neg+False_pos))
            accuracy=(True_pos+True_neg)/(True_pos+True_neg+False_pos+False_neg)
            f1score=2*((precision*recall)/(precision+recall))

            recall_list.append(recall)
            precision_list.append(precision)
            specificity_list.append(specificity)
            accuracy_list.append(accuracy)
            f1score_list.append(f1score)

            if(visualization):
                print("Recall: ", recall," - Precision: ", precision, " - Specificity: ", specificity, " - Accuracy: ", 
                      accuracy, " - F1score: ", f1score)
                plt.imshow(label)
                plt.show()
                plt.imshow(pred)
                plt.show()

        return recall_list, precision_list, specificity_list, accuracy_list, f1score_list
    
    
    @staticmethod
    def accuracy_on_images(x, y, model, inp_w, inp_h, steps=1, visualization=False):
        recall_list=[]
        precision_list=[]
        specificity_list=[]
        accuracy_list=[]
        f1score_list=[]
        
        line_of_horizont=Line_of_horizont_fitting()
        for label, img in zip(y, x):

            label=line_of_horizont.get_binary_image(label, 0.5)
            height,width=label.shape
            image_pred = line_of_horizont.resize_image(img, 160, 160)
            pred=line_of_horizont.predict_segmentation(image_pred, model)
            pred=line_of_horizont.get_binary_image(pred, 0.5)
            pred=line_of_horizont.resize_image(pred, width, height)

            True_neg=len(np.where((label==0)&(pred==0))[0])
            False_neg=len(np.where((label==1)&(pred==0))[0])
            True_pos=len(np.where((label==1)&(pred==1))[0])
            False_pos=len(np.where((label==0)&(pred==1))[0])
            precision=True_pos/(True_pos+False_pos)
            recall=True_pos/(True_pos+False_neg)
            specificity=1-(True_neg/(True_neg+False_pos))
            accuracy=(True_pos+True_neg)/(True_pos+True_neg+False_pos+False_neg)
            f1score=2*((precision*recall)/(precision+recall))
            
            recall_list.append(recall)
            precision_list.append(precision)
            specificity_list.append(specificity)
            accuracy_list.append(accuracy)
            f1score_list.append(f1score)
            
            if(visualization):
                print("Recall: ", recall," - Precision: ", precision, " - Specificity: ", specificity, " - Accuracy: ", 
                      accuracy, " - F1score: ", f1score)
                plt.imshow(label)
                plt.show()
                plt.imshow(pred)
                plt.show()
            
        return recall_list, precision_list, specificity_list, accuracy_list, f1score_list
            
    
    
    @staticmethod
    def test_speed_from_video(filename, model, inp_w, inp_h, n_iteration, steps=1):
        lineofhorizont = Line_of_horizont_fitting()
        reader = imageio.get_reader(filename,  'ffmpeg')
        fps = reader.get_meta_data()['fps']
        n_steps=0
        frame_to_discard = 10
        now=time()
        for i in range(n_iteration):
            if i == frame_to_discard:
                start_time=now
            if i > frame_to_discard:
                #n_steps+=1
                elapsed_time = now - start_time
                #print(n_steps, elapsed_time)
            or_image=reader.get_data(i)
            or_height, or_width, or_depth = or_image.shape

            fit_line, predict=lineofhorizont.horizont_line_pipeline(or_image, model, inp_w, inp_h, steps)

            predict = predict.reshape(or_height,or_width,1)
            predict1 = predict*255
            predict= np.uint8(np.concatenate((predict,predict,predict1),axis=2))
            imageOUT = cv2.bitwise_or(or_image,predict)
            imageOUT=cv2.line(imageOUT, (int(fit_line[2]-fit_line[0]*or_height), int(fit_line[3]-fit_line[1]*or_width)), 
                              (int(fit_line[2]+fit_line[0]*or_height), int(fit_line[3]+fit_line[1]*or_width)), 
                              (255, 0, 255), 5)
            now=time()
            
        reader.close()
        return ((i-frame_to_discard))/elapsed_time
    
    @staticmethod
    def test_speed_from_video_v2(reader, model, inp_w, inp_h, n_iteration, steps=1):
        n_steps=0
        frame_to_discard = 10
        now=time()
        lineofhorizont = Line_of_horizont_fitting()
        for i in range(n_iteration):
            if i == frame_to_discard:
                start_time=now
            if i > frame_to_discard:
                #n_steps+=1
                elapsed_time = now - start_time
                #print(n_steps, elapsed_time)
            or_image=reader.get_data(i)
            or_height, or_width, or_depth = or_image.shape

            fit_line, predict=lineofhorizont.horizont_line_pipeline(or_image, model, inp_w, inp_h, steps)

            predict = predict.reshape(or_height,or_width,1)
            predict1 = predict*255
            predict= np.uint8(np.concatenate((predict,predict,predict1),axis=2))
            imageOUT = cv2.bitwise_or(or_image,predict)
            imageOUT=cv2.line(imageOUT, (int(fit_line[2]-fit_line[0]*or_height), int(fit_line[3]-fit_line[1]*or_width)), 
                              (int(fit_line[2]+fit_line[0]*or_height), int(fit_line[3]+fit_line[1]*or_width)), 
                              (255, 0, 255), 5)
            now=time()

        return ((i-frame_to_discard))/elapsed_time
    
    @staticmethod
    def test_from_video(filename, model, inp_w, inp_h, n_iteration, steps=1):
        lineofhorizont = Line_of_horizont_fitting()
        reader = imageio.get_reader(filename,  'ffmpeg')
        fps = reader.get_meta_data()['fps']
        now=time()
        start_time=now
        for i in range(n_iteration):
            elapsed_time = now - start_time
            print(i, elapsed_time)
            or_image=reader.get_data(i)
            plt.imshow(or_image)
            plt.show()
            or_height, or_width, or_depth = or_image.shape

            fit_line, predict=lineofhorizont.horizont_line_pipeline(or_image, model, inp_w, inp_h, steps)

            predict = predict.reshape(or_height,or_width,1)
            predict1 = predict*255
            predict= np.uint8(np.concatenate((predict,predict,predict1),axis=2))
            imageOUT = cv2.bitwise_or(or_image,predict)
            imageOUT=cv2.line(imageOUT, (int(fit_line[2]-fit_line[0]*or_height), int(fit_line[3]-fit_line[1]*or_width)), 
                              (int(fit_line[2]+fit_line[0]*or_height), int(fit_line[3]+fit_line[1]*or_width)), 
                              (255, 0, 255), 5)
            now=time()
            plt.imshow(imageOUT)
            plt.show()
        reader.close()

    @staticmethod
    def test_from_folder(path, model, inp_w, inp_h, steps=1):
        lineofhorizont = Line_of_horizont_fitting()
        path_images=glob.glob(path)
        images=[]
        now=time()
        start_time=now
        for path_img in path_images:
            elapsed_time = now - start_time
            print(elapsed_time)
            or_image=cv2.imread(path_img)
            or_image=cv2.cvtColor(or_image, cv2.COLOR_BGR2RGB)
            or_height, or_width, or_depth = or_image.shape

            fit_line, predict, img_inp_or, pred_inp_or=lineofhorizont.horizont_line_pipeline_verbose(or_image, model, inp_w, inp_h, steps)

            predict = predict.reshape(or_height,or_width,1)
            predict1 = predict*255
            predict= np.uint8(np.concatenate((predict,predict,predict1),axis=2))
            imageOUT = cv2.bitwise_or(or_image,predict)
            
            #(x0-m*vx[0], y0-m*vy[0]), (x0+m*vx[0], y0+m*vy[0])
            
            W = or_width 
            H = or_height
            
            x0 = (int(fit_line[2]-(W*fit_line[0])))
            x1 = (int(fit_line[2]+(W*fit_line[0])))
            y0 = (int(fit_line[3]-(H*fit_line[1])))
            y1 = (int(fit_line[3]+(H*fit_line[1])))
            
            imageOUT=cv2.line(imageOUT, (x0,y0), (x1,y1), (255, 0, 255), 5)
            now=time()
        
            yield path_img, imageOUT, predict, img_inp_or, pred_inp_or
