fix wrong last index
BusyJay committed Sep 2, 2016
1 parent 5975197 commit 7d90608
Showing 5 changed files with 187 additions and 39 deletions.
13 changes: 9 additions & 4 deletions src/raftstore/store/msg.rs
@@ -60,7 +60,11 @@ pub enum Msg {

// For snapshot stats.
SnapshotStats,
SnapApplyRes { region_id: u64, is_success: bool },
SnapApplyRes {
region_id: u64,
is_success: bool,
is_abort: bool,
},
SnapGenRes {
region_id: u64,
snap: Option<Snapshot>,
@@ -88,11 +92,12 @@ impl fmt::Debug for Msg {
region_id)
}
Msg::SnapshotStats => write!(fmt, "Snapshot stats"),
Msg::SnapApplyRes { region_id, is_success } => {
Msg::SnapApplyRes { region_id, is_success, is_abort } => {
write!(fmt,
"SnapApplyRes [region_id: {}, is_success: {}]",
"SnapApplyRes [region_id: {}, is_success: {}, is_abort: {}]",
region_id,
is_success)
is_success,
is_abort)
}
Msg::SnapGenRes { region_id, ref snap } => {
write!(fmt,
11 changes: 9 additions & 2 deletions src/raftstore/store/peer.rs
@@ -323,8 +323,8 @@ impl Peer {
self.raft_group.mut_store()
}

pub fn is_applying_snap(&self) -> bool {
self.get_store().is_applying_snap()
pub fn is_applying(&self) -> bool {
self.get_store().is_applying()
}

fn send_ready_metric(&self, ready: &Ready) {
@@ -408,6 +408,13 @@ impl Peer {
let mut ready = self.raft_group.ready();
let is_applying = self.get_store().is_applying_snap();
if is_applying {
if !raft::is_empty_snap(&ready.snapshot) {
if !self.get_store().is_canceling_snap() {
warn!("receiving a new snap when applying the old one, try to abort.");
self.mut_store().cancle_applying_snap();
}
return Ok(None);
}
// skip apply and snapshot
ready.committed_entries = vec![];
ready.snapshot = RaftSnapshot::new();
61 changes: 55 additions & 6 deletions src/raftstore/store/peer_storage.rs
@@ -12,7 +12,7 @@
// limitations under the License.

use std::sync::{self, Arc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering, AtomicBool};
use std::cell::RefCell;
use std::error;
use std::time::Instant;
@@ -45,15 +45,32 @@ const MAX_SNAP_TRY_CNT: usize = 5;

pub type Ranges = Vec<(Vec<u8>, Vec<u8>)>;

#[derive(PartialEq, Debug)]
#[derive(Debug)]
pub enum SnapState {
Relax,
Generating,
Snap(Snapshot),
Applying,
Applying(Arc<AtomicBool>),
ApplyAbort,
Failed,
}

impl PartialEq for SnapState {
fn eq(&self, other: &SnapState) -> bool {
match (self, other) {
(&SnapState::Relax, &SnapState::Relax) |
(&SnapState::Generating, &SnapState::Generating) |
(&SnapState::Failed, &SnapState::Failed) |
(&SnapState::ApplyAbort, &SnapState::ApplyAbort) => true,
(&SnapState::Snap(ref s1), &SnapState::Snap(ref s2)) => s1 == s2,
(&SnapState::Applying(ref b1), &SnapState::Applying(ref b2)) => {
b1.load(Ordering::Relaxed) == b2.load(Ordering::Relaxed)
}
_ => false,
}
}
}

pub struct PeerStorage {
pub engine: Arc<DB>,

@@ -490,9 +507,37 @@ impl PeerStorage {
self.engine.clone()
}

/// Check whether the storage has finished applying snapshot.
#[inline]
pub fn is_applying(&self) -> bool {
match *self.snap_state.borrow() {
SnapState::Applying(_) |
SnapState::ApplyAbort => true,
_ => false,
}
}

/// Check if the storage is applying a snapshot.
#[inline]
pub fn is_applying_snap(&self) -> bool {
self.is_snap_state(SnapState::Applying)
match *self.snap_state.borrow() {
SnapState::Applying(_) => true,
_ => false,
}
}

#[inline]
pub fn is_canceling_snap(&self) -> bool {
match *self.snap_state.borrow() {
SnapState::Applying(ref abort) => abort.load(Ordering::Relaxed),
_ => false,
}
}

pub fn cancle_applying_snap(&mut self) {
if let SnapState::Applying(ref abort) = *self.snap_state.borrow() {
abort.store(true, Ordering::Relaxed);
}
}

#[inline]
@@ -545,7 +590,8 @@ impl PeerStorage {
self.apply_state = ctx.apply_state;
// If we apply snapshot ok, we should update some infos like applied index too.
if let Some(res) = apply_snap_res {
self.set_snap_state(SnapState::Applying);
let abort = Arc::new(AtomicBool::new(false));
self.set_snap_state(SnapState::Applying(abort.clone()));

// cleanup data before schedule apply task
if self.is_initialized() {
@@ -560,7 +606,10 @@
}
}

let task = SnapTask::Apply { region_id: region_id };
let task = SnapTask::Apply {
region_id: region_id,
abort: abort,
};
// TODO: gracefully remove region instead.
self.snap_sched.schedule(task).expect("snap apply job should not fail");
self.region = res.region.clone();
38 changes: 24 additions & 14 deletions src/raftstore/store/store.rs
@@ -12,6 +12,7 @@
// limitations under the License.

use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::rc::Rc;
use std::cell::RefCell;
use std::option::Option;
@@ -171,8 +172,12 @@ impl<T: Transport, C: PdClient> Store<T, C> {
info!("region {:?} is applying in store {}",
local_state.get_region(),
self.store_id());
peer.mut_store().set_snap_state(SnapState::Applying);
box_try!(self.snap_worker.schedule(SnapTask::Apply { region_id: region_id }));
let abort = Arc::new(AtomicBool::new(false));
peer.mut_store().set_snap_state(SnapState::Applying(abort.clone()));
box_try!(self.snap_worker.schedule(SnapTask::Apply {
region_id: region_id,
abort: abort,
}));
}

self.region_ranges.insert(enc_end_key(region), region_id);
@@ -282,7 +287,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {

fn on_raft_base_tick(&mut self, event_loop: &mut EventLoop<Self>) {
for (&region_id, peer) in &mut self.region_peers {
if !peer.get_store().is_applying_snap() {
if !peer.get_store().is_applying() {
peer.raft_group.tick();
self.pending_raft_groups.insert(region_id);
}
@@ -300,7 +305,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
has_peer = true;
let target_peer_id = target.get_id();
if p.peer_id() < target_peer_id {
if p.is_applying_snap() {
if p.is_applying() {
// to remove the applying peer, we should find a reliable way to abort
// the apply process.
warn!("Stale peer {} is applying snapshot, will destroy next time.",
@@ -598,7 +603,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
// Can we destroy it in another thread later?
let mut p = self.region_peers.remove(&region_id).unwrap();
// We can't destroy a peer which is applying snapshot.
assert!(!p.is_applying_snap());
assert!(!p.is_applying());

let is_initialized = p.is_initialized();
let end_key = enc_end_key(p.region());
@@ -1123,7 +1128,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
let s = peer.get_store();
compacted_idx = s.truncated_index();
compacted_term = s.truncated_term();
is_applying_snap = s.is_applying_snap();
is_applying_snap = s.is_applying();
}
};
}
@@ -1223,16 +1228,21 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}
}

fn on_snap_apply_res(&mut self, region_id: u64, is_success: bool) {
fn on_snap_apply_res(&mut self, region_id: u64, is_success: bool, is_abort: bool) {
let peer = self.region_peers.get_mut(&region_id).unwrap();
let mut storage = peer.mut_store();
assert!(storage.is_snap_state(SnapState::Applying),
assert!(storage.is_applying_snap(),
"snap state should not change during applying");
if !is_success {
// TODO: cleanup region and treat it as tombstone.
panic!("applying snapshot to {} failed", region_id);
if is_success {
storage.set_snap_state(SnapState::Relax);
} else {
if !is_abort {
// TODO: cleanup region and treat it as tombstone.
panic!("applying snapshot to {} failed", region_id);
}
self.pending_raft_groups.insert(region_id);
storage.set_snap_state(SnapState::ApplyAbort);
}
storage.set_snap_state(SnapState::Relax);
}
}

@@ -1296,8 +1306,8 @@ impl<T: Transport, C: PdClient> mio::Handler for Store<T, C> {
self.on_unreachable(region_id, to_peer_id);
}
Msg::SnapshotStats => self.store_heartbeat_pd(),
Msg::SnapApplyRes { region_id, is_success } => {
self.on_snap_apply_res(region_id, is_success);
Msg::SnapApplyRes { region_id, is_success, is_abort } => {
self.on_snap_apply_res(region_id, is_success, is_abort);
}
Msg::SnapGenRes { region_id, snap } => {
self.on_snap_gen_res(region_id, snap);
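Note: the worker that actually applies the snapshot lives in the snap/region runner (the fifth changed file, not shown above), so only one half of the cancellation handshake is visible in this excerpt. As a rough, self-contained sketch of the pattern this commit introduces — an Arc<AtomicBool> created when the apply task is scheduled, stored in SnapState::Applying, set by cancle_applying_snap, polled by the worker, and reported back through Msg::SnapApplyRes with is_success / is_abort — the following plain std Rust illustrates the idea. Names such as ApplyOutcome and apply_snapshot_chunks are illustrative only, not TiKV's actual API.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Outcome reported by the worker, mirroring the is_success / is_abort
// fields of Msg::SnapApplyRes (a hard failure would be a third case,
// which store.rs still handles with a panic).
#[derive(Debug)]
enum ApplyOutcome {
    Success,
    Aborted,
}

// Worker side (illustrative stand-in for the real region runner):
// apply the snapshot in batches, checking the shared abort flag
// between batches.
fn apply_snapshot_chunks(abort: &AtomicBool, batches: u32) -> ApplyOutcome {
    for i in 0..batches {
        if abort.load(Ordering::Relaxed) {
            // Cancellation was requested, e.g. a newer snapshot arrived.
            return ApplyOutcome::Aborted;
        }
        // Pretend to write one batch of snapshot data.
        thread::sleep(Duration::from_millis(10));
        println!("applied batch {}", i);
    }
    ApplyOutcome::Success
}

fn main() {
    // Scheduling side: create the flag when the apply task is scheduled
    // and keep a clone, as SnapState::Applying(Arc<AtomicBool>) does.
    let abort = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel();

    let worker_flag = Arc::clone(&abort);
    let worker = thread::spawn(move || {
        // Report the result back, like sending Msg::SnapApplyRes to the store.
        tx.send(apply_snapshot_chunks(&worker_flag, 100)).unwrap();
    });

    // Later a newer snapshot arrives while the old one is still being
    // applied: request cancellation, as cancle_applying_snap does.
    thread::sleep(Duration::from_millis(25));
    abort.store(true, Ordering::Relaxed);

    match rx.recv().unwrap() {
        ApplyOutcome::Success => println!("snapshot applied"),
        ApplyOutcome::Aborted => println!("apply aborted, region stays pending"),
    }
    worker.join().unwrap();
}

Because the flag only requests cancellation, the worker decides when it is safe to stop between write batches; that is why store.rs treats an aborted apply (is_abort) as a normal transition to SnapState::ApplyAbort and re-queues the region, while a genuine failure still panics.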
