
feat: handle state store sync in local barrier manager #14377

Merged: 16 commits into main on Jan 16, 2024

Conversation

wenym1 (Contributor) commented Jan 5, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Previously, we could not call async methods in the local barrier manager, so state store sync had to be called in the RPC handler. Now that the local barrier manager has become an event-loop worker, we can call async methods and poll the futures they return. In this PR, we move the call to state store sync into the local barrier manager worker loop to simplify the logic outside the local barrier manager.

The managed barrier state has two more enum variants, AllCollected and Completed. When all actors have collected a barrier, the barrier state transitions to AllCollected, and a future that calls sync is created. The future is polled when calling the next_completed_epoch method.
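
For illustration, here is a minimal sketch of that state machine. All names and shapes here (ManagedBarrierStateInner, epoch_states, the sync future type) are assumptions for the sketch, not the PR's literal code:

```rust
use std::collections::BTreeMap;
use std::future::{poll_fn, Future};
use std::task::Poll;

use futures::future::BoxFuture;

type SyncResult = (); // placeholder for the real state-store sync result

// Illustrative stand-in for the per-barrier state described above.
enum ManagedBarrierStateInner {
    // Issued to actors; waiting for the remaining ones to collect it.
    Issued { remaining_actors: usize },
    // All actors collected the barrier; a state-store `sync` future was created.
    AllCollected(BoxFuture<'static, SyncResult>),
    // The `sync` future has resolved.
    Completed(SyncResult),
}

struct ManagedBarrierState {
    // Keyed by `epoch.prev` (discussed later in this thread).
    epoch_states: BTreeMap<u64, ManagedBarrierStateInner>,
}

impl ManagedBarrierState {
    /// Poll the pending `AllCollected` sync futures; resolve with an epoch
    /// once its state-store sync finishes. If nothing is `AllCollected`,
    /// this stays pending until the worker's event loop re-creates and
    /// re-polls it on the next iteration.
    async fn next_completed_epoch(&mut self) -> u64 {
        poll_fn(|cx| {
            for (epoch, state) in self.epoch_states.iter_mut() {
                if let ManagedBarrierStateInner::AllCollected(sync_fut) = state {
                    if let Poll::Ready(result) = sync_fut.as_mut().poll(cx) {
                        *state = ManagedBarrierStateInner::Completed(result);
                        return Poll::Ready(*epoch);
                    }
                }
            }
            Poll::Pending
        })
        .await
    }
}
```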

Some other refactors are done accordingly.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features. See Sqlsmith: SQL feature generation #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@BugenZhao (Member) commented:

Not reviewed in detail yet...

we move the call to state store sync into the local barrier manager worker loop to simplify the logic outside the local barrier manager

Does this decrease the overall complexity? Previously, the local barrier manager was only responsible for collecting and sending barriers, while RPC handlers called sync for their own epochs. Now it seems mixed up, and there are complicated concurrent control flows. 😕

Some other refactors are done accordingly.

Is it possible to split the changes above and other minor refactors?

wenym1 (Contributor, Author) commented Jan 5, 2024

Does this decrease the overall complexity? Previously, the local barrier manager was only responsible for collecting and sending barriers, while RPC handlers called sync for their own epochs. Now it seems mixed up, and there are complicated concurrent control flows. 😕

I plan to replace the current two RPCs, inject_barrier and collect_barrier, with a long-lived bidirectional streaming RPC between the CN and meta. This PR is therefore more of a refactor that moves the work of inject_barrier and collect_barrier into the local barrier manager. Previously we had to call sync in the RPC handlers because the local barrier manager was not in an async context.

Previously, the RPC handler was notified by the barrier manager when a barrier was collected, and then called sync itself. In this PR, that notify-then-sync flow is replaced by creating a future that calls sync and notifying on the future's completion. Hope this explanation helps with reviewing this PR.
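
As a rough sketch of that before/after difference (state_store_sync, the channel shape, and on_all_collected are illustrative assumptions, not the PR's actual items):

```rust
use futures::future::BoxFuture;
use futures::FutureExt;
use tokio::sync::oneshot;

type SyncResult = (); // placeholder for the real sync result

// Hypothetical stand-in for syncing the state store up to `prev_epoch`.
async fn state_store_sync(_prev_epoch: u64) -> SyncResult {}

// Before (roughly): notify "collected", and let the RPC handler call sync.
//     collected_tx.send(prev_epoch);          // barrier manager
//     let result = sync(prev_epoch).await;    // RPC handler

// After (roughly): the barrier manager creates the sync future itself and
// notifies only once the sync has completed.
fn on_all_collected(
    prev_epoch: u64,
    completion_tx: oneshot::Sender<(u64, SyncResult)>,
) -> BoxFuture<'static, ()> {
    async move {
        let result = state_store_sync(prev_epoch).await;
        // The receiver may have gone away (e.g. during recovery); ignore the error.
        let _ = completion_tx.send((prev_epoch, result));
    }
    .boxed()
}
```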

Is it possible to split the changes above and other minor refactors?

The biggest refactor in this PR changes the key of barrier_state_map from epoch.curr to epoch.prev. This is necessary for this PR because we need to find a specific barrier by its epoch.prev.
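
A tiny sketch of why the epoch.prev key helps (BarrierState and on_collect_report are illustrative placeholders):

```rust
use std::collections::BTreeMap;

struct BarrierState; // placeholder for the real per-barrier state

// Collection reports from actors carry the *previous* epoch of the barrier,
// so keying the map by `epoch.prev` makes the lookup direct instead of
// requiring a scan over entries keyed by `epoch.curr`.
fn on_collect_report(
    barrier_state_map: &mut BTreeMap<u64, BarrierState>,
    prev_epoch: u64,
) {
    let _state = barrier_state_map
        .get_mut(&prev_epoch)
        .expect("should exist");
    // ... mark this actor as collected, maybe transition to AllCollected ...
}
```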

@tabVersion tabVersion self-requested a review January 5, 2024 10:00
wenym1 (Contributor, Author) commented Jan 10, 2024

Is it possible to split the changes above and other minor refactors?

Some of the refactoring has been split into #14436 and merged. Now we can focus on this PR's logic for handling state store sync.

kwannoel (Contributor) commented:

Why do we introduce this PR? Is it a prerequisite of some feature?

wenym1 (Contributor, Author) commented Jan 11, 2024

Why do we introduce this PR? Is it a prerequisite of some feature?

We are going to deprecate the send_barrier and complete_barrier RPCs and replace them with a bidirectional gRPC stream. This will be useful for the partial checkpoint implementation. It also helps the current code better handle stale requests during recovery.

wenym1 (Contributor, Author) commented Jan 12, 2024

Any comments? @BugenZhao @yezizp2012 @kwannoel @tabVersion @hzxa21

tabVersion (Contributor) left a comment:

Took a rough look; basically LGTM. Waiting for @yezizp2012's review.

src/compute/src/rpc/service/stream_service.rs (review thread resolved)
Comment on lines 121 to 122
pin!(self.state.next_completed_epoch()),
pin!(event_rx.recv()),
Contributor:

Is there a preferred branch?

wenym1 (Contributor, Author):

Either branch is fine. The handler for each case is synchronous and non-blocking, so each case handler is expected to finish in a very short time.

@@ -46,12 +48,11 @@ pub const ENABLE_BARRIER_AGGREGATION: bool = false;

/// Collect result of some barrier on current compute node. Will be reported to the meta service.
#[derive(Debug)]
pub struct CollectResult {
pub struct BarrierCompleteResult {
pub sync_result: Option<SyncResult>,
Contributor:

Let's add a docstring for this field.

wenym1 (Contributor, Author):

Added.

warn!(err=?e.as_ref().map(|_|()), "fail to send collect epoch result");
});
loop {
let item = drop_either_future(
Contributor:

Can you elaborate on the use of drop_either_future here?

Why do we just drop the lhs/rhs future?

Collaborator:

Why don't we use tokio::select! instead?

wenym1 (Contributor, Author):

I've changed it to tokio::select!. The original use of drop_either_future was because the future holds a mutable reference to self, and if it is not dropped we cannot modify the state in self.
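
For context, a minimal sketch of the resulting worker loop. Everything here (LocalBarrierWorker, WorkerState, LocalBarrierEvent) is a stand-in for the PR's actual items, not its literal code:

```rust
use tokio::sync::mpsc::UnboundedReceiver;

struct WorkerState;
impl WorkerState {
    async fn next_completed_epoch(&mut self) -> u64 {
        // ... poll the pending sync futures, as in the earlier sketch ...
        std::future::pending().await
    }
}

enum LocalBarrierEvent {} // event variants elided

struct LocalBarrierWorker {
    state: WorkerState,
}

impl LocalBarrierWorker {
    fn on_epoch_completed(&mut self, _epoch: u64) { /* report to meta */ }
    fn handle_event(&mut self, _event: LocalBarrierEvent) { /* ... */ }

    async fn run(mut self, mut event_rx: UnboundedReceiver<LocalBarrierEvent>) {
        loop {
            tokio::select! {
                // `select!` drops the other branches' futures before running a
                // handler, so each handler may mutably borrow `self` again,
                // which is what `drop_either_future` was emulating by hand.
                epoch = self.state.next_completed_epoch() => {
                    self.on_epoch_completed(epoch);
                }
                Some(event) = event_rx.recv() => {
                    self.handle_event(event);
                }
            }
        }
    }
}
```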

}
}

/// Notify if we have collected barriers from all actor ids. The state must be `Issued`.
fn may_notify(&mut self, prev_epoch: u64) {
fn may_have_collected_all(&mut self, prev_epoch: u64) {
Contributor:

Can we update the docstring and naming of this function? may_have_collected_all seems kind of vague; I didn't really get its purpose.

From the code, it seems mainly to call sync_epoch and update the mview progress.

wenym1 (Contributor, Author):

I've added a docstring on the method. The may_xxx naming is kept consistent with the original method so that the new logic is easier to compare with the original during code review.

yezizp2012 (Contributor) left a comment:

LGTM. BTW, do we have any detailed docs about the implementation of partial checkpoint?

.get_mut(&prev_epoch)
.expect("should exist");
// sanity check on barrier state
match &state.inner {
Contributor:

Nit: we can use assert_matches! instead.
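
For reference, a sketch of the suggested change using the assert_matches crate (variant names follow the earlier sketch; payloads elided):

```rust
use assert_matches::assert_matches;

enum ManagedBarrierStateInner {
    Issued { remaining_actors: usize },
    AllCollected,
    Completed,
}

fn sanity_check(inner: &ManagedBarrierStateInner) {
    // Replaces a `match` whose only purpose is to panic on unexpected variants.
    assert_matches!(inner, ManagedBarrierStateInner::Issued { .. });
}
```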

wenym1 (Contributor, Author) commented Jan 16, 2024

LGTM. BTW, do we have any detailed docs about the implementation of partial checkpoint?

The detailed design of barrier collection in partial checkpoint can be found in partial-checkpoint.md, which is part of risingwavelabs/rfcs#84.

@wenym1 wenym1 enabled auto-merge January 16, 2024 10:19
@wenym1 wenym1 added this pull request to the merge queue Jan 16, 2024
Merged via the queue into main with commit 50fd512 Jan 16, 2024
26 of 27 checks passed
@wenym1 wenym1 deleted the yiming/local-barrier-manager-handle-sync branch January 16, 2024 11:07