# Initialization and Function Definitions

In [None]:
import os
import sys

import glob

from prfi_gen.rfi_generator import Pulse
from prfi_gen.utils import get_noised_and_normalized_array

import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

from tqdm.notebook import trange, tqdm 

from skimage.transform import resize

In [54]:
def downsample_with_skimage(image, kernel):
    """
    Downsamples the given image using the specified kernel and applies anti-aliasing.

    Parameters:
    image (ndarray): Input image to be downsampled.
    kernel (tuple): Desired output dimensions (height, width).

    Returns:
    ndarray: Downsampled image with anti-aliasing applied.
    """
    return resize(image, kernel, anti_aliasing=True)


def dedispersion(array, delays_list):
    """
    Applies dedispersion to the given array based on the specified delays_list.

    Parameters:
    array (list of ndarrays): A list where each element is a one-dimensional array, representing a line of data.
    delays_list (list of ints): A list where each element is an integer, representing the number of points by which to roll/shift the corresponding line in the array.

    Returns:
    list of ndarrays: A list of dedispersed arrays.
    """
    return [np.roll(line, points) for line, points in zip(array, delays_list)]


def get_SNR(pulse):
    """
    Calculates the Signal-to-Noise Ratio (SNR) for the given pulse.

    Parameters:
    pulse (ndarray): One-dimensional array representing the pulse for which SNR is to be calculated.

    Returns:
    float: The calculated SNR, rounded to 1 decimal place.
    """
    shift = np.argmax(pulse) - 20 if np.argmax(pulse) - 20 < 0 else 20 - np.argmax(pulse)  
    pulse = np.roll(pulse, shift)
    
    length = 125
    ampl_of_max = []
    bins_wights = range(1, 50)

    for wight in bins_wights:

        r = np.int32(np.round(wight / 2))
        epro = np.zeros(length * 2)
        epro[0:wight] = np.ones(wight) + epro[0:wight]
        p = np.convolve(1 / np.ones(wight), pulse, mode='full')[r:r + length * 2]
        n = p[125:wight + length * 2].std()

        ampl_of_max.append(np.max(p / n))
    
    return np.round(np.max(ampl_of_max), 1)

def generate_mean_pulse(array):
    """
    Generates the mean pulse from the given array by summing over the axis and subtracting the median of the resulting graph.

    Parameters:
    array (ndarray): Input multidimensional array from which the mean pulse is to be generated.

    Returns:
    ndarray: One-dimensional array representing the generated mean pulse.
    """
    graph = np.sum(array, axis=0)
    graph -= np.median(graph)
    
    return graph

# Setting Paths

In [28]:
PATH_MODELS = '../models/'

In [None]:
models = glob.glob(f'{PATH_MODELS}*.keras')
models

# Examples Spectrograms with Varied DM

Spectrograms with varied Dispersion Measure (DM) values are generated and visualized to examine how different DM values manifest in the spectrograms.

In [None]:
f_lo, f_hi = 1.21, 1.53
n_channels, tresol = 256, 0.0001024

plt.figure(figsize=(10,10))

dm_list = np.linspace(1, 150, 25)

for i, dm in enumerate(dm_list):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    pulse = Pulse(dm, f_hi, f_lo, n_channels, tresol)
    pulse.generate_spectrogram(1, 2, 10) # amplitude, sigma, location
    spectrogramm = pulse.add_zeros(pulse.compresed_pulse, 150, 'both')
    plt.title(np.round(dm, 1))
    plt.imshow(spectrogramm[:,:256], aspect='auto', cmap='gray')
    
plt.tight_layout()    
plt.show()

# Examples with Random Location of Spectrogram

Explanation: Spectrograms at random locations are generated to provide insights into the variations and distribution of pulse signals across different segments of the spectrogram.

In [None]:
plt.figure(figsize=(10,10))
dm = 25

temp_loc_list = []

for i in range(25):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    pulse = Pulse(dm, f_hi, f_lo, n_channels, tresol)
    pulse.generate_spectrogram(1, 2, 10) # amplitude, sigma, location
    spectrogramm = pulse.add_zeros(pulse.compresed_pulse, 150, 'both')
    rand_loc = np.random.randint(0, spectrogramm.shape[1]-256)
    temp_loc_list.append(rand_loc)
    plt.title(np.round(rand_loc))
    plt.imshow(spectrogramm[:,rand_loc:rand_loc+256], aspect='auto', cmap='gray')
    
plt.tight_layout()    
plt.show()

# Spectrogram Noise Addition and Normalization

