Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/testing/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl Cluster {
initial_peers,
external_addrs: vec![],
enabled_channels: ChannelId::iter_all().collect(),
peer_discovery: true,
peer_discovery: testing_config.peer_discovery,
timeouts: testing_config.timeouts,
limits: P2pLimits::default().with_max_peers(Some(testing_config.max_peers)),
meshsub: P2pMeshsubConfig {
Expand Down
8 changes: 8 additions & 0 deletions node/testing/src/node/rust/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct RustNodeTestingConfig {
pub libp2p_port: Option<u16>,
#[serde(default)]
pub recorder: Recorder,
pub peer_discovery: bool,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand Down Expand Up @@ -68,6 +69,7 @@ impl RustNodeTestingConfig {
timeouts: P2pTimeouts::default(),
libp2p_port: None,
recorder: Default::default(),
peer_discovery: true,
}
}

Expand All @@ -83,6 +85,7 @@ impl RustNodeTestingConfig {
timeouts: P2pTimeouts::without_rpc(),
libp2p_port: None,
recorder: Default::default(),
peer_discovery: true,
}
}

Expand Down Expand Up @@ -118,4 +121,9 @@ impl RustNodeTestingConfig {
));
self
}

pub fn with_no_peer_discovery(mut self) -> Self {
self.peer_discovery = false;
self
}
}
20 changes: 20 additions & 0 deletions node/testing/src/scenarios/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,26 @@ impl<'cluster> Driver<'cluster> {
}
}

/// Waits for a specific event that satisfies the given predicate, executing all events encountered along the way.
///
/// # Arguments
///
/// * `duration` - Maximum time to wait for the event
/// * `f` - A predicate function that takes a node ID, event, and state, returning true when the desired event is found
///
/// # Returns
///
/// Returns a Result containing:
/// * `Some((node_id, event))` - If an event satisfying the predicate is found before the timeout
/// * `None` - If no matching event is found within the timeout period
///
/// # Example
///
/// ```no_run
/// driver.wait_for(Duration::from_secs(5), |node_id, event, state| {
/// matches!(event, Event::BlockReceived { .. })
/// }).await?;
/// ```
pub async fn wait_for(
&mut self,
duration: Duration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl MultiNodeVrfGetCorrectLedgers {
timeouts: P2pTimeouts::default(),
libp2p_port: None,
recorder: Default::default(),
peer_discovery: true,
});

tokio::time::sleep(Duration::from_secs(2)).await;
Expand Down
1 change: 1 addition & 0 deletions node/testing/src/scenarios/multi_node/vrf_correct_slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl MultiNodeVrfGetCorrectSlots {
timeouts: P2pTimeouts::default(),
libp2p_port: None,
recorder: Default::default(),
peer_discovery: true,
});

tokio::time::sleep(Duration::from_secs(2)).await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl MultiNodeVrfEpochBoundsCorrectLedger {
timeouts: P2pTimeouts::default(),
libp2p_port: None,
recorder: Default::default(),
peer_discovery: true,
};

let producer_node = runner.add_rust_node(RustNodeTestingConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ impl MultiNodeVrfEpochBoundsEvaluation {
timeouts: P2pTimeouts::default(),
libp2p_port: None,
recorder: Default::default(),
peer_discovery: true,
};

let producer_node = runner.add_rust_node(RustNodeTestingConfig {
Expand Down
65 changes: 40 additions & 25 deletions node/testing/src/scenarios/p2p/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@ use std::time::Duration;
use crate::{
hosts,
node::RustNodeTestingConfig,
scenarios::{ClusterRunner, Driver},
scenarios::{ClusterRunner, RunCfg, RunCfgAdvanceTime},
};

/// Receive a block via meshsub
/// 1. Create a normal node with default devnet config, with devnet peers as initial peers
/// 2. Wait for 2 minutes
/// 3. Create a node with discovery disabled and first node as only peer
/// 4. Wait for first node to broadcast block to second one
#[derive(documented::Documented, Default, Clone, Copy)]
pub struct P2pReceiveBlock;

impl P2pReceiveBlock {
pub async fn run(self, mut runner: ClusterRunner<'_>) {
let config = RustNodeTestingConfig::devnet_default()
// make sure it will not ask initial peers
.max_peers(1)
.initial_peers(vec![hosts::devnet()[0].clone()]);
let config = RustNodeTestingConfig::devnet_default().initial_peers(hosts::devnet());

let retransmitter_openmina_node = runner.add_rust_node(config);
let retransmitter_peer_id = runner
.node(retransmitter_openmina_node)
Expand All @@ -24,30 +26,43 @@ impl P2pReceiveBlock {
.p2p
.my_id();

let _ = runner
.run(
RunCfg::default()
.timeout(Duration::from_secs(120))
.advance_time(RunCfgAdvanceTime::Real)
.action_handler(|_, _, _, _| false),
)
.await;

let config = RustNodeTestingConfig::devnet_default()
// make sure it will not ask initial peers
.max_peers(1)
// Make sure it doesn't connect to any more peers
.with_no_peer_discovery()
.initial_peers(vec![retransmitter_openmina_node.into()]);

let receiver_openmina_node = runner.add_rust_node(config);

let mut driver = Driver::new(runner);
driver
.wait_for(Duration::from_secs(20 * 60), |node, _, state| {
let Some(p2p) = state.p2p.ready() else {
return false;
};
node == receiver_openmina_node
&& p2p
.network
.scheduler
.broadcast_state
.incoming_block
.as_ref()
.map_or(false, |(peer_id, _)| peer_id.eq(&retransmitter_peer_id))
})
.await
.unwrap();
runner
.run(
RunCfg::default()
.timeout(Duration::from_secs(60 * 30))
.advance_time(RunCfgAdvanceTime::Real)
.action_handler(move |node, state, _, _| {
let Some(p2p) = state.p2p.ready() else {
return false;
};

eprintln!("passed");
node == receiver_openmina_node
&& p2p
.network
.scheduler
.broadcast_state
.incoming_block
.as_ref()
.map_or(false, |(peer_id, _)| peer_id.eq(&retransmitter_peer_id))
}),
)
.await
.expect("Failed to receive block");
}
}
1 change: 1 addition & 0 deletions node/testing/src/scenarios/solo_node/sync_to_genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl SoloNodeSyncToGenesis {
timeouts: Default::default(),
libp2p_port: None,
recorder: Default::default(),
peer_discovery: true,
});

runner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl SoloNodeSyncToGenesisCustom {
timeouts: P2pTimeouts::default(),
libp2p_port: None,
recorder: Default::default(),
peer_discovery: true,
});

runner
Expand Down
1 change: 1 addition & 0 deletions node/testing/src/simulator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl Simulator {
timeouts: Default::default(),
libp2p_port: None,
recorder: self.config.recorder.clone(),
peer_discovery: true,
}
}

Expand Down
Loading