Skip to content

Commit

Permalink
feat: allow DHT to be configured to repropagate messages for a number…
Browse files Browse the repository at this point in the history
… of rounds (#3211)

<!--- Provide a general summary of your changes in the Title above -->

## Description
<!--- Describe your changes in detail -->
Use the dedup cache hit count to allow certain duplicate messages through a
configurable number of times. 

## Motivation and Context
<!--- Why is this change required? What problem does it solve? -->
<!--- If it fixes an open issue, please link to the issue here. -->

~~This improves mempool synchronization.~~ Implements gossip repropagation that could be used for some message types in future.

## How Has This Been Tested?
<!--- Please describe in detail how you tested your changes. -->
<!--- Include details of your testing environment, and the tests you ran to -->
<!--- see how your change affects other areas of the code, etc. -->
New unit test. More manual system tests need to be done

## Checklist:
<!--- Go over all the following points, and put an `x` in all the boxes that apply. -->
* [x] I'm merging against the `development` branch.
* [x] I have squashed my commits into a single commit.
  • Loading branch information
sdbondi committed Aug 30, 2021
1 parent 20618b6 commit 60f286b
Show file tree
Hide file tree
Showing 26 changed files with 613 additions and 247 deletions.
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/service/inbound_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl MempoolInboundHandlers {
if tx_storage.is_stored() {
debug!(
target: LOG_TARGET,
"Mempool already has transaction: {}", kernel_excess_sig
"Mempool already has transaction: {}.", kernel_excess_sig
);
return Ok(tx_storage);
}
Expand Down
153 changes: 94 additions & 59 deletions comms/dht/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use tari_comms::{
peer_manager::{NodeId, NodeIdentity, PeerFeatures, PeerManager, PeerManagerError, PeerQuery, PeerQuerySortBy},
types::CommsPublicKey,
};
use tari_crypto::tari_utilities::hex::Hex;
use tari_shutdown::ShutdownSignal;
use tari_utilities::message_format::{MessageFormat, MessageFormatError};
use thiserror::Error;
Expand Down Expand Up @@ -101,9 +102,14 @@ impl From<SendError> for DhtActorError {
pub enum DhtRequest {
/// Send a Join request to the network
SendJoin,
/// Inserts a message signature to the msg hash cache. This operation replies with a boolean
/// which is true if the signature already exists in the cache, otherwise false
MsgHashCacheInsert(Vec<u8>, CommsPublicKey, oneshot::Sender<bool>),
/// Inserts a message signature to the msg hash cache. This operation replies with the number of times this message
/// has previously been seen (hit count)
MsgHashCacheInsert {
message_hash: Vec<u8>,
received_from: CommsPublicKey,
reply_tx: oneshot::Sender<u32>,
},
GetMsgHashHitCount(Vec<u8>, oneshot::Sender<u32>),
/// Fetch selected peers according to the broadcast strategy
SelectPeers(BroadcastStrategy, oneshot::Sender<Vec<NodeId>>),
GetMetadata(DhtMetadataKey, oneshot::Sender<Result<Option<Vec<u8>>, DhtActorError>>),
Expand All @@ -114,12 +120,22 @@ impl Display for DhtRequest {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use DhtRequest::*;
match self {
SendJoin => f.write_str("SendJoin"),
MsgHashCacheInsert(_, _, _) => f.write_str("MsgHashCacheInsert"),
SelectPeers(s, _) => f.write_str(&format!("SelectPeers (Strategy={})", s)),
GetMetadata(key, _) => f.write_str(&format!("GetMetadata (key={})", key)),
SendJoin => write!(f, "SendJoin"),
MsgHashCacheInsert {
message_hash,
received_from,
..
} => write!(
f,
"MsgHashCacheInsert(message hash: {}, received from: {})",
message_hash.to_hex(),
received_from.to_hex(),
),
GetMsgHashHitCount(hash, _) => write!(f, "GetMsgHashHitCount({})", hash.to_hex()),
SelectPeers(s, _) => write!(f, "SelectPeers (Strategy={})", s),
GetMetadata(key, _) => write!(f, "GetMetadata (key={})", key),
SetMetadata(key, value, _) => {
f.write_str(&format!("SetMetadata (key={}, value={} bytes)", key, value.len()))
write!(f, "SetMetadata (key={}, value={} bytes)", key, value.len())
},
}
}
Expand Down Expand Up @@ -147,14 +163,27 @@ impl DhtRequester {
reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
}

pub async fn insert_message_hash(
pub async fn add_message_to_dedup_cache(
&mut self,
message_hash: Vec<u8>,
public_key: CommsPublicKey,
) -> Result<bool, DhtActorError> {
received_from: CommsPublicKey,
) -> Result<u32, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(DhtRequest::MsgHashCacheInsert {
message_hash,
received_from,
reply_tx,
})
.await?;

reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
}

pub async fn get_message_cache_hit_count(&mut self, message_hash: Vec<u8>) -> Result<u32, DhtActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
self.sender
.send(DhtRequest::MsgHashCacheInsert(message_hash, public_key, reply_tx))
.send(DhtRequest::GetMsgHashHitCount(message_hash, reply_tx))
.await?;

reply_rx.await.map_err(|_| DhtActorError::ReplyCanceled)
Expand Down Expand Up @@ -268,7 +297,7 @@ impl DhtActor {
},

_ = dedup_cache_trim_ticker.select_next_some() => {
if let Err(err) = self.msg_hash_dedup_cache.truncate().await {
if let Err(err) = self.msg_hash_dedup_cache.trim_entries().await {
error!(target: LOG_TARGET, "Error when trimming message dedup cache: {:?}", err);
}
},
Expand Down Expand Up @@ -300,24 +329,36 @@ impl DhtActor {
let outbound_requester = self.outbound_requester.clone();
Box::pin(Self::broadcast_join(node_identity, outbound_requester))
},
MsgHashCacheInsert(hash, public_key, reply_tx) => {
MsgHashCacheInsert {
message_hash,
received_from,
reply_tx,
} => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
match msg_hash_cache.insert_body_hash_if_unique(hash, public_key).await {
Ok(already_exists) => {
let _ = reply_tx.send(already_exists).map_err(|_| DhtActorError::ReplyCanceled);
match msg_hash_cache.add_body_hash(message_hash, received_from).await {
Ok(hit_count) => {
let _ = reply_tx.send(hit_count);
},
Err(err) => {
warn!(
target: LOG_TARGET,
"Unable to update message dedup cache because {:?}", err
);
let _ = reply_tx.send(false).map_err(|_| DhtActorError::ReplyCanceled);
let _ = reply_tx.send(0);
},
}
Ok(())
})
},
GetMsgHashHitCount(hash, reply_tx) => {
let msg_hash_cache = self.msg_hash_dedup_cache.clone();
Box::pin(async move {
let hit_count = msg_hash_cache.get_hit_count(hash).await?;
let _ = reply_tx.send(hit_count);
Ok(())
})
},
SelectPeers(broadcast_strategy, reply_tx) => {
let peer_manager = Arc::clone(&self.peer_manager);
let node_identity = Arc::clone(&self.node_identity);
Expand Down Expand Up @@ -690,11 +731,9 @@ mod test {
test_utils::{build_peer_manager, make_client_identity, make_node_identity},
};
use chrono::{DateTime, Utc};
use std::time::Duration;
use tari_comms::test_utils::mocks::{create_connectivity_mock, create_peer_connection_mock_pair};
use tari_shutdown::Shutdown;
use tari_test_utils::random;
use tokio::time::delay_for;

async fn db_connection() -> DbConnection {
let conn = DbConnection::connect_memory(random::string(8)).await.unwrap();
Expand Down Expand Up @@ -756,21 +795,21 @@ mod test {
actor.spawn();

let signature = vec![1u8, 2, 3];
let is_dup = requester
.insert_message_hash(signature.clone(), CommsPublicKey::default())
let num_hits = requester
.add_message_to_dedup_cache(signature.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
let is_dup = requester
.insert_message_hash(signature, CommsPublicKey::default())
assert_eq!(num_hits, 1);
let num_hits = requester
.add_message_to_dedup_cache(signature, CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
let is_dup = requester
.insert_message_hash(Vec::new(), CommsPublicKey::default())
assert_eq!(num_hits, 2);
let num_hits = requester
.add_message_to_dedup_cache(Vec::new(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}

#[tokio_macros::test_basic]
Expand All @@ -783,14 +822,12 @@ mod test {
let (actor_tx, actor_rx) = mpsc::channel(1);
let mut requester = DhtRequester::new(actor_tx);
let outbound_requester = OutboundMessageRequester::new(out_tx);
let mut shutdown = Shutdown::new();
let trim_interval_ms = 500;
let shutdown = Shutdown::new();
// Note: This must be equal or larger than the minimum dedup cache capacity for DedupCacheDatabase
let capacity = 120;
let capacity = 10;
let actor = DhtActor::new(
DhtConfig {
dedup_cache_capacity: capacity,
dedup_cache_trim_interval: Duration::from_millis(trim_interval_ms),
..Default::default()
},
db_connection().await,
Expand All @@ -803,63 +840,61 @@ mod test {
);

// Create signatures for double the dedup cache capacity
let mut signatures: Vec<Vec<u8>> = Vec::new();
for i in 0..(capacity * 2) {
signatures.push(vec![1u8, 2, i as u8])
}
let signatures = (0..(capacity * 2)).map(|i| vec![1u8, 2, i as u8]).collect::<Vec<_>>();

// Pre-populate the dedup cache; everything should be accepted due to cleanup ticker not active yet
// Pre-populate the dedup cache; everything should be accepted because the cleanup ticker has not run yet
for key in &signatures {
let is_dup = actor
let num_hits = actor
.msg_hash_dedup_cache
.insert_body_hash_if_unique(key.clone(), CommsPublicKey::default())
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}
// Try to re-insert all; everything should be marked as duplicates due to cleanup ticker not active yet
// Try to re-insert all; all hashes should have incremented their hit count
for key in &signatures {
let is_dup = actor
let num_hits = actor
.msg_hash_dedup_cache
.insert_body_hash_if_unique(key.clone(), CommsPublicKey::default())
.add_body_hash(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
assert_eq!(num_hits, 2);
}

// The cleanup ticker starts when the actor is spawned; the first cleanup event will fire immediately
let dedup_cache_db = actor.msg_hash_dedup_cache.clone();
// The cleanup ticker starts when the actor is spawned; the first cleanup event will fire fairly soon after the
// task is running on a thread. To remove this race condition, we trim the cache in the test.
dedup_cache_db.trim_entries().await.unwrap();
actor.spawn();

// Verify that the last half of the signatures are still present in the cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
let is_dup = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
let num_hits = requester
.add_message_to_dedup_cache(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(is_dup);
assert_eq!(num_hits, 3);
}
// Verify that the first half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity) {
let is_dup = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
let num_hits = requester
.add_message_to_dedup_cache(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}

// Let the trim period expire; this will trim the dedup cache to capacity
delay_for(Duration::from_millis(trim_interval_ms * 2)).await;
// Trim the database of excess entries
dedup_cache_db.trim_entries().await.unwrap();

// Verify that the last half of the signatures have been removed and can be re-inserted into cache
for key in signatures.iter().take(capacity * 2).skip(capacity) {
let is_dup = requester
.insert_message_hash(key.clone(), CommsPublicKey::default())
let num_hits = requester
.add_message_to_dedup_cache(key.clone(), CommsPublicKey::default())
.await
.unwrap();
assert!(!is_dup);
assert_eq!(num_hits, 1);
}

shutdown.trigger().unwrap();
}

#[tokio_macros::test_basic]
Expand Down
5 changes: 5 additions & 0 deletions comms/dht/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl DhtBuilder {
self
}

pub fn with_dedup_discard_hit_count(mut self, max_hit_count: usize) -> Self {
self.config.dedup_allowed_message_occurrences = max_hit_count;
self
}

pub fn with_num_random_nodes(mut self, n: usize) -> Self {
self.config.num_random_nodes = n;
self
Expand Down
6 changes: 6 additions & 0 deletions comms/dht/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ pub struct DhtConfig {
/// The periodic trim interval for items in the message hash cache
/// Default: 300s (5 mins)
pub dedup_cache_trim_interval: Duration,
/// The number of occurrences of a message is allowed to pass through the DHT pipeline before being
/// deduped/discarded
/// Default: 1
pub dedup_allowed_message_occurrences: usize,
/// The duration to wait for a peer discovery to complete before giving up.
/// Default: 2 minutes
pub discovery_request_timeout: Duration,
Expand Down Expand Up @@ -136,6 +140,7 @@ impl DhtConfig {

impl Default for DhtConfig {
fn default() -> Self {
// NB: please remember to update field comments to reflect these defaults
Self {
num_neighbouring_nodes: 8,
num_random_nodes: 4,
Expand All @@ -151,6 +156,7 @@ impl Default for DhtConfig {
saf_max_message_size: 512 * 1024,
dedup_cache_capacity: 2_500,
dedup_cache_trim_interval: Duration::from_secs(5 * 60),
dedup_allowed_message_occurrences: 1,
database_url: DbConnectionUrl::Memory,
discovery_request_timeout: Duration::from_secs(2 * 60),
connectivity_update_interval: Duration::from_secs(2 * 60),
Expand Down
Loading

0 comments on commit 60f286b

Please sign in to comment.