Skip to content

Commit

Permalink
Merge #1888
Browse files Browse the repository at this point in the history
1888: chore(messaging): removing unused payload_debug field from msgs r=joshuef a=bochaco

This change also includes a minor refactor within `comm` mod.

Co-authored-by: bochaco <gabrielviganotti@gmail.com>
  • Loading branch information
bors[bot] and bochaco committed Dec 15, 2022
2 parents 82c0cf6 + 6e4fce5 commit fc0be25
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 93 deletions.
19 changes: 0 additions & 19 deletions sn_interface/src/messaging/serialisation/wire_msg.rs
Expand Up @@ -37,11 +37,6 @@ pub struct WireMsg {
#[debug(skip)]
/// Serialized Message dst
pub serialized_dst: Option<Bytes>,
/// Extra debug info if the relevant feature is enabled.
// This is behind a feature because it's potentially expensive to carry around the message as
// well as its serialization.
#[cfg(feature = "test-utils")]
pub payload_debug: Option<std::sync::Arc<dyn std::fmt::Debug + Send + Sync>>,
}

impl PartialEq for WireMsg {
Expand Down Expand Up @@ -92,8 +87,6 @@ impl WireMsg {
payload,
serialized_dst: None,
serialized_header: None,
#[cfg(feature = "test-utils")]
payload_debug: None,
}
}

Expand All @@ -117,8 +110,6 @@ impl WireMsg {
payload,
serialized_dst: Some(dst_bytes),
serialized_header: Some(header_bytes),
#[cfg(feature = "test-utils")]
payload_debug: None,
})
}

Expand Down Expand Up @@ -267,16 +258,6 @@ impl WireMsg {
pub fn verify_sig(auth: ClientAuth, msg: ClientMsg) -> Result<AuthorityProof<ClientAuth>> {
Self::serialize_msg_payload(&msg).and_then(|payload| AuthorityProof::verify(auth, &payload))
}

#[cfg(feature = "test-utils")]
pub fn set_payload_debug(
// take ownership for ergonomics in `cfg(...)` blocks
mut self,
payload_debug: impl std::fmt::Debug + Send + Sync + 'static,
) -> Self {
self.payload_debug = Some(std::sync::Arc::new(payload_debug));
self
}
}

