In [2]:
import numpy as np
from kdtree import Kdtree

from tensorflow import keras
from keras.datasets import mnist
from tests import test_knn_model

Using TensorFlow backend.


In [3]:
class KnnKdtreeClassifier(object):
    '''Классификатор реализует взвешенное голосование по ближайшим соседям.
    При подсчете расcтояния используется l1-метрика.
    Поиск ближайшего соседа осуществляется поиском по kd-дереву.
    Параметры
    ----------
    n_neighbors : int, optional
        Число ближайших соседей, учитывающихся в голосовании
    weights : str, optional (default = 'uniform')
        веса, используемые в голосовании. Возможные значения:
        - 'uniform' : все веса равны.
        - 'distance' : веса обратно пропорциональны расстоянию до классифицируемого объекта
        -  функция, которая получает на вход массив расстояний и возвращает массив весов
    leaf_size: int, optional
        Максимально допустимый размер листа дерева
    '''

    def __init__(self, n_neighbors=1, weights='uniform', leaf_size=30):
        self.n_neighbors = n_neighbors
        self.weights = weights
        self.leaf_size = leaf_size
        self.n_classes = None
        self.kdtree = None

    def fit(self, x, y):
        '''Обучение модели - построение kd-дерева
        Парметры
        ----------
        x : двумерным массив признаков размера n_queries x n_features
        y : массив/список правильных меток размера n_queries
        Выход
        -------
        Метод возвращает обученную модель
        '''
        self.n_classes = int(np.max(y) + 1)
        self.kdtree = Kdtree(x, y, self.leaf_size, self.weights)
        return self

    def predict(self, x):
        """ Предсказание класса для входных объектов - поиск по kd-дереву ближайших соседей
        и взвешенное голосование
        Параметры
        ----------
        X : двумерным массив признаков размера n_queries x n_features
        Выход
        -------
        y : Массив размера n_queries
        """
        result = list()

        for i in x:
            result.append(self.kdtree.classify_point(i, self.n_neighbors))

        return np.array(result)

    def predict_proba(self, X):
        """Возвращает вероятности классов для входных объектов
        Параметры
        ----------
        X : двумерным массив признаков размера n_queries x n_features
        Выход
        -------
        p : массив размера n_queries x n_classes] c вероятностями принадлежности
        объекта к каждому классу
        """
        tree = self.kdtree
        p = list()

        def classify_k_neighbors(kn):
            classes = np.zeros(self.n_classes)

            max_key, weights_of_classes = tree.voting(kn=kn)
            sum = 0
            for key in weights_of_classes:
                value = weights_of_classes.get(key)
                classes[int(key)] = value
                sum += value
            return np.divide(classes, sum)

        for i in X:
            leaf = tree.closest_leaf(i)
            result_for_current_query = tree.closest_k_neighbors(pivot=i, node=leaf,
                                                                k_neighbors=self.n_neighbors)
            p.append(classify_k_neighbors(result_for_current_query))

        return np.array(p)

    def kneighbors(self, x, n_neighbors):
        """Возвращает n_neighbors ближайших соседей для всех входных объектов при помощи поска по kd-дереву
        и расстояния до них
        Параметры
        ----------
        X : двумерным массив признаков размера n_queries x n_features
        Выход
        -------
        neigh_dist массив размера n_queries х n_neighbors
        расстояния до ближайших элементов
        neigh_indarray, массив размера n_queries x n_neighbors
        индексы ближайших элементов
        """
        neigh_dist = np.zeros((x.shape[0], n_neighbors))
        neigh_indarray = np.zeros((x.shape[0], n_neighbors))

        tree = self.kdtree

        for i, index in zip(x, range(x.shape[0])):
            leaf = tree.closest_leaf(i)
            result_for_current_query = tree.closest_k_neighbors(pivot=i, node=leaf,
                                                                k_neighbors=self.n_neighbors)
            neigh_dist[index] = result_for_current_query[:, -1]
            neigh_indarray[index] = result_for_current_query[:, -2]

        return neigh_dist, neigh_indarray

    def score(self, y_gt, y_pred):
        """Возвращает точность классификации
        Параметры
        ----------
        y_gt : правильные метки объектов
        y_pred: предсказанные метки объектов
        Выход
        -------
        accuracy - точность классификации
        """
        sum = 0
        for i in range(len(y_gt)):
            if y_gt[i] - y_pred[i] == 0:
                sum += 1
        return sum / len(y_gt)

