# Creating Datasets for Faro Unit Testing

This notebook steps through the process of extracting a small set of data to use for `faro` unit testing.

In [None]:
# Which version of the Stack am I using?
# `eups list -s` lists the products currently set up; the grep keeps only the
# lsst_distrib line so the Science Pipelines version is recorded in the output.
!eups list -s | grep lsst_distrib

In [None]:
import glob
import os
import gzip
import shutil
import numpy as np

import lsst.daf.butler as dafButler

import matplotlib.pyplot as plt
%matplotlib widget

In [None]:
import shrink_cat

## Preliminaries

Use the `ci_hsc_gen3` data repository as the source of the test data:

(1) Set up [testdata_ci_hsc](https://github.com/lsst/testdata_ci_hsc) following README instructions.

(2) Set up [ci_hsc_gen3](https://github.com/lsst/ci_hsc_gen3) following README instructions.

(3) Run `faro`. First, set up the package.

```
cd repos/metric-pipeline-tasks
setup -k -r .
```

We consider a subset of detectors and a single patch to keep file sizes small. As a consequence, the tract is only partially populated for metrics that are computed over a full tract. Note also that the detector subset cannot be made much smaller than this, because the repeatability metrics require a minimum number of matched observations.

Run the single-band matched catalogs and metrics:

```
pipetask run -j 12 -b "$CI_HSC_GEN3_DIR"/DATA/butler.yaml --register-dataset-types -p pipelines/metrics_pipeline_matched.yaml -d "tract=0 AND patch=70 AND detector IN (4,16,17,23,24) AND skymap='discrete/ci_hsc' AND instrument='HSC'" --output kbechtol/matched_small -i HSC/runs/ci_hsc
```

Run the multi-band matched catalogs and metrics:

```
pipetask run -j 12 -b "$CI_HSC_GEN3_DIR"/DATA/butler.yaml --register-dataset-types -p pipelines/metrics_pipeline_matched_multi.yaml -d "tract=0 AND patch=70 AND detector IN (4,16,17,23,24) AND skymap='discrete/ci_hsc' AND instrument='HSC'" --output kbechtol/matched_multi_small -i HSC/runs/ci_hsc
```

## Explore repo contents and prepare to extract datasets

In [None]:
# Location of the ci_hsc_gen3 repository. Prefer $CI_HSC_GEN3_DIR (available once
# ci_hsc_gen3 is set up, and used by the pipetask commands above), so this notebook
# reads the same butler repository the metrics were written to; fall back to the
# author's local checkout otherwise.
repo = os.environ.get('CI_HSC_GEN3_DIR', '/home/kbechtol/DATA/ci_hsc_gen3/')

In [None]:
# Staging area for the extracted test datasets; created on first use.
staging_dir = '/home/kbechtol/repos/metric-pipeline-tasks/tests/data_staging/'
staging_missing = not os.path.exists(staging_dir)
if staging_missing:
    print('Creating %s ...' % staging_dir)
    os.makedirs(staging_dir)

In [None]:
# Open the Gen3 data repository that the pipetask runs above wrote to.
config = os.path.join(repo, 'DATA', 'butler.yaml')
try:
    butler = dafButler.Butler(config=config)
except ValueError as e:
    # Every later cell needs ``butler``; fail here with the offending path rather
    # than leaving ``butler`` undefined and hitting a NameError further down.
    raise ValueError('Could not open butler repository from %s: %s' % (config, e)) from e

In [None]:
# Shorthand for the butler's registry, which the cells below use for dataset queries.
registry = butler.registry

In [None]:
# List every collection in the repository.
collection_names = registry.queryCollections()
for name in collection_names:
    print(name)

In [None]:
# List every dataset type registered in the repository.
dataset_types = registry.queryDatasetTypes()
for dataset_type in dataset_types:
    print(dataset_type)

In [None]:
# Look up the skyMap dataset in the single-band matched collection
# (an earlier run used 'kbechtol/matched_one_patch').
collections = 'kbechtol/matched_small'
skymap_refs = list(registry.queryDatasets('skyMap', collections=collections, findFirst=True))
skymap_ref = skymap_refs[0]  # skymap_ref.dataId['skymap'] gives just the skymap name
print(skymap_ref)

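An example of how to explore the file tree of the data repository, provided for informational purposes only. The path and filename suffix below are specific to one timestamped run and must be adapted to your own repository.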

In [None]:
path = '/home/kbechtol/DATA/ci_hsc_gen3/DATA/kbechtol/matched/20210127T041304Z'
# Skymap/run text embedded in every filename of this run; stripped for readability.
run_suffix = '_discrete_ci_hsc_kbechtol_matched_20210127T041304Z'
yaml_files = glob.glob(path + "/**/*.yaml", recursive=True)
for yaml_file in yaml_files:
    # Skip task metadata files; only the remaining YAML outputs are listed.
    if 'metadata' not in yaml_file:
        print(os.path.basename(yaml_file.replace(run_suffix, '')))

## Extract Measurements

Access the `.yaml` files containing the metric results and copy them, under simplified names, into the staging directory.

In [None]:
def getMeasurementFilenames(butler, collections, tract):
    """Find every metric measurement for a tract and build its staged filename.

    All dataset types whose storage class is ``MetricValue`` are queried for
    ``tract`` in the skymap of the first ``skyMap`` dataset found in
    ``collections``. Each measurement is read so that its metric name can
    prefix the staged filename.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler for the data repository.
    collections : `str`
        Collection(s) to search.
    tract : `int`
        Tract ID to select.

    Returns
    -------
    pairs : `list` of `tuple` [`str`, `str`]
        ``(path, outfile)`` for every measurement found. ``path`` is the file
        in the butler repository and ``outfile`` is
        ``<metric_name>_expected<suffix><ext>``, where ``suffix`` is the part
        of the butler filename after the first ``'HSC'`` and before
        ``'_discrete'``. Callers pass the result through ``np.unique``, so
        repeated pairs are tolerated.

    Raises
    ------
    IndexError
        If no ``skyMap`` dataset is found, or a filename does not contain
        ``'HSC'``.
    """
    registry = butler.registry
    skymap_name = list(registry.queryDatasets('skyMap', collections=collections,
                                              findFirst=True))[0].dataId['skymap']
    # The data ID does not depend on the metric, so build it once.
    dataid = {'tract': tract, 'skymap': skymap_name}
    pairs = []
    for dataset_type in registry.queryDatasetTypes():
        if dataset_type.storageClass.name != 'MetricValue':
            continue
        refs = list(registry.queryDatasets(dataset_type.name, dataId=dataid, collections=collections))
        for ref in refs:
            # The measurement is read only to obtain its metric name.
            measurement = butler.get(ref, collections=collections)
            uri = butler.getURI(dataset_type.name, ref.dataId, collections=collections)
            suffix = os.path.basename(uri.path).split('_discrete')[0].split('HSC')[1]
            extension = os.path.splitext(uri.path)[1]
            outfile = '%s_expected%s%s' % (measurement.metric_name, suffix, extension)
            pairs.append((uri.path, outfile))
    return pairs

In [None]:
# Single-band matched run: collect the metric measurement files for tract 0.
collections = 'kbechtol/matched_small'
metric_results_single_band = getMeasurementFilenames(butler, collections, tract=0)

In [None]:
# Peek at the first two (path, staged filename) pairs.
metric_results_single_band[:2]

In [None]:
# Copy each unique (butler path, staged filename) pair into the staging area.
# np.unique(..., axis=0) drops repeated pairs (and sorts them).
# NOTE: the loop variables, notably ``outfile``, remain defined after this cell.
for infile, outfile_base in np.unique(metric_results_single_band, axis=0):
    outfile = os.path.join(staging_dir, outfile_base)
    shutil.copyfile(infile, outfile)

In [None]:
# Multi-band matched run: collect the metric measurement files for tract 0.
collections = 'kbechtol/matched_multi_small'
metric_results_multi_band = getMeasurementFilenames(butler, collections, tract=0)

In [None]:
# Peek at the first two (path, staged filename) pairs.
metric_results_multi_band[:2]

In [None]:
# Copy each unique (butler path, staged filename) pair into the staging area.
# np.unique(..., axis=0) drops repeated pairs (and sorts them).
# NOTE: the loop variables, notably ``outfile``, remain defined after this cell.
for infile, outfile_base in np.unique(metric_results_multi_band, axis=0):
    outfile = os.path.join(staging_dir, outfile_base)
    shutil.copyfile(infile, outfile)

## Extract Matched Catalogs

In [None]:
def compress(filename):
    """Write a gzip-compressed copy of ``filename`` next to it.

    The copy is named ``filename + '.gz'``; the original file is left in place.
    """
    gz_name = filename + '.gz'
    with open(filename, 'rb') as src, gzip.open(gz_name, 'wb') as dst:
        shutil.copyfileobj(src, dst)

In [None]:
def getMatchedCatalogFilenames(butler, collections, datasettype, tract):
    """Find the matched catalogs of one dataset type in a tract and name their staged copies.

    Parameters
    ----------
    butler : `lsst.daf.butler.Butler`
        Butler for the data repository.
    collections : `str`
        Collection(s) to search.
    datasettype : `str`
        Matched-catalog dataset type name, e.g. ``'matchedCatalogTract'`` or
        ``'matchedCatalogMulti'``.
    tract : `int`
        Tract ID to select, in the skymap of the first ``skyMap`` dataset
        found in ``collections``.

    Returns
    -------
    pairs : `list` of `tuple` [`str`, `str`]
        ``(path, outfile)`` for every dataset found. ``outfile`` is
        ``datasettype`` followed by the part of the butler filename after the
        first ``'HSC'`` and before ``'_discrete'``, plus the file extension.
        Empty if no datasets match.

    Raises
    ------
    IndexError
        If no ``skyMap`` dataset is found, or a filename does not contain
        ``'HSC'``.
    """
    registry = butler.registry
    skymap_name = list(registry.queryDatasets('skyMap', collections=collections,
                                              findFirst=True))[0].dataId['skymap']
    dataid = {'tract': tract, 'skymap': skymap_name}
    # Materialize the query before issuing further butler calls in the loop.
    refs = list(registry.queryDatasets(datasettype, dataId=dataid, collections=collections))
    pairs = []
    for ref in refs:
        uri = butler.getURI(datasettype, ref.dataId, collections=collections)
        suffix = os.path.basename(uri.path).split('_discrete')[0].split('HSC')[1]
        pairs.append((uri.path, datasettype + suffix + os.path.splitext(uri.path)[1]))
    return pairs

In [None]:
# Single-band matched run: locate the tract-level matched catalogs for tract 0.
collections = 'kbechtol/matched_small'
matched_catalog_single_band = getMatchedCatalogFilenames(
    butler, collections, datasettype='matchedCatalogTract', tract=0)

In [None]:
# Show the (butler path, staged filename) pairs found for the single-band catalogs.
print(matched_catalog_single_band)

In [None]:
# For each unique (butler path, staged filename) pair, write the catalog into the
# staging area via shrink_cat.shrinkCat (presumably a size-reduced copy -- see
# shrink_cat), then gzip it. compress() leaves the uncompressed copy in place.
# NOTE: the loop variables, notably ``outfile``, remain defined after this cell.
for infile, outfile_base in np.unique(matched_catalog_single_band, axis=0):
    outfile = os.path.join(staging_dir, outfile_base)
    shrink_cat.shrinkCat(infile, outfile)
    compress(outfile)

In [None]:
# Multi-band matched run: locate the multi-band matched catalogs for tract 0.
collections = 'kbechtol/matched_multi_small'
matched_catalog_multi_band = getMatchedCatalogFilenames(
    butler, collections, datasettype='matchedCatalogMulti', tract=0)

In [None]:
# Show the (butler path, staged filename) pairs found for the multi-band catalogs.
print(matched_catalog_multi_band)

In [None]:
# For each unique (butler path, staged filename) pair, write the catalog into the
# staging area via shrink_cat.shrinkCat (presumably a size-reduced copy -- see
# shrink_cat), then gzip it. compress() leaves the uncompressed copy in place.
# NOTE: the loop variables, notably ``outfile`` (the last file written), remain
# defined after this cell.
for infile, outfile_base in np.unique(matched_catalog_multi_band, axis=0):
    outfile = os.path.join(staging_dir, outfile_base)
    shrink_cat.shrinkCat(infile, outfile)
    compress(outfile)

In [None]:
# Verify that every compressed matched catalog staged above can be read back.
# Paths are rebuilt from the staged filenames so this cell does not depend on
# loop variables left over from earlier cells (which would only cover the
# last file written).
from lsst.afw.table import SimpleCatalog

# dict.fromkeys de-duplicates while keeping the original order.
staged_names = dict.fromkeys(
    name for _, name in matched_catalog_single_band + matched_catalog_multi_band
)
for staged_name in staged_names:
    gz_path = os.path.join(staging_dir, staged_name) + '.gz'
    print(gz_path)
    catalog = SimpleCatalog.readFits(gz_path)

## Visualize Matched Catalog

In [None]:
# Collection to visualize. 'kbechtol/matched_small' or 'kbechtol/matched_multi_small'
# can be substituted to inspect the reduced datasets instead.
# NOTE(review): 'kbechtol/matched' is not produced by the pipetask commands above;
# it appears to be an earlier full run -- confirm it exists in your repository.
collections = 'kbechtol/matched'
skymap_refs = list(registry.queryDatasets('skyMap', collections=collections, findFirst=True))
skymap_ref = skymap_refs[0]
skymap = butler.get(skymap_ref, collections=collections)

In [None]:
# Load the r-band tract-level matched catalog for tract 0
# (query 'matchedCatalogMulti' instead to look at the multi-band catalog).
dataid = dict(tract=0, skymap=skymap_ref.dataId['skymap'], band='r')
refs = list(registry.queryDatasets('matchedCatalogTract', dataId=dataid, collections=collections))
print(len(refs))
first_ref = refs[0]
matched_catalog = butler.get(first_ref, collections=collections)

In [None]:
# Number of records in the matched catalog.
len(matched_catalog)

In [None]:
# Column names available in the matched catalog's schema.
matched_catalog.schema.getNames()

In [None]:
# Assign every matched-catalog record to its tract and sequential patch index so
# patch coverage can be plotted below. TractInfo objects are cached in
# ``tract_dict`` (keyed by the tract id as a string) so each is generated once.
tract_dict = {}
tract_ids = []
patch_ids = []
for src_record in matched_catalog:
    sky_coord = src_record.getCoord()
    tid = skymap.findTract(sky_coord).getId()
    key = str(tid)
    if key not in tract_dict:
        tract_dict[key] = skymap.generateTract(tid)
    tract_info = tract_dict[key]
    patch_ids.append(tract_info.getSequentialPatchIndex(tract_info.findPatch(sky_coord)))
    tract_ids.append(tid)

tract_id_array = np.array(tract_ids)
patch_id_array = np.array(patch_ids)

In [None]:
# Scatter the matched sources with one scatter call per patch (so each patch gets
# its own colour from the default cycle). Patch 70, the patch selected for the
# small test datasets, is drawn with square markers. The record count per patch
# is printed as well.
plt.figure()
for patch_index in np.unique(patch_id_array):
    in_patch = patch_id_array == patch_index
    marker = 's' if patch_index == 70 else 'o'
    plt.scatter(np.degrees(matched_catalog['coord_ra'][in_patch]),
                np.degrees(matched_catalog['coord_dec'][in_patch]),
                edgecolor='none', s=5, marker=marker)
    print(patch_index, np.sum(in_patch))
plt.xlabel('coord_ra (deg)')
plt.ylabel('coord_dec (deg)')

In [None]:
# Sanity check: every declination in the matched catalog is finite.
np.isfinite(matched_catalog['coord_dec']).all()

## Find subset of visit and detector combinations

To decrease the size of the files persisted in the repo while remaining consistent with an end-to-end run of the `faro` pipeline, we can select a subset of visits and/or detectors to pass as input to `pipetask` when running `faro`.

In [None]:
# Collect an approximate sky position (the WCS sky origin), band, visit and
# detector for every calexp in the collection. To restrict the query, add e.g.
# where="detector in (4,16,17,23,24) and instrument = 'HSC'".
refs = list(registry.queryDatasets('calexp', collections=collections))
ra, dec, band, visit, detector = [], [], [], [], []
for ref in refs:
    # NOTE: the full calexp is read just to get its WCS, which is slow;
    # a cheaper way to get approximate detector centres would help here.
    calexp = butler.get(ref, collections=collections)
    sky_origin = calexp.getWcs().getSkyOrigin()
    ra.append(sky_origin.getRa().asDegrees())
    dec.append(sky_origin.getDec().asDegrees())
    data_id = ref.dataId
    band.append(data_id['band'])
    visit.append(data_id['visit'])
    detector.append(data_id['detector'])

ra = np.array(ra)
dec = np.array(dec)
band = np.array(band)
visit = np.array(visit)
detector = np.array(detector)

In [None]:
# Visualize the individual visits and detectors: r-band positions as 'x',
# i-band as '+', each labelled with its (visit, detector) pair.
plt.figure()
for band_name, band_marker in (('r', 'x'), ('i', '+')):
    in_band = band == band_name
    plt.scatter(ra[in_band], dec[in_band], marker=band_marker)
for ra_i, dec_i, visit_i, detector_i in zip(ra, dec, visit, detector):
    plt.text(ra_i, dec_i, '(%s,%s)' % (visit_i, detector_i))
plt.xlabel('coord_ra (deg)')
plt.ylabel('coord_dec (deg)')

In [None]:
# What are the angular dimensions (degrees) of an individual detector?
# Use the last calexp ref from the query above explicitly, rather than relying on
# the loop variable ``ref`` leaking out of that cell.
calexp = butler.get(refs[-1], collections=collections)
pixel_scale_deg = calexp.getWcs().getPixelScale().asDegrees()
print(calexp.getBBox().getWidth() * pixel_scale_deg)
print(calexp.getBBox().getHeight() * pixel_scale_deg)

In [None]:
# NOTE(review): disabled. Would list the detectors contributing to patch 70,
# assuming matched_catalog has a 'detector' column -- confirm via the schema
# before re-enabling.
#np.unique(matched_catalog['detector'][patch_id_array == 70])

In [None]:
# NOTE(review): disabled. Would list the unique (visit, detector) pairs in
# patch 70, assuming matched_catalog has 'visit' and 'detector' columns --
# confirm via the schema before re-enabling.
#selection = (patch_id_array == 70)
#np.unique(list(zip(matched_catalog['visit'][selection], 
#                   matched_catalog['detector'][selection])), axis=0)

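## Copy the needed files from the staging area

Run these commands from the root of the `metric-pipeline-tasks` repository, since the paths are relative to it: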

```
cp tests/data_staging/matchedCatalog*.fits.gz tests/data/.
cp tests/data_staging/TE*.yaml tests/data/.
cp tests/data_staging/A*1*.yaml tests/data/.
cp tests/data_staging/P*1*.yaml tests/data/.
cp tests/data_staging/PA2*.yaml tests/data/.
```

## Extract Single-Visit Catalog

In [None]:
# Switch to the ci_hsc processing collection (the pipetask input above), which is
# queried below for the single-visit 'src' catalog.
collections = 'HSC/runs/ci_hsc'

In [None]:
# Find the single-visit source catalog ('src') for visit 903986, detector 100;
# exactly one dataset is expected.
src_query = registry.queryDatasets(
    'src',
    collections=collections,
    detector=100,
    instrument='HSC',
    where='visit=903986',
)
src_refs = list(src_query)
assert len(src_refs) == 1

In [None]:
# Resolve the on-disk location of the src catalog found above.
src_ref = src_refs[0]
uri = butler.getURI(src_ref, collections=collections)

In [None]:
# Stage a copy of the single-visit src catalog and gzip it next to the copy.
# ``outfile`` must be (re)assigned here: it is also the loop variable of the
# matched-catalog cells, so a stale value would overwrite a staged matched
# catalog (and its .gz) with the src catalog.
infile = uri.path
outfile = os.path.join(staging_dir, uri.basename())
print(infile)
print(outfile)
shutil.copyfile(infile, outfile)
compress(outfile)