<a href="https://colab.research.google.com/github/kr7/cython/blob/main/HWKNN_Cython.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import math
import numpy as np
from time import time
from sklearn.metrics.pairwise import cosine_distances

**Load the data**

In [None]:
data = np.loadtxt('https://archive.ics.uci.edu/ml/machine-learning-databases/spambase/spambase.data', delimiter=',')

In [None]:
# The last 100 instances will be considered as test data
# Indices need to be updated in case if another dataset is used

np.random.seed(42)
np.random.shuffle(data)
train_data = data[:4500,0:57]
train_labels = np.array(data[:4500,57], dtype=int)
test_data = data[4501:,0:57]
test_labels = np.array(data[4501:,57], dtype=int)

**Training phase of hubness-weighted k-NN**

* Calculation of pairwise distances on the training data
* Hubness calculation in Python and Cython (so that both implementations may be compared)
* Calculation of hubness-aware weights

In [None]:
d_mat = cosine_distances(train_data)

In [None]:
def calculate_hubness_python(distance_matrix, labels, k = 10):
  t0 = time()

  num_instances = len(labels)

  gn = np.zeros( (num_instances) )
  bn = np.zeros( (num_instances) )

  for i in range( num_instances ):
    k_nn_idx = [-1]*k
    k_nn_dist = [np.inf]*k
    k_nn_labels = [-1]*k

    d_max = np.inf
    for j in range( num_instances ):    
      if i == j:
        continue
      
      d = distance_matrix[i,j]

      if d < d_max:
        pos = k_nn_dist.index(d_max)

        k_nn_dist[pos] = d
        k_nn_idx[pos] = j
        k_nn_labels[pos] = labels[j]

        d_max = max(k_nn_dist)

    x_label = labels[i]

    for i in range(k):
      if x_label == k_nn_labels[i]:
        gn[k_nn_idx[i]] = gn[k_nn_idx[i]] + 1
      else:
        bn[k_nn_idx[i]] = bn[k_nn_idx[i]] + 1
      
  return gn, bn, time()-t0

In [None]:
%load_ext cython

In [None]:
%%cython

import numpy as np
cimport numpy as np

from time import time

def calculate_hubness_cython(np.ndarray[np.float_t, ndim=2] distance_matrix, 
                             np.ndarray[np.int_t, ndim=1] labels, 
                             int k = 10):
  t0 = time()
 
  cdef int i
  cdef int j
  cdef int num_instances
  cdef int pos
  cdef int x_label

  cdef double d
  cdef double d_max

  cdef np.ndarray[np.int_t, ndim=1] gn
  cdef np.ndarray[np.int_t, ndim=1] bn
  cdef np.ndarray[np.int_t, ndim=1] k_nn_idx
  cdef np.ndarray[np.int_t, ndim=1] k_nn_labels

  num_instances = len(labels)

  gn = np.zeros( (num_instances), dtype=int )
  bn = np.zeros( (num_instances), dtype=int )

  for i in range( num_instances ):

    k_nn_idx =  np.zeros( (k), dtype=int )
    k_nn_dist = [np.inf]*k
    k_nn_labels = np.zeros( (k), dtype=int )

    d_max = np.inf
    for j in range( num_instances ):    
      if i == j:
        continue
      
      d = distance_matrix[i,j]

      if d < d_max:
        pos = k_nn_dist.index(d_max)

        k_nn_dist[pos] = d
        k_nn_idx[pos] = j
        k_nn_labels[pos] = labels[j]

        d_max = max(k_nn_dist)

    x_label = labels[i]

    for i in range(k):
      if x_label == k_nn_labels[i]:
        gn[k_nn_idx[i]] = gn[k_nn_idx[i]] + 1
      else:
        bn[k_nn_idx[i]] = bn[k_nn_idx[i]] + 1
      
  return gn, bn, time()-t0

In [None]:
gn, bn, runtime = calculate_hubness_python(d_mat, train_labels)

In [None]:
gn_c, bn_c, runtime_c = calculate_hubness_cython(d_mat, train_labels)

In [None]:
print("Runtime of hubness calculations:")
print(f"  Python: {runtime:5.2f} s" )
print(f"  Cython: {runtime_c:5.2f} s" )

To be on the safe side, we check that both implementations return the same results.

In [None]:
assert np.sum(gn!=gn_c) == 0

In [None]:
assert np.sum(bn!=bn_c) == 0



Computation of hubness-aware weights 

In [None]:
mu = np.mean(bn)
sigma = np.std(bn)
hb = (bn - mu)/sigma
w = np.array([ math.exp(-h) for h in hb ])

**Prediction for Test Instances**

In [None]:
d_mat = cosine_distances(test_data, train_data)

In [None]:
def kNN(distance_matrix, train_labels, k, weights):
  """
      distance_matrix is expected to have n rows and m columns, 
         where n is the number of test instances and 
               m is the number of training instances. 
         Each row contains the distances of a test instance from
         all the training instances. 

      This implementation assumes that the classes are numbered 
      from 0 on as follows: 
        0, 1, ..., c-1,
      where c is the number of classes.
  """
  number_of_classes = max(train_labels)+1

  predictions = []
  for i in range(len(distance_matrix)):
    thr = np.sort(d_mat[i])[k]
    neighbor_mask = d_mat[i] <= thr
    labels_of_the_neighbors = train_labels[neighbor_mask]
    weights_of_the_neighbors = weights[neighbor_mask]

    class_votes = np.zeros( (number_of_classes) )
    for j in range(len(labels_of_the_neighbors)):
      class_votes[ labels_of_the_neighbors[j] ] += weights_of_the_neighbors[j]
      
    predictions.append( np.argmax(class_votes) )
  
  return np.array(predictions)

In [None]:
pred = kNN(d_mat, train_labels, 10, w)

Number of correctly classified instanes (out of the 100 test instances)

In [None]:
np.sum(pred==test_labels)