chore(node): remove replication data bundling
Now that we have a limit per msg and batches are determined by the sender, the node no longer needs to bundle replication data itself.
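
For context, here is a minimal, self-contained sketch of the post-change flow, with tokio mpsc channels and plain byte vectors standing in for the node's real command channel and ReplicatedData. Only the loop shape mirrors the diff below; the names (Msg, addr_tx, peer strings) and the faked storage lookup are illustrative, not the node's API.

// Requires the tokio crate (e.g. features "rt-multi-thread", "macros", "sync").
use tokio::sync::mpsc;

#[derive(Debug)]
enum Msg {
    // after this change, each message carries a single-item batch per address
    ReplicateData(Vec<Vec<u8>>),
}

#[tokio::main]
async fn main() {
    let (msg_tx, mut msg_rx) = mpsc::channel::<(Msg, String)>(32);
    let (addr_tx, mut addr_rx) = mpsc::channel::<(Vec<u64>, String)>(32);

    // replication loop: fetch each address from local storage (faked here)
    // and send it out on its own, leaving batching decisions to the sender
    tokio::spawn(async move {
        while let Some((addresses, peer)) = addr_rx.recv().await {
            for addr in addresses {
                let data = vec![addr as u8; 4]; // stand-in for a local-store lookup
                let _ = msg_tx.send((Msg::ReplicateData(vec![data]), peer.clone())).await;
            }
        }
    });

    addr_tx.send((vec![1, 2, 3], "peer-a".into())).await.unwrap();
    drop(addr_tx); // close the channel so the loop (and program) can finish

    while let Some((msg, peer)) = msg_rx.recv().await {
        println!("to {peer}: {msg:?}");
    }
}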
joshuef committed Dec 22, 2022
1 parent c70703f commit 0436915
Showing 1 changed file with 10 additions and 54 deletions.
sn_node/src/node/flow_ctrl/mod.rs (+10, -54)
@@ -29,7 +29,7 @@ use periodic_checks::PeriodicChecksTimestamps;
 use sn_dysfunction::DysfunctionDetection;
 use sn_interface::{
     messaging::system::{NodeDataCmd, NodeMsg},
-    types::{log_markers::LogMarker, DataAddress, Peer, ReplicatedData},
+    types::{log_markers::LogMarker, DataAddress, Peer},
 };
 
 use super::DataStorage;
@@ -160,32 +160,21 @@ impl FlowCtrl {
         let _ = tokio::task::spawn(async move {
             // is there a simple way to dedupe common data going to many peers?
             // is any overhead reduction worth the increased complexity?
-            while let Some((mut data_addresses, peer)) = data_replication_receiver.recv().await {
+            while let Some((data_addresses, peer)) = data_replication_receiver.recv().await {
                 let send_cmd_channel = cmd_channel.clone();
                 let the_node = node_arc.clone();
                 let data_storage = node_data_storage.clone();
                 // move replication off thread so we don't block the receiver
                 let _ = tokio::task::spawn(async move {
-                    // sort the addresses so that we're batching out closest data first
-                    data_addresses
-                        .sort_by(|lhs, rhs| peer.name().cmp_distance(lhs.name(), rhs.name()));
-
-                    // The messages shall be bundled by size AND numbers.
-                    // That is: a bundle get sent out whichever the total size and total numbers
-                    // reached the upper limit first.
-                    let mut data_bundle = DataBundle::default();
-
                     debug!(
                         "{:?} Data {:?} to: {:?}",
                         LogMarker::SendingMissingReplicatedData,
                         data_addresses,
                         peer,
                     );
 
-                    for (i, address) in data_addresses.iter().enumerate() {
-                        // enumerate is 0 indexed, let's correct for that for counting
-                        // and then comparing to data_addresses
-                        let iteration = i + 1;
+                    for address in data_addresses.iter() {
+                        let mut data_bundle = vec![];
                         match data_storage.get_from_local_store(address).await {
                             Ok(data) => {
                                 data_bundle.push(data);
@@ -195,19 +184,14 @@
                             }
                         };
 
-                        // if we hit a multiple of the batch limit or we're at the last data to send...
-                        if data_bundle.shall_flush() || iteration == data_addresses.len() {
-                            trace!("Sending out data batch on i:{iteration:?} to {peer:?}");
-                            let msg = NodeMsg::NodeDataCmd(NodeDataCmd::ReplicateData(
-                                data_bundle.take(),
-                            ));
+                        trace!("Sending out data batch to {peer:?}");
+                        let msg = NodeMsg::NodeDataCmd(NodeDataCmd::ReplicateData(data_bundle));
 
-                            let node_context = the_node.read().await.context();
+                        let node_context = the_node.read().await.context();
 
-                            let cmd = Cmd::send_msg(msg, Peers::Single(peer), node_context.clone());
-                            if let Err(error) = send_cmd_channel.send((cmd, vec![])).await {
-                                error!("Failed to enqueue send msg command for replication of data batch to {peer:?}: {error:?}");
-                            }
-                        }
+                        let cmd = Cmd::send_msg(msg, Peers::Single(peer), node_context.clone());
+                        if let Err(error) = send_cmd_channel.send((cmd, vec![])).await {
+                            error!("Failed to enqueue send msg command for replication of data batch to {peer:?}: {error:?}");
+                        }
                     }
                 });
@@ -251,31 +235,3 @@
         })
     }
 }
-
-static DATA_BUNDLE_SIZE_LIMIT: u64 = 10_000_000;
-static DATA_BUNDLE_ENTRY_LIMIT: usize = 250;
-
-#[derive(Default)]
-struct DataBundle {
-    data_batch: Vec<ReplicatedData>,
-    total_size: u64,
-}
-
-impl DataBundle {
-    fn push(&mut self, data: ReplicatedData) {
-        self.total_size += data.size();
-        self.data_batch.push(data);
-    }
-
-    fn take(&mut self) -> Vec<ReplicatedData> {
-        let data_batch = self.data_batch.clone();
-        self.data_batch = vec![];
-        self.total_size = 0;
-        data_batch
-    }
-
-    fn shall_flush(&self) -> bool {
-        self.total_size >= DATA_BUNDLE_SIZE_LIMIT
-            || self.data_batch.len() >= DATA_BUNDLE_ENTRY_LIMIT
-    }
-}
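
For reference, the batching rule the node itself stops applying, reduced to a standalone sketch: the constants and shall_flush logic are copied from the deleted DataBundle above, while real ReplicatedData payloads are replaced by plain sizes so the snippet runs on its own.

const DATA_BUNDLE_SIZE_LIMIT: u64 = 10_000_000;
const DATA_BUNDLE_ENTRY_LIMIT: usize = 250;

#[derive(Default)]
struct DataBundle {
    entry_sizes: Vec<u64>, // stand-in for ReplicatedData entries
    total_size: u64,
}

impl DataBundle {
    fn push(&mut self, size: u64) {
        self.total_size += size;
        self.entry_sizes.push(size);
    }

    // a bundle was flushed once EITHER the total size OR the entry count hit its cap
    fn shall_flush(&self) -> bool {
        self.total_size >= DATA_BUNDLE_SIZE_LIMIT
            || self.entry_sizes.len() >= DATA_BUNDLE_ENTRY_LIMIT
    }
}

fn main() {
    let mut bundle = DataBundle::default();
    bundle.push(6_000_000);
    assert!(!bundle.shall_flush()); // under both caps
    bundle.push(5_000_000);
    assert!(bundle.shall_flush()); // 11 MB >= 10 MB size cap, with only 2 of 250 entries
}

With a per-message limit enforced elsewhere and batch boundaries decided by the sender, this double cap becomes redundant, which is what the commit removes.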
