Skip to content

Commit

Permalink
Move the encrypted send direct into the requester
Browse files Browse the repository at this point in the history
  • Loading branch information
brianp committed Apr 12, 2023
1 parent 95f74cb commit ceb8ec2
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 25 deletions.
23 changes: 6 additions & 17 deletions base_layer/contacts/src/contacts_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@ use futures::{pin_mut, StreamExt};
use log::*;
use tari_common_types::tari_address::TariAddress;
use tari_comms::connectivity::{ConnectivityEvent, ConnectivityRequester};
use tari_comms_dht::{
domain_message::OutboundDomainMessage,
envelope::NodeDestination,
outbound::{DhtOutboundError, OutboundEncryption, SendMessageParams},
Dht,
};
use tari_comms_dht::{domain_message::OutboundDomainMessage, outbound::OutboundEncryption, Dht};
use tari_p2p::{
comms_connector::SubscriptionFactory,
domain_message::DomainMessage,
Expand Down Expand Up @@ -290,19 +285,13 @@ where T: ContactsBackend + 'static
let mut comms_outbound = self.dht.outbound_requester();

comms_outbound
.send_message(
SendMessageParams::new()
.with_debug_info(format!("Send direct to {}", &address.public_key()))
.direct_public_key(address.public_key().clone())
.with_encryption(encryption)
.with_destination(NodeDestination::from(address.public_key().clone()))
.finish(),
.send_direct(
address.public_key().clone(),
ob_message,
encryption,
"contact service messaging".to_string(),
)
.await?
.resolve()
.await
.map_err(Into::<DhtOutboundError>::into)?;
.await?;
},
Err(e) => return Err(e),
_ => {
Expand Down
2 changes: 1 addition & 1 deletion base_layer/core/src/base_node/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ async fn handle_incoming_request<B: BlockchainBackend + 'static>(
);

let send_message_response = outbound_message_service
.send_direct(
.send_direct_unencrypted(
origin_public_key,
OutboundDomainMessage::new(&TariMessageType::BaseNodeResponse, message),
"Outbound response message from base node".to_string(),
Expand Down
4 changes: 2 additions & 2 deletions base_layer/core/tests/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ async fn receive_and_propagate_transaction() {

alice_node
.outbound_message_service
.send_direct(
.send_direct_unencrypted(
bob_node.node_identity.public_key().clone(),
OutboundDomainMessage::new(
&TariMessageType::NewTransaction,
Expand All @@ -866,7 +866,7 @@ async fn receive_and_propagate_transaction() {
.unwrap();
alice_node
.outbound_message_service
.send_direct(
.send_direct_unencrypted(
carol_node.node_identity.public_key().clone(),
OutboundDomainMessage::new(
&TariMessageType::NewTransaction,
Expand Down
2 changes: 1 addition & 1 deletion base_layer/p2p/src/services/liveness/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ where
async fn send_pong(&mut self, nonce: u64, dest: CommsPublicKey) -> Result<(), LivenessError> {
let msg = PingPongMessage::pong_with_metadata(nonce, self.state.metadata().clone());
self.outbound_messaging
.send_direct(
.send_direct_unencrypted(
dest,
OutboundDomainMessage::new(&TariMessageType::PingPong, msg),
"Sending pong".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ where
match self
.resources
.outbound_message_service
.send_direct(
.send_direct_unencrypted(
self.dest_address.public_key().clone(),
OutboundDomainMessage::new(&TariMessageType::SenderPartialTransaction, proto_message.clone()),
"transaction send".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub async fn send_finalized_transaction_message_direct(
let mut store_and_forward_send_result = false;
let mut direct_send_result = false;
match outbound_message_service
.send_direct(
.send_direct_unencrypted(
destination_public_key.clone(),
OutboundDomainMessage::new(
&TariMessageType::TransactionFinalized,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn send_transaction_cancelled_message(
// Send both direct and SAF we are not going to monitor the progress on these messages for potential resend as
// they are just courtesy messages
let _send_message_response = outbound_message_service
.send_direct(
.send_direct_unencrypted(
destination_public_key.clone(),
OutboundDomainMessage::new(&TariMessageType::TransactionCancelled, proto_message.clone()),
"transaction cancelled".to_string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub async fn send_transaction_reply_direct(
.try_into()
.map_err(TransactionServiceError::ServiceError)?;
match outbound_message_service
.send_direct(
.send_direct_unencrypted(
inbound_transaction.source_address.public_key().clone(),
OutboundDomainMessage::new(&TariMessageType::ReceiverPartialTransactionReply, proto_message.clone()),
"wallet transaction reply".to_string(),
Expand Down
27 changes: 27 additions & 0 deletions comms/dht/src/outbound/requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,33 @@ impl OutboundMessageRequester {

/// Send directly to a peer. If the peer does not exist in the peer list, a discovery will be initiated.
pub async fn send_direct<T>(
&mut self,
dest_public_key: CommsPublicKey,
message: OutboundDomainMessage<T>,
encryption: OutboundEncryption,
source_info: String,
) -> Result<MessageSendStates, DhtOutboundError>
where
T: prost::Message,
{
self.send_message(
SendMessageParams::new()
.with_debug_info(format!("Send direct to {} from {}", &dest_public_key, source_info))
.direct_public_key(dest_public_key.clone())
.with_discovery(true)
.with_encryption(encryption)
.with_destination(dest_public_key.into())
.finish(),
message,
)
.await?
.resolve()
.await
.map_err(Into::into)
}

/// Send directly to a peer unencrypted. If the peer does not exist in the peer list, a discovery will be initiated.
pub async fn send_direct_unencrypted<T>(
&mut self,
dest_public_key: CommsPublicKey,
message: OutboundDomainMessage<T>,
Expand Down

0 comments on commit ceb8ec2

Please sign in to comment.