Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: use message_id instead of hash for message_filter
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi authored and dirvine committed May 28, 2021
1 parent 5616a45 commit 9f937a7
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 132 deletions.
11 changes: 0 additions & 11 deletions src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,6 @@ use xor_name::{XorName, XOR_NAME_LEN};
/// SHA3-256 hash digest.
pub type Digest256 = [u8; 32];

/// SHA3-256 hash function.
pub fn sha3_256(input: &[u8]) -> Digest256 {
use tiny_keccak::{Hasher, Sha3};

let mut hasher = Sha3::v256();
let mut output = Digest256::default();
hasher.update(input);
hasher.finalize(&mut output);
output
}

pub fn sign(msg: &[u8], keypair: &Keypair) -> Signature {
let expanded_secret_key = ExpandedSecretKey::from(&keypair.secret);
expanded_secret_key.sign(msg, &keypair.public)
Expand Down
61 changes: 5 additions & 56 deletions src/message_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,16 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::messages::{Message, MessageHash};
use lru_time_cache::LruCache;
use sn_messaging::DstLocation;
use sn_messaging::MessageId;
use std::time::Duration;
use xor_name::XorName;

const INCOMING_EXPIRY_DURATION: Duration = Duration::from_secs(20 * 60);
const OUTGOING_EXPIRY_DURATION: Duration = Duration::from_secs(10 * 60);
const MAX_ENTRIES: usize = 5_000;

/// An enum representing a result of message filtering
#[derive(Eq, PartialEq)]
pub enum FilteringResult {
/// We don't have the message in the filter yet
NewMessage,
/// We have the message in the filter
KnownMessage,
}

impl FilteringResult {
pub fn is_new(&self) -> bool {
match self {
Self::NewMessage => true,
Self::KnownMessage => false,
}
}
}

// Structure to filter (throttle) incoming and outgoing messages.
pub(crate) struct MessageFilter {
incoming: LruCache<MessageHash, ()>,
outgoing: LruCache<(MessageHash, XorName), ()>,
incoming: LruCache<MessageId, ()>,
}

impl MessageFilter {
Expand All @@ -47,46 +25,17 @@ impl MessageFilter {
INCOMING_EXPIRY_DURATION,
MAX_ENTRIES,
),
outgoing: LruCache::with_expiry_duration_and_capacity(
OUTGOING_EXPIRY_DURATION,
MAX_ENTRIES,
),
}
}

pub fn contains_incoming(&self, msg: &Message) -> bool {
let hash = msg.hash();
self.incoming.contains_key(hash)
}

pub fn insert_incoming(&mut self, msg: &Message) {
// Not filtering direct messages.
if let DstLocation::DirectAndUnrouted = msg.dst() {
return;
}
let _ = self.incoming.insert(*msg.hash(), ());
}

// Filter outgoing `SNRoutingMessage`. Return whether this specific message has been seen recently
// (and thus should not be sent, due to deduplication).
//
pub fn filter_outgoing(&mut self, msg: &Message, pub_id: &XorName) -> FilteringResult {
// Not filtering direct messages.
if let DstLocation::DirectAndUnrouted = msg.dst() {
return FilteringResult::NewMessage;
}

if self.outgoing.insert((*msg.hash(), *pub_id), ()).is_some() {
FilteringResult::KnownMessage
} else {
FilteringResult::NewMessage
}
pub fn contains_incoming(&mut self, msg_id: &MessageId) -> bool {
let cur_value = self.incoming.insert(*msg_id, ());
cur_value.is_some()
}

// Resets both incoming and outgoing filters.
pub fn reset(&mut self) {
self.incoming.clear();
self.outgoing.clear();
}
}

Expand Down
35 changes: 0 additions & 35 deletions src/messages/hash.rs

This file was deleted.

15 changes: 2 additions & 13 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

mod hash;
mod plain_message;
mod src_authority;
mod variant;

pub use self::{hash::MessageHash, src_authority::SrcAuthority};
pub use self::src_authority::SrcAuthority;
pub(crate) use self::{
plain_message::PlainMessage,
variant::{JoinRequest, ResourceProofResponse, Variant},
Expand Down Expand Up @@ -50,8 +49,6 @@ pub(crate) struct Message {
/// Serialised message, this is a signed and fully serialised message ready to send.
#[serde(skip)]
serialized: Bytes,
#[serde(skip)]
hash: MessageHash,
}

impl Message {
Expand Down Expand Up @@ -108,8 +105,7 @@ impl Message {
}
}

msg.serialized = msg_bytes.clone();
msg.hash = MessageHash::from_bytes(&msg_bytes);
msg.serialized = msg_bytes;

Ok(msg)
}
Expand All @@ -133,13 +129,11 @@ impl Message {
proof_chain,
variant,
serialized: Default::default(),
hash: Default::default(),
};

