Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix: restore outgoing filter and only have one wire_msg copy for mult…
Browse files Browse the repository at this point in the history
…iple recipients
  • Loading branch information
maqi authored and dirvine committed May 28, 2021
1 parent c0bb0e6 commit ba98b41
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 57 deletions.
11 changes: 11 additions & 0 deletions src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ use xor_name::{XorName, XOR_NAME_LEN};
/// SHA3-256 hash digest.
pub type Digest256 = [u8; 32];

/// SHA3-256 hash function.
pub fn sha3_256(input: &[u8]) -> Digest256 {
use tiny_keccak::{Hasher, Sha3};

let mut hasher = Sha3::v256();
let mut output = Digest256::default();
hasher.update(input);
hasher.finalize(&mut output);
output
}

pub fn sign(msg: &[u8], keypair: &Keypair) -> Signature {
let expanded_secret_key = ExpandedSecretKey::from(&keypair.secret);
expanded_secret_key.sign(msg, &keypair.public)
Expand Down
49 changes: 46 additions & 3 deletions src/message_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,38 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::messages::{Message, MessageHash};
use lru_time_cache::LruCache;
use sn_messaging::MessageId;
use sn_messaging::{DstLocation, MessageId};
use std::time::Duration;
use xor_name::XorName;

const INCOMING_EXPIRY_DURATION: Duration = Duration::from_secs(20 * 60);
const OUTGOING_EXPIRY_DURATION: Duration = Duration::from_secs(10 * 60);
const MAX_ENTRIES: usize = 5_000;

// Structure to filter (throttle) incoming messages.
/// An enum representing a result of message filtering
#[derive(Eq, PartialEq)]
pub enum FilteringResult {
/// We don't have the message in the filter yet
NewMessage,
/// We have the message in the filter
KnownMessage,
}

impl FilteringResult {
pub fn is_new(&self) -> bool {
match self {
Self::NewMessage => true,
Self::KnownMessage => false,
}
}
}

// Structure to filter (throttle) incoming and outgoing messages.
pub(crate) struct MessageFilter {
incoming: LruCache<MessageId, ()>,
outgoing: LruCache<(MessageHash, XorName), ()>,
}

impl MessageFilter {
Expand All @@ -25,6 +47,26 @@ impl MessageFilter {
INCOMING_EXPIRY_DURATION,
MAX_ENTRIES,
),
outgoing: LruCache::with_expiry_duration_and_capacity(
OUTGOING_EXPIRY_DURATION,
MAX_ENTRIES,
),
}
}

// Filter outgoing `SNRoutingMessage`. Return whether this specific message has been seen recently
// (and thus should not be sent, due to deduplication).
//
pub fn filter_outgoing(&mut self, msg: &Message, pub_id: &XorName) -> FilteringResult {
// Not filtering direct messages.
if let DstLocation::Direct = msg.dst() {
return FilteringResult::NewMessage;
}

if self.outgoing.insert((*msg.hash(), *pub_id), ()).is_some() {
FilteringResult::KnownMessage
} else {
FilteringResult::NewMessage
}
}

Expand All @@ -34,9 +76,10 @@ impl MessageFilter {
cur_value.is_none()
}

// Resets the incoming filter.
// Resets both incoming and outgoing filters.
pub fn reset(&mut self) {
self.incoming.clear();
self.outgoing.clear();
}
}

Expand Down
35 changes: 35 additions & 0 deletions src/messages/hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2020 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::crypto::{self, Digest256};
use hex_fmt::HexFmt;
use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug, Formatter};

/// Cryptographic hash of Message
#[derive(Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub struct MessageHash(Digest256);

impl MessageHash {
/// Compute hash of the given message.
pub fn from_bytes(bytes: &[u8]) -> Self {
Self(crypto::sha3_256(bytes))
}
}

impl Default for MessageHash {
fn default() -> MessageHash {
MessageHash([0u8; 32])
}
}

