Skip to content

Commit

Permalink
feat: improve basenode switch from listening to lagging mode (#3255)
Browse files Browse the repository at this point in the history
Description
---
This PR changes the peer metadata push to listing mode speed to push every time it receives a chain metadata ping or pong message. 

Motivation and Context
---
This is introduced to allow a node to switch faster and not wait till it received it all the pings and pongs from a node.

How Has This Been Tested?
---

Run all unit tests and manually ran node.
  • Loading branch information
SWvheerden committed Aug 30, 2021
1 parent 8e4be07 commit 9dc335f
Showing 1 changed file with 8 additions and 14 deletions.
22 changes: 8 additions & 14 deletions base_layer/core/src/base_node/chain_metadata_service/service.rs
Expand Up @@ -164,14 +164,15 @@ impl ChainMetadataService {

async fn handle_liveness_event(&mut self, event: &LivenessEvent) -> Result<(), ChainMetadataSyncError> {
match event {
// Received a ping, check if our neighbour sent it and it contains ChainMetadata
// Received a ping, check if it contains ChainMetadata
LivenessEvent::ReceivedPing(event) => {
trace!(
target: LOG_TARGET,
"Received ping from neighbouring node '{}'.",
event.node_id
);
self.collect_chain_state_from_ping(&event.node_id, &event.metadata)?;
self.send_chain_metadata_to_event_publisher().await?;
},
// Received a pong, check if our neighbour sent it and it contains ChainMetadata
LivenessEvent::ReceivedPong(event) => {
Expand All @@ -181,37 +182,31 @@ impl ChainMetadataService {
event.node_id
);
self.collect_chain_state_from_pong(&event.node_id, &event.metadata)?;

// All peers have responded in this round, send the chain metadata to the base node service
if self.peer_chain_metadata.len() >= self.peer_chain_metadata.capacity() {
self.flush_chain_metadata_to_event_publisher().await?;
}
self.send_chain_metadata_to_event_publisher().await?;
},
// New ping round has begun
LivenessEvent::PingRoundBroadcast(num_peers) => {
debug!(
target: LOG_TARGET,
"New chain metadata round sent to {} peer(s)", num_peers
);
// If we have chain metadata to send to the base node service, send them now
// because the next round of pings is happening.
self.flush_chain_metadata_to_event_publisher().await?;
// Ensure that we're waiting for the correct amount of peers to respond
// and have allocated space for their replies

self.resize_chainstate_buffer(*num_peers);
},
}

Ok(())
}

async fn flush_chain_metadata_to_event_publisher(&mut self) -> Result<(), ChainMetadataSyncError> {
let chain_metadata = self.peer_chain_metadata.drain(..).collect::<Vec<_>>();

async fn send_chain_metadata_to_event_publisher(&mut self) -> Result<(), ChainMetadataSyncError> {
// send only fails if there are no subscribers.
let _ = self
.event_publisher
.send(Arc::new(ChainMetadataEvent::PeerChainMetadataReceived(chain_metadata)));
.send(Arc::new(ChainMetadataEvent::PeerChainMetadataReceived(
self.peer_chain_metadata.clone(),
)));

Ok(())
}
Expand Down Expand Up @@ -289,7 +284,6 @@ impl ChainMetadataService {

self.peer_chain_metadata
.push(PeerChainMetadata::new(node_id.clone(), chain_metadata));

Ok(())
}
}
Expand Down

0 comments on commit 9dc335f

Please sign in to comment.