Skip to content

Commit

Permalink
chore(release): non gossip handler shall not throw gossip msg up
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Nov 22, 2023
1 parent 1e33717 commit a1e7620
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 14 deletions.
2 changes: 1 addition & 1 deletion sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ impl Client {
pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> {
info!("Subscribing to topic id: {topic_id}");
self.network.subscribe_to_topic(topic_id)?;
self.network.start_listen_gossip()?;
self.network.start_handle_gossip()?;
Ok(())
}

Expand Down
12 changes: 6 additions & 6 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ pub enum SwarmCmd {
/// Raw bytes of the message to publish
msg: Bytes,
},
GossipListener,
GossipHandler,
}

/// Debug impl for SwarmCmd to avoid printing full Record, instead only RecodKey
Expand Down Expand Up @@ -259,8 +259,8 @@ impl Debug for SwarmCmd {
SwarmCmd::SendRequest { req, peer, .. } => {
write!(f, "SwarmCmd::SendRequest req: {:?}, peer: {:?}", req, peer)
}
SwarmCmd::GossipListener => {
write!(f, "SwarmCmd::GossipListener")
SwarmCmd::GossipHandler => {
write!(f, "SwarmCmd::GossipHandler")
}
}
}
Expand Down Expand Up @@ -616,7 +616,7 @@ impl SwarmDriver {
SwarmCmd::GossipsubPublish { topic_id, msg } => {
// If we publish a Gossipsub message, we might not receive the same message on our side.
// Hence push an event to notify that we've published a message
if self.is_gossip_listener {
if self.is_gossip_handler {
self.send_event(NetworkEvent::GossipsubMsgPublished {
topic: topic_id.clone(),
msg: msg.clone(),
Expand All @@ -627,8 +627,8 @@ impl SwarmDriver {
gossip.publish(topic_id, msg)?;
}
}
SwarmCmd::GossipListener => {
self.is_gossip_listener = true;
SwarmCmd::GossipHandler => {
self.is_gossip_handler = true;
}
}

Expand Down
7 changes: 5 additions & 2 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ impl NetworkBuilder {
// 63 will mean at least 63 most recent peers we have dialed, which should be allow for enough time for the
// `identify` protocol to kick in and get them in the routing table.
dialed_peers: CircularVec::new(63),
is_gossip_listener: false,
is_gossip_handler: false,
};

Ok((
Expand Down Expand Up @@ -531,7 +531,10 @@ pub struct SwarmDriver {
pub(crate) pending_get_record: PendingGetRecord,
/// A list of the most recent peers we have dialed ourselves.
pub(crate) dialed_peers: CircularVec<PeerId>,
pub(crate) is_gossip_listener: bool,
// For normal nodes, though they subscribe to the gossip topic
// (to ensure no miss-up by carrying out libp2p low level gossip forwarding),
// they are not supposed to process the gossip msg that received from libp2p.
pub(crate) is_gossip_handler: bool,
}

impl SwarmDriver {
Expand Down
2 changes: 1 addition & 1 deletion sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl SwarmDriver {

#[cfg(feature = "open-metrics")]
self.network_metrics.record(&event);
if self.is_gossip_listener {
if self.is_gossip_handler {
match event {
libp2p::gossipsub::Event::Message {
message,
Expand Down
4 changes: 2 additions & 2 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,8 @@ impl Network {
Ok(state)
}

pub fn start_listen_gossip(&self) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::GossipListener)
pub fn start_handle_gossip(&self) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::GossipHandler)
}

// Helper to send SwarmCmd
Expand Down
9 changes: 9 additions & 0 deletions sn_node/src/bin/safenode/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,15 @@ impl SafeNode for SafeNodeRpcService {

let topic = &request.get_ref().topic;

// Assuming the rpc subscription request also force the node to handle the gossip.
// So far, this is only used during test to allow counting the gossip msgs received by node.
if let Err(err) = self.running_node.start_handle_gossip() {
return Err(Status::new(
Code::Internal,
format!("Failed to start handle gossip: {err}"),
));
}

match self.running_node.subscribe_to_topic(topic.clone()) {
Ok(()) => Ok(Response::new(GossipsubSubscribeResponse {})),
Err(err) => Err(Status::new(
Expand Down
7 changes: 6 additions & 1 deletion sn_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,12 @@ impl RunningNode {
/// Subscribe to given gossipsub topic
pub fn subscribe_to_topic(&self, topic_id: String) -> Result<()> {
self.network.subscribe_to_topic(topic_id)?;
self.network.start_listen_gossip()?;
Ok(())
}

/// Starts handling gossipsub topics
pub fn start_handle_gossip(&self) -> Result<()> {
self.network.start_handle_gossip()?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl Node {
match node_cmd {
Ok(NodeCmd::TransferNotifsFilter(filter)) => {
self.transfer_notifs_filter = filter;
let _ = self.network.start_listen_gossip();
let _ = self.network.start_handle_gossip();
}
Err(err) => error!("When trying to read from the NodeCmds channel/receiver: {err:?}")
}
Expand Down

0 comments on commit a1e7620

Please sign in to comment.