# Injection Testing

Injection testing is a process of testing a signal detection algorithm by simulating what the signal would look like in context and then running the algorithm on it to determine if it is able to recover the signal and how accurately it can characterize it. ALIAS is a great candidate for this style of testing since it is trying to detect signals that are fairly easy to impliment.

In [1]:
import alias

import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import scipy.signal

import numpy as np
from astropy.io import fits

import random as rand

import tqdm.autonotebook as tqdm

  import tqdm.autonotebook as tqdm


In [2]:
plt.rcParams.update({'font.size': 10})

In [3]:
with open('../data/sample_star_urls', 'r') as f:
    urls = f.read().splitlines()

files = [ '../data/spectra/%s' %url.split('/')[-1] for url in urls ]

ds = alias.loadDataset(files)

AttributeError: module 'alias' has no attribute 'loadDataset'

In [None]:
ds = alias.continuum_normalize(ds)

In [None]:
# Plot sample laser signature

lsf = alias.default_lsf

test_laser_signature = np.interp(np.array(range(len(ds.wave)))-4500, lsf.x, lsf.y)

plt.figure(figsize=(12,4))

plt.plot(ds.wave, test_laser_signature)

plt.xlabel(r'$\lambda$ ($\AA$)')
plt.ylabel(r'Relative Flux')

plt.tight_layout()

In [None]:
# Zoom in on signature

plt.rcParams.update({'font.size': 10})

plt.figure(figsize=(6,4))

plt.plot(ds.wave, test_laser_signature)

plt.xlim(16060, 16080)

plt.xlabel(r'$\lambda$ ($\AA$)')
plt.ylabel(r'Relative Flux')

plt.tight_layout()

In [None]:
# Inject the signature


flux_injected = ds.flux[101] + 0.3 * np.nanmedian(ds.flux[101]) * test_laser_signature
ivar_injected = ds.ivar[101]

fig = plt.figure(figsize=(6,5))

gs = gridspec.GridSpec(2, 1, hspace=0.2)

ax1 = fig.add_subplot(gs[0])

ax1.plot(ds.wave, flux_injected)

ax1.plot([ds.wave[4500],]*2, (0,2), ls='dotted')

ax1.set_ylim(0.4, 1.4)

ax1.set_ylabel(r'Relative Flux')



ax2 = fig.add_subplot(gs[1])

ax2.plot(ds.wave, flux_injected, '.-')

ax2.plot([ds.wave[4500],]*2, (0,2), ls='dotted')

ax2.set_ylim(0.4, 1.4)
ax2.set_xlim(ds.wave[4500]-1.5, ds.wave[4500]+1.5)

ax2.set_xlabel(r'$\lambda$ ($\AA$)')
ax2.set_ylabel(r'Relative Flux')


In [None]:
# Now start the detection process

# Continuum normalize the spectrum


plt.figure(figsize=(12,4))

plt.plot(ds.wave, flux_injected)

plt.xlabel(r'$\lambda$ ($\AA$)')
plt.ylabel(r'Relative Flux')

plt.tight_layout()

In [None]:
plt.plot(ds.wave, flux_injected)

plt.xlim(16060, 16080)

plt.xlabel(r'$\lambda$ ($\AA$)')
plt.ylabel(r'Relative Flux')

plt.tight_layout()

In [None]:
peaks = scipy.signal.find_peaks(flux_injected, height = 1.1)[0]

recovered_wavelength = ds.wave[peaks[0]]
recovered_flux = flux_injected[peaks[0]] - 1

print('Recovered Wavelength: %s' %recovered_wavelength)
print('Recovered Amplitude: %s' %recovered_flux)

In [None]:
print('Delta lambda: %s' %(recovered_wavelength - ds.wave[4500]))
print('Delta flux: %s' %(recovered_flux - 0.3))

In [None]:
# Implement Injection Testing Framework

def create_laser_signature(wave, lsf, idx):
    line = np.interp(np.array(range(len(wave)))-idx, lsf.x, lsf.y)
    return line

def injection_test(ds, lsf, detector, count, min_amp, max_amp):
    results = []

    for i in tqdm.trange(count):
        
        spec = rand.randrange(len(ds.flux))
        valid_idx = np.nonzero(~np.isnan(ds.flux[spec]))[0]
        idx_int = np.random.choice(valid_idx)
        idx = idx_int + np.random.uniform(-0.5, 0.5)
        wave = np.interp(idx, range(len(ds.wave)), ds.wave)
        amp = np.random.uniform(min_amp, max_amp)

        nflux = np.copy(ds.flux[spec])
        nflux += create_laser_signature(ds.wave, lsf, idx)*amp
        
        detections = detector(ds.wave, nflux, ds.ivar[spec])

        if len(detections) == 0:
            results.append((spec, idx, amp, 0, 0, 0, 0))
            continue

        detection_wavelengths = detections[:,0]
        detection_amplitudes = detections[:,1]

        detec_id = np.argmin(np.abs(detection_wavelengths - wave))
        delta_wave = detection_wavelengths[detec_id] - wave

        if np.abs(delta_wave) > 1:
            results.append((spec, idx, amp, 0, 0, 0, len(detections)))
            continue
        
        delta_flux = detection_amplitudes[detec_id] - amp
        results.append((spec, idx, amp, 1, delta_wave, delta_flux, len(detections) - 1))

    return np.array(results, dtype=float)

# Result structure: Spectrum id, injected wavelength, injected flux, detected?, delta wavelength, delta flux, num other detections
        

In [None]:
# Run detector

norm_flux_masked = np.ma.filled(np.ma.MaskedArray(ds.flux, mask=ds.flux<0.01), np.nan)
median_flux = np.nanmedian(norm_flux_masked, axis=0)

