Skip to content

Commit

Permalink
change merge flow path
Browse files Browse the repository at this point in the history
Signed-off-by: Liqi Geng <gengliqiii@gmail.com>
  • Loading branch information
gengliqi committed Feb 14, 2020
1 parent 6068bf5 commit 46585d5
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 411 deletions.
29 changes: 16 additions & 13 deletions components/test_raftstore/src/pd.rs
Expand Up @@ -860,25 +860,28 @@ impl TestPdClient {
pub fn must_merge(&self, from: u64, target: u64) {
self.merge_region(from, target);

for _ in 1..500 {
sleep_ms(10);

if self.get_region_by_id(from).wait().unwrap().is_none() {
return;
}
}

let region = self.get_region_by_id(from).wait().unwrap();
if region.is_none() {
return;
}
panic!("region {:?} is still not merged.", region.unwrap());
self.check_merged_timeout(from, Duration::from_secs(5));
}

/// Returns `true` when PD no longer has a record for region `from`
/// (i.e. `get_region_by_id` yields `None`) — presumably meaning the
/// source region has been merged away; confirm against PD semantics.
/// Panics if the PD query itself fails (`unwrap` on the future result).
pub fn check_merged(&self, from: u64) -> bool {
self.get_region_by_id(from).wait().unwrap().is_none()
}

/// Polls PD until region `from` disappears (merge finished), checking
/// every 10 ms. Panics with the still-present region if it has not
/// vanished within `duration`.
pub fn check_merged_timeout(&self, from: u64, duration: Duration) {
    let start = Instant::now();
    loop {
        // A `None` lookup result means PD dropped the source region.
        match self.get_region_by_id(from).wait().unwrap() {
            None => return,
            Some(r) => {
                // Still present — fail loudly once the deadline passes.
                if start.elapsed() > duration {
                    panic!("region {:?} is still not merged.", r);
                }
            }
        }
        sleep_ms(10);
    }
}

pub fn region_leader_must_be(&self, region_id: u64, peer: metapb::Peer) {
for _ in 0..500 {
sleep_ms(10);
Expand Down
6 changes: 3 additions & 3 deletions src/pd/pd.rs
Expand Up @@ -29,7 +29,7 @@ use crate::raftstore::store::util::is_epoch_stale;
use crate::raftstore::store::util::KeysInfoFormatter;
use crate::raftstore::store::Callback;
use crate::raftstore::store::StoreInfo;
use crate::raftstore::store::{CasualMessage, PeerMsg, RaftCommand, RaftRouter};
use crate::raftstore::store::{CasualMessage, PeerMsg, RaftCommand, RaftRouter, SignificantMsg};
use crate::storage::FlowStatistics;
use tikv_util::collections::HashMap;
use tikv_util::metrics::ThreadInfoStatistics;
Expand Down Expand Up @@ -981,9 +981,9 @@ fn send_admin_request(
/// Sends a merge-failure message so that the merge source region can be garbage-collected.
fn send_merge_fail(router: &RaftRouter, source_region_id: u64, target: metapb::Peer) {
let target_id = target.get_id();
if let Err(e) = router.send(
if let Err(e) = router.force_send(
source_region_id,
PeerMsg::CasualMessage(CasualMessage::MergeResult {
PeerMsg::SignificantMsg(SignificantMsg::MergeResult {
target,
stale: true,
}),
Expand Down
94 changes: 33 additions & 61 deletions src/raftstore/store/fsm/apply.rs
Expand Up @@ -34,7 +34,7 @@ use crate::import::SSTImporter;
use crate::raftstore::coprocessor::CoprocessorHost;
use crate::raftstore::store::fsm::{RaftPollerBuilder, RaftRouter};
use crate::raftstore::store::metrics::*;
use crate::raftstore::store::msg::{Callback, PeerMsg};
use crate::raftstore::store::msg::{Callback, PeerMsg, SignificantMsg};
use crate::raftstore::store::peer::Peer;
use crate::raftstore::store::peer_storage::{self, write_initial_apply_state, write_peer_state};
use crate::raftstore::store::util::check_region_epoch;
Expand Down Expand Up @@ -175,7 +175,6 @@ pub enum ExecResult {
region: Region,
state: MergeState,
},
CatchUpLogs(CatchUpLogs),
CommitMerge {
region: Region,
source: Region,
Expand Down Expand Up @@ -954,8 +953,7 @@ impl ApplyDelegate {
| ExecResult::VerifyHash { .. }
| ExecResult::CompactLog { .. }
| ExecResult::DeleteRange { .. }
| ExecResult::IngestSST { .. }
| ExecResult::CatchUpLogs { .. } => {}
| ExecResult::IngestSST { .. } => {}
ExecResult::SplitRegion { ref derived, .. } => {
self.region = derived.clone();
self.metrics.size_diff_hint = 0;
Expand Down Expand Up @@ -1741,14 +1739,15 @@ impl ApplyDelegate {
// The target peer should send missing log entries to the source peer.
//
// So, the merge process order would be:
// 1. `exec_commit_merge` in target apply worker
// 2. `catch_up_logs_for_merge` in source apply worker (check whether need to catch up logs)
// 3. `on_ready_catch_up_logs` in source raftstore
// 4. ... (raft append and apply logs)
// 5. `on_ready_prepare_merge` in source raftstore (means source region has finished applying all logs)
// 6. `catch_up_logs_for_merge` in source apply worker (destroy itself and send LogsUpToDate)
// 7. resume `exec_commit_merge` in target apply worker
// 8. `on_ready_commit_merge` in target raftstore
// 1. `exec_commit_merge` in target apply fsm and send `CatchUpLogs` to source peer fsm
// 2. `on_catch_up_logs_for_merge` in source peer fsm
// 3. if the source peer has already executed the corresponding `on_ready_prepare_merge`, set pending_remove and jump to step 6
// 4. ... (raft append and apply logs)
// 5. `on_ready_prepare_merge` in source peer fsm and set pending_remove (means source region has finished applying all logs)
// 6. `logs_up_to_date_for_merge` in source apply fsm (destroy its apply fsm and send Noop to trigger the target apply fsm)
// 7. resume `exec_commit_merge` in target apply fsm
// 8. `on_ready_commit_merge` in target peer fsm and send `MergeResult` to source peer fsm
// 9. `on_merge_result` in source peer fsm (destroy itself)
fn exec_commit_merge(
&mut self,
ctx: &mut ApplyContext,
Expand Down Expand Up @@ -1789,15 +1788,16 @@ impl ApplyDelegate {
"peer_id" => self.id(),
"source_region_id" => source_region_id
);

// Sends message to the source apply worker and pause `exec_commit_merge` process
fail_point!("before_handle_catch_up_logs_for_merge");
// Sends message to the source peer fsm and pause `exec_commit_merge` process
let logs_up_to_date = Arc::new(AtomicU64::new(0));
let msg = Msg::CatchUpLogs(CatchUpLogs {
let msg = SignificantMsg::CatchUpLogs(CatchUpLogs {
target_region_id: self.region_id(),
merge: merge.to_owned(),
logs_up_to_date: logs_up_to_date.clone(),
});
ctx.router.schedule_task(source_region_id, msg);
ctx.notifier
.notify(source_region_id, PeerMsg::SignificantMsg(msg));
return Ok((
AdminResponse::default(),
ApplyResult::WaitMergeSource(logs_up_to_date),
Expand Down Expand Up @@ -2241,8 +2241,8 @@ pub enum Msg {
},
Registration(Registration),
Proposal(RegionProposal),
CatchUpLogs(CatchUpLogs),
LogsUpToDate(u64),
LogsUpToDate(CatchUpLogs),
Noop,
Destroy(Destroy),
Snapshot(GenSnapTask),
#[cfg(test)]
Expand Down Expand Up @@ -2277,8 +2277,8 @@ impl Debug for Msg {
Msg::Registration(ref r) => {
write!(f, "[region {}] Reg {:?}", r.region.get_id(), r.apply_state)
}
Msg::CatchUpLogs(cul) => write!(f, "{:?}", cul.merge),
Msg::LogsUpToDate(region_id) => write!(f, "[region {}] logs are updated", region_id),
Msg::LogsUpToDate(_) => write!(f, "logs are updated"),
Msg::Noop => write!(f, "noop"),
Msg::Destroy(ref d) => write!(f, "[region {}] destroy", d.region_id),
Msg::Snapshot(GenSnapTask { region_id, .. }) => {
write!(f, "[region {}] requests a snapshot", region_id)
Expand Down Expand Up @@ -2506,37 +2506,7 @@ impl ApplyFsm {
true
}

fn catch_up_logs_for_merge(&mut self, ctx: &mut ApplyContext, catch_up_logs: CatchUpLogs) {
if ctx.timer.is_none() {
ctx.timer = Some(SlowTimer::new());
}

// if it is already up to date, no need to catch up anymore
let apply_index = self.delegate.apply_state.get_applied_index();
debug!(
"check catch up logs for merge";
"apply_index" => apply_index,
"commit" => catch_up_logs.merge.get_commit(),
"region_id" => self.delegate.region_id(),
"peer_id" => self.delegate.id(),
);
if apply_index < catch_up_logs.merge.get_commit() {
fail_point!("on_handle_catch_up_logs_for_merge");
let mut res = VecDeque::new();
// send logs to raftstore to append
res.push_back(ExecResult::CatchUpLogs(catch_up_logs));

// TODO: can we use `ctx.finish_for()` directly? is it safe here?
ctx.apply_res.push(ApplyRes {
region_id: self.delegate.region_id(),
apply_state: self.delegate.apply_state.clone(),
exec_res: res,
metrics: self.delegate.metrics.clone(),
applied_index_term: self.delegate.applied_index_term,
});
return;
}

fn logs_up_to_date_for_merge(&mut self, ctx: &mut ApplyContext, catch_up_logs: CatchUpLogs) {
fail_point!("after_handle_catch_up_logs_for_merge");
fail_point!(
"after_handle_catch_up_logs_for_merge_1000_1003",
Expand All @@ -2545,18 +2515,20 @@ impl ApplyFsm {
);

let region_id = self.delegate.region_id();
self.destroy(ctx);
catch_up_logs
.logs_up_to_date
.store(region_id, Ordering::SeqCst);
info!(
"source logs are all applied now";
"region_id" => region_id,
"peer_id" => self.delegate.id(),
);

// The source peer fsm will be destroyed when the target peer executes `on_ready_commit_merge`
// and sends `merge result` to the source peer fsm.
self.destroy(ctx);
catch_up_logs
.logs_up_to_date
.store(region_id, Ordering::SeqCst);
// To trigger the target apply fsm
if let Some(mailbox) = ctx.router.mailbox(catch_up_logs.target_region_id) {
let _ = mailbox.force_send(Msg::LogsUpToDate(region_id));
let _ = mailbox.force_send(Msg::Noop);
} else {
error!(
"failed to get mailbox, are we shutting down?";
Expand Down Expand Up @@ -2610,8 +2582,8 @@ impl ApplyFsm {
Some(Msg::Proposal(prop)) => self.handle_proposal(prop),
Some(Msg::Registration(reg)) => self.handle_registration(reg),
Some(Msg::Destroy(d)) => self.handle_destroy(apply_ctx, d),
Some(Msg::CatchUpLogs(cul)) => self.catch_up_logs_for_merge(apply_ctx, cul),
Some(Msg::LogsUpToDate(_)) => {}
Some(Msg::LogsUpToDate(cul)) => self.logs_up_to_date_for_merge(apply_ctx, cul),
Some(Msg::Noop) => {}
Some(Msg::Snapshot(snap_task)) => self.handle_snapshot(apply_ctx, snap_task),
#[cfg(test)]
Some(Msg::Validate(_, f)) => f(&self.delegate),
Expand Down Expand Up @@ -2792,7 +2764,7 @@ impl ApplyRouter {
}
return;
}
Msg::Apply { .. } | Msg::Destroy(_) | Msg::LogsUpToDate(_) => {
Msg::Apply { .. } | Msg::Destroy(_) | Msg::Noop => {
info!(
"target region is not found, drop messages";
"region_id" => region_id
Expand All @@ -2806,7 +2778,7 @@ impl ApplyRouter {
);
return;
}
Msg::CatchUpLogs(cul) => {
Msg::LogsUpToDate(cul) => {
warn!(
"region is removed before merged, are we shutting down?";
"region_id" => region_id,
Expand Down

0 comments on commit 46585d5

Please sign in to comment.