In [None]:
import numpy as np
import matplotlib.pyplot as plt 
import math

In [None]:
x = np.linspace(0, 100, 100)
y = [(50**i * np.exp(-50))/(math.factorial(i)) for i in range(x.shape[0])]

In [None]:
plt.scatter(x, y)
plt.show()

In [None]:
import numpy as np
from numpy import matlib as mb

np.random.seed(12345678)
x = np.random.normal(0, 1, size=(5, 5))
centx = x - np.mean(x, axis=0)
centx_ = x - mb.repmat(np.mean(x, axis=0), x.shape[0], 1)
print(centx)
print(centx_)

In [None]:
import numpy as np
from mgcpy.independence_tests.hhg import HHG
from mgcpy.benchmarks.simulations import linear_sim

np.random.seed(12345678)
x, y = linear_sim(100, 1, noise=0)
x = x.reshape(-1, 1)
y = y.reshape(-1, 1)

hhg = HHG()
print(hhg.test_statistic(x, y)[0])
print(hhg.p_value(x, y)[0])

In [11]:
import numpy as np
def _fast_hhg(distx, disty):
    n = distx.shape[0]
    S = np.zeros((n, n))

    for i in range(n):
        for j in range(n):
            if i != j:
                a = (distx[i, :] <= distx[i, j]) * 1
                b = (disty[i, :] <= disty[i, j]) * 1

                t11 = np.sum(a.dot(b)) - 2
                t12 = np.sum(a.dot(1 - b))
                t21 = np.sum((1 - a).dot(b))
                t22 = np.sum((1 - a).dot(1 - b))

                denom = (t11+t12) * (t21+t22) * (t11+t21) * (t12+t22)
                if denom > 0:
                    S[i, j] = ((n-2) * (t12*t21 - t11*t22) ** 2) / denom

    stat = np.sum(S)

    return stat

distx = np.arange(25).reshape(5, 5)
disty = distx
_fast_hhg(distx, disty)

36.0

In [3]:
import warnings

from scipy.spatial.distance import cdist


# from scipy
def contains_nan(a):
    try:
        # Calling np.sum to avoid creating a huge array into memory
        # e.g. np.isnan(a).any()
        with np.errstate(invalid='ignore'):
            contains_nan = np.isnan(np.sum(a))
    except TypeError:
        # This can happen when attempting to sum things which are not
        # numbers (e.g. as in the function `mode`). Try an alternative method:
        try:
            contains_nan = np.nan in set(a.ravel())
        except TypeError:
            # Don't know what to do. Fall back to omitting nan values and
            # issue a warning.
            contains_nan = False
            msg = ("The input array could not be properly checked for nan "
                   "values. nan values will be ignored.")
            warnings.warn(msg, RuntimeWarning)

    if contains_nan:
        raise ValueError("Input contains NaNs. Please omit and try again")

    return contains_nan


def check_ndarray_xy(x, y):
    if not isinstance(x, np.ndarray) or not isinstance(y, np.ndarray):
        raise ValueError("x and y must be ndarrays")


def convert_xy_float64(x, y):
        # convert x and y to floats
        x = np.asarray(x).astype(np.float64)
        y = np.asarray(y).astype(np.float64)

        return x, y


def check_reps(reps):
    if not isinstance(reps, int) or reps < 0:
        raise ValueError(
            "Number of reps must be an integer greater than 0.")
    elif reps < 1000:
        msg = ("The number of replications is low (under 1000), and p-value "
                "calculations may be unreliable. Use the p-value result, with "
                "caution!")
        warnings.warn(msg, RuntimeWarning)


def check_compute_distance(compute_distance):
    # check if compute_distance if a callable()
    if (not callable(compute_distance) and compute_distance is not None):
        raise ValueError("Compute_distance must be a function.")


def euclidean(x):
    return cdist(x, x, metric='euclidean')

from abc import ABC, abstractmethod

import numpy as np
from scipy.spatial.distance import cdist
from scipy._lib._util import MapWrapper


