In [1]:
import numpy as np
import pandas as pd
import math
import time

In [2]:
def knn(k, data, labels, samples_to_classify):
    """Classify samples with a binary k-nearest-neighbours majority vote.

    Parameters
    ----------
    k : int
        Number of nearest reference rows that take part in the vote.
    data : pandas.DataFrame
        Reference feature rows, one row per labelled example.
    labels : pandas.Series
        Labels for ``data``, matched to its rows by position. A neighbour
        votes "yes" when its label equals 1 and "no" for any other value.
    samples_to_classify : pandas.DataFrame
        Rows to classify; columns must line up positionally with ``data``.

    Returns
    -------
    list of int
        One prediction per sample: 1 when the "yes" votes are at least as
        numerous as the "no" votes (ties go to 1), otherwise 0.

    Raises
    ------
    IndexError
        If ``labels`` has fewer entries than ``data`` has rows.
    """
    # Both conversions are loop-invariant: materialise them once instead of
    # rebuilding the full row list (and doing a .iloc lookup per row) for
    # every single sample that is classified.
    reference_rows = data.values.tolist()
    reference_labels = labels.tolist()
    if len(reference_labels) < len(reference_rows):
        raise IndexError('labels has fewer entries than data has rows')

    classifications = []
    for sample in samples_to_classify.values.tolist():
        sample_distances = [
            (euclidean_distance(sample, row), label)
            for row, label in zip(reference_rows, reference_labels)
        ]

        # sorted() is stable, so equidistant neighbours keep their row order.
        nearest = sorted(sample_distances, key=lambda dist_tup: dist_tup[0])[:k]

        votes_yes = sum(1 for _, label in nearest if label == 1)
        votes_no = len(nearest) - votes_yes

        classifications.append(1 if votes_yes >= votes_no else 0)
    return classifications

In [5]:
def _nearest_prototype_index(fvec, prototypes):
    """Return the row index of the prototype closest to ``fvec`` (squared Euclidean distance; first wins ties)."""
    distances = [np.sum(np.subtract(fvec, p) ** 2) for p in prototypes]
    return distances.index(min(distances))


def train_lvq(data, labels, num_epochs, learning_rate, validation_data=None, validation_labels=None):
    """Train a two-class LVQ model with one prototype per class.

    Each prototype starts at the mean of its class. For every training
    sample the closest prototype is moved towards the sample when their
    labels agree and away from it otherwise, scaled by ``learning_rate``.

    Parameters
    ----------
    data : numpy.ndarray
        2-D array of training feature vectors (one row per sample).
    labels : numpy.ndarray
        1-D array of class labels, cast to ``int`` before use.
    num_epochs : int
        Number of full passes over ``data``.
    learning_rate : float
        Step size applied to every prototype update.
    validation_data, validation_labels : optional
        Held-out feature vectors and their labels. When both are given and
        non-empty, the misclassification rate is evaluated after every epoch.

    Returns
    -------
    tuple
        ``(prototypes, proto_labels, validation_error)``. ``prototypes`` is a
        ``(2, num_dims)`` array whose row ``j`` belongs to ``proto_labels[j]``
        (labels in ascending order, so for 0/1 labels row 0 is class 0 and
        row 1 is class 1). ``validation_error`` is the error rate after the
        last epoch, or ``None`` when no validation pass was run.
        ``(False, False, False)`` is returned when ``labels`` does not contain
        exactly two distinct classes.
    """
    # Plain float storage keeps the arithmetic numeric even when the caller
    # passes an object-dtype array (e.g. mixed bool/float DataFrame columns).
    data = data.astype(float)
    labels = labels.astype(int)

    # Sorted so that the prototype row order is deterministic and, for 0/1
    # labels, identical to the label value itself.
    unique_labels = sorted(set(labels.tolist()))

    num_protos = len(unique_labels)
    if num_protos != 2:
        return (False, False, False)

    num_dims = data.shape[1]
    prototypes = np.empty((num_protos, num_dims))
    proto_labels = []

    # Initialize prototypes using class means. The row is addressed by
    # position, not by label value, so labels such as {1, 2} also work.
    for proto_index, label in enumerate(unique_labels):
        prototypes[proto_index] = np.mean(data[labels == label, :], axis=0)
        proto_labels.append(label)

    has_validation = (
        validation_data is not None
        and validation_labels is not None
        and len(validation_labels) > 0
    )
    # Bound up-front so the return below is valid without a validation pass.
    validation_error = None

    for epoch in range(num_epochs):
        for fvec, lbl in zip(data, labels):
            winner_index = _nearest_prototype_index(fvec, prototypes)
            winner = prototypes[winner_index]

            # Attract the winner on a label match, repel it on a mismatch.
            sign = 1 if proto_labels[winner_index] == lbl else -1
            prototypes[winner_index] = np.add(winner, np.subtract(fvec, winner) * learning_rate * sign)

        # Use validation set to test performance after this epoch.
        if has_validation:
            mistakes = 0
            for fvec, lbl in zip(validation_data, validation_labels):
                if not proto_labels[_nearest_prototype_index(fvec, prototypes)] == lbl:
                    mistakes += 1
            validation_error = mistakes / len(validation_labels)

    return (prototypes, proto_labels, validation_error)

