# Simulated Dyadic IBI Data

We generate three synthetic dyad recordings to demonstrate coupling, leader–follower dynamics,
and non-coupling in the windowed cross-correlation (WXC) tool.
All simulated signals are derived from a real IBI recording so they retain physiological realism.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.interpolate import CubicSpline
from scipy.signal import welch

plt.rcParams.update({
    'figure.dpi': 120,
    'axes.spines.top': False,
    'axes.spines.right': False,
    'font.size': 11,
})

## Source data

We load person A's IBI series from a real recording. This is our **source signal** —
all simulated person-B series will be derived from it, ensuring the base physiology looks real.

In [None]:
def load_ibi(path, clip_ms=(300, 990)):
    """Load IBI series from a Mindware xlsx export (sheet 'IBI Series').
    Row 0 is a segment marker and is skipped. Values outside clip_ms are removed.
    Upper bound kept below 1000 ms to stay within the WXC tool's valid range."""
    df = pd.read_excel(path, sheet_name='IBI Series', header=None)
    ibi = df.iloc[1:, 0].astype(float).values
    lo, hi = clip_ms
    mask = (ibi >= lo) & (ibi <= hi)
    n_removed = (~mask).sum()
    if n_removed:
        print(f'  {path}: removed {n_removed} outlier(s) outside [{lo}, {hi}] ms')
    return ibi[mask]

ibi_a = load_ibi('dyad_recording/HRV_a.xlsx')
ibi_b_real = load_ibi('dyad_recording/HRV_b.xlsx')   # kept for reference / condition 3

MEAN_B = ibi_b_real.mean()   # target mean HR for simulated person B (~644 ms, ~93 bpm)

for name, ibi in [('A (source)', ibi_a), ('B (real, reference)', ibi_b_real)]:
    dur = ibi.sum() / 1000
    print(f'Person {name}: n={len(ibi):4d}  mean={ibi.mean():.1f} ms '
          f'({60000/ibi.mean():.1f} bpm)  SD={ibi.std():.1f} ms  '
          f'duration={dur:.0f} s ({dur/60:.1f} min)')

---
## Condition 1 — Drifting Coupling

In this condition the two people start uncoupled, gradually synchronise in the middle of
the recording, and then drift apart again. In the WXC heatmap this should appear as a
bright band centred on lag 0 that emerges and fades over time.

**Pipeline:**
1. Resample person A’s IBI series to a uniform 5 Hz grid (matching the WXC tool).
2. Generate an independent noise signal with the exact same power spectrum via phase randomisation.
3. Mix A’s signal and the noise according to a smooth coupling envelope α(t).
4. Add a different mean HR so person B looks like a distinct individual.
5. Convert the continuous signal back to an event-based IBI series.

In [None]:
# ── Simulation parameters ──────────────────────────────────────────────
FS         = 5      # Hz  — must match the WXC tool’s resampling rate
ALPHA_MAX  = 0.90   # peak coupling strength  (expected WXC r ≈ 0.90 at peak)
SEED       = 42     # fixed seed for reproducibility

### Step 1 — Resample to a uniform grid

IBI series are event-based: each value is the time between two beats, so the samples
are irregularly spaced in time. We use cubic spline interpolation to get a continuous
signal at a fixed 5 Hz rate — the same step the WXC tool performs internally.

In [None]:
def ibi_to_uniform(ibi_ms, fs=5):
    """Resample event-based IBI series to a uniform time grid via cubic spline.
    Returns (time_seconds, ibi_values_ms)."""
    ibi = np.asarray(ibi_ms, dtype=float)
    t_beat_ms = np.cumsum(np.insert(ibi, 0, 0)[:-1])   # beat onset times (ms)
    cs = CubicSpline(t_beat_ms, ibi)
    t_ms = np.arange(t_beat_ms[0], t_beat_ms[-1], 1000 / fs)
    return t_ms / 1000, cs(t_ms)   # seconds, ms

t_a, x_a = ibi_to_uniform(ibi_a, fs=FS)
N = len(x_a)

print(f'Resampled:  {N} samples  |  {t_a[-1]:.1f} s  |  '
      f'mean = {x_a.mean():.1f} ms  |  SD = {x_a.std():.1f} ms')

In [None]:
beat_t_a = np.cumsum(np.insert(ibi_a, 0, 0)[:-1]) / 1000   # beat-onset times in seconds

fig, axes = plt.subplots(2, 1, figsize=(13, 5), sharex=False)

axes[0].plot(beat_t_a, ibi_a, lw=0.7, color='steelblue')
axes[0].set_ylabel('IBI (ms)')
axes[0].set_title('Raw IBI series — person A (event-based, irregular timing)')