class HypothTest(ABC):
    """
    Base class for all tests in mgc.

    Parameters
    ----------
    compute_distance : callable, optional
        Function indicating distance metric (or alternatively the kernel) to
        use. Calculates the pairwise distance for each input, by default
        euclidean.

    Attributes
    ----------
    stat : float
        The computed independence test statistic.
    pvalue : float
        The computed independence test p-value.
    compute_distance : callable, optional
        Function indicating distance metric (or alternatively the kernel) to
        use. Calculates the pairwise distance for each input, by default
        euclidean.
    """

    def __init__(self, compute_distance=None):
        # set statistic and p-value
        self.stat = None
        self.pvalue = None

        # set compute_distance kernel
        if not compute_distance:
            compute_distance = euclidean
        self.compute_distance = compute_distance

        super().__init__()

    @abstractmethod
    def statistic(self, x, y):
        """
        Calulates the independence test statistic.

        Parameters
        ----------
        x, y : ndarray
            Input data matrices that have shapes depending on the particular
            independence tests (check desired test class for specifics).
        """

    def _perm_stat(self, index):
        """
        Helper function that is used to calculate parallel permuted test
        statistics.

        Returns
        -------
        perm_stat : float
            Test statistic for each value in the null distribution.
        """
        permx = np.random.permutation(self.x)
        permy = np.random.permutation(self.y)

        # calculate permuted statics, store in null distribution
        perm_stat = self.statistic(permx, permy)

        return perm_stat

    @abstractmethod
    def test(self, x, y, reps=1000, workers=-1):
        """
        Calulates the independece test p-value.

        Parameters
        ----------
        x, y : ndarray
            Input data matrices that have shapes depending on the particular
            independence tests (check desired test class for specifics).
        reps : int, optional
            The number of replications used in permutation, by default 1000.
        workers : int, optional
            Evaluates method using `multiprocessing.Pool <multiprocessing>`).
            Supply `-1` to use all cores available to the Process.

        Returns
        -------
        pvalue : float
            The pvalue obtained via permutation.
        null_dist : list
            The null distribution of the permuted test statistics.
        """
        self.x = x
        self.y = y

        # calculate observed test statistic
        obs_stat = self.statistic(x, y)

        # use all cores to create function that parallelizes over number of reps
        mapwrapper = MapWrapper(workers)
        null_dist = np.array(list(mapwrapper(self._perm_stat, range(reps))))
        self.null_dist = null_dist

        # calculate p-value and significant permutation map through list
        pvalue = (null_dist >= obs_stat).sum() / reps

        # correct for a p-value of 0. This is because, with bootstrapping
        # permutations, a p-value of 0 is incorrect
        if pvalue == 0:
            pvalue = 1 / reps
        self.pvalue = pvalue

        return obs_stat, pvalue


class _CheckInputs:
    def __init__(self, x, y, dim, reps=None, compute_distance=None):
        self.x = x
        self.y = y
        self.dim = dim
        self.compute_distance = compute_distance
        self.reps = reps

    def __call__(self, test_name):
        check_ndarray_xy(self.x, self.y)
        contains_nan(self.x)
        contains_nan(self.y)
        self.x, self.y = self.check_dim_xy(test_name)
        self.x, self.y = convert_xy_float64(self.x, self.y)
        check_compute_distance(self.compute_distance)

        if self.reps:
            check_reps(self.reps)

        return self.x, self.y

    def check_dim_xy(self, test_name):
        # check if x and y are ndarrays
        if self.dim == 1:
            # check if x or y is shape (n,)
            if self.x.ndim > 1 or self.y.ndim > 1:
                msg = ("x and y must be of shape (n,). Will reshape")
                warnings.warn(msg, RuntimeWarning)
                self.x.shape = (-1)
                self.y.shape = (-1)
        if self.dim > 1:
            # convert arrays of type (n,) to (n, 1)
            if self.x.ndim == 1:
                self.x.shape = (-1, 1)
            if self.y.ndim == 1:
                self.y.shape = (-1, 1)

            self._check_nd_indeptest(test_name)

        return self.x, self.y

    def _check_nd_indeptest(self, test_name):
        nx, px = self.x.shape
        ny, py = self.y.shape

        test_nsame = ['MGC', 'Dcorr', 'HHG']
        if test_name in test_nsame:
            if nx != ny:
                raise ValueError("Shape mismatch, x and y must have shape "
                                 "[n, p] and [n, q].")
        else:
            if nx != ny or px != py:
                raise ValueError("Shape mismatch, x and y must have the same "
                                 "shape [n, p].")
