
raftstore: reject transfer leader to the recently added peers #3878

Merged (16 commits) on Dec 7, 2018
5 changes: 5 additions & 0 deletions components/test_raftstore/src/util.rs
@@ -105,6 +105,7 @@ pub fn new_store_cfg() -> Config {
region_split_check_diff: ReadableSize(10000),
report_region_flow_interval: ReadableDuration::millis(100),
raft_store_max_leader_lease: ReadableDuration::millis(250),
raft_reject_transfer_leader_duration: ReadableDuration::secs(0),
clean_stale_peer_delay: ReadableDuration::secs(0),
allow_remove_leader: true,
..Config::default()
@@ -496,6 +497,10 @@ pub fn configure_for_merge<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.cfg.raft_store.merge_check_tick_interval = ReadableDuration::millis(100);
}

pub fn configure_for_transfer_leader<T: Simulator>(cluster: &mut Cluster<T>) {
cluster.cfg.raft_store.raft_reject_transfer_leader_duration = ReadableDuration::secs(1);
}

pub fn configure_for_lease_read<T: Simulator>(
cluster: &mut Cluster<T>,
base_tick_ms: Option<u64>,
3 changes: 3 additions & 0 deletions src/raftstore/store/config.rs
@@ -54,6 +54,8 @@ pub struct Config {
pub raft_log_gc_size_limit: ReadableSize,
// When a peer is not responding for this time, leader will not keep entry cache for it.
pub raft_entry_cache_life_time: ReadableDuration,
// When a peer is newly added, reject transferring leadership to it for a while.
pub raft_reject_transfer_leader_duration: ReadableDuration,

// Interval (ms) to check region whether need to be split or not.
pub split_region_check_tick_interval: ReadableDuration,
@@ -155,6 +157,7 @@ impl Default for Config {
raft_log_gc_count_limit: split_size * 3 / 4 / ReadableSize::kb(1),
raft_log_gc_size_limit: split_size * 3 / 4,
raft_entry_cache_life_time: ReadableDuration::secs(30),
raft_reject_transfer_leader_duration: ReadableDuration::secs(10),
split_region_check_tick_interval: ReadableDuration::secs(10),
region_split_check_diff: split_size / 16,
clean_stale_peer_delay: ReadableDuration::minutes(10),
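
For orientation, a minimal sketch (not part of the patch) of the new option's default and a manual override, mirroring what `new_store_cfg()` and `configure_for_transfer_leader` do in the test utilities above. The module paths and the standalone function are assumptions for illustration; the 10s default and the 1s test override come from this diff.

// Sketch only: default value of raft_reject_transfer_leader_duration and an override.
use raftstore::store::Config;
use util::config::ReadableDuration;

fn reject_window_sketch() {
    let mut cfg = Config::default();
    // Default reject window introduced by this patch.
    assert_eq!(
        cfg.raft_reject_transfer_leader_duration,
        ReadableDuration::secs(10)
    );
    // Tests shorten the window so leadership can move without a long wait.
    cfg.raft_reject_transfer_leader_duration = ReadableDuration::secs(1);
}
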
1 change: 1 addition & 0 deletions src/raftstore/store/fsm/peer.rs
@@ -893,6 +893,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
if p.is_leader() {
p.peers_start_pending_time.push((id, now));
}
p.recent_added_peers.push((id, now));
p.insert_peer_cache(peer);
}
ConfChangeType::RemoveNode => {
46 changes: 46 additions & 0 deletions src/raftstore/store/peer.rs
@@ -215,6 +215,41 @@ pub struct PeerStat {
pub written_keys: u64,
}

pub struct RecentAddedPeersQueue {
pub reject_duration_as_secs: u64,
pub queue: VecDeque<(u64, Instant)>,
}

impl RecentAddedPeersQueue {
pub fn new(reject_duration_as_secs: u64) -> RecentAddedPeersQueue {
RecentAddedPeersQueue {
reject_duration_as_secs,
queue: Default::default(),
}
}

pub fn push(&mut self, pair: (u64, Instant)) {
if self.queue.iter().any(|&(pid, _)| pid == pair.0) {
return;
}
self.queue.push_back(pair);
// Evict expired entries from the front; the queue is in insertion order,
// so the scan can stop at the first entry still inside the reject window.
while self.queue.front().map_or(false, |&(_, start_time)| {
duration_to_sec(start_time.elapsed()) > self.reject_duration_as_secs as f64
}) {
self.queue.pop_front();
}
}

pub fn get(&self, id: u64) -> Option<&(u64, Instant)> {
self.queue.iter().find(|&(pid, start_time)| {
*pid == id
&& duration_to_sec(start_time.elapsed()) <= self.reject_duration_as_secs as f64
})
}
}

pub struct Peer {
engines: Engines,
cfg: Rc<Config>,
@@ -231,6 +266,7 @@ pub struct Peer {
/// Record the instants of peers being added into the configuration.
/// Remove them after they are not pending any more.
pub peers_start_pending_time: Vec<(u64, Instant)>,
pub recent_added_peers: RecentAddedPeersQueue,

coprocessor_host: Arc<CoprocessorHost>,
/// an inaccurate difference in region size since last reset.
@@ -379,6 +415,9 @@ impl Peer {
peer_cache: RefCell::new(HashMap::default()),
peer_heartbeats: HashMap::default(),
peers_start_pending_time: vec![],
recent_added_peers: RecentAddedPeersQueue::new(
cfg.raft_reject_transfer_leader_duration.as_secs(),
),
coprocessor_host: Arc::clone(&store.coprocessor_host),
size_diff_hint: 0,
delete_keys_hint: 0,
@@ -1495,6 +1534,13 @@ impl Peer {
return false;
}
}
if self.recent_added_peers.get(peer_id).is_some() {
debug!(
"{} reject transfer leader to {:?} due to the peer was added recently",
self.tag, peer
);
return false;
}

let last_index = self.get_store().last_index();
last_index <= status.progress[&peer_id].matched + self.cfg.leader_transfer_max_log_lag
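
To make the new check concrete, here is a minimal standalone sketch (not part of the patch) of the `RecentAddedPeersQueue` semantics that the transfer-leader readiness check above relies on. It assumes `Instant` is `std::time::Instant`; the 1-second window and the sleep are purely illustrative.

// Hypothetical illustration of RecentAddedPeersQueue behaviour.
fn recent_added_peers_sketch() {
    use std::thread::sleep;
    use std::time::{Duration, Instant};

    // Reject transferring leadership to a peer added within the last second.
    let mut recent = RecentAddedPeersQueue::new(1);

    recent.push((2, Instant::now()));
    recent.push((2, Instant::now())); // a duplicate peer id is ignored
    assert_eq!(recent.queue.len(), 1);

    // Inside the window: peer 2 is still "recently added", so a transfer
    // to it would be rejected; peer 3 was never added, so it is not blocked.
    assert!(recent.get(2).is_some());
    assert!(recent.get(3).is_none());

    // Once the window has elapsed, the entry no longer blocks the transfer.
    sleep(Duration::from_millis(1100));
    assert!(recent.get(2).is_none());
}

In the patch itself the window comes from `raft_reject_transfer_leader_duration` (10s by default), and entries are pushed when a `ConfChangeType::AddNode` change is applied in `fsm/peer.rs`.
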
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
@@ -123,6 +123,7 @@ fn test_serde_custom_tikv_config() {
raft_log_gc_count_limit: 12,
raft_log_gc_size_limit: ReadableSize::kb(1),
raft_entry_cache_life_time: ReadableDuration::secs(12),
raft_reject_transfer_leader_duration: ReadableDuration::secs(10),
split_region_check_tick_interval: ReadableDuration::secs(12),
region_split_check_diff: ReadableSize::mb(6),
region_compact_check_interval: ReadableDuration::secs(12),
1 change: 1 addition & 0 deletions tests/integrations/config/test-custom.toml
@@ -79,6 +79,7 @@ raft-log-gc-threshold = 12
raft-log-gc-count-limit = 12
raft-log-gc-size-limit = "1KB"
raft-entry-cache-life-time = "12s"
raft-reject-transfer-leader-duration = "10s"
split-region-check-tick-interval = "12s"
region-split-check-diff = "6MB"
region-compact-check-interval = "12s"
35 changes: 35 additions & 0 deletions tests/integrations/raftstore/test_conf_change.rs
@@ -585,6 +585,26 @@ fn test_conf_change_safe<T: Simulator>(cluster: &mut Cluster<T>) {
pd_client.must_remove_peer(region_id, new_peer(2, 2));
}

fn test_transfer_leader_safe<T: Simulator>(cluster: &mut Cluster<T>) {
let pd_client = Arc::clone(&cluster.pd_client);
// Disable default max peer count check.
pd_client.disable_default_operator();

let region_id = cluster.run_conf_change();

// Test adding nodes.
pd_client.must_add_peer(region_id, new_peer(2, 2));
pd_client.must_add_peer(region_id, new_peer(3, 3));
cluster.transfer_leader(region_id, new_peer(2, 2));
assert_ne!(cluster.leader_of_region(region_id).unwrap().get_id(), 2);
cluster.transfer_leader(region_id, new_peer(3, 3));
assert_ne!(cluster.leader_of_region(region_id).unwrap().get_id(), 3);

// Test transfer leader after a safe duration.
thread::sleep(Duration::from_secs(3));
cluster.must_transfer_leader(region_id, new_peer(2, 2));
}

fn test_learner_conf_change<T: Simulator>(cluster: &mut Cluster<T>) {
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();
@@ -685,6 +705,21 @@ fn test_server_safe_conf_change() {
test_conf_change_safe(&mut cluster);
}

#[test]
fn test_node_transfer_leader_safe() {
let count = 5;
let mut cluster = new_node_cluster(0, count);
configure_for_transfer_leader(&mut cluster);
test_transfer_leader_safe(&mut cluster);
}

#[test]
fn test_server_transfer_leader_safe() {
let count = 5;
let mut cluster = new_server_cluster(0, count);
configure_for_transfer_leader(&mut cluster);
test_transfer_leader_safe(&mut cluster);
}

#[test]
fn test_conf_change_remove_leader() {
let mut cluster = new_node_cluster(0, 3);