diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index f001f0fb..62fcffa9 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -759,7 +759,7 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: f"detector {self.visit.detector} of {exposure_ids}.") def export_outputs(self, exposure_ids: set[int]) -> None: - """Copy raws and pipeline outputs from processing a set of images back + """Copy pipeline outputs from processing a set of images back to the central Butler. Parameters @@ -767,11 +767,6 @@ def export_outputs(self, exposure_ids: set[int]) -> None: exposure_ids : `set` [`int`] Identifiers of the exposures that were processed. """ - # TODO: this method will not be responsible for raws after DM-36051. - self._export_subset(exposure_ids, "raw", - in_collections=self.instrument.makeDefaultRawIngestRunName(), - ) - self._export_subset(exposure_ids, # TODO: find a way to merge datasets like *_config # or *_schema that are duplicated across multiple diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 2ae67277..d7f2c025 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -629,9 +629,6 @@ def _create_copied_repo(self): central_butler.import_(directory=data_repo, filename=export_file.name, transfer="auto") def setUp(self): - # TODO: test two parallel repos once DM-36051 fixed; can't do it - # earlier because the test data has only one raw. - self._create_copied_repo() central_butler = Butler(self.central_repo.name, instrument=instname, @@ -642,9 +639,11 @@ def setUp(self): self.input_data = os.path.join(data_dir, "input_data") local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname) + second_local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname) # TemporaryDirectory warns on leaks; addCleanup also keeps the TD from # getting garbage-collected. self.addCleanup(tempfile.TemporaryDirectory.cleanup, local_repo) + self.addCleanup(tempfile.TemporaryDirectory.cleanup, second_local_repo) # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371 ra = 155.4702849608958 @@ -687,7 +686,7 @@ def setUp(self): self.interface.instrument, self.second_visit) self.second_interface = MiddlewareInterface(central_butler, self.input_data, self.second_visit, - skymap_name, local_repo.name, + skymap_name, second_local_repo.name, prefix="file://") with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock: @@ -710,9 +709,12 @@ def _simulate_run(self): for k, v in self.second_data_id.items()} # Dataset types defined for local Butler on pipeline run, but no # guarantee this happens in central Butler. - butler_tests.addDatasetType(self.interface.butler, "calexp", {"instrument", "visit", "detector"}, + butler_tests.addDatasetType(self.interface.butler, "calexp", + {"instrument", "visit", "detector"}, + "ExposureF") + butler_tests.addDatasetType(self.second_interface.butler, "calexp", + {"instrument", "visit", "detector"}, "ExposureF") - self.second_interface.butler.registry.refresh() self.interface.butler.put(exp, "calexp", self.processed_data_id) self.second_interface.butler.put(exp, "calexp", self.second_processed_data_id) @@ -741,19 +743,19 @@ def test_extra_collection(self): def test_export_outputs(self): self.interface.export_outputs({self.raw_data_id["exposure"]}) + self.second_interface.export_outputs({self.second_data_id["exposure"]}) central_butler = Butler(self.central_repo.name, writeable=False) - raw_collection = f"{instname}/raw/all" date = datetime.datetime.now(datetime.timezone.utc) export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \ "/ApPipe/prompt-proto-service-042" - self.assertEqual(self._count_datasets(central_butler, "raw", raw_collection), 1) + self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2) self.assertEqual( - self._count_datasets_with_id(central_butler, "raw", raw_collection, self.raw_data_id), + self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id), 1) - self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 1) self.assertEqual( - self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id), + self._count_datasets_with_id(central_butler, "calexp", export_collection, + self.second_processed_data_id), 1) # Did not export calibs or other inputs. self.assertEqual( @@ -773,17 +775,9 @@ def test_export_outputs_retry(self): self.second_interface.export_outputs({self.second_data_id["exposure"]}) central_butler = Butler(self.central_repo.name, writeable=False) - raw_collection = f"{instname}/raw/all" date = datetime.datetime.now(datetime.timezone.utc) export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \ "/ApPipe/prompt-proto-service-042" - self.assertEqual(self._count_datasets(central_butler, "raw", raw_collection), 2) - self.assertEqual( - self._count_datasets_with_id(central_butler, "raw", raw_collection, self.raw_data_id), - 1) - self.assertEqual( - self._count_datasets_with_id(central_butler, "raw", raw_collection, self.second_data_id), - 1) self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2) self.assertEqual( self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id),