Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi-codec #63

Merged
merged 1 commit into from
Mar 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use p2p::{
context::{ServiceContext, SessionContext},
secio::SecioKeyPair,
service::{DialProtocol, Service, ServiceControl},
traits::{ProtocolHandle, ProtocolMeta, ServiceHandle, ServiceProtocol},
traits::{Codec, ProtocolHandle, ProtocolMeta, ServiceHandle, ServiceProtocol},
ProtocolId,
};
use std::{sync::Once, thread};
use tokio::codec::{length_delimited::Builder, LengthDelimitedCodec};
use tokio::codec::length_delimited::Builder;

static START_SECIO: Once = Once::new();
static START_NO_SECIO: Once = Once::new();
Expand All @@ -26,9 +26,9 @@ enum Notify {
Message(bytes::Bytes),
}

pub fn create<T, F>(secio: bool, meta: T, shandle: F) -> Service<F, LengthDelimitedCodec>
pub fn create<T, F>(secio: bool, meta: T, shandle: F) -> Service<F>
where
T: ProtocolMeta<LengthDelimitedCodec> + Send + Sync + 'static,
T: ProtocolMeta + Send + Sync + 'static,
F: ServiceHandle,
{
let builder = ServiceBuilder::default()
Expand Down Expand Up @@ -56,14 +56,16 @@ impl Protocol {
}
}

