Skip to content

Commit

Permalink
tests: call commands asynchronously (#2703)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus committed Jan 23, 2018
1 parent 9e7e766 commit 0e52440
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 29 deletions.
18 changes: 15 additions & 3 deletions tests/raftstore/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ pub trait Simulator {
fn run_node(&mut self, node_id: u64, cfg: TiKvConfig, engines: Engines) -> u64;
fn stop_node(&mut self, node_id: u64);
fn get_node_ids(&self) -> HashSet<u64>;
fn call_command_on_node(
fn async_command_on_node(
&self,
node_id: u64,
request: RaftCmdRequest,
timeout: Duration,
) -> Result<RaftCmdResponse>;
cb: Callback,
) -> Result<()>;
fn send_raft_msg(&mut self, msg: RaftMessage) -> Result<()>;
fn get_snap_dir(&self, node_id: u64) -> String;
fn get_store_sendch(&self, node_id: u64) -> Option<SendCh<Msg>>;
Expand All @@ -69,6 +69,18 @@ pub trait Simulator {
let node_id = request.get_header().get_peer().get_store_id();
self.call_command_on_node(node_id, request, timeout)
}
fn call_command_on_node(
&self,
node_id: u64,
request: RaftCmdRequest,
timeout: Duration,
) -> Result<RaftCmdResponse> {
let (cb, rx) = make_cb(&request);

self.async_command_on_node(node_id, request, cb)?;
rx.recv_timeout(timeout)
.map_err(|_| Error::Timeout(format!("request timeout for {:?}", timeout)))
}
}

pub struct Cluster<T: Simulator> {
Expand Down
10 changes: 4 additions & 6 deletions tests/raftstore/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Arc, RwLock};
use std::time::Duration;
use std::ops::Deref;

use tempdir::TempDir;
Expand All @@ -35,7 +34,6 @@ use tikv::server::transport::{RaftStoreRouter, ServerRaftStoreRouter};
use tikv::raft::SnapshotStatus;
use super::pd::TestPdClient;
use super::transport_simulate::*;
use super::util::wait_cb;

pub struct ChannelTransportCore {
snap_paths: HashMap<u64, (SnapManager, TempDir)>,
Expand Down Expand Up @@ -241,18 +239,18 @@ impl Simulator for NodeCluster {
self.nodes.keys().cloned().collect()
}

fn call_command_on_node(
fn async_command_on_node(
&self,
node_id: u64,
request: RaftCmdRequest,
timeout: Duration,
) -> Result<RaftCmdResponse> {
cb: Callback,
) -> Result<()> {
if !self.trans.rl().routers.contains_key(&node_id) {
return Err(box_err!("missing sender for store {}", node_id));
}

let router = self.trans.rl().routers.get(&node_id).cloned().unwrap();
wait_cb(router, request, timeout)
router.send_command(request, cb)
}

fn send_raft_msg(&mut self, msg: raft_serverpb::RaftMessage) -> Result<()> {
Expand Down
13 changes: 6 additions & 7 deletions tests/raftstore/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

use std::collections::{HashMap, HashSet};
use std::sync::{mpsc, Arc, RwLock};
use std::time::Duration;

use grpc::EnvBuilder;
use tempdir::TempDir;
Expand All @@ -24,8 +23,9 @@ use tikv::server::{create_raft_storage, Config, Node, PdStoreAddrResolver, RaftC
use tikv::server::resolve::{self, Task as ResolveTask};
use tikv::server::transport::ServerRaftStoreRouter;
use tikv::raftstore::{store, Result};
use tikv::raftstore::store::{Engines, Msg as StoreMsg, SnapManager};
use tikv::raftstore::store::{Callback, Engines, Msg as StoreMsg, SnapManager};
use tikv::raftstore::coprocessor::CoprocessorHost;
use tikv::server::transport::RaftStoreRouter;
use tikv::util::transport::SendCh;
use tikv::util::security::SecurityManager;
use tikv::util::worker::{FutureWorker, Worker};
Expand All @@ -36,7 +36,6 @@ use kvproto::raft_cmdpb::*;
use super::pd::TestPdClient;
use super::transport_simulate::*;
use super::cluster::{Cluster, Simulator};
use super::util::wait_cb;

type SimulateStoreTransport = SimulateTransport<StoreMsg, ServerRaftStoreRouter>;
type SimulateServerTransport =
Expand Down Expand Up @@ -202,17 +201,17 @@ impl Simulator for ServerCluster {
self.metas.keys().cloned().collect()
}

fn call_command_on_node(
fn async_command_on_node(
&self,
node_id: u64,
request: RaftCmdRequest,
timeout: Duration,
) -> Result<RaftCmdResponse> {
cb: Callback,
) -> Result<()> {
let router = match self.metas.get(&node_id) {
None => return Err(box_err!("missing sender for store {}", node_id)),
Some(meta) => meta.router.clone(),
};
wait_cb(router, request, timeout)
router.send_command(request, cb)
}

fn send_raft_msg(&mut self, raft_msg: raft_serverpb::RaftMessage) -> Result<()> {
Expand Down
15 changes: 2 additions & 13 deletions tests/raftstore/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ use kvproto::raft_cmdpb::{AdminCmdType, AdminRequest, CmdType, RaftCmdRequest, R
use kvproto::pdpb::{ChangePeer, RegionHeartbeatResponse, TransferLeader};
use kvproto::eraftpb::ConfChangeType;

use tikv::raftstore;
use tikv::raftstore::store::*;
use tikv::server::Config as ServerConfig;
use tikv::server::transport::RaftStoreRouter;
use tikv::storage::Config as StorageConfig;
use tikv::util::escape;
use tikv::util::config::*;
Expand Down Expand Up @@ -302,14 +300,7 @@ pub fn new_pd_transfer_leader(peer: metapb::Peer) -> Option<RegionHeartbeatRespo
Some(resp)
}

pub fn wait_cb<R>(
router: R,
cmd: RaftCmdRequest,
timeout: Duration,
) -> raftstore::Result<RaftCmdResponse>
where
R: RaftStoreRouter,
{
pub fn make_cb(cmd: &RaftCmdRequest) -> (Callback, mpsc::Receiver<RaftCmdResponse>) {
let mut is_read;
let mut is_write;
is_read = cmd.has_status_request();
Expand All @@ -335,7 +326,5 @@ where
let _ = tx.send(resp.response);
}))
};
router.send_command(cmd, cb).unwrap();
rx.recv_timeout(timeout)
.map_err(|_| raftstore::Error::Timeout(format!("request timeout for {:?}", timeout)))
(cb, rx)
}

0 comments on commit 0e52440

Please sign in to comment.