In [111]:
import matplotlib, cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from sklearn.cluster import KMeans, MiniBatchKMeans, DBSCAN
from sklearn.metrics import silhouette_score
import math
import os
import itertools
import random
import time
%matplotlib inline

In [112]:
task_2_train = "Task2Dataset/Training/png/"
task_2_test = "Task2Dataset/TestWithoutRotations/images/"
task_2_test_annotations = "Task2Dataset/TestWithoutRotations/annotations/"

task_3_test_annotations = "Task3AdditionalTestDataset/annotations/"
task_3_test = "Task3AdditionalTestDataset/images/"

In [125]:
#refactored to class structure to simplify parameter tuning and testing of different matching algorithms, outlier rejection etc
class SIFT:
    #c_thresh=0.09 to get 0.03 used by lowe as used in paper
    def __init__(self, n_octaves=3, n_feat=0, c_thresh=0.09, e_thresh=10, lowe_thresh=0.7, ransac_thresh=5.0, sigma=1.6, k=2, min_matches=10, train_folder="", test_folder="", test_annotation_folder="", matcher=cv2.FlannBasedMatcher()):
        """
        Initialise parameters, by default set to OpenCV default values.
        Allows easy testing of parameters by simply calling SIFT(n_octaves=5), for example
        """
        self.n_octaves=n_octaves #num octaves used in computing DoG pyramid
        self.n_feat=n_feat #num features to retain when getting keypoints/descriptors
        self.c_thresh=c_thresh #contrast threshold used in SIFT feature detection
        self.e_thresh=e_thresh #edge threshold in SIFT feature detection
        self.lowe_thresh=lowe_thresh #threshold for lowe's ratio test
        self.ransac_thresh=ransac_thresh #threshold used in RANSAC outlier rejection
        self.sigma=sigma #dictates level of gaussian blur in DoG pyramid
        self.k=k #number of nearest neighbours obtained in knnmatch
        self.min_matches=min_matches #number of matches required to calculate homography - kinda redundant
        self.train_folder=train_folder
        self.test_folder=test_folder
        self.test_annotation_folder=test_annotation_folder
        self.train_pts = {} # store keypoints of training images
        self.train_desc = {} # store descriptors of training images
        self.test_points = {}
        self.test_desc = {}
        self.train_feat_count = {} #store number of features found for training image for scoring mechanism
        self.matcher = matcher #FlannBasedMatcher or BFMatcher
        self.matches = [] #store retained matches
        self.homography_matrix = None #store homography matrix
        self.inlier_mask = "" #store inlier mask
        self.test_annotations = {} #store annotations for test images
        self.num_train_images=0
        self.sift = cv2.SIFT_create(self.n_feat, self.n_octaves, self.c_thresh, self.e_thresh, self.sigma)
        self.cluster_desc = []

    def get_label(self, file):
        """
        Helper function to get label from filename
        i.e. 001-this-is-the-label.png returns this-is-the-label
        """
        return file.split("-", 1)[1].split(".")[0]
    
    def get_file_name(self, file):
        """
        Helper function to get full file name barring extension
        i.e. 001-name.png returns 001-name
        """
        return file.split(".")[0]
        
    def get_sift_train_features(self):
        """
        Gets SIFT keypoints and descriptors for all images in train folder
        Stored under dictionaries for easy access using image name
        """
        
        for train_image in os.listdir(self.train_folder):
            image = cv2.imread(os.path.join(self.train_folder, train_image), 0)
            #image = cv2.medianBlur(image, 5)
            pt, desc = self.sift.detectAndCompute(image, None)
            self.train_pts[self.get_file_name(train_image)] = pt
            self.train_desc[self.get_file_name(train_image)] = desc
            self.train_feat_count[self.get_file_name(train_image)] = len(pt)
            self.num_train_images+=1

    def get_sift_test_features(self):
        for test_image in os.listdir(self.test_folder):
            image = cv2.imread(os.path.join(self.test_folder, test_image), 0)
            #image = cv2.medianBlur(image, 5)

            pt, desc = self.sift.detectAndCompute(image, None)
            test_image = self.get_file_name(test_image)
            self.test_points[test_image] = pt
            self.test_desc[test_image] = desc

    def get_annotations(self):
        """
        Get annotations for each test image
        """
        for annotation_file in os.listdir(self.test_annotation_folder):
            
            image_name = self.get_file_name(annotation_file)
            annotation_file = open(os.path.join(self.test_annotation_folder, annotation_file), "r")
            
            annotation_lines = annotation_file.readlines()
            image_annot = []
            
            for line in annotation_lines:
                image_annot.append(line.split(",")[0])
            
            self.test_annotations[image_name] = image_annot

    def is_bound_box(self,vertices):
        """
        Additional checks for bounding boxes
        """
        if np.any(vertices < 0) or np.any(vertices > 512):
            return False
        distances =[]
        for i in range(4):
            j = (i+1)%4
            distances.append(math.sqrt((vertices[i][0][0] - vertices[j][0][0])**2 + (vertices[i][0][1] - vertices[j][0][1])**2))
        
        if np.abs(distances[0] - distances[2]) > 0.8 and np.abs(distances[1] - distances[3]) > 0.8:
            return False
        
        a = [vertices[1][0][0] - vertices[0][0][0],vertices[1][0][1] - vertices[0][0][1] ]
        b = [ vertices[2][0][0] - vertices[1][0][0],vertices[2][0][1] - vertices[1][0][1]]
        dot_product = np.dot(a,b)/(np.linalg.norm(a)*np.linalg.norm(b))
        
        if np.abs(dot_product) > 0.1:
            return False
        return True

    def draw_boxes(self, train_image_name, test_image_name, homography_method, good_matches):
        """
        Draw bounding boxes given train and test images
        """
        assert self.train_desc, "Run SIFT.get_sift_train_features()"
        assert self.test_desc, "Run SIFT.get_sift_test_features()"
        canvas = cv2.imread(f"./{test_image_name}_boxed.png")
        train_image = cv2.imread(os.path.join(self.train_folder, train_image_name + ".png"))
        h, w = train_image.shape[:2]
        
        src_pts = np.float32([self.train_pts[self.get_file_name(train_image_name)][m[0].queryIdx].pt for m in good_matches]).reshape(-1,1,2)        
        dst_pts = np.float32([self.test_points[test_image_name][m[0].trainIdx].pt for m in good_matches]).reshape(-1,1,2)
        
        if len(src_pts) < 4 or len(dst_pts) < 4:
            return
        M, mask = cv2.findHomography(src_pts, dst_pts, homography_method, self.ransac_thresh)
        if M is None:
            return
        
        pts = np.float32([ [0,0],[0,h-1],[w-1,h-1],[w-1,0] ]).reshape(-1,1,2)
        dst = cv2.perspectiveTransform(pts, M)
        
        if not self.is_bound_box(dst):
            return

        canvas = cv2.polylines(canvas, [np.int32(dst)], True, (0,0,255), 2, cv2.LINE_AA)
        centroid = np.int32(dst.mean(axis=0)[0])
        centroid[0]-=55
        centroid[1]-=37
        centroid = tuple(centroid)

        cv2.putText(canvas, self.get_label(train_image_name), centroid, cv2.FONT_HERSHEY_COMPLEX, 0.4, (255,0,0), 1, cv2.LINE_AA )
        cv2.imwrite(f"./{test_image_name}_boxed.png", canvas)


    def sift_matches_lowes(self, train_image_name, test_image_name):
        """
        Match SIFT features, using Lowe's Ratio Test as a means of outlier rejection
        """

        #match features
        matches = self.matcher.knnMatch(self.train_desc[self.get_file_name(train_image_name)], self.test_desc[test_image_name], self.k)

        #lowe's ratio test
        for one, two in matches:
            if one.distance < self.lowe_thresh * two.distance:
                self.matches.append([one])
        self.draw_boxes(train_image_name, test_image_name, 0, self.matches)
    

    def sift_matches_ransac(self, train_image_name, test_image_name):
        """
        Match SIFT features, obtain homography matrix and use RANSAC/inlier mask for outlier rejection
        """

        #match features
        matches = self.matcher.knnMatch(self.train_desc[self.get_file_name(train_image_name)], self.test_desc[test_image_name], self.k)
        if len(matches) > self.min_matches:
            #if sufficient matches to compute homography, calculate source/dest points
            src_pts = np.float32([self.train_pts[self.get_file_name(train_image_name)][m[0].queryIdx].pt for m in matches]).reshape(-1, 1, 2)
            dst_pts = np.float32([self.test_points[test_image_name][m[0].trainIdx].pt for m in matches]).reshape(-1, 1, 2)
            
            #get homography matrix + mask
            H, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC, self.ransac_thresh)
            self.homography_matrix = H
            inlier_mask = mask.ravel().tolist()
            #extract inlier matches
            for i in range(len(matches)):
                if inlier_mask[i]:
                    self.matches.append(matches[i])
        else:
            #cannot compute homography
            self.matches=[]

        self.draw_boxes(train_image_name, test_image_name, 0, self.matches)


    def count_features(self, image_path):
        """
        Count number of features in an image based on number of outlines (contours) produced
        """
        image = cv2.medianBlur(cv2.imread(image_path, 0), 25)
        null, threshold_image = cv2.threshold(image, 250, 255, cv2.THRESH_BINARY_INV)
        outlines, null = cv2.findContours(threshold_image, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        return len(outlines)
    
    def compare_scores_annot(self, scores, test_image):
        """
        Compares annotations file with identified matches
        """

        true_pos, false_pos, true_neg, false_neg = 0, 0, 0, 0
        print("----------------------------------------------------")
        print(f"Correct labels: {self.test_annotations[test_image]}")
        print(f"Our labels: {scores}")
        for train_image in scores.keys():

            if self.get_label(train_image) in self.test_annotations[test_image]:
                true_pos+=1
            else:
                false_pos+=1
                false_neg+=1
        total = true_pos+false_pos
        print(f"{test_image}: {true_pos}/{(true_pos+false_pos)}")
        print("----------------------------------------------------")

        true_neg = self.num_train_images - false_neg - true_pos - false_pos

        return true_pos, false_pos, true_neg, false_neg
    
    def eval(self, lowes=False, normalise=False):
        """
        Get score for a specific test image
        
        Normalise == False: score is purely a ranking of the number of matches between train images and a test image
        Normalise == True: normalise number of matches by number of features extracted from train image
        
        If lowes=false, use ransac. Vice versa.
        """
        tp, fp, tn, fn = 0, 0, 0, 0
        for test_image in os.listdir(self.test_folder):
            scores = {}
            canvas = cv2.imread(os.path.join(self.test_folder, test_image))
            cv2.imwrite(f"./{self.get_file_name(test_image)}_boxed.png", canvas)
            for image, desc in self.train_desc.items():
                self.matches = []

                if lowes:
                    self.sift_matches_lowes(image, self.get_file_name(test_image))
                else:
                    self.sift_matches_ransac(image, self.get_file_name(test_image))

                if normalise and self.train_feat_count[image]:
                    
                    scores[self.get_file_name(image)] = len(self.matches) / self.train_feat_count[image]
                else:
                    scores[self.get_file_name(image)] = len(self.matches)

            scores = dict(sorted(scores.items(), key=lambda x: x[1], reverse=True))
            scores = dict(itertools.islice(scores.items(), self.count_features(os.path.join(self.test_folder, test_image))))
            # [print(key) for key in scores.keys()]
            # [self.draw_boxes(train_image, self.get_file_name(test_image), 0, self.matches) for train_image in scores.keys()]
            test_image = self.get_file_name(test_image)
            true_pos, false_pos, true_neg, false_neg = self.compare_scores_annot(scores, test_image)
            tp+=true_pos
            tn+=true_neg
            fp+=false_pos
            fn+=false_neg
            
        acc = ((tp+tn)/(tp+tn+fp+fn)) # accuracy
        tpr = (tp)/(tp+fn) # true positive rate
        fpr = (fp)/(fp+tn) # false positive rate
        return acc, tpr, fpr


## Example Usage



In [None]:
sift = SIFT(train_folder=<pass_train_folder_here>, 
            test_folder=<pass_test_image_folder>, 
            test_annotation_folder=<pass_test_annotation_folder>, 
            matcher=cv2.BFMatcher_create(), #vary matcher subject to desired use
            <pass_parameters_here> # see init for parameter customisation
            )

sift.get_annotations()
sift.get_sift_train_features()
sift.get_sift_test_features()
sift.eval(normalise=True, lowes=True) # see eval() for information on customisation

Within draw_bound_boxes(), comment out is_bound_box check for increased accuracy


### This will output results in the following format
---------------------------------------------------
Correct labels: ['barn', 'hospital', 'university', 'hotel']
Our labels: {'011-trash': 0.3194444444444444, '036-hotel': 0.12727272727272726, '015-barn': 0.10714285714285714, '048-hospital': 0.06779661016949153}
test_image_1: 3/4
----------------------------------------------------

## Bounding box images will be output in the root directory, under test_image_x_boxed.png

In [126]:
sift = SIFT(train_folder=task_2_train, 
            test_folder=task_3_test, 
            test_annotation_folder=task_3_test_annotations, 
            matcher=cv2.BFMatcher_create(),
            #e_thresh=6,
            lowe_thresh=0.4,)
            #n_octaves=4)

sift.get_annotations()
sift.get_sift_train_features()
sift.get_sift_test_features()
print(sift.eval(normalise=True, lowes=True))


  dot_product = np.dot(a,b)/(np.linalg.norm(a)*np.linalg.norm(b))


----------------------------------------------------
Correct labels: ['barn', 'hospital', 'university', 'hotel']
Our labels: {'011-trash': 0.3194444444444444, '036-hotel': 0.12727272727272726, '015-barn': 0.10714285714285714, '048-hospital': 0.06779661016949153}
test_image_1: 3/4
----------------------------------------------------
----------------------------------------------------
Correct labels: ['post-office', 'police', 'fire-station', 'lighthouse']
Our labels: {'046-fire-station': 0.2962962962962963, '001-lighthouse': 0.27906976744186046, '037-post-office': 0.27906976744186046, '035-police': 0.18333333333333332}
test_image_10: 4/4
----------------------------------------------------
----------------------------------------------------
Correct labels: ['supermarket', 'bus-stop', 'church', 'bus']
Our labels: {'007-supermarket': 0.3137254901960784, '006-church': 0.2, '012-bus': 0.0967741935483871, '048-hospital': 0.06779661016949153}
test_image_11: 3/4
------------------------------