Skip to content

Commit

Permalink
feat: make relay broadcast signer registrations with P2P network
Browse files Browse the repository at this point in the history
  • Loading branch information
jpraynaud committed Mar 28, 2024
1 parent 46c7b79 commit 4aad51e
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 67 deletions.
3 changes: 3 additions & 0 deletions mithril-relay/src/lib.rs
Expand Up @@ -12,5 +12,8 @@ pub use relay::AggregatorRelay;
pub use relay::PassiveRelay;
pub use relay::SignerRelay;

/// The topic name where signer registrations are published
pub const MITHRIL_SIGNERS_TOPIC_NAME: &str = "mithril/signers";

/// The topic name where signatures are published
pub const MITHRIL_SIGNATURES_TOPIC_NAME: &str = "mithril/signatures";
83 changes: 66 additions & 17 deletions mithril-relay/src/p2p/peer.rs
Expand Up @@ -8,11 +8,15 @@ use libp2p::{
swarm::{self, DialError, NetworkBehaviour},
tcp, yamux, Multiaddr, PeerId, Swarm, Transport,
};
use mithril_common::{messages::RegisterSignatureMessage, StdResult};
use mithril_common::{
messages::{RegisterSignatureMessage, RegisterSignerMessage},
StdResult,
};
use serde::{Deserialize, Serialize};
use slog_scope::{debug, info};
use std::{collections::HashMap, time::Duration};

use crate::{p2p::PeerError, MITHRIL_SIGNATURES_TOPIC_NAME};
use crate::{p2p::PeerError, MITHRIL_SIGNATURES_TOPIC_NAME, MITHRIL_SIGNERS_TOPIC_NAME};

