diff --git a/CHANGELOG.md b/CHANGELOG.md index bc4c46f82..8fc524f6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Replication protocol session manager [#363](https://github.com/p2panda/aquadoggo/pull/363) - Replication message de- / serialization [#375](https://github.com/p2panda/aquadoggo/pull/375) - Naive protocol replication [#380](https://github.com/p2panda/aquadoggo/pull/380) +- Integrate replication manager with networking stack [#387](https://github.com/p2panda/aquadoggo/pull/387) 🥞 ### Changed diff --git a/Cargo.lock b/Cargo.lock index e54f203b3..ec634c091 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,6 +201,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "tower", "tower-http", "tower-service", @@ -4873,6 +4874,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index 1f51d48eb..0745042cd 100644 --- a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -70,6 +70,7 @@ tokio = { version = "1.25.0", features = [ "sync", "time", ] } +tokio-stream = { version = "0.1.14", features = ["sync"] } tower-http = { version = "0.3.4", default-features = false, features = [ "cors", ] } diff --git a/aquadoggo/src/bus.rs b/aquadoggo/src/bus.rs index ccee7719b..addc9402c 100644 --- a/aquadoggo/src/bus.rs +++ b/aquadoggo/src/bus.rs @@ -3,6 +3,8 @@ use p2panda_rs::operation::OperationId; use crate::manager::Sender; +use crate::network::Peer; +use crate::replication::SyncMessage; /// Sender for cross-service communication bus. pub type ServiceSender = Sender; @@ -12,4 +14,19 @@ pub type ServiceSender = Sender; pub enum ServiceMessage { /// A new operation arrived at the node. NewOperation(OperationId), + + /// Node established a bi-directional connection to another node. + PeerConnected(Peer), + + /// Node closed a connection to another node. + PeerDisconnected(Peer), + + /// Node sent a message to remote node for replication. + SentReplicationMessage(Peer, SyncMessage), + + /// Node received a message from remote node for replication. + ReceivedReplicationMessage(Peer, SyncMessage), + + /// Replication protocol failed with an critical error. + ReplicationFailed(Peer), } diff --git a/aquadoggo/src/config.rs b/aquadoggo/src/config.rs index 4b651530d..eb3aa62cd 100644 --- a/aquadoggo/src/config.rs +++ b/aquadoggo/src/config.rs @@ -98,6 +98,29 @@ impl Configuration { } }; + // Derive peer id from key pair + // @TODO: This needs refactoring: https://github.com/p2panda/aquadoggo/issues/388 + let key_pair = NetworkConfiguration::load_or_generate_key_pair(config.base_path.clone())?; + config.network.set_peer_id(&key_pair.public()); + Ok(config) } } + +#[cfg(test)] +impl Configuration { + /// Returns a new configuration object for a node which stores all data temporarily in memory. 
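A minimal usage sketch for the `new_ephemeral` helper defined just below; the test name and the direct access to `database_url` are assumptions, not part of this patch.

```rust
// Sketch only: exercising the ephemeral configuration helper defined below.
// Assumes the configuration fields are accessible from an in-crate test.
#[test]
fn ephemeral_config_keeps_data_in_memory() {
    let config = Configuration::new_ephemeral();

    // All data lives in a temporary in-memory SQLite database ...
    assert_eq!(config.database_url, Some("sqlite::memory:".to_string()));

    // ... and a peer id was derived from a randomly generated key pair.
    assert!(config.network.peer_id.is_some());
}
```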
+ pub fn new_ephemeral() -> Self { + let mut config = Configuration { + database_url: Some("sqlite::memory:".to_string()), + ..Default::default() + }; + + // Generate a random key pair and just keep it in memory + // @TODO: This needs refactoring: https://github.com/p2panda/aquadoggo/issues/388 + let key_pair: libp2p::identity::Keypair = crate::network::identity::Identity::new(); + config.network.set_peer_id(&key_pair.public()); + + config + } +} diff --git a/aquadoggo/src/db/stores/document.rs b/aquadoggo/src/db/stores/document.rs index afce47dc1..39dc6efd5 100644 --- a/aquadoggo/src/db/stores/document.rs +++ b/aquadoggo/src/db/stores/document.rs @@ -459,6 +459,7 @@ async fn insert_document_view( ) VALUES ($1, $2, $3) + ON CONFLICT(document_view_id) DO NOTHING -- @TODO: temp fix for double document view insertions: https://github.com/p2panda/aquadoggo/issues/398 ", ) .bind(document_view.id().to_string()) diff --git a/aquadoggo/src/db/stores/entry.rs b/aquadoggo/src/db/stores/entry.rs index 3a7a7d220..e9c3d7978 100644 --- a/aquadoggo/src/db/stores/entry.rs +++ b/aquadoggo/src/db/stores/entry.rs @@ -277,14 +277,14 @@ impl EntryStore for SqlStore { log_id: &LogId, initial_seq_num: &SeqNum, ) -> Result, EntryStorageError> { + // Formatting query string in this way as `sqlx` currently doesn't support binding list + // arguments for IN queries. let cert_pool_seq_nums = get_lipmaa_links_back_to(initial_seq_num.as_u64(), 1) .iter() - .map(|seq_num| seq_num.to_string()) + .map(|seq_num| format!("'{seq_num}'")) .collect::>() .join(","); - // Formatting query string in this way as `sqlx` currently - // doesn't support binding list arguments for IN queries. let sql_str = format!( "SELECT public_key, @@ -299,7 +299,7 @@ impl EntryStore for SqlStore { WHERE public_key = $1 AND log_id = $2 - AND CAST(seq_num AS NUMERIC) IN ({}) + AND seq_num IN ({}) ORDER BY CAST(seq_num AS NUMERIC) DESC ", @@ -337,6 +337,8 @@ impl SqlStore { logs.schema = $1 GROUP BY entries.public_key, entries.log_id + ORDER BY + entries.public_key, CAST(entries.log_id AS NUMERIC) ", ) .bind(schema_id.to_string()) diff --git a/aquadoggo/src/graphql/scalars/document_view_id_scalar.rs b/aquadoggo/src/graphql/scalars/document_view_id_scalar.rs index 582dbedf1..ef522b908 100644 --- a/aquadoggo/src/graphql/scalars/document_view_id_scalar.rs +++ b/aquadoggo/src/graphql/scalars/document_view_id_scalar.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use std::{fmt::Display, str::FromStr}; +use std::fmt::Display; +use std::str::FromStr; use dynamic_graphql::{Error, Result, Scalar, ScalarValue, Value}; use p2panda_rs::document::DocumentViewId; diff --git a/aquadoggo/src/graphql/schema.rs b/aquadoggo/src/graphql/schema.rs index bf33b2fba..6e66d9148 100644 --- a/aquadoggo/src/graphql/schema.rs +++ b/aquadoggo/src/graphql/schema.rs @@ -194,7 +194,7 @@ impl GraphQLSchemaManager { let shared = self.shared.clone(); let schemas = self.schemas.clone(); - info!("Subscribing GraphQL manager to schema provider"); + debug!("Subscribing GraphQL manager to schema provider"); let mut on_schema_added = shared.schema_provider.on_schema_added(); // Create the new GraphQL based on the current state of known p2panda application schemas diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 90ac41ac8..c97b88263 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -88,28 +88,30 @@ pub async fn materializer_service( // Listen to incoming new entries and operations 
and move them into task queue let handle = task::spawn(async move { - while let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await { - // Resolve document id of regarding operation - match context - .store - .get_document_id_by_operation_id(&operation_id) - .await - .unwrap_or_else(|_| { - panic!( - "Failed database query when retreiving document for operation_id {}", - operation_id - ) - }) { - Some(document_id) => { - // Dispatch "reduce" task which will materialize the regarding document - factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None))); - } - None => { - // Panic when we couldn't find the regarding document in the database. We can - // safely assure that this is due to a critical bug affecting the database - // integrity. Panicking here will close `handle` and by that signal a node - // shutdown. - panic!("Could not find document for operation_id {}", operation_id); + loop { + if let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await { + // Resolve document id of regarding operation + match context + .store + .get_document_id_by_operation_id(&operation_id) + .await + .unwrap_or_else(|_| { + panic!( + "Failed database query when retreiving document for operation_id {}", + operation_id + ) + }) { + Some(document_id) => { + // Dispatch "reduce" task which will materialize the regarding document + factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None))); + } + None => { + // Panic when we couldn't find the regarding document in the database. We can + // safely assure that this is due to a critical bug affecting the database + // integrity. Panicking here will close `handle` and by that signal a node + // shutdown. + panic!("Could not find document for operation_id {}", operation_id); + } } } } diff --git a/aquadoggo/src/network/behaviour.rs b/aquadoggo/src/network/behaviour.rs index 8886f905a..90380420f 100644 --- a/aquadoggo/src/network/behaviour.rs +++ b/aquadoggo/src/network/behaviour.rs @@ -1,5 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use std::time::Duration; + use anyhow::Result; use libp2p::identity::Keypair; use libp2p::swarm::behaviour::toggle::Toggle; @@ -8,9 +10,31 @@ use libp2p::{autonat, connection_limits, identify, mdns, ping, relay, rendezvous use log::debug; use crate::network::config::NODE_NAMESPACE; +use crate::network::peers; use crate::network::NetworkConfiguration; +/// How often do we broadcast mDNS queries into the network. +const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(5); + +/// How often do we ping other peers to check for a healthy connection. +const PING_INTERVAL: Duration = Duration::from_secs(5); + +/// How long do we wait for an answer from the other peer before we consider the connection as +/// stale. +const PING_TIMEOUT: Duration = Duration::from_secs(3); + /// Network behaviour for the aquadoggo node. +/// +/// In libp2p all different behaviours are "merged" into one "main behaviour" with help of the +/// `NetworkBehaviour` derive macro. +/// +/// All behaviours share the same connections with each other. Together they form something we +/// could call our "custom" networking behaviour. +/// +/// It is possible for a peer to not support all behaviours, internally libp2p negotiates the +/// capabilities of each peer for us and upgrades the protocol accordingly. For example two peers +/// can handle p2panda messages with each others (using the `peers` behaviour) but do not +/// necessarily need to be able to support the `relay` behaviour. 
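The derive macro applied below also generates a combined event enum (`BehaviourEvent`, matched on later in `service.rs`) with one variant per struct field. A minimal routing sketch, assuming the macro's field-based variant naming (`peers` becomes `Peers`):

```rust
use libp2p::swarm::SwarmEvent;

// Sketch only: route events of the combined behaviour back to the sub-behaviour
// that produced them. Variant names follow the derive macro's naming convention.
fn route<E>(event: SwarmEvent<BehaviourEvent, E>) {
    match event {
        SwarmEvent::Behaviour(BehaviourEvent::Peers(_event)) => { /* p2panda messaging */ }
        SwarmEvent::Behaviour(BehaviourEvent::Mdns(_event)) => { /* local peer discovery */ }
        _ => {}
    }
}
```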
#[derive(NetworkBehaviour)] pub struct Behaviour { /// Determine NAT status by requesting remote peers to dial the public address of the @@ -42,9 +66,12 @@ pub struct Behaviour { /// Register with a rendezvous server and query remote peer addresses. pub rendezvous_client: Toggle, - /// Serve as a rendezvous point for remote peers to register their external addresses - /// and query the addresses of other peers. + /// Serve as a rendezvous point for remote peers to register their external addresses and query + /// the addresses of other peers. pub rendezvous_server: Toggle, + + /// Register peer connections and handle p2panda messaging with them. + pub peers: peers::Behaviour, } impl Behaviour { @@ -86,7 +113,13 @@ impl Behaviour { // Create an mDNS behaviour with default configuration if the mDNS flag is set let mdns = if network_config.mdns { debug!("mDNS network behaviour enabled"); - Some(mdns::Behaviour::new(Default::default(), peer_id)?) + Some(mdns::Behaviour::new( + mdns::Config { + query_interval: MDNS_QUERY_INTERVAL, + ..mdns::Config::default() + }, + peer_id, + )?) } else { None }; @@ -94,7 +127,11 @@ impl Behaviour { // Create a ping behaviour with default configuration if the ping flag is set let ping = if network_config.ping { debug!("Ping network behaviour enabled"); - Some(ping::Behaviour::default()) + Some(ping::Behaviour::new( + ping::Config::new() + .with_interval(PING_INTERVAL) + .with_timeout(PING_TIMEOUT), + )) } else { None }; @@ -132,6 +169,9 @@ impl Behaviour { None }; + // Create behaviour to manage peer connections and handle p2panda messaging + let peers = peers::Behaviour::new(); + Ok(Self { autonat: autonat.into(), identify: identify.into(), @@ -142,6 +182,7 @@ impl Behaviour { rendezvous_server: rendezvous_server.into(), relay_client: relay_client.into(), relay_server: relay_server.into(), + peers, }) } } diff --git a/aquadoggo/src/network/config.rs b/aquadoggo/src/network/config.rs index e5b1d283f..48415c861 100644 --- a/aquadoggo/src/network/config.rs +++ b/aquadoggo/src/network/config.rs @@ -4,7 +4,7 @@ use std::path::PathBuf; use anyhow::Result; use libp2p::connection_limits::ConnectionLimits; -use libp2p::identity::Keypair; +use libp2p::identity::{Keypair, PublicKey}; use libp2p::{Multiaddr, PeerId}; use log::info; use serde::{Deserialize, Serialize}; @@ -72,8 +72,8 @@ pub struct NetworkConfiguration { /// Ping behaviour enabled. /// - /// Send outbound pings to connected peers every 15 seconds and respond to inbound pings. - /// Every sent ping must yield a response within 20 seconds in order to be successful. + /// Send outbound pings to connected peers every 15 seconds and respond to inbound pings. Every + /// sent ping must yield a response within 20 seconds in order to be successful. pub ping: bool, /// QUIC transport port. @@ -103,6 +103,9 @@ pub struct NetworkConfiguration { /// /// Serve as a rendezvous point for peer discovery, allowing peer registration and queries. pub rendezvous_server_enabled: bool, + + /// Our local peer id. + pub peer_id: Option, } impl Default for NetworkConfiguration { @@ -127,11 +130,17 @@ impl Default for NetworkConfiguration { rendezvous_address: None, rendezvous_peer_id: None, rendezvous_server_enabled: false, + peer_id: None, } } } impl NetworkConfiguration { + /// Derive peer id from a given public key. + pub fn set_peer_id(&mut self, public_key: &PublicKey) { + self.peer_id = Some(PeerId::from_public_key(public_key)); + } + /// Define the connection limits of the swarm. 
pub fn connection_limits(&self) -> ConnectionLimits { ConnectionLimits::default() @@ -144,8 +153,8 @@ impl NetworkConfiguration { /// Load the key pair from the file at the specified path. /// - /// If the file does not exist, a random key pair is generated and saved. - /// If no path is specified, a random key pair is generated. + /// If the file does not exist, a random key pair is generated and saved. If no path is + /// specified, a random key pair is generated. pub fn load_or_generate_key_pair(path: Option) -> Result { let key_pair = match path { Some(mut path) => { diff --git a/aquadoggo/src/network/identity.rs b/aquadoggo/src/network/identity.rs index 9f49909e1..a5594deef 100644 --- a/aquadoggo/src/network/identity.rs +++ b/aquadoggo/src/network/identity.rs @@ -2,7 +2,7 @@ use std::fs; use std::fs::File; -use std::io::prelude::*; +use std::io::{Read, Write}; use std::os::unix::fs::PermissionsExt; use std::path::Path; @@ -27,6 +27,8 @@ pub trait Identity { Self: Sized; } +// @TODO: This should use our p2panda `KeyPair` type and in general be handled outside the libp2p +// context. Related issue: https://github.com/p2panda/aquadoggo/issues/388 impl Identity for Keypair { /// Generate a new Ed25519 key pair. fn new() -> Self { @@ -47,11 +49,9 @@ impl Identity for Keypair { // See: https://github.com/p2panda/aquadoggo/issues/295 #[allow(deprecated)] fn save(&self, path: &Path) -> Result<()> { - // Retrieve the private key from the key pair let private_key = match self { Keypair::Ed25519(key_pair) => key_pair.secret(), }; - // Encode the private key let encoded_private_key = hex::encode(private_key); fs::create_dir_all(path.parent().unwrap())?; @@ -73,16 +73,12 @@ impl Identity for Keypair { where Self: Sized, { - // Read the key pair from file let mut file = File::open(path)?; let mut contents = String::new(); file.read_to_string(&mut contents)?; - // Decode the private key let private_key_bytes = hex::decode(contents)?; - // Convert the private key bytes into a `SecretKey` let private_key = ed25519::SecretKey::from_bytes(private_key_bytes)?; - // Derive a key pair from the private key let key_pair = Keypair::Ed25519(private_key.into()); Ok(key_pair) diff --git a/aquadoggo/src/network/mod.rs b/aquadoggo/src/network/mod.rs index 1af29360f..a5440ba00 100644 --- a/aquadoggo/src/network/mod.rs +++ b/aquadoggo/src/network/mod.rs @@ -2,13 +2,14 @@ mod behaviour; mod config; -mod identity; +pub mod identity; +mod peers; mod service; +mod shutdown; mod swarm; mod transport; -// @TODO: Remove this as soon as we integrated it into the libp2p swarm -#[allow(dead_code)] -mod replication; pub use config::NetworkConfiguration; +pub use peers::Peer; pub use service::network_service; +pub use shutdown::ShutdownHandler; diff --git a/aquadoggo/src/network/peers/behaviour.rs b/aquadoggo/src/network/peers/behaviour.rs new file mode 100644 index 000000000..644b4a36c --- /dev/null +++ b/aquadoggo/src/network/peers/behaviour.rs @@ -0,0 +1,404 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::collections::VecDeque; +use std::task::{Context, Poll}; + +use libp2p::core::Endpoint; +use libp2p::swarm::derive_prelude::ConnectionEstablished; +use libp2p::swarm::{ + ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, + PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, +}; +use libp2p::{Multiaddr, PeerId}; +use log::trace; +use p2panda_rs::Human; + +use crate::network::peers::handler::{Handler, HandlerInEvent, HandlerOutEvent}; +use 
crate::network::peers::Peer; +use crate::replication::SyncMessage; + +#[derive(Debug)] +pub enum Event { + /// Message received on the inbound stream. + MessageReceived(Peer, SyncMessage), + + /// We established an inbound or outbound connection to a peer for the first time. + PeerConnected(Peer), + + /// Peer does not have any inbound or outbound connections left with us. + PeerDisconnected(Peer), +} + +/// p2panda network behaviour managing peers who can speak the "p2panda" protocol, handling +/// incoming and outgoing messages related to it. +/// +/// This custom behaviour represents the "p2panda" protocol. As soon as both peers agree that they +/// can speak the "p2panda" protocol libp2p will upgrade the connection and enable this custom +/// `NetworkBehaviour` implementation. +/// +/// All behaviours will share the same connections but each individual behaviour maintains its own +/// connection handlers on top of them. With this in mind the following procedure takes place: +/// +/// 1. Swarm discovers a node and dials a new outgoing connection OR swarm listener was dialed from +/// a remote peer, establishes a new incoming connection +/// 2. Swarm negotiates if new node can speak the "p2panda" protocol. If this is the case the +/// connection gets upgraded +/// 3. Custom p2panda `NetworkBehaviour` initialises the `ConnectionHandler` for the underlying +/// connection (see `handle_established_inbound_connection` or +/// `handle_established_outbound_connection`) and informs other services about new peer +/// 4. Custom p2panda `ConnectionHandler` establishes bi-directional streams which encode and +/// decode CBOR messages for us. As soon as a new message arrives the handler informs the +/// behaviour about it +/// 5. Custom p2panda `NetworkBehaviour` receives incoming message from handler and passes it +/// further to other services +/// 6. Custom p2panda `NetworkBehaviour` receives messages from other services and passes them down +/// again to `ConnectionHandler` which sends them over the data stream to remote node +/// 7. Swarm informs `NetworkBehaviour` about closed connection handlers (gracefully or via +/// time-out). 
The custom p2panda `NetworkBehaviour` informs other services about disconnected +/// peer +/// +/// ```text +/// Swarm +/// ┌──────────────────────────────────────────────────────────────────┐ +/// │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ +/// │ │ Connection │ │ Connection │ │ Connection │ │ +/// │ └──────┬───────┘ └───────┬──────┘ └───────┬──────┘ │ +/// │ │ │ │ │ +/// │ Upgrade Upgrade Upgrade │ +/// │ │ │ │ │ +/// └─────────┼───────────────────────┼──────────────────────┼─────────┘ +/// │ │ │ +/// ┌───────────────┼───────────────────────┼──────────────────────┼────────────────┐ +/// │ ┌──────────┴───────────────────────┴──────────────────────┴───────────┐ │ +/// │ │ NetworkBehaviour │ │ +/// │ └──────────┬───────────────────────┬──────────────────────┬───────────┘ │ +/// │ │ │ │ │ +/// │ ┌──────────▼──────────┐ ┌──────────▼──────────┐ ┌─────────▼───────────┐ │ +/// │ │ ConnectionHandler │ │ ConnectionHandler │ │ ConnectionHandler │ │ +/// │ └─────────────────────┘ └─────────────────────┘ └─────────────────────┘ │ +/// └───────────────────────────────────────────────────────────────────────────────┘ +/// p2panda protocol +/// ``` +#[derive(Debug)] +pub struct Behaviour { + events: VecDeque>, +} + +impl Behaviour { + pub fn new() -> Self { + Self { + events: VecDeque::new(), + } + } + + fn on_connection_established(&mut self, peer_id: PeerId, connection_id: ConnectionId) { + let peer = Peer::new(peer_id, connection_id); + self.events + .push_back(ToSwarm::GenerateEvent(Event::PeerConnected(peer))); + } + + fn on_connection_closed(&mut self, peer_id: PeerId, connection_id: ConnectionId) { + let peer = Peer::new(peer_id, connection_id); + self.events + .push_back(ToSwarm::GenerateEvent(Event::PeerDisconnected(peer))); + } + + fn on_received_message( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + message: SyncMessage, + ) { + let peer = Peer::new(peer_id, connection_id); + trace!( + "Notify swarm of received sync message: {} {}", + peer.display(), + message.display() + ); + self.events + .push_back(ToSwarm::GenerateEvent(Event::MessageReceived( + peer, message, + ))); + } + + pub fn send_message(&mut self, peer: Peer, message: SyncMessage) { + trace!( + "Notify handler of sent sync message: {} {}", + peer.display(), + message.display(), + ); + self.events.push_back(ToSwarm::NotifyHandler { + peer_id: peer.id(), + event: HandlerInEvent::Message(message), + handler: NotifyHandler::One(peer.connection_id()), + }); + } + + pub fn handle_critical_error(&mut self, peer: Peer) { + self.events.push_back(ToSwarm::NotifyHandler { + peer_id: peer.id(), + event: HandlerInEvent::CriticalError, + handler: NotifyHandler::One(peer.connection_id()), + }); + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Handler; + + type OutEvent = Event; + + fn handle_established_inbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(Handler::new()) + } + + fn handle_established_outbound_connection( + &mut self, + _: ConnectionId, + _: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + Ok(Handler::new()) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + handler_event: THandlerOutEvent, + ) { + match handler_event { + HandlerOutEvent::Message(message) => { + self.on_received_message(peer_id, connection_id, message); + } + } + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + 
FromSwarm::ConnectionEstablished(ConnectionEstablished { + peer_id, + connection_id, + .. + }) => { + self.on_connection_established(peer_id, connection_id); + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + .. + }) => { + self.on_connection_closed(peer_id, connection_id); + } + FromSwarm::AddressChange(_) + | FromSwarm::DialFailure(_) + | FromSwarm::ListenFailure(_) + | FromSwarm::NewListener(_) + | FromSwarm::NewListenAddr(_) + | FromSwarm::ExpiredListenAddr(_) + | FromSwarm::ListenerError(_) + | FromSwarm::ListenerClosed(_) + | FromSwarm::NewExternalAddr(_) + | FromSwarm::ExpiredExternalAddr(_) => {} + } + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _params: &mut impl PollParameters, + ) -> Poll>> { + if let Some(event) = self.events.pop_front() { + return Poll::Ready(event); + } + + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use futures::FutureExt; + use libp2p::swarm::{keep_alive, ConnectionId, Swarm}; + use libp2p_swarm_test::SwarmExt; + use p2panda_rs::schema::SchemaId; + use rstest::rstest; + + use crate::network::Peer; + use crate::replication::{Message, SyncMessage, TargetSet}; + use crate::test_utils::helpers::random_target_set; + + use super::{Behaviour as PeersBehaviour, Event}; + + #[tokio::test] + async fn peers_connect() { + // Create two swarms + let mut swarm_1 = Swarm::new_ephemeral(|_| PeersBehaviour::new()); + let mut swarm_2 = Swarm::new_ephemeral(|_| PeersBehaviour::new()); + + // Listen on swarm_1 and connect from swarm_2, this should establish a bi-directional + // connection. + swarm_1.listen().await; + swarm_2.connect(&mut swarm_1).await; + + let swarm_1_peer_id = *swarm_1.local_peer_id(); + let swarm_2_peer_id = *swarm_2.local_peer_id(); + + let info1 = swarm_1.network_info(); + let info2 = swarm_2.network_info(); + + // Peers should be connected. + assert!(swarm_2.is_connected(&swarm_1_peer_id)); + assert!(swarm_1.is_connected(&swarm_2_peer_id)); + + // Each swarm should have exactly one connected peer. + assert_eq!(info1.num_peers(), 1); + assert_eq!(info2.num_peers(), 1); + + // Each swarm should have one established connection. + assert_eq!(info1.connection_counters().num_established(), 1); + assert_eq!(info2.connection_counters().num_established(), 1); + } + + #[tokio::test] + async fn incompatible_network_behaviour() { + // Create two swarms + let mut swarm_1 = Swarm::new_ephemeral(|_| PeersBehaviour::new()); + let mut swarm_2 = Swarm::new_ephemeral(|_| keep_alive::Behaviour); + + // Listen on swarm_1 and connect from swarm_2, this should establish a bi-directional connection. + swarm_1.listen().await; + swarm_2.connect(&mut swarm_1).await; + + let swarm_1_peer_id = *swarm_1.local_peer_id(); + let swarm_2_peer_id = *swarm_2.local_peer_id(); + + let info1 = swarm_1.network_info(); + let info2 = swarm_2.network_info(); + + // Even though the network behaviours of our two peers are incompatible they still + // establish a connection. + + // Peers should be connected. + assert!(swarm_2.is_connected(&swarm_1_peer_id)); + assert!(swarm_1.is_connected(&swarm_2_peer_id)); + + // Each swarm should have exactly one connected peer. + assert_eq!(info1.num_peers(), 1); + assert_eq!(info2.num_peers(), 1); + + // Each swarm should have one established connection. + assert_eq!(info1.connection_counters().num_established(), 1); + assert_eq!(info2.connection_counters().num_established(), 1); + + // Send a message from to swarm_1 local peer from swarm_2 local peer. 
+ swarm_1.behaviour_mut().send_message( + Peer::new(swarm_2_peer_id, ConnectionId::new_unchecked(1)), + SyncMessage::new(0, Message::SyncRequest(0.into(), TargetSet::new(&vec![]))), + ); + + // Await a swarm event on swarm_2. + // + // We expect a timeout panic as no event will occur. + let result = std::panic::AssertUnwindSafe(swarm_2.next_swarm_event()) + .catch_unwind() + .await; + + assert!(result.is_err()) + } + + #[rstest] + #[case( + TargetSet::new(&vec![SchemaId::SchemaFieldDefinition(0)]), + TargetSet::new(&vec![SchemaId::SchemaDefinition(0)]), + )] + #[case(random_target_set(), random_target_set())] + #[tokio::test] + async fn swarm_behaviour_events( + #[case] target_set_1: TargetSet, + #[case] target_set_2: TargetSet, + ) { + let mut swarm_1 = Swarm::new_ephemeral(|_| PeersBehaviour::new()); + let mut swarm_2 = Swarm::new_ephemeral(|_| PeersBehaviour::new()); + + // Listen on swarm_1 and connect from swarm_2, this should establish a bi-directional + // connection + swarm_1.listen().await; + swarm_2.connect(&mut swarm_1).await; + + let mut events_1 = Vec::new(); + let mut events_2 = Vec::new(); + + let swarm_1_peer_id = *swarm_1.local_peer_id(); + let swarm_2_peer_id = *swarm_2.local_peer_id(); + + // Collect the next 2 behaviour events which occur in either swarms. + for _ in 0..2 { + tokio::select! { + Event::PeerConnected(peer) = swarm_1.next_behaviour_event() => { + events_1.push((peer, None)); + }, + Event::PeerConnected(peer) = swarm_2.next_behaviour_event() => events_2.push((peer, None)), + } + } + + assert_eq!(events_1.len(), 1); + assert_eq!(events_2.len(), 1); + + // The first event should have been a ConnectionEstablished containing the expected peer + // id + let (peer_2, message) = events_1[0].clone(); + assert_eq!(peer_2.id(), swarm_2_peer_id); + assert!(message.is_none()); + + let (peer_1, message) = events_2[0].clone(); + assert_eq!(peer_1.id(), swarm_1_peer_id); + assert!(message.is_none()); + + // Send a message from swarm_1 to swarm_2 + swarm_1.behaviour_mut().send_message( + peer_2, + SyncMessage::new(0, Message::SyncRequest(0.into(), target_set_1.clone())), + ); + + // Send a message from swarm_2 to swarm_1 + swarm_2.behaviour_mut().send_message( + peer_1, + SyncMessage::new(1, Message::SyncRequest(0.into(), target_set_2.clone())), + ); + + // And again add the next behaviour events which occur in either swarms + for _ in 0..2 { + tokio::select! 
{ + Event::MessageReceived(peer, message) = swarm_1.next_behaviour_event() => events_1.push((peer, Some(message))), + Event::MessageReceived(peer, message) = swarm_2.next_behaviour_event() => events_2.push((peer, Some(message))), + } + } + + assert_eq!(events_1.len(), 2); + assert_eq!(events_2.len(), 2); + + // swarm_1 should have received the message from swarm_2 peer + let (peer, message) = events_1[1].clone(); + assert_eq!(peer.id(), swarm_2_peer_id); + assert_eq!( + message.unwrap(), + SyncMessage::new(1, Message::SyncRequest(0.into(), target_set_2.clone())) + ); + + // swarm_2 should have received the message from swarm_1 peer + let (peer, message) = events_2[1].clone(); + assert_eq!(peer.id(), swarm_1_peer_id); + assert_eq!( + message.unwrap(), + SyncMessage::new(0, Message::SyncRequest(0.into(), target_set_1)) + ); + } +} diff --git a/aquadoggo/src/network/replication/handler.rs b/aquadoggo/src/network/peers/handler.rs similarity index 63% rename from aquadoggo/src/network/replication/handler.rs rename to aquadoggo/src/network/peers/handler.rs index b7b29631d..4f7c13799 100644 --- a/aquadoggo/src/network/replication/handler.rs +++ b/aquadoggo/src/network/peers/handler.rs @@ -3,6 +3,7 @@ use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; use asynchronous_codec::Framed; use futures::{Sink, StreamExt}; @@ -10,12 +11,70 @@ use libp2p::swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegot use libp2p::swarm::{ ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, SubstreamProtocol, }; +use log::warn; use thiserror::Error; -use crate::network::replication::{Codec, CodecError, Message, Protocol}; +use crate::network::peers::{Codec, CodecError, Protocol}; +use crate::replication::SyncMessage; +/// The time a connection is maintained to a peer without being in live mode and without +/// send/receiving a message from. Connections that idle beyond this timeout are disconnected. +const IDLE_TIMEOUT: Duration = Duration::from_secs(60); + +/// Handler for an incoming or outgoing connection to a remote peer dealing with the p2panda +/// protocol. +/// +/// Manages the bi-directional data streams and encodes and decodes p2panda messages on them using +/// the CBOR format. +/// +/// Connection handlers can be closed due to critical errors, for example when a replication error +/// occurred. They also can close after a certain duration of no networking activity (timeout). +/// Note that this does _not_ close the connection to the peer in general, only the p2panda +/// messaging protocol. +/// +/// Usually one connection is established to one peer. Multiple connections to the same peer are +/// also possible. This especially is the case when both peers dial each other at the same time. +/// +/// Each connection is managed by one connection handler each. Inside of each connection we +/// maintain a bi-directional (inbound & outbound) data stream. 
+/// +/// The following diagram is an example of two connections from one local to one remote peer: +/// +/// ```text +/// Connection +/// (Incoming) +/// ┌───────────────────┐ +/// │ │ +/// │ ┌─────────────┐ │ ┌─────────────┐ +/// │ │ Stream ◄──┼──────────┤ │ +/// │ │ (Inbound) │ │ │ │ +/// │ └─────────────┘ │ │ │ +/// │ │ │ │ +/// │ ┌─────────────┐ │ │ │ +/// │ │ Stream ├──┼──────────► │ +/// │ │ (Outbound) │ │ │ │ +/// │ └─────────────┘ │ │ │ +/// │ │ │ │ +/// └───────────────────┘ │ │ +/// │ │ +/// Connection │ Remote Peer │ +/// (Outgoing) │ │ +/// ┌───────────────────┐ │ │ +/// │ │ │ │ +/// │ ┌─────────────┐ │ │ │ +/// │ │ Stream ◄──┼──────────┤ │ +/// │ │ (Inbound) │ │ │ │ +/// │ └─────────────┘ │ │ │ +/// │ │ │ │ +/// │ ┌─────────────┐ │ │ │ +/// │ │ Stream ├──┼──────────► │ +/// │ │ (Outbound) │ │ │ │ +/// │ └─────────────┘ │ └─────────────┘ +/// │ │ +/// └───────────────────┘ +/// ``` pub struct Handler { - /// Upgrade configuration for the replication protocol. + /// Upgrade configuration for the protocol. listen_protocol: SubstreamProtocol, /// The single long-lived outbound substream. @@ -29,10 +88,16 @@ pub struct Handler { outbound_substream_establishing: bool, /// Queue of messages that we want to send to the remote. - send_queue: VecDeque, + send_queue: VecDeque, - /// Flag determining whether to maintain the connection to the peer. - keep_alive: KeepAlive, + /// Last time we've observed inbound or outbound messaging activity. + last_io_activity: Instant, + + /// Flag indicating that we want to close connection handlers related to that peer. + /// + /// This is useful in scenarios where a critical error occurred outside of the libp2p stack + /// (for example in the replication service) and we need to accordingly close connections. + critical_error: bool, } impl Handler { @@ -43,7 +108,8 @@ impl Handler { inbound_substream: None, outbound_substream_establishing: false, send_queue: VecDeque::new(), - keep_alive: KeepAlive::Yes, + last_io_activity: Instant::now(), + critical_error: false, } } @@ -55,6 +121,7 @@ impl Handler { >, ) { self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(protocol)); + self.outbound_substream_establishing = false; } fn on_fully_negotiated_inbound( @@ -71,8 +138,11 @@ impl Handler { /// An event sent from the network behaviour to the connection handler. #[derive(Debug)] pub enum HandlerInEvent { - /// Replication message to send on outbound stream. - Message(Message), + /// Message to send on outbound stream. + Message(SyncMessage), + + /// Protocol failed with a critical error. + CriticalError, } /// The event emitted by the connection handler. @@ -80,8 +150,8 @@ pub enum HandlerInEvent { /// This informs the network behaviour of various events created by the handler. #[derive(Debug)] pub enum HandlerOutEvent { - /// Replication message received on the inbound stream. - Message(Message), + /// Message received on the inbound stream. + Message(SyncMessage), } #[derive(Debug, Error)] @@ -110,7 +180,7 @@ enum OutboundSubstreamState { WaitingOutput(Stream), /// Waiting to send a message to the remote. - PendingSend(Stream, Message), + PendingSend(Stream, SyncMessage), /// Waiting to flush the substream so that the data arrives to the remote. 
PendingFlush(Stream), @@ -150,22 +220,36 @@ impl ConnectionHandler for Handler { } ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::AddressChange(_) - | ConnectionEvent::ListenUpgradeError(_) => {} + | ConnectionEvent::ListenUpgradeError(_) => { + warn!("Connection event error"); + } } } fn on_behaviour_event(&mut self, event: Self::InEvent) { - self.keep_alive = KeepAlive::Yes; - match event { HandlerInEvent::Message(message) => { self.send_queue.push_back(message); } + HandlerInEvent::CriticalError => { + self.critical_error = true; + } } } fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive + if self.critical_error { + return KeepAlive::No; + } + + if let Some( + OutboundSubstreamState::PendingSend(_, _) | OutboundSubstreamState::PendingFlush(_), + ) = self.outbound_substream + { + return KeepAlive::Yes; + } + + KeepAlive::Until(self.last_io_activity + IDLE_TIMEOUT) } fn poll( @@ -199,18 +283,21 @@ impl ConnectionHandler for Handler { Some(InboundSubstreamState::WaitingInput(mut substream)) => { match substream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(message))) => { + self.last_io_activity = Instant::now(); + // Received message from remote peer self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); - self.keep_alive = KeepAlive::Yes; return Poll::Ready(ConnectionHandlerEvent::Custom( HandlerOutEvent::Message(message), )); } - Poll::Ready(Some(Err(_))) => { - // More serious errors, close this side of the stream. If the peer is - // still around, they will re-establish their connection + Poll::Ready(Some(Err(err))) => { + warn!("Error decoding inbound message: {err}"); + + // Close this side of the stream. If the peer is still around, they + // will re-establish their connection self.inbound_substream = Some(InboundSubstreamState::Closing(substream)); } @@ -229,19 +316,13 @@ impl ConnectionHandler for Handler { Some(InboundSubstreamState::Closing(mut substream)) => { match Sink::poll_close(Pin::new(&mut substream), cx) { Poll::Ready(res) => { - if res.is_err() { + if let Err(err) = res { // Don't close the connection but just drop the inbound substream. // In case the remote has more to send, they will open up a new // substream. 
- // @TODO: Log error here + warn!("Error during closing inbound connection: {err}") } - self.inbound_substream = None; - - if self.outbound_substream.is_none() { - self.keep_alive = KeepAlive::No; - } - break; } Poll::Pending => { @@ -272,6 +353,9 @@ impl ConnectionHandler for Handler { Some(message) => { self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); + + // Continue loop in case there is more messages to be sent + continue; } None => { self.outbound_substream = @@ -289,16 +373,16 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::PendingFlush(substream)) } Err(err) => { - return Poll::Ready(ConnectionHandlerEvent::Close( - HandlerError::Codec(err), - )); + warn!("Error sending outbound message: {err}"); + self.outbound_substream = None; + break; } } } Poll::Ready(Err(err)) => { - return Poll::Ready(ConnectionHandlerEvent::Close( - HandlerError::Codec(err), - )); + warn!("Error encoding outbound message: {err}"); + self.outbound_substream = None; + break; } Poll::Pending => { self.outbound_substream = @@ -310,13 +394,14 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::PendingFlush(mut substream)) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { + self.last_io_activity = Instant::now(); self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(err)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(HandlerError::Codec( - err, - ))) + warn!("Error flushing outbound message: {err}"); + self.outbound_substream = None; + break; } Poll::Pending => { self.outbound_substream = diff --git a/aquadoggo/src/network/peers/mod.rs b/aquadoggo/src/network/peers/mod.rs new file mode 100644 index 000000000..579c75f3c --- /dev/null +++ b/aquadoggo/src/network/peers/mod.rs @@ -0,0 +1,11 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +mod behaviour; +mod handler; +mod peer; +mod protocol; + +pub use behaviour::{Behaviour, Event}; +pub use handler::Handler; +pub use peer::Peer; +pub use protocol::{Codec, CodecError, Protocol, PROTOCOL_NAME}; diff --git a/aquadoggo/src/network/peers/peer.rs b/aquadoggo/src/network/peers/peer.rs new file mode 100644 index 000000000..be8e0935f --- /dev/null +++ b/aquadoggo/src/network/peers/peer.rs @@ -0,0 +1,65 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::cmp::Ordering; + +use libp2p::swarm::ConnectionId; +use libp2p::PeerId; +use p2panda_rs::Human; + +/// Identifier of a p2panda peer. +/// +/// Additional to the unique `PeerId` we also store the `ConnectionId` to understand which libp2p +/// connection handler deals with the communication with that peer. In case connections get stale +/// or fail we can use this information to understand which peer got affected. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub struct Peer(PeerId, ConnectionId); + +impl Peer { + /// Returns a new instance of a peer. + pub fn new(peer_id: PeerId, connection_id: ConnectionId) -> Self { + Self(peer_id, connection_id) + } + + /// Returns a new instance of our local peer. + /// + /// Local peers can not have a connection "to themselves", still we want to be able to compare + /// our local peer with a remote one. This method therefore sets a "fake" `ConnectionId`. + pub fn new_local_peer(local_peer_id: PeerId) -> Self { + Self(local_peer_id, ConnectionId::new_unchecked(0)) + } + + /// Returns the `PeerId` of this peer. 
+ /// + /// The `PeerId` is used to determine which peer "wins" over a duplicate session conflict. + pub fn id(&self) -> PeerId { + self.0 + } + + /// Returns the `ConnectionId` which handles the bi-directional communication to that peer. + pub fn connection_id(&self) -> ConnectionId { + self.1 + } +} + +impl Ord for Peer { + fn cmp(&self, other: &Self) -> Ordering { + // When comparing `Peer` instances (for example to handle duplicate session requests), we + // only look at the internal `PeerId` since this is what both peers (local and remote) know + // about (the connection id might be different) + self.0.cmp(&other.0) + } +} + +impl PartialOrd for Peer { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.0.cmp(&other.0)) + } +} + +impl Human for Peer { + fn display(&self) -> String { + // Trick to nicely display `ConnectionId` struct + let connection_id = &format!("{:?}", self.1)[13..][..1]; + format!("{} ({})", self.0, connection_id) + } +} diff --git a/aquadoggo/src/network/replication/protocol.rs b/aquadoggo/src/network/peers/protocol.rs similarity index 87% rename from aquadoggo/src/network/replication/protocol.rs rename to aquadoggo/src/network/peers/protocol.rs index f4192f6c2..a73edca2b 100644 --- a/aquadoggo/src/network/replication/protocol.rs +++ b/aquadoggo/src/network/peers/protocol.rs @@ -6,19 +6,14 @@ use asynchronous_codec::{CborCodec, CborCodecError, Framed}; use futures::{future, AsyncRead, AsyncWrite, Future}; use libp2p::core::UpgradeInfo; use libp2p::{InboundUpgrade, OutboundUpgrade}; -use serde::{Deserialize, Serialize}; + +use crate::replication::SyncMessage; pub const PROTOCOL_NAME: &[u8] = b"/p2p/p2panda/1.0.0"; pub type CodecError = CborCodecError; -pub type Codec = CborCodec; - -// @TODO: Get this from our other replication module -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub enum Message { - Dummy(u64), -} +pub type Codec = CborCodec; #[derive(Clone, Debug)] pub struct Protocol; diff --git a/aquadoggo/src/network/replication/behaviour.rs b/aquadoggo/src/network/replication/behaviour.rs deleted file mode 100644 index e6f54d039..000000000 --- a/aquadoggo/src/network/replication/behaviour.rs +++ /dev/null @@ -1,254 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later - -use std::collections::VecDeque; -use std::task::{Context, Poll}; - -use libp2p::core::Endpoint; -use libp2p::swarm::{ - ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, PollParameters, - THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, -}; -use libp2p::{Multiaddr, PeerId}; - -use crate::network::replication::handler::{Handler, HandlerInEvent, HandlerOutEvent}; -use crate::network::replication::protocol::Message; - -#[derive(Debug)] -pub enum BehaviourOutEvent { - MessageReceived(PeerId, Message), - Error, -} - -#[derive(Debug)] -pub struct Behaviour { - events: VecDeque>, -} - -impl Behaviour { - pub fn new() -> Self { - Self { - events: VecDeque::new(), - } - } -} - -impl Behaviour { - fn send_message(&mut self, peer_id: PeerId, message: Message) { - self.events.push_back(ToSwarm::NotifyHandler { - peer_id, - event: HandlerInEvent::Message(message), - handler: NotifyHandler::Any, - }); - } - - fn handle_received_message(&mut self, peer_id: &PeerId, message: Message) { - // @TODO: Handle incoming messages - self.events - .push_back(ToSwarm::GenerateEvent(BehaviourOutEvent::MessageReceived( - *peer_id, message, - ))); - } -} - -impl NetworkBehaviour for Behaviour { - type ConnectionHandler = Handler; - - type OutEvent = 
BehaviourOutEvent; - - fn handle_established_inbound_connection( - &mut self, - _: ConnectionId, - _: PeerId, - _: &Multiaddr, - _: &Multiaddr, - ) -> Result, ConnectionDenied> { - Ok(Handler::new()) - } - - fn handle_established_outbound_connection( - &mut self, - _: ConnectionId, - _: PeerId, - _: &Multiaddr, - _: Endpoint, - ) -> Result, ConnectionDenied> { - Ok(Handler::new()) - } - - fn on_connection_handler_event( - &mut self, - peer: PeerId, - _connection_id: ConnectionId, - handler_event: THandlerOutEvent, - ) { - match handler_event { - HandlerOutEvent::Message(message) => { - self.handle_received_message(&peer, message); - } - } - } - - fn on_swarm_event(&mut self, event: FromSwarm) { - match event { - FromSwarm::ConnectionEstablished(_) - | FromSwarm::ConnectionClosed(_) - | FromSwarm::AddressChange(_) - | FromSwarm::DialFailure(_) - | FromSwarm::ListenFailure(_) - | FromSwarm::NewListener(_) - | FromSwarm::NewListenAddr(_) - | FromSwarm::ExpiredListenAddr(_) - | FromSwarm::ListenerError(_) - | FromSwarm::ListenerClosed(_) - | FromSwarm::NewExternalAddr(_) - | FromSwarm::ExpiredExternalAddr(_) => {} - } - } - - fn poll( - &mut self, - _cx: &mut Context<'_>, - _params: &mut impl PollParameters, - ) -> Poll>> { - if let Some(event) = self.events.pop_front() { - return Poll::Ready(event); - } - - Poll::Pending - } -} - -#[cfg(test)] -mod tests { - use futures::FutureExt; - use libp2p::swarm::{keep_alive, Swarm}; - use libp2p_swarm_test::SwarmExt; - - use crate::network::replication::Message; - - use super::{Behaviour as ReplicationBehaviour, BehaviourOutEvent}; - - #[tokio::test] - async fn peers_connect() { - // Create two swarms - let mut swarm1 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); - let mut swarm2 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); - - // Listen on swarm1 and connect from swarm2, this should establish a bi-directional connection. - swarm1.listen().await; - swarm2.connect(&mut swarm1).await; - - let swarm1_peer_id = *swarm1.local_peer_id(); - let swarm2_peer_id = *swarm2.local_peer_id(); - - let info1 = swarm1.network_info(); - let info2 = swarm2.network_info(); - - // Peers should be connected. - assert!(swarm2.is_connected(&swarm1_peer_id)); - assert!(swarm1.is_connected(&swarm2_peer_id)); - - // Each swarm should have exactly one connected peer. - assert_eq!(info1.num_peers(), 1); - assert_eq!(info2.num_peers(), 1); - - // Each swarm should have one established connection. - assert_eq!(info1.connection_counters().num_established(), 1); - assert_eq!(info2.connection_counters().num_established(), 1); - } - - #[tokio::test] - async fn incompatible_network_behaviour() { - // Create two swarms - let mut swarm1 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); - let mut swarm2 = Swarm::new_ephemeral(|_| keep_alive::Behaviour); - - // Listen on swarm1 and connect from swarm2, this should establish a bi-directional connection. - swarm1.listen().await; - swarm2.connect(&mut swarm1).await; - - let swarm1_peer_id = *swarm1.local_peer_id(); - let swarm2_peer_id = *swarm2.local_peer_id(); - - let info1 = swarm1.network_info(); - let info2 = swarm2.network_info(); - - // Even though the network behaviours of our two peers are incompatible they still - // establish a connection. - - // Peers should be connected. - assert!(swarm2.is_connected(&swarm1_peer_id)); - assert!(swarm1.is_connected(&swarm2_peer_id)); - - // Each swarm should have exactly one connected peer. 
- assert_eq!(info1.num_peers(), 1); - assert_eq!(info2.num_peers(), 1); - - // Each swarm should have one established connection. - assert_eq!(info1.connection_counters().num_established(), 1); - assert_eq!(info2.connection_counters().num_established(), 1); - - // Send a message from to swarm1 local peer from swarm2 local peer. - swarm1 - .behaviour_mut() - .send_message(swarm2_peer_id, Message::Dummy(0)); - - // Await a swarm event on swarm2. - // - // We expect a timeout panic as no event will occur. - let result = std::panic::AssertUnwindSafe(swarm2.next_swarm_event()) - .catch_unwind() - .await; - - assert!(result.is_err()) - } - - #[tokio::test] - async fn swarm_behaviour_events() { - // Create two swarms - let mut swarm1 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); - let mut swarm2 = Swarm::new_ephemeral(|_| ReplicationBehaviour::new()); - - // Listen on swarm1 and connect from swarm2, this should establish a bi-directional connection. - swarm1.listen().await; - swarm2.connect(&mut swarm1).await; - - let mut res1 = Vec::new(); - let mut res2 = Vec::new(); - - let swarm1_peer_id = *swarm1.local_peer_id(); - let swarm2_peer_id = *swarm2.local_peer_id(); - - // Send a message from to swarm1 local peer from swarm2 local peer. - swarm1 - .behaviour_mut() - .send_message(swarm2_peer_id, Message::Dummy(0)); - - // Send a message from to swarm2 local peer from swarm1 local peer. - swarm2 - .behaviour_mut() - .send_message(swarm1_peer_id, Message::Dummy(1)); - - // Collect the next 2 behaviour events which occur in either swarms. - for _ in 0..2 { - tokio::select! { - BehaviourOutEvent::MessageReceived(peer_id, message) = swarm1.next_behaviour_event() => res1.push((peer_id, message)), - BehaviourOutEvent::MessageReceived(peer_id, message) = swarm2.next_behaviour_event() => res2.push((peer_id, message)), - } - } - - // Each swarm should have emitted exactly one event. - assert_eq!(res1.len(), 1); - assert_eq!(res2.len(), 1); - - // swarm1 should have received the message from swarm2 peer. - let (peer_id, message) = &res1[0]; - assert_eq!(peer_id, &swarm2_peer_id); - assert_eq!(message, &Message::Dummy(1)); - - // swarm2 should have received the message from swarm1 peer. 
- let (peer_id, message) = &res2[0]; - assert_eq!(peer_id, &swarm1_peer_id); - assert_eq!(message, &Message::Dummy(0)); - } -} diff --git a/aquadoggo/src/network/replication/mod.rs b/aquadoggo/src/network/replication/mod.rs deleted file mode 100644 index 05593e59d..000000000 --- a/aquadoggo/src/network/replication/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later - -mod behaviour; -mod handler; -mod protocol; - -pub use behaviour::Behaviour; -pub use handler::Handler; -pub use protocol::{Codec, CodecError, Message, Protocol, PROTOCOL_NAME}; diff --git a/aquadoggo/src/network/service.rs b/aquadoggo/src/network/service.rs index d7d8a22c0..744d24af0 100644 --- a/aquadoggo/src/network/service.rs +++ b/aquadoggo/src/network/service.rs @@ -1,25 +1,30 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use std::time::Duration; + use anyhow::Result; -use futures::StreamExt; use libp2p::multiaddr::Protocol; use libp2p::ping::Event; -use libp2p::swarm::{AddressScore, SwarmEvent}; -use libp2p::{autonat, identify, mdns, rendezvous, Multiaddr}; -use log::{debug, info, trace, warn}; - -use crate::bus::ServiceSender; +use libp2p::swarm::{AddressScore, ConnectionError, SwarmEvent}; +use libp2p::{autonat, identify, mdns, rendezvous, Multiaddr, PeerId, Swarm}; +use log::{debug, trace, warn}; +use tokio::task; +use tokio_stream::wrappers::BroadcastStream; +use tokio_stream::StreamExt; + +use crate::bus::{ServiceMessage, ServiceSender}; use crate::context::Context; use crate::manager::{ServiceReadySender, Shutdown}; -use crate::network::behaviour::BehaviourEvent; +use crate::network::behaviour::{Behaviour, BehaviourEvent}; use crate::network::config::NODE_NAMESPACE; +use crate::network::peers; use crate::network::swarm; -use crate::network::NetworkConfiguration; +use crate::network::{NetworkConfiguration, ShutdownHandler}; -/// Network service that configures and deploys a network swarm over QUIC transports. +/// Network service that configures and deploys a libp2p network swarm over QUIC transports. /// -/// The swarm listens for incoming connections, dials remote nodes, manages -/// connections and executes predefined network behaviours. +/// The swarm listens for incoming connections, dials remote nodes, manages connections and +/// executes predefined network behaviours. 
pub async fn network_service( context: Context, shutdown: Shutdown, @@ -27,7 +32,7 @@ pub async fn network_service( tx_ready: ServiceReadySender, ) -> Result<()> { // Subscribe to communication bus - let mut _rx = tx.subscribe(); + let _rx = tx.subscribe(); // Load the network key pair and peer ID let key_pair = @@ -35,9 +40,10 @@ pub async fn network_service( // Read the network configuration parameters from the application context let network_config = context.config.network.clone(); + let local_peer_id = network_config.peer_id.expect("Peer id needs to be given"); // Build the network swarm and retrieve the local peer ID - let (mut swarm, local_peer_id) = swarm::build_swarm(&network_config, key_pair).await?; + let mut swarm = swarm::build_swarm(&network_config, key_pair).await?; // Define the QUIC multiaddress on which the swarm will listen for connections let quic_multiaddr = @@ -79,177 +85,388 @@ pub async fn network_service( swarm.dial(addr)?; } - // Create a cookie holder for the identify service - let mut cookie = None; + let mut shutdown_handler = ShutdownHandler::new(); + + // Spawn a task to run swarm in event loop + let event_loop = EventLoop::new( + swarm, + tx, + external_circuit_addr, + network_config, + shutdown_handler.clone(), + ); + let handle = task::spawn(event_loop.run()); + + if tx_ready.send(()).is_err() { + warn!("No subscriber informed about network service being ready"); + }; + + // Wait until we received the application shutdown signal or handle closed + tokio::select! { + _ = handle => (), + _ = shutdown => (), + } + + shutdown_handler.is_done().await; + + Ok(()) +} + +/// Main loop polling the async swarm event stream and incoming service messages stream. +struct EventLoop { + swarm: Swarm, + tx: ServiceSender, + rx: BroadcastStream, + external_circuit_addr: Option, + network_config: NetworkConfiguration, + shutdown_handler: ShutdownHandler, +} + +impl EventLoop { + pub fn new( + swarm: Swarm, + tx: ServiceSender, + external_circuit_addr: Option, + network_config: NetworkConfiguration, + shutdown_handler: ShutdownHandler, + ) -> Self { + Self { + swarm, + rx: BroadcastStream::new(tx.subscribe()), + tx, + external_circuit_addr, + network_config, + shutdown_handler, + } + } + + /// Close all connections actively. + pub async fn shutdown(&mut self) { + let peers: Vec = self.swarm.connected_peers().copied().collect(); + + for peer_id in peers { + if self.swarm.disconnect_peer_id(peer_id).is_err() { + // Silently ignore errors when disconnecting during shutdown + } + } + + // Wait a little bit for libp2p to actually close all connections + tokio::time::sleep(Duration::from_millis(25)).await; + + self.shutdown_handler.set_done(); + } + + /// Main event loop handling libp2p swarm events and incoming messages from the service bus as + /// an ongoing async stream. + pub async fn run(mut self) { + let mut shutdown_request_received = self.shutdown_handler.is_requested(); - // Spawn a task to handle swarm events - let handle = tokio::spawn(async move { loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(BehaviourEvent::Mdns(event)) => match event { - mdns::Event::Discovered(list) => { - for (peer, multiaddr) in list { - debug!("mDNS discovered a new peer: {peer}"); - - if let Err(err) = swarm.dial(multiaddr) { - warn!("Failed to dial: {}", err); - } + tokio::select! 
{ + event = self.swarm.next() => { + self.handle_swarm_event(event.expect("Swarm stream to be infinite")).await + } + event = self.rx.next() => match event { + Some(Ok(message)) => self.handle_service_message(message).await, + Some(Err(err)) => { + panic!("Service bus subscriber for event loop failed: {}", err); + } + // Command channel closed, thus shutting down the network event loop + None => { + return + }, + }, + _ = shutdown_request_received.next() => { + self.shutdown().await; + } + } + } + } + + /// Send a message on the communication bus to inform other services. + fn send_service_message(&mut self, message: ServiceMessage) { + if self.tx.send(message).is_err() { + // Silently fail here as we don't care if the message was received at this + // point + } + } + + /// Handle an incoming message via the communication bus from other services. + async fn handle_service_message(&mut self, message: ServiceMessage) { + match message { + ServiceMessage::SentReplicationMessage(peer, sync_message) => { + self.swarm + .behaviour_mut() + .peers + .send_message(peer, sync_message); + } + ServiceMessage::ReplicationFailed(peer) => { + self.swarm.behaviour_mut().peers.handle_critical_error(peer); + } + _ => (), + } + } + + /// Handle an event coming from the libp2p swarm. + async fn handle_swarm_event( + &mut self, + event: SwarmEvent, + ) { + match event { + // ~~~~~ + // Swarm + // ~~~~~ + SwarmEvent::Dialing(peer_id) => trace!("Dialing: {peer_id}"), + SwarmEvent::ConnectionEstablished { + peer_id, + num_established, + .. + } => { + trace!("Established new connection (total {num_established}) with {peer_id}"); + + // Match on a connection with the rendezvous server + if let Some(rendezvous_peer_id) = self.network_config.rendezvous_peer_id { + if peer_id == rendezvous_peer_id { + if let Some(rendezvous_client) = + self.swarm.behaviour_mut().rendezvous_client.as_mut() + { + trace!("Connected to rendezvous point, discovering nodes in '{NODE_NAMESPACE}' namespace ..."); + + rendezvous_client.discover( + Some(rendezvous::Namespace::from_static(NODE_NAMESPACE)), + None, + None, + rendezvous_peer_id, + ); } } - mdns::Event::Expired(list) => { - for (peer, _multiaddr) in list { - trace!("mDNS peer has expired: {peer}"); + } + } + SwarmEvent::ConnectionClosed { peer_id, cause, .. 
} => match cause { + Some(ConnectionError::IO(error)) => { + // IO errors coming from libp2p are cumbersome to match, so we just convert + // them to their string representation + match error.to_string().as_str() { + "timed out" => { + debug!("Connection timed out with peer {peer_id}"); + } + "closed by peer: 0" => { + // We received an `ApplicationClose` with code 0 here which means the + // other peer actively closed the connection + debug!("Connection closed with peer {peer_id}"); + } + _ => { + warn!("Connection error occurred with peer {peer_id}: {error}"); } } - }, - SwarmEvent::Behaviour(BehaviourEvent::Ping(Event { peer, result: _ })) => { - debug!("Ping from: {peer}") } - SwarmEvent::ConnectionClosed { - peer_id, - endpoint, - num_established, - cause, - } => { - info!("ConnectionClosed: {peer_id} {endpoint:?} {num_established} {cause:?}") + Some(ConnectionError::KeepAliveTimeout) => { + debug!("Connection timed out with peer {peer_id}"); + } + Some(ConnectionError::Handler(_)) => { + warn!("Connection handler error occurred with peer {peer_id}"); + } + None => { + debug!("Connection closed with peer {peer_id}"); + } + }, + SwarmEvent::ExpiredListenAddr { + listener_id, + address, + } => trace!("Expired listen address: {listener_id:?} {address}"), + SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, + } => trace!("Incoming connection: {local_addr} {send_back_addr}"), + SwarmEvent::IncomingConnectionError { + local_addr, + send_back_addr, + error, + } => { + warn!("Incoming connection error occurred with {local_addr} and {send_back_addr}: {error}"); + } + SwarmEvent::ListenerClosed { + listener_id, + addresses, + reason, + } => trace!("Listener closed: {listener_id:?} {addresses:?} {reason:?}"), + SwarmEvent::ListenerError { error, .. } => { + warn!("Listener failed with error: {error}") + } + SwarmEvent::NewListenAddr { + address, + listener_id: _, + } => { + debug!("Listening on {address}"); + } + SwarmEvent::OutgoingConnectionError { peer_id, error } => match peer_id { + Some(id) => { + warn!("Outgoing connection error with peer {id} occurred: {error}"); + } + None => { + warn!("Outgoing connection error occurred: {error}"); + } + }, + + // ~~~~ + // mDNS + // ~~~~ + SwarmEvent::Behaviour(BehaviourEvent::Mdns(event)) => match event { + mdns::Event::Discovered(list) => { + for (peer_id, multiaddr) in list { + debug!("mDNS discovered a new peer: {peer_id}"); + + if let Err(err) = self.swarm.dial(multiaddr) { + warn!("Failed to dial: {}", err); + } else { + debug!("Dial success: skip remaining addresses for: {peer_id}"); + break; + } + } } - SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - .. + mdns::Event::Expired(list) => { + for (peer, _multiaddr) in list { + trace!("mDNS peer has expired: {peer}"); + } + } + }, + + // ~~~~ + // Ping + // ~~~~ + SwarmEvent::Behaviour(BehaviourEvent::Ping(Event { peer, result: _ })) => { + trace!("Ping from: {peer}") + } + + // ~~~~~~~~~~ + // Rendezvous + // ~~~~~~~~~~ + SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(event)) => match event { + rendezvous::client::Event::Registered { + namespace, + ttl, + rendezvous_node, } => { - info!("ConnectionEstablished: {peer_id} {endpoint:?} {num_established}"); + trace!("Registered for '{namespace}' namespace at rendezvous point {rendezvous_node} for the next {ttl} seconds") + } + rendezvous::client::Event::Discovered { registrations, .. 
} => { + trace!("Rendezvous point responded with peer registration data"); - // Match on a connection with the rendezvous server - if let Some(rendezvous_peer_id) = network_config.rendezvous_peer_id { - if peer_id == rendezvous_peer_id { - if let Some(rendezvous_client) = - swarm.behaviour_mut().rendezvous_client.as_mut() - { - trace!("Connected to rendezvous point, discovering nodes in '{NODE_NAMESPACE}' namespace ..."); + for registration in registrations { + for address in registration.record.addresses() { + let peer_id = registration.record.peer_id(); + let local_peer_id = *self.swarm.local_peer_id(); - rendezvous_client.discover( - Some(rendezvous::Namespace::from_static(NODE_NAMESPACE)), - None, - None, - rendezvous_peer_id, - ); + // Only dial remote peers discovered via rendezvous server + if peer_id != local_peer_id { + debug!("Discovered peer {peer_id} at {address}"); + + let p2p_suffix = Protocol::P2p(*peer_id.as_ref()); + let address_with_p2p = if !address + .ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) + { + address.clone().with(p2p_suffix) + } else { + address.clone() + }; + + debug!("Preparing to dial peer {peer_id} at {address}"); + + if let Err(err) = self.swarm.dial(address_with_p2p) { + warn!("Failed to dial: {}", err); + } } } } } - SwarmEvent::Dialing(peer_id) => info!("Dialing: {peer_id}"), - SwarmEvent::ExpiredListenAddr { - listener_id, - address, - } => trace!("ExpiredListenAddr: {listener_id:?} {address}"), - - SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - } => debug!("IncomingConnection: {local_addr} {send_back_addr}"), - SwarmEvent::IncomingConnectionError { - local_addr, - send_back_addr, - error, - } => warn!("IncomingConnectionError: {local_addr} {send_back_addr} {error:?}"), - SwarmEvent::ListenerClosed { - listener_id, - addresses, - reason, - } => trace!("ListenerClosed: {listener_id:?} {addresses:?} {reason:?}"), - SwarmEvent::ListenerError { listener_id, error } => { - warn!("ListenerError: {listener_id:?} {error:?}") + rendezvous::client::Event::RegisterFailed(error) => { + warn!("Failed to register with rendezvous point: {error}"); } - SwarmEvent::NewListenAddr { - address, - listener_id: _, - } => { - info!("Listening on {address}"); + other => trace!("Unhandled rendezvous client event: {other:?}"), + }, + SwarmEvent::Behaviour(BehaviourEvent::RendezvousServer(event)) => match event { + rendezvous::server::Event::PeerRegistered { peer, registration } => { + trace!( + "Peer {peer} registered for namespace '{}'", + registration.namespace + ); } - SwarmEvent::OutgoingConnectionError { peer_id, error } => { - warn!("OutgoingConnectionError: {peer_id:?} {error:?}") + rendezvous::server::Event::DiscoverServed { + enquirer, + registrations, + } => { + trace!( + "Served peer {enquirer} with {} registrations", + registrations.len() + ); } - SwarmEvent::Behaviour(BehaviourEvent::RendezvousClient(event)) => match event { - rendezvous::client::Event::Registered { - namespace, - ttl, - rendezvous_node, - } => { - trace!("Registered for '{namespace}' namespace at rendezvous point {rendezvous_node} for the next {ttl} seconds") - } - rendezvous::client::Event::Discovered { - registrations, - cookie: new_cookie, - .. 
- } => { - trace!("Rendezvous point responded with peer registration data"); - - cookie.replace(new_cookie); - - for registration in registrations { - for address in registration.record.addresses() { - let peer_id = registration.record.peer_id(); - - // Only dial remote peers discovered via rendezvous server - if peer_id != local_peer_id { - debug!("Discovered peer {peer_id} at {address}"); - - let p2p_suffix = Protocol::P2p(*peer_id.as_ref()); - let address_with_p2p = if !address - .ends_with(&Multiaddr::empty().with(p2p_suffix.clone())) - { - address.clone().with(p2p_suffix) - } else { - address.clone() - }; - - debug!("Preparing to dial peer {peer_id} at {address}"); - - if let Err(err) = swarm.dial(address_with_p2p) { - warn!("Failed to dial: {}", err); - } - } + other => trace!("Unhandled rendezvous server event: {other:?}"), + }, + + // ~~~~~~~~ + // Identify + // ~~~~~~~~ + SwarmEvent::Behaviour(BehaviourEvent::Identify(event)) => { + match event { + identify::Event::Received { peer_id, .. } => { + trace!("Received identify information from peer {peer_id}"); + + // Only attempt registration if the local node is running as a rendezvous client + if let Some(rendezvous_peer_id) = self.network_config.rendezvous_peer_id { + // Register with the rendezvous server. + + // We call `as_mut()` on the rendezvous client network behaviour in + // order to get a mutable reference out of the `Toggle` + if let Some(rendezvous_client) = + self.swarm.behaviour_mut().rendezvous_client.as_mut() + { + rendezvous_client.register( + rendezvous::Namespace::from_static(NODE_NAMESPACE), + rendezvous_peer_id, + None, + ); } } } - rendezvous::client::Event::RegisterFailed(error) => { - warn!("Failed to register with rendezvous point: {error}"); - } - other => trace!("Unhandled rendezvous client event: {other:?}"), - }, - SwarmEvent::Behaviour(BehaviourEvent::RendezvousServer(event)) => match event { - rendezvous::server::Event::PeerRegistered { peer, registration } => { + identify::Event::Sent { peer_id } | identify::Event::Pushed { peer_id } => { trace!( - "Peer {peer} registered for namespace '{}'", - registration.namespace - ); + "Sent identification information of the local node to peer {peer_id}" + ) } - rendezvous::server::Event::DiscoverServed { - enquirer, - registrations, - } => { - trace!( - "Served peer {enquirer} with {} registrations", - registrations.len() - ); + identify::Event::Error { peer_id, error } => { + warn!("Failed to identify the remote peer {peer_id}: {error}") } - other => trace!("Unhandled rendezvous server event: {other:?}"), - }, - SwarmEvent::Behaviour(BehaviourEvent::Identify(event)) => { - match event { - identify::Event::Received { peer_id, .. } => { - trace!("Received identify information from peer {peer_id}"); + } + } - // Only attempt registration if the local node is running as a rendezvous client - if let Some(rendezvous_peer_id) = network_config.rendezvous_peer_id { - // Register with the rendezvous server. 
+ // ~~~~~ + // Relay + // ~~~~~ + SwarmEvent::Behaviour(BehaviourEvent::RelayServer(event)) => { + trace!("Unhandled relay server event: {event:?}") + } + SwarmEvent::Behaviour(BehaviourEvent::RelayClient(event)) => { + trace!("Unhandled relay client event: {event:?}") + } + + // ~~~~~~~ + // AutoNAT + // ~~~~~~~ + SwarmEvent::Behaviour(BehaviourEvent::Autonat(event)) => { + match event { + autonat::Event::StatusChanged { old, new } => { + trace!("NAT status changed from {:?} to {:?}", old, new); + + if let Some(addr) = self.external_circuit_addr.clone() { + trace!("Adding external relayed listen address: {}", addr); + self.swarm + .add_external_address(addr, AddressScore::Finite(1)); - // We call `as_mut()` on the rendezvous client network behaviour in - // order to get a mutable reference out of the `Toggle` + if let Some(rendezvous_peer_id) = self.network_config.rendezvous_peer_id + { + // Invoke registration of relayed client address with the rendezvous server if let Some(rendezvous_client) = - swarm.behaviour_mut().rendezvous_client.as_mut() + self.swarm.behaviour_mut().rendezvous_client.as_mut() { rendezvous_client.register( rendezvous::Namespace::from_static(NODE_NAMESPACE), @@ -259,68 +476,42 @@ pub async fn network_service( } } } - identify::Event::Sent { peer_id } | identify::Event::Pushed { peer_id } => { - trace!( - "Sent identification information of the local node to peer {peer_id}" - ) - } - identify::Event::Error { peer_id, error } => { - warn!("Failed to identify the remote peer {peer_id}: {error}") - } } + autonat::Event::InboundProbe(_) | autonat::Event::OutboundProbe(_) => (), } - SwarmEvent::Behaviour(BehaviourEvent::RelayServer(event)) => { - debug!("Unhandled relay server event: {event:?}") - } - SwarmEvent::Behaviour(BehaviourEvent::RelayClient(event)) => { - debug!("Unhandled relay client event: {event:?}") - } - SwarmEvent::Behaviour(BehaviourEvent::Autonat(event)) => { - match event { - autonat::Event::StatusChanged { old, new } => { - trace!("NAT status changed from {:?} to {:?}", old, new); + } - if let Some(addr) = external_circuit_addr.clone() { - trace!("Adding external relayed listen address: {}", addr); - swarm.add_external_address(addr, AddressScore::Finite(1)); + // ~~~~~~ + // Limits + // ~~~~~~ + SwarmEvent::Behaviour(BehaviourEvent::Limits(event)) => { + debug!("Unhandled connection limit event: {event:?}") + } - if let Some(rendezvous_peer_id) = network_config.rendezvous_peer_id - { - // Invoke registration of relayed client address with the rendezvous server - if let Some(rendezvous_client) = - swarm.behaviour_mut().rendezvous_client.as_mut() - { - rendezvous_client.register( - rendezvous::Namespace::from_static(NODE_NAMESPACE), - rendezvous_peer_id, - None, - ); - } - } - } - } - autonat::Event::InboundProbe(_) | autonat::Event::OutboundProbe(_) => (), - } + // ~~~~~~~~~~~~~ + // p2panda peers + // ~~~~~~~~~~~~~ + SwarmEvent::Behaviour(BehaviourEvent::Peers(event)) => match event { + peers::Event::PeerConnected(peer) => { + // Inform other services about new peer + self.send_service_message(ServiceMessage::PeerConnected(peer)); } - SwarmEvent::Behaviour(BehaviourEvent::Limits(event)) => { - debug!("Unhandled connection limit event: {event:?}") + peers::Event::PeerDisconnected(peer) => { + // Inform other services about peer leaving + self.send_service_message(ServiceMessage::PeerDisconnected(peer)); } - event => debug!("Unhandled swarm event: {event:?}"), - } - } - }); - - info!("Network service is ready"); - - if tx_ready.send(()).is_err() { - 
warn!("No subscriber informed about network service being ready"); - }; + peers::Event::MessageReceived(peer, message) => { + // Inform other services about received messages from peer + self.send_service_message(ServiceMessage::ReceivedReplicationMessage( + peer, message, + )) + } + }, - // Wait until we received the application shutdown signal or handle closed - tokio::select! { - _ = handle => (), - _ = shutdown => (), + // ~~~~~~~ + // Unknown + // ~~~~~~~ + event => debug!("Unhandled swarm event: {event:?}"), + } } - - Ok(()) } diff --git a/aquadoggo/src/network/shutdown.rs b/aquadoggo/src/network/shutdown.rs new file mode 100644 index 000000000..7475ac6b9 --- /dev/null +++ b/aquadoggo/src/network/shutdown.rs @@ -0,0 +1,115 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::{FutureExt, Stream}; +use triggered::{Listener, Trigger}; + +/// Helper to coordinate finishing an async process which needs to take place before we can close +/// the application. +#[derive(Clone)] +pub struct ShutdownHandler { + request_trigger: Trigger, + request_signal: Listener, + done_trigger: Trigger, + done_signal: Listener, +} + +impl ShutdownHandler { + /// Returns a new instance of `ShutdownHandler`. + pub fn new() -> Self { + let (request_trigger, request_signal) = triggered::trigger(); + let (done_trigger, done_signal) = triggered::trigger(); + + Self { + request_trigger, + request_signal, + done_trigger, + done_signal, + } + } + + /// Returns an async stream which can be polled to find out if a shutdown request was sent. + pub fn is_requested(&self) -> ShutdownRequest { + ShutdownRequest { + inner: self.request_signal.clone(), + is_sent: false, + } + } + + /// Signal that the shutdown has completed. + pub fn set_done(&mut self) { + self.done_trigger.trigger(); + } + + /// Returns a future which can be polled to find out if the shutdown has completed. + /// + /// This automatically triggers the request to shut down when being called. + pub fn is_done(&mut self) -> Listener { + self.request_trigger.trigger(); + self.done_signal.clone() + } +} + +pub struct ShutdownRequest { + inner: Listener, + is_sent: bool, +} + +impl Stream for ShutdownRequest { + type Item = bool; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.is_sent { + return Poll::Pending; + } + + match self.inner.poll_unpin(cx) { + Poll::Ready(_) => { + self.is_sent = true; + Poll::Ready(Some(true)) + } + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + use tokio_stream::StreamExt; + + use super::ShutdownHandler; + + #[tokio::test] + async fn changes_value_before_shutdown() { + let num = Arc::new(AtomicUsize::new(0)); + let mut handler = ShutdownHandler::new(); + + { + let num = num.clone(); + let mut handler = handler.clone(); + + tokio::task::spawn(async move { + let mut signal = handler.is_requested(); + + loop { + tokio::select! 
{ + _ = signal.next() => { + // Change the value before we wind down + num.store(100, Ordering::Relaxed); + handler.set_done(); + } + }; + } + }); + } + + handler.is_done().await; + + assert_eq!(num.load(Ordering::Relaxed), 100); + } +} diff --git a/aquadoggo/src/network/swarm.rs b/aquadoggo/src/network/swarm.rs index 2891871ce..ad17d65e6 100644 --- a/aquadoggo/src/network/swarm.rs +++ b/aquadoggo/src/network/swarm.rs @@ -5,7 +5,6 @@ use std::convert::TryInto; use anyhow::Result; use libp2p::identity::Keypair; use libp2p::swarm::SwarmBuilder; -use libp2p::PeerId; use libp2p::Swarm; use log::info; @@ -16,10 +15,9 @@ use crate::network::NetworkConfiguration; pub async fn build_swarm( network_config: &NetworkConfiguration, key_pair: Keypair, -) -> Result<(Swarm, PeerId)> { - // Read the peer ID (public key) from the key pair - let peer_id = PeerId::from(key_pair.public()); - info!("Network service peer ID: {peer_id}"); +) -> Result> { + let peer_id = network_config.peer_id.expect("Peer id needs to be given"); + info!("Local peer id: {peer_id}"); let relay_client_enabled = network_config.relay_address.is_some(); @@ -39,5 +37,5 @@ pub async fn build_swarm( .notify_handler_buffer_size(network_config.notify_handler_buffer_size.try_into()?) .build(); - Ok((swarm, peer_id)) + Ok(swarm) } diff --git a/aquadoggo/src/node.rs b/aquadoggo/src/node.rs index 8573b5142..0da6886fc 100644 --- a/aquadoggo/src/node.rs +++ b/aquadoggo/src/node.rs @@ -11,6 +11,7 @@ use crate::http::http_service; use crate::manager::ServiceManager; use crate::materializer::materializer_service; use crate::network::network_service; +use crate::replication::replication_service; use crate::schema::SchemaProvider; /// Capacity of the internal broadcast channel used to communicate between services. 
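Aside (not part of the diff): a minimal usage sketch of the `ShutdownHandler` added above in `aquadoggo/src/network/shutdown.rs`, assuming that type is in scope; the function name below is hypothetical. It mirrors how the network event loop and the test above drive the handler: poll `is_requested()` as a stream, finish any last work, then call `set_done()` so the `is_done()` future held by the caller resolves.

// Minimal sketch, not part of the diff: graceful wind-down of a hypothetical
// service task using the `ShutdownHandler` from `network/shutdown.rs` above.
use tokio_stream::StreamExt;

// `ShutdownHandler` is assumed to be in scope (see `aquadoggo/src/network/shutdown.rs`).
async fn wind_down_service(mut handler: ShutdownHandler) {
    // `is_requested()` yields an item once another clone of the handler calls
    // `is_done()` and thereby triggers the shutdown request.
    let mut shutdown_requested = handler.is_requested();
    let _ = shutdown_requested.next().await;

    // ... close connections, flush state, etc. ...

    // Signal completion so the `is_done()` future held by the caller resolves.
    handler.set_done();
}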
@@ -81,6 +82,15 @@ impl Node { panic!("Failed starting network service"); } + // Start replication service syncing data with other nodes + if manager + .add("replication", replication_service) + .await + .is_err() + { + panic!("Failed starting replication service"); + } + Self { pool, manager } } diff --git a/aquadoggo/src/replication/errors.rs b/aquadoggo/src/replication/errors.rs index 0451886f4..4738ca3f2 100644 --- a/aquadoggo/src/replication/errors.rs +++ b/aquadoggo/src/replication/errors.rs @@ -2,19 +2,21 @@ use thiserror::Error; +use crate::replication::TargetSet; + #[derive(Error, Debug)] pub enum ReplicationError { #[error("Remote peer requested unsupported replication mode")] UnsupportedMode, - #[error("Tried to initialise duplicate inbound replication session with id {0}")] - DuplicateInboundRequest(u64), + #[error("Duplicate session error: {0}")] + DuplicateSession(#[from] DuplicateSessionRequestError), - #[error("Tried to initialise duplicate outbound replication session with id {0}")] - DuplicateOutboundRequest(u64), + #[error("No session found with id {0} for peer {1}")] + NoSessionFound(u64, String), - #[error("No session found with id {0}")] - NoSessionFound(u64), + #[error("No sessions found for peer {0}")] + NoPeerFound(String), #[error("Received entry which is not in target set")] UnmatchedTargetSet, @@ -22,22 +24,23 @@ pub enum ReplicationError { #[error("Replication strategy failed with error: {0}")] StrategyFailed(String), - #[error("Incoming data could not be ingested")] + #[error("Incoming data could not be ingested: {0}")] Validation(#[from] IngestError), } #[derive(Error, Debug)] +#[error(transparent)] pub enum IngestError { #[error("Schema is not supported")] UnsupportedSchema, - #[error("Received entry and operation is invalid")] + #[error(transparent)] Domain(#[from] p2panda_rs::api::DomainError), - #[error("Decoding entry failed")] + #[error("Decoding entry failed: {0}")] DecodeEntry(#[from] p2panda_rs::entry::error::DecodeEntryError), - #[error("Decoding operation failed")] + #[error("Decoding operation failed: {0}")] DecodeOperation(#[from] p2panda_rs::operation::error::DecodeOperationError), } @@ -49,3 +52,28 @@ pub enum TargetSetError { #[error("Target set contains unsorted or duplicate schema ids")] UnsortedSchemaIds, } + +#[derive(Error, Debug, PartialEq)] +pub enum DuplicateSessionRequestError { + #[error("Remote sent two sync requests for session with id {0}")] + InboundPendingSession(u64), + + #[error("Tried to initialise duplicate inbound replication for already established session with id {0}")] + InboundEstablishedSession(u64), + + #[error("Tried to initialise duplicate inbound replication for completed session with id {0}")] + InboundDoneSession(u64), + + #[error( + "Tried to initialise duplicate inbound replication session for existing target set {0:?}" + )] + InboundExistingTargetSet(TargetSet), + + #[error( + "Tried to initialise duplicate outbound replication session for existing target set {0:?}" + )] + OutboundExistingTargetSet(TargetSet), + + #[error("Tried to initialise duplicate outbound replication session with id {0}")] + Outbound(u64), +} diff --git a/aquadoggo/src/replication/ingest.rs b/aquadoggo/src/replication/ingest.rs index e92672c63..344286662 100644 --- a/aquadoggo/src/replication/ingest.rs +++ b/aquadoggo/src/replication/ingest.rs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use log::trace; use p2panda_rs::api::validation::{ ensure_document_not_deleted, get_checked_document_id_for_view_id, 
get_expected_skiplink, is_next_seq_num, validate_claimed_schema_id, @@ -16,6 +17,7 @@ use p2panda_rs::operation::validate::validate_operation_with_entry; use p2panda_rs::operation::{EncodedOperation, Operation, OperationAction, OperationId}; use p2panda_rs::schema::Schema; use p2panda_rs::storage_provider::traits::{EntryStore, LogStore, OperationStore}; +use p2panda_rs::Human; use crate::bus::{ServiceMessage, ServiceSender}; use crate::db::SqlStore; @@ -129,6 +131,14 @@ impl SyncIngest { encoded_operation: &EncodedOperation, ) -> Result<(), IngestError> { let entry = decode_entry(encoded_entry)?; + + trace!( + "Received entry {:?} for log {:?} and {}", + entry.seq_num(), + entry.log_id(), + entry.public_key().display() + ); + let plain_operation = decode_operation(encoded_operation)?; let schema = self diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index d2e037937..67611555b 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -1,20 +1,23 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use std::collections::HashMap; +use std::hash::Hash; use anyhow::Result; +use log::{debug, info, trace, warn}; use p2panda_rs::entry::EncodedEntry; use p2panda_rs::operation::EncodedOperation; +use p2panda_rs::Human; use crate::db::SqlStore; -use crate::replication::errors::ReplicationError; +use crate::replication::errors::{DuplicateSessionRequestError, ReplicationError}; use crate::replication::{ Message, Mode, Session, SessionId, SessionState, SyncIngest, SyncMessage, TargetSet, }; pub const INITIAL_SESSION_ID: SessionId = 0; -pub const SUPPORTED_MODES: [Mode; 1] = [Mode::Naive]; +pub const SUPPORTED_MODES: [Mode; 1] = [Mode::LogHeight]; pub const SUPPORT_LIVE_MODE: bool = false; @@ -50,7 +53,7 @@ pub struct SyncManager
<P> { impl<P> SyncManager<P>
where - P: Clone + std::hash::Hash + Eq + PartialOrd, + P: Clone + Human + Hash + Eq + PartialOrd, { pub fn new(store: SqlStore, ingest: SyncIngest, local_peer: P) -> Self { Self { @@ -61,8 +64,16 @@ where } } + /// Removes all sessions related to a remote peer. + /// + /// Warning: This might also remove actively running sessions. Do only clear sessions when you + /// are sure they are a) done or b) the peer closed its connection. + pub fn remove_sessions(&mut self, remote_peer: &P) { + self.sessions.remove(remote_peer); + } + /// Get all sessions related to a remote peer. - fn get_sessions(&self, remote_peer: &P) -> Vec { + pub fn get_sessions(&self, remote_peer: &P) -> Vec { self.sessions .get(remote_peer) // Always return an array, even when it is empty @@ -109,7 +120,7 @@ where } } - fn remove_session(&mut self, remote_peer: &P, session_id: &SessionId) { + pub fn remove_session(&mut self, remote_peer: &P, session_id: &SessionId) { let sessions = self.sessions.get_mut(remote_peer); if let Some(sessions) = sessions { @@ -119,7 +130,18 @@ where .find(|(_, session)| session.id == *session_id) { sessions.remove(index); + } else { + warn!( + "Tried to remove nonexistent session {} with peer: {}", + session_id, + remote_peer.display() + ); } + } else { + warn!( + "Tried to remove sessions from unknown peer: {}", + remote_peer.display() + ); } } @@ -143,14 +165,22 @@ where let sessions = self.get_sessions(remote_peer); + info!( + "Initiate outbound replication session with peer {}", + remote_peer.display() + ); + // Make sure to not have duplicate sessions over the same schema ids let session = sessions .iter() .find(|session| session.target_set() == *target_set); - if let Some(session) = session { - return Err(ReplicationError::DuplicateOutboundRequest(session.id)); - } + match session { + Some(session) => Err(DuplicateSessionRequestError::OutboundExistingTargetSet( + session.target_set(), + )), + None => Ok(()), + }?; // Determine next id for upcoming session let session_id = { @@ -176,24 +206,46 @@ where &mut self, remote_peer: &P, target_set: &TargetSet, - session: &Session, + existing_session: &Session, ) -> Result { - let accept_inbound_request = match session.state { + match existing_session.local { + // Remote peer sent a sync request for an already pending inbound session, we should + // ignore this second request. 
+ false => Err(DuplicateSessionRequestError::InboundPendingSession( + existing_session.id, + )), + _ => Ok(()), + }?; + + let accept_inbound_request = match existing_session.state { // Handle only duplicate sessions when they haven't started yet SessionState::Pending => { if &self.local_peer < remote_peer { // Drop our pending session - self.remove_session(remote_peer, &session.id); + debug!( + "Drop pending outbound session and process inbound session request with duplicate id {}", + existing_session.id + ); + self.remove_session(remote_peer, &existing_session.id); // Accept the inbound request - true + Ok(true) } else { // Keep our pending session, ignore inbound request - false + debug!( + "Ignore inbound request and keep pending outbound session with duplicate id {}", + existing_session.id + ); + Ok(false) } } - _ => return Err(ReplicationError::DuplicateInboundRequest(session.id)), - }; + SessionState::Established => Err( + DuplicateSessionRequestError::InboundEstablishedSession(existing_session.id), + ), + SessionState::Done => Err(DuplicateSessionRequestError::InboundDoneSession( + existing_session.id, + )), + }?; let mut all_messages: Vec = vec![]; @@ -201,19 +253,23 @@ where let messages = self .insert_and_initialize_session( remote_peer, - &session.id, + &existing_session.id, target_set, - &session.mode(), + &existing_session.mode(), false, ) .await; - all_messages.extend(to_sync_messages(session.id, messages)); + all_messages.extend(to_sync_messages(existing_session.id, messages)); // If we dropped our own outbound session request regarding a different target set, we // need to re-establish it with another session id, otherwise it would get lost - if session.target_set() != *target_set { + if existing_session.target_set() != *target_set { let messages = self - .initiate_session(remote_peer, target_set, &session.mode()) + .initiate_session( + remote_peer, + &existing_session.target_set(), + &existing_session.mode(), + ) .await?; all_messages.extend(messages) } @@ -225,6 +281,68 @@ where }) } + async fn handle_duplicate_target_set( + &mut self, + remote_peer: &P, + session_id: &SessionId, + mode: &Mode, + existing_session: &Session, + ) -> Result { + match existing_session.local { + // Remote peer sent a sync request for an already pending inbound session, we should + // ignore this second request. 
+ false => Err(DuplicateSessionRequestError::InboundExistingTargetSet( + existing_session.target_set(), + )), + _ => Ok(()), + }?; + + let accept_inbound_request = match existing_session.state { + // Handle only duplicate sessions when they haven't started yet + SessionState::Pending => { + if &self.local_peer < remote_peer { + // Drop our pending session + debug!( + "Drop pending outbound session and process inbound session request with duplicate target set" + ); + self.remove_session(remote_peer, &existing_session.id); + + // Accept the inbound request + Ok(true) + } else { + // Keep our pending session, ignore inbound request + debug!( + "Ignore inbound request and keep pending outbound session with duplicate target set", + ); + Ok(false) + } + } + _ => Err(DuplicateSessionRequestError::InboundExistingTargetSet( + existing_session.target_set(), + )), + }?; + + let mut all_messages: Vec = vec![]; + + if accept_inbound_request { + let messages = self + .insert_and_initialize_session( + remote_peer, + session_id, + &existing_session.target_set(), + mode, + false, + ) + .await; + all_messages.extend(to_sync_messages(existing_session.id, messages)); + } + + Ok(SyncResult { + messages: all_messages, + is_done: false, + }) + } + async fn handle_sync_request( &mut self, remote_peer: &P, @@ -236,25 +354,35 @@ where let sessions = self.get_sessions(remote_peer); - // Check if a session with this id already exists for this peer, this can happen if both - // peers started to initiate a session at the same time, we can try to resolve this - if let Some(session) = sessions + // Check if a session with this id already exists for this peer. + // + // This can happen if both peers started to initiate a session at the same time, or if the + // remote peer sent two sync request messages with the same session id. + if let Some(existing_session) = sessions .iter() - .find(|session| session.id == *session_id && session.local) + .find(|existing_session| existing_session.id == *session_id) { + trace!("Handle sync request containing duplicate session id"); return self - .handle_duplicate_session(remote_peer, target_set, session) + .handle_duplicate_session(remote_peer, target_set, existing_session) .await; } - // Check if a session with this target set already exists for this peer, this always gets - // rejected because it is clearly redundant + // Check if a session with this target set already exists for this peer. 
if let Some(session) = sessions .iter() .find(|session| session.target_set() == *target_set) { - return Err(ReplicationError::DuplicateInboundRequest(session.id)); - } + trace!("Handle sync request containing duplicate target sets"); + return self + .handle_duplicate_target_set(remote_peer, session_id, mode, session) + .await; + }; + + info!( + "Accept inbound replication session with peer {}", + remote_peer.display() + ); let messages = self .insert_and_initialize_session(remote_peer, session_id, target_set, mode, false) @@ -269,6 +397,13 @@ where session_id: &SessionId, message: &Message, ) -> Result { + trace!( + "Message received: {} {} {}", + session_id, + remote_peer.display(), + message.display(), + ); + let sessions = self.sessions.get_mut(remote_peer); let (is_both_done, messages) = match sessions { @@ -283,10 +418,13 @@ where let is_both_done = session.state == SessionState::Done; Ok((is_both_done, messages)) } else { - Err(ReplicationError::NoSessionFound(*session_id)) + Err(ReplicationError::NoSessionFound( + *session_id, + remote_peer.display(), + )) } } - None => Err(ReplicationError::NoSessionFound(*session_id)), + None => Err(ReplicationError::NoPeerFound(remote_peer.display())), }?; // We're done, clean up after ourselves @@ -332,7 +470,10 @@ where is_done: session.state == SessionState::Done, }) } else { - Err(ReplicationError::NoSessionFound(*session_id)) + Err(ReplicationError::NoSessionFound( + *session_id, + remote_peer.display(), + )) } } @@ -366,10 +507,11 @@ where #[cfg(test)] mod tests { use p2panda_rs::test_utils::memory_store::helpers::PopulateStoreConfig; + use p2panda_rs::Human; use rstest::rstest; use tokio::sync::broadcast; - use crate::replication::errors::ReplicationError; + use crate::replication::errors::{DuplicateSessionRequestError, ReplicationError}; use crate::replication::message::{Message, HAVE_TYPE, SYNC_DONE_TYPE}; use crate::replication::{Mode, SyncIngest, SyncMessage, TargetSet}; use crate::schema::SchemaProvider; @@ -381,37 +523,52 @@ mod tests { use super::{SyncManager, INITIAL_SESSION_ID}; - const PEER_ID_LOCAL: &'static str = "local"; - const PEER_ID_REMOTE: &'static str = "remote"; + #[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] + struct Peer(String); + + impl Peer { + pub fn new(id: &str) -> Self { + Self(id.to_string()) + } + } + + impl Human for Peer { + fn display(&self) -> String { + self.0.clone() + } + } #[rstest] fn initiate_outbound_session( #[from(random_target_set)] target_set_1: TargetSet, #[from(random_target_set)] target_set_2: TargetSet, ) { + let peer_id_local: Peer = Peer::new("local"); + let peer_id_remote: Peer = Peer::new("remote"); + test_runner(move |node: TestNode| async move { - let mode = Mode::Naive; + let mode = Mode::LogHeight; let (tx, _rx) = broadcast::channel(8); let ingest = SyncIngest::new(SchemaProvider::default(), tx); - let mut manager = SyncManager::new(node.context.store.clone(), ingest, PEER_ID_LOCAL); + let mut manager = SyncManager::new(node.context.store.clone(), ingest, peer_id_local); let result = manager - .initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode) + .initiate_session(&peer_id_remote, &target_set_1, &mode) .await; assert!(result.is_ok()); let result = manager - .initiate_session(&PEER_ID_REMOTE, &target_set_2, &mode) + .initiate_session(&peer_id_remote, &target_set_2, &mode) .await; assert!(result.is_ok()); // Expect error when initiating a session for the same target set let result = manager - .initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode) + 
.initiate_session(&peer_id_remote, &target_set_1, &mode) .await; assert!(matches!( result, - Err(ReplicationError::DuplicateOutboundRequest(0)) + Err(ReplicationError::DuplicateSession(err)) if err == DuplicateSessionRequestError::OutboundExistingTargetSet(target_set_1) )); }) } @@ -420,70 +577,471 @@ mod tests { fn initiate_inbound_session( #[from(random_target_set)] target_set_1: TargetSet, #[from(random_target_set)] target_set_2: TargetSet, + #[from(random_target_set)] target_set_3: TargetSet, ) { + let peer_id_local: Peer = Peer::new("local"); + let peer_id_remote: Peer = Peer::new("remote"); + test_runner(move |node: TestNode| async move { let (tx, _rx) = broadcast::channel(8); let ingest = SyncIngest::new(SchemaProvider::default(), tx); - let mut manager = SyncManager::new(node.context.store.clone(), ingest, PEER_ID_LOCAL); + let mut manager = SyncManager::new(node.context.store.clone(), ingest, peer_id_local); - let message = - SyncMessage::new(0, Message::SyncRequest(Mode::Naive, target_set_1.clone())); - let result = manager.handle_message(&PEER_ID_REMOTE, &message).await; + let message = SyncMessage::new( + 0, + Message::SyncRequest(Mode::LogHeight, target_set_1.clone()), + ); + let result = manager.handle_message(&peer_id_remote, &message).await; assert!(result.is_ok()); - let message = - SyncMessage::new(1, Message::SyncRequest(Mode::Naive, target_set_2.clone())); - let result = manager.handle_message(&PEER_ID_REMOTE, &message).await; + let message = SyncMessage::new( + 1, + Message::SyncRequest(Mode::LogHeight, target_set_2.clone()), + ); + let result = manager.handle_message(&peer_id_remote, &message).await; assert!(result.is_ok()); // Reject attempt to create session again - let message = - SyncMessage::new(0, Message::SyncRequest(Mode::Naive, target_set_1.clone())); - let result = manager.handle_message(&PEER_ID_REMOTE, &message).await; - assert!(matches!( - result, - Err(ReplicationError::DuplicateInboundRequest(0)) + let message = SyncMessage::new( + 0, + Message::SyncRequest(Mode::LogHeight, target_set_3.clone()), + ); + let result = manager.handle_message(&peer_id_remote, &message).await; + assert!(matches!(result, + Err(ReplicationError::DuplicateSession(err)) if err == DuplicateSessionRequestError::InboundPendingSession(0) )); // Reject different session concerning same target set - let message = - SyncMessage::new(2, Message::SyncRequest(Mode::Naive, target_set_2.clone())); - let result = manager.handle_message(&PEER_ID_REMOTE, &message).await; + let message = SyncMessage::new( + 2, + Message::SyncRequest(Mode::LogHeight, target_set_2.clone()), + ); + let result = manager.handle_message(&peer_id_remote, &message).await; assert!(matches!( result, - Err(ReplicationError::DuplicateInboundRequest(1)) + Err(ReplicationError::DuplicateSession(err)) if err == DuplicateSessionRequestError::InboundExistingTargetSet(target_set_2) )); }) } + // PEER A PEER B + // + // SyncRequest(0, 0, ["A"])────────────────────► + // + // ◄─────────────────── SyncRequest(0, 0, ["B"]) + // + // ========== PEER A REQUEST DROPPED =========== + // + // 0 Have([..]) ───────────────────────────────► + // + // 0 SyncDone(false) ─────┐ + // │ + // ◄──────────────────────┼──────── 0 Have([..]) + // │ + // ◄──────────────────────┼─── 0 SyncDone(false) + // │ + // └────────────────────► + // + // ============ SESSION 0 CLOSED =============== + // + // ====== PEER A REPEATS WITH NEW SESS ID ====== + // + // SyncRequest(1, 0, ["A"])────────────────────► + // + // ◄─────────────────────────────── 1 
Have([..]) + // + // ┌─── 1 SyncDone(false) + // │ + // 1 Have([..]) ──────────┼────────────────────► + // │ + // 1 SyncDone(false) ─────┼────────────────────► + // │ + // ◄──────────────────────┘ + // + // ============ SESSION 1 CLOSED =============== + #[rstest] + fn concurrent_requests_duplicate_session_ids( + #[from(random_target_set)] target_set_1: TargetSet, + #[from(random_target_set)] target_set_2: TargetSet, + ) { + let peer_id_local: Peer = Peer::new("local"); + let peer_id_remote: Peer = Peer::new("remote"); + + test_runner(move |node: TestNode| async move { + let mode = Mode::LogHeight; + let (tx, _rx) = broadcast::channel(8); + let ingest = SyncIngest::new(SchemaProvider::default(), tx); + + // Sanity check: Id of peer A is < id of peer B. + // + // This is important for testing the deterministic handling of concurrent session + // requests which contain the same session id. + assert!(peer_id_local < peer_id_remote); + + // Sanity check: Target sets need to be different + assert!(target_set_1 != target_set_2); + + // Local peer A initiates a session with id 0 and target set 1. + let mut manager_a = SyncManager::new( + node.context.store.clone(), + ingest.clone(), + peer_id_local.clone(), + ); + let result = manager_a + .initiate_session(&peer_id_remote, &target_set_1, &mode) + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let sync_request_a = result[0].clone(); + + // Remote peer B initiates a session with id 0 and target set 2. + // + // Note that both peers use the _same_ session id but _different_ target sets. + let mut manager_b = + SyncManager::new(node.context.store.clone(), ingest, peer_id_remote.clone()); + let result = manager_b + .initiate_session(&peer_id_local, &target_set_2, &mode) + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let sync_request_b = result[0].clone(); + + // Both peers send and handle the requests concurrently. + let result = manager_a + .handle_message(&peer_id_remote, &sync_request_b) + .await + .unwrap(); + + // We expect Peer A to: + // + // 1. Drop their pending outgoing session + // 2. Respond to the request from Peer B + // 3. Send another sync request for the other target set with a corrected session id + assert_eq!(result.messages.len(), 3); + let (have_message_a, done_message_a, sync_request_a_corrected) = ( + result.messages[0].clone(), + result.messages[1].clone(), + result.messages[2].clone(), + ); + + let result = manager_b + .handle_message(&peer_id_local, &sync_request_a) + .await + .unwrap(); + + // We expect Peer B to drop the incoming request from Peer A and simply wait for a + // response from it's original request. + assert_eq!(result.messages.len(), 0); + + // Peer A has two sessions running: The one initiated by Peer B and the one it + // re-initiated itself with the new session id + let manager_a_sessions = manager_a.get_sessions(&peer_id_remote); + assert_eq!(manager_a_sessions.len(), 2); + + // Peer B has still one running, it didn't learn about the re-initiated session of A + // yet + let manager_b_sessions = manager_b.get_sessions(&peer_id_local); + assert_eq!(manager_b_sessions.len(), 1); + + // Peer B processes the `Have`, `SyncDone` and `SyncRequest` messages from Peer A. + let result = manager_b + .handle_message(&peer_id_local, &have_message_a) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 2); + + // They send their own `Have` and `SyncDone` messages. 
+ let (have_message_b, done_message_b) = + (response.messages[0].clone(), response.messages[1].clone()); + + // Sync done, they send no more messages. + let result = manager_b + .handle_message(&peer_id_local, &done_message_a) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 0); + + // Peer B should have closed the session for good + let manager_b_sessions = manager_b.get_sessions(&peer_id_local); + assert_eq!(manager_b_sessions.len(), 0); + + // Now the second, re-established sync request from peer A concerning another target + // set arrives at peer B + let result = manager_b + .handle_message(&peer_id_local, &sync_request_a_corrected) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 2); + + // They send their own `Have` and `SyncDone` messages for the corrected target set + let (have_message_b_corrected, done_message_b_corrected) = + (response.messages[0].clone(), response.messages[1].clone()); + + // Peer B should now know about one session again + let manager_b_sessions = manager_b.get_sessions(&peer_id_local); + assert_eq!(manager_b_sessions.len(), 1); + + // Peer A processes both the `Have` and `SyncDone` messages from Peer B for the first + // session and produces no new messages. We're done with this session on Peer A as + // well now. + let result = manager_a + .handle_message(&peer_id_remote, &have_message_b) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 0); + + let result = manager_a + .handle_message(&peer_id_remote, &done_message_b) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 0); + + // Peer A should now know about one session again + let manager_a_sessions = manager_a.get_sessions(&peer_id_remote); + assert_eq!(manager_a_sessions.len(), 1); + + // Peer A processes both the re-initiated sessions `Have` and `SyncDone` messages from + // Peer B and produces its own answer. + let result = manager_a + .handle_message(&peer_id_remote, &have_message_b_corrected) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 2); + + let (have_message_a_corrected, done_message_a_corrected) = + (response.messages[0].clone(), response.messages[1].clone()); + + let result = manager_a + .handle_message(&peer_id_remote, &done_message_b_corrected) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 0); + + // Peer B processes both the re-initiated `Have` and `SyncDone` messages from Peer A + // and produces no new messages. + let result = manager_b + .handle_message(&peer_id_local, &have_message_a_corrected) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 0); + + let result = manager_b + .handle_message(&peer_id_local, &done_message_a_corrected) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 0); + + // After processing all messages both peers should have no sessions remaining. 
+ let manager_a_sessions = manager_a.get_sessions(&peer_id_remote); + assert_eq!(manager_a_sessions.len(), 0); + + let manager_b_sessions = manager_b.get_sessions(&peer_id_local); + assert_eq!(manager_b_sessions.len(), 0); + }) + } + + // PEER A PEER B + // + // SyncRequest(0, 0, ["A"])────────────────────► + // + // ◄─────────────────── SyncRequest(0, 1, ["A"]) + // + // ========== PEER A REQUEST DROPPED =========== + // + // Have([..]) ─────────────────────────────────► + // + // Done(false) ───────────┐ + // │ + // ◄──────────────────────┼────────── Have([..]) + // │ + // ◄──────────────────────┼───────── Done(false) + // │ + // └────────────────────► + // + // ============== SESSION CLOSED =============== + #[rstest] + fn concurrent_requests_duplicate_target_set( + #[from(random_target_set)] target_set_1: TargetSet, + ) { + let peer_id_local: Peer = Peer::new("local"); + let peer_id_remote: Peer = Peer::new("remote"); + + test_runner(move |node: TestNode| async move { + let mode = Mode::LogHeight; + let (tx, _rx) = broadcast::channel(8); + let ingest = SyncIngest::new(SchemaProvider::default(), tx); + + // Local peer id is < than remote, this is important for testing the deterministic + // handling of concurrent session requests which contain the same session id. + assert!(peer_id_local < peer_id_remote); + + let mut manager_a = SyncManager::new( + node.context.store.clone(), + ingest.clone(), + peer_id_local.clone(), + ); + + let mut manager_b = + SyncManager::new(node.context.store.clone(), ingest, peer_id_remote.clone()); + + // Local peer A initiates a session with target set A. + let result = manager_a + .initiate_session(&peer_id_remote, &target_set_1, &mode) + .await; + + let sync_messages = result.unwrap(); + assert_eq!(sync_messages.len(), 1); + let sync_request_a = sync_messages[0].clone(); + + // Remote peer B initiates a session with a dummy peer just to increment the session + // id. + let dummy_peer_id = Peer::new("some_other_peer"); + let _result = manager_b + .initiate_session(&dummy_peer_id, &target_set_1, &mode) + .await; + + // Remote peer B initiates a session with target set A. + let result = manager_b + .initiate_session(&peer_id_local, &target_set_1, &mode) + .await; + + let sync_messages = result.unwrap(); + assert_eq!(sync_messages.len(), 1); + let sync_request_b = sync_messages[0].clone(); + + // Remove the session from the dummy peer. + manager_b.remove_sessions(&dummy_peer_id); + + // Both peers send and handle the requests concurrently. + let result = manager_a + .handle_message(&peer_id_remote, &sync_request_b) + .await; + let response = result.unwrap(); + + // We expect Peer A to drop their pending outgoing session and respond to the request + // from Peer B. + assert_eq!(response.messages.len(), 2); + let (have_message_a, done_message_a) = + (response.messages[0].clone(), response.messages[1].clone()); + + let result = manager_b + .handle_message(&peer_id_local, &sync_request_a) + .await; + let response = result.unwrap(); + + // We expect Peer B to drop the incoming request from Peer A and simply wait + // for a response from it's original request. + assert_eq!(response.messages.len(), 0); + + // Both peers have exactly one session running. + let manager_a_sessions = manager_a.get_sessions(&peer_id_remote); + assert_eq!(manager_a_sessions.len(), 1); + + let manager_b_sessions = manager_b.get_sessions(&peer_id_local); + assert_eq!(manager_b_sessions.len(), 1); + + // Peer B processes the `Have` and `SyncDone` messages from Peer A. 
+ let result = manager_b + .handle_message(&peer_id_local, &have_message_a) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 2); + + // They send their own `Have` and `SyncDone` messages. + let (have_message_b, done_message_b) = + (response.messages[0].clone(), response.messages[1].clone()); + + // Sync done, they send no more messages. + let result = manager_b + .handle_message(&peer_id_local, &done_message_a) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 0); + + // Peer A processes both the `Have` and `SyncDone` messages from Peer B and produces + // no new messages. + let result = manager_a + .handle_message(&peer_id_remote, &have_message_b) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 0); + + let result = manager_a + .handle_message(&peer_id_remote, &done_message_b) + .await; + let response = result.unwrap(); + assert_eq!(response.messages.len(), 0); + + // After processing all messages both peers should have no sessions remaining. + let manager_a_sessions = manager_a.get_sessions(&peer_id_remote); + assert_eq!(manager_a_sessions.len(), 0); + + let manager_b_sessions = manager_b.get_sessions(&peer_id_local); + assert_eq!(manager_b_sessions.len(), 0); + }) + } + #[rstest] fn inbound_checks_supported_mode(#[from(random_target_set)] target_set: TargetSet) { + let peer_id_local: Peer = Peer::new("local"); + let peer_id_remote: Peer = Peer::new("remote"); + test_runner(move |node: TestNode| async move { let (tx, _rx) = broadcast::channel(8); let ingest = SyncIngest::new(SchemaProvider::default(), tx); // Should not fail when requesting supported replication mode - let mut manager = - SyncManager::new(node.context.store.clone(), ingest.clone(), PEER_ID_LOCAL); + let mut manager = SyncManager::new( + node.context.store.clone(), + ingest.clone(), + peer_id_local.clone(), + ); let message = SyncMessage::new( INITIAL_SESSION_ID, - Message::SyncRequest(Mode::Naive, target_set.clone()), + Message::SyncRequest(Mode::LogHeight, target_set.clone()), ); - let result = manager.handle_message(&PEER_ID_REMOTE, &message).await; + let result = manager.handle_message(&peer_id_remote, &message).await; assert!(result.is_ok()); // Should fail when requesting unsupported replication mode - let mut manager = SyncManager::new(node.context.store.clone(), ingest, PEER_ID_LOCAL); + let mut manager = SyncManager::new(node.context.store.clone(), ingest, peer_id_local); let message = SyncMessage::new( INITIAL_SESSION_ID, Message::SyncRequest(Mode::SetReconciliation, target_set.clone()), ); - let result = manager.handle_message(&PEER_ID_REMOTE, &message).await; + let result = manager.handle_message(&peer_id_remote, &message).await; assert!(result.is_err()); }) } + // PEER A PEER B + // + // SyncRequest(0, 0, [..])─────────────────────► + // + // ◄───────────────────────────────── Have([..]) + // + // ┌─────── SyncDone(false) + // │ + // Have([..]) ──────────┼──────────────────────► + // │ + // Entry(..) ──────────┼──────────────────────► + // │ + // Entry(..) ───────────┼──────────────────────► + // │ + // Entry(..) ───────────┼──────────────────────► + // │ + // Entry(..) ───────────┼──────────────────────► + // │ + // Entry(..) ───────────┼──────────────────────► + // │ + // Entry(..) 
───────────┼──────────────────────► + // │ + // SyncDone(false) ─────┼──────────────────────► + // │ + // ◄────────────────────┘ #[rstest] fn sync_lifetime( #[from(populate_store_config)] @@ -491,6 +1049,9 @@ mod tests { config_a: PopulateStoreConfig, #[from(populate_store_config)] config_b: PopulateStoreConfig, ) { + let peer_id_local: Peer = Peer::new("local"); + let peer_id_remote: Peer = Peer::new("remote"); + test_runner_with_manager(|manager: TestNodeManager| async move { let mut node_a = manager.create().await; let mut node_b = manager.create().await; @@ -499,23 +1060,23 @@ mod tests { populate_and_materialize(&mut node_b, &config_b).await; let (tx, _rx) = broadcast::channel(8); - let target_set = TargetSet::new(&vec![config_a.schema.id().to_owned()]); + let target_set = TargetSet::new(&[config_a.schema.id().to_owned()]); let mut manager_a = SyncManager::new( node_a.context.store.clone(), SyncIngest::new(node_a.context.schema_provider.clone(), tx.clone()), - PEER_ID_LOCAL, + peer_id_local.clone(), ); let mut manager_b = SyncManager::new( node_b.context.store.clone(), SyncIngest::new(node_b.context.schema_provider.clone(), tx), - PEER_ID_REMOTE, + peer_id_remote.clone(), ); // Send `SyncRequest` to remote let messages = manager_a - .initiate_session(&PEER_ID_REMOTE, &target_set, &Mode::Naive) + .initiate_session(&peer_id_remote, &target_set, &Mode::LogHeight) .await .unwrap(); @@ -523,17 +1084,18 @@ mod tests { messages, vec![SyncMessage::new( 0, - Message::SyncRequest(Mode::Naive, target_set.clone()) + Message::SyncRequest(Mode::LogHeight, target_set.clone()) )] ); - // Receive `Have` and `SyncDone` from remote + // Remote receives `SyncRequest` + // Send `Have` and `SyncDone` messages back to local let result = manager_b - .handle_message(&PEER_ID_LOCAL, &messages[0]) + .handle_message(&peer_id_local, &messages[0]) .await .unwrap(); - assert_eq!(result.is_done, false); + assert!(!result.is_done); assert_eq!( result.messages, vec![ @@ -542,18 +1104,19 @@ mod tests { ] ); + // Receive `Have` and `SyncDone` messages from remote // Send `Have`, `Entry` and `SyncDone` messages to remote let result_have = manager_a - .handle_message(&PEER_ID_REMOTE, &result.messages[0]) + .handle_message(&peer_id_remote, &result.messages[0]) .await .unwrap(); - assert_eq!(result_have.is_done, false); + assert!(!result_have.is_done); let result_done = manager_a - .handle_message(&PEER_ID_REMOTE, &result.messages[1]) + .handle_message(&peer_id_remote, &result.messages[1]) .await .unwrap(); - assert_eq!(result_done.is_done, true); + assert!(result_done.is_done); assert_eq!(result_have.messages.len(), 8); assert_eq!( @@ -565,10 +1128,10 @@ mod tests { SYNC_DONE_TYPE ); - // Receive `SyncDone` from remote + // Remote receives `Have`, `Entry` `SyncDone` messages from local for (index, message) in result_have.messages.iter().enumerate() { let result = manager_b - .handle_message(&PEER_ID_LOCAL, &message) + .handle_message(&peer_id_local, message) .await .unwrap(); diff --git a/aquadoggo/src/replication/message.rs b/aquadoggo/src/replication/message.rs index d310d677f..73da83871 100644 --- a/aquadoggo/src/replication/message.rs +++ b/aquadoggo/src/replication/message.rs @@ -6,6 +6,7 @@ use p2panda_rs::entry::EncodedEntry; use p2panda_rs::entry::{LogId, SeqNum}; use p2panda_rs::identity::PublicKey; use p2panda_rs::operation::EncodedOperation; +use p2panda_rs::Human; use serde::de::Visitor; use serde::ser::SerializeSeq; use serde::{Deserialize, Serialize}; @@ -21,14 +22,14 @@ pub type MessageType = u64; pub type 
LiveMode = bool; -pub type LogHeight = (PublicKey, Vec<(LogId, SeqNum)>); +pub type LogHeights = (PublicKey, Vec<(LogId, SeqNum)>); #[derive(Debug, Clone, Eq, PartialEq)] pub enum Message { SyncRequest(Mode, TargetSet), Entry(EncodedEntry, Option), SyncDone(LiveMode), - Have(Vec), + Have(Vec), } impl Message { @@ -42,6 +43,21 @@ impl Message { } } +impl Human for Message { + fn display(&self) -> String { + match &self { + Message::Have(log_heights) => { + let log_heights: Vec<(String, &Vec<(LogId, SeqNum)>)> = log_heights + .iter() + .map(|(public_key, log_heights)| (public_key.to_string(), log_heights)) + .collect(); + format!("Have({log_heights:?})") + } + message => format!("{message:?}"), + } + } +} + #[derive(Debug, Clone, Eq, PartialEq)] pub struct SyncMessage(SessionId, Message); @@ -63,6 +79,12 @@ impl SyncMessage { } } +impl Human for SyncMessage { + fn display(&self) -> String { + format!("SyncMessage({:?}, {})", self.0, self.1.display()) + } +} + impl Serialize for SyncMessage { fn serialize(&self, serializer: S) -> Result where @@ -219,7 +241,10 @@ mod tests { assert_eq!( deserialize_into::(&serialize_value(cbor!([0, 12, 0, target_set]))) .unwrap(), - SyncMessage::new(12, Message::SyncRequest(Mode::Naive, target_set.clone())) + SyncMessage::new( + 12, + Message::SyncRequest(Mode::LogHeight, target_set.clone()) + ) ); let log_heights: Vec<(PublicKey, Vec<(LogId, SeqNum)>)> = vec![]; diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index bb943f877..599a64fb3 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -5,6 +5,7 @@ mod ingest; mod manager; mod message; mod mode; +mod service; mod session; mod strategies; mod target_set; @@ -12,8 +13,9 @@ pub mod traits; pub use ingest::SyncIngest; pub use manager::SyncManager; -pub use message::{LiveMode, LogHeight, Message, SyncMessage}; +pub use message::{LiveMode, LogHeights, Message, SyncMessage}; pub use mode::Mode; +pub use service::replication_service; pub use session::{Session, SessionId, SessionState}; -pub use strategies::{NaiveStrategy, SetReconciliationStrategy, StrategyResult}; +pub use strategies::{LogHeightStrategy, SetReconciliationStrategy, StrategyResult}; pub use target_set::TargetSet; diff --git a/aquadoggo/src/replication/mode.rs b/aquadoggo/src/replication/mode.rs index c592c83ff..3d73f96fe 100644 --- a/aquadoggo/src/replication/mode.rs +++ b/aquadoggo/src/replication/mode.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; #[derive(Clone, Debug, Eq, PartialEq)] pub enum Mode { - Naive, + LogHeight, SetReconciliation, Unknown, } @@ -15,7 +15,7 @@ pub enum Mode { impl Mode { pub fn as_str(&self) -> &str { match self { - Mode::Naive => "naive", + Mode::LogHeight => "log-height", Mode::SetReconciliation => "set-reconciliation", Mode::Unknown => "unknown", } @@ -23,7 +23,7 @@ impl Mode { pub fn as_u64(&self) -> u64 { match self { - Mode::Naive => 0, + Mode::LogHeight => 0, Mode::SetReconciliation => 1, Mode::Unknown => unreachable!("Can't create an unknown replication mode"), } @@ -33,7 +33,7 @@ impl Mode { impl From for Mode { fn from(value: u64) -> Self { match value { - 0 => Mode::Naive, + 0 => Mode::LogHeight, 1 => Mode::SetReconciliation, _ => Mode::Unknown, } @@ -81,13 +81,13 @@ mod tests { #[test] fn u64_representation() { - assert_eq!(Mode::Naive.as_u64(), 0); + assert_eq!(Mode::LogHeight.as_u64(), 0); assert_eq!(Mode::SetReconciliation.as_u64(), 1); } #[test] fn serialize() { - let bytes = 
serialize_from(Mode::Naive); + let bytes = serialize_from(Mode::LogHeight); assert_eq!(bytes, vec![0]); } diff --git a/aquadoggo/src/replication/service.rs b/aquadoggo/src/replication/service.rs new file mode 100644 index 000000000..c3acee757 --- /dev/null +++ b/aquadoggo/src/replication/service.rs @@ -0,0 +1,419 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::collections::HashMap; +use std::time::Duration; + +use anyhow::Result; +use libp2p::PeerId; +use log::{debug, info, trace, warn}; +use p2panda_rs::schema::SchemaId; +use p2panda_rs::Human; +use tokio::task; +use tokio::time::interval; +use tokio_stream::wrappers::{BroadcastStream, IntervalStream}; +use tokio_stream::StreamExt; + +use crate::bus::{ServiceMessage, ServiceSender}; +use crate::context::Context; +use crate::db::SqlStore; +use crate::manager::{ServiceReadySender, Shutdown}; +use crate::network::Peer; +use crate::replication::errors::ReplicationError; +use crate::replication::{ + Mode, Session, SessionId, SyncIngest, SyncManager, SyncMessage, TargetSet, +}; +use crate::schema::SchemaProvider; + +/// Maximum of replication sessions per peer. +const MAX_SESSIONS_PER_PEER: usize = 3; + +/// How often does the scheduler check for initiating replication sessions with peers. +const UPDATE_INTERVAL: Duration = Duration::from_secs(5); + +pub async fn replication_service( + context: Context, + shutdown: Shutdown, + tx: ServiceSender, + tx_ready: ServiceReadySender, +) -> Result<()> { + let _rx = tx.subscribe(); + + let local_peer_id = context + .config + .network + .peer_id + .expect("Peer id needs to be given"); + + let manager = + ConnectionManager::new(&context.schema_provider, &context.store, &tx, local_peer_id); + let handle = task::spawn(manager.run()); + + if tx_ready.send(()).is_err() { + warn!("No subscriber informed about replication service being ready"); + }; + + tokio::select! { + _ = handle => (), + _ = shutdown => (), + } + + Ok(()) +} + +/// Statistics about successful and failed replication sessions for each connected peer. +#[derive(Debug, Clone, PartialEq, Eq)] +struct PeerStatus { + peer: Peer, + successful_count: usize, + failed_count: usize, +} + +impl PeerStatus { + pub fn new(peer: Peer) -> Self { + Self { + peer, + successful_count: 0, + failed_count: 0, + } + } +} + +/// Coordinates peer connections and replication sessions. +/// +/// This entails: +/// +/// 1. Handles incoming replication- and peer connection messages from other services +/// 2. Maintains a list of currently connected p2panda peers +/// 3. Routes messages to the right replication session with help of the `SyncManager` and returns +/// responses to other services +/// 4. Schedules new replication sessions +/// 5. Handles replication errors and informs other services about them +struct ConnectionManager { + /// List of peers the connection mananger knows about and are available for replication. + peers: HashMap, + + /// Replication state manager, data ingest and message generator for handling all replication + /// logic. + sync_manager: SyncManager, + + /// Async stream giving us a regular interval to initiate new replication sessions. + scheduler: IntervalStream, + + /// Receiver for messages from other services, for example the networking layer. + tx: ServiceSender, + + /// Sender for messages to other services. + rx: BroadcastStream, + + /// Provider to retreive our currently supported schema ids. + schema_provider: SchemaProvider, +} + +impl ConnectionManager { + /// Returns a new instance of `ConnectionManager`. 
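Before `ConnectionManager::new` itself (it follows directly below), here is a minimal, self-contained sketch of the event-loop pattern the struct above is built around: a broadcast bus wrapped in a `BroadcastStream` plus an `IntervalStream` scheduler, polled together with `tokio::select!`. The `BusMessage` enum and the handler bodies are stand-ins rather than aquadoggo types, and the sketch assumes `tokio` (with the `rt-multi-thread`, `macros`, `sync` and `time` features) and `tokio-stream` (with the `sync` feature).

```rust
use std::time::Duration;

use tokio::sync::broadcast;
use tokio::time::interval;
use tokio_stream::wrappers::{BroadcastStream, IntervalStream};
use tokio_stream::StreamExt;

// Stand-in for the service bus messages.
#[derive(Clone, Debug)]
enum BusMessage {
    PeerConnected(String),
    PeerDisconnected(String),
}

#[tokio::main]
async fn main() {
    let (tx, rx) = broadcast::channel::<BusMessage>(16);
    let mut bus = BroadcastStream::new(rx);
    let mut scheduler = IntervalStream::new(interval(Duration::from_secs(5)));

    // Pretend another service announced a peer and later dropped it.
    tx.send(BusMessage::PeerConnected("peer-1".into())).unwrap();
    tx.send(BusMessage::PeerDisconnected("peer-1".into())).unwrap();

    loop {
        tokio::select! {
            // `biased` only keeps this example deterministic.
            biased;

            event = bus.next() => match event {
                Some(Ok(message)) => println!("bus message: {message:?}"),
                Some(Err(err)) => eprintln!("lagged behind on the bus: {err}"),
                // All senders dropped: shut the loop down.
                None => break,
            },
            Some(_) = scheduler.next() => {
                println!("scheduler tick: decide whether to initiate new sessions");
                break; // keep the example finite
            }
        }
    }
}
```

The service itself lets `select!` pick branches freely; the `biased;` flag is only there so the example always drains the bus before the scheduler fires.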
+ pub fn new( + schema_provider: &SchemaProvider, + store: &SqlStore, + tx: &ServiceSender, + local_peer_id: PeerId, + ) -> Self { + let local_peer = Peer::new_local_peer(local_peer_id); + let ingest = SyncIngest::new(schema_provider.clone(), tx.clone()); + let sync_manager = SyncManager::new(store.clone(), ingest, local_peer); + let scheduler = IntervalStream::new(interval(UPDATE_INTERVAL)); + + Self { + peers: HashMap::new(), + sync_manager, + scheduler, + tx: tx.clone(), + rx: BroadcastStream::new(tx.subscribe()), + schema_provider: schema_provider.clone(), + } + } + + /// Returns set of schema ids we are interested in and support on this node. + async fn target_set(&self) -> TargetSet { + let supported_schema_ids: Vec = self + .schema_provider + .all() + .await + .iter() + .map(|schema| schema.id().to_owned()) + .collect(); + TargetSet::new(&supported_schema_ids) + } + + /// Register a new peer in the network. + async fn on_connection_established(&mut self, peer: Peer) { + info!("Connected to peer: {}", peer.display()); + + match self.peers.get(&peer) { + Some(_) => { + warn!("Peer already known: {}", peer.display()); + } + None => { + self.peers.insert(peer, PeerStatus::new(peer)); + self.update_sessions().await; + } + } + } + + /// Handle a peer disconnecting from the network. + async fn on_connection_closed(&mut self, peer: Peer) { + info!("Disconnected from peer: {}", peer.display()); + + // Clear running replication sessions from sync manager + self.sync_manager.remove_sessions(&peer); + self.remove_connection(peer) + } + + /// Remove a peer from the network. + fn remove_connection(&mut self, peer: Peer) { + match self.peers.remove(&peer) { + Some(_) => debug!("Remove peer: {}", peer.display()), + None => warn!("Tried to remove connection from unknown peer"), + } + } + + /// Route incoming replication messages to the right session. + async fn on_replication_message(&mut self, peer: Peer, message: SyncMessage) { + let session_id = message.session_id(); + + match self.sync_manager.handle_message(&peer, &message).await { + Ok(result) => { + for message in result.messages { + self.send_service_message(ServiceMessage::SentReplicationMessage( + peer, message, + )); + } + + if result.is_done { + self.on_replication_finished(peer, session_id).await; + } + } + Err(err) => { + self.on_replication_error(peer, session_id, err).await; + } + } + } + + /// Handle successful replication sessions. + async fn on_replication_finished(&mut self, peer: Peer, _session_id: SessionId) { + info!("Finished replication with peer {}", peer.display()); + + match self.peers.get_mut(&peer) { + Some(status) => { + status.successful_count += 1; + } + None => { + panic!("Tried to access unknown peer"); + } + } + } + + /// Handle replication errors and inform other services about them. + async fn on_replication_error( + &mut self, + peer: Peer, + session_id: SessionId, + error: ReplicationError, + ) { + warn!("Replication with peer {} failed: {}", peer.display(), error); + + match self.peers.get_mut(&peer) { + Some(status) => { + status.failed_count += 1; + } + None => { + panic!("Tried to access unknown peer"); + } + } + + self.sync_manager.remove_session(&peer, &session_id); + + // Inform network service about error, so it can accordingly react + self.send_service_message(ServiceMessage::ReplicationFailed(peer)); + } + + /// Determine if we can attempt new replication sessions with the peers we currently know + /// about. 
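`update_sessions` itself follows below. As a rough, self-contained illustration of the rule its doc comment describes (at most `MAX_SESSIONS_PER_PEER` sessions that are not yet done per peer, and never a second active session over the same target set), here is a sketch with stand-in types; `FakeSession` and `may_initiate` are made up for the example, while the real code works on `Session` and `TargetSet` via the `SyncManager`.

```rust
const MAX_SESSIONS_PER_PEER: usize = 3;

// Stand-in for a replication session: just its target set and a done flag.
#[derive(Debug, Clone, PartialEq, Eq)]
struct FakeSession {
    target_set: Vec<String>,
    is_done: bool,
}

/// May we initiate a new session over `target_set` with a peer that already
/// has `sessions` running?
fn may_initiate(sessions: &[FakeSession], target_set: &[String]) -> bool {
    let active: Vec<&FakeSession> = sessions.iter().filter(|s| !s.is_done).collect();
    let duplicate = active
        .iter()
        .any(|s| s.target_set.as_slice() == target_set);
    active.len() < MAX_SESSIONS_PER_PEER && !duplicate
}

fn main() {
    let schemas = vec!["schema_definition_v1".to_string()];

    // No sessions yet: go ahead.
    assert!(may_initiate(&[], &schemas));

    // An active session over the same target set blocks a second one ...
    let running = vec![FakeSession { target_set: schemas.clone(), is_done: false }];
    assert!(!may_initiate(&running, &schemas));

    // ... but once it is done, a new session may be started again.
    let finished = vec![FakeSession { target_set: schemas.clone(), is_done: true }];
    assert!(may_initiate(&finished, &schemas));
}
```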
+ async fn update_sessions(&mut self) { + // Determine the target set our node is interested in + let target_set = self.target_set().await; + + // Iterate through all currently connected peers + let attempt_peers: Vec = self + .peers + .clone() + .into_iter() + .filter_map(|(peer, _)| { + let sessions = self.sync_manager.get_sessions(&peer); + + // 1. Check if we're running too many sessions with that peer on this connection + // already. This limit is configurable. + let active_sessions: Vec<&Session> = sessions + .iter() + .filter(|session| !session.is_done()) + .collect(); + + // 2. Check if we're already having at least one session concerning the same target + // set. If we would start that session again it would be considered an error. + let has_active_target_set_session = active_sessions + .iter() + .any(|session| session.target_set() == target_set); + + if active_sessions.len() < MAX_SESSIONS_PER_PEER && !has_active_target_set_session { + Some(peer) + } else { + None + } + }) + .collect(); + + if attempt_peers.is_empty() { + trace!("No peers available for replication") + } + + for peer in attempt_peers { + self.initiate_replication(&peer, &target_set).await; + } + } + + /// Initiate a new replication session with remote peer. + async fn initiate_replication(&mut self, peer: &Peer, target_set: &TargetSet) { + match self + .sync_manager + .initiate_session(peer, target_set, &Mode::LogHeight) + .await + { + Ok(messages) => { + for message in messages { + self.send_service_message(ServiceMessage::SentReplicationMessage( + *peer, message, + )); + } + } + Err(err) => { + warn!("Replication error: {}", err) + } + } + } + + /// Handles incoming messages from other services via the bus. + async fn handle_service_message(&mut self, message: ServiceMessage) { + match message { + ServiceMessage::PeerConnected(peer) => { + self.on_connection_established(peer).await; + } + ServiceMessage::PeerDisconnected(peer) => { + self.on_connection_closed(peer).await; + } + ServiceMessage::ReceivedReplicationMessage(peer, message) => { + self.on_replication_message(peer, message).await; + } + _ => (), // Ignore all other messages + } + } + + /// Sends a message on the bus to other services. + fn send_service_message(&self, message: ServiceMessage) { + if self.tx.send(message).is_err() { + // Silently fail here as we don't care if the message was received at this + // point + } + } + + /// Main event loop running the async streams. + pub async fn run(mut self) { + loop { + tokio::select! 
{ + event = self.rx.next() => match event { + Some(Ok(message)) => self.handle_service_message(message).await, + Some(Err(err)) => { + panic!("Service bus subscriber for connection manager loop failed: {}", err); + } + // Command channel closed, thus shutting down the network event loop + None => { + return + }, + }, + Some(_) = self.scheduler.next() => { + self.update_sessions().await + } + } + } + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use libp2p::swarm::ConnectionId; + use libp2p::PeerId; + use tokio::sync::broadcast; + + use crate::bus::ServiceMessage; + use crate::network::Peer; + use crate::replication::{Message, Mode, SyncMessage}; + use crate::test_utils::{test_runner, TestNode}; + + use super::ConnectionManager; + + #[test] + fn peer_lifetime() { + let local_peer_id = + PeerId::from_str("12D3KooWD3JAiSNrVGxjC7vJCcjwS8egbtJV9kzrstxLRKiwb9UY").unwrap(); + let remote_peer_id = + PeerId::from_str("12D3KooWCqtLMJQLY3sm9rpDampJ2nPLswPPZto3mrRY7794QATF").unwrap(); + + test_runner(move |node: TestNode| async move { + let (tx, mut rx) = broadcast::channel::(10); + + let mut manager = ConnectionManager::new( + &node.context.schema_provider, + &node.context.store, + &tx, + local_peer_id, + ); + + let target_set = manager.target_set().await; + + // Inform connection manager about new peer + let remote_peer = Peer::new(remote_peer_id, ConnectionId::new_unchecked(1)); + + manager + .handle_service_message(ServiceMessage::PeerConnected(remote_peer)) + .await; + + let status = manager + .peers + .get(&remote_peer) + .expect("Peer to be registered in connection manager"); + assert_eq!(manager.peers.len(), 1); + assert_eq!(status.peer, remote_peer); + + // Manager attempts a replication session with that peer + assert_eq!(rx.len(), 1); + assert_eq!( + rx.recv().await, + Ok(ServiceMessage::SentReplicationMessage( + remote_peer, + SyncMessage::new(0, Message::SyncRequest(Mode::LogHeight, target_set)) + )) + ); + assert_eq!(manager.sync_manager.get_sessions(&remote_peer).len(), 1); + + // Inform manager about peer disconnected + manager + .handle_service_message(ServiceMessage::PeerDisconnected(remote_peer)) + .await; + + // Manager cleans up internal state + assert_eq!(rx.len(), 0); + assert_eq!(manager.peers.len(), 0); + assert_eq!(manager.sync_manager.get_sessions(&remote_peer).len(), 0); + }); + } +} diff --git a/aquadoggo/src/replication/session.rs b/aquadoggo/src/replication/session.rs index 445d72537..2a59f5bee 100644 --- a/aquadoggo/src/replication/session.rs +++ b/aquadoggo/src/replication/session.rs @@ -9,7 +9,7 @@ use crate::db::SqlStore; use crate::replication::errors::ReplicationError; use crate::replication::traits::Strategy; use crate::replication::{ - Message, Mode, NaiveStrategy, SetReconciliationStrategy, StrategyResult, TargetSet, + LogHeightStrategy, Message, Mode, SetReconciliationStrategy, StrategyResult, TargetSet, }; pub type SessionId = u64; @@ -57,7 +57,7 @@ impl Session { live_mode: bool, ) -> Self { let strategy: Box = match mode { - Mode::Naive => Box::new(NaiveStrategy::new(target_set)), + Mode::LogHeight => Box::new(LogHeightStrategy::new(target_set)), Mode::SetReconciliation => Box::new(SetReconciliationStrategy::new()), Mode::Unknown => panic!("Unknown replication mode"), }; @@ -74,10 +74,22 @@ impl Session { } } - pub fn live_mode(&self) -> bool { + pub fn is_live_mode(&self) -> bool { self.is_local_live_mode && self.is_remote_live_mode } + pub fn is_pending(&self) -> bool { + self.state == SessionState::Pending + } + + pub fn 
is_established(&self) -> bool { + self.state == SessionState::Established + } + + pub fn is_done(&self) -> bool { + self.state == SessionState::Done + } + pub fn mode(&self) -> Mode { self.strategy.mode() } @@ -175,8 +187,13 @@ mod tests { #[rstest] fn state_machine(#[from(random_target_set)] target_set: TargetSet) { test_runner(move |node: TestNode| async move { - let mut session = - Session::new(&INITIAL_SESSION_ID, &target_set, &Mode::Naive, true, false); + let mut session = Session::new( + &INITIAL_SESSION_ID, + &target_set, + &Mode::LogHeight, + true, + false, + ); assert!(!session.is_local_done); assert!(!session.is_local_live_mode); assert!(!session.is_remote_live_mode); @@ -205,8 +222,13 @@ mod tests { populate_store(&node.context.store, &config).await; let target_set = TargetSet::new(&vec![config.schema.id().to_owned()]); - let mut session = - Session::new(&INITIAL_SESSION_ID, &target_set, &Mode::Naive, true, false); + let mut session = Session::new( + &INITIAL_SESSION_ID, + &target_set, + &Mode::LogHeight, + true, + false, + ); let response_messages = session .handle_message(&node.context.store, &Message::Have(vec![])) diff --git a/aquadoggo/src/replication/strategies/diff.rs b/aquadoggo/src/replication/strategies/diff.rs index 0956785bf..f3c7a39cc 100644 --- a/aquadoggo/src/replication/strategies/diff.rs +++ b/aquadoggo/src/replication/strategies/diff.rs @@ -1,69 +1,130 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use p2panda_rs::entry::{LogId, SeqNum}; +use std::collections::HashMap; -use crate::replication::LogHeight; +use log::trace; +use p2panda_rs::entry::{LogId, SeqNum}; +use p2panda_rs::identity::PublicKey; +use p2panda_rs::Human; + +/// Compare a remotes' log heights against our own and calculate which (if any) entries they are +/// missing. The returned tuple signifies the sequence number of a log from which the remote is +/// missing entries. +fn remote_requires_entries( + log_id: &LogId, + local_seq_num: &SeqNum, + remote_log_heights: &HashMap, +) -> Option<(LogId, SeqNum)> { + trace!("Local log height: {:?} {:?}", log_id, local_seq_num); + // Get height of the remote log by it's id. + let remote_log_height = remote_log_heights.get(log_id); + + match remote_log_height { + // If a log exists then compare heights of local and remote logs. + Some(remote_seq_num) => { + trace!("Remote log height: {:?} {:?}", log_id, remote_seq_num); + + // If the local seq num is higher the remote needs all entries higher than + // their max seq num for this log. + if local_seq_num > remote_seq_num { + // We increment the seq num as we want it to represent an inclusive lower + // bound. + // + // We can unwrap as we are incrementing the lower remote seq num which means it's + // will not reach max seq number. + let from_seq_num = remote_seq_num.clone().next().unwrap(); + + trace!( + "Remote needs entries from {:?} for {:?}", + from_seq_num, + log_id + ); + + Some((log_id.to_owned(), from_seq_num)) + } else { + trace!("Remote has all entries for {:?}", log_id); + None + } + } + // If no log exists then the remote has a log we don't know about yet and we + // return nothing. + None => { + trace!("{:?} not found on remote, all entries required", log_id); + Some((log_id.to_owned(), SeqNum::default())) + } + } +} +/// Diff a set of local and remote log heights in order to calculate which, if any, entries the +/// remote is missing. +/// +/// The returned list contains the sequence number in every log for every author from which the +/// remote is missing entries. 
Sending all entries from the returned sequence number which the +/// local node has stored will bring the remote node up-to-date with us. pub fn diff_log_heights( - local_log_heights: &[LogHeight], - remote_log_heights: &[LogHeight], -) -> Vec { + local_log_heights: &HashMap>, + remote_log_heights: &HashMap>, +) -> Vec<(PublicKey, Vec<(LogId, SeqNum)>)> { let mut remote_needs = Vec::new(); for (local_author, local_author_logs) in local_log_heights { - // Helper for diffing local log heights against remote log heights. - let diff_logs = |(remote_log_id, remote_seq_num): (LogId, SeqNum)| { - // Get the remote log by it's id. - let local_log = local_author_logs - .iter() - .find(|(local_log_id, _)| remote_log_id == *local_log_id); - - match local_log { - // If a log exists then compare heights of local and remote logs. - Some((log_id, local_seq_num)) => { - // If the local log is higher we increment their log id (we want all entries - // greater than or equal to this). Otherwise we return none. - if local_seq_num > &remote_seq_num { - // We can unwrap as we are incrementing the remote peers seq num here and - // this means it's will not reach max seq number. - Some((log_id.to_owned(), remote_seq_num.clone().next().unwrap())) - } else { - None - } - } - // If no log exists then the remote has never had this log and they need all - // entries from seq num 1. - None => Some((remote_log_id.to_owned(), SeqNum::default())), - } - }; + trace!( + "Local log heights: {} {:?}", + local_author.display(), + local_author_logs + ); - // Find local log for a public key sent by the remote peer. + let local_author_logs: HashMap = local_author_logs.iter().copied().collect(); + + // Find all logs sent by the remote for a public key we have locally. // - // If none is found we don't do anything as this means we are missing entries they should - // send us. - if let Some((_, remote_author_logs)) = remote_log_heights - .iter() - .find(|(remote_author, _)| remote_author == local_author) - { - // Diff our local log heights against the remote. - let remote_needs_logs: Vec<(LogId, SeqNum)> = remote_author_logs - .iter() - .copied() - .filter_map(diff_logs) - .collect(); + // If none is found we know they need everything we have by this author. + if let Some(remote_author_logs) = remote_log_heights.get(local_author) { + let remote_author_logs: HashMap = + remote_author_logs.iter().copied().collect(); + + trace!("Remote log heights: {} {:?}", local_author.display(), { + let mut logs = remote_author_logs + .clone() + .into_iter() + .collect::>(); + logs.sort(); + logs + }); + + let mut remote_needs_logs = vec![]; + + // For each log we diff the local and remote height and determine which entries, if + // any, we should send them. + for (log_id, seq_num) in local_author_logs { + if let Some(from_log_height) = + remote_requires_entries(&log_id, &seq_num, &remote_author_logs) + { + remote_needs_logs.push(from_log_height) + }; + } + + // Sort the log heights. + remote_needs_logs.sort(); // If the remote needs at least one log we push it to the remote needs. if !remote_needs_logs.is_empty() { remote_needs.push((local_author.to_owned(), remote_needs_logs)); }; } else { - remote_needs.push(( - local_author.to_owned(), - local_author_logs - .iter() - .map(|(log_id, _)| (*log_id, SeqNum::default())) - .collect(), - )); + // The author we know about locally wasn't found on the remote log heights so they + // need everything we have. 
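The remainder of `diff_log_heights` continues right below. As a compact, standalone illustration of the per-author comparison it performs, here is a sketch using plain `u64` values in place of `LogId` and `SeqNum`; `remote_needs` is a made-up helper, and the first sequence number is taken to be 1, mirroring how entries are counted from 1 in the real code.

```rust
use std::collections::HashMap;

/// Simplified log-height diff for a single author: which (log id, seq num)
/// pairs does the remote still need from us?
fn remote_needs(local: &HashMap<u64, u64>, remote: &HashMap<u64, u64>) -> Vec<(u64, u64)> {
    let mut needs: Vec<(u64, u64)> = local
        .iter()
        .filter_map(|(log_id, local_seq_num)| match remote.get(log_id) {
            // Remote is behind on this log: it needs everything *after* its
            // latest seq num, so the bound is inclusive at `remote + 1`.
            Some(remote_seq_num) if local_seq_num > remote_seq_num => {
                Some((*log_id, remote_seq_num + 1))
            }
            // Remote is up-to-date (or ahead) on this log.
            Some(_) => None,
            // Remote doesn't know this log at all: it needs it from the start.
            None => Some((*log_id, 1)),
        })
        .collect();
    needs.sort();
    needs
}

fn main() {
    let local: HashMap<u64, u64> = HashMap::from([(0, 8), (1, 4)]);
    let remote: HashMap<u64, u64> = HashMap::from([(0, 5)]);

    // Remote needs entries 6..=8 of log 0 and all of log 1.
    assert_eq!(remote_needs(&local, &remote), vec![(0, 6), (1, 1)]);
}
```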
+ + trace!("No logs found on remote for this author"); + let mut remote_needs_logs: Vec<(LogId, SeqNum)> = local_author_logs + .keys() + .map(|log_id| (*log_id, SeqNum::default())) + .collect(); + + // Sort the log heights. + remote_needs_logs.sort(); + + remote_needs.push((local_author.to_owned(), remote_needs_logs)); } } @@ -97,8 +158,14 @@ mod tests { ], )]; - let peer_b_needs = diff_log_heights(&peer_a_log_heights, &peer_b_log_heights); - let peer_a_needs = diff_log_heights(&peer_b_log_heights, &peer_a_log_heights); + let peer_b_needs = diff_log_heights( + &peer_a_log_heights.clone().into_iter().collect(), + &peer_b_log_heights.clone().into_iter().collect(), + ); + let peer_a_needs = diff_log_heights( + &peer_b_log_heights.into_iter().collect(), + &peer_a_log_heights.into_iter().collect(), + ); assert_eq!( peer_a_needs, @@ -119,8 +186,14 @@ mod tests { )]; let peer_b_log_heights = vec![]; - let peer_b_needs = diff_log_heights(&peer_a_log_heights, &peer_b_log_heights); - let peer_a_needs = diff_log_heights(&peer_b_log_heights, &peer_a_log_heights); + let peer_b_needs = diff_log_heights( + &peer_a_log_heights.clone().into_iter().collect(), + &peer_b_log_heights.clone().into_iter().collect(), + ); + let peer_a_needs = diff_log_heights( + &peer_b_log_heights.into_iter().collect(), + &peer_a_log_heights.into_iter().collect(), + ); assert_eq!(peer_a_needs, vec![]); assert_eq!( diff --git a/aquadoggo/src/replication/strategies/naive.rs b/aquadoggo/src/replication/strategies/log_height.rs similarity index 62% rename from aquadoggo/src/replication/strategies/naive.rs rename to aquadoggo/src/replication/strategies/log_height.rs index fd0e04d4f..8493712e3 100644 --- a/aquadoggo/src/replication/strategies/naive.rs +++ b/aquadoggo/src/replication/strategies/log_height.rs @@ -1,22 +1,29 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use std::collections::HashMap; + use anyhow::Result; use async_trait::async_trait; +use log::debug; +use p2panda_rs::entry::traits::AsEntry; +use p2panda_rs::entry::{LogId, SeqNum}; +use p2panda_rs::identity::PublicKey; +use p2panda_rs::Human; use crate::db::SqlStore; use crate::replication::errors::ReplicationError; use crate::replication::strategies::diff_log_heights; use crate::replication::traits::Strategy; -use crate::replication::{LogHeight, Message, Mode, StrategyResult, TargetSet}; +use crate::replication::{LogHeights, Message, Mode, StrategyResult, TargetSet}; #[derive(Clone, Debug)] -pub struct NaiveStrategy { +pub struct LogHeightStrategy { target_set: TargetSet, received_remote_have: bool, sent_have: bool, } -impl NaiveStrategy { +impl LogHeightStrategy { pub fn new(target_set: &TargetSet) -> Self { Self { target_set: target_set.clone(), @@ -25,30 +32,43 @@ impl NaiveStrategy { } } - async fn local_log_heights(&self, store: &SqlStore) -> Vec { - let mut result = vec![]; + async fn local_log_heights( + &self, + store: &SqlStore, + ) -> HashMap> { + let mut log_heights: HashMap> = HashMap::new(); - // For every schema id in the target set retrieve log heights for all contributing authors - for schema_id in self.target_set().0.iter() { - let log_heights = store + for schema_id in self.target_set().iter() { + // For every schema id in the target set retrieve log heights for all contributing authors + let schema_logs = store .get_log_heights(schema_id) .await - .expect("Fatal database error"); - result.extend(log_heights); + .expect("Fatal database error") + .into_iter(); + + // Then merge them into any existing records for the author + for (public_key, logs) 
in schema_logs { + let mut author_logs = log_heights.get(&public_key).cloned().unwrap_or(vec![]); + author_logs.extend(logs); + author_logs.sort(); + log_heights.insert(public_key, author_logs); + } } - - result + log_heights } async fn entry_responses( &self, store: &SqlStore, - remote_log_heights: &[LogHeight], + remote_log_heights: &[LogHeights], ) -> Vec { let mut messages = Vec::new(); let local_log_heights = self.local_log_heights(store).await; - let remote_needs = diff_log_heights(&local_log_heights, remote_log_heights); + let remote_needs = diff_log_heights( + &local_log_heights, + &remote_log_heights.iter().cloned().collect(), + ); for (public_key, log_heights) in remote_needs { for (log_id, seq_num) in log_heights { @@ -58,6 +78,13 @@ impl NaiveStrategy { .expect("Fatal database error") .iter() .map(|entry| { + debug!( + "Prepare message containing entry at {:?} on {:?} for {}", + entry.seq_num(), + entry.log_id(), + entry.public_key().display() + ); + Message::Entry(entry.clone().encoded_entry, entry.payload().cloned()) }) .collect(); @@ -70,9 +97,9 @@ impl NaiveStrategy { } #[async_trait] -impl Strategy for NaiveStrategy { +impl Strategy for LogHeightStrategy { fn mode(&self) -> Mode { - Mode::Naive + Mode::LogHeight } fn target_set(&self) -> TargetSet { @@ -85,7 +112,7 @@ impl Strategy for NaiveStrategy { StrategyResult { is_local_done: log_heights.is_empty(), - messages: vec![Message::Have(log_heights)], + messages: vec![Message::Have(log_heights.into_iter().collect())], } } diff --git a/aquadoggo/src/replication/strategies/mod.rs b/aquadoggo/src/replication/strategies/mod.rs index c509580fc..705726c90 100644 --- a/aquadoggo/src/replication/strategies/mod.rs +++ b/aquadoggo/src/replication/strategies/mod.rs @@ -1,11 +1,11 @@ // SPDX-License-Identifier: AGPL-3.0-or-later mod diff; -mod naive; +mod log_height; mod set_reconciliation; pub use diff::diff_log_heights; -pub use naive::NaiveStrategy; +pub use log_height::LogHeightStrategy; pub use set_reconciliation::SetReconciliationStrategy; use crate::replication::Message; diff --git a/aquadoggo/src/replication/target_set.rs b/aquadoggo/src/replication/target_set.rs index 86cb76fe8..95ca9afcd 100644 --- a/aquadoggo/src/replication/target_set.rs +++ b/aquadoggo/src/replication/target_set.rs @@ -1,5 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use std::slice::Iter; + use p2panda_rs::schema::SchemaId; use p2panda_rs::Validate; use serde::{Deserialize, Deserializer, Serialize}; @@ -9,7 +11,7 @@ use crate::replication::errors::TargetSetError; /// De-duplicated and sorted set of schema ids which define the target data for the replication /// session. #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize)] -pub struct TargetSet(pub Vec); +pub struct TargetSet(Vec); impl TargetSet { pub fn new(schema_ids: &[SchemaId]) -> Self { @@ -24,6 +26,14 @@ impl TargetSet { // Sort schema ids to compare target sets easily deduplicated_set.sort(); + // And now sort system schema to the front of the set. 
+ deduplicated_set.sort_by(|schema_id_a, schema_id_b| { + let is_system_schema = |schema_id: &SchemaId| -> bool { + !matches!(schema_id, SchemaId::Application(_, _)) + }; + is_system_schema(schema_id_b).cmp(&is_system_schema(schema_id_a)) + }); + Self(deduplicated_set) } @@ -40,6 +50,10 @@ impl TargetSet { Ok(target_set) } + + pub fn iter(&self) -> Iter { + self.0.iter() + } } impl Validate for TargetSet { @@ -52,13 +66,45 @@ impl Validate for TargetSet { }; let mut prev_schema_id: Option<&SchemaId> = None; + let mut initial_system_schema = true; + + // We need to validate that: + // - if system schema are included they are first in the list and ordered alphabetically + // - any following application schema are also ordered alphabetically + for (index, schema_id) in self.0.iter().enumerate() { + // If the first schema id is an application schema then no system schema should be + // included and we flip the `initial_system_schema` flag. + if index == 0 { + initial_system_schema = !matches!(schema_id, SchemaId::Application(_, _)) + } - for schema_id in &self.0 { - // Check if it is sorted, this indirectly also checks against duplicates + // Now validate the order. if let Some(prev) = prev_schema_id { - if prev >= schema_id { - return Err(TargetSetError::UnsortedSchemaIds); - } + match schema_id { + // If current and previous are application schema compare them. + SchemaId::Application(_, _) if !initial_system_schema => { + if prev >= schema_id { + return Err(TargetSetError::UnsortedSchemaIds); + } + } + // If the current is an application schema and the previous is a system schema + // flip the `initial_system_schema` flag. + SchemaId::Application(_, _) if initial_system_schema => { + initial_system_schema = false + } + // If the current is a system schema and the `initial_system_schema` flag is + // false then there is an out of order system schema. + _ if !initial_system_schema => { + return Err(TargetSetError::UnsortedSchemaIds); + } + // If current and previous are both system schema then compare them. 
+ _ if initial_system_schema => { + if prev >= schema_id { + return Err(TargetSetError::UnsortedSchemaIds); + } + } + _ => panic!(), + }; } prev_schema_id = Some(schema_id); @@ -88,6 +134,8 @@ mod tests { use p2panda_rs::test_utils::fixtures::random_document_view_id; use rstest::rstest; + use crate::test_utils::helpers::random_target_set; + use super::TargetSet; #[rstest] @@ -120,18 +168,38 @@ mod tests { } #[rstest] - fn deserialize_unsorted_target_set() { - let unsorted_schema_ids = [ - "venues_0020c13cdc58dfc6f4ebd32992ff089db79980363144bdb2743693a019636fa72ec8", - "alpacas_00202dce4b32cd35d61cf54634b93a526df333c5ed3d93230c2f026f8d1ecabc0cd7", - ]; - let result = deserialize_into::(&serialize_value(cbor!(unsorted_schema_ids))); + #[case(vec![ + "venues_0020c13cdc58dfc6f4ebd32992ff089db79980363144bdb2743693a019636fa72ec8".to_string(), + "alpacas_00202dce4b32cd35d61cf54634b93a526df333c5ed3d93230c2f026f8d1ecabc0cd7".to_string(), + ])] + #[case(vec![ + "alpacas_00202dce4b32cd35d61cf54634b93a526df333c5ed3d93230c2f026f8d1ecabc0cd7".to_string(), + "schema_field_definition_v1".to_string(), + ])] + #[case(vec![ + "schema_field_definition_v1".to_string(), + "schema_definition_v1".to_string(), + ])] + #[case(vec![ + "schema_definition_v1".to_string(), + "alpacas_00202dce4b32cd35d61cf54634b93a526df333c5ed3d93230c2f026f8d1ecabc0cd7".to_string(), + "schema_field_definition_v1".to_string(), + ])] + fn deserialize_unsorted_target_set(#[case] schema_ids: Vec) { + let result = deserialize_into::(&serialize_value(cbor!(schema_ids))); let expected_result = ciborium::de::Error::::Semantic( None, "Target set contains unsorted or duplicate schema ids".to_string(), ); - assert_eq!(result.unwrap_err().to_string(), expected_result.to_string()); } + + #[rstest] + fn serialize(#[from(random_target_set)] target_set: TargetSet) { + assert_eq!( + deserialize_into::(&serialize_value(cbor!(target_set))).unwrap(), + target_set.clone() + ); + } } diff --git a/aquadoggo/src/test_utils/helpers.rs b/aquadoggo/src/test_utils/helpers.rs index 725d88551..2f87b3837 100644 --- a/aquadoggo/src/test_utils/helpers.rs +++ b/aquadoggo/src/test_utils/helpers.rs @@ -118,8 +118,12 @@ pub fn schema_from_fields(fields: Vec<(&str, OperationValue)>) -> Schema { #[fixture] pub fn random_target_set() -> TargetSet { - let document_view_id = random_document_view_id(); - let schema_id = - SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id); - TargetSet::new(&[schema_id]) + let system_schema_id = SchemaId::SchemaFieldDefinition(1); + let document_view_id_1 = random_document_view_id(); + let schema_id_1 = + SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id_1); + let document_view_id_2 = random_document_view_id(); + let schema_id_2 = + SchemaId::new_application(&SchemaName::new("events").unwrap(), &document_view_id_2); + TargetSet::new(&[system_schema_id, schema_id_1, schema_id_2]) } diff --git a/aquadoggo/src/tests.rs b/aquadoggo/src/tests.rs index 613c46bf9..57718facf 100644 --- a/aquadoggo/src/tests.rs +++ b/aquadoggo/src/tests.rs @@ -43,10 +43,7 @@ async fn e2e() { // default options. The only thing we want to do change is the database config. We want an // in-memory sqlite database for this test. - let config = Configuration { - database_url: Some("sqlite::memory:".to_string()), - ..Default::default() - }; + let config = Configuration::new_ephemeral(); // Start the node. 
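The new `serialize` test for `TargetSet` further up checks that a value survives a CBOR round trip, and the tests build their CBOR values with `ciborium`'s `cbor!` macro. As a standalone sketch of the same round-trip property, assuming `ciborium` and `serde` with the `derive` feature, and using a simplified tuple struct that only mimics the `[message type, session id, mode, target set]` array layout exercised in the `message.rs` tests:

```rust
use serde::{Deserialize, Serialize};

// Simplified stand-in for a sync request on the wire: message type (0 =
// sync request), session id, mode (0 = log-height) and the target set.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct FakeSyncRequest(u64, u64, u64, Vec<String>);

fn main() {
    let message = FakeSyncRequest(
        0,
        12,
        0,
        vec![
            "schema_definition_v1".to_string(),
            "schema_field_definition_v1".to_string(),
        ],
    );

    // Encode to CBOR ...
    let mut bytes = Vec::new();
    ciborium::ser::into_writer(&message, &mut bytes).expect("encoding succeeds");

    // ... and decode again: the round trip must give back the same value.
    let decoded: FakeSyncRequest =
        ciborium::de::from_reader(bytes.as_slice()).expect("bytes are valid CBOR");
    assert_eq!(decoded, message);
}
```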
//
diff --git a/aquadoggo_cli/src/main.rs b/aquadoggo_cli/src/main.rs
index 33022784a..49780fde4 100644
--- a/aquadoggo_cli/src/main.rs
+++ b/aquadoggo_cli/src/main.rs
@@ -129,7 +129,7 @@ impl TryFrom for Configuration {
             rendezvous_address: cli.rendezvous_address,
             rendezvous_peer_id,
             rendezvous_server_enabled: cli.enable_rendezvous_server,
-            ..NetworkConfiguration::default()
+            ..config.network
         };

         Ok(config)
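One note on the `aquadoggo_cli` change above: filling the remaining `NetworkConfiguration` fields from `..config.network` instead of `..NetworkConfiguration::default()` means values already set on the loaded configuration (such as the peer id the replication service later expects to find) are no longer silently reset. A small sketch of the difference in struct update syntax, with a made-up stand-in struct rather than the real `NetworkConfiguration`:

```rust
// Stand-in config type; only the struct-update behaviour matters here.
#[derive(Debug, Default)]
struct NetworkConfiguration {
    peer_id: Option<String>,
    rendezvous_server_enabled: bool,
}

fn main() {
    // Imagine the loaded configuration already carries a derived peer id.
    let loaded = NetworkConfiguration {
        peer_id: Some("12D3KooW-example".to_string()),
        ..Default::default()
    };

    // `..Default::default()` resets every field not listed explicitly,
    // silently dropping the peer id:
    let wrong = NetworkConfiguration {
        rendezvous_server_enabled: true,
        ..Default::default()
    };
    assert_eq!(wrong.peer_id, None);

    // `..loaded` keeps previously derived values and only overrides the
    // fields named in the literal:
    let right = NetworkConfiguration {
        rendezvous_server_enabled: true,
        ..loaded
    };
    assert_eq!(right.peer_id, Some("12D3KooW-example".to_string()));
}
```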