In [4]:
import pandas as pd
import random
from typing import Tuple
import numpy as np

In [5]:
class ProcessingData:
    """Stateless helpers for preparing a DataFrame: shuffle, standardise, split."""

    @staticmethod
    def shuffleDF(df: pd.DataFrame) -> pd.DataFrame:
        """Return the rows of ``df`` in random order with a fresh 0..n-1 index.

        The permutation is drawn with ``random.sample``, so call
        ``random.seed(...)`` beforehand to get a reproducible shuffle.
        """
        order = random.sample(range(len(df)), len(df))
        return df.iloc[order].reset_index(drop=True)

    @staticmethod
    def normalizeDF(df: pd.DataFrame, columnNames: list) -> pd.DataFrame:
        """Return a copy of ``df`` with each listed column standardised.

        Each column in ``columnNames`` is replaced by
        ``(column - column.mean()) / column.std()``. The input frame is left
        untouched so that re-running a cell never normalises the same data
        twice.

        Note: the statistics are computed over every row of ``df``. A column
        with zero standard deviation divides by zero and comes out as NaN.
        """
        normalized = df.copy()
        for columnName in columnNames:
            column = normalized[columnName]
            normalized[columnName] = (column - column.mean()) / column.std()
        return normalized

    @staticmethod
    def splitDF(df: pd.DataFrame, trainSize: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Split ``df`` by position into ``(train, test)``.

        The first ``int(len(df) * trainSize)`` rows form the training frame and
        the remaining rows the test frame. The split is positional, so it does
        not depend on the index labels of ``df``.
        """
        # Clamp so out-of-range fractions give "nothing" / "everything"
        # instead of a negative slice bound wrapping around.
        cut = max(0, min(len(df), int(len(df) * trainSize)))
        return df.iloc[:cut].copy(), df.iloc[cut:].copy()

    @staticmethod
    def processData(df: pd.DataFrame, columnNames: list, trainSize: float) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Shuffle ``df``, standardise ``columnNames``, then split into ``(train, test)``."""
        shuffled = ProcessingData.shuffleDF(df)
        normalized = ProcessingData.normalizeDF(shuffled, columnNames)
        return ProcessingData.splitDF(normalized, trainSize)
    



In [6]:
class KNN:
    """k-nearest-neighbours classifier using a Minkowski distance of order ``m``.

    Frames passed to ``fit`` and ``score`` are read with the feature columns
    first and the class label in the last column.
    """

    def __init__(self, m: int, k: int):
        """Store the Minkowski order ``m`` and the number of neighbours ``k``."""
        self.m = m
        self.k = k

    @staticmethod
    def dst(x: np.ndarray, y: np.ndarray, m: int) -> float:
        """Return the Minkowski distance of order ``m`` between ``x`` and ``y``.

        ``m=1`` gives the Manhattan distance and ``m=2`` the Euclidean one.
        Coordinates are paired with ``zip``, so when the inputs differ in
        length the extra trailing entries of the longer one are ignored.
        """
        return sum(abs(xi - yi) ** m for xi, yi in zip(x, y)) ** (1 / m)

    def fit(self, df: pd.DataFrame):
        """Remember ``df`` as the training set; no computation happens here."""
        self.df = df

    def predict(self, point: np.ndarray):
        """Return the majority label among the ``k`` training rows nearest to ``point``.

        Args:
            point: feature values in the same order as the training columns.

        Returns:
            The winning label. On a tie the label that appears first in the
            training frame wins.

        Raises:
            ValueError: if ``k`` is below 1 or exceeds the number of training rows.
        """
        if not 1 <= self.k <= len(self.df):
            raise ValueError(
                f"k must be between 1 and the number of training rows "
                f"({len(self.df)}), got {self.k}"
            )
        # The label is whatever sits in the last column, matching the
        # row[-1] / row[:-1] convention used below and in score().
        votes = dict.fromkeys(pd.unique(self.df.iloc[:, -1]), 0)
        ranked = sorted(
            ((KNN.dst(row[:-1], point, self.m), row[-1]) for row in self.df.values),
            key=lambda pair: pair[0],
        )
        for _, label in ranked[: self.k]:
            votes[label] += 1
        return max(votes, key=votes.get)

    def score(self, test_X: pd.DataFrame) -> float:
        """Return the classification accuracy on ``test_X`` as a percentage (0-100).

        ``test_X`` has the same layout as the training frame: features first,
        true label in the last column.

        Raises:
            ValueError: if ``test_X`` has no rows.
        """
        rows = test_X.values
        if len(rows) == 0:
            raise ValueError("test_X must contain at least one row")
        # Hand predict() the features only; the label is used just for checking.
        hits = sum(1 for row in rows if self.predict(row[:-1]) == row[-1])
        return hits / len(rows) * 100

In [7]:
# Seed Python's RNG: ProcessingData.shuffleDF draws its permutation from
# `random`, so without a seed the train/test split (and the score below)
# changes on every run.
SEED = 42
random.seed(SEED)

wine = pd.read_csv('winequality-red.csv', sep=';')

# Standardise every column except the last one, which KNN reads as the label.
feature_columns = wine.columns[:-1]
train_X, test_X = ProcessingData.processData(wine, feature_columns, trainSize=0.8)

knn = KNN(m=2, k=4)  # m=2 -> Euclidean distance, vote among the 4 nearest rows
knn.fit(train_X)
knn.score(test_X)  # accuracy in percent


56.875

In [None]:
# Classify wine quality using scikit-learn's KNeighborsClassifier
from sklearn.neighbors import KNeighborsClassifier


