# Lab_04: K-NN algorithm

In [158]:
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data -O iris.csv

--2022-04-22 15:52:19--  https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
Resolving archive.ics.uci.edu (archive.ics.uci.edu)... 128.195.10.252
Connecting to archive.ics.uci.edu (archive.ics.uci.edu)|128.195.10.252|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4551 (4,4K) [application/x-httpd-php]
Saving to: ‘iris.csv’


2022-04-22 15:52:20 (39,5 MB/s) - ‘iris.csv’ saved [4551/4551]



In [159]:
!wget https://raw.githubusercontent.com/dbdmg/data-science-lab/master/datasets/mnist_test.csv -O mnist.csv

--2022-04-22 15:52:21--  https://raw.githubusercontent.com/dbdmg/data-science-lab/master/datasets/mnist_test.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 18289443 (17M) [text/plain]
Saving to: ‘mnist.csv’


2022-04-22 15:52:22 (29,1 MB/s) - ‘mnist.csv’ saved [18289443/18289443]



In [160]:
import pandas as pd
import numpy as np

In [161]:
names = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'class']
iris = pd.read_csv('iris.csv', names=names)

In [162]:
iris.head()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,class
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa


In [163]:
# set the seed to obtain same results
np.random.seed(42) 
# set the size of the Test set as the 20% of the dataset
size = round(.2*len(iris))
# create a mask to retrieve selected rows. 
# I used choice since I don't want replacements. 
# I passed the column index and then sampled indexes
mask = sorted(np.random.choice(iris.index, size=size, replace=False))
X_test, y_test = iris.iloc[mask][names[:-1]], iris.iloc[mask][names[-1]]

In [164]:
# create the train mask subtracting indexes belonging to previous mask.
# Then we need to cast it to list otherwise .iloc won't work
train_mask = list(set(iris.index) - set(mask))
X_train, y_train = iris.iloc[train_mask][names[:-1]], iris.iloc[train_mask][names[-1]]

In [165]:
class KNearestNeighbors:
    def __init__(self, k, distance_metric="euclidean", weighting_scheme:str='uniform'):
        self.k = k
        self.distance_metric = distance_metric
        self.weighting_scheme = weighting_scheme
        
    def compute_distance(self, x:np.array, y:np.array, type:str='euclidean'):
        if type == 'euclidean':
            return np.sqrt(np.sum((x-y)**2))
        elif type == 'cosine':
            num = np.sum(x*y)
            den = np.sqrt(np.sum(x**2))*np.sqrt(np.sum(y**2))
            return 1 - np.abs(num/den)
        elif type == 'manhattan':
            return np.sum(np.abs(x-y))
        
    def fit(self, X, y):
        """
        Store the 'prior knowledge'of you model that will be used
        to predict new labels.
        :param X : input data points, ndarray, shape = (R,C).
        :param y : input labels, ndarray, shape = (R,).
        """
        self.X_train = X
        self.y_train = y
        self.classes = list(y.unique()) # retrieve list of classes
    
    def predict(self, X):
        """
        Run the KNN classification on X.
        :param X: input data points, ndarray, shape = (N,C).
        :return: labels : ndarray, shape = (N,).
        """
        distances = dict() # len = y_test
        idx = 0
        for test_row in X.values:
            dist_list = []
            for train_row in self.X_train.values:
                dist_list.append(self.compute_distance(test_row, train_row, self.distance_metric))
            distances[idx] = np.array(dist_list)
            idx += 1

        pred = np.zeros(shape=(len(X), len(self.classes)))

        # use this stratagem to select indexes where the value is lower than this threshold 
        for i in range(len(distances)):
            threshold = sorted(distances[i])[self.k]
            maskerina = np.argwhere(distances[i] < threshold)
            for idx in maskerina.flatten():
                label = y_train.iloc[idx]
                if self.weighting_scheme == 'distance':
                    if distances[i][idx] == 0:
                        pred[i][label] = np.inf
                    else:
                        pred[i][label] += 1/(distances[i][idx])
                else:
                    pred[i][label] += 1
        return np.argmax(pred, axis=1)
        

In [166]:
# obtain names of classes
classes = iris['class'].unique()