impl ProtocolMeta<LengthDelimitedCodec> for Protocol {
impl ProtocolMeta for Protocol {
fn id(&self) -> ProtocolId {
self.id
}
fn codec(&self) -> LengthDelimitedCodec {
Builder::new()
.max_frame_length(1024 * 1024 * 20)
.new_codec()
fn codec(&self) -> Box<dyn Codec + Send + 'static> {
Box::new(
Builder::new()
.max_frame_length(1024 * 1024 * 20)
.new_codec(),
)
}

fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
Expand Down
8 changes: 4 additions & 4 deletions examples/disc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tentacle::{
context::{ServiceContext, SessionContext},
multiaddr::{Multiaddr, ToMultiaddr},
service::{DialProtocol, ServiceError, ServiceEvent},
traits::{ProtocolHandle, ProtocolMeta, ServiceHandle, ServiceProtocol},
traits::{Codec, ProtocolHandle, ProtocolMeta, ServiceHandle, ServiceProtocol},
utils::multiaddr_to_socketaddr,
yamux::session::SessionType,
ProtocolId, SessionId,
Expand Down Expand Up @@ -80,13 +80,13 @@ struct DiscoveryProtocol {
sessions: HashMap<SessionId, SessionData>,
}

impl ProtocolMeta<LengthDelimitedCodec> for DiscoveryProtocolMeta {
impl ProtocolMeta for DiscoveryProtocolMeta {
fn id(&self) -> ProtocolId {
self.id
}

fn codec(&self) -> LengthDelimitedCodec {
LengthDelimitedCodec::new()
fn codec(&self) -> Box<dyn Codec + Send + 'static> {
Box::new(LengthDelimitedCodec::new())
}

fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
Expand Down
12 changes: 6 additions & 6 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tentacle::{
context::{ServiceContext, SessionContext},
secio::SecioKeyPair,
service::{DialProtocol, Service, ServiceError, ServiceEvent},
traits::{ProtocolHandle, ProtocolMeta, ServiceHandle, ServiceProtocol},
traits::{Codec, ProtocolHandle, ProtocolMeta, ServiceHandle, ServiceProtocol},
ProtocolId, SessionId,
};
use tokio::codec::length_delimited::LengthDelimitedCodec;
Expand All @@ -27,12 +27,12 @@ impl Protocol {
}
}

impl ProtocolMeta<LengthDelimitedCodec> for Protocol {
impl ProtocolMeta for Protocol {
fn id(&self) -> ProtocolId {
self.id
}
fn codec(&self) -> LengthDelimitedCodec {
LengthDelimitedCodec::new()
fn codec(&self) -> Box<dyn Codec + Send + 'static> {
Box::new(LengthDelimitedCodec::new())
}

fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
Expand Down Expand Up @@ -174,7 +174,7 @@ fn main() {
}
}

fn create_server() -> Service<SHandle, LengthDelimitedCodec> {
fn create_server() -> Service<SHandle> {
ServiceBuilder::default()
.insert_protocol(Protocol::new(0))
.insert_protocol(Protocol::new(1))
Expand All @@ -187,7 +187,7 @@ fn create_server() -> Service<SHandle, LengthDelimitedCodec> {
/// Proto 2 open failure
///
/// Because server only supports 0,1
fn create_client() -> Service<SHandle, LengthDelimitedCodec> {
fn create_client() -> Service<SHandle> {
ServiceBuilder::default()
.insert_protocol(Protocol::new(0))
.insert_protocol(Protocol::new(1))
Expand Down
8 changes: 4 additions & 4 deletions ping/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::{debug, error};
use p2p::{
context::{ServiceContext, SessionContext},
secio::PeerId,
traits::{ProtocolHandle, ProtocolMeta, ServiceProtocol},
traits::{Codec, ProtocolHandle, ProtocolMeta, ServiceProtocol},
ProtocolId, SessionId,
};
use std::{
Expand Down Expand Up @@ -59,15 +59,15 @@ where
}
}

impl<S> ProtocolMeta<LengthDelimitedCodec> for PingProtocol<S>
impl<S> ProtocolMeta for PingProtocol<S>
where
S: Sender<Event> + Send + Clone + 'static,
{
fn id(&self) -> ProtocolId {
self.id
}
fn codec(&self) -> LengthDelimitedCodec {
LengthDelimitedCodec::new()
fn codec(&self) -> Box<dyn Codec + Send + 'static> {
Box::new(LengthDelimitedCodec::new())
}

fn service_handle(&self) -> ProtocolHandle<Box<dyn ServiceProtocol + Send + 'static>> {
Expand Down
27 changes: 8 additions & 19 deletions src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::{error, io, time::Duration};
use tokio::codec::{Decoder, Encoder};
use std::time::Duration;

use crate::{
secio::SecioKeyPair,
Expand All @@ -11,26 +10,21 @@ use crate::{
};

/// Builder for Service
pub struct ServiceBuilder<U> {
inner: HashMap<String, Box<dyn ProtocolMeta<U> + Send + Sync>>,
pub struct ServiceBuilder {
inner: HashMap<String, Box<dyn ProtocolMeta + Send + Sync>>,
key_pair: Option<SecioKeyPair>,
forever: bool,
config: ServiceConfig,
}

impl<U> ServiceBuilder<U>
where
U: Decoder<Item = bytes::BytesMut> + Encoder<Item = bytes::Bytes> + Send + 'static,
<U as Decoder>::Error: error::Error + Into<io::Error>,
<U as Encoder>::Error: error::Error + Into<io::Error>,
{
impl ServiceBuilder {
/// New a default empty builder
pub fn new() -> Self {
Default::default()
}

/// Combine the configuration of this builder with service handle to create a Service.
pub fn build<H>(self, handle: H) -> Service<H, U>
pub fn build<H>(self, handle: H) -> Service<H>
where
H: ServiceHandle,
{
Expand All @@ -46,15 +40,15 @@ where
/// Insert a custom protocol
pub fn insert_protocol<T>(mut self, protocol: T) -> Self
where
T: ProtocolMeta<U> + Send + Sync + 'static,
T: ProtocolMeta + Send + Sync + 'static,
{
if protocol.session_handle().has_event() || protocol.service_handle().has_event() {
self.config.event.insert(protocol.id());
}

self.inner.insert(
protocol.name(),
Box::new(protocol) as Box<dyn ProtocolMeta<_> + Send + Sync>,
Box::new(protocol) as Box<dyn ProtocolMeta + Send + Sync>,
);
self
}
Expand Down Expand Up @@ -107,12 +101,7 @@ where
}
}

impl<U> Default for ServiceBuilder<U>
where
U: Decoder<Item = bytes::BytesMut> + Encoder<Item = bytes::Bytes> + Send + 'static,
<U as Decoder>::Error: error::Error + Into<io::Error>,
<U as Encoder>::Error: error::Error + Into<io::Error>,
{
impl Default for ServiceBuilder {
fn default() -> Self {
ServiceBuilder {
inner: HashMap::new(),
Expand Down
26 changes: 7 additions & 19 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@ use futures::{
use log::{debug, error, trace, warn};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::{
error::{self, Error as ErrorTrait},
io,
};
use std::{error::Error as ErrorTrait, io};
use tokio::net::{
tcp::{ConnectFuture, Incoming},
TcpListener, TcpStream,
};
use tokio::{
codec::{Decoder, Encoder},
prelude::{AsyncRead, AsyncWrite, FutureExt},
timer::Timeout,
};
Expand Down Expand Up @@ -59,8 +55,8 @@ pub(crate) enum ProtocolHandle {
}

/// An abstraction of p2p service, currently only supports TCP protocol
pub struct Service<T, U> {
protocol_configs: Arc<HashMap<String, Box<dyn ProtocolMeta<U> + Send + Sync>>>,
pub struct Service<T> {
protocol_configs: Arc<HashMap<String, Box<dyn ProtocolMeta + Send + Sync>>>,

sessions: HashMap<SessionId, SessionContext>,

Expand Down Expand Up @@ -107,16 +103,13 @@ pub struct Service<T, U> {
notify: Option<Task>,
}

impl<T, U> Service<T, U>
impl<T> Service<T>
where
T: ServiceHandle,
U: Decoder<Item = bytes::BytesMut> + Encoder<Item = bytes::Bytes> + Send + 'static,
<U as Decoder>::Error: error::Error + Into<io::Error>,
<U as Encoder>::Error: error::Error + Into<io::Error>,
{
/// New a Service
pub(crate) fn new(
protocol_configs: Arc<HashMap<String, Box<dyn ProtocolMeta<U> + Send + Sync>>>,
protocol_configs: Arc<HashMap<String, Box<dyn ProtocolMeta + Send + Sync>>>,
handle: T,
key_pair: Option<SecioKeyPair>,
forever: bool,
Expand Down Expand Up @@ -278,9 +271,7 @@ where
}

/// Get service current protocol configure
pub fn protocol_configs(
&self,
) -> &Arc<HashMap<String, Box<dyn ProtocolMeta<U> + Send + Sync>>> {
pub fn protocol_configs(&self) -> &Arc<HashMap<String, Box<dyn ProtocolMeta + Send + Sync>>> {
&self.protocol_configs
}

Expand Down Expand Up @@ -1234,12 +1225,9 @@ where
}
}

impl<T, U> Stream for Service<T, U>
impl<T> Stream for Service<T>
where
T: ServiceHandle,
U: Decoder<Item = bytes::BytesMut> + Encoder<Item = bytes::Bytes> + Send + 'static,
<U as Decoder>::Error: error::Error + Into<io::Error>,
<U as Encoder>::Error: error::Error + Into<io::Error>,
{
type Item = ();
type Error = ();
Expand Down
33 changes: 11 additions & 22 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use log::{debug, error, trace, warn};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::{
error, io,
io,
time::{Duration, Instant},
};
use tokio::prelude::{AsyncRead, AsyncWrite, FutureExt};
use tokio::{
codec::{Decoder, Encoder, Framed, FramedParts, LengthDelimitedCodec},
codec::{Framed, FramedParts, LengthDelimitedCodec},
timer::Delay,
};

Expand Down Expand Up @@ -121,10 +121,10 @@ pub(crate) enum SessionEvent {
}

/// Wrapper for real data streams, such as TCP stream
pub(crate) struct Session<T, U> {
pub(crate) struct Session<T> {
socket: YamuxSession<T>,

protocol_configs: Arc<HashMap<String, Box<dyn ProtocolMeta<U> + Send + Sync>>>,
protocol_configs: Arc<HashMap<String, Box<dyn ProtocolMeta + Send + Sync>>>,

id: SessionId,
timeout: Duration,
Expand Down Expand Up @@ -160,19 +160,16 @@ pub(crate) struct Session<T, U> {
notify: Option<Task>,
}

impl<T, U> Session<T, U>
impl<T> Session<T>
where
T: AsyncRead + AsyncWrite,
U: Decoder<Item = bytes::BytesMut> + Encoder<Item = bytes::Bytes> + Send + 'static,
<U as Decoder>::Error: error::Error + Into<io::Error>,
<U as Encoder>::Error: error::Error + Into<io::Error>,
{
/// New a session
pub fn new(
socket: T,
service_sender: mpsc::Sender<SessionEvent>,
service_receiver: mpsc::Receiver<SessionEvent>,
meta: SessionMeta<U>,
meta: SessionMeta,
) -> Self {
let socket = YamuxSession::new(socket, meta.config, meta.ty);
let (proto_event_sender, proto_event_receiver) = mpsc::channel(256);
Expand Down Expand Up @@ -503,12 +500,9 @@ where
}
}

impl<T, U> Stream for Session<T, U>
impl<T> Stream for Session<T>
where
T: AsyncRead + AsyncWrite,
U: Decoder<Item = bytes::BytesMut> + Encoder<Item = bytes::Bytes> + Send + 'static,
<U as Decoder>::Error: error::Error + Into<io::Error>,
<U as Encoder>::Error: error::Error + Into<io::Error>,
{
type Item = ();
type Error = io::Error;
Expand Down Expand Up @@ -595,22 +589,17 @@ where
}
}

pub(crate) struct SessionMeta<U> {
pub(crate) struct SessionMeta {
config: Config,
id: SessionId,
protocol_configs: Arc<HashMap<String, Box<dyn ProtocolMeta<U> + Send + Sync>>>,
protocol_configs: Arc<HashMap<String, Box<dyn ProtocolMeta + Send + Sync>>>,
ty: SessionType,
// remote_address: ::std::net::SocketAddr,
// remote_public_key: Option<PublicKey>,
timeout: Duration,
}

impl<U> SessionMeta<U>
where
U: Decoder<Item = bytes::BytesMut> + Encoder<Item = bytes::Bytes> + Send + 'static,
<U as Decoder>::Error: error::Error + Into<io::Error>,
<U as Encoder>::Error: error::Error + Into<io::Error>,
{
impl SessionMeta {
pub fn new(id: SessionId, ty: SessionType, timeout: Duration) -> Self {
SessionMeta {
config: Config::default(),
Expand All @@ -623,7 +612,7 @@ where

pub fn protocol(
mut self,
config: Arc<HashMap<String, Box<dyn ProtocolMeta<U> + Send + Sync>>>,
config: Arc<HashMap<String, Box<dyn ProtocolMeta + Send + Sync>>>,
) -> Self {
self.protocol_configs = config;
self
Expand Down
Loading