In [1]:
import numpy as np

def f1_score(real_labels, predicted_labels):
    """
    Information on F1 score - https://en.wikipedia.org/wiki/F1_score
    :param real_labels: List[int]
    :param predicted_labels: List[int]
    :return: float
    """
    assert len(real_labels) == len(predicted_labels)
#     raise NotImplementedError
    TP = FP = FN = 0
    for i in range(len(real_labels)):
        x = real_labels[i]
        y = predicted_labels[i]
        if x == 1 and y == 1:
            TP += 1
        elif x == 0 and y == 1:
            FP += 1
        elif x == 1 and y == 0:
            FN += 1
     
    if 2 * TP + FP + FN == 0:
        return 0
    
    res = 2 * TP / float(2 * TP + FP + FN)
    return res



class Distances:
    @staticmethod
    # TODO
    def minkowski_distance(point1, point2):
        """
        Minkowski distance is the generalized version of Euclidean Distance
        It is also know as L-p norm (where p>=1) that you have studied in class
        For our assignment we need to take p=3
        Information on Minkowski distance - https://en.wikipedia.org/wiki/Minkowski_distance
        :param point1: List[float]
        :param point2: List[float]
        :return: float
        """
#         raise NotImplementedError
        return np.power(sum( [pow(abs(x - y), 3) for x, y in zip(point1, point2)]), 1/3)
        
    @staticmethod
    # TODO
    def euclidean_distance(point1, point2):
        """
        :param point1: List[float]
        :param point2: List[float]
        :return: float
        """
#         raise NotImplementedError
        return np.sqrt(sum( [(x - y)**2 for x, y in zip(point1, point2)]))

    @staticmethod
    # TODO
    def inner_product_distance(point1, point2):
        """
        :param point1: List[float]
        :param point2: List[float]
        :return: float
        """
#         raise NotImplementedError
        return sum([x*y for x, y in zip(point1, point2)])

    @staticmethod
    # TODO
    def cosine_similarity_distance(point1, point2):
        """
       :param point1: List[float]
       :param point2: List[float]
       :return: float
       """
#         raise NotImplementedError
        return 1 - sum([x*y for x, y in zip(point1, point2)]) / np.sqrt(sum([x**2 for x in point1]) * sum([x**2 for x in point2]))

    @staticmethod
    # TODO
    def gaussian_kernel_distance(point1, point2):
        """
       :param point1: List[float]
       :param point2: List[float]
       :return: float
       """
#         raise NotImplementedError
        tmp = [(x - y)**2 for x, y in zip(point1, point2)]
        return -np.exp(-0.5 * sum(tmp))


In [2]:
import pandas as pd
def data_processing():
    data = pd.read_csv('heart_disease.csv', low_memory=False, sep=',', na_values='?').values

    N = data.shape[0]

#     np.random.shuffle(data)
    # prepare data

    ntr = int(np.round(N * 0.8))
    nval = int(np.round(N * 0.15))
    ntest = N - ntr - nval

    # spliting training, validation, and test
    x_train = np.append([np.ones(ntr)], data[:ntr].T[:-1], axis=0).T
    y_train = data[:ntr].T[-1].T
    x_val = np.append([np.ones(nval)], data[ntr:ntr + nval].T[:-1], axis=0).T
    y_val = data[ntr:ntr + nval].T[-1].T
    x_test = np.append([np.ones(ntest)], data[-ntest:].T[:-1], axis=0).T
    y_test = data[-ntest:].T[-1].T
    return x_train, y_train, x_val, y_val, x_test, y_test



In [3]:
import numpy as np
from collections import Counter


class KNN:
    def __init__(self, k, distance_function):
        """
        :param k: int
        :param distance_function
        """
        self.k = k
        self.distance_function = distance_function

    # TODO: save features and lable to self
    def train(self, features, labels):
        """
        In this function, features is simply training data which is a 2D list with float values.
        For example, if the data looks like the following: Student 1 with features age 25, grade 3.8 and labeled as 0,
        Student 2 with features age 22, grade 3.0 and labeled as 1, then the feature data would be
        [ [25.0, 3.8], [22.0,3.0] ] and the corresponding label would be [0,1]

        For KNN, the training process is just loading of training data. Thus, all you need to do in this function
        is create some local variable in KNN class to store this data so you can use the data in later process.
        :param features: List[List[float]]
        :param labels: List[int]
        """
