In [1]:
%matplotlib inline
from pathlib import Path

import numpy as np
import pandas as pd
from joblib import Parallel, delayed
from scipy.io import savemat, loadmat

import matplotlib.pyplot as plt
import seaborn as sns
from mgcpy.benchmarks.ts_benchmarks import NonlinearDependence
from mgcpy.independence_tests.dcorrx import DCorrX
from mgcpy.independence_tests.mgcx import MGCX
from mgcpy.independence_tests.xcorr import BoxPierceX, LjungBoxX

In [2]:
def _compute_power(test, X_full, Y_full, num_sims, alpha, n, replication_factor=100):
    """
    Helper method estimate power of a test on a given simulation.

    :param test: Test to profile, either DCorrX or MGCX.
    :type test: TimeSeriesIndependenceTest

    :param X_full: An ``[n*num_sims]`` data matrix where ``n`` is the highest sample size.
    :type X_full: 2D ``numpy.array``

    :param Y_full: An ``[n*num_sims]`` data matrix where ``n`` is the highest sample size.
    :type Y_full: 2D ``numpy.array``

    :param num_sims: number of simulation at each sample size.
    :type num_sims: integer

    :param alpha: significance level.
    :type alpha: float

    :param n: sample size.
    :type n: integer

    :return: returns the estimated power.
    :rtype: float
    """
    num_rejects = 0.0

    def worker(s):
        X = X_full[range(n), s]
        Y = Y_full[range(n), s]
        if test['name'] in ['DCorr-X', 'MGC-X']:
            p_value, _ = test['object'].p_value(
                X, 
                Y, 
                replication_factor=replication_factor, 
                is_fast = test['is_fast']
            )
        else:
            p_value = test['object'].p_value(X, Y, replication_factor=replication_factor)

        if p_value <= alpha:
            return 1.0
        return 0.0

    rejects = Parallel(n_jobs=-2, verbose=1)(delayed(worker)(s) for s in range(num_sims))
    power = np.mean(rejects)
    std = np.std(rejects)

    return power, std

def _load_power_curves(tests, process):
    result = {}
    for test in tests:
        # Read in .pkl file.
        filename = f"./powers/{test['filename']}_powers_{process.filename}.pkl"
        pickle_in = open(filename,"rb")
        curve = pickle.load(pickle_in)
        pickle_in.close()
        
        N = curve.shape[0]
        if not result:
            result['sample_sizes'] = curve[:,0].reshape((N, 1))
        result[test['filename'] + '_powers'] = curve[:,1].reshape((N, 1))
        result[test['filename'] + '_stds'] = curve[:,1].reshape((N, 1))
    return result

In [3]:
n = 1200
alpha = 0.05
num_sims = 300

tests = [
    {
        'name' : 'DCorr-X',
        'filename' : 'dcorrx',
        'is_fast' : False,
        'subsample_size' : -1,
        'object' : DCorrX(max_lag=1)
    },
    {
        'name' : 'LjungX',
        'filename' : 'ljungx',
        'object' : LjungBoxX(max_lag=1),
        'color' : 'k'
    },
    {
        'name' : 'BoxPierceX',
        'filename' : 'boxpiercex',
        'object' : LjungBoxX(max_lag=1),
        'color' : 'k'
    },
    {
        'name' : 'MGC-X',
        'filename' : 'mgcx',
        'is_fast' : False,
        'object' : MGCX(max_lag=1),
    }
]

In [4]:
p = Path('./extinction_data/')
processes = sorted(p.glob("*mat"))

In [None]:
rates = [float(process.name.split("_")[-2]) for process in processes]
df = pd.DataFrame(rates, columns=['extinction_rate'])

for test in tests:
    print(f"Running test: {test['name']}")
    powers = np.zeros(len(processes))
    stds = np.zeros(len(processes))
    
    for i, process in enumerate(processes):
        print(f"Extinction Rate: {process.name.split('_')[-2]}")
        data = loadmat(process)
        X_full = data['X_full']
        Y_full = data['Y_full']
        
        powers[i], stds[i] = _compute_power(test, X_full, Y_full, num_sims, alpha, n)
        
    test_name = test['name']
    tmp_df = pd.DataFrame(np.array([powers, stds]).T, columns=[f"{test_name}_powers", f"{test_name}_stds"])
    
    df = pd.concat([df, tmp_df], axis=1)

In [None]:
df.to_csv("results.csv", index=False)