axes[1].plot(t_a, x_a, lw=0.7, color='darkorange')
axes[1].set_ylabel('IBI (ms)')
axes[1].set_xlabel('Time (s)')
axes[1].set_title(f'Resampled IBI — person A ({FS} Hz uniform grid)')

for ax in axes:
    ax.set_xlim(0, max(beat_t_a[-1], t_a[-1]))

plt.tight_layout()
plt.show()

### Step 2 — Phase-randomised surrogate (independent noise)

We need a noise signal that looks like a real HRV series — with the same 1/f spectral
shape and RSA peak — but is statistically **independent** of person A.

Phase randomisation achieves this: take the FFT of A’s signal, keep all the amplitudes
(which encode the spectrum) but replace every phase with a random angle. The result has
an identical power spectrum but zero cross-correlation with A.
This method is well-established in HRV research (Schreiber & Schmitz, 1996).

In [None]:
def phase_randomize(signal, seed=None):
    """Generate a phase-randomised surrogate of a zero-mean signal.
    Preserves amplitude spectrum; destroys all temporal structure."""
    rng = np.random.default_rng(seed)
    N = len(signal)
    fft_vals = np.fft.rfft(signal)
    amplitudes = np.abs(fft_vals)
    phases = rng.uniform(0, 2 * np.pi, len(fft_vals))
    phases[0] = 0                       # keep DC phase (zero mean)
    if N % 2 == 0:
        phases[-1] = 0                  # Nyquist must be real
    return np.fft.irfft(amplitudes * np.exp(1j * phases), n=N)

mean_a    = x_a.mean()
x_a_fluct = x_a - mean_a               # zero-mean fluctuation

epsilon = phase_randomize(x_a_fluct, seed=SEED)

r_check = np.corrcoef(x_a_fluct, epsilon)[0, 1]
print(f'Cross-correlation A vs surrogate: r = {r_check:.4f}  (should be ≈ 0)')

In [None]:
f_a,   p_a   = welch(x_a_fluct, fs=FS, nperseg=256)
f_eps, p_eps = welch(epsilon,    fs=FS, nperseg=256)

fig, ax = plt.subplots(figsize=(11, 4))
ax.semilogy(f_a,   p_a,   lw=2,   color='steelblue', label='Person A fluctuation')
ax.semilogy(f_eps, p_eps, lw=1.5, color='tomato', linestyle='--', label='Surrogate ε')
ax.axvspan(0.04, 0.15, alpha=0.10, color='green',  label='LF (0.04–0.15 Hz)')
ax.axvspan(0.15, 0.40, alpha=0.10, color='purple', label='HF / RSA (0.15–0.40 Hz)')
ax.set_xlabel('Frequency (Hz)')
ax.set_ylabel('Power (ms² / Hz)')
ax.set_xlim(0, 0.5)
ax.set_title('Surrogate quality check — power spectra must match')
ax.legend()
plt.tight_layout()
plt.show()

#### Why phase randomisation produces independence

Two plots confirm the surrogate has the right properties:

- **Same shape, different trajectory** (time domain): the surrogate and person A have nearly
  identical amplitude and texture — they could both pass for real HRV — but their moment-to-moment
  values are unrelated.
- **Zero cross-correlation at all lags**: even when we slide one signal past the other, the
  correlation stays within the 95% confidence band for zero. This is the key property —
  the mixing model will only introduce correlation through the coupling weight α(t).

In [None]:
SHOW_S = 60
show_n = int(SHOW_S * FS)

fig, axes = plt.subplots(2, 1, figsize=(12, 5), sharex=True, sharey=True)

axes[0].plot(t_a[:show_n], x_a_fluct[:show_n], lw=1.0, color='steelblue')
axes[0].axhline(0, color='gray', lw=0.6, linestyle=':')
axes[0].set_ylabel('IBI fluctuation (ms)')
axes[0].set_title('Person A fluctuation — first 60 s')

axes[1].plot(t_a[:show_n], epsilon[:show_n], lw=1.0, color='tomato')
axes[1].axhline(0, color='gray', lw=0.6, linestyle=':')
axes[1].set_ylabel('IBI fluctuation (ms)')
axes[1].set_xlabel('Time (s)')
axes[1].set_title('Surrogate ε — same amplitude and spectral texture, different trajectory')

plt.tight_layout()
plt.show()

In [None]:
from scipy.signal import correlate

MAX_LAG_S = 60
max_lag_n = int(MAX_LAG_S * FS)

# z-score both signals so the cross-correlation is normalised to [-1, 1]
xa_z  = (x_a_fluct - x_a_fluct.mean()) / x_a_fluct.std()
eps_z = (epsilon    - epsilon.mean())   / epsilon.std()

