
[RFC] Dagger Integration #33

Open
jpsamaroo opened this issue Feb 27, 2022 · 3 comments

Comments

@jpsamaroo

I've gotten some recent interest from the community in having Dagger-powered time series operations, such that a streaming DAG can be configured across multiple nodes. I am interested in making this happen, and have started https://github.com/jpsamaroo/DaggerTimeDag.jl to show off the basic concept of what this integration could look like.

The benefits of integrating with Dagger are numerous:

  • Multithreading and Distributed computing support "for free"
  • Lazy (background) execution of DAGs
  • Benefit from future storage awareness to persist Blocks to disk

I would greatly appreciate any comments on this approach!

@tpgillam
Owner

Thanks for reaching out!

I definitely agree that being able to use Dagger as an evaluation scheduler for TimeDag would be great. My thought a little while ago was that, for full integration, we'd have multiple implementations of TimeDag.evaluate_until! for different scheduling approaches [which would then add an extensibility point for other packages to implement their own schedulers].

Some thoughts from then are in a comment here:

https://github.com/invenia/TimeDag.jl/blob/614e79bca9cee7508bbcd4e421f8686eb36d5855/src/evaluation.jl#L82-L91

I suspect the main difficulty is the way that TimeDag.run_node! currently works by mutating the node state [1]. I assume (?) that this is an awkward thing to support with Dagger, so maybe we'd need a little wrapper around run_node! that additionally returned the new (mutated) state [2].

[1] Not all nodes are stateful, but many are — e.g. for nodes with more than one parent, state is needed to implement alignment. Tracking this state isn't technically required when using evaluate with batch_interval=nothing, but skipping it would prevent batched evaluation, and also prevent pulling incremental updates through the graph.

[2] Or possibly we should just change run_node! to always do this anyway.
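To make the idea concrete, here is a minimal sketch of such a wrapper. The names and the run_node! signature below are purely illustrative, not TimeDag's actual API — the point is only the pattern of returning the (mutated) state alongside the output:

```julia
# Illustrative stand-in for a stateful node operation: it mutates
# `state` in place and returns the node's output value.
mutable struct CountState
    n::Int
end

function run_node!(state::CountState, x)
    state.n += 1
    return x + state.n
end

# Wrapper that additionally returns the new (mutated) state, so a
# scheduler can thread state explicitly between tasks.
function run_node_returning_state!(state, x)
    y = run_node!(state, x)
    return y, state
end

s = CountState(0)
y1, s = run_node_returning_state!(s, 10)  # y1 == 11
y2, s = run_node_returning_state!(s, 10)  # y2 == 12
```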

@jpsamaroo
Author

I suspect the main difficulty is the way that TimeDag.run_node! currently works by mutating the node state [1]. I assume (?) that this is an awkward thing to support with Dagger, so maybe we'd need a little wrapper around run_node! that additionally returned the new (mutated) state [2].

Dagger now supports mutable data via Dagger.@mutable, which marks a piece of data as ineligible for automatic data movement (to prevent copies). Still, mutation isn't really an issue if we create the state for each node independently on the worker that will be processing the node (assuming that we don't want to move nodes around), which is what DaggerTimeDag.jl currently does.

If it's not valid to do this, then yeah, we can just use Dagger.@mutable to mark such data and ensure that any tasks that need that data are forced to execute on the worker that contains the data.
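For reference, a minimal sketch of the Dagger.@mutable pattern described above. The NodeState type is hypothetical, and Dagger's exact API surface may differ between versions:

```julia
using Dagger

# Hypothetical per-node state; not part of TimeDag or Dagger.
mutable struct NodeState
    count::Int
end

# Pin the state to the worker that creates it; Dagger then schedules
# tasks that touch it onto that worker rather than copying the data.
state = Dagger.@mutable NodeState(0)

# A task that mutates the pinned state in place and returns the new count.
bump!(s::NodeState) = (s.count += 1; s.count)
t = Dagger.@spawn bump!(state)
fetch(t)
```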

Also, initial experiments suggest that Dagger has about 100x overhead for simple graphs (about 30s for 10^5 nodes, compared to 0.3s for full evaluation of a simple TimeDag graph in single-threaded mode).

I'd need to run some tests, but my guess is that this is probably true for a naive implementation that spawns one task per node evaluation; a smarter approach is to launch one task per node lifetime, and use a RemoteChannel to communicate between tasks. There will still be overhead from Distributed communication internals (even when communicating between nodes on the same worker), but it's probably an acceptable overhead for the scalability benefits.

Note that DaggerTimeDag.jl doesn't currently use this approach, but it's on my Todo list.
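As a rough illustration of the "one task per node lifetime" pattern: each node gets a single long-lived task that pulls batches from its parent's channel, applies the node's operation, and pushes results downstream. This sketch uses plain Distributed primitives (names, channel sizes, and the end-of-stream convention are arbitrary, and it is not the eventual DaggerTimeDag.jl implementation):

```julia
using Distributed

# One long-lived task per node: repeatedly take a batch from the input
# channel, apply the node's operation, and push the result downstream.
# `nothing` is used here as an end-of-stream marker.
function node_lifetime(input::RemoteChannel, output::RemoteChannel, op)
    while true
        batch = take!(input)
        batch === nothing && break   # upstream finished
        put!(output, op(batch))
    end
    put!(output, nothing)            # propagate completion downstream
end

input = RemoteChannel(() -> Channel{Any}(32))
output = RemoteChannel(() -> Channel{Any}(32))
lifetime = @async node_lifetime(input, output, x -> 2x)

put!(input, 21)
put!(input, nothing)
result = take!(output)
wait(lifetime)
```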

@tpgillam
Owner

tpgillam commented Mar 1, 2022

Thanks for this — I wasn't aware of Dagger.@mutable! Also, my simple tests were certainly done with only the most naive implementation.

One (hopefully useful) observation about your current implementation is that evaluate_internal is duplicating work. This is because TimeDag.start_at([node], ...) initialises state for the entire subgraph needed to evaluate node — i.e. it isn't only running the operation inside node, but also determining and evaluating all of its dependencies. I think you want to use a lower-level API — see e.g. the use inside evaluate_until! here:

https://github.com/invenia/TimeDag.jl/blob/main/src/evaluation.jl#L106-L109

(Currently the state that you create is a full EvaluationState, which packages together all individual node states in the subgraph being evaluated. For the distributed execution scheme you mention, with different nodes on different workers, I suspect this struct is no longer appropriate.)
