*: fix potential unreachable drop #2343

Merged: 4 commits, Oct 1, 2017
6 changes: 3 additions & 3 deletions src/bin/tikv-server.rs
@@ -164,7 +164,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig) {
.unwrap_or_else(|e| fatal!("failed to create event loop: {:?}", e));
let store_sendch = SendCh::new(event_loop.channel(), "raftstore");
let raft_router = ServerRaftStoreRouter::new(store_sendch.clone());
let (snap_status_sender, snap_status_receiver) = mpsc::channel();
let (significant_msg_sender, significant_msg_receiver) = mpsc::channel();

// Create kv engine, storage.
let kv_db_opts = cfg.rocksdb.build_opt();
@@ -201,7 +201,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig) {
cfg.raft_store.region_split_size.0 as usize,
storage.clone(),
raft_router,
snap_status_sender,
significant_msg_sender,
resolver,
snap_mgr.clone(),
Some(engines.clone()),
@@ -215,7 +215,7 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig) {
engines.clone(),
trans,
snap_mgr,
snap_status_receiver,
significant_msg_receiver,
).unwrap_or_else(|e| fatal!("failed to start node: {:?}", e));
initial_metric(&cfg.metric, Some(node.id()));

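The hunks above thread a single mpsc channel through run_raft_server: the sender half goes to the Server (and from there to the transport), the receiver half goes to the raftstore Node. A condensed sketch of that ownership flow, using stand-in types rather than the actual TiKV definitions:

```rust
// Simplified sketch of the channel wiring shown above; `Server` and `Node`
// are stand-ins, only the ownership flow is illustrated.
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Debug)]
enum SignificantMsg {
    Unreachable { region_id: u64, to_peer_id: u64 },
}

struct Server {
    significant_msg_sender: Sender<SignificantMsg>,
}

struct Node {
    significant_msg_receiver: Receiver<SignificantMsg>,
}

fn main() {
    let (significant_msg_sender, significant_msg_receiver) = channel();

    let server = Server { significant_msg_sender };
    let node = Node { significant_msg_receiver };

    // The transport side (owned by the server) reports events...
    server
        .significant_msg_sender
        .send(SignificantMsg::Unreachable { region_id: 1, to_peer_id: 2 })
        .unwrap();

    // ...and the store's event loop (owned by the node) drains them.
    let msg = node.significant_msg_receiver.recv().unwrap();
    println!("store received: {:?}", msg);
}
```
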
2 changes: 1 addition & 1 deletion src/raftstore/store/mod.rs
@@ -29,7 +29,7 @@ mod worker;
mod metrics;
mod local_metrics;

pub use self::msg::{BatchCallback, Callback, CopFlowStatistics, Msg, SnapshotStatusMsg, Tick};
pub use self::msg::{BatchCallback, Callback, CopFlowStatistics, Msg, SignificantMsg, Tick};
pub use self::store::{create_event_loop, Engines, Store, StoreChannel};
pub use self::config::Config;
pub use self::transport::Transport;
23 changes: 8 additions & 15 deletions src/raftstore/store/msg.rs
@@ -41,10 +41,14 @@ pub enum Tick {
ConsistencyCheck,
}

pub struct SnapshotStatusMsg {
pub region_id: u64,
pub to_peer_id: u64,
pub status: SnapshotStatus,
#[derive(Debug, PartialEq)]
pub enum SignificantMsg {
SnapshotStatus {
region_id: u64,
to_peer_id: u64,
status: SnapshotStatus,
},
Unreachable { region_id: u64, to_peer_id: u64 },
}

pub enum Msg {
@@ -74,8 +78,6 @@ pub enum Msg {
callback: Option<Callback>,
},

ReportUnreachable { region_id: u64, to_peer_id: u64 },

// For snapshot stats.
SnapshotStats,

@@ -96,15 +98,6 @@ impl fmt::Debug for Msg {
Msg::RaftMessage(_) => write!(fmt, "Raft Message"),
Msg::RaftCmd { .. } => write!(fmt, "Raft Command"),
Msg::BatchRaftSnapCmds { .. } => write!(fmt, "Batch Raft Commands"),
Msg::ReportUnreachable {
ref region_id,
ref to_peer_id,
} => write!(
fmt,
"peer {} for region {} is unreachable",
to_peer_id,
region_id
),
Msg::SnapshotStats => write!(fmt, "Snapshot stats"),
Msg::CoprocessorStats { .. } => write!(fmt, "Coperocessor stats"),
Msg::ComputeHashResult {
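
The new SignificantMsg enum above folds snapshot-status reports and unreachable reports into one channel. A minimal, self-contained sketch of how a sender and the store-side drain loop interact; this is not TiKV code, and the local SnapshotStatus enum is a stand-in for raft::SnapshotStatus:

```rust
// Sketch only: one channel carries both kinds of significant messages,
// and the consumer drains it without blocking.
use std::sync::mpsc::{self, TryRecvError};

#[derive(Debug, PartialEq)]
enum SnapshotStatus {
    Finish,
    Failure,
}

#[derive(Debug, PartialEq)]
enum SignificantMsg {
    SnapshotStatus { region_id: u64, to_peer_id: u64, status: SnapshotStatus },
    Unreachable { region_id: u64, to_peer_id: u64 },
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Transport side: report both kinds of events through the same channel.
    tx.send(SignificantMsg::Unreachable { region_id: 1, to_peer_id: 2 }).unwrap();
    tx.send(SignificantMsg::SnapshotStatus {
        region_id: 1,
        to_peer_id: 2,
        status: SnapshotStatus::Finish,
    }).unwrap();

    // Store side: drain everything currently queued, then stop.
    loop {
        match rx.try_recv() {
            Ok(msg) => println!("handle {:?}", msg),
            Err(TryRecvError::Empty) => break,
            Err(TryRecvError::Disconnected) => panic!("sender dropped"),
        }
    }
}
```

Because try_recv never blocks, the consumer can drain the queue once per event-loop tick, which is what the store.rs changes below do.
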
45 changes: 14 additions & 31 deletions src/raftstore/store/store.rs
@@ -52,7 +52,7 @@ use super::worker::{ApplyRunner, ApplyTask, ApplyTaskRes, CompactRunner, Compact
RaftlogGcRunner, RaftlogGcTask, RegionRunner, RegionTask, SplitCheckRunner,
SplitCheckTask};
use super::worker::apply::{ChangePeer, ExecResult};
use super::{util, Msg, SnapManager, SnapshotDeleter, SnapshotStatusMsg, Tick};
use super::{util, Msg, SignificantMsg, SnapManager, SnapshotDeleter, Tick};
use super::keys::{self, data_end_key, data_key, enc_end_key, enc_start_key};
use super::engine::{Iterable, Peekable, Snapshot as EngineSnapshot};
use super::config::Config;
@@ -88,7 +88,7 @@ impl Engines {
// A helper structure to bundle all channels for messages to `Store`.
pub struct StoreChannel {
pub sender: Sender<Msg>,
pub snapshot_status_receiver: StdReceiver<SnapshotStatusMsg>,
pub significant_msg_receiver: StdReceiver<SignificantMsg>,
}

pub struct StoreStat {
@@ -142,8 +142,7 @@ pub struct Store<T, C: 'static> {
store: metapb::Store,
sendch: SendCh<Msg>,

sent_snapshot_count: u64,
snapshot_status_receiver: StdReceiver<SnapshotStatusMsg>,
significant_msg_receiver: StdReceiver<SignificantMsg>,

// region_id -> peers
region_peers: HashMap<u64, Peer>,
@@ -223,8 +222,7 @@ impl<T, C> Store<T, C> {
kv_engine: engines.kv_engine,
raft_engine: engines.raft_engine,
sendch: sendch,
sent_snapshot_count: 0,
snapshot_status_receiver: ch.snapshot_status_receiver,
significant_msg_receiver: ch.significant_msg_receiver,
region_peers: HashMap::default(),
pending_raft_groups: HashSet::default(),
split_check_worker: Worker::new("split check worker"),
@@ -450,22 +448,24 @@ impl<T, C> Store<T, C> {
self.cfg.clone()
}

fn poll_snapshot_status(&mut self) {
if self.sent_snapshot_count == 0 {
return;
}

fn poll_significant_msg(&mut self) {
// Poll all snapshot messages and handle them.
loop {
match self.snapshot_status_receiver.try_recv() {
Ok(SnapshotStatusMsg {
match self.significant_msg_receiver.try_recv() {
Ok(SignificantMsg::SnapshotStatus {
region_id,
to_peer_id,
status,
}) => {
// Report snapshot status to the corresponding peer.
self.report_snapshot_status(region_id, to_peer_id, status);
}
Ok(SignificantMsg::Unreachable {
region_id,
to_peer_id,
}) => if let Some(peer) = self.region_peers.get_mut(&region_id) {
peer.raft_group.report_unreachable(to_peer_id);
},
Err(TryRecvError::Empty) => {
// The snapshot status receiver channel is empty
return;
@@ -483,7 +483,6 @@ impl<T, C> Store<T, C> {
}

fn report_snapshot_status(&mut self, region_id: u64, to_peer_id: u64, status: SnapshotStatus) {
self.sent_snapshot_count -= 1;
if let Some(peer) = self.region_peers.get_mut(&region_id) {
let to_peer = match peer.get_peer_from_cache(to_peer_id) {
Some(peer) => peer,
@@ -661,7 +660,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}
}

self.poll_snapshot_status();
self.poll_significant_msg();

timer.observe_duration();

@@ -1105,7 +1104,6 @@ impl<T: Transport, C: PdClient> Store<T, C> {
let t = SlowTimer::new();
let pending_count = self.pending_raft_groups.len();
let previous_ready_metrics = self.raft_metrics.ready.clone();
let previous_sent_snapshot_count = self.raft_metrics.message.snapshot;

self.raft_metrics.ready.pending_region += pending_count as u64;

@@ -1181,9 +1179,6 @@ impl<T: Transport, C: PdClient> Store<T, C> {
.append_log
.observe(duration_to_sec(t.elapsed()) as f64);

let sent_snapshot_count = self.raft_metrics.message.snapshot - previous_sent_snapshot_count;
self.sent_snapshot_count += sent_snapshot_count;

slow_log!(
t,
"{} handle {} pending peers include {} ready, {} entries, {} messages and {} \
@@ -2205,12 +2200,6 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}
}

fn on_unreachable(&mut self, region_id: u64, to_peer_id: u64) {
if let Some(peer) = self.region_peers.get_mut(&region_id) {
peer.raft_group.report_unreachable(to_peer_id);
}
}

fn handle_coprocessor_msg(&mut self, request_stats: CopFlowStatistics) {
for (region_id, stats) in &request_stats {
if let Some(peer) = self.region_peers.get_mut(region_id) {
@@ -2519,12 +2508,6 @@ impl<T: Transport, C: PdClient> mio::Handler for Store<T, C> {
info!("{} receive quit message", self.tag);
event_loop.shutdown();
}
Msg::ReportUnreachable {
region_id,
to_peer_id,
} => {
self.on_unreachable(region_id, to_peer_id);
}
Msg::SnapshotStats => self.store_heartbeat_pd(),
Msg::CoprocessorStats { request_stats } => self.handle_coprocessor_msg(request_stats),
Msg::ComputeHashResult {
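
The store.rs hunks above also drop the sent_snapshot_count gate, so the significant-message channel is now drained on every tick. Presumably this matters because Unreachable messages share the channel and would otherwise sit behind the counter check while no snapshots are in flight. A minimal sketch of the two drain patterns; this is simplified illustration, not TiKV code:

```rust
use std::sync::mpsc::{channel, TryRecvError};

fn main() {
    let (tx, rx) = channel();
    tx.send("Unreachable { region_id: 1, to_peer_id: 2 }").unwrap();

    // Old pattern: only drain while snapshots are known to be in flight.
    let sent_snapshot_count = 0u64;
    if sent_snapshot_count > 0 {
        // drain would happen here; with a count of 0 the queued message is
        // left in the channel until the next snapshot is sent.
    }

    // New pattern: drain unconditionally on every event-loop tick.
    loop {
        match rx.try_recv() {
            Ok(msg) => println!("handled: {}", msg),
            Err(TryRecvError::Empty) => break,
            Err(TryRecvError::Disconnected) => break,
        }
    }
}
```
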
14 changes: 7 additions & 7 deletions src/server/node.rs
@@ -25,8 +25,8 @@ use kvproto::raft_serverpb::StoreIdent;
use kvproto::metapb;
use protobuf::RepeatedField;
use util::transport::SendCh;
use raftstore::store::{self, keys, Config as StoreConfig, Engines, Msg, Peekable, SnapManager,
SnapshotStatusMsg, Store, StoreChannel, Transport};
use raftstore::store::{self, keys, Config as StoreConfig, Engines, Msg, Peekable, SignificantMsg,
SnapManager, Store, StoreChannel, Transport};
use super::Result;
use server::Config as ServerConfig;
use storage::{Config as StorageConfig, RaftKv, Storage};
@@ -123,7 +123,7 @@
engines: Engines,
trans: T,
snap_mgr: SnapManager,
snap_status_receiver: Receiver<SnapshotStatusMsg>,
significant_msg_receiver: Receiver<SignificantMsg>,
) -> Result<()>
where
T: Transport + 'static,
@@ -160,7 +160,7 @@
engines,
trans,
snap_mgr,
snap_status_receiver
significant_msg_receiver
));
Ok(())
}
@@ -323,7 +323,7 @@
engines: Engines,
trans: T,
snap_mgr: SnapManager,
snapshot_status_receiver: Receiver<SnapshotStatusMsg>,
significant_msg_receiver: Receiver<SignificantMsg>,
) -> Result<()>
where
T: Transport + 'static,
@@ -343,8 +343,8 @@
let builder = thread::Builder::new().name(thd_name!(format!("raftstore-{}", store_id)));
let h = try!(builder.spawn(move || {
let ch = StoreChannel {
sender: sender,
snapshot_status_receiver: snapshot_status_receiver,
sender,
significant_msg_receiver,
};
let mut store = match Store::new(ch, store, cfg, engines, trans, pd_client, snap_mgr) {
Err(e) => panic!("construct store {} err {:?}", store_id, e),
44 changes: 15 additions & 29 deletions src/server/server.rs
@@ -22,7 +22,7 @@ use kvproto::debugpb_grpc::create_debug;

use util::worker::Worker;
use storage::Storage;
use raftstore::store::{CopFlowStatistics, Engines, Msg, SnapManager, SnapshotStatusMsg};
use raftstore::store::{CopFlowStatistics, Engines, Msg, SignificantMsg, SnapManager};

use super::{Config, Result};
use coprocessor::{CopRequestStatistics, CopSender, EndPointHost, EndPointTask, Result as CopResult};
@@ -92,7 +92,7 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
region_split_size: usize,
storage: Storage,
raft_router: T,
snapshot_status_sender: Sender<SnapshotStatusMsg>,
significant_msg_sender: Sender<SignificantMsg>,
Contributor:

Can we move the sender to the RaftRouter? I think we should only use the raft router to communicate with the raft store.

So the RaftRouter trait may add:

fn report_unreachable()
fn report_snapshot_status()

Member (Author):

Tried that, but it pulled in a lot of unrelated changes; maybe in another PR.

Contributor:

Got it. I think it is worth doing later.
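
For illustration, a minimal sketch of the trait extension the reviewer proposes. The method names come from the comment above; the signatures, the trait name, and the stand-in SnapshotStatus type are assumptions, since the actual refactor was deferred to a later PR:

```rust
// Hypothetical sketch only; not part of this PR.

/// Stand-in for raft::SnapshotStatus so the sketch is self-contained.
pub enum SnapshotStatus {
    Finish,
    Failure,
}

/// The idea: let the raft router own the significant-message sender and
/// expose dedicated reporting methods instead of passing the sender around.
pub trait RaftStoreRouterExt {
    /// Report that peer `to_peer_id` of region `region_id` is unreachable.
    fn report_unreachable(&self, region_id: u64, to_peer_id: u64);

    /// Report the outcome of a snapshot sent to peer `to_peer_id`.
    fn report_snapshot_status(&self, region_id: u64, to_peer_id: u64, status: SnapshotStatus);
}
```
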

resolver: S,
snap_mgr: SnapManager,
debug_engines: Option<Engines>,
@@ -142,7 +142,7 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
raft_client,
snap_worker.scheduler(),
raft_router.clone(),
snapshot_status_sender,
significant_msg_sender,
resolver,
);

@@ -211,7 +211,6 @@ mod tests {
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{self, Sender};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};

use super::*;
use super::super::{Config, Result};
@@ -221,6 +220,7 @@ mod tests {
use kvproto::raft_serverpb::RaftMessage;
use raftstore::Result as RaftStoreResult;
use raftstore::store::Msg as StoreMsg;
use raftstore::store::*;
use raftstore::store::transport::Transport;

#[derive(Clone)]
@@ -238,16 +238,6 @@
#[derive(Clone)]
struct TestRaftStoreRouter {
tx: Sender<usize>,
report_unreachable_count: Arc<AtomicUsize>,
}

impl TestRaftStoreRouter {
fn new(tx: Sender<usize>) -> TestRaftStoreRouter {
TestRaftStoreRouter {
tx: tx,
report_unreachable_count: Arc::new(AtomicUsize::new(0)),
}
}
}

impl RaftStoreRouter for TestRaftStoreRouter {
@@ -260,12 +250,6 @@ impl RaftStoreRouter for TestRaftStoreRouter {
self.tx.send(1).unwrap();
Ok(())
}

fn report_unreachable(&self, _: u64, _: u64, _: u64) -> RaftStoreResult<()> {
let count = self.report_unreachable_count.clone();
count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

#[test]
@@ -278,17 +262,16 @@
storage.start(&storage_cfg).unwrap();

let (tx, rx) = mpsc::channel();
let router = TestRaftStoreRouter::new(tx);
let report_unreachable_count = router.report_unreachable_count.clone();
let (snapshot_status_sender, _) = mpsc::channel();
let router = TestRaftStoreRouter { tx };
let (significant_msg_sender, significant_msg_receiver) = mpsc::channel();

let addr = Arc::new(Mutex::new(None));
let mut server = Server::new(
&cfg,
1024,
storage,
router,
snapshot_status_sender,
significant_msg_sender,
MockResolver { addr: addr.clone() },
SnapManager::new("", None),
None,
@@ -298,12 +281,15 @@
server.start(&cfg).unwrap();

let mut trans = server.transport();
for i in 0..10 {
    if i % 2 == 1 {
        trans.report_unreachable(RaftMessage::new());
    }
    assert_eq!(report_unreachable_count.load(Ordering::SeqCst), (i + 1) / 2);
}
trans.report_unreachable(RaftMessage::new());
assert_eq!(
    significant_msg_receiver.try_recv().unwrap(),
    SignificantMsg::Unreachable {
        region_id: 0,
        to_peer_id: 0,
    }
);

let mut msg = RaftMessage::new();
msg.set_region_id(1);
trans.send(msg).unwrap();