Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use FnOnce to define service protocol #67

Merged
merged 1 commit into from Mar 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions bench/src/main.rs
Expand Up @@ -89,14 +89,14 @@ fn create_meta(id: ProtocolId) -> (ProtocolMeta, crossbeam_channel::Receiver<Not
.new_codec(),
)
})
.service_handle(move |meta| {
if meta.id() == 0 {
.service_handle(move || {
if id == 0 {
ProtocolHandle::Neither
} else {
let handle = Box::new(PHandle {
proto_id: meta.id(),
proto_id: id,
connected_count: 0,
sender: sender.clone(),
sender,
});
ProtocolHandle::Callback(handle)
}
Expand Down
6 changes: 3 additions & 3 deletions examples/disc.rs
Expand Up @@ -56,9 +56,9 @@ fn create_meta(id: ProtocolId, start: u16) -> ProtocolMeta {
let addr_mgr = SimpleAddressManager { addrs };
MetaBuilder::default()
.id(id)
.service_handle(move |meta| {
let discovery = Discovery::new(addr_mgr.clone());
ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(meta.id(), discovery)))
.service_handle(move || {
let discovery = Discovery::new(addr_mgr);
ProtocolHandle::Callback(Box::new(DiscoveryProtocol::new(id, discovery)))
})
.build()
}
Expand Down
9 changes: 2 additions & 7 deletions examples/ping.rs
Expand Up @@ -63,13 +63,8 @@ pub fn create_meta<S: Sender<Event> + Send + Clone + 'static>(
) -> ProtocolMeta {
MetaBuilder::new()
.id(id)
.service_handle(move |meta| {
let handle = Box::new(PingHandler::new(
meta.id(),
interval,
timeout,
event_sender.clone(),
));
.service_handle(move || {
let handle = Box::new(PingHandler::new(id, interval, timeout, event_sender));
ProtocolHandle::Callback(handle)
})
.build()
Expand Down
4 changes: 2 additions & 2 deletions examples/simple.rs
Expand Up @@ -19,11 +19,11 @@ use tokio::timer::{Delay, Error, Interval};
fn create_meta(id: ProtocolId) -> ProtocolMeta {
MetaBuilder::new()
.id(id)
.service_handle(|meta| {
.service_handle(move || {
// All protocol use the same handle.
// This is just an example. In the actual environment, this should be a different handle.
let handle = Box::new(PHandle {
proto_id: meta.id(),
proto_id: id,
count: 0,
connected_session_ids: Vec::new(),
clear_handle: HashMap::new(),
Expand Down
36 changes: 23 additions & 13 deletions src/builder.rs
Expand Up @@ -38,7 +38,7 @@ impl ServiceBuilder {

/// Insert a custom protocol
pub fn insert_protocol(mut self, protocol: ProtocolMeta) -> Self {
if protocol.session_handle().has_event() || protocol.service_handle().has_event() {
if protocol.session_handle().has_event() || protocol.service_handle.has_event() {
self.config.event.insert(protocol.id());
}

Expand Down Expand Up @@ -107,18 +107,16 @@ impl Default for ServiceBuilder {

pub(crate) type NameFn = Box<Fn(ProtocolId) -> String + Send + Sync>;
pub(crate) type CodecFn = Box<Fn() -> Box<dyn Codec + Send + 'static> + Send + Sync>;
pub(crate) type ServiceHandleFn =
Box<Fn(&ProtocolMeta) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> + Send>;
pub(crate) type SessionHandleFn =
Box<Fn(&ProtocolMeta) -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>> + Send>;
Box<Fn(ProtocolId) -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>> + Send>;

/// Builder for protocol meta
pub struct MetaBuilder {
id: ProtocolId,
name: NameFn,
support_versions: Vec<String>,
codec: CodecFn,
service_handle: ServiceHandleFn,
service_handle: ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>>,
session_handle: SessionHandleFn,
}

Expand All @@ -129,18 +127,32 @@ impl MetaBuilder {
}

/// Define protocol id
///
/// It is just an internal index of the system that
/// identifies the open/close and message transfer for the specified protocol.
pub fn id(mut self, id: ProtocolId) -> Self {
self.id = id;
self
}

/// Define protocol name
/// Define protocol name, default is "/p2p/protocol_id"
///
/// Used to interact with the remote service to determine whether the protocol is supported.
///
/// If not found, the protocol connection(not session just sub stream) will be closed,
/// and return a `ProtocolSelectError` event.
pub fn name<T: Fn(ProtocolId) -> String + 'static + Send + Sync>(mut self, name: T) -> Self {
self.name = Box::new(name);
self
}

/// Define protocol support versions
/// Define protocol support versions, default is `vec!["0.0.1".to_owned()]`
///
/// Used to interact with the remote service to confirm that both parties
/// open the same version of the protocol.
///
/// If not found, the protocol connection(not session just sub stream) will be closed,
/// and return a `ProtocolSelectError` event.
pub fn support_versions(mut self, versions: Vec<String>) -> Self {
self.support_versions = versions;
self
Expand All @@ -157,20 +169,18 @@ impl MetaBuilder {

/// Define protocol service handle, default is neither
pub fn service_handle<
T: Fn(&ProtocolMeta) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>>
+ Send
+ 'static,
T: FnOnce() -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>>,
>(
mut self,
service_handle: T,
) -> Self {
self.service_handle = Box::new(service_handle);
self.service_handle = service_handle();
self
}

/// Define protocol session handle, default is neither
pub fn session_handle<
T: Fn(&ProtocolMeta) -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>>
T: Fn(ProtocolId) -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>>
+ Send
+ 'static,
>(
Expand Down Expand Up @@ -204,7 +214,7 @@ impl Default for MetaBuilder {
name: Box::new(|id| format!("/p2p/{}", id)),
support_versions: vec!["0.0.1".to_owned()],
codec: Box::new(|| Box::new(LengthDelimitedCodec::new())),
service_handle: Box::new(|_| ProtocolHandle::Neither),
service_handle: ProtocolHandle::Neither,
session_handle: Box::new(|_| ProtocolHandle::Neither),
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/service.rs
Expand Up @@ -419,11 +419,8 @@ where
/// Get the callback handle of the specified protocol
#[inline]
fn proto_handle(&self, session: bool, proto_id: ProtocolId) -> Option<InnerProtocolHandle> {
let handle = self
.protocol_configs
.values()
.filter(|proto| proto.id() == proto_id)
.find_map(|proto| {
let handle = self.protocol_configs.values().find_map(|proto| {
if proto.id() == proto_id {
if session {
match proto.session_handle() {
ProtocolHandle::Callback(handle) | ProtocolHandle::Both(handle) => {
Expand All @@ -439,7 +436,10 @@ where
_ => None,
}
}
});
} else {
None
}
});

if handle.is_none() {
debug!(
Expand Down
13 changes: 9 additions & 4 deletions src/service/config.rs
@@ -1,5 +1,5 @@
use crate::{
builder::{CodecFn, NameFn, ServiceHandleFn, SessionHandleFn},
builder::{CodecFn, NameFn, SessionHandleFn},
traits::{Codec, ServiceProtocol, SessionProtocol},
yamux::config::Config as YamuxConfig,
ProtocolId,
Expand Down Expand Up @@ -41,7 +41,7 @@ pub enum DialProtocol {
/// Define the minimum data required for a custom protocol
pub struct ProtocolMeta {
pub(crate) inner: Arc<Meta>,
pub(crate) service_handle: ServiceHandleFn,
pub(crate) service_handle: ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>>,
pub(crate) session_handle: SessionHandleFn,
}

Expand Down Expand Up @@ -78,9 +78,14 @@ impl ProtocolMeta {
///
/// This function is called when the protocol is first opened in the service
/// and remains in memory until the entire service is closed.
///
/// #### Warning
///
/// Only can be called once, and will return `ProtocolHandle::Neither` or later.
#[inline]
pub fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
(self.service_handle)(&self)
let ptr = self as *const Self as *mut Self;
unsafe { ::std::mem::replace(&mut (*ptr).service_handle, ProtocolHandle::Neither) }
}

/// A session level callback handle for a protocol.
Expand All @@ -95,7 +100,7 @@ impl ProtocolMeta {
/// Correspondingly, whenever the protocol is closed, the corresponding exclusive handle is cleared.
#[inline]
pub fn session_handle(&self) -> ProtocolHandle<Box<dyn SessionProtocol + Send + 'static>> {
(self.session_handle)(&self)
(self.session_handle)(self.inner.id)
}
}

Expand Down
8 changes: 4 additions & 4 deletions tests/test_dial.rs
Expand Up @@ -148,14 +148,14 @@ fn create_meta(id: ProtocolId) -> (ProtocolMeta, crossbeam_channel::Receiver<usi

let meta = MetaBuilder::new()
.id(id)
.service_handle(move |meta| {
if meta.id() == 0 {
.service_handle(move || {
if id == 0 {
ProtocolHandle::Neither
} else {
let handle = Box::new(PHandle {
proto_id: meta.id(),
proto_id: id,
connected_count: 0,
sender: sender.clone(),
sender,
dial_count: 0,
dial_addr: None,
});
Expand Down
6 changes: 3 additions & 3 deletions tests/test_disconnect.rs
Expand Up @@ -50,12 +50,12 @@ impl ServiceProtocol for PHandle {
fn create_meta(id: ProtocolId) -> ProtocolMeta {
MetaBuilder::new()
.id(id)
.service_handle(move |meta| {
if meta.id() == 0 {
.service_handle(move || {
if id == 0 {
ProtocolHandle::Neither
} else {
let handle = Box::new(PHandle {
proto_id: meta.id(),
proto_id: id,
connected_count: 0,
});
ProtocolHandle::Callback(handle)
Expand Down
8 changes: 4 additions & 4 deletions tests/test_kill.rs
Expand Up @@ -92,14 +92,14 @@ fn create_meta(id: ProtocolId) -> (ProtocolMeta, crossbeam_channel::Receiver<()>

let meta = MetaBuilder::new()
.id(id)
.service_handle(move |meta| {
if meta.id() == 0 {
.service_handle(move || {
if id == 0 {
ProtocolHandle::Neither
} else {
let handle = Box::new(PHandle {
proto_id: meta.id(),
proto_id: id,
connected_count: 0,
sender: sender.clone(),
sender,
});
ProtocolHandle::Callback(handle)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/test_peer_id.rs
Expand Up @@ -70,8 +70,8 @@ fn create_shandle() -> (EmptySHandle, crossbeam_channel::Receiver<usize>) {
fn create_meta(id: ProtocolId) -> ProtocolMeta {
MetaBuilder::new()
.id(id)
.service_handle(|meta| {
if meta.id() == 0 {
.service_handle(move || {
if id == 0 {
ProtocolHandle::Neither
} else {
let handle = Box::new(PHandle);
Expand Down