<a href="https://colab.research.google.com/github/masao1112/MLFromScratch/blob/main/%5BOn_going%5DKNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import time

import matplotlib.pyplot as plt
import numpy as np
import sklearn


In [None]:
# Load the train data points
# Compute the euclidean/manhattan distance of test data points with train data points
# nearest neighbour searching
# majority voting mechanism

In [None]:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

# Fixed seed so the shuffle and the split give the same rows on every run.
SEED = 42
rng = np.random.default_rng(SEED)

data = load_iris()
X = data.data
y = data.target
# shuffle the dataset: apply one permutation to features and labels separately.
# Stacking them with np.c_ first and slicing the label column back out would
# silently turn the integer class labels into floats.
permutation = rng.permutation(len(X))
shuffled_X = X[permutation]
shuffled_y = y[permutation]
dataset = np.c_[shuffled_X, shuffled_y]  # combined (features | label) array, kept for inspection
# train test split (test_size=50 is an absolute number of rows)
X_train, X_test, y_train, y_test = train_test_split(
    shuffled_X, shuffled_y, test_size=50, random_state=SEED
)

In [None]:
def calculate_distance(train_set, data_point):
  """Euclidean distance from one data point to every row of the train set.

  Args:
    train_set: array with one sample per row.
    data_point: feature vector that broadcasts against the rows of ``train_set``.

  Returns:
    Array holding one distance per row of ``train_set``.
  """
  offsets = train_set - data_point
  squared_lengths = (offsets ** 2).sum(axis=1)
  return np.sqrt(squared_lengths)

In [None]:
def get_k_nearest_indices(k, distance):
  """Return the indices of the ``k`` smallest distances, nearest first.

  Args:
    k: number of neighbours to keep.
    distance: 1-D array of distances for a single query point, or an N-D
      array whose last axis runs over the candidate neighbours (for example
      shape ``(n_queries, n_train)``).

  Returns:
    Integer index array with the same leading shape as ``distance`` and a
    last axis of length ``min(k, distance.shape[-1])``.
  """
  # Sort along the last axis and slice that same axis. A plain ``[:k]`` would
  # keep the first k *rows* of a 2-D distance matrix rather than the k nearest
  # neighbours of each row.
  return np.argsort(distance, axis=-1)[..., :k]

In [None]:
# Sanity check: indices of the 5 training points closest to the first test point
get_k_nearest_indices(k=5, distance=calculate_distance(train_set=X_train, data_point=X_test[0]))

array([93,  9, 53, 96, 91])

In [None]:
def find_nearest_neighbours(k, train_data, train_label, data_point, weights="uniform", eps=1e-5):
  """Predict the label of one data point by a vote among its k nearest neighbours.

  Args:
    k: number of neighbours that take part in the vote.
    train_data: training features, one sample per row.
    train_label: array of training labels aligned with ``train_data``.
    data_point: feature vector of the point to classify.
    weights: ``"uniform"`` gives every neighbour one vote; ``"distance"``
      gives each neighbour a vote of ``1 / (distance + eps)`` so closer
      neighbours count more.
    eps: small constant that keeps the inverse distance finite for a
      neighbour at distance 0.

  Returns:
    The winning label (an element of ``train_label``). Ties go to the
    smallest label.

  Raises:
    ValueError: if ``weights`` is neither ``"uniform"`` nor ``"distance"``.
  """
  # Reject an unknown scheme up front; otherwise no prediction would be bound.
  if weights not in ("uniform", "distance"):
    raise ValueError(f'weights must be "uniform" or "distance", got {weights!r}')

  # calculate the distance
  distance = calculate_distance(train_data, data_point)

  # find nearest neighbours
  idx = get_k_nearest_indices(k, distance)
  nearest_classes = train_label[idx]
  unique_labels = np.unique(nearest_classes)

  if weights == "uniform":
    votes = np.ones(len(idx))
  else:
    # One inverse-distance vote PER NEIGHBOUR. Inverting the summed distance of
    # a class instead would penalise a class for having more neighbours.
    votes = 1.0 / (distance[idx] + eps)

  # Total vote of each candidate class, in the same order as unique_labels
  totals = np.array([votes[nearest_classes == label].sum() for label in unique_labels])
  return unique_labels[np.argmax(totals)]

# Apply to all test data in one go

