Skip to content

Commit

Permalink
feat: implement DHT protocol versioning, includes #3243 (#3377)
Browse files Browse the repository at this point in the history
Description
---
- implements DHT protocol versioning allowing the same codebase to be
  used for multiple networks
- DRY dht header signature construction
- Pre-version 2 DHT protocol does not use the entire header in
  signature challenge
- minor DHT builder improvements

Motivation and Context
---
Allow new break protocols to be introduced while old protocols still exist.
Setting the protocol version sets the DHT to speak that version, while still supporting previous versions  

Closes #3243 

How Has This Been Tested?
---

Tested discovery on weatherwax which uses versioned challenge construction
  • Loading branch information
sdbondi committed Sep 22, 2021
1 parent 30343d4 commit d676bba
Show file tree
Hide file tree
Showing 36 changed files with 697 additions and 346 deletions.
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions applications/tari_base_node/src/bootstrap.rs
Expand Up @@ -55,7 +55,7 @@ use tari_p2p::{
auto_update::{AutoUpdateConfig, SoftwareUpdaterService},
comms_connector::pubsub_connector,
initialization,
initialization::{CommsConfig, P2pInitializer},
initialization::{P2pConfig, P2pInitializer},
peer_seeds::SeedPeer,
services::liveness::{LivenessConfig, LivenessInitializer},
};
Expand Down Expand Up @@ -236,8 +236,8 @@ where B: BlockchainBackend + 'static
comms.add_protocol_extension(rpc_server)
}

