In [15]:
import copy

import numpy as np
from sklearn.neighbors import NearestNeighbors
from scipy.spatial.distance import cdist

class KNNClassifier:
    """Kernel-weighted nearest-neighbour classifier.

    Neighbours are looked up with ``sklearn.neighbors.NearestNeighbors``. Each
    neighbour then adds ``kernel(r)`` to the score of its own label, where ``r``
    is a scaled distance, and the label with the highest score is predicted.

    Three prediction modes are offered:

    * ``predict_constant_window`` - all neighbours inside a fixed radius ``h``;
    * ``predict_float_window`` - ``k`` nearest neighbours, distances scaled by
      the farthest returned neighbour;
    * ``predict_float_lowess_window`` - variant whose weights depend on the
      difference between a neighbour's label and an inner prediction.
    """

    # Class-level defaults; __init__ / fit overwrite them on the instance.
    metric_type = 'minkowski'
    p = None
    kernel = None
    X_train = None
    y_train = None
    annoy_i = None  # not read by any method of this class
    n_trees = 10    # not read by any method of this class

    def __init__(self, kernel=lambda x: 1 - np.abs(x), metric='minkowski', p=2.0):
        """Store the weighting kernel and the distance configuration.

        Args:
            kernel: callable mapping a scaled distance to a vote weight.
            metric: 'minkowski', 'cosine' or 'chebyshev'; also forwarded to
                ``NearestNeighbors``.
            p: Minkowski exponent (only meaningful for metric='minkowski').
        """
        self.kernel = kernel
        self.metric_type = metric
        self.p = p

    def get_distances(self):
        """Return the distances between every unordered pair of training rows.

        The result is a 1-D array ordered as (0,1), (0,2), ..., (1,2), ...
        This is a pure-Python O(n^2) loop, so it is slow for large training sets.
        """
        rows = self.X_vals
        count = len(rows)

        return np.asarray([
            self.metric(rows[i], rows[j])
            for i in range(count)
            for j in range(i + 1, count)
        ])

    def metric(self, a, b):
        """Distance between two vectors under the configured metric.

        Raises:
            AttributeError: if ``metric_type`` is not one of the supported names.
        """
        if self.metric_type == 'minkowski':
            return np.sum(np.abs(a - b) ** self.p) ** (1.0 / self.p)
        if self.metric_type == 'cosine':
            # Cosine *distance* (1 - similarity), so that identical directions
            # give 0 like the other metrics and like sklearn's 'cosine' metric.
            similarity = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
            return 1.0 - similarity
        if self.metric_type == 'chebyshev':
            return np.max(np.abs(a - b))

        raise AttributeError(f'no metric with name {self.metric_type}')

    def fit(self, X, y):
        """Memorise the training set and prepare the per-label score template.

        Args:
            X: training features (DataFrame or array-like, one row per sample).
            y: training labels (Series or array-like).
        """
        # np.asarray yields the same ndarray as `.values` for pandas objects
        # and additionally accepts plain arrays / lists.
        self.X_vals = np.asarray(X)
        self.y_vals = np.asarray(y)
        self.X_train = X
        self.y_train = y
        # Every known label starts each vote with a score of zero.
        self.counter_pattern = {label: 0 for label in np.unique(self.y_train)}

    def base_predict(self, X, neighboubrs_mapper, element_processor, double_compute=False):
        """Shared prediction loop used by all ``predict_*`` methods.

        Args:
            X: query samples.
            neighboubrs_mapper: callable taking the fitted ``NearestNeighbors``
                and returning a function ``x -> (dists, neighs)``.
            element_processor: callable
                ``(dists_i, neighs_i, dists_X_i, neighs_X_i) -> label`` applied
                to every query row.
            double_compute: when True the second pair of arguments comes from a
                neighbour query on the training set instead of on ``X``.
                NOTE(review): those rows are still indexed by the query
                position ``i``, not by a training index - confirm before
                enabling; no caller in this class passes True.

        Returns:
            numpy array with one prediction per query row.
        """
        nn = NearestNeighbors(
            metric=self.metric_type,
            p=self.p
        )

        nn.fit(self.X_train, self.y_train)
        query = neighboubrs_mapper(nn)
        dists, neighs = query(X)
        dists_X, neighs_X = dists, neighs
        if double_compute:
            dists_X, neighs_X = query(self.X_train)

        return np.asarray([
            element_processor(dists[i], neighs[i], dists_X[i], neighs_X[i])
            for i in range(len(X))
        ])

    def __constant_window_processor(self, dists, neighs, ignored1, ignored2, h=None):
        """Vote among the neighbours found inside a fixed radius ``h``.

        Returns the winning label, or ``None`` when the query has no neighbour
        inside the radius.

        Raises:
            ValueError: if ``h`` is not supplied.
        """
        if h is None:
            raise ValueError('window width h must be provided')

        # No neighbour inside the window: a single None keeps the result
        # one-value-per-query (the array then has dtype=object).
        if len(dists) == 0:
            return None

        scores = dict(self.counter_pattern)
        for dist, neigh in zip(dists, neighs):
            scores[self.y_vals[neigh]] += self.kernel(dist / h)

        return max(scores, key=scores.get)

    def __float_window_processor(self, dists, neighs, ignored1, ignored2):
        """Vote among nearest neighbours with a data-dependent window width.

        Distances are divided by ``dists[-1]`` (the last returned neighbour).
        NOTE(review): the last *two* neighbours are excluded from the vote, so
        with fewer than three neighbours no vote is cast and the first label of
        the score template is returned - confirm this is intended.
        """
        scores = dict(self.counter_pattern)
        width = dists[-1]

        for dist, neigh in zip(dists[:-2], neighs[:-2]):
            scores[self.y_vals[neigh]] += self.kernel(dist / width)

        return max(scores, key=scores.get)

    def __lowess_window_processor(self, dists, neighs, dists_X, neighs_X):
        """Vote where each weight is ``kernel(label - inner_prediction)``.

        The inner prediction is the variable-window vote over
        ``dists_X[1:]`` / ``neighs_X[1:]`` (nearest entry dropped). The last
        three neighbours do not vote.
        NOTE(review): the labels are subtracted numerically, which presumably
        assumes ordinal numeric labels - confirm.
        """
        scores = dict(self.counter_pattern)

        # The inner prediction does not depend on the loop variable, so it is
        # computed once instead of once per neighbour.
        inner_pred = self.__float_window_processor(dists_X[1:], neighs_X[1:], None, None)

        for _dist, neigh in zip(dists[:-3], neighs[:-3]):
            n_type = self.y_vals[neigh]
            scores[n_type] += self.kernel(n_type - inner_pred)

        return max(scores, key=scores.get)

    def predict_constant_window(self, X, h):
        """Predict labels using all training points within radius ``h``.

        Entries are ``None`` for queries that have no neighbour in the radius.
        """
        def processor(dists, neighs, dists_X, neighs_X):
            # Bind this call's radius so the weighting never depends on a
            # notebook-level global named `h`.
            return self.__constant_window_processor(dists, neighs, dists_X, neighs_X, h=h)

        return self.base_predict(
            X,
            lambda nn: lambda x: nn.radius_neighbors(X=x, radius=h),
            processor,
        )

    def predict_float_window(self, X, k):
        """Predict labels from the ``k`` nearest training points."""
        return self.base_predict(
            X,
            lambda nn: lambda x: nn.kneighbors(X=x, n_neighbors=k),
            self.__float_window_processor,
        )

    def predict_float_lowess_window(self, X, k):
        """Predict labels with the residual-weighted vote over ``k + 2`` neighbours."""
        return self.base_predict(
            X,
            lambda nn: lambda x: nn.kneighbors(X=x, n_neighbors=k + 2),
            self.__lowess_window_processor,
        )


