Skip to content

Commit

Permalink
*: move significant send to raft router (#2363)
Browse files Browse the repository at this point in the history
  • Loading branch information
siddontang committed Oct 11, 2017
1 parent d60161f commit cfdb313
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 58 deletions.
3 changes: 1 addition & 2 deletions src/bin/tikv-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig) {
let mut event_loop = store::create_event_loop(&cfg.raft_store)
.unwrap_or_else(|e| fatal!("failed to create event loop: {:?}", e));
let store_sendch = SendCh::new(event_loop.channel(), "raftstore");
let raft_router = ServerRaftStoreRouter::new(store_sendch.clone());
let (significant_msg_sender, significant_msg_receiver) = mpsc::channel();
let raft_router = ServerRaftStoreRouter::new(store_sendch.clone(), significant_msg_sender);

// Create kv engine, storage.
let kv_db_opts = cfg.rocksdb.build_opt();
Expand Down Expand Up @@ -205,7 +205,6 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig) {
cfg.raft_store.region_split_size.0 as usize,
storage.clone(),
raft_router,
significant_msg_sender,
resolver,
snap_mgr.clone(),
Some(engines.clone()),
Expand Down
17 changes: 11 additions & 6 deletions src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// limitations under the License.

use std::sync::{Arc, RwLock};
use std::sync::mpsc::Sender;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;

Expand All @@ -22,7 +21,7 @@ use kvproto::debugpb_grpc::create_debug;

use util::worker::Worker;
use storage::Storage;
use raftstore::store::{CopFlowStatistics, Engines, Msg, SignificantMsg, SnapManager};
use raftstore::store::{CopFlowStatistics, Engines, Msg, SnapManager};

use super::{Config, Result};
use coprocessor::{CopRequestStatistics, CopSender, EndPointHost, EndPointTask, Result as CopResult};
Expand Down Expand Up @@ -92,7 +91,6 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
region_split_size: usize,
storage: Storage,
raft_router: T,
significant_msg_sender: Sender<SignificantMsg>,
resolver: S,
snap_mgr: SnapManager,
debug_engines: Option<Engines>,
Expand Down Expand Up @@ -142,7 +140,6 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
raft_client,
snap_worker.scheduler(),
raft_router.clone(),
significant_msg_sender,
resolver,
);