Noise is added to spectrograms and visualized to simulate real-world conditions where received signals often contain noise, which is crucial for evaluating model robustness.

In [None]:
plt.figure(figsize=(10,10))
dm = 25

for i, loc in enumerate(temp_loc_list):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    pulse = Pulse(dm, f_hi, f_lo, n_channels, tresol)
    pulse.generate_spectrogram(1, 2, 10) # amplitude, sigma, location
    spectrogramm = pulse.add_zeros(pulse.compresed_pulse, 150, 'both')
    plt.title(loc)
    img = get_noised_and_normalized_array(spectrogramm[:,loc:loc+256], 0.2)
    plt.imshow(img, aspect='auto', cmap='gray')
    
plt.tight_layout()    
plt.show()

# DM Variation and Model Evaluation

Models are evaluated over a specified range of DM, documenting how the models perform under varied DM conditions, and the results are saved for further analysis.

In [None]:
dm_list = range(5, 501, 5)

num_spects = 50
labels = np.full(num_spects, 1)
dm_list

In [None]:
for modelname in tqdm(models, desc='Models loop'):
    name_part = os.path.basename(modelname).split('-')[0]
    loaded_model = tf.keras.models.load_model(modelname)

    acuracy_list = []

    for dm in tqdm(dm_list, desc='DM loop'):
    
        downsampled_array = np.empty((num_spects, 128, 128))      
        for i in trange(num_spects, leave=False):
            pulse = Pulse(dm, f_hi, f_lo, n_channels, tresol)
            pulse.generate_spectrogram(1, 2, 10) # amplitude, sigma, location
            spectrogramm = pulse.add_zeros(pulse.compresed_pulse, 150, 'both')
 
            rand_loc = np.random.randint(0, spectrogramm.shape[1]-256)
            noised_pulse = get_noised_and_normalized_array(spectrogramm[:,rand_loc:rand_loc+256], 0.2)
            downsampled_array[i] = downsample_with_skimage(noised_pulse, kernel=(128, 128))
 
        downsampled_array = downsampled_array[..., tf.newaxis]
 
        acuracy_list.append(loaded_model.evaluate(downsampled_array,  labels, verbose=0)[1])

    with open(f'../files/Pulse_DM_sensetivity_for_{name_part}.csv', 'w') as file:
        file.write('DM,ACR\n')
                         
        for dm, acr in zip(dm_list, acuracy_list):
            file.write(f'{dm},{acr}\n')
    

# Model Accuracy Visualization by DM

The model accuracy concerning different DMs is visualized, aiding in understanding how models’ performance varies with different Dispersion Measures.

In [None]:
directory = "../files/"

# Create a single plot
plt.figure(figsize=(10,6))

for file in sorted(glob.glob(f'{directory}Pulse_DM_sensetivity_for_*.csv')):
    df = pd.read_csv(file)

    # Extracting name_part from filename for the legend
    name_part = os.path.basename(file).split("Pulse_DM_sensetivity_for_")[1].split(".csv")[0]

    # Plotting on the single plot
    plt.scatter(df['DM'], df['ACR'], label=name_part, s=15)
    plt.plot(df['DM'], df['ACR'], ls=':')

# Setting labels, title and legend
plt.ylabel('Accuracy of the model')
plt.xlabel('DM')
plt.title('Accuracy by DM for Different Models')    
plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
plt.grid(True, ls='--')

# Adjusting layout and displaying the plot
plt.tight_layout()
plt.show()

# SNR Calculation and Visualization

The Signal to Noise Ratio (SNR) is visualized for various amplitudes and the generated pulse is plotted, enabling a clearer understanding and analysis of the SNR in the received signals.

In [None]:
DM = 56.7
f_lo, f_hi = 1.21, 1.53
n_channels, tresol = 256, 0.0001024

f_list = np.linspace(f_hi, f_lo, n_channels)
delay_list = Pulse.delays_DM_list(f_list, DM)

delays_list_point = [int(round(i, 0)) for i in map(lambda x:x / tresol / 1000, delay_list)]

pulse = Pulse(DM, f_hi, f_lo, n_channels, tresol)
pulse.generate_spectrogram(1, 2, 10) # amplitude, sigma, location
spectrogramm = pulse.add_zeros(pulse.compresed_pulse, 150, 'both')

rand_loc = np.random.randint(160, 486)
plt.figure(figsize=(10,10))
first_amp = 1.5
step_amp = 0.06
for i in range(25):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    
    img = get_noised_and_normalized_array(spectrogramm[:,rand_loc:rand_loc+256], first_amp)
    plt.imshow(img, cmap='gray')
    dd_array = dedispersion(img, delays_list_point)
    graph = generate_mean_pulse(dd_array)
    plt.xlabel(f'{get_SNR(graph)}')
    
    first_amp -= step_amp
