In [69]:
from astropy.io import fits
import numpy as np
import scipy.stats
from scipy.stats import norm
from astropy.coordinates import Angle
import astropy.units as u
import matplotlib.pyplot as plt

In [70]:
def fits_data_index(fits_file: str):
    '''Given a FITS file, return the index of the file where the data array is'''

    file_index = 0

    #open FITS file
    try:
        file = fits.open(fits_file)
    except:
        print(f'Unable to open {fits_file}')

    info = file[file_index]
    data = info.data
    while data is None:
        #going through the indices of file to find the array
        try:
            file_index += 1
            info = file[file_index]
            data = info.data
        except:
            print(f'Error in locating data index of {fits_file}')

    return file_index

In [71]:
def region_stats(fits_file: str, center: list = [], radius: list = [], invert: bool = False):
    '''Given a FITS file, list of center coordinates in units of pixels,
    list of radii in units of arcsec (include measurements within this radius of the center),
    and Boolean of whether to invert (if True, becomes exclude instead of include),
    return a dictionary with floats of the maximum flux (in Jy), coordinates of field center (in pixels),
    coordinates of maximum flux (in pixels), rms (in Jy), beam size (in arcsec^2),
    x axis length (in arcsec), and y axis length (in arcsec) in the specified region.
    If no center given, will eventually default to center of ((length of x-axis)/2, (length of y-axis)/2), rounded up.
    '''

    if center != [] and len(center) != len(radius):
        raise IndexError ('Center list and radius list lengths do not match')

    i = fits_data_index(fits_file)

    #open FITS file
    try:
        file = fits.open(fits_file)
    except:
        print(f'Unable to open {fits_file}')

    #extract data array
    info = file[i]
    data = info.data

    #getting dimensions for array
    try:
        dims = data.shape
        x_dim = dims[1]
        y_dim = dims[2]
    except:
        print('Data dimension error')

    x_dist_array = np.tile(np.arange(x_dim),(y_dim, 1)) #array of each pixel's horizontal distance (in pixels) from y-axis
    y_dist_array = x_dist_array.T #array of each pixel's vertical distance (in pixels) from x-axis

    #keep center pixel coordinates if specified, set to default if unspecified
    center_pix = center
    field_center = (round(x_dim/2), round(y_dim/2))
    if center == []:
        center_pix = [field_center]
        if len(radius) > 1:
            center_pix = center_pix * len(radius)

    #find units of axes
    x_unit = info.header['CUNIT1']
    y_unit = info.header['CUNIT2']

    #find cell size (units of arcsec)
    x_cell_size = (Angle(info.header['CDELT1'], x_unit)).to(u.arcsec)
    y_cell_size = (Angle(info.header['CDELT2'], y_unit)).to(u.arcsec)
    y_cell_size.to(u.arcsec)

    #find major axis (units of arcsec), minor axis (units of arcsec), beam size (units of arcsec^2)
    beam_size = ((np.pi/4) * info.header['BMAJ'] * info.header['BMIN'] * Angle(1, x_unit) * Angle(1, y_unit) / np.log(2)).to(u.arcsec**2)

    #find axis sizes
    x_axis_size = info.header['NAXIS1'] * x_cell_size
    y_axis_size = info.header['NAXIS2'] * y_cell_size

    #distance from center array
    dist_from_center =((((x_dist_array - center_pix[0][0])*x_cell_size)**2 + ((y_dist_array - center_pix[0][1])*y_cell_size)**2)**0.5)

    #boolean mask and apply
    mask = (dist_from_center <= radius[0] * u.arcsec)
    if len(center) > 1:
        for j in range(1, len(center)):
            dist_from_center = ((((x_dist_array - center_pix[j][0])*x_cell_size)**2 + ((y_dist_array - center_pix[j][1])*y_cell_size)**2)**0.5)
            mask = np.logical_or(mask, (dist_from_center <= radius[j] * u.arcsec))

    if invert:
        mask = np.logical_not(mask)

    masked_data = data[0][mask]

    #get peak, rms, beam_size values
    try:
        peak = float(max(masked_data))
    except ValueError:
        print('No values after mask applied. Check inclusion and exclusion radii.')

    #find coordinates of peak
    peak_pix = np.where(data[0] == peak)
    x = peak_pix[1][0]
    y = peak_pix[0][0]
    peak_coord = (int(x_dist_array[0][x]), int(y_dist_array[y][0]))

    rms = float((np.var(masked_data))**0.5)

    stats = {'peak': peak, 'field_center': field_center, 'peak_coord': peak_coord, 'rms': rms, 'beam_size': float(beam_size / (u.arcsec**2)),\
              'x_axis': float(x_axis_size / u.arcsec), 'y_axis': float(y_axis_size / u.arcsec)}

    return stats

