# Importing macauff results into HiPSCat / LSDB

The macauff pipeline runs outside of LSDB and produces a series of yaml and csv
files that represent the counterpart assignments and likelihoods between objects
in two catalogs.

To convert into an LSDB-friendly association table, you will need:
- schema file for the match output (either yaml or parquet schema)
- match output CSVs
- pre-hipscatted left catalog
- pre-hipscatted right catalog
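Because the import can run for a long time, it may be worth confirming that all four inputs are present before starting. The following is a minimal sketch using only the standard library; `missing_inputs` is a hypothetical helper (not part of `lsdb_macauff`), and the paths in the usage lines are placeholders.

```python
from pathlib import Path


def missing_inputs(schema_file, csv_dir, left_catalog_dir, right_catalog_dir):
    """Return human-readable descriptions of any required inputs that are absent.

    Hypothetical helper for illustration; it only checks that paths exist,
    not that their contents are valid.
    """
    problems = []
    if not Path(schema_file).is_file():
        problems.append(f"schema file not found: {schema_file}")

    csv_root = Path(csv_dir)
    # rglob searches the directory and all of its subdirectories.
    if not csv_root.is_dir() or not any(csv_root.rglob("*.csv")):
        problems.append(f"no CSV files found under: {csv_dir}")

    for label, catalog_dir in (("left", left_catalog_dir), ("right", right_catalog_dir)):
        if not Path(catalog_dir).is_dir():
            problems.append(f"{label} catalog directory not found: {catalog_dir}")
    return problems


# Placeholder paths; substitute the real locations of your inputs.
for problem in missing_inputs(
    "metadata/matches.parquet", "output_csvs", "catalogs/left", "catalogs/right"
):
    print(problem)
```

An empty list means every input was found; anything else names what is missing.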

## Schema

If you have a yaml schema, you will first need to convert it into a parquet
schema to be understood by the import pipeline.

The `from_yaml` utility method reads the yaml file, parses column names, types,
and other key-values, converts them into parquet types, and writes the result
to an empty parquet file.

In [None]:
from hipscat.io import file_io
from lsdb_macauff.import_pipeline.convert_metadata import from_yaml
from pathlib import Path

## Location of the data on UW internal servers
epyc_input_path = Path("/data3/epyc/data3/hipscat/raw/macauff_results/")

yaml_input_file = epyc_input_path / "metadata" / "macauff_metadata.yml"
from_yaml(yaml_input_file, epyc_input_path / "metadata")

We can inspect the parquet/pyarrow schema that's generated:

In [None]:
matches_schema_file = epyc_input_path / "metadata" / "macauff_GaiaDR3xCatWISE2020_matches.parquet"
single_metadata = file_io.read_parquet_metadata(matches_schema_file)
schema = single_metadata.schema.to_arrow_schema()
schema

## Association table

To create the association between two catalogs, use the purpose-built Macauff
association table import map-reduce pipeline.

See also the general documentation on pipeline argument configuration
for the [hipscat-import pipeline](https://hipscat-import.readthedocs.io/en/latest/catalogs/arguments.html#pipeline-setup).
The macauff pipeline reuses that pipeline setup directly.
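The pipeline takes an explicit list of input files, so it helps to know exactly what a glob pattern will pick up. The sketch below builds a throwaway directory tree (the file names are made up) and compares a `**` pattern with and without `recursive=True`, using only the standard library.

```python
import glob
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp_dir:
    root = Path(tmp_dir)
    # Made-up layout: CSV files at three different depths, plus one non-CSV file.
    (root / "a" / "b").mkdir(parents=True)
    (root / "top.csv").touch()
    (root / "a" / "one.csv").touch()
    (root / "a" / "b" / "two.csv").touch()
    (root / "a" / "notes.txt").touch()

    # Without recursive=True, "**" behaves like a single "*" path component.
    shallow = sorted(glob.glob(f"{root}/**/*.csv"))
    # With recursive=True, "**" matches zero or more directory levels.
    deep = sorted(glob.glob(f"{root}/**/*.csv", recursive=True))

    shallow_names = [Path(p).name for p in shallow]
    deep_names = [Path(p).name for p in deep]

print("non-recursive:", shallow_names)
print("recursive:    ", deep_names)
```

Without `recursive=True`, only files exactly one directory below the root are matched; with it, CSV files at every depth are found.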

In [None]:
import lsdb_macauff.import_pipeline.run_import as runner
from hipscat_import.catalog.file_readers import CsvReader
from lsdb_macauff.import_pipeline.arguments import MacauffArguments
from dask.distributed import Client
import glob

matches_schema_file = epyc_input_path / "metadata" / "macauff_GaiaDR3xCatWISE2020_matches.parquet"
## Find all of the CSV files under the macauff output directory.
macauff_data_dir = epyc_input_path / "rds/project/iris_vol3/rds-iris-ip005/tjw/dr3_catwise_allskytest/output_csvs/"
## `recursive=True` is required for `**` to descend into nested subdirectories.
files = glob.glob(f"{macauff_data_dir}/**/*.csv", recursive=True)
files.sort()

args = MacauffArguments(
    ## This will create an association catalog at the path:
    ##    /data3/epyc/data3/hipscat/catalogs/macauff_association/
    output_path="/data3/epyc/data3/hipscat/catalogs/",
    output_artifact_name="macauff_association",

    ## Make sure you use a directory with enough space!
    tmp_dir="/data3/epyc/data3/hipscat/tmp/macauff/",

    ## Read the CSV files and use the generated schema file for types and 
    ## other key-value metadata
    input_file_list=files,
    input_format="csv",
    file_reader=CsvReader(schema_file=matches_schema_file, header=None),
    metadata_file_path=matches_schema_file,

    ## For the left catalog, specify the pre-hipscatted location, and the ra/dec/id columns
    left_catalog_dir="/data3/epyc/data3/hipscat/catalogs/gaia_dr3/gaia",
    left_ra_column="gaia_ra",
    left_dec_column="gaia_dec",
    left_id_column="gaia_source_id",

    ## For the right catalog, specify the pre-hipscatted location, and the ra/dec/id columns
    right_catalog_dir="/epyc/projects3/sam_hipscat/catwise2020/catwise2020/",
    right_ra_column="catwise_ra",
    right_dec_column="catwise_dec",
    right_id_column="catwise_name",

)

with Client(
    local_directory="/data3/epyc/data3/hipscat/tmp/macauff/",
    n_workers=5,
    threads_per_worker=1,
) as client:
    runner.run(args, client)

This could take a while. Once it's done, check that the association data can be parsed as a valid association catalog.

In [None]:
from hipscat.catalog.association_catalog.association_catalog import AssociationCatalog

catalog = AssociationCatalog.read_from_hipscat(args.catalog_path)