Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions python/activator/middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -759,19 +759,14 @@ def run_pipeline(self, exposure_ids: set[int]) -> None:
f"detector {self.visit.detector} of {exposure_ids}.")

def export_outputs(self, exposure_ids: set[int]) -> None:
"""Copy raws and pipeline outputs from processing a set of images back
"""Copy pipeline outputs from processing a set of images back
to the central Butler.

Parameters
----------
exposure_ids : `set` [`int`]
Identifiers of the exposures that were processed.
"""
# TODO: this method will not be responsible for raws after DM-36051.
self._export_subset(exposure_ids, "raw",
in_collections=self.instrument.makeDefaultRawIngestRunName(),
)

self._export_subset(exposure_ids,
# TODO: find a way to merge datasets like *_config
# or *_schema that are duplicated across multiple
Expand Down
32 changes: 13 additions & 19 deletions tests/test_middleware_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,9 +629,6 @@ def _create_copied_repo(self):
central_butler.import_(directory=data_repo, filename=export_file.name, transfer="auto")

def setUp(self):
# TODO: test two parallel repos once DM-36051 fixed; can't do it
# earlier because the test data has only one raw.

self._create_copied_repo()
central_butler = Butler(self.central_repo.name,
instrument=instname,
Expand All @@ -642,9 +639,11 @@ def setUp(self):
self.input_data = os.path.join(data_dir, "input_data")

local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname)
second_local_repo = make_local_repo(tempfile.gettempdir(), central_butler, instname)
# TemporaryDirectory warns on leaks; addCleanup also keeps the TD from
# getting garbage-collected.
self.addCleanup(tempfile.TemporaryDirectory.cleanup, local_repo)
self.addCleanup(tempfile.TemporaryDirectory.cleanup, second_local_repo)

# coordinates from DECam data in ap_verify_ci_hits2015 for visit 411371
ra = 155.4702849608958
Expand Down Expand Up @@ -687,7 +686,7 @@ def setUp(self):
self.interface.instrument,
self.second_visit)
self.second_interface = MiddlewareInterface(central_butler, self.input_data, self.second_visit,
skymap_name, local_repo.name,
skymap_name, second_local_repo.name,
prefix="file://")

with unittest.mock.patch.object(self.interface.rawIngestTask, "extractMetadata") as mock:
Expand All @@ -710,9 +709,12 @@ def _simulate_run(self):
for k, v in self.second_data_id.items()}
# Dataset types defined for local Butler on pipeline run, but no
# guarantee this happens in central Butler.
butler_tests.addDatasetType(self.interface.butler, "calexp", {"instrument", "visit", "detector"},
butler_tests.addDatasetType(self.interface.butler, "calexp",
{"instrument", "visit", "detector"},
"ExposureF")
butler_tests.addDatasetType(self.second_interface.butler, "calexp",
{"instrument", "visit", "detector"},
"ExposureF")
self.second_interface.butler.registry.refresh()
self.interface.butler.put(exp, "calexp", self.processed_data_id)
self.second_interface.butler.put(exp, "calexp", self.second_processed_data_id)

Expand Down Expand Up @@ -741,19 +743,19 @@ def test_extra_collection(self):

def test_export_outputs(self):
self.interface.export_outputs({self.raw_data_id["exposure"]})
self.second_interface.export_outputs({self.second_data_id["exposure"]})

central_butler = Butler(self.central_repo.name, writeable=False)
raw_collection = f"{instname}/raw/all"
date = datetime.datetime.now(datetime.timezone.utc)
export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \
"/ApPipe/prompt-proto-service-042"
self.assertEqual(self._count_datasets(central_butler, "raw", raw_collection), 1)
self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2)
self.assertEqual(
self._count_datasets_with_id(central_butler, "raw", raw_collection, self.raw_data_id),
self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id),
1)
self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 1)
self.assertEqual(
self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id),
self._count_datasets_with_id(central_butler, "calexp", export_collection,
self.second_processed_data_id),
1)
# Did not export calibs or other inputs.
self.assertEqual(
Expand All @@ -773,17 +775,9 @@ def test_export_outputs_retry(self):
self.second_interface.export_outputs({self.second_data_id["exposure"]})

central_butler = Butler(self.central_repo.name, writeable=False)
raw_collection = f"{instname}/raw/all"
date = datetime.datetime.now(datetime.timezone.utc)
export_collection = f"{instname}/prompt/output-{date.year:04d}-{date.month:02d}-{date.day:02d}" \
"/ApPipe/prompt-proto-service-042"
self.assertEqual(self._count_datasets(central_butler, "raw", raw_collection), 2)
self.assertEqual(
self._count_datasets_with_id(central_butler, "raw", raw_collection, self.raw_data_id),
1)
self.assertEqual(
self._count_datasets_with_id(central_butler, "raw", raw_collection, self.second_data_id),
1)
self.assertEqual(self._count_datasets(central_butler, "calexp", export_collection), 2)
self.assertEqual(
self._count_datasets_with_id(central_butler, "calexp", export_collection, self.processed_data_id),
Expand Down