In [1]:
import pandas as pd
import numpy as np
import pickle

from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
from scipy.stats import entropy

from joblib import Parallel, delayed

In [2]:
# Read the raw table from disk.
data = pd.read_csv("vdf.csv")

# Replace the `age` and `v` columns with the integer codes from pd.factorize,
# then zero-fill whatever NaNs are still left in the frame.
data["age"] = pd.factorize(data["age"])[0]
data["v"] = pd.factorize(data["v"])[0]
data = data.fillna(0)
print(data)

# Feature matrix: the four predictor columns, in this order.
col_names = ["claw", "dist", "age", "cluster"]
X = np.array(data.loc[:, col_names])

# Target vector: integer codes of the `type` column.
type_codes, _ = pd.factorize(data["type"])
y = np.array(type_codes)
print(X.shape)
print(y.shape)

       v type  claw        dist  age  cluster
0      0   KC   2.0   337.63801    0        1
1      1   KC   1.0     0.00000    0        1
2      2   KC   1.0  1291.20357    0        1
3      3   KC   1.0   252.63347    0        1
4      4   KC   2.0  1822.29525    0        1
..   ...  ...   ...         ...  ...      ...
208  208   PN   0.0     0.00000   -1        6
209  209   PN   0.0     0.00000   -1        6
210  210   PN   0.0     0.00000   -1        6
211  211   PN   0.0     0.00000   -1        6
212  212   PN   0.0     0.00000   -1        6

[213 rows x 6 columns]
(213, 4)
(213,)


In [3]:
def cef_estimate_mike_1(X, y, n_estimators = 200, max_samples = .32, bootstrap = True, depth = 30, min_samples_leaf = 1, max_features = 1.):
    """
    Estimate the conditional entropy H(Y | X) with a bagged ensemble of decision trees.

    For every fitted tree, the samples that were NOT drawn to fit it are shuffled
    (global NumPy RNG) and split in half. The first half votes its labels into the
    tree nodes it lands in, giving per-node class posteriors; the second half is
    scored with those posteriors, weighted by the node's `n_node_samples`. The
    weighted posteriors are accumulated per sample over all trees, normalised, and
    the mean entropy (natural log) of the resulting distributions is returned.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Feature matrix.
    y : array-like, shape (n_samples,)
        Class labels. They are used directly as column indices into the count
        tables, so they are assumed to be integer codes 0..n_classes-1
        (e.g. the output of pd.factorize) -- TODO confirm for other callers.
    n_estimators, max_samples, bootstrap
        Forwarded unchanged to BaggingClassifier.
    depth, min_samples_leaf
        Forwarded to DecisionTreeClassifier as `max_depth` / `min_samples_leaf`.
    max_features : float
        Accepted for interface compatibility but not used: every tree is built
        with `max_features = ceil(sqrt(n_features))`.

    Returns
    -------
    float
        Mean over all samples of the entropy of the estimated posterior.
        Samples that no tree ever evaluated contribute an entropy of 0.
    """
    X = np.asarray(X)
    y = np.asarray(y)
    n_samples = X.shape[0]

    model = BaggingClassifier(DecisionTreeClassifier(max_depth = depth, min_samples_leaf = min_samples_leaf, max_features = int(np.ceil(np.sqrt(X.shape[1])))),
                              n_estimators = n_estimators,
                              max_samples = max_samples,
                              bootstrap = bootstrap)
    model.fit(X, y)

    class_counts = np.zeros((n_samples, model.n_classes_))
    all_indices = np.arange(n_samples)

    # `estimators_samples_` is regenerated by scikit-learn on every access, so it
    # is read exactly once here instead of once per tree.
    for tree, sampled_indices in zip(model.estimators_, model.estimators_samples_):
        # Out-of-bag indices for this tree, randomly split into a half that
        # builds the posteriors and a half that is evaluated with them.
        unsampled_indices = np.delete(all_indices, sampled_indices)
        np.random.shuffle(unsampled_indices)
        half = len(unsampled_indices) // 2
        prob_indices, eval_indices = unsampled_indices[:half], unsampled_indices[half:]
        if len(eval_indices) == 0:
            # Nothing to score for this tree (tree.apply rejects empty input).
            continue

        # Per-node label votes from the posterior half.
        node_counts = tree.tree_.n_node_samples
        posterior_class_counts = np.zeros((len(node_counts), model.n_classes_))
        if len(prob_indices) > 0:
            prob_nodes = tree.apply(X[prob_indices])
            # np.add.at accumulates correctly when a (node, label) pair repeats.
            np.add.at(posterior_class_counts, (prob_nodes, y[prob_indices]), 1)

        # Normalise votes to probabilities; nodes without votes keep all-zero rows
        # (their row sum is set to 1 to avoid dividing by zero).
        row_sums = posterior_class_counts.sum(axis=1)
        row_sums[row_sums == 0] = 1
        class_probs = posterior_class_counts / row_sums[:, None]

        # Finite-sample correction: pull exact 0s and 1s away from the boundary
        # by 1 / (2 * votes in the node) so that log() stays finite.
        half_inverse = 1 / (2 * row_sums)
        zero_rows, zero_cols = np.nonzero(class_probs == 0)
        class_probs[zero_rows, zero_cols] = half_inverse[zero_rows]
        one_rows, one_cols = np.nonzero(class_probs == 1)
        class_probs[one_rows, one_cols] = 1 - half_inverse[one_rows]

        # Score the evaluation half: node posterior weighted by the node's
        # sample count, accumulated per sample across trees.
        eval_nodes = tree.apply(X[eval_indices])
        class_counts[eval_indices] += class_probs[eval_nodes] * node_counts[eval_nodes][:, None]

    # p(y | X = x) for every x that was evaluated at least once; rows that were
    # never evaluated keep entropy 0 (what the former NaN -> 0 conversion gave).
    totals = class_counts.sum(axis=1)
    evaluated = totals > 0
    entropies = np.zeros(n_samples)
    probs = class_counts[evaluated] / totals[evaluated, None]
    entropies[evaluated] = -np.sum(np.log(probs) * probs, axis=1)
    return np.mean(entropies)

