In [1]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import torch
from scipy.optimize import curve_fit
from scipy.signal import welch
from tensorflow.keras.models import load_model


# Load models
generator = load_model("saved_models/qpo_cgan_phy_generator.keras")

# SECURITY: torch.load unpickles arbitrary Python objects, so only load
# posterior files that were produced by a trusted training run.
# The file stores a whole posterior object (it is later used through its
# .sample() method), not a state_dict, so weights_only must be False; recent
# PyTorch releases default it to True and would refuse to unpickle the object.
# map_location="cpu" keeps the posterior on the same device as the CPU tensors
# that detect_qpo_sbi builds from the Welch PSD.
posterior = torch.load(
    "trained_sbi_posterior.pt", map_location="cpu", weights_only=False)


# --- Lorentzian Fit Model ---
def lorentzian(f, A, f0, gamma):
    """
    Lorentzian line profile A / (1 + ((f - f0) / gamma)^2).

    Parameters:
    - f (float or array): Frequency value(s) at which to evaluate the profile
    - A (float): Peak height, reached at f == f0
    - f0 (float): Centre of the peak
    - gamma (float): Half-width at half-maximum (the profile equals A / 2 at
      f0 +/- gamma); only gamma**2 enters, so its sign does not matter

    Returns:
    - Profile value(s), same shape as f
    """
    scaled_offset = (f - f0) / gamma
    denominator = 1 + scaled_offset**2
    return A / denominator

# --- Safe Q Estimation ---


def compute_lorentzian_q(series, fs=1.0, f_window=(0.001, 0.5), nperseg=256):
    """
    Compute the Q-factor from Lorentzian fit of the PSD.

    The Welch PSD of the series is restricted to f_window and fitted with
    lorentzian(f, A, f0, gamma), starting from the highest PSD bin in the
    window. Q is reported as f0 / |gamma|.

    NOTE(review): gamma in this model is the half-width at half-maximum, so
    this Q is twice the value given by an f0 / FWHM definition -- confirm
    which convention downstream thresholds assume.

    Parameters:
    - series (array): 1D light curve (flux values); any array-like that
      squeezes to 1D is accepted
    - fs (float): Sampling frequency (default 1.0 Hz)
    - f_window (tuple): Frequency range to search for peak (exclusive bounds)
    - nperseg (int): Welch segment length (default 256)

    Returns:
    - Q (float): Quality factor of Lorentzian peak, or 0.0 if the fit fails
      or returns a non-finite / non-positive centre or a zero width
    """
    try:
        # Compute PSD
        flux = np.asarray(series).squeeze()
        f, Pxx = welch(flux, fs=fs, nperseg=nperseg)

        # Frequency window to focus on possible QPOs
        in_window = (f > f_window[0]) & (f < f_window[1])
        f_peak = f[in_window]
        Pxx_peak = Pxx[in_window]

        # Initial guess [A, f0, gamma]: height and position of the strongest
        # bin, plus a fixed narrow starting width. An empty window makes
        # np.max raise, which is handled by the except branch below.
        p0 = [np.max(Pxx_peak), f_peak[np.argmax(Pxx_peak)], 0.01]
        popt, _ = curve_fit(lorentzian, f_peak, Pxx_peak, p0=p0, maxfev=5000)
        _, f0, gamma = popt

        # The model depends on gamma only through gamma**2, so the unbounded
        # optimiser may legitimately converge to -gamma; that is the same fit
        # and must not be rejected as non-physical.
        gamma = abs(gamma)

        # Reject NaN/inf parameters explicitly: comparisons with NaN are
        # False, so they would otherwise slip through and yield Q = NaN.
        if not np.all(np.isfinite(popt)) or gamma == 0 or f0 <= 0:
            print("⚠️ Lorentzian fit returned non-physical gamma or f0. Setting Q = 0.")
            return 0.0

        return float(f0 / gamma)

    except Exception as e:
        # Deliberate best-effort: any failure in the PSD or the fit is
        # reported and mapped to "no coherent peak" instead of aborting a
        # batch run.
        print(f"⚠️ Lorentzian fit failed: {e}. Setting Q = 0.")
        return 0.0


