diff --git a/Cargo.lock b/Cargo.lock index 5bfbb95cfb2..9e103c3867a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -700,6 +700,15 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -1433,7 +1442,7 @@ version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", + "block-buffer 0.10.4", "crypto-common", "subtle", ] @@ -2847,7 +2856,10 @@ dependencies = [ "libp2p-quic", "libp2p-swarm", "libp2p-tcp", + "libp2p-tls", "libp2p-upnp", + "libp2p-websocket", + "libp2p-websocket-websys", "libp2p-yamux", "multiaddr", "pin-project", @@ -3233,6 +3245,44 @@ dependencies = [ "void", ] +[[package]] +name = "libp2p-websocket" +version = "0.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4846d51afd08180e164291c3754ba30dd4fbac6fac65571be56403c16431a5e" +dependencies = [ + "either", + "futures", + "futures-rustls", + "libp2p-core", + "libp2p-identity", + "parking_lot", + "pin-project-lite", + "rw-stream-sink", + "soketto", + "tracing", + "url", + "webpki-roots", +] + +[[package]] +name = "libp2p-websocket-websys" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "550e578dcc9cd572be9dd564831d1f5efe8e6661953768b1d56c1d462855bf6f" +dependencies = [ + "bytes", + "futures", + "js-sys", + "libp2p-core", + "parking_lot", + "send_wrapper", + "thiserror", + "tracing", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "libp2p-yamux" version = "0.45.1" @@ -3622,7 +3672,7 @@ dependencies = [ [[package]] name = "mithril-end-to-end" -version = "0.4.9" +version = "0.4.10" dependencies = [ "anyhow", "async-recursion", @@ -3671,7 +3721,7 @@ dependencies = [ [[package]] name = "mithril-relay" -version = "0.1.15" +version = "0.1.16" dependencies = [ "anyhow", "clap", @@ -5345,6 +5395,12 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" +[[package]] +name = "send_wrapper" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73" + [[package]] name = "serde" version = "1.0.197" @@ -5508,6 +5564,19 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha-1" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha1" version = "0.10.6" @@ -5730,6 +5799,21 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "soketto" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d1c5305e39e09653383c2c7244f2f78b3bcae37cf50c64cb4789c9f5096ec2" +dependencies = [ + "base64 0.13.1", + "bytes", + "futures", + "httparse", + "log", + "rand", + "sha-1", +] + [[package]] name = "spin" version = "0.5.2" @@ -6074,9 +6158,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.36.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", @@ -6713,6 +6797,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "widestring" version = "1.0.2" diff --git a/mithril-infra/assets/docker/docker-compose-signer-unverified-p2p.yaml b/mithril-infra/assets/docker/docker-compose-signer-unverified-p2p.yaml index c3d17877769..3ef7544dbbe 100644 --- a/mithril-infra/assets/docker/docker-compose-signer-unverified-p2p.yaml +++ b/mithril-infra/assets/docker/docker-compose-signer-unverified-p2p.yaml @@ -89,6 +89,7 @@ services: - SERVER_PORT=${SIGNER_RELAY_SERVER_PORT} - AGGREGATOR_ENDPOINT=http://${AGGREGATOR_CREDENTIALS}mithril-aggregator:8080/aggregator - DIAL_TO=/dns4/mithril-aggregator-relay/tcp/${AGGREGATOR_RELAY_LISTEN_PORT} + - SIGNER_REPEATER_DELAY=${SIGNER_RELAY_REGISTRATION_REPEATER_DELAY} ports: - "${SIGNER_RELAY_LISTEN_PORT}:${SIGNER_RELAY_LISTEN_PORT}" - "${SIGNER_RELAY_SERVER_PORT}:${SIGNER_RELAY_SERVER_PORT}" diff --git a/mithril-infra/assets/docker/docker-compose-signer-verified-p2p.yaml b/mithril-infra/assets/docker/docker-compose-signer-verified-p2p.yaml index 822171811c9..a35bf2285e5 100644 --- a/mithril-infra/assets/docker/docker-compose-signer-verified-p2p.yaml +++ b/mithril-infra/assets/docker/docker-compose-signer-verified-p2p.yaml @@ -147,6 +147,7 @@ services: - SERVER_PORT=${SIGNER_RELAY_SERVER_PORT} - AGGREGATOR_ENDPOINT=http://${AGGREGATOR_CREDENTIALS}mithril-aggregator:8080/aggregator - DIAL_TO=/dns4/mithril-aggregator-relay/tcp/${AGGREGATOR_RELAY_LISTEN_PORT} + - SIGNER_REPEATER_DELAY=${SIGNER_RELAY_REGISTRATION_REPEATER_DELAY} ports: - "${SIGNER_RELAY_LISTEN_PORT}:${SIGNER_RELAY_LISTEN_PORT}" - "${SIGNER_RELAY_SERVER_PORT}:${SIGNER_RELAY_SERVER_PORT}" diff --git a/mithril-infra/assets/infra.version b/mithril-infra/assets/infra.version index 769ed6ae790..b005e307c51 100644 --- a/mithril-infra/assets/infra.version +++ b/mithril-infra/assets/infra.version @@ -1 +1 @@ -0.2.14 +0.2.15 diff --git a/mithril-infra/mithril.signer.tf b/mithril-infra/mithril.signer.tf index 601b8e07039..9088820989d 100644 --- a/mithril-infra/mithril.signer.tf +++ b/mithril-infra/mithril.signer.tf @@ -85,6 +85,7 @@ EOT "export AGGREGATOR_RELAY_LISTEN_PORT='${local.mithril_aggregator_relay_mithril_listen_port}'", "export SIGNER_RELAY_LISTEN_PORT='${local.mithril_signers_relay_listen_port[each.key]}'", "export SIGNER_RELAY_SERVER_PORT='${local.mithril_signers_relay_server_port[each.key]}'", + "export SIGNER_RELAY_REGISTRATION_REPEATER_DELAY='${var.mithril_p2p_signer_registration_repeat_delay}'", "export ENABLE_METRICS_SERVER=true", "export METRICS_SERVER_IP=0.0.0.0", "export METRICS_SERVER_PORT=9090", diff --git a/mithril-infra/variables.tf b/mithril-infra/variables.tf index 8d215e79587..b465154ab42 100644 --- a/mithril-infra/variables.tf +++ b/mithril-infra/variables.tf @@ -178,6 +178,12 @@ variable "mithril_use_p2p_network" { default = false } +variable "mithril_p2p_signer_registration_repeat_delay" { + type = number + description = "The repeat delay in milliseconds for the signer registration when operating in P2P mode (defaults to 1 hour)" + default = 3600 * 1000 +} + locals { mithril_network_type_suffix = var.mithril_use_p2p_network ? "-p2p" : "" } diff --git a/mithril-relay/Cargo.toml b/mithril-relay/Cargo.toml index a00366a09da..020fb5f82e4 100644 --- a/mithril-relay/Cargo.toml +++ b/mithril-relay/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-relay" -version = "0.1.15" +version = "0.1.16" description = "A Mithril relay" authors = { workspace = true } edition = { workspace = true } @@ -24,7 +24,11 @@ libp2p = { version = "0.53.2", features = [ "noise", "ping", "pnet", + "quic", "tcp", + "tls", + "websocket-websys", + "websocket", "yamux", ] } mithril-common = { path = "../mithril-common", features = ["full"] } @@ -42,5 +46,5 @@ slog-bunyan = "2.5.0" slog-scope = "4.4.0" slog-term = "2.9.0" thiserror = "1.0.56" -tokio = { version = "1.35.1", features = ["full"] } +tokio = { version = "1.37.0", features = ["full"] } warp = "0.3.6" diff --git a/mithril-relay/src/commands/passive.rs b/mithril-relay/src/commands/passive.rs index 000b54fc8c6..66636ee2006 100644 --- a/mithril-relay/src/commands/passive.rs +++ b/mithril-relay/src/commands/passive.rs @@ -23,7 +23,7 @@ impl PassiveCommand { let dial_to = self.dial_to.to_owned(); let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", self.listen_port).parse()?; - let mut relay = PassiveRelay::new(&addr).start().await?; + let mut relay = PassiveRelay::start(&addr).await?; if let Some(dial_to_address) = dial_to { relay.dial_peer(dial_to_address.clone())?; } diff --git a/mithril-relay/src/commands/signer.rs b/mithril-relay/src/commands/signer.rs index 67d7b5f32ca..b625f29cdb1 100644 --- a/mithril-relay/src/commands/signer.rs +++ b/mithril-relay/src/commands/signer.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use clap::Parser; use config::{builder::DefaultState, ConfigBuilder}; use libp2p::Multiaddr; @@ -23,6 +25,10 @@ pub struct SignerCommand { /// Aggregator endpoint URL. #[clap(long, env = "AGGREGATOR_ENDPOINT")] aggregator_endpoint: String, + + /// Interval at which a signer registration should be repeated in milliseconds (defaults to 1 hour) + #[clap(long, env = "SIGNER_REPEATER_DELAY", default_value_t = 3_600 * 1_000)] + signer_repeater_delay: u64, } impl SignerCommand { @@ -32,8 +38,15 @@ impl SignerCommand { let dial_to = self.dial_to.to_owned(); let addr: Multiaddr = format!("/ip4/0.0.0.0/tcp/{}", self.listen_port).parse()?; let aggregator_endpoint = self.aggregator_endpoint.to_owned(); - - let mut relay = SignerRelay::start(&addr, &server_port, &aggregator_endpoint).await?; + let signer_repeater_delay = Duration::from_millis(self.signer_repeater_delay); + + let mut relay = SignerRelay::start( + &addr, + &server_port, + &aggregator_endpoint, + &signer_repeater_delay, + ) + .await?; if let Some(dial_to_address) = dial_to { relay.dial_peer(dial_to_address.clone())?; } diff --git a/mithril-relay/src/lib.rs b/mithril-relay/src/lib.rs index cbeb80207b7..9d63ae7308a 100644 --- a/mithril-relay/src/lib.rs +++ b/mithril-relay/src/lib.rs @@ -5,6 +5,7 @@ mod commands; /// Peer to peer module pub mod p2p; mod relay; +mod repeater; pub use commands::Args; pub use commands::RelayCommands; @@ -12,5 +13,11 @@ pub use relay::AggregatorRelay; pub use relay::PassiveRelay; pub use relay::SignerRelay; -/// The topic name where signatures are published -pub const MITHRIL_SIGNATURES_TOPIC_NAME: &str = "mithril/signatures"; +/// The P2P topic names used by Mithril +pub mod mithril_p2p_topic { + /// The topic name where signer registrations are published + pub const SIGNERS: &str = "mithril/signers"; + + /// The topic name where signatures are published + pub const SIGNATURES: &str = "mithril/signatures"; +} diff --git a/mithril-relay/src/p2p/peer.rs b/mithril-relay/src/p2p/peer.rs index 9414642e354..c0c434cb732 100644 --- a/mithril-relay/src/p2p/peer.rs +++ b/mithril-relay/src/p2p/peer.rs @@ -1,18 +1,22 @@ #![allow(missing_docs)] use anyhow::{anyhow, Context}; use libp2p::{ - core::upgrade::Version, + core::{muxing::StreamMuxerBox, transport::dummy::DummyTransport}, futures::StreamExt, gossipsub::{self, ValidationMode}, noise, ping, swarm::{self, DialError, NetworkBehaviour}, - tcp, yamux, Multiaddr, PeerId, Swarm, Transport, + tls, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, }; -use mithril_common::{messages::RegisterSignatureMessage, StdResult}; +use mithril_common::{ + messages::{RegisterSignatureMessage, RegisterSignerMessage}, + StdResult, +}; +use serde::{Deserialize, Serialize}; use slog_scope::{debug, info}; use std::{collections::HashMap, time::Duration}; -use crate::{p2p::PeerError, MITHRIL_SIGNATURES_TOPIC_NAME}; +use crate::{mithril_p2p_topic, p2p::PeerError}; /// The idle connection timeout for a P2P connection const P2P_IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30); @@ -54,6 +58,16 @@ pub enum PeerEvent { /// The topic name of a P2P pubsub pub type TopicName = String; +/// The broadcast message received from a Gossip sub event +#[derive(Serialize, Deserialize)] +pub enum BroadcastMessage { + /// A signer registration message received from the Gossip sub + RegisterSigner(RegisterSignerMessage), + + /// A signature registration message received from the Gossip sub + RegisterSignature(RegisterSignatureMessage), +} + /// A peer in the P2P network pub struct Peer { topics: HashMap, @@ -75,28 +89,36 @@ impl Peer { } fn build_topics() -> HashMap { - HashMap::from([( - MITHRIL_SIGNATURES_TOPIC_NAME.into(), - gossipsub::IdentTopic::new(MITHRIL_SIGNATURES_TOPIC_NAME), - )]) + HashMap::from([ + ( + mithril_p2p_topic::SIGNATURES.into(), + gossipsub::IdentTopic::new(mithril_p2p_topic::SIGNATURES), + ), + ( + mithril_p2p_topic::SIGNERS.into(), + gossipsub::IdentTopic::new(mithril_p2p_topic::SIGNERS), + ), + ]) } /// Start the peer pub async fn start(mut self) -> StdResult { debug!("Peer: starting..."); - let mut swarm = libp2p::SwarmBuilder::with_new_identity() + let mut swarm = SwarmBuilder::with_new_identity() .with_tokio() - .with_other_transport(|key| { - let noise_config = noise::Config::new(key).unwrap(); - let yamux_config = yamux::Config::default(); - let base_transport = - tcp::tokio::Transport::new(tcp::Config::default().nodelay(true)); - base_transport - .upgrade(Version::V1Lazy) - .authenticate(noise_config) - .multiplex(yamux_config) - })? + .with_tcp( + Default::default(), + (tls::Config::new, noise::Config::new), + yamux::Config::default, + )? + .with_quic() + .with_other_transport(|_key| DummyTransport::<(PeerId, StreamMuxerBox)>::new())? .with_dns()? + .with_websocket( + (tls::Config::new, noise::Config::new), + yamux::Config::default, + ) + .await? .with_behaviour(|key| { let gossipsub_config = gossipsub::ConfigBuilder::default() .max_transmit_size(262144) @@ -137,11 +159,11 @@ impl Peer { Ok(self) } - /// Convert a peer event to a signature message - pub fn convert_peer_event_to_signature_message( + /// Convert a peer event to a broadcast message + pub fn convert_peer_event_to_message( &mut self, event: PeerEvent, - ) -> StdResult> { + ) -> StdResult> { match event { PeerEvent::Behaviour { event: PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }), @@ -189,31 +211,60 @@ impl Peer { pub fn publish_signature( &mut self, message: &RegisterSignatureMessage, + ) -> StdResult { + self.publish_broadcast_message( + &BroadcastMessage::RegisterSignature(message.to_owned()), + mithril_p2p_topic::SIGNATURES, + ) + } + + /// Publish a broadcast message on the P2P pubsub + pub fn publish_broadcast_message( + &mut self, + message: &BroadcastMessage, + topic_name: &str, ) -> StdResult { let topic = self .topics - .get(MITHRIL_SIGNATURES_TOPIC_NAME) + .get(topic_name) .ok_or(PeerError::MissingTopic()) .with_context(|| { - format!( - "Can not publish signature on invalid topic: {MITHRIL_SIGNATURES_TOPIC_NAME}" - ) + format!("Can not publish broadcast message on invalid topic: {topic_name}") })? .to_owned(); - let data = serde_json::to_vec(message) - .with_context(|| "Can not publish signature with invalid format")?; + let data = serde_json::to_vec(message).with_context(|| { + format!("Can not publish broadcast message with invalid format on topic {topic_name}") + })?; let message_id = self .swarm .as_mut() .map(|swarm| swarm.behaviour_mut().gossipsub.publish(topic, data)) .transpose() - .with_context(|| "Can not publish signature on P2P pubsub")? + .with_context(|| { + format!("Can not publish broadcast message on {topic_name} P2P pubsub") + })? .ok_or(PeerError::UnavailableSwarm()) - .with_context(|| "Can not publish signature without swarm")?; + .with_context(|| { + format!( + "Can not publish broadcast message on {topic_name} P2P pubsub without swarm" + ) + })?; + Ok(message_id.to_owned()) } + /// Publish a signer registration on the P2P pubsub + pub fn publish_signer_registration( + &mut self, + message: &RegisterSignerMessage, + ) -> StdResult { + self.publish_broadcast_message( + &BroadcastMessage::RegisterSigner(message.to_owned()), + mithril_p2p_topic::SIGNERS, + ) + } + /// Connect to a remote peer pub fn dial(&mut self, addr: Multiaddr) -> StdResult<()> { debug!("Peer: dialing to"; "address" => format!("{addr:?}"), "local_peer_id" => format!("{:?}", self.local_peer_id())); diff --git a/mithril-relay/src/relay/aggregator.rs b/mithril-relay/src/relay/aggregator.rs index 80f19673fec..380c62c1230 100644 --- a/mithril-relay/src/relay/aggregator.rs +++ b/mithril-relay/src/relay/aggregator.rs @@ -1,7 +1,10 @@ -use crate::p2p::{Peer, PeerEvent}; +use crate::p2p::{BroadcastMessage, Peer, PeerEvent}; use anyhow::anyhow; use libp2p::Multiaddr; -use mithril_common::{messages::RegisterSignatureMessage, StdResult}; +use mithril_common::{ + messages::{RegisterSignatureMessage, RegisterSignerMessage}, + StdResult, +}; use reqwest::StatusCode; use slog_scope::{error, info}; @@ -48,25 +51,68 @@ impl AggregatorRelay { } } + async fn notify_signer_to_aggregator( + &self, + signer_message: &RegisterSignerMessage, + ) -> StdResult<()> { + let response = reqwest::Client::new() + .post(format!("{}/register-signer", self.aggregator_endpoint)) + .json(signer_message) + //.header(MITHRIL_API_VERSION_HEADER, "0.1.13") // TODO: retrieve current version + .send() + .await; + match response { + Ok(response) => match response.status() { + StatusCode::CREATED => { + info!("Relay aggregator: sent successfully signer registration message to aggregator"; "signer_message" => format!("{:#?}", signer_message)); + Ok(()) + } + status => { + error!("Relay aggregator: Post `/register-signer` should have returned a 201 status code, got: {status}"); + Err(anyhow!("Post `/register-signer` should have returned a 201 status code, got: {status}")) + } + }, + Err(err) => { + error!("Relay aggregator: Post `/register-signer` failed: {err:?}"); + Err(anyhow!("Post `/register-signer` failed: {err:?}")) + } + } + } + /// Tick the aggregator relay pub async fn tick(&mut self) -> StdResult<()> { if let Some(peer_event) = self.peer.tick_swarm().await? { - if let Ok(Some(signature_message_received)) = self - .peer - .convert_peer_event_to_signature_message(peer_event) - { - let retry_max = 3; - let mut retry_count = 0; - while let Err(e) = self - .notify_signature_to_aggregator(&signature_message_received) - .await - { - retry_count += 1; - if retry_count >= retry_max { - error!("Relay aggregator: failed to send signature message to aggregator after {retry_count} attempts"; "signature_message" => format!("{:#?}", signature_message_received), "error" => format!("{e:?}")); - return Err(e); + match self.peer.convert_peer_event_to_message(peer_event) { + Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) => { + let retry_max = 3; + let mut retry_count = 0; + while let Err(e) = self + .notify_signer_to_aggregator(&signer_message_received) + .await + { + retry_count += 1; + if retry_count >= retry_max { + error!("Relay aggregator: failed to send signer registration message to aggregator after {retry_count} attempts"; "signer_message" => format!("{:#?}", signer_message_received), "error" => format!("{e:?}")); + return Err(e); + } + } + } + Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) => { + let retry_max = 3; + let mut retry_count = 0; + while let Err(e) = self + .notify_signature_to_aggregator(&signature_message_received) + .await + { + retry_count += 1; + if retry_count >= retry_max { + error!("Relay aggregator: failed to send signature message to aggregator after {retry_count} attempts"; "signature_message" => format!("{:#?}", signature_message_received), "error" => format!("{e:?}")); + return Err(e); + } } } + Ok(None) => {} + Err(e) => return Err(e), } } diff --git a/mithril-relay/src/relay/passive.rs b/mithril-relay/src/relay/passive.rs index 4758267f00e..66f6ba37157 100644 --- a/mithril-relay/src/relay/passive.rs +++ b/mithril-relay/src/relay/passive.rs @@ -1,6 +1,6 @@ -use crate::p2p::{Peer, PeerBehaviourEvent, PeerEvent}; -use libp2p::{gossipsub, Multiaddr}; -use mithril_common::{messages::RegisterSignatureMessage, StdResult}; +use crate::p2p::{BroadcastMessage, Peer, PeerEvent}; +use libp2p::Multiaddr; +use mithril_common::StdResult; use slog_scope::{debug, info}; /// A passive relay @@ -11,44 +11,35 @@ pub struct PassiveRelay { } impl PassiveRelay { - /// Create a passive relay - /// TODO: should be replaced by Self::start(...) - pub fn new(addr: &Multiaddr) -> Self { - Self { - peer: Peer::new(addr), - } - } - /// Start a passive relay - pub async fn start(self) -> StdResult { + pub async fn start(addr: &Multiaddr) -> StdResult { debug!("PassiveRelay: starting..."); Ok(Self { - peer: self.peer.start().await?, + peer: Peer::new(addr).start().await?, }) } - /// Convert event to signature message + /// Convert event to broadcast message /// TODO: should be removed - pub fn convert_event( + pub fn convert_peer_event_to_message( &mut self, event: PeerEvent, - ) -> StdResult> { - match event { - PeerEvent::Behaviour { - event: PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }), - } => Ok(Some(serde_json::from_slice(&message.data)?)), - _ => Ok(None), - } + ) -> StdResult> { + self.peer.convert_peer_event_to_message(event) } /// Tick the passive relay pub async fn tick(&mut self) -> StdResult<()> { if let Some(peer_event) = self.peer.tick_swarm().await? { - if let Ok(Some(signature_message_received)) = self - .peer - .convert_peer_event_to_signature_message(peer_event) - { - info!("Relay passive: received signature message from P2P network"; "signature_message" => format!("{:#?}", signature_message_received)); + match self.peer.convert_peer_event_to_message(peer_event) { + Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) => { + info!("Relay passive: received signer registration message from P2P network"; "signer_message" => format!("{:#?}", signer_message_received)); + } + Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) => { + info!("Relay passive: received signature message from P2P network"; "signature_message" => format!("{:#?}", signature_message_received)); + } + Ok(None) => {} + Err(e) => return Err(e), } } diff --git a/mithril-relay/src/relay/signer.rs b/mithril-relay/src/relay/signer.rs index a0859b6bd5f..a5d71c529a5 100644 --- a/mithril-relay/src/relay/signer.rs +++ b/mithril-relay/src/relay/signer.rs @@ -1,12 +1,15 @@ -use crate::p2p::{Peer, PeerEvent}; +use crate::{ + p2p::{Peer, PeerEvent}, + repeater::MessageRepeater, +}; use libp2p::Multiaddr; use mithril_common::{ - messages::RegisterSignatureMessage, + messages::{RegisterSignatureMessage, RegisterSignerMessage}, test_utils::test_http_server::{test_http_server_with_socket_address, TestHttpServer}, StdResult, }; use slog_scope::{debug, info}; -use std::net::SocketAddr; +use std::{net::SocketAddr, sync::Arc, time::Duration}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use warp::Filter; @@ -15,6 +18,8 @@ pub struct SignerRelay { server: TestHttpServer, peer: Peer, signature_rx: UnboundedReceiver, + signer_rx: UnboundedReceiver, + signer_repeater: Arc>, } impl SignerRelay { @@ -23,24 +28,41 @@ impl SignerRelay { address: &Multiaddr, server_port: &u16, aggregator_endpoint: &str, + signer_repeater_delay: &Duration, ) -> StdResult { debug!("SignerRelay: starting..."); let (signature_tx, signature_rx) = unbounded_channel::(); + let (signer_tx, signer_rx) = unbounded_channel::(); + let signer_repeater = Arc::new(MessageRepeater::new( + signer_tx.clone(), + signer_repeater_delay.to_owned(), + )); let peer = Peer::new(address).start().await?; - let server = Self::start_http_server(server_port, aggregator_endpoint, signature_tx).await; + let server = Self::start_http_server( + server_port, + aggregator_endpoint, + signer_tx, + signature_tx, + signer_repeater.clone(), + ) + .await; info!("SignerRelay: listening on"; "address" => format!("{:?}", server.address())); Ok(Self { server, peer, signature_rx, + signer_rx, + signer_repeater, }) } async fn start_http_server( server_port: &u16, aggregator_endpoint: &str, + signer_tx: UnboundedSender, signature_tx: UnboundedSender, + signer_repeater: Arc>, ) -> TestHttpServer { test_http_server_with_socket_address( warp::path("register-signatures") @@ -51,9 +73,8 @@ impl SignerRelay { .or(warp::path("register-signer") .and(warp::post()) .and(warp::body::json()) - .and(middlewares::with_aggregator_endpoint( - aggregator_endpoint.to_string(), - )) + .and(middlewares::with_transmitter(signer_tx)) + .and(middlewares::with_repeater(signer_repeater.clone())) .and_then(handlers::register_signer_handler)) .or(warp::path("epoch-settings") .and(warp::get()) @@ -82,11 +103,25 @@ impl SignerRelay { Ok(()) } None => { - debug!("SignerRelay: no message available"); + debug!("SignerRelay: no signature message available"); + Ok(()) + } + } + }, + message = self.signer_rx.recv() => { + match message { + Some(signer_message) => { + info!("SignerRelay: publish signer-registration to p2p network"; "message" => format!("{signer_message:#?}")); + self.peer.publish_signer_registration(&signer_message)?; + Ok(()) + } + None => { + debug!("SignerRelay: no signer message available"); Ok(()) } } }, + _ = self.signer_repeater.repeat_message() => {Ok(())}, _event = self.peer.tick_swarm() => {Ok(())} } } @@ -119,16 +154,24 @@ impl SignerRelay { } mod middlewares { - use std::convert::Infallible; + use std::{convert::Infallible, fmt::Debug, sync::Arc}; use tokio::sync::mpsc::UnboundedSender; use warp::Filter; + use crate::repeater::MessageRepeater; + pub fn with_transmitter( tx: UnboundedSender, ) -> impl Filter,), Error = Infallible> + Clone { warp::any().map(move || tx.clone()) } + pub fn with_repeater( + repeater: Arc>, + ) -> impl Filter>,), Error = Infallible> + Clone { + warp::any().map(move || repeater.clone()) + } + pub fn with_aggregator_endpoint( aggregator_endpoint: String, ) -> impl Filter + Clone { @@ -140,21 +183,30 @@ mod handlers { use mithril_common::messages::{RegisterSignatureMessage, RegisterSignerMessage}; use reqwest::{Error, Response}; use slog_scope::debug; - use std::convert::Infallible; + use std::{convert::Infallible, sync::Arc}; use tokio::sync::mpsc::UnboundedSender; use warp::http::StatusCode; + use crate::repeater; + pub async fn register_signer_handler( register_signer_message: RegisterSignerMessage, - aggregator_endpoint: String, + tx: UnboundedSender, + repeater: Arc>, ) -> Result { debug!("SignerRelay: serve HTTP route /register-signer"; "register_signer_message" => format!("{register_signer_message:#?}")); - let response = reqwest::Client::new() - .post(format!("{aggregator_endpoint}/register-signer")) - .json(®ister_signer_message) - .send() - .await; - reply_response(response).await + + repeater.set_message(register_signer_message.clone()).await; + match tx.send(register_signer_message) { + Ok(_) => Ok(Box::new(warp::reply::with_status( + "".to_string(), + StatusCode::CREATED, + ))), + Err(err) => Ok(Box::new(warp::reply::with_status( + format!("{err:?}"), + StatusCode::INTERNAL_SERVER_ERROR, + ))), + } } pub async fn register_signatures_handler( diff --git a/mithril-relay/src/repeater.rs b/mithril-relay/src/repeater.rs new file mode 100644 index 00000000000..d94d7a0f3c7 --- /dev/null +++ b/mithril-relay/src/repeater.rs @@ -0,0 +1,128 @@ +use anyhow::anyhow; +use mithril_common::StdResult; +use slog_scope::debug; +use std::{fmt::Debug, sync::Arc, time::Duration}; +use tokio::{ + sync::{mpsc::UnboundedSender, Mutex}, + time::Instant, +}; + +/// A message repeater will send a copy of the message to a channel at a given frequency +pub struct MessageRepeater { + message: Arc>>, + tx_message: UnboundedSender, + delay: Duration, + next_repeat_at: Arc>>, +} + +impl MessageRepeater { + /// Factory for MessageRepeater + pub fn new(tx_message: UnboundedSender, delay: Duration) -> Self { + Self { + message: Arc::new(Mutex::new(None)), + tx_message, + delay, + next_repeat_at: Arc::new(Mutex::new(None)), + } + } + + async fn reset_next_repeat_at(&self) { + debug!("MessageRepeater: reset next_repeat_at"); + *self.next_repeat_at.lock().await = Some(Instant::now() + self.delay); + } + + /// Set the message to repeat + pub async fn set_message(&self, message: M) { + debug!("MessageRepeater: set message"; "message" => format!("{:#?}", message)); + *self.message.lock().await = Some(message); + self.reset_next_repeat_at().await; + } + + /// Start repeating the message if any + pub async fn repeat_message(&self) -> StdResult<()> { + let wait_delay = match self.next_repeat_at.lock().await.as_ref() { + None => self.delay, + Some(next_repeat_at) => next_repeat_at + .checked_duration_since(Instant::now()) + .unwrap_or_default(), + }; + tokio::time::sleep(wait_delay).await; + match self.message.lock().await.as_ref() { + Some(message) => { + debug!("MessageRepeater: repeat message"; "message" => format!("{:#?}", message)); + self.tx_message + .send(message.clone()) + .map_err(|e| anyhow!(e))? + } + None => { + debug!("MessageRepeater: no message to repeat"); + } + } + self.reset_next_repeat_at().await; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use tokio::{sync::mpsc, time}; + + use super::*; + + #[tokio::test] + async fn should_repeat_message_when_exists() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let delay = Duration::from_millis(100); + let repeater = MessageRepeater::new(tx, delay); + + let message = "Hello, world!"; + repeater.set_message(message.to_string()).await; + repeater.repeat_message().await.unwrap(); + + let received = rx.recv().await.unwrap(); + assert_eq!(message, received); + } + + #[tokio::test] + async fn should_repeat_message_when_exists_with_expected_delay() { + let (tx, _rx) = mpsc::unbounded_channel(); + let delay = Duration::from_secs(1); + let repeater = MessageRepeater::new(tx, delay); + + let message = "Hello, world!"; + repeater.set_message(message.to_string()).await; + + let result = tokio::select! { + _ = time::sleep(delay - Duration::from_millis(100)) => {Err(anyhow!("Timeout"))} + _ = repeater.repeat_message() => {Ok(())} + }; + + result.expect_err("should have timed out"); + } + + #[tokio::test] + async fn should_do_nothing_when_message_not_exists() { + let (tx, rx) = mpsc::unbounded_channel::(); + let delay = Duration::from_millis(100); + let repeater = MessageRepeater::new(tx, delay); + + repeater.repeat_message().await.unwrap(); + + assert!(rx.is_empty()); + } + + #[tokio::test] + async fn should_do_nothing_when_message_not_exists_with_expected_delay() { + let (tx, _rx) = mpsc::unbounded_channel::(); + let delay = Duration::from_secs(1); + let repeater = MessageRepeater::new(tx, delay); + + let result = tokio::select! { + _ = time::sleep(delay - Duration::from_millis(100)) => {Err(anyhow!("Timeout"))} + _ = repeater.repeat_message() => {Ok(())} + }; + + result.expect_err("should have timed out"); + } +} diff --git a/mithril-relay/tests/tests.rs b/mithril-relay/tests/register_signer_signature.rs similarity index 56% rename from mithril-relay/tests/tests.rs rename to mithril-relay/tests/register_signer_signature.rs index e738092e258..37ae694dd80 100644 --- a/mithril-relay/tests/tests.rs +++ b/mithril-relay/tests/register_signer_signature.rs @@ -1,15 +1,20 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use libp2p::{gossipsub, Multiaddr}; -use mithril_common::messages::RegisterSignatureMessage; -use mithril_relay::{p2p::PeerBehaviourEvent, p2p::PeerEvent, PassiveRelay, SignerRelay}; +use mithril_common::messages::{RegisterSignatureMessage, RegisterSignerMessage}; +use mithril_relay::{ + p2p::{BroadcastMessage, PeerBehaviourEvent, PeerEvent}, + PassiveRelay, SignerRelay, +}; use reqwest::StatusCode; use slog::{Drain, Level, Logger}; -use slog_scope::info; +use slog_scope::{error, info}; // Launch a relay that connects to P2P network. The relay is a peer in the P2P -// network. The relay sends some signatures that must be received by other +// network. The relay sends some signer registrations that must be received by other // relays. +// TODO: this test is not optimal and should be refactored for better performances, +// handling a variable number of peers and with test extensions to avoid code duplication fn build_logger(log_level: Level) -> Logger { let decorator = slog_term::TermDecorator::new().build(); @@ -21,7 +26,7 @@ fn build_logger(log_level: Level) -> Logger { } #[tokio::test] -async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { +async fn should_receive_registrations_from_signers_when_subscribed_to_pubsub() { let log_level = Level::Info; let _guard = slog_scope::set_global_logger(build_logger(log_level)); @@ -29,16 +34,21 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { let total_peers = 1 + total_p2p_client; let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap(); let server_port = 0; - let aggregator_endpoint = "http://0.0.0.0:1234".to_string(); // TODO: to implement with test http server - let mut signer_relay = SignerRelay::start(&addr, &server_port, &aggregator_endpoint) - .await - .expect("Relay start failed"); + let aggregator_endpoint = "http://0.0.0.0:1234".to_string(); + let signer_repeater_delay = Duration::from_secs(100); + let mut signer_relay = SignerRelay::start( + &addr, + &server_port, + &aggregator_endpoint, + &signer_repeater_delay, + ) + .await + .expect("Relay start failed"); let relay_address = signer_relay.address(); let relay_peer_address = signer_relay.peer_address().unwrap(); info!("Test: relay_address is '{relay_address:?}'"); - let mut p2p_client1 = PassiveRelay::new(&addr) - .start() + let mut p2p_client1 = PassiveRelay::start(&addr) .await .expect("P2P client start failed"); p2p_client1 @@ -46,8 +56,7 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { .dial(relay_peer_address.clone()) .expect("P2P client dial to the relay should not fail"); - let mut p2p_client2 = PassiveRelay::new(&addr) - .start() + let mut p2p_client2 = PassiveRelay::start(&addr) .await .expect("P2P client start failed"); p2p_client2 @@ -94,6 +103,61 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { } } + let signer_relay_thread = tokio::spawn(async move { + loop { + if let Err(err) = signer_relay.tick().await { + error!("RelaySigner: tick error"; "error" => format!("{err:#?}")); + } + } + }); + + info!("Test: send a signer registration to the relay via HTTP gateway"); + let mut signer_message_sent = RegisterSignerMessage::dummy(); + signer_message_sent.party_id = format!("{}-new", signer_message_sent.party_id); + let response = reqwest::Client::new() + .post(format!("http://{}/register-signer", relay_address)) + .json(&signer_message_sent) + .send() + .await; + match response { + Ok(response) => { + match response.status() { + StatusCode::CREATED => {} + status => { + panic!("Post `/register-signer` should have returned a 201 status code, got: {status}") + } + } + } + Err(err) => panic!("Post `/register-signer` failed: {err:?}"), + } + + info!("Test: wait for P2P clients to receive the signer registrations"); + let mut total_peers_has_received_message = 0; + loop { + tokio::select! { + event = p2p_client1.tick_peer() => { + if let Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) = p2p_client1.convert_peer_event_to_message(event.unwrap().unwrap()) + { + info!("Test: client1 consumed signer registration: {signer_message_received:#?}"); + assert_eq!(signer_message_sent, signer_message_received); + total_peers_has_received_message += 1 + } + } + event = p2p_client2.tick_peer() => { + if let Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) = p2p_client2.convert_peer_event_to_message(event.unwrap().unwrap()) + { + info!("Test: client2 consumed signer registration: {signer_message_received:#?}"); + assert_eq!(signer_message_sent, signer_message_received); + total_peers_has_received_message += 1 + } + } + } + if total_peers_has_received_message == total_p2p_client { + info!("Test: All P2P clients have consumed the signer registration"); + break; + } + } + info!("Test: send a signature to the relay via HTTP gateway"); let mut signature_message_sent = RegisterSignatureMessage::dummy(); signature_message_sent.party_id = format!("{}-new", signature_message_sent.party_id); @@ -116,11 +180,8 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { let mut total_peers_has_received_message = 0; loop { tokio::select! { - _event = signer_relay.tick() => { - - }, event = p2p_client1.tick_peer() => { - if let Ok(Some(signature_message_received)) = p2p_client1.convert_event(event.unwrap().unwrap()) + if let Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) = p2p_client1.convert_peer_event_to_message(event.unwrap().unwrap()) { info!("Test: client1 consumed signature: {signature_message_received:#?}"); assert_eq!(signature_message_sent, signature_message_received); @@ -128,7 +189,7 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { } } event = p2p_client2.tick_peer() => { - if let Ok(Some(signature_message_received)) = p2p_client2.convert_event(event.unwrap().unwrap()) + if let Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) = p2p_client2.convert_peer_event_to_message(event.unwrap().unwrap()) { info!("Test: client2 consumed signature: {signature_message_received:#?}"); assert_eq!(signature_message_sent, signature_message_received); @@ -136,10 +197,11 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { } } } - let _ = signer_relay.tick_peer().await.unwrap(); if total_peers_has_received_message == total_p2p_client { info!("Test: All P2P clients have consumed the signature"); break; } } + + signer_relay_thread.abort(); } diff --git a/mithril-test-lab/mithril-end-to-end/Cargo.toml b/mithril-test-lab/mithril-end-to-end/Cargo.toml index 3a19b0f9f3f..9e399131116 100644 --- a/mithril-test-lab/mithril-end-to-end/Cargo.toml +++ b/mithril-test-lab/mithril-end-to-end/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-end-to-end" -version = "0.4.9" +version = "0.4.10" authors = { workspace = true } edition = { workspace = true } documentation = { workspace = true } diff --git a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs index 92d383bd234..c4c22f77373 100644 --- a/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs +++ b/mithril-test-lab/mithril-end-to-end/src/mithril/relay_signer.rs @@ -31,6 +31,7 @@ impl RelaySigner { ("SERVER_PORT", server_port_str.as_str()), ("AGGREGATOR_ENDPOINT", aggregator_endpoint), ("DIAL_TO", &dial_to), + ("SIGNER_REPEATER_DELAY", "100"), ]); let args = vec!["-vvv", "signer"];