Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: move significant send to raft router #2363

Merged
merged 4 commits into from Oct 11, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/bin/tikv-server.rs
Expand Up @@ -163,8 +163,8 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig) {
let mut event_loop = store::create_event_loop(&cfg.raft_store)
.unwrap_or_else(|e| fatal!("failed to create event loop: {:?}", e));
let store_sendch = SendCh::new(event_loop.channel(), "raftstore");
let raft_router = ServerRaftStoreRouter::new(store_sendch.clone());
let (significant_msg_sender, significant_msg_receiver) = mpsc::channel();
let raft_router = ServerRaftStoreRouter::new(store_sendch.clone(), significant_msg_sender);

// Create kv engine, storage.
let kv_db_opts = cfg.rocksdb.build_opt();
Expand Down Expand Up @@ -201,7 +201,6 @@ fn run_raft_server(pd_client: RpcClient, cfg: &TiKvConfig) {
cfg.raft_store.region_split_size.0 as usize,
storage.clone(),
raft_router,
significant_msg_sender,
resolver,
snap_mgr.clone(),
Some(engines.clone()),
Expand Down
17 changes: 11 additions & 6 deletions src/server/server.rs
Expand Up @@ -12,7 +12,6 @@
// limitations under the License.

use std::sync::{Arc, RwLock};
use std::sync::mpsc::Sender;
use std::net::{IpAddr, SocketAddr};
use std::str::FromStr;

Expand All @@ -22,7 +21,7 @@ use kvproto::debugpb_grpc::create_debug;

use util::worker::Worker;
use storage::Storage;
use raftstore::store::{CopFlowStatistics, Engines, Msg, SignificantMsg, SnapManager};
use raftstore::store::{CopFlowStatistics, Engines, Msg, SnapManager};

use super::{Config, Result};
use coprocessor::{CopRequestStatistics, CopSender, EndPointHost, EndPointTask, Result as CopResult};
Expand Down Expand Up @@ -92,7 +91,6 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
region_split_size: usize,
storage: Storage,
raft_router: T,
significant_msg_sender: Sender<SignificantMsg>,
resolver: S,
snap_mgr: SnapManager,
debug_engines: Option<Engines>,
Expand Down Expand Up @@ -142,7 +140,6 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
raft_client,
snap_worker.scheduler(),
raft_router.clone(),
significant_msg_sender,
resolver,
);

Expand Down Expand Up @@ -238,6 +235,7 @@ mod tests {
#[derive(Clone)]
// Test double for `RaftStoreRouter`: instead of routing, it records calls
// into channels that the test side can drain and assert on.
struct TestRaftStoreRouter {
// Signals that a routing method was invoked (value itself is unused);
// see the `self.tx.send(1)` calls in the impl below.
tx: Sender<usize>,
// Receives every message passed to `significant_send`.
significant_msg_sender: Sender<SignificantMsg>,
}

impl RaftStoreRouter for TestRaftStoreRouter {
Expand All @@ -250,6 +248,11 @@ mod tests {
self.tx.send(1).unwrap();
Ok(())
}

// Forward the significant message into the test channel so the test can
// assert it arrived; a send failure here means the receiver was dropped,
// which is a test bug, hence the unwrap.
fn significant_send(&self, msg: SignificantMsg) -> RaftStoreResult<()> {
self.significant_msg_sender.send(msg).unwrap();
Ok(())
}
}

#[test]
Expand All @@ -262,16 +265,18 @@ mod tests {
storage.start(&storage_cfg).unwrap();

let (tx, rx) = mpsc::channel();
let router = TestRaftStoreRouter { tx };
let (significant_msg_sender, significant_msg_receiver) = mpsc::channel();
let router = TestRaftStoreRouter {
tx: tx,
significant_msg_sender: significant_msg_sender,
};

let addr = Arc::new(Mutex::new(None));
let mut server = Server::new(
&cfg,
1024,
storage,
router,
significant_msg_sender,
MockResolver { addr: addr.clone() },
SnapManager::new("", None),
None,
Expand Down
81 changes: 55 additions & 26 deletions src/server/transport.rs
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::Sender;
use std::net::SocketAddr;
use kvproto::raft_serverpb::RaftMessage;
Expand Down Expand Up @@ -55,16 +55,50 @@ pub trait RaftStoreRouter: Send + Clone {
) -> RaftStoreResult<()> {
self.try_send(StoreMsg::new_batch_raft_snapshot_cmd(batch, on_finished))
}

// Send significant message. We should guarantee that the message can't be dropped.
fn significant_send(&self, msg: SignificantMsg) -> RaftStoreResult<()>;

// Report that the given peer of the region cannot be reached. The report
// travels as a `SignificantMsg`, which the router guarantees not to drop.
fn report_unreachable(&self, region_id: u64, to_peer_id: u64) -> RaftStoreResult<()> {
    let msg = SignificantMsg::Unreachable {
        region_id,
        to_peer_id,
    };
    self.significant_send(msg)
}

// Report the sending snapshot status to the peer of the region. The report
// travels as a `SignificantMsg`, which the router guarantees not to drop.
fn report_snapshot_status(
    &self,
    region_id: u64,
    to_peer_id: u64,
    status: SnapshotStatus,
) -> RaftStoreResult<()> {
    // Field-init shorthand, consistent with `report_unreachable`.
    self.significant_send(SignificantMsg::SnapshotStatus {
        region_id,
        to_peer_id,
        status,
    })
}
}

#[derive(Clone)]
pub struct ServerRaftStoreRouter {
pub ch: SendCh<StoreMsg>,
// Some tests need Sync for ServerRaftStoreRouter, so here we use Arc + Mutex here.
// TODO: refactor the test and remove the limitation later.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be more reasonable to move the lock to tests.

pub significant_msg_sender: Arc<Mutex<Sender<SignificantMsg>>>,
}

impl ServerRaftStoreRouter {
pub fn new(ch: SendCh<StoreMsg>) -> ServerRaftStoreRouter {
ServerRaftStoreRouter { ch: ch }
pub fn new(
ch: SendCh<StoreMsg>,
significant_msg_sender: Sender<SignificantMsg>,
) -> ServerRaftStoreRouter {
ServerRaftStoreRouter {
ch: ch,
significant_msg_sender: Arc::new(Mutex::new(significant_msg_sender)),
}
}
}

Expand Down Expand Up @@ -94,6 +128,14 @@ impl RaftStoreRouter for ServerRaftStoreRouter {
) -> RaftStoreResult<()> {
self.try_send(StoreMsg::new_batch_raft_snapshot_cmd(batch, on_finished))
}

// Deliver a significant message through the dedicated channel. The sender
// sits behind a Mutex only so the router can be Sync for some tests (see
// the comment on the struct definition); a failed send means the receiver
// end is gone, which is surfaced as an error rather than silently dropped.
fn significant_send(&self, msg: SignificantMsg) -> RaftStoreResult<()> {
    self.significant_msg_sender
        .lock()
        .unwrap()
        .send(msg)
        // Fixed typo in the error message ("sendsignificant" -> "send significant").
        .map_err(|e| box_err!("failed to send significant msg {:?}", e))
}
}

pub struct ServerTransport<T, S>
Expand All @@ -103,8 +145,7 @@ where
{
raft_client: Arc<RwLock<RaftClient>>,
snap_scheduler: Scheduler<SnapTask>,
raft_router: T,
significant_msg_sender: Sender<SignificantMsg>,
pub raft_router: T,
resolving: Arc<RwLock<HashSet<u64>>>,
resolver: S,
}
Expand All @@ -119,7 +160,6 @@ where
raft_client: self.raft_client.clone(),
snap_scheduler: self.snap_scheduler.clone(),
raft_router: self.raft_router.clone(),
significant_msg_sender: self.significant_msg_sender.clone(),
resolving: self.resolving.clone(),
resolver: self.resolver.clone(),
}
Expand All @@ -131,14 +171,12 @@ impl<T: RaftStoreRouter + 'static, S: StoreAddrResolver + 'static> ServerTranspo
raft_client: Arc<RwLock<RaftClient>>,
snap_scheduler: Scheduler<SnapTask>,
raft_router: T,
significant_msg_sender: Sender<SignificantMsg>,
resolver: S,
) -> ServerTransport<T, S> {
ServerTransport {
raft_client: raft_client,
snap_scheduler: snap_scheduler,
raft_router: raft_router,
significant_msg_sender: significant_msg_sender,
resolving: Arc::new(RwLock::new(Default::default())),
resolver: resolver,
}
Expand Down Expand Up @@ -230,13 +268,13 @@ impl<T: RaftStoreRouter + 'static, S: StoreAddrResolver + 'static> ServerTranspo
}
}

