From c95da79eba4d1f8dcc3d2545d7a53210050e4e50 Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Tue, 16 May 2023 14:48:35 -0700 Subject: [PATCH 1/2] Do not export raws to the central repo --- python/activator/middleware_interface.py | 7 +------ tests/test_middleware_interface.py | 13 ------------- 2 files changed, 1 insertion(+), 19 deletions(-) diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index f001f0fb..62fcffa9 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -759,7 +759,7 @@ def run_pipeline(self, exposure_ids: set[int]) -> None: f"detector {self.visit.detector} of {exposure_ids}.") def export_outputs(self, exposure_ids: set[int]) -> None: - """Copy raws and pipeline outputs from processing a set of images back + """Copy pipeline outputs from processing a set of images back to the central Butler. Parameters @@ -767,11 +767,6 @@ def export_outputs(self, exposure_ids: set[int]) -> None: exposure_ids : `set` [`int`] Identifiers of the exposures that were processed. """ - # TODO: this method will not be responsible for raws after DM-36051. 
- self._export_subset(exposure_ids, "raw", - in_collections=self.instrument.makeDefaultRawIngestRunName(), - ) - self._export_subset(exposure_ids, # TODO: find a way to merge datasets like *_config # or *_schema that are duplicated across multiple diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 2ae67277..0b2f43db 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -743,14 +743,9 @@ def test_export_outputs(self): self.interface.export_outputs({self.raw_data_id["exposure"]}) central_butler = Butler(self.central_repo.name, writeable=False) - raw_collection = f"{instname}/raw/all" date = datetime.datetime.now(datetime.timezone.utc) export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \ "/ApPipe/prompt-proto-service-042" - self.assertEqual(self._count_datasets(central_butler, "raw", raw_collection), 1) - self.assertEqual( - self._count_datasets_with_id(central_butler, "raw", raw_collection, self.raw_data_id), - 1) self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 1) self.assertEqual( self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id), @@ -773,17 +768,9 @@ def test_export_outputs_retry(self): self.second_interface.export_outputs({self.second_data_id["exposure"]}) central_butler = Butler(self.central_repo.name, writeable=False) - raw_collection = f"{instname}/raw/all" date = datetime.datetime.now(datetime.timezone.utc) export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \ "/ApPipe/prompt-proto-service-042" - self.assertEqual(self._count_datasets(central_butler, "raw", raw_collection), 2) - self.assertEqual( - self._count_datasets_with_id(central_butler, "raw", raw_collection, self.raw_data_id), - 1) - self.assertEqual( - self._count_datasets_with_id(central_butler, "raw", raw_collection, self.second_data_id), - 1) 
self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2) self.assertEqual( self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id), From 68f30807e8a4e8005fb23e61cb42e38976ab3469 Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Wed, 17 May 2023 16:47:51 -0700 Subject: [PATCH 2/2] Test with two local repos The two visits were tested using one shared local repo. Change it so it is more similar to the actual system. This can already work before this ticket. --- tests/test_middleware_interface.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index 0b2f43db..d7f2c025 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -629,9 +629,6 @@ def _create_copied_repo(self): central_butler.import_(directory=data_repo, filename=export_file.name, transfer="auto") def setUp(self): - # TODO: test two parallel repos once DM-36051 fixed; can't do it - # earlier because the test data has only one raw. - self._create_copied_repo() central_butler = Butler(self.central_repo.name, instrument=instname, @@ -642,9 +639,11 @@ def setUp(self): self.input_data = os.path.join(data_dir, "input_data") local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname) + second_local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname) # TemporaryDirectory warns on leaks; addCleanup also keeps the TD from # getting garbage-collected. 
self.addCleanup(tempfile.TemporaryDirectory.cleanup, local_repo) + self.addCleanup(tempfile.TemporaryDirectory.cleanup, second_local_repo) # coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371 ra = 155.4702849608958 @@ -687,7 +686,7 @@ def setUp(self): self.interface.instrument, self.second_visit) self.second_interface = MiddlewareInterface(central_butler, self.input_data, self.second_visit, - skymap_name, local_repo.name, + skymap_name, second_local_repo.name, prefix="file://") with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock: @@ -710,9 +709,12 @@ def _simulate_run(self): for k, v in self.second_data_id.items()} # Dataset types defined for local Butler on pipeline run, but no # guarantee this happens in central Butler. - butler_tests.addDatasetType(self.interface.butler, "calexp", {"instrument", "visit", "detector"}, + butler_tests.addDatasetType(self.interface.butler, "calexp", + {"instrument", "visit", "detector"}, + "ExposureF") + butler_tests.addDatasetType(self.second_interface.butler, "calexp", + {"instrument", "visit", "detector"}, "ExposureF") - self.second_interface.butler.registry.refresh() self.interface.butler.put(exp, "calexp", self.processed_data_id) self.second_interface.butler.put(exp, "calexp", self.second_processed_data_id) @@ -741,15 +743,20 @@ def test_extra_collection(self): def test_export_outputs(self): self.interface.export_outputs({self.raw_data_id["exposure"]}) + self.second_interface.export_outputs({self.second_data_id["exposure"]}) central_butler = Butler(self.central_repo.name, writeable=False) date = datetime.datetime.now(datetime.timezone.utc) export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \ "/ApPipe/prompt-proto-service-042" - self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 1) + self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2) self.assertEqual( 
self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id), 1) + self.assertEqual( + self._count_datasets_with_id(central_butler, "calexp", export_collection, + self.second_processed_data_id), + 1) # Did not export calibs or other inputs. self.assertEqual( self._count_datasets(central_butler, ["cpBias", "gaia", "skyMap", "*Coadd"], export_collection),