Expand Down Expand Up @@ -238,6 +235,7 @@ mod tests {
#[derive(Clone)]
struct TestRaftStoreRouter {
tx: Sender<usize>,
significant_msg_sender: Sender<SignificantMsg>,
}

impl RaftStoreRouter for TestRaftStoreRouter {
Expand All @@ -250,6 +248,11 @@ mod tests {
self.tx.send(1).unwrap();
Ok(())
}

fn significant_send(&self, msg: SignificantMsg) -> RaftStoreResult<()> {
self.significant_msg_sender.send(msg).unwrap();
Ok(())
}
}

#[test]
Expand All @@ -262,16 +265,18 @@ mod tests {
storage.start(&storage_cfg).unwrap();

let (tx, rx) = mpsc::channel();
let router = TestRaftStoreRouter { tx };
let (significant_msg_sender, significant_msg_receiver) = mpsc::channel();
let router = TestRaftStoreRouter {
tx: tx,
significant_msg_sender: significant_msg_sender,
};

let addr = Arc::new(Mutex::new(None));
let mut server = Server::new(
&cfg,
1024,
storage,
router,
significant_msg_sender,
MockResolver { addr: addr.clone() },
SnapManager::new("", None),
None,
Expand Down
77 changes: 52 additions & 25 deletions src/server/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,48 @@ pub trait RaftStoreRouter: Send + Clone {
) -> RaftStoreResult<()> {
self.try_send(StoreMsg::new_batch_raft_snapshot_cmd(batch, on_finished))
}

// Send significant message. We should guarantee that the message can't be dropped.
fn significant_send(&self, msg: SignificantMsg) -> RaftStoreResult<()>;

// Report the peer of the region is unreachable.
fn report_unreachable(&self, region_id: u64, to_peer_id: u64) -> RaftStoreResult<()> {
self.significant_send(SignificantMsg::Unreachable {
region_id,
to_peer_id,
})
}

// Report the sending snapshot status to the peer of the region.
fn report_snapshot_status(
&self,
region_id: u64,
to_peer_id: u64,
status: SnapshotStatus,
) -> RaftStoreResult<()> {
self.significant_send(SignificantMsg::SnapshotStatus {
region_id: region_id,
to_peer_id: to_peer_id,
status: status,
})
}
}

#[derive(Clone)]
pub struct ServerRaftStoreRouter {
pub ch: SendCh<StoreMsg>,
pub significant_msg_sender: Sender<SignificantMsg>,
}

impl ServerRaftStoreRouter {
pub fn new(ch: SendCh<StoreMsg>) -> ServerRaftStoreRouter {
ServerRaftStoreRouter { ch: ch }
pub fn new(
ch: SendCh<StoreMsg>,
significant_msg_sender: Sender<SignificantMsg>,
) -> ServerRaftStoreRouter {
ServerRaftStoreRouter {
ch: ch,
significant_msg_sender: significant_msg_sender,
}
}
}

Expand Down Expand Up @@ -94,6 +126,14 @@ impl RaftStoreRouter for ServerRaftStoreRouter {
) -> RaftStoreResult<()> {
self.try_send(StoreMsg::new_batch_raft_snapshot_cmd(batch, on_finished))
}

fn significant_send(&self, msg: SignificantMsg) -> RaftStoreResult<()> {
if let Err(e) = self.significant_msg_sender.send(msg) {
return Err(box_err!("failed to sendsignificant msg {:?}", e));
}

Ok(())
}
}

pub struct ServerTransport<T, S>
Expand All @@ -103,8 +143,7 @@ where
{
raft_client: Arc<RwLock<RaftClient>>,
snap_scheduler: Scheduler<SnapTask>,
raft_router: T,
significant_msg_sender: Sender<SignificantMsg>,
pub raft_router: T,
resolving: Arc<RwLock<HashSet<u64>>>,
resolver: S,
}
Expand All @@ -119,7 +158,6 @@ where
raft_client: self.raft_client.clone(),
snap_scheduler: self.snap_scheduler.clone(),
raft_router: self.raft_router.clone(),
significant_msg_sender: self.significant_msg_sender.clone(),
resolving: self.resolving.clone(),
resolver: self.resolver.clone(),
}
Expand All @@ -131,14 +169,12 @@ impl<T: RaftStoreRouter + 'static, S: StoreAddrResolver + 'static> ServerTranspo
raft_client: Arc<RwLock<RaftClient>>,
snap_scheduler: Scheduler<SnapTask>,
raft_router: T,
significant_msg_sender: Sender<SignificantMsg>,
resolver: S,
) -> ServerTransport<T, S> {
ServerTransport {
raft_client: raft_client,
snap_scheduler: snap_scheduler,
raft_router: raft_router,
significant_msg_sender: significant_msg_sender,
resolving: Arc::new(RwLock::new(Default::default())),
resolver: resolver,
}
Expand Down Expand Up @@ -230,13 +266,13 @@ impl<T: RaftStoreRouter + 'static, S: StoreAddrResolver + 'static> ServerTranspo
}
}

