This notebook computes the angular correlation counts using corrgi's map-reduce pipeline.

In [None]:
# Install the branch with the implementation of the map-reduce pipeline
!pip install git+https://github.com/lincc-frameworks-mask-incubator/corrgi.git@sandro/implement-map-reduce

In [None]:
# Directory where the histograms produced by this notebook are written.
output_dir = "/ocean/projects/phy210048p/scampos/counts"

# Location of the catalog that was reimported with 50k points per partition.
# The partitions are kept this small because we are bound by the Fortran routines.
hsc_forced_r = (
    "/ocean/projects/phy210048p/shared/hipscat/test_catalogs/hsc-pdr3-forced-r-50k"
)

First we define an object with the parameters for the angular correlation (using gundam).

In [None]:
import gundam
from corrgi.correlation.angular_correlation import AngularCorrelation

def acf_params():
    """Build the gundam parameter set used for the angular correlation.

    Starts from the parameters returned by ``gundam.packpars(kind="acf")`` and
    overrides ``dsept``, ``nsept`` and ``septmin``, in that order.
    NOTE(review): these presumably control the angular separation binning
    (bin width, number of bins, minimum separation) -- confirm the exact
    meaning and units against the gundam documentation.

    Returns:
        The object returned by ``gundam.packpars`` with the overrides applied.
    """
    acf = gundam.packpars(kind="acf")
    overrides = (
        ("dsept", 0.1),
        ("nsept", 33),
        ("septmin", 0.01),
    )
    # Apply the overrides one by one, preserving the assignment order.
    for field, value in overrides:
        setattr(acf, field, value)
    return acf

# Correlation object configured with the parameters built by acf_params();
# it is later handed to the corrgi pipeline arguments.
ang_correlation = AngularCorrelation(params=acf_params())

We instantiate a Dask client with 64 workers (on PSC, each of them will have about 256 GiB / 64 ≈ 4 GiB of memory available).

In [None]:
from dask.distributed import Client
# Dask client with 64 workers; it is passed to run_counting and closed at the
# end of the notebook.
client = Client(n_workers=64)

To compute the DD counts on HSC we instantiate the corrgi arguments and call `run_counting`.

In [None]:
from corrgi.pipeline.arguments import CorrgiArguments
from corrgi.pipeline.run_counting import run_counting

# Pipeline arguments for the DD counts: the same HSC catalog is used as both
# the left and the right catalog.
args = CorrgiArguments(
    correlation=ang_correlation,
    left_catalog_path=hsc_forced_r,
    right_catalog_path=hsc_forced_r,
    # Results are written under output_dir.
    # NOTE(review): "dd" presumably names the output artifact -- confirm in corrgi.
    output_path=output_dir,
    output_artifact_name="dd",
    # NOTE(review): presumably lets a rerun pick up from existing intermediate
    # results instead of starting over -- confirm against corrgi's arguments.
    resume=True,
)

In [None]:
%%time
# Run the counting pipeline with the arguments above on the Dask client.
# `%%time` must stay on the first line of the cell; it reports how long the run took.
counts_dd = run_counting(args, client)

Finally, print the DD counts:

In [None]:
# Bare last expression: the notebook displays the value returned by run_counting.
counts_dd

and shut down the Dask client:

In [None]:
# Close the Dask client created earlier now that the counts have been computed.
client.close()