In [None]:
# imports the PYCBC package
import sys
!{sys.executable} -m pip install pycbc ligo-common --no-cache-dir

In [None]:
# importing the necessary libraries & packages
import numpy as np
import math
import pylab
%matplotlib inline
import matplotlib.pyplot as plt
import random
import pycbc
from pycbc import distributions
from pycbc.waveform import get_td_waveform
from pycbc.detector import Detector
import pycbc.coordinates as co
from pycbc.psd import welch, interpolate
from pycbc.psd import interpolate, inverse_spectrum_truncation
from pycbc.noise.gaussian import noise_from_psd
from pycbc.noise.gaussian import frequency_noise_from_psd
from pycbc.filter import matched_filter

In [None]:
snr_REQ = []

In [None]:
# establishing some parameters to generate the GWS

det_l1 = Detector('L1') # name of detector from which the wave will be extracted: 
# L1: Livingston, H1: Hanford, V1: Virgo

apx = 'IMRPhenomD' # mathematical approximation used to approx3imate the wave, 
# these are not real waves but only their simulation

N=2048*16  #N is number of samples, N=length/delta_t == sampling freq * number of seconds
fs=2048 #fs is sampling frequnecy
length= N//fs #duration of segment == number of seconds
delta_f= fs/N # a parameter to be defined so that it can be used for later functions
f_samples = N//2 +1 # a parameter to be defined so that it can be used for later functions
f_lower=50 # the frequency from which the gravitational wave will start, usually is kept to be 30Hz
delta_t=1.0/2048 # (1/ sampling frequency)

print(length, delta_f, f_samples, delta_t) # print to see if any discrepancies

# incase of changes, only change N, fs, f_lower and delta_t...other parameters will get changed automatically
# !!!! CAUTION - 64bit float values will be different for same operations so correct them

# GENERATION OF WHITEN GAUSSIAN NOISE
from pycbc.psd.analytical import AdVDesignSensitivityP1200087 # mathematical approx for noise

# get power spectrum density for noise
def get_psd(f_samples, delta_f, low_freq_cutoff):
    psd=AdVDesignSensitivityP1200087(f_samples, delta_f, low_freq_cutoff)
    return psd
psd=get_psd(f_samples, delta_f, f_lower) # function called here

# some basic signal processing operation applied on pure simulated wave to shift them between proper time interval
def pure_signal_processing(signal):
  signal.start_time = 0 # bring the signal to timestep 0
  signal.append_zeros(4*2048) # appends zero at the sampling frequency rate at the end of signal
  signal = signal.cyclic_time_shift(2) # cyclic shifts the signal as much by the parameter defined
  signal.start_time = 0 
  return signal

# functions returns the time step at which the signal has highest amplitude
def get_peak_time(signal):
  peak = signal.numpy().argmax()
  time = signal.sample_times[peak]
  return time

# extracts/crops 0.25 seconds of signal around the peak
def get_025s(signal):
    time = get_peak_time(signal) #get_peak time function called here
    cropped = signal.time_slice(time-0.1875,time+0.0625) # crops the signal around the parameters specified
                                                         # in this case the peak time
    return cropped

# GENERATION OF NOISE
from pycbc.noise.gaussian import frequency_noise_from_psd
# function uses inbuilt algo to generate noise using psd in freq domain
def get_noise(psd, seed=None):
    noise=frequency_noise_from_psd(psd, seed=seed)
    noise_time = noise.to_timeseries() # converted to time domain so that we can add it to signal in time domain
    return noise_time

# adds the signal to noise
def add_noise_signal(noise, signal):
    noise.start_time = 0 # bring the initial points of both signal to 0 for coherency
    signal.start_time = 0
    length_signal = len(signal)
    signal_plus_noise=noise
    signal_plus_noise[0:length_signal]=np.add(noise[0:length_signal], signal) # adding signal to noise
    return signal_plus_noise

# finding the matched filtering SNR
def SNR_MF(signal_fin, noise_fin):
  noise_fin.start_time = 0
  signal_fin.start_time = 0
  hps=signal_fin
  conditioned=noise_fin
  hps.resize(len(conditioned))
  template = hps.cyclic_time_shift(hps.start_time)
  psd_whiten=interpolate(welch(conditioned), 1.0 / conditioned.duration)
  snr = matched_filter(template, conditioned, psd=psd_whiten, low_frequency_cutoff=40, sigmasq = 1)
  # pylab.figure(figsize=[10, 4])
  # pylab.plot(snr.sample_times, abs(snr))
  # pylab.ylabel('Signal-to-noise')
  # pylab.xlabel('Time (s)')
  # pylab.show()
  peak = abs(snr).numpy().argmax()
  snrp = snr[peak]
  time = snr.sample_times[peak]
  # print(time)
  # print("We found a signal at {}s with SNR {}".format(time, 
  #                                                     round(abs(snrp), 2)))
  return round(abs(snrp),2), time

# Scales the matched filtering SNR to our required SNR
#in our project the req snr was in range (2,3,4,5....,17)
def scale_SNR(signal, noise, snr_req):
  noise.start_time = 0
  signal.start_time = 0
  snr,_ = SNR_MF(signal, noise) # matched filtering SNR function called here
  signal = (signal/snr) * snr_req # basic scaling operation
  # final = add_noise_signal(noise, signal)
  return signal

