Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions p2p/src/network/p2p_network_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,23 @@ impl P2pNetworkState {
.get(a.peer_id())
.and_then(|cn| cn.get(&stream_id))
}),
RpcStreamId::WithQuery(id) => self
.scheduler
.rpc_incoming_streams
.get(a.peer_id())
.and_then(|streams| {
streams.iter().find_map(|(_, state)| {
if state
.pending
.as_ref()
.map_or(false, |query_header| query_header.id == id)
{
Some(state)
} else {
None
}
})
}),
RpcStreamId::AnyIncoming => self
.scheduler
.rpc_incoming_streams
Expand Down Expand Up @@ -175,6 +192,23 @@ impl P2pNetworkState {
.get_mut(a.peer_id())
.and_then(|cn| cn.get_mut(&stream_id))
}),
RpcStreamId::WithQuery(id) => self
.scheduler
.rpc_incoming_streams
.get_mut(a.peer_id())
.and_then(|streams| {
streams.iter_mut().find_map(|(_, state)| {
if state
.pending
.as_ref()
.map_or(false, |query_header| query_header.id == id)
{
Some(state)
} else {
None
}
})
}),
RpcStreamId::AnyIncoming => {
if let Some(streams) = self.scheduler.rpc_incoming_streams.get_mut(a.peer_id()) {
if let Some((k, _)) = streams.first_key_value() {
Expand Down
12 changes: 10 additions & 2 deletions p2p/src/network/rpc/p2p_network_rpc_actions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::net::SocketAddr;

use mina_p2p_messages::rpc_kernel::{QueryHeader, ResponseHeader};
use mina_p2p_messages::rpc_kernel::{QueryHeader, QueryID, ResponseHeader};
use openmina_core::{action_debug, action_trace, ActionEvent};
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -55,6 +55,7 @@ pub enum P2pNetworkRpcAction {

pub enum RpcStreamId {
Exact(StreamId),
WithQuery(QueryID),
AnyIncoming,
AnyOutgoing,
}
Expand All @@ -67,7 +68,10 @@ impl P2pNetworkRpcAction {
Self::IncomingMessage { stream_id, .. } => RpcStreamId::Exact(*stream_id),
Self::PrunePending { stream_id, .. } => RpcStreamId::Exact(*stream_id),
Self::OutgoingQuery { .. } => RpcStreamId::AnyOutgoing,
Self::OutgoingResponse { .. } => RpcStreamId::AnyOutgoing,
Self::OutgoingResponse {
response: ResponseHeader { id },
..
} => RpcStreamId::WithQuery(*id),
Self::OutgoingData { stream_id, .. } => RpcStreamId::Exact(*stream_id),
}
}
Expand Down Expand Up @@ -146,20 +150,23 @@ fn log_message<T>(
match message {
RpcMessage::Handshake => action_trace!(
context,
kind = "P2pNetworkRpcIncomingMessage",
addr = display(addr),
peer_id = display(peer_id),
stream_id,
message_kind = "handshake"
),
RpcMessage::Heartbeat => action_trace!(
context,
kind = "P2pNetworkRpcIncomingMessage",
addr = display(addr),
peer_id = display(peer_id),
stream_id,
message_kind = "heartbeat"
),
RpcMessage::Query { header, .. } => action_debug!(
context,
kind = "P2pNetworkRpcIncomingMessage",
addr = display(addr),
peer_id = display(peer_id),
stream_id,
Expand All @@ -168,6 +175,7 @@ fn log_message<T>(
),
RpcMessage::Response { header, .. } => action_debug!(
context,
kind = "P2pNetworkRpcIncomingMessage",
addr = display(addr),
peer_id = display(peer_id),
stream_id,
Expand Down
12 changes: 10 additions & 2 deletions p2p/src/network/rpc/p2p_network_rpc_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ impl P2pNetworkRpcAction {
Store: crate::P2pStore<S>,
{
let Some(state) = store.state().network.find_rpc_state(&self) else {
error!(meta.time(); "cannot find stream for response: {self:?}");
return;
};

Expand Down Expand Up @@ -355,10 +356,17 @@ impl P2pNetworkRpcAction {
response,
data,
} => {
if !matches!(state.pending, Some(QueryHeader { id, .. }) if id == response.id) {
openmina_core::error!(meta.time(); "pending query does not match the response");
return;
}
let stream_id = state.stream_id;
let addr = state.addr;
store.dispatch(P2pNetworkRpcAction::PrunePending { peer_id, stream_id });
store.dispatch(P2pNetworkRpcAction::OutgoingData {
addr: state.addr,
addr,
peer_id,
stream_id: state.stream_id,
stream_id,
data: RpcMessage::Response {
header: response,
bytes: data,
Expand Down
7 changes: 7 additions & 0 deletions p2p/src/network/rpc/p2p_network_rpc_reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,14 @@ impl P2pNetworkRpcState {
} else {
openmina_core::error!(action.time(); "receiving response without query");
}
} else if let RpcMessage::Query { header, .. } = message {
if self.pending.is_none() {
self.pending = Some(header.clone());
} else {
openmina_core::error!(action.time(); "receiving query while another query is pending");
}
}

self.incoming.pop_front();
}
P2pNetworkRpcAction::PrunePending { .. } => {
Expand Down
8 changes: 4 additions & 4 deletions p2p/testing/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ use crate::{
test_node::TestNode,
};

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub enum PeerIdConfig {
#[default]
Derived,
Bytes([u8; 32]),
}

#[derive(Debug, derive_more::From)]
#[derive(Debug, Clone, derive_more::From)]
pub enum Listener {
Rust(RustNodeId),
Libp2p(Libp2pNodeId),
Expand Down Expand Up @@ -232,10 +232,10 @@ impl Cluster {
identity_pub_key: secret_key.public_key(),
initial_peers,
ask_initial_peers_interval: Duration::from_secs(5),
enabled_channels: Default::default(),
enabled_channels: p2p::channels::ChannelId::for_libp2p().collect(),
max_peers: 100,
chain_id: self.chain_id.clone(),
peer_discovery: true,
peer_discovery: config.discovery,
timeouts: config.timeouts,
};

Expand Down
54 changes: 54 additions & 0 deletions p2p/testing/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::{IpAddr, SocketAddr};

use p2p::{
channels::{rpc::P2pChannelsRpcAction, P2pChannelsAction},
connection::{incoming::P2pConnectionIncomingAction, outgoing::P2pConnectionOutgoingAction},
disconnection::P2pDisconnectionAction,
P2pAction, P2pEvent, PeerId,
Expand Down Expand Up @@ -31,6 +32,19 @@ pub enum RustNodeEvent {
peer_id: PeerId,
reason: String,
},
RpcChannelReady {
peer_id: PeerId,
},
RpcChannelRequestReceived {
peer_id: PeerId,
id: p2p::channels::rpc::P2pRpcId,
request: p2p::channels::rpc::P2pRpcRequest,
},
RpcChannelResponseReceived {
peer_id: PeerId,
id: p2p::channels::rpc::P2pRpcId,
response: Option<p2p::channels::rpc::P2pRpcResponse>,
},
P2p {
event: P2pEvent,
},
Expand Down Expand Up @@ -96,6 +110,46 @@ pub(super) fn event_mapper_effect(store: &mut super::redux::Store, action: P2pAc
},
),

P2pAction::Channels(action) => match action {
P2pChannelsAction::Rpc(action) => match action {
P2pChannelsRpcAction::Ready { peer_id } => {
store_event(store, RustNodeEvent::RpcChannelReady { peer_id })
}
P2pChannelsRpcAction::RequestReceived {
peer_id,
id,
request,
} => {
if matches!(store.service.peek_rust_node_event(), Some(RustNodeEvent::RpcChannelReady { peer_id: pid }) if pid == &peer_id )
{
store.service.rust_node_event();
}
store_event(
store,
RustNodeEvent::RpcChannelRequestReceived {
peer_id,
id,
request,
},
)
}
P2pChannelsRpcAction::ResponseReceived {
peer_id,
id,
response,
} => store_event(
store,
RustNodeEvent::RpcChannelResponseReceived {
peer_id,
id,
response,
},
),
_ => {}
},
_ => {}
},

P2pAction::Network(p2p::P2pNetworkAction::Scheduler(action)) => match action {
p2p::P2pNetworkSchedulerAction::InterfaceDetected { ip } => {
store_event(store, RustNodeEvent::Interface { addr: ip })
Expand Down
1 change: 1 addition & 0 deletions p2p/testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ pub mod cluster;
pub mod predicates;
pub mod service;
pub mod stream;
pub mod utils;

pub use futures;
73 changes: 72 additions & 1 deletion p2p/testing/src/predicates.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::future::{ready, Ready};
use std::{
collections::{BTreeSet, HashSet},
future::{ready, Ready},
};

use p2p::PeerId;

Expand All @@ -21,6 +24,43 @@ pub fn listener_is_ready(id: RustNodeId) -> impl FnMut(ClusterEvent) -> Ready<bo
}
}

/// Predicate returning true for a cluster event corresponging to the specified node started listening.
pub fn listeners_are_ready<I>(ids: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
where
I: IntoIterator<Item = RustNodeId>,
{
let mut ids: HashSet<RustNodeId> = HashSet::from_iter(ids.into_iter());
move |event| {
ready(
if let Some((event_id, RustNodeEvent::ListenerReady { .. })) = event.rust() {
ids.remove(event_id) && ids.is_empty()
} else {
false
},
)
}
}

pub fn nodes_peers_are_ready<I>(nodes_peers: I) -> impl FnMut(ClusterEvent) -> Ready<bool>
where
I: IntoIterator<Item = (RustNodeId, PeerId)>,
{
let mut nodes_peers = BTreeSet::from_iter(nodes_peers.into_iter());
move |event| {
ready(
if let ClusterEvent::Rust {
id,
event: RustNodeEvent::PeerConnected { peer_id, .. },
} = event
{
nodes_peers.remove(&(id, peer_id)) && nodes_peers.is_empty()
} else {
false
},
)
}
}

/// Predicate returning true when encountered an event signalling that the peer `peer_id` is connected to the node `id`.
pub fn peer_is_connected(
id: RustNodeId,
Expand Down Expand Up @@ -63,3 +103,34 @@ pub fn default_errors(event: &ClusterEvent) -> bool {
_ => false,
}
}

/// For an event for a rust node _id_, that `f` maps to `Some(v)`,
/// removes the pair `(id, v)` from the `nodes_items`, returning `true` if it
/// runs out.
pub fn all_nodes_with_value<T, I, F>(
nodes_items: I,
mut f: F,
) -> impl FnMut(ClusterEvent) -> Ready<bool>
where
T: PartialEq + Eq,
I: IntoIterator<Item = (RustNodeId, T)>,
F: FnMut(RustNodeEvent) -> Option<T>,
{
let mut nodes_items = Vec::from_iter(nodes_items.into_iter());
move |event| {
ready(if let ClusterEvent::Rust { id, event } = event {
f(event)
.and_then(|v| {
nodes_items
.iter()
.position(|(_id, _v)| _id == &id && _v == &v)
})
.map_or(false, |i| {
nodes_items.swap_remove(i);
nodes_items.is_empty()
})
} else {
false
})
}
}
12 changes: 9 additions & 3 deletions p2p/testing/src/rust_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ use crate::{
test_node::TestNode,
};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RustNodeId(pub(super) usize);

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct RustNodeConfig {
pub peer_id: PeerIdConfig,
pub initial_peers: Vec<Listener>,
pub timeouts: P2pTimeouts,
pub discovery: bool,
}

impl RustNodeConfig {
Expand All @@ -41,6 +42,11 @@ impl RustNodeConfig {
self.timeouts = timeouts;
self
}

pub fn with_discovery(mut self, discovery: bool) -> Self {
self.discovery = discovery;
self
}
}

pub struct RustNode {
Expand All @@ -56,7 +62,7 @@ impl RustNode {
}
}

pub(super) fn dispatch_action<A>(&mut self, action: A) -> bool
pub fn dispatch_action<A>(&mut self, action: A) -> bool
where
A: Into<P2pAction> + EnablingCondition<P2pState>,
{
Expand Down
Loading