# Inference of causal graphs from data

Author: Marcell Stippinger

Date: 2025-11-07

## Contents

* Generate logistic map data
* Implement time delay embedding
* Do convergent cross-mapping

## Imports and plotting functions

In [None]:
import numpy as np
import pandas as pd
from scipy import stats, signal
from sklearn.utils import check_random_state
from scipy.spatial import cKDTree, distance
from typing import NamedTuple, Optional, Tuple, Any
# Granger causality test
from statsmodels.tsa.stattools import grangercausalitytests
# Visualization
import matplotlib.pyplot as plt

In [None]:
def plot_ts(X, E=None, fs=1.0):
    """Plot time series data.

    Parameters
    ----------
    X : array-like, shape (n_samples, n_features)
        Time series data to plot.
    E : array-like, shape (n_samples, n_features), optional
        Noise components to overlay on the time series.
    fs : float, optional
        Sampling frequency of the time series.
    """
    n_samples, n_features = X.shape
    t = np.arange(n_samples) / fs
    fig, axes = plt.subplots(n_features, 1, figsize=(6, 4), sharex=True)
    if n_features == 1:
        axes = [axes]
    for i in range(n_features):
        axes[i].axhline(0, color='gray', linestyle='--', linewidth=0.5)
        axes[i].plot(t, X[:, i])
        if E is not None:
            axes[i].plot(t, E[:, i], linestyle='None', marker='o', markersize=3, alpha=0.7, label='Noise')
        axes[i].set_title(f'Time Series {i+1}')
        axes[i].set_ylabel('Value')
    axes[-1].set_xlabel('Time')
    plt.tight_layout()
    plt.show()

## Autocorrelograms

In [None]:
def plot_autocorrelograms(x: np.ndarray, lags: int = 40):
    """Plot ACF and PACF of a time series.

    Parameters
    ----------
    x : array-like, shape (n_samples,)
        Time series data.
    lags : int
        Number of lags to include in the plots.
    """
    acf_vals = acf(x, nlags=lags)
    pacf_vals = pacf(x, nlags=lags)

    fig, axes = plt.subplots(2, 1, figsize=(6, 4))

    axes[0].stem(range(lags + 1), acf_vals)
    axes[0].set_title('Autocorrelation Function (ACF)')
    axes[0].set_xlabel('Lags')
    axes[0].set_ylabel('ACF')

    axes[1].stem(range(lags + 1), pacf_vals)
    axes[1].set_title('Partial Autocorrelation Function (PACF)')
    axes[1].set_xlabel('Lags')
    axes[1].set_ylabel('PACF')

    plt.tight_layout()
    plt.show()

## Granger causality test

We can follow, for example

Ding, M., Chen, Y., & Bressler, S. L. (2006). Granger Causality: Basic Theory and Application to Neuroscience. February. https://doi.org/10.1002/9783527609970.ch17

In [None]:
# Y -> X
#gr_yx = grangercausalitytests(coupled_ts, maxlag=4)

In [None]:
# X -> Y
#coupled_ts_rev = np.stack((coupled_ts[:, 1], coupled_ts[:, 0]), axis=1)
#gr_xy = grangercausalitytests(coupled_ts[:, ::-1], maxlag=4)
#gr_xy = grangercausalitytests(coupled_ts_rev, maxlag=4)

Explain the results

- which tests are significant
- for what lag

# Coupled logistic maps

A single logistic map is generated by
$$ x(t+1) = r x(t) (1- x(t)) $$

The interaction between logmaps may be additive
$$ x(t+1) = r x(t) (1- x(t)) - \beta y(t) $$
or multiplicative
$$ x(t+1) = r x(t) (1- x(t) - \beta y(t)). $$

We have to make sure the new value is in $[0, 1]$ therefore we take it modulo $1$.

In [None]:
def _logmap_mul_rhs_modulo(state, r, beta, noise):
    """
    Mapping function of logistic map with circular boundary conditions and
    multiplicative coupling

    :param state: [X1, X2, ..., Xn], shape (dim, )
    :param r: float or array of floats, shape (n, )
    :param beta: c_{j-->i} == beta_{ij}, array of floats, shape (n, n)
    :param noise: callable, dynamical noise generator
    :return: f
    """
    return np.remainder(noise + r * state * (1.0 - state - beta @ state), 1.0)


