Skip to content

Commit

Permalink
feat: service stop handler
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangsoledad committed Jan 30, 2019
1 parent 07d6a69 commit e0143eb
Show file tree
Hide file tree
Showing 16 changed files with 131 additions and 79 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ members = [
"util/instrument",
"util/build-info",
"util/occupied-capacity",
"util/stop-handler",
"network",
"protocol",
"sync",
Expand Down
3 changes: 1 addition & 2 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,8 +554,7 @@ impl<CI: ChainIndex + 'static> ChainBuilder<CI> {
pub fn build(mut self) -> ChainService<CI> {
let notify = self.notify.take().unwrap_or_else(|| {
// FIXME: notify should not be optional
let (_handle, notify) = NotifyService::default().start::<&str>(None);
notify
NotifyService::default().start::<&str>(None)
});
ChainService::new(self.shared, notify)
}
Expand Down
1 change: 1 addition & 0 deletions miner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jsonrpc-types = { path = "../util/jsonrpc-types" }
hyper = "0.12"
futures = "0.1"
lru-cache = { git = "https://github.com/nervosnetwork/lru-cache" }
stop-handler = { path = "../util/stop-handler" }

[dev-dependencies]
proptest = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion miner/src/block_assembler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ mod tests {
}
let shared = builder.build();

let notify = notify.unwrap_or_else(|| NotifyService::default().start::<&str>(None).1);
let notify = notify.unwrap_or_else(|| NotifyService::default().start::<&str>(None));
let (chain_controller, chain_receivers) = ChainController::build();
let chain_service = ChainBuilder::new(shared.clone())
.notify(notify.clone())
Expand Down
41 changes: 8 additions & 33 deletions miner/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{MinerConfig, Work};
use ckb_core::block::Block;
use ckb_util::{Mutex, RwLockUpgradableReadGuard};
use ckb_util::RwLockUpgradableReadGuard;
use crossbeam_channel::Sender;
use futures::sync::{mpsc, oneshot};
use hyper::error::Error as HyperError;
Expand All @@ -16,9 +16,9 @@ use jsonrpc_types::{
use log::{debug, error};
use serde_json::error::Error as JsonError;
use serde_json::{self, json, Value};
use std::sync::Arc;
use std::thread;
use std::time;
use stop_handler::{SignalSender, StopHandler};

type RpcRequest = (oneshot::Sender<Result<Chunk, RpcError>>, MethodCall);

Expand All @@ -30,32 +30,10 @@ pub enum RpcError {
Fail(RpcFail),
}

#[derive(Debug)]
pub(crate) struct Stop {
tx: oneshot::Sender<()>,
thread: thread::JoinHandle<()>,
}

impl Stop {
pub fn new(tx: oneshot::Sender<()>, thread: thread::JoinHandle<()>) -> Stop {
Stop { tx, thread }
}

pub fn send(self) {
self.tx.send(()).expect("rpc stop channel");;
self.thread.join().expect("rpc thread join");
}
}

#[derive(Debug)]
pub(crate) struct RpcInner {
sender: mpsc::Sender<RpcRequest>,
stop: Mutex<Option<Stop>>,
}

#[derive(Debug, Clone)]
pub struct Rpc {
inner: Arc<RpcInner>,
sender: mpsc::Sender<RpcRequest>,
stop: StopHandler<()>,
}

impl Rpc {
Expand Down Expand Up @@ -90,10 +68,8 @@ impl Rpc {
});

Rpc {
inner: Arc::new(RpcInner {
sender,
stop: Mutex::new(Some(Stop::new(stop, thread))),
}),
sender,
stop: StopHandler::new(SignalSender::Future(stop), thread),
}
}

Expand All @@ -112,7 +88,7 @@ impl Rpc {
};

let req = (tx, call);
let mut sender = self.inner.sender.clone();
let mut sender = self.sender.clone();
let _ = sender.try_send(req);
rev.map_err(|_| RpcError::Canceled)
.flatten()
Expand All @@ -122,8 +98,7 @@ impl Rpc {

impl Drop for Rpc {
fn drop(&mut self) {
let stop = self.inner.stop.lock().take().expect("rpc close only once");
stop.send();
self.stop.try_send();
}
}

Expand Down
1 change: 1 addition & 0 deletions notify/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ fnv = "1.0"
ckb-core = { path = "../core" }
crossbeam-channel = "0.3"
log = "0.4"
stop-handler = { path = "../util/stop-handler" }
61 changes: 25 additions & 36 deletions notify/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
#![allow(clippy::needless_pass_by_value)]

use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

use ckb_core::block::Block;
use ckb_core::service::Request;
use crossbeam_channel::{select, Receiver, Sender};
use fnv::FnvHashMap;
use log::{debug, trace, warn};
use std::sync::Arc;
use std::thread;
use stop_handler::{SignalSender, StopHandler};

pub const REGISTER_CHANNEL_SIZE: usize = 2;
pub const NOTIFY_CHANNEL_SIZE: usize = 128;
Expand Down Expand Up @@ -37,7 +36,6 @@ impl ForkBlocks {
}
}

type StopSignal = ();
pub type MsgNewTransaction = ();
pub type MsgNewTip = Arc<Block>;
pub type MsgNewUncle = Arc<Block>;
Expand All @@ -49,7 +47,7 @@ pub struct NotifyService {}

#[derive(Clone)]
pub struct NotifyController {
signal: Sender<StopSignal>,
stop: StopHandler<()>,
new_transaction_register: NotifyRegister<MsgNewTransaction>,
new_tip_register: NotifyRegister<MsgNewTip>,
new_uncle_register: NotifyRegister<MsgNewUncle>,
Expand All @@ -61,7 +59,7 @@ pub struct NotifyController {
}

impl NotifyService {
pub fn start<S: ToString>(self, thread_name: Option<S>) -> (JoinHandle<()>, NotifyController) {
pub fn start<S: ToString>(self, thread_name: Option<S>) -> NotifyController {
let (signal_sender, signal_receiver) =
crossbeam_channel::bounded::<()>(REGISTER_CHANNEL_SIZE);
let (new_transaction_register, new_transaction_register_receiver) =
Expand Down Expand Up @@ -127,20 +125,17 @@ impl NotifyService {
}
}).expect("Start notify service failed");

(
join_handle,
NotifyController {
new_transaction_register,
new_tip_register,
new_uncle_register,
switch_fork_register,
new_transaction_notifier: new_transaction_sender,
new_tip_notifier: new_tip_sender,
new_uncle_notifier: new_uncle_sender,
switch_fork_notifier: switch_fork_sender,
signal: signal_sender,
},
)
NotifyController {
new_transaction_register,
new_tip_register,
new_uncle_register,
switch_fork_register,
new_transaction_notifier: new_transaction_sender,
new_tip_notifier: new_tip_sender,
new_uncle_notifier: new_uncle_sender,
switch_fork_notifier: switch_fork_sender,
stop: StopHandler::new(SignalSender::Crossbeam(signal_sender), join_handle),
}
}

fn handle_register_new_transaction(
Expand Down Expand Up @@ -283,10 +278,6 @@ impl NotifyService {
}

impl NotifyController {
pub fn stop(self) {
let _ = self.signal.send(());
}

pub fn subscribe_new_transaction<S: ToString>(&self, name: S) -> Receiver<MsgNewTransaction> {
Request::call(&self.new_transaction_register, (name.to_string(), 128))
.expect("Subscribe new transaction failed")
Expand Down Expand Up @@ -318,47 +309,45 @@ impl NotifyController {
}
}

impl Drop for NotifyController {
fn drop(&mut self) {
self.stop.try_send();
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_new_transaction() {
let (handle, notify) = NotifyService::default().start::<&str>(None);
let notify = NotifyService::default().start::<&str>(None);
let receiver1 = notify.subscribe_new_transaction("miner1");
let receiver2 = notify.subscribe_new_transaction("miner2");
notify.notify_new_transaction();
assert_eq!(receiver1.recv(), Ok(()));
assert_eq!(receiver2.recv(), Ok(()));
notify.stop();
handle.join().expect("join failed");
}

#[test]
fn test_new_tip() {
let tip = Arc::new(Block::default());

let (handle, notify) = NotifyService::default().start::<&str>(None);
let notify = NotifyService::default().start::<&str>(None);
let receiver1 = notify.subscribe_new_tip("miner1");
let receiver2 = notify.subscribe_new_tip("miner2");
notify.notify_new_tip(Arc::clone(&tip));
assert_eq!(receiver1.recv(), Ok(Arc::clone(&tip)));
assert_eq!(receiver2.recv(), Ok(tip));
notify.stop();
handle.join().expect("join failed");
}

#[test]
fn test_switch_fork() {
let blks = Arc::new(ForkBlocks::default());

let (handle, notify) = NotifyService::default().start::<&str>(None);
let notify = NotifyService::default().start::<&str>(None);
let receiver1 = notify.subscribe_switch_fork("miner1");
let receiver2 = notify.subscribe_switch_fork("miner2");
notify.notify_switch_fork(Arc::clone(&blks));
assert_eq!(receiver1.recv(), Ok(Arc::clone(&blks)));
assert_eq!(receiver2.recv(), Ok(blks));
notify.stop();
handle.join().expect("join failed");
}
}
2 changes: 1 addition & 1 deletion pool/src/tests/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ struct TestPool<CI> {

impl<CI: ChainIndex + 'static> TestPool<CI> {
fn simple() -> TestPool<ChainKVStore<MemoryKeyValueDB>> {
let (_handle, notify) = NotifyService::default().start::<&str>(None);
let notify = NotifyService::default().start::<&str>(None);
let new_tip_receiver = notify.subscribe_new_tip("txs_pool");
let switch_fork_receiver = notify.subscribe_switch_fork("txs_pool");
let shared = SharedBuilder::<ChainKVStore<MemoryKeyValueDB>>::new_memory()
Expand Down
2 changes: 1 addition & 1 deletion src/cli/run_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub fn run(setup: Setup) {
.consensus(consensus)
.build();

let (_handle, notify) = NotifyService::default().start(Some("notify"));
let notify = NotifyService::default().start(Some("notify"));
let (chain_controller, chain_receivers) = ChainController::build();
let (block_assembler_controller, block_assembler_receivers) = BlockAssemblerController::build();

Expand Down
4 changes: 2 additions & 2 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ mod tests {
}
let shared = builder.build();

let notify = notify.unwrap_or_else(|| NotifyService::default().start::<&str>(None).1);
let notify = notify.unwrap_or_else(|| NotifyService::default().start::<&str>(None));
let (chain_controller, chain_receivers) = ChainController::build();
let chain_service = ChainBuilder::new(shared.clone())
.notify(notify.clone())
Expand Down Expand Up @@ -1133,7 +1133,7 @@ mod tests {
fn test_sync_process() {
let _ = env_logger::try_init();
let consensus = Consensus::default();
let (_handle, notify) = NotifyService::default().start::<&str>(None);
let notify = NotifyService::default().start::<&str>(None);
let (chain_controller1, shared1, _) =
start_chain(Some(consensus.clone()), Some(notify.clone()));
let (chain_controller2, shared2, _) =
Expand Down
2 changes: 1 addition & 1 deletion sync/src/tests/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ fn setup_node(
let (tx_pool_controller, tx_pool_receivers) =
TransactionPoolController::build(Arc::clone(&last_tx_updated_at));

let (_handle, notify) = NotifyService::default().start(Some(thread_name));
let notify = NotifyService::default().start(Some(thread_name));

let tx_pool_service = TransactionPoolService::new(
PoolConfig::default(),
Expand Down
2 changes: 1 addition & 1 deletion sync/src/tests/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ fn setup_node(
.consensus(consensus)
.build();
let (chain_controller, chain_receivers) = ChainController::build();
let (_handle, notify) = NotifyService::default().start(Some(thread_name));
let notify = NotifyService::default().start(Some(thread_name));

let chain_service = ChainBuilder::new(shared.clone())
.notify(notify.clone())
Expand Down
1 change: 0 additions & 1 deletion sync/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ impl Hasher for HighLowBytesHasher {
}

fn finish(&self) -> u64 {
println!("finish {:?}", self.0);
self.0
}
}
11 changes: 11 additions & 0 deletions util/stop-handler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "stop-handler"
version = "0.1.0"
authors = ["Nervos Core Dev <dev@nervos.org>"]
edition = "2018"

[dependencies]
parking_lot = "0.7"
crossbeam-channel = "0.3"
log = "0.4"
futures = "0.1"
Loading

0 comments on commit e0143eb

Please sign in to comment.