Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remote input subscribe on barrier mutation #17612

Merged
merged 4 commits into from
Jul 10, 2024

Conversation

wenym1
Copy link
Contributor

@wenym1 wenym1 commented Jul 8, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Currently, RemoteInput fetches the mutation of each barrier from LocalBarrierWorker by per-barrier sending a request to it and wait on a response oneshot channel.

This mechanism has some disadvantages:

  1. when the number of remote input increases, the LocalBarrierWorker may get a great number of these request, which may block the handling of other request.
  2. The remote input will always be blocked when receiving a barrier.

In this PR, we change to using a long existing channel to subscribe on barrier mutation. On the first barrier, the RemoteInput will send a SubscribeBarrierMutation request to the LocalBarrierWorker, including the actor_id and start epoch. When receiving this request, the LocalBarrierWorker will respond with the mutation of barriers that are already injected and has epoch later than start epoch. When later receiving barrier, it will push the mutation to the sender so that the RemoteInput can receive the mutation.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@fuyufjh
Copy link
Member

fuyufjh commented Jul 10, 2024

2. The remote input will always be blocked when receiving a barrier.

Didn't get this point. I thought it's necessary for the RemoteInput to block and wait for the Mutation barrier? How does this PR address this problem?

@wenym1
Copy link
Contributor Author

wenym1 commented Jul 10, 2024

Didn't get this point. I thought it's necessary for the RemoteInput to block and wait for the Mutation barrier? How does this PR address this problem?

For the usual case that LocalBarrierWorker receivers the barrier earlier than the RemoteInput. In current main branch, the RemoteInput sends a request and waits for the response, which must be blocking (poll return Pending). With this PR, the LocalBarrierWorker will send the mutation to the channel when receiving the barrier, and when RemoteInput later receives the barrier, it is very likely that it won't be blocked (poll return Ready) because the mutation is already in the channel.

Copy link
Member

@BugenZhao BugenZhao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rest LGTM

src/stream/src/executor/exchange/input.rs Outdated Show resolved Hide resolved
Comment on lines -72 to -78
/// Received barrier from actors in other compute nodes in remote input, however no `send_barrier`
/// request from the meta service is issued.
Stashed {
/// Senders registered by the remote input.
mutation_senders: Vec<oneshot::Sender<Option<Arc<Mutation>>>>,
},

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's quite clever that the new impl eliminates the need for the Stashed state.

Comment on lines +279 to +280
// The barrier no more collect from such actor. End subscribe on mutation.
return;
Copy link
Member

@BugenZhao BugenZhao Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this actually unreachable?


EDIT: only when prev_epoch == start_prev_epoch

Copy link
Member

@fuyufjh fuyufjh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest LGTM

Copy link
Contributor

@yezizp2012 yezizp2012 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

src/stream/src/executor/exchange/input.rs Outdated Show resolved Hide resolved
@wenym1 wenym1 added this pull request to the merge queue Jul 10, 2024
Merged via the queue into main with commit 4d34bd3 Jul 10, 2024
29 of 30 checks passed
@wenym1 wenym1 deleted the yiming/remote-input-subscribe-mutation branch July 10, 2024 08:04
kwannoel added a commit that referenced this pull request Aug 22, 2024
Co-authored-by: William Wen <44139337+wenym1@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants