51 changes: 47 additions & 4 deletions python/activator/activator.py
@@ -58,7 +58,8 @@
from .exception import GracefulShutdownInterrupt, IgnorableVisit, InvalidVisitError, \
NonRetriableError, RetriableError
from .middleware_interface import get_central_butler, \
make_local_repo, make_local_cache, MiddlewareInterface
make_local_repo, make_local_cache, MiddlewareInterface, ButlerWriter, DirectButlerWriter
from .kafka_butler_writer import KafkaButlerWriter
from .repo_tracker import LocalRepoTracker

# Platform that prompt processing will run on
@@ -96,6 +97,23 @@
# The number of seconds to delay retrying connections to the Redis stream.
redis_retry = float(os.environ.get("REDIS_RETRY_DELAY", 30))

# If '1', sends outputs to a service for transfer into the central Butler
# repository instead of writing to the database directly.
use_kafka_butler_writer = os.environ.get("USE_KAFKA_BUTLER_WRITER", "0") == "1"
if use_kafka_butler_writer:
# Hostname of the Kafka cluster used by the Butler writer.
butler_writer_kafka_cluster = os.environ["BUTLER_WRITER_KAFKA_CLUSTER"]
# Username for authentication to BUTLER_WRITER_KAFKA_CLUSTER.
butler_writer_kafka_username = os.environ["BUTLER_WRITER_KAFKA_USERNAME"]
# Password for authentication to BUTLER_WRITER_KAFKA_CLUSTER.
butler_writer_kafka_password = os.environ["BUTLER_WRITER_KAFKA_PASSWORD"]
# Topic used to transfer output datasets to the central repository.
butler_writer_kafka_topic = os.environ["BUTLER_WRITER_KAFKA_TOPIC"]
# URI to the path where output datasets will be written when using the Kafka
# writer to transfer outputs to the central Butler repository.
# This will generally be in the same S3 bucket used by the central Butler.
butler_writer_file_output_path = os.environ["BUTLER_WRITER_FILE_OUTPUT_PATH"]

# Conditionally load keda environment variables
if platform == "keda":
# Time to wait for fanned out messages before spawning new pod.
@@ -163,6 +181,18 @@ def _get_consumer():
})


@functools.cache
def _get_producer():
"""Lazy initialization of Kafka Producer for Butler writer."""
return kafka.Producer({
"bootstrap.servers": butler_writer_kafka_cluster,
"security.protocol": "sasl_plaintext",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": butler_writer_kafka_username,
"sasl.password": butler_writer_kafka_password
})


@functools.cache
def _get_storage_client():
"""Lazy initialization of cloud storage reader."""
@@ -189,6 +219,19 @@ def _get_read_butler():
return _get_write_butler()


@functools.cache
def _get_butler_writer() -> ButlerWriter:
"""Lazy initialization of Butler writer."""
if use_kafka_butler_writer:
return KafkaButlerWriter(
_get_producer(),
output_topic=butler_writer_kafka_topic,
file_output_path=butler_writer_file_output_path
)
else:
return DirectButlerWriter(_get_write_butler())


@functools.cache
def _get_local_repo():
"""Lazy initialization of local repo.
@@ -461,7 +504,7 @@ def create_app():
_get_consumer()
_get_storage_client()
_get_read_butler()
_get_write_butler()
_get_butler_writer()
_get_local_repo()

app = flask.Flask(__name__)
@@ -510,7 +553,7 @@ def keda_start():
_get_consumer()
_get_storage_client()
_get_read_butler()
_get_write_butler()
_get_butler_writer()
_get_local_repo()

redis_session = RedisStreamSession(
@@ -1002,7 +1045,7 @@ def process_visit(expected_visit: FannedOutVisit):
# Create a fresh MiddlewareInterface object to avoid accidental
# "cross-talk" between different visits.
mwi = MiddlewareInterface(_get_read_butler(),
_get_write_butler(),
_get_butler_writer(),
image_bucket,
expected_visit,
pre_pipelines,
93 changes: 93 additions & 0 deletions python/activator/kafka_butler_writer.py
@@ -0,0 +1,93 @@
# This file is part of prompt_processing.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations

__all__ = ("KafkaButlerWriter",)

from datetime import date
from typing import Literal
from uuid import uuid4

from confluent_kafka import Producer
import pydantic

from lsst.daf.butler import (
Butler,
DatasetRef,
SerializedDimensionRecord,
SerializedFileDataset,
)
from lsst.resources import ResourcePath

from .middleware_interface import ButlerWriter, GroupedDimensionRecords


class KafkaButlerWriter(ButlerWriter):
def __init__(self, producer: Producer, *, output_topic: str, file_output_path: str) -> None:
self._producer = producer
self._output_topic = output_topic
self._file_output_path = ResourcePath(file_output_path, forceDirectory=True)

def transfer_outputs(
self, local_butler: Butler, dimension_records: GroupedDimensionRecords, datasets: list[DatasetRef]
) -> list[DatasetRef]:
# Create a subdirectory in the output root distinct to this processing
# run.
date_string = date.today().strftime("%Y-%m-%d")
subdirectory = f"{date_string}/{uuid4()}/"
output_directory = self._file_output_path.join(subdirectory, forceDirectory=True)
# There is no such thing as a directory in S3, but the Butler complains
# if there is not an object at the prefix of the export path.
output_directory.mkdir()

# Copy files to the output directory, and retrieve metadata required to
# ingest them into the central Butler.
file_datasets = local_butler._datastore.export(datasets, directory=output_directory, transfer="copy")

Member:
The use of ._datastore makes me a little nervous. While we're somewhat insulated by using fixed builds, is it possible to have a stable API for this?

Contributor (author):
Yes -- in the upcoming PR where the outputs get written in-place instead of in a separate tree, this will be changed to use a different public function. (_datastore is effectively "Rubin-internal-public" and is used throughout a lot of packages, but I got the team to agree to a less implicit naming convention for Rubin internal code in the future.)

# Serialize Butler data as a JSON string.
event = PromptProcessingOutputEvent(
type="pp-output",
dimension_records=_serialize_dimension_records(dimension_records),
datasets=[dataset.to_simple() for dataset in file_datasets],
root_directory=subdirectory,
)
message = event.model_dump_json()

self._producer.produce(self._output_topic, message)
self._producer.flush()

return datasets


class PromptProcessingOutputEvent(pydantic.BaseModel):
type: Literal["pp-output"]

Member:
I'm not familiar with pydantic, but I find this construct particularly baroque. You have to declare a field as having only one value, and then initialize it to that value anyway?

More generally, the responsibility for the serialized form (e.g., representing all collections as lists) seems to be split between this class, transfer_outputs, and _serialize_dimension_records. Is there a way to clean things up so that transfer_outputs doesn't have to do part of the serialization itself?

Contributor (author):
Yeah, it's an annoying quirk of Pydantic. You can say type: Literal["pp-output"] = "pp-output" at the point of definition, but that can lead to surprising behavior when the model is later used in a discriminated union. (It's not currently, but if we need another event type in the future we would distinguish them based on this type field.)

You're not really supposed to have methods with behavior on Pydantic models -- it's more of a schema definition than an actual class. I could add a separate helper function to do the serialization, but the main point of transfer_outputs is to do the serialization and it needs every local variable in scope there, so I don't see a problem with doing it in place.
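
For context, a minimal sketch of the discriminated-union dispatch being referred to, assuming Pydantic v2. SomeFutureEvent and its "pp-retraction" tag are hypothetical, and the real event fields are elided for brevity; this is not part of the PR.

```python
from typing import Annotated, Literal, Union

import pydantic


class PromptProcessingOutputEvent(pydantic.BaseModel):
    # Only the discriminator field is shown; the real model has more fields.
    type: Literal["pp-output"]


class SomeFutureEvent(pydantic.BaseModel):
    # Hypothetical second event type, included only to illustrate dispatch.
    type: Literal["pp-retraction"]


# With a discriminated union, Pydantic selects the concrete model by reading
# the "type" field instead of trying each union member in turn.
Event = Annotated[
    Union[PromptProcessingOutputEvent, SomeFutureEvent],
    pydantic.Field(discriminator="type"),
]

parsed = pydantic.TypeAdapter(Event).validate_json('{"type": "pp-output"}')
assert isinstance(parsed, PromptProcessingOutputEvent)
```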

Member:
Agreed that having it all be in the same module means it's not too big a problem; I was more worried about the ease of modifying the code when it has these redundant-but-matching lines.

(I'll point out that your argument assumes you have to use Pydantic for conversion to JSON. Personally, this kind of awkwardness is exactly why I don't like it.)
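
For comparison, a minimal sketch of the plain-json alternative being alluded to, not part of the PR. It assumes the arguments as they appear in transfer_outputs, and that the objects returned by .to_simple() are Pydantic v2 models so that model_dump(mode="json") yields JSON-compatible dicts.

```python
import json


def build_event_message(subdirectory, dimension_records, file_datasets) -> str:
    """Sketch of a plain-dict alternative to the Pydantic event model.

    Assumes the same local variables used in transfer_outputs and that
    .to_simple() returns Pydantic v2 models.
    """
    payload = {
        "type": "pp-output",
        "root_directory": subdirectory,
        "dimension_records": [
            record.to_simple().model_dump(mode="json")
            for records in dimension_records.values()
            for record in records
        ],
        "datasets": [
            dataset.to_simple().model_dump(mode="json")
            for dataset in file_datasets
        ],
    }
    return json.dumps(payload)
```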

root_directory: str
dimension_records: list[SerializedDimensionRecord]
datasets: list[SerializedFileDataset]


def _serialize_dimension_records(grouped_records: GroupedDimensionRecords) -> list[SerializedDimensionRecord]:
output = []
for records in grouped_records.values():
for item in records:
output.append(item.to_simple())
return output