In [72]:
def incl_excl_data(fits_file: str, center: list = []):
    '''Given a FITS file and (optional) list of tuples of center coordinates in units of arcsec,
    return a dictionary with the field center coordinates as tuple, peak flux value of the inclusion area, coordinates of this peak as tuple,
    peak flux value of the exclusion area, coordinates of this peak as tuple, rms value of the exclusion area,
    number of measurements in the inclusion area, and number of measurements in the exclusion area of the specified circle.
    '''

    i = fits_data_index(fits_file)

    #open FITS file
    try:
        file = fits.open(fits_file)
    except:
        print(f'Unable to open {fits_file}')

    #extract data array
    info = file[i]

    #get radius, inclusion, exclusion lists for interior and exterior
    radius = [float((info.header['BMAJ'] * (Angle(1, info.header['CUNIT1'])).to(u.arcsec) / u.arcsec) + 5)] #major axis + 5 arcsec
    if len(center) > 1:
        radius = radius + ([radius[0] - 5.0] * (len(center) - 1))

    #get info on inclusion and exclusion regions
    int_info = region_stats(fits_file = fits_file, radius = radius, center = center)
    ext_info = region_stats(fits_file = fits_file, radius = radius, center = center, invert=True)

    #getting values for peak, rms, axis lengths, beam size
    info_dict = {}
    info_dict['int_peak_val'] = int_info['peak']
    info_dict['field_center'] = int_info['field_center']
    info_dict['int_peak_coord'] = int_info['peak_coord']
    info_dict['ext_peak_coord'] = ext_info['peak_coord']
    info_dict['ext_peak_val'] = ext_info['peak']
    info_dict['rms_val'] = ext_info['rms']
    x_axis = int_info['x_axis']
    y_axis = int_info['y_axis']
    beam_size = int_info['beam_size']

    #calculating number of measurements in inclusion and exclusion regions
    incl_area = np.pi * (radius[0]**2)
    excl_area = x_axis * y_axis - incl_area
    info_dict['n_incl_meas'] = incl_area / beam_size
    info_dict['n_excl_meas'] = excl_area / beam_size

    return info_dict

In [73]:
def meas_rms_prob(fits_file: str, center: list = []):
    '''Given a FITS file, (optional) list of tuples of center coordinates in units of arcsec,
    and (optional) maximum number of repetitions, return a list of dictionaries.
    The first dictionary contains the probability of the peak to noise ratio of the interior of the specified region
    and the probability of peak to noise ratio of the exterior of the specified region,
    with noise being the measured rms in the exclusion area.
    If the external probability is less than 0.001, there will be subsequent dictionaries
    that contain the probability of the peak to noise ratio of the interior of the specified region
    and the probability of peak to noise ratio of the exterior of the specified region,
    with noise being the measured rms in the exclusion area,
    but the included region is expanded to include the highest excluded peak of the previous excluded region
    and the excluded region now excludes that highest peak.
    '''
    info = incl_excl_data(fits_file, center)

    int_peak = info['int_peak_val']
    ext_peak = info['ext_peak_val']
    rms = info['rms_val']
    n_incl = info['n_incl_meas']
    n_excl = info['n_excl_meas']

    #calculate error for rms
    rms_err = rms * (n_excl)**(-1/2)

    #create normal distributions from rms and error for rms
    uncert = np.linspace(-5 * rms_err, 5 * rms_err, 100)
    uncert_pdf = norm.pdf(uncert, loc = 0, scale = rms_err)

    #sum and normalize to find probabilities
    prob_dict = info
    prob_dict['int_prob'] = float(sum((norm.cdf((-1 * int_peak)/(rms + uncert)) * n_incl) * uncert_pdf) / sum(uncert_pdf))
    prob_dict['ext_prob'] = float(sum((norm.cdf((-1 * ext_peak)/(rms + uncert)) * n_excl) * uncert_pdf) / sum(uncert_pdf))
    prob_dict['int_snr'] = float(int_peak / rms)
    prob_dict['ext_snr'] = float(ext_peak / rms)

    prob_list = [prob_dict]

    if prob_dict['ext_prob'] < 0.001:
        if center == []:
            new_center = [info['field_center'], info['ext_peak_coord']]
        else:
            center.append(info['ext_peak_coord'])
            new_center = center
        new_list = meas_rms_prob(fits_file, new_center)
        prob_list.extend(new_list)

    return prob_list