#[cfg(test)]
Expand Down
74 changes: 35 additions & 39 deletions sn_node/src/comm/peer_session.rs
Expand Up @@ -162,47 +162,43 @@ impl PeerSessionWorker {
trace!("Processing session {peer:?} cmd: {session_cmd:?}");

let status = match session_cmd {
SessionCmd::Send(job) => {
if let Some(send_stream) = job.send_stream {
// send response on the stream
debug!("Sending on stream via PeerSessionWorker");
let _handle = tokio::spawn(async move {
let stream_prio = 10;
let mut send_stream = send_stream.lock().await;
send_stream.set_priority(stream_prio);
let stream_id = send_stream.id();
if let Err(error) = send_stream.send_user_msg(job.bytes).await {
error!(
"Could not send msg {:?} over response {stream_id} to {peer:?}: {error:?}",
job.msg_id
);

job.reporter.send(SendStatus::TransientError(format!(
"Could not send msg on response {stream_id} to {peer:?} for {:?}", job.msg_id
)));
} else {
// Attempt to gracefully terminate the stream.
// If this errors it does _not_ mean our message has not been sent
let _ = send_stream.finish().await;

job.reporter.send(SendStatus::Sent);
}
});

SessionStatus::Ok
} else {
//another
match self.send_over_peer_connection(job).await {
Ok(s) => s,
Err(error) => {
error!("session error {error:?}");
// don't breakout here?
// TODO: is this correct?
continue;
}
SessionCmd::Send(SendJob {
msg_id,
bytes,
reporter,
send_stream: Some(send_stream),
..
}) => {
// send response on the stream
let _handle = tokio::spawn(async move {
let stream_prio = 10;
let mut send_stream = send_stream.lock().await;
debug!("Sending on {} via PeerSessionWorker", send_stream.id());
send_stream.set_priority(stream_prio);
let stream_id = send_stream.id();
if let Err(error) = send_stream.send_user_msg(bytes).await {
error!("Could not send msg {msg_id:?} over response {stream_id} to {peer:?}: {error:?}");
reporter.send(SendStatus::TransientError(format!("Could not send msg on response {stream_id} to {peer:?} for {msg_id:?}")));
} else {
// Attempt to gracefully terminate the stream.
// If this errors it does _not_ mean our message has not been sent
let _ = send_stream.finish().await;

reporter.send(SendStatus::Sent);
}
}
});

SessionStatus::Ok
}
SessionCmd::Send(job) => match self.send_over_peer_connection(job).await {
Ok(status) => status,
Err(error) => {
error!("session error {error:?}");
// don't breakout here?
// TODO: is this correct?
continue;
}
},
SessionCmd::AddConnection(conn) => {
self.link.add(conn);
SessionStatus::Ok
Expand Down
3 changes: 0 additions & 3 deletions sn_node/src/node/data/records.rs
Expand Up @@ -403,9 +403,6 @@ impl MyNode {

let mut wire_msg = WireMsg::new_msg(msg_id, payload, kind, dst);

#[cfg(feature = "test-utils")]
let wire_msg = wire_msg.set_payload_debug(msg);

wire_msg
.serialize_and_cache_bytes()
.map_err(|_| Error::InvalidMessage)
Expand Down
10 changes: 0 additions & 10 deletions sn_node/src/node/flow_ctrl/cmds.rs
Expand Up @@ -198,19 +198,9 @@ impl fmt::Display for Cmd {
Cmd::SetStorageLevel(level) => {
write!(f, "SetStorageLevel {:?}", level)
}
#[cfg(not(feature = "test-utils"))]
Cmd::HandleMsg { wire_msg, .. } => {
write!(f, "HandleMsg {:?}", wire_msg.msg_id())
}
#[cfg(feature = "test-utils")]
Cmd::HandleMsg { wire_msg, .. } => {
write!(
f,
"HandleMsg {:?} {:?}",
wire_msg.msg_id(),
wire_msg.payload_debug
)
}
Cmd::UpdateNetworkAndHandleValidClientMsg { msg_id, msg, .. } => {
write!(f, "UpdateAndHandleValidClientMsg {:?}: {:?}", msg_id, msg)
}
Expand Down
14 changes: 2 additions & 12 deletions sn_node/src/node/flow_ctrl/dispatcher.rs
Expand Up @@ -9,14 +9,13 @@
use crate::node::{messaging::Peers, Cmd, Error, MyNode, Result, STANDARD_CHANNEL_SIZE};

use sn_interface::{
messaging::{system::NodeMsg, Dst, MsgId, MsgKind, WireMsg},
messaging::{system::NodeMsg, Dst, MsgId, WireMsg},
network_knowledge::{NetworkKnowledge, SectionTreeUpdate},
types::{DataAddress, Peer},
};

use qp2p::UsrMsgBytes;

use bytes::Bytes;
use std::sync::Arc;
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
Expand Down Expand Up @@ -254,7 +253,7 @@ fn into_msg_bytes(
section_key: bls::SecretKey::random().public_key(),
};

let mut initial_wire_msg = wire_msg(msg_id, payload, kind, dst);
let mut initial_wire_msg = WireMsg::new_msg(msg_id, payload, kind, dst);

let _bytes = initial_wire_msg.serialize_and_cache_bytes()?;

Expand All @@ -274,12 +273,3 @@ fn into_msg_bytes(

Ok(msgs)
}

fn wire_msg(msg_id: MsgId, payload: Bytes, auth: MsgKind, dst: Dst) -> WireMsg {
#[allow(unused_mut)]
let mut wire_msg = WireMsg::new_msg(msg_id, payload, auth, dst);

#[cfg(feature = "test-utils")]
let wire_msg = wire_msg.set_payload_debug(msg);
wire_msg
}
7 changes: 0 additions & 7 deletions sn_node/src/node/flow_ctrl/mod.rs
Expand Up @@ -218,13 +218,6 @@ impl FlowCtrl {
LogMarker::DispatchHandleMsgCmd,
);

#[cfg(feature = "test-utils")]
let wire_msg = if let Ok(msg) = wire_msg.into_msg() {
wire_msg.set_payload_debug(msg)
} else {
wire_msg
};

Ok(Cmd::HandleMsg {
origin: sender,
wire_msg,
Expand Down
3 changes: 0 additions & 3 deletions sn_node/src/node/messages.rs
Expand Up @@ -27,9 +27,6 @@ impl WireMsgUtils for WireMsg {

let wire_msg = WireMsg::new_msg(MsgId::new(), msg_payload, MsgKind::Node(node.name()), dst);

#[cfg(feature = "test-utils")]
let wire_msg = wire_msg.set_payload_debug(msg);

Ok(wire_msg)
}
}

0 comments on commit fc0be25

Please sign in to comment.