DM-24734: Rework QuantumGraph generation to avoid O(N^2) scaling. #128
Conversation
6f8fa1c to 6013858
Mostly looks fine, just a few code comments, but the algorithm seems OK as best as I could follow.
@@ -440,7 +440,7 @@ def to_primitives(self):
     accumulate = {"description": self.description}
     if self.instrument is not None:
         accumulate['instrument'] = self.instrument
-    accumulate['tasks'] = {l: t.to_primitives() for l, t in self.tasks.items()}
+    accumulate['tasks'] = {m: t.to_primitives() for m, t in self.tasks.items()}
Just curious, what didn't it like here?
l is frowned upon as a variable name because some fonts make it hard to distinguish from 1.
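For illustration, a minimal sketch of the rename (hypothetical task names): linters such as flake8 flag l, I, and O as ambiguous variable names (rule E741), and the comprehension behaves identically with any other loop variable.

```python
tasks = {"isr": "IsrTask", "calibrate": "CalibrateTask"}

# Would be flagged by flake8 as E741 ("ambiguous variable name 'l'"):
# accumulate = {l: t for l, t in tasks.items()}

# Identical behavior with unambiguous loop variables:
accumulate = {label: task for label, task in tasks.items()}
```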
dataIds : `Iterable` [ `DataCoordinate` ]
    Data IDs to match.

Yields
This is not correct. It returns a concrete thing. "Yields" is only appropriate if you did something like:

def extract(...) -> Generator[DatasetRef, None, None]:
    ...
    yield from (refs[dataId] for dataId in dataIds)
Agreed, in the sense that this is what the Python community expects, but I'm curious as to whether you know if this actually matters. AFAICT, whether an iterator-returning function is implemented as a generator or not is completely transparent to the caller, so distinguishing between them is always just exposing an implementation detail. Is that wrong in some edge cases (or maybe coroutines)?
A few thoughts:
- Pedantically, the function does not yield anything; it returns something that yields something. It's like the difference between an iterable and an iterator.
- If someone really wanted to know the call stack (or something like that), this function will never be re-entered; the thing you are returning will be.
- It does make a difference when coroutines are involved (insofar as you lose the richer type information provided to any checkers).
I think in day-to-day practice you can use this as is, but your terminology does not really match what is happening, and if it were used like this in other places (coroutines) it would be wrong. Code in a generator function is not executed until first use: everything up to the first yield runs only when the generator is "primed" with a next. In this case, though, there is code execution each time before the generator is returned. If someone really cared about that for some reason, the info you wrote would lead them astray. In practice it probably doesn't matter much.
dataId : `DataCoordinate`
    Data ID for this quantum.
"""
def __init__(self, task: _TaskScaffolding, dataId: DataCoordinate): |
remove your __init__ method
I think taking that approach (along with the changes in your next few comments) also requires adding field(init=False) or defaults (and hence dropping __slots__) for inputs, outputs, and prerequisites, and that task and dataId shouldn't actually be InitVar in that scenario since we do want them as attributes. Overall it seems simpler to just define my own __init__ than go to so much work to make the dataclass-injected one usable for me.
Maybe the better question is whether I should drop @dataclass - I'm already overriding __repr__ and __init__, so all I'm getting from that is equality comparison, and I don't actually want that, I suppose. Unless I hear otherwise I'll just drop the decorator.
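For illustration, a simplified, hypothetical stand-in for _QuantumScaffolding showing what keeping the generated __init__ would require: the container attributes become field(init=False) with default factories so they are excluded from the constructor signature, which is the extra machinery the comment above weighs against just writing __init__ by hand.

```python
from dataclasses import dataclass, field

@dataclass
class Scaffolding:
    # Constructor arguments, kept as real attributes (not InitVar):
    task: str
    dataId: int
    # Containers excluded from __init__ and initialized per-instance:
    inputs: dict = field(init=False, default_factory=dict)
    outputs: dict = field(init=False, default_factory=dict)

q = Scaffolding(task="isr", dataId=42)
```

The generated __init__ accepts only task and dataId; inputs and outputs start as fresh empty dicts on every instance.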
""" | ||
def __init__(self, task: _TaskScaffolding, dataId: DataCoordinate): | ||
self.task = task | ||
self.dataId = dataId |
add def __post_init__(self, task, dataId):
# because of back-references.
return f"_QuantumScaffolding(taskDef={self.taskDef}, dataId={self.dataId}, ...)"
task: _TaskScaffolding |
make this task: typing.InitVar[_TaskScaffolding]
quantum = task.quanta.get(quantumDataId)
if quantum is None:
    quantum = _QuantumScaffolding(task=task, dataId=quantumDataId)
    task.quanta[quantumDataId] = quantum
why not just do task.quanta.setdefault(quantumDataId, _QuantumScaffolding(task=task, dataId=quantumDataId))
Same reasoning as above.
# be associated.
# Many of these associates will be duplicates (because another
# query row that differed from this one only in irrelevant
# dimensions already added them), and we use sets to skips.
skips -> skip
for datasetType, refs in itertools.chain(self.inputs.items(), self.intermediates.items(),
                                         self.outputs.items()):
    datasetDataId = commonDataId.subset(datasetType.dimensions)
    ref = refs.get(datasetDataId)
why not do refs.setdefault(datasetDataId, DatasetRef(datasetType, datasetDataId))
I figured the cost of an extra hash lookup was probably lower than the cost of constructing the temporary if it's usually not needed (as I think is the case here). No quantitative analysis of that, though. Mostly I really wish Python (and every language I've ever used) had a better interface for conditional mapping insertions that avoided that choice.
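The tradeoff above can be sketched directly (make_ref is a hypothetical stand-in for constructing a DatasetRef): dict.setdefault evaluates its second argument unconditionally, so the temporary is built even when the key is already present, while get-then-insert pays an extra hash lookup but only constructs on a miss.

```python
calls = 0

def make_ref():
    # Stand-in for constructing a DatasetRef; counts constructions.
    global calls
    calls += 1
    return object()

refs = {"dataId-1": "existing"}
key = "dataId-1"

# setdefault: make_ref() runs even though the key already exists.
refs.setdefault(key, make_ref())

# get-then-insert: one extra hash lookup, but no construction on a hit.
if refs.get(key) is None:
    refs[key] = make_ref()
```

After both patterns run against a present key, exactly one construction has happened, and it came from the setdefault call.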
_LOG.debug("Resolving %d datasets for input dataset %s.", len(refs), datasetType.name)
for dataId in refs:
    refs[dataId] = registry.findDataset(datasetType, dataId=dataId, collections=collections)
assert not any(ref is None for ref in refs.values())
Bare asserts can be hard to debug. It's up to you to decide whether this is likely enough to happen to make a richer message worth it.
Yeah, this one is probably worth changing to a RuntimeError raise, as it's guarding against both daf_butler logic (which is probably too "far away" for this assert) and against these datasets having been deleted by some other Registry client since the previous query.
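A hedged sketch of what that change might look like (the data and message are illustrative, not the actual pipe_base code): replace the bare assert over resolved refs with a RuntimeError that names the offending data IDs, so a failure is diagnosable.

```python
refs = {"dataId-1": "ref1", "dataId-2": None}

def check_resolved(refs):
    # Instead of: assert not any(ref is None for ref in refs.values())
    unresolved = [dataId for dataId, ref in refs.items() if ref is None]
    if unresolved:
        raise RuntimeError(
            f"Could not resolve datasets for data IDs {unresolved}; they "
            "may have been deleted by another Registry client since the "
            "previous query."
        )

try:
    check_resolved(refs)
except RuntimeError as err:
    message = str(err)
```

Unlike the assert, this survives python -O and tells the operator which data IDs failed to resolve.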
elif not skipExisting:
    raise OutputExistsError(f"Output dataset {datasetType.name} already exists in "
                            f"output RUN collection '{run}' with data ID {dataId}.")
refs[dataId] = ref
It seems weird to pull out the ref and then reassign it in the case where it is not resolved. I think these branches can be reworked a bit.
Original, per-data-ID messages are now at TRACE level, with coarser (per-Task) status messages at DEBUG.
6013858 to 1da6094
I've addressed or responded to review comments and rebased, and am kicking off a new Jenkins run now since it's been a while. Will merge when that's green unless I see more comments first.
1da6094 to f21cc20
These now reflect how adjustQuantum is actually being called. I suspect the original types reflect a reasonable aspiration: PipelineTask subclasses ideally would be operating on a mapping that uses their internal collection names instead of dataset type names, but the Connections infrastructure doesn't provide a good way to do that translation (!), so changing that here is both out-of-scope _and_ a lot of work.
f21cc20 to e829e2b
The previous implementation tested all DatasetRefs (of the right type) for compatibility with all quanta, which doesn't scale when the graph is large. This removes the fillQuanta step from _PipelineScaffolding, moving the association of datasets with quanta into fillDataIds (now connectDataIds) and prerequisite lookup into fillDatasetRefs (now resolveDatasetRefs). To do that, I added a new _QuantumScaffolding object to represent an under-construction Quantum, and removed _DatasetScaffolding as it didn't end up providing more than a simple dict would provide; _DatasetScaffoldingDict was accordingly renamed and adjusted to _DatasetDict.
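The scaling idea described above can be sketched with a toy example (entirely hypothetical data and structures, not the actual scaffolding classes): instead of testing every dataset against every quantum for compatibility (O(N*M) pairs), index quanta by the data ID they share with the dataset so each association is a single dict lookup (O(N + M) overall).

```python
# Toy quanta keyed by data ID, each accumulating its input refs.
quanta = {
    ("visit", 1): {"inputs": []},
    ("visit", 2): {"inputs": []},
}

# Toy datasets as (data ID, ref) pairs, possibly with duplicates of a visit.
datasets = [
    (("visit", 1), "raw-1"),
    (("visit", 2), "raw-2"),
    (("visit", 1), "raw-1b"),
]

# One hash lookup per dataset replaces the all-pairs compatibility test.
for dataId, ref in datasets:
    quantum = quanta.get(dataId)
    if quantum is not None:
        quantum["inputs"].append(ref)
```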
e829e2b to 1c80eb4
No description provided.