In [2]:
import numpy as np
import sbi 

import lal as _lal
from pycbc.waveform import get_fd_waveform
from pycbc.psd import aLIGOZeroDetHighPower
from pycbc.filter import highpass

import matplotlib
import matplotlib.pyplot as plt

import torch

from sbi import analysis as analysis
from sbi.analysis import pairplot
from sbi.inference import NPE, simulate_for_sbi
from sbi.utils import BoxUniform 
from sbi.utils.user_input_checks import (
    check_sbi_inputs,
    process_prior,
    process_simulator,
)

#### (chatGPT) The noise:

detectore noise is given by:

$$n(f) - \mathbb{C} N(0,0.5S_n(f))$$

where $S_n(f)$ is the one-sided power spectral density at frequency f.

In [3]:
"""
theta[0]: mass1 (solar masses)
theta[1]: mass2 (solar masses)
theta[2]: distance (Mpc)
"""

def simulator(theta):
    delta_f = 1/16
    f_lower = 20
    f_final = 512
    num_sim = theta.shape[0]
    num_freq = int(((f_final - f_lower) / delta_f) + 1)
    h_freq = np.zeros(shape = (num_sim, 4, num_freq))

    # Calculate the gravitational wave strains in frequency space num_sim times
    for i in range(num_sim):
        mass1 = theta[i,0]
        mass2 = theta[i,1]
        distance = 10 ** theta[i,2]

        
        hp, hc = get_fd_waveform(approximant="TaylorF2",
                                 mass1=mass1,
                                 mass2=mass2,
                                 delta_f=delta_f,
                                 f_lower=f_lower,
                                 f_final=f_final,
                                 distance=distance,
                                 inclination=0.0,
                                 coa_phase=0.0)
        
        hp_freq = hp.to_frequencyseries()
        frequencies = hp_freq.sample_frequencies.numpy()
        hp_amplitudes = hp_freq.numpy()[frequencies >= f_lower]
        hp_amplitudes_real = np.real(hp_amplitudes)
        hp_amplitudes_imaginary = np.imag(hp_amplitudes)
        
        hc_freq = hc.to_frequencyseries()
        hc_amplitudes = hc_freq.numpy()[frequencies >= f_lower]
        hc_amplitudes_real = np.real(hc_amplitudes)
        hc_amplitudes_imaginary = np.imag(hc_amplitudes)
        
        h_freq[i,0,:] = hp_amplitudes_real
        h_freq[i,1,:] = hp_amplitudes_imaginary
        h_freq[i,2,:] = hc_amplitudes_real
        h_freq[i,3,:] = hc_amplitudes_imaginary
        
        hp_psd = aLIGOZeroDetHighPower(length = len(frequencies), delta_f = delta_f, low_freq_cutoff = f_lower).numpy()[frequencies >= f_lower]
        hp_scale = np.sqrt(0.5 * hp_psd * delta_f)
        hp_noise_real = np.random.normal(0, hp_scale)
        hp_noise_imaginary = np.random.normal(0, hp_scale)
        
        hc_psd = aLIGOZeroDetHighPower(length = len(frequencies), delta_f = delta_f, low_freq_cutoff = f_lower).numpy()[frequencies >= f_lower]
        hc_scale = np.sqrt(0.5 * hc_psd * delta_f)
        hc_noise_real = np.random.normal(0, hc_scale)
        hc_noise_imaginary = np.random.normal(0, hc_scale)
        
        h_freq[i,0,:] += hp_noise_real
        h_freq[i,1,:] += hp_noise_imaginary
        h_freq[i,2,:] += hc_noise_real
        h_freq[i,3,:] += hc_noise_imaginary

    h_freq = h_freq.reshape(h_freq.shape[0],-1)

    h_freq = torch.tensor(h_freq, dtype=torch.float32)
    return h_freq
    
    

In [32]:
mass1 = 1
mass2 = 2
distance = 500
delta_f = 1/16
f_lower = 20.0
f_final = 512.0

hp, hc = get_fd_waveform(approximant="TaylorF2",
                             mass1=mass1,
                             mass2=mass2,
                             delta_f=delta_f,
                             f_lower=f_lower,
                             f_final=f_final,
                             distance=distance,
                             inclination=0.0,
                             coa_phase=0.0)

ht = hp.to_timeseries()
times = ht.sample_times.numpy()
amplitude = ht.numpy()

plt.plot(times, amplitude)
plt.xlabel("Time (s)")
plt.ylabel("Strain h(t)")
plt.savefig("time series")
plt.close()

hp_freq = hp.to_frequencyseries()
frequencies = hp_freq.sample_frequencies.numpy()
hp_amplitudes = hp_freq.numpy()
hp_amplitudes_real = np.real(hp_amplitudes)
hp_amplitudes_imaginary = np.imag(hp_amplitudes)

