DM-35053: Implement syncing of output products for Prompt Processing #24
Conversation
Force-pushed from e88060c to 00e0365
Force-pushed from be4e369 to a90cea0
Force-pushed from 18d711d to 52a890a
Force-pushed from 03adc70 to 839bdff
```python
collections=[self.output_collection],
# TODO DM-34202: hack around a middleware bug.
run=None)
# The run **must** be unique for each unit of processing. It must be
```
I'm not sure why this is necessary. Processing images with identical `obs_id`+`detector` values in different groups/visits should not occur. If we need to improve the simulated input so that this doesn't happen, that should be a separate task. (It's possible that exposures with identical `obs_id`+`detector` values in the same group/visit might be processed due to a failure or restart; we should strive to be idempotent in this case.)

Having a collection in the central Butler repo for each `obs_id`+`detector` is not scalable. I would aim for one collection per payload per night at most. In fact, I think it should be possible to have one collection for each version/configuration of the payload, possibly with a few extra collections for "rerun" reprocessings of selected visits.
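For illustration, a collection-per-payload-version scheme might be spelled with the same `makeCollectionName` helper used elsewhere in this PR. The function and suffix format below are hypothetical, not code from this repository:

```python
from lsst.obs.base import Instrument


def payload_output_chain(instrument: Instrument, payload_version: str) -> str:
    # e.g. "HSC/prompt/output-1.4.0" instead of one collection per
    # obs_id+detector, which grows without bound.
    return instrument.makeCollectionName(f"prompt/output-{payload_version}")
```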
I agree that ideally the upload.py outputs would all have unique exposure IDs (I think that's equivalent to what you said about `obs_id`?). However, this is very difficult to pull off without hard-coding HSC (or DECam, or some other test dataset) into upload.py. Generating a unique exposure ID that is still valid for that instrument is essentially the inverse of the metadata translator problem.

The other half of the problem is what happened on DM-36070. Reusing the same (local repo) run for multiple detectors causes dataset collisions regardless of what we do with exposure/obs IDs. We could avoid them if we had a separate init step like BPS does, but this means setting up the pipeline in `prep_butler` like it was originally, and doing some processing that's not tied to one of the identical workers.

I'll admit I was probably too paranoid about not being able to guarantee that each call to `run_pipeline` would match the previous call to `prep_butler`, and not merely a prior one. The dangers of too-large tickets, I suppose. 🙂
Deferred to DM-36586. Fixing `upload.py` and the dataset collision problem are both easily mid-sized tickets themselves. I've updated this comment to be a TODO pointing to that issue.
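For context, here is a hedged sketch of what a unique-run-per-processing-unit scheme (the eventual goal deferred to DM-36586) could look like. The helper name and naming format are assumptions, not this repository's code:

```python
import datetime


def get_output_run(instrument_name: str, group: str, detector: int) -> str:
    # The timestamp distinguishes retry attempts, so re-running the same
    # group+detector after a failure or restart doesn't collide with the
    # earlier run's datasets.
    stamp = datetime.datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
    return f"{instrument_name}/prompt/output-{group}-{detector}-{stamp}"
```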
```diff
  # TODO: RawIngestTask doesn't currently support remote files.
  file = self._download(file)
- result = self.rawIngestTask.run([file])
+ result = self.rawIngestTask.run([file], run=self._get_raw_run_name(visit))
```
Don't change; see above.
```python
where=where,
butler=Butler(butler=self.butler,
              collections=self.butler.collections,
              run=output_run))
```
If the collections and `output_run` can be made the same from visit to visit, they should be able to be incorporated into `self.butler`, meaning that a new `Butler` instance wouldn't be needed here.

But I believe this is low-cost, so maybe it's not worth worrying about.
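Concretely, the suggestion amounts to copy-constructing the Butler once with the defaults baked in. The copy-construction keywords are the real `lsst.daf.butler.Butler` API already used in the diff above; the helper itself is illustrative:

```python
from lsst.daf.butler import Butler


def make_default_butler(base: Butler, output_run: str) -> Butler:
    # With collections and run set as defaults, later put/get calls need
    # no per-visit Butler at all.
    return Butler(butler=base, collections=base.collections, run=output_run)
```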
```python
Identifiers of the exposures that were processed.
"""
# TODO: this method will not be responsible for raws after DM-36051.
self._export_subset(visit, exposure_ids, "raw",
```
Indeed, this should go away.
```python
out_collection=self.instrument.makeDefaultRawIngestRunName())
umbrella = self.instrument.makeCollectionName("prompt-results")
self._export_subset(visit, exposure_ids,
# TODO: find a way to merge datasets like *_config
```
I wouldn't necessarily worry about these. I think the Execution Butler deals with them by writing them in a `pipetaskInit` job that's executed once per workflow. In this case, that would be once per payload version/configuration == output collection.
`Visit.group` is documented as being `str`, but the test cases make it an `int`.
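A minimal sketch of the documented contract; only the `group: str` field is taken from the commit message above, and everything else about the real `Visit` class may differ:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Visit:
    group: str  # documented as str; the tests had been passing ints
```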
Using this function instead of homebrew systems makes the code much more concise.
The workaround involved keeping track of run-specific information in MiddlewareInterface state, and hacking around SimplePipelineExecutor's internal Butler management.
The merging of self.prefix and self.image_bucket into self.image_host cuts down on the number of variables the programmer needs to track, without changing any behavior.
Making _init_ingester and _init_visit_definer parallel methods makes it easier to see the control flow in __init__. The documentation also clarifies what state is assumed by methods called from __init__.
This change allows `MiddlewareInterface.butler` to have the same default collections for the lifetime of the MWI object, preventing a possible source of error in class code. Unfortunately, it's not possible to have `.butler` be the same *object* for MWI lifetime because of how implicit dimensions are handled.
…lewareInterface. This documentation is primarily an aid to future developers, to highlight how dependent MWI is on a linear sequence of events driven by `activator.py`. It may help with debugging future concurrency problems or problems from unexpected sequences of jobs.
Keeping the run in run_pipeline instead of object state allows `MiddlewareInterface.butler` to have the same default run for the lifetime of the MWI object, preventing a possible source of error in class code. It also makes `MiddlewareInterface` less sensitive to how it's called from the activator, possibly heading off future concurrency bugs.
The previous code would exclude old runs from the output chained collection, causing them to dangle. The chain now behaves the way command-line users of `pipetask run` would expect.
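A minimal sketch of the corrected chaining behavior, assuming a hypothetical helper; the registry calls (`getCollectionChain`, `setCollectionChain`) are real daf_butler APIs:

```python
from lsst.daf.butler import Butler


def prepend_run(butler: Butler, chain: str, new_run: str) -> None:
    children = list(butler.registry.getCollectionChain(chain))
    if new_run not in children:
        # The new run goes first so its datasets shadow older ones on
        # lookup, matching `pipetask run` semantics; old runs stay in
        # the chain instead of dangling.
        butler.registry.setCollectionChain(chain, [new_run] + children)
```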
Keeping the pipeline in run_pipeline instead of object state prevents `MiddlewareInterface.pipeline` from being sometimes defined and sometimes not, preventing a possible source of error in class code. It also makes `MiddlewareInterface` less sensitive to how it's called from the activator, possibly heading off future concurrency bugs.
Putting the raws in uniquely named collections prevents dataset collisions in our test environment (where the same exposure may be ingested as part of different groups) and makes it easier to clean up the central repository if we want to start anew. <instrument>/raw/all still exists as a chained collection, so any code that assumes its existence should have the expected behavior.
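The `_get_raw_run_name` referenced in the diff above might look roughly like this (a guess from the commit message, not the actual implementation); `makeCollectionName` is the real `lsst.obs.base.Instrument` helper:

```python
from lsst.obs.base import Instrument


def get_raw_run_name(instrument: Instrument, visit) -> str:
    # <instrument>/raw/<group>: keying the run on the group means the same
    # exposure uploaded in two different groups lands in two distinct runs,
    # while <instrument>/raw/all chains over all of them.
    return instrument.makeCollectionName("raw", str(visit.group))
```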
The new class allows for unit tests to safely write to the central repo, without slowing down non-writing tests with unnecessary safeguards.
This implementation is fairly conservative, assuming that anything other than a successful transfer (such as being run when there are no outputs) is an error. Butler.transfer_from has serious concurrency problems that will need to be resolved later. Export/import is safer because it avoids performing queries on governor dimensions that could deadlock.
At present, export_outputs is called only if the pipeline run succeeded. More generally, it should be called if the processing will *not* be retried, i.e., on success or on permanent failure.
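A rough sketch of the export/import approach described above, under stated assumptions: `local`, `central`, `refs`, and the file layout are illustrative, while `Butler.export` and `Butler.import_` are the real daf_butler APIs:

```python
from lsst.daf.butler import Butler


def export_outputs(local: Butler, central: Butler, refs, local_root: str) -> None:
    # Describe the datasets (and their dimension records) in an export file...
    with local.export(filename="export.yaml", transfer=None) as export:
        export.saveDatasets(refs)
    # ...then replay it against the central repo, copying the artifacts.
    # Unlike Butler.transfer_from, no live cross-repo queries on governor
    # dimensions are needed, sidestepping the deadlock risk noted above.
    central.import_(directory=local_root, filename="export.yaml", transfer="copy")
```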
Force-pushed from 839bdff to 15cc140
A previous commit had tried to replace these with group-specific collections, but this caused a rare bug when the same worker was used to process two detectors from the same group. The explanatory comment should prevent future developers from making the same mistake.
The `ref_dataset_name` config field is no longer needed nor used.
Replacing the Butler was previously necessary because default dimensions are initialized only on Butler construction. The removal of default dimensions makes this no longer necessary, and having a single Butler object associated with MiddlewareInterface is easier to think about and makes repository behavior more predictable.
Now that a single Butler object is being used throughout, these refreshes should no longer be necessary.
This change lets us respond faster to upstream bugfixes in Science Pipelines.
Force-pushed from 1caa221 to 507b924
This PR adds a "merge to central repo" operation to the activator. Because of technical limitations, this operation currently transfers visit-detector level datasets, but not single-ID datasets like task configs or catalog schemas. This limitation will need to be addressed later.
In addition, this PR carries out a large amount of refactoring that was needed to implement and/or debug the primary feature:
- Use of `Visit` objects throughout, simplifying code and fixing an untestable type error.
- Reorganization of the state of `MiddlewareInterface` so that all of it is either a) "persistent" state that is established at class construction, or b) visit-specific state that is local to the methods that work on that visit. This makes the behavior of its methods more predictable (less state-dependent), keeps old runs from "leaking" into new ones, and makes the class as a whole easier to reason about.