In [1]:
import os
import warnings

import jpype as jp
import numpy as np

Goal: Initialise a JIDT mutual information (MI) calculator and use it to compute MI.

In [2]:
# Location of the JIDT jar. Set the JIDT_JAR environment variable to point at
# a local copy; the fallback is the original machine-specific path.
jarloc = os.environ.get("JIDT_JAR", "/Users/joshua/Documents/jidt/infodynamics.jar")

# Starting a JVM that is already running raises an error, so guard the call to
# keep this cell safe to re-run within the same kernel.
if not jp.isJVMStarted():
    jp.startJVM(jp.getDefaultJVMPath(), "-ea", "-Djava.class.path=" + jarloc)

Gaussian estimator

In [56]:
# Look up the Gaussian MI calculator class inside its JIDT package
# (the class itself, not an instance).
miCalcClass = (
    jp.JPackage("infodynamics.measures.continuous.gaussian")
    .MutualInfoCalculatorMultiVariateGaussian
)

In [57]:
# Instantiate a calculator object from the class held in miCalcClass.
miCalc = miCalcClass()

In [58]:
# Initialise the calculator with arguments (1, 1); IN_Initialize_MI in this
# notebook describes the same call as specifying a univariate calculation.
miCalc.initialise(1,1)

In [59]:
# Set the calculator's NOISE_LEVEL_TO_ADD property to '0'.
# NOTE(review): this runs after initialise(), whereas IN_Initialize_MI in this
# notebook sets properties *before* calling initialise() — confirm against the
# JIDT documentation that the property takes effect in this order.
miCalc.setProperty('NOISE_LEVEL_TO_ADD', '0')

In [6]:
# Draw two independent standard-normal series of length 20 from a seeded
# generator so that re-running the notebook reproduces the same MI estimate.
SEED = 0
rng = np.random.default_rng(SEED)
y1 = rng.standard_normal(20)
y2 = rng.standard_normal(20)

In [7]:
# Convert y1 into a Java double[] array; the bare expression displays the
# resulting object as the cell output.
jp.JArray(jp.JDouble)(y1)

<java array 'double[]'>

In [8]:
# Build the Java double[] converter once, then pass both converted series to
# the calculator as its observations.
asJavaDoubles = jp.JArray(jp.JDouble)
miCalc.setObservations(asJavaDoubles(y1), asJavaDoubles(y2))

In [9]:
# Ask the calculator for its average over the observations supplied via
# setObservations; the bare expression displays the value as the cell output.
miCalc.computeAverageLocalOfObservations()

0.0024375638947790566

In [3]:
from Operations.IN_AutoMutualInfo import IN_AutoMutualInfo
from scipy import stats

In [25]:
# Read the three example time series from text files in the working directory.
ts1, ts2, ts3 = map(np.loadtxt, ("ts1.txt", "ts2.txt", "ts3.txt"))

In [5]:
def IN_Initialize_MI(estMethod, extraParam=None, addNoise=False):
    """
    Initialize an Information Dynamics Toolkit calculator object for MI computation.

    Relies on the module-level `jp` (jpype) handle; the calculator classes are
    looked up under the `infodynamics.measures.continuous` Java packages.

    Parameters
    ----------
    estMethod : str
        Estimator to construct: 'gaussian', 'kernel', 'kraskov1' or 'kraskov2'.
    extraParam : str or int, optional
        Number of nearest neighbors ('k') for the Kraskov (KSG) estimators.
        Converted to a string before being handed to the calculator. Ignored
        for the other estimators. Defaults to '3' when None.
    addNoise : bool, optional
        For the Kraskov estimators only: when falsy (default), the
        'NOISE_LEVEL_TO_ADD' property is set to '0' so the estimate is
        deterministic.

    Returns
    -------
    miCalc
        The calculator object, already initialised with `initialise(1, 1)`
        (a univariate calculation).

    Raises
    ------
    ValueError
        If `estMethod` is not one of the supported estimator names.
    """
    # Estimator name -> (Java package, calculator class name)
    calculators = {
        'gaussian': ('infodynamics.measures.continuous.gaussian',
                     'MutualInfoCalculatorMultiVariateGaussian'),
        'kernel': ('infodynamics.measures.continuous.kernel',
                   'MutualInfoCalculatorMultiVariateKernel'),
        'kraskov1': ('infodynamics.measures.continuous.kraskov',
                     'MutualInfoCalculatorMultiVariateKraskov1'),
        'kraskov2': ('infodynamics.measures.continuous.kraskov',
                     'MutualInfoCalculatorMultiVariateKraskov2'),
    }
    if not isinstance(estMethod, str) or estMethod not in calculators:
        raise ValueError(f"Unknown mutual information estimation method '{estMethod}'")

    implementingClass, calcName = calculators[estMethod]
    miCalc = getattr(jp.JPackage(implementingClass), calcName)()

    if estMethod in ('kraskov1', 'kraskov2'):
        # Nearest-neighbor count for the KSG estimator. Property values are
        # passed as strings, so a numeric k is converted rather than rejected.
        if extraParam is not None:
            miCalc.setProperty('k', str(extraParam))
        else:
            miCalc.setProperty('k', '3') # use 3 nearest neighbors for KSG estimator as default

        # Make deterministic (kraskov1/2 add a small amount of noise to the
        # signal by default)
        if not addNoise:
            miCalc.setProperty('NOISE_LEVEL_TO_ADD', '0')

    # Specify a univariate calculation
    miCalc.initialise(1, 1)

    return miCalc

