Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix(tests): fix tests after refactor and rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoga07 committed May 31, 2021
1 parent ec32530 commit 6977855
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 170 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ qp2p = "~0.11.9"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
resource_proof = "0.8.0"
sn_messaging = "27.0.0"
# sn_messaging = "27.0.0"
sn_messaging = { git = "https://github.com/yoga07/sn_messaging", branch = "remove-proofchain" }
sn_data_types = "~0.18.3"
thiserror = "1.0.23"
tokio = "1.3.0"
Expand Down
18 changes: 4 additions & 14 deletions src/agreement/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,13 +515,8 @@ impl DkgCommand {
message,
} => {
let variant = Variant::DkgMessage { dkg_key, message };
let message = RoutingMsg::single_src(
node,
DstLocation::DirectAndUnrouted,
variant,
key,
None,
)?;
let message =
RoutingMsg::single_src(node, DstLocation::DirectAndUnrouted, variant, key)?;

Ok(Command::send_message_to_nodes(
recipients.clone(),
Expand Down Expand Up @@ -554,13 +549,8 @@ impl DkgCommand {
proof,
non_participants,
};
let message = RoutingMsg::single_src(
node,
DstLocation::DirectAndUnrouted,
variant,
key,
None,
)?;
let message =
RoutingMsg::single_src(node, DstLocation::DirectAndUnrouted, variant, key)?;

Ok(Command::send_message_to_nodes(
recipients.clone(),
Expand Down
1 change: 0 additions & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use bytes::Bytes;
use ed25519_dalek::Keypair;
use hex_fmt::HexFmt;
pub use qp2p::{RecvStream, SendStream};
use secured_linked_list::SecuredLinkedList;
use sn_messaging::{client::ClientMsg, DstLocation, EndUser, SrcLocation};
use std::{
collections::BTreeSet,
Expand Down
29 changes: 15 additions & 14 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,11 @@ pub trait RoutingMsgUtils {

fn verify_variant<'a, I: IntoIterator<Item = &'a bls::PublicKey>>(
&self,
proof_chain: Option<&SecuredLinkedList>,
trusted_keys: I,
) -> Result<VerifyStatus>;

/// Returns an updated message with the provided Section key i.e. known to be latest.
fn updated_with_latest_key(&mut self, section_pk: bls::PublicKey);
}

impl RoutingMsgUtils for RoutingMsg {
Expand Down Expand Up @@ -213,12 +215,12 @@ impl RoutingMsgUtils for RoutingMsg {
section_chain: section_chain.clone(),
};

Self::new_signed(src, dst, variant, section_chain.last_key())
Self::new_signed(src, dst, variant, *section_chain.last_key())
}

/// Converts the message src authority from `BlsShare` to `Section` on successful accumulation.
/// Returns errors if src is not `BlsShare` or if the proof is invalid.
pub(crate) fn into_dst_accumulated(mut self, proof: Proof) -> Result<Self> {
fn into_dst_accumulated(mut self, proof: Proof) -> Result<Self> {
let (proof_share, src_name, section_chain) = if let SrcAuthority::BlsShare {
proof_share,
src_name,
Expand Down Expand Up @@ -326,7 +328,7 @@ impl RoutingMsgUtils for RoutingMsg {
}

// Variant-specific verification.
self.verify_variant(self.proof_chain.as_ref(), trusted_keys)
self.verify_variant(trusted_keys)
}
SrcAuthority::BlsShare {
proof_share,
Expand Down Expand Up @@ -424,31 +426,26 @@ impl RoutingMsgUtils for RoutingMsg {
// RoutingMsg::new_signed(self.src, self.dst, self.variant, self.proof_chain)
// }

fn verify_variant<'a, I>(
&self,
proof_chain: Option<&SecuredLinkedList>,
trusted_keys: I,
) -> Result<VerifyStatus>
fn verify_variant<'a, I>(&self, trusted_keys: I) -> Result<VerifyStatus>
where
I: IntoIterator<Item = &'a bls::PublicKey>,
{
let proof_chain = match &self.variant {
Variant::NodeApproval {
section_auth,
member_info,
section_chain,
..
} => {
let proof_chain = proof_chain.ok_or(Error::InvalidMessage)?;

if !section_auth.verify(proof_chain) {
if !section_auth.verify(section_chain) {
return Err(Error::InvalidMessage);
}

if !member_info.verify(proof_chain) {
if !member_info.verify(section_chain) {
return Err(Error::InvalidMessage);
}

proof_chain
section_chain
}
Variant::Sync { section, .. } => section.chain(),
_ => return Ok(VerifyStatus::Full),
Expand All @@ -460,6 +457,10 @@ impl RoutingMsgUtils for RoutingMsg {
Ok(VerifyStatus::Unknown)
}
}

fn updated_with_latest_key(&mut self, section_pk: bls::PublicKey) {
self.section_pk = section_pk
}
}

#[derive(Eq, PartialEq, Debug)]
Expand Down
105 changes: 21 additions & 84 deletions src/routing/core/anti_entropy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,17 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::routing::command::Command;
use crate::routing::core::LaggingMessages;
use crate::{
error::Result,
messages::{RoutingMsgUtils, SrcAuthorityUtils, Variant},
network::NetworkUtils,
messages::{RoutingMsgUtils, SrcAuthorityUtils},
node::Node,
section::SectionUtils,
};
use sn_messaging::{
node::{Network, RoutingMsg, Section, Variant},
node::{RoutingMsg, Section, Variant},
DestInfo,
};
use std::cmp::Ordering;
use std::net::SocketAddr;

/// On reception of an incoming message, determine the actions that need to be taken in order to
/// bring ours and the senders knowledge about each other up to date. Returns a tuple of
Expand All @@ -29,10 +25,8 @@ use std::net::SocketAddr;
pub(crate) fn process(
node: &Node,
section: &Section,
lagging_messages: &mut LaggingMessages,
msg: &RoutingMsg,
dest_info: DestInfo,
sender: Option<SocketAddr>,
) -> Result<(Actions, bool)> {
let mut actions = Actions::default();

Expand All @@ -49,48 +43,20 @@ pub(crate) fn process(
.chain()
.cmp_by_position(&dest_info.dest_section_pk, section.chain().last_key())
{
if !section.chain().has_key(&dest_info.dest_section_pk) {
// Their knowledge of our section is newer than what we have stored - store it and execute upon sync.
info!("Anti-Entropy: We, the dst are outdated. Source has a greater key than ours.");
info!("Enqueue the messages and act on them upon syncing in the future");
let command = Command::HandleMessage {
sender,
message: msg.clone(),
dest_info: dest_info.clone(),
};
match lagging_messages
.src_ahead
.get_mut(&dest_info.dest_section_pk)
{
Some(lagging) => {
lagging.push(command);
}
None => {
let _ = lagging_messages
.src_ahead
.insert(dest_info.dest_section_pk, vec![command]);
}
}
} else {
info!("Anti-Entropy: Source's knowledge of our key is outdated, send them an update.");
info!("We can still execute the message as the key is a part of our chain");
let chain = section
.chain()
.get_proof_chain_to_current(&dest_info.dest_section_pk)?;
let section_auth = section.proven_authority_provider();
let variant = Variant::SectionKnowledge {
src_info: (section_auth.clone(), chain),
msg: None,
};
let msg = RoutingMsg::single_src(
node,
dst,
variant,
section.authority_provider().section_key,
)?;
actions.send.push(msg);
return Ok((actions, true));
}
info!("Anti-Entropy: Source's knowledge of our key is outdated, send them an update.");
info!("We can still execute the message as the key is a part of our chain");
let chain = section
.chain()
.get_proof_chain_to_current(&dest_info.dest_section_pk)?;
let section_auth = section.proven_authority_provider();
let variant = Variant::SectionKnowledge {
src_info: (section_auth.clone(), chain),
msg: None,
};
let msg =
RoutingMsg::single_src(node, dst, variant, section.authority_provider().section_key)?;
actions.send.push(msg);
return Ok((actions, true));
}

Ok((actions, false))
Expand All @@ -109,12 +75,11 @@ mod tests {
agreement::test_utils::proven,
crypto,
section::test_utils::{gen_addr, gen_section_authority_provider},
Error, XorName, ELDER_SIZE, MIN_ADULT_AGE,
XorName, ELDER_SIZE, MIN_ADULT_AGE,
};
use anyhow::{Context, Result};
use assert_matches::assert_matches;
use secured_linked_list::SecuredLinkedList;
use secured_linked_list::SecuredLinkedList;
use sn_messaging::DstLocation;
use xor_name::Prefix;

Expand All @@ -131,14 +96,7 @@ mod tests {
dest_section_pk: *env.section.chain().last_key(),
};

let (actions, _) = process(
&env.node,
&env.section,
&mut LaggingMessages::default(),
&msg,
dest_info,
None,
)?;
let (actions, _) = process(&env.node, &env.section, &msg, dest_info)?;
assert_eq!(actions.send, vec![]);

Ok(())
Expand All @@ -160,14 +118,7 @@ mod tests {
dest_section_pk: our_new_pk,
};

let (actions, _) = process(
&env.node,
&env.section,
&mut LaggingMessages::default(),
&msg,
dest_info,
None,
)?;
let (actions, _) = process(&env.node, &env.section, &msg, dest_info)?;

assert_eq!(actions.send, vec![]);

Expand All @@ -187,14 +138,7 @@ mod tests {
dest_section_pk: *env.section.chain().root_key(),
};

let (mut actions, _) = process(
&env.node,
&env.section,
&mut LaggingMessages::default(),
&msg,
dest_info,
None,
)?;
let (mut actions, _) = process(&env.node, &env.section, &msg, dest_info)?;

assert_matches!(&actions.send.pop(), Some(message) => {
assert_matches!(message.variant(), Variant::SectionKnowledge { src_info, .. } => {
Expand All @@ -219,14 +163,7 @@ mod tests {
dest_section_pk: *env.section.chain().root_key(),
};

let (actions, _) = process(
&env.node,
&env.section,
&mut LaggingMessages::default(),
&msg,
dest_info,
None,
)?;
let (actions, _) = process(&env.node, &env.section, &msg, dest_info)?;

assert_eq!(actions.send, vec![]);

Expand Down
2 changes: 1 addition & 1 deletion src/routing/core/messaging/handling/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ impl Core {
proof: Proof,
dest_info: DestInfo,
) -> Result<Command> {
let message = Message::section_src(message, proof, section_chain)?;
let message = RoutingMsg::section_src(message, proof, section_chain)?;

Ok(Command::HandleMessage {
message,
Expand Down
12 changes: 5 additions & 7 deletions src/routing/core/messaging/handling/bad_msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Core {
&self,
sender: Peer,
dst_key: bls::PublicKey,
bounced_msg: RoutingMsg,
mut bounced_msg: RoutingMsg,
) -> Result<Command> {
let span = trace_span!("Received BouncedUntrustedMessage", ?bounced_msg, %sender);
let _span_guard = span.enter();
Expand All @@ -84,12 +84,10 @@ impl Core {
self.section.authority_provider().section_key,
)?
}
_ => bounced_msg
.updated_with_latest_key(self.section.authority_provider().section_key)
.map_err(|err| {
error!("extending proof chain failed: {:?}", err);
Error::InvalidMessage // TODO: more specific error
})?,
_ => {
bounced_msg.updated_with_latest_key(self.section.authority_provider().section_key);
bounced_msg
}
};

let dest_info = DestInfo {
Expand Down
23 changes: 8 additions & 15 deletions src/routing/core/messaging/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl Core {
MessageStatus::Useful => {
trace!("Useful message from {:?}: {:?}", sender, msg);
let (entropy_commands, shall_be_handled) =
self.check_for_entropy(&msg, dest_info.clone(), sender)?;
self.check_for_entropy(&msg, dest_info.clone())?;
commands.extend(entropy_commands);
if shall_be_handled {
commands.extend(self.handle_useful_message(sender, msg, dest_info).await?);
Expand Down Expand Up @@ -438,16 +438,20 @@ impl Core {

fn handle_user_message(&mut self, msg: RoutingMsg, content: Bytes) -> Result<Vec<Command>> {
trace!("handle user message {:?}", msg);
if let DstLocation::EndUser(EndUser { xorname, socket_id }) = msg.dst() {
if let DstLocation::EndUser(EndUser {
xorname: xor_name,
socket_id,
}) = msg.dst()
{
if let Some(socket_addr) = self.get_socket_addr(*socket_id).copied() {
trace!("sending user message {:?} to client {:?}", msg, socket_addr);
return Ok(vec![Command::SendMessage {
recipients: vec![(*xorname, socket_addr)],
recipients: vec![(*xor_name, socket_addr)],
delivery_group_size: 1,
message: MessageType::Client {
msg: ClientMsg::from(content)?,
dest_info: DestInfo {
dest: *xorname,
dest: *xor_name,
dest_section_pk: *self.section.chain().last_key(),
},
},
Expand Down Expand Up @@ -520,17 +524,6 @@ impl Core {
self.update_state(snapshot)
}

pub fn handle_lagging_messages_on_sync(&mut self) -> Result<Vec<Command>> {
let mut commands = vec![];
let latest_key = *self.section_chain().last_key();
if let Some(lagged_commands) = self.lagging_messages.src_ahead.remove(&latest_key) {
// We now have the latest key, execute the messages that received when we were lagging.
commands.extend(lagged_commands);
}

Ok(commands)
}

pub(crate) fn handle_join_request(
&mut self,
peer: Peer,
Expand Down

0 comments on commit 6977855

Please sign in to comment.