feat(storage): concurrent checkpoint #2184
Conversation
Codecov Report
@@ Coverage Diff @@
## main #2184 +/- ##
==========================================
+ Coverage 73.60% 73.86% +0.26%
==========================================
Files 765 765
Lines 104939 105248 +309
==========================================
+ Hits 77236 77744 +508
+ Misses 27703 27504 -199
src/meta/src/barrier/mod.rs
let heap = self.epoch_heap.clone();
let hummock_manager = self.hummock_manager.clone();
tokio::spawn(async move {
How about using multiple coroutines instead of multiple threads here? 🤔
We can collect the futures and await them without over-provisioning threads, and the RPC latencies are also overlapped.
futures.push(async move { ... });
...
let res = select_all(futures).await;
If coroutines work, there would be no data race on env and epoch_heap, so the RwLock could be avoided.
env: Arc<MetaSrvEnv<S>>,
epoch_heap: Arc<BinaryHeap<u64>>,
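For reference, std's BinaryHeap is a max-heap, while committing epochs in order requires popping the smallest pending epoch first, which takes a Reverse wrapper. A minimal std-only sketch (the function name is illustrative, not the PR's actual code):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// BinaryHeap pops the *largest* element first; wrapping epochs in Reverse
// turns it into a min-heap so epochs come out in commit order.
fn pop_epochs_in_order(epochs: &[u64]) -> Vec<u64> {
    let mut heap: BinaryHeap<Reverse<u64>> =
        epochs.iter().copied().map(Reverse).collect();
    let mut ordered = Vec::with_capacity(epochs.len());
    while let Some(Reverse(epoch)) = heap.pop() {
        ordered.push(epoch);
    }
    ordered
}

fn main() {
    assert_eq!(pop_epochs_in_order(&[30, 10, 20]), vec![10, 20, 30]);
    println!("epochs pop in commit order");
}
```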
OK, I will do it after I finish the basic functions.
Force-pushed from 2ded88b to faf343c
Could you please add some docs for newly introduced structs and fields?
I've updated the branch so that we can fairly compare with 944b998 :) You may want to pull this branch before committing new changes.
Just tried to bench this on AWS. However, all create table and create materialized source statements will get stuck. Guess there's a deadlock; maybe you'll need to debug more 🤣 The bug might be reproduced more easily with 3 compute nodes in release mode. Use ./risedev configure to enable release build, and update the default section of
Need to fix conflicts. We have a new Grafana generator, and we should adapt to that.
With #2184 (comment) and #2184 (comment),

async fn barrier_complete_and_commit(....) {
    let (failed_nodes, res) = match result {
        Ok(resp) => {
            // Try to complete barriers
            let succeeded_nodes = checkpoint_control.succeed(prev_epoch);
            // complete_barriers does the following things:
            // - commit epochs in hummock_manager
            // - trigger post-completion work on barriers
            // It returns:
            // - Ok(()) if all succeed
            // - Err((failed_nodes, err)) with the failed nodes and the error.
            match self.complete_barriers(succeeded_nodes) {
                Ok(()) => (Vec::new(), Ok(())),
                Err((mut failed_nodes, err)) => {
                    // Fail all pending barriers as well
                    failed_nodes.extend(checkpoint_control.fail());
                    (failed_nodes, Err(err))
                }
            }
        }
        Err(err) => {
            // Fail all barriers
            (checkpoint_control.fail(), Err(err))
        }
    };
    if !failed_nodes.is_empty() {
        if self.enable_recovery {
            // Post collection failure
            for node in failed_nodes {
                ...
            }
            // Trigger recovery
            ...
        } else {
            panic!(...)
        }
    }
}
Generally LGTM, good work!
// there's a barrier scheduled.
- _ = self.scheduled_barriers.wait_one() => {}
+ _ = self.scheduled_barriers.wait_one(), if checkpoint_control.can_inject_barrier(self.in_flight_barrier_nums) => {}
Possible performance regression, need to think twice. select! will evaluate all the branch conditions before actually doing the select. Is it possible that:
- can_inject_barrier = false at first
- no in-flight barrier (barrier_complete_rx will be stuck)
At this time, only the shutdown branch and barrier_complete_rx will be activated. Even if can_inject_barrier becomes true sometime later, the thread will still block on polling barrier_complete_rx.
Seems that checkpoint_control is local state and will not be modified by another thread. So while the current thread is blocked, it's not likely to be changed.
can_inject_barrier will return false in two cases:
- the number of in-flight barriers > max, so there are in-flight barriers
- is_build_actor = true, so there is at least one in-flight barrier
So an in-flight barrier returning ==> can_inject_barrier becomes true ==> continue to the next loop iteration.
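The two cases can be condensed into a single predicate. A hedged sketch, with a simplified free-function signature rather than the PR's actual method:

```rust
// Both conditions above imply at least one in-flight barrier whenever the
// predicate is false, which is the key to the no-deadlock argument.
fn can_inject_barrier(in_flight_nums: usize, max_in_flight: usize, is_build_actor: bool) -> bool {
    in_flight_nums < max_in_flight && !is_build_actor
}

fn main() {
    assert!(!can_inject_barrier(3, 2, false)); // case 1: too many in-flight barriers
    assert!(!can_inject_barrier(1, 2, true));  // case 2: an actor is being built
    assert!(can_inject_barrier(1, 2, false));  // otherwise injection is allowed
    println!("ok");
}
```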
Looks like there won't be any deadlock if all the related logic is left untouched.
Anyway, you can think of this code like:
loop {
    if can_inject_barrier {
        select! { shutdown, complete_rx, scheduled_barrier }
    } else {
        select! { shutdown, complete_rx }
    }
}
If it is believed that this will always work without deadlock, then everything would be fine :)
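The no-deadlock argument can be checked with a plain-Rust simulation of that loop: injection is skipped only while barriers are in flight, and every in-flight barrier eventually completes, re-enabling injection on the next iteration. All names and numbers here are illustrative, not taken from the PR:

```rust
// Simulate the barrier loop: inject while allowed, otherwise drain a
// completion. The assertion inside the loop would fire if it ever stalled.
fn run_loop(total: usize, max_in_flight: usize) -> usize {
    let (mut scheduled, mut in_flight, mut completed) = (total, 0usize, 0usize);
    let mut iterations = 0usize;
    while completed < total {
        iterations += 1;
        assert!(iterations < 10 * total + 10, "loop stalled");
        if scheduled > 0 && in_flight < max_in_flight {
            // "scheduled_barrier" branch: inject one barrier.
            scheduled -= 1;
            in_flight += 1;
        } else {
            // "complete_rx" branch: whenever injection is blocked, at least
            // one barrier is in flight, so a completion always arrives.
            in_flight -= 1;
            completed += 1;
        }
    }
    completed
}

fn main() {
    assert_eq!(run_loop(5, 2), 5);
    println!("all barriers completed without stalling");
}
```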
... who's
Generally LGTM. Good work!!
/// remove all collect rx less than `prev_epoch`
pub fn drain_collect_rx(&mut self, prev_epoch: u64) {
    self.collect_complete_receiver
        .drain_filter(|x, _| x < &prev_epoch);
}
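As an aside, drain_filter on map types was nightly-only at the time; on stable Rust the same "drop everything below prev_epoch" drain can be expressed with BTreeMap::split_off. A self-contained sketch with placeholder types, not the PR's actual receiver map:

```rust
use std::collections::BTreeMap;

// split_off(&k) returns the entries with keys >= k; assigning it back keeps
// those and drops everything strictly below, matching the `<` filter above.
fn drain_collect_rx(receivers: &mut BTreeMap<u64, String>, prev_epoch: u64) {
    *receivers = receivers.split_off(&prev_epoch);
}

fn main() {
    let mut rx: BTreeMap<u64, String> =
        (1u64..=3).map(|e| (e, format!("rx-{e}"))).collect();
    drain_collect_rx(&mut rx, 3);
    assert_eq!(rx.keys().copied().collect::<Vec<_>>(), vec![3]);
    println!("{rx:?}");
}
```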
Should this be <= ?
It will be used before send_barrier, so prev_epoch is not yet in collect_complete_receiver.
So no changes to the shared buffer are necessary? That's really cool. 🥵
I committed the code on another computer with root.
Force-pushed from 82d42d3 to e3a7e34
The benchmark looks cool, that's so fast. By the way, would you please wait for all metrics to be fully loaded before taking a screenshot?
LGTM. Good work!
What's changed and what's your intention?

concurrent checkpoint

- We do the initial version of concurrent barriers.
- commit_epoch in order.
- managed_barrier_state in map<epoch, state>.
- Metrics: barrier_nums and barrier_send_latency.
- checkpoint_interval_ms (default: 10).

newly introduced structs and fields

- We use CheckpointControl to control the injection and save the state of barriers.
- is_recovery and is_build_actor can pause barrier injection. The state of barriers is saved in a queue, which keeps commits in order.
- In managed_state we use a map to save the collected actors for different epochs.

Checklist
Refer to a related PR or issue link (optional)
#1156
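The queue-based, in-order commit described in the summary can be sketched with a VecDeque: barriers may complete out of order, but only the completed prefix at the head of the queue is committed. Types and names below are illustrative, not the PR's actual CheckpointControl:

```rust
use std::collections::VecDeque;

enum BarrierState {
    InFlight,
    Completed,
}

struct CheckpointQueue {
    // (epoch, state), oldest barrier first.
    queue: VecDeque<(u64, BarrierState)>,
}

impl CheckpointQueue {
    fn inject(&mut self, epoch: u64) {
        self.queue.push_back((epoch, BarrierState::InFlight));
    }

    /// Mark `epoch` completed, then commit the longest completed prefix,
    /// so epochs are always committed in injection order.
    fn complete(&mut self, epoch: u64) -> Vec<u64> {
        if let Some(entry) = self.queue.iter_mut().find(|(e, _)| *e == epoch) {
            entry.1 = BarrierState::Completed;
        }
        let mut committed = Vec::new();
        while matches!(self.queue.front(), Some((_, BarrierState::Completed))) {
            committed.push(self.queue.pop_front().unwrap().0);
        }
        committed
    }
}

fn main() {
    let mut cc = CheckpointQueue { queue: VecDeque::new() };
    for e in [1, 2, 3] {
        cc.inject(e);
    }
    // Epoch 2 finishes first, but nothing commits until epoch 1 does.
    assert!(cc.complete(2).is_empty());
    assert_eq!(cc.complete(1), vec![1, 2]);
    assert_eq!(cc.complete(3), vec![3]);
    println!("ordered commit ok");
}
```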