From 7d9060814eaee377ef513e55bbffafa919f0d417 Mon Sep 17 00:00:00 2001
From: Jay Lee
Date: Fri, 2 Sep 2016 19:58:49 +0800
Subject: [PATCH] fix wrong last index

---
 src/raftstore/store/msg.rs          |  13 ++--
 src/raftstore/store/peer.rs         |  11 ++-
 src/raftstore/store/peer_storage.rs |  61 ++++++++++++++--
 src/raftstore/store/store.rs        |  38 ++++++----
 src/raftstore/store/worker/snap.rs  | 103 ++++++++++++++++++++++++----
 5 files changed, 187 insertions(+), 39 deletions(-)

diff --git a/src/raftstore/store/msg.rs b/src/raftstore/store/msg.rs
index b0a239838d1..30bc338177f 100644
--- a/src/raftstore/store/msg.rs
+++ b/src/raftstore/store/msg.rs
@@ -60,7 +60,11 @@ pub enum Msg {
 
     // For snapshot stats.
     SnapshotStats,
-    SnapApplyRes { region_id: u64, is_success: bool },
+    SnapApplyRes {
+        region_id: u64,
+        is_success: bool,
+        is_abort: bool,
+    },
     SnapGenRes {
         region_id: u64,
         snap: Option<Snapshot>,
@@ -88,11 +92,12 @@ impl fmt::Debug for Msg {
                        region_id)
             }
             Msg::SnapshotStats => write!(fmt, "Snapshot stats"),
-            Msg::SnapApplyRes { region_id, is_success } => {
+            Msg::SnapApplyRes { region_id, is_success, is_abort } => {
                 write!(fmt,
-                       "SnapApplyRes [region_id: {}, is_success: {}]",
+                       "SnapApplyRes [region_id: {}, is_success: {}, is_abort: {}]",
                        region_id,
-                       is_success)
+                       is_success,
+                       is_abort)
             }
             Msg::SnapGenRes { region_id, ref snap } => {
                 write!(fmt,
diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs
index ae0bb954e32..66b022dd726 100644
--- a/src/raftstore/store/peer.rs
+++ b/src/raftstore/store/peer.rs
@@ -323,8 +323,8 @@ impl Peer {
         self.raft_group.mut_store()
     }
 
-    pub fn is_applying_snap(&self) -> bool {
-        self.get_store().is_applying_snap()
+    pub fn is_applying(&self) -> bool {
+        self.get_store().is_applying()
     }
 
     fn send_ready_metric(&self, ready: &Ready) {
@@ -408,6 +408,13 @@ impl Peer {
         let mut ready = self.raft_group.ready();
         let is_applying = self.get_store().is_applying_snap();
         if is_applying {
+            if !raft::is_empty_snap(&ready.snapshot) {
+                if !self.get_store().is_canceling_snap() {
+                    warn!("receiving a new snap when applying the old one, try to abort.");
+                    self.mut_store().cancle_applying_snap();
+                }
+                return Ok(None);
+            }
             // skip apply and snapshot
             ready.committed_entries = vec![];
             ready.snapshot = RaftSnapshot::new();
diff --git a/src/raftstore/store/peer_storage.rs b/src/raftstore/store/peer_storage.rs
index d23a5da665e..a246524cb88 100644
--- a/src/raftstore/store/peer_storage.rs
+++ b/src/raftstore/store/peer_storage.rs
@@ -12,7 +12,7 @@
 // limitations under the License.
 
 use std::sync::{self, Arc};
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicUsize, Ordering, AtomicBool};
 use std::cell::RefCell;
 use std::error;
 use std::time::Instant;
@@ -45,15 +45,32 @@ const MAX_SNAP_TRY_CNT: usize = 5;
 
 pub type Ranges = Vec<(Vec<u8>, Vec<u8>)>;
 
-#[derive(PartialEq, Debug)]
+#[derive(Debug)]
 pub enum SnapState {
     Relax,
     Generating,
     Snap(Snapshot),
-    Applying,
+    Applying(Arc<AtomicBool>),
+    ApplyAbort,
     Failed,
 }
 
+impl PartialEq for SnapState {
+    fn eq(&self, other: &SnapState) -> bool {
+        match (self, other) {
+            (&SnapState::Relax, &SnapState::Relax) |
+            (&SnapState::Generating, &SnapState::Generating) |
+            (&SnapState::Failed, &SnapState::Failed) |
+            (&SnapState::ApplyAbort, &SnapState::ApplyAbort) => true,
+            (&SnapState::Snap(ref s1), &SnapState::Snap(ref s2)) => s1 == s2,
+            (&SnapState::Applying(ref b1), &SnapState::Applying(ref b2)) => {
+                b1.load(Ordering::Relaxed) == b2.load(Ordering::Relaxed)
+            }
+            _ => false,
+        }
+    }
+}
+
 pub struct PeerStorage {
     pub engine: Arc<DB>,
 
@@ -490,9 +507,37 @@ impl PeerStorage {
         self.engine.clone()
     }
 
+    /// Check if the storage is applying a snapshot or the apply has been aborted.
+    #[inline]
+    pub fn is_applying(&self) -> bool {
+        match *self.snap_state.borrow() {
+            SnapState::Applying(_) |
+            SnapState::ApplyAbort => true,
+            _ => false,
+        }
+    }
+
+    /// Check if the storage is applying a snapshot.
     #[inline]
     pub fn is_applying_snap(&self) -> bool {
-        self.is_snap_state(SnapState::Applying)
+        match *self.snap_state.borrow() {
+            SnapState::Applying(_) => true,
+            _ => false,
+        }
+    }
+
+    #[inline]
+    pub fn is_canceling_snap(&self) -> bool {
+        match *self.snap_state.borrow() {
+            SnapState::Applying(ref abort) => abort.load(Ordering::Relaxed),
+            _ => false,
+        }
+    }
+
+    pub fn cancle_applying_snap(&mut self) {
+        if let SnapState::Applying(ref abort) = *self.snap_state.borrow() {
+            abort.store(true, Ordering::Relaxed);
+        }
     }
 
     #[inline]
@@ -545,7 +590,8 @@ impl PeerStorage {
         self.apply_state = ctx.apply_state;
         // If we apply snapshot ok, we should update some infos like applied index too.
         if let Some(res) = apply_snap_res {
-            self.set_snap_state(SnapState::Applying);
+            let abort = Arc::new(AtomicBool::new(false));
+            self.set_snap_state(SnapState::Applying(abort.clone()));
 
             // cleanup data before schedule apply task
             if self.is_initialized() {
@@ -560,7 +606,10 @@ impl PeerStorage {
             }
         }
 
-        let task = SnapTask::Apply { region_id: region_id };
+        let task = SnapTask::Apply {
+            region_id: region_id,
+            abort: abort,
+        };
         // TODO: gracefully remove region instead.
         self.snap_sched.schedule(task).expect("snap apply job should not fail");
         self.region = res.region.clone();
diff --git a/src/raftstore/store/store.rs b/src/raftstore/store/store.rs
index 4e8945bb623..11dca3bf6e0 100644
--- a/src/raftstore/store/store.rs
+++ b/src/raftstore/store/store.rs
@@ -12,6 +12,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::sync::atomic::AtomicBool;
 use std::rc::Rc;
 use std::cell::RefCell;
 use std::option::Option;
@@ -171,8 +172,12 @@ impl Store {
                 info!("region {:?} is applying in store {}",
                       local_state.get_region(),
                       self.store_id());
-                peer.mut_store().set_snap_state(SnapState::Applying);
-                box_try!(self.snap_worker.schedule(SnapTask::Apply { region_id: region_id }));
+                let abort = Arc::new(AtomicBool::new(false));
+                peer.mut_store().set_snap_state(SnapState::Applying(abort.clone()));
+                box_try!(self.snap_worker.schedule(SnapTask::Apply {
+                    region_id: region_id,
+                    abort: abort,
+                }));
             }
 
             self.region_ranges.insert(enc_end_key(region), region_id);
@@ -282,7 +287,7 @@ impl Store {
 
     fn on_raft_base_tick(&mut self, event_loop: &mut EventLoop<Self>) {
         for (&region_id, peer) in &mut self.region_peers {
-            if !peer.get_store().is_applying_snap() {
+            if !peer.get_store().is_applying() {
                 peer.raft_group.tick();
                 self.pending_raft_groups.insert(region_id);
             }
@@ -300,7 +305,7 @@ impl Store {
             has_peer = true;
             let target_peer_id = target.get_id();
             if p.peer_id() < target_peer_id {
-                if p.is_applying_snap() {
+                if p.is_applying() {
                     // to remove the applying peer, we should find a reliable way to abort
                     // the apply process.
                     warn!("Stale peer {} is applying snapshot, will destroy next time.",
@@ -598,7 +603,7 @@ impl Store {
         // Can we destroy it in another thread later?
         let mut p = self.region_peers.remove(&region_id).unwrap();
         // We can't destroy a peer which is applying snapshot.
-        assert!(!p.is_applying_snap());
+        assert!(!p.is_applying());
         let is_initialized = p.is_initialized();
         let end_key = enc_end_key(p.region());
 
@@ -1123,7 +1128,7 @@ impl Store {
                 let s = peer.get_store();
                 compacted_idx = s.truncated_index();
                 compacted_term = s.truncated_term();
-                is_applying_snap = s.is_applying_snap();
+                is_applying_snap = s.is_applying();
             }
         };
     }
@@ -1223,16 +1228,21 @@ impl Store {
         }
     }
 
-    fn on_snap_apply_res(&mut self, region_id: u64, is_success: bool) {
+    fn on_snap_apply_res(&mut self, region_id: u64, is_success: bool, is_abort: bool) {
         let peer = self.region_peers.get_mut(&region_id).unwrap();
         let mut storage = peer.mut_store();
-        assert!(storage.is_snap_state(SnapState::Applying),
+        assert!(storage.is_applying_snap(),
                 "snap state should not change during applying");
-        if !is_success {
-            // TODO: cleanup region and treat it as tombstone.
-            panic!("applying snapshot to {} failed", region_id);
+        if is_success {
+            storage.set_snap_state(SnapState::Relax);
+        } else {
+            if !is_abort {
+                // TODO: cleanup region and treat it as tombstone.
+                panic!("applying snapshot to {} failed", region_id);
+            }
+            self.pending_raft_groups.insert(region_id);
+            storage.set_snap_state(SnapState::ApplyAbort);
         }
-        storage.set_snap_state(SnapState::Relax);
     }
 }
 
@@ -1296,8 +1306,8 @@ impl mio::Handler for Store {
                 self.on_unreachable(region_id, to_peer_id);
             }
             Msg::SnapshotStats => self.store_heartbeat_pd(),
-            Msg::SnapApplyRes { region_id, is_success } => {
-                self.on_snap_apply_res(region_id, is_success);
+            Msg::SnapApplyRes { region_id, is_success, is_abort } => {
+                self.on_snap_apply_res(region_id, is_success, is_abort);
             }
             Msg::SnapGenRes { region_id, snap } => {
                 self.on_snap_gen_res(region_id, snap);
diff --git a/src/raftstore/store/worker/snap.rs b/src/raftstore/store/worker/snap.rs
index 0144eb7abd1..f3ec679a453 100644
--- a/src/raftstore/store/worker/snap.rs
+++ b/src/raftstore/store/worker/snap.rs
@@ -16,6 +16,7 @@
 use std::fmt::{self, Formatter, Display};
 use std::error;
 use std::fs::File;
 use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::time::Instant;
 use std::str;
@@ -27,7 +28,7 @@
 use util::codec::bytes::CompactBytesDecoder;
 use util::{escape, HandyRwLock, rocksdb};
 use util::transport::SendCh;
 use raftstore;
-use raftstore::store::engine::{Mutable, Snapshot, delete_all_in_range};
+use raftstore::store::engine::{Mutable, Snapshot, Iterable};
 use raftstore::store::{self, SnapManager, SnapKey, SnapEntry, Msg, keys, Peekable};
 use storage::CF_RAFT;
@@ -36,7 +37,10 @@ const BATCH_SIZE: usize = 1024 * 1024 * 10; // 10m
 /// Snapshot related task.
 pub enum Task {
     Gen { region_id: u64 },
-    Apply { region_id: u64 },
+    Apply {
+        region_id: u64,
+        abort: Arc<AtomicBool>,
+    },
 }
 
 impl Display for Task {
@@ -51,6 +55,10 @@
 quick_error! {
     #[derive(Debug)]
     enum Error {
+        Abort {
+            description("abort")
+            display("abort")
+        }
         Other(err: Box<error::Error + Sync + Send>) {
             from()
             cause(err.as_ref())
@@ -70,6 +78,58 @@ impl MsgSender for SendCh {
     }
 }
 
+#[inline]
+fn check_abort(abort: &AtomicBool) -> Result<(), Error> {
+    if abort.load(Ordering::Relaxed) {
+        return Err(Error::Abort);
+    }
+    Ok(())
+}
+
+fn delete_all_in_range(db: &DB,
+                       start_key: &[u8],
+                       end_key: &[u8],
+                       abort: &AtomicBool)
+                       -> Result<(), Error> {
+    for cf in db.cf_names() {
+        try!(check_abort(&abort));
+        let handle = box_try!(rocksdb::get_cf_handle(db, cf));
+        box_try!(db.delete_file_in_range_cf(*handle, start_key, end_key));
+
+        let mut it = box_try!(db.new_iterator_cf(cf));
+
+        let mut wb = WriteBatch::new();
+        try!(check_abort(&abort));
+        it.seek(start_key.into());
+        while it.valid() {
+            {
+                let key = it.key();
+                if key >= end_key {
+                    break;
+                }
+
+                box_try!(wb.delete_cf(*handle, key));
+                if wb.count() == BATCH_SIZE {
+                    // Can't use write_without_wal here.
+                    // Otherwise it may cause dirty data when applying snapshot.
+                    box_try!(db.write(wb));
+                    wb = WriteBatch::new();
+                }
+            };
+            try!(check_abort(&abort));
+            if !it.next() {
+                break;
+            }
+        }
+
+        if wb.count() > 0 {
+            box_try!(db.write(wb));
+        }
+    }
+
+    Ok(())
+}
+
 // TODO: separate snap generate and apply to different thread.
 pub struct Runner {
     db: Arc<DB>,
@@ -118,8 +178,9 @@ impl Runner {
         metric_time!("raftstore.generate_snap.cost", ts.elapsed());
     }
 
-    fn apply_snap(&self, region_id: u64) -> Result<(), Error> {
+    fn apply_snap(&self, region_id: u64, abort: Arc<AtomicBool>) -> Result<(), Error> {
         info!("begin apply snap data for {}", region_id);
+        try!(check_abort(&abort));
         let region_key = keys::region_state_key(region_id);
         let mut region_state: RegionLocalState = match box_try!(self.db.get_msg(&region_key)) {
             Some(state) => state,
@@ -129,7 +190,7 @@ impl Runner {
         // clear up origin data.
         let start_key = keys::enc_start_key(region_state.get_region());
         let end_key = keys::enc_end_key(region_state.get_region());
-        box_try!(delete_all_in_range(self.db.as_ref(), &start_key, &end_key));
+        box_try!(delete_all_in_range(self.db.as_ref(), &start_key, &end_key, &abort));
 
         let state_key = keys::apply_state_key(region_id);
         let apply_state: RaftApplyState = match box_try!(self.db.get_msg_cf(CF_RAFT, &state_key)) {
@@ -147,6 +208,7 @@ impl Runner {
         if !snap_file.exists() {
             return Err(box_err!("missing snap file {}", snap_file.path().display()));
         }
+        try!(check_abort(&abort));
         box_try!(snap_file.validate());
         let mut reader = box_try!(File::open(snap_file.path()));
 
@@ -154,6 +216,7 @@ impl Runner {
         // Write the snapshot into the region.
         loop {
             // TODO: avoid too many allocation
+            try!(check_abort(&abort));
             let cf = box_try!(reader.decode_compact_bytes());
             if cf.is_empty() {
                 break;
@@ -163,6 +226,7 @@ impl Runner {
             let mut wb = WriteBatch::new();
             let mut batch_size = 0;
             loop {
+                try!(check_abort(&abort));
                 let key = box_try!(reader.decode_compact_bytes());
                 if key.is_empty() {
                     box_try!(self.db.write(wb));
@@ -187,25 +251,38 @@ impl Runner {
         Ok(())
     }
 
-    fn handle_apply(&self, region_id: u64) {
+    fn handle_apply(&self, region_id: u64, abort: Arc<AtomicBool>) {
         metric_incr!("raftstore.apply_snap");
         let ts = Instant::now();
-        let mut is_success = true;
-        if let Err(e) = self.apply_snap(region_id) {
-            is_success = false;
-            error!("failed to apply snap: {:?}!!!", e);
-        }
+        let (is_success, is_abort) = match self.apply_snap(region_id, abort) {
+            Ok(()) => (true, false),
+            Err(Error::Abort) => {
+                warn!("applying snapshot for region {} is aborted.", region_id);
+                (false, true)
+            }
+            Err(e) => {
+                error!("failed to apply snap: {:?}!!!", e);
+                (false, false)
+            }
+        };
 
         let msg = Msg::SnapApplyRes {
            region_id: region_id,
            is_success: is_success,
+           is_abort: is_abort,
        };
         if let Err(e) = self.ch.send(msg) {
             panic!("failed to notify snap apply result of {}: {:?}", region_id, e);
         }
-        metric_incr!("raftstore.apply_snap.success");
-        metric_time!("raftstore.apply_snap.cost", ts.elapsed());
+        if is_abort {
+            metric_incr!("raftstore.apply_snap.abort");
+        } else if is_success {
+            metric_incr!("raftstore.apply_snap.success");
+            metric_time!("raftstore.apply_snap.cost", ts.elapsed());
+        } else {
+            metric_incr!("raftstore.apply_snap.fail");
+        }
     }
 }
 
@@ -213,7 +290,7 @@ impl Runnable for Runner {
     fn run(&mut self, task: Task) {
         match task {
             Task::Gen { region_id } => self.handle_gen(region_id),
-            Task::Apply { region_id } => self.handle_apply(region_id, abort),
+            Task::Apply { region_id, abort } => self.handle_apply(region_id, abort),
         }
     }
 }
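
Note on the mechanism: the patch is plain cooperative cancellation. The peer stores an Arc<AtomicBool> inside SnapState::Applying, the same flag travels with Task::Apply to the snapshot worker, and the worker polls it through check_abort between deletion and write batches, bailing out with Error::Abort once the flag is set; the store then marks the region ApplyAbort and re-schedules its raft group instead of panicking. The standalone sketch below illustrates the same pattern outside TiKV; the names (ApplyError, run_apply, the fake batch loop) are invented for illustration and are not part of the patch.

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the patch's `Error::Abort` variant.
#[derive(Debug)]
enum ApplyError {
    Canceled,
}

// Same idea as `check_abort`: return early once the shared flag is set.
fn check_abort(abort: &AtomicBool) -> Result<(), ApplyError> {
    if abort.load(Ordering::Relaxed) {
        return Err(ApplyError::Canceled);
    }
    Ok(())
}

// Stand-in for the worker's `apply_snap`: poll the flag between work batches.
fn run_apply(abort: &AtomicBool) -> Result<(), ApplyError> {
    for _batch in 0..100 {
        check_abort(abort)?;
        thread::sleep(Duration::from_millis(10)); // pretend to write one batch
    }
    Ok(())
}

fn main() {
    let abort = Arc::new(AtomicBool::new(false));
    let worker_flag = abort.clone();
    let worker = thread::spawn(move || run_apply(&worker_flag));

    // The "peer" side: a newer snapshot arrives, so cancel the in-flight apply.
    thread::sleep(Duration::from_millis(30));
    abort.store(true, Ordering::Relaxed);

    // Like SnapApplyRes, the caller learns whether the apply finished or was aborted.
    println!("apply result: {:?}", worker.join().unwrap());
}

Relaxed ordering is sufficient here because the flag is a pure go/no-go signal and no other data is published through it, which is also how the patch uses it.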