def _logmap_add_rhs_modulo(state, r, beta, noise):
    """
    Mapping function of logistic map with circular boundary conditions and
    additive coupling

    :param state: [X1, X2, ..., Xn], shape (dim, )
    :param r: float or array of floats, shape (n, )
    :param beta: c_{j-->i} == beta_{ij}, array of floats, shape (n, n)
    :param noise: callable, dynamical noise generator
    :return: f
    """
    return np.remainder(noise + r * state * (1.0 - state) - beta @ state, 1.0)

def generate_coupled_logmaps(
        n_samples: int,
        r: Any,
        beta: np.ndarray,
        noise_std: float = 0.0,
        coupling_type: str = 'additive',
        random_state: Optional[Any] = None
    ) -> np.ndarray:
    """
    Generate time series from coupled logistic maps with circular boundary conditions.

    :param n_samples: Number of time steps to simulate.
    :param r: Growth rate parameter(s) for the logistic maps.
    :param beta: Coupling matrix, diagonal empty, shape (n, n).
    :param noise_std: Standard deviation of the Gaussian noise.
    :param coupling_type: Type of coupling ('additive' or 'multiplicative').
    :param random_state: Random state for reproducibility.
    :return: Time series data, shape (n_samples, n).
    """
    rng = check_random_state(random_state)
    n = beta.shape[0]
    state = rng.rand(n)
    ts = np.zeros((n_samples, n))

    if coupling_type == 'additive':
        rhs = _logmap_add_rhs_modulo
    elif coupling_type == 'multiplicative':
        rhs = _logmap_mul_rhs_modulo
    else:
        raise ValueError("coupling_type must be 'additive' or 'multiplicative'")

    for t in range(n_samples):
        noise = rng.normal(0, noise_std, size=n)
        ts[t] = state
        state = rhs(state, r, beta, noise)

    return ts

In [None]:
# x0 -> x1 avagy x -> y
# x0 -> x2 avagy x -> z

coupling = np.array([[0.0, 0.2, 0.15],
                     [0.0, 0.0, 0.00],
                     [0.0, 0.0, 0.00]])

ts = generate_coupled_logmaps(
    n_samples=1000,
    r=3.8,
    beta=coupling.T,
    coupling_type='additive',
    random_state=20251107,
    noise_std=0.01
)

plot_ts(ts)


# Time delay embedding is based on Takens' theorem.

We may use numpy's sliding_window_view to create the time delay embedding. (This is good for us, but not efficient for convolution.)

In [None]:
help(np.lib.stride_tricks.sliding_window_view)

In [None]:
def time_delay_embedding(X: np.ndarray, m: int, tau: int) -> np.ndarray:
    """
    Perform time-delay embedding of a time series.

    :param X: Time series data, shape (n_samples, n_features).
    :param m: Embedding dimension.
    :param tau: Time delay.
    :return: Embedded time series, shape (n_samples - (m - 1) * tau, n_features * m).
    """
    n_samples, n_features = X.shape
    n_embedded = n_samples - (m - 1) * tau
    embedded = np.zeros((n_embedded, n_features * m))

    for i in range(n_embedded):
        for j in range(m):
            embedded[i, j*n_features:(j+1)*n_features] = X[i + j * tau]

    return embedded

In [None]:
def time_delay_embedding_np(X: np.ndarray, m: int, tau: int) -> np.ndarray:
    """
    Perform time-delay embedding of a time series.

    :param X: Time series data, shape (n_samples, n_features).
    :param m: Embedding dimension.
    :param tau: Time delay.
    :return: Embedded time series, shape (n_samples - (m - 1) * tau, n_features * m).
    """
    n_samples, n_features = X.shape
    n_embedded = n_samples - (m - 1) * tau
    embedded = np.zeros((n_embedded, n_features * m))

    embedded = np.lib.stride_tricks.sliding_window_view(X, window_shape=(m, n_features))[:, 0, ::tau, :].reshape(n_embedded, n_features * m)

    return embedded

In [None]:
from pyparsing import line


def show_embedding_3d(X_embedded: np.ndarray, title: str = ''):
    """
    Visualize 3D time-delay embedding.

    :param X_embedded: Embedded time series, shape (n_samples, 3).
    :param title: Title of the plot.
    """
    from mpl_toolkits.mplot3d import Axes3D

    fig = plt.figure(figsize=(8, 6))
    ax = fig.add_subplot(111, projection='3d')
    ax.plot(X_embedded[:, 0], X_embedded[:, 1], X_embedded[:, 2], marker='o', linestyle='None', markersize=2, alpha=0.7)
    ax.set_title(title)
    ax.set_xlabel('X(t)')
    ax.set_ylabel('X(t + τ)')
    ax.set_zlabel('X(t + 2τ)')
    plt.show()

