Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: new type id #107

Merged
merged 1 commit into from
Apr 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 10 additions & 13 deletions bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,15 @@ where
}

struct PHandle {
proto_id: ProtocolId,
connected_count: usize,
sender: crossbeam_channel::Sender<Notify>,
}

impl ServiceProtocol for PHandle {
fn init(&mut self, _control: &mut ProtocolContext) {}

fn connected(&mut self, control: ProtocolContextMutRef, _version: &str) {
fn connected(&mut self, _control: ProtocolContextMutRef, _version: &str) {
self.connected_count += 1;
assert_eq!(self.proto_id, control.session.id);
let _ = self.sender.send(Notify::Connected);
}

Expand All @@ -80,11 +78,10 @@ fn create_meta(id: ProtocolId) -> (ProtocolMeta, crossbeam_channel::Receiver<Not
)
})
.service_handle(move || {
if id == 0 {
if id == ProtocolId::default() {
ProtocolHandle::Neither
} else {
let handle = Box::new(PHandle {
proto_id: id,
connected_count: 0,
sender: sender.clone(),
});
Expand All @@ -99,15 +96,15 @@ fn create_meta(id: ProtocolId) -> (ProtocolMeta, crossbeam_channel::Receiver<Not
pub fn init() {
// init secio two peers
START_SECIO.call_once(|| {
let (meta, _receiver) = create_meta(1);
let (meta, _receiver) = create_meta(ProtocolId::new(1));
let mut service = create(true, meta, ());
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let control = service.control().clone();
thread::spawn(|| tokio::run(service.for_each(|_| Ok(()))));

let (meta, client_receiver) = create_meta(1);
let (meta, client_receiver) = create_meta(1.into());
let mut service = create(true, meta, ());
service.dial(listen_addr, DialProtocol::All).unwrap();
thread::spawn(|| tokio::run(service.for_each(|_| Ok(()))));
Expand All @@ -121,15 +118,15 @@ pub fn init() {

// init no secio two peers
START_NO_SECIO.call_once(|| {
let (meta, _receiver) = create_meta(1);
let (meta, _receiver) = create_meta(ProtocolId::new(1));
let mut service = create(false, meta, ());
let listen_addr = service
.listen("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let control = service.control().clone();
thread::spawn(|| tokio::run(service.for_each(|_| Ok(()))));

let (meta, client_receiver) = create_meta(1);
let (meta, client_receiver) = create_meta(ProtocolId::new(1));
let mut service = create(false, meta, ());
service.dial(listen_addr, DialProtocol::All).unwrap();
thread::spawn(|| tokio::run(service.for_each(|_| Ok(()))));
Expand All @@ -144,9 +141,9 @@ pub fn init() {

fn secio_and_send_data(data: &[u8]) {
unsafe {
SECIO_CONTROL
.as_mut()
.map(|control| control.filter_broadcast(TargetSession::All, 1, data.to_vec()));
SECIO_CONTROL.as_mut().map(|control| {
control.filter_broadcast(TargetSession::All, ProtocolId::new(1), data.to_vec())
});
if let Some(rev) = SECIO_RECV.as_ref() {
assert_eq!(rev.recv(), Ok(Notify::Message(bytes::Bytes::from(data))))
}
Expand All @@ -157,7 +154,7 @@ fn no_secio_and_send_data(data: &[u8]) {
unsafe {
NO_SECIO_CONTROL
.as_mut()
.map(|control| control.filter_broadcast(TargetSession::All, 1, data.to_vec()));
.map(|control| control.filter_broadcast(TargetSession::All, 1.into(), data.to_vec()));

if let Some(rev) = NO_SECIO_RECV.as_ref() {
assert_eq!(rev.recv(), Ok(Notify::Message(bytes::Bytes::from(data))))
Expand Down
15 changes: 9 additions & 6 deletions examples/disc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use discovery::{AddressManager, Discovery, DiscoveryProtocol, MisbehaveResult, M

fn main() {
env_logger::init();
let meta = create_meta(1, 0);
let meta = create_meta(1.into(), 0);
let mut service = ServiceBuilder::default()
.insert_protocol(meta)
.forever(true)
Expand All @@ -46,7 +46,7 @@ fn create_meta(id: ProtocolId, start: u16) -> ProtocolMeta {
.map(|port| format!("/ip4/127.0.0.1/tcp/{}", port).parse().unwrap())
.collect();
let mut peers = FnvHashMap::default();
peers.insert(0, (100, addrs));
peers.insert(0.into(), (100, addrs));
let addr_mgr = SimpleAddressManager { peers };
MetaBuilder::default()
.id(id)
Expand Down Expand Up @@ -75,10 +75,10 @@ pub struct SimpleAddressManager {
}

impl AddressManager for SimpleAddressManager {
fn add_new_addr(&mut self, _session_id: SessionId, addr: Multiaddr) {
fn add_new_addr(&mut self, session_id: SessionId, addr: Multiaddr) {
let (_, addrs) = self
.peers
.entry(0)
.entry(session_id)
.or_insert_with(|| (100, HashSet::default()));
addrs.insert(addr);
}
Expand All @@ -89,8 +89,11 @@ impl AddressManager for SimpleAddressManager {
}
}

fn misbehave(&mut self, _session_id: SessionId, _ty: Misbehavior) -> MisbehaveResult {
let (score, _) = self.peers.entry(0).or_insert((100, HashSet::default()));
fn misbehave(&mut self, session_id: SessionId, _ty: Misbehavior) -> MisbehaveResult {
let (score, _) = self
.peers
.entry(session_id)
.or_insert((100, HashSet::default()));
*score -= 20;
if *score < 0 {
MisbehaveResult::Disconnect
Expand Down
2 changes: 1 addition & 1 deletion examples/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() {
local_listen_addrs: Vec::new(),
};
let protocol = MetaBuilder::default()
.id(1)
.id(1.into())
.service_handle(move || {
ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(callback.clone())))
})
Expand Down
14 changes: 12 additions & 2 deletions examples/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ fn main() {
if std::env::args().nth(1) == Some("server".to_string()) {
debug!("Starting server ......");
let (sender, receiver) = channel(256);
let protocol = create_meta(1, Duration::from_secs(5), Duration::from_secs(15), sender);
let protocol = create_meta(
1.into(),
Duration::from_secs(5),
Duration::from_secs(15),
sender,
);
let mut service = ServiceBuilder::default()
.insert_protocol(protocol)
.forever(true)
Expand All @@ -35,7 +40,12 @@ fn main() {
} else {
debug!("Starting client ......");
let (sender, receiver) = channel(256);
let protocol = create_meta(1, Duration::from_secs(5), Duration::from_secs(15), sender);
let protocol = create_meta(
1.into(),
Duration::from_secs(5),
Duration::from_secs(15),
sender,
);
let mut service = ServiceBuilder::default()
.insert_protocol(protocol)
.forever(true)
Expand Down
20 changes: 10 additions & 10 deletions examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ struct PHandle {

impl ServiceProtocol for PHandle {
fn init(&mut self, context: &mut ProtocolContext) {
if context.proto_id == 0 {
context.set_service_notify(0, Duration::from_secs(5), 3);
if context.proto_id == 0.into() {
context.set_service_notify(0.into(), Duration::from_secs(5), 3);
}
}

Expand All @@ -58,7 +58,7 @@ impl ServiceProtocol for PHandle {
);
info!("connected sessions are: {:?}", self.connected_session_ids);

if context.proto_id != 1 {
if context.proto_id != 1.into() {
return;
}

Expand All @@ -72,7 +72,7 @@ impl ServiceProtocol for PHandle {
.for_each(move |_| {
let _ = interval_sender.send_message(
session_id,
1,
1.into(),
b"I am a interval message".to_vec(),
);
if let Ok(Async::Ready(_)) = receiver.poll() {
Expand Down Expand Up @@ -138,7 +138,7 @@ impl ServiceHandle for SHandle {
.and_then(move |_| {
let _ = delay_sender.filter_broadcast(
TargetSession::All,
0,
0.into(),
b"I am a delayed message".to_vec(),
);
Ok(())
Expand All @@ -164,8 +164,8 @@ fn main() {

fn create_server() -> Service<SHandle> {
ServiceBuilder::default()
.insert_protocol(create_meta(0))
.insert_protocol(create_meta(1))
.insert_protocol(create_meta(0.into()))
.insert_protocol(create_meta(1.into()))
.key_pair(SecioKeyPair::secp256k1_generated())
.build(SHandle)
}
Expand All @@ -177,9 +177,9 @@ fn create_server() -> Service<SHandle> {
/// Because server only supports 0,1
fn create_client() -> Service<SHandle> {
ServiceBuilder::default()
.insert_protocol(create_meta(0))
.insert_protocol(create_meta(1))
.insert_protocol(create_meta(2))
.insert_protocol(create_meta(0.into()))
.insert_protocol(create_meta(1.into()))
.insert_protocol(create_meta(2.into()))
.key_pair(SecioKeyPair::secp256k1_generated())
.build(SHandle)
}
Expand Down
4 changes: 2 additions & 2 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ impl MetaBuilder {
impl Default for MetaBuilder {
fn default() -> Self {
MetaBuilder {
id: 0,
name: Box::new(|id| format!("/p2p/{}", id)),
id: ProtocolId::new(0),
name: Box::new(|id| format!("/p2p/{}", id.value())),
support_versions: vec!["0.0.1".to_owned()],
codec: Box::new(|| Box::new(LengthDelimitedCodec::new())),
service_handle: Box::new(|| ProtocolHandle::Neither),
Expand Down
53 changes: 50 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,56 @@ pub(crate) mod transports;
/// Some useful functions
pub mod utils;

use std::{fmt, ops::AddAssign};

/// Index of sub/protocol stream
pub type StreamId = usize;
type StreamId = usize;
/// Protocol id
pub type ProtocolId = usize;
#[derive(Debug, Clone, Copy, Hash, Ord, PartialOrd, Eq, PartialEq, Default)]
pub struct ProtocolId(usize);

impl ProtocolId {
/// New a protocol id
pub fn new(id: usize) -> Self {
ProtocolId(id)
}

/// Get inner value
pub fn value(self) -> usize {
self.0
}
}

impl fmt::Display for ProtocolId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ProtocolId({})", self.0)
}
}

impl From<usize> for ProtocolId {
fn from(id: usize) -> Self {
ProtocolId::new(id)
}
}

/// Index of session
pub type SessionId = usize;
#[derive(Debug, Clone, Copy, Hash, Ord, PartialOrd, Eq, PartialEq, Default)]
pub struct SessionId(usize);

impl fmt::Display for SessionId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SessionId({})", self.0)
}
}

impl AddAssign<usize> for SessionId {
fn add_assign(&mut self, rhs: usize) {
self.0 += rhs
}
}

impl From<usize> for SessionId {
fn from(id: usize) -> Self {
SessionId(id)
}
}
2 changes: 1 addition & 1 deletion src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ where
dial_protocols: HashMap::default(),
config,
state: State::new(forever),
next_session: 0,
next_session: SessionId::default(),
write_buf: VecDeque::default(),
read_service_buf: VecDeque::default(),
read_session_buf: VecDeque::default(),
Expand Down
37 changes: 31 additions & 6 deletions tests/test_close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ServiceProtocol for PHandle {
fn init(&mut self, _context: &mut ProtocolContext) {}

fn connected(&mut self, mut context: ProtocolContextMutRef, _version: &str) {
if context.session.ty.is_inbound() && context.proto_id == 1 {
if context.session.ty.is_inbound() && context.proto_id == 1.into() {
self.count += 1;
if self.count >= 4 {
let proto_id = context.proto_id;
Expand Down Expand Up @@ -67,27 +67,52 @@ fn create_meta(id: ProtocolId) -> ProtocolMeta {
fn test_close(secio: bool) {
let mut service_1 = create(
secio,
vec![create_meta(0), create_meta(1), create_meta(2)].into_iter(),
vec![
create_meta(0.into()),
create_meta(1.into()),
create_meta(2.into()),
]
.into_iter(),
(),
);
let mut service_2 = create(
secio,
vec![create_meta(0), create_meta(1), create_meta(2)].into_iter(),
vec![
create_meta(0.into()),
create_meta(1.into()),
create_meta(2.into()),
]
.into_iter(),
(),
);
let mut service_3 = create(
secio,
vec![create_meta(0), create_meta(1), create_meta(2)].into_iter(),
vec![
create_meta(0.into()),
create_meta(1.into()),
create_meta(2.into()),
]
.into_iter(),
(),
);
let mut service_4 = create(
secio,
vec![create_meta(0), create_meta(1), create_meta(2)].into_iter(),
vec![
create_meta(0.into()),
create_meta(1.into()),
create_meta(2.into()),
]
.into_iter(),
(),
);
let mut service_5 = create(
secio,
vec![create_meta(0), create_meta(1), create_meta(2)].into_iter(),
vec![
create_meta(0.into()),
create_meta(1.into()),
create_meta(2.into()),
]
.into_iter(),
(),
);

Expand Down
Loading