#         raise NotImplementedError
        self.features = np.array(features)
        self.labels = np.array(labels)
        
        
    # TODO: predict labels of a list of points
    def predict(self, features):
        """
        This function takes 2D list of test data points, similar to those from train function. Here, you need process
        every test data point, reuse the get_k_neighbours function to find the nearest k neighbours for each test
        data point, find the majority of labels for these neighbours as the predict label for that testing data point.
        Thus, you will get N predicted label for N test data point.
        This function need to return a list of predicted labels for all test data points.
        :param features: List[List[float]]
        :return: List[int]
        """
#         raise NotImplementedError
        res = []
        for test in features:
            k_labels = self.get_k_neighbors(test)
#             print(k_labels)
            # using Counter to count each label and then use most_common to find majority
            c = Counter(k_labels)
            label, cnt = c.most_common(1)[0]
            res.append(label)
        return res
        
        
    # TODO: find KNN of one point
    def get_k_neighbors(self, point):
        """
        This function takes one single data point and finds k-nearest neighbours in the training set.
        You already have your k value, distance function and you just stored all training data in KNN class with the
        train function. This function needs to return a list of labels of all k neighours.
        :param point: List[float]
        :return:  List[int]
        """
#         raise NotImplementedError
        distances = [self.distance_function(point, train) for train in self.features]
        distances = np.array(distances)
        # using np.argpartition to get indexes of top K min elements, we can use indexes to search labels.
        
        kmin_index = np.argsort(distances)[: self.k]
#         kmin_index = np.argpartition(distances, self.k)[: self.k]

        return self.labels[kmin_index]
        

In [119]:
knn = KNN(2, Distances.euclidean_distance)
train = [[1,1,1],[2,2,2],[3,3,3],[4,4,4]]
labels = [1,1,2,2]
knn.train(train, labels)
knn.predict([[1.5,3.5,1.5]])

[1]

In [4]:
# utils.py
class HyperparameterTuner:
    def __init__(self):
        self.best_k = 0
        self.f1_score_best = -1
        self.best_distance_function = None
        self.best_scaler = None
        self.best_model = None
        
        self.sort_dict = {
            'euclidean': 5,
            'minkowski': 4,
            'gaussian': 3,
            'inner_prod': 2,
            'cosine_dist': 1,
            
            'min_max_scale': 10,
            'normalize': 9
        }

    # TODO: find parameters with the best f1 score on validation dataset
    def tuning_without_scaling(self, distance_funcs, x_train, y_train, x_val, y_val):
        """
        In this part, you should try different distance function you implemented in part 1.1, and find the best k.
        Use k range from 1 to 30 and increment by 2. Use f1-score to compare different models.

        :param distance_funcs: dictionary of distance functions you must use to calculate the distance.
            Make sure you loop over all distance functions for each data point and each k value.
            You can refer to test.py file to see the format in which these functions will be
            passed by the grading script
        :param x_train: List[List[int]] training data set to train your KNN model
        :param y_train: List[int] train labels to train your KNN model
        :param x_val:  List[List[int]] Validation data set will be used on your KNN predict function to produce
            predicted labels and tune k and distance function.
        :param y_val: List[int] validation labels

        Find(tune) best k, distance_function and model (an instance of KNN) and assign to self.best_k,
        self.best_distance_function and self.best_model respectively.
        NOTE: self.best_scaler will be None

        NOTE: When there is a tie, choose model based on the following priorities:
        Then check distance function  [euclidean > minkowski > gaussian > inner_prod > cosine_dist]
        If they have same distance fuction, choose model which has a less k.
        """
        # You need to assign the final values to these variables
