K近邻算法是一个无模型算法，只需要存储训练数据即可。当有新的数据进来需要预测类别时，我们从存储的数据中选择离新数据点最近的K个样本，采用多数投票的方法决定新数据的类别。

K近邻法有三个决定因素：
* 距离度量
* K值选择
* 分类决策规则

一般而言，分类决策规则不会有什么变动，都是采用投票表决的方法。距离度量可以选择欧氏距离或更一般的$L_p$距离。K值小的时候，模型更为复杂；K值大的时候，模型更为简单。K值的选择是近似误差和估计误差之间的一个取舍，通常可以用交叉验证法来选择最优的K值。

K近邻算法性能的关键在于如何快速地找到近邻点。对于每一个需要预测的点，我们可以简单地线性遍历之前存储的样本，但是这样做的时间复杂度过高。KD树可以用来加速近邻点的搜索过程。

参考：
https://www.cnblogs.com/eyeszjwang/articles/2429382.html  
https://zhuanlan.zhihu.com/p/23966698

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import load_iris
%matplotlib inline

In [2]:
# Load the Iris dataset shipped with scikit-learn and split it into the
# feature matrix X and the label vector y used by the cells below.
iris = load_iris()
X, y = iris.data, iris.target
# Sanity-check the shapes (the recorded output is "(150, 4) (150,)").
print(X.shape, y.shape)

(150, 4) (150,)


In [3]:
from collections import Counter
class KNearestNeighbourhood:
    """Brute-force K-nearest-neighbour classifier.

    ``fit`` simply memorises the training samples; ``predict`` ranks every
    stored sample by its distance to the query and lets the K closest ones
    vote on the class.
    """

    def __init__(self):
        # List of (feature_vector, label) pairs, filled in by fit().
        self.data = None

    def fit(self, X, y):
        """Store the training set.

        X and y must expose ``.shape`` and have the same number of items
        along their first axis; sample ``i`` is stored as ``(X[i], y[i])``.
        """
        assert X.shape[0] == y.shape[0], "shape error, there should be equal items in X and y"
        samples = []
        for row in range(len(X)):
            samples.append((X[row], y[row]))
        self.data = samples

    def predict(self, x, metric='euclidean', K=5):
        """Return the majority class among the K stored samples nearest to x.

        Only ``metric='euclidean'`` is supported; anything else trips an
        assertion.
        """
        if metric == 'euclidean':
            distance = self._euclidean
        else:
            assert False, metric + " is not supported!"

        votes = Counter(self._nearestK(x, distance, K))
        # most_common(1) yields a single (label, count) pair.
        top_entry = votes.most_common(1)[0]
        return top_entry[0]

    def _nearestK(self, x, metric_func, K):
        """Return the labels of the K samples closest to x, nearest first."""

        def distance_to_query(sample):
            return metric_func(x, sample[0])

        ranked = sorted(self.data, key=distance_to_query)
        labels = []
        for sample in ranked[:K]:
            labels.append(sample[1])
        return labels

    def _euclidean(self, x1, x2):
        """Euclidean (L2) distance between two feature vectors."""
        diff = x1 - x2
        return np.sqrt(np.sum(diff ** 2))

In [4]:
# Build the brute-force classifier and let it memorise the Iris samples.
a = KNearestNeighbourhood()
a.fit(X, y)

In [5]:
# Classify one hand-written 4-feature sample with the defaults
# (euclidean metric, K=5); the recorded output is class 0.
a.predict([1,1,1,1])

0

In [6]:
class KDNode:
    """One node of a k-d tree; the fields are filled in by KDTree."""

    def __init__(self):
        # (features, label) tuple: features is an ndarray, label is the class.
        self.data_point = None
        # Index of the feature this node splits on; -1 until assigned.
        self.split_d = -1
        self.left = None    # subtree of the samples sorted before the median
        self.right = None   # subtree of the samples sorted after the median
        self.parent = None  # parent node; None for the root