# map classes to values 0, 1, 2 used to index when we count the votes
classes_dict = {cl:i for i,cl in enumerate(classes)}
# reverse_classes to map back names of classes
reverse_classes_dict = {i:cl for i,cl in enumerate(classes)}
# map classes values (both test and train)
y_train = y_train.map(classes_dict)
y_test = y_test.map(classes_dict)


In [167]:
knn = KNearestNeighbors(7, 'euclidean', 'distance')

In [168]:
knn.fit(X_train, y_train)
predictions = knn.predict(X_test)
predictions

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2])

In [169]:
# map each integer value to the initial class
labeled_classes = pd.Series(predictions).map(reverse_classes_dict)
labeled_classes

0         Iris-setosa
1         Iris-setosa
2         Iris-setosa
3         Iris-setosa
4         Iris-setosa
5         Iris-setosa
6         Iris-setosa
7         Iris-setosa
8         Iris-setosa
9         Iris-setosa
10    Iris-versicolor
11    Iris-versicolor
12    Iris-versicolor
13    Iris-versicolor
14    Iris-versicolor
15    Iris-versicolor
16    Iris-versicolor
17    Iris-versicolor
18    Iris-versicolor
19     Iris-virginica
20     Iris-virginica
21     Iris-virginica
22     Iris-virginica
23     Iris-virginica
24     Iris-virginica
25     Iris-virginica
26     Iris-virginica
27     Iris-virginica
28     Iris-virginica
29     Iris-virginica
dtype: object

In [170]:
def compute_accuracy(pred:np.array, y_test:np.array) -> float:
    accuracy = 0

    for i,j in zip(pred, y_test):
        if i == j:
            accuracy += 1
        else:
            continue
    return accuracy/len(y_test)

In [171]:
compute_accuracy(predictions, y_test)

1.0

In [172]:
def run_knn(X_train:np.array, y_train:np.array, X_test:np.array, y_test:np.array, k_list:list, distance_names:list, schemes:list) -> dict:
    results = dict()
    for k in k_list:
        print(f'{k}-Nearest neighbors...')
        scheme_list = []
        
        # choose weighting scheme
        for scheme in schemes:
            #print(f'\tUsing {scheme} scheme...')
            res_list = []
            
            # compute each distance
            for distance in distance_names:    
                #print(f'\tUsing {distance} as distance metric')
                
                knn = KNearestNeighbors(k, distance, scheme)
                knn.fit(X_train, y_train)
            
                predictions = knn.predict(X_test)
            
                accuracy = compute_accuracy(predictions, y_test)
            
                # save the result of each distance in a list of tuple
                res_list.append((distance, accuracy))
            
            # save each res_list as value of scheme
            scheme_list.append({f'{scheme}_weighting':res_list})
            
        # end up with a dictionary with k as initial field and as value
        # a list containing 2 dictionaries. Each of them contains the weighting and
        # corresponding distances and accuracies.
        results[k] = scheme_list
    return results


In [173]:
k_list = [1,2,3,4,5,7,10]
distance_names = ['euclidean', 'cosine', "manhattan"]
schemes = ['uniform', 'distance']
res = run_knn(X_train, y_train, X_test, y_test, k_list, distance_names, schemes)
res

1-Nearest neighbors...
2-Nearest neighbors...
3-Nearest neighbors...
4-Nearest neighbors...
5-Nearest neighbors...
7-Nearest neighbors...
10-Nearest neighbors...


