<a href="https://colab.research.google.com/github/selahattinozturk/Urbansound-Classification/blob/main/knn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd

In [None]:
# Colab-only setup: mount Google Drive under /content/drive so the project
# folder is reachable from this runtime.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)
# Project root on Drive. Later cells read the fold CSVs from f'{CUR_PATH}/data/'
# and write the metrics CSV directly into CUR_PATH.
CUR_PATH = '/content/drive/My Drive/EEE485'

Mounted at /content/drive


In [None]:
# Sound class names; a name's position in this list is its integer label.
classes = [
    "air_conditioner",
    "car_horn",
    "children_playing",
    "dog_bark",
    "drilling",
    "engine_idling",
    "gun_shot",
    "jackhammer",
    "siren",
    "street_music",
]
# Reverse lookup: class name -> integer label.
class_idx = dict(zip(classes, range(len(classes))))

In [None]:
def onehot_encoder(y_train):
  """Encode class-name labels as one-hot rows.

  Args:
    y_train: iterable of class names; every name must be a key of the
      module-level ``class_idx`` mapping (unknown names raise ``KeyError``).

  Returns:
    Integer array with one row per label and ``len(classes)`` columns,
    holding a single 1 at the column given by ``class_idx``.
  """
  width = len(classes)
  rows = []
  for label in y_train:
    hot_col = class_idx[label]
    rows.append([1 if col == hot_col else 0 for col in range(width)])
  return np.array(rows)

In [None]:
def class2int(y_train):
  """Map class-name labels to their integer ids.

  Args:
    y_train: iterable of class names; every name must be a key of the
      module-level ``class_idx`` mapping (unknown names raise ``KeyError``).

  Returns:
    1-D array of integer labels, in the same order as the input.
  """
  return np.array([class_idx[label] for label in y_train])

In [None]:
def standardize_data(X_train, X_test):
  """Z-score both sets using statistics estimated on the training set only.

  Args:
    X_train: array of shape (n_train, n_features); supplies mean and std.
    X_test: array of shape (n_test, n_features); scaled with the training
      mean and std so no test information leaks into the transform.

  Returns:
    Tuple ``(X_train, X_test)`` of standardized arrays.
  """
  mean = np.mean(X_train, axis=0)
  std = np.std(X_train, axis=0)
  # A feature that is constant over the training set has std == 0; dividing by
  # it would fill the column with NaN/inf and poison PCA and every distance.
  # Scale such columns by 1 so they simply become (value - mean).
  scale = np.where(std == 0, 1.0, std)
  X_train = (X_train - mean) / scale
  X_test = (X_test - mean) / scale
  return X_train, X_test

In [None]:
def import_data(cv_idx): # cv_index = {0, 1, .., 9}
  """Load the ten fold CSVs and split them into train and held-out test parts.

  Every file ``fold1.csv`` .. ``fold10.csv`` under ``{CUR_PATH}/data`` is read
  with its first column as the index. The ``filename`` and ``class`` columns
  are removed and all remaining columns are used as features (an earlier
  experiment that also dropped the ``mfcc_min*`` / ``mfcc_max*`` columns is
  disabled).

  Args:
    cv_idx: 0-based position of the fold to hold out as the test set.

  Returns:
    ``(X_train, X_test, y_train, y_test)`` where the X values are arrays of
    feature rows and the y values are plain lists of class-name strings.
  """
  train_rows, test_rows = [], []
  train_labels, test_labels = [], []

  for fold_no in range(1, 11):
    name = f'fold{fold_no}.csv'
    frame = pd.read_csv(f'{CUR_PATH}/data/{name}', index_col=0)
    features = frame.drop(["filename", "class"], axis=1).values.tolist()
    labels = frame['class'].values.tolist()

    # Files are numbered from 1 while cv_idx counts from 0.
    held_out = (fold_no - 1) == cv_idx
    rows_sink = test_rows if held_out else train_rows
    labels_sink = test_labels if held_out else train_labels
    rows_sink.extend(features)
    labels_sink.extend(labels)

  return np.array(train_rows), np.array(test_rows), train_labels, test_labels

In [None]:
def prepare_dataset(cv_idx):
  """Build the model-ready train/test split for one cross-validation fold.

  Pipeline: load the folds, standardize with training statistics, convert
  class names to integer labels, reduce dimensionality with ``pca``, then
  prepend a constant-1 column to both feature matrices.

  Args:
    cv_idx: 0-based index of the fold held out as the test set.

  Returns:
    ``(X_train, X_test, y_train, y_test)`` as arrays.
  """
  raw_train, raw_test, names_train, names_test = import_data(cv_idx)
  std_train, std_test = standardize_data(raw_train, raw_test)
  y_train = class2int(names_train)
  y_test = class2int(names_test)
  pc_train, pc_test = pca(std_train, std_test)
  # Column 0 of both matrices becomes a constant 1.
  with_bias_train = np.insert(pc_train, 0, 1, axis=1)
  with_bias_test = np.insert(pc_test, 0, 1, axis=1)
  return with_bias_train, with_bias_test, y_train, y_test