show_embedding_3d(time_delay_embedding(ts[:, 0:1], m=3, tau=1), title='Time-Delay Embedding of X')
show_embedding_3d(time_delay_embedding(ts[:, 1:2], m=3, tau=1), title='Time-Delay Embedding of Y')

# Convergent cross-mapping

Map neighborhood of manifold $Y$ (consequence) to indices, look for corresponding values in $x$.
Compare the average of these values with the actual $x$.

* naive neighbor lookup uses all pairwise distances, $O(n^2)$ complexty
* in lower dimensions KDTree algorithm is much more efficient

In [None]:
help(cKDTree)

In [None]:
def cross_map(source: np.array, target: np.array, k: int):
    """

    Args:
        source aka. consequece (np.array): n_samples x d_embed
        target aka. cause (np.array): n_samples
        k (int): number of neighbors to consider

    """
    tree = cKDTree(source)
    # we shall exclude self-matches of points
    distances, indices = tree.query(source, k+1)
    # search corresponding x values: (n_samples, k)
    estimates = target[indices[:, 1:]]
    # weights, summing to 1 in every row
    weights = np.exp(-distances[:, 1:] / distances[:, 1:2])
    weights /= weights.sum(axis=1, keepdims=True)
    # estimates
    weighted_estimates = (weights * estimates).sum(axis=1)
    return weighted_estimates

def cross_map_correlation(source: np.array, target: np.array, k: int):
    correlations = []
    lengths = np.logspace(1, np.log10(len(source)), num=10, dtype=int).astype(int)
    for n in lengths:
        estimates = cross_map(source[:n], target[:n], k)
        corr = np.corrcoef(estimates[:n].flat, target[:n].flat)[0, 1]
        correlations.append(corr)
    return lengths, correlations

def cross_map_effect_lag(source: np.array, target: np.array, k: int, max_lag = 10):
    correlations = []
    # observed time index of target (cause) relative to source (consequence)
    lags = np.arange(-max_lag, max_lag+1) # negative lag takes earlier target (cause) values
    T = len(source)
    for n in lags:
        estimates = cross_map(source[max_lag:T-max_lag], target[max_lag+n:T-max_lag+n], k)
        corr = np.corrcoef(estimates.flat, target[max_lag+n:T-max_lag+n].flat)[0, 1]
        correlations.append(corr)
    return lags, correlations


def cross_map_correlation_plot(source: np.array, target: np.array, k: int, title: str = ''):
    lengths, correlations = cross_map_correlation(source, target, k)
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(lengths, correlations, marker='o')
    ax.set_xscale('log')
    ax.set_ylim(-0.1, 1.1)
    ax.set_xlabel('Library Size')
    ax.set_ylabel('Cross Map Correlation')
    ax.set_title(title)
    ax.grid()
    plt.show()


def cross_map_effect_lag_plot(source: np.array, target: np.array, k: int, title: str = ''):
    lags, correlations = cross_map_effect_lag(source, target, k)
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(lags, correlations, marker='o')
    # integer ticks
    ax.get_xaxis().set_major_locator(plt.MaxNLocator(integer=True))
    ax.set_ylim(-0.1, 1.1)
    ax.set_xlabel(r'target=cause ahead $\leftarrow$ lag $\rightarrow$ source=consequence ahead')
    ax.set_ylabel('Cross Map Correlation')
    ax.set_title(title)
    ax.grid()
    plt.show()

TODO:

* discuss why target is indexed [:, 0], multiple reasons!

In [None]:
d_embed = 3

x = time_delay_embedding(ts[:, 0:1], m=d_embed, tau=1)
y = time_delay_embedding(ts[:, 1:2], m=d_embed, tau=1)

print(cross_map_effect_lag(x, y[:, 0], k=6))
print(cross_map_effect_lag(y, x[:, 0], k=6))

cross_map_effect_lag_plot(x[:], y[:, 0], k=6, title = 'target=Y cross-mapped from source=X (does Y->X?)')
cross_map_effect_lag_plot(y[:], x[:, 0], k=6, title = 'target=X cross-mapped from source=Y (does X->Y?)')

Interpretation:
* first plot:
  * X has an effect on Y that builds up during the time window of length d_embed and helps to somewhat better predict Y
* second plot:
  * X can be well reconstructed from Y at t=0
  * reconstruction of past values decays due to mixing: observed x(t) may come from two different x(t-1) values respectng the map
  * future values can be followed and accuracy decays due to chaoticity  

