<a href="https://colab.research.google.com/github/xcellentbird/playground/blob/main/kNN_only_numpy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np

In [36]:
from queue import PriorityQueue


pq = PriorityQueue(maxsize=3)

In [37]:
pq.put((5, 'a'))
pq.put((1, 'b'))
pq.put((3, 'c'))

In [38]:
pq.full()

True

In [144]:
from collections import defaultdict


class SimpleKNNClassifier:
    """
    (1) 학습 단계: 단순히 학습 데이터를 저장하고
    (2) 예측 단계: 유클리드 거리를 이용해 가장 가까운 k개의 이웃을 찾아
        다수결(majority vote)로 클래스를 결정하는 매우 단순한 kNN 분류기.

    Parameters
    ----------
    n_neighbors : int, default=3
        사용할 이웃의 수(k). 1 이상이어야 함.
    """

    def __init__(self, n_neighbors: int = 3) -> None:
        if n_neighbors < 1:
            raise ValueError("n_neighbors는 1 이상이어야 합니다.")
        self.k = n_neighbors
        self._fitted = False  # fit() 호출 여부 플래그

    # --------------------------------------------------------------------- #
    # 학습 단계
    # --------------------------------------------------------------------- #
    def fit(self, X, y):
        """
        학습 데이터를 그대로 저장(lazy learning).
        실질적인 '모델 파라미터 추정'은 없으며, 예측 시점에 거리 계산을 수행합니다.

        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            학습 특성 배열
        y : array-like, shape (n_samples,)
            각 샘플의 레이블(정수, 문자열 등)

        Returns
        -------
        self  : 객체 자신 (method chaining을 위해)
        """
        # numpy 배열로 변환
        X = np.asarray(X)
        y = np.asarray(y)

        # 간단한 유효성 점검
        if X.ndim != 2:
            raise ValueError("X는 (n_samples, n_features) 형태의 2차원 배열이어야 합니다.")
        if X.shape[0] != y.shape[0]:
            raise ValueError("X와 y의 샘플 수가 일치하지 않습니다.")

        # 내부 저장
        self.X_train_ = X
        self.y_train_ = y
        self._fitted = True
        # self._x_train2 = (self.X_train_ ** 2).sum(axis=1)[np.newaxis, :]  # (1, n_train)
        return self

    def _euclidean_distances(self, X):
        """
        (a - b)^2 = a^2 + b^2 - 2ab 공식을 이용해
        한 번에 모든 거리 행렬을 구하는 벡터화 구현.

        Parameters
        ----------
        X : array-like, shape (n_queries, n_features)

        Returns
        -------
        dists : ndarray, shape (n_queries, n_train)
            X의 각 샘플과 학습 데이터 간의 거리
        """
        X = np.asarray(X)
        # ||a||^2, ||b||^2, -2a·b 내적을 분리 계산
        X2 = (X ** 2).sum(axis=1)[:, np.newaxis]           # (n_queries, 1)
        train2 = (self.X_train_ ** 2).sum(axis=1)[np.newaxis, :]  # (1, n_train)
        cross = X @ self.X_train_.T                        # (n_queries, n_train)

        dists_squared = X2 + self._x_train2 - 2.0 * cross
        # 수치 오차로 음수가 나올 가능성 방지
        return np.sqrt(np.maximum(dists_squared, 0.0))

    # --------------------------------------------------------------------- #
    # 예측 단계
    # --------------------------------------------------------------------- #
    def predict(self, X):
        """
        다수결 투표로 클래스 예측.

        Parameters
        ----------
        X : array-like, shape (n_queries, n_features)

        Returns
        -------
        preds : ndarray, shape (n_queries,)
            예측된 레이블
        """
        if not self._fitted:
            raise RuntimeError("먼저 fit()을 호출해야 합니다.")

        # 1) 모든 거리 계산
        dists = self._euclidean_distances(X)

        # 2) 각 행(질의 샘플)에서 거리가 가장 작은 k개 인덱스 추출
        neighbor_idx = np.argsort(dists, axis=1)[:, : self.k]      # (n_queries, k)

        # 3) 이웃의 레이블 수집
        neighbor_labels = self.y_train_[neighbor_idx]              # (n_queries, k)

        # 4) 행마다 최빈값(다수결) 계산
        preds = []
        for row in neighbor_labels:
            labels, counts = np.unique(row, return_counts=True)
            preds.append(labels[counts.argmax()])
        return np.array(preds)

    def _my_euclidean_distance(self, x1, x2):
        """
        내가 짠 euclidean_distance
        """
        square_sum = np.sum(np.square(x1 - x2))
        return np.sqrt(square_sum)

    # --------------------------------------------------------------------- #
    # 내가 짠 예측 코드
    # --------------------------------------------------------------------- #
    def my_predict(self, X):
        """
        다수결 투표로 클래스 예측.

        Parameters
        ----------
        X : array-like, shape (n_queries, n_features)

        Returns
        -------
        preds : ndarray, shape (n_queries,)
            예측된 레이블
        """
        if not self._fitted:
            raise RuntimeError("먼저 fit()을 호출해야 합니다.")

        # 1. X와 X_train_ 데이터와의 거리 계산
        preds = []
        for x in X:
            pq = PriorityQueue(maxsize=self.k)
            for _x_train, _y_train in zip(self.X_train_, self.y_train_):
                distance = self._my_euclidean_distance(x, _x_train)
                component = (-distance, _y_train)

                if pq.full():
                    if -pq.queue[0][0] > distance:
                        pq.get()
                        pq.put(component)
                else:
                    pq.put(component)

            votes = defaultdict(int)
            for rev_dist, label in pq.queue:
                votes[label] += 1

                if votes[label] > self.k / 2:
                    break

            sorted_votes = sorted(votes.items(), key=lambda x: x[1], reverse=True)
            pred = sorted_votes[0][0]
            preds.append(pred)

        return np.array(preds)