class KDTree:
    """k-d tree over (features, label) samples for nearest-neighbour search."""

    def __init__(self):
        self.root = None

    def construct(self, dataset):
        """Build the tree from a list of (features, label) tuples.

        The caller's list is left untouched (it is not sorted in place).
        """
        self.root = self._construct_rec(None, dataset, 0)

    def get_nearest_neighborhood(self, x, k=1, metric='euclidean'):
        """Return up to ``k`` nodes whose points are closest to ``x``.

        The nodes are ordered nearest first. Fewer than ``k`` nodes are
        returned only when the tree holds fewer than ``k`` points.
        Raises ValueError if ``k`` is smaller than 1.
        """
        search_list = self.search_leaf(x)
        neighbors = self.back_traverse(search_list, x, k, metric)
        return neighbors

    def search_leaf(self, x):
        """Return the root-to-leaf path obtained by descending towards ``x``.

        At every node the walk goes left when ``x`` is <= the node's value on
        the split feature and right otherwise; when that child is missing the
        only existing child is taken instead.
        """
        search_list = []
        node = self.root
        while node is not None:
            search_list.append(node)
            axis = node.split_d
            if x[axis] <= node.data_point[0][axis]:
                preferred, fallback = node.left, node.right
            else:
                preferred, fallback = node.right, node.left
            node = preferred if preferred is not None else fallback

        return search_list

    def back_traverse(self, search_list, x, k=1, metric='euclidean'):
        """Backtrack from ``search_list`` and collect the ``k`` nearest nodes.

        ``search_list`` is a stack of nodes to start from (normally the path
        returned by ``search_leaf``); it is processed from its last element
        backwards and is not modified. Returns the nearest nodes, closest
        first. Raises ValueError if ``k`` is smaller than 1.
        """
        import heapq

        if k < 1:
            raise ValueError("k must be a positive integer")

        # Max-heap of the best candidates so far, stored as
        # (-squared_distance, insertion_order, node); the insertion counter
        # keeps tuple comparison from ever reaching the node objects.
        best = []
        inserted = 0
        visited = set()
        # Every stack entry carries a lower bound on the squared distance
        # from x to any point below that node; start nodes get bound 0.
        stack = [(node, 0) for node in search_list]

        while stack:
            node, bound = stack.pop()
            if id(node) in visited:
                continue
            # Nothing below this node can beat the current k-th best.
            if len(best) == k and bound >= -best[0][0]:
                continue
            visited.add(id(node))

            dist = self._distance(node.data_point[0], x, metric)
            if len(best) < k:
                heapq.heappush(best, (-dist, inserted, node))
                inserted += 1
            elif dist < -best[0][0]:
                heapq.heapreplace(best, (-dist, inserted, node))
                inserted += 1

            axis = node.split_d
            gap = x[axis] - node.data_point[0][axis]
            if gap <= 0:
                near, far = node.left, node.right
            else:
                near, far = node.right, node.left
            # _distance is squared, so the distance to the splitting plane
            # must be squared as well before the two are compared.
            if far is not None:
                stack.append((far, max(bound, gap * gap)))
            # Pushed last so that the side containing x is explored first.
            if near is not None:
                stack.append((near, bound))

        best.sort(key=lambda entry: (-entry[0], entry[1]))
        return [entry[2] for entry in best]

    def _construct_rec(self, parent, dataset, depth):
        """Recursively build the subtree for ``dataset`` at the given depth.

        The split feature cycles with the depth; the median sample along that
        feature becomes the node and the two halves become its children.
        """
        if not dataset:
            return None
        node = KDNode()
        node.parent = parent
        dim = len(dataset[0][0])  # dimension of the feature space
        axis = depth % dim
        node.split_d = axis
        # sorted() instead of list.sort() so the caller's list is not mutated.
        ordered = sorted(dataset, key=lambda sample: sample[0][axis])
        mid_idx = len(ordered) // 2
        node.data_point = ordered[mid_idx]
        node.left = self._construct_rec(node, ordered[:mid_idx], depth + 1)
        node.right = self._construct_rec(node, ordered[mid_idx + 1:], depth + 1)

        return node

    def _distance(self, x1, x2, metric='euclidean'):
        """Return the squared Euclidean distance between ``x1`` and ``x2``.

        The square root is skipped because only the ordering of distances
        matters for the search. Any other metric raises AssertionError.
        """
        if metric == 'euclidean':
            return np.sum((x1 - x2)**2)
        # Raised explicitly so the check also works under ``python -O``.
        raise AssertionError(metric + ' is not supported!')