In [None]:
import matplotlib.pyplot as plt

def pca(X_train, X_test, cutoff=90, plot=True): # X must be standardized
  """Project both sets onto the leading principal components of X_train.

  The number of components kept is the smallest count whose cumulative
  proportion of variance explained (PVE) strictly exceeds ``cutoff`` percent.
  If no count exceeds it (e.g. ``cutoff=100``), all components are kept.

  Args:
    X_train: standardized (zero-mean) array of shape (n, p); the principal
      axes are estimated from this set only.
    X_test: array of shape (m, p), projected onto the same axes.
    cutoff: PVE threshold in percent. Defaults to 90.
    plot: when True, draw the cumulative PVE curve with the module-level
      ``plt``; pass False to skip all plotting.

  Returns:
    ``(X_train_proj, X_test_proj)`` with one column per retained component.
  """
  n, p = X_train.shape
  sigma = (X_train.T @ X_train) / n

  # sigma is symmetric, so eigh is the appropriate solver: it always returns
  # real eigenvalues/eigenvectors, whereas eig may return complex round-off.
  w, v = np.linalg.eigh(sigma)
  order = np.argsort(w)[::-1]
  # Tiny negative eigenvalues are numerical noise; clip so PVE is monotone.
  w = np.clip(w[order], 0, None)
  # Eigenvectors are the COLUMNS of v, so the permutation applies to columns.
  v = v[:, order]

  # sum(w) == trace(sigma) == ||X_train||_F^2 / n, i.e. the total variance.
  pve_cumulative = np.cumsum(w) / np.sum(w)

  # Index i holds the PVE of the first i + 1 components, hence the + 1.
  reached = np.flatnonzero(pve_cumulative > cutoff / 100)
  k = int(reached[0]) + 1 if reached.size else p

  u = v[:, :k]
  X_train_proj = X_train @ u
  X_test_proj = X_test @ u

  if plot:
    component_counts = np.arange(1, p + 1)
    plt.plot(component_counts, 100 * pve_cumulative)
    plt.title('# of PC vs. PVE (%)\n'
              '# of PC where PVE exceeds {0:g}% first time: {1:d}'.format(cutoff, k))
    plt.xlabel('# of PC')
    plt.ylabel('PVE (%)')
    # Mark the retained count on the same 1-based x axis as the curve.
    plt.plot(k, 100 * pve_cumulative[k - 1], 'rx')
    plt.plot(component_counts, cutoff * np.ones(p), 'r--')

  return X_train_proj, X_test_proj


In [None]:
from math import sqrt
import numpy as np
def dist(row1, row2):
    """Euclidean distance between two sequences, computed in pure Python.

    Iterates over the indices of ``row1``; ``row2`` must be at least as long
    (extra trailing entries of ``row2`` are ignored).
    """
    squared_gaps = ((row1[j] - row2[j]) ** 2 for j in range(len(row1)))
    return sqrt(sum(squared_gaps))

In [None]:
import numpy as np
def dist2(row1, row2):
    """Euclidean (L2) distance between two NumPy vectors of matching shape."""
    offset = row1 - row2
    return np.linalg.norm(offset)

In [None]:
import numpy as np
def get_nearest_neighbors(x_train,row_to_search,y_train, k):
    """Return the k training rows closest to ``row_to_search``.

    Args:
        x_train: iterable of training feature rows.
        row_to_search: query feature row.
        y_train: labels, indexable in parallel with ``x_train``.
        k: number of neighbours to return; must not exceed the number of
            training rows (otherwise ``IndexError``).

    Returns:
        List of k entries ``[distance, label, train_index]`` ordered by
        increasing distance; ties keep their original training order.
    """
    scored = [
        [dist2(row_to_search, candidate), y_train[pos], pos]
        for pos, candidate in enumerate(x_train)
    ]
    # sorted() is stable, so equal distances stay in training-set order.
    ranked = sorted(scored, key=lambda entry: entry[0])
    return [ranked[rank] for rank in range(k)]

In [None]:
import numpy as np
def predict( X_test, X_train, Y_train,k):
    """Classify every row of ``X_test`` by majority vote of its k nearest
    training rows.

    Args:
        X_test: iterable of query feature rows.
        X_train: training feature rows.
        Y_train: training labels, indexable in parallel with ``X_train``.
        k: number of neighbours that vote.

    Returns:
        Array with one predicted label per query row. When several labels
        tie for the most votes, the one whose first vote comes from the
        closest neighbour wins.
    """
    predictions = []
    for query in X_test:
        nearest = get_nearest_neighbors(X_train, query, Y_train, k)
        # Each entry is [distance, label, train_index]; look the label up by index.
        votes = [Y_train[entry[2]] for entry in nearest]
        predictions.append(max(votes, key=votes.count))
    return np.array(predictions)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

