raftstore: remove message box/unbox #4232

Merged
merged 17 commits into from Mar 5, 2019
59 changes: 29 additions & 30 deletions benches/misc/raftkv/mod.rs
@@ -19,11 +19,12 @@ use tempdir::TempDir;
 
 use kvproto::kvrpcpb::Context;
 use kvproto::metapb::Region;
-use kvproto::raft_cmdpb::{RaftCmdResponse, Response};
+use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse, Response};
+use kvproto::raft_serverpb::RaftMessage;
 
 use tikv::raftstore::store::{
-    cmd_resp, engine, util, Callback, Msg, PeerMsg, ReadResponse, RegionSnapshot, SignificantMsg,
-    WriteResponse,
+    cmd_resp, engine, util, Callback, CasualMessage, RaftCommand, ReadResponse, RegionSnapshot,
+    SignificantMsg, WriteResponse,
 };
 use tikv::raftstore::Result;
 use tikv::server::transport::RaftStoreRouter;
@@ -48,47 +49,45 @@ impl SyncBenchRouter {
 }
 
 impl SyncBenchRouter {
-    fn invoke(&self, msg: Msg) {
+    fn invoke(&self, cmd: RaftCommand) {
         let mut response = RaftCmdResponse::new();
         cmd_resp::bind_term(&mut response, 1);
-        if let Msg::PeerMsg(PeerMsg::RaftCmd {
-            request, callback, ..
-        }) = msg
-        {
-            match callback {
-                Callback::Read(cb) => {
-                    let snapshot = engine::Snapshot::new(Arc::clone(&self.db));
-                    let region = self.region.to_owned();
-                    cb(ReadResponse {
-                        response,
-                        snapshot: Some(RegionSnapshot::from_snapshot(snapshot.into_sync(), region)),
-                    })
-                }
-                Callback::Write(cb) => {
-                    let mut resp = Response::new();
-                    let cmd_type = request.get_requests()[0].get_cmd_type();
-                    resp.set_cmd_type(cmd_type);
-                    response.mut_responses().push(resp);
-                    cb(WriteResponse { response })
-                }
-                _ => unreachable!(),
-            }
+        match cmd.callback {
+            Callback::Read(cb) => {
+                let snapshot = engine::Snapshot::new(Arc::clone(&self.db));
+                let region = self.region.to_owned();
+                cb(ReadResponse {
+                    response,
+                    snapshot: Some(RegionSnapshot::from_snapshot(snapshot.into_sync(), region)),
+                })
+            }
+            Callback::Write(cb) => {
+                let mut resp = Response::new();
+                let cmd_type = cmd.request.get_requests()[0].get_cmd_type();
+                resp.set_cmd_type(cmd_type);
+                response.mut_responses().push(resp);
+                cb(WriteResponse { response })
+            }
+            _ => unreachable!(),
         }
     }
 }
 
 impl RaftStoreRouter for SyncBenchRouter {
-    fn send(&self, msg: Msg) -> Result<()> {
-        self.invoke(msg);
+    fn send_raft_msg(&self, _: RaftMessage) -> Result<()> {
         Ok(())
     }
 
+    fn send_command(&self, req: RaftCmdRequest, cb: Callback) -> Result<()> {
+        self.invoke(RaftCommand::new(req, cb));
+        Ok(())
+    }
+
-    fn try_send(&self, msg: Msg) -> Result<()> {
-        self.invoke(msg);
+    fn significant_send(&self, _: u64, _: SignificantMsg) -> Result<()> {
         Ok(())
     }
 
-    fn significant_send(&self, _region_id: u64, _: SignificantMsg) -> Result<()> {
+    fn casual_send(&self, _: u64, _: CasualMessage) -> Result<()> {
         Ok(())
     }
 }
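A note for reviewers on the calling convention this bench router now exercises: previously every command had to be boxed into the `Msg` enum before a `RaftStoreRouter` would accept it; after this PR the request and callback travel as plain arguments to `send_command`. A minimal caller-side sketch under that post-PR trait shape (the `send_write` helper and its assertion are invented for illustration):

```rust
use kvproto::raft_cmdpb::RaftCmdRequest;
use tikv::raftstore::store::{Callback, WriteResponse};
use tikv::raftstore::Result;
use tikv::server::transport::RaftStoreRouter;

// Hypothetical helper: drive any router through the new typed entry point.
// No Msg::PeerMsg(PeerMsg::RaftCmd { .. }) boxing is involved any more.
fn send_write<R: RaftStoreRouter>(router: &R, req: RaftCmdRequest) -> Result<()> {
    router.send_command(
        req,
        Callback::Write(Box::new(|resp: WriteResponse| {
            // The bench router above invokes this callback synchronously.
            assert!(!resp.response.get_header().has_error());
        })),
    )
}
```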
40 changes: 23 additions & 17 deletions components/test_raftstore/src/cluster.rs
@@ -28,7 +28,7 @@ use kvproto::raft_serverpb::RaftMessage;
 
 use tikv::config::TiKvConfig;
 use tikv::pd::PdClient;
-use tikv::raftstore::store::fsm::SendCh;
+use tikv::raftstore::store::fsm::RaftRouter;
 use tikv::raftstore::store::*;
 use tikv::raftstore::{Error, Result};
 use tikv::storage::CF_DEFAULT;
@@ -64,10 +64,10 @@ pub trait Simulator {
     ) -> Result<()>;
     fn send_raft_msg(&mut self, msg: RaftMessage) -> Result<()>;
     fn get_snap_dir(&self, node_id: u64) -> String;
-    fn get_store_sendch(&self, node_id: u64) -> Option<SendCh>;
-    fn add_send_filter(&mut self, node_id: u64, filter: SendFilter);
+    fn get_router(&self, node_id: u64) -> Option<RaftRouter>;
+    fn add_send_filter(&mut self, node_id: u64, filter: Box<dyn Filter>);
     fn clear_send_filters(&mut self, node_id: u64);
-    fn add_recv_filter(&mut self, node_id: u64, filter: RecvFilter);
+    fn add_recv_filter(&mut self, node_id: u64, filter: Box<dyn Filter>);
     fn clear_recv_filters(&mut self, node_id: u64);
 
     fn call_command(&self, request: RaftCmdRequest, timeout: Duration) -> Result<RaftCmdResponse> {
@@ -82,7 +82,14 @@ pub trait Simulator {
     ) -> Result<RaftCmdResponse> {
         let (cb, rx) = make_cb(&request);
 
-        self.async_command_on_node(node_id, request, cb)?;
+        match self.async_command_on_node(node_id, request, cb) {
+            Ok(()) => {}
+            Err(e) => {
+                let mut resp = RaftCmdResponse::new();
+                resp.mut_header().set_error(e.into());
+                return Ok(resp);
+            }
+        }
         rx.recv_timeout(timeout)
             .map_err(|_| Error::Timeout(format!("request timeout for {:?}", timeout)))
     }
@@ -806,19 +813,18 @@ impl<T: Simulator> Cluster<T> {
     // Caller must ensure that the `split_key` is in the `region`.
     pub fn split_region(&mut self, region: &metapb::Region, split_key: &[u8], cb: Callback) {
         let leader = self.leader_of_region(region.get_id()).unwrap();
-        let ch = self
-            .sim
-            .rl()
-            .get_store_sendch(leader.get_store_id())
-            .unwrap();
+        let router = self.sim.rl().get_router(leader.get_store_id()).unwrap();
         let split_key = split_key.to_vec();
-        ch.try_send(Msg::PeerMsg(PeerMsg::SplitRegion {
-            region_id: region.get_id(),
-            region_epoch: region.get_region_epoch().clone(),
-            split_keys: vec![split_key.clone()],
-            callback: cb,
-        }))
-        .unwrap();
+        router
+            .send(
+                region.get_id(),
+                PeerMsg::CasualMessage(CasualMessage::SplitRegion {
+                    region_epoch: region.get_region_epoch().clone(),
+                    split_keys: vec![split_key.clone()],
+                    callback: cb,
+                }),
+            )
+            .unwrap();
     }
 
     pub fn must_split(&mut self, region: &metapb::Region, split_key: &[u8]) {
38 changes: 21 additions & 17 deletions components/test_raftstore/src/node.rs
@@ -25,7 +25,7 @@ use raft::SnapshotStatus;
 use tikv::config::TiKvConfig;
 use tikv::import::SSTImporter;
 use tikv::raftstore::coprocessor::CoprocessorHost;
-use tikv::raftstore::store::fsm::{create_raft_batch_system, SendCh};
+use tikv::raftstore::store::fsm::{create_raft_batch_system, RaftRouter};
 use tikv::raftstore::store::*;
 use tikv::raftstore::Result;
 use tikv::server::transport::{RaftStoreRouter, ServerRaftStoreRouter};
@@ -37,7 +37,7 @@ use super::*;
 
 pub struct ChannelTransportCore {
     snap_paths: HashMap<u64, (SnapManager, TempDir)>,
-    routers: HashMap<u64, SimulateTransport<Msg, ServerRaftStoreRouter>>,
+    routers: HashMap<u64, SimulateTransport<ServerRaftStoreRouter>>,
 }
 
 #[derive(Clone)]
@@ -56,8 +56,8 @@ impl ChannelTransport {
     }
 }
 
-impl Channel<RaftMessage> for ChannelTransport {
-    fn send(&self, msg: RaftMessage) -> Result<()> {
+impl Transport for ChannelTransport {
+    fn send(&mut self, msg: RaftMessage) -> Result<()> {
         let from_store = msg.get_from_peer().get_store_id();
         let to_store = msg.get_to_peer().get_store_id();
         let to_peer_id = msg.get_to_peer().get_id();
@@ -103,18 +103,22 @@ impl Channel<RaftMessage> for ChannelTransport {
                 h.send_raft_msg(msg)?;
                 if is_snapshot {
                     // should report snapshot finish.
-                    core.routers[&from_store]
-                        .report_snapshot_status(region_id, to_peer_id, SnapshotStatus::Finish)
-                        .unwrap();
+                    let _ = core.routers[&from_store].report_snapshot_status(
+                        region_id,
+                        to_peer_id,
+                        SnapshotStatus::Finish,
+                    );
                 }
                 Ok(())
             }
             _ => Err(box_err!("missing sender for store {}", to_store)),
         }
     }
+
+    fn flush(&mut self) {}
 }
 
-type SimulateChannelTransport = SimulateTransport<RaftMessage, ChannelTransport>;
+type SimulateChannelTransport = SimulateTransport<ChannelTransport>;
 
 pub struct NodeCluster {
     trans: ChannelTransport,
@@ -138,7 +142,7 @@ impl NodeCluster {
 
 impl NodeCluster {
     #[allow(dead_code)]
-    pub fn get_node_router(&self, node_id: u64) -> SimulateTransport<Msg, ServerRaftStoreRouter> {
+    pub fn get_node_router(&self, node_id: u64) -> SimulateTransport<ServerRaftStoreRouter> {
         self.trans
             .core
             .lock()
@@ -181,7 +185,7 @@ impl Simulator for NodeCluster {
         );
 
         // Create engine
-        let (engines, path) = create_test_engine(engines, node.get_sendch(), &cfg);
+        let (engines, path) = create_test_engine(engines, router.clone(), &cfg);
 
         let (snap_mgr, tmp) = if node_id == 0
             || !self
@@ -193,7 +197,7 @@
                 .contains_key(&node_id)
         {
             let tmp = TempDir::new("test_cluster").unwrap();
-            let snap_mgr = SnapManager::new(tmp.path().to_str().unwrap(), Some(node.get_sendch()));
+            let snap_mgr = SnapManager::new(tmp.path().to_str().unwrap(), Some(router.clone()));
             (snap_mgr, Some(tmp))
         } else {
             let trans = self.trans.core.lock().unwrap();
@@ -202,7 +206,7 @@
         };
 
         // Create coprocessor.
-        let mut coprocessor_host = CoprocessorHost::new(cfg.coprocessor, node.get_sendch());
+        let mut coprocessor_host = CoprocessorHost::new(cfg.coprocessor, router.clone());
 
         if let Some(f) = self.post_create_coprocessor_host.as_ref() {
             f(node_id, &mut coprocessor_host);
Expand Down Expand Up @@ -244,7 +248,7 @@ impl Simulator for NodeCluster {
}

let node_id = node.id();
let router = ServerRaftStoreRouter::new(node.get_sendch(), router.clone(), local_ch);
let router = ServerRaftStoreRouter::new(router.clone(), local_ch);
self.trans
.core
.lock()
@@ -316,7 +320,7 @@ impl Simulator for NodeCluster {
         self.trans.send(msg)
     }
 
-    fn add_send_filter(&mut self, node_id: u64, filter: SendFilter) {
+    fn add_send_filter(&mut self, node_id: u64, filter: Box<dyn Filter>) {
         self.simulate_trans
             .get_mut(&node_id)
             .unwrap()
@@ -330,7 +334,7 @@
             .clear_filters();
     }
 
-    fn add_recv_filter(&mut self, node_id: u64, filter: RecvFilter) {
+    fn add_recv_filter(&mut self, node_id: u64, filter: Box<dyn Filter>) {
         let mut trans = self.trans.core.lock().unwrap();
         trans.routers.get_mut(&node_id).unwrap().add_filter(filter);
     }
@@ -340,8 +344,8 @@
         trans.routers.get_mut(&node_id).unwrap().clear_filters();
     }
 
-    fn get_store_sendch(&self, node_id: u64) -> Option<SendCh> {
-        self.nodes.get(&node_id).map(|node| node.get_sendch())
+    fn get_router(&self, node_id: u64) -> Option<RaftRouter> {
+        self.nodes.get(&node_id).map(|node| node.get_router())
     }
 }
 
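Finally, note that the test transport now implements the store's `Transport` trait directly (taking `&mut self` and gaining a `flush` hook) rather than the old generic `Channel<RaftMessage>`. For reference, a minimal no-op transport under that same trait shape (`NoopTransport` is invented for illustration, and this assumes `Transport` requires only `send` and `flush` plus `Send + Clone`):

```rust
use kvproto::raft_serverpb::RaftMessage;
use tikv::raftstore::store::Transport;
use tikv::raftstore::Result;

// Sketch: the smallest transport the new trait shape allows. Handy in
// tests that only need sends to succeed, not to be delivered.
#[derive(Clone)]
struct NoopTransport;

impl Transport for NoopTransport {
    fn send(&mut self, _msg: RaftMessage) -> Result<()> {
        Ok(()) // silently drop the message
    }

    fn flush(&mut self) {}
}
```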