Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 116 additions & 2 deletions node/src/action_kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@ use crate::ledger::read::LedgerReadAction;
use crate::ledger::write::LedgerWriteAction;
use crate::ledger::LedgerAction;
use crate::p2p::channels::best_tip::P2pChannelsBestTipAction;
use crate::p2p::channels::best_tip_effectful::P2pChannelsBestTipEffectfulAction;
use crate::p2p::channels::rpc::P2pChannelsRpcAction;
use crate::p2p::channels::rpc_effectful::P2pChannelsRpcEffectfulAction;
use crate::p2p::channels::snark::P2pChannelsSnarkAction;
use crate::p2p::channels::snark_effectful::P2pChannelsSnarkEffectfulAction;
use crate::p2p::channels::snark_job_commitment::P2pChannelsSnarkJobCommitmentAction;
use crate::p2p::channels::snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction;
use crate::p2p::channels::streaming_rpc::P2pChannelsStreamingRpcAction;
use crate::p2p::channels::streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction;
use crate::p2p::channels::transaction::P2pChannelsTransactionAction;
use crate::p2p::channels::{P2pChannelsAction, P2pChannelsMessageReceivedAction};
use crate::p2p::channels::transaction_effectful::P2pChannelsTransactionEffectfulAction;
use crate::p2p::channels::{
P2pChannelsAction, P2pChannelsEffectfulAction, P2pChannelsMessageReceivedAction,
};
use crate::p2p::connection::incoming::P2pConnectionIncomingAction;
use crate::p2p::connection::incoming_effectful::P2pConnectionIncomingEffectfulAction;
use crate::p2p::connection::outgoing::P2pConnectionOutgoingAction;
Expand Down Expand Up @@ -173,6 +181,9 @@ pub enum ActionKind {
P2pChannelsBestTipRequestReceived,
P2pChannelsBestTipRequestSend,
P2pChannelsBestTipResponseSend,
P2pChannelsBestTipEffectfulInit,
P2pChannelsBestTipEffectfulRequestSend,
P2pChannelsBestTipEffectfulResponseSend,
P2pChannelsMessageReceived,
P2pChannelsRpcInit,
P2pChannelsRpcPending,
Expand All @@ -183,6 +194,9 @@ pub enum ActionKind {
P2pChannelsRpcResponseReceived,
P2pChannelsRpcResponseSend,
P2pChannelsRpcTimeout,
P2pChannelsRpcEffectfulInit,
P2pChannelsRpcEffectfulRequestSend,
P2pChannelsRpcEffectfulResponseSend,
P2pChannelsSnarkInit,
P2pChannelsSnarkLibp2pBroadcast,
P2pChannelsSnarkLibp2pReceived,
Expand All @@ -193,6 +207,9 @@ pub enum ActionKind {
P2pChannelsSnarkRequestReceived,
P2pChannelsSnarkRequestSend,
P2pChannelsSnarkResponseSend,
P2pChannelsSnarkEffectfulInit,
P2pChannelsSnarkEffectfulRequestSend,
P2pChannelsSnarkEffectfulResponseSend,
P2pChannelsSnarkJobCommitmentInit,
P2pChannelsSnarkJobCommitmentPending,
P2pChannelsSnarkJobCommitmentPromiseReceived,
Expand All @@ -201,6 +218,9 @@ pub enum ActionKind {
P2pChannelsSnarkJobCommitmentRequestReceived,
P2pChannelsSnarkJobCommitmentRequestSend,
P2pChannelsSnarkJobCommitmentResponseSend,
P2pChannelsSnarkJobCommitmentEffectfulInit,
P2pChannelsSnarkJobCommitmentEffectfulRequestSend,
P2pChannelsSnarkJobCommitmentEffectfulResponseSend,
P2pChannelsStreamingRpcInit,
P2pChannelsStreamingRpcPending,
P2pChannelsStreamingRpcReady,
Expand All @@ -215,6 +235,11 @@ pub enum ActionKind {
P2pChannelsStreamingRpcResponseSendInit,
P2pChannelsStreamingRpcResponseSent,
P2pChannelsStreamingRpcTimeout,
P2pChannelsStreamingRpcEffectfulInit,
P2pChannelsStreamingRpcEffectfulRequestSend,
P2pChannelsStreamingRpcEffectfulResponseNextPartGet,
P2pChannelsStreamingRpcEffectfulResponsePartSend,
P2pChannelsStreamingRpcEffectfulResponseSendInit,
P2pChannelsTransactionInit,
P2pChannelsTransactionLibp2pBroadcast,
P2pChannelsTransactionLibp2pReceived,
Expand All @@ -225,6 +250,9 @@ pub enum ActionKind {
P2pChannelsTransactionRequestReceived,
P2pChannelsTransactionRequestSend,
P2pChannelsTransactionResponseSend,
P2pChannelsTransactionEffectfulInit,
P2pChannelsTransactionEffectfulRequestSend,
P2pChannelsTransactionEffectfulResponseSend,
P2pConnectionIncomingAnswerReady,
P2pConnectionIncomingAnswerSdpCreateError,
P2pConnectionIncomingAnswerSdpCreatePending,
Expand Down Expand Up @@ -568,7 +596,7 @@ pub enum ActionKind {
}

impl ActionKind {
pub const COUNT: u16 = 470;
pub const COUNT: u16 = 490;
}

impl std::fmt::Display for ActionKind {
Expand Down Expand Up @@ -626,6 +654,7 @@ impl ActionKindGet for P2pAction {
Self::DisconnectionEffectful(a) => a.kind(),
Self::Identify(a) => a.kind(),
Self::Channels(a) => a.kind(),
Self::ChannelsEffectful(a) => a.kind(),
Self::Peer(a) => a.kind(),
Self::Network(a) => a.kind(),
}
Expand Down Expand Up @@ -961,6 +990,19 @@ impl ActionKindGet for P2pChannelsAction {
}
}

impl ActionKindGet for P2pChannelsEffectfulAction {
fn kind(&self) -> ActionKind {
match self {
Self::BestTip(a) => a.kind(),
Self::Rpc(a) => a.kind(),
Self::Snark(a) => a.kind(),
Self::SnarkJobCommitment(a) => a.kind(),
Self::StreamingRpc(a) => a.kind(),
Self::Transaction(a) => a.kind(),
}
}
}

impl ActionKindGet for P2pPeerAction {
fn kind(&self) -> ActionKind {
match self {
Expand Down Expand Up @@ -1415,6 +1457,78 @@ impl ActionKindGet for P2pChannelsStreamingRpcAction {
}
}

impl ActionKindGet for P2pChannelsBestTipEffectfulAction {
fn kind(&self) -> ActionKind {
match self {
Self::Init { .. } => ActionKind::P2pChannelsBestTipEffectfulInit,
Self::RequestSend { .. } => ActionKind::P2pChannelsBestTipEffectfulRequestSend,
Self::ResponseSend { .. } => ActionKind::P2pChannelsBestTipEffectfulResponseSend,
}
}
}

impl ActionKindGet for P2pChannelsRpcEffectfulAction {
fn kind(&self) -> ActionKind {
match self {
Self::Init { .. } => ActionKind::P2pChannelsRpcEffectfulInit,
Self::RequestSend { .. } => ActionKind::P2pChannelsRpcEffectfulRequestSend,
Self::ResponseSend { .. } => ActionKind::P2pChannelsRpcEffectfulResponseSend,
}
}
}

impl ActionKindGet for P2pChannelsSnarkEffectfulAction {
fn kind(&self) -> ActionKind {
match self {
Self::Init { .. } => ActionKind::P2pChannelsSnarkEffectfulInit,
Self::RequestSend { .. } => ActionKind::P2pChannelsSnarkEffectfulRequestSend,
Self::ResponseSend { .. } => ActionKind::P2pChannelsSnarkEffectfulResponseSend,
}
}
}

impl ActionKindGet for P2pChannelsSnarkJobCommitmentEffectfulAction {
fn kind(&self) -> ActionKind {
match self {
Self::Init { .. } => ActionKind::P2pChannelsSnarkJobCommitmentEffectfulInit,
Self::RequestSend { .. } => {
ActionKind::P2pChannelsSnarkJobCommitmentEffectfulRequestSend
}
Self::ResponseSend { .. } => {
ActionKind::P2pChannelsSnarkJobCommitmentEffectfulResponseSend
}
}
}
}

impl ActionKindGet for P2pChannelsStreamingRpcEffectfulAction {
fn kind(&self) -> ActionKind {
match self {
Self::Init { .. } => ActionKind::P2pChannelsStreamingRpcEffectfulInit,
Self::RequestSend { .. } => ActionKind::P2pChannelsStreamingRpcEffectfulRequestSend,
Self::ResponseNextPartGet { .. } => {
ActionKind::P2pChannelsStreamingRpcEffectfulResponseNextPartGet
}
Self::ResponseSendInit { .. } => {
ActionKind::P2pChannelsStreamingRpcEffectfulResponseSendInit
}
Self::ResponsePartSend { .. } => {
ActionKind::P2pChannelsStreamingRpcEffectfulResponsePartSend
}
}
}
}

impl ActionKindGet for P2pChannelsTransactionEffectfulAction {
fn kind(&self) -> ActionKind {
match self {
Self::Init { .. } => ActionKind::P2pChannelsTransactionEffectfulInit,
Self::RequestSend { .. } => ActionKind::P2pChannelsTransactionEffectfulRequestSend,
Self::ResponseSend { .. } => ActionKind::P2pChannelsTransactionEffectfulResponseSend,
}
}
}

impl ActionKindGet for P2pNetworkSchedulerAction {
fn kind(&self) -> ActionKind {
match self {
Expand Down
11 changes: 11 additions & 0 deletions node/src/logger/logger_effects.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use openmina_core::log::inner::field::{display, DisplayValue};
use openmina_core::log::inner::Value;
use openmina_core::log::{time_to_str, ActionEvent, EventContext};
use p2p::channels::P2pChannelsEffectfulAction;
use p2p::connection::P2pConnectionEffectfulAction;
use p2p::{P2pNetworkConnectionError, P2pNetworkSchedulerAction, PeerId};

Expand Down Expand Up @@ -70,6 +71,16 @@ pub fn logger_effects<S: Service>(store: &Store<S>, action: ActionWithMetaRef<'_
P2pChannelsAction::Rpc(action) => action.action_event(&context),
P2pChannelsAction::StreamingRpc(action) => action.action_event(&context),
},
P2pAction::ChannelsEffectful(action) => match action {
P2pChannelsEffectfulAction::BestTip(action) => action.action_event(&context),
P2pChannelsEffectfulAction::Rpc(action) => action.action_event(&context),
P2pChannelsEffectfulAction::StreamingRpc(action) => action.action_event(&context),
P2pChannelsEffectfulAction::SnarkJobCommitment(action) => {
action.action_event(&context)
}
P2pChannelsEffectfulAction::Snark(action) => action.action_event(&context),
P2pChannelsEffectfulAction::Transaction(action) => action.action_event(&context),
},
P2pAction::Peer(action) => action.action_event(&context),
P2pAction::Network(action) => match action {
P2pNetworkAction::Scheduler(action) => match action {
Expand Down
13 changes: 13 additions & 0 deletions node/src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
pub use ::p2p::*;
use p2p::channels::{
best_tip_effectful::P2pChannelsBestTipEffectfulAction,
rpc_effectful::P2pChannelsRpcEffectfulAction, snark_effectful::P2pChannelsSnarkEffectfulAction,
snark_job_commitment_effectful::P2pChannelsSnarkJobCommitmentEffectfulAction,
streaming_rpc_effectful::P2pChannelsStreamingRpcEffectfulAction,
transaction_effectful::P2pChannelsTransactionEffectfulAction,
};

pub mod channels;
pub mod connection;
Expand Down Expand Up @@ -114,5 +121,11 @@ impl_into_global_action!(p2p::P2pNetworkPnetEffectfulAction);
impl_into_global_action!(connection::incoming_effectful::P2pConnectionIncomingEffectfulAction);
impl_into_global_action!(connection::outgoing_effectful::P2pConnectionOutgoingEffectfulAction);
impl_into_global_action!(p2p::disconnection_effectful::P2pDisconnectionEffectfulAction);
impl_into_global_action!(P2pChannelsBestTipEffectfulAction);
impl_into_global_action!(P2pChannelsStreamingRpcEffectfulAction);
impl_into_global_action!(P2pChannelsTransactionEffectfulAction);
impl_into_global_action!(P2pChannelsSnarkJobCommitmentEffectfulAction);
impl_into_global_action!(P2pChannelsRpcEffectfulAction);
impl_into_global_action!(P2pChannelsSnarkEffectfulAction);

impl p2p::P2pActionTrait<crate::State> for crate::Action {}
54 changes: 25 additions & 29 deletions node/src/p2p/p2p_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use p2p::channels::streaming_rpc::{
P2pChannelsStreamingRpcAction, P2pStreamingRpcRequest, P2pStreamingRpcResponseFull,
};
use p2p::channels::transaction::P2pChannelsTransactionAction;
use p2p::channels::P2pChannelsEffectfulAction;
use p2p::connection::P2pConnectionEffectfulAction;
use p2p::P2pInitializeAction;

Expand Down Expand Up @@ -193,8 +194,8 @@ pub fn node_p2p_effects<S: Service>(store: &mut Store<S>, action: P2pActionWithM
},
P2pAction::DisconnectionEffectful(action) => action.effects(&meta, store),
P2pAction::Channels(action) => match action {
P2pChannelsAction::MessageReceived(action) => {
action.effects(&meta, store);
P2pChannelsAction::MessageReceived(_) => {
// handled by reducer
}
P2pChannelsAction::BestTip(action) => {
if let P2pChannelsBestTipAction::RequestReceived { peer_id } = action {
Expand All @@ -205,11 +206,8 @@ pub fn node_p2p_effects<S: Service>(store: &mut Store<S>, action: P2pActionWithM
});
}
}
action.effects(&meta, store);
}
P2pChannelsAction::Transaction(action) => {
// TODO: does the order matter here? if not this clone can be removed
action.clone().effects(&meta, store);
match action {
P2pChannelsTransactionAction::Received {
peer_id: _,
Expand All @@ -231,28 +229,22 @@ pub fn node_p2p_effects<S: Service>(store: &mut Store<S>, action: P2pActionWithM
_ => {}
}
}
P2pChannelsAction::Snark(action) => {
// TODO: does the order matter here? if not this clone can be removed
action.clone().effects(&meta, store);
match action {
P2pChannelsSnarkAction::Received { peer_id, snark } => {
store.dispatch(SnarkPoolCandidateAction::InfoReceived {
peer_id,
info: *snark,
});
}
P2pChannelsSnarkAction::Libp2pReceived { peer_id, snark, .. } => {
store.dispatch(SnarkPoolCandidateAction::WorkReceived {
peer_id,
work: *snark,
});
}
_ => {}
P2pChannelsAction::Snark(action) => match action {
P2pChannelsSnarkAction::Received { peer_id, snark } => {
store.dispatch(SnarkPoolCandidateAction::InfoReceived {
peer_id,
info: *snark,
});
}
}
P2pChannelsSnarkAction::Libp2pReceived { peer_id, snark, .. } => {
store.dispatch(SnarkPoolCandidateAction::WorkReceived {
peer_id,
work: *snark,
});
}
_ => {}
},
P2pChannelsAction::SnarkJobCommitment(action) => {
// TODO: does the order matter here? if not this clone can be removed
action.clone().effects(&meta, store);
if let P2pChannelsSnarkJobCommitmentAction::Received {
peer_id,
commitment,
Expand All @@ -265,8 +257,6 @@ pub fn node_p2p_effects<S: Service>(store: &mut Store<S>, action: P2pActionWithM
}
}
P2pChannelsAction::Rpc(action) => {
// TODO: does the order matter here? if not this clone can be removed
action.clone().effects(&meta, store);
match action {
P2pChannelsRpcAction::Ready { peer_id } => {
store.dispatch(P2pChannelsRpcAction::RequestSend {
Expand Down Expand Up @@ -568,8 +558,6 @@ pub fn node_p2p_effects<S: Service>(store: &mut Store<S>, action: P2pActionWithM
}
}
P2pChannelsAction::StreamingRpc(action) => {
// TODO: does the order matter here? if not this clone can be removed
action.clone().effects(&meta, store);
match action {
P2pChannelsStreamingRpcAction::Ready { .. } => {
store
Expand Down Expand Up @@ -647,6 +635,14 @@ pub fn node_p2p_effects<S: Service>(store: &mut Store<S>, action: P2pActionWithM
}
}
},
P2pAction::ChannelsEffectful(action) => match action {
P2pChannelsEffectfulAction::BestTip(action) => action.effects(&meta, store),
P2pChannelsEffectfulAction::Transaction(action) => action.effects(&meta, store),
P2pChannelsEffectfulAction::StreamingRpc(action) => action.effects(&meta, store),
P2pChannelsEffectfulAction::Snark(action) => action.effects(&meta, store),
P2pChannelsEffectfulAction::Rpc(action) => action.effects(&meta, store),
P2pChannelsEffectfulAction::SnarkJobCommitment(action) => action.effects(&meta, store),
},
P2pAction::Peer(action) => match action {
P2pPeerAction::Discovered { .. }
| P2pPeerAction::Ready { .. }
Expand Down
2 changes: 0 additions & 2 deletions p2p/src/channels/best_tip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ pub use p2p_channels_best_tip_actions::*;

mod p2p_channels_best_tip_reducer;

mod p2p_channels_best_tip_effects;

use binprot_derive::{BinProtRead, BinProtWrite};
use openmina_core::block::ArcBlock;
use serde::{Deserialize, Serialize};
Expand Down
3 changes: 0 additions & 3 deletions p2p/src/channels/best_tip/p2p_channels_best_tip_actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ use crate::{

use super::BestTipPropagationState;

pub type P2pChannelsBestTipActionWithMetaRef<'a> =
redux::ActionWithMeta<&'a P2pChannelsBestTipAction>;

#[derive(Debug, Clone, Serialize, Deserialize, ActionEvent)]
#[action_event(fields(display(peer_id), best_tip = display(&best_tip.hash)))]
pub enum P2pChannelsBestTipAction {
Expand Down
Loading
Loading