In [1]:
import math
import os
import threading
import _pickle as pickle

import matplotlib.pyplot as plt
import numpy as np
import scipy.spatial.distance as sp
import tensorflow as tf


In [2]:
#(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

In [3]:
# Folder expected to hold the CIFAR-10 python batch files.  Set the
# CIFAR10_DATA_DIR environment variable to point at a local copy of the
# dataset; when it is unset the original author's absolute path is used, so
# existing setups keep working unchanged.
cifar10_dataset_folder_path = os.environ.get(
    'CIFAR10_DATA_DIR',
    '/home/pranav/My Data/M.Tech/ML/Lab Assignments/Assignment-1/cifar-10-python/cifar-10-batches-py/',
)



In [4]:

# Load single batch of cifar
def load_cifar_batch(filename):
    """Read one pickled batch file and return its images and labels.

    Args:
        filename: path of a pickle file whose payload is a mapping with a
            'data' entry (an array) and a 'labels' entry (a sequence).

    Returns:
        A ``(data, labels)`` tuple: ``data`` is the 'data' entry converted to
        float64, ``labels`` is the 'labels' entry wrapped in a numpy array.
    """
    # NOTE: unpickling executes code embedded in the file; only load batch
    # files that come from a trusted source.
    with open(filename, 'rb') as batch_file:
        batch = pickle.load(batch_file, encoding='latin1')
    pixels = batch['data'].astype(np.float64)
    label_array = np.array(batch['labels'])
    return pixels, label_array


# Load all of cifar
def load_cifar(folder):
    """Load the five training batches, the test batch and the label names.

    Args:
        folder: directory containing 'batches.meta', 'data_batch_1' ..
            'data_batch_5' and 'test_batch'.

    Returns:
        A 7-tuple ``(training_data, training_data_grayscale, training_labels,
        testing_data, testing_data_grayscale, testing_labels, label_names)``.
        ``training_data`` is 50000 x 3072 float64 and ``training_labels`` is a
        uint8 vector of length 50000; the grayscale arrays hold one averaged
        value per pixel position (1024 per image).
    """
    def average_channels(flat_images, count):
        # View every row as 3 blocks of 1024 values (presumably the colour
        # planes), move that axis last and average over it.
        stacked = flat_images.reshape((count, 3, 1024)).transpose((0, 2, 1))
        return np.mean(stacked, axis=2)

    meta_path = os.path.join(folder, 'batches.meta')
    with open(meta_path, 'rb') as meta_file:
        meta = pickle.load(meta_file, encoding='latin1')

    # Pre-allocate the full training set and fill it one batch at a time.
    training_data = np.empty([50000, 3072], dtype=np.float64)
    training_labels = np.empty([50000], dtype=np.uint8)
    for batch_no in range(1, 6):
        lo = (batch_no - 1) * 10000
        hi = batch_no * 10000
        batch_path = os.path.join(folder, 'data_batch_%d' % batch_no)
        batch_data, batch_labels = load_cifar_batch(batch_path)
        training_data[lo:hi] = batch_data
        training_labels[lo:hi] = batch_labels

    testing_data, testing_labels = load_cifar_batch(os.path.join(folder, 'test_batch'))

    training_data_grayscale = average_channels(training_data, 50000)
    testing_data_grayscale = average_channels(testing_data, 10000)

    return (training_data, training_data_grayscale, training_labels,
            testing_data, testing_data_grayscale, testing_labels,
            meta['label_names'])


# Load part of cifar for cross validation
def load_cifar_cross_validation(folder, i):
    """Build one cross-validation split from the five training batches.

    Batch ``i`` is held out as the validation set; the other four batches are
    packed, in ascending batch order, into a single training array.

    Args:
        folder: directory containing 'data_batch_1' .. 'data_batch_5'.
        i: number (1-5) of the batch to hold out.

    Returns:
        ``(td, tl, vd, vl)``: training data (40000 x 3072 float64), training
        labels (uint8, length 40000), validation data and validation labels
        as returned by ``load_cifar_batch`` for batch ``i``.
    """
    train_data = np.empty([40000, 3072], dtype=np.float64)
    train_labels = np.empty([40000], dtype=np.uint8)
    for j in range(1, 6):
        if j == i:
            continue
        # Batches before the held-out one keep their position (j - 1);
        # batches after it shift down by one slot (j - 2).
        slot = j - 1 if j < i else j - 2
        lo = slot * 10000
        hi = (slot + 1) * 10000
        fold_data, fold_labels = load_cifar_batch(os.path.join(folder, 'data_batch_%d' % j))
        train_data[lo:hi, :] = fold_data
        train_labels[lo:hi] = fold_labels
    held_out = load_cifar_batch(os.path.join(folder, 'data_batch_%d' % i))
    return train_data, train_labels, held_out[0], held_out[1]

