Skip to content

Commit

Permalink
Merge branch 'main' into refactor_quoting
Browse files Browse the repository at this point in the history
  • Loading branch information
JasonPaulGithub committed Mar 25, 2024
2 parents 66fd0bf + 9a32e28 commit 9768e91
Showing 1 changed file with 50 additions and 41 deletions.
91 changes: 50 additions & 41 deletions sn_networking/src/replication_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,38 +67,64 @@ impl ReplicationFetcher {
pub(crate) fn add_keys(
&mut self,
holder: PeerId,
incoming_keys: Vec<(NetworkAddress, RecordType)>,
mut incoming_keys: Vec<(NetworkAddress, RecordType)>,
locally_stored_keys: &HashMap<RecordKey, (NetworkAddress, RecordType)>,
) -> Vec<(PeerId, RecordKey)> {
self.remove_stored_keys(locally_stored_keys);

let is_new_data = if incoming_keys.len() == 1 {
// The incoming list is for a new data to be replicated out.
let mut keys_to_fetch = vec![];
// For new data, it will be replicated out in a special replication_list of length 1.
// And we shall `fetch` that copy immediately, if it's not being fetched.
if incoming_keys.len() == 1 {
let (record_address, record_type) = incoming_keys[0].clone();
Some((record_address.to_record_key(), record_type))
} else {
None
};

self.to_be_fetched
.retain(|_, time_out| *time_out > Instant::now());

// add non existing keys to the fetcher
incoming_keys
.into_iter()
.for_each(|(key, record_type)| self.add_key(holder, key.to_record_key(), record_type));
let new_data_key = (record_address.to_record_key(), record_type);

let mut keys_to_fetch = self.next_keys_to_fetch();

// For new data, it will be replicated out in a special replication_list of length 1.
// And we shall `fetch` that copy immediately, if it's not being fetched.
if let Some(new_data_key) = is_new_data {
if let Entry::Vacant(entry) = self.on_going_fetches.entry(new_data_key.clone()) {
let (record_key, _record_type) = new_data_key;
keys_to_fetch.push((holder, record_key));
let _ = entry.insert((holder, Instant::now()));
}

// To avoid later on un-necessary actions.
incoming_keys.clear();
}

self.to_be_fetched
.retain(|_, time_out| *time_out > Instant::now());

let mut out_of_range_keys = vec![];
let total_incoming_keys = incoming_keys.len();
// Filter out those out_of_range ones among the imcoming _keys.
if let Some(ref distance_range) = self.distance_range {
let self_address = NetworkAddress::from_peer(self.self_peer_id);

incoming_keys.retain(|(addr, _record_type)| {
let is_in_range = self_address.distance(addr) >= *distance_range;
if !is_in_range {
out_of_range_keys.push(addr.clone());
}
is_in_range
});
}

if !out_of_range_keys.is_empty() {
info!("Among {total_incoming_keys} incoming replications from {holder:?}, found {} out of range", out_of_range_keys.len());
for addr in out_of_range_keys.iter() {
trace!("The incoming record_key {addr:?} is out of range, do not fetch it from {holder:?}");
}
}

// add in-range AND non existing keys to the fetcher
incoming_keys.into_iter().for_each(|(addr, record_type)| {
let _ = self
.to_be_fetched
.entry((addr.to_record_key(), record_type, holder))
.or_insert(Instant::now() + PENDING_TIMEOUT);
});

keys_to_fetch.extend(self.next_keys_to_fetch());

keys_to_fetch
}

Expand Down Expand Up @@ -213,12 +239,14 @@ impl ReplicationFetcher {
}
});

// now we ensure we clear our any/all failed nodes from our lists.
// now to clear any failed nodes from our lists.
self.to_be_fetched
.retain(|(_, _, holder), _| !failed_holders.contains(holder));

// Such failed_hodlers shall be reported back and be excluded from RT.
self.send_event(NetworkEvent::FailedToFetchHolders(failed_holders));
// Such failed_hodlers (if any) shall be reported back and be excluded from RT.
if !failed_holders.is_empty() {
self.send_event(NetworkEvent::FailedToFetchHolders(failed_holders));
}
}

/// Remove keys that we hold already and no longer need to be replicated.
Expand All @@ -244,25 +272,6 @@ impl ReplicationFetcher {
});
}

/// Add the key if not present yet.
fn add_key(&mut self, holder: PeerId, key: RecordKey, record_type: RecordType) {
// Do nothing if the incoming key is out_of_range
if let Some(ref distance_range) = self.distance_range {
let self_address = NetworkAddress::from_peer(self.self_peer_id);
let in_address = NetworkAddress::from_record_key(&key);

if self_address.distance(&in_address) >= *distance_range {
info!("The incoming record_key {in_address:?} is out of range, do not fetch it from {holder:?}");
return;
}
}

let _ = self
.to_be_fetched
.entry((key, record_type, holder))
.or_insert(Instant::now() + PENDING_TIMEOUT);
}

/// Sends an event after pushing it off thread so as to be non-blocking
/// this is a wrapper around the `mpsc::Sender::send` call
fn send_event(&self, event: NetworkEvent) {
Expand Down

0 comments on commit 9768e91

Please sign in to comment.