In [6]:
def minmax_normalization(values):
    """Rescale ``values`` linearly to the [0, 1] range.

    Parameters
    ----------
    values : iterable of numbers
        Values to normalize (e.g. a list or a pandas Series).

    Returns
    -------
    list
        ``(value - min) / (max - min)`` for every input value, in input
        order. An empty input yields an empty list. When all values are
        equal the range is zero, so every entry is mapped to 0.0 rather
        than dividing by zero.
    """
    # Materialise once: the input is traversed several times and may be a
    # one-shot iterator.
    items = list(values)
    if not items:
        return []

    min_v = min(items)
    value_range = max(items) - min_v

    if value_range == 0:
        # Constant input carries no spread to rescale.
        return [0.0 for _ in items]

    return [(value - min_v) / value_range for value in items]


def normalize_columns(dataframe, numerical_columns_list, normalization_func):
    """Overwrite the listed columns of ``dataframe`` with their normalized values.

    ``normalization_func`` is called once per column with that column and
    its result is assigned back under the same column name. The frame is
    modified in place and nothing is returned.
    """
    for name in numerical_columns_list:
        dataframe[name] = normalization_func(dataframe[name])

     
def euclidean_distance(vec_p, vec_q):
    """Return the Euclidean distance between two indexable vectors.

    Only the first ``len(vec_p)`` components are compared, so ``vec_q``
    must be at least as long as ``vec_p``.
    """
    squared_total = 0.0

    # Accumulate the squared component differences one position at a time.
    for position in range(len(vec_p)):
        delta = vec_q[position] - vec_p[position]
        squared_total += delta ** 2

    return math.sqrt(squared_total)


def get_lvq_error(lvq_test_df, lvq_test_labels, prototypes):
    """Return the misclassification rate of a two-prototype LVQ model.

    Parameters
    ----------
    lvq_test_df : pandas.DataFrame
        Feature rows to classify.
    lvq_test_labels : pandas.Series
        True labels (1 or 0), matched to the rows by position.
    prototypes : sequence
        ``prototypes[1]`` is used as the class-1 ("yes") prototype and
        ``prototypes[0]`` as the class-0 ("no") prototype.

    Returns
    -------
    float
        ``1 - correct / number_of_labels``.
    """
    correct = 0
    for idx, entry in enumerate(lvq_test_df.values.tolist()):
        distance_to_yes = euclidean_distance(entry, prototypes[1])
        distance_to_no = euclidean_distance(entry, prototypes[0])

        # Ties are resolved in favour of the "yes" class.
        prediction = 1 if distance_to_yes <= distance_to_no else 0

        # This check must run for both outcomes; it used to sit inside the
        # "no" branch, so correct "yes" predictions were never counted.
        if prediction == lvq_test_labels.iloc[idx]:
            correct += 1

    return 1 - correct / lvq_test_labels.shape[0]

