Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support event output #58

Merged
merged 4 commits into from
Mar 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ tokio = "0.1.14"
futures = "0.1.25"
crossbeam-channel = "0.3.6"
env_logger = "0.6.0"
bytes = "0.4"
21 changes: 13 additions & 8 deletions bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use p2p::{
context::{ServiceContext, SessionContext},
secio::SecioKeyPair,
service::{Service, ServiceControl},
traits::{ProtocolMeta, ServiceHandle, ServiceProtocol},
traits::{ProtocolHandle, ProtocolMeta, ServiceHandle, ServiceProtocol},
ProtocolId,
};
use std::{sync::Once, thread};
Expand All @@ -23,7 +23,7 @@ static mut NO_SECIO_RECV: Option<crossbeam_channel::Receiver<Notify>> = None;
#[derive(Debug, PartialEq)]
enum Notify {
Connected,
Message(Vec<u8>),
Message(bytes::Bytes),
}

pub fn create<T, F>(secio: bool, meta: T, shandle: F) -> Service<F, LengthDelimitedCodec>
Expand Down Expand Up @@ -66,16 +66,16 @@ impl ProtocolMeta<LengthDelimitedCodec> for Protocol {
.new_codec()
}

fn service_handle(&self) -> Option<Box<dyn ServiceProtocol + Send + 'static>> {
fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
if self.id == 0 {
None
ProtocolHandle::Empty
} else {
let handle = Box::new(PHandle {
proto_id: self.id,
connected_count: 0,
sender: self.sender.clone(),
});
Some(handle)
ProtocolHandle::Callback(handle)
}
}
}
Expand Down Expand Up @@ -104,7 +104,12 @@ impl ServiceProtocol for PHandle {
self.connected_count -= 1;
}

fn received(&mut self, _env: &mut ServiceContext, _session: &SessionContext, data: Vec<u8>) {
fn received(
&mut self,
_env: &mut ServiceContext,
_session: &SessionContext,
data: bytes::Bytes,
) {
let _ = self.sender.send(Notify::Message(data));
}
}
Expand Down Expand Up @@ -167,7 +172,7 @@ fn secio_and_send_data(data: &[u8]) {
.as_mut()
.map(|control| control.filter_broadcast(None, 1, data.to_vec()));
if let Some(rev) = SECIO_RECV.as_ref() {
assert_eq!(rev.recv(), Ok(Notify::Message(data.to_vec())))
assert_eq!(rev.recv(), Ok(Notify::Message(bytes::Bytes::from(data))))
}
}
}
Expand All @@ -179,7 +184,7 @@ fn no_secio_and_send_data(data: &[u8]) {
.map(|control| control.filter_broadcast(None, 1, data.to_vec()));

if let Some(rev) = NO_SECIO_RECV.as_ref() {
assert_eq!(rev.recv(), Ok(Notify::Message(data.to_vec())))
assert_eq!(rev.recv(), Ok(Notify::Message(bytes::Bytes::from(data))))
}
}
}
Expand Down
17 changes: 11 additions & 6 deletions examples/disc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tentacle::{
context::{ServiceContext, SessionContext},
multiaddr::{Multiaddr, ToMultiaddr},
service::{ServiceError, ServiceEvent},
traits::{ProtocolMeta, ServiceHandle, ServiceProtocol},
traits::{ProtocolHandle, ProtocolMeta, ServiceHandle, ServiceProtocol},
utils::multiaddr_to_socketaddr,
yamux::session::SessionType,
ProtocolId, SessionId,
Expand Down Expand Up @@ -86,7 +86,7 @@ impl ProtocolMeta<LengthDelimitedCodec> for DiscoveryProtocolMeta {
LengthDelimitedCodec::new()
}

fn service_handle(&self) -> Option<Box<dyn ServiceProtocol + Send + 'static>> {
fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
let discovery = Discovery::new(self.addr_mgr.clone());
let discovery_handle = discovery.handle();
let handle = Box::new(DiscoveryProtocol {
Expand All @@ -97,7 +97,7 @@ impl ProtocolMeta<LengthDelimitedCodec> for DiscoveryProtocolMeta {
discovery_senders: FnvHashMap::default(),
sessions: HashMap::default(),
});
Some(handle)
ProtocolHandle::Callback(handle)
}
}