In [7]:
# Smoke-test KDTree on a small hand-made 2-D dataset.
test_X = [[7,2],[5,4],[9,6],[2,3],[4,7],[8,1]]
test_X = [np.asarray(x) for x in test_X]
test_y = [1,0,1,0,1,0]
test_data = [(test_X[i], test_y[i]) for i in range(len(test_X))]

tree = KDTree()
tree.construct(test_data)
# Check the construction: spot-check two nodes below the root.
assert np.array_equal(tree.root.right.left.data_point[0], np.array([8,1]))
assert np.array_equal(tree.root.left.right.data_point[0], np.array([4,7]))

# Check 1-nearest-neighbour queries: both must return the point (2, 3).
l = tree.get_nearest_neighborhood(np.array([2.1, 3.1]), 1)
assert np.array_equal(l[0].data_point[0], np.array([2, 3]))
l = tree.get_nearest_neighborhood(np.array([2, 4.5]), 1)
assert np.array_equal(l[0].data_point[0], np.array([2, 3]))

In [8]:
class KNearestNeighbourhoodKDTree:
    """K-nearest-neighbour classifier that looks up neighbours in a KDTree.

    It exposes the same ``fit`` / ``predict`` methods as the brute-force
    classifier, but delegates the neighbour search to ``self.tree``.
    """

    def __init__(self):
        # List of (feature_vector, label) pairs, filled in by fit().
        self.data = None
        self.tree = KDTree()

    def fit(self, X, y):
        """Store the training samples and build the tree from them.

        X and y must expose ``.shape`` and have the same number of items
        along their first axis.
        """
        assert X.shape[0] == y.shape[0], "shape error, there should be equal items in X and y"
        samples = []
        for row in range(len(X)):
            samples.append((X[row], y[row]))
        self.data = samples
        self.tree.construct(self.data)

    def predict(self, x, metric='euclidean', K=5):
        """Return the majority class among the K neighbours found for x.

        Only ``metric='euclidean'`` is supported; anything else trips an
        assertion.
        """
        if metric == 'euclidean':
            labels = self._nearestK(x, metric, K)
        else:
            assert False, metric + " is not supported!"

        votes = Counter(labels)
        # most_common(1) yields a single (label, count) pair.
        top_entry = votes.most_common(1)[0]
        return top_entry[0]

    def _nearestK(self, x, metric, K):
        """Ask the tree for K neighbour nodes of x and return their labels."""
        neighbours = self.tree.get_nearest_neighborhood(x, K, metric)
        labels = []
        for node in neighbours:
            labels.append(node.data_point[1])
        return labels


In [9]:
# Build the KD-tree backed classifier on the same Iris data.
b = KNearestNeighbourhoodKDTree()
b.fit(X, y)

In [10]:
# Cross-check: the KD-tree classifier must predict the same class as the
# brute-force classifier on a few hand-picked queries.
assert a.predict(np.array([1,1,1,1])) == b.predict(np.array([1,1,1,1]))
assert a.predict(np.array([1,12,1,2])) == b.predict(np.array([1,12,1,2]))
assert a.predict(np.array([1,1,1,1.3])) == b.predict(np.array([1,1,1,1.3]))

In [11]:
# Show the brute-force prediction for the query (recorded output: 0).
a.predict(np.array([1,1,1,1]))

0

In [12]:
# Show the KD-tree prediction for the same query (recorded output: 0).
b.predict(np.array([1,1,1,1]))

0