Merged
8 changes: 8 additions & 0 deletions pipelines/LATISS/ApPipe.yaml
@@ -0,0 +1,8 @@
description: Alert Production pipeline specialized for LATISS

imports:
  - location: $DRP_PIPE_DIR/pipelines/LATISS/DRP.yaml
    include:
      - isr
      - characterizeImage
      - calibrate
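As a sanity check, the imported subset can be inspected with lsst.pipe.base. This is a minimal sketch (not part of the PR), assuming a stack environment with drp_pipe set up and the working directory at the package root:

import lsst.pipe.base

# Load the new LATISS pipeline; the relative path assumes we are at the
# prompt_prototype package root.
pipeline = lsst.pipe.base.Pipeline.fromFile("pipelines/LATISS/ApPipe.yaml")
# The import above should expand to exactly the three included labels:
# isr, characterizeImage, calibrate.
print([task.label for task in pipeline.toExpandedPipeline()])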
3 changes: 3 additions & 0 deletions python/activator/activator.py
@@ -43,6 +43,8 @@

# The short name for the instrument.
instrument_name = os.environ["RUBIN_INSTRUMENT"]
# The skymap to use in the central repo
skymap = os.environ["SKYMAP"]
# URI to the main repository containing calibs and templates
calib_repo = os.environ["CALIB_REPO"]
# S3 Endpoint for Buckets; needed for direct Boto access but not Butler
@@ -86,6 +88,7 @@
mwi = MiddlewareInterface(get_central_butler(calib_repo, instrument_name),
image_bucket,
instrument_name,
skymap,
local_repos)


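The activator now takes one more piece of configuration from its environment. A minimal sketch of the variables involved, with purely illustrative values (real deployments set these in the service configuration):

import os

os.environ["RUBIN_INSTRUMENT"] = "LATISS"              # short instrument name
os.environ["SKYMAP"] = "example_skymap"                # hypothetical skymap name
os.environ["CALIB_REPO"] = "s3://bucket/central-repo"  # hypothetical repo URI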
91 changes: 59 additions & 32 deletions python/activator/middleware_interface.py
@@ -111,6 +111,8 @@ class MiddlewareInterface:
name or the short name. Examples: "LsstCam", "lsst.obs.lsst.LsstCam".
TODO: this arg can probably be removed and replaced with internal
use of the butler.
skymap : `str`
Name of the skymap in the central repo for querying templates.
local_storage : `str`
An absolute path to a space where this object can create a local
Butler repo. The repo is guaranteed to be unique to this object.
@@ -120,9 +122,6 @@
appropriate for use in the USDF environment; typically only
change this when running local tests.
"""
_COLLECTION_TEMPLATE = "templates"
"""The collection used for templates.
"""
_COLLECTION_SKYMAP = "skymaps"
"""The collection used for skymaps.
"""
@@ -141,7 +140,7 @@
# corresponding to self.camera and self.skymap.

def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
local_storage: str,
skymap: str, local_storage: str,
prefix: str = "s3://"):
# Deployment/version ID -- potentially expensive to generate.
self._deployment = self._get_deployment()
@@ -170,11 +169,9 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
"camera", instrument=self.instrument.getName(),
collections=self.instrument.makeUnboundedCalibrationRunName()
)
# TODO: is central_butler guaranteed to have only one skymap dimension?
skymaps = list(self.central_butler.registry.queryDataIds("skymap"))
assert len(skymaps) == 1, "Ambiguous or missing skymap in central repo."
self.skymap_name = skymaps[0]["skymap"]
self.skymap = self.central_butler.get("skyMap", skymap=self.skymap_name)
self.skymap_name = skymap
self.skymap = self.central_butler.get("skyMap", skymap=self.skymap_name,
collections=self._COLLECTION_SKYMAP)

# How much to pad the refcat region we will copy over.
self.padding = 30*lsst.geom.arcseconds
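Loading the skymap up front lets prep_butler translate a visit's position into the tracts and patches that hold templates. A sketch of that lookup, assuming the standard lsst.skymap and lsst.geom APIs, with illustrative coordinates:

import lsst.geom

# Illustrative boresight and search radius; prep_butler computes the real
# region from the incoming visit.
center = lsst.geom.SpherePoint(155.0, -5.0, lsst.geom.degrees)
radius = 0.2*lsst.geom.degrees
corners = [center.offset(bearing*lsst.geom.degrees, radius)
           for bearing in (0, 90, 180, 270)]
# `skymap` is the object loaded in __init__ above.
for tract_info, patches in skymap.findTractPatchList(corners):
    print(tract_info.getId(), [patch.getIndex() for patch in patches])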
@@ -371,16 +368,23 @@ def prep_butler(self, visit: Visit) -> None:
# TODO: Summit filter names may not match Butler names, especially for composite filters.
self._export_skymap_and_templates(export, center, detector, wcs, visit.filters)
self._export_calibs(export, visit.detector, visit.filters)

# CHAINED collections
export.saveCollection(self.instrument.makeRefCatCollectionName())
export.saveCollection(self._COLLECTION_TEMPLATE)
export.saveCollection(self.instrument.makeUmbrellaCollectionName())
self._export_collections(export, self.instrument.makeUmbrellaCollectionName())

self.butler.import_(filename=export_file.name,
directory=self.central_butler.datastore.root,
transfer="copy")

# Temporary workaround until we have a prompt-processing default top-level collection
# in shared repos; then we can organize collections without worrying about DRP use cases.
_prepend_collection(self.butler,
self.instrument.makeUmbrellaCollectionName(),
[self._get_template_collection()])

def _get_template_collection(self):
"""Get the collection name for templates
"""
return self.instrument.makeCollectionName("templates")
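For DECam, the test instrument used later in this diff, this resolves to "DECam/templates", the same chain that make_central_repo.sh now builds. A sketch of the naming call, plus a guess at what the _prepend_collection helper used in prep_butler does, via standard lsst.daf.butler registry calls (the helper body is an assumption, not this PR's exact code):

from lsst.obs.decam import DarkEnergyCamera

print(DarkEnergyCamera.makeCollectionName("templates"))  # -> "DECam/templates"

def _prepend_collection(butler, chain, new_children):
    # Assumed behavior: splice new children onto the front of an existing
    # CHAINED collection so they are searched first.
    old_children = list(butler.registry.getCollectionChain(chain))
    butler.registry.setCollectionChain(chain, list(new_children) + old_children)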

def _export_refcats(self, export, center, radius):
"""Export the refcats for this visit from the central butler.

@@ -401,7 +405,8 @@ def _export_refcats(self, export, center, radius):
# currently, and we can't queryDatasetTypes in just the refcats
# collection, so we have to specify a list here. Replace this
# with another solution ASAP.
possible_refcats = ["gaia", "panstarrs", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110"]
possible_refcats = ["gaia", "panstarrs", "gaia_dr2_20200414", "ps1_pv3_3pi_20170110",
"atlas_refcat2_20220201"]
refcats = set(_filter_datasets(
self.central_butler, self.butler,
possible_refcats,
@@ -453,16 +458,21 @@ def _export_skymap_and_templates(self, export, center, detector, wcs, filter):
# TODO: alternately, we need to extract it from the pipeline? (best?)
# TODO: alternately, can we just assume that there is exactly
# one coadd type in the central butler?
templates = set(_filter_datasets(
self.central_butler, self.butler,
"*Coadd",
collections=self._COLLECTION_TEMPLATE,
instrument=self.instrument.getName(),
skymap=self.skymap_name,
where=template_where,
findFirst=True))
_log.debug("Found %d new template datasets.", len(templates))
export.saveDatasets(templates)
try:
templates = set(_filter_datasets(
self.central_butler, self.butler,
"*Coadd",
collections=self._get_template_collection(),
instrument=self.instrument.getName(),
skymap=self.skymap_name,
where=template_where,
findFirst=True))
except _MissingDatasetError as err:
_log.error(err)
else:
_log.debug("Found %d new template datasets.", len(templates))
export.saveDatasets(templates)
self._export_collections(export, self._get_template_collection())

def _export_calibs(self, export, detector_id, filter):
"""Export the calibs for this visit from the central butler.
@@ -495,11 +505,24 @@ def _export_calibs(self, export, detector_id, filter):
export.saveDatasets(
calibs,
elements=[]) # elements=[] means do not export dimension records
target_types = {CollectionType.CALIBRATION}
for collection in self.central_butler.registry.queryCollections(...,
collectionTypes=target_types):
export.saveCollection(collection)
export.saveCollection(self.instrument.makeCalibrationCollectionName())

def _export_collections(self, export, collection):
"""Export the collection and all its children.

This preserves the collection structure even if some child collections
do not have data. Exporting a collection does not export its datasets.

Parameters
----------
export : `Iterator[RepoExportContext]`
Export context manager.
collection : `str`
The collection to be exported. It is usually a CHAINED collection
and can have many children.
"""
for child in self.central_butler.registry.queryCollections(
collection, flattenChains=True, includeChains=True):
export.saveCollection(child)
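Run against the test repo built by make_central_repo.sh below, the flattened query walks the whole chain. A sketch of the expected behavior, assuming central_butler points at that repo:

# Both the CHAINED collections and their RUN members come back, e.g.
# DECam/defaults, DECam/calib, refcats, refcats/gen2, DECam/templates,
# templates/deep, skymaps.
children = central_butler.registry.queryCollections(
    "DECam/defaults", flattenChains=True, includeChains=True)
print(list(children))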

@staticmethod
def _count_by_type(refs):
@@ -637,8 +660,12 @@ def _prep_pipeline(self, visit: Visit) -> lsst.pipe.base.Pipeline:
pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file)
except FileNotFoundError:
raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}")
pipeline.addConfigOverride("diaPipe", "apdb.db_url", self._apdb_uri)
pipeline.addConfigOverride("diaPipe", "apdb.namespace", self._apdb_namespace)

try:
pipeline.addConfigOverride("diaPipe", "apdb.db_url", self._apdb_uri)
pipeline.addConfigOverride("diaPipe", "apdb.namespace", self._apdb_namespace)
except LookupError:
_log.debug("diaPipe is not in this pipeline.")
return pipeline

def _download(self, remote):
Binary file modified tests/data/central_repo/gen3.sqlite3
26 changes: 5 additions & 21 deletions tests/data/export.yaml
@@ -1,13 +1,15 @@
# Created from ap_verify_ci_hits2015 on 2022-03-16 using make_preloaded_export.py
# and then modified to add the necessary collection CHAINs.
# Created from ap_verify_ci_hits2015 on 2023-02-23 using make_preloaded_export.py
description: Butler Data Repository Export
version: 1.0.1
version: 1.0.2
universe_version: 3
universe_namespace: daf_butler
data:
- type: dimension
element: instrument
records:
- name: DECam
visit_max: 33554432
visit_system: 0
exposure_max: 33554432
detector_max: 100
class_name: lsst.obs.decam.DarkEnergyCamera
@@ -541,24 +543,6 @@ data:
host: null
timespan_begin: null
timespan_end: null
- type: collection
collection_type: CHAINED
name: refcats
children:
- refcats/gen2
- type: collection
collection_type: CHAINED
name: templates
children:
- templates/deep
- type: collection
collection_type: CHAINED
name: DECam/defaults
children:
- DECam/calib
- refcats
- templates
- skymaps
- type: dataset_type
name: camera
dimensions:
13 changes: 9 additions & 4 deletions tests/data/make_central_repo.sh
@@ -3,12 +3,15 @@
# tests, making all the files be empty and adjusting the sqlite registry to
# match.

# This export file was created with this command, and then modified to have the
# CHAINED collections that we expect (path is the repo on JKP's desktop):
# make_preloaded_export.py --src-rep /data/ap_verify_ci_hits2015
# This export file was created with this command, and then a comment was added on the first line:
# make_preloaded_export.py --src-rep $AP_VERIFY_CI_HITS2015_DIR
butler create central_repo
butler import --export-file export.yaml --transfer copy central_repo/ $AP_VERIFY_CI_HITS2015_DIR/preloaded/

butler collection-chain central_repo refcats refcats/gen2
butler collection-chain central_repo DECam/templates templates/deep
butler collection-chain central_repo DECam/defaults DECam/calib,refcats,DECam/templates,skymaps

# Empty out files and make them size 0 in the registry.
# We do not empty the camera description in `DECam/calib/unbounded`
# because we need it to load the camera geometry.
@@ -19,4 +22,6 @@ find central_repo/DECam/calib/20150313T000000Z -name "*.fits" -execdir sh -c '> "$1"' -- {} \;
find central_repo/DECam/calib/curated -name "*.fits" -execdir sh -c '> "$1"' -- {} \;
# find central_repo/skymaps -name "*.fits" -execdir sh -c '> "$1"' -- {} \;

sqlite3 central_repo/gen3.sqlite3 "update file_datastore_records set file_size=0;"
# The camera and skymap files are not emptied out. The records need to be consistent so that
# the datastore does not complain about size mismatch when reading them.
sqlite3 central_repo/gen3.sqlite3 'update file_datastore_records set file_size=0 where path != "DECam/calib/unbounded/camera/camera_DECam_DECam_calib_unbounded.fits" and path != "skymaps/skyMap/skyMap_deepCoadd_skyMap_skymaps.pickle";'
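A quick way to confirm the registry surgery stayed consistent with the truncated files is to compare every record against the file on disk. A standard-library sketch, assuming it runs from the directory containing central_repo:

import os
import sqlite3

conn = sqlite3.connect("central_repo/gen3.sqlite3")
for path, size in conn.execute(
        "select path, file_size from file_datastore_records"):
    actual = os.path.getsize(os.path.join("central_repo", path))
    # Every size should be 0 except the preserved camera and skymap files.
    assert actual == size, f"{path}: registry {size} != disk {actual}"
conn.close()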
32 changes: 26 additions & 6 deletions tests/test_middleware_interface.py
@@ -48,6 +48,8 @@
instname = "DECam"
# Full name of the physical filter for the test file.
filter = "g DECam SDSS c0001 4720.0 1520.0"
# The skymap name used in the test repo.
skymap_name = "deepCoadd_skyMap"


def fake_file_data(filename, dimensions, instrument, visit):
@@ -133,7 +135,7 @@ def setUp(self):
# Have to preserve the tempdir, so that it doesn't get cleaned up.
self.workspace = tempfile.TemporaryDirectory()
self.interface = MiddlewareInterface(central_butler, self.input_data, instrument,
self.workspace.name,
skymap_name, self.workspace.name,
prefix="file://")

# coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
@@ -171,7 +173,7 @@ def test_get_butler(self):
]:
# TODO: better way to test repo location?
self.assertTrue(
butler.getURI("skyMap", skymap="deepCoadd_skyMap", run="foo", predict=True).ospath
butler.getURI("skyMap", skymap=skymap_name, run="foo", predict=True).ospath
.startswith(self.central_repo))
self.assertEqual(list(butler.collections), [f"{instname}/defaults"])
self.assertTrue(butler.isWriteable())
@@ -203,7 +205,7 @@ def _check_imports(self, butler, detector, expected_shards):
# Check that the right skymap is in the chained output collection.
self.assertTrue(
butler.datasetExists("skyMap",
skymap="deepCoadd_skyMap",
skymap=skymap_name,
collections=self.interface.output_collection)
)

@@ -241,7 +243,7 @@ def _check_imports(self, butler, detector, expected_shards):
butler.registry.refresh()
for patch in (464, 465, 509, 510):
butler.datasetExists('deepCoadd', tract=0, patch=patch, band="g",
skymap="deepCoadd_skyMap",
skymap=skymap_name,
collections=self.interface.output_collection)

def test_prep_butler(self):
@@ -592,7 +594,7 @@ def setUp(self):
self._create_copied_repo()
central_butler = Butler(self.central_repo.name,
instrument=instname,
skymap="deepCoadd_skyMap",
skymap=skymap_name,
collections=[f"{instname}/defaults"],
writeable=True)
instrument = "lsst.obs.decam.DarkEnergyCamera"
@@ -628,7 +630,8 @@ def setUp(self):
self.logger_name = "lsst.activator.middleware_interface"

# Populate repository.
self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, workspace.name,
self.interface = MiddlewareInterface(central_butler, self.input_data, instrument,
skymap_name, workspace.name,
prefix="file://")
# TODO: should MiddlewareInterface have a cleanup method?
self.addCleanup(tempfile.TemporaryDirectory.cleanup, self.interface._repo)
@@ -675,6 +678,23 @@ def _count_datasets(self, butler, types, collections):
def _count_datasets_with_id(self, butler, types, collections, data_id):
return len(set(butler.registry.queryDatasets(types, collections=collections, dataId=data_id)))

def test_extra_collection(self):
"""Test that extra collections in the chain will not lead to MissingCollectionError
even if they do not carry useful data.
"""
central_butler = Butler(self.central_repo.name, writeable=True)
central_butler.registry.registerCollection("emptyrun", CollectionType.RUN)
_prepend_collection(central_butler, "refcats", ["emptyrun"])

self.interface.prep_butler(self.next_visit)

self.assertEqual(
self._count_datasets(self.interface.butler, "gaia", f"{instname}/defaults"),
3)
self.assertIn(
"emptyrun",
self.interface.butler.registry.queryCollections("refcats", flattenChains=True))

def test_export_outputs(self):
self.interface.export_outputs(self.next_visit, {self.raw_data_id["exposure"]})

2 changes: 2 additions & 0 deletions ups/prompt_prototype.table
@@ -8,6 +8,8 @@ setupRequired(utils) # Used by scripts in bin.src

# Used by middleware_interface module due to hard-coded pipeline references.
setupRequired(ap_pipe)
# Borrow LATISS configs from drp_pipe
setupRequired(drp_pipe)
setupRequired(daf_butler)
setupRequired(ctrl_mpexec)
setupRequired(geom)