fix(core): fixes stale chain metadata being sent to listening state (#5039)

Description
---
- Removes "caching" of peer chain metadata
- Sends only one peer's chain metadata at a time to the listening state
- Increases the chain metadata event channel capacity to 20, so that the listening state can read up to the last 20 chain metadata events received (see the sketch after this list)
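A minimal sketch of the new flow (assuming a simplified stand-in for the real `PeerChainMetadata` type and standard `tokio` broadcast semantics): each pong now publishes exactly one per-peer event, and a subscriber that lags behind can still drain up to the channel capacity of buffered events.

```rust
use tokio::sync::broadcast;

// Hypothetical stand-in for the real PeerChainMetadata type.
#[derive(Debug, Clone)]
struct PeerChainMetadata {
    node_id: String,
    height: u64,
}

#[tokio::main]
async fn main() {
    // Capacity 20: the channel retains up to the last 20 events for a slow subscriber.
    let (publisher, mut subscriber) = broadcast::channel::<PeerChainMetadata>(20);

    // Each pong publishes a single peer's metadata, not a cached Vec of all peers.
    for height in 1..=3 {
        publisher
            .send(PeerChainMetadata {
                node_id: "peer-a".into(),
                height,
            })
            .unwrap();
    }

    // The listening state reads the buffered per-peer events one at a time.
    while let Ok(m) = subscriber.try_recv() {
        println!("received metadata from {} at height {}", m.node_id, m.height);
    }
}
```

With the old capacity of 1, everything but the most recently received event could be dropped before the listening state read it; a capacity of 20 keeps a window of recent per-peer metadata available.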

Motivation and Context
---
Fixes #5037 and #5030 (needs to be confirmed, since this case is not easy to reproduce).

What I've observed is that the peer is banned twice: first for 30 minutes (between 04:07:23.802123900 and 2022-12-09 04:41:26.365429300), then permanently (bad metadata sig). The logs clearly show that a pong is received just before the ban. When the peer is banned, the chain metadata service clears the peer's metadata, but another pong message is already in the domain pipeline, so it is received after the clear. From then on, the chain metadata service keeps that peer's chain metadata and sends it to the listening state every time, because it is never cleared again.

TL;DR: a classic race condition.

Order of events (distilled in the sketch below this list):

- peer is banned for 30 minutes
- peer ban expires
- we receive a ping from the peer
- at almost the same time, the peer is banned (see logs in [this comment](#5030 (comment)))
- chain metadata service clears the peer from `peer_chain_metadata`
- the ping/pong is received (it was already in the pipeline from before the ban)
- the chain metadata is added to the vec and is never cleared (because the peer is not banned again)
- the peer stays in the list despite being banned and NOT connected, so header sync keeps trying to connect to it
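
A distilled, hypothetical model of the old flow (simplified names and `String` node IDs stand in for the real types; this illustrates the pattern above and is not the actual service code):

```rust
// Distilled, hypothetical model of the old flow, not the actual service code.
struct OldChainMetadataService {
    // Cached across ping rounds and rebroadcast to the listening state every round.
    peer_chain_metadata: Vec<String>,
}

impl OldChainMetadataService {
    // The ban/disconnect handler clears the peer from the cache...
    fn handle_peer_banned(&mut self, peer: &str) {
        self.peer_chain_metadata.retain(|p| p != peer);
    }

    // ...but a pong that was already in the domain pipeline re-adds it afterwards,
    // and no second ban event ever fires to remove it again.
    fn handle_pong(&mut self, peer: &str) {
        self.peer_chain_metadata.push(peer.to_string());
    }
}

fn main() {
    let mut svc = OldChainMetadataService { peer_chain_metadata: vec![] };
    svc.handle_pong("peer-a");        // pong arrives, metadata cached
    svc.handle_peer_banned("peer-a"); // ban clears the cache
    svc.handle_pong("peer-a");        // stale pong that was already in flight
    // The banned peer is stuck in the cache and keeps being sent to listening.
    assert_eq!(svc.peer_chain_metadata, vec!["peer-a".to_string()]);
}
```

Removing the cache eliminates this failure mode: each pong is forwarded as a single event, so a stale in-flight pong can produce at most one stale event rather than a permanently poisoned list.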


How Has This Been Tested?
---
Manually: rewind-blockchain and sync; the node enters sync mode timeously.
Removed the tests that covered the removed functions.
Added a new unit test for `determine_sync_state`.
sdbondi committed Dec 12, 2022
1 parent 62dfd38 commit aaf99b7
Showing 4 changed files with 90 additions and 374 deletions.
11 changes: 10 additions & 1 deletion base_layer/core/src/base_node/chain_metadata_service/handle.rs
@@ -80,10 +80,19 @@ impl Display for PeerChainMetadata {
 
 #[derive(Debug)]
 pub enum ChainMetadataEvent {
-    PeerChainMetadataReceived(Vec<PeerChainMetadata>),
+    PeerChainMetadataReceived(PeerChainMetadata),
     NetworkSilence,
 }
 
+impl ChainMetadataEvent {
+    pub fn peer_metadata(&self) -> Option<PeerChainMetadata> {
+        match self {
+            Self::PeerChainMetadataReceived(metadata) => Some(metadata.clone()),
+            _ => None,
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct ChainMetadataHandle {
     event_stream: broadcast::Sender<Arc<ChainMetadataEvent>>,
6 changes: 2 additions & 4 deletions base_layer/core/src/base_node/chain_metadata_service/initializer.rs
@@ -21,7 +21,6 @@
 // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
 use log::*;
-use tari_comms::connectivity::ConnectivityRequester;
 use tari_p2p::services::liveness::LivenessHandle;
 use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
 use tokio::sync::broadcast;
@@ -38,17 +37,16 @@ impl ServiceInitializer for ChainMetadataServiceInitializer {
     async fn initialize(&mut self, context: ServiceInitializerContext) -> Result<(), ServiceInitializationError> {
         debug!(target: LOG_TARGET, "Initializing Chain Metadata Service");
         // Buffer size set to 1 because only the most recent metadata is applicable
-        let (publisher, _) = broadcast::channel(1);
+        let (publisher, _) = broadcast::channel(20);
 
         let handle = ChainMetadataHandle::new(publisher.clone());
         context.register_handle(handle);
 
         context.spawn_until_shutdown(|handles| {
             let liveness = handles.expect_handle::<LivenessHandle>();
             let base_node = handles.expect_handle::<LocalNodeCommsInterface>();
-            let connectivity = handles.expect_handle::<ConnectivityRequester>();
 
-            ChainMetadataService::new(liveness, base_node, connectivity, publisher).run()
+            ChainMetadataService::new(liveness, base_node, publisher).run()
         });
 
         debug!(target: LOG_TARGET, "Chain Metadata Service initialized");
166 changes: 27 additions & 139 deletions base_layer/core/src/base_node/chain_metadata_service/service.rs
@@ -27,10 +27,7 @@ use num_format::{Locale, ToFormattedString};
 use prost::Message;
 use tari_common::log_if_error;
 use tari_common_types::chain_metadata::ChainMetadata;
-use tari_comms::{
-    connectivity::{ConnectivityEvent, ConnectivityRequester},
-    message::MessageExt,
-};
+use tari_comms::message::MessageExt;
 use tari_p2p::services::liveness::{LivenessEvent, LivenessHandle, MetadataKey, PingPongEvent};
 use tokio::sync::broadcast;
 
@@ -49,8 +46,6 @@ const NUM_ROUNDS_NETWORK_SILENCE: u16 = 3;
 pub(super) struct ChainMetadataService {
     liveness: LivenessHandle,
     base_node: LocalNodeCommsInterface,
-    peer_chain_metadata: Vec<PeerChainMetadata>,
-    connectivity: ConnectivityRequester,
     event_publisher: broadcast::Sender<Arc<ChainMetadataEvent>>,
     number_of_rounds_no_pings: u16,
 }
@@ -64,14 +59,11 @@ impl ChainMetadataService {
     pub fn new(
         liveness: LivenessHandle,
         base_node: LocalNodeCommsInterface,
-        connectivity: ConnectivityRequester,
         event_publisher: broadcast::Sender<Arc<ChainMetadataEvent>>,
     ) -> Self {
         Self {
             liveness,
             base_node,
-            peer_chain_metadata: Vec::new(),
-            connectivity,
             event_publisher,
             number_of_rounds_no_pings: 0,
         }
@@ -81,7 +73,6 @@
     pub async fn run(mut self) {
         let mut liveness_event_stream = self.liveness.get_event_stream();
         let mut block_event_stream = self.base_node.get_block_event_stream();
-        let mut connectivity_events = self.connectivity.get_event_subscription();
 
         log_if_error!(
             target: LOG_TARGET,
@@ -108,29 +99,10 @@
                     );
                 },
 
-                Ok(event) = connectivity_events.recv() => {
-                    self.handle_connectivity_event(event);
-                }
             }
         }
     }
 
-    fn handle_connectivity_event(&mut self, event: ConnectivityEvent) {
-        use ConnectivityEvent::{PeerBanned, PeerDisconnected};
-        match event {
-            PeerDisconnected(node_id) | PeerBanned(node_id) => {
-                if let Some(pos) = self.peer_chain_metadata.iter().position(|p| *p.node_id() == node_id) {
-                    debug!(
-                        target: LOG_TARGET,
-                        "Removing disconnected/banned peer `{}` from chain metadata list ", node_id
-                    );
-                    self.peer_chain_metadata.remove(pos);
-                }
-            },
-            _ => {},
-        }
-    }
-
     /// Handle BlockEvents
     async fn handle_block_event(&mut self, event: &BlockEvent) -> Result<(), ChainMetadataSyncError> {
         match event {
@@ -166,8 +138,7 @@
                 );
                 self.number_of_rounds_no_pings = 0;
                 if event.metadata.has(MetadataKey::ChainMetadata) {
-                    self.collect_chain_state_from_ping_pong(event)?;
-                    self.send_chain_metadata_to_event_publisher().await?;
+                    self.send_chain_metadata_to_event_publisher(event).await?;
                 }
             },
             // Received a pong, check if our neighbour sent it and it contains ChainMetadata
@@ -179,8 +150,7 @@
                 );
                 self.number_of_rounds_no_pings = 0;
                 if event.metadata.has(MetadataKey::ChainMetadata) {
-                    self.collect_chain_state_from_ping_pong(event)?;
-                    self.send_chain_metadata_to_event_publisher().await?;
+                    self.send_chain_metadata_to_event_publisher(event).await?;
                 }
             },
             // New ping round has begun
@@ -197,10 +167,6 @@
                         self.number_of_rounds_no_pings = 0;
                     }
                 }
-                // Ensure that we're waiting for the correct amount of peers to respond
-                // and have allocated space for their replies
-
-                self.resize_chainstate_buffer(*num_peers);
             },
         }
 
@@ -212,31 +178,10 @@
         Ok(())
     }
 
-    async fn send_chain_metadata_to_event_publisher(&mut self) -> Result<(), ChainMetadataSyncError> {
-        // send only fails if there are no subscribers.
-        let _size = self
-            .event_publisher
-            .send(Arc::new(ChainMetadataEvent::PeerChainMetadataReceived(
-                self.peer_chain_metadata.clone(),
-            )));
-
-        Ok(())
-    }
-
-    fn resize_chainstate_buffer(&mut self, n: usize) {
-        match self.peer_chain_metadata.capacity() {
-            cap if n > cap => {
-                let additional = n - self.peer_chain_metadata.len();
-                self.peer_chain_metadata.reserve_exact(additional);
-            },
-            cap if n < cap => {
-                self.peer_chain_metadata.shrink_to(cap);
-            },
-            _ => {},
-        }
-    }
-
-    fn collect_chain_state_from_ping_pong(&mut self, event: &PingPongEvent) -> Result<(), ChainMetadataSyncError> {
+    async fn send_chain_metadata_to_event_publisher(
+        &mut self,
+        event: &PingPongEvent,
+    ) -> Result<(), ChainMetadataSyncError> {
         let chain_metadata_bytes = event
             .metadata
             .get(MetadataKey::ChainMetadata)
@@ -252,19 +197,15 @@
             chain_metadata.accumulated_difficulty().to_formatted_string(&Locale::en),
         );
 
-        if let Some(pos) = self
-            .peer_chain_metadata
-            .iter()
-            .position(|peer_chainstate| *peer_chainstate.node_id() == event.node_id)
-        {
-            self.peer_chain_metadata.remove(pos);
-        }
+        let peer_chain_metadata = PeerChainMetadata::new(event.node_id.clone(), chain_metadata, event.latency);
+
+        // send only fails if there are no subscribers.
+        let _size = self
+            .event_publisher
+            .send(Arc::new(ChainMetadataEvent::PeerChainMetadataReceived(
+                peer_chain_metadata,
+            )));
 
-        self.peer_chain_metadata.push(PeerChainMetadata::new(
-            event.node_id.clone(),
-            chain_metadata,
-            event.latency,
-        ));
         Ok(())
     }
 }
@@ -274,13 +215,7 @@ mod test {
     use std::convert::TryInto;
 
     use futures::StreamExt;
-    use tari_comms::{
-        peer_manager::NodeId,
-        test_utils::{
-            mocks::{create_connectivity_mock, ConnectivityManagerMockState},
-            node_identity::build_many_node_identities,
-        },
-    };
+    use tari_comms::peer_manager::NodeId;
     use tari_p2p::services::liveness::{
         mock::{create_p2p_liveness_mock, LivenessMockState},
         LivenessRequest,
@@ -323,33 +258,24 @@
     fn setup() -> (
         ChainMetadataService,
         LivenessMockState,
-        ConnectivityManagerMockState,
         reply_channel::TryReceiver<NodeCommsRequest, NodeCommsResponse, CommsInterfaceError>,
+        broadcast::Receiver<Arc<ChainMetadataEvent>>,
     ) {
         let (liveness_handle, mock, _) = create_p2p_liveness_mock(1);
         let liveness_mock_state = mock.get_mock_state();
         task::spawn(mock.run());
 
         let (base_node, base_node_receiver) = create_base_node_nci();
-        let (publisher, _) = broadcast::channel(1);
-
-        let (connectivity, mock) = create_connectivity_mock();
-        let connectivity_mock_state = mock.get_shared_state();
-        task::spawn(mock.run());
+        let (publisher, event_rx) = broadcast::channel(10);
 
-        let service = ChainMetadataService::new(liveness_handle, base_node, connectivity, publisher);
+        let service = ChainMetadataService::new(liveness_handle, base_node, publisher);
 
-        (
-            service,
-            liveness_mock_state,
-            connectivity_mock_state,
-            base_node_receiver,
-        )
+        (service, liveness_mock_state, base_node_receiver, event_rx)
     }
 
     #[tokio::test]
     async fn update_liveness_chain_metadata() {
-        let (mut service, liveness_mock_state, _, mut base_node_receiver) = setup();
+        let (mut service, liveness_mock_state, mut base_node_receiver, _) = setup();
 
         let mut proto_chain_metadata = create_sample_proto_chain_metadata();
         proto_chain_metadata.height_of_longest_chain = Some(123);
@@ -375,7 +301,7 @@
     }
     #[tokio::test]
    async fn handle_liveness_event_ok() {
-        let (mut service, _, _, _) = setup();
+        let (mut service, _, _, mut events_rx) = setup();
 
         let mut metadata = Metadata::new();
         let proto_chain_metadata = create_sample_proto_chain_metadata();
@@ -388,57 +314,19 @@
             latency: None,
         };
 
-        // To prevent the chain metadata buffer being flushed after receiving a single pong event,
-        // extend it's capacity to 2
-        service.peer_chain_metadata.reserve_exact(2);
         let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
         service.handle_liveness_event(&sample_event).await.unwrap();
-        assert_eq!(service.peer_chain_metadata.len(), 1);
-        let metadata = service.peer_chain_metadata.remove(0);
+        let metadata = events_rx.recv().await.unwrap().peer_metadata().unwrap();
         assert_eq!(*metadata.node_id(), node_id);
         assert_eq!(
             metadata.claimed_chain_metadata().height_of_longest_chain(),
             proto_chain_metadata.height_of_longest_chain.unwrap()
         );
     }
 
-    #[tokio::test]
-    async fn handle_liveness_event_banned_peer() {
-        let (mut service, _, _, _) = setup();
-
-        let mut metadata = Metadata::new();
-        let proto_chain_metadata = create_sample_proto_chain_metadata();
-        metadata.insert(MetadataKey::ChainMetadata, proto_chain_metadata.to_encoded_bytes());
-
-        service.peer_chain_metadata.reserve_exact(3);
-
-        let nodes = build_many_node_identities(2, Default::default());
-        for node in &nodes {
-            let pong_event = PingPongEvent {
-                metadata: metadata.clone(),
-                node_id: node.node_id().clone(),
-                latency: None,
-            };
-
-            let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
-            service.handle_liveness_event(&sample_event).await.unwrap();
-        }
-
-        assert!(service
-            .peer_chain_metadata
-            .iter()
-            .any(|p| p.node_id() == nodes[0].node_id()));
-        service.handle_connectivity_event(ConnectivityEvent::PeerBanned(nodes[0].node_id().clone()));
-        // Check that banned peer was removed
-        assert!(service
-            .peer_chain_metadata
-            .iter()
-            .all(|p| p.node_id() != nodes[0].node_id()));
-    }
-
     #[tokio::test]
     async fn handle_liveness_event_no_metadata() {
-        let (mut service, _, _, _) = setup();
+        let (mut service, _, _, mut event_rx) = setup();
 
         let metadata = Metadata::new();
         let node_id = NodeId::new();
@@ -450,12 +338,12 @@
 
         let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
         service.handle_liveness_event(&sample_event).await.unwrap();
-        assert!(service.peer_chain_metadata.is_empty());
+        assert!(event_rx.try_recv().is_err());
     }
 
     #[tokio::test]
     async fn handle_liveness_event_bad_metadata() {
-        let (mut service, _, _, _) = setup();
+        let (mut service, _, _, mut event_rx) = setup();
 
         let mut metadata = Metadata::new();
         metadata.insert(MetadataKey::ChainMetadata, b"no-good".to_vec());
@@ -469,6 +357,6 @@
         let sample_event = LivenessEvent::ReceivedPong(Box::new(pong_event));
         let err = service.handle_liveness_event(&sample_event).await.unwrap_err();
         unpack_enum!(ChainMetadataSyncError::DecodeError(_err) = err);
-        assert_eq!(service.peer_chain_metadata.len(), 0);
+        assert!(event_rx.try_recv().is_err());
     }
 }
