diff --git a/Dockerfile.main b/Dockerfile.main
index 31f7a8d3..f38c017c 100644
--- a/Dockerfile.main
+++ b/Dockerfile.main
@@ -1,4 +1,4 @@
-FROM lsstsqre/centos:w_latest
+FROM lsstsqre/centos:d_latest
 ENV PYTHONUNBUFFERED True
 RUN source /opt/lsst/software/stack/loadLSST.bash \
     && mamba install -y \
diff --git a/pipelines/calibrate.py b/pipelines/calibrate.py
index 4a580681..1e3d7b97 100644
--- a/pipelines/calibrate.py
+++ b/pipelines/calibrate.py
@@ -6,10 +6,8 @@
 # band with the most depth).
 config.connections.astromRefCat = "gaia"
-config.astromRefObjLoader.ref_dataset_name = config.connections.astromRefCat
 config.astromRefObjLoader.anyFilterMapsToThis = "phot_g_mean"
 config.astromRefObjLoader.filterMap = {}
 
 # Use panstarrs for photometry (grizy filters).
 config.connections.photoRefCat = "panstarrs"
-config.photoRefObjLoader.ref_dataset_name = config.connections.photoRefCat
diff --git a/python/activator/activator.py b/python/activator/activator.py
index 81528e65..9480d499 100644
--- a/python/activator/activator.py
+++ b/python/activator/activator.py
@@ -84,7 +84,7 @@
 # the central repo is located, either, so perhaps we need a new module.
 central_butler = Butler(calib_repo,
                         collections=[active_instrument.makeCollectionName("defaults")],
-                        writeable=False,
+                        writeable=True,
                         inferDefaults=False)
 repo = f"/tmp/butler-{os.getpid()}"
 butler = Butler(Butler.makeRepo(repo), writeable=True)
@@ -176,7 +176,7 @@ def next_visit_handler() -> Tuple[str, int]:
             )
             if oid:
                 m = re.match(RAW_REGEXP, oid)
-                mwi.ingest_image(oid)
+                mwi.ingest_image(expected_visit, oid)
                 expid_set.add(m.group('expid'))
 
         _log.debug(f"Waiting for snaps from {expected_visit}.")
@@ -227,6 +227,9 @@ def next_visit_handler() -> Tuple[str, int]:
                 _log.warning(f"Processing {len(expid_set)} snaps, expected {expected_visit.snaps}.")
             _log.info(f"Running pipeline on {expected_visit}.")
             mwi.run_pipeline(expected_visit, expid_set)
+            # TODO: broadcast alerts here
+            # TODO: call export_outputs on success or permanent failure in DM-34141
+            mwi.export_outputs(expected_visit, expid_set)
             return "Pipeline executed", 200
         else:
             _log.error(f"Timed out waiting for images for {expected_visit}.")
diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py
index 6157493e..921c365f 100644
--- a/python/activator/middleware_interface.py
+++ b/python/activator/middleware_interface.py
@@ -27,6 +27,7 @@
 import os
 import os.path
 import tempfile
+import typing
 
 from lsst.utils import getPackageDir
 from lsst.resources import ResourcePath
@@ -54,6 +55,16 @@ class MiddlewareInterface:
     ingest the data when it is available, and run the difference imaging
     pipeline, all in that local butler.
 
+    Each instance may be used for processing more than one group-detector
+    combination, designated by the `Visit` parameter to certain methods. There
+    is no guarantee that a processing run will, or will not, share a group,
+    detector, or both with a previous run handled by the same object.
+
+    ``MiddlewareInterface`` objects are not thread- or process-safe, and must
+    not share any state with other instances (see ``butler`` in the parameter
+    list). They may, however, share a ``central_butler``, and concurrent
+    operations on this butler are guaranteed to be appropriately synchronized.
+
     Parameters
     ----------
     central_butler : `lsst.daf.butler.Butler`
@@ -70,9 +81,10 @@ class MiddlewareInterface:
         use of the butler.
     butler : `lsst.daf.butler.Butler`
         Local butler to process data in and hold calibrations, etc.; must be
-        writeable.
+        writeable. Must not be shared with any other ``MiddlewareInterface``
+        object.
     prefix : `str`, optional
-        URI identification prefix; prepended to ``image_bucket`` when
+        URI scheme followed by ``://``; prepended to ``image_bucket`` when
         constructing URIs to retrieve incoming files. The default is
         appropriate for use in the Google Cloud environment; typically only
         change this when running local tests.
@@ -84,15 +96,24 @@ class MiddlewareInterface:
     """The collection used for skymaps.
     """
 
+    # Class invariants:
+    # self.image_host is a valid URI with non-empty path and no query or fragment.
+    # self._download_store is None if and only if self.image_host is a local URI.
+    # self.instrument, self.camera, and self.skymap do not change after __init__.
+    # self.butler defaults to a chained collection named
+    # self.output_collection, which contains zero or more output runs,
+    # pre-made inputs, and raws, in that order. However, self.butler is not
+    # guaranteed to contain concrete data, or even the dimensions
+    # corresponding to self.camera and self.skymap.
+
     def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
                  butler: Butler,
                  prefix: str = "gs://"):
         self.ip_apdb = os.environ["IP_APDB"]
-        self.prefix = prefix
         self.central_butler = central_butler
-        self.image_bucket = image_bucket
+        self.image_host = prefix + image_bucket
         # TODO: _download_store turns MWI into a tagged class; clean this up later
-        if not prefix.startswith("file"):
+        if not self.image_host.startswith("file"):
             self._download_store = tempfile.TemporaryDirectory(prefix="holding-")
         else:
             self._download_store = None
@@ -102,9 +123,7 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
 
         self._init_local_butler(butler)
         self._init_ingester()
-
-        define_visits_config = lsst.obs.base.DefineVisitsConfig()
-        self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler)
+        self._init_visit_definer()
 
         # HACK: explicit collection gets around the fact that we don't have any
         # timestamp/exposure information in a form we can pass to the Butler.
@@ -125,6 +144,9 @@ def __init__(self, central_butler: Butler, image_bucket: str, instrument: str,
     def _init_local_butler(self, butler: Butler):
         """Prepare the local butler to ingest into and process from.
 
+        ``self.instrument`` must already exist. ``self.butler`` is correctly
+        initialized after this method returns.
+
         Parameters
         ----------
         butler : `lsst.daf.butler.Butler`
@@ -133,13 +155,31 @@ def _init_local_butler(self, butler: Butler):
         """
         self.instrument.register(butler.registry)
-        # Refresh butler after configuring it, to ensure all required
-        # dimensions and collections are available.
-        butler.registry.refresh()
-        self.butler = butler
+        # Will be populated in prep_butler.
+        butler.registry.registerCollection(self.instrument.makeUmbrellaCollectionName(),
+                                           CollectionType.CHAINED)
+        # Will be populated on ingest.
+        butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(),
+                                           CollectionType.CHAINED)
+        # Will be populated on pipeline execution.
+        butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED)
+        collections = [self.instrument.makeUmbrellaCollectionName(),
+                       self.instrument.makeDefaultRawIngestRunName(),
+                       ]
+        butler.registry.setCollectionChain(self.output_collection, collections)
+
+        # Internal Butler keeps a reference to the newly prepared collection.
+        # This reference makes visible any inputs for query purposes. Output
+        # runs are execution-specific and must be provided explicitly to the
+        # appropriate calls.
+        self.butler = Butler(butler=butler,
+                             collections=[self.output_collection],
+                             )
 
     def _init_ingester(self):
         """Prepare the raw file ingester to receive images into this butler.
+
+        ``self._init_local_butler`` must have already been run.
         """
         config = lsst.obs.base.RawIngestConfig()
         config.transfer = "copy"  # Copy files into the local butler.
@@ -149,6 +189,14 @@ def _init_ingester(self):
         self.rawIngestTask = lsst.obs.base.RawIngestTask(config=config,
                                                          butler=self.butler)
 
+    def _init_visit_definer(self):
+        """Prepare the visit definer to define visits for this butler.
+
+        ``self._init_local_butler`` must have already been run.
+        """
+        define_visits_config = lsst.obs.base.DefineVisitsConfig()
+        self.define_visits = lsst.obs.base.DefineVisitsTask(config=define_visits_config, butler=self.butler)
+
     def _predict_wcs(self, detector: lsst.afw.cameraGeom.Detector,
                      visit: Visit) -> lsst.afw.geom.SkyWcs:
         """Calculate the expected detector WCS for an incoming observation.
@@ -202,6 +250,12 @@ def _detector_bounding_circle(self, detector: lsst.afw.cameraGeom.Detector,
     def prep_butler(self, visit: Visit) -> None:
         """Prepare a temporary butler repo for processing the incoming data.
 
+        After this method returns, the internal butler is guaranteed to contain
+        all data and all dimensions needed to run the appropriate pipeline on ``visit``,
+        except for ``raw`` and the ``exposure`` and ``visit`` dimensions,
+        respectively. It may contain other data that would not be loaded when
+        processing ``visit``.
+
         Parameters
         ----------
         visit : `Visit`
@@ -213,9 +267,6 @@ def prep_butler(self, visit: Visit) -> None:
         wcs = self._predict_wcs(detector, visit)
         center, radius = self._detector_bounding_circle(detector, wcs)
 
-        # Need up-to-date census of what's already present.
-        self.butler.registry.refresh()
-
         with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file:
             with self.central_butler.export(filename=export_file.name, format="yaml") as export:
                 self._export_refcats(export, center, radius)
@@ -231,8 +282,13 @@ def prep_butler(self, visit: Visit) -> None:
                             directory=self.central_butler.datastore.root,
                             transfer="copy")
 
-        self._prep_collections()
-        self._prep_pipeline(visit)
+        # Create unique run to store this visit's raws. This makes it easier to
+        # clean up the central repo later.
+        # TODO: not needed after DM-36051, or if we find a way to give all test
+        # raws unique exposure IDs.
+        input_raws = self._get_raw_run_name(visit)
+        self.butler.registry.registerCollection(input_raws, CollectionType.RUN)
+        _prepend_collection(self.butler, self.instrument.makeDefaultRawIngestRunName(), [input_raws])
 
     def _export_refcats(self, export, center, radius):
         """Export the refcats for this visit from the central butler.
@@ -371,8 +427,29 @@ def get_key(ref):
         for k, g in itertools.groupby(ordered, key=get_key):
             yield k, len(list(g))
 
+    def _get_raw_run_name(self, visit):
+        """Define a run collection specific to a particular visit.
+
+        Parameters
+        ----------
+        visit : Visit
+            The visit whose raws will be stored in this run.
+
+        Returns
+        -------
+        run : `str`
+            The name of a run collection to use for raws.
+        """
+        return self.instrument.makeCollectionName("raw", visit.group)
+
     def _prep_collections(self):
         """Pre-register output collections in advance of running the pipeline.
+
+        Returns
+        -------
+        run : `str`
+            The name of a new run collection to use for outputs. The run name
+            is prefixed by ``self.output_collection``.
""" # NOTE: Because we receive a butler on init, we can't use this # prep_butler() because it takes a repo path. @@ -383,23 +460,16 @@ def _prep_collections(self): # 'refcats'], # output=self.output_collection) # The below is taken from SimplePipelineExecutor.prep_butler. - # TODO DM-34202: save run collection in self for now, but we won't need - # it when we no longer need to work around DM-34202. - self.output_run = f"{self.output_collection}/{self.instrument.makeCollectionTimestamp()}" - self.butler.registry.registerCollection(self.instrument.makeDefaultRawIngestRunName(), - CollectionType.RUN) - self.butler.registry.registerCollection(self.output_run, CollectionType.RUN) - self.butler.registry.registerCollection(self.output_collection, CollectionType.CHAINED) - collections = [self.instrument.makeUmbrellaCollectionName(), - self.instrument.makeDefaultRawIngestRunName(), - self.output_run] - self.butler.registry.setCollectionChain(self.output_collection, collections) - - # Need to create a new butler with all the output collections. - self.butler = Butler(butler=self.butler, - collections=[self.output_collection], - # TODO DM-34202: hack around a middleware bug. - run=None) + # TODO: currently the run **must** be unique for each unit of + # processing. It must be unique per group because in the prototype the + # same exposures may be rerun under different group IDs, and they must + # be unique per detector because the same worker may be tasked with + # different detectors from the same group. Replace with a single + # universal run on DM-36586. + output_run = f"{self.output_collection}/{self.instrument.makeCollectionTimestamp()}" + self.butler.registry.registerCollection(output_run, CollectionType.RUN) + _prepend_collection(self.butler, self.output_collection, [output_run]) + return output_run def _prep_pipeline(self, visit: Visit) -> None: """Setup the pipeline to be run, based on the configured instrument and @@ -410,6 +480,11 @@ def _prep_pipeline(self, visit: Visit) -> None: visit : Visit Group of snaps from one detector to prepare the pipeline for. + Returns + ------- + pipeline : `lsst.pipe.base.Pipeline` + The pipeline to run for ``visit``. + Raises ------ RuntimeError @@ -422,13 +497,14 @@ def _prep_pipeline(self, visit: Visit) -> None: ap_pipeline_file = os.path.join(getPackageDir("prompt_prototype"), "pipelines", self.instrument.getName(), "ApPipe.yaml") try: - self.pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file) + pipeline = lsst.pipe.base.Pipeline.fromFile(ap_pipeline_file) except FileNotFoundError: raise RuntimeError(f"No ApPipe.yaml defined for camera {self.instrument.getName()}") # TODO: Can we write to a configurable apdb schema, rather than # "postgres"? - self.pipeline.addConfigOverride("diaPipe", "apdb.db_url", - f"postgresql://postgres@{self.ip_apdb}/postgres") + pipeline.addConfigOverride("diaPipe", "apdb.db_url", + f"postgresql://postgres@{self.ip_apdb}/postgres") + return pipeline def _download(self, remote): """Download an image located on a remote store. @@ -451,35 +527,48 @@ def _download(self, remote): local.transfer_from(remote, "copy") return local - def ingest_image(self, oid: str) -> None: + def ingest_image(self, visit: Visit, oid: str) -> None: """Ingest an image into the temporary butler. + The temporary butler must not already contain a ``raw`` dataset + corresponding to ``oid``. After this method returns, the temporary + butler contains one ``raw`` dataset corresponding to ``oid``, and the + appropriate ``exposure`` dimension. 
+
         Parameters
         ----------
+        visit : Visit
+            The visit for which the image was taken.
         oid : `str`
             Google storage identifier for incoming image, relative to the
             image bucket.
         """
+        # TODO: consider allowing pre-existing raws, as may happen when a
+        # pipeline is rerun (see DM-34141).
         _log.info(f"Ingesting image id '{oid}'")
-        file = ResourcePath(f"{self.prefix}{self.image_bucket}/{oid}")
+        file = ResourcePath(f"{self.image_host}/{oid}")
         if not file.isLocal:
             # TODO: RawIngestTask doesn't currently support remote files.
             file = self._download(file)
-        result = self.rawIngestTask.run([file])
+        result = self.rawIngestTask.run([file], run=self._get_raw_run_name(visit))
         # We only ingest one image at a time.
         # TODO: replace this assert with a custom exception, once we've decided
         # how we plan to handle exceptions in this code.
         assert len(result) == 1, "Should have ingested exactly one image."
         _log.info("Ingested one %s with dataId=%s", result[0].datasetType.name, result[0].dataId)
 
-    def run_pipeline(self, visit: Visit, exposure_ids: set) -> None:
+    def run_pipeline(self, visit: Visit, exposure_ids: set[int]) -> None:
         """Process the received image(s).
 
+        The internal butler must contain all data and all dimensions needed to
+        run the appropriate pipeline on ``visit`` and ``exposure_ids``, except
+        for the ``visit`` dimension itself.
+
         Parameters
         ----------
         visit : Visit
             Group of snaps from one detector to be processed.
-        exposure_ids : `set`
+        exposure_ids : `set` [`int`]
             Identifiers of the exposures that were received.
         """
         # TODO: we want to define visits earlier, but we have to ingest a
@@ -492,24 +581,131 @@ def run_pipeline(self, visit: Visit, exposure_ids: set) -> None:
             # TODO: a good place for a custom exception?
             raise RuntimeError("No data to process.") from e
 
-        # TODO: can we move this from_pipeline call to prep_butler?
+        output_run = self._prep_collections()
+
         where = f"instrument='{visit.instrument}' and detector={visit.detector} " f"and exposure in ({','.join(str(x) for x in exposure_ids)})"
-        executor = SimplePipelineExecutor.from_pipeline(self.pipeline, where=where, butler=self.butler)
+        pipeline = self._prep_pipeline(visit)
+        executor = SimplePipelineExecutor.from_pipeline(pipeline,
+                                                        where=where,
+                                                        butler=Butler(butler=self.butler,
+                                                                      collections=self.butler.collections,
+                                                                      run=output_run))
         if len(executor.quantum_graph) == 0:
             # TODO: a good place for a custom exception?
             raise RuntimeError("No data to process.")
-        # TODO DM-34202: hack around a middleware bug.
-        executor.butler = Butler(butler=self.butler,
-                                 collections=[self.output_collection],
-                                 run=self.output_run)
-        _log.info(f"Running '{self.pipeline._pipelineIR.description}' on {where}")
+        _log.info(f"Running '{pipeline._pipelineIR.description}' on {where}")
         # If this is a fresh (local) repo, then types like calexp,
         # *Diff_diaSrcTable, etc. have not been registered.
         result = executor.run(register_dataset_types=True)
         _log.info(f"Pipeline successfully run on {len(result)} quanta for "
                   f"detector {visit.detector} of {exposure_ids}.")
 
+    def export_outputs(self, visit: Visit, exposure_ids: set[int]) -> None:
+        """Copy raws and pipeline outputs from processing a set of images back
+        to the central Butler.
+
+        The copied raws can be found in the collection ``<instrument>/raw/all``,
+        while the outputs can be found in ``<instrument>/prompt-results``.
+
+        Parameters
+        ----------
+        visit : Visit
+            The visit whose outputs need to be exported.
+        exposure_ids : `set` [`int`]
+            Identifiers of the exposures that were processed.
+ """ + # TODO: this method will not be responsible for raws after DM-36051. + self._export_subset(visit, exposure_ids, "raw", + in_collections=self._get_raw_run_name(visit), + out_collection=self.instrument.makeDefaultRawIngestRunName()) + umbrella = self.instrument.makeCollectionName("prompt-results") + self._export_subset(visit, exposure_ids, + # TODO: find a way to merge datasets like *_config + # or *_schema that are duplicated across multiple + # workers. + self._get_safe_dataset_types(self.butler), + self.output_collection + "/*", # exclude inputs + umbrella) + _log.info(f"Pipeline products saved to collection '{umbrella}' for " + f"detector {visit.detector} of {exposure_ids}.") + + @staticmethod + def _get_safe_dataset_types(butler): + """Return the set of dataset types that can be safely merged from a worker. + + Parameters + ---------- + butler : `lsst.daf.butler.Butler` + The butler in which to search for dataset types. + + Returns + ------- + types : iterable [`str`] + The dataset types to return. + """ + return [dstype for dstype in butler.registry.queryDatasetTypes(...) + if "detector" in dstype.dimensions] + + def _export_subset(self, visit: Visit, exposure_ids: set[int], + dataset_types: typing.Any, in_collections: typing.Any, out_collection: str) -> None: + """Copy datasets associated with a processing run back to the + central Butler. + + Parameters + ---------- + visit : Visit + The visit whose outputs need to be exported. + exposure_ids : `set` [`int`] + Identifiers of the exposures that were processed. + dataset_types + The dataset type(s) to transfer; can be any expression described in + :ref:`daf_butler_dataset_type_expressions`. + in_collections + The collections to transfer from; can be any expression described + in :ref:`daf_butler_collection_expressions`. + out_collection : `str` + The chained collection in which to include the datasets. Need not + exist before the call. + """ + try: + # Need to iterate over datasets at least twice, so list. + datasets = list(self.butler.registry.queryDatasets( + dataset_types, + collections=in_collections, + # in_collections may include other runs, so need to filter. + # Since AP processing is strictly visit-detector, these three + # dimensions should suffice. + # DO NOT assume that visit == exposure! + where=f"exposure in ({', '.join(str(x) for x in exposure_ids)})", + instrument=self.instrument.getName(), + detector=visit.detector, + )) + except lsst.daf.butler.registry.DataIdError as e: + raise ValueError("Invalid visit or exposures.") from e + if not datasets: + raise ValueError(f"No datasets match visit={visit} and exposures={exposure_ids}.") + + with tempfile.NamedTemporaryFile(mode="w+b", suffix=".yaml") as export_file: + # MUST NOT export governor dimensions, as this causes deadlocks in + # central registry. Can omit most other dimensions (all dimensions, + # after DM-36051) to avoid locks or redundant work. + # TODO: saveDatasets(elements={"exposure", "visit"}) doesn't work. + # Use import(skip_dimensions) until DM-36062 is fixed. + with self.butler.export(filename=export_file.name) as export: + export.saveDatasets(datasets) + self.central_butler.import_(filename=export_file.name, + directory=self.butler.datastore.root, + skip_dimensions={"instrument", "detector", + "skymap", "tract", "patch"}, + transfer="copy") + # No-op if collection already exists. + self.central_butler.registry.registerCollection(out_collection, CollectionType.CHAINED) + runs = {ref.run for ref in datasets} + # Don't unlink any previous runs. 
+        # TODO: need to secure this against concurrent modification
+        _prepend_collection(self.central_butler, out_collection, runs)
+
 
 def _query_missing_datasets(src_repo: Butler, dest_repo: Butler,
                             *args, **kwargs) -> collections.abc.Iterable[lsst.daf.butler.DatasetRef]:
@@ -545,3 +741,23 @@ def _query_missing_datasets(src_repo: Butler, dest_repo: Butler,
     # this operation.
     return itertools.filterfalse(lambda ref: ref in known_datasets,
                                  src_repo.registry.queryDatasets(*args, **kwargs))
+
+
+def _prepend_collection(butler: Butler, chain: str, new_collections: collections.abc.Iterable[str]) -> None:
+    """Add a specific collection to the front of an existing chain.
+
+    Parameters
+    ----------
+    butler : `lsst.daf.butler.Butler`
+        The butler in which the collections exist.
+    chain : `str`
+        The chained collection to prepend to.
+    new_collections : iterable [`str`]
+        The collections to prepend to ``chain``.
+
+    Notes
+    -----
+    This function is not safe against concurrent modifications to ``chain``.
+    """
+    old_chain = butler.registry.getCollectionChain(chain)  # May be empty
+    butler.registry.setCollectionChain(chain, list(new_collections) + list(old_chain), flatten=False)
diff --git a/python/tester/upload.py b/python/tester/upload.py
index 58d8e360..5c414195 100644
--- a/python/tester/upload.py
+++ b/python/tester/upload.py
@@ -1,4 +1,4 @@
-from dataclasses import dataclass
+import dataclasses
 from google.cloud import pubsub_v1, storage
 from google.oauth2 import service_account
 import itertools
@@ -12,7 +12,7 @@
 from activator.visit import Visit
 
 
-@dataclass
+@dataclasses.dataclass
 class Instrument:
     n_snaps: int
     n_detectors: int
@@ -213,7 +213,7 @@ def make_random_visits(instrument, group):
     ----------
     instrument : `str`
         The short name of the instrument carrying out the observation.
-    group : `int`
+    group : `str`
        The group number being observed.
 
    Returns
@@ -344,7 +344,7 @@ def upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n
                          "unobserved raws are available.")
 
     for i, true_group in enumerate(itertools.islice(raw_pool, n_groups)):
-        group = group_base + i
+        group = str(group_base + i)
         _log.debug(f"Processing group {group} from unobserved {true_group}...")
         # snap_dict maps snap_id to {visit: blob}
         snap_dict = {}
@@ -352,7 +352,7 @@ def upload_from_raws(publisher, instrument, raw_pool, src_bucket, dest_bucket, n
         # replacing the (immutable) Visit objects to point to group
         # instead of true_group.
         for snap_id, old_visits in raw_pool[true_group].items():
-            snap_dict[snap_id] = {splice_group(true_visit, group): blob
+            snap_dict[snap_id] = {dataclasses.replace(true_visit, group=group): blob
                                   for true_visit, blob in old_visits.items()}
         # Gather all the Visit objects found in snap_dict, merging
         # duplicates for different snaps of the same detector.
@@ -390,7 +390,7 @@ def upload_from_random(publisher, instrument, dest_bucket, n_groups, group_base)
         The base number from which to offset new group numbers.
     """
     for i in range(n_groups):
-        group = group_base + i
+        group = str(group_base + i)
         visit_infos = make_random_visits(instrument, group)
 
         # TODO: may be cleaner to use a functor object than to depend on
@@ -405,32 +405,5 @@ def upload_dummy(visit, snap_id):
         time.sleep(SLEW_INTERVAL)
 
 
-def splice_group(visit, group):
-    """Replace the group ID in a Visit object.
-
-    Parameters
-    ----------
-    visit : `activator.Visit`
-        The object to update.
-    group : `str`
-        The new group ID to use.
-
-    Returns
-    -------
-    new_visit : `activator.Visit`
-        A visit with group ``group``, but otherwise identical to ``visit``.
-    """
-    return Visit(instrument=visit.instrument,
-                 detector=visit.detector,
-                 group=group,
-                 snaps=visit.snaps,
-                 filter=visit.filter,
-                 ra=visit.ra,
-                 dec=visit.dec,
-                 rot=visit.rot,
-                 kind=visit.kind,
-                 )
-
-
 if __name__ == "__main__":
     main()
diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py
index 6eb1a6ff..114b1364 100644
--- a/tests/test_middleware_interface.py
+++ b/tests/test_middleware_interface.py
@@ -19,6 +19,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see .
 
+import dataclasses
 import itertools
 import tempfile
 import os.path
@@ -29,13 +30,15 @@
 import astropy.units as u
 import astro_metadata_translator
 
-from lsst.daf.butler import Butler, DataCoordinate
+import lsst.afw.image
+from lsst.daf.butler import Butler, CollectionType, DataCoordinate
+import lsst.daf.butler.tests as butler_tests
 from lsst.obs.base.formatters.fitsExposure import FitsImageFormatter
 from lsst.obs.base.ingest import RawFileDatasetInfo, RawFileData
 import lsst.resources
 
 from activator.visit import Visit
-from activator.middleware_interface import MiddlewareInterface, _query_missing_datasets
+from activator.middleware_interface import MiddlewareInterface, _query_missing_datasets, _prepend_collection
 
 # The short name of the instrument used in the test repo.
 instname = "DECam"
@@ -129,7 +132,7 @@ def setUp(self):
         rot = 90.
         self.next_visit = Visit(instrument_name,
                                 detector=56,
-                                group=1,
+                                group="1",
                                 snaps=1,
                                 filter=filter,
                                 ra=ra,
@@ -154,6 +157,7 @@ def test_init(self):
         # Check that the butler instance is properly configured.
         instruments = list(self.butler.registry.queryDimensionRecords("instrument"))
         self.assertEqual(instname, instruments[0].name)
+        self.assertEqual(set(self.interface.butler.collections), {self.interface.output_collection})
 
         # Check that the ingester is properly configured.
         self.assertEqual(self.interface.rawIngestTask.config.failFast, True)
@@ -222,10 +226,6 @@ def test_prep_butler(self):
         expected_shards = {157394, 157401, 157405}
         self._check_imports(self.butler, detector=56, expected_shards=expected_shards)
 
-        # Check that we configured the right pipeline.
-        self.assertEqual(self.interface.pipeline._pipelineIR.description,
-                         "End to end Alert Production pipeline specialized for HiTS-2015")
-
     def test_prep_butler_twice(self):
         """prep_butler should have the correct calibs (and not raise
         an exception!) on a second run with the same, or a different detector.
         """
         self.interface.prep_butler(self.next_visit)
 
         # Second visit with everything same except group.
-        self.next_visit = Visit(instrument=self.next_visit.instrument,
-                                detector=self.next_visit.detector,
-                                group=self.next_visit.group + 1,
-                                snaps=self.next_visit.snaps,
-                                filter=self.next_visit.filter,
-                                ra=self.next_visit.ra,
-                                dec=self.next_visit.dec,
-                                rot=self.next_visit.rot,
-                                kind=self.next_visit.kind)
+        self.next_visit = dataclasses.replace(self.next_visit, group=str(int(self.next_visit.group) + 1))
         self.interface.prep_butler(self.next_visit)
         expected_shards = {157394, 157401, 157405}
         self._check_imports(self.butler, detector=56, expected_shards=expected_shards)
 
         # Third visit with different detector and coordinates.
         # Only 5, 10, 56, 60 have valid calibs.
-        self.next_visit = Visit(instrument=self.next_visit.instrument,
-                                detector=5,
-                                group=self.next_visit.group + 1,
-                                snaps=self.next_visit.snaps,
-                                filter=self.next_visit.filter,
-                                # Offset by a bit over 1 patch.
-                                ra=self.next_visit.ra + 0.4,
-                                dec=self.next_visit.dec - 0.4,
-                                rot=self.next_visit.rot,
-                                kind=self.next_visit.kind)
+        self.next_visit = dataclasses.replace(self.next_visit,
+                                              detector=5,
+                                              group=str(int(self.next_visit.group) + 1),
+                                              # Offset by a bit over 1 patch.
+                                              ra=self.next_visit.ra + 0.4,
+                                              dec=self.next_visit.dec - 0.4,
+                                              )
         self.interface.prep_butler(self.next_visit)
         expected_shards.update({157218, 157229})
         self._check_imports(self.butler, detector=5, expected_shards=expected_shards)
@@ -269,6 +258,7 @@ def test_prep_butler_twice(self):
     # This may be impossible to unit test, since it seems to depend on Google-side parallelism.
     def test_ingest_image(self):
+        self.interface.prep_butler(self.next_visit)  # Ensure raw collections exist.
         filename = "fakeRawImage.fits"
         filepath = os.path.join(self.input_data, filename)
         data_id, file_data = fake_file_data(filepath,
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
             mock.return_value = file_data
-            self.interface.ingest_image(filename)
+            self.interface.ingest_image(self.next_visit, filename)
 
         datasets = list(self.butler.registry.queryDatasets('raw',
                                                            collections=[f'{instname}/raw/all']))
@@ -296,6 +286,7 @@ def test_ingest_image_fails_missing_file(self):
         through ingest_image(), we'll want to have a test of "missing file
         ingestion", and this can serve as a starting point.
         """
+        self.interface.prep_butler(self.next_visit)  # Ensure raw collections exist.
         filename = "nonexistentImage.fits"
         filepath = os.path.join(self.input_data, filename)
         data_id, file_data = fake_file_data(filepath,
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock, \
                 self.assertRaisesRegex(FileNotFoundError, "Resource at .* does not exist"):
             mock.return_value = file_data
-            self.interface.ingest_image(filename)
+            self.interface.ingest_image(self.next_visit, filename)
         # There should not be any raw files in the registry.
         datasets = list(self.butler.registry.queryDatasets('raw',
                                                            collections=[f'{instname}/raw/all']))
@@ -326,11 +317,15 @@ def test_run_pipeline(self):
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
             mock.return_value = file_data
-            self.interface.ingest_image(filename)
+            self.interface.ingest_image(self.next_visit, filename)
 
         with unittest.mock.patch("activator.middleware_interface.SimplePipelineExecutor.run") as mock_run:
-            self.interface.run_pipeline(self.next_visit, {1})
+            with self.assertLogs(self.logger_name, level="INFO") as logs:
+                self.interface.run_pipeline(self.next_visit, {1})
         mock_run.assert_called_once_with(register_dataset_types=True)
+        # Check that we configured the right pipeline.
+        self.assertIn("End to end Alert Production pipeline specialized for HiTS-2015",
+                      "\n".join(logs.output))
 
     def test_run_pipeline_empty_quantum_graph(self):
         """Test that running a pipeline that results in an empty quantum graph
@@ -346,7 +341,7 @@ def test_run_pipeline_empty_quantum_graph(self):
                                             self.next_visit)
         with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
             mock.return_value = file_data
-            self.interface.ingest_image(filename)
+            self.interface.ingest_image(self.next_visit, filename)
 
         with self.assertRaisesRegex(RuntimeError, "No data to process"):
             self.interface.run_pipeline(self.next_visit, {2})
@@ -394,3 +389,161 @@ def test_query_missing_datasets_nodim(self):
         result = set(_query_missing_datasets(src_butler, existing_butler, "skyMap", ..., skymap="mymap"))
         src_butler.registry.queryDatasets.assert_called_once_with("skyMap", ..., skymap="mymap")
         self.assertEqual(result, {data1})
+
+    def test_prepend_collection(self):
+        self.butler.registry.registerCollection("_prepend1", CollectionType.TAGGED)
+        self.butler.registry.registerCollection("_prepend2", CollectionType.TAGGED)
+        self.butler.registry.registerCollection("_prepend3", CollectionType.TAGGED)
+        self.butler.registry.registerCollection("_prepend_base", CollectionType.CHAINED)
+
+        # Empty chain.
+        self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), [])
+        _prepend_collection(self.butler, "_prepend_base", ["_prepend1"])
+        self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")), ["_prepend1"])
+
+        # Non-empty chain.
+        self.butler.registry.setCollectionChain("_prepend_base", ["_prepend1", "_prepend2"])
+        _prepend_collection(self.butler, "_prepend_base", ["_prepend3"])
+        self.assertEqual(list(self.butler.registry.getCollectionChain("_prepend_base")),
+                         ["_prepend3", "_prepend1", "_prepend2"])
+
+
+class MiddlewareInterfaceWriteableTest(unittest.TestCase):
+    """Test the MiddlewareInterface class with faked data.
+
+    This class creates a fresh test repository for writing to. This means test
+    setup takes longer than for MiddlewareInterfaceTest, so it should be
+    used sparingly.
+    """
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass()
+
+        cls.env_patcher = unittest.mock.patch.dict(os.environ,
+                                                   {"IP_APDB": "localhost"})
+        cls.env_patcher.start()
+
+    @classmethod
+    def tearDownClass(cls):
+        super().tearDownClass()
+
+        cls.env_patcher.stop()
+
+    def _create_copied_repo(self):
+        """Create a fresh repository that's a copy of the test data.
+
+        This method sets self.central_repo and arranges cleanup; cleanup would
+        be awkward if this method returned a Butler instead.
+        """
+        # Copy test data to fresh Butler to allow write tests.
+        data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
+        data_repo = os.path.join(data_dir, "central_repo")
+        data_butler = Butler(data_repo, writeable=False)
+        self.central_repo = tempfile.TemporaryDirectory()
+        # TemporaryDirectory warns on leaks
+        self.addCleanup(tempfile.TemporaryDirectory.cleanup, self.central_repo)
+
+        # Butler.transfer_from can't easily copy collections, so use
+        # export/import instead.
+        with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as export_file:
+            with data_butler.export(filename=export_file.name) as export:
+                export.saveDatasets(data_butler.registry.queryDatasets(..., collections=...))
+                for collection in data_butler.registry.queryCollections():
+                    export.saveCollection(collection)
+            central_butler = Butler(Butler.makeRepo(self.central_repo.name), writeable=True)
+            central_butler.import_(directory=data_repo, filename=export_file.name, transfer="auto")
+
+    def setUp(self):
+        # TODO: test two parallel repos once DM-36051 fixed; can't do it
+        # earlier because the test data has only one raw.
+
+        self._create_copied_repo()
+        central_butler = Butler(self.central_repo.name,
+                                instrument=instname,
+                                skymap="deepCoadd_skyMap",
+                                collections=[f"{instname}/defaults"],
+                                writeable=True)
+        instrument = "lsst.obs.decam.DarkEnergyCamera"
+        data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
+        self.input_data = os.path.join(data_dir, "input_data")
+        repo = tempfile.TemporaryDirectory()
+        # TemporaryDirectory warns on leaks; addCleanup also keeps the TD from
+        # getting garbage-collected.
+        self.addCleanup(tempfile.TemporaryDirectory.cleanup, repo)
+        self.butler = Butler(Butler.makeRepo(repo.name), writeable=True)
+
+        # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
+        ra = 155.4702849608958
+        dec = -4.950050405424033
+        # DECam has no rotator; instrument angle is 90 degrees in our system.
+        rot = 90.
+        self.next_visit = Visit(instrument,
+                                detector=56,
+                                group="1",
+                                snaps=1,
+                                filter=filter,
+                                ra=ra,
+                                dec=dec,
+                                rot=rot,
+                                kind="SURVEY")
+        self.logger_name = "lsst.activator.middleware_interface"
+
+        # Populate repository.
+        self.interface = MiddlewareInterface(central_butler, self.input_data, instrument, self.butler,
+                                             prefix="file://")
+        self.interface.prep_butler(self.next_visit)
+        filename = "fakeRawImage.fits"
+        filepath = os.path.join(self.input_data, filename)
+        self.raw_data_id, file_data = fake_file_data(filepath,
+                                                     self.butler.dimensions,
+                                                     self.interface.instrument,
+                                                     self.next_visit)
+        with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
+            mock.return_value = file_data
+            self.interface.ingest_image(self.next_visit, filename)
+        self.interface.define_visits.run([self.raw_data_id])
+
+        # Simulate pipeline execution.
+        exp = lsst.afw.image.ExposureF(20, 20)
+        run = self.interface._prep_collections()
+        self.processed_data_id = {(k if k != "exposure" else "visit"): v for k, v in self.raw_data_id.items()}
+        # Dataset types defined for local Butler on pipeline run, but no
+        # guarantee this happens in central Butler.
+        butler_tests.addDatasetType(self.interface.butler, "calexp", {"instrument", "visit", "detector"},
+                                    "ExposureF")
+        self.interface.butler.put(exp, "calexp", self.processed_data_id, run=run)
+
+    def _check_datasets(self, butler, types, collections, count, data_id):
+        datasets = list(butler.registry.queryDatasets(types, collections=collections))
+        self.assertEqual(len(datasets), count)
+        for dataset in datasets:
+            self.assertEqual(dataset.dataId, data_id)
+
+    def test_export_outputs(self):
+        self.interface.export_outputs(self.next_visit, {self.raw_data_id["exposure"]})
+
+        central_butler = Butler(self.central_repo.name, writeable=False)
+        raw_collection = f"{instname}/raw/all"
+        export_collection = f"{instname}/prompt-results"
+        self._check_datasets(central_butler,
+                             "raw", raw_collection, 1, self.raw_data_id)
+        # Did not export raws directly to raw/all.
+        self.assertNotEqual(central_butler.registry.getCollectionType(raw_collection), CollectionType.RUN)
+        self._check_datasets(central_butler,
+                             "calexp", export_collection, 1, self.processed_data_id)
+        # Did not export calibs or other inputs.
+        self._check_datasets(central_butler,
+                             ["cpBias", "gaia", "skyMap", "*Coadd"], export_collection,
+                             0, {"error": "dnc"})
+        # Nothing placed in "input" collections.
+        self._check_datasets(central_butler,
+                             ["raw", "calexp"], f"{instname}/defaults", 0, {"error": "dnc"})
+
+    def test_export_outputs_bad_visit(self):
+        bad_visit = dataclasses.replace(self.next_visit, detector=88)
+        with self.assertRaises(ValueError):
+            self.interface.export_outputs(bad_visit, {self.raw_data_id["exposure"]})
+
+    def test_export_outputs_bad_exposure(self):
+        with self.assertRaises(ValueError):
+            self.interface.export_outputs(self.next_visit, {88})
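
For reference, a minimal sketch (not part of the patch) of how the revised MiddlewareInterface API is driven end to end, following the call sequence in activator.py and the tests above. The repo paths, bucket name, object ID, filter, and exposure ID below are illustrative placeholders only.

    # Sketch only: mirrors activator.py's prep -> ingest -> run -> export flow.
    import os

    from lsst.daf.butler import Butler
    from activator.visit import Visit
    from activator.middleware_interface import MiddlewareInterface

    os.environ.setdefault("IP_APDB", "127.0.0.1")  # required by MiddlewareInterface.__init__

    # Central repo must now be writeable so export_outputs can copy results back.
    central_butler = Butler("/path/to/central/repo",             # hypothetical path
                            collections=["DECam/defaults"],
                            writeable=True,
                            inferDefaults=False)
    local_butler = Butler(Butler.makeRepo(f"/tmp/butler-{os.getpid()}"), writeable=True)
    mwi = MiddlewareInterface(central_butler, "example-image-bucket",   # hypothetical bucket
                              "lsst.obs.decam.DarkEnergyCamera", local_butler)

    # Group IDs are now strings; field values here are made up for illustration.
    visit = Visit(instrument="DECam", detector=56, group="42", snaps=1, filter="g",
                  ra=155.47, dec=-4.95, rot=90., kind="SURVEY")
    mwi.prep_butler(visit)                        # import calibs, refcats, templates
    mwi.ingest_image(visit, "some/raw/file.fits")  # ingest_image now also takes the visit
    mwi.run_pipeline(visit, {411371})             # exposure IDs received for this group
    mwi.export_outputs(visit, {411371})           # copy raws and outputs to the central repo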