In [13]:
def base_kernel_constructor(a, b, c):
    """Build a kernel of the form ``K(x) = c * (1 - |x|**a) ** b``.

    Args:
        a: exponent applied to ``|x|``.
        b: exponent applied to ``(1 - |x|**a)``.
        c: normalising multiplier.

    Returns:
        A function of ``x``. The value is not clipped outside ``|x| <= 1``.
    """
    # The kernel must depend on its argument x, not on the parameter a.
    return lambda x: c * (1 - np.abs(x) ** a) ** b

# Triangular kernel: 1 - |x|
default_kernel = base_kernel_constructor(a=1, b=1, c=1)

def gaussian_kernel(x):
    """Standard normal density evaluated at ``x``."""
    return 1 / np.sqrt(2 * np.pi) * np.exp(-0.5 * x**2)

# Epanechnikov kernel: 3/4 * (1 - x^2)
epanechnikov_kernel = base_kernel_constructor(a=2, b=1, c=0.75)

def cosine_kernel(x):
    """Cosine kernel: pi/4 * cos(pi/2 * x)."""
    return np.pi / 4 * np.cos(np.pi / 2 * x)

def uniform_kernel(ignored):
    """Uniform kernel: constant 0.5 regardless of the argument."""
    return 0.5

# 35/32 * (1 - x^2)^3 (usually called the triweight kernel); the variable name
# is kept because other cells look kernels up as f'{name}_kernel'.
triquadratic_kernel = base_kernel_constructor(a=2, b=3, c=35.0/32)


