In [1]:
import numpy as np
import pandas as pd
from itertools import combinations, combinations_with_replacement
from tqdm.notebook import tqdm

In [2]:
def mutual_information(matrix):
    """Calculating mutual information from tabulated data"""
    total = matrix.sum()
    flatM = matrix.ravel()
    margin0 = matrix.sum(axis=0)
    margin1 = matrix.sum(axis=1)
    divisor = np.outer(margin1,margin0).ravel()
    indices = flatM.nonzero()
    return np.sum(flatM[indices]/total*np.log2(flatM[indices]*total/divisor[indices]))
    
def mutual_x_y(matrix,feature, output_grouping = True): 
    """Tabulating data based on a feature and the outcome from the full confusion matrix. 
    If output_grouping is True, the outcome is also grouped based on the feature"""
    if output_grouping:
        indices0 = []
        indices1 = []
        for i,v in enumerate(feature):
            if v:
                indices1.append(i)
            else:
                indices0.append(i)

        matrix_xy = np.array([[matrix[np.ix_(indices0,indices0)].sum(),
                          matrix[np.ix_(indices1,indices0)].sum()],
                         [matrix[np.ix_(indices0,indices1)].sum(),
                          matrix[np.ix_(indices1,indices1)].sum()]])
    else:
        a0 = matrix[:,feature].sum(axis=1)
        a1 = matrix[:,np.invert(feature)].sum(axis=1)
        matrix_xy = np.vstack([a0,a1]).transpose()
        
    return mutual_information(matrix_xy)
    

def mutual_xi_xj(counts, indices_i1, indices_j1):
    """Tabulating the input based on two features"""
    indices_i0 = np.invert(indices_i1)
    indices_j0 = np.invert(indices_j1)

    matrix_xx = np.array([[counts[indices_i0&indices_j0].sum(),
                      counts[indices_i1&indices_j0].sum()],
                     [counts[indices_i0&indices_j1].sum(),
                      counts[indices_i1&indices_j1].sum()]])
    return mutual_information(matrix_xx)


# def mutual_xi_y_xj(counts, indices_i1, indices_j1): 
    """Calculating conditional mutual information"""
    #to fill

def mRMR(indices, features, MI_xy,MI_xx, x_counts,k):
    """To minimize MI_xy - (1/k) * MI_xx (Peng et al., 2005)"""

    selected = np.array([],dtype='int')
    
    for i in range(k):
        best = 0
        j_max = 0
        for j in range(len(indices)):
            f = indices[j]
            j_x = MI_xy[f]
            for s in selected:
                if f < s:       
                    a = f
                    b = s
                else:
                    a = s
                    b = f
                if MI_xx[a,b] == -1:
                    MI_xx[a,b] = mutual_xi_xj(x_counts,features[a],features[b])
                j_x += MI_xx[s,f]
            if j_x > j_max:
                j_max = j_x
                best = j
        selected = np.append(selected, indices[best])
        indices=np.delete(indices,best)
    
    return selected
    
    
class confusion_matrix:
    def __init__(self, file_path):
        self.df = pd.read_csv(file_path, index_col=0)
        self.df.reindex(sorted(self.df.index), axis=0)
        self.df.reindex(sorted(self.df.columns), axis=1)
        self.matrix = self.df.to_numpy()
        self.x_counts = self.matrix.sum(axis=0)
        self.n = self.matrix.shape[0]
        self.n_features = 2**(self.n-1)


    def select(self, feature_lim = None, k = 5, batch_size = 50, output_grouping = True):
        
        if k >= batch_size:
            print('Error: Batch size is smaller than the number of features to be selected')
        
        self.features = np.array([[j=='1' 
                                   for j in '1'+np.binary_repr(i, width=self.n-1)] 
                                   for i in range(self.n_features)])
        
        print('Computing I(x;y)')
        self.MI_xy = np.array([])
        for i in tqdm(range(self.n_features)):
            self.MI_xy= np.append(self.MI_xy,mutual_x_y(self.matrix,self.features[i],output_grouping))
            
        self.order = np.arange(self.n_features, dtype='int') #this or the commented line below

# arranging features according to I(x,y) with a descending order 
#         self.order = np.argsort(self.MI_xy)
        
# filtering the features with I(x,y) only       
#         if self.feature_lim & self.feature_lim >self.n_features:
#             print('Only analyzing the {} features with the highest entropies'.format(feature_lim))
#             self.order = self.order[:feature_lim]        
        
        print('Selecting features with mRMR')
        self.MI_xx = -np.ones((len(self.features),len(self.features)))
        self.results = np.array([],dtype='int')
        for i in tqdm(range(np.ceil(self.n_features/batch_size).astype('int'))):
            start = i*batch_size
            batch = self.order[start:start+batch_size]
            batch = np.append(batch, self.results)
            self.results = mRMR(batch, self.features, self.MI_xy, 
                           self.MI_xx, self.x_counts,k)
            
            
        


In [3]:
production = confusion_matrix('cm_production.csv')
perception = confusion_matrix('cm_perception.csv')

## selection with outcomes coded with features

In [4]:
production.select()

