From 0436915e88c1422d487d370ad718fda4c6c578a2 Mon Sep 17 00:00:00 2001
From: Josh Wilson
Date: Wed, 21 Dec 2022 07:46:16 +0100
Subject: [PATCH] chore(node): remove replication data bundling

Now that we have a limit per msg, and batches are determined by the
sender, the node no longer needs to bundle replication data itself.
---
 sn_node/src/node/flow_ctrl/mod.rs | 64 +++++--------------------------
 1 file changed, 10 insertions(+), 54 deletions(-)

diff --git a/sn_node/src/node/flow_ctrl/mod.rs b/sn_node/src/node/flow_ctrl/mod.rs
index f40d1c2a0e..a793e3b2ce 100644
--- a/sn_node/src/node/flow_ctrl/mod.rs
+++ b/sn_node/src/node/flow_ctrl/mod.rs
@@ -29,7 +29,7 @@ use periodic_checks::PeriodicChecksTimestamps;
 use sn_dysfunction::DysfunctionDetection;
 use sn_interface::{
     messaging::system::{NodeDataCmd, NodeMsg},
-    types::{log_markers::LogMarker, DataAddress, Peer, ReplicatedData},
+    types::{log_markers::LogMarker, DataAddress, Peer},
 };
 
 use super::DataStorage;
@@ -160,21 +160,12 @@ impl FlowCtrl {
         let _ = tokio::task::spawn(async move {
             // is there a simple way to dedupe common data going to many peers?
             // is any overhead reduction worth the increased complexity?
-            while let Some((mut data_addresses, peer)) = data_replication_receiver.recv().await {
+            while let Some((data_addresses, peer)) = data_replication_receiver.recv().await {
                 let send_cmd_channel = cmd_channel.clone();
                 let the_node = node_arc.clone();
                 let data_storage = node_data_storage.clone();
                 // move replication off thread so we don't block the receiver
                 let _ = tokio::task::spawn(async move {
-                    // sort the addresses so that we're batching out closest data first
-                    data_addresses
-                        .sort_by(|lhs, rhs| peer.name().cmp_distance(lhs.name(), rhs.name()));
-
-                    // The messages shall be bundled by size AND numbers.
-                    // That is: a bundle get sent out whichever the total size and total numbers
-                    // reached the upper limit first.
-                    let mut data_bundle = DataBundle::default();
-
                     debug!(
                         "{:?} Data {:?} to: {:?}",
                         LogMarker::SendingMissingReplicatedData,
@@ -182,10 +173,8 @@ impl FlowCtrl {
                         peer,
                     );
 
-                    for (i, address) in data_addresses.iter().enumerate() {
-                        // enumerate is 0 indexed, let's correct for that for counting
-                        // and then comparing to data_addresses
-                        let iteration = i + 1;
+                    for address in data_addresses.iter() {
+                        let mut data_bundle = vec![];
                         match data_storage.get_from_local_store(address).await {
                             Ok(data) => {
                                 data_bundle.push(data);
@@ -195,19 +184,14 @@
                             }
                         };
 
-                        // if we hit a multiple of the batch limit or we're at the last data to send...
-                        if data_bundle.shall_flush() || iteration == data_addresses.len() {
-                            trace!("Sending out data batch on i:{iteration:?} to {peer:?}");
-                            let msg = NodeMsg::NodeDataCmd(NodeDataCmd::ReplicateData(
-                                data_bundle.take(),
-                            ));
+                        trace!("Sending out data batch to {peer:?}");
+                        let msg = NodeMsg::NodeDataCmd(NodeDataCmd::ReplicateData(data_bundle));
 
-                            let node_context = the_node.read().await.context();
+                        let node_context = the_node.read().await.context();
 
-                            let cmd = Cmd::send_msg(msg, Peers::Single(peer), node_context.clone());
-                            if let Err(error) = send_cmd_channel.send((cmd, vec![])).await {
-                                error!("Failed to enqueue send msg command for replication of data batch to {peer:?}: {error:?}");
-                            }
+                        let cmd = Cmd::send_msg(msg, Peers::Single(peer), node_context.clone());
+                        if let Err(error) = send_cmd_channel.send((cmd, vec![])).await {
+                            error!("Failed to enqueue send msg command for replication of data batch to {peer:?}: {error:?}");
                         }
                     }
                 });
@@ -251,31 +235,3 @@ impl FlowCtrl {
         })
     }
 }
-
-static DATA_BUNDLE_SIZE_LIMIT: u64 = 10_000_000;
-static DATA_BUNDLE_ENTRY_LIMIT: usize = 250;
-
-#[derive(Default)]
-struct DataBundle {
-    data_batch: Vec<ReplicatedData>,
-    total_size: u64,
-}
-
-impl DataBundle {
-    fn push(&mut self, data: ReplicatedData) {
-        self.total_size += data.size();
-        self.data_batch.push(data);
-    }
-
-    fn take(&mut self) -> Vec<ReplicatedData> {
-        let data_batch = self.data_batch.clone();
-        self.data_batch = vec![];
-        self.total_size = 0;
-        data_batch
-    }
-
-    fn shall_flush(&self) -> bool {
-        self.total_size >= DATA_BUNDLE_SIZE_LIMIT
-            || self.data_batch.len() >= DATA_BUNDLE_ENTRY_LIMIT
-    }
-}
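
Note: the net effect of the patch is that each address now yields its own
single-entry ReplicateData msg; any batching is decided by the sender before
the addresses arrive here. For illustration, a condensed, synchronous sketch
of the new loop follows; ReplicatedData, NodeMsg, and get_from_local_store
are simplified stand-ins, not the real sn_node / sn_interface items.

    // Condensed sketch of the post-patch replication loop: one single-entry
    // bundle per address, no receiver-side size/count bookkeeping.

    #[derive(Debug, Clone)]
    struct ReplicatedData(Vec<u8>);

    #[derive(Debug)]
    enum NodeMsg {
        ReplicateData(Vec<ReplicatedData>),
    }

    // hypothetical stand-in for data_storage.get_from_local_store(address).await
    fn get_from_local_store(address: &str) -> Result<ReplicatedData, String> {
        Ok(ReplicatedData(address.as_bytes().to_vec()))
    }

    fn build_replication_msgs(data_addresses: &[&str]) -> Vec<NodeMsg> {
        let mut msgs = Vec::new();
        for address in data_addresses {
            // one single-entry bundle per address; no flush thresholds needed,
            // since the sender already caps what goes into each msg
            let mut data_bundle = Vec::new();
            match get_from_local_store(address) {
                Ok(data) => data_bundle.push(data),
                // as in the patch, the msg is still sent on a local fetch error
                Err(error) => eprintln!("Error getting data at {address}: {error}"),
            }
            msgs.push(NodeMsg::ReplicateData(data_bundle));
        }
        msgs
    }

    fn main() {
        // two addresses -> two msgs, where the old DataBundle could have
        // coalesced them into a single msg
        let msgs = build_replication_msgs(&["addr_a", "addr_b"]);
        assert_eq!(msgs.len(), 2);
        println!("{msgs:?}");
    }

The trade-off is more (smaller) msgs per replication round, in exchange for
dropping the DATA_BUNDLE_SIZE_LIMIT / DATA_BUNDLE_ENTRY_LIMIT flush
bookkeeping on the receiving node.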