In [1]:
from sklearn.datasets import load_digits
digits = load_digits()

# Ordered (unshuffled) hold-out split: the first 65% of the samples are used
# for training and the remaining 35% for testing.
i = int(digits.target.shape[0] / 100 * 65)
X_train, X_test = digits.data[:i], digits.data[i:]
y_train, y_test = digits.target[:i], digits.target[i:]
print(y_train.shape, y_test.shape)

(1168,) (629,)


In [2]:
import numpy as np
import bisect
from numpy import linalg as lg
from heapq import heappush, heappop, nlargest
from functools import partial

class BallTree:
    """Binary ball tree over labelled points for k-nearest-neighbor search.

    Every node stores its points, their centroid (``loc``) and the *squared*
    radius (``radius``) of the smallest centroid-centred ball enclosing them.
    Nodes holding more than one point are split at the median of the
    dimension with the largest spread; leaves hold exactly one point.
    """

    # class initialization function
    def __init__(self, X, y):
        """Build tree structure

            Vars:
                X (array): array of points (features), shape (n_points, n_features)
                y (array): array of labels or classes, one per point

            Raises:
                ValueError: if X is not a non-empty 2-D array or if y does
                            not hold exactly one label per point
        """
        self.X = np.asarray(X)
        self.y = np.asarray(y)

        if self.X.ndim != 2 or self.X.shape[0] == 0:
            raise ValueError("X must be a non-empty 2-D array of points")
        if self.y.shape[:1] != self.X.shape[:1]:
            raise ValueError("y must contain exactly one label per point in X")

        self.child1 = None
        self.child2 = None
        # only meaningful for internal nodes; stays None for leaves
        self.split_point = None

        # calculate centroid (from the converted array so list input works)
        self.loc = self.X.mean(0)

        # calculate ball radius; dist() is squared, so this is radius ** 2
        self.radius = np.max(self.dist(self.X, self.loc))

        if len(self.X) > 1:
            # sort on the dimension with the largest spread
            largest_dim = np.argmax(self.X.max(0) - self.X.min(0))
            i_sort = np.argsort(self.X[:, largest_dim])
            self.X = self.X[i_sort, :]
            self.y = self.y[i_sort]

            # find split point
            N = round(self.X.shape[0] / 2)
            self.split_point = 0.5 * (self.X[N, largest_dim] + self.X[N - 1, largest_dim])

            # recursively create subnodes (child1: upper half, child2: lower half)
            self.child1 = BallTree(self.X[N:], self.y[N:])
            self.child2 = BallTree(self.X[:N], self.y[:N])

    def dist(self, a, b):
        """Calculate simplified (squared) euclidean distance

            Vars:
                a (array): first vector, or a 2-D array with one vector per row
                b (array): second vector, or a 2-D array with one vector per row

            Returns:
                Returns the squared euclidean distance (no square root taken) as per
                https://en.wikipedia.org/wiki/Euclidean_distance : a scalar
                for two single vectors, otherwise one value per row.
        """
        # Work in floating point so integer/unsigned inputs cannot wrap around
        # when subtracted or squared.
        a = np.asarray(a, dtype=float)
        b = np.asarray(b, dtype=float)
        # Reducing over the feature (last) axis covers both the vector/vector
        # and the matrix/vector case, including single-feature data.
        return ((a - b) ** 2).sum(-1)

    def search(self, t, k, heap):
        """Search for k nearest neighbor points in the tree

            Vars:
                t (array): Point used to search neighbors
                k (int): The number of neighbors
                heap (array): Sorted (ascending) list of at most k
                              (squared distance, label) tuples; updated
                              in place

            Returns:
                Returns the same list, holding the k nearest squared
                distances found so far and their associated labels
        """
        if k <= 0:
            # nothing can be collected
            return heap

        d = self.dist(t, self.loc)

        # Calculate the minimal distance from point t to any point in this
        # tree node. Both d and self.radius are squared, and the triangle
        # inequality only holds for true distances, so the bound has to be
        # formed on the square roots and squared again afterwards.
        gap = np.sqrt(d) - np.sqrt(self.radius)
        min_d = gap * gap if gap > 0 else 0.0

        # A node may only be skipped once k candidates are already held and
        # none of its points can beat the furthest of them.
        if len(heap) >= k and min_d > heap[-1][0]:
            return heap

        if self.child1 is None and self.child2 is None:
            # if leaf node then calculate distances and add them to the heap
            # together with associated labels
            for label, point_d in zip(self.y, self.dist(self.X, t)):
                if len(heap) < k or point_d < heap[-1][0]:
                    bisect.insort_left(heap, (point_d, label))
                    # keep only the k closest entries
                    del heap[k:]
        else:
            # Recursively search in child nodes
            if self.child1 is not None:
                self.child1.search(t, k, heap)
            if self.child2 is not None:
                self.child2.search(t, k, heap)
        return heap
    
class KNClassifier(object):
    """k-nearest-neighbors classifier that looks neighbors up in a BallTree."""

    def __init__(self, k):
        """ Initialize classifier

            Vars:
                k (int): The number of neighbors to be used

            Raises:
                ValueError: if k is smaller than 1
        """
        if k < 1:
            raise ValueError("k must be at least 1, got %r" % (k,))
        self.__tree = None
        self.__k = k

    def fit(self, X, y):
        """ Train data classifier

            Vars:
                X (array): array of points (features)
                y (array): array of labels or classes
        """
        self.__tree = BallTree(X, y)

    def predict(self, X):
        """ Predict labels

            Vars:
                X (array): array of points (features)

            Returns:
                Returns an array of predicted labels: for every point the
                most frequent label among its k nearest neighbors (ties go
                to the smallest label)

            Raises:
                AttributeError: if called before fit()
        """
        if self.__tree is None:
            raise AttributeError("KNClassifier is not fitted yet; call fit() before predict()")

        y = []
        for t in X:
            neighbors = self.__tree.search(t, k=self.__k, heap=[])
            # Vote on the labels alone. Packing (distance, label) tuples into
            # one array would coerce the labels to the distance dtype.
            labels = [label for _, label in neighbors]
            unique, counts = np.unique(labels, return_counts=True)
            # unique is sorted and argmax returns the first maximum, so a tie
            # resolves to the smallest label
            y.append(unique[np.argmax(counts)])
        return np.asarray(y)

In [3]:
from sklearn.metrics import accuracy_score
# Score the hand-written ball-tree classifier using a single nearest neighbor
clf = KNClassifier(k=1)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
accuracy_score(y_test, y_pred)


0.9634340222575517

In [4]:
from sklearn.neighbors import KNeighborsClassifier
# Reference run: scikit-learn's ball-tree 1-NN on the same split, for comparison
clf = KNeighborsClassifier(algorithm='ball_tree', n_neighbors=1)
clf.fit(X_train, y_train)
y_pred = clf.predict(X_test)
accuracy_score(y_test, y_pred)

0.9634340222575517