# 15장. K-최근접 이웃

<table align="left"><tr><td>
<a href="https://colab.research.google.com/github/rickiepark/ml-with-python-cookbook-2nd/blob/main/ch15.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="코랩에서 실행하기"/></a>
</td></tr></table>

In [1]:
# 코랩을 사용하는 경우 다음 명령을 실행하세요.
!pip install faiss-cpu



In [2]:
import numpy as np
import sklearn
import faiss

print('numpy', np.__version__)
print('sklearn', sklearn.__version__)
print('faiss', faiss.__version__)

numpy 1.25.2
sklearn 1.2.2
faiss 1.8.0


## 15.1 샘플의 최근접 이웃 찾기

In [3]:
# 라이브러리를 임포트합니다.
from sklearn import datasets
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler

# 데이터를 로드합니다.
iris = datasets.load_iris()
features = iris.data

# 표준화 객체를 만듭니다.
standardizer = StandardScaler()

# 특성을 표준화합니다.
features_standardized = standardizer.fit_transform(features)

# k=2인 최근접 이웃 모델을 만듭니다.
nearest_neighbors = NearestNeighbors(n_neighbors=2).fit(features_standardized)

# 새로운 샘플을 만듭니다.
new_observation = [ 1,  1,  1,  1]

# 이 샘플과 가장 가까운 이웃의 인덱스와 거리를 찾습니다.
distances, indices = nearest_neighbors.kneighbors([new_observation])

# 최근접 이웃을 확인합니다.
features_standardized[indices]

array([[[1.03800476, 0.55861082, 1.10378283, 1.18556721],
        [0.79566902, 0.32841405, 0.76275827, 1.05393502]]])

In [4]:
nearestneighbors_euclidean = NearestNeighbors(
    n_neighbors=2, metric='euclidean').fit(features_standardized)

In [5]:
# 거리를 확인합니다.
distances

array([[0.49140089, 0.74294782]])

In [6]:
# 유클리디안 거리를 기반으로 각 샘플에 대해 (자기 자신을 포함한) 세 개의 최근접 이웃을 찾습니다.
nearestneighbors_euclidean = NearestNeighbors(
    n_neighbors=3, metric="euclidean").fit(features_standardized)

# 각 샘플의 (자기 자신을 포함한) 3개의 최근접 이웃을 나타내는 리스트의 리스트
nearest_neighbors_with_self = nearestneighbors_euclidean.kneighbors_graph(
    features_standardized).toarray()

# 최근접 이웃 중에서 1로 표시된 자기 자신을 제외시킵니다.
for i, x in enumerate(nearest_neighbors_with_self):
    x[i] = 0

# 첫 번째 샘플에 대한 두 개의 최근접 이웃을 확인합니다.
nearest_neighbors_with_self[0]

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

### 붙임

In [7]:
# 이 샘플과 가장 가까운 이웃의 다섯개의 인덱스를 찾습니다.
indices = nearest_neighbors.kneighbors(
    [new_observation], n_neighbors=5, return_distance=False)

# 최근접 이웃을 확인합니다.
features_standardized[indices]

array([[[1.03800476, 0.55861082, 1.10378283, 1.18556721],
        [0.79566902, 0.32841405, 0.76275827, 1.05393502],
        [0.4321654 , 0.78880759, 0.93327055, 1.44883158],
        [0.55333328, 0.78880759, 1.0469454 , 1.58046376],
        [1.03800476, 0.55861082, 1.10378283, 1.71209594]]])

In [8]:
# 반경 0.5 안에 있는 모든 샘플의 인덱스를 찾습니다.
indices = nearest_neighbors.radius_neighbors(
    [new_observation], radius=0.5, return_distance=False)

# 반경 내의 이웃을 확인합니다.
features_standardized[indices[0]]

array([[1.03800476, 0.55861082, 1.10378283, 1.18556721]])

In [9]:
# 반경 내의 이웃을 나타내는 리스트의 리스트
nearest_neighbors_with_self = nearest_neighbors.radius_neighbors_graph(
    [new_observation], radius=0.5).toarray()

# 첫 번째 샘플에 대한 반경 내의 이웃을 확인합니다.
nearest_neighbors_with_self[0]

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

## 15.2 K-최근접 이웃 분류기 만들기

In [10]:
# 라이브러리를 임포트합니다.
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn import datasets

# 데이터를 로드합니다.
iris = datasets.load_iris()
X = iris.data
y = iris.target

# 표준화 객체를 만듭니다.
standardizer = StandardScaler()

# 특성을 표준화합니다.
X_std = standardizer.fit_transform(X)

# 5개의 이웃을 사용한 KNN 분류기를 훈련합니다.
knn = KNeighborsClassifier(n_neighbors=5, n_jobs=-1).fit(X_std, y)

# 두 개의 샘플을 만듭니다.
new_observations = [[ 0.75,  0.75,  0.75,  0.75],
                    [ 1,  1,  1,  1]]

# 두 샘플의 클래스를 예측합니다.
knn.predict(new_observations)

array([1, 2])

In [11]:
# 각 샘플이 세 클래스에 속할 확률을 확인합니다.
knn.predict_proba(new_observations)

array([[0. , 0.6, 0.4],
       [0. , 0. , 1. ]])

### 붙임

In [12]:
# 라이브러리를 임포트합니다.
from sklearn.neighbors import KNeighborsRegressor
from sklearn import datasets

# 데이터를 로드합니다.
diabetes = datasets.load_diabetes()
features = diabetes.data
target = diabetes.target

# 최근접 회귀 모델을 만듭니다.
knn_regressor = KNeighborsRegressor(n_neighbors=10)

# 모델을 훈련합니다.
model = knn_regressor.fit(features, target)

