Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: check SAF message inflight and check stored_at is in past #3444

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions applications/tari_base_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use log::*;
use tari_app_utilities::{consts, identity_management, utilities::create_transport_type};
use tari_common::{configuration::bootstrap::ApplicationType, GlobalConfig};
use tari_comms::{peer_manager::Peer, protocol::rpc::RpcServer, NodeIdentity, UnspawnedCommsNode};
use tari_comms_dht::{DbConnectionUrl, Dht, DhtConfig};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, Dht, DhtConfig};
use tari_core::{
base_node,
base_node::{
Expand Down Expand Up @@ -251,7 +251,10 @@ where B: BlockchainBackend + 'static
auto_join: true,
allow_test_addresses: self.config.allow_test_addresses,
flood_ban_max_msg_count: self.config.flood_ban_max_msg_count,
saf_msg_validity: self.config.saf_expiry_duration,
saf_config: SafConfig {
msg_validity: self.config.saf_expiry_duration,
..Default::default()
},
dedup_cache_capacity: self.config.dedup_cache_capacity,
..Default::default()
},
Expand Down
7 changes: 5 additions & 2 deletions applications/tari_console_wallet/src/init/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use tari_comms::{
types::CommsSecretKey,
NodeIdentity,
};
use tari_comms_dht::{DbConnectionUrl, DhtConfig};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig};
use tari_core::transactions::CryptoFactories;
use tari_p2p::{
auto_update::AutoUpdateConfig,
Expand Down Expand Up @@ -337,7 +337,10 @@ pub async fn init_wallet(
auto_join: true,
allow_test_addresses: config.allow_test_addresses,
flood_ban_max_msg_count: config.flood_ban_max_msg_count,
saf_msg_validity: config.saf_expiry_duration,
saf_config: SafConfig {
msg_validity: config.saf_expiry_duration,
..Default::default()
},
dedup_cache_capacity: config.dedup_cache_capacity,
..Default::default()
},
Expand Down
7 changes: 5 additions & 2 deletions base_layer/wallet/tests/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use tari_comms::{
peer_manager::{NodeId, NodeIdentity, Peer, PeerFeatures, PeerFlags},
types::{CommsPublicKey, CommsSecretKey},
};
use tari_comms_dht::DhtConfig;
use tari_comms_dht::{store_forward::SafConfig, DhtConfig};
use tari_core::transactions::{
helpers::{create_unblinded_output, TestParams},
tari_amount::{uT, MicroTari},
Expand Down Expand Up @@ -119,7 +119,10 @@ async fn create_wallet(
dht: DhtConfig {
discovery_request_timeout: Duration::from_secs(1),
auto_join: true,
saf_auto_request: true,
saf_config: SafConfig {
auto_request: true,
..Default::default()
},
..Default::default()
},
allow_test_addresses: true,
Expand Down
7 changes: 5 additions & 2 deletions base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ use tari_comms::{
transports::MemoryTransport,
types::CommsSecretKey,
};
use tari_comms_dht::{DbConnectionUrl, DhtConfig};
use tari_comms_dht::{store_forward::SafConfig, DbConnectionUrl, DhtConfig};
use tari_core::transactions::{tari_amount::MicroTari, transaction::OutputFeatures, CryptoFactories};
use tari_p2p::{
transport::{TorConfig, TransportType, TransportType::Tor},
Expand Down Expand Up @@ -2596,7 +2596,10 @@ pub unsafe extern "C" fn comms_config_create(
discovery_request_timeout: Duration::from_secs(discovery_timeout_in_secs),
database_url: DbConnectionUrl::File(dht_database_path),
auto_join: true,
saf_msg_validity: Duration::from_secs(saf_message_duration_in_secs),
saf_config: SafConfig {
msg_validity: Duration::from_secs(saf_message_duration_in_secs),
..Default::default()
},
..Default::default()
},
// TODO: This should be set to false for non-test wallets. See the `allow_test_addresses` field
Expand Down
6 changes: 5 additions & 1 deletion comms/dht/examples/memory_net/utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use tari_comms_dht::{
envelope::NodeDestination,
inbound::DecryptedDhtMessage,
outbound::OutboundEncryption,
store_forward::SafConfig,
Dht,
DhtConfig,
};
Expand Down Expand Up @@ -912,7 +913,10 @@ async fn setup_comms_dht(

let dht = Dht::builder()
.with_config(DhtConfig {
saf_auto_request,
saf_config: SafConfig {
auto_request: saf_auto_request,
..Default::default()
},
auto_join: false,
discovery_request_timeout: Duration::from_secs(15),
num_neighbouring_nodes,
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ impl DhtActor {
.map(|p| p.node_id)
.collect())
},
SelectedPeers(peers) => Ok(peers),
Broadcast(exclude) => {
let connections = connectivity
.select_connections(ConnectivitySelection::random_nodes(
Expand Down
2 changes: 2 additions & 0 deletions comms/dht/src/broadcast_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub enum BroadcastStrategy {
/// Send directly to destination if connected but otherwise send to all n nearest Communication Nodes
DirectOrClosestNodes(Box<BroadcastClosestRequest>),
Broadcast(Vec<NodeId>),
SelectedPeers(Vec<NodeId>),
/// Propagate to a set of closest neighbours and random peers
Propagate(NodeDestination, Vec<NodeId>),
}
Expand All @@ -77,6 +78,7 @@ impl fmt::Display for BroadcastStrategy {
Random(n, excluded) => write!(f, "Random({}, {} excluded)", n, excluded.len()),
Broadcast(excluded) => write!(f, "Broadcast({} excluded)", excluded.len()),
Propagate(destination, excluded) => write!(f, "Propagate({}, {} excluded)", destination, excluded.len(),),
SelectedPeers(peers) => write!(f, "SelectedPeers({} peer(s))", peers.len()),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion comms/dht/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl DhtBuilder {
}

pub fn set_auto_store_and_forward_requests(&mut self, enabled: bool) -> &mut Self {
self.config.saf_auto_request = enabled;
self.config.saf_config.auto_request = enabled;
self
}

Expand Down Expand Up @@ -112,6 +112,7 @@ impl DhtBuilder {

pub fn with_num_neighbouring_nodes(&mut self, n: usize) -> &mut Self {
self.config.num_neighbouring_nodes = n;
self.config.saf_config.num_neighbouring_nodes = n;
self
}

Expand Down
50 changes: 14 additions & 36 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use crate::{network_discovery::NetworkDiscoveryConfig, storage::DbConnectionUrl, version::DhtProtocolVersion};
use crate::{
network_discovery::NetworkDiscoveryConfig,
storage::DbConnectionUrl,
store_forward::SafConfig,
version::DhtProtocolVersion,
};
use std::time::Duration;

#[derive(Debug, Clone)]
Expand All @@ -33,41 +38,18 @@ pub struct DhtConfig {
/// Default: 20
pub outbound_buffer_size: usize,
/// The maximum number of peer nodes that a message has to be closer to, to be considered a neighbour
/// Default: [DEFAULT_NUM_NEIGHBOURING_NODES](self::DEFAULT_NUM_NEIGHBOURING_NODES)
/// Default: 8
pub num_neighbouring_nodes: usize,
/// Number of random peers to include
/// Default: [DEFAULT_NUM_RANDOM_NODES](self::DEFAULT_NUM_RANDOM_NODES)
/// Default: 4
pub num_random_nodes: usize,
/// Send to this many peers when using the broadcast strategy
/// Default: 8
pub broadcast_factor: usize,
/// Send to this many peers when using the propagate strategy
/// Default: 4
pub propagation_factor: usize,
/// The amount of seconds added to the current time (Utc) which will then be used to check if the message has
/// expired or not when processing the message
/// Default: 10800
pub saf_msg_validity: Duration,
/// The maximum number of messages that can be stored using the Store-and-forward middleware.
/// Default: 100,000
pub saf_msg_storage_capacity: usize,
/// A request to retrieve stored messages will be ignored if the requesting node is
/// not within one of this nodes _n_ closest nodes.
/// Default 8
pub saf_num_closest_nodes: usize,
/// The maximum number of messages to return from a store and forward retrieval request.
/// Default: 100
pub saf_max_returned_messages: usize,
/// The time-to-live duration used for storage of low priority messages by the Store-and-forward middleware.
/// Default: 6 hours
pub saf_low_priority_msg_storage_ttl: Duration,
/// The time-to-live duration used for storage of high priority messages by the Store-and-forward middleware.
/// Default: 3 days
pub saf_high_priority_msg_storage_ttl: Duration,
/// The limit on the message size to store in SAF storage in bytes. Default 500 KiB
pub saf_max_message_size: usize,
/// When true, store and forward messages are requested from peers on connect (Default: true)
pub saf_auto_request: bool,
pub saf_config: SafConfig,
/// The max capacity of the message hash cache
/// Default: 2,500
pub dedup_cache_capacity: usize,
Expand Down Expand Up @@ -127,7 +109,10 @@ impl DhtConfig {
pub fn default_local_test() -> Self {
Self {
database_url: DbConnectionUrl::Memory,
saf_auto_request: false,
saf_config: SafConfig {
auto_request: false,
..Default::default()
},
auto_join: false,
network_discovery: NetworkDiscoveryConfig {
// If a test requires the peer probe they should explicitly enable it
Expand All @@ -150,13 +135,7 @@ impl Default for DhtConfig {
propagation_factor: 4,
broadcast_factor: 8,
outbound_buffer_size: 20,
saf_num_closest_nodes: 10,
saf_max_returned_messages: 50,
saf_msg_storage_capacity: 100_000,
saf_low_priority_msg_storage_ttl: Duration::from_secs(6 * 60 * 60), // 6 hours
saf_high_priority_msg_storage_ttl: Duration::from_secs(3 * 24 * 60 * 60), // 3 days
saf_auto_request: true,
saf_max_message_size: 512 * 1024,
saf_config: Default::default(),
dedup_cache_capacity: 2_500,
dedup_cache_trim_interval: Duration::from_secs(5 * 60),
dedup_allowed_message_occurrences: 1,
Expand All @@ -172,7 +151,6 @@ impl Default for DhtConfig {
flood_ban_max_msg_count: 10000,
flood_ban_timespan: Duration::from_secs(100),
offline_peer_cooldown: Duration::from_secs(2 * 60 * 60),
saf_msg_validity: Duration::from_secs(10800),
}
}
}
17 changes: 10 additions & 7 deletions comms/dht/src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl Dht {
saf_response_signal_rx: mpsc::Receiver<()>,
) -> StoreAndForwardService {
StoreAndForwardService::new(
self.config.clone(),
self.config.saf_config.clone(),
conn,
self.peer_manager.clone(),
self.dht_requester(),
Expand Down Expand Up @@ -311,7 +311,7 @@ impl Dht {
))
.layer(filter::FilterLayer::new(filter_messages_to_rebroadcast))
.layer(store_forward::StoreLayer::new(
self.config.clone(),
self.config.saf_config.clone(),
Arc::clone(&self.peer_manager),
Arc::clone(&self.node_identity),
self.store_and_forward_requester(),
Expand All @@ -321,7 +321,7 @@ impl Dht {
self.node_identity.features().contains(PeerFeatures::DHT_STORE_FORWARD),
))
.layer(store_forward::MessageHandlerLayer::new(
self.config.clone(),
self.config.saf_config.clone(),
self.store_and_forward_requester(),
self.dht_requester(),
Arc::clone(&self.node_identity),
Expand Down Expand Up @@ -640,6 +640,12 @@ mod test {
.await
.unwrap();

// SAF messages need to be requested before any response is accepted
dht.store_and_forward_requester()
.request_saf_messages_from_peer(node_identity.node_id().clone())
.await
.unwrap();

let spy = service_spy();
let mut service = dht.inbound_middleware_layer().layer(spy.to_service());

Expand All @@ -652,10 +658,7 @@ mod test {
MessageTag::new(),
false,
);
dht_envelope.header.as_mut().map(|header| {
header.message_type = DhtMessageType::SafStoredMessages as i32;
header
});
dht_envelope.header.as_mut().unwrap().message_type = DhtMessageType::SafStoredMessages as i32;
let inbound_message = make_comms_inbound_message(&node_identity, dht_envelope.to_encoded_bytes().into());

service.call(inbound_message).await.unwrap_err();
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![feature(map_entry_replace)]
#![doc(html_root_url = "https://docs.rs/tower-filter/0.3.0-alpha.2")]
#![cfg_attr(not(debug_assertions), deny(unused_variables))]
#![cfg_attr(not(debug_assertions), deny(unused_imports))]
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/outbound/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl BroadcastLayer {
dht_requester,
dht_discovery_requester,
node_identity,
message_validity_window: chrono::Duration::from_std(config.saf_msg_validity)
message_validity_window: chrono::Duration::from_std(config.saf_config.msg_validity)
.expect("message_validity_window is too large"),
protocol_version: config.protocol_version,
}
Expand Down
6 changes: 6 additions & 0 deletions comms/dht/src/outbound/message_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ impl SendMessageParams {
self
}

/// Set broadcast_strategy to SelectedPeers. Messages are queued for all selected peers.
pub fn selected_peers(&mut self, peers: Vec<NodeId>) -> &mut Self {
self.params_mut().broadcast_strategy = BroadcastStrategy::SelectedPeers(peers);
self
}

/// Set broadcast_strategy to Neighbours. `excluded_peers` are excluded. Only Peers that have
/// `PeerFeatures::MESSAGE_PROPAGATION` are included.
pub fn broadcast(&mut self, excluded_peers: Vec<NodeId>) -> &mut Self {
Expand Down
73 changes: 73 additions & 0 deletions comms/dht/src/store_forward/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2021, The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::time::Duration;

#[derive(Debug, Clone)]
pub struct SafConfig {
/// The amount of seconds added to the current time (Utc) which will then be used to check if the message has
/// expired or not when processing the message
/// Default: 3 hours
pub msg_validity: Duration,
/// The maximum number of messages that can be stored using the Store-and-forward middleware.
/// Default: 100,000
pub msg_storage_capacity: usize,
/// A request to retrieve stored messages will be ignored if the requesting node is
/// not within one of this nodes _n_ closest nodes.
/// Default 8
pub num_closest_nodes: usize,
/// The maximum number of messages to return from a store and forward retrieval request.
/// Default: 100
pub max_returned_messages: usize,
/// The time-to-live duration used for storage of low priority messages by the Store-and-forward middleware.
/// Default: 6 hours
pub low_priority_msg_storage_ttl: Duration,
/// The time-to-live duration used for storage of high priority messages by the Store-and-forward middleware.
/// Default: 3 days
pub high_priority_msg_storage_ttl: Duration,
/// The limit on the message size to store in SAF storage in bytes. Default 500 KiB
pub max_message_size: usize,
/// When true, store and forward messages are requested from peers on connect (Default: true)
pub auto_request: bool,
/// The maximum allowed time between asking for a message and accepting a response
pub max_inflight_request_age: Duration,
/// The maximum number of peer nodes that a message must be closer than to get stored by SAF
/// Default: 8
pub num_neighbouring_nodes: usize,
}

impl Default for SafConfig {
fn default() -> Self {
Self {
msg_validity: Duration::from_secs(3 * 60 * 60), // 3 hours
num_closest_nodes: 10,
max_returned_messages: 50,
msg_storage_capacity: 100_000,
low_priority_msg_storage_ttl: Duration::from_secs(6 * 60 * 60), // 6 hours
high_priority_msg_storage_ttl: Duration::from_secs(3 * 24 * 60 * 60), // 3 days
auto_request: true,
max_message_size: 512 * 1024,
max_inflight_request_age: Duration::from_secs(120),
num_neighbouring_nodes: 8,
}
}
}
Loading