# Cross-matching of large catalogs: DES to Gaia

In this tutorial we cross-match nain tables of Dark Energy Survey (DES) DR2 and Gaia DR3 catalogs. The outline of the tutorial is as follows:
1. Get original data files
2. Convert the data to HiPSCat format using [`hipscat-import`](https://github.com/astronomy-commons/hipscat-import/)
3. Cross-match the catalogs using LSDB and save to a new HiPSCat catalog

### Install required packages and import modules

In [None]:
import lsdb

# Uncomment to install hipscat-import
# ! pip install --quiet hipscat-import
! pip install --quiet git+https://github.com/astronomy-commons/hipscat-import.git@issue/221/gaia

# Uncomment to install lsdb
# ! pip install --quiet lsdb

In [None]:
# For files and directories manipulation
from pathlib import Path

# For Gaia columns data types inference
import pyarrow
from astropy.io import ascii

# Client for Dask distributed computing
from dask.distributed import Client

# Explore the HiPSCat catalogs and plot sky maps
import hipscat
from hipscat.catalog import Catalog
from hipscat.inspection import plot_pixels

# For reading the HiPSCat catalogs and performing the cross-match
import lsdb

# For converting the data to HiPSCat format
from hipscat_import.catalog.file_readers import CsvReader
from hipscat_import.pipeline import ImportArguments, pipeline_with_client

## Get original data files

??? Do we need to get scripts / Python code to download the data? Maybe one-liner with recursive `wget`?

To make this notebook faster to run and using less storage, we will use a subset of the data. However, the same pipeline, with no code changes, may be used to process the full catalogs.

In [None]:
# Change to the directories where the data will be stored
DES_DIR = Path("data/DES_DR2")
GAIA_DIR = Path("data/Gaia_DR3")

HIPSCAT_DIR = Path("hipscat")
DES_HIPSCAT_DIR = HIPSCAT_DIR / "des_dr2"
GAIA_HIPSCAT_DIR = HIPSCAT_DIR / "gaia_dr3"

### DES DR2

The Dark Eenrgy Survey DR2 catalog is hosted by NCSA, see the [official website](https://des.ncsa.illinois.edu/releases/dr2) for more information. Data files, in [FITS](https://fits.gsfc.nasa.gov) format, are located at <https://desdr-server.ncsa.illinois.edu/despublic/dr2_tiles/>.

We use `*dr2_main.fits` files for the main catalog table, see [the schema here](https://des.ncsa.illinois.edu/releases/dr2/dr2-products/dr2-schema).

Here we download a few first files to demonstrate the pipeline. The full catalog is much larger, feel free to download it all if you have enough storage.

In [None]:
# Comment / skip this cell if you already have the data

from shutil import copyfileobj

import requests  # For downloading files

# Download few first files
des_urls = [
    "https://desdr-server.ncsa.illinois.edu/despublic/dr2_tiles/DES0000+0209/DES0000+0209_dr2_main.fits",
    "https://desdr-server.ncsa.illinois.edu/despublic/dr2_tiles/DES0000+0252/DES0000+0252_dr2_main.fits",
    "https://desdr-server.ncsa.illinois.edu/despublic/dr2_tiles/DES0000+0335/DES0000+0335_dr2_main.fits",
]

for des_url in des_urls:
    des_file = DES_DIR / Path(des_url).name
    if not des_file.exists():
        des_file.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(des_url, stream=True) as r, open(des_file, "wb") as f:
            copyfileobj(r.raw, f)

### Gaia DR3

The Gaia DR3 catalog is hosted by ESA, see the [official website](https://www.cosmos.esa.int/web/gaia/dr3) for more information. Data files, in [ECSV](https://docs.astropy.org/en/stable/io/ascii/ecsv.html) format are located at <http://cdn.gea.esac.esa.int/Gaia/gdr3/gaia_source/>.

We use `gaia_source` table, see its [schema here](https://gea.esac.esa.int/archive/documentation/GDR3/Gaia_archive/chap_datamodel/sec_dm_main_source_catalogue/ssec_dm_gaia_source.html).

Here we donwload a few files which barely correspond to the same area of the sky as the DES DR2 files above. The full catalog is much larger, feel free to download it all if you have enough storage.

In [None]:
# Comment / skip this cell if you already have the data

from shutil import copyfileobj

import requests  # For downloading files

# Download few first files
gaia_urls = [
    "http://cdn.gea.esac.esa.int/Gaia/gdr3/gaia_source/GaiaSource_310878-313367.csv.gz",
]

for gaia_url in gaia_urls:
    gaia_file = GAIA_DIR / Path(gaia_url).name
    if not gaia_file.exists():
        gaia_file.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(gaia_url, stream=True) as r, open(gaia_file, "wb") as f:
            copyfileobj(r.raw, f)

## Convert the data to HiPSCat format

We use the [`hipscat-import`](https://github.com/astronomy-commons/hipscat-import/) tool to create HiPSCat catalogs from the original data files.

### Convert DES DR2 to HiPSCat

- Plan the pipeline specifying all parameters of the conversion
- Run the pipeline with [Dask](https://dask.org)

In [None]:
des_args = ImportArguments(
    # sort columns are optional and works only if few objects are very close to each other
    sort_columns="COADD_OBJECT_ID",
    ra_column="RA",
    dec_column="DEC",
    input_path=DES_DIR,
    input_format="fits",
    output_artifact_name="des_dr2",
    output_path=HIPSCAT_DIR,
    # Uncomment to overwrite existing catalog
    # overwrite=True,
)

with Client() as client:
    pipeline_with_client(des_args, client)

#### Plot the DES DR2 HiPSCat catalog pixels

In [None]:
# Read the HiPSCat catalog metadata, it does not load any data, just healpix pixels and other metadata
des_hipscat_catalog = Catalog.read_from_hipscat(DES_HIPSCAT_DIR)
plot_pixels(des_hipscat_catalog)

# TODO: add DES footprint to this plot
# https://des.ncsa.illinois.edu/static/images/dr2/dr2_footprint.png

### Convert Gaia DR3 to HiPSCat

For Gaia we need to specify schema of the input data, because currently `hipscat-import` cannot infer column data types properly from this dataset.

See this GitHub issue for more details: <>

In [None]:
empty_astropy_table = ascii.read(gaia_file, format="ecsv", data_end=1)

# table_dtypes = {name: empty_astropy_table[name].dtype for name in empty_astropy_table.colnames}
# parquet_schema = pyarrow.schema(pyarrow.field(name, pyarrow.from_numpy_dtype(dtype), nullable=True) for name, dtype in table_dtypes.items())
# pyarrow.parquet.ParquetWriter(GAIA_DIR / 'schema.parquet', parquet_schema).close()

empty_astropy_table.to_pandas().to_parquet(GAIA_DIR / "schema.parquet")

In [None]:
gaia_args = ImportArguments(
    # sort columns are optional and works only if few objects are very close to each other
    sort_columns="source_id",
    ra_column="ra",
    dec_column="dec",
    input_path=GAIA_DIR,
    input_format="csv.gz",
    file_reader=CsvReader(
        comment="#",
        schema_file=GAIA_DIR / "schema.parquet",
        parquet_kwargs={"dtype_backend": "numpy_nullable"},
    ),
    use_schema_file=GAIA_DIR / "schema.parquet",
    output_artifact_name="gaia_dr3",
    output_path=HIPSCAT_DIR,
    # Uncomment to overwrite existing catalog
    overwrite=True,
)

with Client() as client:
    pipeline_with_client(gaia_args, client)