# In[1]:
import numpy as np
from sklearn.datasets import make_classification

# In[2]:
class KNearestNeighbours(object):
    """Brute-force k-nearest-neighbours classifier / regressor.

    The model is lazy: nothing is fitted ahead of time. ``predict`` receives
    the training data on every call and compares each query point against
    every training point using Euclidean distance.

    Parameters
    ----------
    k : int
        Number of neighbours consulted per query point (must be >= 1).
    weight : {'uniform', 'distance'}
        'uniform' gives every neighbour an equal vote; 'distance' weights
        each neighbour by the inverse of its distance to the query point.
    classifier : bool
        True for classification (weighted majority vote), False for
        regression (weighted mean of the neighbours' targets).
    """

    _WEIGHTS = ('uniform', 'distance')

    def __init__(self, k, weight='uniform', classifier=True):
        self.k = k
        self.weight = weight
        self.classifier = classifier

    def calc_distance(self, x, y):
        """Return the Euclidean distance from point ``x`` to each row of ``y``."""
        return np.sqrt(np.sum((x - y)**2, axis=1))

    def get_neighbours(self, X_test, X_train):
        """Find the ``k`` training points closest to each query point.

        Returns
        -------
        (dists, idxs) : tuple of np.ndarray
            One row per query point. ``idxs`` holds row indices into
            ``X_train`` ordered from nearest to farthest and ``dists`` the
            matching distances. Rows have ``min(k, len(X_train))`` entries.

        Raises
        ------
        ValueError
            If ``k`` is smaller than 1.
        """
        if self.k < 1:
            raise ValueError("k must be a positive integer, got %r" % (self.k,))

        X_train = np.asarray(X_train)
        dists, neigh_idxs = [], []
        for x_test in X_test:
            dist = np.asarray(self.calc_distance(x_test, X_train))
            # A stable sort keeps equidistant neighbours in training order,
            # so ties are resolved deterministically.
            nearest = np.argsort(dist, kind='stable')[:self.k]
            dists.append(dist[nearest])
            neigh_idxs.append(nearest)
        return np.array(dists), np.array(neigh_idxs)

    @staticmethod
    def _inverse_distance_weights(dists):
        """Return row-normalised inverse-distance weights.

        A neighbour at distance zero is an exact match; dividing by it would
        yield inf and then nan after normalisation. Rows containing exact
        matches therefore share all of the weight among those matches and
        give the remaining neighbours zero weight.
        """
        dists = np.asarray(dists, dtype=float)
        exact = dists == 0
        rows_with_exact = exact.any(axis=1)
        with np.errstate(divide='ignore'):
            inv = 1.0 / dists
        inv[rows_with_exact] = exact[rows_with_exact].astype(float)
        return inv / inv.sum(axis=1, keepdims=True)

    def predict(self, X_test, X_train, y_train):
        """Predict a target for every row of ``X_test``.

        Parameters
        ----------
        X_test : array-like, one query point per row
        X_train : array-like, one training point per row
        y_train : array-like, one target per training point. For
            classification any labels accepted by ``np.unique`` work
            (they need not be the integers ``0..C-1``).

        Returns
        -------
        np.ndarray
            One prediction per query point. For classification, ties are
            resolved in favour of the smallest label. An empty ``X_test``
            yields an empty array.

        Raises
        ------
        ValueError
            If ``self.weight`` is not 'uniform' or 'distance', or ``k < 1``.
        """
        if self.weight not in self._WEIGHTS:
            raise ValueError("weight must be one of %r, got %r"
                             % (self._WEIGHTS, self.weight))
        if len(X_test) == 0:
            return np.array([])

        y_train = np.asarray(y_train)
        top_dists, top_idxs = self.get_neighbours(X_test, X_train)

        weights = None
        if self.weight == 'distance':
            weights = self._inverse_distance_weights(top_dists)

        if self.classifier:
            # Map arbitrary labels to 0..C-1 so votes can be tallied per column.
            classes, codes = np.unique(y_train, return_inverse=True)
            neigh_codes = codes.reshape(-1)[top_idxs]
            if weights is None:
                weights = np.ones(neigh_codes.shape)
            votes = np.stack(
                [np.sum(weights * (neigh_codes == code), axis=1)
                 for code in range(len(classes))],
                axis=1,
            )
            # argmax returns the first maximum, i.e. the smallest label on ties.
            return classes[votes.argmax(axis=1)]

        neigh_targets = y_train[top_idxs]
        if weights is None:
            return neigh_targets.mean(axis=1)
        return np.sum(neigh_targets * weights, axis=1)

    def score(self, y_pred, y_test):
        """Score predictions against the ground truth.

        Returns the accuracy (fraction of exact matches) when the model is a
        classifier, and the root-mean-squared error otherwise.
        """
        y_pred = np.asarray(y_pred)
        y_test = np.asarray(y_test)
        if self.classifier:
            return np.mean(y_pred == y_test)
        return np.sqrt(np.mean((y_pred - y_test)**2))