def get_knn_error(knn_test_labels_df, knn_answer_labels):
    """Return the misclassification rate of k-NN predictions.

    Parameters
    ----------
    knn_test_labels_df : pandas.Series
        True labels of the test rows.
    knn_answer_labels : iterable
        Predicted labels, in the same order as the true labels.

    Returns
    -------
    float
        ``1 - correct / number_of_true_labels``.
    """
    # Pair each prediction with the label at the same position. The old
    # loop never advanced its index, so every prediction was compared
    # against the first label only.
    true_labels = knn_test_labels_df.tolist()
    correct = sum(
        1 for predicted, actual in zip(knn_answer_labels, true_labels)
        if predicted == actual
    )

    return 1 - correct / knn_test_labels_df.shape[0]

In [7]:
data_path = '../bank-a.csv' # put path here
# Fraction (0 to 1) of the available rows held out at each split; the
# experiment uses it for both the test split and the LVQ validation split.
validation_set_size = .15

# sep=None asks pandas to detect the delimiter, which only the Python parser
# engine supports. Requesting it explicitly avoids the ParserWarning and the
# silent engine fallback that the default C engine would trigger.
bank_df = pd.read_csv(data_path, sep=None, engine='python')

norm_timer_start = time.time()
# ------- data cleaning and normalization --------
numerical_columns = ['age', 'campaign', 'pdays', 'previous', 'emp.var.rate', 'cons.price.idx', 'cons.conf.idx', 'euribor3m', 'nr.employed']

columns_to_drop = ['duration', 'poutcome']

categorical_columns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'month', 'day_of_week']

# Rescale every numerical column in place.
normalize_columns(bank_df, numerical_columns, minmax_normalization)

# Reassign instead of mutating in place so each step has a clear result.
bank_df = bank_df.drop(columns=columns_to_drop)

# One-hot encode the categorical columns; the target column 'y' is left as-is.
bank_df = pd.get_dummies(bank_df, columns=categorical_columns)

norm_timer_end = time.time()

normalization_time = norm_timer_end - norm_timer_start
print('time to format data: ' + str(round(normalization_time, 4)) + ' s')


# ------- experiment configuration --------
n_steps = 50                 # number of independent random resamplings
LVQ_EPOCHS = 100
LVQ_LEARNING_RATE = 0.05
KNN_NEIGHBOURS = 5
RANDOM_SEED = 42             # set to None to draw different splits on every run
LABEL_MAP = {'yes': 1, 'no': 0}

# A single generator feeds every sample() call: the whole experiment is
# reproducible, yet each split still differs from the previous one.
split_rng = np.random.RandomState(RANDOM_SEED)


def _split_features_labels(frame):
    """Return ``(features, labels)``: 'y' mapped through LABEL_MAP and removed from the features."""
    return frame.drop(columns=['y']), frame['y'].map(LABEL_MAP)


def _mean_or_nan(values):
    """Return the arithmetic mean of ``values``, or NaN when the list is empty."""
    return sum(values) / len(values) if values else float('nan')


skipped_steps = 0

# Every list below receives exactly one entry per completed (non-skipped) step.
lvq_errors = []
lvq_validation_errors = []
knn_errors = []

knn_times = []
lvq_training_times = []
lvq_testing_times = []

