In [None]:
import numpy as np
from scipy import signal
import control as ct
import matplotlib.pyplot as plt
import multiprocessing as mp

from speckit import compute_spectrum
from speckit.plotting import default_rc

plt.rcParams.update(default_rc)

pool = mp.Pool()

In [None]:
# Step 1: Define the system transfer function
tau = 100  # Time constant
fs = 2  # Sampling frequency (Hz)
dt = 1 / fs  # Time step
n = int(2e6)  # Number of samples

# Transfer function H(s) = 1 / (tau*s + 1)
num = [1]
den = [tau, 1]
system = signal.TransferFunction(num, den)

# Step 2: Generate the input signal (white noise)
np.random.seed(42)  # For reproducibility
input_signal = np.random.normal(0, 1, n)

# Step 3: Generate the output signal by filtering the input through the system
t = np.linspace(0, n * dt, n)
_, output_signal, _ = signal.lsim(system, input_signal, t)

# Introduce frequency-dependent noise to the output
freq = np.fft.fftfreq(n, d=dt)
noise = np.fft.ifft(np.fft.fft(np.random.normal(0, 1, n)) * (np.abs(freq) ** 0.5)).real
output_signal_noisy = output_signal + 0.5 * noise  # Scale noise appropriately

In [None]:
fig, ax = plt.subplots()
ax.plot(input_signal, label="input")
ax.plot(output_signal_noisy, label="output")
ax.set_xlabel("Time (s)")
ax.set_ylabel("Signals")
ax.legend()
ax.set_xlim(0, 1000)
plt.show()

In [None]:
# Step 4: Compute the cross-spectral density (Welch)
f, Pxy = signal.csd(input_signal, output_signal_noisy, fs=fs, nperseg=10000)

In [None]:
# Step 5: Compute the cross-spectral density (spectools)
ltf_obj = compute_spectrum(
    [input_signal, output_signal_noisy], fs=fs, pool=pool, win="Kaiser"
)

In [None]:
fig, (ax1, ax2) = plt.subplots(2, 1)
ax1.semilogx(f, ct.mag2db(np.abs(Pxy)), ls="--", label="Welch")
ax2.semilogx(f, -np.angle(Pxy, deg=True), ls="--")
ax2.semilogx(ltf_obj.f, np.angle(ltf_obj.Gxy, deg=True))
ax1.semilogx(ltf_obj.f, ct.mag2db(np.abs(ltf_obj.Gxy)), label="spectools")
ax2.set_xlabel("Frequency (Hz)")
ax1.set_ylabel("Magnitude (dB)")
ax2.set_ylabel("Phase (deg)")
ax1.set_title("Cross spectral density estimate")
ax1.legend(framealpha=1, edgecolor="k")
ax1.grid(ls="--")
ax2.grid(ls="--")
ax1.set_xlim(ltf_obj.f[0], ltf_obj.f[-1])
ax2.set_xlim(ltf_obj.f[0], ltf_obj.f[-1])
plt.show()

In [None]:
fig, (ax2, ax1) = ltf_obj.plot("bode", errors=True, sigma=3)
ax1.set_ylim(-120, 60)
plt.show()

In [None]:
fig, ax = plt.subplots()
ax.loglog(ltf_obj.f, ltf_obj.coh, label="Coherence")
ax.loglog(
    ltf_obj.f, ltf_obj.coh_error, label="Normalized random error", lw=6, c="tomato"
)
ax.loglog(ltf_obj.f, ltf_obj.coh_dev, label="Standard deviation", c="lime")
ax.loglog(ltf_obj.f, ltf_obj.coh_error * ltf_obj.coh, ls=":", c="k")
ax.set_xlabel("Fourier frequency (Hz)")
ax.set_ylabel("Coherence")
ax.legend(framealpha=1, edgecolor="k")
ax.set_xlim(ltf_obj.f[0], ltf_obj.f[-1])
ax.set_ylim(1e-7, 2)
plt.show()

