raftstore: report pending peers #1395

Merged (1 commit, Dec 8, 2016)
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion src/pd/mod.rs
@@ -88,7 +88,8 @@ pub trait PdClient: Send + Sync {
fn region_heartbeat(&self,
region: metapb::Region,
leader: metapb::Peer,
down_peers: Vec<pdpb::PeerStats>)
down_peers: Vec<pdpb::PeerStats>,
pending_peers: Vec<metapb::Peer>)
-> Result<pdpb::RegionHeartbeatResponse>;

// Ask pd for split, pd will return the new split region id.
4 changes: 3 additions & 1 deletion src/pd/protocol.rs
@@ -123,12 +123,14 @@ impl super::PdClient for RpcClient {
fn region_heartbeat(&self,
region: metapb::Region,
leader: metapb::Peer,
down_peers: Vec<pdpb::PeerStats>)
down_peers: Vec<pdpb::PeerStats>,
pending_peers: Vec<metapb::Peer>)
-> Result<pdpb::RegionHeartbeatResponse> {
let mut heartbeat = pdpb::RegionHeartbeatRequest::new();
heartbeat.set_region(region);
heartbeat.set_leader(leader);
heartbeat.set_down_peers(RepeatedField::from_vec(down_peers));
heartbeat.set_pending_peers(RepeatedField::from_vec(pending_peers));

let mut req = new_request(self.cluster_id, pdpb::CommandType::RegionHeartbeat);
req.set_region_heartbeat(heartbeat);
17 changes: 17 additions & 0 deletions src/raftstore/store/peer.rs
@@ -493,6 +493,23 @@ impl Peer {
down_peers
}

pub fn collect_pending_peers(&self) -> Vec<metapb::Peer> {
let mut pending_peers = Vec::with_capacity(self.region().get_peers().len());
let status = self.raft_group.status();
let truncated_idx = self.get_store().truncated_index();
for (id, progress) in status.progress {
if id == self.peer.get_id() {
continue;
}
if progress.matched < truncated_idx {
if let Some(p) = self.get_peer_from_cache(id) {
pending_peers.push(p);
}
}
}
pending_peers
}

pub fn check_stale_state(&mut self, d: Duration) -> StaleState {
// Updates the `leader_missing_time` according to the current state.
if self.leader_id() == raft::INVALID_ID {
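
A note on the collect_pending_peers hunk above: the leader walks its Raft progress map and reports every follower whose matched index has fallen behind the leader's truncated index. Such a follower is missing log entries that have already been compacted away, so it can only catch up via a snapshot and is therefore reported to PD as pending. A minimal, self-contained Rust sketch of that predicate, using a hypothetical Progress stand-in rather than the real raft type:

// Hypothetical stand-in for a Raft progress entry; only the field the check uses.
struct Progress {
    matched: u64, // highest log index known to be replicated on this follower
}

// A follower is "pending" when its replicated index is below the leader's
// truncated index: the entries it is missing have been compacted, so it
// must wait for a snapshot before it is an up-to-date replica.
fn is_pending(progress: &Progress, truncated_idx: u64) -> bool {
    progress.matched < truncated_idx
}

fn main() {
    let truncated_idx = 10;
    assert!(is_pending(&Progress { matched: 3 }, truncated_idx));
    assert!(!is_pending(&Progress { matched: 42 }, truncated_idx));
    println!("pending-peer predicate ok");
}
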
1 change: 1 addition & 0 deletions src/raftstore/store/store.rs
@@ -1204,6 +1204,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
region: peer.region().clone(),
peer: peer.peer.clone(),
down_peers: peer.collect_down_peers(self.cfg.max_peer_down_duration),
pending_peers: peer.collect_pending_peers(),
};
if let Err(e) = self.pd_worker.schedule(task) {
error!("{} failed to notify pd: {}", peer.tag, e);
11 changes: 7 additions & 4 deletions src/raftstore/store/worker/pd.rs
@@ -42,6 +42,7 @@ pub enum Task {
region: metapb::Region,
peer: metapb::Peer,
down_peers: Vec<pdpb::PeerStats>,
pending_peers: Vec<metapb::Peer>,
},
StoreHeartbeat { stats: pdpb::StoreStats },
ReportSplit {
@@ -143,11 +144,13 @@ impl<T: PdClient> Runner<T> {
fn handle_heartbeat(&self,
region: metapb::Region,
peer: metapb::Peer,
down_peers: Vec<pdpb::PeerStats>) {
down_peers: Vec<pdpb::PeerStats>,
pending_peers: Vec<metapb::Peer>) {
PD_REQ_COUNTER_VEC.with_label_values(&["heartbeat", "all"]).inc();

// Now we use the put region protocol for heartbeat.
match self.pd_client.region_heartbeat(region.clone(), peer.clone(), down_peers) {
match self.pd_client
.region_heartbeat(region.clone(), peer.clone(), down_peers, pending_peers) {
Ok(mut resp) => {
PD_REQ_COUNTER_VEC.with_label_values(&["heartbeat", "success"]).inc();

@@ -272,8 +275,8 @@ impl<T: PdClient> Runnable<Task> for Runner<T> {
Task::AskSplit { region, split_key, peer } => {
self.handle_ask_split(region, split_key, peer)
}
Task::Heartbeat { region, peer, down_peers } => {
self.handle_heartbeat(region, peer, down_peers)
Task::Heartbeat { region, peer, down_peers, pending_peers } => {
self.handle_heartbeat(region, peer, down_peers, pending_peers)
}
Task::StoreHeartbeat { stats } => self.handle_store_heartbeat(stats),
Task::ReportSplit { left, right } => self.handle_report_split(left, right),
3 changes: 2 additions & 1 deletion src/server/resolve.rs
@@ -202,7 +202,8 @@ mod tests {
fn region_heartbeat(&self,
_: metapb::Region,
_: metapb::Peer,
_: Vec<pdpb::PeerStats>)
_: Vec<pdpb::PeerStats>,
_: Vec<metapb::Peer>)
-> Result<pdpb::RegionHeartbeatResponse> {
unimplemented!();
}
2 changes: 1 addition & 1 deletion tests/raftstore/mod.rs
@@ -31,6 +31,6 @@ mod test_transport;
mod test_transfer_leader;
mod test_stats;
mod test_snap;
mod test_down_peers;
mod test_region_heartbeat;
mod test_stale_peer;
mod test_lease_read;
18 changes: 15 additions & 3 deletions tests/raftstore/pd.rs
@@ -53,6 +53,7 @@ struct Cluster {
split_count: usize,

down_peers: HashMap<u64, pdpb::PeerStats>,
pending_peers: HashMap<u64, metapb::Peer>,
}

impl Cluster {
@@ -71,6 +72,7 @@ impl Cluster {
store_stats: HashMap::new(),
split_count: 0,
down_peers: HashMap::new(),
pending_peers: HashMap::new(),
}
}

@@ -275,11 +277,16 @@
fn region_heartbeat(&mut self,
region: metapb::Region,
leader: metapb::Peer,
down_peers: Vec<pdpb::PeerStats>)
down_peers: Vec<pdpb::PeerStats>,
pending_peers: Vec<metapb::Peer>)
-> Result<pdpb::RegionHeartbeatResponse> {
for peer in &down_peers {
self.down_peers.insert(peer.get_peer().get_id(), peer.clone());
}
self.pending_peers.clear();
for p in pending_peers {
self.pending_peers.insert(p.get_id(), p);
}
let active_peers: Vec<_> = region.get_peers()
.iter()
.filter(|p| !down_peers.iter().any(|d| p.get_id() == d.get_peer().get_id()))
@@ -476,6 +483,10 @@ impl TestPdClient {
pub fn get_down_peers(&self) -> HashMap<u64, pdpb::PeerStats> {
self.cluster.rl().down_peers.clone()
}

pub fn get_pending_peers(&self) -> HashMap<u64, metapb::Peer> {
self.cluster.rl().pending_peers.clone()
}
}

impl PdClient for TestPdClient {
@@ -537,10 +548,11 @@ impl PdClient for TestPdClient {
fn region_heartbeat(&self,
region: metapb::Region,
leader: metapb::Peer,
down_peers: Vec<pdpb::PeerStats>)
down_peers: Vec<pdpb::PeerStats>,
pending_peers: Vec<metapb::Peer>)
-> Result<pdpb::RegionHeartbeatResponse> {
try!(self.check_bootstrap());
self.cluster.wl().region_heartbeat(region, leader, down_peers)
self.cluster.wl().region_heartbeat(region, leader, down_peers, pending_peers)
}

fn ask_split(&self, region: metapb::Region) -> Result<pdpb::AskSplitResponse> {
tests/raftstore/test_down_peers.rs → tests/raftstore/test_region_heartbeat.rs
@@ -1,13 +1,16 @@
use std::thread::sleep;
use std::sync::mpsc;
use std::time::{Instant, Duration};

use rand::random;
use kvproto::pdpb;
use tikv::util::HandyRwLock;

use super::node::new_node_cluster;
use super::server::new_server_cluster;
use super::cluster::{Cluster, Simulator};
use super::transport_simulate::IsolationFilterFactory;
use super::transport_simulate::*;
use super::util::*;

fn wait_down_peers<T: Simulator>(cluster: &Cluster<T>, count: u64) -> u64 {
let begin = Instant::now();
@@ -145,3 +148,64 @@ fn test_server_down_peers() {
let mut cluster = new_server_cluster(0, 5);
test_down_peers(&mut cluster, 5);
}

fn test_pending_peers<T: Simulator>(cluster: &mut Cluster<T>) {
let pd_client = cluster.pd_client.clone();
// Disable default max peer count check.
pd_client.disable_default_rule();

let region_id = cluster.run_conf_change();

cluster.must_put(b"k1", b"v1");

let (tx, _) = mpsc::channel();
cluster.sim.wl().add_recv_filter(2, box DropSnapshotFilter::new(tx));

pd_client.must_add_peer(region_id, new_peer(2, 2));

let mut tried_times = 0;
loop {
tried_times += 1;
if tried_times > 100 {
panic!("can't get pending peer after {} tries.", tried_times);
}
let pending_peers = cluster.pd_client.get_pending_peers();
if pending_peers.is_empty() {
sleep(Duration::from_millis(100));
} else {
assert_eq!(pending_peers[&2], new_peer(2, 2));
break;
}
}

cluster.sim.wl().clear_recv_filters(2);
cluster.must_put(b"k2", b"v2");

tried_times = 0;
loop {
tried_times += 1;
let pending_peers = cluster.pd_client.get_pending_peers();
if !pending_peers.is_empty() {
sleep(Duration::from_millis(100));
} else {
return;
}
if tried_times > 100 {
panic!("pending peer {:?} still exists after {} tries.",
pending_peers,
tried_times);
}
}
}

#[test]
fn test_node_pending_peers() {
let mut cluster = new_node_cluster(0, 3);
test_pending_peers(&mut cluster);
}

#[test]
fn test_server_pending_peers() {
let mut cluster = new_server_cluster(0, 3);
test_pending_peers(&mut cluster);
}
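
Both waiting loops in test_pending_peers above follow the same bounded-polling pattern: query the mock PD, sleep briefly if the expected state has not appeared yet, and panic after a fixed number of attempts. A generic helper along these lines (hypothetical, not part of this PR) captures the pattern:

use std::thread::sleep;
use std::time::Duration;

// Poll `check` until it returns true, sleeping `interval` between attempts,
// and panic with `what` if it never succeeds within `max_tries` attempts.
fn wait_until<F: FnMut() -> bool>(mut check: F, max_tries: usize, interval: Duration, what: &str) {
    for _ in 0..max_tries {
        if check() {
            return;
        }
        sleep(interval);
    }
    panic!("condition not met after {} tries: {}", max_tries, what);
}

fn main() {
    let mut polls = 0;
    wait_until(|| {
                   polls += 1;
                   polls >= 3
               },
               100,
               Duration::from_millis(1),
               "counter reaches 3");
    println!("condition met after {} polls", polls);
}
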
6 changes: 4 additions & 2 deletions tests/raftstore/transport_simulate.rs
@@ -431,8 +431,10 @@ impl Filter<StoreMsg> for DropSnapshotFilter {
if msg.get_message().get_msg_type() != MessageType::MsgSnapshot {
true
} else {
notifier.send(msg.get_message().get_snapshot().get_metadata().get_index())
.unwrap();
let idx = msg.get_message().get_snapshot().get_metadata().get_index();
if let Err(e) = notifier.send(idx) {
error!("failed to notify snapshot {:?}: {:?}", msg, e);
}
false
}
}
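
A note on the DropSnapshotFilter change above: the unwrap() on notifier.send(...) was replaced with a logged error because the sender may outlive its receiver; in test_pending_peers the channel is created as let (tx, _) = mpsc::channel();, so the receiving end is dropped immediately and every later send returns an error. A small stand-alone illustration of that behaviour with plain std::sync::mpsc (not tikv code):

use std::sync::mpsc;

fn main() {
    // Bind only the sender; the receiver is dropped right away,
    // mirroring `let (tx, _) = mpsc::channel();` in the test.
    let (tx, _) = mpsc::channel::<u64>();

    // With no receiver alive, send() fails instead of delivering.
    // Unwrapping here would panic; logging and moving on keeps the
    // filter dropping snapshot messages as intended.
    match tx.send(7) {
        Ok(()) => println!("delivered"),
        Err(e) => println!("send failed, receiver already dropped: {}", e),
    }
}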