In [3]:
## import numpy as np
import pandas as pd
import astropy.units as u
from astropy.coordinates import SkyCoord
from astroquery.gaia import Gaia
from astroquery.simbad import Simbad
from astroquery.vizier import Vizier
from typing import Callable, Dict
import os

from auxiliary.filter_functions import filter_gaia, filter_archival, filter_chandra, filter_erosita, filter_ned, filter_simbad, filter_vizier, filter_Xray_binaries


In [6]:
def safe_read_filtered(filename: str, catalogs: Dict[str, Callable], detections: pd.DataFrame = None) -> pd.DataFrame:
    """
    ## Load a filtered-detections CSV, or build an empty template when it is missing.

    ### Args:
        filename `str`: Path of the comma-separated filtered file to read.
        catalogs `Dict[str, Callable]`: Catalogs whose match columns are added to the empty template.
        detections `pd.DataFrame` (optional): Defaults to `None`. When given, its columns are used as the template columns.

    ### Returns:
        `pd.DataFrame`: The file contents with every column read as a string, or an empty
        dataframe with the catalog match columns added when the file does not exist.
    """
    import os

    # an existing file is returned untouched (no catalog columns are added here)
    if os.path.exists(filename):
        return pd.read_csv(filename, sep=',', header=0, dtype=str)

    # no file yet: mirror the detections layout if available, else a minimal structure
    template_columns = (
        ['Number', 'ObsId', 'RA', 'DEC', 'THETA', 'POS_ERR', 'SIGNIFICANCE']
        if detections is None
        else detections.columns
    )
    return update_catalogs(pd.DataFrame(columns=template_columns), catalogs)

def update_catalogs(dataframe: pd.DataFrame, catalogs: Dict[str, Callable], verbose: int = 0) -> pd.DataFrame:
    """
    ## Return a copy of the dataframe with one `<catalog>_match` column per catalog.

    ### Args:
        dataframe `pd.DataFrame`: Dataframe to extend; it is copied, not modified.
        catalogs `Dict[str, Callable]`: Catalogs whose names determine the match columns.
        verbose `int` (optional): Defaults to `0`. Added columns are reported above level 2.

    ### Returns:
        `pd.DataFrame`: Copy of the dataframe where every missing match column has been
        appended and filled with `'unknown'`; existing match columns are left as they are.
    """
    result = dataframe.copy()

    for name in catalogs:
        column = f'{name}_match'

        # columns that already exist keep their current values
        if column in result.columns:
            continue

        # append as the last column, pre-filled with the placeholder
        result.insert(len(result.columns), column, 'unknown')

        if verbose > 2:
            print(f'Added {name}_match column.')

    return result


def update_detections(detections: pd.DataFrame, filtered: pd.DataFrame, catalogs: Dict[str, Callable], verbose: int = 0) -> pd.DataFrame:
    """
    ## Adds new detections to the filtered dataframe.

    A detection counts as already present only when a single row of the filtered
    dataframe carries the same ObsId, RA, DEC, THETA, POS_ERR and SIGNIFICANCE.

    ### Args:
        detections `pd.DataFrame`: Dataframe of detections to add to the filtered dataframe.
        filtered `pd.DataFrame`: Dataframe of filtered detections.
        catalogs `Dict[str, Callable]`: Dictionary of catalogs to add to the dataframe.
        verbose `int` (optional): Defaults to `0`. Per-detection messages are printed above level 2.

    ### Returns:
        `pd.DataFrame`: The updated filtered dataframe with new detections added.
    """
    # update_catalogs works on copies, so the caller's dataframes are not modified
    detections = update_catalogs(detections, catalogs)
    filtered = update_catalogs(filtered, catalogs)

    key_columns = ['ObsId', 'RA', 'DEC', 'THETA', 'POS_ERR', 'SIGNIFICANCE']

    # Row-wise identity of everything already stored. Testing each column on its
    # own would treat a detection as a duplicate whenever its values merely occur
    # somewhere in the table, possibly spread over different rows.
    known = set(filtered[key_columns].itertuples(index=False, name=None))

    # add new detections to filtered dataframe
    for i, detection in detections.iterrows():
        key = tuple(detection[column] for column in key_columns)

        if key in known:
            if verbose > 2:
                print(
                    f'{i}: {detection.at["ObsId"]} - Detection already in filtered.')
            continue

        # NOTE: appending under the label len(filtered) assumes the default
        # 0..n-1 index, which is what the CSV reader produces.
        filtered.loc[len(filtered)] = detection
        known.add(key)

        if verbose > 2:
            print(
                f'{i}: {detection.at["ObsId"]} - Added detection to filtered.')

    return filtered


