## Nearest Neighbor Mutual Information

This technique proclaims that it can successfully and accurately measure the mutual information between a continuous random variable and a discrete random variable without the use of binning. The following class implements and test this method.

In [1]:
import numpy as np
from scipy.special import digamma
import pandas as pd

## Creating a dummy dataset

This data set will comprise of x and y, y will be a continuous random variable and x will be the discrete random variable for each y point.

In [10]:
x = np.random.uniform(0, 3, (12,)).astype(np.uint16)
y = np.array([10, 12, 14, 15, 18, 20, 21, 24, 27, 31, 33, 40])

In [11]:
x = np.expand_dims(x, -1)
y = np.expand_dims(y, -1)

In [12]:
y.shape

(12, 1)

In [13]:
t = np.concatenate([x, y] ,axis=-1)

In [14]:
t.shape

(12, 2)

In [15]:
for x, y in t:
    print(f"({x}, {y})")

(1, 10)
(0, 12)
(2, 14)
(0, 15)
(2, 18)
(2, 20)
(1, 21)
(1, 24)
(0, 27)
(1, 31)
(0, 33)
(0, 40)


In [16]:
reds = t[:, 0] == 2
blues = t[:, 0] == 1
greens = t[:, 0] == 0

In [17]:
reds

array([False, False,  True, False,  True,  True, False, False, False,
       False, False, False])

In [18]:
r = t[reds]
b = t[blues]
g = t[greens]

In [19]:
r

array([[ 2, 14],
       [ 2, 18],
       [ 2, 20]])

In [83]:
selected_data_point = r[-1]
selected_data_point

array([ 2, 40])

In [16]:
rs = r[:, 1]

In [34]:
rs_sub = np.sort(np.abs(rs - 10))
kth_neighbor = rs_sub[3] + 10

In [27]:
kth_neighbor

40

In [28]:
data_point = r[r[:, 1] == 40]

In [29]:
data_point

array([[ 2, 40]])

## Algorithm and process to calculate mutual information

### Algorithm to find the kth neighbor of a data point from the same class

<ol>
<li> Extract the data points just for that class
<li> Create a second array which contains the sorted absolute differences between the values of the class array and the selected data point.
<li> Choose the kth index of this array (yes the kth index and not k-1 since the first value will be 0)
<li> Add the value of the data point to this kth value.
<li> Pick the first data point from the class array that has this value.
</ol>

### Algorithm to find the distance m between the two data points
<ol>
<li> Start at the selected index in the full dataset
<li> Move forward until the kth neighbor is found while incrementing a counter
<li> The counter value is the value of m.
</ol>

In [20]:
def find_k(dp, k, ds):

    val = dp[-1]
    cl = ds[:, 0][0]
    nums = ds[:, 1]
    subs = nums - val
    abs_subs = np.abs(subs)
    zipped_subs = zip(subs, abs_subs)
    zipped_subs = sorted(zipped_subs, key= lambda x: x[1])

    kth_neighbor_diffs = zipped_subs[k][0]
    kth_neighbor = np.array([cl,kth_neighbor_diffs + val])


    return kth_neighbor

In [21]:
def find_m(i, dp, ds):
    
    m = 1
    current_dp = ds[i]
    current_val = current_dp[1]
    wanted_val = dp[1]
    if current_val < wanted_val:
        for j in range(i+1, len(ds)):
            if np.allclose(ds[j], dp):
                break
            else:
                m += 1
    else:
        for j in range(i-1, 0, -1):
            if np.allclose(ds[j], dp):
                break
            else:
                m += 1
    return m


In [26]:


from statistics import mean


ms = []
iS = []
N = 12
k = 2
for i in range(len(t)):
    
    selected_data_point = t[i]
    dp_class = selected_data_point[0]
    
    if dp_class == 0:
        Ni = g.shape[0]
        kth_neighbor = find_k(selected_data_point, k, g)
    elif dp_class == 1:
        Ni = b.shape[0]
        kth_neighbor = find_k(selected_data_point, k, b)
    else:
        Ni = r.shape[0]
        kth_neighbor = find_k(selected_data_point, k, r)

    m = find_m(i, kth_neighbor, t)
    I = digamma(N) - digamma(Ni) + digamma(k) - digamma(m)
    iS.append(I)
    ms.append(m)

print(ms)
print(mean(iS))


[7, 7, 3, 5, 2, 3, 3, 2, 5, 3, 1, 3]
0.6184884559884565


In [157]:
find_k([2, 10], 2, r)

array([ 2, 21])

In [158]:
find_m(0, np.array([2,21]), t)