def entropy_estimate(y, base = np.exp(1)):
    """
    Plug-in entropy of the label sample `y`.

    The distinct values in `y` are tallied and the tallies are handed to
    scipy.stats.entropy, which is evaluated in the given logarithm `base`
    (default: e).
    """
    label_frequencies = np.unique(y, return_counts=True)[1]
    return entropy(label_frequencies, base=base)

def estimate_mi(X, y):
    """
    Mutual-information estimate between X and y.

    Computed as entropy_estimate(y) minus cef_estimate_mike_1(X, y), i.e. the
    label entropy minus the conditional-entropy estimate, both with their
    default settings.
    """
    marginal_entropy = entropy_estimate(y)
    conditional_entropy = cef_estimate_mike_1(X, y)
    return marginal_entropy - conditional_entropy

In [4]:
def _perm_stat(calc_stat, x, y):
    permy = np.random.permutation(y)
    perm_stat = calc_stat(x, permy)

    return perm_stat

def perm_test(calc_stat, X, y, reps=1000, workers=1):
    """
    Permutation test for the statistic `calc_stat(X, y)`.

    The statistic is computed once on the data as given, then `reps` more
    times with `y` randomly permuted (via `_perm_stat`, dispatched through
    joblib with `n_jobs=workers`). The p-value is the fraction of permuted
    statistics that are greater than or equal to the observed one.

    Returns
    -------
    (stat, pvalue)
        The observed statistic and its permutation p-value. A p-value that
        would be exactly 0 is reported as 1 / reps instead.
    """
    # Statistic on the unpermuted data.
    stat = calc_stat(X, y)

    # Null distribution: the same statistic under `reps` label permutations.
    permutation_jobs = (delayed(_perm_stat)(calc_stat, X, y) for _ in range(reps))
    null_dist = np.array(Parallel(n_jobs=workers)(permutation_jobs))

    n_as_extreme = np.sum(null_dist >= stat)
    pvalue = n_as_extreme / reps

    # A p-value of exactly 0 is not meaningful for a finite number of
    # permutations, so fall back to the smallest resolvable value.
    if pvalue == 0:
        pvalue = 1 / reps

    return stat, pvalue

In [5]:
# Number of label permutations used to build the null distribution.
reps = 1000

# Permutation test of the mutual-information estimate between X and y.
# `workers` is handed to joblib as n_jobs.
stat, pvalue = perm_test(estimate_mi, X, y, reps=reps, workers=-2)
print("Test Statistic: ", stat)
print("p-value: ", pvalue)

# Persist the result so it can be reloaded without re-running the test.
# The context manager guarantees the file is flushed and closed.
with open('stat_pval.pkl', 'wb') as results_file:
    pickle.dump((stat, pvalue), results_file)

Test Statistic:  0.8317348955841228
p-value:  0.2


In [6]:
# Reload the cached (stat, pvalue) pair written by the permutation-test cell.
# NOTE: pickle.load can execute arbitrary code -- only load files produced by
# this notebook. The context manager closes the file handle deterministically.
with open('stat_pval.pkl', 'rb') as results_file:
    stat, pvalue = pickle.load(results_file)
print("Test Statistic: ", stat)
print("p-value: ", pvalue)

Test Statistic:  0.8317348955841228
p-value:  0.2