In [None]:
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.neighbors import RadiusNeighborsClassifier, KNeighborsClassifier
from sklearn.metrics import mean_squared_error, accuracy_score

# Load the phone dataset: `price_range` is the target, all other columns are features.
df = pd.read_csv('phone_infos.csv')
y = df['price_range']
X = df.drop(columns=['price_range'])

# Hold out 40% of the rows for evaluation; the seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.4,
    random_state=153,
)

X_test

In [None]:
# Fixed-radius window: compare KNNClassifier against sklearn's RadiusNeighborsClassifier.
h = 1450

knn = KNNClassifier(metric='minkowski', p=10)
knn.fit(X_train, y_train)
# predict_constant_window returns a single array of labels (no probabilities),
# so it must not be unpacked into two names.
y_test1 = knn.predict_constant_window(X_test, h)

# NOTE(review): the sklearn model keeps its default metric parameters while ours
# uses p=10 - confirm the comparison is meant to be between different metrics.
knn_sklearn = RadiusNeighborsClassifier(radius=h)
knn_sklearn.fit(X_train, y_train)
y_test2 = knn_sklearn.predict(X_test)
y_test_proba2 = knn_sklearn.predict_proba(X_test)

print("ours", accuracy_score(y_test.values, y_test1))
print("theirs", accuracy_score(y_test, y_test2))
print("ours vs theirs", accuracy_score(y_test1, y_test2))

In [None]:
# Distances between every pair of training rows, using the metric of the classifier bound to `knn`.
d = knn.get_distances()

In [None]:
# Mean of the pairwise training distances computed above.
d.mean()

In [None]:
# Variable-width window: compare KNNClassifier against sklearn's KNeighborsClassifier.
k = 5

knn = KNNClassifier(kernel=epanechnikov_kernel)
knn.fit(X_train, y_train)
# predict_float_window returns a single array of labels (no probabilities),
# so it must not be unpacked into two names.
y_test1 = knn.predict_float_window(X_test, k)

knn_sklearn = KNeighborsClassifier(n_neighbors=k)
knn_sklearn.fit(X_train, y_train)
y_test2 = knn_sklearn.predict(X_test)
y_test3 = knn_sklearn.predict_proba(X_test)