#         self.best_k = None
#         self.best_distance_function = None
#         self.best_model = None
#         raise NotImplementedError
        
        sort_dict = self.sort_dict
        
        for name, func in distance_funcs.items():
            f1_score_best = -1
            for k in range(1, 30, 2):
                model = KNN(k, func)
                model.train(x_train, y_train)
#                 model.train(x_train[1:], y_train)
                val_f1_score = f1_score(y_val, model.predict(x_val))
                if val_f1_score > f1_score_best:
                    f1_score_best = val_f1_score
                    self.best_k = k
                    self.best_distance_function = name
                    self.best_model = model
                # process tie
                if val_f1_score == val_f1_score:
                    if self.best_distance_function == name:
                        if k < self.best_k:
                            self.best_k = k
                    else:
                        if sort_dict[name] > sort_dict[self.best_distance_function]:
                            self.best_distance_function = name
         
                        

    # TODO: find parameters with the best f1 score on validation dataset, with normalized data
    def tuning_with_scaling(self, distance_funcs, scaling_classes, x_train, y_train, x_val, y_val):
        """
        This part is similar to Part 1.3 except that before passing your training and validation data to KNN model to
        tune k and disrance function, you need to create the normalized data using these two scalers to transform your
        data, both training and validation. Again, we will use f1-score to compare different models.
        Here we have 3 hyperparameters i.e. k, distance_function and scaler.

        :param distance_funcs: dictionary of distance funtions you use to calculate the distance. Make sure you
            loop over all distance function for each data point and each k value.
            You can refer to test.py file to see the format in which these functions will be
            passed by the grading script
        :param scaling_classes: dictionary of scalers you will use to normalized your data.
        Refer to test.py file to check the format.
        :param x_train: List[List[int]] training data set to train your KNN model
        :param y_train: List[int] train labels to train your KNN model
        :param x_val: List[List[int]] validation data set you will use on your KNN predict function to produce predicted
            labels and tune your k, distance function and scaler.
        :param y_val: List[int] validation labels

        Find(tune) best k, distance_funtion, scaler and model (an instance of KNN) and assign to self.best_k,
        self.best_distance_function, self.best_scaler and self.best_model respectively

        NOTE: When there is a tie, choose model based on the following priorities:
        For normalization, [min_max_scale > normalize];
        Then check distance function  [euclidean > minkowski > gaussian > inner_prod > cosine_dist]
        If they have same distance function, choose model which has a less k.
        """
        
        # You need to assign the final values to these variables
#         self.best_k = None
#         self.best_distance_function = None
#         self.best_scaler = None
#         self.best_model = None
        # raise NotImplementedError
        for scale_name, scale in scaling_classes.items():
            scaler = scale()
            x_train_scaled = scaler(x_train)
            x_val_scaled = scaler(x_val)
            sort_dict = self.sort_dict

            for name, func in distance_funcs.items():
                for k in range(1, 30, 2):
                    model = KNN(k, func)
                    model.train(x_train_scaled, y_train)
                    val_f1_score = f1_score(y_val, model.predict(x_val_scaled))
                    if val_f1_score > self.f1_score_best:
                        self.f1_score_best = val_f1_score
                        self.best_k = k
                        self.best_distance_function = name
                        self.best_model = model
                        self.best_scaler = scale_name
                    # process tie
                    if val_f1_score == self.f1_score_best:
                        if self.best_scaler == scale_name:
                            if self.best_distance_function == func:
                                if k < self.best_k:
                                    self.best_k = k
                            else:
                                if sort_dict[name] > sort_dict[self.best_distance_function]:
                                    self.best_distance_function = name
                        else:
                            if sort_dict[scale_name] > sort_dict[self.best_scaler]:
                                self.best_scaler = scale_name
                    

            


In [5]:
class NormalizationScaler:
    def __init__(self):
        pass

    # TODO: normalize data
    def __call__(self, features):
        """
        Normalize features for every sample

        Example
        features = [[3, 4], [1, -1], [0, 0]]
        return [[0.6, 0.8], [0.707107, -0.707107], [0, 0]]

        :param features: List[List[float]]
        :return: List[List[float]]
        """
        # raise NotImplementedError
        res = []
        for feature in features:
            denom = np.sqrt(sum([pow(x, 2) for x in feature]))
            if denom == 0:
                res.append(feature)
            else:
                res.append([x / denom for x in feature])
        return res


