raftstore: support request snapshot on leader's side #4926

Merged: 12 commits, Jul 4, 2019
11 changes: 9 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -158,6 +158,7 @@ default-features = false
[replace]
"log:0.3.9" = { git = "https://github.com/busyjay/log", branch = "use-static-module" }
"log:0.4.6" = { git = "https://github.com/busyjay/log", branch = "revert-to-static" }
"raft:0.4.3" = { git = "https://github.com/pingcap/raft-rs/", branch = "0.4.x" }

[dev-dependencies]
panic_hook = { path = "components/panic_hook" }
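Note that the new `[replace]` pin of `raft:0.4.3` to the `0.4.x` branch of pingcap/raft-rs is what supplies the request-snapshot API used below (`raft_group.request_snapshot(...)` on the requesting side, `get_request_snapshot()` on the raft message), presumably because that API is not yet available in a crates.io release.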
23 changes: 21 additions & 2 deletions components/test_raftstore/src/cluster.rs
@@ -1,7 +1,7 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.

use std::path::Path;
-use std::sync::{self, Arc, RwLock};
+use std::sync::{self, mpsc, Arc, RwLock};
use std::time::*;
use std::{result, thread};

@@ -20,7 +20,7 @@ use engine::Peekable;
use engine::CF_DEFAULT;
use tikv::config::TiKvConfig;
use tikv::pd::PdClient;
-use tikv::raftstore::store::fsm::{create_raft_batch_system, RaftBatchSystem, RaftRouter};
+use tikv::raftstore::store::fsm::{create_raft_batch_system, PeerFsm, RaftBatchSystem, RaftRouter};
use tikv::raftstore::store::*;
use tikv::raftstore::{Error, Result};
use tikv::server::Result as ServerResult;
@@ -993,6 +993,25 @@ impl<T: Simulator> Cluster<T> {
pub fn partition(&self, s1: Vec<u64>, s2: Vec<u64>) {
self.add_send_filter(PartitionFilterFactory::new(s1, s2));
}

// Request a snapshot on the given region's peer at the given store,
// returning the index at which the snapshot was requested.
pub fn must_request_snapshot(&self, store_id: u64, region_id: u64) -> u64 {
// The callback runs inside the peer FSM, so hand the index back through a channel.
let (request_tx, request_rx) = mpsc::channel();
let router = self.sim.rl().get_router(store_id).unwrap();
router
.send(
region_id,
PeerMsg::CasualMessage(CasualMessage::Test(Box::new(move |peer: &mut PeerFsm| {
let idx = peer.peer.raft_group.get_store().committed_index();
peer.peer.raft_group.request_snapshot(idx).unwrap();
debug!("{} request snapshot at {}", peer.peer.tag, idx);
request_tx.send(idx).unwrap();
}))),
)
.unwrap();
request_rx.recv_timeout(Duration::from_secs(5)).unwrap()
}
}
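A minimal usage sketch, not part of this diff, showing how a test might drive this helper; `new_node_cluster`, `must_put`, and `get_region` are existing test_raftstore helpers, and the store id and assertion here are illustrative assumptions:

#[test]
fn test_request_snapshot_sketch() {
    let mut cluster = new_node_cluster(0, 3);
    // Keep raft log GC from generating snapshots on its own
    // (configure_for_request_snapshot is added later in this diff).
    configure_for_request_snapshot(&mut cluster);
    cluster.run();
    cluster.must_put(b"k1", b"v1");

    let region_id = cluster.get_region(b"k1").get_id();
    // Ask the peer on store 2 to request a snapshot; returns the index used.
    let request_idx = cluster.must_request_snapshot(2, region_id);
    assert!(request_idx > 0);
}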

impl<T: Simulator> Drop for Cluster<T> {
38 changes: 38 additions & 0 deletions components/test_raftstore/src/transport_simulate.rs
@@ -527,6 +527,26 @@ impl Filter for DropSnapshotFilter {
}
}

/// Captures the first snapshot message sent to the given region and
/// forwards it through `notifier`.
pub struct RecvSnapshotFilter {
pub notifier: Mutex<Option<Sender<RaftMessage>>>,
pub region_id: u64,
}

impl Filter for RecvSnapshotFilter {
fn before(&self, msgs: &mut Vec<RaftMessage>) -> Result<()> {
for msg in msgs {
if msg.get_message().get_msg_type() == MessageType::MsgSnapshot
&& msg.get_region_id() == self.region_id
{
// Take the notifier so only the first matching snapshot is forwarded;
// later snapshots fall through instead of panicking on a missing sender.
if let Some(tx) = self.notifier.lock().unwrap().take() {
tx.send(msg.clone()).unwrap();
}
}
}
Ok(())
}
}
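A wiring sketch, not part of this diff: a test can pair this filter with a channel to block until the snapshot for `region_id` is actually sent. The store id and timeout are illustrative assumptions; `add_send_filter` is the existing `Simulator` method, and `mpsc`, `Mutex`, and `Duration` come from std:

let (tx, rx) = mpsc::channel();
cluster.sim.wl().add_send_filter(
    1, // assumed store id of the snapshot sender (the leader)
    Box::new(RecvSnapshotFilter {
        notifier: Mutex::new(Some(tx)),
        region_id,
    }),
);
// Trigger the snapshot request elsewhere, then wait for the message.
let msg = rx.recv_timeout(Duration::from_secs(5)).unwrap();
assert_eq!(msg.get_message().get_msg_type(), MessageType::MsgSnapshot);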

/// Filters all `filter_type` packets until seeing the `flush_type`.
///
/// The first filtered message will be flushed too.
@@ -715,3 +735,21 @@ impl Filter for LeaseReadFilter {
Ok(())
}
}

#[derive(Clone)]
pub struct DropMessageFilter {
ty: MessageType,
}

impl DropMessageFilter {
pub fn new(ty: MessageType) -> DropMessageFilter {
DropMessageFilter { ty }
}
}

impl Filter for DropMessageFilter {
fn before(&self, msgs: &mut Vec<RaftMessage>) -> Result<()> {
msgs.retain(|m| m.get_message().get_msg_type() != self.ty);
Ok(())
}
}
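Illustrative usage, with the store id as an assumption: dropping every `MsgSnapshot` on the receiving store keeps a requesting peer waiting, which is handy for exercising retry and timeout paths:

cluster.sim.wl().add_recv_filter(
    2, // assumed store id of the snapshot receiver
    Box::new(DropMessageFilter::new(MessageType::MsgSnapshot)),
);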
7 changes: 7 additions & 0 deletions components/test_raftstore/src/util.rs
@@ -539,6 +539,13 @@ pub fn create_test_engine(
(engines, path)
}

pub fn configure_for_request_snapshot<T: Simulator>(cluster: &mut Cluster<T>) {
// Use large GC limits so that log compaction never triggers snapshots on its own.
cluster.cfg.raft_store.raft_log_gc_threshold = 1000;
cluster.cfg.raft_store.raft_log_gc_count_limit = 1000;
cluster.cfg.raft_store.raft_log_gc_size_limit = ReadableSize::mb(20);
}
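A small ordering sketch, illustrative rather than from this diff: the helper mutates `cluster.cfg`, which is only read when nodes start, so it must run before `cluster.run()`:

let mut cluster = new_node_cluster(0, 5);
configure_for_request_snapshot(&mut cluster);
cluster.run(); // config is applied here; changing cfg afterwards won't affect running nodes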

pub fn configure_for_snapshot<T: Simulator>(cluster: &mut Cluster<T>) {
// Truncate the log quickly so that we can force sending a snapshot.
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(20);
7 changes: 4 additions & 3 deletions src/raftstore/store/fsm/apply.rs
@@ -1584,14 +1584,13 @@ impl ApplyDelegate {
ctx: &mut ApplyContext,
req: &AdminRequest,
) -> Result<(AdminResponse, ApplyResult)> {
-let apply_before_split = || {
+(|| {
fail_point!(
"apply_before_split_1_3",
{ self.id == 3 && self.region_id() == 1 },
|_| {}
);
-};
-apply_before_split();
+})();

PEER_ADMIN_CMD_COUNTER_VEC
.with_label_values(&["batch-split", "all"])
@@ -1702,6 +1701,8 @@ impl ApplyDelegate {
ctx: &mut ApplyContext,
req: &AdminRequest,
) -> Result<(AdminResponse, ApplyResult)> {
fail_point!("apply_before_prepare_merge");

PEER_ADMIN_CMD_COUNTER_VEC
.with_label_values(&["prepare_merge", "all"])
.inc();
2 changes: 1 addition & 1 deletion src/raftstore/store/fsm/mod.rs
@@ -19,7 +19,7 @@ pub use self::apply::{
pub use self::batch::{
BatchRouter, BatchSystem, Fsm, HandlerBuilder, NormalScheduler, PollHandler,
};
-pub use self::peer::{DestroyPeerJob, GroupState};
+pub use self::peer::{DestroyPeerJob, GroupState, PeerFsm};
pub use self::router::{BasicMailbox, Mailbox};
pub use self::store::{
create_raft_batch_system, new_compaction_listener, RaftBatchSystem, RaftPollerBuilder,
22 changes: 20 additions & 2 deletions src/raftstore/store/fsm/peer.rs
@@ -79,7 +79,7 @@ pub enum GroupState {
}

pub struct PeerFsm {
-peer: Peer,
+pub peer: Peer,
/// A registry for all scheduled ticks. This can avoid scheduling ticks twice accidentally.
tick_registry: PeerTicks,
/// Ticks used to speed up campaigning in the chaos state.
@@ -358,6 +358,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
self.fsm.group_state = GroupState::Chaos;
self.register_raft_base_tick();
}
CasualMessage::Test(cb) => cb(self.fsm),
}
}

@@ -892,6 +893,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
return Ok(());
}

if !self.check_request_snapshot(&msg) {
return Ok(());
}

if util::is_vote_msg(&msg.get_message())
|| msg.get_message().get_msg_type() == MessageType::MsgTimeoutNow
{
@@ -1315,6 +1320,19 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
Ok(None)
}

// Check whether this peer can handle a request snapshot message.
fn check_request_snapshot(&mut self, msg: &RaftMessage) -> bool {
let m = msg.get_message();
let request_index = m.get_request_snapshot();
if request_index == raft::INVALID_INDEX {
// Not a request snapshot message; let it pass through.
return true;
}
self.fsm
.peer
.ready_to_handle_request_snapshot(request_index)
}
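For context: the request index travels inside the raft message itself. A follower calls `request_snapshot(idx)` on its `raft_group` (see `must_request_snapshot` above), the pinned raft-rs branch carries that index in the message's `request_snapshot` field, and the leader drops the message here unless the peer is ready to serve a snapshot at that index, so the follower can re-request later.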

fn handle_destroy_peer(&mut self, job: DestroyPeerJob) -> bool {
if job.initialized {
self.ctx
@@ -1750,7 +1768,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
let target_id = expect_region.get_id();
let sibling_region = expect_region;

-let min_index = self.fsm.peer.get_min_progress() + 1;
+let min_index = self.fsm.peer.get_min_progress()? + 1;
let low = cmp::max(min_index, state.get_min_index());
// TODO: move this into raft module.
// > over >= to include the PrepareMerge proposal.
7 changes: 7 additions & 0 deletions src/raftstore/store/local_metrics.rs
@@ -71,6 +71,7 @@ pub struct RaftMessageMetrics {
pub vote: u64,
pub vote_resp: u64,
pub snapshot: u64,
pub request_snapshot: u64,
pub heartbeat: u64,
pub heartbeat_resp: u64,
pub transfer_leader: u64,
@@ -123,6 +124,12 @@ impl RaftMessageMetrics {
.inc_by(self.snapshot as i64);
self.snapshot = 0;
}
if self.request_snapshot > 0 {
STORE_RAFT_SENT_MESSAGE_COUNTER_VEC
.with_label_values(&["request_snapshot"])
.inc_by(self.request_snapshot as i64);
self.request_snapshot = 0;
}
if self.heartbeat > 0 {
STORE_RAFT_SENT_MESSAGE_COUNTER_VEC
.with_label_values(&["heartbeat"])
6 changes: 6 additions & 0 deletions src/raftstore/store/msg.rs
@@ -12,6 +12,7 @@ use kvproto::raft_serverpb::RaftMessage;
use raft::SnapshotStatus;

use crate::raftstore::store::fsm::apply::TaskRes as ApplyTaskRes;
use crate::raftstore::store::fsm::PeerFsm;
use crate::raftstore::store::util::KeysInfoFormatter;
use crate::raftstore::store::SnapKey;
use crate::storage::kv::CompactedEvent;
@@ -209,6 +210,10 @@ pub enum CasualMessage {
ClearRegionSize,
/// Indicate a target region is overlapped.
RegionOverlapped,

/// A test-only message; it is useful when we need to access a
/// peer's internal state from tests.
Test(Box<dyn FnOnce(&mut PeerFsm) + Send + 'static>),
}
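A minimal sketch, not part of this diff, of sending a `Test` message from a test. It mirrors what `must_request_snapshot` does in `components/test_raftstore/src/cluster.rs`; the state read here is an arbitrary illustration:

router
    .send(
        region_id,
        PeerMsg::CasualMessage(CasualMessage::Test(Box::new(|peer: &mut PeerFsm| {
            // Runs inside the peer FSM loop, so internal state access is safe here.
            let applied = peer.peer.raft_group.get_store().applied_index();
            debug!("applied index: {}", applied);
        }))),
    )
    .unwrap();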

impl fmt::Debug for CasualMessage {
Expand Down Expand Up @@ -250,6 +255,7 @@ impl fmt::Debug for CasualMessage {
"clear region size"
},
CasualMessage::RegionOverlapped => write!(fmt, "RegionOverlapped"),
CasualMessage::Test(_) => write!(fmt, "Test"),
}
}
}