# Causation Entropy

Entropy is a measure historically coined to study the reversiblity of heat engines. So a high entopic process means it is far away from reversibilty. This seemingly phenomenological quantity proved to have a much deeper connection to broader ranges of phenomenon, and the way it is defined proved it has deeper implications and connection to different fields of knowledge. a citation is needed here to illustrate this connection and show some interpretations.

Entropy is accepted as a measure of _surprise_ or _uncertainty_ in communication theory. Entropy is accepted as a measure of _disorder_ in statistical physics. Here I would like to adopt the communication theory interpretation of entropy, a measure of _uncertainty_ or _surprise_. 



In [None]:
## TODO
# entropy: one argument, probably not useful
# cross entropy: two arguments, useful, base for other levels
# conditional entropy: argument is a joint probability distribution 
#                      not two marginal probability distributions
# transfer entropy: conditional entropy of (Xt+1 | Xt) - (Xt+1 | Xt, Yt)
# Causation entropy: difference in transfer entropy conditioned on a set S
#                    rather than a prior distribution Xt

Check: [`pyinform`](https://elife-asu.github.io/PyInform/)

Summary of Libraries

Library |	Mutual Information (MI)   |	Conditional Entropy (CE)     |	Transfer Entropy (TE)
--------|-----------------------------|------------------------------|------------------------
sklearn |   ✅ (mutual_info_classif)  |	❌ (must compute manually) |	          ❌
scipy	| ✅ (via entropy differences)|	✅ (manual computation)	  |           ❌
NPEET	| ✅ (mi)	                 |   ✅ (condentropy)          |	          ❌
PyInform|             ❌              |             ❌              | ✅ (transferentropy) 

# Define Conditional Entropy

In [10]:
import numpy as np
import pandas as pd
from scipy.stats import entropy as scipy_entropy
from numpy.testing import assert_almost_equal
from itertools import product

In [88]:
def gen_conditional_entropy(X, *Y, nbins=10, base=2):

    b = np.log(base)

    Y = np.array(Y)
    X, Y = X.flatten(), Y.reshape(Y.shape[0], -1)

    x_min, x_max = np.min(X), np.max(X)
    y_min, y_max = np.min(Y, axis=1), np.max(Y, axis=1)
    X_bins = np.linspace(x_min, x_max, nbins+1)
    Y_bins = np.linspace(y_min, y_max, nbins+1).T

    X_bins[-1] += 1
    Y_bins[:,-1] += 1

    joint_dims = tuple(Y.shape[0] * [nbins])
    joint_dimsp1 = tuple((Y.shape[0] + 1) * [nbins])

    Py = np.empty(joint_dims)
    Pxy = np.empty(joint_dimsp1)
    CE = 0

    joint_ranges = [range(x) for x in joint_dims]
    for indices in product(*joint_ranges):
        condition = np.ones_like(Y[0], dtype=np.bool_)
        for ii in range(len(joint_ranges)):
            condition = condition & (Y[ii] >= Y_bins[ii,indices[ii]]) &\
                        (Y[ii] < Y_bins[ii,indices[ii]+1])
        Py[*indices] = np.where(condition, 1, 0).sum() / Y.shape[1]

    for k, indices in product(range(nbins), product(*joint_ranges)):
        condition = np.ones_like(Y[0], dtype=np.bool_)
        for ii in range(len(joint_ranges)):
            condition = condition & (Y[ii] >= Y_bins[ii,indices[ii]]) &\
                        (Y[ii] < Y_bins[ii,indices[ii]+1])
        condition = condition & (X >= X_bins[k]) & (X < X_bins[k+1])
        Pxy[k, *indices] = np.where(condition, 1, 0).sum() / X.shape[0]

        if Pxy[k, *indices] and Py[*indices]:
            CE += Pxy[k, *indices] * np.log(Py[*indices] / Pxy[k, *indices] ) / b

    return CE

In [91]:
X = np.random.rand(100)
Y = np.random.rand(100)

ce = gen_conditional_entropy(X, Y)
cesp = scipy_conditional_entropy(X,Y)
print(f"   my conditional entropy = {ce}")
print(f"scipy conditional entropy = {cesp}")

   my conditional entropy = 2.459459458304302
scipy conditional entropy = 2.468045019757652


In [None]:
def Prob(X, nbins=10):
    X = X.flatten()

    x_min, x_max = np.min(X), np.max(X)
    binlims = np.linspace(x_min, x_max, nbins+1)

    binlims[-1] += 1

    Px = np.empty_like(binlims[1:])

    for i in range(nbins):
        Px[i] = np.where((X >= binlims[i]) & (X < binlims[i+1]), 1, 0).sum()\
               / X.shape[0]
    
    return Px


def JointProb(X, Y, nbins=10):
    
    X, Y = X.flatten(), Y.flatten()

    x_min, x_max = np.min(X), np.max(X)
    y_min, y_max = np.min(Y), np.max(Y)
    X_bins = np.linspace(x_min, x_max, nbins+1)
    Y_bins = np.linspace(y_min, y_max, nbins+1)

    X_bins[-1] += 1
    Y_bins[-1] += 1

    Pxy = np.empty((nbins, nbins))

    for i,j  in product(range(nbins), range(nbins)):
        Pxy[i,j] = np.where((Y >= Y_bins[j]) & (Y < Y_bins[j+1]) &\
                            (X >= X_bins[i]) & (X < X_bins[i+1]),
                             1, 0).sum() / X.shape[0]
        
    return Pxy

def entropy(X, nbins=10, base=2):

    b = np.log(base)
    Px = Prob(X, nbins=nbins)

    return np.nansum( - Px * np.log(Px) / b)


def conditional_entropy(X, Y, nbins=10, base=2):

    b = np.log(base)

    X, Y = X.flatten(), Y.flatten()

    x_min, x_max = np.min(X), np.max(X)
    y_min, y_max = np.min(Y), np.max(Y)
    X_bins = np.linspace(x_min, x_max, nbins+1)
    Y_bins = np.linspace(y_min, y_max, nbins+1)

    X_bins[-1] += 1
    Y_bins[-1] += 1

    Py = np.empty_like(Y_bins[1:])
    Pxy = np.empty((nbins, nbins))
    CE = 0

    for i in range(nbins):
        Py[i] = np.where((Y >= Y_bins[i]) & (Y < Y_bins[i+1]), 1, 0).sum()\
               / Y.shape[0]

    for i,j  in product(range(nbins), range(nbins)):
        Pxy[i,j] = np.where((Y >= Y_bins[j]) & (Y < Y_bins[j+1]) &\
                            (X >= X_bins[i]) & (X < X_bins[i+1]),
                             1, 0).sum() / X.shape[0]
        if Pxy[i,j] and Py[j]:
            CE -= Pxy[i,j] * np.log(Pxy[i,j] / Py[j]) / b
    return CE

def scipy_conditional_entropy(X, Y):
    """
    Calculates the conditional entropy H(Y|X) of Y given X.

    Args:
        X (array-like): Values of the first random variable.
        Y (array-like): Values of the second random variable.

    Returns:
        float: The conditional entropy H(Y|X).
    """

    # Calculate the joint probability distribution of X and Y
    joint_prob = np.histogram2d(X, Y, density=True)[0].flatten()
    # joint_prob = joint_prob[joint_prob > 0]  # Remove 0 probabilities to avoid errors in log calculation

    # Calculate the marginal probability distribution of X
    marginal_prob_x = np.histogram(X, density=True)[0]
    marginal_prob_x = marginal_prob_x[marginal_prob_x > 0]

    # Calculate joint entropy H(X, Y)
    joint_entropy = scipy_entropy(joint_prob, base=2)

    # Calculate marginal entropy H(X)
    marginal_entropy_x = scipy_entropy(marginal_prob_x, base=2)

    # Calculate conditional entropy H(Y|X)
    conditional_entropy = joint_entropy - marginal_entropy_x

    return conditional_entropy

## Test Implementation

In [None]:
X = np.random.rand(100)
Y = np.random.rand(100)

Px = Prob(X)
Py = Prob(Y)
Pxy = JointProb(X,Y)

ass = lambda x, y : assert_almost_equal(x, y, 10)

ass(Px.sum(), 1)
ass(Py.sum(), 1)
ass(Pxy.sum(), 1)
ass(Pxy.sum(1), Px)
ass(Pxy.sum(0), Py)

ce = gen_conditional_entropy(X, Y)
cesp = scipy_conditional_entropy(X,Y)
print(f"   my conditional entropy = {ce}")
print(f"scipy conditional entropy = {cesp}")

IndexError: index 11 is out of bounds for axis 0 with size 11

In [21]:
print(f"   my entropy = {entropy(X)}")
scipy_distr_X = np.histogram(X, density=True)[0]
print(f"scipy entropy = {scipy_entropy(scipy_distr_X, base=2)}")

   my entropy = 3.2410128215230296
scipy entropy = 3.24101282152303


# Preprocess Data

In [23]:
df_raw = pd.read_csv("data/exam_grades.csv")

df_raw.head()

Unnamed: 0,semester,sex,exam1,exam2,exam3,course_grade
0,2000-1,Man,84.5,69.5,86.5,76.2564
1,2000-1,Man,80.0,74.0,67.0,75.3882
2,2000-1,Man,56.0,70.0,71.5,67.0564
3,2000-1,Man,64.0,61.0,67.5,63.4538
4,2000-1,Man,90.5,72.5,75.0,72.3949


In [24]:
df_raw.describe()

Unnamed: 0,exam1,exam2,exam3,course_grade
count,232.0,233.0,233.0,233.0
mean,80.766185,72.605579,75.479589,72.238831
std,11.06786,13.777468,14.706791,9.807053
min,46.5,38.0,28.0,43.2733
25%,73.5,63.0,67.0,66.6958
50%,82.0,74.0,78.0,72.5267
75%,89.625,83.0,86.0,78.931
max,99.3,99.5,98.8889,97.5667


In [25]:
df_raw.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 233 entries, 0 to 232
Data columns (total 6 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   semester      233 non-null    object 
 1   sex           233 non-null    object 
 2   exam1         232 non-null    float64
 3   exam2         233 non-null    float64
 4   exam3         233 non-null    float64
 5   course_grade  233 non-null    float64
dtypes: float64(4), object(2)
memory usage: 11.1+ KB


In [26]:
df = df_raw.dropna()