diff --git a/python/activator/activator.py b/python/activator/activator.py index 0d4139d4..b827554a 100644 --- a/python/activator/activator.py +++ b/python/activator/activator.py @@ -58,7 +58,8 @@ from .exception import GracefulShutdownInterrupt, IgnorableVisit, InvalidVisitError, \ NonRetriableError, RetriableError from .middleware_interface import get_central_butler, \ - make_local_repo, make_local_cache, MiddlewareInterface + make_local_repo, make_local_cache, MiddlewareInterface, ButlerWriter, DirectButlerWriter +from .kafka_butler_writer import KafkaButlerWriter from .repo_tracker import LocalRepoTracker # Platform that prompt processing will run on @@ -96,6 +97,23 @@ # The number of seconds to delay retrying connections to the Redis stream. redis_retry = float(os.environ.get("REDIS_RETRY_DELAY", 30)) +# If '1', sends outputs to a service for transfer into the central Butler +# repository instead of writing to the database directly. +use_kafka_butler_writer = os.environ.get("USE_KAFKA_BUTLER_WRITER", "0") == "1" +if use_kafka_butler_writer: + # Hostname of the Kafka cluster used by the Butler writer. + butler_writer_kafka_cluster = os.environ["BUTLER_WRITER_KAFKA_CLUSTER"] + # Username for authentication to BUTLER_WRITER_KAFKA_CLUSTER. + butler_writer_kafka_username = os.environ["BUTLER_WRITER_KAFKA_USERNAME"] + # Password for authentication to BUTLER_WRITER_KAFKA_CLUSTER. + butler_writer_kafka_password = os.environ["BUTLER_WRITER_KAFKA_PASSWORD"] + # Topic used to transfer output datasets to the central repository. + butler_writer_kafka_topic = os.environ["BUTLER_WRITER_KAFKA_TOPIC"] + # URI to the path where output datasets will be written when using the Kafka + # writer to transfer outputs to the central Butler repository. + # This will generally be in the same S3 bucket used by the central Butler. + butler_writer_file_output_path = os.environ["BUTLER_WRITER_FILE_OUTPUT_PATH"] + # Conditionally load keda environment variables if platform == "keda": # Time to wait for fanned out messages before spawning new pod. @@ -163,6 +181,18 @@ def _get_consumer(): }) +@functools.cache +def _get_producer(): + """Lazy initialization of Kafka Producer for Butler writer.""" + return kafka.Producer({ + "bootstrap.servers": butler_writer_kafka_cluster, + "security.protocol": "sasl_plaintext", + "sasl.mechanism": "SCRAM-SHA-512", + "sasl.username": butler_writer_kafka_username, + "sasl.password": butler_writer_kafka_password + }) + + @functools.cache def _get_storage_client(): """Lazy initialization of cloud storage reader.""" @@ -189,6 +219,19 @@ def _get_read_butler(): return _get_write_butler() +@functools.cache +def _get_butler_writer() -> ButlerWriter: + """Lazy initialization of Butler writer.""" + if use_kafka_butler_writer: + return KafkaButlerWriter( + _get_producer(), + output_topic=butler_writer_kafka_topic, + file_output_path=butler_writer_file_output_path + ) + else: + return DirectButlerWriter(_get_write_butler()) + + @functools.cache def _get_local_repo(): """Lazy initialization of local repo. 
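The Kafka writer path added above is toggled purely through environment variables. A minimal sketch of the configuration a deployment might use to enable it; every value below is a placeholder, not a real endpoint or credential:

    import os

    # Hypothetical settings; real deployments would inject these through the
    # service's secrets/config mechanism rather than hard-coding them.
    os.environ["USE_KAFKA_BUTLER_WRITER"] = "1"
    os.environ["BUTLER_WRITER_KAFKA_CLUSTER"] = "kafka.example.org:9092"
    os.environ["BUTLER_WRITER_KAFKA_USERNAME"] = "prompt-processing"
    os.environ["BUTLER_WRITER_KAFKA_PASSWORD"] = "example-password"
    os.environ["BUTLER_WRITER_KAFKA_TOPIC"] = "prompt-processing-output"
    # Usually a prefix in the same S3 bucket used by the central Butler.
    os.environ["BUTLER_WRITER_FILE_OUTPUT_PATH"] = "s3://example-bucket/prompt-output/"

With USE_KAFKA_BUTLER_WRITER unset or "0", _get_butler_writer() falls back to DirectButlerWriter and the other BUTLER_WRITER_* variables are never read.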
@@ -461,7 +504,7 @@ def create_app(): _get_consumer() _get_storage_client() _get_read_butler() - _get_write_butler() + _get_butler_writer() _get_local_repo() app = flask.Flask(__name__) @@ -510,7 +553,7 @@ def keda_start(): _get_consumer() _get_storage_client() _get_read_butler() - _get_write_butler() + _get_butler_writer() _get_local_repo() redis_session = RedisStreamSession( @@ -1002,7 +1045,7 @@ def process_visit(expected_visit: FannedOutVisit): # Create a fresh MiddlewareInterface object to avoid accidental # "cross-talk" between different visits. mwi = MiddlewareInterface(_get_read_butler(), - _get_write_butler(), + _get_butler_writer(), image_bucket, expected_visit, pre_pipelines, diff --git a/python/activator/kafka_butler_writer.py b/python/activator/kafka_butler_writer.py new file mode 100644 index 00000000..2a40bffb --- /dev/null +++ b/python/activator/kafka_butler_writer.py @@ -0,0 +1,93 @@ +# This file is part of prompt_processing. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = ("KafkaButlerWriter",) + +from datetime import date +from typing import Literal +from uuid import uuid4 + +from confluent_kafka import Producer +import pydantic + +from lsst.daf.butler import ( + Butler, + DatasetRef, + SerializedDimensionRecord, + SerializedFileDataset, +) +from lsst.resources import ResourcePath + +from .middleware_interface import ButlerWriter, GroupedDimensionRecords + + +class KafkaButlerWriter(ButlerWriter): + def __init__(self, producer: Producer, *, output_topic: str, file_output_path: str) -> None: + self._producer = producer + self._output_topic = output_topic + self._file_output_path = ResourcePath(file_output_path, forceDirectory=True) + + def transfer_outputs( + self, local_butler: Butler, dimension_records: GroupedDimensionRecords, datasets: list[DatasetRef] + ) -> list[DatasetRef]: + # Create a subdirectory in the output root distinct to this processing + # run. + date_string = date.today().strftime("%Y-%m-%d") + subdirectory = f"{date_string}/{uuid4()}/" + output_directory = self._file_output_path.join(subdirectory, forceDirectory=True) + # There is no such thing as a directory in S3, but the Butler complains + # if there is not an object at the prefix of the export path. + output_directory.mkdir() + + # Copy files to the output directory, and retrieve metadata required to + # ingest them into the central Butler. + file_datasets = local_butler._datastore.export(datasets, directory=output_directory, transfer="copy") + + # Serialize Butler data as a JSON string. 
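The event serialized here is intended for a separate service that completes the ingest into the central repository; that consumer is not part of this patch. A rough sketch, assuming the same PromptProcessingOutputEvent model and SASL settings as the producer, of how such a service might receive and validate the message:

    from confluent_kafka import Consumer

    from activator.kafka_butler_writer import PromptProcessingOutputEvent

    # Placeholder connection settings mirroring the producer configuration.
    consumer = Consumer({
        "bootstrap.servers": "kafka.example.org:9092",
        "group.id": "butler-writer-ingest",
        "security.protocol": "sasl_plaintext",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.username": "butler-writer",
        "sasl.password": "example-password",
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["prompt-processing-output"])

    while True:
        message = consumer.poll(timeout=1.0)
        if message is None or message.error():
            continue
        event = PromptProcessingOutputEvent.model_validate_json(message.value())
        # event.root_directory is relative to the agreed-upon output root;
        # event.dimension_records and event.datasets carry the metadata needed
        # to ingest the copied files into the central repository.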
+ event = PromptProcessingOutputEvent( + type="pp-output", + dimension_records=_serialize_dimension_records(dimension_records), + datasets=[dataset.to_simple() for dataset in file_datasets], + root_directory=subdirectory, + ) + message = event.model_dump_json() + + self._producer.produce(self._output_topic, message) + self._producer.flush() + + return datasets + + +class PromptProcessingOutputEvent(pydantic.BaseModel): + type: Literal["pp-output"] + root_directory: str + dimension_records: list[SerializedDimensionRecord] + datasets: list[SerializedFileDataset] + + +def _serialize_dimension_records(grouped_records: GroupedDimensionRecords) -> list[SerializedDimensionRecord]: + output = [] + for records in grouped_records.values(): + for item in records: + output.append(item.to_simple()) + return output diff --git a/python/activator/middleware_interface.py b/python/activator/middleware_interface.py index 6ee56a7d..9cca829a 100644 --- a/python/activator/middleware_interface.py +++ b/python/activator/middleware_interface.py @@ -44,8 +44,8 @@ from lsst.pipe.base.mp_graph_executor import MPGraphExecutor from lsst.pipe.base.separable_pipeline_executor import SeparablePipelineExecutor from lsst.pipe.base.single_quantum_executor import SingleQuantumExecutor -from lsst.daf.butler import Butler, CollectionType, DatasetType, Timespan, \ - DataIdValueError, MissingDatasetTypeError, MissingCollectionError +from lsst.daf.butler import Butler, CollectionType, DatasetType, DatasetRef, Timespan, \ + DataIdValueError, DimensionRecord, MissingDatasetTypeError, MissingCollectionError import lsst.dax.apdb import lsst.geom import lsst.obs.base @@ -187,6 +187,64 @@ def _get_sasquatch_dispatcher(): return SasquatchDispatcher(url=url, token=token, namespace=namespace) +GroupedDimensionRecords: typing.TypeAlias = dict[str, list[DimensionRecord]] +"""Dictionary from dimension name to list of dimension records for that +dimension. +""" + + +class ButlerWriter(typing.Protocol): + """Interface defining functions for writing output datasets back to the central + Butler repository. + """ + + def transfer_outputs( + self, local_butler: Butler, dimension_records: GroupedDimensionRecords, datasets: list[DatasetRef] + ) -> list[DatasetRef]: + """Transfer outputs back to the central repository. + + Parameters + ---------- + local_butler : `lsst.daf.butler.Butler` + Local Butler repository from which output datasets will be + transferred. + dimension_records : `dict` [`str` , `list` [`lsst.daf.butler.DimensionRecord`]] + Dimension records to write to the central Butler repository. + datasets : `list` [`lsst.daf.butler.DatasetRef`] + Datasets to transfer to the central Butler repository. + + Returns + ------- + transferred : `list` [`lsst.daf.butler.DatasetRef`] + List of datasets actually transferred. + """ + + +class DirectButlerWriter(ButlerWriter): + def __init__(self, central_butler: Butler) -> None: + """Writes Butler outputs back to the central repository by connecting + directly to the Butler database. + + Parameters + ---------- + central_butler : `lsst.daf.butler.Butler` + Butler repo to which pipeline outputs should be written. 
+ """ + self._central_butler = central_butler + + def transfer_outputs( + self, local_butler: Butler, dimension_records: GroupedDimensionRecords, datasets: list[DatasetRef] + ) -> list[DatasetRef]: + dimensions = local_butler.dimensions.sorted(dimension_records.keys()) + for dimension in dimensions: + records = dimension_records[dimension.name] + # If records don't match, this is not an error, and central takes precedence. + self._central_butler.registry.insertDimensionData(dimension, *records, skip_existing=True) + + return self._central_butler.transfer_from( + local_butler, datasets, transfer="copy", transfer_dimensions=False) + + class MiddlewareInterface: """Interface layer between the Butler middleware and the prompt processing data handling system, to handle processing individual images. @@ -211,10 +269,9 @@ class MiddlewareInterface: Butler repo containing the calibration and other data needed for processing images as they are received. This butler must be created with the default instrument and skymap assigned. - write_butler : `lsst.daf.butler.Butler` - Butler repo to which pipeline outputs should be written. This butler - must be created with the default instrument assigned. - May be the same object as ``read_butler``. + butler_writer : `activator.middleware_interface.ButlerWriter` + Object that will be used to write the pipeline outputs back to the + central Butler repository. image_bucket : `str` Storage bucket where images will be written to as they arrive. See also ``prefix``. @@ -266,7 +323,8 @@ def _collection_template(self): # corresponding to self.camera and self.skymap. Do not assume that # self.butler is the only Butler pointing to the local repo. - def __init__(self, read_butler: Butler, write_butler: Butler, image_bucket: str, visit: FannedOutVisit, + def __init__(self, read_butler: Butler, butler_writer: ButlerWriter, image_bucket: str, + visit: FannedOutVisit, pre_pipelines: PipelinesConfig, main_pipelines: PipelinesConfig, # TODO: encapsulate relationship between local_repo and local_cache skymap: str, local_repo: str, local_cache: DatasetCache, @@ -277,7 +335,7 @@ def __init__(self, read_butler: Butler, write_butler: Butler, image_bucket: str, # Deployment/version ID -- potentially expensive to generate. self._deployment = runs.get_deployment(self._apdb_config) self.read_central_butler = read_butler - self.write_central_butler = write_butler + self._butler_writer = butler_writer self.image_host = prefix + image_bucket # TODO: _download_store turns MWI into a tagged class; clean this up later if not self.image_host.startswith("file"): @@ -1596,7 +1654,8 @@ def _get_safe_dataset_types(butler): @connect.retry(2, DATASTORE_EXCEPTIONS, wait=repo_retry) def _export_subset(self, exposure_ids: set[int], - dataset_types: typing.Any, in_collections: typing.Any) -> None: + dataset_types: typing.Any, in_collections: typing.Any + ) -> collections.abc.Collection[DatasetRef]: """Copy datasets associated with a processing run back to the central Butler. @@ -1637,45 +1696,47 @@ def _export_subset(self, exposure_ids: set[int], except lsst.daf.butler.registry.DataIdError as e: raise ValueError("Invalid visit or exposures.") from e - with lsst.utils.timer.time_this(_log, msg="export_outputs (transfer)", level=logging.DEBUG): # Transfer dimensions created by ingest in case it was never done in # central repo (which is normal for dev). # Transferring governor dimensions in parallel can cause deadlocks in # central registry. 
We need to transfer our exposure/visit dimensions, # so handle those manually. - self._export_exposure_dimensions( + dimension_records = self._get_dimension_records_to_export( self.butler, - self.write_central_butler, where="exposure in (exposure_ids)", bind={"exposure_ids": exposure_ids}, instrument=self.instrument.getName(), detector=self.visit.detector, ) - transferred = self.write_central_butler.transfer_from( - self.butler, datasets, transfer="copy", transfer_dimensions=False) + + with lsst.utils.timer.time_this(_log, msg="export_outputs (transfer)", level=logging.DEBUG): + transferred = self._butler_writer.transfer_outputs(self.butler, dimension_records, list(datasets)) _check_transfer_completion(datasets, transferred, "Uploaded") return transferred @staticmethod - def _export_exposure_dimensions(src_butler, dest_butler, **kwargs): - """Transfer dimensions generated from an exposure to the central repo. + def _get_dimension_records_to_export(butler: Butler, **kwargs) -> GroupedDimensionRecords: + """Retrieve dimension records generated from an exposure that need to + be transferred to the central repo. - In many cases the exposure records will already exist in the central - repo, but this is not guaranteed (especially in dev environments). - Visit records never exist in the central repo and are the sole - responsibility of Prompt Processing. + In many cases the exposure records retrieved here will already exist in + the central repo, but this is not guaranteed (especially in dev + environments). Parameters ---------- - src_butler : `lsst.daf.butler.Butler` - The butler from which to transfer dimension records. - dest_butler : `lsst.daf.butler.Butler` - The butler to which to transfer records. + butler : `lsst.daf.butler.Butler` + The butler from which to retrieve dimension records. **kwargs Any data ID parameters to select specific records. They have the same meanings as the parameters of `lsst.daf.butler.Butler.query_dimension_records`. + + Returns + ------- + dimension_records : `dict` [`str` , `list` [`lsst.daf.butler.DimensionRecord`]] + Dictionary from dimension name to list of dimension records for that dimension. """ core_dimensions = ["group", "day_obs", @@ -1683,18 +1744,18 @@ def _export_exposure_dimensions(src_butler, dest_butler, **kwargs): "visit", "visit_system", ] - universe = src_butler.dimensions + universe = butler.dimensions full_dimensions = [universe[d] for d in core_dimensions if d in universe] extra_dimensions = [] for d in full_dimensions: extra_dimensions.extend(universe.get_elements_populated_by(universe[d])) - sorted_dimensions = universe.sorted(full_dimensions + extra_dimensions) + dimensions = full_dimensions + extra_dimensions - for dimension in sorted_dimensions: - records = src_butler.query_dimension_records(dimension, explain=False, **kwargs) - # If records don't match, this is not an error, and central takes precedence. 
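Because ButlerWriter is a typing.Protocol, MiddlewareInterface depends only on the transfer_outputs signature, so tests or future transfer mechanisms can supply their own implementation. A minimal sketch of a hypothetical in-memory writer, not part of this change, that simply records what it was asked to transfer:

    from lsst.daf.butler import Butler, DatasetRef

    from activator.middleware_interface import ButlerWriter, GroupedDimensionRecords


    class RecordingButlerWriter(ButlerWriter):
        """Test double that captures outputs instead of transferring them."""

        def __init__(self) -> None:
            self.dimension_records: GroupedDimensionRecords = {}
            self.datasets: list[DatasetRef] = []

        def transfer_outputs(
            self, local_butler: Butler, dimension_records: GroupedDimensionRecords,
            datasets: list[DatasetRef],
        ) -> list[DatasetRef]:
            self.dimension_records = dimension_records
            self.datasets = list(datasets)
            # Report everything as transferred so _check_transfer_completion passes.
            return datasets

This is the same seam that lets _get_butler_writer() in the activator swap between DirectButlerWriter and KafkaButlerWriter.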
- dest_butler.registry.insertDimensionData(dimension, *records, skip_existing=True) + records = {} + for dimension in dimensions: + records[dimension.name] = butler.query_dimension_records(dimension, explain=False, **kwargs) + return records def _query_datasets_by_storage_class(self, butler, exposure_ids, collections, storage_class): """Identify all datasets with a particular storage class, regardless of diff --git a/tests/test_kafka_butler_writer.py b/tests/test_kafka_butler_writer.py new file mode 100644 index 00000000..0376ac07 --- /dev/null +++ b/tests/test_kafka_butler_writer.py @@ -0,0 +1,80 @@ +# This file is part of prompt_processing. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os +from pathlib import Path +import tempfile +import unittest +import unittest.mock + +from confluent_kafka import Producer + +from lsst.daf.butler import Butler + +from activator.kafka_butler_writer import KafkaButlerWriter, PromptProcessingOutputEvent + + +class KafkaButlerWriterTest(unittest.TestCase): + def setUp(self): + data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") + repository_dir = os.path.join(data_dir, "central_repo") + self.butler = Butler(repository_dir, writeable=False) + + def test_transfer_outputs(self): + butler = self.butler + + # Pull up a list of datasets to send + collection = "LSSTComCamSim/defaults" + datasets = [] + datasets.extend(butler.query_datasets("bias", collection)) + datasets.extend(butler.query_datasets("dark", collection)) + num_datasets = len(datasets) + + # Pull up some dimension records to send + dimension_records = {} + dimension_records["instrument"] = butler.query_dimension_records("instrument") + dimension_records["detector"] = butler.query_dimension_records("detector", instrument="LSSTComCamSim") + num_dimension_records = len(dimension_records["instrument"]) + len(dimension_records["detector"]) + + kafka_producer_mock = unittest.mock.Mock(Producer) + with tempfile.TemporaryDirectory() as output_directory: + topic = "topic-name" + # Simulate a transfer, writing the datasets into a temporary + # directory. + writer = KafkaButlerWriter( + producer=kafka_producer_mock, + output_topic=topic, + file_output_path=output_directory + ) + datasets_transferred = writer.transfer_outputs(butler, dimension_records, datasets) + + self.assertEqual(datasets, datasets_transferred) + self.assertEqual(kafka_producer_mock.produce.call_args.args[0], topic) + + # Check that the serialized metadata sent to Kafka looks correct. 
+ event_json = kafka_producer_mock.produce.call_args.args[1] + model = PromptProcessingOutputEvent.model_validate_json(event_json) + self.assertEqual(len(model.datasets), num_datasets) + self.assertEqual(len(model.dimension_records), num_dimension_records) + + # Check that datasets were written to the output directory. + output_files = [path for path in Path(output_directory).rglob("*") if path.is_file()] + self.assertEqual(len(output_files), num_datasets) diff --git a/tests/test_middleware_interface.py b/tests/test_middleware_interface.py index df4187f4..8a1604c4 100644 --- a/tests/test_middleware_interface.py +++ b/tests/test_middleware_interface.py @@ -53,7 +53,7 @@ from activator.caching import DatasetCache from activator.exception import NonRetriableError, NoGoodPipelinesError, PipelineExecutionError from activator.middleware_interface import get_central_butler, make_local_repo, \ - _get_sasquatch_dispatcher, MiddlewareInterface, \ + _get_sasquatch_dispatcher, MiddlewareInterface, DirectButlerWriter, \ _filter_datasets, _generic_query, _MissingDatasetError from shared.config import PipelinesConfig from shared.run_utils import get_output_run @@ -281,7 +281,8 @@ def setUp(self): private_sndStamp=1718661900.7165175, ) self.logger_name = "lsst.activator.middleware_interface" - self.interface = MiddlewareInterface(self.read_butler, self.write_butler, + butler_writer = DirectButlerWriter(self.write_butler) + self.interface = MiddlewareInterface(self.read_butler, butler_writer, self.input_data, self.next_visit, # Use empty preprocessing to avoid slowing down tests # with real pipelines (adds 20s) @@ -533,7 +534,8 @@ def test_prep_butler_twice(self): # Second visit with everything same except group. second_visit = dataclasses.replace(self.next_visit, groupId=str(int(self.next_visit.groupId) + 1)) - second_interface = MiddlewareInterface(self.read_butler, self.write_butler, + butler_writer = DirectButlerWriter(self.write_butler) + second_interface = MiddlewareInterface(self.read_butler, butler_writer, self.input_data, second_visit, pre_pipelines_empty, pipelines, skymap_name, self.local_repo.name, self.local_cache, @@ -557,7 +559,7 @@ def test_prep_butler_twice(self): position=[self.next_visit.position[0] - 0.2, self.next_visit.position[1] - 0.1], ) - third_interface = MiddlewareInterface(self.read_butler, self.write_butler, + third_interface = MiddlewareInterface(self.read_butler, butler_writer, self.input_data, third_visit, pre_pipelines_empty, pipelines, skymap_name, self.local_repo.name, self.local_cache, @@ -1278,10 +1280,10 @@ def setUp(self): instrument=instname, skymap=skymap_name, writeable=False) - write_butler = Butler(self.central_repo.name, - instrument=instname, - skymap=skymap_name, - writeable=True) + self.write_butler = Butler(self.central_repo.name, + instrument=instname, + skymap=skymap_name, + writeable=True) data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data") self.input_data = os.path.join(data_dir, "input_data") @@ -1340,7 +1342,8 @@ def setUp(self): self.logger_name = "lsst.activator.middleware_interface" # Populate repository. 
- self.interface = MiddlewareInterface(read_butler, write_butler, self.input_data, self.next_visit, + butler_writer = DirectButlerWriter(self.write_butler) + self.interface = MiddlewareInterface(read_butler, butler_writer, self.input_data, self.next_visit, pre_pipelines_full, pipelines, skymap_name, local_repo.name, self.local_cache, prefix="file://") @@ -1363,7 +1366,7 @@ def setUp(self): self.second_group_data_id = {(k if k != "exposure" else "group"): (v if k != "exposure" else str(v)) for k, v in self.second_data_id.required.items()} self.second_interface = MiddlewareInterface( - read_butler, write_butler, self.input_data, self.second_visit, pre_pipelines_full, pipelines, + read_butler, butler_writer, self.input_data, self.second_visit, pre_pipelines_full, pipelines, skymap_name, second_local_repo.name, self.second_local_cache, prefix="file://") with unittest.mock.patch("activator.middleware_interface.MiddlewareInterface._run_preprocessing"): self.second_interface.prep_butler() @@ -1395,13 +1398,13 @@ def _simulate_run(self): for k, v in self.second_data_id.required.items()} # Dataset types defined for local Butler on pipeline run, but code # assumes output types already exist in central repo. - butler_tests.addDatasetType(self.interface.write_central_butler, "promptPreload_metrics", + butler_tests.addDatasetType(self.write_butler, "promptPreload_metrics", {"instrument", "group", "detector"}, "MetricMeasurementBundle") - butler_tests.addDatasetType(self.interface.write_central_butler, "regionTimeInfo", + butler_tests.addDatasetType(self.write_butler, "regionTimeInfo", {"instrument", "group", "detector"}, "RegionTimeInfo") - butler_tests.addDatasetType(self.interface.write_central_butler, "history_diaSource", + butler_tests.addDatasetType(self.write_butler, "history_diaSource", {"instrument", "group", "detector"}, "ArrowAstropy") butler_tests.addDatasetType(self.interface.butler, "history_diaSource", @@ -1410,7 +1413,7 @@ def _simulate_run(self): butler_tests.addDatasetType(self.second_interface.butler, "history_diaSource", {"instrument", "group", "detector"}, "ArrowAstropy") - butler_tests.addDatasetType(self.interface.write_central_butler, "calexp", + butler_tests.addDatasetType(self.write_butler, "calexp", {"instrument", "visit", "detector"}, "ExposureF") butler_tests.addDatasetType(self.interface.butler, "calexp", @@ -1457,7 +1460,8 @@ def test_extra_collection(self): # Avoid collisions with other calls to prep_butler with make_local_repo(tempfile.gettempdir(), read_butler, instname) as local_repo: - interface = MiddlewareInterface(read_butler, write_butler, self.input_data, + butler_writer = DirectButlerWriter(write_butler) + interface = MiddlewareInterface(read_butler, butler_writer, self.input_data, dataclasses.replace(self.next_visit, groupId="42"), pre_pipelines_empty, pipelines, skymap_name, local_repo, DatasetCache(3, {"uw_stars_20240524": 10, "template_coadd": 30}),
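The new test validates the produced payload by parsing it back through the pydantic model, which also serves as the de facto wire-format contract with the consuming service. A small sketch, using only pydantic's standard introspection, of how that contract can be inspected:

    from activator.kafka_butler_writer import PromptProcessingOutputEvent

    # The event carries a fixed "type" discriminator, the run-specific output
    # subdirectory, and the serialized Butler metadata.
    print(list(PromptProcessingOutputEvent.model_fields))
    # ['type', 'root_directory', 'dimension_records', 'datasets']

    # A captured message can be checked the same way the unit test does:
    # PromptProcessingOutputEvent.model_validate_json(message_bytes)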