In [None]:
def calculate_distances(train_set, test_set):
  """Pairwise Euclidean distances between every test point and every train point.

  Uses the expansion ``||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b`` so the whole
  matrix comes from a single matrix product.

  Args:
    train_set: 2-D array, one training sample per row.
    test_set: 2-D array, one test sample per row, same number of columns.

  Returns:
    Array of shape ``(len(test_set), len(train_set))``; entry ``[i, j]`` is
    the distance from test point ``i`` to train point ``j``.
  """
  train_sq_norm = np.sum(train_set**2, axis=1)
  test_sq_norm = np.sum(test_set**2, axis=1)
  squared = train_sq_norm.reshape(1, -1) + test_sq_norm.reshape(-1, 1) - 2 * np.dot(test_set, train_set.T)
  # Rounding in the expansion can leave tiny negative values for (near-)identical
  # points; clamp them so the square root gives 0 instead of NaN.
  return np.sqrt(np.maximum(squared, 0))

In [None]:
# params need: unique_labels(y_train)
# Compute distances for all data points --> (50, 100)
# get k nearest indices --> (50, k)
# get k nearest classes --> (50, k)
# For uniform prob: get counts from nearest_classes -> idx, counts = np.unique()
# For distance-based voting: get k nearest distances -> compute the distance weights for each class in nearest_classes

In [None]:
# Scratch check: a list index writes several columns of one row in a single assignment
a = np.zeros((3, 5))
a[0, [1, 2]] = (3, 5)
a

array([[0., 3., 5., 0., 0.],
       [0., 0., 0., 0., 0.],
       [0., 0., 0., 0., 0.]])

In [None]:
# apply for all data points in one go
def find_nearest_neighbours(k, train_data, train_label, test_data, weights="uniform", eps=1e-5):
  """Predict labels for test points by a vote among their k nearest neighbours.

  Args:
    k: number of neighbours that take part in each vote.
    train_data: 2-D array of training features, one sample per row.
    train_label: training labels aligned with ``train_data``.
    test_data: 2-D array with one test point per row, or a single 1-D
      feature vector.
    weights: ``"uniform"`` gives every neighbour one vote; ``"distance"``
      gives each neighbour a vote of ``1 / (distance + eps)``.
    eps: small constant that keeps the inverse distance finite for a
      neighbour at distance 0.

  Returns:
    Array with one predicted label per test row, or a single label when
    ``test_data`` is 1-D. Ties go to the smallest label.

  Raises:
    ValueError: if ``weights`` is neither ``"uniform"`` nor ``"distance"``.
  """
  # Reject an unknown scheme up front; otherwise no prediction would be bound.
  if weights not in ("uniform", "distance"):
    raise ValueError(f'weights must be "uniform" or "distance", got {weights!r}')

  train_label = np.asarray(train_label)
  test_data = np.asarray(test_data)
  # Accept a single feature vector as well, so per-point callers keep working.
  single_point = test_data.ndim == 1
  if single_point:
    test_data = test_data.reshape(1, -1)

  # Map each training label to its position in unique_labels. Votes are then
  # accumulated by integer class index, whatever dtype the labels have
  # (float labels cannot be used as array indices directly).
  unique_labels, train_class_idx = np.unique(train_label, return_inverse=True)
  train_class_idx = train_class_idx.reshape(-1)

  m = test_data.shape[0]
  n = unique_labels.shape[0]
  # calculate l2 norm(euclidean distance) for each test data point wrt train data
  distances = calculate_distances(train_data, test_data)  # (m, n_train)

  # find nearest neighbours: each row holds one test point's distances, so both
  # the sort and the top-k slice run along axis 1
  indices = np.argsort(distances, axis=1)[:, :k]  # (m, k)
  nearest_classes = train_class_idx[indices]  # (m, k) class positions

  rows = np.arange(m).reshape(-1, 1)  # (m, 1), broadcasts against (m, k)
  if weights == "uniform":
    votes = np.ones(indices.shape)
  else:
    # One inverse-distance vote per neighbour, looked up row by row
    votes = 1.0 / (distances[rows, indices] + eps)

  # Accumulate the votes per (test point, class). add.at is unbuffered, so
  # several neighbours of the same class in one row all add up.
  class_scores = np.zeros((m, n))
  np.add.at(class_scores, (rows, nearest_classes), votes)

  pred = unique_labels[np.argmax(class_scores, axis=1)]  # (m,)
  return pred[0] if single_point else pred

In [None]:
from sklearn.neighbors import KNeighborsClassifier
# Baseline: time scikit-learn's classifier on the same split for comparison.
knn = KNeighborsClassifier(n_neighbors=3, weights="uniform", algorithm="ball_tree")
knn.fit(X_train, y_train)
# perf_counter is the high-resolution monotonic clock meant for timing short
# intervals; time.time() can be too coarse for a call this fast.
start = time.perf_counter()
knn.score(X_test, y_test)
end = time.perf_counter()
print(f"KNN sklearn running time: {end - start}")