print("ours", accuracy_score(y_test.values, y_test1))
print("theirs", accuracy_score(y_test, y_test2))
print("ours vs theirs", accuracy_score(y_test1, y_test2))
# The probability comparison (MSE against y_test3) was removed: KNNClassifier
# exposes no class probabilities, so there is nothing to compare y_test3 with.

In [None]:
# Distances between every pair of training rows, using the metric of the classifier bound to `knn`.
d = knn.get_distances()

In [None]:
# Mean of the pairwise training distances computed above.
d.mean()

In [None]:
import optuna

def objective_const(trial: optuna.trial.Trial):
    """Optuna objective for the fixed-radius KNNClassifier.

    Samples the metric (and ``p`` for Minkowski), the radius ``h`` and the
    kernel name, then returns the error rate ``1 - accuracy`` on the held-out
    split (lower is better).

    NOTE(review): hyper-parameters are scored on X_test / y_test, so the best
    score is an optimistic estimate - consider a separate validation split.
    """
    metric = trial.suggest_categorical('metric', ['minkowski', 'cosine', 'chebyshev'])
    p = None

    # The exponent only exists for the Minkowski metric.
    if metric == 'minkowski':
        p = trial.suggest_float('p', 1, 10, log=True)

    h = trial.suggest_float('h', 1200, 4000, log=True)
    kernel = trial.suggest_categorical('kernel', ['default', 'epanechnikov', 'gaussian', 'cosine', 'uniform', 'triquadratic'])

    knn = KNNClassifier(
        metric=metric,
        p=p,
        # Kernels are module-level callables named '<kernel>_kernel'.
        kernel=globals()[f'{kernel}_kernel']
    )
    knn.fit(X_train, y_train)
    # predict_constant_window returns one array of labels, not a (labels, proba) pair.
    y_pred = knn.predict_constant_window(X_test, h)

    return 1 - accuracy_score(y_test, y_pred)

# The default study direction is "minimize", which matches the error-rate objective.
study = optuna.create_study()
study.optimize(objective_const, n_jobs=-1, n_trials=1000, show_progress_bar=True)
study.best_params

In [None]:
import optuna

def objective_float(trial: optuna.trial.Trial):
    """Optuna objective for the variable-window KNNClassifier.

    Samples the metric (and ``p`` for Minkowski), the neighbour count ``k`` and
    the kernel name, then returns the error rate ``1 - accuracy`` on the
    held-out split (lower is better).

    NOTE(review): hyper-parameters are scored on X_test / y_test, so the best
    score is an optimistic estimate - consider a separate validation split.
    """
    metric = trial.suggest_categorical('metric', ['minkowski', 'cosine', 'chebyshev'])
    p = None

    # The exponent only exists for the Minkowski metric.
    if metric == 'minkowski':
        p = trial.suggest_float('p', 1, 10, log=True)

    k = trial.suggest_int('k', 1, len(X_train.values) - 1, log=True)
    kernel = trial.suggest_categorical('kernel', ['default', 'epanechnikov', 'gaussian', 'cosine', 'uniform', 'triquadratic'])

    knn = KNNClassifier(
        metric=metric,
        p=p,
        # Kernels are module-level callables named '<kernel>_kernel'.
        kernel=globals()[f'{kernel}_kernel']
    )
    knn.fit(X_train, y_train)
    # predict_float_window returns one array of labels, not a (labels, proba) pair.
    y_pred = knn.predict_float_window(X_test, k)

    return 1 - accuracy_score(y_test, y_pred)

# The default study direction is "minimize", which matches the error-rate objective.
study = optuna.create_study()
study.optimize(objective_float, n_jobs=-1, n_trials=1000, show_progress_bar=True)
study.best_params


In [None]:
import optuna