fn create_comms_config(&self) -> CommsConfig {
CommsConfig {
fn create_comms_config(&self) -> P2pConfig {
P2pConfig {
network: self.config.network,
node_identity: self.node_identity.clone(),
transport_type: create_transport_type(self.config),
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_console_wallet/src/init/mod.rs
Expand Up @@ -38,7 +38,7 @@ use tari_comms_dht::{DbConnectionUrl, DhtConfig};
use tari_core::transactions::CryptoFactories;
use tari_p2p::{
auto_update::AutoUpdateConfig,
initialization::CommsConfig,
initialization::P2pConfig,
peer_seeds::SeedPeer,
transport::TransportType::Tor,
DEFAULT_DNS_NAME_SERVER,
Expand Down Expand Up @@ -326,7 +326,7 @@ pub async fn init_wallet(
_ => transport_type,
};

let comms_config = CommsConfig {
let comms_config = P2pConfig {
network: config.network,
node_identity,
user_agent: format!("tari/wallet/{}", env!("CARGO_PKG_VERSION")),
Expand Down
51 changes: 25 additions & 26 deletions base_layer/p2p/src/initialization.rs
Expand Up @@ -62,7 +62,7 @@ use tari_comms::{
PeerManager,
UnspawnedCommsNode,
};
use tari_comms_dht::{Dht, DhtBuilder, DhtConfig, DhtInitializationError};
use tari_comms_dht::{Dht, DhtConfig, DhtInitializationError, DhtProtocolVersion};
use tari_service_framework::{async_trait, ServiceInitializationError, ServiceInitializer, ServiceInitializerContext};
use tari_shutdown::ShutdownSignal;
use tari_storage::{
Expand Down Expand Up @@ -112,7 +112,7 @@ impl CommsInitializationError {

/// Configuration for a comms node
#[derive(Clone)]
pub struct CommsConfig {
pub struct P2pConfig {
/// Path to the LMDB data files.
pub datastore_path: PathBuf,
/// Name to use for the peer database
Expand Down Expand Up @@ -202,17 +202,17 @@ pub async fn initialize_local_test_comms(
// Create outbound channel
let (outbound_tx, outbound_rx) = mpsc::channel(10);

let dht = DhtBuilder::new(
comms.node_identity(),
comms.peer_manager(),
outbound_tx,
comms.connectivity(),
comms.shutdown_signal(),
)
.local_test()
.with_discovery_timeout(discovery_request_timeout)
.build()
.await?;
let dht = Dht::builder()
.local_test()
.with_outbound_sender(outbound_tx)
.with_discovery_timeout(discovery_request_timeout)
.build(
comms.node_identity(),
comms.peer_manager(),
comms.connectivity(),
comms.shutdown_signal(),
)
.await?;

let dht_outbound_layer = dht.outbound_middleware_layer();
let (event_sender, _) = broadcast::channel(100);
Expand Down Expand Up @@ -316,7 +316,7 @@ async fn initialize_hidden_service(

async fn configure_comms_and_dht(
builder: CommsBuilder,
config: &CommsConfig,
config: &P2pConfig,
connector: InboundDomainConnector,
) -> Result<(UnspawnedCommsNode, Dht), CommsInitializationError> {
let file_lock = acquire_exclusive_file_lock(&config.datastore_path)?;
Expand Down Expand Up @@ -352,16 +352,15 @@ async fn configure_comms_and_dht(
// Create outbound channel
let (outbound_tx, outbound_rx) = mpsc::channel(config.outbound_buffer_size);

let dht = DhtBuilder::new(
node_identity.clone(),
peer_manager,
outbound_tx,
connectivity,
shutdown_signal,
)
.with_config(config.dht.clone())
.build()
.await?;
let mut dht = Dht::builder();
dht.with_config(config.dht.clone()).with_outbound_sender(outbound_tx);
// TODO: remove this once enough weatherwax nodes have upgraded
if config.network == Network::Weatherwax {
dht.with_protocol_version(DhtProtocolVersion::v1());
}
let dht = dht
.build(node_identity.clone(), peer_manager, connectivity, shutdown_signal)
.await?;

let dht_outbound_layer = dht.outbound_middleware_layer();

Expand Down Expand Up @@ -449,12 +448,12 @@ async fn add_all_peers(
}

pub struct P2pInitializer {
config: CommsConfig,
config: P2pConfig,
connector: Option<PubsubDomainConnector>,
}

impl P2pInitializer {
pub fn new(config: CommsConfig, connector: PubsubDomainConnector) -> Self {
pub fn new(config: P2pConfig, connector: PubsubDomainConnector) -> Self {
Self {
config,
connector: Some(connector),
Expand Down
4 changes: 2 additions & 2 deletions base_layer/p2p/src/services/liveness/service.rs
Expand Up @@ -322,6 +322,7 @@ mod test {
use tari_comms_dht::{
envelope::{DhtMessageHeader, DhtMessageType},
outbound::{DhtOutboundRequest, MessageSendState, SendMessageResponse},
DhtProtocolVersion,
};
use tari_crypto::keys::PublicKey;
use tari_service_framework::reply_channel;
Expand Down Expand Up @@ -435,8 +436,7 @@ mod test {
);
DomainMessage {
dht_header: DhtMessageHeader {
major: 0,
minor: 0,
version: DhtProtocolVersion::latest(),
destination: Default::default(),
origin_mac: Vec::new(),
ephemeral_public_key: None,
Expand Down
4 changes: 2 additions & 2 deletions base_layer/p2p/src/test_utils.rs
Expand Up @@ -30,6 +30,7 @@ use tari_comms::{
use tari_comms_dht::{
envelope::{DhtMessageFlags, DhtMessageHeader, DhtMessageType, NodeDestination},
inbound::DhtInboundMessage,
DhtProtocolVersion,
};

macro_rules! unwrap_oms_send_msg {
Expand Down Expand Up @@ -59,8 +60,7 @@ pub fn make_node_identity() -> Arc<NodeIdentity> {

pub fn make_dht_header(trace: MessageTag) -> DhtMessageHeader {
DhtMessageHeader {
major: 0,
minor: 0,
version: DhtProtocolVersion::latest(),
destination: NodeDestination::Unknown,
origin_mac: Vec::new(),
ephemeral_public_key: None,
Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet/src/config.rs
Expand Up @@ -23,7 +23,7 @@
use std::time::Duration;

use tari_core::{consensus::NetworkConsensus, transactions::CryptoFactories};
use tari_p2p::{auto_update::AutoUpdateConfig, initialization::CommsConfig};
use tari_p2p::{auto_update::AutoUpdateConfig, initialization::P2pConfig};

use crate::{
base_node_service::config::BaseNodeServiceConfig,
Expand All @@ -35,7 +35,7 @@ pub const KEY_MANAGER_COMMS_SECRET_KEY_BRANCH_KEY: &str = "comms";

#[derive(Clone)]
pub struct WalletConfig {
pub comms_config: CommsConfig,
pub comms_config: P2pConfig,
pub factories: CryptoFactories,
pub transaction_service_config: Option<TransactionServiceConfig>,
pub output_manager_service_config: Option<OutputManagerServiceConfig>,
Expand All @@ -51,7 +51,7 @@ pub struct WalletConfig {
impl WalletConfig {
#[allow(clippy::too_many_arguments)]
pub fn new(
comms_config: CommsConfig,
comms_config: P2pConfig,
factories: CryptoFactories,
transaction_service_config: Option<TransactionServiceConfig>,
output_manager_service_config: Option<OutputManagerServiceConfig>,
Expand Down
5 changes: 2 additions & 3 deletions base_layer/wallet/tests/support/comms_and_services.rs
Expand Up @@ -29,7 +29,7 @@ use tari_comms::{
types::CommsPublicKey,
CommsNode,
};
use tari_comms_dht::{envelope::DhtMessageHeader, Dht};
use tari_comms_dht::{envelope::DhtMessageHeader, Dht, DhtProtocolVersion};
use tari_p2p::{
comms_connector::InboundDomainConnector,
domain_message::DomainMessage,
Expand Down Expand Up @@ -77,8 +77,7 @@ pub fn create_dummy_message<T>(inner: T, public_key: &CommsPublicKey) -> DomainM
);
DomainMessage {
dht_header: DhtMessageHeader {
major: Default::default(),
minor: Default::default(),
version: DhtProtocolVersion::latest(),
ephemeral_public_key: None,
origin_mac: Vec::new(),
message_type: Default::default(),
Expand Down
6 changes: 3 additions & 3 deletions base_layer/wallet/tests/wallet/mod.rs
Expand Up @@ -53,7 +53,7 @@ use tari_core::transactions::{
transaction::OutputFeatures,
CryptoFactories,
};
use tari_p2p::{initialization::CommsConfig, transport::TransportType, Network, DEFAULT_DNS_NAME_SERVER};
use tari_p2p::{initialization::P2pConfig, transport::TransportType, Network, DEFAULT_DNS_NAME_SERVER};
use tari_shutdown::{Shutdown, ShutdownSignal};
use tari_test_utils::random;
use tari_wallet::{
Expand Down Expand Up @@ -105,7 +105,7 @@ async fn create_wallet(
) -> Result<WalletSqlite, WalletError> {
const NETWORK: Network = Network::Weatherwax;
let node_identity = NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE);
let comms_config = CommsConfig {
let comms_config = P2pConfig {
network: NETWORK,
node_identity: Arc::new(node_identity.clone()),
transport_type: TransportType::Memory {
Expand Down Expand Up @@ -685,7 +685,7 @@ async fn test_import_utxo() {
);
let temp_dir = tempdir().unwrap();
let (connection, _temp_dir) = make_wallet_database_connection(None);
let comms_config = CommsConfig {
let comms_config = P2pConfig {
network: Network::Weatherwax,
node_identity: Arc::new(alice_identity.clone()),
transport_type: TransportType::Tcp {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/wallet_ffi/src/lib.rs
Expand Up @@ -210,7 +210,7 @@ const LOG_TARGET: &str = "wallet_ffi";
pub type TariTransportType = tari_p2p::transport::TransportType;
pub type TariPublicKey = tari_comms::types::CommsPublicKey;
pub type TariPrivateKey = tari_comms::types::CommsSecretKey;
pub type TariCommsConfig = tari_p2p::initialization::CommsConfig;
pub type TariCommsConfig = tari_p2p::initialization::P2pConfig;
pub type TariTransactionKernel = tari_core::transactions::transaction::TransactionKernel;

pub struct TariContacts(Vec<TariContact>);
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/Cargo.toml
Expand Up @@ -36,7 +36,7 @@ serde_derive = "1.0.90"
serde_repr = "0.1.5"
thiserror = "1.0.26"
tokio = { version = "1.11", features = ["rt", "macros"] }
tower = "0.3.1"
tower = { version= "0.4.8", features=["full"] }
ttl_cache = "0.5.1"

# tower-filter dependencies
Expand Down
41 changes: 20 additions & 21 deletions comms/dht/examples/memory_net/utilities.rs
Expand Up @@ -54,7 +54,6 @@ use tari_comms_dht::{
inbound::DecryptedDhtMessage,
outbound::OutboundEncryption,
Dht,
DhtBuilder,
DhtConfig,
};
use tari_shutdown::{Shutdown, ShutdownSignal};
Expand Down Expand Up @@ -911,26 +910,26 @@ async fn setup_comms_dht(
comms.peer_manager().add_peer(peer).await.unwrap();
}

let dht = DhtBuilder::new(
comms.node_identity(),
comms.peer_manager(),
outbound_tx,
comms.connectivity(),
comms.shutdown_signal(),
)
.with_config(DhtConfig {
saf_auto_request,
auto_join: false,
discovery_request_timeout: Duration::from_secs(15),
num_neighbouring_nodes,
num_random_nodes,
propagation_factor,
network_discovery: Default::default(),
..DhtConfig::default_local_test()
})
.build()
.await
.unwrap();
let dht = Dht::builder()
.with_config(DhtConfig {
saf_auto_request,
auto_join: false,
discovery_request_timeout: Duration::from_secs(15),
num_neighbouring_nodes,
num_random_nodes,
propagation_factor,
network_discovery: Default::default(),
..DhtConfig::default_local_test()
})
.with_outbound_sender(outbound_tx)
.build(
comms.node_identity(),
comms.peer_manager(),
comms.connectivity(),
comms.shutdown_signal(),
)
.await
.unwrap();

let dht_outbound_layer = dht.outbound_middleware_layer();
let pipeline = pipeline::Builder::new()
Expand Down

0 comments on commit d676bba

Please sign in to comment.