# --- QPO Detection with SBI Posterior ---
def detect_qpo_sbi(curve, posterior, fs=1.0, show_plot=False,
                   n_samples=500, threshold=0.5, verbose=True):
    """
    Score a light curve for a QPO from SBI posterior samples and a Lorentzian Q.

    The Welch PSD of the curve is passed to posterior.sample() as the
    observation. The decision score is Q / 3 + amp_mean - fc_std, and a QPO is
    reported when the score exceeds the threshold.

    NOTE(review): Q is unbounded while amp_mean and fc_std are computed from
    posterior samples, so a single narrow PSD spike (large Q) can decide the
    outcome on its own -- confirm this weighting is intended.

    Parameters:
    - curve (array): Light curve; squeezed to 1D before the PSD is computed
    - posterior: Object exposing sample(shape, x=..., show_progress_bars=...)
      and returning a 2D tensor. Column 0 is read as the frequency parameter
      and column 1 as the amplitude -- assumes this matches the parameter
      order used at training time; TODO confirm
    - fs (float): Sampling frequency passed to welch and to the Q estimate
    - show_plot (bool): If True, plot the PSD on a log power axis
    - n_samples (int): Number of posterior samples to draw (default 500)
    - threshold (float): Score above which a QPO is reported (default 0.5)
    - verbose (bool): If True, print the posterior ranges and summary values

    Returns:
    - dict with keys "fc_mean", "fc_std", "amp_mean", "Q", "qpo" (bool),
      "score" and "samples" (the tensor returned by posterior.sample)
    """
    f, Pxx = welch(np.asarray(curve).squeeze(), fs=fs, nperseg=256)
    # assumes the posterior was trained on PSD vectors of this length and
    # scaling -- TODO confirm against the training notebook
    x_obs = torch.tensor(Pxx, dtype=torch.float32)

    # Posterior sampling
    samples = posterior.sample((n_samples,), x=x_obs, show_progress_bars=False)
    # detach()/cpu() so the conversion also works when the samples carry
    # gradients or live on an accelerator.
    samples_np = samples.detach().cpu().numpy()
    fc_samples = samples_np[:, 0]
    amp_samples = samples_np[:, 1]

    fc_mean = fc_samples.mean()
    fc_std = fc_samples.std()
    amp_mean = amp_samples.mean()

    Q = compute_lorentzian_q(curve, fs=fs)

    if verbose:
        print(
            f"📊 Posterior fc range: {fc_samples.min():.3f} – {fc_samples.max():.3f}")
        print(
            f"📊 Posterior amp range: {amp_samples.min():.3f} – {amp_samples.max():.3f}")
        print(f"📈 Q = {Q:.2f} | fc_std = {fc_std:.3f} | amp_mean = {amp_mean:.3f}")

    if show_plot:
        fig, ax = plt.subplots(figsize=(12, 4))
        ax.semilogy(f, Pxx)
        ax.set_title("PSD of Input Light Curve")
        ax.set_xlabel("Frequency (Hz)")
        ax.set_ylabel("Power")
        ax.grid(True)
        fig.tight_layout()
        plt.show()

    # 🔍 Score-based decision
    score = Q / 3 + amp_mean - fc_std
    has_qpo = bool(score > threshold)

    return {
        "fc_mean": fc_mean,
        "fc_std": fc_std,
        "amp_mean": amp_mean,
        "Q": Q,
        "qpo": has_qpo,
        "score": score,
        "samples": samples
    }

  posterior = torch.load("trained_sbi_posterior.pt")


In [2]:
# Load the light-curve table and run the detector on each of its first four
# columns.
# NOTE(review): assumes columns 0-3 are the four band fluxes (no leading time
# column) -- TODO confirm against the file layout.
# NOTE(review): the file name suggests 100 s bins while fs=1.0 is passed, so
# frequencies here are presumably in cycles per sample -- TODO confirm.
data = np.loadtxt("ltcrv4bands_rej_dt100.dat")
bands = [data[:, col] for col in range(4)]

print("🔍 QPO Detection on XMM-Newton Bands (REJ1034+396)\n")

for i, band in enumerate(bands):
    print(f"🎧 Band {i+1}:")
    result = detect_qpo_sbi(band, posterior, fs=1.0, show_plot=False)

    verdict = '✅ YES' if result['qpo'] else '❌ NO'
    print(f"→ fc_mean: {result['fc_mean']:.3f}, fc_std: {result['fc_std']:.3f}")
    print(f"→ amp_mean: {result['amp_mean']:.3f}, Q: {result['Q']:.2f}")
    print(f"→ QPO Detected? {verdict} | Score: {result['score']:.2f}\n")


🔍 QPO Detection on XMM-Newton Bands (REJ1034+396)

🎧 Band 1:
📊 Posterior fc range: 0.010 – 0.998
📊 Posterior amp range: 0.104 – 0.992
📈 Q = 2.71 | fc_std = 0.281 | amp_mean = 0.524
→ fc_mean: 0.495, fc_std: 0.281
→ amp_mean: 0.524, Q: 2.71
→ QPO Detected? ✅ YES | Score: 1.15

🎧 Band 2:
📊 Posterior fc range: 0.010 – 0.996
📊 Posterior amp range: 0.101 – 0.989
📈 Q = 5.52 | fc_std = 0.264 | amp_mean = 0.550
→ fc_mean: 0.464, fc_std: 0.264
→ amp_mean: 0.550, Q: 5.52
→ QPO Detected? ✅ YES | Score: 2.13

🎧 Band 3:
📊 Posterior fc range: 0.011 – 0.993
📊 Posterior amp range: 0.106 – 0.994
📈 Q = 5.39 | fc_std = 0.256 | amp_mean = 0.559
→ fc_mean: 0.484, fc_std: 0.256
→ amp_mean: 0.559, Q: 5.39
→ QPO Detected? ✅ YES | Score: 2.10

