In [5]:
from pathlib import Path

import numpy as np
import pandas as pd
from astropy.table import QTable
from pyvo.io.vosi.vodataservice import VODataServiceTable
from exotools.utils.qtable_utils import _get_qtable_paths, _read_qtable_header, QTableHeader, RootQTableHeader
from exotools import CandidateExoplanetsDataset, TessDataset, LightcurveDataset, KnownExoplanetsDataset
from exotools.downloaders.tap_service import ExoService
from exotools.utils.qtable_utils import get_header_from_table
from tests.utils import compare_qtables
from astropy.time import Time
from exotools.io.fs_storage import EcsvStorage

# %load_ext autoreload
# %autoreload 2

# Root directory that the cells below hand to the dataset classes and build
# file paths from.
# NOTE(review): absolute, machine-specific path — the notebook cannot run on
# another machine without editing this line.
p = Path("/Users/christian/git/exotools/tests/tmp")

In [3]:
def download_data():
    """Run the full download chain for a small sample, using ``p`` as dataset root.

    Requests up to 15 known and 15 candidate exoplanets, concatenates their
    ``unique_ids``, asks ``TessDataset`` for the observation metadata of those
    ids (with ``store=True``) and passes that metadata to
    ``LightcurveDataset.download_lightcurves_from_tess_db``.

    Nothing is returned; the function is called only for its side effects.
    """
    sample_size = 15
    confirmed = KnownExoplanetsDataset(p).download_known_exoplanets(limit=sample_size)
    unconfirmed = CandidateExoplanetsDataset(p).download_candidate_exoplanets(limit=sample_size)

    # Both results expose ``unique_ids``; merge them into one array of targets.
    target_ids = np.concatenate([confirmed.unique_ids, unconfirmed.unique_ids])

    observations = TessDataset(p).download_observation_metadata(targets_tic_id=target_ids, store=True)
    LightcurveDataset(p).download_lightcurves_from_tess_db(observations)


# download_data()
# Fetch 15 known exoplanets with Gaia star data disabled and keep the `.view`
# attribute of the result; the later cells use it as the reference table for
# their write/read round-trips.
# NOTE(review): `.view` is presumably an astropy QTable (it is later passed
# where `table: QTable` is expected) — confirm.
exo_qtable = KnownExoplanetsDataset(p).download_known_exoplanets(limit=15, with_gaia_star_data=False).view

Preparing to download known exoplanets dataset...
Querying https://exoplanetarchive.ipac.caltech.edu/TAP/ (synchronous)...
DONE! Collected 15 unique planets, for a total of 15 records.


In [10]:
# Round-trip the reference table through EcsvStorage: write it together with
# the header loaded from JSON, read it back, and compare with the original.
exo_header = _read_qtable_header(p/"known_exoplanets_header.json")
ecsv = EcsvStorage(root_path=p/"ecsv")
# NOTE(review): the positional `True` is presumably an overwrite flag —
# confirm against EcsvStorage.write_qtable and consider passing it by keyword.
ecsv.write_qtable(exo_qtable, exo_header, "known", True)
ecsv_read = ecsv.read_qtable("known")
# Last expression of the cell, so the comparison result is shown as output.
compare_qtables(exo_qtable, ecsv_read)

True

In [None]:
from astropy.table import MaskedColumn
from typing import Optional



def save_qtable(
    table: QTable,
    header: Optional[QTableHeader],
    file_path: Path,
    file_name: Optional[str] = None,
) -> Path:
    """Write a table as a feather data file plus an optional JSON header file.

    Args:
        table: Table whose data is written in feather format.
        header: Column metadata stored next to the data as JSON. When ``None``
            no header file is written.
        file_path: Location forwarded to ``_get_qtable_paths``.
        file_name: Optional name forwarded to ``_get_qtable_paths``.

    Returns:
        Path of the feather file that was written.
    """
    data_path, header_path = _get_qtable_paths(file_path, file_name)
    data_path.parent.mkdir(parents=True, exist_ok=True)

    if header is not None:
        root_model = RootQTableHeader(root=header)

        # Store table unit info in json format
        with open(header_path, "w") as f:
            f.write(root_model.model_dump_json(indent=4))

    # Store table data in feather format.
    # index=False stops astropy from promoting a primary-key column to the
    # DataFrame index, so the frame keeps a plain RangeIndex and the original
    # column order. The previous reset_index()/drop("index") sequence moved
    # such a key column to the front and silently dropped a genuine data
    # column that happened to be called "index".
    df = table.to_pandas(index=False)
    df.to_feather(data_path)

    return data_path

def read_qtable(file_path: Path, file_name: Optional[str] = None) -> QTable:
    """Load a table written by ``save_qtable`` and re-attach units and descriptions.

    Args:
        file_path: Location forwarded to ``_get_qtable_paths``.
        file_name: Optional name forwarded to ``_get_qtable_paths``.

    Returns:
        The table read from the feather file. Units come from the header when
        one is available; each column's ``description`` is taken from the
        header, or set to ``None`` when the header has no entry for it.

    Raises:
        ValueError: If the feather data file does not exist.
    """
    data_path, header_path = _get_qtable_paths(file_path=file_path, file_name=file_name)

    # Read header information with column units
    header = _read_qtable_header(header_path)
    units = {key: info.unit for key, info in header.items()} if header else None

    # Read data and assign units
    if not data_path.exists():
        raise ValueError(f"read_qtable(): given path does not exist: {data_path}")

    df = pd.read_feather(data_path)
    qtable = QTable.from_pandas(df, units=units)

    # The units expression above already tolerates a missing/empty header;
    # do the same here instead of failing on `name in None`.
    column_info = header if header else {}
    for name in qtable.colnames:
        qtable[name].description = column_info[name].description if name in column_info else None

    return qtable