KNN sklearn running time: 0.01259160041809082


In [None]:
import time

class KNNClassifier:
  """k-nearest-neighbours classifier built on ``find_nearest_neighbours``.

  ``__init__`` stores the hyper-parameters (``k``, ``weights``) and ``fit``
  stores the training set (``X_train``, ``y_train``).
  """

  def __init__(self, k, weights="uniform"):
    """Store the hyper-parameters.

    Args:
      k: number of neighbours used for each vote.
      weights: voting scheme forwarded to ``find_nearest_neighbours``.
    """
    self.k = k
    self.weights = weights

  def fit(self, X_train, y_train):
    """Memorise the training set; no model is fitted.

    Returns:
      ``self``, so calls can be chained.
    """
    # asarray lets plain lists be passed; the label lookup needs array indexing.
    self.X_train = np.asarray(X_train)
    self.y_train = np.asarray(y_train)
    return self

  def predict(self, X_test):
    """Predict a label for every row of ``X_test``.

    Prints the elapsed time as a side effect.

    Returns:
      List with one predicted label per test row.
    """
    X_test = np.asarray(X_test)
    y_pred = []
    # perf_counter: high-resolution clock suited to short intervals
    start = time.perf_counter()
    for i in range(len(X_test)):
      pred = find_nearest_neighbours(self.k, self.X_train, self.y_train, X_test[i], weights=self.weights)
      y_pred.append(pred)
    end = time.perf_counter()
    print(f'Running Tinme: {end - start}')

    return y_pred

  def score(self, X_test, y_test):
    """Return the accuracy: the fraction of rows whose prediction equals ``y_test``."""
    # Compare as arrays: with two plain lists ``==`` gives a single bool,
    # which has no ``.mean()``.
    y_pred = np.asarray(self.predict(X_test))
    return (y_pred == np.asarray(y_test)).mean()

In [None]:
# k=100 spans the whole 100-row training split, so every training point votes.
classifier = KNNClassifier(100, weights="distance")
classifier.fit(X_train, y_train)
# perf_counter is the high-resolution monotonic clock meant for timing short
# intervals; time.time() can be too coarse for a run of a few milliseconds.
start = time.perf_counter()
classifier.score(X_test, y_test)
end = time.perf_counter()
print(f"My KNN running time: {end - start}")

Running Tinme: 0.007066249847412109
My KNN running time: 0.007790088653564453


In [None]:
import time
from scipy.spatial.distance import cdist

# Reference timing: SciPy's pairwise Euclidean distances, shape (n_train, n_test)
t1 = time.perf_counter()
d1 = cdist(X_train, X_test)
t2 = time.perf_counter()
print(f"scipy calculate_distances running time: {t2 - t1}")

# Time the matching from-scratch routine. Timing classifier.predict here would
# also count the neighbour search and the voting, and d2 would hold labels
# rather than distances. Note d2 is (n_test, n_train), the transpose of d1.
t1 = time.perf_counter()
d2 = calculate_distances(X_train, X_test)
t2 = time.perf_counter()
print(f"My calculate_distances running time: {t2 - t1}")


scipy calculate_distances running time: 0.0009734630584716797
Running Tinme: 0.011380195617675781
My calculate_distances running time: 0.0116729736328125


In [None]:
def euclidean_dist(train, test):
  """Pairwise Euclidean distances between every train point and every test point.

  Uses the expansion ``||a - b||^2 = ||a||^2 + ||b||^2 - 2 a.b``.

  Args:
    train: 2-D array, one training sample per row.
    test: 2-D array, one test sample per row, same number of columns.

  Returns:
    Array of shape ``(len(train), len(test))``; entry ``[i, j]`` is the
    distance from train point ``i`` to test point ``j``.
  """
  train_sq = np.sum(train*train, 1)
  test_sq = np.sum(test*test, 1)
  squared = train_sq.reshape(-1, 1) + test_sq.reshape(1, -1) - 2 * np.dot(train, test.T)
  # Rounding in the expansion can leave tiny negative values for (near-)identical
  # points; clamp them so the square root gives 0 instead of NaN.
  return np.sqrt(np.maximum(squared, 0))

In [None]:
# perf_counter is the high-resolution monotonic clock meant for timing short
# intervals; time.time() can be too coarse for a call of about a millisecond.
t1 = time.perf_counter()
d2 = euclidean_dist(X_train, X_test)
print(f"running time: {time.perf_counter() - t1}")

running time: 0.0010199546813964844