def eval_metrics(y_preds, y_true):
    """Compute accuracy and per-class precision/recall/F1, and plot the
    confusion matrix as a heatmap.

    Args:
        y_preds: predicted integer labels (indices into the module-level
            ``classes`` list).
        y_true: ground-truth integer labels, same length as ``y_preds``.

    Returns:
        ``(acc, precision, recall, f1)`` where ``acc`` is a fraction in
        [0, 1] and the other three are lists with one entry per class, each
        a percentage formatted as a string with two decimals. A class with
        no predictions (precision) or no true samples (recall) scores
        ``'0.00'`` instead of NaN.

    Raises:
        ValueError: if ``y_preds`` and ``y_true`` differ in shape.
    """
    y_preds = np.asarray(y_preds)
    y_true = np.asarray(y_true)
    if y_preds.shape != y_true.shape:
        raise ValueError("y_preds and y_true must have the same shape")

    k = len(classes)
    # Rows are true labels, columns are predicted labels.
    conf_counts = np.zeros((k, k), dtype=int)
    for true_label, pred_label in zip(y_true, y_preds):
        conf_counts[true_label, pred_label] += 1

    n_samples = len(y_preds)
    acc = (y_preds == y_true).sum() / n_samples if n_samples else 0.0

    hits = np.diag(conf_counts).astype(float)
    predicted_totals = conf_counts.sum(axis=0)
    true_totals = conf_counts.sum(axis=1)
    # `where=` skips zero denominators so empty classes yield 0 rather than
    # NaN plus a RuntimeWarning.
    precision = 100 * np.divide(hits, predicted_totals, out=np.zeros(k), where=predicted_totals > 0)
    recall = 100 * np.divide(hits, true_totals, out=np.zeros(k), where=true_totals > 0)
    pr_sum = precision + recall
    f1 = np.divide(2 * precision * recall, pr_sum, out=np.zeros(k), where=pr_sum > 0)

    conf_frame = pd.DataFrame(data=conf_counts, index=classes, columns=classes)
    plt.figure(figsize=(10, 8), dpi=100)
    # This notebook evaluates KNN; the previous title named a different model.
    plt.title("Confusion Matrix - KNN")

    heatmap = sns.heatmap(conf_frame, annot=True, cmap='Blues', fmt="d")
    heatmap.set_xticklabels(heatmap.get_xticklabels(), rotation=30)
    heatmap.set_xlabel('Predicted Labels', fontsize=12)
    heatmap.set_ylabel('True Labels', fontsize=12)

    plt.show()

    precision = [f'{p:.2f}' for p in precision]
    recall = [f'{p:.2f}' for p in recall]
    f1 = [f'{p:.2f}' for p in f1]

    return acc, precision, recall, f1

In [None]:
from timeit import default_timer as timer
from pprint import pprint
import csv

# 10-fold cross-validated evaluation of KNN for every k in k_list.
k_list = [6]

accuracies = np.zeros(len(k_list))

for idx, k in enumerate(k_list):
  elapsed = 0.0  # seconds spent predicting the held-out folds
  y_preds, y_true = [], []
  y_train_preds, y_train_true = [], []

  for fold_idx in range(0, 10):
    X_train, X_test, y_train, y_test = prepare_dataset(fold_idx)

    # Time only the held-out prediction. The placeholder all-zero predictions
    # used previously made the test metrics and the timing meaningless.
    start = timer()
    y_predict = predict(X_test, X_train, y_train, k)
    elapsed += timer() - start

    y_preds.append(y_predict)
    y_true.append(y_test)
    # Training-set predictions: each row finds itself at distance 0.
    y_train_preds.append(np.array(predict(X_train, X_train, y_train, k)))
    y_train_true.append(y_train)

  # Pool the per-fold results into flat arrays before scoring.
  y_preds = np.concatenate(y_preds, axis=None)
  y_true = np.concatenate(y_true, axis=None)
  y_train_preds = np.concatenate(y_train_preds, axis=None)
  y_train_true = np.concatenate(y_train_true, axis=None)
  acc, precision, recall, f1 = eval_metrics(y_preds, y_true)
  train_acc, _, _, _ = eval_metrics(y_train_preds, y_train_true)
  accuracies[idx] = acc

  # NOTE: the file is rewritten for every k, so it ends up holding the metrics
  # of the last k in k_list. newline='' is what the csv module expects; without
  # it some platforms emit blank lines between rows.
  with open(f'{CUR_PATH}/eval_metrics_knn.csv', 'w', newline='') as f:
    writer = csv.writer(f)

    # write the header
    writer.writerow(["Classes", "Precision", "Recall", "F1-Score"])

    # write multiple rows
    writer.writerows(zip(classes, precision, recall, f1))

  print(f'k: {k}, Train Accuracy: {train_acc:.3f}, Test Accuracy: {acc:.3f}, Time: {elapsed:.3f} seconds')
pprint(accuracies)