Skip to content

Commit

Permalink
This enables tracking messages across nodes
Browse files Browse the repository at this point in the history
Merge pull request #1881

This enables tracking messages across nodes during testnet, e.g. originating `msg tag` as the transaction protocol is executed.
  • Loading branch information
CjS77 committed May 22, 2020
2 parents e1b70d2 + e1e555c commit b8ad396
Show file tree
Hide file tree
Showing 17 changed files with 227 additions and 88 deletions.
6 changes: 4 additions & 2 deletions base_layer/p2p/src/services/liveness/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ where
let excluded = self
.neighbours
.node_ids()
.into_iter()
.iter()
.chain(vec![node_id])
.cloned()
.collect();
Expand Down Expand Up @@ -441,7 +441,7 @@ where
}

async fn refresh_random_peer_pool(&mut self) -> Result<(), LivenessError> {
let excluded = self.neighbours.node_ids().into_iter().cloned().collect();
let excluded = self.neighbours.node_ids().to_vec();

// Select a pool of random peers the same length as neighbouring peers
let random_peers = self
Expand Down Expand Up @@ -565,6 +565,7 @@ mod test {
use rand::rngs::OsRng;
use std::time::Duration;
use tari_comms::{
message::MessageTag,
multiaddr::Multiaddr,
peer_manager::{NodeId, Peer, PeerFeatures, PeerFlags},
};
Expand Down Expand Up @@ -694,6 +695,7 @@ mod test {
message_type: DhtMessageType::None,
network: Network::LocalTest,
flags: Default::default(),
message_tag: MessageTag::new(),
},
authenticated_origin: None,
source_peer,
Expand Down
8 changes: 5 additions & 3 deletions base_layer/p2p/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub fn make_node_identity() -> Arc<NodeIdentity> {
)
}

pub fn make_dht_header() -> DhtMessageHeader {
pub fn make_dht_header(trace: MessageTag) -> DhtMessageHeader {
DhtMessageHeader {
version: 0,
destination: NodeDestination::Unknown,
Expand All @@ -76,13 +76,15 @@ pub fn make_dht_header() -> DhtMessageHeader {
message_type: DhtMessageType::None,
network: Network::LocalTest,
flags: DhtMessageFlags::NONE,
message_tag: trace,
}
}

pub fn make_dht_inbound_message(node_identity: &NodeIdentity, message: Vec<u8>) -> DhtInboundMessage {
let msg_tag = MessageTag::new();
DhtInboundMessage::new(
MessageTag::new(),
make_dht_header(),
msg_tag,
make_dht_header(msg_tag),
Arc::new(Peer::new(
node_identity.public_key().clone(),
node_identity.node_id().clone(),
Expand Down
10 changes: 5 additions & 5 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ where
},
// Incoming messages from the Comms layer
msg = base_node_response_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Base Node Response");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: {}", msg.dht_header.message_tag);
let result = self.handle_base_node_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}", origin_public_key, resp);
Err(resp)
Expand Down Expand Up @@ -516,7 +516,7 @@ where
received_output: &TransactionOutput,
) -> Result<(), OutputManagerError>
{
let pending_transaction = self.db.fetch_pending_transaction_outputs(tx_id.clone()).await?;
let pending_transaction = self.db.fetch_pending_transaction_outputs(tx_id).await?;

// Assumption: We are only allowing a single output per receiver in the current transaction protocols.
if pending_transaction.outputs_to_be_received.len() != 1 ||
Expand All @@ -529,7 +529,7 @@ where
}

self.db
.confirm_pending_transaction_outputs(pending_transaction.tx_id.clone())
.confirm_pending_transaction_outputs(pending_transaction.tx_id)
.await?;

Ok(())
Expand Down Expand Up @@ -623,7 +623,7 @@ where
outputs: &[TransactionOutput],
) -> Result<(), OutputManagerError>
{
let pending_transaction = self.db.fetch_pending_transaction_outputs(tx_id.clone()).await?;
let pending_transaction = self.db.fetch_pending_transaction_outputs(tx_id).await?;

// Check that outputs to be spent can all be found in the provided transaction inputs
let mut inputs_confirmed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,19 +353,21 @@ where TBackend: TransactionBackend + Clone + 'static
},
Some(send_states) => {
if send_states.len() == 1 {
let msg_tag = send_states[0].tag;
debug!(
target: LOG_TARGET,
"Transaction Finalized (TxId: {}) Direct Send to {} queued with Message Tag: {:?}",
"Transaction Finalized (TxId: {}) Direct Send to {} queued with Message Tag: {}",
self.id,
self.dest_pubkey,
send_states[0].tag,
&msg_tag,
);
match send_states.wait_single().await {
true => {
info!(
target: LOG_TARGET,
"Direct Send of Transaction Finalized message for TX_ID: {} was successful",
self.id
"Direct Send of Transaction Finalized message for TX_ID: {} was successful ({})",
self.id,
msg_tag
);
},
false => {
Expand Down Expand Up @@ -507,7 +509,7 @@ where TBackend: TransactionBackend + Clone + 'static
if send_states.len() == 1 {
debug!(
target: LOG_TARGET,
"Transaction (TxId: {}) Direct Send to {} queued with Message Tag: {:?}",
"Transaction (TxId: {}) Direct Send to {} queued with Message Tag: {}",
self.id,
self.dest_pubkey,
send_states[0].tag,
Expand Down
Loading

0 comments on commit b8ad396

Please sign in to comment.