Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion node/src/rpc/rpc_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use openmina_core::{
block::AppliedBlock,
bug_condition,
requests::{RequestId, RpcId, RpcIdType},
transaction::TransactionWithHash,
};
use p2p::{
connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
Expand Down Expand Up @@ -499,10 +500,17 @@ impl RpcState {
};
state.requests.insert(*rpc_id, rpc_state);

let commands_with_hash = commands
.clone()
.into_iter()
// TODO: do something it it cannot be hashed?
.filter_map(|cmd| TransactionWithHash::try_new(cmd).ok())
.collect();

let dispatcher = state_context.into_dispatcher();
dispatcher.push(RpcAction::TransactionInjectPending { rpc_id: *rpc_id });
dispatcher.push(TransactionPoolAction::StartVerify {
commands: commands.clone().into_iter().collect(),
commands: commands_with_hash,
from_rpc: Some(*rpc_id),
});
}
Expand Down
44 changes: 22 additions & 22 deletions node/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::sync::Arc;
use std::time::Duration;

use mina_p2p_messages::v2::{MinaBaseUserCommandStableV2, MinaBlockBlockStableV2};
use mina_p2p_messages::v2;
use openmina_core::constants::PROTOCOL_VERSION;
use openmina_core::transaction::TransactionInfo;
use openmina_core::transaction::{TransactionInfo, TransactionWithHash};
use rand::prelude::*;

