feat: add persistent dedup cache for message hashes (#3130)

## Description
- Added a persistent dedup cache for message hashes; the cache is trimmed to capacity every 5 minutes:
  - the dedup cache size is user-configurable;
  - the trimming period is configurable (see the sketch after this list).
- Added a unit test covering the dedup cache operation.
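
A minimal sketch of the two new knobs, assuming the crate path `tari_comms_dht`; the `DhtConfig` field names are those added in the diff below, and the values are illustrative only:

```rust
// Hedged sketch, not part of this PR's diff. Field names come from the diff
// below; the crate path `tari_comms_dht` is an assumption.
use std::time::Duration;
use tari_comms_dht::DhtConfig;

fn example_dht_config() -> DhtConfig {
    DhtConfig {
        // Maximum number of message hashes retained in the persistent dedup cache.
        dedup_cache_capacity: 2_500,
        // How often the cache is trimmed back down to capacity (5 minutes here).
        dedup_cache_trim_interval: Duration::from_secs(300),
        ..Default::default()
    }
}
```

The capacity is also exposed to operators as `dedup_cache_capacity` in the global configuration (see the config preset diff below).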

## Motivation and Context
A recent stress test highlighted duplicate transaction messages, for the various transaction stages, still being sent hours after a transaction had been mined, triggering a flood of SAF messages each time. A wallet also had no way of knowing which transaction messages it had already processed. The dedup cache was therefore changed from _time-to-live_ to _persistent_.

## How Has This Been Tested?
- System-level testing under stress test conditions.
- New unit test.

## Checklist:
* [X] I'm merging against the `development` branch.
* [X] I have squashed my commits into a single commit.
aviator-app[bot] committed Jul 28, 2021
2 parents 6818596 + a4b6aba commit 08f2675
Showing 17 changed files with 372 additions and 41 deletions.
1 change: 1 addition & 0 deletions applications/tari_base_node/src/bootstrap.rs
@@ -245,6 +245,7 @@ where B: BlockchainBackend + 'static
allow_test_addresses: self.config.allow_test_addresses,
flood_ban_max_msg_count: self.config.flood_ban_max_msg_count,
saf_msg_validity: self.config.saf_expiry_duration,
dedup_cache_capacity: self.config.dedup_cache_capacity,
..Default::default()
},
allow_test_addresses: self.config.allow_test_addresses,
1 change: 1 addition & 0 deletions applications/tari_console_wallet/src/init/mod.rs
@@ -332,6 +332,7 @@ pub async fn init_wallet(
allow_test_addresses: config.allow_test_addresses,
flood_ban_max_msg_count: config.flood_ban_max_msg_count,
saf_msg_validity: config.saf_expiry_duration,
dedup_cache_capacity: config.dedup_cache_capacity,
..Default::default()
},
// TODO: This should be false unless testing locally - make this configurable
6 changes: 5 additions & 1 deletion common/config/presets/tari_config_example.toml
@@ -53,6 +53,9 @@ network = "weatherwax"
#buffer_rate_limit_base_node = 20
# - Rate limit for the base node wallet (min value = 5, default value = 20).
#buffer_rate_limit_base_node_wallet = 20
# The message deduplication persistent cache size - messages with these hashes in the cache will only be processed once.
# The cache will also be trimmed down to size periodically (min value = 0, default value = 2500).
dedup_cache_capacity = 25000

# The timeout (s) for requesting blocks from a peer during blockchain sync (min value = 10 s, default value = 150 s).
#fetch_blocks_timeout = 150
@@ -136,7 +139,8 @@ base_node_query_timeout = 120
# (options: "DirectOnly", "StoreAndForwardOnly", DirectAndStoreAndForward". default: "DirectAndStoreAndForward").
#transaction_routing_mechanism = "DirectAndStoreAndForward"

scan_for_utxo_interval=60
# UTXO scanning service interval (default = 12 hours, i.e. 60 * 60 * 12 seconds)
scan_for_utxo_interval = 60

# When running the console wallet in command mode, use these values to determine what "stage" and timeout to wait
# for sent transactions.
7 changes: 7 additions & 0 deletions common/src/configuration/global.rs
@@ -95,6 +95,7 @@ pub struct GlobalConfig {
pub buffer_size_base_node_wallet: usize,
pub buffer_rate_limit_base_node: usize,
pub buffer_rate_limit_base_node_wallet: usize,
pub dedup_cache_capacity: usize,
pub fetch_blocks_timeout: Duration,
pub fetch_utxos_timeout: Duration,
pub service_request_timeout: Duration,
@@ -563,6 +564,11 @@ fn convert_node_config(
cfg.get_int(&key)
.map_err(|e| ConfigurationError::new(&key, &e.to_string()))? as usize;

let key = "common.dedup_cache_capacity";
let dedup_cache_capacity = cfg
.get_int(&key)
.map_err(|e| ConfigurationError::new(&key, &e.to_string()))? as usize;

let key = "common.fetch_blocks_timeout";
let fetch_blocks_timeout = Duration::from_secs(
cfg.get_int(&key)
@@ -696,6 +702,7 @@ fn convert_node_config(
buffer_size_base_node_wallet,
buffer_rate_limit_base_node,
buffer_rate_limit_base_node_wallet,
dedup_cache_capacity,
fetch_blocks_timeout,
fetch_utxos_timeout,
service_request_timeout,
1 change: 1 addition & 0 deletions common/src/configuration/utils.rs
@@ -65,6 +65,7 @@ pub fn default_config(bootstrap: &ConfigBootstrap) -> Config {
cfg.set_default("common.buffer_rate_limit_base_node", 1_000).unwrap();
cfg.set_default("common.buffer_rate_limit_base_node_wallet", 1_000)
.unwrap();
cfg.set_default("common.dedup_cache_capacity", 2_500).unwrap();
cfg.set_default("common.fetch_blocks_timeout", 150).unwrap();
cfg.set_default("common.fetch_utxos_timeout", 600).unwrap();
cfg.set_default("common.service_request_timeout", 180).unwrap();
1 change: 1 addition & 0 deletions comms/dht/migrations/2021-07-23-162902_add_dedup_cache/down.sql
@@ -0,0 +1 @@
DROP TABLE IF EXISTS dedup_cache;
10 changes: 10 additions & 0 deletions comms/dht/migrations/2021-07-23-162902_add_dedup_cache/up.sql
@@ -0,0 +1,10 @@
CREATE TABLE dedup_cache (
id INTEGER NOT NULL PRIMARY KEY,
body_hash TEXT NOT NULL,
sender_public_key TEXT NOT NULL,
number_of_hits INT NOT NULL,
stored_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_hit_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE UNIQUE INDEX uidx_dedup_cache_body_hash ON dedup_cache (body_hash);
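
For context, a hedged illustration of the dedup semantics this table backs, mirroring the unit test further down in this diff; only the `insert_message_hash` signature is taken from the diff, and the import paths are assumptions:

```rust
// Hedged sketch mirroring the unit test below: the first insert of a message
// hash is not a duplicate, a repeat insert of the same hash is.
// Import paths are assumptions, not taken from this diff.
use tari_comms::types::CommsPublicKey;
use tari_comms_dht::{actor::DhtActorError, DhtRequester};

async fn dedup_example(requester: &mut DhtRequester) -> Result<(), DhtActorError> {
    let hash = vec![1u8, 2, 3];
    // First time this hash is seen: not a duplicate.
    let is_dup = requester
        .insert_message_hash(hash.clone(), CommsPublicKey::default())
        .await?;
    assert!(!is_dup);
    // Same hash again: reported as a duplicate, so the caller can drop the message.
    let is_dup = requester
        .insert_message_hash(hash, CommsPublicKey::default())
        .await?;
    assert!(is_dup);
    Ok(())
}
```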
169 changes: 148 additions & 21 deletions comms/dht/src/actor.rs
@@ -29,6 +29,7 @@

use crate::{
broadcast_strategy::BroadcastStrategy,
dedup::DedupCacheDatabase,
discovery::DhtDiscoveryError,
outbound::{DhtOutboundError, OutboundMessageRequester, SendMessageParams},
proto::{dht::JoinMessage, envelope::DhtMessageType},
@@ -38,7 +39,6 @@ use crate::{
use chrono::{DateTime, Utc};
use futures::{
channel::{mpsc, mpsc::SendError, oneshot},
future,
future::BoxFuture,
stream::{Fuse, FuturesUnordered},
SinkExt,
@@ -49,12 +49,12 @@ use std::{cmp, fmt, fmt::Display, sync::Arc};
use tari_comms::{
connectivity::{ConnectivityError, ConnectivityRequester, ConnectivitySelection},
peer_manager::{NodeId, NodeIdentity, PeerFeatures, PeerManager, PeerManagerError, PeerQuery, PeerQuerySortBy},
types::CommsPublicKey,
};
use tari_shutdown::ShutdownSignal;
use tari_utilities::message_format::{MessageFormat, MessageFormatError};
use thiserror::Error;
use tokio::task;
use ttl_cache::TtlCache;
use tokio::{task, time};

const LOG_TARGET: &str = "comms::dht::actor";

@@ -97,12 +97,13 @@ impl From<SendError> for DhtActorError {
}

#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum DhtRequest {
/// Send a Join request to the network
SendJoin,
/// Inserts a message signature to the msg hash cache. This operation replies with a boolean
/// which is true if the signature already exists in the cache, otherwise false
MsgHashCacheInsert(Vec<u8>, oneshot::Sender<bool>),
MsgHashCacheInsert(Vec<u8>, CommsPublicKey, oneshot::Sender<bool>),
/// Fetch selected peers according to the broadcast strategy
SelectPeers(BroadcastStrategy, oneshot::Sender<Vec<NodeId>>),
GetMetadata(DhtMetadataKey, oneshot::Sender<Result<Option<Vec<u8>>, DhtActorError>>),
@@ -114,7 +115,7 @@ impl Display for DhtRequest {
use DhtRequest::*;
match self {
SendJoin => f.write_str("SendJoin"),
MsgHashCacheInsert(_, _) => f.write_str("MsgHashCacheInsert"),
MsgHashCacheInsert(_, _, _) => f.write_str("MsgHashCacheInsert"),
SelectPeers(s, _) => f.write_str(&format!("SelectPeers (Strategy={})", s)),
GetMetadata(key, _) => f.write_str(&format!("GetMetadata (key={})", key)),
SetMetadata(key, value, _) => {
@@ -146,10 +147,14 @@ impl DhtRequester {
reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
}

pub async fn insert_message_hash(&mut self, signature: Vec<u8>) -> Result<bool, DhtActorError> {
pub async fn insert_message_hash(
&mut self,
message_hash: Vec<u8>,
public_key: CommsPublicKey,
) -> Result<bool, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(DhtRequest::MsgHashCacheInsert(signature, reply_tx))
.send(DhtRequest::MsgHashCacheInsert(message_hash, public_key, reply_tx))
.await?;

reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
@@ -183,7 +188,7 @@ pub struct DhtActor {
config: DhtConfig,
shutdown_signal: Option<ShutdownSignal>,
request_rx: Fuse<mpsc::Receiver<DhtRequest>>,
msg_hash_cache: TtlCache<Vec<u8>, ()>,
msg_hash_dedup_cache: DedupCacheDatabase,
}

impl DhtActor {
@@ -198,8 +203,14 @@ impl DhtActor {
request_rx: mpsc::Receiver<DhtRequest>,
shutdown_signal: ShutdownSignal,
) -> Self {
debug!(
target: LOG_TARGET,
"Message dedup cache will be trimmed to capacity every {}s",
config.dedup_cache_trim_interval.as_secs() as f64 +
config.dedup_cache_trim_interval.subsec_nanos() as f64 * 1e-9
);
Self {
msg_hash_cache: TtlCache::new(config.msg_hash_cache_capacity),
msg_hash_dedup_cache: DedupCacheDatabase::new(conn.clone(), config.dedup_cache_capacity),
config,
database: DhtDatabase::new(conn),
outbound_requester,
@@ -236,6 +247,8 @@ impl DhtActor {

let mut pending_jobs = FuturesUnordered::new();

let mut dedup_cache_trim_ticker = time::interval(self.config.dedup_cache_trim_interval).fuse();

let mut shutdown_signal = self
.shutdown_signal
.take()
@@ -254,6 +267,12 @@
}
},

_ = dedup_cache_trim_ticker.select_next_some() => {
if let Err(err) = self.msg_hash_dedup_cache.truncate().await {
error!(target: LOG_TARGET, "Error when trimming message dedup cache: {:?}", err);
}
},

_ = shutdown_signal => {
info!(target: LOG_TARGET, "DhtActor is shutting down because it received a shutdown signal.");
self.mark_shutdown_time().await;
@@ -281,15 +300,23 @@ impl DhtActor {
let outbound_requester = self.outbound_requester.clone();
Box::pin(Self::broadcast_join(node_identity, outbound_requester))
},
MsgHashCacheInsert(hash, reply_tx) => {
// No locks needed here. Downside is this isn't really async, however this should be
// fine as it is very quick
let already_exists = self
.msg_hash_cache
.insert(hash, (), self.config.msg_hash_cache_ttl)
.is_some();
let result = reply_tx.send(already_exists).map_err(|_| DhtActorError::ReplyCanceled);
Box::pin(future::ready(result))
MsgHashCacheInsert(hash, public_key, reply_tx) => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
match msg_hash_cache.insert_body_hash_if_unique(hash, public_key).await {
Ok(already_exists) => {
let _ = reply_tx.send(already_exists).map_err(|_| DhtActorError::ReplyCanceled);
},
Err(err) => {
warn!(
target: LOG_TARGET,
"Unable to update message dedup cache because {:?}", err
);
let _ = reply_tx.send(false).map_err(|_| DhtActorError::ReplyCanceled);
},
}
Ok(())
})
},
SelectPeers(broadcast_strategy, reply_tx) => {
let peer_manager = Arc::clone(&self.peer_manager);
@@ -643,9 +670,11 @@ mod test {
test_utils::{build_peer_manager, make_client_identity, make_node_identity},
};
use chrono::{DateTime, Utc};
use std::time::Duration;
use tari_comms::test_utils::mocks::{create_connectivity_mock, create_peer_connection_mock_pair};
use tari_shutdown::Shutdown;
use tari_test_utils::random;
use tokio::time::delay_for;

async fn db_connection() -> DbConnection {
let conn = DbConnection::connect_memory(random::string(8)).await.unwrap();
@@ -707,14 +736,112 @@ mod test {
actor.spawn();

let signature = vec![1u8, 2, 3];
let is_dup = requester.insert_message_hash(signature.clone()).await.unwrap();
let is_dup = requester
.insert_message_hash(signature.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
let is_dup = requester.insert_message_hash(signature).await.unwrap();
let is_dup = requester
.insert_message_hash(signature, CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
let is_dup = requester.insert_message_hash(Vec::new()).await.unwrap();
let is_dup = requester
.insert_message_hash(Vec::new(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
}

#[tokio_macros::test_basic]
async fn dedup_cache_cleanup() {
let node_identity = make_node_identity();
let peer_manager = build_peer_manager();
let (connectivity_manager, mock) = create_connectivity_mock();
mock.spawn();
let (out_tx, _) = mpsc::channel(1);
let (actor_tx, actor_rx) = mpsc::channel(1);
let mut requester = DhtRequester::new(actor_tx);
let outbound_requester = OutboundMessageRequester::new(out_tx);
let mut shutdown = Shutdown::new();
let trim_interval_ms = 500;
// Note: This must be equal or larger than the minimum dedup cache capacity for DedupCacheDatabase
let capacity = 120;
let actor = DhtActor::new(
DhtConfig {
dedup_cache_capacity: capacity,
dedup_cache_trim_interval: Duration::from_millis(trim_interval_ms),
..Default::default()
},
db_connection().await,
node_identity,
peer_manager,
connectivity_manager,
outbound_requester,
actor_rx,
shutdown.to_signal(),
);

// Create signatures for double the dedup cache capacity
let mut signatures: Vec<Vec<u8>> = Vec::new();
for i in 0..(capacity * 2) {
signatures.push(vec![1u8, 2, i as u8])
}

// Pre-populate the dedup cache; everything should be accepted due to cleanup ticker not active yet
for key in &signatures {
let is_dup = actor
.msg_hash_dedup_cache
.insert_body_hash_if_unique(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
}
// Try to re-insert all; everything should be marked as duplicates due to cleanup ticker not active yet
for key in &signatures {
let is_dup = actor
.msg_hash_dedup_cache
.insert_body_hash_if_unique(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
}

// The cleanup ticker starts when the actor is spawned; the first cleanup event will fire immediately
actor.spawn();

// Verify that the last half of the signatures are still present in the cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
let is_dup = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
}
// Verify that the first half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity) {
let is_dup = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
}

// Let the trim period expire; this will trim the dedup cache to capacity
delay_for(Duration::from_millis(trim_interval_ms * 2)).await;

// Verify that the last half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
let is_dup = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
}

shutdown.trigger().unwrap();
}

#[tokio_macros::test_basic]
async fn select_peers() {
let node_identity = make_node_identity();
8 changes: 4 additions & 4 deletions comms/dht/src/builder.rs
@@ -89,13 +89,13 @@ impl DhtBuilder {
self
}

pub fn with_signature_cache_ttl(mut self, ttl: Duration) -> Self {
self.config.msg_hash_cache_ttl = ttl;
pub fn with_dedup_cache_trim_interval(mut self, trim_interval: Duration) -> Self {
self.config.dedup_cache_trim_interval = trim_interval;
self
}

pub fn with_signature_cache_capacity(mut self, capacity: usize) -> Self {
self.config.msg_hash_cache_capacity = capacity;
pub fn with_dedup_cache_capacity(mut self, capacity: usize) -> Self {
self.config.dedup_cache_capacity = capacity;
self
}

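
A short usage sketch of the renamed builder methods; only the two method names and signatures come from this diff, while the `DhtBuilder` import path and the surrounding function are assumptions:

```rust
// Hedged sketch: construction of the builder is elided; only the two
// `with_*` method names below come from this diff.
use std::time::Duration;
use tari_comms_dht::DhtBuilder;

fn configure_dedup(builder: DhtBuilder) -> DhtBuilder {
    builder
        // Keep at most this many message hashes in the persistent cache.
        .with_dedup_cache_capacity(2_500)
        // Trim the cache back to capacity every 5 minutes.
        .with_dedup_cache_trim_interval(Duration::from_secs(300))
}
```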
(Diff for the remaining changed files is not shown here.)