In [13]:
# 첫 번째 샘플의 타깃 값을 예측합니다.
model.predict(features[0:1])[0]

191.8

In [14]:
import numpy as np

indices = model.kneighbors(features[0:1], return_distance=False)
np.mean(target[indices])

191.8

## 15.3 최선의 이웃 개수 결정하기

In [15]:
# 라이브러리를 임포트합니다.
from sklearn.neighbors import KNeighborsClassifier
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.model_selection import GridSearchCV

# 데이터를 로드합니다.
iris = datasets.load_iris()
features = iris.data
target = iris.target

# 표준화 객체를 만듭니다.
standardizer = StandardScaler()

# KNN 분류기를 만듭니다.
knn = KNeighborsClassifier(n_neighbors=5, n_jobs=-1)

# 파이프라인을 만듭니다.
pipe = Pipeline([("standardizer", standardizer), ("knn", knn)])

# 탐색 영역의 후보를 만듭니다.
search_space = [{"knn__n_neighbors": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]}]

# 그리드 서치를 만듭니다.
classifier = GridSearchCV(
    pipe, search_space, cv=5, verbose=0).fit(features, target)

In [16]:
# 최선의 이웃 개수 (k)
classifier.best_estimator_.get_params()["knn__n_neighbors"]

6

## 15.4 반지름 기반의 최근접 이웃 분류기 만들기

In [17]:
# 라이브러리를 임포트합니다.
from sklearn.neighbors import RadiusNeighborsClassifier
from sklearn.preprocessing import StandardScaler
from sklearn import datasets

# 데이터를 로드합니다.
iris = datasets.load_iris()
features = iris.data
target = iris.target

# 표준화 객체를 만듭니다.
standardizer = StandardScaler()

# 특성을 표준화합니다.
features_standardized = standardizer.fit_transform(features)

# 반지름 이웃 분류기를 훈련합니다.
rnn = RadiusNeighborsClassifier(
    radius=.5, n_jobs=-1).fit(features_standardized, target)

# 두 개의 샘플을 만듭니다.
new_observations = [[ 1,  1,  1,  1]]

# 두 샘플의 클래스를 예측합니다.
rnn.predict(new_observations)

array([2])

### 붙임

In [18]:
# 반지름 이웃 분류기를 훈련합니다.
rnn = RadiusNeighborsClassifier(
    radius=.5, outlier_label=-1, n_jobs=-1).fit(features_standardized, target)

rnn.predict([[100, 100, 100, 100]])



array([-1])

## 15.5 근사 최근접 이웃 찾기

In [19]:
# 라이브러리를 임포트합니다.
import faiss
import numpy as np
from sklearn import datasets
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler

# 데이터를 로드합니다.
iris = datasets.load_iris()
features = iris.data

# 표준화 전처리 객체를 만듭니다.
standardizer = StandardScaler()

# 특성을 표준화합니다.
features_standardized = standardizer.fit_transform(features)

# faiss 파라미터를 지정합니다.
n_features = features_standardized.shape[1]
nlist = 3
k = 2

# IVF 인덱스를 만듭니다.
quantizer = faiss.IndexFlatIP(n_features)
index = faiss.IndexIVFFlat(quantizer, n_features, nlist)

# 인덱스를 훈련하고 특성 벡터를 추가합니다.
index.train(features_standardized)
index.add(features_standardized)

# 새로운 샘플을 만듭니다.
new_observation = np.array([[ 1,  1,  1,  1]])

# 2개의 최근접 이웃을 위해 인덱스를 검색합니다.
distances, indices = index.search(new_observation, k)

# 2개의 최근접 이웃에 해당하는 특성 벡터를 출력합니다.
np.array([list(features_standardized[i]) for i in indices[0]])

array([[1.03800476, 0.55861082, 1.10378283, 1.18556721],
       [0.79566902, 0.32841405, 0.76275827, 1.05393502]])

## 15.4 근사 최근접 이웃 평가하기

In [20]:
# 라이브러리를 로드합니다.
import faiss
import numpy as np
from sklearn import datasets
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler

# 최근접 이웃 개수
k = 10

# 데이터를 로드합니다.
iris = datasets.load_iris()
features = iris.data

# 표준화 전처리 객체를 만듭니다.
standardizer = StandardScaler()

# 특성을 표준화합니다.
features_standardized = standardizer.fit_transform(features)

# 10개의 최근접 이웃을 사용하는 KNN을 만듭니다.
nearest_neighbors = NearestNeighbors(n_neighbors=k).fit(features_standardized)

# faiss 파라미터를 지정합니다.
n_features = features_standardized.shape[1]
nlist = 3

# IVF 인덱스를 만듭니다.
quantizer = faiss.IndexFlatIP(n_features)
index = faiss.IndexIVFFlat(quantizer, n_features, nlist)

# 인덱스를 훈련하고 특성 벡터를 추가합니다.
index.train(features_standardized)
index.add(features_standardized)
index.nprobe = 1

# 새로운 샘플을 만듭니다.
new_observation = np.array([[ 1,  1,  1,  1]])

# 샘플의 최근접 이웃에 해당하는 인덱스와 거리를 찾습니다.
knn_distances, knn_indices = nearest_neighbors.kneighbors(new_observation)

# 두 개의 최근접 이웃을 위해 인덱스를 검색합니다.
ivf_distances, ivf_indices = index.search(new_observation, k)

# 두 인덱스의 교집합을 구합니다.
recalled_items = set(list(knn_indices[0])) & set(list(ivf_indices[0]))

# 재현율을 출력합니다.
print(f"Recall @k={k}: {len(recalled_items)/k * 100}%")

Recall @k=10: 100.0%