Test Simple:
Этот тест хорошо иллюстрирует обратный ход при n_neighbors=1, weights='uniform', leaf_size=1.
Здесь же я проверял работу остальных функций на правильность.

In [36]:
points = np.float64([(4, 7), (7, 13), (9, 4), (11, 10), (14, 11), (16, 10), (15, 3)])
y = np.float64([0, 0, 0, 0, 1, 2, 1])
pivot = np.float64([[14, 9], ])
knn = KnnKdtreeClassifier(n_neighbors=3, weights='distance')
knn.fit(points, y)
print(knn.predict_proba(pivot))

[[0.23076923 0.46153846 0.30769231]]


Test MNIST:
На полном MNIST fit работает несколько минут, а predict работает неимоверно долго из-за обратного хода (не дождался).
Поэтому тестировал на меньшей части MNIST (например, 1000 образцов, 100 тестов, его я дождался: Your accuracy is 0.84)

In [4]:
(trainX, trainY), (testX, testY) = mnist.load_data()

trainX = trainX.reshape(trainX.shape[0], 784)
testX = testX.reshape(testX.shape[0], 784)
trainX = trainX.astype('float32')
testX = testX.astype('float32')

trainX /= 255
testX /= 255

x = trainX[:1000]
y = trainY[:1000]
test = testX[:100]

knn = KnnKdtreeClassifier(n_neighbors=3, weights='distance')
knn.fit(x, y)
y_pred = knn.predict(test)
neigh_dist, neigh_indarray = knn.kneighbors(test, 3)
print(neigh_dist)
print(neigh_indarray)
acc = knn.score(testY[:100], y_pred)
print('Your accuracy is %s' % str(acc))

[[ 37.45098046  42.36862744  45.47450992]
 [ 75.54117646  92.85490224  93.58823535]
 [ 15.23137258  16.45490202  17.46666649]
 [ 52.84705883  57.80784309  63.86274494]
 [ 48.59999996  57.36470613  58.87058817]
 [ 19.81960785  20.30980395  23.46666671]
 [ 55.18431403  64.50588263  71.94902022]
 [ 72.29803956  73.82745126  75.46666676]
 [ 99.04313786 104.83921626 105.30980464]
 [ 51.64705876  54.30588242  54.62745107]
 [ 54.45490219  69.43921572  70.36470607]
 [ 92.53333333  97.14509856 101.996079  ]
 [ 45.7843135   46.13725481  47.76470615]
 [ 56.83529413  58.67058829  68.58039223]
 [ 19.69803935  23.02352945  23.68627445]
 [ 81.65882411  82.49019679  87.03137273]
 [ 51.09803922  56.87843143  60.28627457]
 [ 49.47843151  49.61568633  50.38431367]
 [104.62352989 105.00392208 108.18039263]
 [ 43.67058848  46.7294121   47.80392181]
 [ 47.22745074  47.34901927  49.5921567 ]
 [ 47.15294118  55.20000006  56.30588281]
 [ 80.62352961  86.86666703  87.63137293]
 [ 52.0705883   57.2941178   72.39

In [38]:
# Test
# weights=lambda x: np.log2(x)
model = KnnKdtreeClassifier(n_neighbors=10, weights='uniform', leaf_size=10)
test_knn_model(model)

fit_predict test passed
predict_proba test passed
score test passed
kneighbors test passed
accuracy test passed
Test passed