Computing I(x;y)


  0%|          | 0/32768 [00:00<?, ?it/s]

Selecting features with mRMR


  0%|          | 0/656 [00:00<?, ?it/s]

In [5]:
for f in production.results:
    i = production.features[f]
    print(production.df.index[i],production.df.index[np.invert(i)])

Index(['b', 'd', 'f', 'g', 'k', 'm', 'n', 'p', 't', 'v'], dtype='object') Index(['ð', 's', 'ʃ', 'z', 'ʒ', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'f', 'g', 'k', 'm', 'n', 'p', 't', 'v'], dtype='object') Index(['s', 'ʃ', 'z', 'ʒ', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'f', 'g', 'k', 'm', 'n', 'p', 't', 'v', 'ʒ'], dtype='object') Index(['s', 'ʃ', 'z', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'f', 'g', 'k', 'm', 'n', 'p', 't', 'v', 'z'], dtype='object') Index(['s', 'ʃ', 'ʒ', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'f', 'g', 'k', 'm', 'n', 'p', 't', 'v', 'z', 'ʒ'], dtype='object') Index(['s', 'ʃ', 'θ'], dtype='object')


In [6]:
perception.select()

Computing I(x;y)


  0%|          | 0/32768 [00:00<?, ?it/s]

Selecting features with mRMR


  0%|          | 0/656 [00:00<?, ?it/s]

In [7]:
for f in perception.results:
    i = perception.features[f]
    print(perception.df.index[i],perception.df.index[np.invert(i)])

Index(['b', 'd', 'ð', 'g', 'm', 'n', 'v', 'z', 'ʒ'], dtype='object') Index(['f', 'k', 'p', 's', 'ʃ', 't', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'g', 'm', 'n', 'ʃ', 'v', 'z', 'ʒ'], dtype='object') Index(['f', 'k', 'p', 's', 't', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'g', 'm', 'n', 's', 'ʃ', 'v', 'z', 'ʒ'], dtype='object') Index(['f', 'k', 'p', 't', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'g', 'm', 'n', 's', 'ʃ', 'v', 'z', 'ʒ', 'θ'], dtype='object') Index(['f', 'k', 'p', 't'], dtype='object')
Index(['b', 'd', 'ð', 'g', 'm', 'n', 's', 'ʃ', 't', 'v', 'z', 'ʒ'], dtype='object') Index(['f', 'k', 'p', 'θ'], dtype='object')


## selection with original outcomes

In [8]:
production = confusion_matrix('cm_production.csv')
perception = confusion_matrix('cm_perception.csv')

In [9]:
production.select(output_grouping=False)

Computing I(x;y)


  0%|          | 0/32768 [00:00<?, ?it/s]

Selecting features with mRMR


  0%|          | 0/656 [00:00<?, ?it/s]

In [10]:
for f in production.results:
    i = production.features[f]
    print(production.df.index[i],production.df.index[np.invert(i)])

Index(['b', 'd', 'f', 'g', 'k', 'm', 'n', 'p', 't'], dtype='object') Index(['ð', 's', 'ʃ', 'v', 'z', 'ʒ', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'f', 'g', 'k', 'm', 'n', 'p', 't'], dtype='object') Index(['s', 'ʃ', 'v', 'z', 'ʒ', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'f', 'g', 'k', 'm', 'n', 'p', 't', 'ʒ'], dtype='object') Index(['s', 'ʃ', 'v', 'z', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'f', 'g', 'k', 'm', 'n', 'p', 'ʃ', 't'], dtype='object') Index(['s', 'v', 'z', 'ʒ', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'f', 'g', 'k', 'm', 'n', 'p', 'ʃ', 't', 'ʒ'], dtype='object') Index(['s', 'v', 'z', 'θ'], dtype='object')


In [11]:
perception.select(output_grouping=False)

Computing I(x;y)


  0%|          | 0/32768 [00:00<?, ?it/s]

Selecting features with mRMR


  0%|          | 0/656 [00:00<?, ?it/s]

In [12]:
for f in perception.results:
    i = perception.features[f]
    print(perception.df.index[i],perception.df.index[np.invert(i)])

Index(['b', 'd', 'ð', 'g', 'm', 'n', 'v', 'z', 'ʒ'], dtype='object') Index(['f', 'k', 'p', 's', 'ʃ', 't', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'g', 'm', 'n', 'v', 'z', 'ʒ', 'θ'], dtype='object') Index(['f', 'k', 'p', 's', 'ʃ', 't'], dtype='object')
Index(['b', 'd', 'ð', 'g', 'm', 'n', 's', 'v', 'z', 'ʒ'], dtype='object') Index(['f', 'k', 'p', 'ʃ', 't', 'θ'], dtype='object')
Index(['b', 'd', 'ð', 'g', 'm', 'n', 's', 'v', 'z', 'ʒ', 'θ'], dtype='object') Index(['f', 'k', 'p', 'ʃ', 't'], dtype='object')
Index(['b', 'd', 'ð', 'g', 'm', 'n', 's', 'ʃ', 'v', 'z', 'ʒ', 'θ'], dtype='object') Index(['f', 'k', 'p', 't'], dtype='object')
