diff --git a/.gitignore b/.gitignore index 9af37fbe5b..eba14fa03a 100644 --- a/.gitignore +++ b/.gitignore @@ -60,4 +60,5 @@ buildtools/Output/ clients/base_node_grpc_client/package-lock.json clients/validator_node_grpc_client/package-lock.json -clients/wallet_grpc_client/package-lock.json \ No newline at end of file +clients/wallet_grpc_client/package-lock.json +pie/ diff --git a/base_layer/core/src/mempool/config.rs b/base_layer/core/src/mempool/config.rs index 2517409650..edf1fdef48 100644 --- a/base_layer/core/src/mempool/config.rs +++ b/base_layer/core/src/mempool/config.rs @@ -50,6 +50,8 @@ pub struct MempoolServiceConfig { pub initial_sync_num_peers: usize, /// The maximum number of transactions to sync in a single sync session Default: 10_000 pub initial_sync_max_transactions: usize, + /// The maximum number of blocks added via sync or re-org to triggering a sync + pub block_sync_trigger: usize, } impl Default for MempoolServiceConfig { @@ -57,6 +59,7 @@ impl Default for MempoolServiceConfig { Self { initial_sync_num_peers: 2, initial_sync_max_transactions: 10_000, + block_sync_trigger: 5, } } } diff --git a/base_layer/core/src/mempool/sync_protocol/initializer.rs b/base_layer/core/src/mempool/sync_protocol/initializer.rs index 74ec61af00..ae86a0818e 100644 --- a/base_layer/core/src/mempool/sync_protocol/initializer.rs +++ b/base_layer/core/src/mempool/sync_protocol/initializer.rs @@ -32,7 +32,7 @@ use tari_service_framework::{async_trait, ServiceInitializationError, ServiceIni use tokio::{sync::mpsc, time::sleep}; use crate::{ - base_node::StateMachineHandle, + base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle}, mempool::{ sync_protocol::{MempoolSyncProtocol, MEMPOOL_SYNC_PROTOCOL}, Mempool, @@ -83,8 +83,7 @@ impl ServiceInitializer for MempoolSyncInitializer { log_mdc::extend(mdc.clone()); let state_machine = handles.expect_handle::(); let connectivity = handles.expect_handle::(); - // Ensure that we get an subscription ASAP so that we don't miss any connectivity events - let connectivity_event_subscription = connectivity.get_event_subscription(); + let base_node = handles.expect_handle::(); let mut status_watch = state_machine.get_status_info_watch(); if !status_watch.borrow().bootstrapped { @@ -103,8 +102,9 @@ impl ServiceInitializer for MempoolSyncInitializer { } log_mdc::extend(mdc.clone()); } + let base_node_events = base_node.get_block_event_stream(); - MempoolSyncProtocol::new(config, notif_rx, connectivity_event_subscription, mempool) + MempoolSyncProtocol::new(config, notif_rx, mempool, connectivity, base_node_events) .run() .await; }); diff --git a/base_layer/core/src/mempool/sync_protocol/mod.rs b/base_layer/core/src/mempool/sync_protocol/mod.rs index 952f562249..08ad3bfe36 100644 --- a/base_layer/core/src/mempool/sync_protocol/mod.rs +++ b/base_layer/core/src/mempool/sync_protocol/mod.rs @@ -79,7 +79,7 @@ pub use initializer::MempoolSyncInitializer; use log::*; use prost::Message; use tari_comms::{ - connectivity::{ConnectivityEvent, ConnectivityEventRx}, + connectivity::{ConnectivityEvent, ConnectivityRequester, ConnectivitySelection}, framing, framing::CanonicalFraming, message::MessageExt, @@ -97,6 +97,8 @@ use tokio::{ }; use crate::{ + base_node::comms_interface::{BlockEvent, BlockEventReceiver}, + chain_storage::BlockAddResult, mempool::{metrics, proto, Mempool, MempoolServiceConfig}, proto as shared_proto, transactions::transaction_components::Transaction, @@ -116,10 +118,11 @@ pub static MEMPOOL_SYNC_PROTOCOL: Bytes = Bytes::from_static(b"t/mempool-sync/1" pub struct MempoolSyncProtocol { config: MempoolServiceConfig, protocol_notifier: ProtocolNotificationRx, - connectivity_events: ConnectivityEventRx, mempool: Mempool, num_synched: Arc, permits: Arc, + connectivity: ConnectivityRequester, + block_event_stream: BlockEventReceiver, } impl MempoolSyncProtocol @@ -128,25 +131,31 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static pub fn new( config: MempoolServiceConfig, protocol_notifier: ProtocolNotificationRx, - connectivity_events: ConnectivityEventRx, mempool: Mempool, + connectivity: ConnectivityRequester, + block_event_stream: BlockEventReceiver, ) -> Self { Self { config, protocol_notifier, - connectivity_events, mempool, num_synched: Arc::new(AtomicUsize::new(0)), permits: Arc::new(Semaphore::new(1)), + connectivity, + block_event_stream, } } pub async fn run(mut self) { info!(target: LOG_TARGET, "Mempool protocol handler has started"); + let mut connectivity_events = self.connectivity.get_event_subscription(); loop { tokio::select! { - Ok(event) = self.connectivity_events.recv() => { + Ok(block_event) = self.block_event_stream.recv() => { + self.handle_block_event(&block_event).await; + }, + Ok(event) = connectivity_events.recv() => { self.handle_connectivity_event(event).await; }, @@ -174,6 +183,56 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static } } + async fn handle_block_event(&mut self, block_event: &BlockEvent) { + use BlockEvent::{BlockSyncComplete, ValidBlockAdded}; + match block_event { + ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed: _ }) => { + if added.len() < self.config.block_sync_trigger { + return; + } + }, + BlockSyncComplete(tip, starting_sync_height) => { + let added = tip.height() - starting_sync_height; + if added < self.config.block_sync_trigger as u64 { + return; + } + }, + _ => { + return; + }, + } + // we want to at least sync initial_sync_num_peers, so we reset the num_synced to 0, so it can run till + // initial_sync_num_peers again. This is made to run as a best effort in that it will at least run the + // initial_sync_num_peers + self.num_synched.store(0, Ordering::Release); + let connections = match self + .connectivity + .select_connections(ConnectivitySelection::random_nodes( + self.config.initial_sync_num_peers, + vec![], + )) + .await + { + Ok(v) => { + if v.is_empty() { + error!(target: LOG_TARGET, "Mempool sync could not get any peers to sync to"); + return; + }; + v + }, + Err(e) => { + error!( + target: LOG_TARGET, + "Mempool sync could not get a peer to sync to: {}", e + ); + return; + }, + }; + for connection in connections { + self.spawn_initiator_protocol(connection).await; + } + } + fn is_synched(&self) -> bool { self.num_synched.load(Ordering::SeqCst) >= self.config.initial_sync_num_peers } diff --git a/base_layer/core/src/mempool/sync_protocol/test.rs b/base_layer/core/src/mempool/sync_protocol/test.rs index 1189772ba8..281228587a 100644 --- a/base_layer/core/src/mempool/sync_protocol/test.rs +++ b/base_layer/core/src/mempool/sync_protocol/test.rs @@ -25,13 +25,16 @@ use std::{fmt, io, iter::repeat_with, sync::Arc}; use futures::{Sink, SinkExt, Stream, StreamExt}; use tari_common::configuration::Network; use tari_comms::{ - connectivity::{ConnectivityEvent, ConnectivityEventTx}, + connectivity::ConnectivityEvent, framing, memsocket::MemorySocket, message::MessageExt, peer_manager::PeerFeatures, protocol::{ProtocolEvent, ProtocolNotification, ProtocolNotificationTx}, - test_utils::{mocks::create_peer_connection_mock_pair, node_identity::build_node_identity}, + test_utils::{ + mocks::{create_connectivity_mock, create_peer_connection_mock_pair, ConnectivityManagerMockState}, + node_identity::build_node_identity, + }, Bytes, BytesMut, }; @@ -80,28 +83,37 @@ async fn setup( num_txns: usize, ) -> ( ProtocolNotificationTx, - ConnectivityEventTx, + ConnectivityManagerMockState, Mempool, Vec, ) { let (protocol_notif_tx, protocol_notif_rx) = mpsc::channel(1); - let (connectivity_events_tx, connectivity_events_rx) = broadcast::channel(10); let (mempool, transactions) = new_mempool_with_transactions(num_txns).await; + let (connectivity, connectivity_manager_mock) = create_connectivity_mock(); + let connectivity_manager_mock_state = connectivity_manager_mock.spawn(); + let (block_event_sender, _) = broadcast::channel(1); + let block_receiver = block_event_sender.subscribe(); let protocol = MempoolSyncProtocol::new( Default::default(), protocol_notif_rx, - connectivity_events_rx, mempool.clone(), + connectivity, + block_receiver, ); task::spawn(protocol.run()); - - (protocol_notif_tx, connectivity_events_tx, mempool, transactions) + connectivity_manager_mock_state.wait_until_event_receivers_ready().await; + ( + protocol_notif_tx, + connectivity_manager_mock_state, + mempool, + transactions, + ) } #[tokio::test] async fn empty_set() { - let (_, connectivity_events_tx, mempool1, _) = setup(0).await; + let (_, connectivity_manager_state, mempool1, _) = setup(0).await; let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); @@ -109,9 +121,7 @@ async fn empty_set() { create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await; // This node connected to a peer, so it should open the substream - connectivity_events_tx - .send(ConnectivityEvent::PeerConnected(node2_conn)) - .unwrap(); + connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn)); let substream = node1_mock.next_incoming_substream().await.unwrap(); let framed = framing::canonical(substream, MAX_FRAME_SIZE); @@ -131,7 +141,7 @@ async fn empty_set() { #[tokio::test] async fn synchronise() { - let (_, connectivity_events_tx, mempool1, transactions1) = setup(5).await; + let (_, connectivity_manager_state, mempool1, transactions1) = setup(5).await; let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); @@ -139,9 +149,7 @@ async fn synchronise() { create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await; // This node connected to a peer, so it should open the substream - connectivity_events_tx - .send(ConnectivityEvent::PeerConnected(node2_conn)) - .unwrap(); + connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn)); let substream = node1_mock.next_incoming_substream().await.unwrap(); let framed = framing::canonical(substream, MAX_FRAME_SIZE); @@ -165,17 +173,14 @@ async fn synchronise() { #[tokio::test] async fn duplicate_set() { - let (_, connectivity_events_tx, mempool1, transactions1) = setup(2).await; - + let (_, connectivity_manager_state, mempool1, transactions1) = setup(2).await; let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let (_node1_conn, node1_mock, node2_conn, _) = create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await; // This node connected to a peer, so it should open the substream - connectivity_events_tx - .send(ConnectivityEvent::PeerConnected(node2_conn)) - .unwrap(); + connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn)); let substream = node1_mock.next_incoming_substream().await.unwrap(); let framed = framing::canonical(substream, MAX_FRAME_SIZE); @@ -269,7 +274,7 @@ async fn initiator_messages() { #[tokio::test] async fn responder_messages() { - let (_, connectivity_events_tx, _, transactions1) = setup(1).await; + let (_, connectivity_manager_state, _, transactions1) = setup(1).await; let node1 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); let node2 = build_node_identity(PeerFeatures::COMMUNICATION_NODE); @@ -277,9 +282,7 @@ async fn responder_messages() { create_peer_connection_mock_pair(node1.to_peer(), node2.to_peer()).await; // This node connected to a peer, so it should open the substream - connectivity_events_tx - .send(ConnectivityEvent::PeerConnected(node2_conn)) - .unwrap(); + connectivity_manager_state.publish_event(ConnectivityEvent::PeerConnected(node2_conn)); let substream = node1_mock.next_incoming_substream().await.unwrap(); let mut framed = framing::canonical(substream, MAX_FRAME_SIZE); diff --git a/common/config/presets/c_base_node.toml b/common/config/presets/c_base_node.toml index 8b72c4a989..fc98a0d3c3 100644 --- a/common/config/presets/c_base_node.toml +++ b/common/config/presets/c_base_node.toml @@ -110,6 +110,8 @@ track_reorgs = true #service.initial_sync_num_peers = 2 # The maximum number of transactions to sync in a single sync session Default: 10_000 #service.initial_sync_max_transactions = 10_000 +# The maximum number of blocks added via sync or re-org to triggering a sync +#block_sync_trigger = 5 [base_node.state_machine] # The initial max sync latency. If a peer fails to stream a header/block within this deadline another sync peer will be diff --git a/comms/core/src/test_utils/mocks/connectivity_manager.rs b/comms/core/src/test_utils/mocks/connectivity_manager.rs index ae29b9211c..9f89e6a34e 100644 --- a/comms/core/src/test_utils/mocks/connectivity_manager.rs +++ b/comms/core/src/test_utils/mocks/connectivity_manager.rs @@ -75,6 +75,17 @@ impl ConnectivityManagerMockState { } } + pub async fn wait_until_event_receivers_ready(&self) { + let mut timeout = 0; + while self.event_tx.receiver_count() == 0 { + time::sleep(Duration::from_millis(10)).await; + timeout += 10; + if timeout > 5000 { + panic!("Event receiver not ready after 5 secs"); + } + } + } + async fn add_call(&self, call_str: String) { self.with_state(|state| state.calls.push(call_str)).await }