Skip to content

Commit

Permalink
util: remove event
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Nov 12, 2016
1 parent c57d4c9 commit 801c357
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 285 deletions.
4 changes: 2 additions & 2 deletions src/raftstore/store/msg.rs
Expand Up @@ -115,14 +115,14 @@ mod tests {
request: RaftCmdRequest,
timeout: Duration)
-> Result<RaftCmdResponse, Error> {
wait_event!(|cb: Box<FnBox(RaftCmdResponse) + 'static + Send>| {
wait_op!(|cb: Box<FnBox(RaftCmdResponse) + 'static + Send>| {
sendch.try_send(Msg::RaftCmd {
request: request,
callback: cb,
})
.unwrap()
},
timeout)
timeout)
.ok_or_else(|| Error::Timeout(format!("request timeout for {:?}", timeout)))
}

Expand Down
4 changes: 2 additions & 2 deletions src/storage/engine/mod.rs
Expand Up @@ -47,13 +47,13 @@ pub trait Engine: Send + Debug {

fn write(&self, ctx: &Context, batch: Vec<Modify>) -> Result<()> {
let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
wait_event!(|cb| self.async_write(ctx, batch, cb).unwrap(), timeout)
wait_op!(|cb| self.async_write(ctx, batch, cb).unwrap(), timeout)
.unwrap_or_else(|| Err(Error::Timeout(timeout)))
}

fn snapshot(&self, ctx: &Context) -> Result<Box<Snapshot>> {
let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
wait_event!(|cb| self.async_snapshot(ctx, cb).unwrap(), timeout)
wait_op!(|cb| self.async_snapshot(ctx, cb).unwrap(), timeout)
.unwrap_or_else(|| Err(Error::Timeout(timeout)))
}

Expand Down
257 changes: 0 additions & 257 deletions src/util/event.rs

This file was deleted.

27 changes: 27 additions & 0 deletions src/util/macros.rs
Expand Up @@ -175,3 +175,30 @@ macro_rules! opp_neg {
(-$r as u64)
};
}

/// `wait_op!` waits for async operation. It returns `Option<Res>`
/// after the expression get executed.
#[macro_export]
macro_rules! wait_op {
($expr:expr) => {
wait_op!(IMPL $expr, None)
};
($expr:expr, $timeout:expr) => {
wait_op!(IMPL $expr, Some($timeout))
};
(IMPL $expr:expr, $timeout:expr) => {
{
use std::sync::mpsc;
let (tx, rx) = mpsc::channel();
let cb = box move |res| {
// we don't care error actually.
let _ = tx.send(res);
};
$expr(cb);
match $timeout {
None => rx.recv().ok(),
Some(timeout) => rx.recv_timeout(timeout).ok()
}
}
}
}
2 changes: 0 additions & 2 deletions src/util/mod.rs
Expand Up @@ -33,8 +33,6 @@ pub mod panic_hook;
pub mod worker;
pub mod codec;
pub mod xeval;
#[macro_use]
pub mod event;
pub mod rocksdb;
pub mod config;
pub mod fs;
Expand Down
10 changes: 4 additions & 6 deletions tests/coprocessor/test_select.rs
Expand Up @@ -5,7 +5,6 @@ use tikv::util::codec::{table, Datum, datum};
use tikv::util::codec::number::*;
use tikv::storage::{Mutation, Key, ALL_CFS};
use tikv::storage::engine::{self, Engine, TEMP_DIR};
use tikv::util::event::Event;
use tikv::util::worker::Worker;
use kvproto::coprocessor::{Request, KeyRange};
use tipb::select::{ByItem, SelectRequest, SelectResponse, Chunk};
Expand All @@ -14,6 +13,7 @@ use tipb::expression::{Expr, ExprType};
use storage::sync_storage::SyncStorage;

use std::collections::{HashMap, BTreeMap};
use std::sync::mpsc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::i64;
use protobuf::{RepeatedField, Message};
Expand Down Expand Up @@ -886,12 +886,10 @@ fn test_reverse() {
}

fn handle_select(end_point: &Worker<EndPointTask>, req: Request) -> SelectResponse {
let finish = Event::new();
let finish_clone = finish.clone();
let req = RequestTask::new(req, box move |r| finish_clone.set(r));
let (tx, rx) = mpsc::channel();
let req = RequestTask::new(req, box move |r| tx.send(r).unwrap());
end_point.schedule(EndPointTask::Request(req)).unwrap();
finish.wait_timeout(None);
let resp = finish.take().unwrap().take_cop_resp();
let resp = rx.recv().unwrap().take_cop_resp();
assert!(resp.has_data(), format!("{:?}", resp));
let mut sel_resp = SelectResponse::new();
sel_resp.merge_from_bytes(resp.get_data()).unwrap();
Expand Down
8 changes: 4 additions & 4 deletions tests/raftstore/node.rs
Expand Up @@ -212,10 +212,10 @@ impl Simulator for NodeCluster {
}

let router = self.trans.rl().routers.get(&node_id).cloned().unwrap();
wait_event!(|cb: Box<FnBox(RaftCmdResponse) + 'static + Send>| {
router.send_command(request, cb).unwrap()
},
timeout)
wait_op!(|cb: Box<FnBox(RaftCmdResponse) + 'static + Send>| {
router.send_command(request, cb).unwrap()
},
timeout)
.ok_or_else(|| Error::Timeout(format!("request timeout for {:?}", timeout)))
}

Expand Down

0 comments on commit 801c357

Please sign in to comment.