print(median_flux)

In [None]:
plt.figure(figsize=(6,3))

plt.plot(ds.wave, median_flux, label='Median Continuum-Normalized Spectrum')

plt.xlabel(r'$\lambda$ ($\AA$)')
plt.ylabel(r'Relative Flux')

plt.legend()

plt.tight_layout()

In [None]:
n = 5

f_residual = alias.continuum_normalize(alias.Dataset(ds.wave, (ds.flux[n],), (ds.ivar[n],))).flux[0] - median_flux

plt.figure(figsize=(6,3))

plt.plot(ds.wave, f_residual, label='Median Continuum-Normalized Spectrum')
plt.ylim(-0.4, 0.4)

plt.xlabel(r'$\lambda$ ($\AA$)')
plt.ylabel(r'Relative Flux')

plt.tight_layout()

In [None]:
def test_detector(wave, flux, ivar):
        
    flux_norm_res = flux - median_flux
    peaks = scipy.signal.find_peaks(flux_norm_res, height = 0.05)[0]
    wavelengths = wave[peaks]
    amplitudes = flux_norm_res[peaks]
    return np.array((wavelengths, amplitudes)).T

In [None]:
features = test_detector(ds.wave, ds.flux[0], ds.ivar[0])

plt.figure(figsize=(6,3))

plt.plot(ds.wave, ds.flux[0])
plt.plot(features[:,0], ds.flux[0][np.array(np.interp(features[:,0], ds.wave, range(len(ds.wave))), dtype=int)], '.', color='red')

plt.xlabel(r'$\lambda$ ($\AA$)')
plt.ylabel(r'Relative Flux')

_ = plt.tight_layout()

In [None]:
results = injection_test(ds, alias.default_lsf, test_detector, 5000, 0.01, 0.5)

In [None]:
# Now interpret results

print('Efficiency: %s%%' %(100*sum(results[:,3])/len(results)))
print('Other Detections: %s/spectrum' %np.mean(results[:,6]))

In [None]:
def show_result(ds, lsf, detector, result):    
    plt.figure(figsize=(12,4))
    spec = int(result[0])
    flux_inj = ds.flux[spec] + create_laser_signature(ds.wave, lsf, result[1]) * result[2]
    wave = np.interp(result[1], range(len(ds.wave)), ds.wave)

    detections = detector(ds.wave, flux_inj, ds.ivar[spec])
    
    plt.plot(ds.wave, flux_inj, label='Injected Spectrum')
    plt.plot(ds.wave, ds.flux[spec], label='Original Spectrum')
    plt.scatter(detections[:,0], detections[:,1] + 1)

    plt.xlim(wave - 15, wave + 15)
        
    plt.xlabel(r'$\lambda$ ($\AA$)')
    plt.ylabel(r'Relative Flux')
    
    plt.tight_layout()

In [None]:
show_result(ds, alias.default_lsf, test_detector, results[47])

In [None]:
_ = plt.hist(results[results[:,3] > 0.5,4], bins=30)

plt.xlabel('$\Delta\lambda$ ($\AA$)')
plt.ylabel('Number of Detections')

In [None]:
import matplotlib.gridspec as gridspec

fig = plt.figure()

gs = gridspec.GridSpec(1, 2, wspace=0)

ax1 = fig.add_subplot(gs[0])
ax2 = fig.add_subplot(gs[1])

ax1.hist(results[results[:,3] > 0.5,4], bins=60)

ax1.set_ylim(0, 1300)
ax1.set_xlim(-1.2, 1.2)

ax1.set_xlabel('$\Delta\lambda$ ($\AA$)')
ax1.set_ylabel('Number of Detections')

ax2.hist(results[results[:,3] > 0.5,5], bins=60)
ax2.set_ylim(0, 1300)
ax2.set_yticks([])

ax2.set_xlabel('$\Delta Flux$')

fig.set_figwidth(6)
fig.set_figheight(3)



In [None]:
_ = plt.hist(results[results[:,3] > 0.5,5], bins=30)

plt.xlabel('$\Delta$Flux')
plt.ylabel('Number of Detections')

In [None]:
bins = np.linspace(0, 0.5, 30)
all = np.histogram(results[:,2], bins=bins)[0]
detected = np.histogram(results[results[:,3] > 0.5,2], bins=bins)[0]

plt.figure(figsize=(6,4))

plt.plot(bins[:-1], 100*detected/all)

plt.xlabel('LASER Flux (Fraction of Star Flux)')
_ = plt.ylabel(r'Efficiency (%)')

In [None]:
plt.figure(figsize=(12,4))
bins = np.linspace(np.min(ds.wave), np.max(ds.wave), 300)
all = np.histogram(ds.wave[np.array(results[:,1], dtype=int)], bins=bins)[0]
detected = np.histogram(ds.wave[np.array(results[results[:,3] > 0.5,1], dtype=int)], bins=bins)[0]

plt.plot(bins[:-1], detected/all)

plt.xlabel('$\lambda$ ($\AA$)')
plt.ylabel(r'Efficiency (%)')

In [None]:
plt.figure(figsize=(12,4))
plt.hist2d(ds.wave[np.array(results[results[:,3] > 0.5,1],dtype=int)], results[results[:,3] > 0.5,4], bins=(80, 40))
plt.xlabel('$\lambda$ ($\AA$)')
plt.ylabel('$\Delta\lambda$ ($\AA$)')

In [None]:
plt.figure(figsize=(12,4))
plt.scatter(ds.wave[np.array(results[results[:,3] > 0.5,1],dtype=int)], results[results[:,3] > 0.5,4], s=3)
plt.xlabel('$\lambda$ ($\AA$)')
plt.ylabel('$\Delta\lambda$ ($\AA$)')