def filter_detections(detections: pd.DataFrame, filtered: pd.DataFrame, catalogs: Dict[str, Callable], verbose: int = 0) -> pd.DataFrame:
    """
    ## Filter the detections in the detections dataframe.

    Results already stored in the reference file are reused; every catalog that is
    still `'unknown'` afterwards is queried through its filter function.

    ### Args:
        detections `pd.DataFrame`: Dataframe of detections to filter.
        filtered `pd.DataFrame`: Dataframe of filtered detections.
        catalogs `Dict[str, Callable]`: Catalogs to filter the detections with.
        verbose `int` (optional): Defaults to `0`. Level of verbosity.

    ### Returns:
        `pd.DataFrame`: Filtered dataframe of detections.
    """
    # add new detections to filtered dataframe
    filtered = update_detections(detections, filtered, catalogs, verbose)

    # to check if detection has already been queried
    # TODO: make this a more permanent solution
    ref_file = f'output/filtered_{WINDOW}_forward.csv'
    filtered_ref = safe_read_filtered(ref_file, catalogs, detections)
    # a reference file written before a catalog was added lacks that match column
    filtered_ref = update_catalogs(filtered_ref, catalogs)

    for i, detection in filtered.iterrows():
        # RA and DEC must match in the SAME reference row; checking the two
        # columns independently can pass while no single row matches.
        same_position = (
            (filtered_ref['RA'] == detection['RA'])
            & (filtered_ref['DEC'] == detection['DEC'])
        )

        if same_position.any():
            previous = filtered_ref[same_position].iloc[0]
            # populate the new filtered file with the previously filtered detection
            for catalog in catalogs:
                column = f'{catalog}_match'
                if previous[column] != 'unknown':
                    filtered.at[i, column] = previous[column]
            if verbose > 1:
                print(f'{i}: {detection.at["ObsId"]} - populated from {ref_file}.')

            # everything resolved from the reference: nothing left to query
            if all(filtered.at[i, f'{catalog}_match'] != 'unknown' for catalog in catalogs):
                continue

        for catalog, filter_func in catalogs.items():
            column = f'{catalog}_match'

            # read the live value: `detection` is a snapshot taken before the
            # reference results were copied in
            if filtered.at[i, column] != 'unknown':
                if verbose > 1:
                    print(f'{i}: {detection.at["ObsId"]} - {catalog} match: already known.')
                continue

            try:
                result = 'yes' if filter_func(detection) else 'no'
            except Exception as e:
                # best effort: a failing catalog stays 'unknown' and is retried next run
                print(f'{i}: {detection.at["ObsId"]} - {e}')
                continue

            filtered.at[i, column] = result

            if verbose > 1:
                print(f'{i}: {detection.at["ObsId"]} - {catalog} match: {filtered.at[i, f"{catalog}_match"]}')

    return filtered



def filter_detection_file(detections_filename: str, filtered_filename: str, catalogs: Dict[str, Callable], verbose: int = 0) -> None:
    """
    ## Run the catalog filtering on a detections file and save the result.

    ### Args:
        detections_filename `str`: Whitespace-separated detections file to read.
        filtered_filename `str`: Filtered CSV file; read if it exists and (over)written with the result.
        catalogs `Dict[str, Callable]`: Catalogs to filter the detections with.
        verbose `int` (optional): Defaults to `0`. Level of verbosity.
    """
    column_names = ["ObsId", "RA", "DEC", "THETA", "POS_ERR", "SIGNIFICANCE"]

    # the first line of the file is a header and is replaced by `column_names`;
    # fields are separated by runs of whitespace and kept as strings
    raw_detections = pd.read_csv(
        detections_filename,
        sep=r"\s+",
        engine="python",
        names=column_names,
        header=0,
        dtype=str,
    )

    # previous results, or an empty template shaped like the detections
    previous = safe_read_filtered(filtered_filename, catalogs, raw_detections)

    if verbose > 0:
        print(f'Analysing {len(raw_detections)} detections from {detections_filename}')

    updated = filter_detections(raw_detections, previous, catalogs, verbose)
    updated.to_csv(filtered_filename, index=False)