In [74]:
def calc_rms_prob(fits_file: str, center: list = []):
    '''Given a FITS file, (optional) center coordinates in units of arcsec, and (optional) maximum number of repetitions
    return a dictionary that contains the probability of the peak to noise ratio of the interior of the specified region
    and the probability of peak to noise ratio of the exterior of the specified region,
    with noise being the calculated rms in the exclusion area based on the expected probability of the peak value in the exclusion area.
    '''

    info = incl_excl_data(fits_file, center)
    prob_dict = info

    int_peak_val = info['int_peak_val']
    ext_peak_val = info['ext_peak_val']
    n_incl_meas = info['n_incl_meas']
    n_excl_meas = info['n_excl_meas']

    excl_sigma = -1 * norm.ppf(1/n_excl_meas)
    rms_val = ext_peak_val / excl_sigma

    prob_dict['rms_val'] = float(rms_val)
    prob_dict['int_prob'] = float(norm.cdf((-1 * int_peak_val)/(rms_val))) * n_incl_meas
    prob_dict['ext_prob'] = float(norm.cdf((-1 * ext_peak_val)/(rms_val))) * n_excl_meas
    prob_dict['int_snr'] = float(int_peak_val / rms_val)
    prob_dict['ext_snr'] = float(excl_sigma)

    return prob_dict

In [75]:
def significance(fits_file: str, version: str, threshold: float):
    '''Given a FITS file, version ('meas_rms_prob' or 'calc_rms_prob'), and threshold probability,
    return a list of Booleans (if version = 'meas_rms_prob') or Boolean (if version = 'calc_rms_prob')
    of whether source emission is considered significant for the given threshold.
    Each entry of the list corresponds to the external probabilities using different regions.
    There will be more than one entry in the list if the first external probability is less than 0.001.
    '''
    if not (threshold >= 0 and threshold <= 1):
        raise ValueError("threshold must be between 0 and 1, inclusive")

    if not (version == 'meas_rms_prob' or version == 'calc_rms_prob'):
        raise ValueError("version must be either 'meas_rms_prob' or 'calc_rms_prob'")

    bool_list = []

    if version == 'meas_rms_prob':
        for i in range(len(meas_rms_prob(fits_file))):
            dict = meas_rms_prob(fits_file)[i]
            prob = dict['int_prob']
            bool_list.append(prob <= threshold)
        return bool_list

    else:
        dict = calc_rms_prob(fits_file)
        prob = dict['int_prob']
        bool = prob <= threshold
        return bool

In [76]:
fits_list = ['ngc5044.fits', '1407+2827.fits', '3c245.fits', 'sdssj152527.48+050029.9.fits',\
             'sdssj155636.40+415250.5.fits', '3c270.1.fits']
for f in range(len(fits_list)):
    print(fits_list[f])
    print('meas')
    print(meas_rms_prob(fits_file = fits_list[f]))
    print(significance(fits_file = fits_list[f], version = 'meas_rms_prob', threshold = 0.05))
    print('calc')
    print(calc_rms_prob(fits_file = fits_list[f]))
    print(significance(fits_file = fits_list[f], version = 'calc_rms_prob', threshold = 0.05))
    print()