6

## Testing the functions on a real dataset

In [5]:
iris = pd.read_csv("C:\\Users\Mohammed Ali\\Documents\\Documents-Suhaib\\Data-Science-Projects\\NNMI\\IRIS.csv")

In [6]:
iris.head()

Unnamed: 0,sepal_length,sepal_width,petal_length,petal_width,species
0,5.1,3.5,1.4,0.2,Iris-setosa
1,4.9,3.0,1.4,0.2,Iris-setosa
2,4.7,3.2,1.3,0.2,Iris-setosa
3,4.6,3.1,1.5,0.2,Iris-setosa
4,5.0,3.6,1.4,0.2,Iris-setosa


In [28]:
species = list(iris.species.unique())
species

['Iris-setosa', 'Iris-versicolor', 'Iris-virginica']

In [232]:
sepal_length = iris[["species", "sepal_length"]].sort_values(by=["sepal_length"])
sepal_width = iris[["species", "sepal_width"]].sort_values(by=["sepal_width"])
petal_lenght = iris[["species", "petal_length"]].sort_values(by=["petal_length"])
petal_width = iris[["species", "petal_width"]].sort_values(by=["petal_width"])

In [233]:
class_datasets = [sepal_length[sepal_length["species"] == s] for s in species]
class_dataset_2 = [sepal_width[sepal_width["species"] == s] for s in species]
class_dataset_3 = [petal_lenght[petal_lenght["species"] == s] for s in species]
class_dataset_4 = [petal_width[petal_width["species"] == s] for s in species]

In [234]:
dataset_dict = {s:np.array(d.sort_values(by=["sepal_length"])) for s,d in zip(species, class_datasets)}
dataset_dict_2 = {s:np.array(d.sort_values(by=["sepal_width"])) for s,d in zip(species, class_dataset_2)}
dataset_dict_3 = {s:np.array(d.sort_values(by=["petal_length"])) for s,d in zip(species, class_dataset_3)}
dataset_dict_4 = {s:np.array(d.sort_values(by=["petal_width"])) for s,d in zip(species, class_dataset_4)}   

In [259]:
class NNMI_Helper:

    def __init__(self, discrete_name, cont_variables: list, ds: pd.DataFrame):

        self.discrete_name = discrete_name
        self.cont_variables = cont_variables
        self.ds = ds
        self.class_vars = list(ds[discrete_name].unique())
    
    #This method extracts a discrete-continuous variable pair dataframe from the overall dataset, returns all such pairings
    def extract_subset(self):

        sub_ds = [self.ds[[self.discrete_name, cont_name]].sort_values(by = [cont_name]) for cont_name in self.cont_variables]
        return sub_ds

    def calculate_intermediate(self, sub_datasets):

        intermediate_datasets = [[sub_ds[sub_ds[self.discrete_name] == class_var] for class_var in self.class_vars] for sub_ds in sub_datasets]
        return intermediate_datasets
    
    def make_dataset_dicts(self, intermediate_datasets):
        
        all_dataset_dicts = []

        for cont_var, id in zip(self.cont_variables, intermediate_datasets):
            inter_dict = {}
            for s, sub_cd in zip(self.class_vars, id):
                inter_dict[s] = np.array(sub_cd.sort_values(by=[cont_var]))

            all_dataset_dicts.append(inter_dict)


              
        return all_dataset_dicts

    def process(self):

        sub_datasets = self.extract_subset()
        intermediate_datasets = self.calculate_intermediate(sub_datasets)
        ds_dicts   = self.make_dataset_dicts(intermediate_datasets)

        return {c: (subds, dd) for (c, subds , dd) in zip(self.cont_variables, sub_datasets, ds_dicts)}


        

