In [None]:
import DeepFMKit.core as dfm
from DeepFMKit.helpers import snr_to_asd, calculate_crlb_for_m

import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import norm
from scipy import constants as sc
from tqdm import tqdm
import multiprocessing
import os

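# The per-trial worker is imported from DeepFMKit.workers rather than defined in
# this notebook: multiprocessing pickles the worker by reference, so it must be
# importable by the child processes.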
from DeepFMKit.workers import run_efficiency_trial

def _plot_estimate_distribution(
    m_estimates,
    m_true,
    ndata,
    snr_db,
    mean_m_measured,
    bias,
    delta_m_measured,
    delta_m_crlb,
    efficiency,
):
    """Plot the histogram of valid estimates, a fitted normal curve and a summary box.

    `m_estimates` must be a non-empty 1-D array of finite values; the remaining
    arguments are the already-computed statistics that are shown in the legend,
    title and annotation box.
    """
    fig, ax = plt.subplots(figsize=(12, 7))

    ax.hist(m_estimates, bins=50, density=True, label=r'Histogram of $\hat{m}$ estimates' f'\n($N_{{trials}}={len(m_estimates)}$)', alpha=0.6)

    # Overlay the maximum-likelihood normal fit. A zero spread (e.g. a single
    # valid estimate) would make norm.pdf return NaN, so the curve is skipped.
    mu, std = norm.fit(m_estimates)
    if std > 0:
        bins = np.linspace(mu - 4*std, mu + 4*std, 100)
        p = norm.pdf(bins, mu, std)
        ax.plot(bins, p, 'k--', linewidth=2, label='Fitted Normal Distribution')

    ax.axvline(m_true, color='red', linestyle='-', linewidth=2, label=f'True m = {m_true:.4f}')
    ax.axvline(mean_m_measured, color='black', linestyle='--', linewidth=2, label=f'Measured Mean = {mean_m_measured:.4f}')

    ax.set_title(f"Estimator Performance vs. CRLB (m={m_true}, ndata={ndata}, SNR={snr_db}dB)", fontsize=16)
    ax.set_xlabel(r'Estimated Modulation Depth ($\hat{m}$)', fontsize=14)
    ax.set_ylabel('Probability Density', fontsize=14)

    results_str = (f"Bias = {bias:.2e}\n"
                   f"Measured $\\sigma_m$ = {delta_m_measured:.3e}\n"
                   f"CRLB $\\sigma_m$ = {delta_m_crlb:.3e}\n"
                   f"Efficiency = {efficiency:.1f}%")
    props = dict(boxstyle='round', facecolor='wheat', alpha=0.5)
    ax.text(0.05, 0.95, results_str, transform=ax.transAxes, fontsize=12, verticalalignment='top', bbox=props)

    ax.legend()
    ax.grid(True, linestyle=':')
    fig.tight_layout()
    plt.show()


def validate_fitter_efficiency(
    m_true=15.5,
    f_mod=1e3,
    f_samp=200e3,
    n=20,
    ndata=20,
    snr_db=70.0,
    n_trials=500,
    n_cores=None
):
    """
    Performs a Monte Carlo simulation to test the NLS fitter's performance
    against the theoretical Cramér-Rao Lower Bound (CRLB).

    The function configures a laser and an interferometer so that the
    modulation depth equals `m_true`, computes the CRLB on `m` via
    `calculate_crlb_for_m`, runs `n_trials` independent calls of
    `run_efficiency_trial` in a process pool, and then prints and plots the
    bias, spread and efficiency of the returned estimates.

    Parameters
    ----------
    m_true : float
        True modulation depth; the laser modulation amplitude `df` is derived
        from it and from the fixed 0.2 m arm-length difference.
    f_mod : float
        Modulation frequency (Hz) assigned to the laser configuration.
    f_samp : float
        Sampling frequency (Hz); used to convert the SNR to a noise ASD and to
        size the buffer.
    n : int
        Number of modulation periods per buffer (buffer duration is n / f_mod).
    ndata : int
        Forwarded to the CRLB calculation and to every trial.
        NOTE(review): presumably the number of harmonics used by the fit -
        confirm against DeepFMKit.
    snr_db : float
        Target signal-to-noise ratio in dB.
    n_trials : int
        Number of Monte Carlo trials.
    n_cores : int or None
        Number of worker processes; None selects os.cpu_count().

    Returns
    -------
    None
        Results are reported through printed text and a matplotlib figure.

    Notes
    -----
    The process pool is always started when there is at least one trial. When
    this function is called from a script on a platform that uses the "spawn"
    start method, the *call site* must be protected by
    ``if __name__ == "__main__":``.

    NOTE(review): every job carries the same configuration objects, so the
    statistical independence of the trials depends on how
    `run_efficiency_trial` seeds its noise generator - confirm.
    """
    if n_cores is None:
        n_cores = os.cpu_count()

    print("="*60)
    print("Fitter Efficiency Validation: Comparing Measured vs. Theoretical Precision")
    print(f"Parameters: m = {m_true}, ndata = {ndata}, SNR = {snr_db} dB")
    print(f"Number of Monte Carlo trials: {n_trials}")
    print("="*60)

    # --- 1. Define the Laser Source ---
    laser_config = dfm.LaserConfig(label="main_laser")
    laser_config.f_mod = f_mod # Modulation frequency (Hz)

    # --- 2. Define the Main Interferometer ---
    main_ifo_config = dfm.InterferometerConfig(label="dynamic_ifo")
    main_ifo_config.ref_arml = 0.1 # Reference arm length (m)
    main_ifo_config.meas_arml = 0.3 # Measurement arm length (m)

    # --- 3. Set Modulation Depth by Adjusting Laser's `df` ---
    opd = main_ifo_config.meas_arml - main_ifo_config.ref_arml # Optical pathlength difference (m)
    df_required = (m_true * sc.c) / (2 * np.pi * opd) # Required laser modulation amplitude (Hz)
    laser_config.df = df_required

    # Set the amplitude noise ASD to achieve the target SNR
    laser_config.amp_n = snr_to_asd(snr_db, f_samp)

    # Calculate buffer size and duration for this configuration
    buffer_size = int(n * (f_samp / f_mod))
    buffer_seconds = n / f_mod

    # --- 4. Calculate Theoretical CRLB ---
    print("\nCalculating theoretical precision (CRLB)...")
    delta_m_crlb = calculate_crlb_for_m(m_true, ndata, snr_db, buffer_size)
    print(f"Theoretical Precision (CRLB): δm = {delta_m_crlb:.4e}")

    # --- 5. Run Monte Carlo Simulation in Parallel ---
    # One job dictionary per trial; only the trial index differs between jobs.
    jobs = [
        {
            'laser_config': laser_config,
            'ifo_config': main_ifo_config,
            'n_seconds': buffer_seconds,
            'ndata': ndata,
            'm_true': m_true,
            'trial_num': i,
        }
        for i in range(n_trials)
    ]

    m_estimates = []
    print(f"\nRunning {n_trials} Monte Carlo simulations in parallel...")
    if jobs:
        # No `__name__` check here: guarding inside the function would silently
        # skip every trial whenever the function is imported from a module.
        with multiprocessing.Pool(processes=n_cores) as pool:
            results_iterator = pool.imap(run_efficiency_trial, jobs)
            for result in tqdm(results_iterator, total=len(jobs), desc="Running Trials"):
                m_estimates.append(result)

    # --- 6. Analyze and Print Results ---
    # Casting to float turns a `None` result into NaN; NaN and infinite values
    # are both treated as failed trials and dropped.
    raw_estimates = np.asarray(m_estimates, dtype=float)
    m_estimates = raw_estimates[np.isfinite(raw_estimates)]
    if len(m_estimates) < n_trials:
        print(f"Warning: {n_trials - len(m_estimates)} trials failed and were excluded.")

    if m_estimates.size == 0:
        # Statistics and a histogram of an empty sample are meaningless.
        print("No valid estimates were produced; skipping analysis and plot.")
        return

    delta_m_measured = np.std(m_estimates)
    mean_m_measured = np.mean(m_estimates)
    bias = mean_m_measured - m_true

    # Efficiency is the ratio of the CRLB variance to the measured variance.
    efficiency = (delta_m_crlb**2 / delta_m_measured**2) * 100 if delta_m_measured > 0 else 0

    print("\n--- Results ---")
    print(f"Measured Mean of estimates:  <m> = {mean_m_measured:.6f}")
    print(f"Estimator Bias (<m> - m_true):   = {bias:.4e}")
    print(f"Measured Precision (Std Dev): δm = {delta_m_measured:.4e}")
    print(f"Estimator Efficiency (CRLB² / Measured²): {efficiency:.1f}%")

    # --- 7. Plot the Distribution of Estimates ---
    _plot_estimate_distribution(
        m_estimates,
        m_true,
        ndata,
        snr_db,
        mean_m_measured,
        bias,
        delta_m_measured,
        delta_m_crlb,
        efficiency,
    )

In [None]:
# --- Entry point: launch the efficiency validation with this notebook's settings ---
if __name__ == "__main__":
    validate_fitter_efficiency(
        # Signal under test
        m_true=5.3,      # true modulation depth
        snr_db=60.0,     # signal-to-noise ratio (dB)
        # Acquisition
        f_mod=100,       # modulation frequency (Hz)
        f_samp=10e3,     # sampling frequency (Hz)
        n=20,            # buffer length in modulation periods
        ndata=20,        # forwarded to the CRLB calculation and the fitter trials
        # Monte Carlo run
        n_trials=1000,   # number of independent trials
        n_cores=None,    # None -> one worker per os.cpu_count()
    )