# WHITENING THE SIGNAL
from pycbc.psd import welch, interpolate
def get_whiten(signal_plus_noise):
    signal_freq_series=signal_plus_noise.to_frequencyseries() # conversion to frequency domain
    numerator = signal_freq_series
    psd_to_whiten = interpolate(welch(signal_plus_noise), 1.0 / signal_plus_noise.duration) # mathematical operation of interpolation
    denominator=np.sqrt(psd_to_whiten)
    whiten_freq = (numerator / denominator)
    whiten=whiten_freq.to_timeseries().highpass_fir(30., 512).lowpass_fir(300.0, 512) # applying filters and conerting to time domain
    return whiten

#GETS THE REQUIRED SIGNAL
# this signal in whitened (noise+pure wave) signal
def get_data(signal, corrupted): # signal = pure wave, corrupted = noise + pure wave
  corrupted.start_time = 0
  whiten = get_whiten (corrupted) # called whitening function here
  _, time = SNR_MF(signal, whiten) # finds the peak of pure wave in the corrupted signal
  # print(time)
  data = whiten.time_slice(time-0.3, time+0.2) # slices 0.5s around the peak
  data.start_time = 0
  data = data/np.std(data) # normalizing the data 
  return data

# Function for making uniform distrbution of various parameters for generation of waves
def DISTRIBUTIONS(low, high, samples): # specify samples == number of waves to generate
    var_dist = distributions.Uniform(var = (low, high)) # uniform distrubtion betwween lower & higher limits
    return var_dist.rvs(size = samples)

# same function as above but some extra parameters required
def SPIN_DISTRIBUTIONS(samples):
    theta_low = 0.
    theta_high = 1.
    phi_low = 0.
    phi_high = 2.
    uniform_solid_angle_distribution = distributions.UniformSolidAngle(polar_bounds=(theta_low,theta_high),
                                              azimuthal_bounds=(phi_low,phi_high))
    solid_angle_samples = uniform_solid_angle_distribution.rvs(size=samples)
    spin_mag = np.ndarray(shape=(samples), dtype=float)
    for i in range(0,samples):
        spin_mag[i] = 1.
    spinx, spiny, spinz = co.spherical_to_cartesian(spin_mag,solid_angle_samples['phi'],solid_angle_samples['theta'])
    return spinz

# FUNCTION to generate parameters for wave generation
# samples= number of simulated wave to be generated
def get_params(samples):
    mass1_samples = DISTRIBUTIONS(10, 81, samples)
    mass2_samples = DISTRIBUTIONS(10, 81, samples)
    right_ascension_samples  = DISTRIBUTIONS(0 , 2*math.pi, samples)
    polarization_samples = DISTRIBUTIONS(0 , 2*math.pi, samples)
    declination_samples = DISTRIBUTIONS((-math.pi/2)+0.0001, (math.pi/2)-0.0001, samples)
    spinz1 = SPIN_DISTRIBUTIONS(samples)
    spinz2 = SPIN_DISTRIBUTIONS(samples)
    snr_req = snr_REQ
    print(snr_req)
    # snr_req = np.int64(snr_req)
    DIST = DISTRIBUTIONS(2500, 3000, samples)
    return mass1_samples, mass2_samples, right_ascension_samples, polarization_samples, declination_samples, spinz1, spinz2, snr_req , DIST



In [None]:
samples = 1700 #number of samples
pure_training_DATA = np.zeros((samples, 512)) # sample and length of sample in timesteps (freq * seconds)
# pure = only pure gravitational wave
# noisy = pure+noise
noisy_training_DATA =  np.zeros((samples, 512))

#initialises the global variables everytime because samples are generated in batches, so that there is no RAM overload
def initialise():
  global pure_training_DATA
  global noisy_training_DATA 
  pure_training_DATA = np.zeros((samples, 512))
  noisy_training_DATA  =  np.zeros((samples, 512))

# batches are generated and saves
# .npy is the extension used
def save():
  noisy_dir = '/content/noisy_test_DATA_mix_1' # write the file name in which you need to put the data
  np.save(noisy_dir , noisy_training_DATA )
  pure_dir = '/content/pure_test_DATA_mix_1' # write the file name in which you need to put the data
  np.save(pure_dir , pure_training_DATA)

