Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracking: online scaling in compute node #3750

Open
23 of 32 tasks
BugenZhao opened this issue Jul 8, 2022 · 5 comments
Open
23 of 32 tasks

Tracking: online scaling in compute node #3750

BugenZhao opened this issue Jul 8, 2022 · 5 comments
Assignees
Labels
component/meta Meta related issue. component/streaming Stream processing related issue. difficulty/hard Issues that need deep insight of the system and expected to cost lot of work type/feature type/tracking Tracking issue.

Comments

@BugenZhao
Copy link
Member

BugenZhao commented Jul 8, 2022

To support scaling in our system, we decide to generally follow the design in Re-Introduce Configuration Change based on Pause Barrier. After consistent hash has been utilized in most of the critical places in our system (#3543), it's high time we start doing this!

This task can be roughly divided into several steps below.

@BugenZhao BugenZhao added component/meta Meta related issue. component/streaming Stream processing related issue. type/feature labels Jul 8, 2022
@BugenZhao BugenZhao added the difficulty/hard Issues that need deep insight of the system and expected to cost lot of work label Jul 8, 2022
@jon-chuang
Copy link
Contributor

Support connecting or disconnecting an actor from the upstream and downstream of the graph.

Isn't this already covered by AddOutput and Stop?

@BugenZhao
Copy link
Member Author

Support connecting or disconnecting an actor from the upstream and downstream of the graph.

Isn't this already covered by AddOutput and Stop?

Exactly. However, they're only used for creating and dropping materialized views. We should reuse them to support creating and dropping parallel units.

@jon-chuang
Copy link
Contributor

jon-chuang commented Jul 11, 2022

What should the interface be for this? I guess you are referring to having a high-level interface in meta like:

// in meta/..
fragment.add_parallel_units(par_unit_ids: &[usize]) // -> create_actors, delete_actors, add outputs to upstream, stop prev outputs from upstream
fragment.remove_parallel_units(par_unit_ids: &[usize])

Currently, we need to wait until new actors are created, before we can send AddOutput barrier messages? Or is there a form of synchronization for this in the LocalStreamManager?

I guess for the time being, whenever there are new compute nodes added, we can scale up to the new parallel units. We can have a more fine-grained per-fragment control of parallel units in the future once we decide on a scaling and placement policy.


Furthermore, what should be the behaviour for stateful operators? Should it always replace all of the current actors? Or should we allow existing actors to continue?

In the latter case, we need an UpdateVnodes barrier message, which if it matches the actor ID of the executor, will trigger update of vnodes of the state table. The state table will need to flush its previous state, ensuring !state_table.is_dirty().

Some of the keys (for scale up) may no longer be relevant to the node. We could rely on LRU to evict these unused keys or explicitly evict them by iterating through all the keys in the application cache and evict the ones whose vnodes are in the removed_vnodes set.

After resume barrier, we can make use of the new vnodes in the state table.

I guess for the time being we can stop all actors and create new ones for the entire fragment. It's unproven whether its necessary to reuse existing actors. Actor startup should be fast and cache can be populated easily from block cache.

@yuhao-su yuhao-su self-assigned this Jul 11, 2022
@BugenZhao
Copy link
Member Author

I guess you are referring to having a high-level interface in meta like:

Exactly.

Currently, we need to wait until new actors are created, before we can send AddOutput barrier messages?

Yep. This is very similar to creating materialized views. Currently, we first build the actors on the worker nodes and then issue a command to the global barrier manager. Therefore, the consistency is kept.

whenever there are new compute nodes added, we can scale up to the new parallel units

Yes. This will be included in the step "Utilize parallel units of newly joined compute nodes."

Should it always replace all of the current actors? Or should we allow existing actors to continue?

In the original design by @fuyufjh, we will drop all of the current actors for simplicity. However, after we've unified the state interface with StateTable, updating the partition info (vnodes) can also be simple so I think we can reuse the existing actors.

Actor startup should be fast and cache can be populated easily from block cache.

Not sure how much it will cost, while keeping the cache in the original actors can always be better.

@fuyufjh
Copy link
Contributor

fuyufjh commented Jul 12, 2022

Should it always replace all of the current actors? Or should we allow existing actors to continue?

In the original design by @fuyufjh, we will drop all of the current actors for simplicity. However, after we've unified the state interface with StateTable, updating the partition info (vnodes) can also be simple so I think we can reuse the existing actors.

Agree. Actually, my initial design is to reuse the existing actor and ignore the data that is not owned by it anymore, which will not affect anything and will be evicted out soon or later in theory. But please feel free to simplify the design.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component/meta Meta related issue. component/streaming Stream processing related issue. difficulty/hard Issues that need deep insight of the system and expected to cost lot of work type/feature type/tracking Tracking issue.
Projects
None yet
Development

No branches or pull requests

8 participants