p_coniglio = p / "coniglio"
# The first write below targets this directory directly, so create it up
# front in case it does not exist yet.
p_coniglio.mkdir(parents=True, exist_ok=True)

# Round-trip 1: astropy's own ECSV writer/reader, storing masked columns as
# separate data and mask columns.
exo_qtable.write(p_coniglio/"coniglio.ecsv", overwrite=True, serialize_method={MaskedColumn: 'data_mask'})
ecsv_read = QTable.read(p_coniglio/"coniglio.ecsv")
# The comparison result used to be discarded (it is not the cell's last
# expression); assert it so a broken round-trip is actually noticed.
assert compare_qtables(exo_qtable, ecsv_read), "ECSV round-trip changed the table"

# Round-trip 2: feather data + JSON header via the helpers defined above.
# `exo_header` is the header loaded for this table in the EcsvStorage cell;
# the previously used name `h` is not defined anywhere in this notebook.
save_qtable(exo_qtable, exo_header, p_coniglio, "coniglio")
coniglio_table = read_qtable(file_path=p_coniglio, file_name="coniglio")
# compare_qtables(exo_qtable, coniglio_table)
# ExoDB.preprocess_dataset(a)
# ExoDB.compute_bounds(a)

# It's useful to disable parsing Time columns if we need them as Quantities,
# for example to copy units to another qtable.
# if convert_time_columns:
#     ExoDB.convert_time_columns(a)


In [None]:
# Write several tables and two JSON payloads into a single HDF5 file through
# a wrapper object.
# NOTE(review): `Hdf5Wrapper`, `tess_meta`, `candidate_db`, `exo_db` and
# `lc_db` are neither imported nor assigned in the visible cells, so this cell
# raises NameError on a fresh kernel — confirm where they are meant to come
# from (e.g. `download_data()` does not return its results).
hdf5_wrapper = Hdf5Wrapper(Path("/Users/christian/git/exotools/tests/tmp/dataset.hdf5"))

# Each table is stored under its own name, with a header derived from the
# table itself via get_header_from_table.
hdf5_wrapper.write_qtable(tess_meta.view, get_header_from_table(tess_meta.view), "tess_meta")
hdf5_wrapper.write_qtable(candidate_db.view, get_header_from_table(candidate_db.view), "candidate_db")
hdf5_wrapper.write_qtable(exo_db.view, get_header_from_table(exo_db.view), "exo_db")
hdf5_wrapper.write_qtable(lc_db.view, get_header_from_table(lc_db.view), "lc_db")
# Two throwaway JSON payloads, written under different names.
hdf5_wrapper.write_json(data={"dio": "stra_porco"}, name="porco")
hdf5_wrapper.write_json(data={"dio": "megamerda"}, name="merda")


In [None]:
# Round-trip the reference table through astropy's HDF5 writer/reader.
# The file name is relative, so it is created in the kernel's current
# working directory rather than under `p`.
# NOTE(review): per astropy's HDF5 writer documentation, overwrite=True
# combined with append=True should replace only the "coniglio" dataset and
# leave the rest of the file alone — confirm for the installed version.
exo_qtable.write("coniglio.hdf5", path="coniglio", overwrite=True, append=True, compression=True, serialize_meta=True)
coniglio = QTable.read("coniglio.hdf5", path="coniglio")
# Last expression of the cell, so the comparison result is shown as output.
compare_qtables(exo_qtable, coniglio)

In [None]:

# Print the name of every non-Time column whose dtype is object ("O").
# Time columns are excluded before their dtype is looked at.
for column in exo_qtable.itercols():
    if not isinstance(column, Time) and column.dtype == "O":
        print(column.name)

In [None]:
# Look up the metadata entry of the "ps" table on the TAP service.
# NOTE(review): this reaches into the private `_service` attribute of
# ExoService; a public accessor would be less fragile if one exists.
exoservice = ExoService()
ps_table : VODataServiceTable = exoservice._service.tables["ps"]
# Last expression of the cell, so the table object is displayed.
ps_table

In [None]:
# def _get_fields_info(table) -> QTableHeader:
#     return {
#         # NOTE: column.name was previously column.feature_name, after this method started to fail for GAIA data downloader
#         column.name: TableColumnInfo(unit=column.unit, description=column.description)
#         for column in table.columns
#     }

# One row per column of `ps_table`: name, unit, datatype content, description.
aaa = [
    [column.name, column.unit, column.datatype.content, column.description]
    for column in ps_table.columns
]
gesu = pd.DataFrame(aaa, columns=["name", "unit", "datatype", "description"])

# Distinct units and datatypes that occur across the columns.
print(gesu["unit"].unique())
print(gesu["datatype"].unique())