cc        = correlate(xa_z, eps_z, mode='full') / N
lags_full = np.arange(-(N - 1), N) / FS   # seconds
mid       = N - 1                          # index of lag = 0

cc_trim   = cc[mid - max_lag_n : mid + max_lag_n + 1]
lags_trim = lags_full[mid - max_lag_n : mid + max_lag_n + 1]

ci = 1.96 / np.sqrt(N)   # 95% CI for zero correlation

fig, ax = plt.subplots(figsize=(12, 4))
ax.fill_between(lags_trim, -ci, ci, alpha=0.25, color='gray', label='95% CI  (±1.96/√N)')
ax.axhline(0, color='gray', lw=0.8, linestyle=':')
ax.axvline(0, color='gray', lw=0.8, linestyle=':')
ax.plot(lags_trim, cc_trim, lw=1.5, color='tomato')
ax.set_xlabel('Lag (s)')
ax.set_ylabel('Cross-correlation r')
ax.set_xlim(-MAX_LAG_S, MAX_LAG_S)
ax.set_title('Cross-correlation A vs surrogate ε — r ≈ 0 at every lag')
ax.legend()
plt.tight_layout()
plt.show()

print(f'Max |r| across all lags: {np.max(np.abs(cc_trim)):.4f}  (95% CI threshold: {ci:.4f})')

### Step 3 — Coupling envelope and mixing model

We control coupling strength with a time-varying weight α(t) ∈ [0, 1].
A Hanning window centred on the middle of the recording gives a smooth
onset and offset — no abrupt switches.

The mixed signal is:

> **B(t) = α(t) · A(t) + √(1 − α²(t)) · ε(t)**

The √(1 − α²) term keeps the variance of B constant regardless of α —
so the signal looks equally ‘active’ whether it is coupled or not.
When α = 0.9 at the peak, the expected Pearson r between A and B is 0.9.

In [None]:
t_on  = N // 4
t_off = 3 * N // 4
win_len = t_off - t_on

alpha = np.zeros(N)
alpha[t_on:t_off] = np.hanning(win_len)
alpha *= ALPHA_MAX

print(f'Coupling active:  {t_a[t_on]:.0f} s → {t_a[t_off-1]:.0f} s  '
      f'(peak at {t_a[(t_on + t_off)//2]:.0f} s)')

fig, ax = plt.subplots(figsize=(11, 3))
ax.fill_between(t_a, alpha, alpha=0.35, color='steelblue')
ax.plot(t_a, alpha, lw=1.5, color='steelblue')
ax.set_xlabel('Time (s)')
ax.set_ylabel('α(t)')
ax.set_ylim(-0.05, 1.0)
ax.set_xlim(0, t_a[-1])
ax.set_title('Coupling envelope α(t) — condition 1')
plt.tight_layout()
plt.show()

In [None]:
x_b_fluct = alpha * x_a_fluct + np.sqrt(1 - alpha**2) * epsilon

x_b = x_b_fluct + MEAN_B
x_b = np.clip(x_b, 300, 990)   # stay within WXC tool's valid range [100, 1000)

print(f'Person A:  mean = {x_a.mean():.1f} ms  ({60000/x_a.mean():.1f} bpm)  SD = {x_a.std():.1f} ms')
print(f'Person B:  mean = {x_b.mean():.1f} ms  ({60000/x_b.mean():.1f} bpm)  SD = {x_b.std():.1f} ms')

In [None]:
def smooth(x, sec=5, fs=FS):
    k = int(sec * fs)
    return np.convolve(x, np.ones(k) / k, mode='same')

t_on_s  = t_a[t_on]
t_off_s = t_a[t_off - 1]

fig, axes = plt.subplots(2, 1, figsize=(13, 5), sharex=True)

for ax in axes:
    ax.axvspan(t_on_s, t_off_s, alpha=0.12, color='steelblue', zorder=0)
    ax.axhline(0, color='gray', lw=0.6, linestyle=':')

axes[0].plot(t_a, x_a_fluct, lw=0.5, color='steelblue', alpha=0.5)
axes[0].plot(t_a, smooth(x_a_fluct), lw=1.8, color='steelblue', label='Person A')
axes[0].set_ylabel('IBI fluctuation (ms)')
axes[0].set_title('IBI fluctuations — shaded region is where coupling is active')
axes[0].legend(loc='upper right')