In [6]:
def IN_AutoMutualInfo(y, timeDelay=1, estMethod='gaussian', extraParam=None):
    """
    Time-series automutual information.

    Computes the mutual information between a time series and a time-delayed
    copy of itself, for one or more time delays.

    Parameters
    ----------
    y : array_like
        Input time series; flattened to 1-D before use.
    timeDelay : int or sequence of int, optional
        Non-negative time lag(s) in samples (default is 1). Multiple lags are
        processed in ascending order. The string options 'ac' and 'tau' are
        not implemented yet.
    estMethod : str, optional
        The estimation method used to compute the mutual information:
        - 'gaussian' (default): computed directly from the Pearson correlation
          r of the delayed series as -0.5 * log(1 - r**2)
        - 'kernel', 'kraskov1', 'kraskov2': computed with the calculator
          returned by IN_Initialize_MI (assumes the JVM is already running)
    extraParam : any, optional
        Extra parameter forwarded to IN_Initialize_MI (default is None).

    Returns
    -------
    out : float or dict
        A scalar when a single time delay is given, otherwise a dict keyed
        'ami<delay>'. Lags greater than N - 5 (N = series length) are left as
        NaN and a warning is issued.

    Raises
    ------
    NotImplementedError
        If `timeDelay` is 'ac' or 'tau'.
    TypeError
        If `timeDelay` is not an integer or a sequence of integers.
    ValueError
        If any time delay is negative.
    """
    if isinstance(timeDelay, str) and timeDelay in ('ac', 'tau'):
        # Selecting the lag from the autocorrelation function requires
        # CO_FirstCrossing, which is not available here.
        raise NotImplementedError(
            f"timeDelay='{timeDelay}' requires CO_FirstCrossing, which is not implemented yet"
        )

    y = np.asarray(y).flatten()
    N = len(y)
    minSamples = 5 # minimum 5 samples to compute mutual information (could make higher?)

    # Accept Python ints, NumPy integer scalars and integer sequences alike
    delays = np.atleast_1d(timeDelay).ravel()
    if delays.size:
        if not np.issubdtype(delays.dtype, np.integer):
            raise TypeError("timeDelay must be an integer or a sequence of integers")
        if delays.min() < 0:
            raise ValueError("timeDelay must be non-negative")
    # Ascending order, so that the loop can stop at the first lag that is too long
    delays = np.sort(delays)

    numTimeDelays = len(delays)
    amis = np.full(numTimeDelays, np.nan)

    # initialise the MI calculator object if using non-Gaussian estimator
    if estMethod != 'gaussian':
        # assumes the JVM has already been started up
        miCalc = IN_Initialize_MI(estMethod, extraParam=extraParam, addNoise=False) # NO ADDED NOISE!

    tooShort = False
    for k, delay in enumerate(delays):
        # check enough samples to compute automutual info
        if delay > N - minSamples:
            # time series too short - keep the remaining values as NaNs
            tooShort = True
            break
        # form the time-delay vectors y1 and y2
        # (y[:N - delay] rather than y[:-delay], which is empty when delay == 0)
        y1 = y[:N - delay]
        y2 = y[delay:]

        if estMethod == 'gaussian':
            r, _ = stats.pearsonr(y1, y2)
            amis[k] = -0.5*np.log(1 - r**2)
        else:
            # Reinitialize for Kraskov:
            miCalc.initialise(1, 1)
            # Set observations to time-delayed versions of the time series:
            y1_jp = jp.JArray(jp.JDouble)(y1) # convert observations to java double
            y2_jp = jp.JArray(jp.JDouble)(y2)
            miCalc.setObservations(y1_jp, y2_jp)
            # compute
            amis[k] = miCalc.computeAverageLocalOfObservations()

    if tooShort:
        warnings.warn(
            f"Time series (N={N}) is too short for automutual information "
            f"calculations up to lags of {delays[-1]}",
            stacklevel=2,
        )

    if numTimeDelays == 1:
        # return a scalar if only one time delay
        return amis[0]
    # return a dict for multiple time delays
    return {f"ami{delay}": ami for delay, ami in zip(delays, amis)}

In [167]:
# Automutual information of ts1 at lags 1, 2, 3 and 10 using the kernel estimator.
IN_AutoMutualInfo(ts1, timeDelay=[1, 2, 3, 10], estMethod='kernel')

{'ami1': 1.8745546128068256,
 'ami2': 1.5913516748120065,
 'ami3': 1.446634477674225,
 'ami10': 1.2429953968321528}