Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce unhandled NetworkEvents #982

Merged
merged 2 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 13 additions & 6 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,12 +538,17 @@ impl SwarmDriver {
// be handled.
// `self` then handles the request and sends a response back again to itself.
if peer == *self.swarm.local_peer_id() {
trace!("Sending request to self");

self.send_event(NetworkEvent::RequestReceived {
req,
channel: MsgResponder::FromSelf(sender),
});
trace!("Sending query request to self");
if let Request::Query(query) = req {
self.send_event(NetworkEvent::QueryRequestReceived {
query,
channel: MsgResponder::FromSelf(sender),
});
} else {
// We should never receive a Replicate request from ourselves.
// we already hold this data if we do... so we can ignore
trace!("Replicate cmd to self received, ignoring");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the log is bit confusing.
As I understand, it is a non-query request to self, which can be ignored, maybe due to replicate ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yeh, we'd be asking ourself to replicate data which we're already holding? So can be ignored

}
} else {
let request_id = self
.swarm
Expand All @@ -552,6 +557,8 @@ impl SwarmDriver {
.send_request(&peer, req);
trace!("Sending request {request_id:?} to peer {peer:?}");
let _ = self.pending_requests.insert(request_id, sender);

trace!("Pending Requests now: {:?}", self.pending_requests.len());
}
}
SwarmCmd::SendResponse { resp, channel } => match channel {
Expand Down
65 changes: 51 additions & 14 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use libp2p::{
};

use sn_protocol::{
messages::{Request, Response},
messages::{Cmd, CmdResponse, Query, Request, Response},
storage::RecordHeader,
NetworkAddress, PrettyPrintRecordKey,
};
Expand Down Expand Up @@ -113,10 +113,15 @@ pub enum MsgResponder {
#[allow(clippy::large_enum_variant)]
/// Events forwarded by the underlying Network; to be used by the upper layers
pub enum NetworkEvent {
/// Incoming `Request` from a peer
RequestReceived {
/// Request
req: Request,
/// Incoming `Cmd` from a peer
CmdRequestReceived {
/// Cmd
cmd: Cmd,
},
/// Incoming `Query` from a peer
QueryRequestReceived {
/// Query
query: Query,
/// The channel to send the `Response` through
channel: MsgResponder,
},
Expand Down Expand Up @@ -159,8 +164,11 @@ pub enum NetworkEvent {
impl Debug for NetworkEvent {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
NetworkEvent::RequestReceived { req, .. } => {
write!(f, "NetworkEvent::RequestReceived({req:?})")
NetworkEvent::CmdRequestReceived { cmd, .. } => {
write!(f, "NetworkEvent::CmdRequestReceived({cmd:?})")
}
NetworkEvent::QueryRequestReceived { query, .. } => {
write!(f, "NetworkEvent::QueryRequestReceived({query:?})")
}
NetworkEvent::ResponseReceived { res, .. } => {
write!(f, "NetworkEvent::ResponseReceived({res:?})")
Expand Down Expand Up @@ -487,10 +495,31 @@ impl SwarmDriver {
..
} => {
trace!("Received request {request_id:?} from peer {peer:?}, req: {request:?}");
self.send_event(NetworkEvent::RequestReceived {
req: request,
channel: MsgResponder::FromPeer(channel),
})
// if the request is replication, we can handle it and send the OK response here,
// as we send that regardless of how we handle the request as its unimportant to the sender.
match request {
Request::Cmd(sn_protocol::messages::Cmd::Replicate { holder, keys }) => {
trace!("Short circuit ReplicateOk response to peer {peer:?}");
let response = Response::Cmd(
sn_protocol::messages::CmdResponse::Replicate(Ok(())),
);
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, response)
.map_err(|_| Error::InternalMsgChannelDropped)?;

self.send_event(NetworkEvent::CmdRequestReceived {
cmd: sn_protocol::messages::Cmd::Replicate { holder, keys },
});
}
Request::Query(query) => {
self.send_event(NetworkEvent::QueryRequestReceived {
query,
channel: MsgResponder::FromPeer(channel),
})
}
}
}
Message::Response {
request_id,
Expand All @@ -507,9 +536,17 @@ impl SwarmDriver {
.send(Ok(response))
.map_err(|_| Error::InternalMsgChannelDropped)?,
None => {
// responses that are not awaited at the call site must be handled
// separately
self.send_event(NetworkEvent::ResponseReceived { res: response });
if let Response::Cmd(CmdResponse::Replicate(Ok(()))) = response {
// Nothing to do, response was fine
// This only exists to ensure we dont drop the handle and
// exit early, potentially logging false connection woes
} else {
// responses that are not awaited at the call site must be handled
// separately
self.send_event(NetworkEvent::ResponseReceived {
res: response,
});
}
}
}
} else {
Expand Down
8 changes: 1 addition & 7 deletions sn_node/src/log_markers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use libp2p::{kad::RecordKey, PeerId};
use sn_protocol::{
messages::{Cmd, CmdResponse},
PrettyPrintRecordKey,
};
use sn_protocol::{messages::Cmd, PrettyPrintRecordKey};
use std::time::Duration;
// this gets us to_string easily enough
use strum::Display;
Expand All @@ -32,9 +29,6 @@ pub enum Marker<'a> {
/// Network Cmd message received
NodeCmdReceived(&'a Cmd),

/// Network Cmd message response was generated
NodeCmdResponded(&'a CmdResponse),

/// Peer was added to the routing table
PeerAddedToRoutingTable(PeerId),

Expand Down
39 changes: 14 additions & 25 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use sn_networking::{
};
use sn_protocol::{
error::Error as ProtocolError,
messages::{Cmd, CmdResponse, Query, QueryResponse, Request, Response},
messages::{Cmd, CmdResponse, Query, QueryResponse, Response},
NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::{CashNoteRedemption, LocalWallet, MainPubkey, MainSecretKey};
Expand Down Expand Up @@ -283,7 +283,8 @@ impl Node {
// these activities requires the node to be connected to some peer to be able to carry
// out get kad.get_record etc. This happens during replication/PUT. So we should wait
// until we have enough nodes, else these might fail.
NetworkEvent::RequestReceived { .. }
NetworkEvent::CmdRequestReceived { .. }
| NetworkEvent::QueryRequestReceived { .. }
| NetworkEvent::UnverifiedRecord(_)
| NetworkEvent::FailedToWrite(_)
| NetworkEvent::ResponseReceived { .. }
Expand All @@ -307,8 +308,13 @@ impl Node {
trace!("Handling NetworkEvent {event_string:?}");

match event {
NetworkEvent::RequestReceived { req, channel } => {
self.handle_request(req, channel).await;
NetworkEvent::QueryRequestReceived { query, channel } => {
let res = self.handle_query(query).await;

self.send_response(res, channel);
}
NetworkEvent::CmdRequestReceived { cmd } => {
self.handle_node_cmd(cmd);
}
NetworkEvent::ResponseReceived { res } => {
trace!("NetworkEvent::ResponseReceived {res:?}");
Expand Down Expand Up @@ -419,9 +425,8 @@ impl Node {
fn handle_response(&self, response: Response) -> Result<()> {
match response {
Response::Cmd(CmdResponse::Replicate(Ok(()))) => {
// Nothing to do, response was fine
// This only exists to ensure we dont drop the handle and
// exit early, potentially logging false connection woes
// This should actually have been short-circuted when received
warn!("Mishandled replicate response, should be handled earlier");
}
Response::Query(QueryResponse::GetReplicatedRecord(resp)) => {
error!("Response to replication shall be handled by called not by common handler, {resp:?}");
Expand All @@ -434,15 +439,6 @@ impl Node {
Ok(())
}

async fn handle_request(&self, request: Request, response_channel: MsgResponder) {
trace!("Handling request: {request:?}");
let response = match request {
Request::Cmd(cmd) => self.handle_node_cmd(cmd),
Request::Query(query) => self.handle_query(query).await,
};
self.send_response(response, response_channel);
}

async fn handle_query(&self, query: Query) -> Response {
let resp: QueryResponse = match query {
Query::GetStoreCost(address) => {
Expand Down Expand Up @@ -505,9 +501,9 @@ impl Node {
Response::Query(resp)
}

fn handle_node_cmd(&self, cmd: Cmd) -> Response {
fn handle_node_cmd(&self, cmd: Cmd) {
Marker::NodeCmdReceived(&cmd).log();
let resp = match cmd {
match cmd {
Cmd::Replicate { holder, keys } => {
trace!(
"Received replication list from {holder:?} of {} keys",
Expand All @@ -520,15 +516,8 @@ impl Node {
} else {
error!("Within the replication list, Can not parse peer_id from {holder:?}");
}

// if we do not send a response, we can cause connection failures.
CmdResponse::Replicate(Ok(()))
}
};

Marker::NodeCmdResponded(&resp).log();

Response::Cmd(resp)
}

fn send_response(&self, resp: Response, response_channel: MsgResponder) {
Expand Down
2 changes: 1 addition & 1 deletion sn_transfers/src/cashnotes/signed_spend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl std::hash::Hash for SignedSpend {
}

/// Represents the data to be signed by the DerivedSecretKey of the CashNote being spent.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[derive(custom_debug::Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Spend {
/// UniquePubkey of input CashNote that this SignedSpend is proving to be spent.
pub unique_pubkey: UniquePubkey,
Expand Down