In [None]:
fig, ax = plt.subplots()
ax.loglog(
    ltf_obj.f, ltf_obj.coh_dev, lw=6, label=r"$\sigma[\gamma_{xy}^2]$", c="tomato"
)
ax.loglog(
    ltf_obj.f,
    ltf_obj.coh_error * ltf_obj.coh,
    label=r"$ \gamma_{xy}^2 \cdot \epsilon_r [\gamma_{xy}^2]$",
    c="lime",
)
ax.set_xlabel("Fourier frequency (Hz)")
ax.set_xlim(ltf_obj.f[0], ltf_obj.f[-1])
ax.legend(edgecolor="k", framealpha=1)
plt.show()

In [None]:
fig, ax = plt.subplots()
ax.loglog(ltf_obj.f, ltf_obj.Gxy_dev, lw=6, label=r"$\sigma[ |G_{xy}| ]$", c="tomato")
ax.loglog(
    ltf_obj.f,
    ltf_obj.Gxy_error * np.abs(ltf_obj.Gxy),
    label=r"$|G_{xy}| \cdot \epsilon_r[|G_{xy}|]$",
    c="lime",
)
ax.set_xlabel("Fourier frequency (Hz)")
ax.set_xlim(ltf_obj.f[0], ltf_obj.f[-1])
ax.legend(edgecolor="k", framealpha=1)
plt.show()

In [None]:
fig, ax = plt.subplots()
ax.loglog(ltf_obj.f, ltf_obj.Hxy_dev, lw=6, label=r"$\sigma[ |H_{xy}| ]$", c="tomato")
ax.loglog(
    ltf_obj.f,
    ltf_obj.Hxy_mag_error * np.abs(ltf_obj.Hxy),
    label=r"$|H_{xy}| \cdot \epsilon_r[|H_{xy}|]$",
    c="lime",
)
ax.set_xlabel("Fourier frequency (Hz)")
ax.set_xlim(ltf_obj.f[0], ltf_obj.f[-1])
ax.legend(edgecolor="k", framealpha=1)
plt.show()

In [None]:
fig, ax = plt.subplots()
ax.loglog(
    ltf_obj.f,
    ltf_obj.Hxy_rad_error,
    lw=6,
    label=r"$\epsilon_r[ arg[ H_{xy}] ]$",
    c="tomato",
)
ax.loglog(
    ltf_obj.f,
    ltf_obj.Hxy_dev / ltf_obj.cf,
    label=r"$\sigma[|H_{xy}|] / |H_{xy}|$",
    c="lime",
)
ax.set_xlabel("Fourier frequency (Hz)")
ax.set_ylabel(r"$\sigma(\gamma_{xy}^2)$")
ax.set_xlim(ltf_obj.f[0], ltf_obj.f[-1])
ax.legend(edgecolor="k", framealpha=1)
plt.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm

from speckit.analysis import compute_single_bin