for step in range(n_steps):
    print('\niteration: ' + str(step + 1))
    # --- separating dataset -----
    # All splits are derived without mutating bank_df, so a skipped step can
    # no longer leave a shrunken frame behind for the next iteration.
    test_rows = bank_df.sample(int(bank_df.shape[0] * validation_set_size), random_state=split_rng)
    test_set, test_set_labels = _split_features_labels(test_rows)

    remaining_rows = bank_df.drop(index=test_rows.index)

    # k-NN uses every non-test row as its labelled reference set.
    knn_reference_set, knn_reference_labels = _split_features_labels(remaining_rows)

    # LVQ further splits the non-test rows into validation and training sets.
    validation_rows = remaining_rows.sample(int(remaining_rows.shape[0] * validation_set_size), random_state=split_rng)
    validation_set, validation_labels = _split_features_labels(validation_rows)
    training_set, training_labels = _split_features_labels(remaining_rows.drop(index=validation_rows.index))

    #--------------------------------
    print('training LVQ')

    lvq_train_timer_start = time.time()

    proto, protolabels, val_error = train_lvq(
        training_set.to_numpy(),
        training_labels.to_numpy(),
        LVQ_EPOCHS,
        LVQ_LEARNING_RATE,
        validation_data=validation_set.to_numpy(),
        validation_labels=validation_labels.to_numpy(),
    )

    lvq_train_timer_end = time.time()

    lvq_train_time = lvq_train_timer_end - lvq_train_timer_start
    print('time to train LVQ: ' + str(round(lvq_train_time, 4)) + ' s')

    if proto is False:
        # train_lvq rejected the split (not exactly two classes). Nothing is
        # recorded for this step so all result lists stay the same length.
        print('skipping')
        skipped_steps += 1
        continue

    lvq_validation_errors.append(val_error)
    lvq_training_times.append(lvq_train_time)

    print('testing LVQ')

    lvq_test_timer_start = time.time()
    lvq_errors.append(get_lvq_error(test_set, test_set_labels, proto))
    lvq_test_timer_end = time.time()

    lvq_test_time = lvq_test_timer_end - lvq_test_timer_start
    print('time to test LVQ: ' + str(round(lvq_test_time, 4)) + ' s')
    lvq_testing_times.append(lvq_test_time)

    print('executing k-NN')
    knn_timer_start = time.time()

    knn_answers = knn(KNN_NEIGHBOURS, knn_reference_set, knn_reference_labels, test_set)
    knn_errors.append(get_knn_error(test_set_labels, knn_answers))

    knn_timer_end = time.time()
    knn_time = knn_timer_end - knn_timer_start
    print('time to execute k-NN: ' + str(round(knn_time, 4)) + ' s')
    knn_times.append(knn_time)

# ------- aggregate results over the completed steps --------
# Averaging each list over its own length cannot index past the end of a
# list and yields NaN (instead of a ZeroDivisionError) if every step was skipped.
avg_lvq_error = _mean_or_nan(lvq_errors)
avg_knn_error = _mean_or_nan(knn_errors)
avg_lvq_val_error = _mean_or_nan(lvq_validation_errors)

avg_knn_time = _mean_or_nan(knn_times)
avg_lvq_training_time = _mean_or_nan(lvq_training_times)
avg_lvq_testing_time = _mean_or_nan(lvq_testing_times)
avg_lvq_time = avg_lvq_testing_time + avg_lvq_training_time

print('skipped steps: ' + str(skipped_steps))
print(len(lvq_errors))
print(len(knn_errors))
print(len(lvq_validation_errors))

print('average LVQ error: ' + str(avg_lvq_error * 100))
print('average LVQ validation error:' + str(avg_lvq_val_error * 100))
print('average k-NN error: ' + str(avg_knn_error * 100))

print('average LVQ training time: ' + str(round(avg_lvq_training_time, 4)) + ' s')
print('average LVQ testing time:' + str(round(avg_lvq_testing_time, 4)) + ' s')
print('average LVQ time:' + str(round(avg_lvq_time, 4)) + ' s')
print('average k-NN time: ' + str(round(avg_knn_time, 4)) + ' s')

  return func(*args, **kwargs)


FileNotFoundError: [Errno 2] No such file or directory: './bank-additional.csv'