# kNN implementation

#### Big-O of inference: O(nm) per test point, where n is the number of training observations and m is the number of features

- The implementation below uses quickselect to find the k nearest neighbors (smallest distances). Average complexity is O(n); worst case is O(n^2).
- Alternatively, we could keep a max-heap of the k best candidates seen so far; the time complexity is then O(n log k).

In [1]:
import numpy as np
from scipy.spatial.distance import cdist
from heapq import heapify, heappop
from collections import Counter

In [2]:
import sys
# Report the interpreter's maximum recursion depth.
recursion_limit = sys.getrecursionlimit()
print(recursion_limit)

3000


In [3]:
class kNearestNeighbor():
    """k-nearest-neighbour model for classification or regression.

    Training rows are ranked by Euclidean distance to the query and the k
    closest are picked with quickselect (average O(n) per query, O(n^2) in
    the worst case).

    Parameters
    ----------
    k : int
        Number of neighbours consulted for each prediction.
    tp : str
        'classification' (majority vote among the neighbours' labels) or
        'regression' (mean of the neighbours' targets).
    """

    def __init__(self, k, tp):
        self.k = k
        self.type = tp

    def fit(self, X_train, y_train):
        """Store the training data; nothing is learned up front.

        Inputs are converted to arrays so that plain nested lists work with
        the vectorised distance computation and positional label lookup.
        """
        self.X_train = np.asarray(X_train)  # n*m
        self.y_train = np.asarray(y_train)

    def predict(self, X_test):
        """Return a list with one prediction per row of ``X_test``.

        Raises
        ------
        ValueError
            If the model type is neither 'classification' nor 'regression',
            or if ``k`` is outside ``1..n_training_rows``.
        """
        # Fail loudly instead of hitting an unbound (or stale) `pred` below.
        if self.type not in ('classification', 'regression'):
            raise ValueError(
                f"type must be 'classification' or 'regression', got {self.type!r}"
            )

        y_test = []
        for x in np.asarray(X_test):
            neighbors = self.find_nearest(x)
            preds = [self.y_train[idx] for idx in neighbors]

            if self.type == 'classification':
                pred = Counter(preds).most_common(1)[0][0]
            else:
                # `preds` is a plain list, so it has no .mean() method.
                pred = np.mean(preds)

            y_test.append(pred)

        return y_test

    def find_nearest(self, x):
        """Return the training-row indices of the k rows closest to ``x``.

        The returned indices are not sorted by distance.

        Raises
        ------
        ValueError
            If ``k`` is smaller than 1 or larger than the number of training
            rows (quickselect cannot terminate for such a k).
        """
        dists = np.sqrt(np.sum((self.X_train - x)**2, axis=1))  # n dimension
        n = len(dists)
        if not 1 <= self.k <= n:
            raise ValueError(
                f"k must be between 1 and the number of training samples ({n}), "
                f"got {self.k}"
            )
        nums = list(range(n))
        # quick_select works with the 0-based position of the k-th neighbour.
        return self.quick_select(nums, dists, 0, n - 1, self.k - 1)

    def quick_select(self, nums, dists, start, end, k):  # use quickselect
        """Partition ``nums`` in place and return ``nums[:k + 1]``.

        ``nums`` holds indices into ``dists``. On return its first ``k + 1``
        entries refer to the ``k + 1`` smallest distances. Expects
        ``start <= k <= end``. Implemented as a loop rather than recursion so
        an unlucky pivot sequence cannot exhaust the call stack.
        """
        while start < end:
            left, right = start, end
            pivot = dists[nums[(left + right) // 2]]

            # Hoare-style partition of nums[start..end] around the pivot value.
            while left <= right:
                while left <= right and dists[nums[left]] < pivot:
                    left += 1
                while left <= right and dists[nums[right]] > pivot:
                    right -= 1

                if left <= right:
                    nums[left], nums[right] = nums[right], nums[left]
                    left += 1
                    right -= 1

            if k <= right:
                end = right      # target position lies in the lower part
            elif k >= left:
                start = left     # target position lies in the upper part
            else:
                break            # right < k < left: position k is settled

        return nums[:(k + 1)]
        
        
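#         # Alternative to quickselect inside find_nearest: use a heap.
#         # heapq needs a plain list, and (distance, index) pairs keep the
#         # training-row index attached to each distance.
#         # Cost: O(n) to heapify plus O(k log n) for the k pops.
#         dists = np.sqrt(np.sum((self.X_train - x)**2, axis=1))
#         heap = [(d, idx) for idx, d in enumerate(dists)]
#         heapify(heap)
#         neighbors = [heappop(heap)[1] for _ in range(self.k)]
#         return neighbors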

# Test on the scikit-learn Iris dataset

In [4]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Load the Iris features and labels, then hold out a quarter of the rows
# for evaluation.
X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.25,
    random_state=42,  # fixed seed for the shuffle
)

In [5]:
# Build a 5-neighbour classifier, give it the training split, and label
# every row of the held-out split.
kNN = kNearestNeighbor(k=5, tp='classification')
kNN.fit(X_train, y_train)
predictions = kNN.predict(X_test)

In [6]:
# Accuracy: share of held-out rows whose predicted label equals the true one.
n_correct = sum(y_test == predictions)
acc = n_correct / len(y_test)
print(f'accuracy on the test set is {acc}')

accuracy on the test set is 1.0