msg.serialized = bincode::serialize(&msg)
.map_err(|_| Error::InvalidMessage)?
.into();
msg.hash = MessageHash::from_bytes(&msg.serialized);

Ok(msg)
}
Expand Down Expand Up @@ -343,11 +337,6 @@ impl Message {
&self.src
}

/// Getter
pub fn hash(&self) -> &MessageHash {
&self.hash
}

/// Returns the attached proof chain, if any.
pub(crate) fn proof_chain(&self) -> Result<&SecuredLinkedList> {
self.proof_chain.as_ref().ok_or(Error::InvalidMessage)
Expand Down
8 changes: 0 additions & 8 deletions src/routing/core/messaging/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ impl Core {
return Ok(commands);
}

// Filter messages which were already handled
if self.msg_filter.contains_incoming(&msg) {
trace!("not handling message - already handled: {:?}", msg);
return Ok(commands);
}

match self.decide_message_status(&msg)? {
MessageStatus::Useful => {
trace!("Useful message from {:?}: {:?}", sender, msg);
Expand Down Expand Up @@ -252,8 +246,6 @@ impl Core {
msg: Message,
dest_info: DestInfo,
) -> Result<Vec<Command>> {
self.msg_filter.insert_incoming(&msg);

let msg = if let Some(msg) = self.aggregate_message(msg)? {
msg
} else {
Expand Down
6 changes: 5 additions & 1 deletion src/routing/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use bls_signature_aggregator::SignatureAggregator;
use itertools::Itertools;
use resource_proof::ResourceProof;
use secured_linked_list::SecuredLinkedList;
use sn_messaging::DestInfo;
use sn_messaging::{DestInfo, MessageId};
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::net::SocketAddr;
Expand Down Expand Up @@ -103,6 +103,10 @@ impl Core {
// Miscellaneous
////////////////////////////////////////////////////////////////////////////

pub fn contains_incoming(&mut self, msg_id: &MessageId) -> bool {
self.msg_filter.contains_incoming(msg_id)
}

fn check_for_entropy(
&mut self,
msg: &Message,
Expand Down
5 changes: 0 additions & 5 deletions src/routing/core/public_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,11 +149,6 @@ impl Core {
let target_name = msg.dst().name().ok_or(Error::CannotRoute)?;
let dest_pk = *self.section_key_by_name(&target_name);

let targets: Vec<_> = targets
.into_iter()
.filter(|peer| self.msg_filter.filter_outgoing(msg, peer.name()).is_new())
.collect();

if targets.is_empty() {
return Ok(None);
}
Expand Down
6 changes: 5 additions & 1 deletion src/routing/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
error::Result, event::Event, relocation::SignedRelocateDetails, routing::comm::SendStatus,
Error, XorName,
};
use sn_messaging::MessageType;
use sn_messaging::{MessageId, MessageType};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
sync::{mpsc, watch, Mutex},
Expand Down Expand Up @@ -42,6 +42,10 @@ impl Dispatcher {
}
}

pub async fn contains_incoming(&self, msg_id: &MessageId) -> bool {
self.core.lock().await.contains_incoming(msg_id)
}

/// Send provided Event to the user which shall receive it through the EventStream
pub async fn send_event(&self, event: Event) {
self.core.lock().await.send_event(event)
Expand Down
23 changes: 21 additions & 2 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,10 +479,29 @@ async fn handle_message(dispatcher: Arc<Dispatcher>, bytes: Bytes, sender: Socke
};
let _span_guard = span.enter();

let message_type = match WireMsg::deserialize(bytes) {
let wire_msg = match WireMsg::from(bytes) {
Ok(wire_msg) => wire_msg,
Err(error) => {
error!("Failed to deserialize wired_message: {}", error);
return;
}
};
if dispatcher.contains_incoming(&wire_msg.msg_id()).await {
trace!(
"not handling message - already handled: {:?}",
wire_msg.msg_id()
);
return;
}

let message_type = match wire_msg.to_message() {
Ok(message_type) => message_type,
Err(error) => {
error!("Failed to deserialize message: {}", error);
error!(
"Failed to deserialize message({:?}): {}",
wire_msg.msg_id(),
error
);
return;
}
};
Expand Down

0 comments on commit 9f937a7

Please sign in to comment.