axes[1].plot(t_a, x_b_fluct, lw=0.5, color='darkorange', alpha=0.5)
axes[1].plot(t_a, smooth(x_b_fluct), lw=1.8, color='darkorange', label='Person B (simulated)')
axes[1].set_ylabel('IBI fluctuation (ms)')
axes[1].set_xlabel('Time (s)')
axes[1].legend(loc='upper right')

for ax in axes:
    ax.set_xlim(0, t_a[-1])

plt.tight_layout()
plt.show()

### Step 4 — Rolling correlation preview

This is a simplified version of what the WXC tool will show at lag = 0.
We compute Pearson r in a sliding 60-second window and plot it against
the coupling envelope. The two should track each other closely.

In [None]:
WIN_S  = 60
STEP_S = 5
win_n  = int(WIN_S  * FS)
step_n = int(STEP_S * FS)

centers, rs = [], []
for start in range(0, N - win_n, step_n):
    end = start + win_n
    r = np.corrcoef(x_a_fluct[start:end], x_b_fluct[start:end])[0, 1]
    centers.append(t_a[start + win_n // 2])
    rs.append(r)

centers = np.array(centers)
rs      = np.array(rs)

fig, ax = plt.subplots(figsize=(13, 4))
ax.fill_between(t_a, alpha, alpha=0.15, color='steelblue')
ax.plot(t_a, alpha, lw=1.2, color='steelblue', linestyle='--', label='α(t) — coupling envelope')
ax.plot(centers, rs, lw=2.2, color='darkorange', label=f'Rolling r  ({WIN_S} s window)')
ax.axhline(0, color='gray', lw=0.8, linestyle=':')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Pearson r')
ax.set_ylim(-0.35, 1.05)
ax.set_xlim(0, t_a[-1])
ax.set_title('Windowed correlation at lag 0 — preview of WXC output')
ax.legend()
plt.tight_layout()
plt.show()

print(f'Peak rolling r = {rs.max():.2f}  |  '
      f'mean r in uncoupled regions = {np.mean(rs[centers < t_on_s]):.2f}')

### Step 5 — Convert back to event-based IBIs

The WXC tool expects an event-based IBI series as input.
We recover beat times from the continuous signal by stepping forward iteratively:
the IBI at the current beat gives the time to the next beat.
This is a simple and fully invertible transformation.

In [None]:
def uniform_to_ibi(t_sec, x_ms, clip_ms=(300, 990)):
    """Convert a uniformly-sampled IBI signal back to an event-based IBI series.
    Iteratively steps forward: next_beat = current_beat + IBI(current_beat) / 1000."""
    cs = CubicSpline(t_sec, x_ms)
    t_max = t_sec[-1]
    lo, hi = clip_ms

    beat_times = [float(t_sec[0])]
    while True:
        t_curr = beat_times[-1]
        ibi_ms = float(np.clip(cs(t_curr), lo, hi))
        t_next = t_curr + ibi_ms / 1000
        if t_next >= t_max:
            break
        beat_times.append(t_next)

    return (np.diff(beat_times) * 1000).astype(float)   # ms

ibi_b_c1 = uniform_to_ibi(t_a, x_b)

dur_b = ibi_b_c1.sum() / 1000
print(f'Person B (event-based):  n = {len(ibi_b_c1)} beats  '
      f'| mean = {ibi_b_c1.mean():.1f} ms  '
      f'| SD = {ibi_b_c1.std():.1f} ms  '
      f'| duration = {dur_b:.0f} s ({dur_b/60:.1f} min)')
print(f'Person A (event-based):  n = {len(ibi_a)} beats  '
      f'| mean = {ibi_a.mean():.1f} ms  '
      f'| SD = {ibi_a.std():.1f} ms')

### Step 6 — Save

Each condition is saved as a simple two-column xlsx.
Column A holds person A’s IBIs, column B holds person B’s IBIs.

In [None]:
import os
os.makedirs('dyads_simulated', exist_ok=True)

def save_condition(ibi_a_out, ibi_b_out, filename):
    """Save a dyad IBI pair to xlsx. Pads the shorter series with NaN."""
    n = max(len(ibi_a_out), len(ibi_b_out))
    df = pd.DataFrame({
        'IBI_A_ms': np.pad(ibi_a_out.astype(float), (0, n - len(ibi_a_out)),
                           constant_values=np.nan),
        'IBI_B_ms': np.pad(ibi_b_out.astype(float), (0, n - len(ibi_b_out)),
                           constant_values=np.nan),
    })
    path = f'dyads_simulated/{filename}'
    df.to_excel(path, index=False)
    print(f'Saved → {path}  ({len(ibi_a_out)} A beats, {len(ibi_b_out)} B beats)')

save_condition(ibi_a, ibi_b_c1, 'cond1_drifting_coupling.xlsx')