Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore: set is_merging flag after restart #5871

Merged
merged 5 commits into from Nov 26, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Next

fix merge bug

Signed-off-by: gengliqi <gengliqiii@gmail.com>
  • Loading branch information
gengliqi committed Nov 12, 2019
commit 97a7a50c3ee4c772811acc890d5bbc36e973de45
@@ -644,7 +644,7 @@ impl ApplyDelegate {
merged: false,
ready_source_region_id: 0,
wait_merge_state: None,
is_merging: false,
is_merging: reg.is_merging,
pending_cmds: Default::default(),
metrics: Default::default(),
last_merge_version: 0,
@@ -2133,6 +2133,7 @@ pub struct Registration {
pub apply_state: RaftApplyState,
pub applied_index_term: u64,
pub region: Region,
pub is_merging: bool,
}

impl Registration {
@@ -2143,6 +2144,7 @@ impl Registration {
apply_state: peer.get_store().apply_state().clone(),
applied_index_term: peer.get_store().applied_index_term(),
region: peer.region().clone(),
is_merging: peer.pending_merge_state.is_some(),
}
}
}
@@ -2368,6 +2370,8 @@ impl ApplyFsm {
self.delegate.region_id() == 1000 && self.delegate.id() == 1003,
|_| {}
);
fail_point!("on_handle_apply", |_| {});

if apply.entries.is_empty() || self.delegate.pending_remove || self.delegate.stopped {
return;
}
@@ -809,6 +809,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
}

fn on_apply_res(&mut self, res: ApplyTaskRes) {
fail_point!("on_apply_res", |_| {});
match res {
ApplyTaskRes::Apply(mut res) => {
debug!(
@@ -2413,6 +2414,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
self.register_raft_gc_log_tick();
}
debug_assert!(!self.fsm.stopped);
fail_point!("on_raft_gc_log_tick", |_| {});

// As leader, we would not keep caches for the peers that didn't response heartbeat in the
// last few seconds. That happens probably because another TiKV is down. In this case if we
@@ -539,3 +539,87 @@ fn test_node_merge_multiple_snapshots(together: bool) {
cluster.clear_send_filters();
must_get_equal(&cluster.get_engine(3), b"k9", b"v9");
}

// Regression test: after a restart, a peer that applied a PrepareMerge before
// going down must come back up with its `is_merging` flag set, so that a
// CompactLog proposed before the shutdown is handled correctly once it is
// applied after the restart. The scenario is forced with failpoints:
// PrepareMerge is applied, a CompactLog is proposed/committed but NOT yet
// applied, the node restarts, and only then is the CompactLog applied.
#[test]
fn test_node_merge_restart_after_apply_premerge_before_apply_compact_log() {
let _guard = crate::setup();
let mut cluster = new_node_cluster(0, 3);
configure_for_merge(&mut cluster);
// Small gc limits so a CompactLog gets proposed quickly once allowed.
cluster.cfg.raft_store.merge_max_log_gap = 10;
cluster.cfg.raft_store.raft_log_gc_count_limit = 11;
// rely on this config to trigger a compact log
cluster.cfg.raft_store.raft_log_gc_size_limit = ReadableSize(1);
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(10);
cluster.run();
// Block the raft-log-gc tick so no CompactLog is proposed yet; it is
// released later (fail::remove below) at the moment the test wants it.
let raft_gc_log_tick_fp = "on_raft_gc_log_tick";
fail::cfg(raft_gc_log_tick_fp, "return()").unwrap();
cluster.must_put(b"k1", b"v1");
cluster.must_put(b"k3", b"v3");

let pd_client = Arc::clone(&cluster.pd_client);
let region = pd_client.get_region(b"k1").unwrap();

// Split into `left` (keys < k2) and `right` (keys >= k2); `left` is the
// region whose log truncation is exercised below.
cluster.must_split(&region, b"k2");

let left = pd_client.get_region(b"k1").unwrap();
let right = pd_client.get_region(b"k2").unwrap();
let left_peer_1 = find_peer(&left, 1).cloned().unwrap();
cluster.must_transfer_leader(left.get_id(), left_peer_1);

// make log gap between store 1 and store 3, for min_index in preMerge
cluster.add_send_filter(IsolationFilterFactory::new(3));
for i in 0..6 {
cluster.must_put(format!("k1{}", i).as_bytes(), b"v1");
}
// Block `on_apply_res` so the Peer never learns the PrepareMerge result and
// therefore does not update its merge_state — otherwise the peer would
// reject almost every proposal (including the CompactLog this test needs).
let on_apply_res_fp = "on_apply_res";
fail::cfg(on_apply_res_fp, "return()").unwrap();

// Propose PrepareMerge(right -> left) on the left region's leader.
let merge = new_prepare_merge(right.clone());
let req = new_admin_request(left.get_id(), left.get_region_epoch(), merge);
let resp = cluster
.call_command_on_leader(req, Duration::from_secs(3))
.unwrap();
if resp.get_header().has_error() {
panic!("response {:?} has error", resp);
}
cluster.clear_send_filters();
// Block the apply FSM so the upcoming CompactLog is committed but stays
// unapplied until after the restart.
let handle_apply_fp = "on_handle_apply";
fail::cfg(handle_apply_fp, "return()").unwrap();

// Snapshot the truncated state before CompactLog can take effect, to
// detect later whether log truncation actually advanced.
let state1 = cluster.truncated_state(left.get_id(), 1);
// Re-enable the gc tick so a CompactLog can now be proposed.
fail::remove(raft_gc_log_tick_fp);

// wait for compact log to be proposed and committed maybe
sleep_ms(30);

cluster.shutdown();

fail::remove(handle_apply_fp);
fail::remove(on_apply_res_fp);
// Block CommitMerge scheduling across the restart so the merge stays
// pending while the CompactLog is applied — the state under test.
let schedule_merge_fp = "on_schedule_merge";
fail::cfg(schedule_merge_fp, "return()").unwrap();

cluster.start().unwrap();

// Wait (up to ~500ms) for the restarted node to apply the CompactLog.
// NOTE(review): if the truncated index never advances, this loop falls
// through without asserting — the merge check below is the real verdict.
for _ in 0..50 {
let state2 = cluster.truncated_state(left.get_id(), 1);
if state1.get_index() != state2.get_index() {
break;
}
sleep_ms(10);
}
// can schedule merge now
fail::remove(schedule_merge_fp);

// propose to left region and wait for merge to succeed conveniently
cluster.must_put(b"k123", b"v2");
must_get_equal(&cluster.get_engine(3), b"k123", b"v2");
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.