def clear_filter_matches(filtered_filename: str, catalog: str) -> None:
    """
    ## Clear the matches for a specific catalog in the filtered file.

    The file is read, the `<catalog>_match` column (if present) is reset to
    `'unknown'` for every row, and the file is written back in place.

    ### Args:
        filtered_filename `str`: Filename of the filtered file.
        catalog `str`: Catalog to clear the matches for.
    """
    filtered = pd.read_csv(filtered_filename, sep=',', header=0, dtype=str)

    column = f'{catalog}_match'
    # one column-wide assignment instead of a per-row loop; a file without
    # this column is rewritten unchanged
    if column in filtered.columns:
        filtered[column] = 'unknown'

    filtered.to_csv(filtered_filename, index=False)

def find_new_fxrt(filtered_filename: str, catalogs: Dict[str, Callable], window: str) -> pd.DataFrame:
    """
    ## Find new FXRT candidates (detections with no catalog matches).

    A row is a candidate when every `<catalog>_match` column equals `"no"`.
    Candidates, if any, are also written to `output/candidates/new_fxrt_<window>.csv`.

    ### Args:
        filtered_filename `str`: Path to the filtered detections CSV.
        catalogs `Dict[str, Callable]`: Dictionary of catalogs used.
        window `str`: Window size label (e.g. "w0.2") for naming the output file.

    ### Returns:
        `pd.DataFrame`: DataFrame with rows that are potential new FXRTs. The first
        column of the CSV is used as the index.

    ### Raises:
        `KeyError`: If the filtered file lacks the match column of one of the catalogs.
    """
    # the first CSV column becomes the index of the dataframe
    filtered = pd.read_csv(filtered_filename, sep=",", header=0, dtype=str, index_col=0)

    # build list of catalog match columns
    catalog_cols = [f"{cat}_match" for cat in catalogs]

    # condition: all catalogs = "no"
    new_fxrt = filtered.loc[(filtered[catalog_cols] == "no").all(axis=1)]

    if new_fxrt.empty:
        print("No new FXRT candidates found.")
        return new_fxrt

    print(f"Found {len(new_fxrt)} new FXRT candidate(s).")

    # make sure directory exists
    os.makedirs("output/candidates", exist_ok=True)

    # save candidates to dedicated folder; the index holds the first column of
    # the filtered file, so it must be written or that column is lost
    out_file = f"output/candidates/new_fxrt_{window}.csv"
    new_fxrt.to_csv(out_file, index=True)
    print(f"Saved candidates to {out_file}")

    # also print table to console
    print(new_fxrt)

    return new_fxrt


# Window label; it is part of the input/output file names below and is also
# read by filter_detections() to locate its reference file.
WINDOW = "w0.2"

# Whitespace-separated detections to analyse and the CSV holding the match results.
DETECTIONS_FILENAME = f'output/detections_{WINDOW}_forward.txt'
FILTERED_FILENAME   = f'output/filtered_{WINDOW}_forward.csv'

# Catalog name -> filter function. Each name yields a `<name>_match` column, and
# the catalogs are processed in the order listed here.
CATALOGS = {
    'gaia': filter_gaia,
    'archival': filter_archival,
    'chandra': filter_chandra,
    'erosita': filter_erosita,
    'ned': filter_ned,
    'simbad': filter_simbad,
    'vizier': filter_vizier,
    'xray-binaries': filter_Xray_binaries,
}

# Verbosity level handed to filter_detection_file().
VERBOSE = 2


if __name__ == '__main__':
    # cross-match every detection and store the results in the filtered file
    filter_detection_file(
        detections_filename=DETECTIONS_FILENAME,
        filtered_filename=FILTERED_FILENAME,
        catalogs=CATALOGS,
        verbose=VERBOSE,
    )

    # find and save new candidates
    candidates = find_new_fxrt(
        filtered_filename=FILTERED_FILENAME,
        catalogs=CATALOGS,
        window=WINDOW,
    )

Analysing 2 detections from output/detections_w0.2_forward.txt
INFO: Query finished. [astroquery.utils.tap.core]
0: 4418 - gaia match: yes
0: 4418 - archival match: no
0: 4418 - chandra match: no
0: 4418 - erosita match: no
0: 4418 - ned match: yes
0: 4418 - 'OTYPES'
0: 4418 - vizier match: yes
0: 4418 - xray-binaries match: no
No new FXRT candidates found.