TODO:
* try different embedding dimensions
* how does it influence the height and location of the peak 
* try lower and higher noise values
* how do they influence mixing and chaos

# Cross-mapping coherence

Map neighborhood of manifold $Y$ (consequence) to indices, look for corresponding values in $x$.
Compare the average of these values with the actual $x$ in spectrum.


In [None]:
def cross_map_coherence(source: np.array, target: np.array, k: int):
    n, d = source.shape
    estimates = cross_map(source, target, k)
    freqs, coherence = signal.coherence(estimates, target, axis=0)
    return freqs, coherence

def cross_map_coherence_effect_lag(source: np.array, target: np.array, k: int, max_lag = 10):
    coherences = []
    lags = np.arange(-max_lag, max_lag+1) # same lag convention as in cross_map_effect_lag
    T = len(source)
    for n in lags:
        freqs, coherence = cross_map_coherence(source[max_lag+0:T-max_lag+0], target[max_lag+n:T-max_lag+n], k)
        coherences.append(coherence)
    return lags, freqs, np.array(coherences)


def cross_map_coherence_plot(source: np.array, target: np.array, k: int, title: str = ''):
    freqs, coherence = cross_map_coherence(source, target, k)
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(freqs, coherence, marker='o')
    ax.set_xscale('log')
    ax.set_ylim(-0.1, 1.1)
    ax.set_xlabel('Frequency')
    ax.set_ylabel('Cross Map Coherence')
    ax.set_title(title)
    ax.grid()
    plt.show()


def cross_map_coherence_effect_lag_plot(source: np.array, target: np.array, k: int, title: str = ''):
    lags, freqs, coherences = cross_map_coherence_effect_lag(source, target, k)
    fig, ax = plt.subplots(figsize=(8, 4))
    im = ax.matshow(coherences.T, aspect='auto', extent=[lags[0]-0.5, lags[-1]+0.5, freqs[0], freqs[-1]], vmin=0, vmax=1)
    cb = plt.colorbar(im, ax=ax)
    cb.set_label('Cross Map Coherence')
    # integer ticks
    ax.get_xaxis().set_major_locator(plt.MaxNLocator(integer=True))
    ax.set_xlabel(r'target=cause ahead $\leftarrow$ lag $\rightarrow$ source=consequence ahead')
    ax.set_ylabel('Frequency')
    ax.set_title(title)
    ax.grid()
    plt.show()

In [None]:
d_embed = 3

x = time_delay_embedding(ts[:, 0:1], m=d_embed, tau=1)
y = time_delay_embedding(ts[:, 1:2], m=d_embed, tau=1)

cross_map_coherence_plot(x, y[:, 0], k=6, title = 'Y cross-mapped from X (does Y->X?)')
cross_map_coherence_plot(y, x[:, 0], k=6, title = 'X cross-mapped from Y (does X->Y?)')

cross_map_coherence_effect_lag_plot(x[:], y[:, 0], k=6, title = 'Y cross-mapped from X (does Y->X?)')
cross_map_coherence_effect_lag_plot(y[:], x[:, 0], k=6, title = 'X cross-mapped from Y (does X->Y?)')

# Cross-sorting vectors

* more computationally heavy as it requires all pairwise distance

In [None]:
def cross_distance_vector(source: np.array, target: np.array):
    n, d = source.shape
    sdist = distance.cdist(source, source)
    ordering = np.argsort(sdist, axis=0)
    tdist = distance.cdist(target, target)
    reordered_targets = np.take_along_axis(tdist, ordering, axis=0)
    vector = reordered_targets.mean(axis=1)
    return vector


def cross_distance_vectors_plot(source: np.array, target: np.array, title: str = ''):
    vector = cross_distance_vector(source, target)
    x = np.arange(len(vector))
    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(x, vector, marker='o')
    ax.set_ylim(-0.1, 1.1)
    ax.set_xlabel('Neighbor index')
    ax.set_ylabel('Cross Distance Vector Value')
    ax.set_title(title)
    ax.grid()
    plt.show()



In [None]:
d_embed = 3

x = time_delay_embedding(ts[:, 0:1], m=d_embed, tau=1)
y = time_delay_embedding(ts[:, 1:2], m=d_embed, tau=1)

print(cross_distance_vector(x, y))
print(cross_distance_vector(y, x))

cross_distance_vectors_plot(x, y, title = 'Y cross-mapped from X (does Y->X?)')
cross_distance_vectors_plot(y, x, title = 'X cross-mapped from Y (does X->Y?)')