In [260]:
class NearestNeighborMutualInformation:

    def __init__(self, k, data, class_dict):
        self.data = np.array(data)
        self.class_dict = class_dict
        self.k = k
    
    def find_k(self, dp, k, ds):

        val = dp[-1]
        cl = ds[:, 0][0]
        nums = ds[:, 1]
        subs = nums - val
        abs_subs = np.abs(subs)
        zipped_subs = zip(subs, abs_subs)
        zipped_subs = sorted(zipped_subs, key= lambda x: x[1])

        kth_neighbor_diffs = zipped_subs[k][0]
        kth_neighbor = np.array([cl,kth_neighbor_diffs + val])


        return kth_neighbor

    def find_m(self, i, dp, ds):
        
        m = 1
        current_dp = ds[i]
        current_val = float(current_dp[1])
        wanted_val = float(dp[1])
        if current_val < wanted_val:
            for j in range(i+1, len(ds)):
                if (ds[j][0] == dp[0]) and (ds[j][1] == float(dp[1])):
                    break
                else:
                    m += 1
        else:
            for j in range(i-1, 0, -1):
                if (ds[j][0] == dp[0]) and (ds[j][1] == float(dp[1])):
                    break
                else:
                    m += 1
        return m
    
    def calc_I(self, N, Ni, k, m):
        return (digamma(N) - digamma(Ni) + digamma(k) - digamma(m))
    
    def calc_mi(self):

        iS = []
        N = len(self.data)
        for i in range(N):
            
            selected_data_point = self.data[i]
            dp_class = selected_data_point[0]

            Ni = self.class_dict[dp_class].shape[0]
            kth_neighbor = self.find_k(selected_data_point, self.k, self.class_dict[dp_class])
            #print(f"k: {kth_neighbor}")
            m = self.find_m(i, kth_neighbor, self.data)
            #print(f"datapoint: {selected_data_point}")
            #print(N, Ni, m)
            I = self.calc_I(N, Ni, k, m)
            iS.append(I)
        return np.mean(iS)
    

In [272]:
class NNMI_Aggregator:

    def __init__(self, data, k, disc_name):
        
        self.data = data
        self.scores = {}
        self.keys = data.keys()
        self.k = k
        self.disc_name = disc_name

    def process_scores(self, print_mode = False):

        for key in self.keys:
            cont_data, cont_dict = self.data[key]
            nnmi = NearestNeighborMutualInformation(self.k, cont_data, cont_dict)
            self.scores[key] = nnmi.calc_mi()
            if print_mode:
                print(f"The mutual information of the continuous variable {key} with {self.disc_name} is {self.scores[key]}")
            else:
                continue
        


In [261]:
nnmi_helper = NNMI_Helper("species", ["sepal_length", "sepal_width", "petal_length", "petal_width"], iris)

In [262]:
data = nnmi_helper.process()
petal_width, pw_dict = data["petal_width"]

In [273]:
nnmi_agg = NNMI_Aggregator(data, 3, "species")

In [275]:
nnmi_agg.process_scores(True)

The mutual information of the continuous variable sepal_length with species is 0.8363259854789786
The mutual information of the continuous variable sepal_width with species is 0.6837603255555182
The mutual information of the continuous variable petal_length with species is 1.3760477676104501
The mutual information of the continuous variable petal_width with species is 1.4680088431523317


## Testing the results of Nearest Neighbor Mutual Information with the iris dataset

The following code section will fit decision trees on different datasets, one with all the data, one with the two variables which have a lower mutual information with the species, and one with the two variables which have a higher mutual information with the species. The decision trees will then be evaluated against test data to measure the performance which will therefore show whether the nearest neighbor mutual information works correctly.

In [277]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split

# X = df.drop(['target'],axis=1).values   # independant features
# y = df['target'].values					# dependant variable

# # Choose your test size to split between training and testing sets:
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

### 1 Decision Tree trained on all data

In [305]:
X = iris.drop(["species"], axis = 1).values
y = iris["species"].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state = 42)

In [306]:
dt_1  = DecisionTreeClassifier()
dt_1.fit(X_train, y_train)

DecisionTreeClassifier()

In [307]:
dt_1.score(X_test, y_test)

0.98

With all the data passed into the classifier, it returns a test accuracy of 0.97 which is already incredibly high.

### 2 Decision Tree trained on the variables with low mutual information

In [312]:
X = iris[["sepal_length", "sepal_width"]].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.33, random_state = 42)

In [313]:
dt_2 = DecisionTreeClassifier()
dt_2.fit(X_train, y_train)

DecisionTreeClassifier()

In [314]:
dt_2.score(X_test, y_test)

0.66

Using the independent variables with low mutual information has resulted in a low test accuracy of 66%

### 3 Testing with the variables with the highest mutual information

In [328]:
X = iris[["petal_length", "petal_width"]].values

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.33, random_state = 42)

In [329]:
dt_3 = DecisionTreeClassifier()
dt_3.fit(X_train, y_train)

DecisionTreeClassifier()

In [330]:
dt_3.score(X_test, y_test)

1.0

When trained on the independent variables which showed high mutual information with the response variable, the test accuracy of the model is the highest with a test accuracy of 99% on average, this is slightly higher than training the model on all the independent variables.

While the dataset was quite small, these results show that nearest neighbor mutual information can be applied successfully to a dataset to choose only those independent variables which will result in maximal learning.