diff --git a/CHANGELOG.md b/CHANGELOG.md index c9284c09eb8..c28e4afe1e8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [`libp2p-ping` CHANGELOG](protocols/ping/CHANGELOG.md) - [`libp2p-relay` CHANGELOG](protocols/relay/CHANGELOG.md) - [`libp2p-request-response` CHANGELOG](protocols/request-response/CHANGELOG.md) +- [`libp2p-rendezvous` CHANGELOG](protocols/rendezvous/CHANGELOG.md) ## Transport Protocols & Upgrades diff --git a/Cargo.toml b/Cargo.toml index 3764ced864d..05b5ccc8860 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ default = [ "pnet", "relay", "request-response", + "rendezvous", "secp256k1", "tcp-async-io", "uds", @@ -48,6 +49,7 @@ plaintext = ["libp2p-plaintext"] pnet = ["libp2p-pnet"] relay = ["libp2p-relay"] request-response = ["libp2p-request-response"] +rendezvous = ["libp2p-rendezvous"] tcp-async-io = ["libp2p-tcp", "libp2p-tcp/async-io"] tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] uds = ["libp2p-uds"] @@ -78,6 +80,7 @@ libp2p-ping = { version = "0.31.0", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.30.0", path = "transports/plaintext", optional = true } libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true } libp2p-relay = { version = "0.4.0", path = "protocols/relay", optional = true } +libp2p-rendezvous = { version = "0.1.0", path = "protocols/rendezvous", optional = true } libp2p-request-response = { version = "0.13.0", path = "protocols/request-response", optional = true } libp2p-swarm = { version = "0.31.0", path = "swarm" } libp2p-swarm-derive = { version = "0.25.0", path = "swarm-derive" } @@ -115,6 +118,7 @@ members = [ "muxers/yamux", "protocols/floodsub", "protocols/gossipsub", + "protocols/rendezvous", "protocols/identify", "protocols/kad", "protocols/mdns", diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index f5227e7c2c4..1c4ace6835f 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -33,6 +33,9 @@ - Remove deprecated functions `upgrade::write_one`, `upgrade::write_with_len_prefix` and `upgrade::read_one` (see [PR 2213]). +- Add `SignedEnvelope` and `PeerRecord` according to [RFC0002] and [RFC0003] + (see [PR 2107]). + [PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145 [PR 2213]: https://github.com/libp2p/rust-libp2p/pull/2213 [PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142 @@ -40,6 +43,9 @@ [PR 2183]: https://github.com/libp2p/rust-libp2p/pull/2183 [PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191 [PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195 +[PR 2107]: https://github.com/libp2p/rust-libp2p/pull/2107 +[RFC0002]: https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md +[RFC0003]: https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md # 0.29.0 [2021-07-12] diff --git a/core/build.rs b/core/build.rs index 9692abd9c81..f0c09f93abf 100644 --- a/core/build.rs +++ b/core/build.rs @@ -19,5 +19,13 @@ // DEALINGS IN THE SOFTWARE. fn main() { - prost_build::compile_protos(&["src/keys.proto"], &["src"]).unwrap(); + prost_build::compile_protos( + &[ + "src/keys.proto", + "src/envelope.proto", + "src/peer_record.proto", + ], + &["src"], + ) + .unwrap(); } diff --git a/core/src/envelope.proto b/core/src/envelope.proto new file mode 100644 index 00000000000..9ab3e6fd256 --- /dev/null +++ b/core/src/envelope.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package envelope_proto; + +import "keys.proto"; + +// Envelope encloses a signed payload produced by a peer, along with the public +// key of the keypair it was signed with so that it can be statelessly validated +// by the receiver. +// +// The payload is prefixed with a byte string that determines the type, so it +// can be deserialized deterministically. Often, this byte string is a +// multicodec. +message Envelope { + // public_key is the public key of the keypair the enclosed payload was + // signed with. + keys_proto.PublicKey public_key = 1; + + // payload_type encodes the type of payload, so that it can be deserialized + // deterministically. + bytes payload_type = 2; + + // payload is the actual payload carried inside this envelope. + bytes payload = 3; + + // signature is the signature produced by the private key corresponding to + // the enclosed public key, over the payload, prefixing a domain string for + // additional security. + bytes signature = 5; +} diff --git a/core/src/identity.rs b/core/src/identity.rs index 8c3c83db16e..76ed4c39fd4 100644 --- a/core/src/identity.rs +++ b/core/src/identity.rs @@ -42,6 +42,7 @@ pub mod error; use self::error::*; use crate::{keys_proto, PeerId}; +use std::convert::{TryFrom, TryInto}; /// Identity keypair of a node. /// @@ -205,6 +206,7 @@ impl PublicKey { /// that the signature has been produced by the corresponding /// private key (authenticity), and that the message has not been /// tampered with (integrity). + #[must_use] pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool { use PublicKey::*; match self { @@ -221,7 +223,35 @@ impl PublicKey { pub fn to_protobuf_encoding(&self) -> Vec { use prost::Message; - let public_key = match self { + let public_key = keys_proto::PublicKey::from(self); + + let mut buf = Vec::with_capacity(public_key.encoded_len()); + public_key + .encode(&mut buf) + .expect("Vec provides capacity as needed"); + buf + } + + /// Decode a public key from a protobuf structure, e.g. read from storage + /// or received from another node. + pub fn from_protobuf_encoding(bytes: &[u8]) -> Result { + use prost::Message; + + let pubkey = keys_proto::PublicKey::decode(bytes) + .map_err(|e| DecodingError::new("Protobuf").source(e))?; + + pubkey.try_into() + } + + /// Convert the `PublicKey` into the corresponding `PeerId`. + pub fn to_peer_id(&self) -> PeerId { + self.into() + } +} + +impl From<&PublicKey> for keys_proto::PublicKey { + fn from(key: &PublicKey) -> Self { + match key { PublicKey::Ed25519(key) => keys_proto::PublicKey { r#type: keys_proto::KeyType::Ed25519 as i32, data: key.encode().to_vec(), @@ -236,24 +266,14 @@ impl PublicKey { r#type: keys_proto::KeyType::Secp256k1 as i32, data: key.encode().to_vec(), }, - }; - - let mut buf = Vec::with_capacity(public_key.encoded_len()); - public_key - .encode(&mut buf) - .expect("Vec provides capacity as needed"); - buf + } } +} - /// Decode a public key from a protobuf structure, e.g. read from storage - /// or received from another node. - pub fn from_protobuf_encoding(bytes: &[u8]) -> Result { - use prost::Message; - - #[allow(unused_mut)] // Due to conditional compilation. - let mut pubkey = keys_proto::PublicKey::decode(bytes) - .map_err(|e| DecodingError::new("Protobuf").source(e))?; +impl TryFrom for PublicKey { + type Error = DecodingError; + fn try_from(pubkey: keys_proto::PublicKey) -> Result { let key_type = keys_proto::KeyType::from_i32(pubkey.r#type) .ok_or_else(|| DecodingError::new(format!("unknown key type: {}", pubkey.r#type)))?; @@ -281,11 +301,6 @@ impl PublicKey { } } } - - /// Convert the `PublicKey` into the corresponding `PeerId`. - pub fn to_peer_id(&self) -> PeerId { - self.into() - } } #[cfg(test)] diff --git a/core/src/lib.rs b/core/src/lib.rs index 60727c52062..1a7d841e342 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -39,6 +39,14 @@ mod keys_proto { include!(concat!(env!("OUT_DIR"), "/keys_proto.rs")); } +mod envelope_proto { + include!(concat!(env!("OUT_DIR"), "/envelope_proto.rs")); +} + +mod peer_record_proto { + include!(concat!(env!("OUT_DIR"), "/peer_record_proto.rs")); +} + /// Multi-address re-export. pub use multiaddr; pub type Negotiated = multistream_select::Negotiated; @@ -51,6 +59,8 @@ pub mod either; pub mod identity; pub mod muxing; pub mod network; +pub mod peer_record; +pub mod signed_envelope; pub mod transport; pub mod upgrade; @@ -61,6 +71,8 @@ pub use multihash; pub use muxing::StreamMuxer; pub use network::Network; pub use peer_id::PeerId; +pub use peer_record::PeerRecord; +pub use signed_envelope::SignedEnvelope; pub use translation::address_translation; pub use transport::Transport; pub use upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError, UpgradeInfo}; diff --git a/core/src/network/event.rs b/core/src/network/event.rs index cea5bbddc21..091cbf3d228 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -109,7 +109,7 @@ where /// A connection may close if /// /// * it encounters an error, which includes the connection being - /// closed by the remote. In this case `error` is `Some`. + /// closed by the remote. In this case `error` is `ome`. /// * it was actively closed by [`EstablishedConnection::start_close`], /// i.e. a successful, orderly close. In this case `error` is `None`. /// * it was actively closed by [`super::peer::ConnectedPeer::disconnect`] or diff --git a/core/src/peer_record.proto b/core/src/peer_record.proto new file mode 100644 index 00000000000..69bb345e02f --- /dev/null +++ b/core/src/peer_record.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package peer_record_proto; + +// PeerRecord messages contain information that is useful to share with other peers. +// Currently, a PeerRecord contains the public listen addresses for a peer, but this +// is expected to expand to include other information in the future. +// +// PeerRecords are designed to be serialized to bytes and placed inside of +// SignedEnvelopes before sharing with other peers. +message PeerRecord { + + // AddressInfo is a wrapper around a binary multiaddr. It is defined as a + // separate message to allow us to add per-address metadata in the future. + message AddressInfo { + bytes multiaddr = 1; + } + + // peer_id contains a libp2p peer id in its binary representation. + bytes peer_id = 1; + + // seq contains a monotonically-increasing sequence counter to order PeerRecords in time. + uint64 seq = 2; + + // addresses is a list of public listen addresses for the peer. + repeated AddressInfo addresses = 3; +} diff --git a/core/src/peer_record.rs b/core/src/peer_record.rs new file mode 100644 index 00000000000..6b7759213c9 --- /dev/null +++ b/core/src/peer_record.rs @@ -0,0 +1,199 @@ +use crate::identity::error::SigningError; +use crate::identity::Keypair; +use crate::signed_envelope::SignedEnvelope; +use crate::{peer_record_proto, signed_envelope, Multiaddr, PeerId}; +use std::convert::TryInto; +use std::fmt; +use std::time::SystemTime; + +const PAYLOAD_TYPE: &str = "/libp2p/routing-state-record"; +const DOMAIN_SEP: &str = "libp2p-routing-state"; + +/// Represents a peer routing record. +/// +/// Peer records are designed to be distributable and carry a signature by being wrapped in a signed envelope. +/// For more information see RFC0003 of the libp2p specifications: +#[derive(Debug, PartialEq, Clone)] +pub struct PeerRecord { + peer_id: PeerId, + seq: u64, + addresses: Vec, + + /// A signed envelope representing this [`PeerRecord`]. + /// + /// If this [`PeerRecord`] was constructed from a [`SignedEnvelope`], this is the original instance. + envelope: SignedEnvelope, +} + +impl PeerRecord { + /// Attempt to re-construct a [`PeerRecord`] from a [`SignedEnvelope`]. + /// + /// If this function succeeds, the [`SignedEnvelope`] contained a peer record with a valid signature and can hence be considered authenticated. + pub fn from_signed_envelope(envelope: SignedEnvelope) -> Result { + use prost::Message; + + let payload = envelope.payload(String::from(DOMAIN_SEP), PAYLOAD_TYPE.as_bytes())?; + let record = peer_record_proto::PeerRecord::decode(payload)?; + + let peer_id = PeerId::from_bytes(&record.peer_id)?; + let seq = record.seq; + let addresses = record + .addresses + .into_iter() + .map(|a| a.multiaddr.try_into()) + .collect::, _>>()?; + + Ok(Self { + peer_id, + seq, + addresses, + envelope, + }) + } + + /// Construct a new [`PeerRecord`] by authenticating the provided addresses with the given key. + /// + /// This is the same key that is used for authenticating every libp2p connection of your application, i.e. what you use when setting up your [`crate::transport::Transport`]. + pub fn new(key: Keypair, addresses: Vec) -> Result { + use prost::Message; + + let seq = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("now() is never before UNIX_EPOCH") + .as_secs(); + let peer_id = key.public().to_peer_id(); + + let payload = { + let record = peer_record_proto::PeerRecord { + peer_id: peer_id.to_bytes(), + seq, + addresses: addresses + .iter() + .map(|m| peer_record_proto::peer_record::AddressInfo { + multiaddr: m.to_vec(), + }) + .collect(), + }; + + let mut buf = Vec::with_capacity(record.encoded_len()); + record + .encode(&mut buf) + .expect("Vec provides capacity as needed"); + buf + }; + + let envelope = SignedEnvelope::new( + key, + String::from(DOMAIN_SEP), + PAYLOAD_TYPE.as_bytes().to_vec(), + payload, + )?; + + Ok(Self { + peer_id, + seq, + addresses, + envelope, + }) + } + + pub fn to_signed_envelope(&self) -> SignedEnvelope { + self.envelope.clone() + } + + pub fn into_signed_envelope(self) -> SignedEnvelope { + self.envelope + } + + pub fn peer_id(&self) -> PeerId { + self.peer_id + } + + pub fn seq(&self) -> u64 { + self.seq + } + + pub fn addresses(&self) -> &[Multiaddr] { + self.addresses.as_slice() + } +} + +#[derive(Debug)] +pub enum FromEnvelopeError { + /// Failed to extract the payload from the envelope. + BadPayload(signed_envelope::ReadPayloadError), + /// Failed to decode the provided bytes as a [`PeerRecord`]. + InvalidPeerRecord(prost::DecodeError), + /// Failed to decode the peer ID. + InvalidPeerId(multihash::Error), + /// Failed to decode a multi-address. + InvalidMultiaddr(multiaddr::Error), +} + +impl From for FromEnvelopeError { + fn from(e: signed_envelope::ReadPayloadError) -> Self { + Self::BadPayload(e) + } +} + +impl From for FromEnvelopeError { + fn from(e: prost::DecodeError) -> Self { + Self::InvalidPeerRecord(e) + } +} + +impl From for FromEnvelopeError { + fn from(e: multihash::Error) -> Self { + Self::InvalidPeerId(e) + } +} + +impl From for FromEnvelopeError { + fn from(e: multiaddr::Error) -> Self { + Self::InvalidMultiaddr(e) + } +} + +impl fmt::Display for FromEnvelopeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::BadPayload(_) => write!(f, "Failed to extract payload from envelope"), + Self::InvalidPeerRecord(_) => { + write!(f, "Failed to decode bytes as PeerRecord") + } + Self::InvalidPeerId(_) => write!(f, "Failed to decode bytes as PeerId"), + Self::InvalidMultiaddr(_) => { + write!(f, "Failed to decode bytes as MultiAddress") + } + } + } +} + +impl std::error::Error for FromEnvelopeError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::InvalidPeerRecord(inner) => Some(inner), + Self::InvalidPeerId(inner) => Some(inner), + Self::InvalidMultiaddr(inner) => Some(inner), + Self::BadPayload(inner) => Some(inner), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + const HOME: &str = "/ip4/127.0.0.1/tcp/1337"; + + #[test] + fn roundtrip_envelope() { + let record = + PeerRecord::new(Keypair::generate_ed25519(), vec![HOME.parse().unwrap()]).unwrap(); + + let envelope = record.to_signed_envelope(); + let reconstructed = PeerRecord::from_signed_envelope(envelope).unwrap(); + + assert_eq!(reconstructed, record) + } +} diff --git a/core/src/signed_envelope.rs b/core/src/signed_envelope.rs new file mode 100644 index 00000000000..afa00a23746 --- /dev/null +++ b/core/src/signed_envelope.rs @@ -0,0 +1,203 @@ +use crate::identity::error::SigningError; +use crate::identity::Keypair; +use crate::{identity, PublicKey}; +use std::convert::TryInto; +use std::fmt; +use unsigned_varint::encode::usize_buffer; + +/// A signed envelope contains an arbitrary byte string payload, a signature of the payload, and the public key that can be used to verify the signature. +/// +/// For more details see libp2p RFC0002: +#[derive(Debug, Clone, PartialEq)] +pub struct SignedEnvelope { + key: PublicKey, + payload_type: Vec, + payload: Vec, + signature: Vec, +} + +impl SignedEnvelope { + /// Constructs a new [`SignedEnvelope`]. + pub fn new( + key: Keypair, + domain_separation: String, + payload_type: Vec, + payload: Vec, + ) -> Result { + let buffer = signature_payload(domain_separation, &payload_type, &payload); + + let signature = key.sign(&buffer)?; + + Ok(Self { + key: key.public(), + payload_type, + payload, + signature, + }) + } + + /// Verify this [`SignedEnvelope`] against the provided domain-separation string. + #[must_use] + pub fn verify(&self, domain_separation: String) -> bool { + let buffer = signature_payload(domain_separation, &self.payload_type, &self.payload); + + self.key.verify(&buffer, &self.signature) + } + + /// Extract the payload of this [`SignedEnvelope`]. + /// + /// You must provide the correct domain-separation string and expected payload type in order to get the payload. + /// This guards against accidental mis-use of the payload where the signature was created for a different purpose or payload type. + pub fn payload( + &self, + domain_separation: String, + expected_payload_type: &[u8], + ) -> Result<&[u8], ReadPayloadError> { + if &self.payload_type != expected_payload_type { + return Err(ReadPayloadError::UnexpectedPayloadType { + expected: expected_payload_type.to_vec(), + got: self.payload_type.clone(), + }); + } + + if !self.verify(domain_separation) { + return Err(ReadPayloadError::InvalidSignature); + } + + Ok(&self.payload) + } + + /// Encode this [`SignedEnvelope`] using the protobuf encoding specified in the RFC. + pub fn into_protobuf_encoding(self) -> Vec { + use prost::Message; + + let envelope = crate::envelope_proto::Envelope { + public_key: Some((&self.key).into()), + payload_type: self.payload_type, + payload: self.payload, + signature: self.signature, + }; + + let mut buf = Vec::with_capacity(envelope.encoded_len()); + envelope + .encode(&mut buf) + .expect("Vec provides capacity as needed"); + + buf + } + + /// Decode a [`SignedEnvelope`] using the protobuf encoding specified in the RFC. + pub fn from_protobuf_encoding(bytes: &[u8]) -> Result { + use prost::Message; + + let envelope = crate::envelope_proto::Envelope::decode(bytes)?; + + Ok(Self { + key: envelope + .public_key + .ok_or(DecodingError::MissingPublicKey)? + .try_into()?, + payload_type: envelope.payload_type, + payload: envelope.payload, + signature: envelope.signature, + }) + } +} + +fn signature_payload(domain_separation: String, payload_type: &[u8], payload: &[u8]) -> Vec { + let mut domain_sep_length_buffer = usize_buffer(); + let domain_sep_length = + unsigned_varint::encode::usize(domain_separation.len(), &mut domain_sep_length_buffer); + + let mut payload_type_length_buffer = usize_buffer(); + let payload_type_length = + unsigned_varint::encode::usize(payload_type.len(), &mut payload_type_length_buffer); + + let mut payload_length_buffer = usize_buffer(); + let payload_length = unsigned_varint::encode::usize(payload.len(), &mut payload_length_buffer); + + let mut buffer = Vec::with_capacity( + domain_sep_length.len() + + domain_separation.len() + + payload_type_length.len() + + payload_type.len() + + payload_length.len() + + payload.len(), + ); + + buffer.extend_from_slice(domain_sep_length); + buffer.extend_from_slice(domain_separation.as_bytes()); + buffer.extend_from_slice(payload_type_length); + buffer.extend_from_slice(payload_type); + buffer.extend_from_slice(payload_length); + buffer.extend_from_slice(payload); + + buffer +} + +/// Errors that occur whilst decoding a [`SignedEnvelope`] from its byte representation. +#[derive(Debug)] +pub enum DecodingError { + /// Decoding the provided bytes as a signed envelope failed. + InvalidEnvelope(prost::DecodeError), + /// The public key in the envelope could not be converted to our internal public key type. + InvalidPublicKey(identity::error::DecodingError), + /// The public key in the envelope could not be converted to our internal public key type. + MissingPublicKey, +} + +impl From for DecodingError { + fn from(e: prost::DecodeError) -> Self { + Self::InvalidEnvelope(e) + } +} + +impl From for DecodingError { + fn from(e: identity::error::DecodingError) -> Self { + Self::InvalidPublicKey(e) + } +} + +impl fmt::Display for DecodingError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidEnvelope(_) => write!(f, "Failed to decode envelope"), + Self::InvalidPublicKey(_) => write!(f, "Failed to convert public key"), + Self::MissingPublicKey => write!(f, "Public key is missing from protobuf struct"), + } + } +} + +impl std::error::Error for DecodingError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::InvalidEnvelope(inner) => Some(inner), + Self::InvalidPublicKey(inner) => Some(inner), + Self::MissingPublicKey => None, + } + } +} + +/// Errors that occur whilst extracting the payload of a [`SignedEnvelope`]. +#[derive(Debug)] +pub enum ReadPayloadError { + /// The signature on the signed envelope does not verify with the provided domain separation string. + InvalidSignature, + /// The payload contained in the envelope is not of the expected type. + UnexpectedPayloadType { expected: Vec, got: Vec }, +} + +impl fmt::Display for ReadPayloadError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidSignature => write!(f, "Invalid signature"), + Self::UnexpectedPayloadType { expected, got } => write!( + f, + "Unexpected payload type, expected {:?} but got {:?}", + expected, got + ), + } + } +} + +impl std::error::Error for ReadPayloadError {} diff --git a/muxers/mplex/src/io.rs b/muxers/mplex/src/io.rs index 80da197a965..e475b15763b 100644 --- a/muxers/mplex/src/io.rs +++ b/muxers/mplex/src/io.rs @@ -1104,7 +1104,6 @@ mod tests { use async_std::task; use asynchronous_codec::{Decoder, Encoder}; use bytes::BytesMut; - use futures::prelude::*; use quickcheck::*; use rand::prelude::*; use std::collections::HashSet; diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 9604e660e9f..2df49bf54f3 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -258,7 +258,7 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result { #[cfg(test)] mod tests { use super::*; - use futures::{channel::oneshot, prelude::*}; + use futures::channel::oneshot; use libp2p_core::{ identity, upgrade::{self, apply_inbound, apply_outbound}, diff --git a/protocols/rendezvous/CHANGELOG.md b/protocols/rendezvous/CHANGELOG.md new file mode 100644 index 00000000000..ab400cdaf64 --- /dev/null +++ b/protocols/rendezvous/CHANGELOG.md @@ -0,0 +1,3 @@ +# 0.1.0 [unreleased] + +- Initial release. diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml new file mode 100644 index 00000000000..1ebfae9d3b7 --- /dev/null +++ b/protocols/rendezvous/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "libp2p-rendezvous" +edition = "2018" +description = "Rendezvous protocol for libp2p" +version = "0.1.0" +authors = ["The COMIT guys "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +asynchronous-codec = "0.6" +libp2p-core = { version = "0.30.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.31.0", path = "../../swarm" } +prost = "0.7" +void = "1" +log = "0.4" +futures = { version = "0.3", default-features = false, features = ["std"] } +thiserror = "1" +unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } +bimap = "0.6.1" +sha2 = "0.9" +rand = "0.8" +wasm-timer = "0.2" + +[dev-dependencies] +libp2p = { path = "../.." } +rand = "0.8" +async-std = { version = "1", features = ["attributes"] } +env_logger = "0.8" +async-trait = "0.1" +tokio = { version = "1", features = [ "rt-multi-thread", "time", "macros", "sync", "process", "fs", "net" ] } + +[build-dependencies] +prost-build = "0.7" diff --git a/protocols/rendezvous/build.rs b/protocols/rendezvous/build.rs new file mode 100644 index 00000000000..fa982fa3d90 --- /dev/null +++ b/protocols/rendezvous/build.rs @@ -0,0 +1,3 @@ +fn main() { + prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap(); +} diff --git a/protocols/rendezvous/examples/discover.rs b/protocols/rendezvous/examples/discover.rs new file mode 100644 index 00000000000..28abdf6d102 --- /dev/null +++ b/protocols/rendezvous/examples/discover.rs @@ -0,0 +1,140 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::StreamExt; +use libp2p::core::identity; +use libp2p::core::PeerId; +use libp2p::multiaddr::Protocol; +use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess}; +use libp2p::swarm::Swarm; +use libp2p::swarm::SwarmEvent; +use libp2p::{development_transport, rendezvous, Multiaddr}; +use std::time::Duration; + +const NAMESPACE: &str = "rendezvous"; + +#[async_std::main] +async fn main() { + env_logger::init(); + + let identity = identity::Keypair::generate_ed25519(); + let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::().unwrap(); + let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN" + .parse() + .unwrap(); + + let mut swarm = Swarm::new( + development_transport(identity.clone()).await.unwrap(), + MyBehaviour { + rendezvous: rendezvous::client::Behaviour::new(identity.clone()), + ping: Ping::new(PingConfig::new().with_interval(Duration::from_secs(1))), + }, + PeerId::from(identity.public()), + ); + + log::info!("Local peer id: {}", swarm.local_peer_id()); + + let _ = swarm.dial_addr(rendezvous_point_address.clone()); + + while let Some(event) = swarm.next().await { + match event { + SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == rendezvous_point => { + log::info!( + "Connected to rendezvous point, discovering nodes in '{}' namespace ...", + NAMESPACE + ); + + swarm.behaviour_mut().rendezvous.discover( + Some(rendezvous::Namespace::new(NAMESPACE.to_string()).unwrap()), + None, + None, + rendezvous_point, + ); + } + SwarmEvent::UnreachableAddr { error, address, .. } + | SwarmEvent::UnknownPeerUnreachableAddr { error, address, .. } + if address == rendezvous_point_address => + { + log::error!( + "Failed to connect to rendezvous point at {}: {}", + address, + error + ); + return; + } + SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Discovered { + registrations, + .. + })) => { + for registration in registrations { + for address in registration.record.addresses() { + let peer = registration.record.peer_id(); + log::info!("Discovered peer {} at {}", peer, address); + + let p2p_suffix = Protocol::P2p(*peer.as_ref()); + let address_with_p2p = + if !address.ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) { + address.clone().with(p2p_suffix) + } else { + address.clone() + }; + + swarm.dial_addr(address_with_p2p).unwrap() + } + } + } + SwarmEvent::Behaviour(MyEvent::Ping(PingEvent { + peer, + result: Ok(PingSuccess::Ping { rtt }), + })) if peer != rendezvous_point => { + log::info!("Ping to {} is {}ms", peer, rtt.as_millis()) + } + other => { + log::debug!("Unhandled {:?}", other); + } + } + } +} + +#[derive(Debug)] +enum MyEvent { + Rendezvous(rendezvous::client::Event), + Ping(PingEvent), +} + +impl From for MyEvent { + fn from(event: rendezvous::client::Event) -> Self { + MyEvent::Rendezvous(event) + } +} + +impl From for MyEvent { + fn from(event: PingEvent) -> Self { + MyEvent::Ping(event) + } +} + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(event_process = false)] +#[behaviour(out_event = "MyEvent")] +struct MyBehaviour { + rendezvous: rendezvous::client::Behaviour, + ping: Ping, +} diff --git a/protocols/rendezvous/examples/register.rs b/protocols/rendezvous/examples/register.rs new file mode 100644 index 00000000000..2c02bca12ef --- /dev/null +++ b/protocols/rendezvous/examples/register.rs @@ -0,0 +1,139 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::StreamExt; +use libp2p::core::identity; +use libp2p::core::PeerId; +use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess}; +use libp2p::swarm::Swarm; +use libp2p::swarm::SwarmEvent; +use libp2p::{development_transport, rendezvous}; +use libp2p::{Multiaddr, NetworkBehaviour}; +use libp2p_swarm::AddressScore; +use std::time::Duration; + +#[async_std::main] +async fn main() { + env_logger::init(); + + let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::().unwrap(); + let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN" + .parse() + .unwrap(); + + let identity = identity::Keypair::generate_ed25519(); + + let mut swarm = Swarm::new( + development_transport(identity.clone()).await.unwrap(), + MyBehaviour { + rendezvous: rendezvous::client::Behaviour::new(identity.clone()), + ping: Ping::new(PingConfig::new().with_interval(Duration::from_secs(1))), + }, + PeerId::from(identity.public()), + ); + + // In production the external address should be the publicly facing IP address of the rendezvous point. + // This address is recorded in the registration entry by the rendezvous point. + let external_address = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); + swarm.add_external_address(external_address, AddressScore::Infinite); + + log::info!("Local peer id: {}", swarm.local_peer_id()); + + let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); + + swarm.dial_addr(rendezvous_point_address).unwrap(); + + while let Some(event) = swarm.next().await { + match event { + SwarmEvent::NewListenAddr { address, .. } => { + log::info!("Listening on {}", address); + } + SwarmEvent::ConnectionClosed { + peer_id, + cause: Some(error), + .. + } if peer_id == rendezvous_point => { + log::error!("Lost connection to rendezvous point {}", error); + } + SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == rendezvous_point => { + swarm.behaviour_mut().rendezvous.register( + rendezvous::Namespace::from_static("rendezvous"), + rendezvous_point, + None, + ); + log::info!("Connection established with rendezvous point {}", peer_id); + } + // once `/identify` did its job, we know our external address and can register + SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Registered { + namespace, + ttl, + rendezvous_node, + })) => { + log::info!( + "Registered for namespace '{}' at rendezvous point {} for the next {} seconds", + namespace, + rendezvous_node, + ttl + ); + } + SwarmEvent::Behaviour(MyEvent::Rendezvous( + rendezvous::client::Event::RegisterFailed(error), + )) => { + log::error!("Failed to register {}", error); + return; + } + SwarmEvent::Behaviour(MyEvent::Ping(PingEvent { + peer, + result: Ok(PingSuccess::Ping { rtt }), + })) if peer != rendezvous_point => { + log::info!("Ping to {} is {}ms", peer, rtt.as_millis()) + } + other => { + log::debug!("Unhandled {:?}", other); + } + } + } +} + +#[derive(Debug)] +enum MyEvent { + Rendezvous(rendezvous::client::Event), + Ping(PingEvent), +} + +impl From for MyEvent { + fn from(event: rendezvous::client::Event) -> Self { + MyEvent::Rendezvous(event) + } +} + +impl From for MyEvent { + fn from(event: PingEvent) -> Self { + MyEvent::Ping(event) + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(event_process = false)] +#[behaviour(out_event = "MyEvent")] +struct MyBehaviour { + rendezvous: rendezvous::client::Behaviour, + ping: Ping, +} diff --git a/protocols/rendezvous/examples/register_with_identify.rs b/protocols/rendezvous/examples/register_with_identify.rs new file mode 100644 index 00000000000..25450d4ab02 --- /dev/null +++ b/protocols/rendezvous/examples/register_with_identify.rs @@ -0,0 +1,145 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::StreamExt; +use libp2p::core::identity; +use libp2p::core::PeerId; +use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent}; +use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess}; +use libp2p::swarm::Swarm; +use libp2p::swarm::SwarmEvent; +use libp2p::{development_transport, rendezvous}; +use libp2p::{Multiaddr, NetworkBehaviour}; +use std::time::Duration; + +#[async_std::main] +async fn main() { + env_logger::init(); + + let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::().unwrap(); + let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN" + .parse() + .unwrap(); + + let identity = identity::Keypair::generate_ed25519(); + + let mut swarm = Swarm::new( + development_transport(identity.clone()).await.unwrap(), + MyBehaviour { + identify: Identify::new(IdentifyConfig::new( + "rendezvous-example/1.0.0".to_string(), + identity.public(), + )), + rendezvous: rendezvous::client::Behaviour::new(identity.clone()), + ping: Ping::new(PingConfig::new().with_interval(Duration::from_secs(1))), + }, + PeerId::from(identity.public()), + ); + + log::info!("Local peer id: {}", swarm.local_peer_id()); + + let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); + + swarm.dial_addr(rendezvous_point_address).unwrap(); + + while let Some(event) = swarm.next().await { + match event { + SwarmEvent::NewListenAddr { address, .. } => { + log::info!("Listening on {}", address); + } + SwarmEvent::ConnectionClosed { + peer_id, + cause: Some(error), + .. + } if peer_id == rendezvous_point => { + log::error!("Lost connection to rendezvous point {}", error); + } + // once `/identify` did its job, we know our external address and can register + SwarmEvent::Behaviour(MyEvent::Identify(IdentifyEvent::Received { .. })) => { + swarm.behaviour_mut().rendezvous.register( + rendezvous::Namespace::from_static("rendezvous"), + rendezvous_point, + None, + ); + } + SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Registered { + namespace, + ttl, + rendezvous_node, + })) => { + log::info!( + "Registered for namespace '{}' at rendezvous point {} for the next {} seconds", + namespace, + rendezvous_node, + ttl + ); + } + SwarmEvent::Behaviour(MyEvent::Rendezvous( + rendezvous::client::Event::RegisterFailed(error), + )) => { + log::error!("Failed to register {}", error); + return; + } + SwarmEvent::Behaviour(MyEvent::Ping(PingEvent { + peer, + result: Ok(PingSuccess::Ping { rtt }), + })) if peer != rendezvous_point => { + log::info!("Ping to {} is {}ms", peer, rtt.as_millis()) + } + other => { + log::debug!("Unhandled {:?}", other); + } + } + } +} + +#[derive(Debug)] +enum MyEvent { + Rendezvous(rendezvous::client::Event), + Identify(IdentifyEvent), + Ping(PingEvent), +} + +impl From for MyEvent { + fn from(event: rendezvous::client::Event) -> Self { + MyEvent::Rendezvous(event) + } +} + +impl From for MyEvent { + fn from(event: IdentifyEvent) -> Self { + MyEvent::Identify(event) + } +} + +impl From for MyEvent { + fn from(event: PingEvent) -> Self { + MyEvent::Ping(event) + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(event_process = false)] +#[behaviour(out_event = "MyEvent")] +struct MyBehaviour { + identify: Identify, + rendezvous: rendezvous::client::Behaviour, + ping: Ping, +} diff --git a/protocols/rendezvous/examples/rendezvous_point.rs b/protocols/rendezvous/examples/rendezvous_point.rs new file mode 100644 index 00000000000..bbbe9b973be --- /dev/null +++ b/protocols/rendezvous/examples/rendezvous_point.rs @@ -0,0 +1,112 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::StreamExt; +use libp2p::core::identity; +use libp2p::core::PeerId; +use libp2p::ping::{Ping, PingEvent}; +use libp2p::swarm::{Swarm, SwarmEvent}; +use libp2p::NetworkBehaviour; +use libp2p::{development_transport, rendezvous}; + +#[tokio::main] +async fn main() { + env_logger::init(); + + let bytes = [0u8; 32]; + let key = identity::ed25519::SecretKey::from_bytes(bytes).expect("we always pass 32 bytes"); + let identity = identity::Keypair::Ed25519(key.into()); + + let mut swarm = Swarm::new( + development_transport(identity.clone()).await.unwrap(), + MyBehaviour { + rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), + ping: Ping::default(), + }, + PeerId::from(identity.public()), + ); + + log::info!("Local peer id: {}", swarm.local_peer_id()); + + swarm + .listen_on("/ip4/0.0.0.0/tcp/62649".parse().unwrap()) + .unwrap(); + + while let Some(event) = swarm.next().await { + match event { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + log::info!("Connected to {}", peer_id); + } + SwarmEvent::ConnectionClosed { peer_id, .. } => { + log::info!("Disconnected from {}", peer_id); + } + SwarmEvent::Behaviour(MyEvent::Rendezvous( + rendezvous::server::Event::PeerRegistered { peer, registration }, + )) => { + log::info!( + "Peer {} registered for namespace '{}'", + peer, + registration.namespace + ); + } + SwarmEvent::Behaviour(MyEvent::Rendezvous( + rendezvous::server::Event::DiscoverServed { + enquirer, + registrations, + }, + )) => { + log::info!( + "Served peer {} with {} registrations", + enquirer, + registrations.len() + ); + } + other => { + log::debug!("Unhandled {:?}", other); + } + } + } +} + +#[derive(Debug)] +enum MyEvent { + Rendezvous(rendezvous::server::Event), + Ping(PingEvent), +} + +impl From for MyEvent { + fn from(event: rendezvous::server::Event) -> Self { + MyEvent::Rendezvous(event) + } +} + +impl From for MyEvent { + fn from(event: PingEvent) -> Self { + MyEvent::Ping(event) + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(event_process = false)] +#[behaviour(out_event = "MyEvent")] +struct MyBehaviour { + rendezvous: rendezvous::server::Behaviour, + ping: Ping, +} diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs new file mode 100644 index 00000000000..73eb90edb38 --- /dev/null +++ b/protocols/rendezvous/src/client.rs @@ -0,0 +1,337 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl}; +use crate::handler; +use crate::handler::outbound; +use crate::handler::outbound::OpenInfo; +use crate::substream_handler::SubstreamProtocolsHandler; +use futures::future::BoxFuture; +use futures::future::FutureExt; +use futures::stream::FuturesUnordered; +use futures::stream::StreamExt; +use libp2p_core::connection::ConnectionId; +use libp2p_core::identity::error::SigningError; +use libp2p_core::identity::Keypair; +use libp2p_core::{Multiaddr, PeerId, PeerRecord}; +use libp2p_swarm::{ + CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, +}; +use std::collections::{HashMap, VecDeque}; +use std::iter::FromIterator; +use std::task::{Context, Poll}; +use std::time::Duration; + +pub struct Behaviour { + events: VecDeque< + NetworkBehaviourAction< + Event, + SubstreamProtocolsHandler, + >, + >, + keypair: Keypair, + pending_register_requests: Vec<(Namespace, PeerId, Option)>, + + /// Hold addresses of all peers that we have discovered so far. + /// + /// Storing these internally allows us to assist the [`libp2p_swarm::Swarm`] in dialing by returning addresses from [`NetworkBehaviour::addresses_of_peer`]. + discovered_peers: HashMap<(PeerId, Namespace), Vec>, + + /// Tracks the expiry of registrations that we have discovered and stored in `discovered_peers` otherwise we have a memory leak. + expiring_registrations: FuturesUnordered>, +} + +impl Behaviour { + /// Create a new instance of the rendezvous [`NetworkBehaviour`]. + pub fn new(keypair: Keypair) -> Self { + Self { + events: Default::default(), + keypair, + pending_register_requests: vec![], + discovered_peers: Default::default(), + expiring_registrations: FuturesUnordered::from_iter(vec![ + futures::future::pending().boxed() + ]), + } + } + + /// Register our external addresses in the given namespace with the given rendezvous peer. + /// + /// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported + /// by other [`NetworkBehaviour`]s via [`NetworkBehaviourAction::ReportObservedAddr`]. + pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option) { + self.pending_register_requests + .push((namespace, rendezvous_node, ttl)); + } + + /// Unregister ourselves from the given namespace with the given rendezvous peer. + pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) { + self.events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::UnregisterRequest(namespace), + }, + handler: NotifyHandler::Any, + }); + } + + /// Discover other peers at a given rendezvous peer. + /// + /// If desired, the registrations can be filtered by a namespace. + /// If no namespace is given, peers from all namespaces will be returned. + /// A successfully discovery returns a cookie within [`Event::Discovered`]. + /// Such a cookie can be used to only fetch the _delta_ of registrations since + /// the cookie was acquired. + pub fn discover( + &mut self, + ns: Option, + cookie: Option, + limit: Option, + rendezvous_node: PeerId, + ) { + self.events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::DiscoverRequest { + namespace: ns, + cookie, + limit, + }, + }, + handler: NotifyHandler::Any, + }); + } +} + +#[derive(Debug, thiserror::Error)] +pub enum RegisterError { + #[error("We don't know about any externally reachable addresses of ours")] + NoExternalAddresses, + #[error("Failed to make a new PeerRecord")] + FailedToMakeRecord(#[from] SigningError), + #[error("Failed to register with Rendezvous node")] + Remote { + rendezvous_node: PeerId, + namespace: Namespace, + error: ErrorCode, + }, +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum Event { + /// We successfully discovered other nodes with using the contained rendezvous node. + Discovered { + rendezvous_node: PeerId, + registrations: Vec, + cookie: Cookie, + }, + /// We failed to discover other nodes on the contained rendezvous node. + DiscoverFailed { + rendezvous_node: PeerId, + namespace: Option, + error: ErrorCode, + }, + /// We successfully registered with the contained rendezvous node. + Registered { + rendezvous_node: PeerId, + ttl: Ttl, + namespace: Namespace, + }, + /// We failed to register with the contained rendezvous node. + RegisterFailed(RegisterError), + /// The connection details we learned from this node expired. + Expired { peer: PeerId }, +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = + SubstreamProtocolsHandler; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + let initial_keep_alive = Duration::from_secs(30); + + SubstreamProtocolsHandler::new_outbound_only(initial_keep_alive) + } + + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + self.discovered_peers + .iter() + .filter_map(|((candidate, _), addresses)| (candidate == peer).then(|| addresses)) + .flatten() + .cloned() + .collect() + } + + fn inject_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: handler::OutboundOutEvent, + ) { + let new_events = match event { + handler::OutboundOutEvent::InboundEvent { message, .. } => void::unreachable(message), + handler::OutboundOutEvent::OutboundEvent { message, .. } => handle_outbound_event( + message, + peer_id, + &mut self.discovered_peers, + &mut self.expiring_registrations, + ), + handler::OutboundOutEvent::InboundError { error, .. } => void::unreachable(error), + handler::OutboundOutEvent::OutboundError { error, .. } => { + log::warn!("Connection with peer {} failed: {}", peer_id, error); + + vec![NetworkBehaviourAction::CloseConnection { + peer_id, + connection: CloseConnection::One(connection_id), + }] + } + }; + + self.events.extend(new_events); + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + poll_params: &mut impl PollParameters, + ) -> Poll> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + + if let Some((namespace, rendezvous_node, ttl)) = self.pending_register_requests.pop() { + // Update our external addresses based on the Swarm's current knowledge. + // It doesn't make sense to register addresses on which we are not reachable, hence this should not be configurable from the outside. + let external_addresses = poll_params + .external_addresses() + .map(|r| r.addr) + .collect::>(); + + if external_addresses.is_empty() { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent( + Event::RegisterFailed(RegisterError::NoExternalAddresses), + )); + } + + let action = match PeerRecord::new(self.keypair.clone(), external_addresses) { + Ok(peer_record) => NetworkBehaviourAction::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::RegisterRequest(NewRegistration { + namespace, + record: peer_record, + ttl, + }), + }, + handler: NotifyHandler::Any, + }, + Err(signing_error) => NetworkBehaviourAction::GenerateEvent(Event::RegisterFailed( + RegisterError::FailedToMakeRecord(signing_error), + )), + }; + + return Poll::Ready(action); + } + + if let Some(expired_registration) = + futures::ready!(self.expiring_registrations.poll_next_unpin(cx)) + { + self.discovered_peers.remove(&expired_registration); + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(Event::Expired { + peer: expired_registration.0, + })); + } + + Poll::Pending + } +} + +fn handle_outbound_event( + event: outbound::OutEvent, + peer_id: PeerId, + discovered_peers: &mut HashMap<(PeerId, Namespace), Vec>, + expiring_registrations: &mut FuturesUnordered>, +) -> Vec< + NetworkBehaviourAction< + Event, + SubstreamProtocolsHandler, + >, +> { + match event { + outbound::OutEvent::Registered { namespace, ttl } => { + vec![NetworkBehaviourAction::GenerateEvent(Event::Registered { + rendezvous_node: peer_id, + ttl, + namespace, + })] + } + outbound::OutEvent::RegisterFailed(namespace, error) => { + vec![NetworkBehaviourAction::GenerateEvent( + Event::RegisterFailed(RegisterError::Remote { + rendezvous_node: peer_id, + namespace, + error, + }), + )] + } + outbound::OutEvent::Discovered { + registrations, + cookie, + } => { + discovered_peers.extend(registrations.iter().map(|registration| { + let peer_id = registration.record.peer_id(); + let namespace = registration.namespace.clone(); + + let addresses = registration.record.addresses().to_vec(); + + ((peer_id, namespace), addresses) + })); + expiring_registrations.extend(registrations.iter().cloned().map(|registration| { + async move { + // if the timer errors we consider it expired + let _ = + wasm_timer::Delay::new(Duration::from_secs(registration.ttl as u64)).await; + + (registration.record.peer_id(), registration.namespace) + } + .boxed() + })); + + vec![NetworkBehaviourAction::GenerateEvent(Event::Discovered { + rendezvous_node: peer_id, + registrations, + cookie, + })] + } + outbound::OutEvent::DiscoverFailed { namespace, error } => { + vec![NetworkBehaviourAction::GenerateEvent( + Event::DiscoverFailed { + rendezvous_node: peer_id, + namespace, + error, + }, + )] + } + } +} diff --git a/protocols/rendezvous/src/codec.rs b/protocols/rendezvous/src/codec.rs new file mode 100644 index 00000000000..d050ff8ca9a --- /dev/null +++ b/protocols/rendezvous/src/codec.rs @@ -0,0 +1,622 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::DEFAULT_TTL; +use asynchronous_codec::{Bytes, BytesMut, Decoder, Encoder}; +use libp2p_core::{peer_record, signed_envelope, PeerRecord, SignedEnvelope}; +use rand::RngCore; +use std::convert::{TryFrom, TryInto}; +use std::fmt; +use unsigned_varint::codec::UviBytes; + +pub type Ttl = u64; + +#[derive(Debug, Clone)] +pub enum Message { + Register(NewRegistration), + RegisterResponse(Result), + Unregister(Namespace), + Discover { + namespace: Option, + cookie: Option, + limit: Option, + }, + DiscoverResponse(Result<(Vec, Cookie), ErrorCode>), +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub struct Namespace(String); + +impl Namespace { + /// Creates a new [`Namespace`] from a static string. + /// + /// This will panic if the namespace is too long. We accepting panicking in this case because we are enforcing a `static lifetime which means this value can only be a constant in the program and hence we hope the developer checked that it is of an acceptable length. + pub fn from_static(value: &'static str) -> Self { + if value.len() > 255 { + panic!("Namespace '{}' is too long!", value) + } + + Namespace(value.to_owned()) + } + + pub fn new(value: String) -> Result { + if value.len() > 255 { + return Err(NamespaceTooLong); + } + + Ok(Namespace(value)) + } +} + +impl From for String { + fn from(namespace: Namespace) -> Self { + namespace.0 + } +} + +impl fmt::Display for Namespace { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl PartialEq for Namespace { + fn eq(&self, other: &str) -> bool { + self.0.eq(other) + } +} + +impl PartialEq for str { + fn eq(&self, other: &Namespace) -> bool { + other.0.eq(self) + } +} + +#[derive(Debug, thiserror::Error)] +#[error("Namespace is too long")] +pub struct NamespaceTooLong; + +#[derive(Debug, Eq, PartialEq, Hash, Clone)] +pub struct Cookie { + id: u64, + namespace: Option, +} + +impl Cookie { + /// Construct a new [`Cookie`] for a given namespace. + /// + /// This cookie will only be valid for subsequent DISCOVER requests targeting the same namespace. + pub fn for_namespace(namespace: Namespace) -> Self { + Self { + id: rand::thread_rng().next_u64(), + namespace: Some(namespace), + } + } + + /// Construct a new [`Cookie`] for a DISCOVER request that inquires about all namespaces. + pub fn for_all_namespaces() -> Self { + Self { + id: rand::random(), + namespace: None, + } + } + + pub fn into_wire_encoding(self) -> Vec { + let id_bytes = self.id.to_be_bytes(); + let namespace = self.namespace.map(|ns| ns.0).unwrap_or_default(); + + let mut buffer = Vec::with_capacity(id_bytes.len() + namespace.len()); + buffer.extend_from_slice(&id_bytes); + buffer.extend_from_slice(namespace.as_bytes()); + + buffer + } + + pub fn from_wire_encoding(mut bytes: Vec) -> Result { + // check length early to avoid panic during slicing + if bytes.len() < 8 { + return Err(InvalidCookie); + } + + let namespace = bytes.split_off(8); + let namespace = if namespace.is_empty() { + None + } else { + Some( + Namespace::new(String::from_utf8(namespace).map_err(|_| InvalidCookie)?) + .map_err(|_| InvalidCookie)?, + ) + }; + + let bytes = <[u8; 8]>::try_from(bytes).map_err(|_| InvalidCookie)?; + let id = u64::from_be_bytes(bytes); + + Ok(Self { id, namespace }) + } + + pub fn namespace(&self) -> Option<&Namespace> { + self.namespace.as_ref() + } +} + +#[derive(Debug, thiserror::Error)] +#[error("The cookie was malformed")] +pub struct InvalidCookie; + +#[derive(Debug, Clone)] +pub struct NewRegistration { + pub namespace: Namespace, + pub record: PeerRecord, + pub ttl: Option, +} + +impl NewRegistration { + pub fn new(namespace: Namespace, record: PeerRecord, ttl: Option) -> Self { + Self { + namespace, + record, + ttl, + } + } + + pub fn effective_ttl(&self) -> Ttl { + self.ttl.unwrap_or(DEFAULT_TTL) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct Registration { + pub namespace: Namespace, + pub record: PeerRecord, + pub ttl: Ttl, +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum ErrorCode { + InvalidNamespace, + InvalidSignedPeerRecord, + InvalidTtl, + InvalidCookie, + NotAuthorized, + InternalError, + Unavailable, +} + +pub struct RendezvousCodec { + /// Codec to encode/decode the Unsigned varint length prefix of the frames. + length_codec: UviBytes, +} + +impl Default for RendezvousCodec { + fn default() -> Self { + let mut length_codec = UviBytes::default(); + length_codec.set_max_len(1024 * 1024); // 1MB + + Self { length_codec } + } +} + +impl Encoder for RendezvousCodec { + type Item = Message; + type Error = Error; + + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + use prost::Message; + + let message = wire::Message::from(item); + + let mut buf = Vec::with_capacity(message.encoded_len()); + + message + .encode(&mut buf) + .expect("Buffer has sufficient capacity"); + + // Length prefix the protobuf message, ensuring the max limit is not hit + self.length_codec.encode(Bytes::from(buf), dst)?; + + Ok(()) + } +} + +impl Decoder for RendezvousCodec { + type Item = Message; + type Error = Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + use prost::Message; + + let message = match self.length_codec.decode(src)? { + Some(p) => p, + None => return Ok(None), + }; + + let message = wire::Message::decode(message)?; + + Ok(Some(message.try_into()?)) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Failed to encode message as bytes")] + Encode(#[from] prost::EncodeError), + #[error("Failed to decode message from bytes")] + Decode(#[from] prost::DecodeError), + #[error("Failed to read/write")] + Io(#[from] std::io::Error), + #[error("Failed to convert wire message to internal data model")] + ConversionError(#[from] ConversionError), +} + +impl From for wire::Message { + fn from(message: Message) -> Self { + use wire::message::*; + + match message { + Message::Register(NewRegistration { + namespace, + record, + ttl, + }) => wire::Message { + r#type: Some(MessageType::Register.into()), + register: Some(Register { + ns: Some(namespace.into()), + ttl, + signed_peer_record: Some( + record.into_signed_envelope().into_protobuf_encoding(), + ), + }), + register_response: None, + unregister: None, + discover: None, + discover_response: None, + }, + Message::RegisterResponse(Ok(ttl)) => wire::Message { + r#type: Some(MessageType::RegisterResponse.into()), + register_response: Some(RegisterResponse { + status: Some(ResponseStatus::Ok.into()), + status_text: None, + ttl: Some(ttl), + }), + register: None, + discover: None, + unregister: None, + discover_response: None, + }, + Message::RegisterResponse(Err(error)) => wire::Message { + r#type: Some(MessageType::RegisterResponse.into()), + register_response: Some(RegisterResponse { + status: Some(ResponseStatus::from(error).into()), + status_text: None, + ttl: None, + }), + register: None, + discover: None, + unregister: None, + discover_response: None, + }, + Message::Unregister(namespace) => wire::Message { + r#type: Some(MessageType::Unregister.into()), + unregister: Some(Unregister { + ns: Some(namespace.into()), + id: None, + }), + register: None, + register_response: None, + discover: None, + discover_response: None, + }, + Message::Discover { + namespace, + cookie, + limit, + } => wire::Message { + r#type: Some(MessageType::Discover.into()), + discover: Some(Discover { + ns: namespace.map(|ns| ns.into()), + cookie: cookie.map(|cookie| cookie.into_wire_encoding()), + limit, + }), + register: None, + register_response: None, + unregister: None, + discover_response: None, + }, + Message::DiscoverResponse(Ok((registrations, cookie))) => wire::Message { + r#type: Some(MessageType::DiscoverResponse.into()), + discover_response: Some(DiscoverResponse { + registrations: registrations + .into_iter() + .map(|reggo| Register { + ns: Some(reggo.namespace.into()), + ttl: Some(reggo.ttl), + signed_peer_record: Some( + reggo.record.into_signed_envelope().into_protobuf_encoding(), + ), + }) + .collect(), + status: Some(ResponseStatus::Ok.into()), + status_text: None, + cookie: Some(cookie.into_wire_encoding()), + }), + register: None, + discover: None, + unregister: None, + register_response: None, + }, + Message::DiscoverResponse(Err(error)) => wire::Message { + r#type: Some(MessageType::DiscoverResponse.into()), + discover_response: Some(DiscoverResponse { + registrations: Vec::new(), + status: Some(ResponseStatus::from(error).into()), + status_text: None, + cookie: None, + }), + register: None, + discover: None, + unregister: None, + register_response: None, + }, + } + } +} + +impl TryFrom for Message { + type Error = ConversionError; + + fn try_from(message: wire::Message) -> Result { + use wire::message::*; + + let message = match message { + wire::Message { + r#type: Some(0), + register: + Some(Register { + ns, + ttl, + signed_peer_record: Some(signed_peer_record), + }), + .. + } => Message::Register(NewRegistration { + namespace: ns + .map(Namespace::new) + .transpose()? + .ok_or(ConversionError::MissingNamespace)?, + ttl, + record: PeerRecord::from_signed_envelope(SignedEnvelope::from_protobuf_encoding( + &signed_peer_record, + )?)?, + }), + wire::Message { + r#type: Some(1), + register_response: + Some(RegisterResponse { + status: Some(0), + ttl, + .. + }), + .. + } => Message::RegisterResponse(Ok(ttl.ok_or(ConversionError::MissingTtl)?)), + wire::Message { + r#type: Some(3), + discover: Some(Discover { ns, limit, cookie }), + .. + } => Message::Discover { + namespace: ns.map(Namespace::new).transpose()?, + cookie: cookie.map(Cookie::from_wire_encoding).transpose()?, + limit, + }, + wire::Message { + r#type: Some(4), + discover_response: + Some(DiscoverResponse { + registrations, + status: Some(0), + cookie: Some(cookie), + .. + }), + .. + } => { + let registrations = registrations + .into_iter() + .map(|reggo| { + Ok(Registration { + namespace: reggo + .ns + .map(Namespace::new) + .transpose()? + .ok_or(ConversionError::MissingNamespace)?, + record: PeerRecord::from_signed_envelope( + SignedEnvelope::from_protobuf_encoding( + ®go + .signed_peer_record + .ok_or(ConversionError::MissingSignedPeerRecord)?, + )?, + )?, + ttl: reggo.ttl.ok_or(ConversionError::MissingTtl)?, + }) + }) + .collect::, ConversionError>>()?; + let cookie = Cookie::from_wire_encoding(cookie)?; + + Message::DiscoverResponse(Ok((registrations, cookie))) + } + wire::Message { + r#type: Some(1), + register_response: + Some(RegisterResponse { + status: Some(error_code), + .. + }), + .. + } => { + let error_code = wire::message::ResponseStatus::from_i32(error_code) + .ok_or(ConversionError::BadStatusCode)? + .try_into()?; + Message::RegisterResponse(Err(error_code)) + } + wire::Message { + r#type: Some(2), + unregister: Some(Unregister { ns, .. }), + .. + } => Message::Unregister( + ns.map(Namespace::new) + .transpose()? + .ok_or(ConversionError::MissingNamespace)?, + ), + wire::Message { + r#type: Some(4), + discover_response: + Some(DiscoverResponse { + status: Some(error_code), + .. + }), + .. + } => { + let error = wire::message::ResponseStatus::from_i32(error_code) + .ok_or(ConversionError::BadStatusCode)? + .try_into()?; + Message::DiscoverResponse(Err(error)) + } + _ => return Err(ConversionError::InconsistentWireMessage), + }; + + Ok(message) + } +} + +#[derive(Debug, thiserror::Error)] +pub enum ConversionError { + #[error("The wire message is consistent")] + InconsistentWireMessage, + #[error("Missing namespace field")] + MissingNamespace, + #[error("Invalid namespace")] + InvalidNamespace(#[from] NamespaceTooLong), + #[error("Missing signed peer record field")] + MissingSignedPeerRecord, + #[error("Missing TTL field")] + MissingTtl, + #[error("Bad status code")] + BadStatusCode, + #[error("Failed to decode signed envelope")] + BadSignedEnvelope(#[from] signed_envelope::DecodingError), + #[error("Failed to decode envelope as signed peer record")] + BadSignedPeerRecord(#[from] peer_record::FromEnvelopeError), + #[error(transparent)] + BadCookie(#[from] InvalidCookie), + #[error("The requested PoW difficulty is out of range")] + PoWDifficultyOutOfRange, + #[error("The provided PoW hash is not 32 bytes long")] + BadPoWHash, +} + +impl ConversionError { + pub fn to_error_code(&self) -> ErrorCode { + match self { + ConversionError::MissingNamespace => ErrorCode::InvalidNamespace, + ConversionError::MissingSignedPeerRecord => ErrorCode::InvalidSignedPeerRecord, + ConversionError::BadSignedEnvelope(_) => ErrorCode::InvalidSignedPeerRecord, + ConversionError::BadSignedPeerRecord(_) => ErrorCode::InvalidSignedPeerRecord, + ConversionError::BadCookie(_) => ErrorCode::InvalidCookie, + ConversionError::MissingTtl => ErrorCode::InvalidTtl, + ConversionError::InconsistentWireMessage => ErrorCode::InternalError, + ConversionError::BadStatusCode => ErrorCode::InternalError, + ConversionError::PoWDifficultyOutOfRange => ErrorCode::InternalError, + ConversionError::BadPoWHash => ErrorCode::InternalError, + ConversionError::InvalidNamespace(_) => ErrorCode::InvalidNamespace, + } + } +} + +impl TryFrom for ErrorCode { + type Error = UnmappableStatusCode; + + fn try_from(value: wire::message::ResponseStatus) -> Result { + use wire::message::ResponseStatus::*; + + let code = match value { + Ok => return Err(UnmappableStatusCode(value)), + EInvalidNamespace => ErrorCode::InvalidNamespace, + EInvalidSignedPeerRecord => ErrorCode::InvalidSignedPeerRecord, + EInvalidTtl => ErrorCode::InvalidTtl, + EInvalidCookie => ErrorCode::InvalidCookie, + ENotAuthorized => ErrorCode::NotAuthorized, + EInternalError => ErrorCode::InternalError, + EUnavailable => ErrorCode::Unavailable, + }; + + Result::Ok(code) + } +} + +impl From for wire::message::ResponseStatus { + fn from(error_code: ErrorCode) -> Self { + use wire::message::ResponseStatus::*; + + match error_code { + ErrorCode::InvalidNamespace => EInvalidNamespace, + ErrorCode::InvalidSignedPeerRecord => EInvalidSignedPeerRecord, + ErrorCode::InvalidTtl => EInvalidTtl, + ErrorCode::InvalidCookie => EInvalidCookie, + ErrorCode::NotAuthorized => ENotAuthorized, + ErrorCode::InternalError => EInternalError, + ErrorCode::Unavailable => EUnavailable, + } + } +} + +impl From for ConversionError { + fn from(_: UnmappableStatusCode) -> Self { + ConversionError::InconsistentWireMessage + } +} + +#[derive(Debug, thiserror::Error)] +#[error("The response code ({0:?}) cannot be mapped to our ErrorCode enum")] +pub struct UnmappableStatusCode(wire::message::ResponseStatus); + +mod wire { + include!(concat!(env!("OUT_DIR"), "/rendezvous.pb.rs")); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cookie_wire_encoding_roundtrip() { + let cookie = Cookie::for_namespace(Namespace::from_static("foo")); + + let bytes = cookie.clone().into_wire_encoding(); + let parsed = Cookie::from_wire_encoding(bytes).unwrap(); + + assert_eq!(parsed, cookie); + } + + #[test] + fn cookie_wire_encoding_length() { + let cookie = Cookie::for_namespace(Namespace::from_static("foo")); + + let bytes = cookie.into_wire_encoding(); + + assert_eq!(bytes.len(), 8 + 3) + } +} diff --git a/protocols/rendezvous/src/handler.rs b/protocols/rendezvous/src/handler.rs new file mode 100644 index 00000000000..b4883825e25 --- /dev/null +++ b/protocols/rendezvous/src/handler.rs @@ -0,0 +1,48 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::codec; +use crate::codec::Message; +use void::Void; + +const PROTOCOL_IDENT: &[u8] = b"/rendezvous/1.0.0"; + +pub mod inbound; +pub mod outbound; + +/// Errors that can occur while interacting with a substream. +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Reading message {0:?} at this stage is a protocol violation")] + BadMessage(Message), + #[error("Failed to write message to substream")] + WriteMessage(#[source] codec::Error), + #[error("Failed to read message from substream")] + ReadMessage(#[source] codec::Error), + #[error("Substream ended unexpectedly mid-protocol")] + UnexpectedEndOfStream, +} + +pub type OutboundInEvent = crate::substream_handler::InEvent; +pub type OutboundOutEvent = + crate::substream_handler::OutEvent; + +pub type InboundInEvent = crate::substream_handler::InEvent<(), inbound::InEvent, Void>; +pub type InboundOutEvent = crate::substream_handler::OutEvent; diff --git a/protocols/rendezvous/src/handler/inbound.rs b/protocols/rendezvous/src/handler/inbound.rs new file mode 100644 index 00000000000..8a18f366c68 --- /dev/null +++ b/protocols/rendezvous/src/handler/inbound.rs @@ -0,0 +1,189 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::codec::{ + Cookie, ErrorCode, Message, Namespace, NewRegistration, Registration, RendezvousCodec, Ttl, +}; +use crate::handler::Error; +use crate::handler::PROTOCOL_IDENT; +use crate::substream_handler::{Next, PassthroughProtocol, SubstreamHandler}; +use asynchronous_codec::Framed; +use futures::{SinkExt, StreamExt}; +use libp2p_swarm::{NegotiatedSubstream, SubstreamProtocol}; +use std::fmt; +use std::task::{Context, Poll}; + +/// The state of an inbound substream (i.e. the remote node opened it). +#[allow(clippy::large_enum_variant)] +pub enum Stream { + /// We are in the process of reading a message from the substream. + PendingRead(Framed), + /// We read a message, dispatched it to the behaviour and are waiting for the response. + PendingBehaviour(Framed), + /// We are in the process of sending a response. + PendingSend(Framed, Message), + /// We've sent the message and are now closing down the substream. + PendingClose(Framed), +} + +impl fmt::Debug for Stream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Stream::PendingRead(_) => write!(f, "Inbound::PendingRead"), + Stream::PendingBehaviour(_) => write!(f, "Inbound::PendingBehaviour"), + Stream::PendingSend(_, _) => write!(f, "Inbound::PendingSend"), + Stream::PendingClose(_) => write!(f, "Inbound::PendingClose"), + } + } +} + +#[derive(Debug, Clone)] +pub enum OutEvent { + RegistrationRequested(NewRegistration), + UnregisterRequested(Namespace), + DiscoverRequested { + namespace: Option, + cookie: Option, + limit: Option, + }, +} + +#[derive(Debug)] +pub enum InEvent { + RegisterResponse { + ttl: Ttl, + }, + DeclineRegisterRequest(ErrorCode), + DiscoverResponse { + discovered: Vec, + cookie: Cookie, + }, + DeclineDiscoverRequest(ErrorCode), +} + +impl SubstreamHandler for Stream { + type InEvent = InEvent; + type OutEvent = OutEvent; + type Error = Error; + type OpenInfo = (); + + fn upgrade( + open_info: Self::OpenInfo, + ) -> SubstreamProtocol { + SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info) + } + + fn new(substream: NegotiatedSubstream, _: Self::OpenInfo) -> Self { + Stream::PendingRead(Framed::new(substream, RendezvousCodec::default())) + } + + fn inject_event(self, event: Self::InEvent) -> Self { + match (event, self) { + (InEvent::RegisterResponse { ttl }, Stream::PendingBehaviour(substream)) => { + Stream::PendingSend(substream, Message::RegisterResponse(Ok(ttl))) + } + (InEvent::DeclineRegisterRequest(error), Stream::PendingBehaviour(substream)) => { + Stream::PendingSend(substream, Message::RegisterResponse(Err(error))) + } + ( + InEvent::DiscoverResponse { discovered, cookie }, + Stream::PendingBehaviour(substream), + ) => Stream::PendingSend( + substream, + Message::DiscoverResponse(Ok((discovered, cookie))), + ), + (InEvent::DeclineDiscoverRequest(error), Stream::PendingBehaviour(substream)) => { + Stream::PendingSend(substream, Message::DiscoverResponse(Err(error))) + } + (event, inbound) => { + debug_assert!(false, "{:?} cannot handle event {:?}", inbound, event); + + inbound + } + } + } + + fn advance(self, cx: &mut Context<'_>) -> Result, Self::Error> { + let next_state = match self { + Stream::PendingRead(mut substream) => { + match substream.poll_next_unpin(cx).map_err(Error::ReadMessage)? { + Poll::Ready(Some(msg)) => { + let event = match msg { + Message::Register(registration) => { + OutEvent::RegistrationRequested(registration) + } + Message::Discover { + cookie, + namespace, + limit, + } => OutEvent::DiscoverRequested { + cookie, + namespace, + limit, + }, + Message::Unregister(namespace) => { + OutEvent::UnregisterRequested(namespace) + } + other => return Err(Error::BadMessage(other)), + }; + + Next::EmitEvent { + event, + next_state: Stream::PendingBehaviour(substream), + } + } + Poll::Ready(None) => return Err(Error::UnexpectedEndOfStream), + Poll::Pending => Next::Pending { + next_state: Stream::PendingRead(substream), + }, + } + } + Stream::PendingBehaviour(substream) => Next::Pending { + next_state: Stream::PendingBehaviour(substream), + }, + Stream::PendingSend(mut substream, message) => match substream + .poll_ready_unpin(cx) + .map_err(Error::WriteMessage)? + { + Poll::Ready(()) => { + substream + .start_send_unpin(message) + .map_err(Error::WriteMessage)?; + + Next::Continue { + next_state: Stream::PendingClose(substream), + } + } + Poll::Pending => Next::Pending { + next_state: Stream::PendingSend(substream, message), + }, + }, + Stream::PendingClose(mut substream) => match substream.poll_close_unpin(cx) { + Poll::Ready(Ok(())) => Next::Done, + Poll::Ready(Err(_)) => Next::Done, // there is nothing we can do about an error during close + Poll::Pending => Next::Pending { + next_state: Stream::PendingClose(substream), + }, + }, + }; + + Ok(next_state) + } +} diff --git a/protocols/rendezvous/src/handler/outbound.rs b/protocols/rendezvous/src/handler/outbound.rs new file mode 100644 index 00000000000..ab06040ca19 --- /dev/null +++ b/protocols/rendezvous/src/handler/outbound.rs @@ -0,0 +1,132 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::codec::{Cookie, Message, NewRegistration, RendezvousCodec}; +use crate::handler::Error; +use crate::handler::PROTOCOL_IDENT; +use crate::substream_handler::{FutureSubstream, Next, PassthroughProtocol, SubstreamHandler}; +use crate::{ErrorCode, Namespace, Registration, Ttl}; +use asynchronous_codec::Framed; +use futures::{SinkExt, TryFutureExt, TryStreamExt}; +use libp2p_swarm::{NegotiatedSubstream, SubstreamProtocol}; +use std::task::Context; +use void::Void; + +pub struct Stream(FutureSubstream); + +impl SubstreamHandler for Stream { + type InEvent = Void; + type OutEvent = OutEvent; + type Error = Error; + type OpenInfo = OpenInfo; + + fn upgrade( + open_info: Self::OpenInfo, + ) -> SubstreamProtocol { + SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info) + } + + fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self { + let mut stream = Framed::new(substream, RendezvousCodec::default()); + let sent_message = match info { + OpenInfo::RegisterRequest(new_registration) => Message::Register(new_registration), + OpenInfo::UnregisterRequest(namespace) => Message::Unregister(namespace), + OpenInfo::DiscoverRequest { + namespace, + cookie, + limit, + } => Message::Discover { + namespace, + cookie, + limit, + }, + }; + + Self(FutureSubstream::new(async move { + use Message::*; + use OutEvent::*; + + stream + .send(sent_message.clone()) + .map_err(Error::WriteMessage) + .await?; + let received_message = stream.try_next().map_err(Error::ReadMessage).await?; + let received_message = received_message.ok_or(Error::UnexpectedEndOfStream)?; + + let event = match (sent_message, received_message) { + (Register(registration), RegisterResponse(Ok(ttl))) => Registered { + namespace: registration.namespace, + ttl, + }, + (Register(registration), RegisterResponse(Err(error))) => { + RegisterFailed(registration.namespace, error) + } + (Discover { .. }, DiscoverResponse(Ok((registrations, cookie)))) => Discovered { + registrations, + cookie, + }, + (Discover { namespace, .. }, DiscoverResponse(Err(error))) => { + DiscoverFailed { namespace, error } + } + (.., other) => return Err(Error::BadMessage(other)), + }; + + stream.close().map_err(Error::WriteMessage).await?; + + Ok(event) + })) + } + + fn inject_event(self, event: Self::InEvent) -> Self { + void::unreachable(event) + } + + fn advance(self, cx: &mut Context<'_>) -> Result, Self::Error> { + Ok(self.0.advance(cx)?.map_state(Stream)) + } +} + +#[derive(Debug, Clone)] +pub enum OutEvent { + Registered { + namespace: Namespace, + ttl: Ttl, + }, + RegisterFailed(Namespace, ErrorCode), + Discovered { + registrations: Vec, + cookie: Cookie, + }, + DiscoverFailed { + namespace: Option, + error: ErrorCode, + }, +} + +#[derive(Debug)] +pub enum OpenInfo { + RegisterRequest(NewRegistration), + UnregisterRequest(Namespace), + DiscoverRequest { + namespace: Option, + cookie: Option, + limit: Option, + }, +} diff --git a/protocols/rendezvous/src/lib.rs b/protocols/rendezvous/src/lib.rs new file mode 100644 index 00000000000..87c88434db3 --- /dev/null +++ b/protocols/rendezvous/src/lib.rs @@ -0,0 +1,43 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +pub use self::codec::{ErrorCode, Namespace, NamespaceTooLong, Registration, Ttl}; + +mod codec; +mod handler; +mod substream_handler; + +/// If unspecified, rendezvous nodes should assume a TTL of 2h. +/// +/// See . +pub const DEFAULT_TTL: Ttl = 60 * 60 * 2; + +/// By default, nodes should require a minimum TTL of 2h +/// +/// . +pub const MIN_TTL: Ttl = 60 * 60 * 2; + +/// By default, nodes should allow a maximum TTL of 72h +/// +/// . +pub const MAX_TTL: Ttl = 60 * 60 * 72; + +pub mod client; +pub mod server; diff --git a/protocols/rendezvous/src/rpc.proto b/protocols/rendezvous/src/rpc.proto new file mode 100644 index 00000000000..d4e388ca96e --- /dev/null +++ b/protocols/rendezvous/src/rpc.proto @@ -0,0 +1,61 @@ +syntax = "proto2"; + +package rendezvous.pb; + +message Message { + enum MessageType { + REGISTER = 0; + REGISTER_RESPONSE = 1; + UNREGISTER = 2; + DISCOVER = 3; + DISCOVER_RESPONSE = 4; + } + + enum ResponseStatus { + OK = 0; + E_INVALID_NAMESPACE = 100; + E_INVALID_SIGNED_PEER_RECORD = 101; + E_INVALID_TTL = 102; + E_INVALID_COOKIE = 103; + E_NOT_AUTHORIZED = 200; + E_INTERNAL_ERROR = 300; + E_UNAVAILABLE = 400; + } + + message Register { + optional string ns = 1; + optional bytes signedPeerRecord = 2; + optional uint64 ttl = 3; // in seconds + } + + message RegisterResponse { + optional ResponseStatus status = 1; + optional string statusText = 2; + optional uint64 ttl = 3; // in seconds + } + + message Unregister { + optional string ns = 1; + optional bytes id = 2; + } + + message Discover { + optional string ns = 1; + optional uint64 limit = 2; + optional bytes cookie = 3; + } + + message DiscoverResponse { + repeated Register registrations = 1; + optional bytes cookie = 2; + optional ResponseStatus status = 3; + optional string statusText = 4; + } + + optional MessageType type = 1; + optional Register register = 2; + optional RegisterResponse registerResponse = 3; + optional Unregister unregister = 4; + optional Discover discover = 5; + optional DiscoverResponse discoverResponse = 6; +} diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs new file mode 100644 index 00000000000..93682dda422 --- /dev/null +++ b/protocols/rendezvous/src/server.rs @@ -0,0 +1,764 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::codec::{Cookie, ErrorCode, Namespace, NewRegistration, Registration, Ttl}; +use crate::handler::inbound; +use crate::substream_handler::{InboundSubstreamId, SubstreamProtocolsHandler}; +use crate::{handler, MAX_TTL, MIN_TTL}; +use bimap::BiMap; +use futures::future::BoxFuture; +use futures::ready; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, StreamExt}; +use libp2p_core::connection::ConnectionId; +use libp2p_core::PeerId; +use libp2p_swarm::{ + CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, +}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::iter::FromIterator; +use std::task::{Context, Poll}; +use std::time::Duration; +use void::Void; + +pub struct Behaviour { + events: VecDeque< + NetworkBehaviourAction>, + >, + registrations: Registrations, +} + +pub struct Config { + min_ttl: Ttl, + max_ttl: Ttl, +} + +impl Config { + pub fn with_min_ttl(mut self, min_ttl: Ttl) -> Self { + self.min_ttl = min_ttl; + self + } + + pub fn with_max_ttl(mut self, max_ttl: Ttl) -> Self { + self.max_ttl = max_ttl; + self + } +} + +impl Default for Config { + fn default() -> Self { + Self { + min_ttl: MIN_TTL, + max_ttl: MAX_TTL, + } + } +} + +impl Behaviour { + /// Create a new instance of the rendezvous [`NetworkBehaviour`]. + pub fn new(config: Config) -> Self { + Self { + events: Default::default(), + registrations: Registrations::with_config(config), + } + } +} + +#[derive(Debug)] +#[allow(clippy::large_enum_variant)] +pub enum Event { + /// We successfully served a discover request from a peer. + DiscoverServed { + enquirer: PeerId, + registrations: Vec, + }, + /// We failed to serve a discover request for a peer. + DiscoverNotServed { enquirer: PeerId, error: ErrorCode }, + /// A peer successfully registered with us. + PeerRegistered { + peer: PeerId, + registration: Registration, + }, + /// We declined a registration from a peer. + PeerNotRegistered { + peer: PeerId, + namespace: Namespace, + error: ErrorCode, + }, + /// A peer successfully unregistered with us. + PeerUnregistered { peer: PeerId, namespace: Namespace }, + /// A registration from a peer expired. + RegistrationExpired(Registration), +} + +impl NetworkBehaviour for Behaviour { + type ProtocolsHandler = SubstreamProtocolsHandler; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + let initial_keep_alive = Duration::from_secs(30); + + SubstreamProtocolsHandler::new_inbound_only(initial_keep_alive) + } + + fn inject_event( + &mut self, + peer_id: PeerId, + connection: ConnectionId, + event: handler::InboundOutEvent, + ) { + let new_events = match event { + handler::InboundOutEvent::InboundEvent { id, message } => { + handle_inbound_event(message, peer_id, connection, id, &mut self.registrations) + } + handler::InboundOutEvent::OutboundEvent { message, .. } => void::unreachable(message), + handler::InboundOutEvent::InboundError { error, .. } => { + log::warn!("Connection with peer {} failed: {}", peer_id, error); + + vec![NetworkBehaviourAction::CloseConnection { + peer_id, + connection: CloseConnection::One(connection), + }] + } + handler::InboundOutEvent::OutboundError { error, .. } => void::unreachable(error), + }; + + self.events.extend(new_events); + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll> { + if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) { + return Poll::Ready(NetworkBehaviourAction::GenerateEvent( + Event::RegistrationExpired(registration), + )); + } + + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + + Poll::Pending + } +} + +fn handle_inbound_event( + event: inbound::OutEvent, + peer_id: PeerId, + connection: ConnectionId, + id: InboundSubstreamId, + registrations: &mut Registrations, +) -> Vec>> { + match event { + // bad registration + inbound::OutEvent::RegistrationRequested(registration) + if registration.record.peer_id() != peer_id => + { + let error = ErrorCode::NotAuthorized; + + vec![ + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection), + event: handler::InboundInEvent::NotifyInboundSubstream { + id, + message: inbound::InEvent::DeclineRegisterRequest(error), + }, + }, + NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered { + peer: peer_id, + namespace: registration.namespace, + error, + }), + ] + } + inbound::OutEvent::RegistrationRequested(registration) => { + let namespace = registration.namespace.clone(); + + match registrations.add(registration) { + Ok(registration) => { + vec![ + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection), + event: handler::InboundInEvent::NotifyInboundSubstream { + id, + message: inbound::InEvent::RegisterResponse { + ttl: registration.ttl, + }, + }, + }, + NetworkBehaviourAction::GenerateEvent(Event::PeerRegistered { + peer: peer_id, + registration, + }), + ] + } + Err(TtlOutOfRange::TooLong { .. }) | Err(TtlOutOfRange::TooShort { .. }) => { + let error = ErrorCode::InvalidTtl; + + vec![ + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection), + event: handler::InboundInEvent::NotifyInboundSubstream { + id, + message: inbound::InEvent::DeclineRegisterRequest(error), + }, + }, + NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered { + peer: peer_id, + namespace, + error, + }), + ] + } + } + } + inbound::OutEvent::DiscoverRequested { + namespace, + cookie, + limit, + } => match registrations.get(namespace, cookie, limit) { + Ok((registrations, cookie)) => { + let discovered = registrations.cloned().collect::>(); + + vec![ + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection), + event: handler::InboundInEvent::NotifyInboundSubstream { + id, + message: inbound::InEvent::DiscoverResponse { + discovered: discovered.clone(), + cookie, + }, + }, + }, + NetworkBehaviourAction::GenerateEvent(Event::DiscoverServed { + enquirer: peer_id, + registrations: discovered, + }), + ] + } + Err(_) => { + let error = ErrorCode::InvalidCookie; + + vec![ + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection), + event: handler::InboundInEvent::NotifyInboundSubstream { + id, + message: inbound::InEvent::DeclineDiscoverRequest(error), + }, + }, + NetworkBehaviourAction::GenerateEvent(Event::DiscoverNotServed { + enquirer: peer_id, + error, + }), + ] + } + }, + inbound::OutEvent::UnregisterRequested(namespace) => { + registrations.remove(namespace.clone(), peer_id); + + vec![NetworkBehaviourAction::GenerateEvent( + Event::PeerUnregistered { + peer: peer_id, + namespace, + }, + )] + } + } +} + +#[derive(Debug, Eq, PartialEq, Hash, Copy, Clone)] +struct RegistrationId(u64); + +impl RegistrationId { + fn new() -> Self { + Self(rand::random()) + } +} + +#[derive(Debug, PartialEq)] +struct ExpiredRegistration(Registration); + +pub struct Registrations { + registrations_for_peer: BiMap<(PeerId, Namespace), RegistrationId>, + registrations: HashMap, + cookies: HashMap>, + min_ttl: Ttl, + max_ttl: Ttl, + next_expiry: FuturesUnordered>, +} + +#[derive(Debug, thiserror::Error)] +pub enum TtlOutOfRange { + #[error("Requested TTL ({requested}s) is too long; max {bound}s")] + TooLong { bound: Ttl, requested: Ttl }, + #[error("Requested TTL ({requested}s) is too short; min {bound}s")] + TooShort { bound: Ttl, requested: Ttl }, +} + +impl Default for Registrations { + fn default() -> Self { + Registrations::with_config(Config::default()) + } +} + +impl Registrations { + pub fn with_config(config: Config) -> Self { + Self { + registrations_for_peer: Default::default(), + registrations: Default::default(), + min_ttl: config.min_ttl, + max_ttl: config.max_ttl, + cookies: Default::default(), + next_expiry: FuturesUnordered::from_iter(vec![futures::future::pending().boxed()]), + } + } + + pub fn add( + &mut self, + new_registration: NewRegistration, + ) -> Result { + let ttl = new_registration.effective_ttl(); + if ttl > self.max_ttl { + return Err(TtlOutOfRange::TooLong { + bound: self.max_ttl, + requested: ttl, + }); + } + if ttl < self.min_ttl { + return Err(TtlOutOfRange::TooShort { + bound: self.min_ttl, + requested: ttl, + }); + } + + let namespace = new_registration.namespace; + let registration_id = RegistrationId::new(); + + if let Some(old_registration) = self + .registrations_for_peer + .get_by_left(&(new_registration.record.peer_id(), namespace.clone())) + { + self.registrations.remove(old_registration); + } + + self.registrations_for_peer.insert( + (new_registration.record.peer_id(), namespace.clone()), + registration_id, + ); + + let registration = Registration { + namespace, + record: new_registration.record, + ttl, + }; + self.registrations + .insert(registration_id, registration.clone()); + + let next_expiry = wasm_timer::Delay::new(Duration::from_secs(ttl as u64)) + .map(move |result| { + if result.is_err() { + log::warn!("Timer for registration {} has unexpectedly errored, treating it as expired", registration_id.0); + } + + registration_id + }) + .boxed(); + + self.next_expiry.push(next_expiry); + + Ok(registration) + } + + pub fn remove(&mut self, namespace: Namespace, peer_id: PeerId) { + let reggo_to_remove = self + .registrations_for_peer + .remove_by_left(&(peer_id, namespace)); + + if let Some((_, reggo_to_remove)) = reggo_to_remove { + self.registrations.remove(®go_to_remove); + } + } + + pub fn get( + &mut self, + discover_namespace: Option, + cookie: Option, + limit: Option, + ) -> Result<(impl Iterator + '_, Cookie), CookieNamespaceMismatch> { + let cookie_namespace = cookie.as_ref().and_then(|cookie| cookie.namespace()); + + match (discover_namespace.as_ref(), cookie_namespace) { + // discover all namespace but cookie is specific to a namespace? => bad + (None, Some(_)) => return Err(CookieNamespaceMismatch), + // discover for a namespace but cookie is for a different namesapce? => bad + (Some(namespace), Some(cookie_namespace)) if namespace != cookie_namespace => { + return Err(CookieNamespaceMismatch) + } + // every other combination is fine + _ => {} + } + + let mut reggos_of_last_discover = cookie + .and_then(|cookie| self.cookies.get(&cookie)) + .cloned() + .unwrap_or_default(); + + let ids = self + .registrations_for_peer + .iter() + .filter_map({ + |((_, namespace), registration_id)| { + if reggos_of_last_discover.contains(registration_id) { + return None; + } + + match discover_namespace.as_ref() { + Some(discover_namespace) if discover_namespace == namespace => { + Some(registration_id) + } + Some(_) => None, + None => Some(registration_id), + } + } + }) + .take(limit.unwrap_or(u64::MAX) as usize) + .cloned() + .collect::>(); + + reggos_of_last_discover.extend(&ids); + + let new_cookie = discover_namespace + .map(Cookie::for_namespace) + .unwrap_or_else(Cookie::for_all_namespaces); + self.cookies + .insert(new_cookie.clone(), reggos_of_last_discover); + + let reggos = &self.registrations; + let registrations = ids + .into_iter() + .map(move |id| reggos.get(&id).expect("bad internal datastructure")); + + Ok((registrations, new_cookie)) + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + let expired_registration = ready!(self.next_expiry.poll_next_unpin(cx)).expect( + "This stream should never finish because it is initialised with a pending future", + ); + + // clean up our cookies + self.cookies.retain(|_, registrations| { + registrations.remove(&expired_registration); + + // retain all cookies where there are still registrations left + !registrations.is_empty() + }); + + self.registrations_for_peer + .remove_by_right(&expired_registration); + match self.registrations.remove(&expired_registration) { + None => self.poll(cx), + Some(registration) => Poll::Ready(ExpiredRegistration(registration)), + } + } +} + +#[derive(Debug, thiserror::Error, Eq, PartialEq)] +#[error("The provided cookie is not valid for a DISCOVER request for the given namespace")] +pub struct CookieNamespaceMismatch; + +#[cfg(test)] +mod tests { + use std::option::Option::None; + use std::time::SystemTime; + + use libp2p_core::{identity, PeerRecord}; + + use super::*; + + #[test] + fn given_cookie_from_discover_when_discover_again_then_only_get_diff() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (initial_discover, cookie) = registrations.get(None, None, None).unwrap(); + assert_eq!(initial_discover.count(), 2); + + let (subsequent_discover, _) = registrations.get(None, Some(cookie), None).unwrap(); + assert_eq!(subsequent_discover.count(), 0); + } + + #[test] + fn given_registrations_when_discover_all_then_all_are_returned() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (discover, _) = registrations.get(None, None, None).unwrap(); + + assert_eq!(discover.count(), 2); + } + + #[test] + fn given_registrations_when_discover_only_for_specific_namespace_then_only_those_are_returned() + { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("bar")).unwrap(); + + let (discover, _) = registrations + .get(Some(Namespace::from_static("foo")), None, None) + .unwrap(); + + assert_eq!( + discover.map(|r| &r.namespace).collect::>(), + vec!["foo"] + ); + } + + #[test] + fn given_reregistration_old_registration_is_discarded() { + let alice = identity::Keypair::generate_ed25519(); + let mut registrations = Registrations::default(); + registrations + .add(new_registration("foo", alice.clone(), None)) + .unwrap(); + registrations + .add(new_registration("foo", alice, None)) + .unwrap(); + + let (discover, _) = registrations + .get(Some(Namespace::from_static("foo")), None, None) + .unwrap(); + + assert_eq!( + discover.map(|r| &r.namespace).collect::>(), + vec!["foo"] + ); + } + + #[test] + fn given_cookie_from_2nd_discover_does_not_return_nodes_from_first_discover() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (initial_discover, cookie1) = registrations.get(None, None, None).unwrap(); + assert_eq!(initial_discover.count(), 2); + + let (subsequent_discover, cookie2) = registrations.get(None, Some(cookie1), None).unwrap(); + assert_eq!(subsequent_discover.count(), 0); + + let (subsequent_discover, _) = registrations.get(None, Some(cookie2), None).unwrap(); + assert_eq!(subsequent_discover.count(), 0); + } + + #[test] + fn cookie_from_different_discover_request_is_not_valid() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("bar")).unwrap(); + + let (_, foo_discover_cookie) = registrations + .get(Some(Namespace::from_static("foo")), None, None) + .unwrap(); + let result = registrations.get( + Some(Namespace::from_static("bar")), + Some(foo_discover_cookie), + None, + ); + + assert!(matches!(result, Err(CookieNamespaceMismatch))) + } + + #[tokio::test] + async fn given_two_registration_ttls_one_expires_one_lives() { + let mut registrations = Registrations::with_config(Config { + min_ttl: 0, + max_ttl: 4, + }); + + let start_time = SystemTime::now(); + + registrations + .add(new_dummy_registration_with_ttl("foo", 1)) + .unwrap(); + registrations + .add(new_dummy_registration_with_ttl("bar", 4)) + .unwrap(); + + let event = registrations.next_event().await; + + let elapsed = start_time.elapsed().unwrap(); + assert!(elapsed.as_secs() >= 1); + assert!(elapsed.as_secs() < 2); + + assert_eq!(event.0.namespace, Namespace::from_static("foo")); + + { + let (mut discovered_foo, _) = registrations + .get(Some(Namespace::from_static("foo")), None, None) + .unwrap(); + assert!(discovered_foo.next().is_none()); + } + let (mut discovered_bar, _) = registrations + .get(Some(Namespace::from_static("bar")), None, None) + .unwrap(); + assert!(discovered_bar.next().is_some()); + } + + #[tokio::test] + async fn given_peer_unregisters_before_expiry_do_not_emit_registration_expired() { + let mut registrations = Registrations::with_config(Config { + min_ttl: 1, + max_ttl: 10, + }); + let dummy_registration = new_dummy_registration_with_ttl("foo", 2); + let namespace = dummy_registration.namespace.clone(); + let peer_id = dummy_registration.record.peer_id(); + + registrations.add(dummy_registration).unwrap(); + registrations.no_event_for(1).await; + registrations.remove(namespace, peer_id); + + registrations.no_event_for(3).await + } + + /// FuturesUnordered stop polling for ready futures when poll_next() is called until a None + /// value is returned. To prevent the next_expiry future from going to "sleep", next_expiry + /// is initialised with a future that always returns pending. This test ensures that + /// FuturesUnordered does not stop polling for ready futures. + #[tokio::test] + async fn given_all_registrations_expired_then_successfully_handle_new_registration_and_expiry() + { + let mut registrations = Registrations::with_config(Config { + min_ttl: 0, + max_ttl: 10, + }); + let dummy_registration = new_dummy_registration_with_ttl("foo", 1); + + registrations.add(dummy_registration.clone()).unwrap(); + let _ = registrations.next_event_in_at_most(2).await; + + registrations.no_event_for(1).await; + + registrations.add(dummy_registration).unwrap(); + let _ = registrations.next_event_in_at_most(2).await; + } + + #[tokio::test] + async fn cookies_are_cleaned_up_if_registrations_expire() { + let mut registrations = Registrations::with_config(Config { + min_ttl: 1, + max_ttl: 10, + }); + + registrations + .add(new_dummy_registration_with_ttl("foo", 2)) + .unwrap(); + let (_, _) = registrations.get(None, None, None).unwrap(); + + assert_eq!(registrations.cookies.len(), 1); + + let _ = registrations.next_event_in_at_most(3).await; + + assert_eq!(registrations.cookies.len(), 0); + } + + #[test] + fn given_limit_discover_only_returns_n_results() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (registrations, _) = registrations.get(None, None, Some(1)).unwrap(); + + assert_eq!(registrations.count(), 1); + } + + #[test] + fn given_limit_cookie_can_be_used_for_pagination() { + let mut registrations = Registrations::default(); + registrations.add(new_dummy_registration("foo")).unwrap(); + registrations.add(new_dummy_registration("foo")).unwrap(); + + let (discover1, cookie) = registrations.get(None, None, Some(1)).unwrap(); + assert_eq!(discover1.count(), 1); + + let (discover2, _) = registrations.get(None, Some(cookie), None).unwrap(); + assert_eq!(discover2.count(), 1); + } + + fn new_dummy_registration(namespace: &'static str) -> NewRegistration { + let identity = identity::Keypair::generate_ed25519(); + + new_registration(namespace, identity, None) + } + + fn new_dummy_registration_with_ttl(namespace: &'static str, ttl: Ttl) -> NewRegistration { + let identity = identity::Keypair::generate_ed25519(); + + new_registration(namespace, identity, Some(ttl)) + } + + fn new_registration( + namespace: &'static str, + identity: identity::Keypair, + ttl: Option, + ) -> NewRegistration { + NewRegistration::new( + Namespace::from_static(namespace), + PeerRecord::new(identity, vec!["/ip4/127.0.0.1/tcp/1234".parse().unwrap()]).unwrap(), + ttl, + ) + } + + /// Defines utility functions that make the tests more readable. + impl Registrations { + async fn next_event(&mut self) -> ExpiredRegistration { + futures::future::poll_fn(|cx| self.poll(cx)).await + } + + /// Polls [`Registrations`] for `seconds` and panics if it returns a event during this time. + async fn no_event_for(&mut self, seconds: u64) { + tokio::time::timeout(Duration::from_secs(seconds), self.next_event()) + .await + .unwrap_err(); + } + + /// Polls [`Registrations`] for at most `seconds` and panics if doesn't return an event within that time. + async fn next_event_in_at_most(&mut self, seconds: u64) -> ExpiredRegistration { + tokio::time::timeout(Duration::from_secs(seconds), self.next_event()) + .await + .unwrap() + } + } +} diff --git a/protocols/rendezvous/src/substream_handler.rs b/protocols/rendezvous/src/substream_handler.rs new file mode 100644 index 00000000000..efd7956b13c --- /dev/null +++ b/protocols/rendezvous/src/substream_handler.rs @@ -0,0 +1,551 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A generic [`ProtocolsHandler`] that delegates the handling of substreams to [`SubstreamHandler`]s. +//! +//! This module is an attempt to simplify the implementation of protocols by freeing implementations from dealing with aspects such as concurrent substreams. +//! Particularly for outbound substreams, it greatly simplifies the definition of protocols through the [`FutureSubstream`] helper. +//! +//! At the moment, this module is an implementation detail of the rendezvous protocol but the intent is for it to be provided as a generic module that is accessible to other protocols as well. + +use futures::future::{self, BoxFuture, Fuse, FusedFuture}; +use futures::FutureExt; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_swarm::protocols_handler::{InboundUpgradeSend, OutboundUpgradeSend}; +use libp2p_swarm::{ + KeepAlive, NegotiatedSubstream, ProtocolsHandler, ProtocolsHandlerEvent, + ProtocolsHandlerUpgrErr, SubstreamProtocol, +}; +use std::collections::{HashMap, VecDeque}; +use std::fmt; +use std::future::Future; +use std::hash::Hash; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use void::Void; + +/// Handles a substream throughout its lifetime. +pub trait SubstreamHandler: Sized { + type InEvent; + type OutEvent; + type Error; + type OpenInfo; + + fn upgrade(open_info: Self::OpenInfo) + -> SubstreamProtocol; + fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self; + fn inject_event(self, event: Self::InEvent) -> Self; + fn advance(self, cx: &mut Context<'_>) -> Result, Self::Error>; +} + +/// The result of advancing a [`SubstreamHandler`]. +pub enum Next { + /// Return the given event and set the handler into `next_state`. + EmitEvent { event: TEvent, next_state: TState }, + /// The handler currently cannot do any more work, set its state back into `next_state`. + Pending { next_state: TState }, + /// The handler performed some work and wants to continue in the given state. + /// + /// This variant is useful because it frees the handler from implementing a loop internally. + Continue { next_state: TState }, + /// The handler finished. + Done, +} + +impl Next { + pub fn map_state( + self, + map: impl FnOnce(TState) -> TNextState, + ) -> Next { + match self { + Next::EmitEvent { event, next_state } => Next::EmitEvent { + event, + next_state: map(next_state), + }, + Next::Pending { next_state } => Next::Pending { + next_state: map(next_state), + }, + Next::Continue { next_state } => Next::Pending { + next_state: map(next_state), + }, + Next::Done => Next::Done, + } + } +} + +#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)] +pub struct InboundSubstreamId(u64); + +impl InboundSubstreamId { + fn fetch_and_increment(&mut self) -> Self { + let next_id = *self; + self.0 += 1; + + next_id + } +} + +impl fmt::Display for InboundSubstreamId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +#[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)] +pub struct OutboundSubstreamId(u64); + +impl OutboundSubstreamId { + fn fetch_and_increment(&mut self) -> Self { + let next_id = *self; + self.0 += 1; + + next_id + } +} + +impl fmt::Display for OutboundSubstreamId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +pub struct PassthroughProtocol { + ident: Option<&'static [u8]>, +} + +impl PassthroughProtocol { + pub fn new(ident: &'static [u8]) -> Self { + Self { ident: Some(ident) } + } +} + +impl UpgradeInfo for PassthroughProtocol { + type Info = &'static [u8]; + type InfoIter = std::option::IntoIter; + + fn protocol_info(&self) -> Self::InfoIter { + self.ident.into_iter() + } +} + +impl InboundUpgrade for PassthroughProtocol { + type Output = C; + type Error = Void; + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { + match self.ident { + Some(_) => future::ready(Ok(socket)).boxed(), + None => future::pending().boxed(), + } + } +} + +impl OutboundUpgrade for PassthroughProtocol { + type Output = C; + type Error = Void; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { + match self.ident { + Some(_) => future::ready(Ok(socket)).boxed(), + None => future::pending().boxed(), + } + } +} + +/// An implementation of [`ProtocolsHandler`] that delegates to individual [`SubstreamHandler`]s. +pub struct SubstreamProtocolsHandler { + inbound_substreams: HashMap, + outbound_substreams: HashMap, + next_inbound_substream_id: InboundSubstreamId, + next_outbound_substream_id: OutboundSubstreamId, + + new_substreams: VecDeque, + + initial_keep_alive_deadline: Instant, +} + +impl + SubstreamProtocolsHandler +{ + pub fn new(initial_keep_alive: Duration) -> Self { + Self { + inbound_substreams: Default::default(), + outbound_substreams: Default::default(), + next_inbound_substream_id: InboundSubstreamId(0), + next_outbound_substream_id: OutboundSubstreamId(0), + new_substreams: Default::default(), + initial_keep_alive_deadline: Instant::now() + initial_keep_alive, + } + } +} + +impl + SubstreamProtocolsHandler +{ + pub fn new_outbound_only(initial_keep_alive: Duration) -> Self { + Self { + inbound_substreams: Default::default(), + outbound_substreams: Default::default(), + next_inbound_substream_id: InboundSubstreamId(0), + next_outbound_substream_id: OutboundSubstreamId(0), + new_substreams: Default::default(), + initial_keep_alive_deadline: Instant::now() + initial_keep_alive, + } + } +} + +impl + SubstreamProtocolsHandler +{ + pub fn new_inbound_only(initial_keep_alive: Duration) -> Self { + Self { + inbound_substreams: Default::default(), + outbound_substreams: Default::default(), + next_inbound_substream_id: InboundSubstreamId(0), + next_outbound_substream_id: OutboundSubstreamId(0), + new_substreams: Default::default(), + initial_keep_alive_deadline: Instant::now() + initial_keep_alive, + } + } +} + +/// Poll all substreams within the given HashMap. +/// +/// This is defined as a separate function because we call it with two different fields stored within [`SubstreamProtocolsHandler`]. +fn poll_substreams( + substreams: &mut HashMap, + cx: &mut Context<'_>, +) -> Poll> +where + TSubstream: SubstreamHandler, + TId: Copy + Eq + Hash + fmt::Display, +{ + let substream_ids = substreams.keys().copied().collect::>(); + + 'loop_substreams: for id in substream_ids { + let mut handler = substreams + .remove(&id) + .expect("we just got the key out of the map"); + + let (next_state, poll) = 'loop_handler: loop { + match handler.advance(cx) { + Ok(Next::EmitEvent { next_state, event }) => { + break (next_state, Poll::Ready(Ok((id, event)))) + } + Ok(Next::Pending { next_state }) => break (next_state, Poll::Pending), + Ok(Next::Continue { next_state }) => { + handler = next_state; + continue 'loop_handler; + } + Ok(Next::Done) => { + log::debug!("Substream handler {} finished", id); + continue 'loop_substreams; + } + Err(e) => return Poll::Ready(Err((id, e))), + } + }; + + substreams.insert(id, next_state); + + return poll; + } + + Poll::Pending +} + +/// Event sent from the [`libp2p_swarm::NetworkBehaviour`] to the [`SubstreamProtocolsHandler`]. +#[derive(Debug)] +pub enum InEvent { + /// Open a new substream using the provided `open_info`. + /// + /// For "client-server" protocols, this is typically the initial message to be sent to the other party. + NewSubstream { open_info: I }, + NotifyInboundSubstream { + id: InboundSubstreamId, + message: TInboundEvent, + }, + NotifyOutboundSubstream { + id: OutboundSubstreamId, + message: TOutboundEvent, + }, +} + +/// Event produced by the [`SubstreamProtocolsHandler`] for the corresponding [`libp2p_swarm::NetworkBehaviour`]. +#[derive(Debug)] +pub enum OutEvent { + /// An inbound substream produced an event. + InboundEvent { + id: InboundSubstreamId, + message: TInbound, + }, + /// An outbound substream produced an event. + OutboundEvent { + id: OutboundSubstreamId, + message: TOutbound, + }, + /// An inbound substream errored irrecoverably. + InboundError { + id: InboundSubstreamId, + error: TInboundError, + }, + /// An outbound substream errored irrecoverably. + OutboundError { + id: OutboundSubstreamId, + error: TOutboundError, + }, +} + +impl< + TInboundInEvent, + TInboundOutEvent, + TOutboundInEvent, + TOutboundOutEvent, + TOutboundOpenInfo, + TInboundError, + TOutboundError, + TInboundSubstreamHandler, + TOutboundSubstreamHandler, + > ProtocolsHandler + for SubstreamProtocolsHandler< + TInboundSubstreamHandler, + TOutboundSubstreamHandler, + TOutboundOpenInfo, + > +where + TInboundSubstreamHandler: SubstreamHandler< + InEvent = TInboundInEvent, + OutEvent = TInboundOutEvent, + Error = TInboundError, + OpenInfo = (), + >, + TOutboundSubstreamHandler: SubstreamHandler< + InEvent = TOutboundInEvent, + OutEvent = TOutboundOutEvent, + Error = TOutboundError, + OpenInfo = TOutboundOpenInfo, + >, + TInboundInEvent: fmt::Debug + Send + 'static, + TInboundOutEvent: fmt::Debug + Send + 'static, + TOutboundInEvent: fmt::Debug + Send + 'static, + TOutboundOutEvent: fmt::Debug + Send + 'static, + TOutboundOpenInfo: fmt::Debug + Send + 'static, + TInboundError: fmt::Debug + Send + 'static, + TOutboundError: fmt::Debug + Send + 'static, + TInboundSubstreamHandler: Send + 'static, + TOutboundSubstreamHandler: Send + 'static, +{ + type InEvent = InEvent; + type OutEvent = OutEvent; + type Error = Void; + type InboundProtocol = PassthroughProtocol; + type OutboundProtocol = PassthroughProtocol; + type InboundOpenInfo = (); + type OutboundOpenInfo = TOutboundOpenInfo; + + fn listen_protocol(&self) -> SubstreamProtocol { + TInboundSubstreamHandler::upgrade(()) + } + + fn inject_fully_negotiated_inbound( + &mut self, + protocol: ::Output, + _: Self::InboundOpenInfo, + ) { + self.inbound_substreams.insert( + self.next_inbound_substream_id.fetch_and_increment(), + TInboundSubstreamHandler::new(protocol, ()), + ); + } + + fn inject_fully_negotiated_outbound( + &mut self, + protocol: ::Output, + info: Self::OutboundOpenInfo, + ) { + self.outbound_substreams.insert( + self.next_outbound_substream_id.fetch_and_increment(), + TOutboundSubstreamHandler::new(protocol, info), + ); + } + + fn inject_event(&mut self, event: Self::InEvent) { + match event { + InEvent::NewSubstream { open_info } => self.new_substreams.push_back(open_info), + InEvent::NotifyInboundSubstream { id, message } => { + match self.inbound_substreams.remove(&id) { + Some(handler) => { + let new_handler = handler.inject_event(message); + + self.inbound_substreams.insert(id, new_handler); + } + None => { + log::debug!("Substream with ID {} not found", id); + } + } + } + InEvent::NotifyOutboundSubstream { id, message } => { + match self.outbound_substreams.remove(&id) { + Some(handler) => { + let new_handler = handler.inject_event(message); + + self.outbound_substreams.insert(id, new_handler); + } + None => { + log::debug!("Substream with ID {} not found", id); + } + } + } + } + } + + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + _: ProtocolsHandlerUpgrErr, + ) { + // TODO: Handle upgrade errors properly + } + + fn connection_keep_alive(&self) -> KeepAlive { + // Rudimentary keep-alive handling, to be extended as needed as this abstraction is used more by other protocols. + + if Instant::now() < self.initial_keep_alive_deadline { + return KeepAlive::Yes; + } + + if self.inbound_substreams.is_empty() + && self.outbound_substreams.is_empty() + && self.new_substreams.is_empty() + { + return KeepAlive::No; + } + + KeepAlive::Yes + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ProtocolsHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + if let Some(open_info) = self.new_substreams.pop_front() { + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: TOutboundSubstreamHandler::upgrade(open_info), + }); + } + + match poll_substreams(&mut self.inbound_substreams, cx) { + Poll::Ready(Ok((id, message))) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(OutEvent::InboundEvent { + id, + message, + })) + } + Poll::Ready(Err((id, error))) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(OutEvent::InboundError { + id, + error, + })) + } + Poll::Pending => {} + } + + match poll_substreams(&mut self.outbound_substreams, cx) { + Poll::Ready(Ok((id, message))) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(OutEvent::OutboundEvent { + id, + message, + })) + } + Poll::Ready(Err((id, error))) => { + return Poll::Ready(ProtocolsHandlerEvent::Custom(OutEvent::OutboundError { + id, + error, + })) + } + Poll::Pending => {} + } + + Poll::Pending + } +} + +/// A helper struct for substream handlers that can be implemented as async functions. +/// +/// This only works for substreams without an `InEvent` because - once constructed - the state of an inner future is opaque. +pub struct FutureSubstream { + future: Fuse>>, +} + +impl FutureSubstream { + pub fn new(future: impl Future> + Send + 'static) -> Self { + Self { + future: future.boxed().fuse(), + } + } + + pub fn advance(mut self, cx: &mut Context<'_>) -> Result, TError> { + if self.future.is_terminated() { + return Ok(Next::Done); + } + + match self.future.poll_unpin(cx) { + Poll::Ready(Ok(event)) => Ok(Next::EmitEvent { + event, + next_state: self, + }), + Poll::Ready(Err(error)) => Err(error), + Poll::Pending => Ok(Next::Pending { next_state: self }), + } + } +} + +impl SubstreamHandler for void::Void { + type InEvent = void::Void; + type OutEvent = void::Void; + type Error = void::Void; + type OpenInfo = (); + + fn new(_: NegotiatedSubstream, _: Self::OpenInfo) -> Self { + unreachable!("we should never yield a substream") + } + + fn inject_event(self, event: Self::InEvent) -> Self { + void::unreachable(event) + } + + fn advance(self, _: &mut Context<'_>) -> Result, Self::Error> { + void::unreachable(self) + } + + fn upgrade( + open_info: Self::OpenInfo, + ) -> SubstreamProtocol { + SubstreamProtocol::new(PassthroughProtocol { ident: None }, open_info) + } +} diff --git a/protocols/rendezvous/tests/harness/mod.rs b/protocols/rendezvous/tests/harness/mod.rs new file mode 100644 index 00000000000..5747f7d19a6 --- /dev/null +++ b/protocols/rendezvous/tests/harness/mod.rs @@ -0,0 +1,221 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use async_trait::async_trait; +use futures::stream::FusedStream; +use futures::StreamExt; +use futures::{future, Stream}; +use libp2p::core::muxing::StreamMuxerBox; +use libp2p::core::transport::upgrade::Version; +use libp2p::core::transport::MemoryTransport; +use libp2p::core::upgrade::SelectUpgrade; +use libp2p::core::{identity, Multiaddr, PeerId, Transport}; +use libp2p::mplex::MplexConfig; +use libp2p::noise::{Keypair, NoiseConfig, X25519Spec}; +use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; +use libp2p::yamux::YamuxConfig; +use std::fmt::Debug; +use std::time::Duration; + +pub fn new_swarm(behaviour_fn: F) -> Swarm +where + B: NetworkBehaviour, + ::OutEvent: Debug, + B: NetworkBehaviour, + F: FnOnce(PeerId, identity::Keypair) -> B, +{ + let identity = identity::Keypair::generate_ed25519(); + let peer_id = PeerId::from(identity.public()); + + let dh_keys = Keypair::::new() + .into_authentic(&identity) + .expect("failed to create dh_keys"); + let noise = NoiseConfig::xx(dh_keys).into_authenticated(); + + let transport = MemoryTransport::default() + .upgrade(Version::V1) + .authenticate(noise) + .multiplex(SelectUpgrade::new( + YamuxConfig::default(), + MplexConfig::new(), + )) + .timeout(Duration::from_secs(5)) + .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))) + .boxed(); + + SwarmBuilder::new(transport, behaviour_fn(peer_id, identity), peer_id) + .executor(Box::new(|future| { + let _ = tokio::spawn(future); + })) + .build() +} + +fn get_rand_memory_address() -> Multiaddr { + let address_port = rand::random::(); + let addr = format!("/memory/{}", address_port) + .parse::() + .unwrap(); + + addr +} + +pub async fn await_events_or_timeout( + swarm_1: &mut (impl Stream> + FusedStream + Unpin), + swarm_2: &mut (impl Stream> + FusedStream + Unpin), +) -> (SwarmEvent, SwarmEvent) +where + SwarmEvent: Debug, + SwarmEvent: Debug, +{ + tokio::time::timeout( + Duration::from_secs(30), + future::join( + swarm_1 + .inspect(|event| log::debug!("Swarm1 emitted {:?}", event)) + .select_next_some(), + swarm_2 + .inspect(|event| log::debug!("Swarm2 emitted {:?}", event)) + .select_next_some(), + ), + ) + .await + .expect("network behaviours to emit an event within 10 seconds") +} + +#[macro_export] +macro_rules! assert_behaviour_events { + ($swarm1: ident: $pat1: pat, $swarm2: ident: $pat2: pat, || $body: block) => { + match await_events_or_timeout(&mut $swarm1, &mut $swarm2).await { + ( + libp2p::swarm::SwarmEvent::Behaviour($pat1), + libp2p::swarm::SwarmEvent::Behaviour($pat2), + ) => $body, + _ => panic!("Unexpected combination of events emitted, check logs for details"), + } + }; +} + +/// An extension trait for [`Swarm`] that makes it easier to set up a network of [`Swarm`]s for tests. +#[async_trait] +pub trait SwarmExt { + /// Establishes a connection to the given [`Swarm`], polling both of them until the connection is established. + async fn block_on_connection(&mut self, other: &mut Swarm) + where + T: NetworkBehaviour, + ::OutEvent: Debug; + + /// Listens on a random memory address, polling the [`Swarm`] until the transport is ready to accept connections. + async fn listen_on_random_memory_address(&mut self) -> Multiaddr; + + /// Spawns the given [`Swarm`] into a runtime, polling it endlessly. + fn spawn_into_runtime(self); +} + +#[async_trait] +impl SwarmExt for Swarm +where + B: NetworkBehaviour, + ::OutEvent: Debug, +{ + async fn block_on_connection(&mut self, other: &mut Swarm) + where + T: NetworkBehaviour, + ::OutEvent: Debug, + { + let addr_to_dial = other.external_addresses().next().unwrap().addr.clone(); + + self.dial_addr(addr_to_dial.clone()).unwrap(); + + let mut dialer_done = false; + let mut listener_done = false; + + loop { + let dialer_event_fut = self.select_next_some(); + + tokio::select! { + dialer_event = dialer_event_fut => { + match dialer_event { + SwarmEvent::ConnectionEstablished { .. } => { + dialer_done = true; + } + SwarmEvent::UnknownPeerUnreachableAddr { address, error } if address == addr_to_dial => { + panic!("Failed to dial address {}: {}", addr_to_dial, error) + } + other => { + log::debug!("Ignoring {:?}", other); + } + } + }, + listener_event = other.select_next_some() => { + match listener_event { + SwarmEvent::ConnectionEstablished { .. } => { + listener_done = true; + } + SwarmEvent::IncomingConnectionError { error, .. } => { + panic!("Failure in incoming connection {}", error); + } + other => { + log::debug!("Ignoring {:?}", other); + } + } + } + } + + if dialer_done && listener_done { + return; + } + } + } + + async fn listen_on_random_memory_address(&mut self) -> Multiaddr { + let memory_addr_listener_id = self.listen_on(get_rand_memory_address()).unwrap(); + + // block until we are actually listening + let multiaddr = loop { + match self.select_next_some().await { + SwarmEvent::NewListenAddr { + address, + listener_id, + } if listener_id == memory_addr_listener_id => { + break address; + } + other => { + log::debug!( + "Ignoring {:?} while waiting for listening to succeed", + other + ); + } + } + }; + + // Memory addresses are externally reachable because they all share the same memory-space. + self.add_external_address(multiaddr.clone(), AddressScore::Infinite); + + multiaddr + } + + fn spawn_into_runtime(mut self) { + tokio::spawn(async move { + loop { + self.next().await; + } + }); + } +} diff --git a/protocols/rendezvous/tests/rendezvous.rs b/protocols/rendezvous/tests/rendezvous.rs new file mode 100644 index 00000000000..e6ba5fcc39a --- /dev/null +++ b/protocols/rendezvous/tests/rendezvous.rs @@ -0,0 +1,384 @@ +// Copyright 2021 COMIT Network. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#[macro_use] +pub mod harness; + +use crate::harness::{await_events_or_timeout, new_swarm, SwarmExt}; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use libp2p_core::identity; +use libp2p_rendezvous as rendezvous; +use libp2p_swarm::DialError; +use libp2p_swarm::{Swarm, SwarmEvent}; +use std::convert::TryInto; +use std::time::Duration; + +#[tokio::test] +async fn given_successful_registration_then_successful_discovery() { + let _ = env_logger::try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice, mut bob], mut robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + + let _ = alice + .behaviour_mut() + .register(namespace.clone(), *robert.local_peer_id(), None); + + assert_behaviour_events! { + alice: rendezvous::client::Event::Registered { rendezvous_node, ttl, namespace: register_node_namespace }, + robert: rendezvous::server::Event::PeerRegistered { peer, registration }, + || { + assert_eq!(&peer, alice.local_peer_id()); + assert_eq!(&rendezvous_node, robert.local_peer_id()); + assert_eq!(registration.namespace, namespace); + assert_eq!(register_node_namespace, namespace); + assert_eq!(ttl, rendezvous::DEFAULT_TTL); + } + }; + + bob.behaviour_mut() + .discover(Some(namespace.clone()), None, None, *robert.local_peer_id()); + + assert_behaviour_events! { + bob: rendezvous::client::Event::Discovered { registrations, .. }, + robert: rendezvous::server::Event::DiscoverServed { .. }, + || { + match registrations.as_slice() { + [rendezvous::Registration { + namespace: registered_namespace, + record, + ttl, + }] => { + assert_eq!(*ttl, rendezvous::DEFAULT_TTL); + assert_eq!(record.peer_id(), *alice.local_peer_id()); + assert_eq!(*registered_namespace, namespace); + } + _ => panic!("Expected exactly one registration to be returned from discover"), + } + } + }; +} + +#[tokio::test] +async fn given_successful_registration_then_refresh_ttl() { + let _ = env_logger::try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice, mut bob], mut robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + + let roberts_peer_id = *robert.local_peer_id(); + let refresh_ttl = 10_000; + + let _ = alice + .behaviour_mut() + .register(namespace.clone(), roberts_peer_id, None); + + assert_behaviour_events! { + alice: rendezvous::client::Event::Registered { .. }, + robert: rendezvous::server::Event::PeerRegistered { .. }, + || { } + }; + + bob.behaviour_mut() + .discover(Some(namespace.clone()), None, None, roberts_peer_id); + + assert_behaviour_events! { + bob: rendezvous::client::Event::Discovered { .. }, + robert: rendezvous::server::Event::DiscoverServed { .. }, + || { } + }; + + alice + .behaviour_mut() + .register(namespace.clone(), roberts_peer_id, Some(refresh_ttl)); + + assert_behaviour_events! { + alice: rendezvous::client::Event::Registered { ttl, .. }, + robert: rendezvous::server::Event::PeerRegistered { .. }, + || { + assert_eq!(ttl, refresh_ttl); + } + }; + + bob.behaviour_mut() + .discover(Some(namespace.clone()), None, None, *robert.local_peer_id()); + + assert_behaviour_events! { + bob: rendezvous::client::Event::Discovered { registrations, .. }, + robert: rendezvous::server::Event::DiscoverServed { .. }, + || { + match registrations.as_slice() { + [rendezvous::Registration { ttl, .. }] => { + assert_eq!(*ttl, refresh_ttl); + } + _ => panic!("Expected exactly one registration to be returned from discover"), + } + } + }; +} + +#[tokio::test] +async fn given_invalid_ttl_then_unsuccessful_registration() { + let _ = env_logger::try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice], mut robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + + alice.behaviour_mut().register( + namespace.clone(), + *robert.local_peer_id(), + Some(100_000_000), + ); + + assert_behaviour_events! { + alice: rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote {error , ..}), + robert: rendezvous::server::Event::PeerNotRegistered { .. }, + || { + assert_eq!(error, rendezvous::ErrorCode::InvalidTtl); + } + }; +} + +#[tokio::test] +async fn discover_allows_for_dial_by_peer_id() { + let _ = env_logger::try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice, mut bob], robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + + let roberts_peer_id = *robert.local_peer_id(); + robert.spawn_into_runtime(); + + alice + .behaviour_mut() + .register(namespace.clone(), roberts_peer_id, None); + bob.behaviour_mut() + .discover(Some(namespace.clone()), None, None, roberts_peer_id); + + assert_behaviour_events! { + alice: rendezvous::client::Event::Registered { .. }, + bob: rendezvous::client::Event::Discovered { .. }, + || { } + }; + + let alices_peer_id = *alice.local_peer_id(); + let bobs_peer_id = *bob.local_peer_id(); + + bob.dial(&alices_peer_id).unwrap(); + + let alice_connected_to = tokio::spawn(async move { + loop { + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = + alice.select_next_some().await + { + break peer_id; + } + } + }); + let bob_connected_to = tokio::spawn(async move { + loop { + if let SwarmEvent::ConnectionEstablished { peer_id, .. } = bob.select_next_some().await + { + break peer_id; + } + } + }); + + assert_eq!(alice_connected_to.await.unwrap(), bobs_peer_id); + assert_eq!(bob_connected_to.await.unwrap(), alices_peer_id); +} + +#[tokio::test] +async fn eve_cannot_register() { + let _ = env_logger::try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let mut robert = new_server(rendezvous::server::Config::default()).await; + let mut eve = new_impersonating_client().await; + eve.block_on_connection(&mut robert).await; + + eve.behaviour_mut() + .register(namespace.clone(), *robert.local_peer_id(), None); + + assert_behaviour_events! { + eve: rendezvous::client::Event::RegisterFailed(rendezvous::client::RegisterError::Remote { error: err_code , ..}), + robert: rendezvous::server::Event::PeerNotRegistered { .. }, + || { + assert_eq!(err_code, rendezvous::ErrorCode::NotAuthorized); + } + }; +} + +// test if charlie can operate as client and server simultaneously +#[tokio::test] +async fn can_combine_client_and_server() { + let _ = env_logger::try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice], mut robert) = + new_server_with_connected_clients(rendezvous::server::Config::default()).await; + let mut charlie = new_combined_node().await; + charlie.block_on_connection(&mut robert).await; + alice.block_on_connection(&mut charlie).await; + + charlie + .behaviour_mut() + .client + .register(namespace.clone(), *robert.local_peer_id(), None); + + assert_behaviour_events! { + charlie: CombinedEvent::Client(rendezvous::client::Event::Registered { .. }), + robert: rendezvous::server::Event::PeerRegistered { .. }, + || { } + }; + + alice + .behaviour_mut() + .register(namespace, *charlie.local_peer_id(), None); + + assert_behaviour_events! { + charlie: CombinedEvent::Server(rendezvous::server::Event::PeerRegistered { .. }), + alice: rendezvous::client::Event::Registered { .. }, + || { } + }; +} + +#[tokio::test] +async fn registration_on_clients_expire() { + let _ = env_logger::try_init(); + let namespace = rendezvous::Namespace::from_static("some-namespace"); + let ([mut alice, mut bob], robert) = + new_server_with_connected_clients(rendezvous::server::Config::default().with_min_ttl(1)) + .await; + + let roberts_peer_id = *robert.local_peer_id(); + robert.spawn_into_runtime(); + + let registration_ttl = 3; + + alice + .behaviour_mut() + .register(namespace.clone(), roberts_peer_id, Some(registration_ttl)); + bob.behaviour_mut() + .discover(Some(namespace), None, None, roberts_peer_id); + + assert_behaviour_events! { + alice: rendezvous::client::Event::Registered { .. }, + bob: rendezvous::client::Event::Discovered { .. }, + || { } + }; + + tokio::time::sleep(Duration::from_secs(registration_ttl + 5)).await; + + let event = bob.select_next_some().await; + let error = bob.dial(alice.local_peer_id()).unwrap_err(); + + assert!(matches!( + event, + SwarmEvent::Behaviour(rendezvous::client::Event::Expired { .. }) + )); + assert!(matches!(error, DialError::NoAddresses)); +} + +async fn new_server_with_connected_clients( + config: rendezvous::server::Config, +) -> ( + [Swarm; N], + Swarm, +) { + let mut server = new_server(config).await; + + let mut clients: [Swarm<_>; N] = match (0usize..N) + .map(|_| new_client()) + .collect::>() + .collect::>() + .await + .try_into() + { + Ok(clients) => clients, + Err(_) => panic!("Vec is of size N"), + }; + + for client in &mut clients { + client.block_on_connection(&mut server).await; + } + + (clients, server) +} + +async fn new_client() -> Swarm { + let mut client = new_swarm(|_, identity| rendezvous::client::Behaviour::new(identity)); + client.listen_on_random_memory_address().await; // we need to listen otherwise we don't have addresses to register + + client +} + +async fn new_server(config: rendezvous::server::Config) -> Swarm { + let mut server = new_swarm(|_, _| rendezvous::server::Behaviour::new(config)); + + server.listen_on_random_memory_address().await; + + server +} + +async fn new_combined_node() -> Swarm { + let mut node = new_swarm(|_, identity| CombinedBehaviour { + client: rendezvous::client::Behaviour::new(identity), + server: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), + }); + node.listen_on_random_memory_address().await; + + node +} + +async fn new_impersonating_client() -> Swarm { + // In reality, if Eve were to try and fake someones identity, she would obviously only know the public key. + // Due to the type-safe API of the `Rendezvous` behaviour and `PeerRecord`, we actually cannot construct a bad `PeerRecord` (i.e. one that is claims to be someone else). + // As such, the best we can do is hand eve a completely different keypair from what she is using to authenticate her connection. + let someone_else = identity::Keypair::generate_ed25519(); + let mut eve = new_swarm(move |_, _| rendezvous::client::Behaviour::new(someone_else)); + eve.listen_on_random_memory_address().await; + + eve +} + +#[derive(libp2p::NetworkBehaviour)] +#[behaviour(event_process = false, out_event = "CombinedEvent")] +struct CombinedBehaviour { + client: rendezvous::client::Behaviour, + server: rendezvous::server::Behaviour, +} + +#[derive(Debug)] +enum CombinedEvent { + Client(rendezvous::client::Event), + Server(rendezvous::server::Event), +} + +impl From for CombinedEvent { + fn from(v: rendezvous::server::Event) -> Self { + Self::Server(v) + } +} + +impl From for CombinedEvent { + fn from(v: rendezvous::client::Event) -> Self { + Self::Client(v) + } +} diff --git a/src/lib.rs b/src/lib.rs index 8514b9791f1..b8728005ea3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -104,6 +104,10 @@ pub use libp2p_pnet as pnet; #[cfg_attr(docsrs, doc(cfg(feature = "relay")))] #[doc(inline)] pub use libp2p_relay as relay; +#[cfg(feature = "rendezvous")] +#[cfg_attr(docsrs, doc(cfg(feature = "rendezvous")))] +#[doc(inline)] +pub use libp2p_rendezvous as rendezvous; #[cfg(feature = "request-response")] #[cfg_attr(docsrs, doc(cfg(feature = "request-response")))] #[doc(inline)]