In [1]:
import operator

%run sim_metrics.ipynb

In [2]:
class kNN:
    """k-nearest-neighbours classifier driven by a pluggable similarity function.

    Every query is compared against each stored training instance with the
    configured similarity function; the ``k`` instances with the highest
    scores then vote for the predicted label.
    """

    def __init__(self, X_train, Y_train, k=3, sim=manhattan, weighted=False):
        """Store the training data and the voting configuration.

        X_train: sequence of training instances (whatever ``sim`` accepts).
        Y_train: class labels, one per instance in ``X_train``.
        k: neighbourhood size; must be at least 1 and is capped at
            ``len(X_train)`` so ``get_neighbours`` never asks for more
            neighbours than there are training instances.
        sim: callable ``sim(test_instance, train_instance)`` returning a
            score; the instances with the largest scores are the neighbours.
        weighted: truthy -> each neighbour votes with its similarity score;
            falsy -> each neighbour casts exactly one vote.

        Raises ValueError if ``k`` is smaller than 1.
        """
        if k < 1:
            # Fail fast: a non-positive k yields no neighbours, which would
            # otherwise only surface later as an opaque error in predict().
            raise ValueError("k must be at least 1, got %r" % (k,))

        self.X_train = X_train
        self.Y_train = Y_train

        # cap the neighbourhood size at the number of training instances
        self.k = min(k, len(self.X_train))

        # a pre-defined similarity metric, e.g. manhattan or euclidean
        self.similarity = sim

        # choose between weighted / unweighted majority voting
        self.weighted = weighted

        # [predicted, target] pairs produced by the most recent test run
        self.results = []

    def get_neighbours(self, test_instance):
        """Return the ``k`` training examples most similar to ``test_instance``.

        test_instance: a query instance in the form ``self.similarity``
            expects as its first argument.

        Returns a list of ``(label, similarity)`` tuples ordered by
        decreasing similarity. Ties keep their training-set order because
        the sort is stable.
        """
        # pair each training label with its similarity to the query
        similarities = [
            (label, self.similarity(test_instance, train_instance))
            for train_instance, label in zip(self.X_train, self.Y_train)
        ]
        # most similar first
        similarities.sort(key=operator.itemgetter(1), reverse=True)
        return similarities[:self.k]

    def predict(self, neighbours):
        """Return the label chosen by (optionally weighted) majority voting.

        neighbours: non-empty sequence of ``(label, similarity)`` pairs, as
            returned by ``get_neighbours``.

        With ``self.weighted`` falsy every neighbour contributes one vote;
        otherwise it contributes its similarity score, which helps to break
        ties. Labels that still tie on votes are resolved in favour of the
        label that compares largest.

        Raises ValueError if ``neighbours`` is empty.
        """
        class_votes = {}
        for neighbour in neighbours:
            label, similarity = neighbour[0], neighbour[1]
            # A conditional (rather than arithmetic on the flag) keeps an
            # unweighted vote at exactly 1 even when the similarity is inf
            # or nan; ``1 + 0 * inf`` would turn the tally into nan.
            vote = similarity if self.weighted else 1
            class_votes[label] = class_votes.get(label, 0) + vote

        if not class_votes:
            raise ValueError("cannot predict from an empty list of neighbours")

        # highest vote total wins; the label itself is the tie-breaker
        return max(class_votes, key=lambda label: (class_votes[label], label))

    def _run_test(self, X_test, Y_test, transform=None):
        """Classify each test instance and record ``[predicted, target]``.

        transform: optional callable applied to every test instance before
            the neighbour search; ``None`` uses the instance unchanged.

        Replaces ``self.results`` and prints progress every 200 instances.
        """
        self.results = []

        print('Testing...')
        labelled_instances = zip(X_test, Y_test)
        for count, (test_instance, target_label) in enumerate(
                labelled_instances, start=1):
            if transform is None:
                query = test_instance
            else:
                query = transform(test_instance)
            predict_label = self.predict(self.get_neighbours(query))
            self.results.append([predict_label, target_label])

            if count % 200 == 0:
                print("Iteration#", count)

        print('Testing complete!')

    def test(self, X_test, Y_test):
        """Predict a label for every instance in ``X_test``.

        The ``[predicted, target]`` pairs are stored in ``self.results``
        (replacing any earlier run) so ``evaluate`` can score them.
        """
        self._run_test(X_test, Y_test)

    def test_using_activations(self, X_test, Y_test, model):
        """Like ``test``, but pass each instance through ``model.encode`` first.

        Intended for the hybrid system: the encoding converts a test
        instance to the same dimensions as ``X_train`` before the
        neighbour search.
        """
        self._run_test(X_test, Y_test, transform=model.encode)

    def evaluate(self, results=None):
        """Return the accuracy, as a percentage, of ``results``.

        results: sequence of ``(predicted, target)`` pairs; defaults to
            ``self.results`` from the most recent test run.

        Returns a float between 0 and 100, or 0.0 when there are no
        results to score.
        """
        if results is None:
            results = self.results

        total = len(results)
        if total == 0:
            # nothing to score; avoid dividing by zero
            return 0.0

        correct = sum(1 for predicted, target in results if predicted == target)
        return (correct / float(total)) * 100