New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
raftstore-v2: introduce apply trace #13939
Conversation
raftstore v2 disables WAL for all tablets and store all states to raft engine. To be able to recover from restart, we need to build some relations between raft engine and tablets flush. In the previous PR, flush indexes are stored in raft engine by `PersistenceListener`. In this PR, ApplyTrace is introduced to anaylze apply index after restart. And it will trigger persistence for more apply progress like split. Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
48dc36c
to
332d4d5
Compare
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
6e9fcdc
to
48e22ce
Compare
for cf in self.cf_names() { | ||
handles.push(util::get_cf_handle(self.as_inner(), cf)?); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: maybe it's cleaner to use a temp cfs to cover both cases.
something like:
let cfs = cfs.is_empty()? &self.cf_names() : cfs;
for cf in cfs .....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cf_names
will allocate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cf_names
will allocate.
if !cfs.is_empty(), self.cf_names() won't be called? it's a minor issue anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cf_names
will return a Vec
, so you can't put it inside if branch. Instead, it needs to allocate first, and then choose what to borrow.
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
data_cfs: Box<[Progress; DATA_CFS_LEN]>, | ||
admin: Progress, | ||
// Index that is issued to be written. It may not be truely persisted. | ||
persisted_applied: u64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where is this used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's for persisting admin.flushed every 100 indexes or every 5 minutes, which is not implemented yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One general question: When we redo the split, what to do with the existing tablets? They should be deleted. But I did not see the code.
And the more complicated part is for merge, because once merge is committed, the source regions data may be gone. And there's no way to redo the merge.
To get rid of all these problems, I think we should force the flush in both split and merge. Then we don't need to redo any of them.
for cf in self.cf_names() { | ||
handles.push(util::get_cf_handle(self.as_inner(), cf)?); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cf_names
will allocate.
if !cfs.is_empty(), self.cf_names() won't be called? it's a minor issue anyway.
continue; | ||
} | ||
// Note flushed largest_seqno equals to earliest_seqno of next memtable. | ||
if pr.earliest_seqno < largest_seqno { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So if pr.earliest_seqno >= largest_seqno, it will panic later. Should we add an assert here to be more clear?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will break instead of panicking. If any pr is found, it will not panic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will break instead of panicking. If any pr is found, it will not panic.
Got it.
let off = cf_offset(cf); | ||
if self.should_skip(off, index) { | ||
return Ok(()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not add pre_apply to wrap this code, otherwise it's duplicated in all apply_ methods.
let off = cf_offset(cf);
if self.should_skip(off, index) {
return Ok(());
}
Also add post_apply for
self.modifications_mut()[off] = index;
These pre_/post_apply can be called outside
Then, these apply_put, apply_delete won't need change at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's more clear to be called in place. The semantics of modifications
is it changes data not whether the command succeed or not.
state_changes: changes, | ||
}); | ||
} | ||
|
||
/// Called a memtable finished flushing. | ||
pub fn on_flush_completed(&self, cf: &str, id: u64) { | ||
pub fn on_flush_completed(&self, cf: &str, largest_seqno: u64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I remembered this API was called in separate thread in v1 other than raft threads, otherwise it will have performance regression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In v2 this API is called by rocksdb flush thread.
|
||
#[derive(Clone, Copy, Default)] | ||
struct Progress { | ||
flushed: u64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flushed: u64, | |
flushed_index: u64, |
applied_term: u64, | ||
modifications: DataTrace, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add comments for these fields.
} | ||
|
||
/// An alias of frequent use type that each data cf has a u64. | ||
pub type DataTrace = [u64; DATA_CFS_LEN]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub type DataTrace = [u64; DATA_CFS_LEN]; | |
pub type CfsFlushIndexes = [u64; DATA_CFS_LEN]; |
DataTrace is too general and hard to understand what's the exactly meaning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually DataTrace
is a better name to reflect the idea. As we don't trace all indexes of all CFs, it only trace the data related CFs.
} | ||
} | ||
|
||
fn record_modify(&mut self, cf: &str, index: u64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fn record_modify(&mut self, cf: &str, index: u64) { | |
fn set_last_modified_index(&mut self, cf: &str, index: u64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as other record_xxx. Let's use set_ for consistency. Also please add the suffix _index to be more specific.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's on purpose to avoid the _index
suffix, otherwise almost every methods fields in this files will have the suffixes. As long as we all know the file is about indexes, the suffix is not needed.
self.data_cfs.iter().map(|p| p.flushed).min().unwrap() | ||
} | ||
|
||
fn record_flush(&mut self, cf: &str, index: u64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fn record_flush(&mut self, cf: &str, index: u64) { | |
fn set_cf_flushed_index(&mut self, cf: &str, index: u64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
record_flush
here means record a flush event.
} | ||
|
||
#[inline] | ||
fn data_index(&self) -> u64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fn data_index(&self) -> u64 { | |
fn min_flushed_index(&self) -> u64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are not the same. data_index
is the index of data cfs, not arbitrary flushed index.
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
lb.put_region_state(region_id, last_index, self.region_state()) | ||
.unwrap(); | ||
for cf in ALL_CFS { | ||
lb.put_flushed_index(region_id, cf, last_index, last_index) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why isn't it enough to just record CF_RAFT?
.get_region_state(region_id, trace.admin.flushed)? | ||
.unwrap(); | ||
let data_index = trace.data_index(); | ||
// If index is not larger than applied_index, it means some CF doesn't have any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know what does "index" and "applied_index" refers to here. It should be "data_index" and "admin_flushed"?
apply_trace.maybe_advance_admin_flushed(apply_index); | ||
} | ||
|
||
pub fn on_manual_flush(&mut self, cfs: Vec<&'static str>, ch: CmdResChannel) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't needed anymore after post write callback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, as far as I know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove it now?
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
@@ -78,6 +82,13 @@ pub struct Peer<EK: KvEngine, ER: RaftEngine> { | |||
|
|||
// Trace which peers have not finished split. | |||
split_trace: Vec<(u64, HashSet<u64>)>, | |||
|
|||
/// Apply ralated State changes that needs to be persisted to raft engine. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Apply ralated State changes that needs to be persisted to raft engine. | |
/// Apply related state changes that needs to be persisted to raft engine. |
apply_trace.maybe_advance_admin_flushed(apply_index); | ||
} | ||
|
||
pub fn on_manual_flush(&mut self, cfs: Vec<&'static str>, ch: CmdResChannel) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove it now?
/// Logs may be replayed from the some apply index, but those data may have | ||
/// been flushed in the past, so we need the flushed indexes to decide what | ||
/// logs can be skipped for certain CFs. If all CFs are flushed before the | ||
/// apply index, `None` is returned. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
apply index -> admin flushed / persisted apply index ?
|
||
// All events before `mem_index` must be consumed before calling this function. | ||
fn maybe_advance_admin_flushed(&mut self, mem_index: u64) { | ||
if self.admin.flushed < self.admin.last_modified { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it can be confusing sometimes for admin to reuse the progress struct. For normal progress it should have invariant flushed <= last_modified
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the relation between flushed
and last_modified
can be arbitrary.
} | ||
// At best effort, we can only advance the index to `mem_index`. | ||
let mut candidate = mem_index; | ||
for pr in self.data_cfs.iter() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe filter().map().min() is more readable.
Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
/merge |
@BusyJay: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: 261c162
|
@BusyJay: Your PR was out of date, I have automatically updated it for you. If the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
/merge |
@BusyJay: It seems you want to merge this PR, I will help you trigger all the tests: /run-all-tests You only need to trigger If you have any questions about the PR merge process, please refer to pr process. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
This pull request has been accepted and is ready to merge. Commit hash: 42fa3c9
|
What is changed and how it works?
Issue Number: Ref #12842
What's Changed:
Check List
Tests
Release note