In [51]:
import numpy as np
import pandas as pd

from operator import itemgetter

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score

In [52]:
def calculate_manhattan_distance(array1: np.ndarray,
                                 array2: np.ndarray
                                 ) -> np.ndarray:
    """
    Calculates the pairwise Manhattan (L1) distance between every point in two sets of points.
    array1: first set of points, a 2D array with one point per row;
    array2: second set of points, a 2D array with one point per row.
    Returns a 2D array in which element [i, j] is the distance between array1[i] and array2[j],
    i.e. rows follow array1 and columns follow array2.
    """

    # Broadcasting compares every row of array1 with every row of array2, producing an
    # intermediate of shape (len(array1), len(array2), n_features)
    pairwise_gaps = np.abs(array1[:, np.newaxis, :] - array2[np.newaxis, :, :])

    # Collapse the feature axis: the L1 distance is the sum of absolute coordinate differences
    return np.sum(pairwise_gaps, axis=-1)

In [53]:
def calculate_euclidean_distance(array1: np.ndarray,
                                 array2: np.ndarray
                                 ) -> np.ndarray:
    """
    Calculates the pairwise Euclidean (L2) distance between every point in two sets of points.
    array1: first set of points, a 2D array with one point per row;
    array2: second set of points, a 2D array with one point per row.
    Returns a 2D array in which element [i, j] is the distance between array2[i] and array1[j],
    i.e. rows follow array2 and columns follow array1.
    """

    # Broadcasting compares every row of array2 with every row of array1, producing an
    # intermediate of shape (len(array2), len(array1), n_features)
    offsets = array1[np.newaxis, :, :] - array2[:, np.newaxis, :]
    squared_offsets = np.power(offsets, 2)

    # Sum the squared coordinate differences over the feature axis, then take the square root
    sum_of_squares = np.sum(squared_offsets, axis=-1)
    return sum_of_squares ** 0.5

In [54]:
def calculate_nearest_neighbour(distances: np.ndarray,
                                labels: np.ndarray,
                                k: int
                                ) -> int:

    """
    For any observation, takes its distances to a range of other observations and their class labels and returns the majority class among the k nearest of them.
    distances: a 1D array containing distances between the observation and a range of other observations;
    labels: the class labels for the range of other observations, in the same order as distances;
    k: the desired number of neighbours to use to calculate the predicted label (at least 1; values larger than the number of observations use all of them).
    Returns the winning label as an element of labels, so its dtype is preserved. Equidistant neighbours are taken in their original order, and when several labels share the highest count the largest label value wins.
    Raises ValueError if distances and labels differ in shape, are empty or not 1D, or if k is smaller than 1.
    """

    distances = np.asarray(distances)
    labels = np.asarray(labels)

    if distances.ndim != 1 or distances.shape != labels.shape:
        raise ValueError("distances and labels must be 1D arrays of the same length")
    if distances.size == 0:
        raise ValueError("at least one neighbour is required")
    if k < 1:
        raise ValueError("k must be a positive integer")

    # Rank neighbours by distance only. Labels are looked up through the resulting indices rather
    # than stacked next to the distances, so they are never coerced to the distance dtype.
    # A stable sort keeps the selection deterministic when distances are tied.
    nearest_idx = np.argsort(distances, kind="stable")[:k]
    nearest_labels = labels[nearest_idx]

    # Count the votes; np.unique returns the candidate labels in ascending order
    candidate_labels, votes = np.unique(nearest_labels, return_counts=True)

    # np.argmax returns the first maximum, so scanning the reversed counts selects the last
    # (largest) label among those tied for the highest vote count
    winner = len(votes) - 1 - np.argmax(votes[::-1])
    return candidate_labels[winner]

In [56]:
def calculate_knn(X_train: np.ndarray,
                  X_test: np.ndarray,
                  y_train: np.ndarray,
                  distance_metric: str,
                  k: int
                  ) -> np.ndarray:

    """
    Implementation of the kNN algorithm. Generates an array of class label predictions for a given set of test points, based on a selected distance metric and number of neighbours.
    X_train: the training features, one observation per row;
    X_test: the test features, one observation per row;
    y_train: the training labels, in the same order as the rows of X_train;
    distance_metric: "manhattan" selects the Manhattan distance; any other value selects the Euclidean distance;
    k: the number of neighbours to use.
    Returns one predicted label per row of X_test.
    """
    # Generate a matrix of distances with one row per test observation and one column per train
    # observation, so that each row lines up with y_train. The two distance helpers orient their
    # output differently, hence the different argument order in each branch.
    if distance_metric == "manhattan":
        # calculate_manhattan_distance puts its first argument on the rows: test points go first
        test_train_distances = calculate_manhattan_distance(X_test, X_train)
    else:
        # calculate_euclidean_distance puts its second argument on the rows: test points go second
        test_train_distances = calculate_euclidean_distance(X_train, X_test)

    # For each test observation, generate the majority class of its k nearest neighbours from
    # the training set
    return np.apply_along_axis(lambda x: calculate_nearest_neighbour(distances = x, labels = y_train, k = k),
                               1,
                               test_train_distances)