🎧 Band 4:
⚠️ Lorentzian fit returned non-physical gamma or f0. Setting Q = 0.
📊 Posterior fc range: 0.014 – 1.000
📊 Posterior amp range: 0.103 – 0.995
📈 Q = 0.00 | fc_std = 0.257 | amp_mean = 0.559
→ fc_mean: 0.471, fc_std: 0.257
→ amp_mean: 0.559, Q: 0.00
→ QPO Detected

In [3]:
# Benchmark the detector on synthetic light curves from the conditional GAN.
# Uses `generator`, `posterior` and `detect_qpo_sbi` defined in the first
# cell; the generator is not loaded a second time here.

# Seed every RNG involved so the reported accuracy is reproducible:
# NumPy draws the test parameters, TensorFlow draws the latent vectors, and
# PyTorch drives posterior.sample() inside detect_qpo_sbi.
SEED = 42
rng = np.random.default_rng(SEED)
tf.random.set_seed(SEED)
torch.manual_seed(SEED)

latent_dim = 100  # assumes this matches the generator's noise input -- TODO confirm
num_samples = 100
results = []

# Range of test values for frequency and amplitude
# NOTE(review): the detector is called with fs=1.0, whose Welch frequency axis
# ends at 0.5, yet fc is drawn from 0.5-0.8 -- confirm fc is a normalised
# conditioning value rather than a frequency in the same units.
fc_range = (0.5, 0.8)
amp_range = (0.1, 1.0)

for i in range(num_samples):
    # Random test params
    fc = rng.uniform(*fc_range)
    amp = rng.uniform(*amp_range)

    # One latent vector per parameter draw, shared by the QPO and non-QPO
    # curves so that only the class flag in the label differs between them.
    z = tf.random.normal((1, latent_dim))

    for true_qpo in (1, 0):
        label = tf.convert_to_tensor(
            [[fc, amp, float(true_qpo)]], dtype=tf.float32)
        signal = generator([z, label], training=False).numpy().squeeze()

        # Detect QPO
        result = detect_qpo_sbi(signal, posterior, fs=1.0, show_plot=False)

        results.append({
            "true_qpo": true_qpo,
            "fc": fc,
            "amp": amp,
            "detected": int(result["qpo"]),
            "score": result["score"],
            "Q": result["Q"],
            "amp_mean": result["amp_mean"],
            "fc_std": result["fc_std"]
        })

📊 Posterior fc range: 0.012 – 0.995
📊 Posterior amp range: 0.104 – 0.999
📈 Q = 81.37 | fc_std = 0.250 | amp_mean = 0.535
📊 Posterior fc range: 0.013 – 0.998
📊 Posterior amp range: 0.115 – 1.000
📈 Q = 2.31 | fc_std = 0.258 | amp_mean = 0.574
📊 Posterior fc range: 0.013 – 0.998
📊 Posterior amp range: 0.106 – 0.999
📈 Q = 102.97 | fc_std = 0.259 | amp_mean = 0.549
📊 Posterior fc range: 0.015 – 0.990
📊 Posterior amp range: 0.104 – 0.998
📈 Q = 38.69 | fc_std = 0.258 | amp_mean = 0.559
📊 Posterior fc range: 0.019 – 0.991
📊 Posterior amp range: 0.100 – 1.000
📈 Q = 0.30 | fc_std = 0.259 | amp_mean = 0.515
⚠️ Lorentzian fit returned non-physical gamma or f0. Setting Q = 0.
📊 Posterior fc range: 0.014 – 0.997
📊 Posterior amp range: 0.106 – 0.999
📈 Q = 0.00 | fc_std = 0.267 | amp_mean = 0.545
📊 Posterior fc range: 0.010 – 0.994
📊 Posterior amp range: 0.101 – 0.997
📈 Q = 83.12 | fc_std = 0.255 | amp_mean = 0.579
📊 Posterior fc range: 0.011 – 0.993
📊 Posterior amp range: 0.117 – 0.998
📈 Q = 12.84 | 

In [4]:
import pandas as pd

# Tabulate the benchmark records and report the fraction of rows whose
# detector decision equals the true label.
df = pd.DataFrame(results)
is_correct = df['true_qpo'].eq(df['detected'])
accuracy = is_correct.mean()
print(f"✅ Detector Accuracy on GAN samples: {accuracy*100:.2f}%")


✅ Detector Accuracy on GAN samples: 51.50%


In [5]:
import pandas as pd

# Persist the benchmark records as CSV, one row per record and no index column.
csv_path = "gan_qpo_detection_results.csv"

# Rebuilt from `results` so this cell does not depend on an earlier `df`.
df = pd.DataFrame(results)
df.to_csv(csv_path, index=False)

print(f"✅ Results saved to: {csv_path}")


✅ Results saved to: gan_qpo_detection_results.csv