## Recurrence diagrams

Based on the paper:
* Hirata, Y., & Aihara, K. (2010). Identifying hidden common causes from
bivariate time series: A method using recurrence plots.
Phys Rev E - Statistical, Nonlinear, and Soft Matter Physics, 81(1), 1–7.
https://doi.org/10.1103/PhysRevE.81.016203



In [None]:
from typing import Tuple, Sequence

import numpy as np
from scipy.spatial.distance import pdist, squareform
from scipy.stats import norm
from sklearn.utils import check_consistent_length


def recurrence_diagrams(
        Dx: np.ndarray, Dy: np.ndarray, rate_x: float, rate_y: float
):
    m = Dx.shape[0]
    # number of recurrent elements given the rates
    count_x = round(m * rate_x)
    count_y = round(m * rate_y)
    if (count_x < 1) or (count_y < 1):
        raise ValueError("invalid threshold, try increasing it")
    # threshold
    threshold_x = np.sort(Dx)[count_x]
    threshold_y = np.sort(Dy)[count_y]
    # recurrence matrices and rejection of independence
    Rx = Dx < threshold_x
    Ry = Dy < threshold_y
    return Rx, Ry


def plot_recurrence_diagrams(
        X: np.ndarray, Y: np.ndarray, rate_x: float, rate_y: float, ord: float = 2, max_size: int = 100
):
    Dx = pdist(X, metric='minkowski', p=ord).astype(np.single)
    Dy = pdist(Y, metric='minkowski', p=ord).astype(np.single)
    Rx, Ry = recurrence_diagrams(Dx, Dy, rate_x, rate_y)
    Rx = squareform(Rx)
    Ry = squareform(Ry)
    fig, axes = plt.subplots(1, 1, figsize=(10, 5))
    axes.imshow(Rx[: max_size, :max_size], cmap='Reds', origin='lower', alpha=1.0)
    # axes.set_title('Recurrence Diagram X')
    axes.imshow(Ry[: max_size, :max_size], cmap='Blues', origin='lower', alpha=0.5)
    # axes.set_title('Recurrence Diagram Y')
    plt.show()


def independence_test(
        Dx: np.ndarray, Dy: np.ndarray, rate_x: float, rate_y: float
) -> Tuple[float, float]:
    """
    Perform test for H0={x and y are independent}, return the significance
    level of unindependence.
    The alternative is expressed as the two-sided option of recurrences being
    correlated or anticorrelated, although this latter may be impossible. TODO

    :param Dx: pairwise distance in X
    :param Dy: pairwise distance in Y, same length and same time indices as X
    :param rate_x: preferred recurrence rate of X for choosing threshold
    :param rate_y: preferred recurrence rate of Y
    :return:
      * stat: float, almost normally distributed value
      * p_stat: float, p-value
    """
    m = Dx.shape[0]
    Rx, Ry = recurrence_diagrams(Dx, Dy, rate_x, rate_y)
    # number of recurrent elements by chance
    mean_j = m * rate_x * rate_y
    std_j = np.sqrt(m * (rate_x * rate_y) * (1 - rate_x * rate_y))
    _dist = norm(mean_j, std_j)
    # rejection of independence
    count_j = np.count_nonzero(Rx * Ry)
    stat = (count_j - mean_j) / std_j
    candidate_p = _dist.cdf(count_j), _dist.sf(count_j)

    return stat, 2 * np.minimum(*candidate_p)


def directional_coupling_test(
        Dx: np.ndarray, Dy: np.ndarray, rate_x: float, rate_y: float
) -> Tuple[bool, int, float]:
    """
    Perform test for H0={x drives y}, return the significance
    level of no drive from x to y.
    The alternative is expressed as the one-sided option of having more
    coinciding recurrences in y than expected by chance.

    :param Dx: pairwise distance in X
    :param Dy: pairwise distance in Y, same length and same time indices as X
    :param rate_x: preferred recurrence rate of X for choosing threshold
    :param rate_y: preferred recurrence rate of Y
    :return:
      * cover: bool, whether covering is complete in one direction
        (equivalent to the method in the original paper, deprecated)
      * achieved_p: float, p-value for maximal achievable covering
    """
    m = Dx.shape[0]
    # number of recurrent elements given the rates
    Rx, Ry = recurrence_diagrams(Dx, Dy, rate_x, rate_y)
    count_y = np.count_nonzero(Ry)
    count_j = np.count_nonzero(Rx * Ry)
    has_covering = count_j == count_y
    # see how far the covering can be pushed with alternative rate_y
    mean_c = m * rate_x * rate_y
    std_c = np.maximum(
        np.sqrt(m * (rate_x * rate_y) * (1 - rate_x * rate_y)),
        np.finfo(float).eps)  # make valid dist for rate=0, results in p=0.5
    achieved_p = norm.sf(count_j, mean_c, std_c)
    return has_covering, achieved_p


