# Implementation 1

In [3]:
import pandas as pd
import numpy as np
from tqdm import tqdm
from time import time
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

tqdm.pandas()

In [None]:
# Get data: read the small training set and the test set from CSV files in the working directory.
# NOTE(review): `test_data_small` is read from "MNIST_test.csv" (no "_small" suffix) — confirm
# that this is the intended file.
train_data_small = pd.read_csv(r"MNIST_train_small.csv")
test_data_small = pd.read_csv(r"MNIST_test.csv")

In [None]:
def custom_mode(object):
    """Return the most frequent value of a pandas object.

    Calls ``object.mode()``. When the result is a ``pd.Series`` its first entry
    is returned; any other result is handed back unchanged.
    """
    most_common = object.mode()

    if isinstance(most_common, pd.Series):
        return most_common.iloc[0]

    return most_common

In [None]:
# Func to extract features and labels from dataset
def get_features_targets(data):
    """Split a frame into (features, targets).

    Column 0 is returned as the targets. All remaining columns are returned as
    the features, with their column labels replaced by 0, 1, 2, ...
    """
    features = data.iloc[:, 1:]
    n_features = features.shape[1]
    features.columns = [position for position in range(n_features)]

    targets = data.iloc[:, 0]

    return features, targets

In [None]:
# Create KNN Classifier

class KNN:
    """Brute-force k-nearest-neighbours classifier working on pandas objects.

    ``k`` is either a single integer or a list / ``np.ndarray`` of integers.
    The multi-k entry points (``predict(..., mult=True)``, ``error_mult`` and
    ``loocv_score``) require the list form.

    The class is self-contained: it needs only ``numpy`` and ``pandas`` and does
    not rely on helper functions defined in other notebook cells.
    """

    # Norm orders that can be selected by name.
    _NAMED_NORMS = {'l1': 1, 'l2': 2}

    # Initialize Object
    def __init__(self, k, distance):
        """Store ``k`` and resolve ``distance`` to a norm order.

        Parameters
        ----------
        k : int or list / np.ndarray of int
            Number(s) of neighbours to vote with.
        distance : str or number
            ``'l2'`` (or ``'l1'``) by name, or any value accepted as ``ord`` by
            ``np.linalg.norm`` for vectors.

        Raises
        ------
        ValueError
            If ``distance`` is a string that is not a known norm name.
            (Previously this left ``self.norm`` undefined and only failed later,
            inside prediction, with an AttributeError.)
        """
        if isinstance(distance, str):
            if distance not in self._NAMED_NORMS:
                raise ValueError(
                    f"Unknown distance {distance!r}; expected one of "
                    f"{sorted(self._NAMED_NORMS)} or a numeric norm order"
                )
            self.norm = self._NAMED_NORMS[distance]
        else:
            self.norm = distance
        self.features = None
        self.labels = None
        self.k = k

    # Fit to training set
    def fit(self, features, labels):
        """Remember the training data (no copy is made).

        ``features`` is a DataFrame with one row per sample; ``labels`` is a
        Series sharing the same index.
        """
        self.features = features
        self.labels = labels

    @staticmethod
    def _majority_label(neighbour_labels):
        """Most frequent label in a Series; ties resolve to the first value of ``mode()``."""
        return neighbour_labels.mode().iloc[0]

    def predict_one(self, x, mult=None):
        """Predict the label of a single sample.

        Parameters
        ----------
        x : pd.Series
            One sample, indexed by the same labels as the columns of the
            training features (alignment is by label).
        mult : truthy or None
            If truthy, return a list with one prediction per entry of
            ``self.k``; otherwise return the single prediction for scalar
            ``self.k``.

        Notes
        -----
        Training points at distance exactly 0 are discarded before voting.
        NOTE(review): this removes the sample itself when predicting on the
        training set, but it also removes exact duplicates of any query.
        """
        # One vectorised pass over the training set instead of a Python-level
        # norm call per training row.
        differences = self.features.sub(x, axis=1).to_numpy(dtype=float)
        distances = pd.Series(
            np.linalg.norm(differences, ord=self.norm, axis=1),
            index=self.features.index,
        )
        distances = distances.sort_values()
        distances = distances[distances != 0]

        if mult:
            # Fetch the labels for the largest k once; every smaller k votes
            # with a prefix of the same distance-ordered labels.
            nearest_labels = self.labels.loc[distances.index[:np.max(self.k)]]
            return [self._majority_label(nearest_labels.iloc[:n]) for n in self.k]

        nearest_labels = self.labels.loc[distances.index[:self.k]]
        return self._majority_label(nearest_labels)

    # Prediction for dataset
    def predict(self, x, mult=None):
        """Predict every row of the DataFrame ``x``.

        Returns a Series indexed like ``x``. With ``mult`` truthy each element
        is the list returned by ``predict_one`` (one label per k).

        A progress bar is shown when ``tqdm.pandas()`` has registered
        ``progress_apply``; otherwise plain ``apply`` is used.
        """
        apply_rows = getattr(x, 'progress_apply', x.apply)
        predictions = apply_rows(lambda row: self.predict_one(row, mult), axis=1)

        return predictions

    # Error score
    def error_score(self, predicted, data_label):
        """Fraction of entries where ``predicted`` differs from ``data_label``.

        Both arguments are Series of numeric labels; they are aligned on index.
        """
        difference = predicted.sub(data_label)
        error = int((difference != 0).sum()) / len(data_label)

        return error

    # Error scores for multiple ks
    def error_mult(self, predicted, data_labels):
        """Error rate for every k in ``self.k``.

        ``predicted`` may be any of:

        * an iterable of per-k prediction Series (the original contract);
        * the Series of per-row label lists returned by
          ``predict(x, mult=True)``;
        * a feature DataFrame, in which case predictions are computed first
          with ``predict(predicted, mult=True)``.

        Returns a list of error rates, or ``None`` (after printing a hint) when
        ``self.k`` is not a list / array.
        """
        if not isinstance(self.k, (np.ndarray, list)):
            print("This method is used for multiple K")
            return None

        if isinstance(predicted, pd.DataFrame):
            # A DataFrame was never a valid "list of predictions" (iterating it
            # yields column labels), so it is safe to read it as features.
            predicted = self.predict(predicted, mult=True)

        if isinstance(predicted, pd.Series):
            # Series of per-row lists -> one prediction Series per k.
            per_k = pd.DataFrame(predicted.tolist(), index=predicted.index)
            predicted = [per_k[column] for column in per_k.columns]

        errors = []
        for k_predictions in predicted:
            difference = k_predictions.sub(data_labels)
            errors.append(int((difference != 0).sum()) / len(difference))

        return errors

    def loocv_score(self):
        """Leave-one-out error rate for every k in ``self.k``.

        Each sample is held out in turn, the model is fitted on the remaining
        samples and the held-out sample is predicted for all k. Requires
        ``self.k`` to be a list / array and the training index to be unique.

        The original training data is restored on exit, and the error counts
        are divided by the full sample count. (Previously the model stayed
        fitted on n-1 rows and the result was divided by n-1.)
        """
        all_features, all_labels = self.features, self.labels
        n_samples = len(all_features)
        error_counts = np.zeros(len(self.k))

        try:
            for sample_id in all_features.index:
                # drop() returns a new object, so the full data stays untouched.
                self.fit(all_features.drop(index=sample_id),
                         all_labels.drop(index=sample_id))

                held_out_predictions = self.predict_one(all_features.loc[sample_id], True)
                true_label = all_labels.loc[sample_id]

                error_counts += np.array(
                    [int(prediction != true_label) for prediction in held_out_predictions]
                )
        finally:
            self.fit(all_features, all_labels)

        return error_counts / n_samples


In [None]:
# Get features and labels for datasets.
# get_features_targets takes column 0 as the targets (y) and the remaining columns as features (X).
train_small_X, train_small_y = get_features_targets(train_data_small)
test_small_X, test_small_y = get_features_targets(test_data_small)

In [None]:
# Create KNN instance to get error score for multiple k.
# k covers 1..20 inclusive, matching the 1-20 axis ticks/limits of the plots below
# and the k range used by the later implementations (range(1, 20) stopped at 19).
k_list = np.arange(1, 21)
KNN_mult = KNN(k=k_list, distance='l2')
KNN_mult.fit(train_small_X, train_small_y)

In [None]:
# Calculate error scores
def _risk_per_k(model, features, labels):
    """Error rate of `model` on (features, labels) for every k in model.k.

    predict(..., mult=True) yields one list of labels per row (one entry per k).
    Those lists are expanded into one prediction column per k, and each column
    is scored with model.error_score.
    """
    row_predictions = model.predict(features, mult=True)
    per_k = pd.DataFrame(row_predictions.tolist(), index=row_predictions.index)
    return [model.error_score(per_k[column], labels) for column in per_k.columns]


# Predictions have to be computed first: passing the raw feature frames straight
# to error_mult made it iterate over column labels instead of predictions.
errors_training = _risk_per_k(KNN_mult, train_small_X, train_small_y)
errors_testing = _risk_per_k(KNN_mult, test_small_X, test_small_y)

In [None]:
# Plot errors: training vs. testing risk for each k
fig, ax = plt.subplots()
ax.plot(k_list, errors_training, color='m', label='Risk for Training Set')
ax.plot(k_list, errors_testing, color='g', label='Risk for Testing Set')
ax.set_title('Empirical Risk for a Range of Ks')  # spelling aligned with the later plots
ax.set_ylabel('Error Rate')
ax.set_xlabel('K-Value')
ax.set_xticks(k_list)  # one tick per evaluated k instead of a hard-coded 1..20
ax.legend()
plt.show()

In [None]:
# Leave-one-out risk on the small training set, one value per k
loocv_small = KNN_mult.loocv_score()
print('The errors for each k are given by ', loocv_small)

# Compare LOOCV risk with the test-set risk
fig, ax = plt.subplots()
ax.plot(k_list, loocv_small, color='m', label='Risk for Train Set with LOOCV')
ax.plot(k_list, errors_testing, color='g', label='Risk for Test Set')
ax.set_title('Empirical Risk for a Range of Ks')
ax.set_ylabel('Empirical Risk')
ax.set_xlabel('K-Value')
ax.set_ylim((0, 0.15))
ax.set_xlim((1, 20))
ax.set_xticks(list(range(1, 21)))
ax.legend()
plt.show()

In [None]:
# Preprocessing implementation: binarize the features, then project onto 50 principal components
train_small_X_binary = (train_small_X > 128).astype(int)

# Seeded so that re-running the notebook gives the same projection
# (PCA may pick a randomized SVD solver depending on the data shape).
pca = PCA(n_components=50, random_state=0)
train_small_X_pca = pca.fit_transform(train_small_X_binary)

# Back to a DataFrame with the original row index so labels stay aligned
train_small_X_pca = pd.DataFrame(train_small_X_pca, index=train_small_X_binary.index)

KNN_LOOCV_preprocessed = KNN(k_list, 'l2')
KNN_LOOCV_preprocessed.fit(train_small_X_pca, train_small_y)

# The LOOCV method of the class is `loocv_score`; `loocv` does not exist.
loocv_preprocessed = KNN_LOOCV_preprocessed.loocv_score()

print('LOOCV Error Rates after Preprocessing:', loocv_preprocessed)
plt.plot(k_list, loocv_small, label='Before Preprocessing', linestyle='dashed', marker='o', color='red')
plt.plot(k_list, loocv_preprocessed, label='After Preprocessing', linestyle='solid', marker='s', color='blue')

plt.xlabel('k-Value')
plt.ylabel('LOOCV Error Rate')
plt.title('Before vs After Preprocessing')
plt.legend()
plt.show()

This implementation broke down when run on the large dataset.

# Implementation Two and Three

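Implementations Two and Three share a new KNN classifier that computes all pairwise distances with `scipy.spatial.distance.cdist`.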

In [1]:
from scipy.spatial.distance import cdist

In [2]:
# Create KNN Classifier

class KNN:
    """k-nearest-neighbours classifier based on pairwise Minkowski distances.

    Distances are computed with ``scipy.spatial.distance.cdist``. ``k`` is
    either a single integer or a list / ``np.ndarray`` of integers. Labels are
    always looked up by position (via a NumPy copy of ``self.labels``), so the
    labels may be passed as a pandas Series or as an array.
    """

    # Initialize Object
    def __init__(self, k, distance=None):
        """Store ``k`` and the Minkowski order ``distance`` (default 2)."""
        self.norm = 2 if distance is None else distance
        self.features = None
        self.labels = None
        self.k = k

    # Fit to training set
    def fit(self, features, labels):
        """Remember the training features and labels (no copy is made)."""
        self.features = features
        self.labels = labels

    def _has_multiple_k(self):
        """True when ``self.k`` is a list / array of k values."""
        return isinstance(self.k, (np.ndarray, list))

    def _label_array(self):
        """Training labels as a NumPy array, for positional fancy indexing.

        Indexing a pandas Series with a 2-D integer array is label-based and is
        rejected by current pandas versions; an ndarray is purely positional.
        """
        return np.asarray(self.labels)

    @staticmethod
    def _majority_vote(neighbour_labels):
        """Row-wise most frequent label of a (n_samples, n_neighbours) array."""
        return pd.DataFrame(neighbour_labels).mode(axis=1).iloc[:, 0].values

    # Prediction for dataset
    def predict(self, x, mult=None, all=None):
        """Predict for the rows of ``x``.

        * ``mult is None and all is None``: array of predicted labels using
          scalar ``self.k``.
        * ``all == 'all'``: the full ``(len(x), n_train)`` distance matrix.
        * otherwise: array of shape ``(len(x), max(self.k))`` holding the labels
          of the nearest training points, closest first.
        """
        distances = cdist(x, self.features, metric='minkowski', p=self.norm)

        if mult is None and all is None:
            nearest = np.argsort(distances, axis=1)[:, :self.k]
            return self._majority_vote(self._label_array()[nearest])

        if all == 'all':
            return distances

        nearest = np.argsort(distances, axis=1)[:, :max(self.k)]
        return self._label_array()[nearest]

    # Prediction for multiple K
    def error_mult(self, x, true_labels, prediction_data=None):
        """Error rate on ``(x, true_labels)`` for every k in ``self.k``.

        Returns the list of error rates. If ``prediction_data`` is not ``None``,
        returns ``(pred_df, errors)`` where ``pred_df`` has one column of
        predictions per k, named ``str(k)``. Prints a hint and returns ``None``
        when ``self.k`` is not a list / array.
        """
        if not self._has_multiple_k():
            print("This method is used for multiple K. Enter a list of K-values.")
            return None

        neighbour_labels = self.predict(x, mult=True)
        predictions_by_k = {}
        errors_k = []

        for n_neighbours in self.k:
            votes = self._majority_vote(neighbour_labels[:, :n_neighbours])
            predictions_by_k[str(n_neighbours)] = votes
            errors_k.append(self.error_score(votes, true_labels))

        # `is not None` also works when an array-like is passed as the flag.
        if prediction_data is not None:
            return pd.DataFrame(predictions_by_k), errors_k

        return errors_k

    # Error score
    def error_score(self, predicted, data_label):
        """Fraction of positions where ``predicted`` differs from ``data_label``."""
        mismatches = np.asarray(predicted) != np.asarray(data_label)
        error_score = int(np.count_nonzero(mismatches)) / len(data_label)

        return error_score

    # Leave One Out CV Score
    def loocv_score(self):
        """Leave-one-out error rate from one full pairwise distance matrix.

        Returns a list (one rate per k) when ``self.k`` is a list / array,
        otherwise a single float.

        Each sample's distance to itself is set to infinity, so the sample
        sorts last and is dropped. Relying on "self is the first sorted
        neighbour" is wrong when exact duplicates exist, because a duplicate
        may sort ahead of the sample itself.
        """
        distances = self.predict(self.features, all='all')
        np.fill_diagonal(distances, np.inf)
        # Drop the last column: that is the sample itself.
        neighbour_order = np.argsort(distances, axis=1)[:, :-1]
        labels = self._label_array()

        if self._has_multiple_k():
            return [
                self.error_score(self._majority_vote(labels[neighbour_order[:, :n]]), labels)
                for n in self.k
            ]

        votes = self._majority_vote(labels[neighbour_order[:, :self.k]])
        return self.error_score(votes, labels)

    # Leave One Out CV Score - 2
    def loocv_score_big(self, chunk_size=10000):
        """Leave-one-out error rate computed in row chunks to bound memory.

        Parameters
        ----------
        chunk_size : int, default 10000
            Number of training rows whose distances are held in memory at once.
            Must be positive.

        Returns an array with one rate per k when ``self.k`` is a list / array,
        otherwise a single float. Errors are counted over all chunks and
        divided by the total sample count, so any dataset size is handled
        (the previous version assumed exactly 6 chunks of 10000 rows).
        """
        n_samples = len(self.features)
        labels = self._label_array()
        multiple_k = self._has_multiple_k()
        k_values = list(self.k) if multiple_k else [self.k]
        error_counts = np.zeros(len(k_values))

        for start in tqdm(range(0, n_samples, chunk_size)):
            stop = min(start + chunk_size, n_samples)
            distances = self.predict(self.features[start:stop], all='all')

            # Row r of this chunk is training sample start + r: mask its
            # distance to itself so it sorts last, then drop that last column.
            distances[np.arange(stop - start), np.arange(start, stop)] = np.inf
            neighbour_order = np.argsort(distances, axis=1)[:, :-1]

            for position, n_neighbours in enumerate(k_values):
                votes = self._majority_vote(labels[neighbour_order[:, :n_neighbours]])
                error_counts[position] += np.count_nonzero(votes != labels[start:stop])

        error_rates = error_counts / n_samples

        return error_rates if multiple_k else float(error_rates[0])

    # Search for best metric
    def grid_search_dist(self, dist_range):
        """LOOCV error for every Minkowski order in ``dist_range``.

        Returns a DataFrame with one column per order (named ``str(order)``)
        and one row per k. ``self.norm`` is restored afterwards instead of
        being left at the last order tried.
        """
        orders = list(dist_range)
        original_norm = self.norm
        scores = {}

        try:
            for order in tqdm(orders):
                self.norm = order
                # atleast_1d lets a scalar-k model produce a one-row frame.
                scores[str(order)] = np.atleast_1d(self.loocv_score())
        finally:
            self.norm = original_norm

        return pd.DataFrame(scores, columns=[str(order) for order in orders])


## Implementation Two

In [None]:
# Repeat the small-dataset experiment with the new KNN class

k_list = np.arange(1, 21)
KNN_mult = KNN(k=k_list)
KNN_mult.fit(train_small_X, train_small_y)

errors_training = KNN_mult.error_mult(train_small_X, train_small_y)
errors_testing = KNN_mult.error_mult(test_small_X, test_small_y)

# Train vs. test risk
fig, ax = plt.subplots()
ax.plot(k_list, errors_training, color='m', label='Risk for Train Set')
ax.plot(k_list, errors_testing, color='g', label='Risk for Test Set')
ax.set_title('Empirical Risk for a Range of Ks')
ax.set_ylabel('Empirical Risk')
ax.set_xlabel('K-Value')
ax.set_ylim((0, 0.15))
ax.set_xlim((1, 20))
ax.set_xticks(k_list)
ax.legend()
plt.show()

LOOCV_score = KNN_mult.loocv_score()

# LOOCV risk vs. test risk
fig, ax = plt.subplots()
ax.plot(k_list, LOOCV_score, color='m', label='Risk for Train Set with LOOCV')
ax.plot(k_list, errors_testing, color='g', label='Risk for Test Set')
ax.set_title('Empirical Risk for a Range of Ks')
ax.set_ylabel('Empirical Risk')
ax.set_xlabel('K-Value')
ax.set_ylim((0, 0.15))
ax.set_xlim((1, 20))
ax.set_xticks(list(range(1, 21)))
ax.legend()
plt.show()

In [None]:
# Grid Search for best k and p
dist_range = list(range(1, 16))
dist_df = KNN_mult.grid_search_dist(dist_range=dist_range)

# Rows follow k_list, columns follow dist_range
error_grid = dist_df.to_numpy(dtype=float)

# Plot heatmap
plt.imshow(error_grid, cmap='magma')
plt.xlabel('P-Value')
plt.ylabel('K-Value')
# Tick positions are derived from the searched ranges instead of hard-coded 15 / 20.
plt.xticks(ticks=range(len(dist_range)), labels=dist_range)
plt.yticks(ticks=range(len(k_list)), labels=k_list)
plt.colorbar()
plt.show()

# Get position of minimum risk (first minimum in row-major order) and map the
# grid position back to the actual k and p values rather than assuming index + 1.
best_row, best_col = np.unravel_index(np.argmin(error_grid), error_grid.shape)
min_error = error_grid[best_row, best_col]
print(f"{min_error:.4f} error rate achieved at k={k_list[best_row]} and p={dist_range[best_col]}")

In [None]:
# Using large dataset
train_data = pd.read_csv(r"MNIST_train.csv")
test_data = pd.read_csv(r"MNIST_test.csv")

# Get features and labels for datasets
train_X, train_y = get_features_targets(train_data)
test_X, test_y = get_features_targets(test_data)

# Binarize data (1 where the value exceeds 128, else 0)
train_X_binary = (train_X > 128).astype(int)

k_list = np.array([i for i in range(1,21)])

# Create KNN instance
KNN_big_loocv = KNN(k=k_list, distance=2)
KNN_big_loocv.fit(train_X_binary, train_y)

# Get loocv score
# NOTE(review): per the note that follows this cell, this call is where the run broke down.
# loocv_score() asks cdist for the distances of the whole training set against itself in a
# single matrix (n_train x n_train), and then argsorts that full matrix.
KNN_big_loocv_scores = KNN_big_loocv.loocv_score()

The code broke down at the last step (the LOOCV on the large dataset), which leads us to Implementation Three.

## Implementation Three

In [None]:
# Same setup as the previous large-dataset cell
train_data = pd.read_csv(r"MNIST_train.csv")
test_data = pd.read_csv(r"MNIST_test.csv")

# Split into features and labels
train_X, train_y = get_features_targets(train_data)
test_X, test_y = get_features_targets(test_data)

# Binarize: 1 where the value exceeds 128, else 0
train_X_binary = train_X.gt(128).astype(int)

k_list = np.arange(1, 21)

# Multi-k model with Minkowski order 2
KNN_big_loocv = KNN(k=k_list, distance=2)
KNN_big_loocv.fit(train_X_binary, train_y)

# LOOCV computed chunk by chunk
KNN_big_loocv_scores = KNN_big_loocv.loocv_score_big()

# LOOCV risk per k
fig, ax = plt.subplots()
ax.plot(k_list, KNN_big_loocv_scores)
ax.set_xticks(k_list)
ax.set_ylim((0.03, 0.06))
ax.set_xlabel('K Value')
ax.set_ylabel('Empirical Risk')
ax.set_title('Empirical Risk with LOOCV')
plt.show()

In [None]:
# Predict for test data
# Binarize the test features with the same > 128 threshold used for the training features.
test_X_binary = (test_X > 128).astype(int)

# Single-k model (k=3, Minkowski order 2) fitted on the binarized training set.
KNN_test_big = KNN(k=3, distance=2)
KNN_test_big.fit(train_X_binary, train_y)
predictions_test = KNN_test_big.predict(test_X_binary) 

# Get error score
# The value is stored in `error_test`; this cell does not display it.
error_test = KNN_test_big.error_score(predictions_test, test_y)

In [None]:
# Test-set error for every k from 1 to 20
k_list = np.arange(1, 21)
KNN_test_mult = KNN(k=k_list, distance=2)
KNN_test_mult.fit(train_X_binary, train_y)

test_error_mult = KNN_test_mult.error_mult(test_X_binary, test_y)

# Plot
import matplotlib.pyplot as plt

fig, ax = plt.subplots()
ax.plot(k_list, test_error_mult)
ax.set_xticks(k_list)
ax.set_ylim((0.04, 0.055))
ax.set_xlabel('K Value')
ax.set_ylabel('Empirical Risk')
ax.set_title('Empirical Risk on Entire Test Set')
plt.show()