In [None]:
# FUNCTION TO GENERATE THE SAMPLES
# TO UNDERSTAND THE INBUILT FUNCTION USED CHECK DOCUMENTATION
from astropy.coordinates.distances import Distance
def DATA_GENERATION(samples):

  initialise()
  mass1_samples, mass2_samples, right_ascension_samples, polarization_samples, declination_samples, spinz1, spinz2, snr_req, DIST = get_params(samples)

  for i in range(0,samples):
        seed =  random.randint(1, 256)
        # NOTE: Inclination runs from 0 to pi, with poles at 0 and pi
        #       coa_phase runs from 0 to 2 pi.
        hp, hc = get_td_waveform(approximant=apx,mass1=mass1_samples[i][0],mass2=mass2_samples[i][0],
                                  spin1z=spinz1[i], spin2z=spinz2[i],
                                  delta_t=delta_t, f_lower=30,  distance = DIST[i][0])

        signal_l1 = det_l1.project_wave(hp, hc,  right_ascension_samples[i][0], declination_samples[i][0], polarization_samples[i][0])
        signal_l1 = pure_signal_processing(signal_l1)
      
        

        # pure_training_DATA[i] = signal_final
        
        noise=get_noise(psd)
        whiten = get_whiten(noise)
        whiten = whiten/np.std(whiten)


        snr,time = SNR_MF(signal_l1, noise)
        # print(time)
        signal_l1 = (signal_l1/snr) * snr_req[i]
        

        signal_l1.start_time = 0
        whiten.start_time = 0
        signal_l1 = get_025s(signal_l1)
        whiten_noise = whiten.time_slice(3,3+0.25)

        signal_l1.start_time = 0
        whiten_noise.start_time = 0
        signal_final = signal_l1* (10**22)
        data_final = add_noise_signal(whiten_noise, signal_final) 

        # pylab.figure()
        # pylab.plot(data_final.sample_times, data_final)
        # pylab.plot(signal_final.sample_times, signal_final)
        # pylab.legend(('Noisy Signal', 'Pure Signal'))


        # print(len(signal_final))
        # print(len(data_final))
        pure_training_DATA[i] = signal_final
        noisy_training_DATA[i] = data_final

  save()
  print("done")




In [None]:
DATA_GENERATION(samples) #calling the above function here

[0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,

In [None]:
# import the drive to store the samples
from google.colab import drive
drive.mount('/content/gdrive',force_remount=True)

Mounted at /content/gdrive


In [None]:

!cp noisy_test_DATA_mix_1.npy '/content/gdrive/My Drive/Anagh_AE_Data_Share/'
!cp pure_test_DATA_mix_1.npy '/content/gdrive/My Drive/Anagh_AE_Data_Share/'
!ls -lt '/content/gdrive/My Drive/Anagh_AE_Data_Share/' 

total 85883
-rw------- 1 root root 6963328 Apr  5 09:18 noisy_test_DATA_mix_1.npy
-rw------- 1 root root 6963328 Apr  5 09:18 pure_test_DATA_mix_1.npy
-rw------- 1 root root  139392 Apr  2 07:39 noisy_test_DATA_mix.npy
-rw------- 1 root root  139392 Apr  2 07:39 pure_test_DATA_mix.npy
-rw------- 1 root root  819328 Apr  1 07:01 noisy_test_DATA_5.npy
-rw------- 1 root root  819328 Apr  1 07:01 pure_test_DATA_5.npy
-rw------- 1 root root  819328 Mar 31 10:18 noisy_test_DATA_4.npy
-rw------- 1 root root  819328 Mar 31 10:18 pure_test_DATA_4.npy
-rw------- 1 root root  819328 Mar 31 10:17 noisy_test_DATA_3.npy
-rw------- 1 root root  819328 Mar 31 10:17 pure_test_DATA_3.npy
-rw------- 1 root root  819328 Mar 31 10:16 noisy_test_DATA_2.npy
-rw------- 1 root root  819328 Mar 31 10:16 pure_test_DATA_2.npy
-rw------- 1 root root  819328 Mar 31 10:15 noisy_test_DATA_1.npy
-rw------- 1 root root  819328 Mar 31 10:15 pure_test_DATA_1.npy
-rw------- 1 root root 8192128 Feb 24 14:13 pure_training_D

In [None]:
# FUNCTION HERE IS USED TO GENERATE ONLY PURE NOISE WAVE WHICH HAVE NO SIGNAL IN THEM
# AS PER THE PROJECT OF BINARY CLASSIFICATION
# 0 - PURE NOISE WAVES
# 1 - NOISE WAVES WHICH HAVE SIGNALS IN THEM

negative_samples = 2560
negative_training_DATA = np.zeros((negative_samples, 512))

def negative_save():
  negative_dir = '/content/negative_training_DATA' # write the file name in which you need to put the data
  np.save(negative_dir , negative_training_DATA )


In [None]:
def NOISE_GENERATOR(samples):
  for i in range(samples):
    seed =  random.randint(1, 1000)
    noise=get_noise(psd, seed)
    # pylab.plot(noise.sample_times, noise)
    whiten = get_whiten (noise)
    noise_signal = whiten.time_slice(3,3+0.25)
    noise_signal.start_time = 0
    noise_signal = noise_signal/ np.std(noise_signal)
    # pylab.figure()
    # pylab.plot(noise_signal.sample_times, noise_signal)
    # pylab.show()
    negative_training_DATA[i] = noise_signal
  negative_save()
    

In [None]:
NOISE_GENERATOR(negative_samples)

In [None]:
# BY UNDERSTANDING THE CODE YOU CAN GENERATE ANY TYPE OF WAVE OF ANY LENGTH AND FREQUENCY
# SHOULD WORK FOR BOTH CLASSIFICATION AND DENOISING WAVE SAMPLES GENERATION