In [45]:
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split

In [46]:
# Fetch the California housing dataset, then pull the feature matrix and the
# regression target out of the returned container by key.
data = fetch_california_housing()
X, y = data["data"], data["target"]

In [47]:
# Sanity check: show the feature and target shapes, one per line.
print(X.shape, y.shape, sep="\n")

(20640, 8)
(20640,)


In [48]:
# Hold out 33% of the rows for evaluation; the fixed random_state makes the
# shuffled split repeatable from run to run.
split_kwargs = dict(test_size=0.33, random_state=42, shuffle=True, stratify=None)
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_kwargs)

In [49]:
# Show (features, targets) shapes for the train split, then the test split.
for features, targets in ((X_train, y_train), (X_test, y_test)):
    print(features.shape, targets.shape)

(13828, 8) (13828,)
(6812, 8) (6812,)


In [50]:
import numpy as np
import numpy.typing as npt

In [51]:
class KNeighborsRegressor:
    """Brute-force k-nearest-neighbours regressor built on NumPy broadcasting.

    ``fit`` only stores the training data. ``predict`` computes the distance
    from every query row to every training row, picks the ``n_neighbors``
    closest training rows and returns the (optionally distance-weighted) mean
    of their targets.

    Parameters
    ----------
    n_neighbors:
        Number of neighbours averaged per prediction. Must satisfy
        ``1 <= n_neighbors <= number of training rows`` at predict time.
    weights:
        ``"distance"`` weights each neighbour by the inverse of its distance.
        Any other value (including the default ``"uniform"``) gives every
        neighbour the same weight.
    p:
        Exponent of the Minkowski distance (``p=2`` is Euclidean, ``p=1`` is
        Manhattan). Only used when ``metric="minkowski"``.
    metric:
        ``"minkowski"`` or ``"cosine"``.
    """

    # Number of query rows handled per distance computation. The Minkowski
    # path builds a (batch, n_train, n_features) intermediate, so bounding the
    # batch keeps peak memory independent of the number of query rows.
    _BATCH_SIZE = 256

    # Added to distances before inversion so that an exact match (distance 0)
    # gets a very large finite weight instead of dividing by zero.
    _ZERO_DISTANCE_OFFSET = 1e-10

    def __init__(
        self,
        n_neighbors: int = 5,
        weights: str = "uniform",
        p: int = 2,
        metric: str = "minkowski",
    ) -> None:
        self.n_neighbors = n_neighbors
        self.weights = weights
        self.p = p
        self.metric = metric

    def fit(
        self,
        X_train: npt.NDArray[np.float64],
        y_train: npt.NDArray[np.float64],
    ) -> "KNeighborsRegressor":
        """Store the training data and return ``self`` so calls can be chained.

        ``X_train`` is expected to be 2-D ``(n_train, n_features)`` and
        ``y_train`` 1-D ``(n_train,)``. Array-likes are converted with
        ``np.asarray`` (existing ndarrays are stored as-is, without copying).
        """
        self.X_train = np.asarray(X_train)
        self.y_train = np.asarray(y_train)
        return self

    def __minkowski_distances(
        self,
        X_test: npt.NDArray[np.float64],
    ) -> npt.NDArray[np.float64]:
        """Return the ``(n_test, n_train)`` matrix of Minkowski-``p`` distances."""
        # Broadcasting (n_train, d) against (n_test, 1, d) -> (n_test, n_train, d).
        absolute_dimension_wise_differences = np.abs(
            self.X_train - X_test[:, np.newaxis, :]
        )
        return np.power(
            np.sum(np.power(absolute_dimension_wise_differences, self.p), axis=-1),
            1 / self.p,
        )

    def __cosine_distances(
        self,
        X_test: npt.NDArray[np.float64],
    ) -> npt.NDArray[np.float64]:
        """Return the ``(n_test, n_train)`` matrix of cosine distances.

        The cosine distance is ``1 - cos(angle)``. A zero vector has no
        direction; its similarity to everything is treated as 0 (distance 1)
        rather than producing NaN from a division by zero.
        """
        dot_products = X_test @ self.X_train.T  # (n_test, n_train)

        norms_of_train = np.linalg.norm(self.X_train, axis=-1)
        norms_of_test = np.linalg.norm(X_test, axis=-1)
        # Zero norms imply a zero dot product, so dividing by 1 yields
        # similarity 0 without triggering a 0/0.
        norms_of_train = np.where(norms_of_train == 0, 1.0, norms_of_train)
        norms_of_test = np.where(norms_of_test == 0, 1.0, norms_of_test)

        # Row i is scaled by the i-th query norm, column j by the j-th
        # training norm.
        cosine_similarity = dot_products / (
            norms_of_test[:, np.newaxis] * norms_of_train[np.newaxis, :]
        )
        return 1 - cosine_similarity

    def __target_means(
        self,
        target_of_nearest_neighbors: npt.NDArray[np.float64],
        distances_of_nearest_neighbors: npt.NDArray[np.float64],
    ) -> npt.NDArray[np.float64]:
        """Return the weighted mean target of each row's neighbours.

        Both inputs have shape ``(n_rows, n_neighbors)``; the result has shape
        ``(n_rows,)``.
        """
        if self.weights == "distance":
            weights_of_nearest_neighbors = 1.0 / (
                distances_of_nearest_neighbors + self._ZERO_DISTANCE_OFFSET
            )
        else:
            weights_of_nearest_neighbors = np.ones_like(
                distances_of_nearest_neighbors, dtype=np.float64
            )

        weighted_target_sums = np.sum(
            target_of_nearest_neighbors * weights_of_nearest_neighbors, axis=-1
        )
        return weighted_target_sums / np.sum(weights_of_nearest_neighbors, axis=-1)

    def predict(
        self,
        X_test: npt.NDArray[np.float64],
    ) -> npt.NDArray[np.float64]:
        """Predict a target for every row of ``X_test``.

        Parameters
        ----------
        X_test:
            2-D array ``(n_test, n_features)`` of query points.

        Returns
        -------
        1-D float array ``(n_test,)`` of predictions.

        Raises
        ------
        ValueError
            If ``metric`` is not a supported name, or if ``n_neighbors`` is
            smaller than 1 or larger than the number of training rows.
        """
        X_test = np.asarray(X_test)

        distance_metrics = {
            "minkowski": self.__minkowski_distances,
            "cosine": self.__cosine_distances,
        }
        distance_metric = distance_metrics.get(self.metric)
        if distance_metric is None:
            raise ValueError(
                f"Unknown metric {self.metric!r}; "
                f"expected one of {sorted(distance_metrics)}."
            )

        n_train = self.X_train.shape[0]
        if not 1 <= self.n_neighbors <= n_train:
            raise ValueError(
                "Expected 1 <= n_neighbors <= n_samples_fit, but "
                f"n_neighbors = {self.n_neighbors}, n_samples_fit = {n_train}."
            )
        # argpartition needs kth < n_train. Clamping keeps the original pivot
        # for n_neighbors < n_train and still allows n_neighbors == n_train.
        kth = min(self.n_neighbors, n_train - 1)

        n_test = X_test.shape[0]
        y_pred = np.empty(n_test, dtype=np.float64)
        # Each query row is independent, so batching changes peak memory only,
        # not the predictions.
        for start in range(0, n_test, self._BATCH_SIZE):
            stop = start + self._BATCH_SIZE
            distances = distance_metric(X_test[start:stop])

            # After partitioning, the first n_neighbors columns hold the
            # indices of the n_neighbors smallest distances (in no particular
            # order).
            indices_of_nearest_neighbors = np.argpartition(
                distances, kth=kth, axis=-1
            )[:, : self.n_neighbors]
            target_of_nearest_neighbors = self.y_train[indices_of_nearest_neighbors]
            distances_of_nearest_neighbors = np.take_along_axis(
                distances, indices_of_nearest_neighbors, axis=-1
            )

            y_pred[start:stop] = self.__target_means(
                target_of_nearest_neighbors, distances_of_nearest_neighbors
            )

        return y_pred

    def score(
        self,
        X_test: npt.NDArray[np.float64],
        y_test: npt.NDArray[np.float64],
    ) -> float:
        """Return the coefficient of determination (R^2) on the given data.

        R^2 is ``1 - u / v`` where ``u`` is the residual sum of squares and
        ``v`` is the total sum of squares of ``y_test`` around its mean.
        """
        y_test = np.asarray(y_test)
        y_pred = self.predict(X_test)

        residual_sum_of_squares = np.sum(np.square(y_test - y_pred))
        total_sum_of_squares = np.sum(np.square(y_test - y_test.mean()))

        return float(1 - residual_sum_of_squares / total_sum_of_squares)