/// The idle connection timeout for a P2P connection
const P2P_IDLE_CONNECTION_TIMEOUT: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -54,6 +58,16 @@ pub enum PeerEvent {
/// The topic name of a P2P pubsub
pub type TopicName = String;

/// The broadcast message received from a Gossip sub event
#[derive(Serialize, Deserialize)]
pub enum BroadcastMessage {
/// A signer registration message received from the Gossip sub
RegisterSigner(RegisterSignerMessage),

/// A signature registration message received from the Gossip sub
RegisterSignature(RegisterSignatureMessage),
}

/// A peer in the P2P network
pub struct Peer {
topics: HashMap<TopicName, gossipsub::IdentTopic>,
Expand All @@ -75,10 +89,16 @@ impl Peer {
}

fn build_topics() -> HashMap<TopicName, gossipsub::IdentTopic> {
HashMap::from([(
MITHRIL_SIGNATURES_TOPIC_NAME.into(),
gossipsub::IdentTopic::new(MITHRIL_SIGNATURES_TOPIC_NAME),
)])
HashMap::from([
(
MITHRIL_SIGNATURES_TOPIC_NAME.into(),
gossipsub::IdentTopic::new(MITHRIL_SIGNATURES_TOPIC_NAME),
),
(
MITHRIL_SIGNERS_TOPIC_NAME.into(),
gossipsub::IdentTopic::new(MITHRIL_SIGNERS_TOPIC_NAME),
),
])
}

/// Start the peer
Expand Down Expand Up @@ -137,11 +157,11 @@ impl Peer {
Ok(self)
}

/// Convert a peer event to a signature message
pub fn convert_peer_event_to_signature_message(
/// Convert a peer event to a broadcast message
pub fn convert_peer_event_to_message(
&mut self,
event: PeerEvent,
) -> StdResult<Option<RegisterSignatureMessage>> {
) -> StdResult<Option<BroadcastMessage>> {
match event {
PeerEvent::Behaviour {
event: PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }),
Expand Down Expand Up @@ -189,31 +209,60 @@ impl Peer {
pub fn publish_signature(
&mut self,
message: &RegisterSignatureMessage,
) -> StdResult<gossipsub::MessageId> {
self.publish_broadcast_message(
&BroadcastMessage::RegisterSignature(message.to_owned()),
MITHRIL_SIGNATURES_TOPIC_NAME,
)
}

/// Publish a broadcast message on the P2P pubsub
pub fn publish_broadcast_message(
&mut self,
message: &BroadcastMessage,
topic_name: &str,
) -> StdResult<gossipsub::MessageId> {
let topic = self
.topics
.get(MITHRIL_SIGNATURES_TOPIC_NAME)
.get(topic_name)
.ok_or(PeerError::MissingTopic())
.with_context(|| {
format!(
"Can not publish signature on invalid topic: {MITHRIL_SIGNATURES_TOPIC_NAME}"
)
format!("Can not publish broadcast message on invalid topic: {topic_name}")
})?
.to_owned();
let data = serde_json::to_vec(message)
.with_context(|| "Can not publish signature with invalid format")?;
let data = serde_json::to_vec(message).with_context(|| {
format!("Can not publish broadcast message with invalid format on topic {topic_name}")
})?;

let message_id = self
.swarm
.as_mut()
.map(|swarm| swarm.behaviour_mut().gossipsub.publish(topic, data))
.transpose()
.with_context(|| "Can not publish signature on P2P pubsub")?
.with_context(|| {
format!("Can not publish broadcast message on {topic_name} P2P pubsub")
})?
.ok_or(PeerError::UnavailableSwarm())
.with_context(|| "Can not publish signature without swarm")?;
.with_context(|| {
format!(
"Can not publish broadcast message on {topic_name} P2P pubsub without swarm"
)
})?;

Ok(message_id.to_owned())
}

/// Publish a signer registration on the P2P pubsub
pub fn publish_signer(
&mut self,
message: &RegisterSignerMessage,
) -> StdResult<gossipsub::MessageId> {
self.publish_broadcast_message(
&BroadcastMessage::RegisterSigner(message.to_owned()),
MITHRIL_SIGNERS_TOPIC_NAME,
)
}

/// Connect to a remote peer
pub fn dial(&mut self, addr: Multiaddr) -> StdResult<()> {
debug!("Peer: dialing to"; "address" => format!("{addr:?}"), "local_peer_id" => format!("{:?}", self.local_peer_id()));
Expand Down
78 changes: 62 additions & 16 deletions mithril-relay/src/relay/aggregator.rs
@@ -1,7 +1,10 @@
use crate::p2p::{Peer, PeerEvent};
use crate::p2p::{BroadcastMessage, Peer, PeerEvent};
use anyhow::anyhow;
use libp2p::Multiaddr;
use mithril_common::{messages::RegisterSignatureMessage, StdResult};
use mithril_common::{
messages::{RegisterSignatureMessage, RegisterSignerMessage},
StdResult,
};
use reqwest::StatusCode;
use slog_scope::{error, info};

Expand Down Expand Up @@ -48,25 +51,68 @@ impl AggregatorRelay {
}
}

async fn notify_signer_to_aggregator(
&self,
signer_message: &RegisterSignerMessage,
) -> StdResult<()> {
let response = reqwest::Client::new()
.post(format!("{}/register-signer", self.aggregator_endpoint))
.json(signer_message)
//.header(MITHRIL_API_VERSION_HEADER, "0.1.13") // TODO: retrieve current version
.send()
.await;
match response {
Ok(response) => match response.status() {
StatusCode::CREATED => {
info!("Relay aggregator: sent successfully signer registration message to aggregator"; "signer_message" => format!("{:#?}", signer_message));
Ok(())
}
status => {
error!("Relay aggregator: Post `/register-signer` should have returned a 201 status code, got: {status}");
Err(anyhow!("Post `/register-signer` should have returned a 201 status code, got: {status}"))
}
},
Err(err) => {
error!("Relay aggregator: Post `/register-signer` failed: {err:?}");
Err(anyhow!("Post `/register-signer` failed: {err:?}"))
}
}
}

/// Tick the aggregator relay
pub async fn tick(&mut self) -> StdResult<()> {
if let Some(peer_event) = self.peer.tick_swarm().await? {
if let Ok(Some(signature_message_received)) = self
.peer
.convert_peer_event_to_signature_message(peer_event)
{
let retry_max = 3;
let mut retry_count = 0;
while let Err(e) = self
.notify_signature_to_aggregator(&signature_message_received)
.await
{
retry_count += 1;
if retry_count >= retry_max {
error!("Relay aggregator: failed to send signature message to aggregator after {retry_count} attempts"; "signature_message" => format!("{:#?}", signature_message_received), "error" => format!("{e:?}"));
return Err(e);
match self.peer.convert_peer_event_to_message(peer_event) {
Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) => {
let retry_max = 3;
let mut retry_count = 0;
while let Err(e) = self
.notify_signer_to_aggregator(&signer_message_received)
.await
{
retry_count += 1;
if retry_count >= retry_max {
error!("Relay aggregator: failed to send signer registration message to aggregator after {retry_count} attempts"; "signer_message" => format!("{:#?}", signer_message_received), "error" => format!("{e:?}"));
return Err(e);
}
}
}
Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) => {
let retry_max = 3;
let mut retry_count = 0;
while let Err(e) = self
.notify_signature_to_aggregator(&signature_message_received)
.await
{
retry_count += 1;
if retry_count >= retry_max {
error!("Relay aggregator: failed to send signature message to aggregator after {retry_count} attempts"; "signature_message" => format!("{:#?}", signature_message_received), "error" => format!("{e:?}"));
return Err(e);
}
}
}
Ok(None) => {}
Err(e) => return Err(e),
}
}

Expand Down
33 changes: 16 additions & 17 deletions mithril-relay/src/relay/passive.rs
@@ -1,6 +1,6 @@
use crate::p2p::{Peer, PeerBehaviourEvent, PeerEvent};
use libp2p::{gossipsub, Multiaddr};
use mithril_common::{messages::RegisterSignatureMessage, StdResult};
use crate::p2p::{BroadcastMessage, Peer, PeerEvent};
use libp2p::Multiaddr;
use mithril_common::StdResult;
use slog_scope::{debug, info};

/// A passive relay
Expand All @@ -27,28 +27,27 @@ impl PassiveRelay {
})
}

/// Convert event to signature message
/// Convert event to broadcast message
/// TODO: should be removed
pub fn convert_event(
pub fn convert_peer_event_to_message(
&mut self,
event: PeerEvent,
) -> StdResult<Option<RegisterSignatureMessage>> {
match event {
PeerEvent::Behaviour {
event: PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }),
} => Ok(Some(serde_json::from_slice(&message.data)?)),
_ => Ok(None),
}
) -> StdResult<Option<BroadcastMessage>> {
self.peer.convert_peer_event_to_message(event)
}

/// Tick the passive relay
pub async fn tick(&mut self) -> StdResult<()> {
if let Some(peer_event) = self.peer.tick_swarm().await? {
if let Ok(Some(signature_message_received)) = self
.peer
.convert_peer_event_to_signature_message(peer_event)
{
info!("Relay passive: received signature message from P2P network"; "signature_message" => format!("{:#?}", signature_message_received));
match self.peer.convert_peer_event_to_message(peer_event) {
Ok(Some(BroadcastMessage::RegisterSigner(signer_message_received))) => {
info!("Relay passive: received signer registration message from P2P network"; "signer_message" => format!("{:#?}", signer_message_received));
}
Ok(Some(BroadcastMessage::RegisterSignature(signature_message_received))) => {
info!("Relay passive: received signature message from P2P network"; "signature_message" => format!("{:#?}", signature_message_received));
}
Ok(None) => {}
Err(e) => return Err(e),
}
}

Expand Down

0 comments on commit 4aad51e

Please sign in to comment.