fn new_snapshot_reporter(&self, msg: &RaftMessage) -> SnapshotReporter {
fn new_snapshot_reporter(&self, msg: &RaftMessage) -> SnapshotReporter<T> {
let region_id = msg.get_region_id();
let to_peer_id = msg.get_to_peer().get_id();
let to_store_id = msg.get_to_peer().get_store_id();

SnapshotReporter {
significant_msg_sender: self.significant_msg_sender.clone(),
raft_router: self.raft_router.clone(),
region_id: region_id,
to_peer_id: to_peer_id,
to_store_id: to_store_id,
Expand All @@ -248,12 +286,7 @@ impl<T: RaftStoreRouter + 'static, S: StoreAddrResolver + 'static> ServerTranspo
let to_peer_id = msg.get_to_peer().get_id();
let store_id = msg.get_to_peer().get_store_id();

if let Err(e) = self.significant_msg_sender.send(
SignificantMsg::Unreachable {
region_id,
to_peer_id,
},
) {
if let Err(e) = self.raft_router.report_unreachable(region_id, to_peer_id) {
error!(
"report peer {} on store {} unreachable for region {} failed {:?}",
to_peer_id,
Expand Down Expand Up @@ -285,14 +318,14 @@ where
}
}

struct SnapshotReporter {
significant_msg_sender: Sender<SignificantMsg>,
struct SnapshotReporter<T: RaftStoreRouter + 'static> {
raft_router: T,
region_id: u64,
to_peer_id: u64,
to_store_id: u64,
}

impl SnapshotReporter {
impl<T: RaftStoreRouter + 'static> SnapshotReporter<T> {
pub fn report(&self, status: SnapshotStatus) {
debug!(
"send snapshot to {} for {} {:?}",
Expand All @@ -308,13 +341,9 @@ impl SnapshotReporter {
.inc();
};

if let Err(e) = self.significant_msg_sender.send(
SignificantMsg::SnapshotStatus {
region_id: self.region_id,
to_peer_id: self.to_peer_id,
status: status,
},
) {
if let Err(e) = self.raft_router
.report_snapshot_status(self.region_id, self.to_peer_id, status)
{
error!(
"report snapshot to peer {} in store {} with region {} err {:?}",
self.to_peer_id,
Expand Down
21 changes: 4 additions & 17 deletions tests/raftstore/node.rs
Expand Up @@ -13,8 +13,7 @@


use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Arc, Mutex, RwLock};
use std::sync::mpsc::Sender;
use std::sync::{mpsc, Arc, RwLock};
use std::time::Duration;
use std::boxed::FnBox;
use std::ops::Deref;
Expand All @@ -40,7 +39,6 @@ use super::transport_simulate::*;
pub struct ChannelTransportCore {
snap_paths: HashMap<u64, (SnapManager, TempDir)>,
routers: HashMap<u64, SimulateTransport<Msg, ServerRaftStoreRouter>>,
snapshot_status_senders: HashMap<u64, Mutex<Sender<SignificantMsg>>>,
}

#[derive(Clone)]
Expand All @@ -54,7 +52,6 @@ impl ChannelTransport {
core: Arc::new(RwLock::new(ChannelTransportCore {
snap_paths: HashMap::new(),
routers: HashMap::new(),
snapshot_status_senders: HashMap::new(),
})),
}
}
Expand Down Expand Up @@ -114,14 +111,8 @@ impl Channel<RaftMessage> for ChannelTransport {
if is_snapshot {
// should report snapshot finish.
let core = self.rl();
core.snapshot_status_senders[&from_store]
.lock()
.unwrap()
.send(SignificantMsg::SnapshotStatus {
region_id: region_id,
to_peer_id: to_peer_id,
status: SnapshotStatus::Finish,
})
core.routers[&from_store]
.report_snapshot_status(region_id, to_peer_id, SnapshotStatus::Finish)
.unwrap();
}
Ok(())
Expand Down Expand Up @@ -214,15 +205,11 @@ impl Simulator for NodeCluster {
}

let node_id = node.id();
let router = ServerRaftStoreRouter::new(node.get_sendch());
let router = ServerRaftStoreRouter::new(node.get_sendch(), snap_status_sender.clone());
self.trans
.wl()
.routers
.insert(node_id, SimulateTransport::new(router));
self.trans
.wl()
.snapshot_status_senders
.insert(node_id, Mutex::new(snap_status_sender));
self.nodes.insert(node_id, node);
self.simulate_trans.insert(node_id, simulate_trans);

Expand Down
5 changes: 2 additions & 3 deletions tests/raftstore/server.rs
Expand Up @@ -107,9 +107,9 @@ impl Simulator for ServerCluster {
// Initialize raftstore channels.
let mut event_loop = store::create_event_loop(&cfg.raft_store).unwrap();
let store_sendch = SendCh::new(event_loop.channel(), "raftstore");
let raft_router = ServerRaftStoreRouter::new(store_sendch.clone());
let sim_router = SimulateTransport::new(raft_router);
let (snap_status_sender, snap_status_receiver) = mpsc::channel();
let raft_router = ServerRaftStoreRouter::new(store_sendch.clone(), snap_status_sender);
let sim_router = SimulateTransport::new(raft_router);

// Create storage.
let mut store =
Expand All @@ -126,7 +126,6 @@ impl Simulator for ServerCluster {
cfg.raft_store.region_split_size.0 as usize,
store.clone(),
sim_router.clone(),
snap_status_sender,
resolver,
snap_mgr.clone(),
Some(engines.clone()),
Expand Down
13 changes: 12 additions & 1 deletion tests/raftstore/transport_simulate.rs
Expand Up @@ -14,7 +14,7 @@
use kvproto::raft_serverpb::RaftMessage;
use kvproto::eraftpb::MessageType;
use tikv::raftstore::{Error, Result};
use tikv::raftstore::store::{Msg as StoreMsg, Transport};
use tikv::raftstore::store::{Msg as StoreMsg, SignificantMsg, Transport};
use tikv::server::transport::*;
use tikv::server::StoreAddrResolver;
use tikv::util::{transport, Either, HandyRwLock};
Expand All @@ -29,6 +29,9 @@ use std::sync::atomic::*;

// Message channel abstraction used by the simulated transports in tests.
pub trait Channel<M>: Send + Clone {
// Deliver an ordinary message.
fn send(&self, m: M) -> Result<()>;
// Deliver a significant message. Panics by default so a channel that is
// handed one without overriding this method fails loudly in tests.
fn significant_send(&self, _: SignificantMsg) -> Result<()> {
unimplemented!()
}
// Flush any buffered messages; default is a no-op.
fn flush(&mut self) {}
}

Expand All @@ -50,6 +53,10 @@ impl Channel<StoreMsg> for ServerRaftStoreRouter {
fn send(&self, m: StoreMsg) -> Result<()> {
RaftStoreRouter::try_send(self, m)
}

// Delegate to the `RaftStoreRouter` implementation; the fully-qualified
// call disambiguates from `Channel::significant_send` itself, which would
// otherwise recurse.
fn significant_send(&self, msg: SignificantMsg) -> Result<()> {
RaftStoreRouter::significant_send(self, msg)
}
}

pub fn check_messages<M>(msgs: &[M]) -> Result<()> {
Expand Down Expand Up @@ -189,6 +196,10 @@ impl<C: Channel<StoreMsg>> RaftStoreRouter for SimulateTransport<StoreMsg, C> {
fn try_send(&self, m: StoreMsg) -> Result<()> {
Channel::send(self, m)
}

// Forward the significant message directly to the wrapped channel
// (unlike `try_send` above, which goes through `Channel::send` on self).
fn significant_send(&self, m: SignificantMsg) -> Result<()> {
self.ch.significant_send(m)
}
}

pub trait FilterFactory {
Expand Down