class MinMaxScaler:
    """
    Please follow this link to know more about min max scaling
    https://en.wikipedia.org/wiki/Feature_scaling
    You should keep some states inside the object.
    You can assume that the parameter of the first __call__
    will be the training set.

    Hints:
        1. Use a variable to check for first __call__ and only compute
            and store min/max in that case.

    Note:
        1. You may assume the parameters are valid when __call__
            is being called the first time (you can find min and max).

    Example:
        train_features = [[0, 10], [2, 0]]
        test_features = [[20, 1]]

        scaler1 = MinMaxScale()
        train_features_scaled = scaler1(train_features)
        # train_features_scaled should be equal to [[0, 1], [1, 0]]

        test_features_scaled = scaler1(test_features)
        # test_features_scaled should be equal to [[10, 0.1]]

        new_scaler = MinMaxScale() # creating a new scaler
        _ = new_scaler([[1, 1], [0, 0]]) # new trainfeatures
        test_features_scaled = new_scaler(test_features)
        # now test_features_scaled should be [[20, 1]]

    """
    # create instance only call __init__(), which runs only once, then call instance which will call __call__()
    def __init__(self):
        self.min = None
        self.max = None
        self.cnt = 0

    def __call__(self, features):
        """
        normalize the feature vector for each sample . For example,
        if the input features = [[2, -1], [-1, 5], [0, 0]],
        the output should be [[1, 0], [0, 1], [0.333333, 0.16667]]

        :param features: List[List[float]]
        :return: List[List[float]]
        """
        if self.cnt < 1:
            # amin/amax return min/max of an array along the axis, so that we use broadcast to solve
            self.min = np.amin(features, axis=0)
            self.max = np.amax(features, axis=0)
            self.feature = features
            self.cnt += 1
        
        features = np.array(features)
        res = (features - self.min)/(self.max - self.min)
        return res.tolist()
        


In [6]:
distance_funcs = {
        'euclidean': Distances.euclidean_distance,
        'minkowski': Distances.minkowski_distance,
        'gaussian': Distances.gaussian_kernel_distance,
        'inner_prod': Distances.inner_product_distance,
        'cosine_dist': Distances.cosine_similarity_distance,
    }

scaling_classes = {
    'min_max_scale': MinMaxScaler,
    'normalize': NormalizationScaler,
}

x_train, y_train, x_val, y_val, x_test, y_test = data_processing()

print('x_train shape = ', x_train.shape)
print('y_train shape = ', y_train.shape)

tuner_without_scaling_obj = HyperparameterTuner()
tuner_without_scaling_obj.tuning_without_scaling(distance_funcs, x_train, y_train, x_val, y_val)

print("**Without Scaling**")
print("k =", tuner_without_scaling_obj.best_k)
print("distance function =", tuner_without_scaling_obj.best_distance_function)

tuner_with_scaling_obj = HyperparameterTuner()
tuner_with_scaling_obj.tuning_with_scaling(distance_funcs, scaling_classes, x_train, y_train, x_val, y_val)

print("\n**With Scaling**")
print("k =", tuner_with_scaling_obj.best_k)
print("distance function =", tuner_with_scaling_obj.best_distance_function)
print("scaler =", tuner_with_scaling_obj.best_scaler)

x_train shape =  (242, 14)
y_train shape =  (242,)
**Without Scaling**
k = 15
distance function = cosine_dist





**With Scaling**
k = 3
distance function = euclidean
scaler = min_max_scale


In [4]:
import numpy as np
labels = np.array([1,2,1,1])



In [12]:
tmp = []
total = labels.size
for label in np.unique(labels):
    tmp.append(np.where(labels==label)[0].size)
tmp = np.array(tmp)
tmp = tmp/total
S = np.sum([ -s*np.log2(s) for s in tmp])
S

0.8112781244591328