diff --git a/mithril-relay/src/lib.rs b/mithril-relay/src/lib.rs index cbeb80207b..f08b4b8956 100644 --- a/mithril-relay/src/lib.rs +++ b/mithril-relay/src/lib.rs @@ -12,5 +12,8 @@ pub use relay::AggregatorRelay; pub use relay::PassiveRelay; pub use relay::SignerRelay; +/// The topic name where signer registrations are published +pub const MITHRIL_SIGNERS_TOPIC_NAME: &str = "mithril/signers"; + /// The topic name where signatures are published pub const MITHRIL_SIGNATURES_TOPIC_NAME: &str = "mithril/signatures"; diff --git a/mithril-relay/src/p2p/peer.rs b/mithril-relay/src/p2p/peer.rs index 9414642e35..6e629d0400 100644 --- a/mithril-relay/src/p2p/peer.rs +++ b/mithril-relay/src/p2p/peer.rs @@ -8,11 +8,15 @@ use libp2p::{ swarm::{self, DialError, NetworkBehaviour}, tcp, yamux, Multiaddr, PeerId, Swarm, Transport, }; -use mithril_common::{messages::RegisterSignatureMessage, StdResult}; +use mithril_common::{ + messages::{RegisterSignatureMessage, RegisterSignerMessage}, + StdResult, +}; +use serde::{Deserialize, Serialize}; use slog_scope::{debug, info}; use std::{collections::HashMap, time::Duration}; -use crate::{p2p::PeerError, MITHRIL_SIGNATURES_TOPIC_NAME}; +use crate::{p2p::PeerError, MITHRIL_SIGNATURES_TOPIC_NAME, MITHRIL_SIGNERS_TOPIC_NAME}; /// The idle connection timeout for a P2P connection const P2P_IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30); @@ -54,6 +58,16 @@ pub enum PeerEvent { /// The topic name of a P2P pubsub pub type TopicName = String; +/// The broadcast message received from a Gossip sub event +#[derive(Serialize, Deserialize)] +pub enum BroadcastMessage { + /// A signer registration message received from the Gossip sub + RegisterSigner(RegisterSignerMessage), + + /// A signature registration message received from the Gossip sub + RegisterSignature(RegisterSignatureMessage), +} + /// A peer in the P2P network pub struct Peer { topics: HashMap, @@ -75,10 +89,16 @@ impl Peer { } fn build_topics() -> HashMap { - HashMap::from([( - MITHRIL_SIGNATURES_TOPIC_NAME.into(), - gossipsub::IdentTopic::new(MITHRIL_SIGNATURES_TOPIC_NAME), - )]) + HashMap::from([ + ( + MITHRIL_SIGNATURES_TOPIC_NAME.into(), + gossipsub::IdentTopic::new(MITHRIL_SIGNATURES_TOPIC_NAME), + ), + ( + MITHRIL_SIGNERS_TOPIC_NAME.into(), + gossipsub::IdentTopic::new(MITHRIL_SIGNERS_TOPIC_NAME), + ), + ]) } /// Start the peer @@ -137,11 +157,11 @@ impl Peer { Ok(self) } - /// Convert a peer event to a signature message - pub fn convert_peer_event_to_signature_message( + /// Convert a peer event to a broadcast message + pub fn convert_peer_event_to_message( &mut self, event: PeerEvent, - ) -> StdResult> { + ) -> StdResult> { match event { PeerEvent::Behaviour { event: PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }), @@ -189,31 +209,60 @@ impl Peer { pub fn publish_signature( &mut self, message: &RegisterSignatureMessage, + ) -> StdResult { + self.publish_broadcast_message( + &BroadcastMessage::RegisterSignature(message.to_owned()), + MITHRIL_SIGNATURES_TOPIC_NAME, + ) + } + + /// Publish a broadcast message on the P2P pubsub + pub fn publish_broadcast_message( + &mut self, + message: &BroadcastMessage, + topic_name: &str, ) -> StdResult { let topic = self .topics - .get(MITHRIL_SIGNATURES_TOPIC_NAME) + .get(topic_name) .ok_or(PeerError::MissingTopic()) .with_context(|| { - format!( - "Can not publish signature on invalid topic: {MITHRIL_SIGNATURES_TOPIC_NAME}" - ) + format!("Can not publish broadcast message on invalid topic: {topic_name}") })? .to_owned(); - let data = serde_json::to_vec(message) - .with_context(|| "Can not publish signature with invalid format")?; + let data = serde_json::to_vec(message).with_context(|| { + format!("Can not publish broadcast message with invalid format on topic {topic_name}") + })?; let message_id = self .swarm .as_mut() .map(|swarm| swarm.behaviour_mut().gossipsub.publish(topic, data)) .transpose() - .with_context(|| "Can not publish signature on P2P pubsub")? + .with_context(|| { + format!("Can not publish broadcast message on {topic_name} P2P pubsub") + })? .ok_or(PeerError::UnavailableSwarm()) - .with_context(|| "Can not publish signature without swarm")?; + .with_context(|| { + format!( + "Can not publish broadcast message on {topic_name} P2P pubsub without swarm" + ) + })?; + Ok(message_id.to_owned()) } + /// Publish a signer registration on the P2P pubsub + pub fn publish_signer( + &mut self, + message: &RegisterSignerMessage, + ) -> StdResult { + self.publish_broadcast_message( + &BroadcastMessage::RegisterSigner(message.to_owned()), + MITHRIL_SIGNERS_TOPIC_NAME, + ) + } + /// Connect to a remote peer pub fn dial(&mut self, addr: Multiaddr) -> StdResult<()> { debug!("Peer: dialing to"; "address" => format!("{addr:?}"), "local_peer_id" => format!("{:?}", self.local_peer_id())); diff --git a/mithril-relay/src/relay/aggregator.rs b/mithril-relay/src/relay/aggregator.rs index 80f19673fe..380c62c123 100644 --- a/mithril-relay/src/relay/aggregator.rs +++ b/mithril-relay/src/relay/aggregator.rs @@ -1,7 +1,10 @@ -use crate::p2p::{Peer, PeerEvent}; +use crate::p2p::{BroadcastMessage, Peer, PeerEvent}; use anyhow::anyhow; use libp2p::Multiaddr; -use mithril_common::{messages::RegisterSignatureMessage, StdResult}; +use mithril_common::{ + messages::{RegisterSignatureMessage, RegisterSignerMessage}, + StdResult, +}; use reqwest::StatusCode; use slog_scope::{error, info}; @@ -48,25 +51,68 @@ impl AggregatorRelay { } } + async fn notify_signer_to_aggregator( + &self, + signer_message: &RegisterSignerMessage, + ) -> StdResult<()> { + let response = reqwest::Client::new() + .post(format!("{}/register-signer", self.aggregator_endpoint)) + .json(signer_message) + //.header(MITHRIL_API_VERSION_HEADER, "0.1.13") // TODO: retrieve current version + .send() + .await; + match response { + Ok(response) => match response.status() { + StatusCode::CREATED => { + info!("Relay aggregator: sent successfully signer registration message to aggregator"; "signer_message" => format!("{:#?}", signer_message)); + Ok(()) + } + status => { + error!("Relay aggregator: Post `/register-signer` should have returned a 201 status code, got: {status}"); + Err(anyhow!("Post `/register-signer` should have returned a 201 status code, got: {status}")) + } + }, + Err(err) => { + error!("Relay aggregator: Post `/register-signer` failed: {err:?}"); + Err(anyhow!("Post `/register-signer` failed: {err:?}")) + } + } + } + /// Tick the aggregator relay pub async fn tick(&mut self) -> StdResult<()> { if let Some(peer_event) = self.peer.tick_swarm().await? { - if let Ok(Some(signature_message_received)) = self - .peer - .convert_peer_event_to_signature_message(peer_event) - { - let retry_max = 3; - let mut retry_count = 0; - while let Err(e) = self - .notify_signature_to_aggregator(&signature_message_received) - .await - { - retry_count += 1; - if retry_count >= retry_max { - error!("Relay aggregator: failed to send signature message to aggregator after {retry_count} attempts"; "signature_message" => format!("{:#?}", signature_message_received), "error" => format!("{e:?}")); - return Err(e); + match self.peer.convert_peer_event_to_message(peer_event) { + Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) => { + let retry_max = 3; + let mut retry_count = 0; + while let Err(e) = self + .notify_signer_to_aggregator(&signer_message_received) + .await + { + retry_count += 1; + if retry_count >= retry_max { + error!("Relay aggregator: failed to send signer registration message to aggregator after {retry_count} attempts"; "signer_message" => format!("{:#?}", signer_message_received), "error" => format!("{e:?}")); + return Err(e); + } + } + } + Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) => { + let retry_max = 3; + let mut retry_count = 0; + while let Err(e) = self + .notify_signature_to_aggregator(&signature_message_received) + .await + { + retry_count += 1; + if retry_count >= retry_max { + error!("Relay aggregator: failed to send signature message to aggregator after {retry_count} attempts"; "signature_message" => format!("{:#?}", signature_message_received), "error" => format!("{e:?}")); + return Err(e); + } } } + Ok(None) => {} + Err(e) => return Err(e), } } diff --git a/mithril-relay/src/relay/passive.rs b/mithril-relay/src/relay/passive.rs index 4758267f00..ebf89cde87 100644 --- a/mithril-relay/src/relay/passive.rs +++ b/mithril-relay/src/relay/passive.rs @@ -1,6 +1,6 @@ -use crate::p2p::{Peer, PeerBehaviourEvent, PeerEvent}; -use libp2p::{gossipsub, Multiaddr}; -use mithril_common::{messages::RegisterSignatureMessage, StdResult}; +use crate::p2p::{BroadcastMessage, Peer, PeerEvent}; +use libp2p::Multiaddr; +use mithril_common::StdResult; use slog_scope::{debug, info}; /// A passive relay @@ -27,28 +27,27 @@ impl PassiveRelay { }) } - /// Convert event to signature message + /// Convert event to broadcast message /// TODO: should be removed - pub fn convert_event( + pub fn convert_peer_event_to_message( &mut self, event: PeerEvent, - ) -> StdResult> { - match event { - PeerEvent::Behaviour { - event: PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }), - } => Ok(Some(serde_json::from_slice(&message.data)?)), - _ => Ok(None), - } + ) -> StdResult> { + self.peer.convert_peer_event_to_message(event) } /// Tick the passive relay pub async fn tick(&mut self) -> StdResult<()> { if let Some(peer_event) = self.peer.tick_swarm().await? { - if let Ok(Some(signature_message_received)) = self - .peer - .convert_peer_event_to_signature_message(peer_event) - { - info!("Relay passive: received signature message from P2P network"; "signature_message" => format!("{:#?}", signature_message_received)); + match self.peer.convert_peer_event_to_message(peer_event) { + Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) => { + info!("Relay passive: received signer registration message from P2P network"; "signer_message" => format!("{:#?}", signer_message_received)); + } + Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) => { + info!("Relay passive: received signature message from P2P network"; "signature_message" => format!("{:#?}", signature_message_received)); + } + Ok(None) => {} + Err(e) => return Err(e), } } diff --git a/mithril-relay/src/relay/signer.rs b/mithril-relay/src/relay/signer.rs index a0859b6bd5..6b471dddc4 100644 --- a/mithril-relay/src/relay/signer.rs +++ b/mithril-relay/src/relay/signer.rs @@ -1,7 +1,7 @@ use crate::p2p::{Peer, PeerEvent}; use libp2p::Multiaddr; use mithril_common::{ - messages::RegisterSignatureMessage, + messages::{RegisterSignatureMessage, RegisterSignerMessage}, test_utils::test_http_server::{test_http_server_with_socket_address, TestHttpServer}, StdResult, }; @@ -15,6 +15,7 @@ pub struct SignerRelay { server: TestHttpServer, peer: Peer, signature_rx: UnboundedReceiver, + signer_rx: UnboundedReceiver, } impl SignerRelay { @@ -26,20 +27,25 @@ impl SignerRelay { ) -> StdResult { debug!("SignerRelay: starting..."); let (signature_tx, signature_rx) = unbounded_channel::(); + let (signer_tx, signer_rx) = unbounded_channel::(); let peer = Peer::new(address).start().await?; - let server = Self::start_http_server(server_port, aggregator_endpoint, signature_tx).await; + let server = + Self::start_http_server(server_port, aggregator_endpoint, signer_tx, signature_tx) + .await; info!("SignerRelay: listening on"; "address" => format!("{:?}", server.address())); Ok(Self { server, peer, signature_rx, + signer_rx, }) } async fn start_http_server( server_port: &u16, aggregator_endpoint: &str, + signer_tx: UnboundedSender, signature_tx: UnboundedSender, ) -> TestHttpServer { test_http_server_with_socket_address( @@ -51,9 +57,7 @@ impl SignerRelay { .or(warp::path("register-signer") .and(warp::post()) .and(warp::body::json()) - .and(middlewares::with_aggregator_endpoint( - aggregator_endpoint.to_string(), - )) + .and(middlewares::with_transmitter(signer_tx)) .and_then(handlers::register_signer_handler)) .or(warp::path("epoch-settings") .and(warp::get()) @@ -82,7 +86,20 @@ impl SignerRelay { Ok(()) } None => { - debug!("SignerRelay: no message available"); + debug!("SignerRelay: no signature message available"); + Ok(()) + } + } + }, + message = self.signer_rx.recv() => { + match message { + Some(signer_message) => { + info!("SignerRelay: publish signer-registration to p2p network"; "message" => format!("{signer_message:#?}")); + self.peer.publish_signer(&signer_message)?; + Ok(()) + } + None => { + debug!("SignerRelay: no signer message available"); Ok(()) } } @@ -146,15 +163,19 @@ mod handlers { pub async fn register_signer_handler( register_signer_message: RegisterSignerMessage, - aggregator_endpoint: String, + tx: UnboundedSender, ) -> Result { debug!("SignerRelay: serve HTTP route /register-signer"; "register_signer_message" => format!("{register_signer_message:#?}")); - let response = reqwest::Client::new() - .post(format!("{aggregator_endpoint}/register-signer")) - .json(®ister_signer_message) - .send() - .await; - reply_response(response).await + match tx.send(register_signer_message) { + Ok(_) => Ok(Box::new(warp::reply::with_status( + "".to_string(), + StatusCode::CREATED, + ))), + Err(err) => Ok(Box::new(warp::reply::with_status( + format!("{err:?}"), + StatusCode::INTERNAL_SERVER_ERROR, + ))), + } } pub async fn register_signatures_handler( diff --git a/mithril-relay/tests/tests.rs b/mithril-relay/tests/register_signer_signature.rs similarity index 63% rename from mithril-relay/tests/tests.rs rename to mithril-relay/tests/register_signer_signature.rs index e738092e25..3549db2b0c 100644 --- a/mithril-relay/tests/tests.rs +++ b/mithril-relay/tests/register_signer_signature.rs @@ -1,15 +1,18 @@ use std::sync::Arc; use libp2p::{gossipsub, Multiaddr}; -use mithril_common::messages::RegisterSignatureMessage; -use mithril_relay::{p2p::PeerBehaviourEvent, p2p::PeerEvent, PassiveRelay, SignerRelay}; +use mithril_common::messages::{RegisterSignatureMessage, RegisterSignerMessage}; +use mithril_relay::{ + p2p::{BroadcastMessage, PeerBehaviourEvent, PeerEvent}, + PassiveRelay, SignerRelay, +}; use reqwest::StatusCode; use slog::{Drain, Level, Logger}; use slog_scope::info; // Launch a relay that connects to P2P network. The relay is a peer in the P2P -// network. The relay sends some signatures that must be received by other -// relays. +// network. The relay sends some signer regsitrations that must be received by other +// relays, then sends some signatures that must be also received by other relays. fn build_logger(log_level: Level) -> Logger { let decorator = slog_term::TermDecorator::new().build(); @@ -21,7 +24,7 @@ fn build_logger(log_level: Level) -> Logger { } #[tokio::test] -async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { +async fn should_receive_signers_registrations_from_signers_when_subscribed_to_pubsub() { let log_level = Level::Info; let _guard = slog_scope::set_global_logger(build_logger(log_level)); @@ -29,7 +32,7 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { let total_peers = 1 + total_p2p_client; let addr: Multiaddr = "/ip4/0.0.0.0/tcp/0".parse().unwrap(); let server_port = 0; - let aggregator_endpoint = "http://0.0.0.0:1234".to_string(); // TODO: to implement with test http server + let aggregator_endpoint = "http://0.0.0.0:1234".to_string(); let mut signer_relay = SignerRelay::start(&addr, &server_port, &aggregator_endpoint) .await .expect("Relay start failed"); @@ -94,6 +97,57 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { } } + info!("Test: send a signer registration to the relay via HTTP gateway"); + let mut signer_message_sent = RegisterSignerMessage::dummy(); + signer_message_sent.party_id = format!("{}-new", signer_message_sent.party_id); + let response = reqwest::Client::new() + .post(format!("http://{}/register-signer", relay_address)) + .json(&signer_message_sent) + .send() + .await; + match response { + Ok(response) => { + match response.status() { + StatusCode::CREATED => {} + status => { + panic!("Post `/register-signer` should have returned a 201 status code, got: {status}") + } + } + } + Err(err) => panic!("Post `/register-signer` failed: {err:?}"), + } + + info!("Test: wait for P2P clients to receive the signer registrations"); + let mut total_peers_has_received_message = 0; + loop { + tokio::select! { + _event = signer_relay.tick() => { + + }, + event = p2p_client1.tick_peer() => { + if let Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) = p2p_client1.convert_peer_event_to_message(event.unwrap().unwrap()) + { + info!("Test: client1 consumed signer registration: {signer_message_received:#?}"); + assert_eq!(signer_message_sent, signer_message_received); + total_peers_has_received_message += 1 + } + } + event = p2p_client2.tick_peer() => { + if let Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) = p2p_client2.convert_peer_event_to_message(event.unwrap().unwrap()) + { + info!("Test: client2 consumed signer registration: {signer_message_received:#?}"); + assert_eq!(signer_message_sent, signer_message_received); + total_peers_has_received_message += 1 + } + } + } + let _ = signer_relay.tick_peer().await.unwrap(); + if total_peers_has_received_message == total_p2p_client { + info!("Test: All P2P clients have consumed the signer registration"); + break; + } + } + info!("Test: send a signature to the relay via HTTP gateway"); let mut signature_message_sent = RegisterSignatureMessage::dummy(); signature_message_sent.party_id = format!("{}-new", signature_message_sent.party_id); @@ -120,7 +174,7 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { }, event = p2p_client1.tick_peer() => { - if let Ok(Some(signature_message_received)) = p2p_client1.convert_event(event.unwrap().unwrap()) + if let Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) = p2p_client1.convert_peer_event_to_message(event.unwrap().unwrap()) { info!("Test: client1 consumed signature: {signature_message_received:#?}"); assert_eq!(signature_message_sent, signature_message_received); @@ -128,7 +182,7 @@ async fn should_receive_signatures_from_signers_when_subscribed_to_pubsub() { } } event = p2p_client2.tick_peer() => { - if let Ok(Some(signature_message_received)) = p2p_client2.convert_event(event.unwrap().unwrap()) + if let Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) = p2p_client2.convert_peer_event_to_message(event.unwrap().unwrap()) { info!("Test: client2 consumed signature: {signature_message_received:#?}"); assert_eq!(signature_message_sent, signature_message_received);