{1: [{'uniform_weighting': [('euclidean', 1.0),
    ('cosine', 0.9),
    ('manhattan', 0.9666666666666667)]},
  {'distance_weighting': [('euclidean', 1.0),
    ('cosine', 0.9),
    ('manhattan', 0.9666666666666667)]}],
 2: [{'uniform_weighting': [('euclidean', 1.0),
    ('cosine', 0.9666666666666667),
    ('manhattan', 1.0)]},
  {'distance_weighting': [('euclidean', 1.0),
    ('cosine', 0.9666666666666667),
    ('manhattan', 1.0)]}],
 3: [{'uniform_weighting': [('euclidean', 1.0),
    ('cosine', 0.9666666666666667),
    ('manhattan', 1.0)]},
  {'distance_weighting': [('euclidean', 1.0),
    ('cosine', 0.9666666666666667),
    ('manhattan', 1.0)]}],
 4: [{'uniform_weighting': [('euclidean', 1.0),
    ('cosine', 0.9666666666666667),
    ('manhattan', 1.0)]},
  {'distance_weighting': [('euclidean', 1.0),
    ('cosine', 0.9666666666666667),
    ('manhattan', 1.0)]}],
 5: [{'uniform_weighting': [('euclidean', 1.0),
    ('cosine', 1.0),
    ('manhattan', 1.0)]},
  {'distance_weighting': [('e

In [174]:
mnist = pd.read_csv('mnist.csv', header=None)

In [175]:
import csv

with open('mnist.csv', "r") as f:
    labels = [] 
    images = []
    for cols in csv.reader(f):
        labels.append(cols[0])
        images.append(cols[1:])

In [176]:
labels = np.array(labels, dtype=int)
labels

array([7, 2, 1, ..., 4, 5, 6])

In [177]:
# list of arrays
images = [np.array(i, dtype=int) for i in images]
# array of arrays
images = np.array(images)

In [178]:
len(labels)

10000

In [179]:
# check how many images for each number we have
numbers = list(range(10))
for n in numbers:
    print(n, sum(labels == n))

0 980
1 1135
2 1032
3 1010
4 982
5 892
6 958
7 1028
8 974
9 1009


In [180]:
indexes = []
numbers = list(range(10)) # 0, 1, ..., 9

for n in numbers:
    a = np.argwhere(labels == n).flatten()
    sample = np.random.choice(a, size=100)
    indexes.append(sample)
indexes = np.array(indexes).flatten()

In [181]:
images[indexes]

array([[0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0]])

In [182]:
mnist_df = pd.DataFrame(images[indexes])
mnist_df['label'] = labels[indexes]
mnist_df.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,775,776,777,778,779,780,781,782,783,label
0,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
1,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
2,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
3,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
4,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [183]:
np.random.seed(42)
size = round(.2*len(mnist_df))
test_mask = sorted(np.random.choice(mnist_df.index, size=size, replace=False))
X_test, y_test = mnist_df.iloc[test_mask][mnist_df.columns[:-1]], mnist_df.iloc[test_mask][mnist_df.columns[-1]]

In [184]:
X_test.iloc[0], y_test.iloc[0]

(0      0
 1      0
 2      0
 3      0
 4      0
       ..
 779    0
 780    0
 781    0
 782    0
 783    0
 Name: 10, Length: 784, dtype: int64,
 0)

In [185]:
train_mask = list(set(mnist_df.index) - set(test_mask))
X_train, y_train = mnist_df.iloc[train_mask][mnist_df.columns[:-1]], mnist_df.iloc[train_mask][mnist_df.columns[-1]]

In [186]:
knn_mnist = KNearestNeighbors(4, 'euclidean', 'distance')
knn_mnist.fit(X_train, y_train)
pred = knn_mnist.predict(X_test)
compute_accuracy(pred, y_test)

0.925

In [187]:
k_list = [1,2,3,4,5,7,10]
distance_names = ['euclidean', 'cosine', "manhattan"]
schemes = ['uniform', 'distance']
results = run_knn(X_train, y_train, X_test, y_test, k_list, distance_names, schemes)

1-Nearest neighbors...
2-Nearest neighbors...
3-Nearest neighbors...
4-Nearest neighbors...
5-Nearest neighbors...
7-Nearest neighbors...
10-Nearest neighbors...


In [191]:
best_results = dict()
for k,v in results.items():
    uniform = sorted(v[0]['uniform_weighting'])[0]
    distance = sorted(v[1]['distance_weighting'])[0]
    if uniform[1] > distance[1]:
        #print('uniform')
        #print(k, uniform)
        best_results[k] = ('uniform_weighting', *uniform) # unpack the tuple
    elif uniform[1] == distance[1]:
        #print('They are the same')
        best_results[k] = ('uniform_weighting', *uniform)
    else: 
        #print("distance")
        #print(k, distance)
        best_results[k] = ('distance_weighting', *distance)

In [192]:
best_results

{1: ('uniform_weighting', 'cosine', 0.9),
 2: ('distance_weighting', 'cosine', 0.89),
 3: ('uniform_weighting', 'cosine', 0.92),
 4: ('uniform_weighting', 'cosine', 0.92),
 5: ('uniform_weighting', 'cosine', 0.92),
 7: ('uniform_weighting', 'cosine', 0.92),
 10: ('uniform_weighting', 'cosine', 0.915)}