impl Debug for MessageHash {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "{:10}", HexFmt(&self.0))
}
}
13 changes: 12 additions & 1 deletion src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

mod hash;
mod plain_message;
mod src_authority;
mod variant;

pub use self::src_authority::SrcAuthority;
pub use self::{hash::MessageHash, src_authority::SrcAuthority};
pub(crate) use self::{
plain_message::PlainMessage,
variant::{JoinRequest, ResourceProofResponse, Variant},
Expand Down Expand Up @@ -49,6 +50,8 @@ pub(crate) struct Message {
/// Serialised message, this is a signed and fully serialised message ready to send.
#[serde(skip)]
serialized: Bytes,
#[serde(skip)]
hash: MessageHash,
}

impl Message {
Expand Down Expand Up @@ -105,6 +108,7 @@ impl Message {
}
}

msg.hash = MessageHash::from_bytes(&msg_bytes);
msg.serialized = msg_bytes;

Ok(msg)
Expand All @@ -129,11 +133,13 @@ impl Message {
proof_chain,
variant,
serialized: Default::default(),
hash: Default::default(),
};

msg.serialized = bincode::serialize(&msg)
.map_err(|_| Error::InvalidMessage)?
.into();
msg.hash = MessageHash::from_bytes(&msg.serialized);

Ok(msg)
}
Expand Down Expand Up @@ -337,6 +343,11 @@ impl Message {
&self.src
}

/// Getter
pub fn hash(&self) -> &MessageHash {
&self.hash
}