In [5]:
class KNN(object):
    """Brute-force k-nearest-neighbour classifier with L1 or L2 distance.

    ``train`` only stores the training set; all work happens in ``predict``,
    which computes the full query-by-training distance matrix and takes a
    majority vote among the ``k`` closest training labels.
    """

    def __init__(self):
        pass

    def train(self, data, labels):
        """Remember the training set.

        Args:
            data: N x D array, one training point per row.
            labels: 1-D array of N non-negative integer class labels.
        """
        # KNN classifier simply remembers all the training data
        self.training_data = data
        self.training_labels = labels

    def predict(self, data, k, l):
        """Predict a label for every row of ``data``.

        Args:
            data: M x D array, one query point per row.
            k: number of nearest neighbours that vote (1 <= k <= N).
            l: distance metric; 'L1' selects Manhattan distance, any other
                value selects Euclidean distance.

        Returns:
            1-D array of M predicted labels with the dtype of the training
            labels.

        Raises:
            ValueError: if ``k`` is smaller than 1 or larger than the number
                of training points.
        """
        y_predict = np.zeros(data.shape[0], dtype=self.training_labels.dtype)
        if l == 'L1':
            self.l1(data, k, y_predict)
        else:
            self.l2(data, k, y_predict)
        return y_predict

    def l1(self, data, k, y_pred):
        """Fill ``y_pred`` in place using Manhattan (cityblock) distance.

        Args:
            data: M x D array of query points.
            k: number of nearest neighbours that vote.
            y_pred: length-M array that receives the predicted labels.
        """
        # distances is M x N: row i holds the distances from query point i
        # to every training point.
        distances = sp.cdist(data, self.training_data, 'cityblock')
        self._vote(distances, k, y_pred)

    def l2(self, data, k, y_pred):
        """Fill ``y_pred`` in place using Euclidean distance.

        Args:
            data: M x D array of query points.
            k: number of nearest neighbours that vote.
            y_pred: length-M array that receives the predicted labels.
        """
        # (a - b)^2 = a^2 + b^2 - 2ab, evaluated for all pairs at once.
        train_sq = np.sum(np.square(self.training_data), axis=1)
        query_sq = np.sum(np.square(data), axis=1)
        cross = np.dot(data, self.training_data.T)
        squared = query_sq[:, np.newaxis] + train_sq - 2 * cross
        # Rounding can push the expanded form slightly below zero for
        # (near-)identical points; clamp so sqrt never produces NaN, which
        # would make an exact match sort as the farthest neighbour.
        np.maximum(squared, 0, out=squared)
        # distances is M x N, the same layout l1 produces.
        distances = np.sqrt(squared)
        self._vote(distances, k, y_pred)

    def _vote(self, distances, k, y_pred):
        """Majority-vote the k nearest training labels for each query row.

        Args:
            distances: M x N array, row i = distances from query i to every
                training point.
            k: number of neighbours that vote.
            y_pred: length-M array filled in place with the winning labels.

        Raises:
            ValueError: if ``k`` is not in ``1..N``.
        """
        n_train = distances.shape[1]
        if k < 1 or k > n_train:
            raise ValueError(
                'k must be between 1 and the number of training points (%d), got %r'
                % (n_train, k))
        for i in range(distances.shape[0]):
            # Partitioning around position k - 1 places the k smallest
            # distances in the first k slots and stays valid when k == N.
            nearest = np.argpartition(distances[i], k - 1)[:k]
            # Get the votes
            votes = self.training_labels[nearest]
            # Count the votes and choose the majority (lowest label on ties)
            y_pred[i] = np.argmax(np.bincount(votes))

In [6]:

# Module-level placeholders, both initialised to 0.
# NOTE(review): nothing visible in this notebook rebinds them at module scope.
k_global = accuracy2 = 0
#%matplotlib inline
# Default figure size and image rendering options for every plot below.
plt.rcParams.update({
    'figure.figsize': (10.0, 8.0),
    'image.interpolation': 'nearest',
    'image.cmap': 'gray',
})

# Serialises writes to the shared `results` mapping when cross_validate is
# run from several threads at once.
lock = threading.Lock()


