diff --git a/node/common/src/service/p2p.rs b/node/common/src/service/p2p.rs index 50f0df676f..b10e389a12 100644 --- a/node/common/src/service/p2p.rs +++ b/node/common/src/service/p2p.rs @@ -123,28 +123,23 @@ impl P2pCryptoService for NodeService { fn sign_key(&mut self, key: &[u8; 32]) -> Vec { // TODO: make deterministic let msg = [b"noise-libp2p-static-key:", key.as_slice()].concat(); - let sig = self - .p2p - .mio - .keypair() - .sign(&msg) - .expect("unable to create signature"); + let sig = self.p2p.sec_key.sign(&msg); + let libp2p_sec_key = libp2p_identity::Keypair::try_from(self.p2p.sec_key.clone()).unwrap(); let mut payload = vec![]; payload.extend_from_slice(b"\x0a\x24"); - payload.extend_from_slice(&self.p2p.mio.keypair().public().encode_protobuf()); + payload.extend_from_slice(&libp2p_sec_key.public().encode_protobuf()); payload.extend_from_slice(b"\x12\x40"); - payload.extend_from_slice(&sig); + payload.extend_from_slice(&sig.to_bytes()); payload } fn sign_publication(&mut self, publication: &[u8]) -> Vec { - let msg = [b"libp2p-pubsub:", publication].concat(); self.p2p - .mio - .keypair() - .sign(&msg) - .expect("unable to create signature") + .sec_key + .libp2p_pubsub_sign(publication) + .to_bytes() + .to_vec() } fn verify_publication( diff --git a/node/src/action_kind.rs b/node/src/action_kind.rs index 2a7813c180..65a6925e87 100644 --- a/node/src/action_kind.rs +++ b/node/src/action_kind.rs @@ -425,6 +425,7 @@ pub enum ActionKind { P2pNetworkPubsubSignError, P2pNetworkPubsubValidateIncomingMessage, P2pNetworkPubsubValidateIncomingMessages, + P2pNetworkPubsubWebRtcRebroadcast, P2pNetworkPubsubEffectfulSign, P2pNetworkPubsubEffectfulValidateIncomingMessages, P2pNetworkRpcHeartbeatSend, @@ -717,7 +718,7 @@ pub enum ActionKind { } impl ActionKind { - pub const COUNT: u16 = 607; + pub const COUNT: u16 = 608; } impl std::fmt::Display for ActionKind { @@ -1961,6 +1962,7 @@ impl ActionKindGet for P2pNetworkPubsubAction { } Self::Graft { .. } => ActionKind::P2pNetworkPubsubGraft, Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune, + Self::WebRtcRebroadcast { .. } => ActionKind::P2pNetworkPubsubWebRtcRebroadcast, Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast, Self::Sign { .. } => ActionKind::P2pNetworkPubsubSign, Self::SignError { .. } => ActionKind::P2pNetworkPubsubSignError, diff --git a/node/src/snark_pool/snark_pool_reducer.rs b/node/src/snark_pool/snark_pool_reducer.rs index b8d4c6f6ad..ad0cd19057 100644 --- a/node/src/snark_pool/snark_pool_reducer.rs +++ b/node/src/snark_pool/snark_pool_reducer.rs @@ -202,15 +202,13 @@ impl SnarkPoolState { } } - // TODO: we only rebroadcast locally produced snarks here. - // libp2p logic already broadcasts everything right now and doesn't + // TODO: libp2p logic already broadcasts everything right now and doesn't // wait for validation, thad needs to be fixed. See #952 - if *is_sender_local { - dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast { - snark: snark.clone(), - nonce: 0, - }); - } + dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast { + snark: snark.clone(), + nonce: 0, + is_local: *is_sender_local, + }); } SnarkPoolAction::P2pSendAll { .. } => { let (dispatcher, global_state) = state_context.into_dispatcher_and_state(); diff --git a/node/src/transaction_pool/transaction_pool_actions.rs b/node/src/transaction_pool/transaction_pool_actions.rs index 6f685cad02..f271feff89 100644 --- a/node/src/transaction_pool/transaction_pool_actions.rs +++ b/node/src/transaction_pool/transaction_pool_actions.rs @@ -69,6 +69,7 @@ pub enum TransactionPoolAction { Rebroadcast { accepted: Vec, rejected: Vec<(ValidCommandWithHash, diff::Error)>, + is_local: bool, }, CollectTransactionsByFee, #[action_event(level = trace)] @@ -115,9 +116,9 @@ impl redux::EnablingCondition for TransactionPoolAction { last_index, ) }), - TransactionPoolAction::Rebroadcast { accepted, rejected } => { - !(accepted.is_empty() && rejected.is_empty()) - } + TransactionPoolAction::Rebroadcast { + accepted, rejected, .. + } => !(accepted.is_empty() && rejected.is_empty()), _ => true, } } diff --git a/node/src/transaction_pool/transaction_pool_reducer.rs b/node/src/transaction_pool/transaction_pool_reducer.rs index d0f7c5f6af..c1b4a4e1c4 100644 --- a/node/src/transaction_pool/transaction_pool_reducer.rs +++ b/node/src/transaction_pool/transaction_pool_reducer.rs @@ -300,11 +300,14 @@ impl TransactionPoolState { if let Some(rpc_action) = rpc_action { dispatcher.push(rpc_action); } - // TODO: we only rebroadcast locally injected transactions here. - // libp2p logic already broadcasts everything right now and doesn't + // TODO: libp2p logic already broadcasts everything right now and doesn't // wait for validation, thad needs to be fixed. See #952 - if is_sender_local && was_accepted { - dispatcher.push(TransactionPoolAction::Rebroadcast { accepted, rejected }); + if was_accepted { + dispatcher.push(TransactionPoolAction::Rebroadcast { + accepted, + rejected, + is_local: is_sender_local, + }); } } TransactionPoolAction::ApplyTransitionFrontierDiff { @@ -372,7 +375,11 @@ impl TransactionPoolState { ); } } - TransactionPoolAction::Rebroadcast { accepted, rejected } => { + TransactionPoolAction::Rebroadcast { + accepted, + rejected, + is_local, + } => { let rejected = rejected.iter().map(|(cmd, _)| cmd.data.forget_check()); let all_commands = accepted @@ -387,6 +394,7 @@ impl TransactionPoolState { dispatcher.push(P2pChannelsTransactionAction::Libp2pBroadcast { transaction: Box::new((&cmd).into()), nonce: 0, + is_local: *is_local, }); } } diff --git a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs index 54e26eca88..526d0b545c 100644 --- a/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs +++ b/node/src/transition_frontier/sync/transition_frontier_sync_effects.rs @@ -322,11 +322,6 @@ impl TransitionFrontierSyncAction { if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) { store.dispatch(TransitionFrontierSyncAction::BlocksSuccess); } - - // TODO this should be handled by a callback - store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage { - message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() }, - }); } TransitionFrontierSyncAction::BlocksSuccess => {} // Bootstrap/Catchup is practically complete at this point. diff --git a/node/src/transition_frontier/transition_frontier_effects.rs b/node/src/transition_frontier/transition_frontier_effects.rs index c8b2ae97fe..6f8f7f06f5 100644 --- a/node/src/transition_frontier/transition_frontier_effects.rs +++ b/node/src/transition_frontier/transition_frontier_effects.rs @@ -1,9 +1,11 @@ +use mina_p2p_messages::gossip::GossipNetMessageV2; use redux::Timestamp; use crate::block_producer::BlockProducerAction; use crate::consensus::ConsensusAction; use crate::ledger::LEDGER_DEPTH; use crate::p2p::channels::best_tip::P2pChannelsBestTipAction; +use crate::p2p::P2pNetworkPubsubAction; use crate::snark_pool::{SnarkPoolAction, SnarkWork}; use crate::stats::sync::SyncingLedger; use crate::{Store, TransactionPoolAction}; @@ -305,6 +307,18 @@ fn synced_effects( best_tip: best_tip.block.clone(), }); } + // TODO this should be handled by a callback + // If this get dispatched, we received block from libp2p. + if !store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage { + message_id: p2p::BroadcastMessageId::BlockHash { + hash: best_tip.hash().clone(), + }, + }) { + // Otherwise block was received from WebRTC so inject it in libp2p. + store.dispatch(P2pNetworkPubsubAction::WebRtcRebroadcast { + message: GossipNetMessageV2::NewState(best_tip.block().clone()), + }); + } let best_tip_hash = best_tip.merkle_root_hash().clone(); store.dispatch(ConsensusAction::Prune); diff --git a/p2p/src/channels/snark/p2p_channels_snark_actions.rs b/p2p/src/channels/snark/p2p_channels_snark_actions.rs index 0847e14cd0..dba737da18 100644 --- a/p2p/src/channels/snark/p2p_channels_snark_actions.rs +++ b/p2p/src/channels/snark/p2p_channels_snark_actions.rs @@ -54,6 +54,7 @@ pub enum P2pChannelsSnarkAction { Libp2pBroadcast { snark: Snark, nonce: u32, + is_local: bool, }, } diff --git a/p2p/src/channels/snark/p2p_channels_snark_reducer.rs b/p2p/src/channels/snark/p2p_channels_snark_reducer.rs index b92755a073..bb6ab1e57b 100644 --- a/p2p/src/channels/snark/p2p_channels_snark_reducer.rs +++ b/p2p/src/channels/snark/p2p_channels_snark_reducer.rs @@ -210,18 +210,27 @@ impl P2pChannelsSnarkState { } Ok(()) } + #[cfg(not(feature = "p2p-libp2p"))] + P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()), #[cfg(feature = "p2p-libp2p")] - P2pChannelsSnarkAction::Libp2pBroadcast { snark, nonce } => { + P2pChannelsSnarkAction::Libp2pBroadcast { + snark, + nonce, + is_local, + } => { let dispatcher = state_context.into_dispatcher(); let message = Box::new((snark.statement(), (&snark).into())); let message = v2::NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(message); let nonce = nonce.into(); let message = GossipNetMessageV2::SnarkPoolDiff { message, nonce }; - dispatcher.push(P2pNetworkPubsubAction::Broadcast { message }); + if is_local { + dispatcher.push(P2pNetworkPubsubAction::Broadcast { message }); + } else { + // rebroadcast snark if received from webrtc network, otherwise noop. + dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message }); + } Ok(()) } - #[cfg(not(feature = "p2p-libp2p"))] - P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()), P2pChannelsSnarkAction::Libp2pReceived { peer_id, snark, .. } => { let (dispatcher, state) = state_context.into_dispatcher_and_state(); let p2p_state: &P2pState = state.substate()?; diff --git a/p2p/src/channels/transaction/p2p_channels_transaction_actions.rs b/p2p/src/channels/transaction/p2p_channels_transaction_actions.rs index 717e9d8321..fc75742ba8 100644 --- a/p2p/src/channels/transaction/p2p_channels_transaction_actions.rs +++ b/p2p/src/channels/transaction/p2p_channels_transaction_actions.rs @@ -52,6 +52,7 @@ pub enum P2pChannelsTransactionAction { Libp2pBroadcast { transaction: Box, nonce: u32, + is_local: bool, }, } diff --git a/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs b/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs index 48820e1436..9b9e483cde 100644 --- a/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs +++ b/p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs @@ -230,14 +230,23 @@ impl P2pChannelsTransactionState { #[cfg(not(feature = "p2p-libp2p"))] P2pChannelsTransactionAction::Libp2pBroadcast { .. } => Ok(()), #[cfg(feature = "p2p-libp2p")] - P2pChannelsTransactionAction::Libp2pBroadcast { transaction, nonce } => { + P2pChannelsTransactionAction::Libp2pBroadcast { + transaction, + nonce, + is_local, + } => { let dispatcher = state_context.into_dispatcher(); let message = v2::NetworkPoolTransactionPoolDiffVersionedStableV2( std::iter::once(*transaction).collect(), ); let nonce = nonce.into(); let message = GossipNetMessageV2::TransactionPoolDiff { message, nonce }; - dispatcher.push(P2pNetworkPubsubAction::Broadcast { message }); + if is_local { + dispatcher.push(P2pNetworkPubsubAction::Broadcast { message }); + } else { + // rebroadcast block if received from webrtc network, otherwise noop. + dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message }); + } Ok(()) } } diff --git a/p2p/src/identity/mod.rs b/p2p/src/identity/mod.rs index 50947d1a9e..55a59e2151 100644 --- a/p2p/src/identity/mod.rs +++ b/p2p/src/identity/mod.rs @@ -6,3 +6,6 @@ pub use public_key::PublicKey; mod secret_key; pub use secret_key::{EncryptableType, SecretKey}; + +mod signature; +pub use signature::Signature; diff --git a/p2p/src/identity/secret_key.rs b/p2p/src/identity/secret_key.rs index 9b4b272f4f..7e6ba1ad81 100644 --- a/p2p/src/identity/secret_key.rs +++ b/p2p/src/identity/secret_key.rs @@ -1,12 +1,12 @@ use std::{fmt, path::Path, str::FromStr}; use base64::Engine; -use ed25519_dalek::SigningKey as Ed25519SecretKey; +use ed25519_dalek::{ed25519::signature::SignerMut, SigningKey as Ed25519SecretKey}; use openmina_core::{EncryptedSecretKey, EncryptedSecretKeyFile, EncryptionError}; use rand::{CryptoRng, Rng}; use serde::{Deserialize, Serialize}; -use crate::identity::PublicKey; +use super::{PublicKey, Signature}; #[derive(Clone)] pub struct SecretKey(Ed25519SecretKey); @@ -170,6 +170,23 @@ impl SecretKey { let data: Vec = self.decrypt_raw(other_pk, ciphertext.as_ref())?; serde_json::from_slice(&data).map_err(Box::::from) } + + pub fn sign(&mut self, data: &[u8]) -> Signature { + Signature(self.0.sign(data)) + } + + pub fn libp2p_pubsub_sign(&mut self, msg: &[u8]) -> Signature { + self.sign(&[b"libp2p-pubsub:", msg].concat()) + } + + pub fn libp2p_pubsub_pb_message_sign( + &mut self, + msg: &crate::pb::Message, + ) -> Result { + let mut buf = Vec::new(); + prost::Message::encode(msg, &mut buf)?; + Ok(self.libp2p_pubsub_sign(&buf)) + } } pub trait EncryptableType: Serialize + for<'a> Deserialize<'a> { diff --git a/p2p/src/identity/signature.rs b/p2p/src/identity/signature.rs new file mode 100644 index 0000000000..7b57fa0d3b --- /dev/null +++ b/p2p/src/identity/signature.rs @@ -0,0 +1,128 @@ +use std::{ + fmt, + io::{Read, Write}, + str::FromStr, +}; + +use binprot::{BinProtRead, BinProtWrite}; +use ed25519_dalek::Signature as Ed25519Signature; +use serde::{ + de::{SeqAccess, Visitor}, + Deserialize, Serialize, +}; + +#[derive(Eq, PartialEq, Clone)] +pub struct Signature(pub(super) Ed25519Signature); + +impl Signature { + const BYTE_SIZE: usize = Ed25519Signature::BYTE_SIZE; + + pub fn from_bytes(bytes: [u8; Self::BYTE_SIZE]) -> Self { + Self(Ed25519Signature::from_bytes(&bytes)) + } + + pub fn to_bytes(&self) -> [u8; Self::BYTE_SIZE] { + self.0.to_bytes() + } +} + +impl FromStr for Signature { + type Err = hex::FromHexError; + + fn from_str(s: &str) -> Result { + hex::decode(s)? + .try_into() + .map(Self::from_bytes) + .or(Err(hex::FromHexError::InvalidStringLength)) + } +} + +impl fmt::Display for Signature { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", hex::encode(self.to_bytes())) + } +} + +impl fmt::Debug for Signature { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Signature({self})") + } +} + +impl From for [u8; Signature::BYTE_SIZE] { + fn from(value: Signature) -> Self { + value.to_bytes() + } +} + +impl Serialize for Signature { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + if serializer.is_human_readable() { + serializer.serialize_str(&self.to_string()) + } else { + self.to_bytes().serialize(serializer) + } + } +} + +impl<'de> serde::Deserialize<'de> for Signature { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + if deserializer.is_human_readable() { + let s: String = Deserialize::deserialize(deserializer)?; + Ok(s.parse().map_err(serde::de::Error::custom)?) + } else { + struct ArrayVisitor; + + impl<'de> Visitor<'de> for ArrayVisitor { + type Value = [u8; Signature::BYTE_SIZE]; + + fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "signature bytes({})", Signature::BYTE_SIZE) + } + + #[inline] + fn visit_seq(self, mut a: A) -> Result + where + A: SeqAccess<'de>, + { + let mut bytes: Self::Value = [0; Signature::BYTE_SIZE]; + + for (i, byte) in bytes.iter_mut().enumerate() { + *byte = a + .next_element()? + .ok_or(serde::de::Error::invalid_length(i + 1, &self))?; + } + + Ok(bytes) + } + } + + deserializer + .deserialize_tuple(Self::BYTE_SIZE, ArrayVisitor) + .map(Self::from_bytes) + } + } +} + +impl BinProtWrite for Signature { + fn binprot_write(&self, w: &mut W) -> std::io::Result<()> { + w.write_all(&self.to_bytes()) + } +} + +impl BinProtRead for Signature { + fn binprot_read(r: &mut R) -> Result + where + Self: Sized, + { + let mut buf = [0; Ed25519Signature::BYTE_SIZE]; + r.read_exact(&mut buf)?; + Ok(Self::from_bytes(buf)) + } +} diff --git a/p2p/src/network/pubsub/mod.rs b/p2p/src/network/pubsub/mod.rs index f93cab0c71..79ab3d009f 100644 --- a/p2p/src/network/pubsub/mod.rs +++ b/p2p/src/network/pubsub/mod.rs @@ -20,6 +20,13 @@ const TOPIC: &str = "coda/consensus-messages/0.0.1"; pub mod pubsub_effectful; pub use pubsub_effectful::P2pNetworkPubsubEffectfulAction; +use binprot::BinProtWrite; +use mina_p2p_messages::gossip::GossipNetMessageV2; +use openmina_core::bug_condition; +use sha2::{Digest, Sha256}; + +use crate::identity::SecretKey; + #[derive(serde::Serialize, serde:: Deserialize, Debug, Clone)] pub enum BroadcastMessageId { BlockHash { @@ -29,3 +36,29 @@ pub enum BroadcastMessageId { message_id: P2pNetworkPubsubMessageCacheId, }, } + +pub(super) fn webrtc_source_sk(message: &GossipNetMessageV2) -> SecretKey { + let mut hasher = Sha256::new(); + if let Err(err) = message.binprot_write(&mut hasher) { + bug_condition!("trying to broadcast message which can't be binprot serialized! err: {err}"); + return SecretKey::from_bytes([0; 32]); + } + SecretKey::from_bytes(hasher.finalize().into()) +} + +pub(super) fn webrtc_source_sk_from_bytes(bytes: &[u8]) -> SecretKey { + let mut hasher = Sha256::new(); + hasher.update(bytes); + SecretKey::from_bytes(hasher.finalize().into()) +} + +pub(super) fn encode_message(message: &GossipNetMessageV2) -> std::io::Result> { + let mut buffer = vec![0; 8]; + + message.binprot_write(&mut buffer)?; + + let len = buffer.len() - 8; + buffer[..8].clone_from_slice(&(len as u64).to_le_bytes()); + + Ok(buffer) +} diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs index fea98478ec..63ff30c9d4 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs @@ -77,6 +77,18 @@ pub enum P2pNetworkPubsubAction { topic_id: String, }, + /// Rebroadcast message received from WebRTC connection. + /// + /// Expected to be dispatched after the message has been processed, + /// in spite of whether it was received from libp2p or webrtc network. + /// + /// If received from libp2p network, or if we have already broadcasted + /// this message, the message will be in the `mcache` state, + /// in which case the action won't be enabled (will be filtered out). + WebRtcRebroadcast { + message: GossipNetMessageV2, + }, + /// Initiate the broadcasting of a message to all subscribed peers. /// /// **Fields:** @@ -190,6 +202,17 @@ impl redux::EnablingCondition for P2pNetworkPubsubAction { .topics .get(topic_id) .is_some_and(|topics| topics.contains_key(peer_id)), + P2pNetworkPubsubAction::WebRtcRebroadcast { message } => { + let source = super::webrtc_source_sk(message) + .public_key() + .peer_id() + .try_into() + .unwrap(); + pubsub + .mcache + .get_message(&P2pNetworkPubsubMessageCacheId { source, seqno: 0 }) + .is_none() + } P2pNetworkPubsubAction::BroadcastValidatedMessage { message_id } | P2pNetworkPubsubAction::RejectMessage { message_id: Some(message_id), diff --git a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs index 5509a1e551..95b616f33a 100644 --- a/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs +++ b/p2p/src/network/pubsub/p2p_network_pubsub_reducer.rs @@ -389,18 +389,74 @@ impl P2pNetworkPubsubState { Ok(()) } P2pNetworkPubsubAction::OutgoingMessageError { .. } => Ok(()), - P2pNetworkPubsubAction::Broadcast { message } => { - let mut buffer = vec![0; 8]; + P2pNetworkPubsubAction::WebRtcRebroadcast { message } => { + let data = match super::encode_message(&message) { + Err(err) => { + bug_condition!("binprot serialization error: {err}"); + return Ok(()); + } + Ok(data) => data, + }; - if binprot::BinProtWrite::binprot_write(&message, &mut buffer).is_err() { - bug_condition!("binprot serialization error"); - return Ok(()); - } + let mut source_sk = super::webrtc_source_sk_from_bytes(&data[8..]); + let source_peer_id = source_sk.public_key().peer_id(); + let message_id = P2pNetworkPubsubMessageCacheId { + source: libp2p_identity::PeerId::try_from(source_peer_id).unwrap(), + seqno: 0, + }; + let mut msg = pb::Message { + from: Some(message_id.source.to_bytes().to_vec()), + data: Some(data), + seqno: Some(message_id.seqno.to_be_bytes().to_vec()), + topic: super::TOPIC.to_owned(), + signature: None, + key: None, + }; - let len = buffer.len() - 8; - buffer[..8].clone_from_slice(&(len as u64).to_le_bytes()); + msg.signature = match source_sk.libp2p_pubsub_pb_message_sign(&msg) { + Err(err) => { + bug_condition!("pubsub prost encode error: {err}"); + return Ok(()); + } + Ok(sig) => Some(sig.to_bytes().to_vec()), + }; - Self::prepare_to_sign(state_context, buffer) + let message_state = match &message { + GossipNetMessageV2::NewState(block) => { + P2pNetworkPubsubMessageCacheMessage::PreValidatedBlockMessage { + block_hash: block.try_hash()?, + message: msg, + peer_id: source_peer_id, + time, + } + } + _ => P2pNetworkPubsubMessageCacheMessage::PreValidated { + message: msg, + peer_id: source_peer_id, + time, + }, + }; + + pubsub_state.mcache.map.insert(message_id, message_state); + + let dispatcher = state_context.into_dispatcher(); + + dispatcher.push(P2pNetworkPubsubAction::BroadcastValidatedMessage { + message_id: super::BroadcastMessageId::MessageId { message_id }, + }); + + Ok(()) + } + P2pNetworkPubsubAction::Broadcast { message } => { + let data = match super::encode_message(&message) { + Err(err) => { + bug_condition!("binprot serialization error: {err}"); + return Ok(()); + } + Ok(data) => data, + }; + + Self::prepare_to_sign(state_context, data) } P2pNetworkPubsubAction::Sign { seqno, @@ -428,11 +484,7 @@ impl P2pNetworkPubsubState { }; let dispatcher = state_context.into_dispatcher(); - dispatcher.push(P2pNetworkPubsubEffectfulAction::Sign { - author, - topic, - message, - }); + dispatcher.push(P2pNetworkPubsubEffectfulAction::Sign { author, message }); Ok(()) } P2pNetworkPubsubAction::SignError { .. } => { diff --git a/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_actions.rs b/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_actions.rs index 3120fa3452..49a7fb8f3c 100644 --- a/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_actions.rs +++ b/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_actions.rs @@ -9,13 +9,8 @@ pub enum P2pNetworkPubsubEffectfulAction { /// /// **Fields:** /// - `author`: The identifier of the peer authoring the message. - /// - `topic`: The topic under which the message is published. /// - `message`: The protobuf message to be signed. - Sign { - author: PeerId, - topic: String, - message: Message, - }, + Sign { author: PeerId, message: Message }, /// Validate a batch of incoming messages from a peer. /// diff --git a/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_effects.rs b/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_effects.rs index d47bce28ee..028df638b6 100644 --- a/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_effects.rs +++ b/p2p/src/network/pubsub/pubsub_effectful/p2p_network_pubsub_effectful_effects.rs @@ -29,14 +29,13 @@ impl P2pNetworkPubsubEffectfulAction { Store::Service: P2pCryptoService, { match self { - P2pNetworkPubsubEffectfulAction::Sign { - author, - topic, - message, - } => { + P2pNetworkPubsubEffectfulAction::Sign { author, message } => { let mut publication = vec![]; if prost::Message::encode(&message, &mut publication).is_err() { - store.dispatch(P2pNetworkPubsubAction::SignError { author, topic }); + store.dispatch(P2pNetworkPubsubAction::SignError { + author, + topic: message.topic, + }); } else { let signature = store.service().sign_publication(&publication).into(); store.dispatch(P2pNetworkPubsubAction::BroadcastSigned { signature });