Skip to content

Commit

Permalink
fix: check SAF message inflight and check stored_at is in past
Browse files Browse the repository at this point in the history
- Keeps track of inflight SAF requests and only accepts responses for
  requests that are inflight
- Checks that `stored_at` is in the past
- Fixes tari-project#3412, 3410
  • Loading branch information
sdbondi committed Oct 11, 2021
1 parent 92dee77 commit 6c6544a
Show file tree
Hide file tree
Showing 21 changed files with 544 additions and 162 deletions.
6 changes: 5 additions & 1 deletion comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use tari_comms_dht::{
envelope::NodeDestination,
inbound::DecryptedDhtMessage,
outbound::OutboundEncryption,
store_forward::SafConfig,
Dht,
DhtConfig,
};
Expand Down Expand Up @@ -912,7 +913,10 @@ async fn setup_comms_dht(

let dht = Dht::builder()
.with_config(DhtConfig {
saf_auto_request,
saf_config: SafConfig {
auto_request: saf_auto_request,
..Default::default()
},
auto_join: false,
discovery_request_timeout: Duration::from_secs(15),
num_neighbouring_nodes,
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ impl DhtActor {
.map(|p| p.node_id)
.collect())
},
SelectedPeers(peers) => Ok(peers),
Broadcast(exclude) => {
let connections = connectivity
.select_connections(ConnectivitySelection::random_nodes(
Expand Down
2 changes: 2 additions & 0 deletions comms/dht/src/broadcast_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub enum BroadcastStrategy {
/// Send directly to destination if connected but otherwise send to all n nearest Communication Nodes
DirectOrClosestNodes(Box<BroadcastClosestRequest>),
Broadcast(Vec<NodeId>),
SelectedPeers(Vec<NodeId>),
/// Propagate to a set of closest neighbours and random peers
Propagate(NodeDestination, Vec<NodeId>),
}
Expand All @@ -77,6 +78,7 @@ impl fmt::Display for BroadcastStrategy {
Random(n, excluded) => write!(f, "Random({}, {} excluded)", n, excluded.len()),
Broadcast(excluded) => write!(f, "Broadcast({} excluded)", excluded.len()),
Propagate(destination, excluded) => write!(f, "Propagate({}, {} excluded)", destination, excluded.len(),),
SelectedPeers(peers) => write!(f, "SelectedPeers({} peer(s))", peers.len()),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion comms/dht/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl DhtBuilder {
}

pub fn set_auto_store_and_forward_requests(&mut self, enabled: bool) -> &mut Self {
self.config.saf_auto_request = enabled;
self.config.saf_config.auto_request = enabled;
self
}

Expand Down Expand Up @@ -112,6 +112,7 @@ impl DhtBuilder {

pub fn with_num_neighbouring_nodes(&mut self, n: usize) -> &mut Self {
self.config.num_neighbouring_nodes = n;
self.config.saf_config.num_neighbouring_nodes = n;
self
}

Expand Down
50 changes: 14 additions & 36 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{network_discovery::NetworkDiscoveryConfig, storage::DbConnectionUrl, version::DhtProtocolVersion};
use crate::{
network_discovery::NetworkDiscoveryConfig,
storage::DbConnectionUrl,
store_forward::SafConfig,
version::DhtProtocolVersion,
};
use std::time::Duration;

#[derive(Debug, Clone)]
Expand All @@ -33,41 +38,18 @@ pub struct DhtConfig {
/// Default: 20
pub outbound_buffer_size: usize,
/// The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour
/// Default: [DEFAULT_NUM_NEIGHBOURING_NODES](self::DEFAULT_NUM_NEIGHBOURING_NODES)
/// Default: 8
pub num_neighbouring_nodes: usize,
/// Number of random peers to include
/// Default: [DEFAULT_NUM_RANDOM_NODES](self::DEFAULT_NUM_RANDOM_NODES)
/// Default: 4
pub num_random_nodes: usize,
/// Send to this many peers when using the broadcast strategy
/// Default: 8
pub broadcast_factor: usize,
/// Send to this many peers when using the propagate strategy
/// Default: 4
pub propagation_factor: usize,
/// The amount of seconds added to the current time (Utc) which will then be used to check if the message has
/// expired or not when processing the message
/// Default: 10800
pub saf_msg_validity: Duration,
/// The maximum number of messages that can be stored using the Store-and-forward middleware.
/// Default: 100,000
pub saf_msg_storage_capacity: usize,
/// A request to retrieve stored messages will be ignored if the requesting node is
/// not within one of this nodes _n_ closest nodes.
/// Default 8
pub saf_num_closest_nodes: usize,
/// The maximum number of messages to return from a store and forward retrieval request.
/// Default: 100
pub saf_max_returned_messages: usize,
/// The time-to-live duration used for storage of low priority messages by the Store-and-forward middleware.
/// Default: 6 hours
pub saf_low_priority_msg_storage_ttl: Duration,
/// The time-to-live duration used for storage of high priority messages by the Store-and-forward middleware.
/// Default: 3 days
pub saf_high_priority_msg_storage_ttl: Duration,
/// The limit on the message size to store in SAF storage in bytes. Default 500 KiB
pub saf_max_message_size: usize,
/// When true, store and forward messages are requested from peers on connect (Default: true)
pub saf_auto_request: bool,
pub saf_config: SafConfig,
/// The max capacity of the message hash cache
/// Default: 2,500
pub dedup_cache_capacity: usize,
Expand Down Expand Up @@ -127,7 +109,10 @@ impl DhtConfig {
pub fn default_local_test() -> Self {
Self {
database_url: DbConnectionUrl::Memory,
saf_auto_request: false,
saf_config: SafConfig {
auto_request: false,
..Default::default()
},
auto_join: false,
network_discovery: NetworkDiscoveryConfig {
// If a test requires the peer probe they should explicitly enable it
Expand All @@ -150,13 +135,7 @@ impl Default for DhtConfig {
propagation_factor: 4,
broadcast_factor: 8,
outbound_buffer_size: 20,
saf_num_closest_nodes: 10,
saf_max_returned_messages: 50,
saf_msg_storage_capacity: 100_000,
saf_low_priority_msg_storage_ttl: Duration::from_secs(6 * 60 * 60), // 6 hours
saf_high_priority_msg_storage_ttl: Duration::from_secs(3 * 24 * 60 * 60), // 3 days
saf_auto_request: true,
saf_max_message_size: 512 * 1024,
saf_config: Default::default(),
dedup_cache_capacity: 2_500,
dedup_cache_trim_interval: Duration::from_secs(5 * 60),
dedup_allowed_message_occurrences: 1,
Expand All @@ -172,7 +151,6 @@ impl Default for DhtConfig {
flood_ban_max_msg_count: 10000,
flood_ban_timespan: Duration::from_secs(100),
offline_peer_cooldown: Duration::from_secs(2 * 60 * 60),
saf_msg_validity: Duration::from_secs(10800),
}
}
}
17 changes: 10 additions & 7 deletions comms/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Dht {
saf_response_signal_rx: mpsc::Receiver<()>,
) -> StoreAndForwardService {
StoreAndForwardService::new(
self.config.clone(),
self.config.saf_config.clone(),
conn,
self.peer_manager.clone(),
self.dht_requester(),
Expand Down Expand Up @@ -311,7 +311,7 @@ impl Dht {
))
.layer(filter::FilterLayer::new(filter_messages_to_rebroadcast))
.layer(store_forward::StoreLayer::new(
self.config.clone(),
self.config.saf_config.clone(),
Arc::clone(&self.peer_manager),
Arc::clone(&self.node_identity),
self.store_and_forward_requester(),
Expand All @@ -321,7 +321,7 @@ impl Dht {
self.node_identity.features().contains(PeerFeatures::DHT_STORE_FORWARD),
))
.layer(store_forward::MessageHandlerLayer::new(
self.config.clone(),
self.config.saf_config.clone(),
self.store_and_forward_requester(),
self.dht_requester(),
Arc::clone(&self.node_identity),
Expand Down Expand Up @@ -640,6 +640,12 @@ mod test {
.await
.unwrap();

// SAF messages need to be requested before any response is accepted
dht.store_and_forward_requester()
.request_saf_messages_from_peer(node_identity.node_id().clone())
.await
.unwrap();

let spy = service_spy();
let mut service = dht.inbound_middleware_layer().layer(spy.to_service());

Expand All @@ -652,10 +658,7 @@ mod test {
MessageTag::new(),
false,
);
dht_envelope.header.as_mut().map(|header| {
header.message_type = DhtMessageType::SafStoredMessages as i32;
header
});
dht_envelope.header.as_mut().unwrap().message_type = DhtMessageType::SafStoredMessages as i32;
let inbound_message = make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().into());

service.call(inbound_message).await.unwrap_err();
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(map_entry_replace)]
#![doc(html_root_url = "https://docs.rs/tower-filter/0.3.0-alpha.2")]
#![cfg_attr(not(debug_assertions), deny(unused_variables))]
#![cfg_attr(not(debug_assertions), deny(unused_imports))]
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/outbound/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl BroadcastLayer {
dht_requester,
dht_discovery_requester,
node_identity,
message_validity_window: chrono::Duration::from_std(config.saf_msg_validity)
message_validity_window: chrono::Duration::from_std(config.saf_config.msg_validity)
.expect("message_validity_window is too large"),
protocol_version: config.protocol_version,
}
Expand Down
6 changes: 6 additions & 0 deletions comms/dht/src/outbound/message_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ impl SendMessageParams {
self
}

/// Set broadcast_strategy to SelectedPeers. Messages are queued for all selected peers.
pub fn selected_peers(&mut self, peers: Vec<NodeId>) -> &mut Self {
self.params_mut().broadcast_strategy = BroadcastStrategy::SelectedPeers(peers);
self
}

/// Set broadcast_strategy to Neighbours. `excluded_peers` are excluded. Only Peers that have
/// `PeerFeatures::MESSAGE_PROPAGATION` are included.
pub fn broadcast(&mut self, excluded_peers: Vec<NodeId>) -> &mut Self {
Expand Down
73 changes: 73 additions & 0 deletions comms/dht/src/store_forward/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

#[derive(Debug, Clone)]
pub struct SafConfig {
/// The amount of seconds added to the current time (Utc) which will then be used to check if the message has
/// expired or not when processing the message
/// Default: 3 hours
pub msg_validity: Duration,
/// The maximum number of messages that can be stored using the Store-and-forward middleware.
/// Default: 100,000
pub msg_storage_capacity: usize,
/// A request to retrieve stored messages will be ignored if the requesting node is
/// not within one of this nodes _n_ closest nodes.
/// Default 8
pub num_closest_nodes: usize,
/// The maximum number of messages to return from a store and forward retrieval request.
/// Default: 100
pub max_returned_messages: usize,
/// The time-to-live duration used for storage of low priority messages by the Store-and-forward middleware.
/// Default: 6 hours
pub low_priority_msg_storage_ttl: Duration,
/// The time-to-live duration used for storage of high priority messages by the Store-and-forward middleware.
/// Default: 3 days
pub high_priority_msg_storage_ttl: Duration,
/// The limit on the message size to store in SAF storage in bytes. Default 500 KiB
pub max_message_size: usize,
/// When true, store and forward messages are requested from peers on connect (Default: true)
pub auto_request: bool,
/// The maximum allowed time between asking for a message and accepting a response
pub max_inflight_request_age: Duration,
/// The maximum number of peer nodes that a message must be closer than to get stored by SAF
/// Default: 8
pub num_neighbouring_nodes: usize,
}

impl Default for SafConfig {
fn default() -> Self {
Self {
msg_validity: Duration::from_secs(3 * 60 * 60), // 3 hours
num_closest_nodes: 10,
max_returned_messages: 50,
msg_storage_capacity: 100_000,
low_priority_msg_storage_ttl: Duration::from_secs(6 * 60 * 60), // 6 hours
high_priority_msg_storage_ttl: Duration::from_secs(3 * 24 * 60 * 60), // 3 days
auto_request: true,
max_message_size: 512 * 1024,
max_inflight_request_age: Duration::from_secs(120),
num_neighbouring_nodes: 8,
}
}
}
13 changes: 11 additions & 2 deletions comms/dht/src/store_forward/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

use crate::{actor::DhtActorError, envelope::DhtMessageError, outbound::DhtOutboundError, storage::StorageError};
use prost::DecodeError;
use tari_comms::{message::MessageError, peer_manager::PeerManagerError};
use std::time::Duration;
use tari_comms::{
message::MessageError,
peer_manager::{NodeId, PeerManagerError},
};
use tari_utilities::{byte_array::ByteArrayError, ciphers::cipher::CipherError};
use thiserror::Error;

Expand Down Expand Up @@ -62,7 +66,6 @@ pub enum StoreAndForwardError {
MessageOriginRequired,
#[error("The message was malformed")]
MalformedMessage,

#[error("StorageError: {0}")]
StorageError(#[from] StorageError),
#[error("The store and forward service requester channel closed")]
Expand All @@ -81,4 +84,10 @@ pub enum StoreAndForwardError {
InvalidDhtMessageType,
#[error("Failed to send request for store and forward messages: {0}")]
RequestMessagesFailed(DhtOutboundError),
#[error("Received SAF messages that were not requested")]
ReceivedUnrequestedSafMessages,
#[error("SAF messages received from peer {peer} after deadline. Received after {0:.2?}")]
SafMessagesRecievedAfterDeadline { peer: NodeId, message_age: Duration },
#[error("Invalid SAF request: `stored_at` cannot be in the future")]
StoredAtWasInFuture,
}
Loading

0 comments on commit 6c6544a

Please sign in to comment.