plt.tight_layout()    
plt.show()

In [None]:
plt.figure(figsize=(10,10))
first_amp = 1.5
step_amp = 0.06
for i in range(25):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    
    img = get_noised_and_normalized_array(spectrogramm[:,rand_loc:rand_loc+256], first_amp)
    dd_array = dedispersion(img, delays_list_point)
    graph = generate_mean_pulse(dd_array)
    plt.title(f'{get_SNR(graph)}')
    plt.plot(graph / graph.max(), color='black')
    #plt.xlim(0, 125)
    
    first_amp -= step_amp
plt.tight_layout()    
plt.show()

# Noise Amplitude Variation and Model Evaluation

The model is evaluated for accuracy with varied noise amplitudes in spectrograms, assessing the resilience and accuracy of the model under different noise conditions.

In [None]:
num_spects = 50
labels = np.full(num_spects, 1)
noise_amp_list = np.round(np.linspace(0.05, 1.5, 50), 2)
noise_amp_list

In [None]:
for modelname in tqdm(models, desc='Models loop'):
    name_part = os.path.basename(modelname).split('-')[0]
    loaded_model = tf.keras.models.load_model(modelname)
    snr_array = []
    acuracy_list = []

    for n_a in tqdm(noise_amp_list, desc='Noise amplitude loop'):
        temp = []
        downsampled_array = np.empty((num_spects, 128, 128))      
        
        for i in trange(num_spects, leave=False):
            rand_loc = np.random.randint(160, 486)
            noised_pulse = get_noised_and_normalized_array(spectrogramm[:,rand_loc:rand_loc+256], n_a)
            
            spctr = dedispersion(noised_pulse, delays_list_point)
            graph = generate_mean_pulse(spctr)
            temp.append(get_SNR(graph))
            
            downsampled_array[i] = downsample_with_skimage(noised_pulse, kernel=(128, 128))
 
        downsampled_array = downsampled_array[..., tf.newaxis]
        snr_array.append(np.round(np.mean(temp), 1))
        acuracy_list.append(loaded_model.evaluate(downsampled_array,  labels, verbose=0)[1])

    with open(f'../files/Pulse_SNR_sensetivity_for_{name_part}.csv', 'w') as file:
        file.write('SNR,ACR\n')

        for snr, acr in zip(snr_array, acuracy_list):
            file.write(f'{snr},{acr}\n')

# Model Accuracy Visualization by SNR

The accuracies of models with varied Signal to Noise Ratio (SNR) are visualized, presenting a clear picture of model reliability and performance under different signal strengths.

In [None]:
plt.clf()
fig, axs = plt.subplots(1, 2, figsize=(10,5))

# Direct to the correct directory
directory = "../files/"

for file in sorted(glob.glob(f'{directory}Pulse_SNR_sensetivity_for_*.csv')):
    df = pd.read_csv(file)

    # Extracting name_part from filename for the legend
    name_part = os.path.basename(file).split("Pulse_SNR_sensetivity_for_")[1].split(".csv")[0]

    # Plotting on the first subplot
    axs[0].scatter(df['SNR'], df['ACR'], label=name_part, s=15)
    axs[0].plot(df['SNR'], df['ACR'], ls=':')

    # Plotting on the second subplot (zoomed)
    axs[1].scatter(df['SNR'], df['ACR'], label=name_part, s=15)
    axs[1].plot(df['SNR'], df['ACR'], ls=':')
    axs[1].set_xlim(8, 18)  # Adjust as necessary for your desired zoom level

# Setting labels and titles
axs[0].set_ylabel('Accuracy of the model')
axs[0].set_xlabel('SNR')
axs[0].set_title('Full View')
axs[0].grid(True)

# axs[1].set_xscale('log') 
axs[1].set_xlabel('SNR')
axs[1].set_title('Zoomed View (8 < SNR < 18)')
axs[1].grid(True)

# Adjusting layout and displaying the legend
plt.tight_layout()
axs[1].legend(loc='center left', bbox_to_anchor=(1, 0.5))
plt.show()


# Pulse Width Variation and Visualization

Spectrograms with varied pulse widths are generated and visualized, allowing for an analysis of how pulse width impacts the visual representation of signals in the spectrogram.

In [None]:
DM = 56.7
f_lo, f_hi = 1.21, 1.53
n_channels, tresol = 256, 0.0001024

plt.figure(figsize=(10,10))

