Skip to content

Commit

Permalink
support pre-apply (#1847)
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay authored and zhangjinpeng87 committed May 17, 2017
1 parent 01fefb2 commit de38f84
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 264 deletions.
250 changes: 91 additions & 159 deletions src/raftstore/coprocessor/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@

use super::{RegionObserver, ObserverContext, Result};

use raftstore::store::PeerStorage;
use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse};
use kvproto::raft_cmdpb::RaftCmdRequest;
use kvproto::metapb::Region;

struct ObserverEntry {
priority: u32,
observer: Box<RegionObserver + Send>,
observer: Box<RegionObserver + Send + Sync>,
}

/// Registry contains all registered coprocessors.
Expand All @@ -29,7 +29,7 @@ pub struct Registry {

impl Registry {
/// register an Observer to dispatcher.
pub fn register_observer(&mut self, priority: u32, mut ro: Box<RegionObserver + Send>) {
pub fn register_observer(&mut self, priority: u32, ro: Box<RegionObserver + Send + Sync>) {
ro.start();
let r = ObserverEntry {
priority: priority,
Expand All @@ -54,8 +54,8 @@ impl CoprocessorHost {
}

/// Call all prepose hook until bypass is set to true.
pub fn pre_propose(&mut self, ps: &PeerStorage, req: &mut RaftCmdRequest) -> Result<()> {
let ctx = ObserverContext::new(ps);
pub fn pre_propose(&self, region: &Region, req: &mut RaftCmdRequest) -> Result<()> {
let ctx = ObserverContext::new(region);
if req.has_admin_request() {
self.execute_pre_hook(ctx,
req.mut_admin_request(),
Expand All @@ -65,58 +65,37 @@ impl CoprocessorHost {
}
}

fn execute_pre_hook<Q, H>(&mut self,
fn execute_pre_hook<Q, H>(&self,
mut ctx: ObserverContext,
req: &mut Q,
mut hook: H)
-> Result<()>
where H: FnMut(&mut RegionObserver, &mut ObserverContext, &mut Q) -> Result<()>
where H: FnMut(&RegionObserver, &mut ObserverContext, &mut Q) -> Result<()>
{
for entry in &mut self.registry.observers {
try!(hook(entry.observer.as_mut(), &mut ctx, req));
for entry in &self.registry.observers {
try!(hook(entry.observer.as_ref(), &mut ctx, req));
if ctx.bypass {
break;
}
}
Ok(())
}

fn execute_post_hook<Q, R, H>(&mut self,
mut ctx: ObserverContext,
req: Q,
resp: &mut R,
mut hook: H)
where H: FnMut(&mut RegionObserver, &mut ObserverContext, &Q, &mut R)
{
for entry in &mut self.registry.observers {
hook(entry.observer.as_mut(), &mut ctx, &req, resp);
if ctx.bypass {
break;
/// Call all pre apply hook until bypass is set to true.
pub fn pre_apply(&self, region: &Region, req: &mut RaftCmdRequest) {
let mut ctx = ObserverContext::new(region);
if !req.has_admin_request() {
for entry in &self.registry.observers {
entry.observer.pre_apply_query(&mut ctx, req.mut_requests());
if ctx.bypass {
break;
}
}
}
}

/// call all apply hook until bypass is set to true.
pub fn post_apply(&mut self,
ps: &PeerStorage,
req: &RaftCmdRequest,
resp: &mut RaftCmdResponse) {
let ctx = ObserverContext::new(ps);
if req.has_admin_request() {
self.execute_post_hook(ctx,
req.get_admin_request(),
resp.mut_admin_response(),
|o, ctx, q, r| o.post_admin(ctx, q, r));
} else {
self.execute_post_hook(ctx,
req.get_requests(),
resp.mut_responses(),
|o, ctx, q, r| o.post_query(ctx, q, r));
}
}

pub fn shutdown(&mut self) {
for mut entry in &mut self.registry.observers.drain(..) {
pub fn shutdown(&self) {
for entry in &self.registry.observers {
entry.observer.stop();
}
}
Expand All @@ -131,188 +110,141 @@ impl Drop for CoprocessorHost {
#[cfg(test)]
mod test {
use raftstore::coprocessor::*;
use tempdir::TempDir;
use raftstore::store::PeerStorage;
use util::HandyRwLock;
use util::worker;
use util::rocksdb;
use std::sync::*;
use std::fmt::Debug;
use std::sync::atomic::*;
use protobuf::RepeatedField;
use storage::ALL_CFS;

use kvproto::metapb::Region;
use kvproto::raft_cmdpb::{AdminRequest, Request, AdminResponse, Response, RaftCmdRequest,
RaftCmdResponse};
use kvproto::raft_cmdpb::{AdminRequest, Request, RaftCmdRequest};

struct TestCoprocessor {
bypass_pre: Arc<RwLock<bool>>,
bypass_post: Arc<RwLock<bool>>,
called_pre: Arc<RwLock<u8>>,
called_post: Arc<RwLock<u8>>,
return_err: Arc<RwLock<bool>>,
bypass: Arc<AtomicBool>,
called: Arc<AtomicUsize>,
return_err: Arc<AtomicBool>,
}

impl TestCoprocessor {
fn new(bypass_pre: Arc<RwLock<bool>>,
bypass_post: Arc<RwLock<bool>>,
called_pre: Arc<RwLock<u8>>,
called_post: Arc<RwLock<u8>>,
return_err: Arc<RwLock<bool>>)
fn new(bypass: Arc<AtomicBool>,
called: Arc<AtomicUsize>,
return_err: Arc<AtomicBool>)
-> TestCoprocessor {
TestCoprocessor {
bypass_post: bypass_post,
bypass_pre: bypass_pre,
called_post: called_post,
called_pre: called_pre,
bypass: bypass,
called: called,
return_err: return_err,
}
}
}

impl Coprocessor for TestCoprocessor {
fn start(&mut self) {}
fn stop(&mut self) {}
}
impl Coprocessor for TestCoprocessor {}

impl RegionObserver for TestCoprocessor {
fn pre_admin(&mut self, ctx: &mut ObserverContext, _: &mut AdminRequest) -> Result<()> {
*self.called_pre.wl() += 1;
ctx.bypass = *self.bypass_pre.rl();
if *self.return_err.rl() {
fn pre_admin(&self, ctx: &mut ObserverContext, _: &mut AdminRequest) -> Result<()> {
self.called.fetch_add(1, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
if self.return_err.load(Ordering::SeqCst) {
return Err(box_err!("error"));
}
Ok(())
}

fn pre_query(&mut self,
fn pre_query(&self,
ctx: &mut ObserverContext,
_: &mut RepeatedField<Request>)
-> Result<()> {
*self.called_pre.wl() += 2;
ctx.bypass = *self.bypass_pre.rl();
if *self.return_err.rl() {
self.called.fetch_add(2, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
if self.return_err.load(Ordering::SeqCst) {
return Err(box_err!("error"));
}
Ok(())
}

fn post_admin(&mut self,
ctx: &mut ObserverContext,
_: &AdminRequest,
_: &mut AdminResponse) {
*self.called_post.wl() += 1;
ctx.bypass = *self.bypass_post.rl();
}

fn post_query(&mut self,
ctx: &mut ObserverContext,
_: &[Request],
_: &mut RepeatedField<Response>) {
*self.called_post.wl() += 2;
ctx.bypass = *self.bypass_post.rl();
fn pre_apply_query(&self, ctx: &mut ObserverContext, _: &mut RepeatedField<Request>) {
self.called.fetch_add(3, Ordering::SeqCst);
ctx.bypass = self.bypass.load(Ordering::SeqCst);
}
}

fn new_peer_storage(path: &TempDir) -> PeerStorage {
let engine = Arc::new(rocksdb::new_engine(path.path().to_str().unwrap(), ALL_CFS).unwrap());
PeerStorage::new(engine,
&Region::new(),
worker::dummy_scheduler(),
"".to_owned())
.unwrap()
fn share_bool() -> Arc<AtomicBool> {
Arc::new(AtomicBool::default())
}

fn share<T>(t: T) -> Arc<RwLock<T>> {
Arc::new(RwLock::new(t))
fn share_usize() -> Arc<AtomicUsize> {
Arc::new(AtomicUsize::default())
}

fn assert_all<T: PartialEq + Debug>(ts: &[&Arc<RwLock<T>>], expect: &[T]) {
for (c, e) in ts.iter().zip(expect) {
assert_eq!(*c.wl(), *e);
}
macro_rules! assert_all {
($target:expr, $expect:expr) => ({
for (c, e) in ($target).iter().zip($expect) {
assert_eq!(c.load(Ordering::SeqCst), *e);
}
})
}

fn set_all<T: Copy>(ts: &[&Arc<RwLock<T>>], b: T) {
for c in ts {
*c.wl() = b;
}
macro_rules! set_all {
($target:expr, $v:expr) => ({
for v in $target {
v.store($v, Ordering::SeqCst);
}
})
}

#[test]
fn test_coprocessor_host() {
let (bypass_pre1, bypass_post1, called_pre1, called_post1, r1) =
(share(false), share(false), share(0), share(0), share(false));
let observer1 = TestCoprocessor::new(bypass_pre1.clone(),
bypass_post1.clone(),
called_pre1.clone(),
called_post1.clone(),
r1.clone());
let (bypass1, called1, r1) = (share_bool(), share_usize(), share_bool());
let observer1 = TestCoprocessor::new(bypass1.clone(), called1.clone(), r1.clone());
let mut host = CoprocessorHost::default();
host.registry.register_observer(3, Box::new(observer1));
let path = TempDir::new("test-raftstore").unwrap();
let ps = new_peer_storage(&path);
let region = Region::new();
let mut admin_req = RaftCmdRequest::new();
admin_req.set_admin_request(AdminRequest::new());
let mut query_req = RaftCmdRequest::new();
query_req.set_requests(RepeatedField::from_vec(vec![Request::new()]));
let mut admin_resp = RaftCmdResponse::new();
admin_resp.set_admin_response(AdminResponse::new());
let mut query_resp = RaftCmdResponse::new();
query_resp.set_responses(RepeatedField::from_vec(vec![Response::new()]));

assert_eq!(*called_pre1.rl(), 0);
assert!(host.pre_propose(&ps, &mut admin_req).is_ok());
assert_eq!(*called_pre1.rl(), 1);
assert_eq!(called1.load(Ordering::SeqCst), 0);
assert!(host.pre_propose(&region, &mut admin_req).is_ok());
assert_eq!(called1.load(Ordering::SeqCst), 1);

assert_eq!(*called_post1.rl(), 0);
host.post_apply(&ps, &admin_req, &mut admin_resp);
assert_eq!(*called_post1.rl(), 1);
// pre_apply_request is ignored when handling admin request.
host.pre_apply(&region, &mut admin_req);
assert_eq!(called1.load(Ordering::SeqCst), 1);

// reset
set_all(&[&called_post1, &called_pre1], 0);

let (bypass_pre2, bypass_post2, called_pre2, called_post2, r2) =
(share(false), share(false), share(0), share(0), share(false));
let observer2 = TestCoprocessor::new(bypass_pre2.clone(),
bypass_post2.clone(),
called_pre2.clone(),
called_post2.clone(),
r2.clone());
set_all!(&[&called1], 0);

let (bypass2, called2, r2) = (share_bool(), share_usize(), share_bool());
let observer2 = TestCoprocessor::new(bypass2.clone(), called2.clone(), r2.clone());
host.registry.register_observer(2, Box::new(observer2));

set_all(&[&bypass_pre2, &bypass_post2], true);
set_all!(&[&bypass2, &bypass2], true);

assert_all!(&[&called1, &called2], &[0, 0]);

assert!(host.pre_propose(&region, &mut query_req).is_ok());

assert_all(&[&called_pre1, &called_post1, &called_pre2, &called_post2],
&[0, 0, 0, 0]);
assert_all!(&[&called1, &called2], &[0, 2]);

assert!(host.pre_propose(&ps, &mut query_req).is_ok());
host.post_apply(&ps, &query_req, &mut query_resp);
host.pre_apply(&region, &mut query_req);
assert_all!(&[&called1, &called2], &[0, 5]);

assert_all(&[&called_pre1, &called_post1, &called_pre2, &called_post2],
&[0, 0, 2, 2]);
set_all!(&[&bypass2], false);
set_all!(&[&called2], 0);

set_all(&[&bypass_pre2, &bypass_post2], false);
set_all(&[&called_pre2, &called_post2], 0);
assert_all!(&[&called1, &called2], &[0, 0]);

assert_all(&[&called_pre1, &called_post1, &called_pre2, &called_post2],
&[0, 0, 0, 0]);
assert!(host.pre_propose(&region, &mut admin_req).is_ok());

assert!(host.pre_propose(&ps, &mut admin_req).is_ok());
host.post_apply(&ps, &admin_req, &mut admin_resp);
assert_all!(&[&called1, &called2], &[1, 1]);

assert_all(&[&called_pre1, &called_post1, &called_pre2, &called_post2],
&[1, 1, 1, 1]);
set_all!(&[&bypass2], false);
set_all!(&[&called1, &called2], 0);
assert_all!(&[&called1, &called2], &[0, 0]);

set_all(&[&bypass_pre2, &bypass_post2], false);
set_all(&[&called_pre1, &called_post1, &called_pre2, &called_post2],
0);
assert_all(&[&called_pre1, &called_post1, &called_pre2, &called_post2],
&[0, 0, 0, 0]);
// when return error, following coprocessor should not be run.
*r2.wl() = true;
assert!(host.pre_propose(&ps, &mut admin_req).is_err());
assert_all(&[&called_pre1, &called_post1, &called_pre2, &called_post2],
&[0, 0, 1, 0]);
set_all!(&[&r2], true);
assert!(host.pre_propose(&region, &mut admin_req).is_err());
assert_all!(&[&called1, &called2], &[0, 1]);
}
}

0 comments on commit de38f84

Please sign in to comment.