diff --git a/node/testing/src/cluster/mod.rs b/node/testing/src/cluster/mod.rs index c65d65d242..596f8aa8dd 100644 --- a/node/testing/src/cluster/mod.rs +++ b/node/testing/src/cluster/mod.rs @@ -280,7 +280,7 @@ impl Cluster { initial_peers, external_addrs: vec![], enabled_channels: ChannelId::iter_all().collect(), - peer_discovery: true, + peer_discovery: testing_config.peer_discovery, timeouts: testing_config.timeouts, limits: P2pLimits::default().with_max_peers(Some(testing_config.max_peers)), meshsub: P2pMeshsubConfig { diff --git a/node/testing/src/node/rust/config.rs b/node/testing/src/node/rust/config.rs index 4754ebad74..1ab15a902b 100644 --- a/node/testing/src/node/rust/config.rs +++ b/node/testing/src/node/rust/config.rs @@ -40,6 +40,7 @@ pub struct RustNodeTestingConfig { pub libp2p_port: Option, #[serde(default)] pub recorder: Recorder, + pub peer_discovery: bool, } #[derive(Serialize, Deserialize, Debug, Default, Clone)] @@ -68,6 +69,7 @@ impl RustNodeTestingConfig { timeouts: P2pTimeouts::default(), libp2p_port: None, recorder: Default::default(), + peer_discovery: true, } } @@ -83,6 +85,7 @@ impl RustNodeTestingConfig { timeouts: P2pTimeouts::without_rpc(), libp2p_port: None, recorder: Default::default(), + peer_discovery: true, } } @@ -118,4 +121,9 @@ impl RustNodeTestingConfig { )); self } + + pub fn with_no_peer_discovery(mut self) -> Self { + self.peer_discovery = false; + self + } } diff --git a/node/testing/src/scenarios/driver.rs b/node/testing/src/scenarios/driver.rs index 364168e275..d34d1cf90c 100644 --- a/node/testing/src/scenarios/driver.rs +++ b/node/testing/src/scenarios/driver.rs @@ -190,6 +190,26 @@ impl<'cluster> Driver<'cluster> { } } + /// Waits for a specific event that satisfies the given predicate, executing all events encountered along the way. + /// + /// # Arguments + /// + /// * `duration` - Maximum time to wait for the event + /// * `f` - A predicate function that takes a node ID, event, and state, returning true when the desired event is found + /// + /// # Returns + /// + /// Returns a Result containing: + /// * `Some((node_id, event))` - If an event satisfying the predicate is found before the timeout + /// * `None` - If no matching event is found within the timeout period + /// + /// # Example + /// + /// ```no_run + /// driver.wait_for(Duration::from_secs(5), |node_id, event, state| { + /// matches!(event, Event::BlockReceived { .. }) + /// }).await?; + /// ``` pub async fn wait_for( &mut self, duration: Duration, diff --git a/node/testing/src/scenarios/multi_node/vrf_correct_ledgers.rs b/node/testing/src/scenarios/multi_node/vrf_correct_ledgers.rs index 07fd97d8ff..1e4898f8a5 100644 --- a/node/testing/src/scenarios/multi_node/vrf_correct_ledgers.rs +++ b/node/testing/src/scenarios/multi_node/vrf_correct_ledgers.rs @@ -51,6 +51,7 @@ impl MultiNodeVrfGetCorrectLedgers { timeouts: P2pTimeouts::default(), libp2p_port: None, recorder: Default::default(), + peer_discovery: true, }); tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/node/testing/src/scenarios/multi_node/vrf_correct_slots.rs b/node/testing/src/scenarios/multi_node/vrf_correct_slots.rs index 97c0e0a8c7..03e5733076 100644 --- a/node/testing/src/scenarios/multi_node/vrf_correct_slots.rs +++ b/node/testing/src/scenarios/multi_node/vrf_correct_slots.rs @@ -57,6 +57,7 @@ impl MultiNodeVrfGetCorrectSlots { timeouts: P2pTimeouts::default(), libp2p_port: None, recorder: Default::default(), + peer_discovery: true, }); tokio::time::sleep(Duration::from_secs(2)).await; diff --git a/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_correct_ledgers.rs b/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_correct_ledgers.rs index 886ae23b27..98e2c1d87d 100644 --- a/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_correct_ledgers.rs +++ b/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_correct_ledgers.rs @@ -58,6 +58,7 @@ impl MultiNodeVrfEpochBoundsCorrectLedger { timeouts: P2pTimeouts::default(), libp2p_port: None, recorder: Default::default(), + peer_discovery: true, }; let producer_node = runner.add_rust_node(RustNodeTestingConfig { diff --git a/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_evaluation.rs b/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_evaluation.rs index 77eb09bc8f..a126aac773 100644 --- a/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_evaluation.rs +++ b/node/testing/src/scenarios/multi_node/vrf_epoch_bounds_evaluation.rs @@ -44,6 +44,7 @@ impl MultiNodeVrfEpochBoundsEvaluation { timeouts: P2pTimeouts::default(), libp2p_port: None, recorder: Default::default(), + peer_discovery: true, }; let producer_node = runner.add_rust_node(RustNodeTestingConfig { diff --git a/node/testing/src/scenarios/p2p/pubsub.rs b/node/testing/src/scenarios/p2p/pubsub.rs index 225e1d5eb5..afd9635267 100644 --- a/node/testing/src/scenarios/p2p/pubsub.rs +++ b/node/testing/src/scenarios/p2p/pubsub.rs @@ -3,19 +3,21 @@ use std::time::Duration; use crate::{ hosts, node::RustNodeTestingConfig, - scenarios::{ClusterRunner, Driver}, + scenarios::{ClusterRunner, RunCfg, RunCfgAdvanceTime}, }; /// Receive a block via meshsub +/// 1. Create a normal node with default devnet config, with devnet peers as initial peers +/// 2. Wait for 2 minutes +/// 3. Create a node with discovery disabled and first node as only peer +/// 4. Wait for first node to broadcast block to second one #[derive(documented::Documented, Default, Clone, Copy)] pub struct P2pReceiveBlock; impl P2pReceiveBlock { pub async fn run(self, mut runner: ClusterRunner<'_>) { - let config = RustNodeTestingConfig::devnet_default() - // make sure it will not ask initial peers - .max_peers(1) - .initial_peers(vec![hosts::devnet()[0].clone()]); + let config = RustNodeTestingConfig::devnet_default().initial_peers(hosts::devnet()); + let retransmitter_openmina_node = runner.add_rust_node(config); let retransmitter_peer_id = runner .node(retransmitter_openmina_node) @@ -24,30 +26,43 @@ impl P2pReceiveBlock { .p2p .my_id(); + let _ = runner + .run( + RunCfg::default() + .timeout(Duration::from_secs(120)) + .advance_time(RunCfgAdvanceTime::Real) + .action_handler(|_, _, _, _| false), + ) + .await; + let config = RustNodeTestingConfig::devnet_default() - // make sure it will not ask initial peers - .max_peers(1) + // Make sure it doesn't connect to any more peers + .with_no_peer_discovery() .initial_peers(vec![retransmitter_openmina_node.into()]); + let receiver_openmina_node = runner.add_rust_node(config); - let mut driver = Driver::new(runner); - driver - .wait_for(Duration::from_secs(20 * 60), |node, _, state| { - let Some(p2p) = state.p2p.ready() else { - return false; - }; - node == receiver_openmina_node - && p2p - .network - .scheduler - .broadcast_state - .incoming_block - .as_ref() - .map_or(false, |(peer_id, _)| peer_id.eq(&retransmitter_peer_id)) - }) - .await - .unwrap(); + runner + .run( + RunCfg::default() + .timeout(Duration::from_secs(60 * 30)) + .advance_time(RunCfgAdvanceTime::Real) + .action_handler(move |node, state, _, _| { + let Some(p2p) = state.p2p.ready() else { + return false; + }; - eprintln!("passed"); + node == receiver_openmina_node + && p2p + .network + .scheduler + .broadcast_state + .incoming_block + .as_ref() + .map_or(false, |(peer_id, _)| peer_id.eq(&retransmitter_peer_id)) + }), + ) + .await + .expect("Failed to receive block"); } } diff --git a/node/testing/src/scenarios/solo_node/sync_to_genesis.rs b/node/testing/src/scenarios/solo_node/sync_to_genesis.rs index f5ffc5eb6a..b7af3fedad 100644 --- a/node/testing/src/scenarios/solo_node/sync_to_genesis.rs +++ b/node/testing/src/scenarios/solo_node/sync_to_genesis.rs @@ -54,6 +54,7 @@ impl SoloNodeSyncToGenesis { timeouts: Default::default(), libp2p_port: None, recorder: Default::default(), + peer_discovery: true, }); runner diff --git a/node/testing/src/scenarios/solo_node/sync_to_genesis_custom.rs b/node/testing/src/scenarios/solo_node/sync_to_genesis_custom.rs index 6e795b221f..d0957d931f 100644 --- a/node/testing/src/scenarios/solo_node/sync_to_genesis_custom.rs +++ b/node/testing/src/scenarios/solo_node/sync_to_genesis_custom.rs @@ -70,6 +70,7 @@ impl SoloNodeSyncToGenesisCustom { timeouts: P2pTimeouts::default(), libp2p_port: None, recorder: Default::default(), + peer_discovery: true, }); runner diff --git a/node/testing/src/simulator/mod.rs b/node/testing/src/simulator/mod.rs index af3c5c555f..f30e232cec 100644 --- a/node/testing/src/simulator/mod.rs +++ b/node/testing/src/simulator/mod.rs @@ -50,6 +50,7 @@ impl Simulator { timeouts: Default::default(), libp2p_port: None, recorder: self.config.recorder.clone(), + peer_discovery: true, } }