fn new_snapshot_reporter(&self, msg: &RaftMessage) -> SnapshotReporter {
fn new_snapshot_reporter(&self, msg: &RaftMessage) -> SnapshotReporter<T> {
let region_id = msg.get_region_id();
let to_peer_id = msg.get_to_peer().get_id();
let to_store_id = msg.get_to_peer().get_store_id();

SnapshotReporter {
significant_msg_sender: self.significant_msg_sender.clone(),
raft_router: self.raft_router.clone(),
region_id: region_id,
to_peer_id: to_peer_id,
to_store_id: to_store_id,
Expand All @@ -248,12 +284,7 @@ impl<T: RaftStoreRouter + 'static, S: StoreAddrResolver + 'static> ServerTranspo
let to_peer_id = msg.get_to_peer().get_id();
let store_id = msg.get_to_peer().get_store_id();

if let Err(e) = self.significant_msg_sender.send(
SignificantMsg::Unreachable {
region_id,
to_peer_id,
},
) {
if let Err(e) = self.raft_router.report_unreachable(region_id, to_peer_id) {
error!(
"report peer {} on store {} unreachable for region {} failed {:?}",
to_peer_id,
Expand Down Expand Up @@ -285,14 +316,14 @@ where
}
}

struct SnapshotReporter {
significant_msg_sender: Sender<SignificantMsg>,
struct SnapshotReporter<T: RaftStoreRouter + 'static> {
raft_router: T,
region_id: u64,
to_peer_id: u64,
to_store_id: u64,
}

impl SnapshotReporter {
impl<T: RaftStoreRouter + 'static> SnapshotReporter<T> {
pub fn report(&self, status: SnapshotStatus) {
debug!(
"send snapshot to {} for {} {:?}",
Expand All @@ -308,13 +339,9 @@ impl SnapshotReporter {
.inc();
};

if let Err(e) = self.significant_msg_sender.send(
SignificantMsg::SnapshotStatus {
region_id: self.region_id,
to_peer_id: self.to_peer_id,
status: status,
},
) {
if let Err(e) = self.raft_router
.report_snapshot_status(self.region_id, self.to_peer_id, status)
{
error!(
"report snapshot to peer {} in store {} with region {} err {:?}",
self.to_peer_id,
Expand Down
21 changes: 4 additions & 17 deletions tests/raftstore/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@


use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::sync::mpsc::Sender;
use std::sync::{mpsc, Arc, RwLock};
use std::time::Duration;
use std::boxed::FnBox;
use std::ops::Deref;
Expand All @@ -41,7 +40,6 @@ use super::transport_simulate::*;
pub struct ChannelTransportCore {
snap_paths: HashMap<u64, (SnapManager, TempDir)>,
routers: HashMap<u64, SimulateTransport<Msg, ServerRaftStoreRouter>>,
snapshot_status_senders: HashMap<u64, Mutex<Sender<SignificantMsg>>>,
}

#[derive(Clone)]
Expand All @@ -55,7 +53,6 @@ impl ChannelTransport {
core: Arc::new(RwLock::new(ChannelTransportCore {
snap_paths: HashMap::new(),
routers: HashMap::new(),
snapshot_status_senders: HashMap::new(),
})),
}
}
Expand Down Expand Up @@ -115,14 +112,8 @@ impl Channel<RaftMessage> for ChannelTransport {
if is_snapshot {
// should report snapshot finish.
let core = self.rl();
core.snapshot_status_senders[&from_store]
.lock()
.unwrap()
.send(SignificantMsg::SnapshotStatus {
region_id: region_id,
to_peer_id: to_peer_id,
status: SnapshotStatus::Finish,
})
core.routers[&from_store]
.report_snapshot_status(region_id, to_peer_id, SnapshotStatus::Finish)
.unwrap();
}
Ok(())
Expand Down Expand Up @@ -217,15 +208,11 @@ impl Simulator for NodeCluster {
}

let node_id = node.id();
let router = ServerRaftStoreRouter::new(node.get_sendch());
let router = ServerRaftStoreRouter::new(node.get_sendch(), snap_status_sender.clone());
self.trans
.wl()
.routers
.insert(node_id, SimulateTransport::new(router));
self.trans
.wl()
.snapshot_status_senders
.insert(node_id, Mutex::new(snap_status_sender));
self.nodes.insert(node_id, node);
self.simulate_trans.insert(node_id, simulate_trans);

Expand Down
5 changes: 2 additions & 3 deletions tests/raftstore/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ impl Simulator for ServerCluster {
// Initialize raftstore channels.
let mut event_loop = store::create_event_loop(&cfg.raft_store).unwrap();
let store_sendch = SendCh::new(event_loop.channel(), "raftstore");
let raft_router = ServerRaftStoreRouter::new(store_sendch.clone());
let sim_router = SimulateTransport::new(raft_router);
let (snap_status_sender, snap_status_receiver) = mpsc::channel();
let raft_router = ServerRaftStoreRouter::new(store_sendch.clone(), snap_status_sender);
let sim_router = SimulateTransport::new(raft_router);

// Create storage.
let mut store =
Expand All @@ -127,7 +127,6 @@ impl Simulator for ServerCluster {
cfg.raft_store.region_split_size.0 as usize,
store.clone(),
sim_router.clone(),
snap_status_sender,
resolver,
snap_mgr.clone(),
Some(engines.clone()),
Expand Down
Loading

0 comments on commit cfdb313

Please sign in to comment.