def _min_sign_rec_rate(m: int, rate: float, alpha: float = 0.01) -> float:
    """
    Minimum significant recurrence rate,
    see Eq. 7 in doi:10.1103/PhysRevE.81.016203
    """
    zr = norm.ppf(alpha)  # zr>0 in Eq. 7., but we use zr**2 only
    return zr ** 2 * m * rate / (
            m ** 2 * (1 - rate) ** 2 + zr ** 2 * m * rate ** 2)


def recurrence_analysis(
        X: np.ndarray, Y: np.ndarray, ord: float = 2,
        rate: float = 0.05, rate_uni: float = 0.5, alpha: float = 0.01
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Perform all tests used in the decision tree,
    see Fig. 2. in doi:10.1103/PhysRevE.81.016203

    :param X: embedded time series X
    :param Y: embedded time series Y, same length and same time indices as X
    :param ord: order of the norm to be used
    :param rate: recurrence rate for independence testing
    :param rate_uni: recurrence rate for direct cause testing
    :param alpha: preferred significance level for direct cause testing
    :return:
      two arrays, as follows:
      * five-case binary output
      * three p-values: rejecting independence, rejecting x->y, rejecting y->x
    """
    check_consistent_length(X, Y)
    n = len(X)
    m = n * (n - 1) // 2
    # Note: recurrence matrices require a lot of memory, use single precision.
    # Note: pdist calculates half the matrix w/o diagonal and returns a vector.
    Dx = pdist(X, metric='minkowski', p=ord).astype(np.single)
    Dy = pdist(Y, metric='minkowski', p=ord).astype(np.single)
    # If x_|_y (indep) then Rj = Rx * Ry conforms rate_j = rate_x * rate_y.
    # H0={independence}, by rejecting H0 we have indication of some causal
    # relation, we have to find out which one.
    stat, p_independence = independence_test(Dx, Dy, rate, rate)
    # If x->y (xtoy) then Rx >= Ry set elements.
    # H0={Rx >= Ry possible with p=alpha}, by rejecting H0 we have denied
    # direct cause. Note: _min_sign_rec_rate values are very small.
    b_xtoy, p_xtoy = directional_coupling_test(
        Dx, Dy, rate_uni, _min_sign_rec_rate(m, rate_uni, alpha=alpha))
    # Similarly to above.
    b_ytox, p_ytox = directional_coupling_test(
        Dy, Dx, rate_uni, _min_sign_rec_rate(m, rate_uni, alpha=alpha))
    result = np.array([
        (p_independence <= alpha) and b_xtoy and not b_ytox,
        (p_independence <= alpha) and b_xtoy and b_ytox,
        (p_independence <= alpha) and b_ytox and not b_xtoy,
        (p_independence <= alpha) and (not b_xtoy) and (not b_ytox),
        (p_independence > alpha)
    ])
    p_values = np.array([p_independence, p_xtoy, p_ytox])
    return result, p_values

In [None]:
bidir_coupling = np.array([[0.0, 0.2],
                           [0.2, 0.0]])

bidir_ts = generate_coupled_logmaps(
    n_samples=1000,
    r=3.8,
    beta=bidir_coupling.T,
    coupling_type='additive',
    random_state=20251107,
    noise_std=0.01
)


d_embed = 3

x = time_delay_embedding(ts[:, 0:1], m=d_embed, tau=1)
y = time_delay_embedding(ts[:, 1:2], m=d_embed, tau=1)
z = time_delay_embedding(ts[:, 2:3], m=d_embed, tau=1)
f = time_delay_embedding(bidir_ts[:, 0:1], m=d_embed, tau=1)
g = time_delay_embedding(bidir_ts[:, 1:2], m=d_embed, tau=1)


plot_recurrence_diagrams(x, y, rate_x=0.05, rate_y=0.05)


print('         x->y  x<->y   y->x  x_cc_y indep')
print(recurrence_analysis(x, y))
print(recurrence_analysis(y, x))
print(recurrence_analysis(y, z))
print(recurrence_analysis(x, y[np.random.permutation(len(y))]))
print(recurrence_analysis(f, g))