use openmina_core::block::BlockWithHash;
Expand Down Expand Up @@ -524,108 +524,108 @@ impl P2p {
fn p2p_callbacks() -> P2pCallbacks {
P2pCallbacks {
on_p2p_channels_transaction_received: Some(redux::callback!(
on_p2p_channels_transaction_received((peer_id: PeerId, info: Box<TransactionInfo>)) -> crate::Action{
on_p2p_channels_transaction_received((peer_id: PeerId, info: Box<TransactionInfo>)) -> crate::Action {
TransactionPoolCandidateAction::InfoReceived {
peer_id,
info: *info,
}
}
)),
on_p2p_channels_transaction_libp2p_received: Some(redux::callback!(
on_p2p_channels_transaction_libp2p_received(transaction: Box<MinaBaseUserCommandStableV2>) -> crate::Action{
on_p2p_channels_transaction_libp2p_received(transaction: Box<TransactionWithHash>) -> crate::Action {
TransactionPoolAction::StartVerify {
commands: std::iter::once(*transaction).collect(),
from_rpc: None
}
}
)),
on_p2p_channels_snark_job_commitment_received: Some(redux::callback!(
on_p2p_channels_snark_job_commitment_received((peer_id: PeerId, commitment: Box<SnarkJobCommitment>)) -> crate::Action{
on_p2p_channels_snark_job_commitment_received((peer_id: PeerId, commitment: Box<SnarkJobCommitment>)) -> crate::Action {
SnarkPoolAction::CommitmentAdd { commitment: *commitment, sender: peer_id }
}
)),
on_p2p_channels_snark_received: Some(redux::callback!(
on_p2p_channels_snark_received((peer_id: PeerId, snark: Box<SnarkInfo>)) -> crate::Action{
on_p2p_channels_snark_received((peer_id: PeerId, snark: Box<SnarkInfo>)) -> crate::Action {
SnarkPoolCandidateAction::InfoReceived { peer_id, info: *snark }
}
)),
on_p2p_channels_snark_libp2p_received: Some(redux::callback!(
on_p2p_channels_snark_libp2p_received((peer_id: PeerId, snark: Box<Snark>)) -> crate::Action{
on_p2p_channels_snark_libp2p_received((peer_id: PeerId, snark: Box<Snark>)) -> crate::Action {
SnarkPoolCandidateAction::WorkFetchSuccess { peer_id, work: *snark }
}
)),
on_p2p_channels_streaming_rpc_ready: Some(redux::callback!(
on_p2p_channels_streaming_rpc_ready(_var: ()) -> crate::Action{
on_p2p_channels_streaming_rpc_ready(_var: ()) -> crate::Action {
P2pCallbacksAction::P2pChannelsStreamingRpcReady
}
)),
on_p2p_channels_best_tip_request_received: Some(redux::callback!(
on_p2p_channels_best_tip_request_received(peer_id: PeerId) -> crate::Action{
on_p2p_channels_best_tip_request_received(peer_id: PeerId) -> crate::Action {
P2pCallbacksAction::RpcRespondBestTip { peer_id }
}
)),
on_p2p_disconnection_finish: Some(redux::callback!(
on_p2p_disconnection_finish(peer_id: PeerId) -> crate::Action{
on_p2p_disconnection_finish(peer_id: PeerId) -> crate::Action {
P2pCallbacksAction::P2pDisconnection { peer_id }
}
)),
on_p2p_connection_outgoing_error: Some(redux::callback!(
on_p2p_connection_outgoing_error((rpc_id: RpcId, error: P2pConnectionOutgoingError)) -> crate::Action{
on_p2p_connection_outgoing_error((rpc_id: RpcId, error: P2pConnectionOutgoingError)) -> crate::Action {
RpcAction::P2pConnectionOutgoingError { rpc_id, error }
}
)),
on_p2p_connection_outgoing_success: Some(redux::callback!(
on_p2p_connection_outgoing_success(rpc_id: RpcId) -> crate::Action{
on_p2p_connection_outgoing_success(rpc_id: RpcId) -> crate::Action {
RpcAction::P2pConnectionOutgoingSuccess { rpc_id }
}
)),
on_p2p_connection_incoming_error: Some(redux::callback!(
on_p2p_connection_incoming_error((rpc_id: RpcId, error: String)) -> crate::Action{
on_p2p_connection_incoming_error((rpc_id: RpcId, error: String)) -> crate::Action {
RpcAction::P2pConnectionIncomingError { rpc_id, error }
}
)),
on_p2p_connection_incoming_success: Some(redux::callback!(
on_p2p_connection_incoming_success(rpc_id: RpcId) -> crate::Action{
on_p2p_connection_incoming_success(rpc_id: RpcId) -> crate::Action {
RpcAction::P2pConnectionIncomingSuccess { rpc_id }
}
)),
on_p2p_connection_incoming_answer_ready: Some(redux::callback!(
on_p2p_connection_incoming_answer_ready((rpc_id: RpcId, peer_id: PeerId, answer: P2pConnectionResponse)) -> crate::Action{
on_p2p_connection_incoming_answer_ready((rpc_id: RpcId, peer_id: PeerId, answer: P2pConnectionResponse)) -> crate::Action {
RpcAction::P2pConnectionIncomingAnswerReady { rpc_id, answer, peer_id }
}
)),
on_p2p_peer_best_tip_update: Some(redux::callback!(
on_p2p_peer_best_tip_update(best_tip: BlockWithHash<Arc<MinaBlockBlockStableV2>>) -> crate::Action{
on_p2p_peer_best_tip_update(best_tip: BlockWithHash<Arc<v2::MinaBlockBlockStableV2>>) -> crate::Action {
ConsensusAction::P2pBestTipUpdate { best_tip }
}
)),
on_p2p_channels_rpc_ready: Some(redux::callback!(
on_p2p_channels_rpc_ready(peer_id: PeerId) -> crate::Action{
on_p2p_channels_rpc_ready(peer_id: PeerId) -> crate::Action {
P2pCallbacksAction::P2pChannelsRpcReady { peer_id }
}
)),
on_p2p_channels_rpc_timeout: Some(redux::callback!(
on_p2p_channels_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action{
on_p2p_channels_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
P2pCallbacksAction::P2pChannelsRpcTimeout { peer_id, id }
}
)),
on_p2p_channels_rpc_response_received: Some(redux::callback!(
on_p2p_channels_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<Box<P2pRpcResponse>>)) -> crate::Action{
on_p2p_channels_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<Box<P2pRpcResponse>>)) -> crate::Action {
P2pCallbacksAction::P2pChannelsRpcResponseReceived { peer_id, id, response }
}
)),
on_p2p_channels_rpc_request_received: Some(redux::callback!(
on_p2p_channels_rpc_request_received((peer_id: PeerId, id: P2pRpcId, request: Box<P2pRpcRequest>)) -> crate::Action{
on_p2p_channels_rpc_request_received((peer_id: PeerId, id: P2pRpcId, request: Box<P2pRpcRequest>)) -> crate::Action {
P2pCallbacksAction::P2pChannelsRpcRequestReceived { peer_id, id, request }
}
)),
on_p2p_channels_streaming_rpc_response_received: Some(redux::callback!(
on_p2p_channels_streaming_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<P2pStreamingRpcResponseFull>)) -> crate::Action{
on_p2p_channels_streaming_rpc_response_received((peer_id: PeerId, id: P2pRpcId, response: Option<P2pStreamingRpcResponseFull>)) -> crate::Action {
P2pCallbacksAction::P2pChannelsStreamingRpcResponseReceived { peer_id, id, response }
}
)),
on_p2p_channels_streaming_rpc_timeout: Some(redux::callback!(
on_p2p_channels_streaming_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action{
on_p2p_channels_streaming_rpc_timeout((peer_id: PeerId, id: P2pRpcId)) -> crate::Action {
P2pCallbacksAction::P2pChannelsStreamingRpcTimeout { peer_id, id }
}
)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl TransactionPoolCandidatesState {

let transaction_hashes = batch.iter().map(|tx| tx.hash().clone()).collect();
dispatcher.push(TransactionPoolAction::StartVerify {
commands: batch.into_iter().map(|tx| tx.into_body()).collect(),
commands: batch.into_iter().collect(),
from_rpc: None,
});
dispatcher.push(TransactionPoolCandidateAction::VerifyPending {
Expand Down
20 changes: 10 additions & 10 deletions node/src/transaction_pool/transaction_pool_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ use ledger::{
},
Account, AccountId,
};
use mina_p2p_messages::{
list::List,
v2::{self, LedgerHash},
};
use openmina_core::{requests::RpcId, ActionEvent};
use mina_p2p_messages::{list::List, v2};
use openmina_core::{requests::RpcId, transaction::TransactionWithHash, ActionEvent};
use redux::Callback;
use serde::{Deserialize, Serialize};

Expand All @@ -25,7 +22,7 @@ pub type TransactionPoolActionWithMetaRef<'a> = redux::ActionWithMeta<&'a Transa
pub enum TransactionPoolAction {
Candidate(TransactionPoolCandidateAction),
StartVerify {
commands: List<v2::MinaBaseUserCommandStableV2>,
commands: List<TransactionWithHash>,
from_rpc: Option<RpcId>,
},
StartVerifyWithAccounts {
Expand All @@ -38,13 +35,13 @@ pub enum TransactionPoolAction {
errors: Vec<String>,
},
BestTipChanged {
best_tip_hash: LedgerHash,
best_tip_hash: v2::LedgerHash,
},
BestTipChangedWithAccounts {
accounts: BTreeMap<AccountId, Account>,
},
ApplyVerifiedDiff {
best_tip_hash: LedgerHash,
best_tip_hash: v2::LedgerHash,
diff: DiffVerified,
/// Diff was crearted locally, or from remote peer ?
is_sender_local: bool,
Expand All @@ -55,7 +52,7 @@ pub enum TransactionPoolAction {
pending_id: PendingId,
},
ApplyTransitionFrontierDiff {
best_tip_hash: LedgerHash,
best_tip_hash: v2::LedgerHash,
diff: BestTipDiff,
},
ApplyTransitionFrontierDiffWithAccounts {
Expand All @@ -81,6 +78,9 @@ impl redux::EnablingCondition<crate::State> for TransactionPoolAction {
fn is_enabled(&self, state: &crate::State, time: redux::Timestamp) -> bool {
match self {
TransactionPoolAction::Candidate(a) => a.is_enabled(state, time),
TransactionPoolAction::StartVerify { commands, .. } => commands
.iter()
.any(|cmd| !state.transaction_pool.contains(cmd.hash())),
TransactionPoolAction::P2pSendAll => true,
TransactionPoolAction::P2pSend { peer_id } => state
.p2p
Expand Down Expand Up @@ -125,7 +125,7 @@ type TransactionPoolEffectfulActionCallback = Callback<(
pub enum TransactionPoolEffectfulAction {
FetchAccounts {
account_ids: BTreeSet<AccountId>,
ledger_hash: LedgerHash,
ledger_hash: v2::LedgerHash,
on_result: TransactionPoolEffectfulActionCallback,
pending_id: Option<PendingId>,
from_rpc: Option<RpcId>,
Expand Down
7 changes: 6 additions & 1 deletion node/src/transaction_pool/transaction_pool_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use ledger::{
Account, AccountId,
};
use openmina_core::{
bug_condition, constants::constraint_constants, requests::RpcId, transaction::Transaction,
bug_condition,
constants::constraint_constants,
requests::RpcId,
transaction::{Transaction, TransactionWithHash},
};
use p2p::channels::transaction::P2pChannelsTransactionAction;
use redux::callback;
Expand Down Expand Up @@ -56,6 +59,7 @@ impl TransactionPoolState {
TransactionPoolAction::StartVerify { commands, from_rpc } => {
let Ok(commands) = commands
.iter()
.map(TransactionWithHash::body)
.map(UserCommand::try_from)
.collect::<Result<Vec<_>, _>>()
else {
Expand Down Expand Up @@ -95,6 +99,7 @@ impl TransactionPoolState {
// TODO: Convert those commands only once
let Ok(commands) = commands
.iter()
.map(TransactionWithHash::body)
.map(UserCommand::try_from)
.collect::<Result<Vec<_>, _>>()
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
P2pNetworkPubsubAction, P2pState,
};
use mina_p2p_messages::{gossip::GossipNetMessageV2, v2};
use openmina_core::{bug_condition, Substate};
use openmina_core::{bug_condition, transaction::TransactionWithHash, Substate};
use redux::ActionWithMeta;

impl P2pChannelsTransactionState {
Expand Down Expand Up @@ -220,7 +220,9 @@ impl P2pChannelsTransactionState {
.callbacks
.on_p2p_channels_transaction_libp2p_received
{
dispatcher.push_callback(callback.clone(), transaction);
if let Ok(transaction) = TransactionWithHash::try_new(*transaction) {
dispatcher.push_callback(callback.clone(), Box::new(transaction));
}
}

Ok(())
Expand Down
10 changes: 5 additions & 5 deletions p2p/src/p2p_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use openmina_core::{
impl_substate_access,
requests::RpcId,
snark::{Snark, SnarkInfo, SnarkJobCommitment},
transaction::TransactionInfo,
transaction::{TransactionInfo, TransactionWithHash},
ChainId, SubstateAccess,
};
use redux::{Callback, Timestamp};
Expand Down Expand Up @@ -35,7 +35,7 @@ use crate::{
Limit, P2pConfig, P2pLimits, P2pNetworkKadState, P2pNetworkPubsubState,
P2pNetworkSchedulerState, P2pTimeouts, PeerId,
};
use mina_p2p_messages::v2::{MinaBaseUserCommandStableV2, MinaBlockBlockStableV2};
use mina_p2p_messages::v2;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct P2pState {
Expand Down Expand Up @@ -496,8 +496,7 @@ pub struct P2pCallbacks {
/// Callback for [`P2pChannelsTransactionAction::Received`]
pub on_p2p_channels_transaction_received: OptionalCallback<(PeerId, Box<TransactionInfo>)>,
/// Callback for [`P2pChannelsTransactionAction::Libp2pReceived`]
pub on_p2p_channels_transaction_libp2p_received:
OptionalCallback<Box<MinaBaseUserCommandStableV2>>,
pub on_p2p_channels_transaction_libp2p_received: OptionalCallback<Box<TransactionWithHash>>,
/// Callback for [`P2pChannelsSnarkJobCommitmentAction::Received`]
pub on_p2p_channels_snark_job_commitment_received:
OptionalCallback<(PeerId, Box<SnarkJobCommitment>)>,
Expand Down Expand Up @@ -529,7 +528,8 @@ pub struct P2pCallbacks {
OptionalCallback<(RpcId, PeerId, P2pConnectionResponse)>,

/// Callback for [`P2pPeerAction::BestTipUpdate`]
pub on_p2p_peer_best_tip_update: OptionalCallback<BlockWithHash<Arc<MinaBlockBlockStableV2>>>,
pub on_p2p_peer_best_tip_update:
OptionalCallback<BlockWithHash<Arc<v2::MinaBlockBlockStableV2>>>,

/// Callback for [`P2pChannelsRpcAction::Ready`]
pub on_p2p_channels_rpc_ready: OptionalCallback<PeerId>,
Expand Down
Loading