Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/channels.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub use tokio::sync::{mpsc, oneshot};
pub use tokio::sync::{broadcast, mpsc, oneshot};
4 changes: 4 additions & 0 deletions node/common/src/service/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ impl webrtc_with_libp2p::P2pServiceWebrtcWithLibp2p for NodeService {
fn mio(&mut self) -> &mut mio::MioService {
&mut self.p2p.mio
}

fn connections(&self) -> std::collections::BTreeSet<PeerId> {
self.p2p.webrtc.peers.keys().copied().collect()
}
}

#[cfg(feature = "p2p-libp2p")]
Expand Down
4 changes: 4 additions & 0 deletions node/common/src/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ impl node::Service for NodeService {
fn recorder(&mut self) -> &mut Recorder {
&mut self.recorder
}

fn is_replay(&self) -> bool {
self.replayer.is_some()
}
}

impl redux::TimeService for NodeService {
Expand Down
4 changes: 4 additions & 0 deletions node/invariants/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ pub use invariant_result::{InvariantIgnoreReason, InvariantResult};
pub mod no_recursion;
use no_recursion::*;

pub mod p2p;
use p2p::*;

pub mod transition_frontier;
use transition_frontier::*;

Expand Down Expand Up @@ -104,6 +107,7 @@ macro_rules! define_invariants_enum {

define_invariants_enum! {
NoRecursion,
P2pStatesAreConsistent,
TransitionFrontierOnlySyncsToBetterBlocks,
}

Expand Down
2 changes: 2 additions & 0 deletions node/invariants/src/p2p/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
mod states_are_consistent;
pub use states_are_consistent::*;
58 changes: 58 additions & 0 deletions node/invariants/src/p2p/states_are_consistent.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::collections::BTreeSet;

use node::{p2p::PeerId, ActionKind, ActionWithMeta, Service, Store};

use crate::{Invariant, InvariantResult};

/// Makes sure that:
/// 1. For WebRTC peers, we have same number of peers in state and service.
/// 2. TODO: libp2p
#[derive(documented::Documented, Default, Clone, Copy)]
pub struct P2pStatesAreConsistent;

impl Invariant for P2pStatesAreConsistent {
type InternalState = ();
fn triggers(&self) -> &[ActionKind] {
&[ActionKind::P2pPeerReady, ActionKind::P2pDisconnectionFinish]
}

fn check<S: Service>(
self,
_: &mut Self::InternalState,
store: &Store<S>,
_action: &ActionWithMeta,
) -> InvariantResult {
if let Some((missing_connections, extra_connections)) =
self.webrtc_peer_inconsistencies(store)
{
return InvariantResult::Violation(format!("WebRTC inconsistency! missing_connections:\n{missing_connections:?}\nextra_connections:\n{extra_connections:?}"));
}

InvariantResult::Ok
}
}

impl P2pStatesAreConsistent {
fn webrtc_peer_inconsistencies<S: Service>(
self,
store: &Store<S>,
) -> Option<(BTreeSet<PeerId>, BTreeSet<PeerId>)> {
if store.service.is_replay() {
return None;
}
let p2p_state = store.state().p2p.ready()?;
let mut connections = store.service.connections();
let peers = p2p_state
.peers
.iter()
.filter(|(_, s)| !s.is_libp2p() && s.status.is_connected_or_connecting())
.map(|(peer_id, _)| *peer_id)
.filter(|peer_id| !connections.remove(peer_id))
.collect::<BTreeSet<_>>();

if !peers.is_empty() || !connections.is_empty() {
return Some((peers, connections));
}
None
}
}
1 change: 1 addition & 0 deletions node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ pub trait Service:
{
fn stats(&mut self) -> Option<&mut Stats>;
fn recorder(&mut self) -> &mut Recorder;
fn is_replay(&self) -> bool;
}
8 changes: 8 additions & 0 deletions node/testing/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ impl node::Service for NodeTestingService {
fn recorder(&mut self) -> &mut Recorder {
self.real.recorder()
}

fn is_replay(&self) -> bool {
self.is_replay
}
}

impl P2pCryptoService for NodeTestingService {
Expand Down Expand Up @@ -411,6 +415,10 @@ impl P2pServiceWebrtcWithLibp2p for NodeTestingService {
fn mio(&mut self) -> &mut node::p2p::service_impl::mio::MioService {
self.real.mio()
}

fn connections(&self) -> std::collections::BTreeSet<PeerId> {
self.real.connections()
}
}

impl SnarkBlockVerifyService for NodeTestingService {
Expand Down
2 changes: 1 addition & 1 deletion p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ p2p-testing = { path = "testing" }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
redux = { workspace = true, features=["serializable_callbacks"] }
tokio = { version = "1.26", features = ["rt"] }
webrtc = { git = "https://github.com/openmina/webrtc.git", branch = "openmina-v0.11.0", optional = true }
webrtc = { git = "https://github.com/openmina/webrtc.git", rev = "e8705db39af1b198b324a5db6ff57fb213ba75e9", optional = true }
reqwest = { version = "0.11", features = ["json"] }
mio = { version = "0.8.11", features = ["os-poll", "net"] }
libc = { version = "0.2.151" }
Expand Down
4 changes: 4 additions & 0 deletions p2p/src/connection/p2p_connection_service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::collections::BTreeSet;

use crate::{identity::PublicKey, webrtc, PeerId};

use super::outgoing::P2pConnectionOutgoingInitOpts;

pub trait P2pConnectionService: redux::Service {
fn connections(&self) -> BTreeSet<PeerId>;

fn random_pick(
&mut self,
list: &[P2pConnectionOutgoingInitOpts],
Expand Down
Loading
Loading