In [1]:
import numpy as np
import scipy.sparse as sparse

import pandas as pd

import itertools

In [131]:
# Example time series generator.
#
# Three binary series of length 1000: rows 0 and 1 are random draws from
# {0, 1}; row 2 is overwritten with the element-wise AND of rows 0 and 1.
# A locally seeded generator is used so that a fresh "Restart & Run All"
# reproduces exactly the same data (and therefore the same results below).

X = np.random.default_rng(seed=0).integers(low=0, high=2, size=(3, 1000))

X[2, :] = X[0, :] & X[1, :]

In [132]:
class Analysis:
    """Information-theoretic measures on the empirical joint distribution of
    several discrete time series.

    Variables are identified by their row number in the input data
    (0 .. nvars-1).  Every method that takes variables accepts a list of such
    numbers (a single number is also accepted).

    Attributes set by ``__init__``:
        nvars    -- number of variables (rows of the data)
        duration -- number of samples (columns of the data)
        P        -- DataFrame with one row per *observed* joint state, indexed
                    by a MultiIndex whose levels are named 0 .. nvars-1, and a
                    single float column 'joint' holding the relative frequency
                    of that state.
    """

    def __init__(self, data):
        """Estimate the joint distribution from ``data``.

        Parameters
        ----------
        data : array-like of shape (nvars, duration)
            One row per variable, one column per sample.

        Raises
        ------
        ValueError
            If ``data`` is not two-dimensional or contains no samples.
        """
        data = np.asarray(data)
        if data.ndim != 2 or data.size == 0:
            raise ValueError("data must be a non-empty 2-D array of shape (nvars, duration)")

        # number of variables and length of the series
        self.nvars, self.duration = data.shape

        # joint alphabet (unique columns) together with how often each occurs;
        # one pass over the data instead of one scan per joint state
        alphabet, counts = np.unique(data, axis=1, return_counts=True)

        index = pd.MultiIndex.from_arrays(list(alphabet), names=list(range(self.nvars)))

        # relative frequencies, stored as a plain float column
        self.P = pd.DataFrame({'joint': counts / counts.sum()}, index=index)

    @staticmethod
    def _levels(variables):
        """Normalise a variable specification (scalar or iterable) to a list."""
        if np.isscalar(variables):
            return [variables]
        return list(variables)

    def _pmf(self, variables):
        """Return ``{value-tuple: probability}`` for ``variables``.

        The value tuples are ordered like ``variables``.  Index level names
        equal level positions, so a variable number can be used directly to
        pick its value out of a joint state tuple.
        """
        pmf = {}
        for state, prob in self.P['joint'].items():
            key = tuple(state[v] for v in variables)
            pmf[key] = pmf.get(key, 0.0) + float(prob)
        return pmf

    # calculate the marginal distribution of one variable or a higher-order marginal of several variables
    def marginal_distribution(self, x):
        """Return the marginal distribution of the variables in ``x``.

        The result is a DataFrame indexed by the values of ``x`` with a single
        column 'marginal'.
        """
        marginal = self.P.groupby(level=self._levels(x)).sum()
        return marginal.rename(columns={'joint': 'marginal'})

    # return a conditional distribution of one or several variables given one or several other variables
    def conditional_distribution(self, x, y):
        """Return p(x | y) for every observed combination of ``x`` and ``y``.

        The result is a DataFrame with a single column 'conditional', indexed
        by the union of the variables in ``x`` and ``y`` in ascending order.
        """
        x, y = self._levels(x), self._levels(y)

        # sorted so that the index level order does not depend on set iteration order
        xy = sorted(set(x) | set(y))

        # joint distribution of x and y
        p_xy = self.marginal_distribution(xy).rename(columns={'marginal': 'joint'}).reset_index()

        # marginal of the conditioning variables
        p_y = self.marginal_distribution(y).reset_index()

        table = p_xy.merge(p_y, on=y)
        table['conditional'] = table['joint'] / table['marginal']

        return table.set_index(xy)[['conditional']]

    # Calculate the entropy of one or more variables
    def entropy(self, var=None):
        """Return the Shannon entropy (in bits) of the variables in ``var``.

        With ``var=None`` the entropy of the full joint distribution is
        returned.  An empty ``var`` has entropy 0.
        """
        if var is None:
            p = self.P['joint'].to_numpy(dtype=float)
        else:
            levels = self._levels(var)
            if len(levels) == 0:
                return 0.0
            p = self.P['joint'].groupby(level=levels).sum().to_numpy(dtype=float)

        # 0 * log(0) is taken as 0
        p = p[p > 0]

        return -np.sum(p * np.log2(p))

    # Conditional Entropy
    def conditional_entropy(self, x, y):
        """Return H(x | y) = H(x, y) - H(y)."""
        x, y = self._levels(x), self._levels(y)
        xy = sorted(set(x) | set(y))

        return self.entropy(xy) - self.entropy(y)

    # Mutual Information
    def mutual_information(self, x, y):
        """Return I(x; y) = H(x) + H(y) - H(x, y)."""
        x, y = self._levels(x), self._levels(y)
        xy = sorted(set(x) | set(y))

        return self.entropy(x) + self.entropy(y) - self.entropy(xy)

    # Interaction Information
    def interaction_information(self):
        """Return the signed sum of the entropies of all non-empty subsets.

        Each subset T of the variables contributes
        ``(-1) ** (nvars - |T| + 1) * H(T)``.
        """
        variables = list(range(self.nvars))

        total = 0

        for size in range(1, self.nvars + 1):
            sign = (-1) ** (self.nvars - size + 1)
            for subset in itertools.combinations(variables, size):
                total += sign * self.entropy(list(subset))

        return total

    # total correlation
    def total_correlation(self):
        """Return sum_i H(X_i) - H(X_0, ..., X_{n-1})."""
        variables = list(range(self.nvars))

        joint_entropy = self.entropy(variables)
        single_entropies = sum(self.entropy([v]) for v in variables)

        return single_entropies - joint_entropy

    # dual total correlation
    def dual_total_correlation(self):
        """Return sum_i H(all variables except X_i) - (n - 1) * H(all variables)."""
        variables = list(range(self.nvars))

        joint_entropy = self.entropy(variables)
        leave_one_out = sum(
            self.entropy([v for v in variables if v != left_out]) for left_out in variables
        )

        return leave_one_out - (self.nvars - 1) * joint_entropy

    # redundancy synergy index
    def redundancy_synergy_index(self, source, target):
        """Return I(source; target) - sum_i I(source_i; target)."""
        I_joint = self.mutual_information(source, target)

        I_ind = sum(self.mutual_information([s], target) for s in self._levels(source))

        return I_joint - I_ind

    # varadans synergy
    def varadans_synergy(self, source, target, source_partitions):
        """Return I(source; target) minus the largest partition-wise information.

        ``source_partitions`` is a list of partitions; each partition is a list
        of variable groups.  For each partition the mutual informations between
        its groups and ``target`` are summed, and the maximum of these sums is
        subtracted from the joint mutual information.
        """
        I_joint = self.mutual_information(source, target)

        I_max = max(
            sum(self.mutual_information(group, target) for group in partition)
            for partition in source_partitions
        )

        return I_joint - I_max

    # Delta I
    def delta_I(self, x, y):
        """Return Delta I between ``x`` and ``y`` in bits.

        Computes ``sum_{x,y} p(x,y) * log2( p(y|x) / p_ind(y|x) )`` over the
        observed states, where ``p_ind`` is obtained with Bayes' theorem from
        the model in which the variables in ``x`` are independent given ``y``:

            p_ind(x|y) = prod_i p(x_i|y)
            p_ind(x)   = sum_y p_ind(x|y) * p(y)
            p_ind(y|x) = p_ind(x|y) * p(y) / p_ind(x)

        Raises
        ------
        ValueError
            If ``x`` and ``y`` share a variable.
        """
        x, y = self._levels(x), self._levels(y)
        if set(x) & set(y):
            raise ValueError("x and y must not share variables")

        n_x = len(x)

        # empirical distributions needed below
        p_xy = self._pmf(x + y)
        p_x = self._pmf(x)
        p_y = self._pmf(y)
        p_xiy = [self._pmf([xi] + y) for xi in x]

        def independent_joint(x_state, y_state):
            # p(y) * prod_i p(x_i|y); combinations never observed get probability 0
            value = p_y[y_state]
            for i, table in enumerate(p_xiy):
                value *= table.get((x_state[i],) + y_state, 0.0) / p_y[y_state]
            return value

        # evidence for each observed x under the independent model
        ind_x = {
            x_state: sum(independent_joint(x_state, y_state) for y_state in p_y)
            for x_state in p_x
        }

        # weighted log-ratio; only observed (x, y) states carry weight, and for
        # those every factor of the independent model is strictly positive
        total = 0.0
        for state, prob in p_xy.items():
            x_state, y_state = state[:n_x], state[n_x:]
            y_given_x = prob / p_x[x_state]
            ind_y_given_x = independent_joint(x_state, y_state) / ind_x[x_state]
            total += prob * np.log2(y_given_x / ind_y_given_x)

        return total
        


In [None]:
# Estimate the joint distribution of the example series, then display the
# distribution of variables 0 and 1 conditioned on variable 1.
x = Analysis(data=X)
x.conditional_distribution(x=[0, 1], y=[1])

In [41]:
# Each partition is a list of variable groups; each group is a list of
# variable numbers.
partitions = [
    [[0], [1, 2]],
    [[1], [0, 2]],
    [[2], [0, 1]],
    [[0], [1], [2]],
]

# NOTE: this second assignment replaces the list above, so after the cell has
# run `partitions` holds only the single partition {0}, {1}.
partitions = [
    [[0], [1]],
]