New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore: remove stale peer as soon as possible #2281

Merged
merged 5 commits into from Sep 13, 2017
Jump to file or symbol
Failed to load files and symbols.
+33 −15
Diff settings

Always

Just for now

@@ -672,22 +672,28 @@ impl<T: Transport, C: PdClient> Store<T, C> {
let mut has_peer = false;
let mut async_remove = false;
let mut stale_peer = None;
let mut initialized = false;
if let Some(p) = self.region_peers.get_mut(&region_id) {
has_peer = true;
let target_peer_id = target.get_id();
if p.peer_id() < target_peer_id {
if p.is_applying_snapshot() && !p.mut_store().cancel_applying_snap() {
info!(
"[region {}] Stale peer {} is applying snapshot, will destroy next \
time.",
region_id,
p.peer_id()
);
return Ok(false);
initialized = p.get_store().is_initialized();
async_remove = initialized;
if p.is_applying_snapshot() {
if !p.mut_store().cancel_applying_snap() {
info!(
"[region {}] Stale peer {} is applying snapshot, will destroy next \
time.",
region_id,
p.peer_id()
);
return Ok(false);
}
// There is no tasks in apply worker.
async_remove = false;
}
p.pending_remove = true;
stale_peer = Some(p.peer.clone());
async_remove = p.get_store().is_initialized();
} else if p.peer_id() > target_peer_id {
info!(
"[region {}] target peer id {} is less than {}, msg maybe stale.",
@@ -699,15 +705,17 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}
}
if let Some(p) = stale_peer {
if initialized {
self.apply_worker
.schedule(ApplyTask::destroy(region_id))
.unwrap();
}
if async_remove {
info!(
"[region {}] asking destroying stale peer {:?}",
region_id,
p
);
self.apply_worker
.schedule(ApplyTask::destroy(region_id))
.unwrap();
return Ok(false);
}
info!("[region {}] destroying stale peer {:?}", region_id, p);
@@ -1160,10 +1168,20 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}
fn destroy_peer(&mut self, region_id: u64, peer: metapb::Peer) {
info!("[region {}] destroy peer {:?}", region_id, peer);
// TODO: should we check None here?
// Can we destroy it in another thread later?
let mut p = self.region_peers.remove(&region_id).unwrap();
let mut p = match self.region_peers.remove(&region_id) {
None => return,
Some(p) => if p.peer_id() == peer.get_id() {
p
} else {
assert!(p.peer_id() > peer.get_id());

This comment has been minimized.

@siddontang

siddontang Sep 12, 2017

Contributor

add a comment for how can we meet this case.

@siddontang

siddontang Sep 12, 2017

Contributor

add a comment for how can we meet this case.

// It has been destroyed.

This comment has been minimized.

@siddontang

siddontang Sep 11, 2017

Contributor

need to check p.peer_id() must > peer.get_id() here?

@siddontang

siddontang Sep 11, 2017

Contributor

need to check p.peer_id() must > peer.get_id() here?

self.region_peers.insert(region_id, p);
return;
},
};
info!("[region {}] destroy peer {:?}", region_id, peer);
// We can't destroy a peer which is applying snapshot.
assert!(!p.is_applying_snapshot());
ProTip! Use n and p to navigate between commits in a pull request.