In [57]:
# Load the dry bean dataset; the path is relative to the notebook's working directory.
# calculate_samples reads this module-level frame directly.
beans = pd.read_excel("../data/Dry_Bean_Dataset.xlsx")
# One scaler instance, passed to every calculate_samples call in the experiments below
bscaler = StandardScaler()

In [58]:
def calculate_samples(cols: list, rows: int, scaler: StandardScaler) -> tuple:
    """
    Builds scaled train and test sets from the module-level `beans` frame.
    cols: the columns to keep; must include "Class", which is used as the target;
    rows: the number of rows to sample from `beans`;
    scaler: the scaler to use; fit_transform is called on the training features and transform on the test features.
    Returns a tuple (X_train, X_test, y_train, y_test). The labels are the integer category codes of "Class".
    Sampling and splitting both use the fixed seed 456, and 25% of the sampled rows go to the test set.
    """
    # Draw the sample, then separate the features from the encoded target
    subset = beans[cols].sample(n=rows, random_state=456)
    features = subset.drop(columns=["Class"])
    targets = subset["Class"].astype("category").cat.codes.to_numpy()

    # Hold out a quarter of the sampled rows for testing
    split = train_test_split(features, targets, test_size=0.25, random_state=456)
    train_features, test_features, train_targets, test_targets = split

    # Scaling statistics come from the training split only and are then applied to the test split
    train_scaled = scaler.fit_transform(train_features)
    test_scaled = scaler.transform(test_features)

    return train_scaled, test_scaled, train_targets, test_targets

In [59]:
# Small experiment: 400 sampled rows, three features plus the Class target
X_train_small, X_test_small, y_train_small, y_test_small = calculate_samples(
    ["MajorAxisLength", "MinorAxisLength", "roundness", "Class"], 400, bscaler)

In [61]:
%%time
# Predict the small test split from its 3 nearest training neighbours (Euclidean distance)
y_pred_small = calculate_knn(X_train_small,
                             X_test_small,
                             y_train_small,
                             "euclidean",
                             k=3)

CPU times: user 11.8 ms, sys: 1.2 ms, total: 13 ms
Wall time: 11.5 ms


In [62]:
# Accuracy of the small-sample predictions against the held-out labels
print(accuracy_score(y_test_small, y_pred_small))

0.93


In [63]:
# Medium experiment: the same three features, but sampling as many rows as `beans` contains
X_train_med, X_test_med, y_train_med, y_test_med = calculate_samples(
    ["MajorAxisLength", "MinorAxisLength", "roundness", "Class"], len(beans), bscaler)

In [65]:
%%time
# Predict the medium test split from its 3 nearest training neighbours (Euclidean distance)
y_pred_med = calculate_knn(X_train_med,
                           X_test_med,
                           y_train_med,
                           "euclidean",
                           k=3)

CPU times: user 6.22 s, sys: 845 ms, total: 7.07 s
Wall time: 7.07 s


In [66]:
# Accuracy of the medium-sample predictions against the held-out labels
print(accuracy_score(y_test_med, y_pred_med))

0.9006758742286218


In [67]:
# Large experiment: every column of `beans` and as many rows as it contains
X_train_large, X_test_large, y_train_large, y_test_large = calculate_samples(
    beans.columns.to_list(), len(beans), bscaler)

In [68]:
%%time
# Predict the large test split from its 3 nearest training neighbours (Euclidean distance)
y_pred_large = calculate_knn(X_train_large,
                             X_test_large,
                             y_train_large,
                             "euclidean",
                             k=3)

CPU times: user 16.6 s, sys: 3.06 s, total: 19.7 s
Wall time: 19.8 s


In [70]:
# Accuracy of the large-sample predictions against the held-out labels
print(accuracy_score(y_test_large, y_pred_large))

0.9200705260064649
