Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: update sn_messaging.
Browse files Browse the repository at this point in the history
BREAKING_CHANGE: Use new DirectAndUnrouted naming for direct message clarity
  • Loading branch information
joshuef committed May 28, 2021
1 parent 54f0e17 commit 14e1f04
Show file tree
Hide file tree
Showing 14 changed files with 45 additions and 33 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ qp2p = "~0.11.9"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
resource_proof = "0.8.0"
sn_messaging = "25.1.0"
sn_messaging = "26.0.0"
sn_data_types = "~0.18.3"
thiserror = "1.0.23"
tokio = "1.3.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl Network {
let dst = match dst {
DstLocation::Section(name) => name,
DstLocation::Node(name) => name,
DstLocation::Direct | DstLocation::EndUser(_) => {
DstLocation::DirectAndUnrouted | DstLocation::EndUser(_) => {
return Err(format_err!("unexpected probe message dst: {:?}", dst))
}
};
Expand Down
6 changes: 4 additions & 2 deletions src/agreement/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ impl DkgCommand {
message,
} => {
let variant = Variant::DkgMessage { dkg_key, message };
let message = Message::single_src(node, DstLocation::Direct, variant, None)?;
let message =
Message::single_src(node, DstLocation::DirectAndUnrouted, variant, None)?;

Ok(Command::send_message_to_nodes(
recipients.clone(),
Expand Down Expand Up @@ -668,7 +669,8 @@ impl DkgCommand {
proof,
non_participants,
};
let message = Message::single_src(node, DstLocation::Direct, variant, None)?;
let message =
Message::single_src(node, DstLocation::DirectAndUnrouted, variant, None)?;

Ok(Command::send_message_to_nodes(
recipients.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/message_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl MessageFilter {

pub fn insert_incoming(&mut self, msg: &Message) {
// Not filtering direct messages.
if let DstLocation::Direct = msg.dst() {
if let DstLocation::DirectAndUnrouted = msg.dst() {
return;
}
let _ = self.incoming.insert(*msg.hash(), ());
Expand All @@ -72,7 +72,7 @@ impl MessageFilter {
//
pub fn filter_outgoing(&mut self, msg: &Message, pub_id: &XorName) -> FilteringResult {
// Not filtering direct messages.
if let DstLocation::Direct = msg.dst() {
if let DstLocation::DirectAndUnrouted = msg.dst() {
return FilteringResult::NewMessage;
}

Expand Down
2 changes: 1 addition & 1 deletion src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ mod tests {
};
let message = Message::single_src(
&node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
variant,
Some(full_proof_chain.truncate(1)),
)?;
Expand Down
9 changes: 5 additions & 4 deletions src/routing/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,8 @@ impl<'a> State<'a> {
info!("Sending {:?} to {:?}", join_request, recipients);

let variant = Variant::JoinRequest(Box::new(join_request));
let message = Message::single_src(&self.node, DstLocation::Direct, variant, None)?;
let message =
Message::single_src(&self.node, DstLocation::DirectAndUnrouted, variant, None)?;
let node_msg = RoutingMsg::new(message.to_bytes());

let _ = self
Expand Down Expand Up @@ -793,7 +794,7 @@ mod tests {
let proof_chain = SecuredLinkedList::new(pk);
let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::NodeApproval {
genesis_key: pk,
section_auth,
Expand Down Expand Up @@ -1242,7 +1243,7 @@ mod tests {
// Send `Rejoin` with bad prefix
let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::JoinRetry {
section_auth: gen_section_authority_provider(bad_prefix, ELDER_SIZE).0,
section_key: bls::SecretKey::random().public_key(),
Expand All @@ -1265,7 +1266,7 @@ mod tests {
// Send `Rejoin` with good prefix
let message = Message::single_src(
&bootstrap_node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::JoinRetry {
section_auth: gen_section_authority_provider(good_prefix, ELDER_SIZE).0,
section_key: bls::SecretKey::random().public_key(),
Expand Down
2 changes: 1 addition & 1 deletion src/routing/core/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl Core {

return self.send_message_for_dst_accumulation(
self.node.name(),
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
variant,
None,
&recipients,
Expand Down
2 changes: 1 addition & 1 deletion src/routing/core/delivery_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub(crate) fn delivery_targets(

candidates(target_name, our_name, section, network)?
}
DstLocation::Direct => return Err(Error::CannotRoute),
DstLocation::DirectAndUnrouted => return Err(Error::CannotRoute),
};

Ok((best_section, dg_size))
Expand Down
2 changes: 1 addition & 1 deletion src/routing/core/messaging/handling/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl Core {
if !sync_recipients.is_empty() {
let sync_message = Message::single_src(
&self.node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::Sync {
section: self.section.clone(),
network: self.network.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/routing/core/messaging/handling/bad_msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl Core {
};
let bounce_msg = Message::single_src(
&self.node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::BouncedUntrustedMessage {
msg: Box::new(msg),
dest_info: received_dest_info,
Expand Down Expand Up @@ -77,7 +77,7 @@ impl Core {

Message::single_src(
&self.node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::Sync {
section,
network: network.clone(),
Expand Down
3 changes: 2 additions & 1 deletion src/routing/core/messaging/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ impl Core {

pub(crate) fn handle_dkg_failure(&mut self, proofs: DkgFailureProofSet) -> Result<Command> {
let variant = Variant::DkgFailureAgreement(proofs);
let message = Message::single_src(&self.node, DstLocation::Direct, variant, None)?;
let message =
Message::single_src(&self.node, DstLocation::DirectAndUnrouted, variant, None)?;
Ok(self.send_message_to_our_elders(message.to_bytes()))
}

Expand Down
3 changes: 2 additions & 1 deletion src/routing/core/messaging/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ impl Core {
content: proposal,
proof_share,
};
let message = Message::single_src(&self.node, DstLocation::Direct, variant, None)?;
let message =
Message::single_src(&self.node, DstLocation::DirectAndUnrouted, variant, None)?;

Ok(self.send_or_handle(message, recipients))
}
Expand Down
19 changes: 13 additions & 6 deletions src/routing/core/messaging/sending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ impl Core {
member_info,
};

let message =
Message::single_src(&self.node, DstLocation::Direct, variant, Some(proof_chain))?;
let message = Message::single_src(
&self.node,
DstLocation::DirectAndUnrouted,
variant,
Some(proof_chain),
)?;

Ok(Command::send_message_to_node(
(name, addr),
Expand All @@ -71,7 +75,8 @@ impl Core {
let send = |variant, recipients: Vec<(XorName, SocketAddr)>| -> Result<_> {
trace!("Send {:?} to {:?}", variant, recipients);

let message = Message::single_src(&self.node, DstLocation::Direct, variant, None)?;
let message =
Message::single_src(&self.node, DstLocation::DirectAndUnrouted, variant, None)?;
let dest_info = DestInfo {
dest: XorName::random(),
dest_section_pk: *self.section.chain().last_key(),
Expand Down Expand Up @@ -112,7 +117,8 @@ impl Core {
let send = |variant, recipients: Vec<_>| -> Result<_> {
trace!("Send {:?} to {:?}", variant, recipients);

let message = Message::single_src(&self.node, DstLocation::Direct, variant, None)?;
let message =
Message::single_src(&self.node, DstLocation::DirectAndUnrouted, variant, None)?;

Ok(Command::send_message_to_nodes(
recipients.clone(),
Expand Down Expand Up @@ -220,7 +226,7 @@ impl Core {

self.send_message_for_dst_accumulation(
src_prefix.name(),
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
variant,
None,
recipients,
Expand Down Expand Up @@ -363,7 +369,8 @@ impl Core {
variant: Variant,
dst_pk: bls::PublicKey,
) -> Result<Command> {
let message = Message::single_src(&self.node, DstLocation::Direct, variant, None)?;
let message =
Message::single_src(&self.node, DstLocation::DirectAndUnrouted, variant, None)?;
Ok(Command::send_message_to_node(
recipient,
message.to_bytes(),
Expand Down
18 changes: 9 additions & 9 deletions src/routing/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ async fn receive_join_request_without_resource_proof_response() -> Result<()> {

let message = Message::single_src(
&new_node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::JoinRequest(Box::new(JoinRequest {
section_key,
relocate_payload: None,
Expand Down Expand Up @@ -231,7 +231,7 @@ async fn receive_join_request_with_resource_proof_response() -> Result<()> {

let message = Message::single_src(
&new_node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::JoinRequest(Box::new(JoinRequest {
section_key,
relocate_payload: None,
Expand Down Expand Up @@ -341,7 +341,7 @@ async fn receive_join_request_from_relocated_node() -> Result<()> {

let join_request = Message::single_src(
&relocated_node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::JoinRequest(Box::new(JoinRequest {
section_key,
relocate_payload: Some(relocate_payload),
Expand Down Expand Up @@ -409,7 +409,7 @@ async fn aggregate_proposals() -> Result<()> {
let proof_share = proposal.prove(pk_set.clone(), index, &sk_set.secret_key_share(index))?;
let message = Message::single_src(
&nodes[index],
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::Propose {
content: proposal.clone(),
proof_share,
Expand Down Expand Up @@ -437,7 +437,7 @@ async fn aggregate_proposals() -> Result<()> {
)?;
let message = Message::single_src(
&nodes[THRESHOLD],
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::Propose {
content: proposal.clone(),
proof_share,
Expand Down Expand Up @@ -1042,7 +1042,7 @@ async fn handle_bounced_untrusted_message() -> Result<()> {
// Create the bounced message, indicating the last key the peer knows is `pk0`
let bounced_message = Message::single_src(
&other_node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::BouncedUntrustedMessage {
msg: Box::new(original_message),
dest_info: dest_info.clone(),
Expand Down Expand Up @@ -1141,7 +1141,7 @@ async fn handle_sync() -> Result<()> {
// Create the `Sync` message containing the new `Section`.
let message = Message::single_src(
&old_node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::Sync {
section: new_section.clone(),
network: Network::new(),
Expand Down Expand Up @@ -1209,7 +1209,7 @@ async fn handle_untrusted_sync() -> Result<()> {
let sender = create_node(MIN_ADULT_AGE);
let orig_message = Message::single_src(
&sender,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::Sync {
section: new_section.clone(),
network: Network::new(),
Expand Down Expand Up @@ -1298,7 +1298,7 @@ async fn handle_bounced_untrusted_sync() -> Result<()> {

let orig_message = Message::single_src(
&node,
DstLocation::Direct,
DstLocation::DirectAndUnrouted,
Variant::Sync {
section: section_full,
network: Network::new(),
Expand Down

0 comments on commit 14e1f04

Please sign in to comment.