Conversation

@dhirving dhirving (Contributor) commented Jul 8, 2025

Added an alternate implementation for writing Butler outputs to the central repository, following the design in DMTN-310. When USE_KAFKA_BUTLER_WRITER=1, writes to the central Butler database will be done indirectly by sending a Kafka message to a service, instead of connecting directly from Prompt Processing.
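As a rough illustration of the switch described above (not the actual code from this PR): when USE_KAFKA_BUTLER_WRITER=1, central-repo writes go through a Kafka message to a service rather than a direct database connection. In this sketch the factory function, topic name, and broker environment variable are assumptions; only the environment variable and the KafkaButlerWriter class name come from the PR itself.

import os

from confluent_kafka import Producer


class DirectButlerWriter:
    """Stand-in for the existing path that connects straight to the central database."""


class KafkaButlerWriter:
    """Stand-in for the new indirect path (the real class of this name is discussed below)."""

    def __init__(self, producer: Producer, topic: str):
        self._producer = producer
        self._topic = topic


def make_butler_writer():
    # USE_KAFKA_BUTLER_WRITER is the real switch from this PR; the factory
    # name, topic, and KAFKA_BROKER variable are illustrative assumptions.
    if os.environ.get("USE_KAFKA_BUTLER_WRITER") == "1":
        producer = Producer({"bootstrap.servers": os.environ["KAFKA_BROKER"]})
        return KafkaButlerWriter(producer, topic="butler-writes")
    return DirectButlerWriter()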

@dhirving dhirving changed the title DM-49670: Add option for using a service for Butler database writes. DM-49670: Add option for using a service for Butler database writes Jul 8, 2025
@dhirving dhirving force-pushed the tickets/DM-49670 branch 2 times, most recently from a707deb to 85a2cd2 on July 21, 2025 at 21:07
@dhirving dhirving changed the title DM-49670: Add option for using a service for Butler database writes DM-49670: Add option to use a service for Butler database writes Jul 21, 2025
@dhirving dhirving force-pushed the tickets/DM-49670 branch 2 times, most recently from aa9810b to e8768ca on July 22, 2025 at 22:50
@dhirving dhirving marked this pull request as ready for review July 24, 2025 18:11
@kfindeisen kfindeisen (Member) left a comment

Sorry for the slow reply -- I'm returning this ahead of the Phalanx review to not keep you waiting any longer.

While the basic approach looks good, I'm concerned about how the environment variable interface is organized (I might have more specific advice once I see Phalanx), and the introduction of certain Middleware-isms (specifically, split responsibilities and implementation-level dependencies instead of modularization, and the use of assert to tape over structural issues).

Please also clean up the commit history; the current history splits changes in an "overlapping" fashion that means no single commit is associated with any logical change. This makes the code hard to review, hard to understand for future development, and hard to reorganize/revert should the need arise.

Comment on lines 1679 to 1721
-def _export_exposure_dimensions(src_butler, dest_butler, **kwargs):
-    """Transfer dimensions generated from an exposure to the central repo.
+def _export_exposure_dimensions(src_butler, **kwargs) -> dict[str, list[DimensionRecord]]:
+    """Retrieve dimension records generated from an exposure that need to
+    be transferred to the central repo.
Member

Please rename this method and parameters to fit its new purpose (perhaps _get_dimensions_to_export?) and avoid confusion. Also rewrite the docs, as almost none of it is appropriate for a local repo query.

Returns
-------
dimension_records : `dict` [ `str` , `list` [ `lsst.daf.butler.DimensionRecord` ] ]
Member

Excessive whitespace that impairs readability; see the DM style guide:

Suggested change
dimension_records : `dict` [ `str` , `list` [ `lsst.daf.butler.DimensionRecord` ] ]
dimension_records : `dict` [`str`, `list` [`lsst.daf.butler.DimensionRecord`]]

Do these actually need to be specced as a dict and list instead of, say, a mapping and a collection? Surely the records can't be ordered in any meaningful way.

def test_transfer_outputs(self):
    data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
    repository_dir = os.path.join(data_dir, "central_repo")
    butler = Butler(repository_dir, writeable=False)
Member

Please use setUp as appropriate, to leave room for more tests in the future.

Member

I still recommend using setUp or setUpClass for anything that would be shared among tests, such as the test repo. Not sure about the datasets/dimension records, I can see those changing from case to case.
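A minimal sketch of that suggestion, assuming the read-only test repo is the shared fixture and per-test data stays in each test method; the class name and structure here are illustrative, with paths taken from the quoted test.

import os
import unittest

from lsst.daf.butler import Butler


class TransferOutputsTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        data_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "data")
        cls.repository_dir = os.path.join(data_dir, "central_repo")
        # A read-only Butler is safe to share across test methods.
        cls.butler = Butler(cls.repository_dir, writeable=False)

    def test_transfer_outputs(self):
        # Per-test setup (datasets, dimension records) can stay in the test
        # body, since it may change from case to case.
        ...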

dimension_record_count = 0
for dimension in ["instrument", "skymap"]:
    records = butler.query_dimension_records(dimension)
    dimension_record_count += len(records)
Member

What is the purpose of tracking dimension_record_count separately? dimension_records is self-contained.

Contributor Author

dimension_records is a dict of lists though, so this:

  1. Avoids a second loop to get the count.
  2. Helps catch potential errors where the code we are calling modifies dimension_records when it shouldn't.

Member

I'm worried it could introduce errors in the test code, but it's true that nonmodification is hard to enforce. Maybe add a comment that that's what you're testing?
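For example, the quoted loop could carry a comment along these lines (illustrative; it reuses the names from the test snippet above rather than standing alone):

dimension_record_count = 0
for dimension in ["instrument", "skymap"]:
    records = butler.query_dimension_records(dimension)
    # Track the count separately so the test also catches the case where the
    # code under test mutates dimension_records when it should not.
    dimension_record_count += len(records)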

@dhirving dhirving force-pushed the tickets/DM-49670 branch 2 times, most recently from 0f874a4 to 1939acd on August 13, 2025 at 00:45
@kfindeisen kfindeisen (Member) left a comment

Looks good, thanks!

Returns
-------
transferred : `list` [`DatasetRef`]
Member

Inconsistent with the previous style:

Suggested change
transferred : `list` [`DatasetRef`]
transferred : `list` [`lsst.daf.butler.DatasetRef`]

Returns
-------
transferred : `list` [`DatasetRef`]
List of datasets actually transferred.
Member

Again, do these actually need to be specced as concrete classes like list instead of generic Collection? While you could define an order for (serial) transfers, logically the transferred datasets are a set -- what matters is whether a particular dataset got transferred or not.

I realize that nested collections like dimension_records run into type invariance and need to use concrete element types, but that doesn't apply here.

 @connect.retry(2, DATASTORE_EXCEPTIONS, wait=repo_retry)
 def _export_subset(self, exposure_ids: set[int],
-                   dataset_types: typing.Any, in_collections: typing.Any) -> None:
+                   dataset_types: typing.Any, in_collections: typing.Any) -> list[DatasetRef]:
Member

Note that the docs spec this as a generic collection. As stated above, I think that's the right level of abstraction, but the method should at least be self-consistent.
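A sketch of what the self-consistent version might look like (signature and docstring only; the decorator and body are omitted, and the docstring wording is assumed):

import typing
from collections.abc import Collection

from lsst.daf.butler import DatasetRef


# Shown standalone here; in the original this is a method, hence the self parameter.
def _export_subset(self, exposure_ids: set[int],
                   dataset_types: typing.Any, in_collections: typing.Any) -> Collection[DatasetRef]:
    """Transfer outputs for the given exposures to the central repo.

    Returns
    -------
    transferred : `collections.abc.Collection` [`lsst.daf.butler.DatasetRef`]
        The datasets actually transferred.
    """
    ...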

-    The butler from which to transfer dimension records.
-dest_butler : `lsst.daf.butler.Butler`
-    The butler to which to transfer records.
+    The butler from which to retrieve dimension records.
Member

The name src_butler still seems inappropriate in the new context. For a query operation, can this just be butler?


return [dstype.name for dstype in butler.registry.queryDatasetTypes(...)
        if "detector" in dstype.dimensions]

@connect.retry(2, DATASTORE_EXCEPTIONS, wait=repo_retry)
Member

Sorry, one last question: does this need to be updated to account for KafkaButlerWriter.transfer_outputs? I assume it raises a completely different set of exceptions on e.g. network problems.

Contributor Author

Good question.

The exceptions thrown when there are S3 issues should be the same as they were previously -- so the exceptions in the existing list are still useful.

The Kafka client should be doing some amount of retrying/reconnecting internally, so we may not want to attempt to retry on a Kafka exception here. The confluent_kafka documentation isn't very clear on the exact sorts of errors we might expect. I should be able to simulate some of the more obvious issues like the broker being down, so I'll poke at this a bit when I'm testing the changes in DM-52180.
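For reference, the kinds of failures confluent_kafka surfaces look roughly like this (the broker address and topic are placeholders); whether any of them belong in the retry list is exactly what the DM-52180 testing should show.

from confluent_kafka import KafkaException, Producer

producer = Producer({"bootstrap.servers": "broker:9092"})  # placeholder address


def _on_delivery(err, msg):
    # Invoked from poll()/flush(); err is non-None if the broker rejected the
    # message even after the client's own internal retries.
    if err is not None:
        raise KafkaException(err)


try:
    producer.produce("butler-writes", value=b"...", callback=_on_delivery)
    producer.flush(30)
except BufferError:
    pass  # local producer queue is full; the message was never handed off
except KafkaException:
    pass  # configuration or broker-side errors surfaced after internal retries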

To prepare for transferring datasets to the central Butler using Kafka, add an interface for an object that does the Butler writes back to central, to allow for alternate implementations.  This re-orders the middleware export process so that all of the writes to central happen at a single point in the code.
Added an option to send a Kafka message to a microservice to write output datasets to the central Butler database, instead of connecting directly to the database.  This is intended to reduce database contention, as detailed in DMTN-310.
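A hedged sketch of the interface those commits describe, letting the direct and Kafka-backed implementations be swapped; only the transfer_outputs name comes from the review discussion above, and the parameters are assumptions.

from abc import ABC, abstractmethod
from collections.abc import Collection

from lsst.daf.butler import Butler, DatasetRef, DimensionRecord


class ButlerWriter(ABC):
    """Writes Prompt Processing outputs back to the central repository."""

    @abstractmethod
    def transfer_outputs(
        self,
        local_butler: Butler,
        dimension_records: dict[str, list[DimensionRecord]],
        datasets: Collection[DatasetRef],
    ) -> Collection[DatasetRef]:
        """Transfer dimension records and output datasets to the central
        repo, returning the datasets that were actually transferred.
        """
        raise NotImplementedError()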
@dhirving dhirving merged commit 100000b into main Aug 21, 2025
11 checks passed
@dhirving dhirving deleted the tickets/DM-49670 branch August 21, 2025 22:42