In [52]:
# Hyper-parameters shared by the custom regressor and the scikit-learn
# reference model, so that both are configured identically.
params = dict(
    n_neighbors=3,
    weights="uniform",
    p=2,
    metric="minkowski",
)

In [53]:
# Instantiate the custom NumPy-based KNeighborsRegressor with the shared
# params. fit() only stores the training arrays and returns the estimator,
# which is why this cell displays the object's repr.
clf = KNeighborsRegressor(**params)
clf.fit(X_train, y_train)

<__main__.KNeighborsRegressor at 0x7e3ba2d48a40>

In [54]:
# Coefficient of determination (R^2) of the custom regressor on the held-out split.
clf.score(X_test, y_test)

0.08043544739387165

In [55]:
from sklearn.neighbors import KNeighborsRegressor as KNR

In [56]:
# Reference model: scikit-learn's KNeighborsRegressor (imported as KNR),
# configured with the same params and fitted on the same training split.
clf2 = KNR(**params)
clf2.fit(X_train, y_train)

In [57]:
# Score of the scikit-learn reference model on the same held-out split, for
# comparison with the custom regressor's score.
clf2.score(X_test, y_test)

0.08043544739387165

In [58]:
%%timeit
# Time one full score() call (predict + R^2) of the custom regressor.
clf.score(X_test, y_test)

4.95 s ± 133 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [59]:
%%timeit
# Time the same score() call on the scikit-learn reference model.
clf2.score(X_test, y_test)

10.8 ms ± 90.2 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)