def cross_validate(root, l, k, results):
    """Run 5-fold cross-validation of the KNN classifier for one value of k.

    Each of the five training batches is held out once as the validation set
    while the remaining four are used for training.

    Args:
        root: directory containing 'data_batch_1' .. 'data_batch_5'.
        l: distance metric passed to ``KNN.predict`` ('L1' or 'L2').
        k: number of nearest neighbours.
        results: mutable mapping (e.g. a dict) shared between callers; the
            per-fold accuracies are stored under key ``k``.

    Returns:
        float64 array of 5 accuracies, one per held-out batch (the same
        array that is stored in ``results[k]``).
    """
    n_folds = 5
    accuracy = np.empty([n_folds], dtype=np.float64)
    for i in range(1, n_folds + 1):
        td, tl, vd, vl = load_cifar_cross_validation(root, i)
        knn_o = KNN()
        knn_o.train(td, tl)
        predictions = knn_o.predict(vd, k, l)
        num_correct = np.sum(predictions == vl)
        # Divide by the actual validation-set size rather than a fixed 10000.
        accuracy[i - 1] = num_correct / len(vl)
    with lock:
        results[k] = accuracy
    return accuracy

def plot_data(l, res):
    """Plot per-fold cross-validation accuracies against k.

    Args:
        l: name of the distance metric, used only in the plot title.
        res: mapping from k to a sequence of accuracies for that k.

    Draws one scatter column per k, overlays the mean accuracy with error
    bars of one standard deviation, and shows the figure.
    """
    ordered = sorted(res.items())
    k_values = [k for k, _ in ordered]
    for k, accuracies in ordered:
        plt.scatter([k] * len(accuracies), accuracies)
    # Plot the trend line with error bars that correspond to standard deviation
    means = np.array([np.mean(accuracies) for _, accuracies in ordered])
    spreads = np.array([np.std(accuracies) for _, accuracies in ordered])
    plt.errorbar(k_values, means, yerr=spreads)
    plt.title('Cross-validation on %s and k' % (l, ))
    plt.xlabel('k')
    plt.ylabel('Cross-validation accuracy')
    plt.show()

def test_cross_validate(l, root='cifar-10-batches-py', ks=None, num_threads=2):
    """Cross-validate a range of k values in parallel and plot the outcome.

    Args:
        l: distance metric passed on to ``cross_validate`` ('L1' or 'L2').
        root: directory holding the CIFAR training batches. Defaults to the
            relative folder 'cifar-10-batches-py'.
        ks: iterable of k values to evaluate; defaults to
            [1, 3, 5, 7, 9, 10, 13, 17, 20, 50, 75, 100].
        num_threads: how many k values are evaluated concurrently.

    Returns:
        dict mapping each evaluated k to its array of per-fold accuracies.

    Raises:
        ValueError: if ``num_threads`` is smaller than 1.
    """
    if num_threads < 1:
        raise ValueError('num_threads must be at least 1')
    if ks is None:
        ks = [1, 3, 5, 7, 9, 10, 13, 17, 20, 50, 75, 100]
    ks = list(ks)
    results = dict()
    # Step through ks in groups of num_threads; slicing keeps the final,
    # possibly shorter, group instead of silently dropping it.
    for first in range(0, len(ks), num_threads):
        workers = [
            threading.Thread(target=cross_validate, args=(root, l, k, results))
            for k in ks[first:first + num_threads]
        ]
        for worker in workers:
            worker.start()
        # Wait for the whole group before starting the next one so at most
        # num_threads evaluations are in flight at a time.
        for worker in workers:
            worker.join()
    plot_data(l, results)
    return results

In [None]:
# NOTE(review): this list is not passed to test_cross_validate below; it would
# only be used by the commented-out direct call to cross_validate.
results = []
#cross_validate(cifar10_dataset_folder_path, 'L2', 5, results)
# Run the cross-validation sweep over k with the 'L2' metric and plot it.
test_cross_validate('L2')


In [None]:
# NOTE(review): `k` and `accuracy` are not assigned at module level anywhere
# in the visible notebook; bind them to the chosen k and its accuracy before
# running this cell.
print("Optimal value of k =",k)
# The error is passed as a second print argument (the comma was missing,
# which made this line a SyntaxError).
print("Training error = ", 1 - accuracy)

In [2]:
from sklearn.metrics import confusion_matrix
# NOTE(review): `prediction` is not defined in the visible notebook, and the
# recorded output of this cell is a NameError for `results`. confusion_matrix
# presumably needs the true labels first and the predicted labels second —
# confirm both inputs before relying on this cell.
confusion_array = confusion_matrix(results, prediction)
print(confusion_array)

NameError: name 'results' is not defined

In [None]:
import xlsxwriter
# Write the confusion matrix to 'Confusion_Matrix.xlsx' (path is relative to
# the current working directory).
workbook = xlsxwriter.Workbook('Confusion_Matrix.xlsx')
worksheet = workbook.add_worksheet()
# Every column is written starting from the first worksheet row.
row = 0
# Iterating over the transpose yields the columns of confusion_array, so
# column `col` of the matrix lands in worksheet column `col`.
temp = np.transpose(confusion_array) 
for col, data in enumerate(temp):
    worksheet.write_column(row, col, data)

# The workbook is closed explicitly once all columns have been written.
workbook.close()