Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dont artifically push replication #964

Merged
merged 4 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 0 additions & 12 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,6 @@ pub struct SwarmLocalState {

impl SwarmDriver {
pub(crate) fn handle_cmd(&mut self, cmd: SwarmCmd) -> Result<(), Error> {
let drives_forward_replication = matches!(
cmd,
SwarmCmd::PutLocalRecord { .. } | SwarmCmd::AddKeysToReplicationFetcher { .. }
);

match cmd {
SwarmCmd::AddKeysToReplicationFetcher { holder, keys } => {
// Only store record from Replication that close enough to us.
Expand Down Expand Up @@ -589,13 +584,6 @@ impl SwarmDriver {
}
}

// in case we're a node and not driving forward and there are keys to replicate, let's fire events for that
if !self.is_client && !drives_forward_replication {
let keys_to_fetch = self.replication_fetcher.next_keys_to_fetch();
if !keys_to_fetch.is_empty() {
self.send_event(NetworkEvent::KeysForReplication(keys_to_fetch));
}
}
Ok(())
}

Expand Down
7 changes: 6 additions & 1 deletion sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,12 @@ impl SwarmDriver {
self.network_metrics.record(&event);
if self.is_gossip_listener {
match event {
libp2p::gossipsub::Event::Message { message, .. } => {
libp2p::gossipsub::Event::Message {
message,
message_id,
..
} => {
info!("Gossipsub message received, id: {message_id:?}");
let topic = message.topic.into_string();
let msg = Bytes::from(message.data);
self.send_event(NetworkEvent::GossipsubMsgReceived { topic, msg });
Expand Down
2 changes: 1 addition & 1 deletion sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub const CLOSE_GROUP_SIZE: usize = 5;

/// The range of peers that will be considered as close to a record target,
/// that a replication of the record shall be sent/accepted to/by the peer.
pub const REPLICATE_RANGE: usize = CLOSE_GROUP_SIZE * 2;
pub const REPLICATE_RANGE: usize = CLOSE_GROUP_SIZE + 2;

/// Majority of a given group (i.e. > 1/2).
#[inline]
Expand Down
6 changes: 5 additions & 1 deletion sn_networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,16 @@ impl NodeRecordStore {
}

fn read_from_disk<'a>(key: &Key, storage_dir: &Path) -> Option<Cow<'a, Record>> {
let start = std::time::Instant::now();
let filename = Self::key_to_hex(key);
let file_path = storage_dir.join(&filename);

match fs::read(file_path) {
Ok(value) => {
debug!("Retrieved record from disk! filename: {filename}");
debug!(
"Retrieved record from disk! filename: {filename} after {:?}",
start.elapsed()
);
let record = Record {
key: key.clone(),
value,
Expand Down
2 changes: 1 addition & 1 deletion sn_networking/src/replication_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
};

// Max parallel fetches that can be undertaken at the same time.
const MAX_PARALLEL_FETCH: usize = K_VALUE.get() * 4;
const MAX_PARALLEL_FETCH: usize = K_VALUE.get();

// The duration after which a peer will be considered failed to fetch data from,
// if no response got from that peer.
Expand Down
10 changes: 7 additions & 3 deletions sn_node/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ impl NodeEventsChannel {

// Broadcast a new event, meant to be a helper only used by the sn_node's internals.
pub(crate) fn broadcast(&self, event: NodeEvent) {
if let Err(err) = self.0.send(event.clone()) {
trace!("Error occurred when trying to broadcast a node event ({event:?}): {err}");
let event_string = format!("{:?}", event);
if let Err(err) = self.0.send(event) {
trace!(
"Error occurred when trying to broadcast a node event ({event_string:?}): {err}"
);
}
}

Expand All @@ -50,7 +53,7 @@ impl NodeEventsChannel {
}

/// Type of events broadcasted by the node to the public API.
#[derive(Clone, Serialize, Debug, Deserialize)]
#[derive(Clone, Serialize, custom_debug::Debug, Deserialize)]
pub enum NodeEvent {
/// The node has been connected to the network
ConnectedToNetwork,
Expand All @@ -71,6 +74,7 @@ pub enum NodeEvent {
/// Topic the message was published on
topic: String,
/// The raw bytes of the received message
#[debug(skip)]
msg: Bytes,
},
/// Transfer notification message received for a public key
Expand Down
33 changes: 17 additions & 16 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,25 +385,26 @@ impl Node {
}
NetworkEvent::GossipsubMsgReceived { topic, msg }
| NetworkEvent::GossipsubMsgPublished { topic, msg } => {
if self.events_channel.receiver_count() > 0 {
if topic == TRANSFER_NOTIF_TOPIC {
// this is expected to be a notification of a transfer which we treat specially,
// and we try to decode it only if it's referring to a PK the user is interested in
if let Some(filter_pk) = self.transfer_notifs_filter {
match try_decode_transfer_notif(&msg, filter_pk) {
Ok(Some(notif_event)) => self.events_channel.broadcast(notif_event),
Ok(None) => { /* transfer notif filered out */ }
Err(err) => {
warn!("GossipsubMsg matching the transfer notif. topic name, couldn't be decoded as such: {err:?}");
self.events_channel
.broadcast(NodeEvent::GossipsubMsg { topic, msg });
}
if self.events_channel.receiver_count() == 0 {
return;
}
if topic == TRANSFER_NOTIF_TOPIC {
// this is expected to be a notification of a transfer which we treat specially,
// and we try to decode it only if it's referring to a PK the user is interested in
if let Some(filter_pk) = self.transfer_notifs_filter {
match try_decode_transfer_notif(&msg, filter_pk) {
Ok(Some(notif_event)) => self.events_channel.broadcast(notif_event),
Ok(None) => { /* transfer notif filered out */ }
Err(err) => {
warn!("GossipsubMsg matching the transfer notif. topic name, couldn't be decoded as such: {err:?}");
self.events_channel
.broadcast(NodeEvent::GossipsubMsg { topic, msg });
}
}
} else {
self.events_channel
.broadcast(NodeEvent::GossipsubMsg { topic, msg });
}
} else {
self.events_channel
.broadcast(NodeEvent::GossipsubMsg { topic, msg });
}
}
}
Expand Down