from numba import njit


@njit
def _center_distmat(distx):
    n = distx.shape[0]

    exp_distx = ((distx.sum(axis=0) / (n-2)).reshape(n, -1)
                + (distx.sum(axis=1) / (n-2)).reshape(n, -1)
                - distx.sum() / ((n-1) * (n-2)))
    cent_distx = distx - exp_distx

    return cent_distx


@njit
def _global_cov(distx, disty):
    return np.sum(distx @ disty)


@njit
def _dcorr(distx, disty, is_paired=False):
    cent_distx = _center_distmat(distx)
    cent_disty = _center_distmat(disty)

    covar = _global_cov(cent_distx, cent_disty.T)
    varx = _global_cov(cent_distx, cent_distx.T)
    vary = _global_cov(cent_disty, cent_disty.T)

    if varx <= 0 or vary <= 0:
        stat = 0
    else:
        if is_paired:
            n = cent_distx.shape[0]
            stat = (varx * (n-1)/n + vary * (n-1)/n
                    - 2/n * np.trace(cent_distx @ cent_disty.T))
        else:
            stat = covar / np.real(np.sqrt(varx * vary))

    return stat


class Dcorr(HypothTest):
    """
    Compute the Dcorr test statistic and p-value.

    Attributes
    ----------
    stat : float
        The computed independence test statistic.
    pvalue : float
        The computed independence test p-value.
    """

    def __init__(self, compute_distance=None, is_paired=False):
        HypothTest.__init__(self, compute_distance=compute_distance)
        self.is_paired = is_paired

    def statistic(self, x, y):
        """
        Calulates the Dcorr test statistic.

        Parameters
        ----------
        x, y : ndarray
            Input data matrices that have shapes depending on the particular
            independence tests (check desired test class for specifics).

        Returns
        -------
        stat : float
            The computed independence test statistic.
        """
        check_input = _CheckInputs(x, y, dim=np.max([x.shape[0], y.shape[0]]),
                                   compute_distance=self.compute_distance)
        x, y = check_input(Dcorr.__name__)

        distx = self.compute_distance(x)
        disty = self.compute_distance(y)

        stat = _dcorr(distx, disty, self.is_paired)
        self.stat = stat

        return stat

    def test(self, x, y, reps=1000, workers=-1):
        """
        Calulates the HHG test p-value.

        Parameters
        ----------
        x, y : ndarray
            Input data matrices that have shapes depending on the particular
            independence tests (check desired test class for specifics).
        reps : int, optional
            The number of replications used in permutation, by default 1000.

        Returns
        -------
        pvalue : float
            The computed independence test p-value.
        """
        check_input = _CheckInputs(x,
                                   y,
                                   dim=np.max([x.shape[0], y.shape[0]]),
                                   compute_distance=self.compute_distance)
        x, y = check_input(Dcorr.__name__)

        return super(Dcorr, self).test(x, y, reps, workers)


In [9]:
x = np.arange(1000)
y = x
stat, pvalue = Dcorr().test(x, y)
print(pvalue)

Process ForkPoolWorker-28:
Traceback (most recent call last):
  File "/Users/sampan501/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/sampan501/miniconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/sampan501/miniconda3/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "/Users/sampan501/miniconda3/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "<ipython-input-3-7b9d615668a9>", line 133, in _perm_stat
    perm_stat = self.statistic(permx, permy)
KeyboardInterrupt
Process ForkPoolWorker-30:
Traceback (most recent call last):
  File "/Users/sampan501/miniconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/sampan501/miniconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._targe

KeyboardInterrupt: 

In [1]:
import mgc

In [1]:
from mgc.independence import Dcorr
import numpy as np

In [3]:
x = np.arange(100)
y = x
dcorr = Dcorr()
stat, pvalue = dcorr.test(x, y)
stat, pvalue = Dcorr(x, y)

ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

In [6]:
print(stat, pvalue)

1.0 0.001
