raftstore: reject transfer leader to the recently added peers #3878

Merged: 16 commits, Dec 7, 2018
5 changes: 5 additions & 0 deletions components/test_raftstore/src/util.rs
@@ -105,6 +105,7 @@ pub fn new_store_cfg() -> Config {
region_split_check_diff: ReadableSize(10000),
report_region_flow_interval: ReadableDuration::millis(100),
raft_store_max_leader_lease: ReadableDuration::millis(250),
raft_reject_transfer_leader_duration: ReadableDuration::secs(0),
clean_stale_peer_delay: ReadableDuration::secs(0),
allow_remove_leader: true,
..Config::default()
@@ -496,6 +497,10 @@ pub fn configure_for_merge<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.cfg.raft_store.merge_check_tick_interval = ReadableDuration::millis(100);
}

pub fn configure_for_transfer_leader<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.cfg.raft_store.raft_reject_transfer_leader_duration = ReadableDuration::secs(1);
}

pub fn configure_for_lease_read<T: Simulator>(
cluster: &mut Cluster<T>,
base_tick_ms: Option<u64>,
3 changes: 3 additions & 0 deletions src/raftstore/store/config.rs
@@ -54,6 +54,8 @@ pub struct Config {
pub raft_log_gc_size_limit: ReadableSize,
// When a peer is not responding for this time, leader will not keep entry cache for it.
pub raft_entry_cache_life_time: ReadableDuration,
// When a peer is newly added, reject transferring leader to the peer for a while.
pub raft_reject_transfer_leader_duration: ReadableDuration,

// Interval (ms) to check region whether need to be split or not.
pub split_region_check_tick_interval: ReadableDuration,
@@ -155,6 +157,7 @@ impl Default for Config {
raft_log_gc_count_limit: split_size * 3 / 4 / ReadableSize::kb(1),
raft_log_gc_size_limit: split_size * 3 / 4,
raft_entry_cache_life_time: ReadableDuration::secs(30),
raft_reject_transfer_leader_duration: ReadableDuration::secs(3),
split_region_check_tick_interval: ReadableDuration::secs(10),
region_split_check_diff: split_size / 16,
clean_stale_peer_delay: ReadableDuration::minutes(10),
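For reference, this is how the new knob would appear in a TiKV configuration file; a minimal sketch, assuming the option lives under the [raftstore] table alongside the other raft-* settings and keeping the 3s default shown in the diff above:

[raftstore]
# Peers added within this window are rejected as transfer-leader targets.
raft-reject-transfer-leader-duration = "3s"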
1 change: 1 addition & 0 deletions src/raftstore/store/fsm/peer.rs
@@ -893,6 +893,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
if p.is_leader() {
p.peers_start_pending_time.push((id, now));
}
p.recent_added_peer.update(id, now);
p.insert_peer_cache(peer);
}
ConfChangeType::RemoveNode => {
41 changes: 39 additions & 2 deletions src/raftstore/store/peer.rs
@@ -215,6 +215,32 @@ pub struct PeerStat {
pub written_keys: u64,
}

pub struct RecentAddedPeer {
pub reject_duration_as_secs: u64,
pub id: u64,
pub added_time: Instant,
}

impl RecentAddedPeer {
pub fn new(reject_duration_as_secs: u64) -> RecentAddedPeer {
RecentAddedPeer {
reject_duration_as_secs,
id: Default::default(),
added_time: Instant::now(),
}
}

pub fn update(&mut self, id: u64, now: Instant) {
self.id = id;
self.added_time = now;
}

pub fn contains(&self, id: u64) -> bool {
self.id == id
&& duration_to_sec(self.added_time.elapsed()) < self.reject_duration_as_secs as f64
}
}

pub struct Peer {
engines: Engines,
cfg: Rc<Config>,
@@ -231,6 +257,7 @@ pub struct Peer {
/// Record the instants of peers being added into the configuration.
/// Remove them after they are not pending any more.
pub peers_start_pending_time: Vec<(u64, Instant)>,
pub recent_added_peer: RecentAddedPeer,

coprocessor_host: Arc<CoprocessorHost>,
/// an inaccurate difference in region size since last reset.
@@ -379,6 +406,9 @@ impl Peer {
peer_cache: RefCell::new(HashMap::default()),
peer_heartbeats: HashMap::default(),
peers_start_pending_time: vec![],
recent_added_peer: RecentAddedPeer::new(
cfg.raft_reject_transfer_leader_duration.as_secs(),
),
coprocessor_host: Arc::clone(&store.coprocessor_host),
size_diff_hint: 0,
delete_keys_hint: 0,
@@ -1482,7 +1512,7 @@ impl Peer {
self.raft_group.transfer_leader(peer.get_id());
}

fn is_transfer_leader_allowed(&self, peer: &metapb::Peer) -> bool {
fn ready_to_transfer_leader(&self, peer: &metapb::Peer) -> bool {
let peer_id = peer.get_id();
let status = self.raft_group.status();

@@ -1495,6 +1525,13 @@
return false;
}
}
if self.recent_added_peer.contains(peer_id) {
debug!(
"{} reject transfer leader to {:?} due to the peer was added recently",
self.tag, peer
);
return false;
}

let last_index = self.get_store().last_index();
last_index <= status.progress[&peer_id].matched + self.cfg.leader_transfer_max_log_lag
@@ -1732,7 +1769,7 @@ impl Peer {
let transfer_leader = get_transfer_leader_cmd(&req).unwrap();
let peer = transfer_leader.get_peer();

let transferred = if self.is_transfer_leader_allowed(peer) {
let transferred = if self.ready_to_transfer_leader(peer) {
self.transfer_leader(peer);
true
} else {
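To make the rejection window easier to reason about in isolation, here is a self-contained sketch of the RecentAddedPeer bookkeeping above; the local duration_to_sec helper and the main() walkthrough are illustrative additions for this sketch, not code from the PR:

use std::thread;
use std::time::{Duration, Instant};

// Local stand-in for the duration_to_sec util used in peer.rs.
fn duration_to_sec(d: Duration) -> f64 {
    d.as_secs() as f64 + f64::from(d.subsec_nanos()) / 1_000_000_000.0
}

pub struct RecentAddedPeer {
    pub reject_duration_as_secs: u64,
    pub id: u64,
    pub added_time: Instant,
}

impl RecentAddedPeer {
    pub fn new(reject_duration_as_secs: u64) -> RecentAddedPeer {
        RecentAddedPeer {
            reject_duration_as_secs,
            id: Default::default(),
            added_time: Instant::now(),
        }
    }

    // Called from the AddNode branch of the conf change handler.
    pub fn update(&mut self, id: u64, now: Instant) {
        self.id = id;
        self.added_time = now;
    }

    // True while `id` is the tracked peer and the window has not elapsed.
    pub fn contains(&self, id: u64) -> bool {
        self.id == id
            && duration_to_sec(self.added_time.elapsed()) < self.reject_duration_as_secs as f64
    }
}

fn main() {
    // 1s window, matching configure_for_transfer_leader in the tests.
    let mut recent = RecentAddedPeer::new(1);
    recent.update(3, Instant::now());
    assert!(recent.contains(3)); // freshly added: leader transfer would be rejected
    assert!(!recent.contains(2)); // other peers are unaffected
    thread::sleep(Duration::from_secs(1));
    assert!(!recent.contains(3)); // window elapsed: transfer is allowed again
}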
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
@@ -123,6 +123,7 @@ fn test_serde_custom_tikv_config() {
raft_log_gc_count_limit: 12,
raft_log_gc_size_limit: ReadableSize::kb(1),
raft_entry_cache_life_time: ReadableDuration::secs(12),
raft_reject_transfer_leader_duration: ReadableDuration::secs(3),
split_region_check_tick_interval: ReadableDuration::secs(12),
region_split_check_diff: ReadableSize::mb(6),
region_compact_check_interval: ReadableDuration::secs(12),
1 change: 1 addition & 0 deletions tests/integrations/config/test-custom.toml
@@ -79,6 +79,7 @@ raft-log-gc-threshold = 12
raft-log-gc-count-limit = 12
raft-log-gc-size-limit = "1KB"
raft-entry-cache-life-time = "12s"
raft-reject-transfer-leader-duration = "3s"
split-region-check-tick-interval = "12s"
region-split-check-diff = "6MB"
region-compact-check-interval = "12s"
37 changes: 37 additions & 0 deletions tests/integrations/raftstore/test_conf_change.rs
@@ -585,6 +585,35 @@ fn test_conf_change_safe<T: Simulator>(cluster: &mut Cluster<T>) {
pd_client.must_remove_peer(region_id, new_peer(2, 2));
}

fn test_transfer_leader_safe<T: Simulator>(cluster: &mut Cluster<T>) {
let pd_client = Arc::clone(&cluster.pd_client);
// Disable default max peer count check.
pd_client.disable_default_operator();

let region_id = cluster.run_conf_change();
let cfg = cluster.cfg.clone();

// Test adding nodes.
pd_client.must_add_peer(region_id, new_peer(2, 2));
pd_client.must_add_peer(region_id, new_peer(3, 3));
cluster.transfer_leader(region_id, new_peer(3, 3));
cluster.reset_leader_of_region(region_id);
assert_ne!(cluster.leader_of_region(region_id).unwrap().get_id(), 3);

// Test transfer leader after a safe duration.
thread::sleep(cfg.raft_store.raft_reject_transfer_leader_duration.into());
cluster.transfer_leader(region_id, new_peer(3, 3));
// Retry for more stability
for _ in 0..20 {
cluster.reset_leader_of_region(region_id);
if cluster.leader_of_region(region_id) != Some(new_peer(3, 3)) {
continue;
}
break;
}
assert_eq!(cluster.leader_of_region(region_id).unwrap().get_id(), 3);
}

fn test_learner_conf_change<T: Simulator>(cluster: &mut Cluster<T>) {
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();
@@ -685,6 +714,14 @@ fn test_server_safe_conf_change() {
test_conf_change_safe(&mut cluster);
}

#[test]
fn test_server_transfer_leader_safe() {
let count = 5;
let mut cluster = new_server_cluster(0, count);
configure_for_transfer_leader(&mut cluster);
test_transfer_leader_safe(&mut cluster);
}

#[test]
fn test_conf_change_remove_leader() {
let mut cluster = new_node_cluster(0, 3);
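The same scenario composes just as easily on a node cluster; a minimal sketch (not part of this PR) reusing the helpers already exercised above, new_node_cluster, configure_for_transfer_leader and test_transfer_leader_safe:

#[test]
fn test_node_transfer_leader_safe() {
    let count = 5;
    let mut cluster = new_node_cluster(0, count);
    configure_for_transfer_leader(&mut cluster);
    test_transfer_leader_safe(&mut cluster);
}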