In [133]:
# get iris dataset
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split


iris = load_iris()

label_size = len(np.unique(iris['target']))

X_train, X_test, y_train, y_test = train_test_split(iris['data'], iris['target'], test_size=0.2, random_state=42)

In [134]:
y_test[-1] = 1

In [135]:
clf = SimpleKNNClassifier(n_neighbors=3)

clf.fit(X_train, y_train)

<__main__.SimpleKNNClassifier at 0x7e7b6e819d00>

In [136]:
from time import time

start = time()
preds = clf.predict(X_test)
end = time()

print(f"time: {end - start}")

time: 0.003069162368774414


In [137]:
start = time()
my_preds = clf.my_predict(X_test)
end = time()

print(f"time: {end - start}")

time: 0.06311249732971191


In [138]:
def get_confusion_matrix(preds, y_test, label_size):
    confusion_matrix = np.zeros((label_size, label_size))
    for pred, label in zip(preds, y_test):
        confusion_matrix[pred, label] += 1

    return confusion_matrix

In [139]:
def print_score(confusion_matrix):
    precision_0 = confusion_matrix[0, 0] / np.sum(confusion_matrix[:, 0])
    precision_1 = confusion_matrix[1, 1] / np.sum(confusion_matrix[:, 1])
    precision_2 = confusion_matrix[2, 2] / np.sum(confusion_matrix[:, 2])

    recall_0 = confusion_matrix[0, 0] / np.sum(confusion_matrix[0, :])
    recall_1 = confusion_matrix[1, 1] / np.sum(confusion_matrix[1, :])
    recall_2 = confusion_matrix[2, 2] / np.sum(confusion_matrix[2, :])

    f1_score_0 = 2 * precision_0 * recall_0 / (precision_0 + recall_0)
    f1_score_1 = 2 * precision_1 * recall_1 / (precision_1 + recall_1)
    f1_score_2 = 2 * precision_2 * recall_2 / (precision_2 + recall_2)

    accuracy_score = np.sum(np.diag(confusion_matrix)) / np.sum(confusion_matrix)

    print(f"precision_0: {precision_0}")
    print(f"precision_1: {precision_1}")
    print(f"precision_2: {precision_2}")

    print(f"recall_0: {recall_0}")
    print(f"recall_1: {recall_1}")
    print(f"recall_2: {recall_2}")

    print(f"f1_score_0: {f1_score_0}")
    print(f"f1_score_1: {f1_score_1}")
    print(f"f1_score_2: {f1_score_2}")

    print(f"accuracy: {accuracy_score}")

In [140]:
from sklearn.neighbors import KNeighborsClassifier

sk_clf = KNeighborsClassifier(n_neighbors=3)
sk_clf.fit(X_train, y_train)

start = time()
sk_preds = sk_clf.predict(X_test)
end = time()

print(f"time: {end - start}")

time: 0.00274658203125


In [141]:
clf_matrix = get_confusion_matrix(preds, y_test, label_size)
print_score(clf_matrix)

precision_0: 1.0
precision_1: 0.9
precision_2: 1.0
recall_0: 0.9
recall_1: 1.0
recall_2: 1.0
f1_score_0: 0.9473684210526316
f1_score_1: 0.9473684210526316
f1_score_2: 1.0
accuracy: 0.9666666666666667


In [142]:
my_clf_matrix = get_confusion_matrix(my_preds, y_test, label_size)
print_score(my_clf_matrix)

precision_0: 1.0
precision_1: 0.9
precision_2: 1.0
recall_0: 0.9
recall_1: 1.0
recall_2: 1.0
f1_score_0: 0.9473684210526316
f1_score_1: 0.9473684210526316
f1_score_2: 1.0
accuracy: 0.9666666666666667


In [143]:
sk_clf_matrix = get_confusion_matrix(sk_preds, y_test, label_size)
print_score(sk_clf_matrix)

precision_0: 1.0
precision_1: 0.9
precision_2: 1.0
recall_0: 0.9
recall_1: 1.0
recall_2: 1.0
f1_score_0: 0.9473684210526316
f1_score_1: 0.9473684210526316
f1_score_2: 1.0
accuracy: 0.9666666666666667