Expand Down Expand Up @@ -172,14 +172,19 @@ impl ServiceProtocol for DiscoveryProtocol {
debug!("protocol [discovery] close on session [{}]", session.id);
}

fn received(&mut self, _control: &mut ServiceContext, session: &SessionContext, data: Vec<u8>) {
fn received(
&mut self,
_control: &mut ServiceContext,
session: &SessionContext,
data: bytes::Bytes,
) {
debug!("[received message]: length={}", data.len());
self.sessions
.get_mut(&session.id)
.unwrap()
.push_data(data.clone());
.push_data(data.to_vec());
if let Some(ref mut sender) = self.discovery_senders.get_mut(&session.id) {
if let Err(err) = sender.try_send(data) {
if let Err(err) = sender.try_send(data.to_vec()) {
if err.is_full() {
warn!("channel is full");
} else if err.is_disconnected() {
Expand Down
15 changes: 10 additions & 5 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tentacle::{
context::{ServiceContext, SessionContext},
secio::SecioKeyPair,
service::{Service, ServiceError, ServiceEvent},
traits::{ProtocolMeta, ServiceHandle, ServiceProtocol},
traits::{ProtocolHandle, ProtocolMeta, ServiceHandle, ServiceProtocol},
ProtocolId, SessionId,
};
use tokio::codec::length_delimited::LengthDelimitedCodec;
Expand All @@ -35,7 +35,7 @@ impl ProtocolMeta<LengthDelimitedCodec> for Protocol {
LengthDelimitedCodec::new()
}

fn service_handle(&self) -> Option<Box<dyn ServiceProtocol + Send + 'static>> {
fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
// All protocol use the same handle.
// This is just an example. In the actual environment, this should be a different handle.
let handle = Box::new(PHandle {
Expand All @@ -44,7 +44,7 @@ impl ProtocolMeta<LengthDelimitedCodec> for Protocol {
connected_session_ids: Vec::new(),
clear_handle: HashMap::new(),
});
Some(handle)
ProtocolHandle::Callback(handle)
}
}

Expand Down Expand Up @@ -117,13 +117,18 @@ impl ServiceProtocol for PHandle {
);
}

fn received(&mut self, _env: &mut ServiceContext, session: &SessionContext, data: Vec<u8>) {
fn received(
&mut self,
_env: &mut ServiceContext,
session: &SessionContext,
data: bytes::Bytes,
) {
self.count += 1;
info!(
"received from [{}]: proto [{}] data {:?}, message count: {}",
session.id,
self.proto_id,
str::from_utf8(&data).unwrap(),
str::from_utf8(data.as_ref()).unwrap(),
self.count
);
}
Expand Down
1 change: 1 addition & 0 deletions ping/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ log = "0.4"
flatbuffers = "0.5.0"
fnv = "1.0.6"
generic-channel = "0.2.0"
bytes = "0.4"
15 changes: 10 additions & 5 deletions ping/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::{debug, error};
use p2p::{
context::{ServiceContext, SessionContext},
secio::PeerId,
traits::{ProtocolMeta, ServiceProtocol},
traits::{ProtocolHandle, ProtocolMeta, ServiceProtocol},
ProtocolId, SessionId,
};
use std::{
Expand Down Expand Up @@ -70,15 +70,15 @@ where
LengthDelimitedCodec::new()
}

fn service_handle(&self) -> Option<Box<dyn ServiceProtocol + Send + 'static>> {
fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
let handle = Box::new(PingHandler {
proto_id: self.id,
interval: self.interval,
timeout: self.timeout,
connected_session_ids: Default::default(),
event_sender: self.event_sender.clone(),
});
Some(handle)
ProtocolHandle::Callback(handle)
}
}

Expand Down Expand Up @@ -164,13 +164,18 @@ where
);
}

fn received(&mut self, control: &mut ServiceContext, session: &SessionContext, data: Vec<u8>) {
fn received(
&mut self,
control: &mut ServiceContext,
session: &SessionContext,
data: bytes::Bytes,
) {
if let Some(peer_id) = self
.connected_session_ids
.get(&session.id)
.map(|ps| ps.peer_id.clone())
{
let msg = get_root::<PingMessage>(&data);
let msg = get_root::<PingMessage>(data.as_ref());
match msg.payload_type() {
PingPayload::Ping => {
let ping_msg = msg.payload_as_ping().unwrap();
Expand Down
29 changes: 14 additions & 15 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::codec::{Decoder, Encoder};

use crate::{
secio::SecioKeyPair,
service::Service,
service::{config::ServiceConfig, Service},
traits::{ProtocolMeta, ServiceHandle},
yamux::Config,
};
Expand All @@ -15,9 +15,7 @@ pub struct ServiceBuilder<U> {
inner: HashMap<String, Box<dyn ProtocolMeta<U> + Send + Sync>>,
key_pair: Option<SecioKeyPair>,
forever: bool,
timeout: Duration,
yamux_config: Config,
max_frame_length: usize,
config: ServiceConfig,
}

impl<U> ServiceBuilder<U>
Expand All @@ -41,17 +39,19 @@ where
handle,
self.key_pair,
self.forever,
self.timeout,
self.config,
)
.max_frame_length(self.max_frame_length)
.yamux_config(self.yamux_config)
}

/// Insert a custom protocol
pub fn insert_protocol<T>(mut self, protocol: T) -> Self
where
T: ProtocolMeta<U> + Send + Sync + 'static,
{
if protocol.session_handle().has_event() || protocol.service_handle().has_event() {
self.config.event.insert(protocol.id());
}

self.inner.insert(
protocol.name(),
Box::new(protocol) as Box<dyn ProtocolMeta<_> + Send + Sync>,
Expand All @@ -78,31 +78,32 @@ where
///
/// Default 10 second
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self.config.timeout = timeout;
self
}

/// Yamux config for service
///
/// Panic when max_frame_length < yamux_max_window_size
pub fn yamux_config(mut self, config: Config) -> Self {
assert!(self.max_frame_length as u32 >= config.max_stream_window_size);
self.yamux_config = config;
assert!(self.config.max_frame_length as u32 >= config.max_stream_window_size);
self.config.yamux_config = config;
self
}

/// Secio max frame length
///
/// Panic when max_frame_length < yamux_max_window_size
pub fn max_frame_length(mut self, size: usize) -> Self {
assert!(size as u32 >= self.yamux_config.max_stream_window_size);
self.max_frame_length = size;
assert!(size as u32 >= self.config.yamux_config.max_stream_window_size);
self.config.max_frame_length = size;
self
}

/// Clear all protocols
pub fn clear(&mut self) {
self.inner.clear();
self.config.event.clear();
}
}

Expand All @@ -117,9 +118,7 @@ where
inner: HashMap::new(),
key_pair: None,
forever: false,
timeout: Duration::from_secs(10),
yamux_config: Config::default(),
max_frame_length: 1024 * 1024 * 8,
config: ServiceConfig::default(),
}
}
}
5 changes: 2 additions & 3 deletions src/protocol_handle_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl ServiceProtocolStream {
Received { id, data } => {
if let Some(session) = self.sessions.get_mut(&id) {
self.handle
.received(&mut self.service_context, session, data.to_vec());
.received(&mut self.service_context, session, data);
}
}
Notify { token } => {
Expand Down Expand Up @@ -189,8 +189,7 @@ impl SessionProtocolStream {
self.close();
}
Received { data } => {
self.handle
.received(&mut self.service_context, data.to_vec());
self.handle.received(&mut self.service_context, data);
}
Notify { token } => {
self.handle.notify(&mut self.service_context, token);
Expand Down
Loading