In [50]:
import os
import numpy as np
from joblib import Parallel, delayed

In [70]:
def euclidean_distance(a,b):
    """Return the Euclidean (L2) distance between ``a`` and ``b``.

    The operands are subtracted directly, so they must support
    element-wise ``-`` and ``**`` (e.g. NumPy arrays of matching or
    broadcastable shape). All squared differences are summed into a single
    scalar before the square root is taken.
    """
    delta = a - b
    squared_total = np.sum(delta ** 2)
    return np.sqrt(squared_total)

class CustomKNN:
    """Brute-force k-nearest-neighbours classifier using Euclidean distance.

    Parameters
    ----------
    k : int, default=1
        Number of nearest training samples that vote on each prediction.
    n_jobs : int, default=-1
        Forwarded to ``joblib.Parallel`` in :meth:`predict`.

    Attributes (set by :meth:`fit`)
    -------------------------------
    X_train : np.ndarray
        Training samples, one row per sample.
    y_train : np.ndarray
        Training labels as passed to ``fit`` (converted to an array).
    classes_ : np.ndarray
        Sorted unique labels seen during ``fit``.
    """

    def __init__(self, k=1, n_jobs=-1):
        self.k = k
        self.n_jobs = n_jobs

    def fit(self, X, y):
        """Store the training data after validating it against ``k``.

        Parameters
        ----------
        X : array-like, one row per sample
        y : array-like, one label per row of ``X``

        Raises
        ------
        ValueError
            If ``k`` is smaller than 1, larger than the number of training
            samples, or if ``X`` and ``y`` disagree on the number of samples.
        """
        X = np.asarray(X)
        y = np.asarray(y)
        if self.k < 1:
            raise ValueError("k must be at least 1.")
        if self.k > X.shape[0]:
            raise ValueError("k cannot be greater than the number of training samples.")
        if y.shape[0] != X.shape[0]:
            raise ValueError("X and y must contain the same number of samples.")
        self.X_train = X
        self.y_train = y
        # Encode labels as 0..n_classes-1 so that voting also works for
        # negative, float or string labels (np.bincount alone only accepts
        # non-negative integers). ``classes_`` is sorted, so ties still go to
        # the smallest label, exactly as with a plain bincount/argmax.
        self.classes_, codes = np.unique(y, return_inverse=True)
        self._y_codes = np.reshape(codes, -1)

    def predict(self, X):
        """Predict a label for every row of ``X``.

        Each row is classified independently via ``joblib.Parallel`` using
        ``n_jobs`` workers.

        Returns
        -------
        np.ndarray
            Predicted labels, one per row of ``X``.
        """
        y_preds = Parallel(n_jobs=self.n_jobs)(
            delayed(self._get_neighbors)(x) for x in np.asarray(X)
        )
        return np.array(y_preds)

    def _get_neighbors(self, x):
        """Return the majority label among the ``k`` training samples nearest to ``x``.

        Despite its name, this returns the predicted label rather than the
        neighbours themselves.
        """
        distances = np.sqrt(np.sum((self.X_train - x) ** 2, axis=1))
        # Partition around position k-1: the first k slots then hold the k
        # smallest distances. Using k itself is out of bounds when k equals
        # the number of training samples.
        nearest = np.argpartition(distances, self.k - 1)[:self.k]
        votes = np.bincount(self._y_codes[nearest], minlength=len(self.classes_))
        return self.classes_[votes.argmax()]

In [76]:
from sklearn import datasets
from sklearn.model_selection import train_test_split

# Alternative (disabled): use the Iris dataset instead of the synthetic data below.
# iris = datasets.load_iris()
# X, y = iris.data, iris.target

# X_train, X_test, y_train, y_test = train_test_split(
#     X, y, test_size=0.2, random_state=1234
# )

# Synthetic classification data: 100,000 samples with 4 features each.
# random_state is fixed so the same dataset is generated on every run.
X, y = datasets.make_classification(
    n_samples=100000, n_features=4, random_state=4
)

# Hold out 20% of the samples for testing; the split is seeded as well.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=1234
)

In [77]:
# Sanity check: show the shapes of the train/test feature and label arrays.
X_train.shape, y_train.shape, X_test.shape, y_test.shape

((80000, 4), (80000,), (20000, 4), (20000,))

In [78]:
# Fit the custom classifier on the training split and predict the test split.
# `k` is kept in a variable because later cells reuse it for reporting and
# for the scikit-learn baseline.
k=5
knn = CustomKNN(k=k)
knn.fit(X_train, y_train)
y_preds = knn.predict(X_test)

In [79]:
from sklearn.metrics import accuracy_score

# Score the custom KNN predictions against the held-out test labels.
accuracy = accuracy_score(y_test, y_preds)
print(f"Custom KNN accuracy with k={k}: {accuracy:.4f}")

Custom KNN accuracy with k=5: 0.9600


In [80]:
from sklearn.neighbors import KNeighborsClassifier

# Baseline: scikit-learn's KNeighborsClassifier with the same k and the same
# train/test split, so its accuracy can be compared with CustomKNN's.
knn_og = KNeighborsClassifier(n_neighbors=k)
knn_og.fit(X_train, y_train)
y_preds_og = knn_og.predict(X_test)

accuracy_og = accuracy_score(y_test, y_preds_og)
print(f"sklearn KNN accuracy with k={k}: {accuracy_og:.4f}")

sklearn KNN accuracy with k=5: 0.9600
