dvc: cache graph/stages/pipelines #2529
Conversation
Force-pushed cb163d5 to c04c20f
How is this related to #755? The patch itself looks so fragile that it doesn't seem to be caching anything - it's too easy to call. This could be done the same as here, but:

Also, you are calculating graphs twice because of how @cached_property works:

```python
@cached_property
def graph(self):
    self.graph, self.active_graph = self._graph()
    return self.graph
```

Another thing is that you resurrected the concepts of active stages and active pipelines, which we dropped some time ago and altered
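One plausible reading of the double-computation complaint, as a minimal sketch (class and field names are hypothetical, not DVC's actual code): if `graph` and `active_graph` are independent `cached_property`s that each call the builder, touching both builds the DAG twice, whereas a single lazy accessor that caches both results builds it once.

```python
from functools import cached_property


class TwoProps:
    """Each cached_property triggers its own full build."""

    builds = 0

    def _build_graph(self):
        self.builds += 1
        return "graph", "active_graph"  # stand-ins for the real DAGs

    @cached_property
    def graph(self):
        return self._build_graph()[0]

    @cached_property
    def active_graph(self):
        return self._build_graph()[1]


class OnePass:
    """One lazy accessor caches both results in a single build."""

    builds = 0
    _graph = None

    def _build_graph(self):
        self.builds += 1
        return "graph", "active_graph"

    @property
    def graph(self):
        if self._graph is None:
            self._graph, self._active_graph = self._build_graph()
        return self._graph


two = TwoProps()
_ = two.graph
_ = two.active_graph

one = OnePass()
_ = one.graph
_ = one.graph

print(two.builds, one.builds)  # 2 1
```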
I need to implement a stage locker that needs to know what the DAG looks like in order to lock dependencies and such, and this patch helps avoid passing

Yeah,

That is precisely what reload is meant to do. I've just used cached_property, but should've used a good old read-only property.

Yep, I should really start adding tickboxes to the PR description to list the issues I'm aware of and will fix before WIP becomes a final patch.

It was an incorrect decision to drop it everywhere. It is not needed for pull/push/etc, but it is definitely needed for repro, so I've fixed that.
To remove an attribute/cached prop which you are not sure is there, you can use `self.__dict__.pop('graph', None)`. Maybe I should consider adding silent deletion of

```python
@property
def graph(self):
    if not getattr(self, '_graph', None):
        self._graph, self._active_graph = self._build_graph()
    return self._graph
```

On active things - as you can see, we are discussing two things here in parallel, which complicate each other. So I suggest moving the repro refactor to a separate PR.
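The `self.__dict__.pop('graph', None)` trick can be sketched like this (the `Repo`/`reset` names are hypothetical): `functools.cached_property` stores its value in the instance `__dict__` under the property's name, so popping that key both invalidates the cache and is a safe no-op when the property was never computed.

```python
from functools import cached_property


class Repo:
    builds = 0

    @cached_property
    def graph(self):
        # pretend building the DAG is expensive
        self.builds += 1
        return {"build": self.builds}

    def reset(self):
        # Drop the cached value; no-op if `graph` was never accessed.
        self.__dict__.pop("graph", None)


repo = Repo()
repo.reset()            # safe before the first access
first = repo.graph
again = repo.graph      # cached: same object, no rebuild
repo.reset()
rebuilt = repo.graph    # rebuilt after invalidation
print(first, rebuilt)   # {'build': 1} {'build': 2}
```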
BTW, I still don't understand why you are calling

@Suor Cache should be reloaded after the repo lock is taken, as someone might've deleted/added/modified stages while we didn't have the lock. And it needs to be reloaded whenever we change
@efiop So that should be synchronized in code somehow, not by simply repeating it. Summarizing:
A side note: it looks like releasing the lock on runs may cost us some performance - the graph needs to be rebuilt if accessed.
Force-pushed 43f6c36 to f208553
Btw, this also solves #2519 (comment). And I've found a few
It's very hard to review while two things, caching and the reproduce refactor, are intermixed here. Would you mind separating them?
@Suor It is not trivial to decouple them, as they are closely tied together. I did try.
@Suor Or are you specifically talking about
I think we should remove all that "active" stuff. It makes no sense, especially in
So there will be disagreement, while the cache stuff might be merged much faster.

We shouldn't remove it, as it is a core feature for
shcheklein
left a comment
Following the discussion: looks good to me, just some minor comments to address/improve. I don't like the way active/non-active are returned and used - via 0 and 1 indexes - but this is cosmetic. I tend to agree with @Suor that it's better to keep this logic inside repro if it's the only consumer of the active stuff, but I don't "feel" the code strongly enough to have a strong opinion. For example, if it required importing a lot of stuff into repro and repeating all this DAG logic, that would be bad. Also, since we're caching this stuff now, it's probably a good idea to keep it in the same place.
Force-pushed 5f99ff4 to 510f1bc
@Suor @shcheklein You are right about
Signed-off-by: Ruslan Kuprieiev <ruslan@iterative.ai>
treeverse#2529 (comment) Signed-off-by: Ruslan Kuprieiev <ruslan@iterative.ai>
treeverse#2529 (comment) Signed-off-by: Ruslan Kuprieiev <ruslan@iterative.ai>
Suor
left a comment
So the big things are done, except one - checking that the graph is not collected while the lock is not held. That will basically protect us from forgetting @locked. This could be done separately, though.

The others are minor and cosmetic things.
```python
stage = attrs[node]
if stage.path.startswith(target + os.sep):
    ret.append(stage)
return ret
```
Why don't we base this on stages?

```python
stages = get_stages(G)
return [stage for stage in stages if stage.path.startswith(target + os.sep)]
```

Another thing: it looks like we were not collecting all the stages before, but now we do.
The order matters, e.g. for `repro --recursive --single-item` we need to run those in the proper order. It might be better to do that not in collect, but rather in reproduce itself. Need to take a look separately.
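The ordering requirement can be illustrated with the stdlib `graphlib` (a sketch only: the stage names and dependencies are made up, and DVC itself uses networkx): stages must be reproduced in dependency order, so every dependency is up to date before the stage that consumes it runs.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each stage maps to the stages it depends on.
deps = {
    "train.dvc": {"featurize.dvc"},
    "featurize.dvc": {"prepare.dvc"},
    "prepare.dvc": set(),
}

# static_order() yields stages with satisfied dependencies first.
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['prepare.dvc', 'featurize.dvc', 'train.dvc']
```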
So this one can also be significantly simplified by using stages as nodes.
```python
return ret
```

```python
stage = Stage.load(self, target)
if not with_deps:
```
So with_deps is ignored if it's recursive && isdir, and not ignored if it's recursive && !isdir. Maybe add:

```python
assert not recursive or not with_deps  # recursive => not with_deps
```

at the start of the function, to guarantee that this situation cannot arise.
It is more of a bug, though: --with-deps --recursive is a perfectly valid combination. I'll adjust it with a test later.
So this goes to separate issue?
@Suor Yes.
dvc/repo/reproduce.py (outdated)
```python
active = G.copy()
stages = nx.get_node_attributes(G, "stage")
for node in G.nodes():
```
As far as I can see, you don't need to call .nodes() - it's possible to iterate the graph directly - and there is no need to prepare attrs: you may use stage = G.nodes[node]['stage'], or iterate G.nodes.items().

BTW, why don't we add stages to the graph directly? Then it would be simply:

```python
for stage in G:
    if not stage.locked:
        continue
    # ...
```
Addressed the .nodes() issue.

> BTW, why don't we add stages to the graph directly? Then it would be simply:

Mostly legacy, I think. Might take a look at some point.
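The difference being discussed can be sketched without networkx (the `Stage` class and field names are hypothetical stand-ins): with string nodes you need an attribute lookup per node, similar to `nx.get_node_attributes(G, "stage")`, while with stage objects as nodes the iteration reads the stage directly.

```python
class Stage:
    def __init__(self, path, locked=False):
        self.path = path
        self.locked = locked


# Current style: node names plus a parallel name -> attrs mapping.
nodes = {
    "prepare.dvc": {"stage": Stage("prepare.dvc")},
    "train.dvc": {"stage": Stage("train.dvc", locked=True)},
}
locked_names = [name for name, attrs in nodes.items() if attrs["stage"].locked]

# Proposed style: the stage objects themselves are the graph nodes.
graph = [Stage("prepare.dvc"), Stage("train.dvc", locked=True)]
locked_paths = [stage.path for stage in graph if stage.locked]

print(locked_names, locked_paths)  # ['train.dvc'] ['train.dvc']
```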
```python
for node in G.nodes():
    if G.in_degree(node) == 0:
        targets.append(os.path.join(self.root_dir, node))
        targets.append(attrs[node])
```
This all could be:

```python
targets = [stage for G in pipelines for stage in G if G.in_degree(stage) == 0]
```
Writing such long comprehensions is terrible. It is much better to leave them structured.
You can break a list comprehension into several lines; this would be the same, but more efficient and descriptive. Collecting results with list.append() has always looked like a smell to me.
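The two styles side by side, as a sketch (the in-degree check is simplified to a plain node -> in-degree dict; names are made up):

```python
# node -> in-degree, for two hypothetical pipelines
pipelines = [
    {"prepare.dvc": 0, "train.dvc": 1},
    {"fetch.dvc": 0, "eval.dvc": 1},
]

# Collecting results with .append()
targets = []
for G in pipelines:
    for node in G:
        if G[node] == 0:
            targets.append(node)

# The same logic as a multi-line comprehension
targets2 = [
    node
    for G in pipelines
    for node in G
    if G[node] == 0
]

print(targets, targets2)  # ['prepare.dvc', 'fetch.dvc'] ['prepare.dvc', 'fetch.dvc']
```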
```python
from dvc.repo import locked

@locked
```
I don't quite get why we have locked here. As I understand it, it is supposed to reset graph/stages/pipelines upon changing the state of the dvc repo. Can list do that?
It is supposed to reset them because it no longer guarantees that they will stay intact. But locked here is about locking the dvc repository, so that no other dvc instance can add/remove dvcfiles in the project, which would affect self.collect used below.
Ok, thanks for the clarification, I only looked at locked as a mechanism caching the graph. It makes sense.
Definitely separately. Also, we do collect the graph without the lock on API/import/get calls when dealing with an external repo. These days we could bring that back for symmetry, as we have universal locks now and are not afraid of NFS and friends.
```python
with repo.lock:
    return f(repo, *args, **kwargs)
ret = f(repo, *args, **kwargs)
# Our graph cache is no longer valid after we release the repo.lock
```
Why is it not valid? I would write that a concurrent dvc process might change stage files, so this graph cache might become stale.
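A minimal sketch of what the `@locked` decorator is doing in this hunk (the `Repo` class and the `_reset` name are assumptions for illustration, not DVC's actual API): run the wrapped call under the repo lock, then invalidate the cached graph, since a concurrent dvc process may change stage files once the lock is released.

```python
import functools
import threading


def locked(f):
    @functools.wraps(f)
    def wrapper(repo, *args, **kwargs):
        with repo.lock:
            ret = f(repo, *args, **kwargs)
            # The graph cache is stale once the lock is released:
            # another process may add/remove/modify dvcfiles.
            repo._reset()
            return ret
    return wrapper


class Repo:
    def __init__(self):
        self.lock = threading.Lock()
        self.resets = 0

    def _reset(self):
        self.resets += 1  # stand-in for dropping graph/stages/pipelines


@locked
def reproduce(repo):
    return "reproduced"


repo = Repo()
result = reproduce(repo)
print(result, repo.resets)  # reproduced 1
```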
Signed-off-by: Ruslan Kuprieiev ruslan@iterative.ai
Related to #755
Have you followed the guidelines in our Contributing document?

Does your PR affect documented changes or does it add new functionality that should be documented? If yes, have you created a PR for dvc.org documenting it, or at least opened an issue for it? If so, please add a link to it.