Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions node/common/src/service/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,28 +123,23 @@ impl P2pCryptoService for NodeService {
fn sign_key(&mut self, key: &[u8; 32]) -> Vec<u8> {
// TODO: make deterministic
let msg = [b"noise-libp2p-static-key:", key.as_slice()].concat();
let sig = self
.p2p
.mio
.keypair()
.sign(&msg)
.expect("unable to create signature");
let sig = self.p2p.sec_key.sign(&msg);
let libp2p_sec_key = libp2p_identity::Keypair::try_from(self.p2p.sec_key.clone()).unwrap();

let mut payload = vec![];
payload.extend_from_slice(b"\x0a\x24");
payload.extend_from_slice(&self.p2p.mio.keypair().public().encode_protobuf());
payload.extend_from_slice(&libp2p_sec_key.public().encode_protobuf());
payload.extend_from_slice(b"\x12\x40");
payload.extend_from_slice(&sig);
payload.extend_from_slice(&sig.to_bytes());
payload
}

fn sign_publication(&mut self, publication: &[u8]) -> Vec<u8> {
let msg = [b"libp2p-pubsub:", publication].concat();
self.p2p
.mio
.keypair()
.sign(&msg)
.expect("unable to create signature")
.sec_key
.libp2p_pubsub_sign(publication)
.to_bytes()
.to_vec()
}

fn verify_publication(
Expand Down
4 changes: 3 additions & 1 deletion node/src/action_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ pub enum ActionKind {
P2pNetworkPubsubSignError,
P2pNetworkPubsubValidateIncomingMessage,
P2pNetworkPubsubValidateIncomingMessages,
P2pNetworkPubsubWebRtcRebroadcast,
P2pNetworkPubsubEffectfulSign,
P2pNetworkPubsubEffectfulValidateIncomingMessages,
P2pNetworkRpcHeartbeatSend,
Expand Down Expand Up @@ -717,7 +718,7 @@ pub enum ActionKind {
}

impl ActionKind {
pub const COUNT: u16 = 607;
pub const COUNT: u16 = 608;
}

impl std::fmt::Display for ActionKind {
Expand Down Expand Up @@ -1961,6 +1962,7 @@ impl ActionKindGet for P2pNetworkPubsubAction {
}
Self::Graft { .. } => ActionKind::P2pNetworkPubsubGraft,
Self::Prune { .. } => ActionKind::P2pNetworkPubsubPrune,
Self::WebRtcRebroadcast { .. } => ActionKind::P2pNetworkPubsubWebRtcRebroadcast,
Self::Broadcast { .. } => ActionKind::P2pNetworkPubsubBroadcast,
Self::Sign { .. } => ActionKind::P2pNetworkPubsubSign,
Self::SignError { .. } => ActionKind::P2pNetworkPubsubSignError,
Expand Down
14 changes: 6 additions & 8 deletions node/src/snark_pool/snark_pool_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,13 @@ impl SnarkPoolState {
}
}

// TODO: we only rebroadcast locally produced snarks here.
// libp2p logic already broadcasts everything right now and doesn't
// TODO: libp2p logic already broadcasts everything right now and doesn't
// wait for validation, thad needs to be fixed. See #952
if *is_sender_local {
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
snark: snark.clone(),
nonce: 0,
});
}
dispatcher.push(P2pChannelsSnarkAction::Libp2pBroadcast {
snark: snark.clone(),
nonce: 0,
is_local: *is_sender_local,
});
}
SnarkPoolAction::P2pSendAll { .. } => {
let (dispatcher, global_state) = state_context.into_dispatcher_and_state();
Expand Down
7 changes: 4 additions & 3 deletions node/src/transaction_pool/transaction_pool_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub enum TransactionPoolAction {
Rebroadcast {
accepted: Vec<ValidCommandWithHash>,
rejected: Vec<(ValidCommandWithHash, diff::Error)>,
is_local: bool,
},
CollectTransactionsByFee,
#[action_event(level = trace)]
Expand Down Expand Up @@ -115,9 +116,9 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolAction {
last_index,
)
}),
TransactionPoolAction::Rebroadcast { accepted, rejected } => {
!(accepted.is_empty() && rejected.is_empty())
}
TransactionPoolAction::Rebroadcast {
accepted, rejected, ..
} => !(accepted.is_empty() && rejected.is_empty()),
_ => true,
}
}
Expand Down
18 changes: 13 additions & 5 deletions node/src/transaction_pool/transaction_pool_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,14 @@ impl TransactionPoolState {
if let Some(rpc_action) = rpc_action {
dispatcher.push(rpc_action);
}
// TODO: we only rebroadcast locally injected transactions here.
// libp2p logic already broadcasts everything right now and doesn't
// TODO: libp2p logic already broadcasts everything right now and doesn't
// wait for validation, thad needs to be fixed. See #952
if is_sender_local && was_accepted {
dispatcher.push(TransactionPoolAction::Rebroadcast { accepted, rejected });
if was_accepted {
dispatcher.push(TransactionPoolAction::Rebroadcast {
accepted,
rejected,
is_local: is_sender_local,
});
}
}
TransactionPoolAction::ApplyTransitionFrontierDiff {
Expand Down Expand Up @@ -372,7 +375,11 @@ impl TransactionPoolState {
);
}
}
TransactionPoolAction::Rebroadcast { accepted, rejected } => {
TransactionPoolAction::Rebroadcast {
accepted,
rejected,
is_local,
} => {
let rejected = rejected.iter().map(|(cmd, _)| cmd.data.forget_check());

let all_commands = accepted
Expand All @@ -387,6 +394,7 @@ impl TransactionPoolState {
dispatcher.push(P2pChannelsTransactionAction::Libp2pBroadcast {
transaction: Box::new((&cmd).into()),
nonce: 0,
is_local: *is_local,
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,6 @@ impl TransitionFrontierSyncAction {
if !store.dispatch(TransitionFrontierSyncAction::BlocksNextApplyInit) {
store.dispatch(TransitionFrontierSyncAction::BlocksSuccess);
}

// TODO this should be handled by a callback
store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage {
message_id: p2p::BroadcastMessageId::BlockHash { hash: hash.clone() },
});
}
TransitionFrontierSyncAction::BlocksSuccess => {}
// Bootstrap/Catchup is practically complete at this point.
Expand Down
14 changes: 14 additions & 0 deletions node/src/transition_frontier/transition_frontier_effects.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use mina_p2p_messages::gossip::GossipNetMessageV2;
use redux::Timestamp;

use crate::block_producer::BlockProducerAction;
use crate::consensus::ConsensusAction;
use crate::ledger::LEDGER_DEPTH;
use crate::p2p::channels::best_tip::P2pChannelsBestTipAction;
use crate::p2p::P2pNetworkPubsubAction;
use crate::snark_pool::{SnarkPoolAction, SnarkWork};
use crate::stats::sync::SyncingLedger;
use crate::{Store, TransactionPoolAction};
Expand Down Expand Up @@ -305,6 +307,18 @@ fn synced_effects<S: crate::Service>(
best_tip: best_tip.block.clone(),
});
}
// TODO this should be handled by a callback
// If this get dispatched, we received block from libp2p.
if !store.dispatch(P2pNetworkPubsubAction::BroadcastValidatedMessage {
message_id: p2p::BroadcastMessageId::BlockHash {
hash: best_tip.hash().clone(),
},
}) {
// Otherwise block was received from WebRTC so inject it in libp2p.
store.dispatch(P2pNetworkPubsubAction::WebRtcRebroadcast {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would happen if we dispatched this one unconditionally? is the conditional check what avoids the duplication of the message? (I ask because we can do this here because the transition frontier still doesn't use queued actions, but once that is implemented we will have to figure out something else here)

Copy link
Contributor Author

@binier binier Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the conditional check what avoids the duplication of the message?

but once that is implemented we will have to figure out something else here)

yes and yes

message: GossipNetMessageV2::NewState(best_tip.block().clone()),
});
}

let best_tip_hash = best_tip.merkle_root_hash().clone();
store.dispatch(ConsensusAction::Prune);
Expand Down
1 change: 1 addition & 0 deletions p2p/src/channels/snark/p2p_channels_snark_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub enum P2pChannelsSnarkAction {
Libp2pBroadcast {
snark: Snark,
nonce: u32,
is_local: bool,
},
}

Expand Down
17 changes: 13 additions & 4 deletions p2p/src/channels/snark/p2p_channels_snark_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,18 +210,27 @@ impl P2pChannelsSnarkState {
}
Ok(())
}
#[cfg(not(feature = "p2p-libp2p"))]
P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()),
#[cfg(feature = "p2p-libp2p")]
P2pChannelsSnarkAction::Libp2pBroadcast { snark, nonce } => {
P2pChannelsSnarkAction::Libp2pBroadcast {
snark,
nonce,
is_local,
} => {
let dispatcher = state_context.into_dispatcher();
let message = Box::new((snark.statement(), (&snark).into()));
let message = v2::NetworkPoolSnarkPoolDiffVersionedStableV2::AddSolvedWork(message);
let nonce = nonce.into();
let message = GossipNetMessageV2::SnarkPoolDiff { message, nonce };
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
if is_local {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@binier I am confused by this, is this enough to differentiate non-webrtc from webrtc sourced messages? What happens with messages received from libp2p? aren't they going to follow the same path and be considered as webrtc-sourced by this condition here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My assumption is that if msg was received from libp2p, it should still be in mcache so the action won't be enabled because this will be false: https://github.com/openmina/openmina/blob/b5c83e8fb6276306d730a627a0cb79da9af301a4/p2p/src/network/pubsub/p2p_network_pubsub_actions.rs#L206

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, but if that is the case please add a comment clarifying that there, because right now it is quite confusing

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also may break accidentally if for some reason that caching logic changes somehow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also may break accidentally if for some reason that caching logic changes somehow

It will "break" anyways if that happens. Though right now maybe not, coz block rebroadcast won't wait for block application right? It seems to do only precheck and rebroadcast in that case.

Copy link
Collaborator

@tizoc tizoc Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@binier block rebroadcast now waits for the application to be successful, Grga implemented that in #1001 (but just for blocks at the moment, this doesn't apply to transactions or snarks yet).

My point is that if for whatever reason this code needs to be changed (or somebody is trying to debug something semi-related), it would be better to not depend on our own memories or to have to re-understand the code to understand the assumptions made here (which depend on code from somewhere else). The comment makes the (non-obvious) assumption very explicit and will save time (both in that case, or any other kind of refactor affecting this).

dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
} else {
// rebroadcast snark if received from webrtc network, otherwise noop.
dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message });
}
Ok(())
}
#[cfg(not(feature = "p2p-libp2p"))]
P2pChannelsSnarkAction::Libp2pBroadcast { .. } => Ok(()),
P2pChannelsSnarkAction::Libp2pReceived { peer_id, snark, .. } => {
let (dispatcher, state) = state_context.into_dispatcher_and_state();
let p2p_state: &P2pState = state.substate()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub enum P2pChannelsTransactionAction {
Libp2pBroadcast {
transaction: Box<Transaction>,
nonce: u32,
is_local: bool,
},
}

Expand Down
13 changes: 11 additions & 2 deletions p2p/src/channels/transaction/p2p_channels_transaction_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,23 @@ impl P2pChannelsTransactionState {
#[cfg(not(feature = "p2p-libp2p"))]
P2pChannelsTransactionAction::Libp2pBroadcast { .. } => Ok(()),
#[cfg(feature = "p2p-libp2p")]
P2pChannelsTransactionAction::Libp2pBroadcast { transaction, nonce } => {
P2pChannelsTransactionAction::Libp2pBroadcast {
transaction,
nonce,
is_local,
} => {
let dispatcher = state_context.into_dispatcher();
let message = v2::NetworkPoolTransactionPoolDiffVersionedStableV2(
std::iter::once(*transaction).collect(),
);
let nonce = nonce.into();
let message = GossipNetMessageV2::TransactionPoolDiff { message, nonce };
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
if is_local {
dispatcher.push(P2pNetworkPubsubAction::Broadcast { message });
} else {
// rebroadcast block if received from webrtc network, otherwise noop.
dispatcher.push(P2pNetworkPubsubAction::WebRtcRebroadcast { message });
}
Ok(())
}
}
Expand Down
3 changes: 3 additions & 0 deletions p2p/src/identity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ pub use public_key::PublicKey;

mod secret_key;
pub use secret_key::{EncryptableType, SecretKey};

mod signature;
pub use signature::Signature;
21 changes: 19 additions & 2 deletions p2p/src/identity/secret_key.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::{fmt, path::Path, str::FromStr};

use base64::Engine;
use ed25519_dalek::SigningKey as Ed25519SecretKey;
use ed25519_dalek::{ed25519::signature::SignerMut, SigningKey as Ed25519SecretKey};
use openmina_core::{EncryptedSecretKey, EncryptedSecretKeyFile, EncryptionError};
use rand::{CryptoRng, Rng};
use serde::{Deserialize, Serialize};

use crate::identity::PublicKey;
use super::{PublicKey, Signature};

#[derive(Clone)]
pub struct SecretKey(Ed25519SecretKey);
Expand Down Expand Up @@ -170,6 +170,23 @@ impl SecretKey {
let data: Vec<u8> = self.decrypt_raw(other_pk, ciphertext.as_ref())?;
serde_json::from_slice(&data).map_err(Box::<dyn std::error::Error>::from)
}

pub fn sign(&mut self, data: &[u8]) -> Signature {
Signature(self.0.sign(data))
}

pub fn libp2p_pubsub_sign(&mut self, msg: &[u8]) -> Signature {
self.sign(&[b"libp2p-pubsub:", msg].concat())
}

pub fn libp2p_pubsub_pb_message_sign(
&mut self,
msg: &crate::pb::Message,
) -> Result<Signature, prost::EncodeError> {
let mut buf = Vec::new();
prost::Message::encode(msg, &mut buf)?;
Ok(self.libp2p_pubsub_sign(&buf))
}
}

pub trait EncryptableType: Serialize + for<'a> Deserialize<'a> {
Expand Down
Loading
Loading