ngc5044.fits
meas
[{'int_peak_val': 0.04268083721399307, 'field_center': (66, 66), 'int_peak_coord': (65, 67), 'ext_peak_coord': (58, 29), 'ext_peak_val': 0.0032760933972895145, 'rms_val': 0.0007852913695387542, 'n_incl_meas': 20.981819973808246, 'n_excl_meas': 526.4910584454557, 'int_prob': 0.0, 'ext_prob': 0.010185324770829738, 'int_snr': 54.35031998258421, 'ext_snr': 4.1718189252655975}]
[True]
calc
{'int_peak_val': 0.04268083721399307, 'field_center': (66, 66), 'int_peak_coord': (65, 67), 'ext_peak_coord': (58, 29), 'ext_peak_val': 0.0032760933972895145, 'rms_val': 0.0011318697055996384, 'n_incl_meas': 20.981819973808246, 'n_excl_meas': 526.4910584454557, 'int_prob': 0.0, 'ext_prob': 0.9999999999999972, 'int_snr': 37.70826006106573, 'ext_snr': 2.894408588799464}
True

1407+2827.fits
meas
[{'int_peak_val': 0.004832044243812561, 'field_center': (86, 86), 'int_peak_coord': (85, 84), 'ext_peak_coord': (7, 108), 'ext_peak_val': 0.0037240907549858093, 'rms_val': 0.0010278000263497233, 'n

In [77]:
fits_list = ['0510+180.fits', 'hd283323.fits', 'l1551-51.fits', 'bd+21 584.fits', 'hd28354.fits', \
'uranus.fits', '3c111.fits', 'bllac.fits', 'hd283572.fits', 'v652per.fits', '3c279.fits', '0854+201.fits']
for f in range(len(fits_list)):
    print(fits_list[f])
    print('meas')
    print(meas_rms_prob(fits_file = fits_list[f]))
    print(significance(fits_file = fits_list[f], version = 'meas_rms_prob', threshold = 0.05))
    print('calc')
    print(calc_rms_prob(fits_file = fits_list[f]))
    print(significance(fits_file = fits_list[f], version = 'calc_rms_prob', threshold = 0.05))
    print()

0510+180.fits
meas
[{'int_peak_val': 1.2661728858947754, 'field_center': (74, 74), 'int_peak_coord': (73, 73), 'ext_peak_coord': (88, 144), 'ext_peak_val': 0.004458718933165073, 'rms_val': 0.0011129066115245223, 'n_incl_meas': 18.99328721322503, 'n_excl_meas': 538.7928169473441, 'int_prob': 0.0, 'ext_prob': 0.020403648882963095, 'int_snr': 1137.7171029295084, 'ext_snr': 4.006372940005512}]
[True]
calc
{'int_peak_val': 1.2661728858947754, 'field_center': (74, 74), 'int_peak_coord': (73, 73), 'ext_peak_coord': (88, 144), 'ext_peak_val': 0.004458718933165073, 'rms_val': 0.0015366137458902153, 'n_incl_meas': 18.99328721322503, 'n_excl_meas': 538.7928169473441, 'int_prob': 0.0, 'ext_prob': 1.0000000000000009, 'int_snr': 824.0020560021973, 'ext_snr': 2.9016523801705145}
True

hd283323.fits
meas
[{'int_peak_val': 0.001534036360681057, 'field_center': (74, 74), 'int_peak_coord': (85, 73), 'ext_peak_coord': (39, 61), 'ext_peak_val': 0.002280647400766611, 'rms_val': 0.0006150116678327322, 'n_inc

In [78]:
def no_rec_meas_rms_prob(fits_file: str, center: list = []):
    '''Given a FITS file, (optional) list of tuples of center coordinates in units of arcsec,
    and (optional) maximum number of repetitions, return a list of dictionaries.
    The first dictionary contains the probability of the peak to noise ratio of the interior of the specified region
    and the probability of peak to noise ratio of the exterior of the specified region,
    with noise being the measured rms in the exclusion area.
    If the external probability is less than 0.001, there will be subsequent dictionaries
    that contain the probability of the peak to noise ratio of the interior of the specified region
    and the probability of peak to noise ratio of the exterior of the specified region,
    with noise being the measured rms in the exclusion area,
    but the included region is expanded to include the highest excluded peak of the previous excluded region
    and the excluded region now excludes that highest peak.
    '''
    info = incl_excl_data(fits_file, center)

    int_peak = info['int_peak_val']
    ext_peak = info['ext_peak_val']
    rms = info['rms_val']
    n_incl = info['n_incl_meas']
    n_excl = info['n_excl_meas']

    #calculate error for rms
    rms_err = rms * (n_excl)**(-1/2)

    #create normal distributions from rms and error for rms
    uncert = np.linspace(-5 * rms_err, 5 * rms_err, 100)
    uncert_pdf = norm.pdf(uncert, loc = 0, scale = rms_err)

    #sum and normalize to find probabilities
    prob_dict = info
    prob_dict['int_prob'] = float(sum((norm.cdf((-1 * int_peak)/(rms + uncert)) * n_incl) * uncert_pdf) / sum(uncert_pdf))
    prob_dict['ext_prob'] = float(sum((norm.cdf((-1 * ext_peak)/(rms + uncert)) * n_excl) * uncert_pdf) / sum(uncert_pdf))
    prob_dict['int_snr'] = float(int_peak / rms)
    prob_dict['ext_snr'] = float(ext_peak / rms)

    return prob_dict

In [79]:
print(no_rec_meas_rms_prob('l1551-51.fits', [(74,74), (98,58)]))

{'int_peak_val': 0.0032458838541060686, 'field_center': (74, 74), 'int_peak_coord': (98, 58), 'ext_peak_coord': (43, 62), 'ext_peak_val': 0.002294371370226145, 'rms_val': 0.0006354488432407379, 'n_incl_meas': 19.456539301042547, 'n_excl_meas': 559.7508671800183, 'int_prob': 5.31809087712777e-06, 'ext_prob': 0.0973471385738915, 'int_snr': 5.108017566846644, 'ext_snr': 3.610631122601523}


In [80]:
print(no_rec_meas_rms_prob('3c279.fits'))

{'int_peak_val': 12.789649963378906, 'field_center': (68, 68), 'int_peak_coord': (67, 67), 'ext_peak_coord': (69, 31), 'ext_peak_val': 0.11062327027320862, 'rms_val': 0.013562027364969254, 'n_incl_meas': 17.852911140329653, 'n_excl_meas': 401.4156689071529, 'int_prob': 0.0, 'ext_prob': 3.1675440696901414e-12, 'int_snr': 943.0485294856874, 'ext_snr': 8.156838745138412}


In [81]:
print(no_rec_meas_rms_prob('3c279.fits', [(68,68), (69,31)]))

{'int_peak_val': 12.789649963378906, 'field_center': (68, 68), 'int_peak_coord': (67, 67), 'ext_peak_coord': (65, 103), 'ext_peak_val': 0.10841279476881027, 'rms_val': 0.012968845665454865, 'n_incl_meas': 17.852911140329653, 'n_excl_meas': 401.4156689071529, 'int_prob': 0.0, 'ext_prob': 8.156440237350095e-13, 'int_snr': 986.182602006493, 'ext_snr': 8.35947913680472}


In [82]:
print(no_rec_meas_rms_prob('3c279.fits', [(68,68), (69,31), (65,103)]))

{'int_peak_val': 12.789649963378906, 'field_center': (68, 68), 'int_peak_coord': (67, 67), 'ext_peak_coord': (96, 124), 'ext_peak_val': 0.047967977821826935, 'rms_val': 0.012375887483358383, 'n_incl_meas': 17.852911140329653, 'n_excl_meas': 401.4156689071529, 'int_prob': 0.0, 'ext_prob': 0.027005286574073598, 'int_snr': 1033.4329542488892, 'ext_snr': 3.8759222630561685}