hc_freq = hc.to_frequencyseries()
hc_amplitudes = hc_freq.numpy()
hc_amplitudes_real = np.real(hc_amplitudes)
hc_amplitudes_imaginary = np.imag(hc_amplitudes)

h_freq = np.zeros(shape = (4, len(frequencies)))
h_freq[0,:] = hp_amplitudes_real
h_freq[1,:] = hp_amplitudes_imaginary
h_freq[2,:] = hc_amplitudes_real
h_freq[3,:] = hc_amplitudes_imaginary

hp_psd = aLIGOZeroDetHighPower(length = len(frequencies), delta_f = delta_f, low_freq_cutoff = f_lower)
hp_scale = np.sqrt(0.5 * hp_psd.numpy() * delta_f)
hp_noise_real = np.random.normal(0, hp_scale)
hp_noise_imaginary = np.random.normal(0, hp_scale)

hc_psd = aLIGOZeroDetHighPower(length = len(frequencies), delta_f = delta_f, low_freq_cutoff = f_lower)
hc_scale = np.sqrt(0.5 * hc_psd.numpy() * delta_f)
hc_noise_real = np.random.normal(0, hc_scale)
hc_noise_imaginary = np.random.normal(0, hc_scale)

h_freq[0,:] += hp_noise_real
h_freq[1,:] += hp_noise_imaginary
h_freq[2,:] += hc_noise_real
h_freq[3,:] += hc_noise_imaginary

plt.plot(frequencies, h_freq[0,:])
plt.plot(frequencies, h_freq[1,:])
plt.xlabel('frequency (Hz)')
plt.ylabel('Strain amplitude h(f)')
plt.savefig("frequency series hp + noise")
plt.close()

plt.plot(frequencies, h_freq[2,:])
plt.plot(frequencies, h_freq[3,:])
plt.xlabel('frequency (Hz)')
plt.ylabel('Strain amplitude h(f)')
plt.savefig("frequency series hc + noise")
plt.close()

In [4]:
theta = np.array([[30,30,np.log10(100)],
                 [40,15,np.log10(200)],
                 [50,25,np.log10(300)],
                 [20,40,np.log10(400)],
                 [35,20,np.log10(500)],])
h_freq = simulator(theta)

In [5]:
h_freq.shape

torch.Size([5, 31492])

In [6]:
delta_f = 1/16
f_lower = 20
f_final = 512
num_sim = 20000

# sim_wrapper = lambda theta: simulator(theta, delta_f = delta_f, f_lower = f_lower, f_final = f_final)
# def sim_wrapper(theta):
#     return simulator(theta = theta, delta_f = delta_f, f_lower = f_lower, f_final = f_final)

m_lowerbound = 5
m_upperbound = 10
log_dinstance_lowerbound = np.log10(100)
log_distance_upperbound = np.log10(300)

prior = BoxUniform(low=torch.tensor([m_lowerbound, m_lowerbound, log_dinstance_lowerbound]), 
                   high=torch.tensor([m_upperbound, m_upperbound, log_distance_upperbound]))

# Check prior, simulator, consistency
prior, num_parameters, prior_returns_numpy = process_prior(prior)
simulator = process_simulator(simulator, prior, prior_returns_numpy)
check_sbi_inputs(simulator, prior)

# Create inference object. Here, NPE is used.
inference = NPE(prior=prior, density_estimator="zuko_maf")

#generate simulations and pass to the inference object
theta, h_freq = simulate_for_sbi(simulator, proposal=prior, num_simulations=num_sim)
inference = inference.append_simulations(theta, h_freq)

# train the density estimator and build the posterior
density_estimator = inference.train()
posterior = inference.build_posterior(density_estimator)

  0%|          | 0/20000 [00:00<?, ?it/s]

 Neural network successfully converged after 35 epochs.

In [14]:
theta_true = prior.sample((1,))
# generate our observation
h_obs = simulator(theta_true)

samples = loaded_posterior.sample((10000,), x=h_obs)

_ = analysis.pairplot(
    samples, 
    limits=[[m_lowerbound, m_upperbound], 
            [m_lowerbound, m_upperbound], 
            [log_dinstance_lowerbound, log_distance_upperbound]],
    figsize=(8, 8),
    labels=[r"$m_1$", r"$m_2$", r"$d$"],
    points=theta_true # add ground truth thetas,
)

plt.savefig("Gravitational Wave Posterior")
plt.close()

AssertionError: The trailing dimensions of `theta_or_x` do not match the `event_shape`.

In [12]:
torch.save(posterior, 'posterior.pt')

In [13]:
loaded_posterior = torch.load('posterior.pt')

  loaded_posterior = torch.load('posterior.pt')
