diff --git a/protocols/rendezvous/examples/discover.rs b/protocols/rendezvous/examples/discover.rs index b47601ea65d..28abdf6d102 100644 --- a/protocols/rendezvous/examples/discover.rs +++ b/protocols/rendezvous/examples/discover.rs @@ -43,7 +43,7 @@ async fn main() { let mut swarm = Swarm::new( development_transport(identity.clone()).await.unwrap(), MyBehaviour { - rendezvous: rendezvous::Behaviour::new(identity.clone(), rendezvous::Config::default()), + rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: Ping::new(PingConfig::new().with_interval(Duration::from_secs(1))), }, PeerId::from(identity.public()), @@ -79,7 +79,7 @@ async fn main() { ); return; } - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::Event::Discovered { + SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Discovered { registrations, .. })) => { @@ -115,12 +115,12 @@ async fn main() { #[derive(Debug)] enum MyEvent { - Rendezvous(rendezvous::Event), + Rendezvous(rendezvous::client::Event), Ping(PingEvent), } -impl From for MyEvent { - fn from(event: rendezvous::Event) -> Self { +impl From for MyEvent { + fn from(event: rendezvous::client::Event) -> Self { MyEvent::Rendezvous(event) } } @@ -135,6 +135,6 @@ impl From for MyEvent { #[behaviour(event_process = false)] #[behaviour(out_event = "MyEvent")] struct MyBehaviour { - rendezvous: rendezvous::Behaviour, + rendezvous: rendezvous::client::Behaviour, ping: Ping, } diff --git a/protocols/rendezvous/examples/register.rs b/protocols/rendezvous/examples/register.rs index 2e000ac48e2..2c02bca12ef 100644 --- a/protocols/rendezvous/examples/register.rs +++ b/protocols/rendezvous/examples/register.rs @@ -43,7 +43,7 @@ async fn main() { let mut swarm = Swarm::new( development_transport(identity.clone()).await.unwrap(), MyBehaviour { - rendezvous: rendezvous::Behaviour::new(identity.clone(), rendezvous::Config::default()), + rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: Ping::new(PingConfig::new().with_interval(Duration::from_secs(1))), }, PeerId::from(identity.public()), @@ -81,7 +81,7 @@ async fn main() { log::info!("Connection established with rendezvous point {}", peer_id); } // once `/identify` did its job, we know our external address and can register - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::Event::Registered { + SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Registered { namespace, ttl, rendezvous_node, @@ -93,9 +93,9 @@ async fn main() { ttl ); } - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::Event::RegisterFailed( - error, - ))) => { + SwarmEvent::Behaviour(MyEvent::Rendezvous( + rendezvous::client::Event::RegisterFailed(error), + )) => { log::error!("Failed to register {}", error); return; } @@ -114,12 +114,12 @@ async fn main() { #[derive(Debug)] enum MyEvent { - Rendezvous(rendezvous::Event), + Rendezvous(rendezvous::client::Event), Ping(PingEvent), } -impl From for MyEvent { - fn from(event: rendezvous::Event) -> Self { +impl From for MyEvent { + fn from(event: rendezvous::client::Event) -> Self { MyEvent::Rendezvous(event) } } @@ -134,6 +134,6 @@ impl From for MyEvent { #[behaviour(event_process = false)] #[behaviour(out_event = "MyEvent")] struct MyBehaviour { - rendezvous: rendezvous::Behaviour, + rendezvous: rendezvous::client::Behaviour, ping: Ping, } diff --git a/protocols/rendezvous/examples/register_with_identify.rs b/protocols/rendezvous/examples/register_with_identify.rs index 84f7be24a99..25450d4ab02 100644 --- a/protocols/rendezvous/examples/register_with_identify.rs +++ b/protocols/rendezvous/examples/register_with_identify.rs @@ -47,7 +47,7 @@ async fn main() { "rendezvous-example/1.0.0".to_string(), identity.public(), )), - rendezvous: rendezvous::Behaviour::new(identity.clone(), rendezvous::Config::default()), + rendezvous: rendezvous::client::Behaviour::new(identity.clone()), ping: Ping::new(PingConfig::new().with_interval(Duration::from_secs(1))), }, PeerId::from(identity.public()), @@ -79,7 +79,7 @@ async fn main() { None, ); } - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::Event::Registered { + SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Registered { namespace, ttl, rendezvous_node, @@ -91,9 +91,9 @@ async fn main() { ttl ); } - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::Event::RegisterFailed( - error, - ))) => { + SwarmEvent::Behaviour(MyEvent::Rendezvous( + rendezvous::client::Event::RegisterFailed(error), + )) => { log::error!("Failed to register {}", error); return; } @@ -112,13 +112,13 @@ async fn main() { #[derive(Debug)] enum MyEvent { - Rendezvous(rendezvous::Event), + Rendezvous(rendezvous::client::Event), Identify(IdentifyEvent), Ping(PingEvent), } -impl From for MyEvent { - fn from(event: rendezvous::Event) -> Self { +impl From for MyEvent { + fn from(event: rendezvous::client::Event) -> Self { MyEvent::Rendezvous(event) } } @@ -140,6 +140,6 @@ impl From for MyEvent { #[behaviour(out_event = "MyEvent")] struct MyBehaviour { identify: Identify, - rendezvous: rendezvous::Behaviour, + rendezvous: rendezvous::client::Behaviour, ping: Ping, } diff --git a/protocols/rendezvous/examples/rendezvous_point.rs b/protocols/rendezvous/examples/rendezvous_point.rs index a161dd757f9..bbbe9b973be 100644 --- a/protocols/rendezvous/examples/rendezvous_point.rs +++ b/protocols/rendezvous/examples/rendezvous_point.rs @@ -37,7 +37,7 @@ async fn main() { let mut swarm = Swarm::new( development_transport(identity.clone()).await.unwrap(), MyBehaviour { - rendezvous: rendezvous::Behaviour::new(identity.clone(), rendezvous::Config::default()), + rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), ping: Ping::default(), }, PeerId::from(identity.public()), @@ -57,20 +57,21 @@ async fn main() { SwarmEvent::ConnectionClosed { peer_id, .. } => { log::info!("Disconnected from {}", peer_id); } - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::Event::PeerRegistered { - peer, - registration, - })) => { + SwarmEvent::Behaviour(MyEvent::Rendezvous( + rendezvous::server::Event::PeerRegistered { peer, registration }, + )) => { log::info!( "Peer {} registered for namespace '{}'", peer, registration.namespace ); } - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::Event::DiscoverServed { - enquirer, - registrations, - })) => { + SwarmEvent::Behaviour(MyEvent::Rendezvous( + rendezvous::server::Event::DiscoverServed { + enquirer, + registrations, + }, + )) => { log::info!( "Served peer {} with {} registrations", enquirer, @@ -86,12 +87,12 @@ async fn main() { #[derive(Debug)] enum MyEvent { - Rendezvous(rendezvous::Event), + Rendezvous(rendezvous::server::Event), Ping(PingEvent), } -impl From for MyEvent { - fn from(event: rendezvous::Event) -> Self { +impl From for MyEvent { + fn from(event: rendezvous::server::Event) -> Self { MyEvent::Rendezvous(event) } } @@ -106,6 +107,6 @@ impl From for MyEvent { #[behaviour(event_process = false)] #[behaviour(out_event = "MyEvent")] struct MyBehaviour { - rendezvous: rendezvous::Behaviour, + rendezvous: rendezvous::server::Behaviour, ping: Ping, } diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs new file mode 100644 index 00000000000..e51f6542907 --- /dev/null +++ b/protocols/rendezvous/src/client.rs @@ -0,0 +1,294 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl}; +use crate::handler; +use crate::handler::outbound; +use crate::handler::outbound::OpenInfo; +use crate::substream_handler::SubstreamProtocolsHandler; +use libp2p_core::connection::ConnectionId; +use libp2p_core::identity::error::SigningError; +use libp2p_core::identity::Keypair; +use libp2p_core::{Multiaddr, PeerId, PeerRecord}; +use libp2p_swarm::{ + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler, +}; +use std::collections::{HashMap, VecDeque}; +use std::task::{Context, Poll}; +use std::time::Duration; + +pub struct Behaviour { + events: VecDeque>, + keypair: Keypair, + pending_register_requests: Vec<(Namespace, PeerId, Option)>, + + /// Hold addresses of all peers that we have discovered so far. + /// + /// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by returning addresses from [`NetworkBehaviour::addresses_of_peer`]. + discovered_peers: HashMap<(PeerId, Namespace), Vec>, +} + +impl Behaviour { + /// Create a new instance of the rendezvous [`NetworkBehaviour`]. + pub fn new(keypair: Keypair) -> Self { + Self { + events: Default::default(), + keypair, + pending_register_requests: vec![], + discovered_peers: Default::default(), + } + } + + /// Register our external addresses in the given namespace with the given rendezvous peer. + /// + /// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported + /// by other [`NetworkBehaviour`]s via [`NetworkBehaviourAction::ReportObservedAddr`]. + pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option) { + self.pending_register_requests + .push((namespace, rendezvous_node, ttl)); + } + + /// Unregister ourselves from the given namespace with the given rendezvous peer. + pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) { + self.events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::UnregisterRequest(namespace), + }, + handler: NotifyHandler::Any, + }); + } + + /// Discover other peers at a given rendezvous peer. + /// + /// If desired, the registrations can be filtered by a namespace. + /// If no namespace is given, peers from all namespaces will be returned. + /// A successfully discovery returns a cookie within [`Event::Discovered`]. + /// Such a cookie can be used to only fetch the _delta_ of registrations since + /// the cookie was acquired. + pub fn discover( + &mut self, + ns: Option, + cookie: Option, + limit: Option, + rendezvous_node: PeerId, + ) { + self.events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::DiscoverRequest { + namespace: ns, + cookie, + limit, + }, + }, + handler: NotifyHandler::Any, + }); + } +} + +#[derive(Debug, thiserror::Error)] +pub enum RegisterError { + #[error("We don't know about any externally reachable addresses of ours")] + NoExternalAddresses, + #[error("Failed to make a new PeerRecord")] + FailedToMakeRecord(#[from] SigningError), + #[error("Failed to register with Rendezvous node")] + Remote { + rendezvous_node: PeerId, + namespace: Namespace, + error: ErrorCode, + }, +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum Event { + /// We successfully discovered other nodes with using the contained rendezvous node. + Discovered { + rendezvous_node: PeerId, + registrations: Vec, + cookie: Cookie, + }, + /// We failed to discover other nodes on the contained rendezvous node. + DiscoverFailed { + rendezvous_node: PeerId, + namespace: Option, + error: ErrorCode, + }, + /// We successfully registered with the contained rendezvous node. + Registered { + rendezvous_node: PeerId, + ttl: Ttl, + namespace: Namespace, + }, + /// We failed to register with the contained rendezvous node. + RegisterFailed(RegisterError), +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = + SubstreamProtocolsHandler; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + let initial_keep_alive = Duration::from_secs(30); + + SubstreamProtocolsHandler::new_outbound_only(initial_keep_alive) + } + + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + self.discovered_peers + .iter() + .filter_map(|((candidate, _), addresses)| (candidate == peer).then(|| addresses)) + .flatten() + .cloned() + .collect() + } + + fn inject_connected(&mut self, _: &PeerId) {} + + fn inject_disconnected(&mut self, _: &PeerId) {} + + fn inject_event(&mut self, peer_id: PeerId, _: ConnectionId, event: handler::OutboundOutEvent) { + let new_events = match event { + handler::OutboundOutEvent::InboundEvent { message, .. } => void::unreachable(message), + handler::OutboundOutEvent::OutboundEvent { message, .. } => { + handle_outbound_event(message, peer_id, &mut self.discovered_peers) + } + handler::OutboundOutEvent::InboundError { .. } => { + // TODO: log errors and close connection? + vec![] + } + handler::OutboundOutEvent::OutboundError { .. } => { + // TODO: log errors and close connection? + vec![] + } + }; + + self.events.extend(new_events); + } + + fn poll( + &mut self, + _: &mut Context<'_>, + poll_params: &mut impl PollParameters, + ) -> Poll< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + + if let Some((namespace, rendezvous_node, ttl)) = self.pending_register_requests.pop() { + // Update our external addresses based on the Swarm's current knowledge. + // It doesn't make sense to register addresses on which we are not reachable, hence this should not be configurable from the outside. + let external_addresses = poll_params + .external_addresses() + .map(|r| r.addr) + .collect::>(); + + if external_addresses.is_empty() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent( + Event::RegisterFailed(RegisterError::NoExternalAddresses), + )); + } + + let action = match PeerRecord::new(self.keypair.clone(), external_addresses) { + Ok(peer_record) => NetworkBehaviourAction::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::RegisterRequest(NewRegistration { + namespace, + record: peer_record, + ttl, + }), + }, + handler: NotifyHandler::Any, + }, + Err(signing_error) => NetworkBehaviourAction::GenerateEvent(Event::RegisterFailed( + RegisterError::FailedToMakeRecord(signing_error), + )), + }; + + return Poll::Ready(action); + } + + Poll::Pending + } +} + +fn handle_outbound_event( + event: outbound::OutEvent, + peer_id: PeerId, + discovered_peers: &mut HashMap<(PeerId, Namespace), Vec>, +) -> Vec> { + match event { + outbound::OutEvent::Registered { namespace, ttl } => { + vec![NetworkBehaviourAction::GenerateEvent(Event::Registered { + rendezvous_node: peer_id, + ttl, + namespace, + })] + } + outbound::OutEvent::RegisterFailed(namespace, error) => { + vec![NetworkBehaviourAction::GenerateEvent( + Event::RegisterFailed(RegisterError::Remote { + rendezvous_node: peer_id, + namespace, + error, + }), + )] + } + outbound::OutEvent::Discovered { + registrations, + cookie, + } => { + discovered_peers.extend(registrations.iter().map(|registration| { + let peer_id = registration.record.peer_id(); + let namespace = registration.namespace.clone(); + + let addresses = registration.record.addresses().to_vec(); + + ((peer_id, namespace), addresses) + })); + + vec![NetworkBehaviourAction::GenerateEvent(Event::Discovered { + rendezvous_node: peer_id, + registrations, + cookie, + })] + } + outbound::OutEvent::DiscoverFailed { namespace, error } => { + vec![NetworkBehaviourAction::GenerateEvent( + Event::DiscoverFailed { + rendezvous_node: peer_id, + namespace, + error, + }, + )] + } + } +} diff --git a/protocols/rendezvous/src/handler.rs b/protocols/rendezvous/src/handler.rs index 55399424b29..b4883825e25 100644 --- a/protocols/rendezvous/src/handler.rs +++ b/protocols/rendezvous/src/handler.rs @@ -22,6 +22,8 @@ use crate::codec; use crate::codec::Message; use void::Void; +const PROTOCOL_IDENT: &[u8] = b"/rendezvous/1.0.0"; + pub mod inbound; pub mod outbound; @@ -38,6 +40,9 @@ pub enum Error { UnexpectedEndOfStream, } -pub type InEvent = crate::substream_handler::InEvent; -pub type OutEvent = - crate::substream_handler::OutEvent; +pub type OutboundInEvent = crate::substream_handler::InEvent; +pub type OutboundOutEvent = + crate::substream_handler::OutEvent; + +pub type InboundInEvent = crate::substream_handler::InEvent<(), inbound::InEvent, Void>; +pub type InboundOutEvent = crate::substream_handler::OutEvent; diff --git a/protocols/rendezvous/src/handler/inbound.rs b/protocols/rendezvous/src/handler/inbound.rs index 59952bf2526..8a18f366c68 100644 --- a/protocols/rendezvous/src/handler/inbound.rs +++ b/protocols/rendezvous/src/handler/inbound.rs @@ -22,10 +22,11 @@ use crate::codec::{ Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, RendezvousCodec, Ttl, }; use crate::handler::Error; -use crate::substream_handler::{Next, SubstreamHandler}; +use crate::handler::PROTOCOL_IDENT; +use crate::substream_handler::{Next, PassthroughProtocol, SubstreamHandler}; use asynchronous_codec::Framed; use futures::{SinkExt, StreamExt}; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::{NegotiatedSubstream, SubstreamProtocol}; use std::fmt; use std::task::{Context, Poll}; @@ -83,6 +84,12 @@ impl SubstreamHandler for Stream { type Error = Error; type OpenInfo = (); + fn upgrade( + open_info: Self::OpenInfo, + ) -> SubstreamProtocol { + SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info) + } + fn new(substream: NegotiatedSubstream, _: Self::OpenInfo) -> Self { Stream::PendingRead(Framed::new(substream, RendezvousCodec::default())) } diff --git a/protocols/rendezvous/src/handler/outbound.rs b/protocols/rendezvous/src/handler/outbound.rs index 09f3bca0975..ab06040ca19 100644 --- a/protocols/rendezvous/src/handler/outbound.rs +++ b/protocols/rendezvous/src/handler/outbound.rs @@ -20,11 +20,12 @@ use crate::codec::{Cookie, Message, NewRegistration, RendezvousCodec}; use crate::handler::Error; -use crate::substream_handler::{FutureSubstream, Next, SubstreamHandler}; +use crate::handler::PROTOCOL_IDENT; +use crate::substream_handler::{FutureSubstream, Next, PassthroughProtocol, SubstreamHandler}; use crate::{ErrorCode, Namespace, Registration, Ttl}; use asynchronous_codec::Framed; use futures::{SinkExt, TryFutureExt, TryStreamExt}; -use libp2p_swarm::NegotiatedSubstream; +use libp2p_swarm::{NegotiatedSubstream, SubstreamProtocol}; use std::task::Context; use void::Void; @@ -36,6 +37,12 @@ impl SubstreamHandler for Stream { type Error = Error; type OpenInfo = OpenInfo; + fn upgrade( + open_info: Self::OpenInfo, + ) -> SubstreamProtocol { + SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info) + } + fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self { let mut stream = Framed::new(substream, RendezvousCodec::default()); let sent_message = match info { diff --git a/protocols/rendezvous/src/lib.rs b/protocols/rendezvous/src/lib.rs index e86a620e859..87c88434db3 100644 --- a/protocols/rendezvous/src/lib.rs +++ b/protocols/rendezvous/src/lib.rs @@ -18,10 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -pub use self::behaviour::{Behaviour, Config, Event, RegisterError}; pub use self::codec::{ErrorCode, Namespace, NamespaceTooLong, Registration, Ttl}; -mod behaviour; mod codec; mod handler; mod substream_handler; @@ -40,3 +38,6 @@ pub const MIN_TTL: Ttl = 60 * 60 * 2; /// /// . pub const MAX_TTL: Ttl = 60 * 60 * 72; + +pub mod client; +pub mod server; diff --git a/protocols/rendezvous/src/behaviour.rs b/protocols/rendezvous/src/server.rs similarity index 74% rename from protocols/rendezvous/src/behaviour.rs rename to protocols/rendezvous/src/server.rs index 3ec45e164f6..17d6f22c01c 100644 --- a/protocols/rendezvous/src/behaviour.rs +++ b/protocols/rendezvous/src/server.rs @@ -19,8 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl}; -use crate::handler::outbound::OpenInfo; -use crate::handler::{inbound, outbound}; +use crate::handler::inbound; use crate::substream_handler::{InboundSubstreamId, SubstreamProtocolsHandler}; use crate::{handler, MAX_TTL, MIN_TTL}; use bimap::BiMap; @@ -29,9 +28,7 @@ use futures::ready; use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use libp2p_core::connection::ConnectionId; -use libp2p_core::identity::error::SigningError; -use libp2p_core::identity::Keypair; -use libp2p_core::{Multiaddr, PeerId, PeerRecord}; +use libp2p_core::PeerId; use libp2p_swarm::{ NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler, }; @@ -39,17 +36,11 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::iter::FromIterator; use std::task::{Context, Poll}; use std::time::Duration; +use void::Void; pub struct Behaviour { - events: VecDeque>, + events: VecDeque>, registrations: Registrations, - keypair: Keypair, - pending_register_requests: Vec<(Namespace, PeerId, Option)>, - - /// Hold addresses of all peers that we have discovered so far. - /// - /// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by returning addresses from [`NetworkBehaviour::addresses_of_peer`]. - discovered_peers: HashMap<(PeerId, Namespace), Vec>, } pub struct Config { @@ -80,103 +71,17 @@ impl Default for Config { impl Behaviour { /// Create a new instance of the rendezvous [`NetworkBehaviour`]. - pub fn new(keypair: Keypair, config: Config) -> Self { + pub fn new(config: Config) -> Self { Self { events: Default::default(), registrations: Registrations::with_config(config), - keypair, - pending_register_requests: vec![], - discovered_peers: Default::default(), } } - - /// Register our external addresses in the given namespace with the given rendezvous peer. - /// - /// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported - /// by other [`NetworkBehaviour`]s via [`NetworkBehaviourAction::ReportObservedAddr`]. - pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option) { - self.pending_register_requests - .push((namespace, rendezvous_node, ttl)); - } - - /// Unregister ourselves from the given namespace with the given rendezvous peer. - pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: rendezvous_node, - event: handler::InEvent::NewSubstream { - open_info: OpenInfo::UnregisterRequest(namespace), - }, - handler: NotifyHandler::Any, - }); - } - - /// Discover other peers at a given rendezvous peer. - /// - /// If desired, the registrations can be filtered by a namespace. - /// If no namespace is given, peers from all namespaces will be returned. - /// A successfully discovery returns a cookie within [`Event::Discovered`]. - /// Such a cookie can be used to only fetch the _delta_ of registrations since - /// the cookie was acquired. - pub fn discover( - &mut self, - ns: Option, - cookie: Option, - limit: Option, - rendezvous_node: PeerId, - ) { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: rendezvous_node, - event: handler::InEvent::NewSubstream { - open_info: OpenInfo::DiscoverRequest { - namespace: ns, - cookie, - limit, - }, - }, - handler: NotifyHandler::Any, - }); - } -} - -#[derive(Debug, thiserror::Error)] -pub enum RegisterError { - #[error("We don't know about any externally reachable addresses of ours")] - NoExternalAddresses, - #[error("Failed to make a new PeerRecord")] - FailedToMakeRecord(#[from] SigningError), - #[error("Failed to register with Rendezvous node")] - Remote { - rendezvous_node: PeerId, - namespace: Namespace, - error: ErrorCode, - }, } #[derive(Debug)] #[allow(clippy::large_enum_variant)] pub enum Event { - /// We successfully discovered other nodes with using the contained rendezvous node. - Discovered { - rendezvous_node: PeerId, - registrations: Vec, - cookie: Cookie, - }, - /// We failed to discover other nodes on the contained rendezvous node. - DiscoverFailed { - rendezvous_node: PeerId, - namespace: Option, - error: ErrorCode, - }, - /// We successfully registered with the contained rendezvous node. - Registered { - rendezvous_node: PeerId, - ttl: Ttl, - namespace: Namespace, - }, - /// We failed to register with the contained rendezvous node. - RegisterFailed(RegisterError), /// We successfully served a discover request from a peer. DiscoverServed { enquirer: PeerId, @@ -202,47 +107,31 @@ pub enum Event { } impl NetworkBehaviour for Behaviour { - type ProtocolsHandler = - SubstreamProtocolsHandler; + type ProtocolsHandler = SubstreamProtocolsHandler; type OutEvent = Event; fn new_handler(&mut self) -> Self::ProtocolsHandler { let initial_keep_alive = Duration::from_secs(30); - SubstreamProtocolsHandler::new(b"/rendezvous/1.0.0", initial_keep_alive) - } - - fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { - self.discovered_peers - .iter() - .filter_map(|((candidate, _), addresses)| (candidate == peer).then(|| addresses)) - .flatten() - .cloned() - .collect() + SubstreamProtocolsHandler::new_inbound_only(initial_keep_alive) } - fn inject_connected(&mut self, _: &PeerId) {} - - fn inject_disconnected(&mut self, _: &PeerId) {} - fn inject_event( &mut self, peer_id: PeerId, connection: ConnectionId, - event: handler::OutEvent, + event: handler::InboundOutEvent, ) { let new_events = match event { - handler::OutEvent::InboundEvent { id, message } => { + handler::InboundOutEvent::InboundEvent { id, message } => { handle_inbound_event(message, peer_id, connection, id, &mut self.registrations) } - handler::OutEvent::OutboundEvent { message, .. } => { - handle_outbound_event(message, peer_id, &mut self.discovered_peers) - } - handler::OutEvent::InboundError { .. } => { + handler::InboundOutEvent::OutboundEvent { message, .. } => void::unreachable(message), + handler::InboundOutEvent::InboundError { .. } => { // TODO: log errors and close connection? vec![] } - handler::OutEvent::OutboundError { .. } => { + handler::InboundOutEvent::OutboundError { .. } => { // TODO: log errors and close connection? vec![] } @@ -254,7 +143,7 @@ impl NetworkBehaviour for Behaviour { fn poll( &mut self, cx: &mut Context<'_>, - poll_params: &mut impl PollParameters, + _: &mut impl PollParameters, ) -> Poll< NetworkBehaviourAction< ::InEvent, @@ -271,40 +160,6 @@ impl NetworkBehaviour for Behaviour { return Poll::Ready(event); } - if let Some((namespace, rendezvous_node, ttl)) = self.pending_register_requests.pop() { - // Update our external addresses based on the Swarm's current knowledge. - // It doesn't make sense to register addresses on which we are not reachable, hence this should not be configurable from the outside. - let external_addresses = poll_params - .external_addresses() - .map(|r| r.addr) - .collect::>(); - - if external_addresses.is_empty() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent( - Event::RegisterFailed(RegisterError::NoExternalAddresses), - )); - } - - let action = match PeerRecord::new(self.keypair.clone(), external_addresses) { - Ok(peer_record) => NetworkBehaviourAction::NotifyHandler { - peer_id: rendezvous_node, - event: handler::InEvent::NewSubstream { - open_info: OpenInfo::RegisterRequest(NewRegistration { - namespace, - record: peer_record, - ttl, - }), - }, - handler: NotifyHandler::Any, - }, - Err(signing_error) => NetworkBehaviourAction::GenerateEvent(Event::RegisterFailed( - RegisterError::FailedToMakeRecord(signing_error), - )), - }; - - return Poll::Ready(action); - } - Poll::Pending } } @@ -315,7 +170,7 @@ fn handle_inbound_event( connection: ConnectionId, id: InboundSubstreamId, registrations: &mut Registrations, -) -> Vec> { +) -> Vec> { match event { // bad registration inbound::OutEvent::RegistrationRequested(registration) @@ -327,7 +182,7 @@ fn handle_inbound_event( NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), - event: handler::InEvent::NotifyInboundSubstream { + event: handler::InboundInEvent::NotifyInboundSubstream { id, message: inbound::InEvent::DeclineRegisterRequest(error), }, @@ -348,7 +203,7 @@ fn handle_inbound_event( NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), - event: handler::InEvent::NotifyInboundSubstream { + event: handler::InboundInEvent::NotifyInboundSubstream { id, message: inbound::InEvent::RegisterResponse { ttl: registration.ttl, @@ -368,7 +223,7 @@ fn handle_inbound_event( NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), - event: handler::InEvent::NotifyInboundSubstream { + event: handler::InboundInEvent::NotifyInboundSubstream { id, message: inbound::InEvent::DeclineRegisterRequest(error), }, @@ -394,7 +249,7 @@ fn handle_inbound_event( NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), - event: handler::InEvent::NotifyInboundSubstream { + event: handler::InboundInEvent::NotifyInboundSubstream { id, message: inbound::InEvent::DiscoverResponse { discovered: discovered.clone(), @@ -415,7 +270,7 @@ fn handle_inbound_event( NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), - event: handler::InEvent::NotifyInboundSubstream { + event: handler::InboundInEvent::NotifyInboundSubstream { id, message: inbound::InEvent::DeclineDiscoverRequest(error), }, @@ -440,59 +295,6 @@ fn handle_inbound_event( } } -fn handle_outbound_event( - event: outbound::OutEvent, - peer_id: PeerId, - discovered_peers: &mut HashMap<(PeerId, Namespace), Vec>, -) -> Vec> { - match event { - outbound::OutEvent::Registered { namespace, ttl } => { - vec![NetworkBehaviourAction::GenerateEvent(Event::Registered { - rendezvous_node: peer_id, - ttl, - namespace, - })] - } - outbound::OutEvent::RegisterFailed(namespace, error) => { - vec![NetworkBehaviourAction::GenerateEvent( - Event::RegisterFailed(RegisterError::Remote { - rendezvous_node: peer_id, - namespace, - error, - }), - )] - } - outbound::OutEvent::Discovered { - registrations, - cookie, - } => { - discovered_peers.extend(registrations.iter().map(|registration| { - let peer_id = registration.record.peer_id(); - let namespace = registration.namespace.clone(); - - let addresses = registration.record.addresses().to_vec(); - - ((peer_id, namespace), addresses) - })); - - vec![NetworkBehaviourAction::GenerateEvent(Event::Discovered { - rendezvous_node: peer_id, - registrations, - cookie, - })] - } - outbound::OutEvent::DiscoverFailed { namespace, error } => { - vec![NetworkBehaviourAction::GenerateEvent( - Event::DiscoverFailed { - rendezvous_node: peer_id, - namespace, - error, - }, - )] - } - } -} - #[derive(Debug, Eq, PartialEq, Hash, Copy, Clone)] struct RegistrationId(u64); @@ -699,7 +501,7 @@ mod tests { use std::option::Option::None; use std::time::SystemTime; - use libp2p_core::identity; + use libp2p_core::{identity, PeerRecord}; use super::*; diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs index f3a683f15f8..efd7956b13c 100644 --- a/protocols/rendezvous/src/substream_handler.rs +++ b/protocols/rendezvous/src/substream_handler.rs @@ -25,10 +25,9 @@ //! //! At the moment, this module is an implementation detail of the rendezvous protocol but the intent is for it to be provided as a generic module that is accessible to other protocols as well. -use futures::future::{BoxFuture, Fuse, FusedFuture}; +use futures::future::{self, BoxFuture, Fuse, FusedFuture}; use futures::FutureExt; -use libp2p_core::upgrade::FromFnUpgrade; -use libp2p_core::Endpoint; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::protocols_handler::{InboundUpgradeSend, OutboundUpgradeSend}; use libp2p_swarm::{ KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, @@ -36,7 +35,7 @@ use libp2p_swarm::{ }; use std::collections::{HashMap, VecDeque}; use std::fmt; -use std::future::{Future, Ready}; +use std::future::Future; use std::hash::Hash; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; @@ -49,6 +48,8 @@ pub trait SubstreamHandler: Sized { type Error; type OpenInfo; + fn upgrade(open_info: Self::OpenInfo) + -> SubstreamProtocol; fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self; fn inject_event(self, event: Self::InEvent) -> Self; fn advance(self, cx: &mut Context<'_>) -> Result, Self::Error>; @@ -125,15 +126,55 @@ impl fmt::Display for OutboundSubstreamId { } } -type ProtocolUpgradeFn = - Box Ready> + Send>; -type Protocol = FromFnUpgrade<&'static [u8], ProtocolUpgradeFn>; +pub struct PassthroughProtocol { + ident: Option<&'static [u8]>, +} + +impl PassthroughProtocol { + pub fn new(ident: &'static [u8]) -> Self { + Self { ident: Some(ident) } + } +} + +impl UpgradeInfo for PassthroughProtocol { + type Info = &'static [u8]; + type InfoIter = std::option::IntoIter; + + fn protocol_info(&self) -> Self::InfoIter { + self.ident.into_iter() + } +} + +impl InboundUpgrade for PassthroughProtocol { + type Output = C; + type Error = Void; + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { + match self.ident { + Some(_) => future::ready(Ok(socket)).boxed(), + None => future::pending().boxed(), + } + } +} + +impl OutboundUpgrade for PassthroughProtocol { + type Output = C; + type Error = Void; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + match self.ident { + Some(_) => future::ready(Ok(socket)).boxed(), + None => future::pending().boxed(), + } + } +} /// An implementation of [`ProtocolsHandler`] that delegates to individual [`SubstreamHandler`]s. pub struct SubstreamProtocolsHandler { inbound_substreams: HashMap, outbound_substreams: HashMap, - protocol: &'static [u8], next_inbound_substream_id: InboundSubstreamId, next_outbound_substream_id: OutboundSubstreamId, @@ -145,11 +186,40 @@ pub struct SubstreamProtocolsHandler SubstreamProtocolsHandler { - pub fn new(protocol: &'static [u8], initial_keep_alive: Duration) -> Self { + pub fn new(initial_keep_alive: Duration) -> Self { + Self { + inbound_substreams: Default::default(), + outbound_substreams: Default::default(), + next_inbound_substream_id: InboundSubstreamId(0), + next_outbound_substream_id: OutboundSubstreamId(0), + new_substreams: Default::default(), + initial_keep_alive_deadline: Instant::now() + initial_keep_alive, + } + } +} + +impl + SubstreamProtocolsHandler +{ + pub fn new_outbound_only(initial_keep_alive: Duration) -> Self { + Self { + inbound_substreams: Default::default(), + outbound_substreams: Default::default(), + next_inbound_substream_id: InboundSubstreamId(0), + next_outbound_substream_id: OutboundSubstreamId(0), + new_substreams: Default::default(), + initial_keep_alive_deadline: Instant::now() + initial_keep_alive, + } + } +} + +impl + SubstreamProtocolsHandler +{ + pub fn new_inbound_only(initial_keep_alive: Duration) -> Self { Self { inbound_substreams: Default::default(), outbound_substreams: Default::default(), - protocol, next_inbound_substream_id: InboundSubstreamId(0), next_outbound_substream_id: OutboundSubstreamId(0), new_substreams: Default::default(), @@ -286,19 +356,13 @@ where type InEvent = InEvent; type OutEvent = OutEvent; type Error = Void; - type InboundProtocol = Protocol; - type OutboundProtocol = Protocol; + type InboundProtocol = PassthroughProtocol; + type OutboundProtocol = PassthroughProtocol; type InboundOpenInfo = (); type OutboundOpenInfo = TOutboundOpenInfo; fn listen_protocol(&self) -> SubstreamProtocol { - SubstreamProtocol::new( - libp2p_core::upgrade::from_fn( - self.protocol, - Box::new(|socket, _| std::future::ready(Ok(socket))), - ), - (), - ) + TInboundSubstreamHandler::upgrade(()) } fn inject_fully_negotiated_inbound( @@ -391,13 +455,7 @@ where > { if let Some(open_info) = self.new_substreams.pop_front() { return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new( - libp2p_core::upgrade::from_fn( - self.protocol, - Box::new(|socket, _| std::future::ready(Ok(socket))), - ), - open_info, - ), + protocol: TOutboundSubstreamHandler::upgrade(open_info), }); } @@ -466,3 +524,28 @@ impl FutureSubstream { } } } + +impl SubstreamHandler for void::Void { + type InEvent = void::Void; + type OutEvent = void::Void; + type Error = void::Void; + type OpenInfo = (); + + fn new(_: NegotiatedSubstream, _: Self::OpenInfo) -> Self { + unreachable!("we should never yield a substream") + } + + fn inject_event(self, event: Self::InEvent) -> Self { + void::unreachable(event) + } + + fn advance(self, _: &mut Context<'_>) -> Result, Self::Error> { + void::unreachable(self) + } + + fn upgrade( + open_info: Self::OpenInfo, + ) -> SubstreamProtocol { + SubstreamProtocol::new(PassthroughProtocol { ident: None }, open_info) + } +} diff --git a/protocols/rendezvous/tests/rendezvous.rs b/protocols/rendezvous/tests/rendezvous.rs index 43ac231c806..8d359bb1b5c 100644 --- a/protocols/rendezvous/tests/rendezvous.rs +++ b/protocols/rendezvous/tests/rendezvous.rs @@ -123,8 +123,8 @@ async fn given_invalid_ttl_then_unsuccessful_registration() { match await_events_or_timeout(&mut test.robert, &mut test.alice).await { ( - SwarmEvent::Behaviour(rendezvous::Event::PeerNotRegistered { .. }), - SwarmEvent::Behaviour(rendezvous::Event::RegisterFailed(rendezvous::RegisterError::Remote {error , ..})), + SwarmEvent::Behaviour(rendezvous::server::Event::PeerNotRegistered { .. }), + SwarmEvent::Behaviour(rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote {error , ..})), ) => { assert_eq!(error, rendezvous::ErrorCode::InvalidTtl); } @@ -142,6 +142,7 @@ async fn discover_allows_for_dial_by_peer_id() { mut alice, mut bob, mut robert, + charlie: _charlie, .. } = RendezvousTest::setup().await; let roberts_peer_id = *robert.local_peer_id(); @@ -163,8 +164,8 @@ async fn discover_allows_for_dial_by_peer_id() { match await_events_or_timeout(&mut alice, &mut bob).await { ( - SwarmEvent::Behaviour(rendezvous::Event::Registered { .. }), - SwarmEvent::Behaviour(rendezvous::Event::Discovered { .. }), + SwarmEvent::Behaviour(rendezvous::client::Event::Registered { .. }), + SwarmEvent::Behaviour(rendezvous::client::Event::Discovered { .. }), ) => {} _ => panic!("bad event combination emitted"), }; @@ -211,8 +212,8 @@ async fn eve_cannot_register() { match await_events_or_timeout(&mut test.robert, &mut test.eve).await { ( - SwarmEvent::Behaviour(rendezvous::Event::PeerNotRegistered { .. }), - SwarmEvent::Behaviour(rendezvous::Event::RegisterFailed(rendezvous::RegisterError::Remote { error: err_code , ..})), + SwarmEvent::Behaviour(rendezvous::server::Event::PeerNotRegistered { .. }), + SwarmEvent::Behaviour(rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote { error: err_code , ..})), ) => { assert_eq!(err_code, rendezvous::ErrorCode::NotAuthorized); } @@ -223,31 +224,71 @@ async fn eve_cannot_register() { } } +// test if charlie can operate as client and server simultaneously +#[tokio::test] +async fn can_combine_client_and_server() { + let _ = env_logger::try_init(); + let mut test = RendezvousTest::setup().await; + + let namespace = rendezvous::Namespace::from_static("some-namespace"); + + test.charlie.behaviour_mut().client.register( + namespace.clone(), + *test.robert.local_peer_id(), + None, + ); + + match await_events_or_timeout(&mut test.robert, &mut test.charlie).await { + ( + SwarmEvent::Behaviour(rendezvous::server::Event::PeerRegistered { .. }), + SwarmEvent::Behaviour(CombinedEvent::Client(rendezvous::client::Event::Registered { .. })), + ) => { + } + (rendezvous_swarm_event, registration_swarm_event) => panic!( + "Received unexpected event, rendezvous swarm emitted {:?} and registration swarm emitted {:?}", + rendezvous_swarm_event, registration_swarm_event + ), + } + + test.alice + .behaviour_mut() + .register(namespace, *test.charlie.local_peer_id(), None); + + match await_events_or_timeout(&mut test.alice, &mut test.charlie).await { + ( + SwarmEvent::Behaviour(rendezvous::client::Event::Registered { .. }), + SwarmEvent::Behaviour(CombinedEvent::Server(rendezvous::server::Event::PeerRegistered { .. })), + ) => { + } + (rendezvous_swarm_event, registration_swarm_event) => panic!( + "Received unexpected event, rendezvous swarm emitted {:?} and registration swarm emitted {:?}", + rendezvous_swarm_event, registration_swarm_event + ), + } +} + /// Holds a network of nodes that is used to test certain rendezvous functionality. /// /// In all cases, Alice would like to connect to Bob with Robert acting as a rendezvous point. /// Eve is an evil actor that tries to act maliciously. struct RendezvousTest { - pub alice: Swarm, - pub bob: Swarm, - pub eve: Swarm, - pub robert: Swarm, + pub alice: Swarm, + pub bob: Swarm, + pub charlie: Swarm, + pub eve: Swarm, + pub robert: Swarm, } impl RendezvousTest { pub async fn setup() -> Self { - let mut alice = new_swarm(|_, identity| { - rendezvous::Behaviour::new(identity, rendezvous::Config::default()) - }); + let mut alice = new_swarm(|_, identity| rendezvous::client::Behaviour::new(identity)); alice.listen_on_random_memory_address().await; - let mut bob = new_swarm(|_, identity| { - rendezvous::Behaviour::new(identity, rendezvous::Config::default()) - }); + let mut bob = new_swarm(|_, identity| rendezvous::client::Behaviour::new(identity)); bob.listen_on_random_memory_address().await; - let mut robert = new_swarm(|_, identity| { - rendezvous::Behaviour::new(identity, rendezvous::Config::default()) + let mut robert = new_swarm(|_, _| { + rendezvous::server::Behaviour::new(rendezvous::server::Config::default()) }); robert.listen_on_random_memory_address().await; @@ -256,21 +297,28 @@ impl RendezvousTest { // Due to the type-safe API of the `Rendezvous` behaviour and `PeerRecord`, we actually cannot construct a bad `PeerRecord` (i.e. one that is claims to be someone else). // As such, the best we can do is hand eve a completely different keypair from what she is using to authenticate her connection. let someone_else = identity::Keypair::generate_ed25519(); - let mut eve = new_swarm(move |_, _| { - rendezvous::Behaviour::new(someone_else, rendezvous::Config::default()) - }); + let mut eve = new_swarm(move |_, _| rendezvous::client::Behaviour::new(someone_else)); eve.listen_on_random_memory_address().await; eve }; + let mut charlie = new_swarm(|_, identity| CombinedBehaviour { + client: rendezvous::client::Behaviour::new(identity), + server: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), + }); + charlie.listen_on_random_memory_address().await; + alice.block_on_connection(&mut robert).await; bob.block_on_connection(&mut robert).await; + charlie.block_on_connection(&mut robert).await; eve.block_on_connection(&mut robert).await; + alice.block_on_connection(&mut charlie).await; Self { alice, bob, + charlie, eve, robert, } @@ -283,8 +331,8 @@ impl RendezvousTest { ) { match await_events_or_timeout(&mut self.robert, &mut self.alice).await { ( - SwarmEvent::Behaviour(rendezvous::Event::PeerRegistered { peer, registration }), - SwarmEvent::Behaviour(rendezvous::Event::Registered { rendezvous_node, ttl, namespace: register_node_namespace }), + SwarmEvent::Behaviour(rendezvous::server::Event::PeerRegistered { peer, registration }), + SwarmEvent::Behaviour(rendezvous::client::Event::Registered { rendezvous_node, ttl, namespace: register_node_namespace }), ) => { assert_eq!(&peer, self.alice.local_peer_id()); assert_eq!(&rendezvous_node, self.robert.local_peer_id()); @@ -307,8 +355,10 @@ impl RendezvousTest { ) { match await_events_or_timeout(&mut self.robert, &mut self.bob).await { ( - SwarmEvent::Behaviour(rendezvous::Event::DiscoverServed { .. }), - SwarmEvent::Behaviour(rendezvous::Event::Discovered { registrations, .. }), + SwarmEvent::Behaviour(rendezvous::server::Event::DiscoverServed { .. }), + SwarmEvent::Behaviour(rendezvous::client::Event::Discovered { + registrations, .. + }), ) => match registrations.as_slice() { [rendezvous::Registration { namespace, @@ -325,3 +375,28 @@ impl RendezvousTest { } } } + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(event_process = false, out_event = "CombinedEvent")] +struct CombinedBehaviour { + client: rendezvous::client::Behaviour, + server: rendezvous::server::Behaviour, +} + +#[derive(Debug)] +enum CombinedEvent { + Client(rendezvous::client::Event), + Server(rendezvous::server::Event), +} + +impl From for CombinedEvent { + fn from(v: rendezvous::server::Event) -> Self { + Self::Server(v) + } +} + +impl From for CombinedEvent { + fn from(v: rendezvous::client::Event) -> Self { + Self::Client(v) + } +}