def objective_float_sklearn(trial: optuna.trial.Trial):
    """Optuna objective for sklearn's KNeighborsClassifier with kernel weights.

    Samples the metric (and ``p`` for Minkowski), ``k`` and the kernel name.
    Neighbour weights are ``kernel(dist / farthest_dist)`` per query. Returns
    the error rate ``1 - accuracy`` on the held-out split (lower is better).
    """
    metric = trial.suggest_categorical('metric', ['minkowski', 'cosine', 'chebyshev'])
    p = None

    # The exponent only exists for the Minkowski metric.
    if metric == 'minkowski':
        p = trial.suggest_float('p', 1, 10, log=True)

    k = trial.suggest_int('k', 1, len(X_train.values) - 1, log=True)

    kernel = trial.suggest_categorical('kernel', ['default', 'epanechnikov', 'gaussian', 'cosine', 'uniform', 'triquadratic'])
    kernel_func = globals()[f'{kernel}_kernel']

    def weight(dists):
        """Map each row of neighbour distances to kernel weights of the same shape."""
        # The kernel is evaluated element by element so that kernels returning
        # a plain scalar (e.g. the uniform one) still give one weight per neighbour.
        return np.asarray([
            np.asarray([kernel_func(dist / row[-1]) for dist in row])
            for row in dists
        ])

    knn = KNeighborsClassifier(
        metric=metric,
        p=p,
        weights=weight,
        n_neighbors=k
    )
    knn.fit(X_train, y_train)
    y_pred = knn.predict(X_test)

    # The debugging print of y_pred was removed: it dumped the full prediction
    # array on every one of the 1000 trials.
    return 1 - accuracy_score(y_test.values, y_pred)

# The default study direction is "minimize", which matches the error-rate objective.
study = optuna.create_study()
study.optimize(objective_float_sklearn, n_jobs=1, n_trials=1000, show_progress_bar=True)
study.best_params


In [None]:
import optuna

def objective_const_sklearn(trial: optuna.trial.Trial):
    """Optuna objective for sklearn's RadiusNeighborsClassifier with kernel weights.

    Samples the metric (and ``p`` for Minkowski), the radius ``h`` and the
    kernel name. Neighbour weights are ``kernel(dist / h)``. Returns the error
    rate ``1 - accuracy`` on the held-out split (lower is better).
    """
    metric = trial.suggest_categorical('metric', ['minkowski', 'cosine', 'chebyshev'])
    p = None

    # The exponent only exists for the Minkowski metric.
    if metric == 'minkowski':
        p = trial.suggest_float('p', 1, 10, log=True)

    h = trial.suggest_float('h', 1200, 4000, log=True)
    kernel = trial.suggest_categorical('kernel', ['default', 'epanechnikov', 'gaussian', 'cosine', 'uniform', 'triquadratic'])
    kernel_func = globals()[f'{kernel}_kernel']

    def weight(dists):
        """Map each query's neighbour distances to kernel weights of the same shape."""
        # Radius queries return a different number of neighbours per row, so the
        # result has to be an object array holding one weight vector per query;
        # np.asarray on a ragged list is rejected by current numpy.
        weights = np.empty(len(dists), dtype=object)
        for i, row in enumerate(dists):
            # Element-wise evaluation keeps one weight per neighbour even for
            # kernels that return a plain scalar (e.g. the uniform one).
            weights[i] = np.asarray([kernel_func(dist / h) for dist in row])

        return weights

    knn = RadiusNeighborsClassifier(
        metric=metric,
        p=p,
        weights=weight,
        radius=h
    )
    knn.fit(X_train, y_train)
    y_pred = knn.predict(X_test)

    return 1 - accuracy_score(y_test, y_pred)

# The default study direction is "minimize", which matches the error-rate objective.
study = optuna.create_study()
study.optimize(objective_const_sklearn, n_jobs=1, n_trials=1000, show_progress_bar=True)
study.best_params


In [19]:
# Residual-weighted variable window, using Chebyshev distance and the cosine kernel.
k = 25

nn = KNNClassifier(metric='chebyshev', kernel=cosine_kernel)
nn.fit(X_train, y_train)

pred = nn.predict_float_lowess_window(X_test, k)
accuracy_score(y_test, pred)

0.008750000000000036