/// Returns the attached proof chain, if any.
pub(crate) fn proof_chain(&self) -> Result<&SecuredLinkedList> {
self.proof_chain.as_ref().ok_or(Error::InvalidMessage)
Expand Down
77 changes: 42 additions & 35 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Comm {
&self,
recipients: &[(XorName, SocketAddr)],
delivery_group_size: usize,
msg: MessageType,
mut msg: MessageType,
) -> Result<SendStatus> {
trace!(
"Sending message to {} of {:?}",
Expand All @@ -194,41 +194,45 @@ impl Comm {

let delivery_group_size = delivery_group_size.min(recipients.len());

let wire_msg = msg.to_wire_msg()?;
if recipients.is_empty() {
return Err(Error::EmptyRecipientList);
}
// Use the first Xor address recipient to represent the destination section.
// So that only one copy of MessageType need to be constructed.
msg.update_dest_info(None, Some(recipients[0].0));
let msg_bytes = match msg.serialize() {
Ok(bytes) => bytes,
Err(e) => return Err(Error::Messaging(e)),
};

// Run all the sends concurrently (using `FuturesUnordered`). If any of them fails, pick
// the next recipient and try to send to them. Proceed until the needed number of sends
// succeeds or if there are no more recipients to pick.
let send = |recipient: (XorName, SocketAddr), mut wire_msg: WireMsg| async move {
wire_msg.update_dest_info(None, Some(recipient.0));
match wire_msg.serialize() {
Ok(bytes) => {
trace!(
"Sending message ({} bytes) to {} of {:?}",
bytes.len(),
delivery_group_size,
recipient.1
);

let result = self
.send_to(&recipient.1, bytes)
.await
.map_err(|err| match err {
qp2p::Error::Connection(qp2p::ConnectionError::LocallyClosed)
| qp2p::Error::Connection(qp2p::ConnectionError::TimedOut) => {
Error::AddressNotReachable { err }
}
_ => Error::ConnectionClosed,
});

(result, recipient.1)
}
Err(e) => (Err(Error::Messaging(e)), recipient.1),
}
let send = |recipient: (XorName, SocketAddr), msg_bytes: Bytes| async move {
trace!(
"Sending message ({} bytes) to {} of {:?}",
msg_bytes.len(),
delivery_group_size,
recipient.1
);

let result = self
.send_to(&recipient.1, msg_bytes)
.await
.map_err(|err| match err {
qp2p::Error::Connection(qp2p::ConnectionError::LocallyClosed)
| qp2p::Error::Connection(qp2p::ConnectionError::TimedOut) => {
Error::AddressNotReachable { err }
}
_ => Error::ConnectionClosed,
});

(result, recipient.1)
};

let mut tasks: FuturesUnordered<_> = recipients[0..delivery_group_size]
.iter()
.map(|(name, recipient)| send((*name, *recipient), wire_msg.clone()))
.map(|(name, recipient)| send((*name, *recipient), msg_bytes.clone()))
.collect();

let mut next = delivery_group_size;
Expand All @@ -247,7 +251,7 @@ impl Comm {
failed_recipients.push(addr);

if next < recipients.len() {
tasks.push(send(recipients[next], wire_msg.clone()));
tasks.push(send(recipients[next], msg_bytes.clone()));
next += 1;
}
}
Expand Down Expand Up @@ -380,7 +384,6 @@ mod tests {
}

if let Some(bytes) = peer1.rx.recv().await {
original_message.update_dest_info(None, Some(peer1._name));
assert_eq!(WireMsg::deserialize(bytes)?, original_message);
}

Expand Down Expand Up @@ -462,18 +465,20 @@ mod tests {
.await?;
let mut peer = Peer::new().await?;
let invalid_addr = get_invalid_addr().await?;
let name = XorName::random();

let mut message = new_section_info_message();
let _ = comm
.send(
&[(XorName::random(), invalid_addr), (peer._name, peer.addr)],
&[(name, invalid_addr), (peer._name, peer.addr)],
1,
message.clone(),
)
.await?;

// Using first name of the recipients to represent section_name.
if let Some(bytes) = peer.rx.recv().await {
message.update_dest_info(None, Some(peer._name));
message.update_dest_info(None, Some(name));
assert_eq!(WireMsg::deserialize(bytes)?, message);
}
Ok(())
Expand All @@ -492,11 +497,12 @@ mod tests {
.await?;
let mut peer = Peer::new().await?;
let invalid_addr = get_invalid_addr().await?;
let name = XorName::random();

let mut message = new_section_info_message();
let status = comm
.send(
&[(XorName::random(), invalid_addr), (peer._name, peer.addr)],
&[(name, invalid_addr), (peer._name, peer.addr)],
2,
message.clone(),
)
Expand All @@ -507,8 +513,9 @@ mod tests {
SendStatus::MinDeliveryGroupSizeFailed(_) => vec![invalid_addr]
);

// Using first name of the recipients to represent section_name.
if let Some(bytes) = peer.rx.recv().await {
message.update_dest_info(None, Some(peer._name));
message.update_dest_info(None, Some(name));
assert_eq!(WireMsg::deserialize(bytes)?, message);
}
Ok(())
Expand Down
5 changes: 5 additions & 0 deletions src/routing/core/public_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ impl Core {
let target_name = msg.dst().name().ok_or(Error::CannotRoute)?;
let dest_pk = *self.section_key_by_name(&target_name);

let targets: Vec<_> = targets
.into_iter()
.filter(|peer| self.msg_filter.filter_outgoing(msg, peer.name()).is_new())
.collect();

if targets.is_empty() {
return Ok(None);
}
Expand Down
6 changes: 1 addition & 5 deletions src/routing/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
error::Result, event::Event, relocation::SignedRelocateDetails, routing::comm::SendStatus,
Error, XorName,
};
use sn_messaging::{MessageId, MessageType};
use sn_messaging::MessageType;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
sync::{mpsc, watch, Mutex},
Expand Down Expand Up @@ -42,10 +42,6 @@ impl Dispatcher {
}
}

pub async fn add_to_filter(&self, msg_id: &MessageId) -> bool {
self.core.lock().await.add_to_filter(msg_id)
}

/// Send provided Event to the user which shall receive it through the EventStream
pub async fn send_event(&self, event: Event) {
self.core.lock().await.send_event(event)
Expand Down

0 comments on commit ba98b41

Please sign in to comment.