def run_monte_carlo_validation(
    num_realizations: int = 1000,
    fs: float = 100.0,
    n_samples: int = 10000,
    target_freq: float = 10.0,
    segment_length: int = 256,
):
    """
    Validates the analytical error formulas in speckit using a Monte Carlo simulation.
    """
    print("=" * 60)
    print("Starting Monte Carlo Validation for Error Formulas")
    print(f"Number of Realizations: {num_realizations}")
    print(f"Signal Length: {n_samples} samples")
    print("=" * 60)

    # --- Storage for the distribution of estimates ---
    Gxx_estimates = []
    coh_estimates = []

    # --- Store the first analytical prediction ---
    # (Theoretically, it should be similar for all realizations)
    first_analytical_Gxx_dev = None
    first_analytical_coh_dev = None

    # --- Define the true signal properties ---
    # Create two signals: one pure noise, one with a coherent sine wave + noise
    # This gives us a known, non-trivial coherence value.
    true_coh_amplitude = 0.8  # Target coherence around 0.8^2 = 0.64
    noise_std = 5.0

    # Create a reproducible random number generator
    rng = np.random.default_rng(seed=42)

    for i in tqdm(range(num_realizations), desc="Running Realizations"):
        # --- 1. Generate a new, independent realization of the signals ---
        t = np.arange(n_samples) / fs

        # Common coherent part
        common_signal = true_coh_amplitude * np.sin(2 * np.pi * target_freq * t)

        # Independent noise parts
        noise1 = rng.normal(loc=0, scale=noise_std, size=n_samples)
        noise2 = rng.normal(loc=0, scale=noise_std, size=n_samples)

        x = common_signal + noise1
        y = common_signal + noise2

        # --- 2. Compute the spectral estimates for this realization ---
        result = compute_single_bin(
            data=[x, y], fs=fs, freq=target_freq, L=segment_length, win="hann", olap=0.5
        )

        # --- 3. Store the estimates ---
        Gxx_estimates.append(result.Gxx[0])
        coh_estimates.append(result.coh[0])

        # --- 4. Store the first analytical prediction for comparison ---
        if i == 0:
            first_analytical_Gxx_dev = result.Gxx_dev[0]
            first_analytical_coh_dev = result.coh_dev[0]
            print(
                f"\nSingle-run analytical prediction for Gxx_dev: {first_analytical_Gxx_dev:.4e}"
            )
            print(
                f"Single-run analytical prediction for coh_dev: {first_analytical_coh_dev:.4f}\n"
            )

    # --- 5. Analyze the distribution of estimates ---
    Gxx_estimates = np.array(Gxx_estimates)
    coh_estimates = np.array(coh_estimates)

    # Measured standard deviation from the Monte Carlo simulation
    measured_Gxx_dev = np.std(Gxx_estimates)
    measured_coh_dev = np.std(coh_estimates)

    print("\n--- Validation Results ---")
    print(f"Mean Gxx estimate: {np.mean(Gxx_estimates):.4e}")
    print(f"Measured Gxx_dev (from simulation): {measured_Gxx_dev:.4e}")
    print(f"Predicted Gxx_dev (from speckit):  {first_analytical_Gxx_dev:.4e}")

    gxx_rel_error = (
        np.abs(measured_Gxx_dev - first_analytical_Gxx_dev) / measured_Gxx_dev
    )
    print(f"--> Relative Difference: {gxx_rel_error:.2%}\n")

    print(f"Mean coherence estimate: {np.mean(coh_estimates):.4f}")
    print(f"Measured coh_dev (from simulation): {measured_coh_dev:.4f}")
    print(f"Predicted coh_dev (from speckit):  {first_analytical_coh_dev:.4f}")

    coh_rel_error = (
        np.abs(measured_coh_dev - first_analytical_coh_dev) / measured_coh_dev
    )
    print(f"--> Relative Difference: {coh_rel_error:.2%}\n")

    # A successful test should have a relative difference of < 10-15%
    # (The agreement improves with more realizations and more averages per estimate)
    if gxx_rel_error < 0.15 and coh_rel_error < 0.15:
        print("SUCCESS: Analytical predictions match simulation results.")
        # This would raise our confidence to ~99%
    else:
        print("WARNING: Discrepancy detected between analytical and simulated errors.")

    # --- 6. Plot the distributions for visual inspection ---
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

    ax1.hist(Gxx_estimates, bins=30, density=True, edgecolor="k", alpha=0.7)
    ax1.set_title(f"Distribution of Gxx Estimates (N={num_realizations})")
    ax1.set_xlabel("Gxx Value")
    ax1.axvline(
        np.mean(Gxx_estimates),
        color="r",
        linestyle="--",
        label=f"Mean = {np.mean(Gxx_estimates):.2e}",
    )
    ax1.legend()

    ax2.hist(coh_estimates, bins=30, density=True, edgecolor="k", alpha=0.7)
    ax2.set_title(f"Distribution of Coherence Estimates (N={num_realizations})")
    ax2.set_xlabel("Coherence Value")
    ax2.axvline(
        np.mean(coh_estimates),
        color="r",
        linestyle="--",
        label=f"Mean = {np.mean(coh_estimates):.2f}",
    )
    ax2.legend()

    fig.tight_layout()
    plt.show()


if __name__ == "__main__":
    run_monte_carlo_validation()