width_list = range(1, 100, 5)

for i, width in enumerate(width_list):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    pulse = Pulse(DM, f_hi, f_lo, n_channels, tresol)
    pulse.generate_spectrogram(1, width, 2*width) # amplitude, sigma, location
    spectrogramm = pulse.add_zeros(pulse.compresed_pulse, 150, 'both')
    plt.title(np.round(width, 1))
    plt.imshow(spectrogramm[:,256:512], aspect='auto', cmap='gray')
    
plt.tight_layout()    
plt.show()

In [None]:
plt.figure(figsize=(10,10))

temp_loc_list = []

for i, width in enumerate(width_list):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    pulse = Pulse(DM, f_hi, f_lo, n_channels, tresol)
    pulse.generate_spectrogram(1, width, 2*width) # amplitude, sigma, location
    spectrogramm = pulse.add_zeros(pulse.compresed_pulse, 50, 'both')
    rand_loc = np.random.randint(0, spectrogramm.shape[1]-256)
    temp_loc_list.append(rand_loc)
    plt.title(np.round(rand_loc))
    plt.imshow(spectrogramm[:,rand_loc:rand_loc+256], aspect='auto', cmap='gray')
    
plt.tight_layout()    
plt.show()

In [None]:
plt.figure(figsize=(10,10))

for i, loc, width in zip(range(25), temp_loc_list, width_list):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    pulse = Pulse(DM, f_hi, f_lo, n_channels, tresol)
    pulse.generate_spectrogram(1, width, 2*width) # amplitude, sigma, location
    spectrogramm = pulse.add_zeros(pulse.compresed_pulse, 50, 'both')
    plt.title(loc)
    img = get_noised_and_normalized_array(spectrogramm[:,loc:loc+256], 0.1)
    plt.imshow(img, aspect='auto', cmap='gray')
    
plt.tight_layout()    
plt.show()

# Model Evaluation with Varied Pulse Width

The accuracy of models is evaluated with varied pulse widths, analyzing the adaptability of models to different pulse conditions.

In [None]:
num_spects = 50
labels = np.full(num_spects, 1)
width_list = range(1, 101)
width_list

In [None]:
for modelname in tqdm(models, desc='Models loop'):
    name_part = os.path.basename(modelname).split('-')[0]
    loaded_model = tf.keras.models.load_model(modelname)

    acuracy_list = []

    for width in tqdm(width_list, desc='Width loop'):
    
        downsampled_array = np.empty((num_spects, 128, 128))      
        
        for i in trange(num_spects, leave=False):
            pulse = Pulse(DM, f_hi, f_lo, n_channels, tresol)
            pulse.generate_spectrogram(1, width, 2*width) # amplitude, sigma, location
            spectrogramm = pulse.add_zeros(pulse.compresed_pulse, 50, 'both')
 
            rand_loc = np.random.randint(0, spectrogramm.shape[1]-256)
            noised_pulse = get_noised_and_normalized_array(spectrogramm[:,rand_loc:rand_loc+256], 0.2)
            downsampled_array[i] = downsample_with_skimage(noised_pulse, kernel=(128, 128))
 
        downsampled_array = downsampled_array[..., tf.newaxis]
 
        acuracy_list.append(loaded_model.evaluate(downsampled_array,  labels, verbose=0)[1])

    with open(f'../files/Pulse_width_sensetivity_for_{name_part}.csv', 'w') as file:
        file.write('Width,ACR\n')

        for width, acr in zip(width_list, acuracy_list):
            file.write(f'{width},{acr}\n')

# Model Accuracy Visualization by Pulse Width

The visualization represents the accuracies of different models with varied pulse widths, providing insights into the models' sensitivity and adaptability to pulse width variations.

In [None]:
directory = "../files/"

# Create a single plot
plt.figure(figsize=(10,6))

for file in sorted(glob.glob(f'{directory}Pulse_width_sensetivity_for_*.csv')):
    df = pd.read_csv(file)

    # Extracting name_part from filename for the legend
    name_part = os.path.basename(file).split("Pulse_width_sensetivity_for_")[1].split(".csv")[0]

    # Plotting on the single plot
    plt.scatter(df['Width'], df['ACR'], label=name_part, s=15)
    plt.plot(df['Width'], df['ACR'], ls=':')

# Setting labels, title and legend
plt.ylabel('Accuracy of the model')
plt.xlabel('Width')
plt.title('Accuracy by Width for Different Models')
plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
plt.grid(True, ls='--')

# Adjusting layout and displaying the plot
plt.tight_layout()
plt.show()