Skip to content

Commit

Permalink
This enables tracking messages across nodes during testnet, e.g. orig…
Browse files Browse the repository at this point in the history
…inating `msg tag` as the transaction protocol is executed.
  • Loading branch information
hansieodendaal committed May 20, 2020
1 parent 948ef00 commit f555048
Show file tree
Hide file tree
Showing 18 changed files with 191 additions and 73 deletions.
7 changes: 4 additions & 3 deletions base_layer/p2p/src/services/liveness/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ where
let excluded = self
.neighbours
.node_ids()
.into_iter()
.iter()
.chain(vec![node_id])
.cloned()
.collect();
Expand Down Expand Up @@ -441,7 +441,7 @@ where
}

async fn refresh_random_peer_pool(&mut self) -> Result<(), LivenessError> {
let excluded = self.neighbours.node_ids().into_iter().cloned().collect();
let excluded = self.neighbours.node_ids().to_vec();

// Select a pool of random peers the same length as neighbouring peers
let random_peers = self
Expand Down Expand Up @@ -562,7 +562,7 @@ mod test {
services::liveness::{handle::LivenessHandle, state::Metadata},
};
use futures::{channel::mpsc, stream, FutureExt};
use rand::rngs::OsRng;
use rand::{rngs::OsRng, RngCore};
use std::time::Duration;
use tari_comms::{
multiaddr::Multiaddr,
Expand Down Expand Up @@ -694,6 +694,7 @@ mod test {
message_type: DhtMessageType::None,
network: Network::LocalTest,
flags: Default::default(),
message_trace: OsRng.next_u64(),
},
authenticated_origin: None,
source_peer,
Expand Down
8 changes: 5 additions & 3 deletions base_layer/p2p/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub fn make_node_identity() -> Arc<NodeIdentity> {
)
}

pub fn make_dht_header() -> DhtMessageHeader {
pub fn make_dht_header(trace: u64) -> DhtMessageHeader {
DhtMessageHeader {
version: 0,
destination: NodeDestination::Unknown,
Expand All @@ -76,13 +76,15 @@ pub fn make_dht_header() -> DhtMessageHeader {
message_type: DhtMessageType::None,
network: Network::LocalTest,
flags: DhtMessageFlags::NONE,
message_trace: trace,
}
}

pub fn make_dht_inbound_message(node_identity: &NodeIdentity, message: Vec<u8>) -> DhtInboundMessage {
let msg_tag = MessageTag::new();
DhtInboundMessage::new(
MessageTag::new(),
make_dht_header(),
msg_tag,
make_dht_header(msg_tag.value()),
Arc::new(Peer::new(
node_identity.public_key().clone(),
node_identity.node_id().clone(),
Expand Down
4 changes: 2 additions & 2 deletions base_layer/wallet/src/output_manager_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ where
},
// Incoming messages from the Comms layer
msg = base_node_response_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Base Node Response");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: Tag#{}", msg.dht_header.message_trace);
let result = self.handle_base_node_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?}", origin_public_key, resp);
Err(resp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,19 +353,21 @@ where TBackend: TransactionBackend + Clone + 'static
},
Some(send_states) => {
if send_states.len() == 1 {
let msg_tag = send_states[0].tag;
debug!(
target: LOG_TARGET,
"Transaction Finalized (TxId: {}) Direct Send to {} queued with Message Tag: {:?}",
self.id,
self.dest_pubkey,
send_states[0].tag,
&msg_tag,
);
match send_states.wait_single().await {
true => {
info!(
target: LOG_TARGET,
"Direct Send of Transaction Finalized message for TX_ID: {} was successful",
self.id
"Direct Send of Transaction Finalized message for TX_ID: {} was successful ({:?})",
self.id,
msg_tag
);
},
false => {
Expand Down
118 changes: 82 additions & 36 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,44 +275,46 @@ where
},
// Incoming messages from the Comms layer
msg = transaction_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Transaction Message");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let result = self.accept_transaction(origin_public_key, inner_msg).await;
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Transaction Message, Trace: Tag#{}", msg.dht_header.message_trace);

let result = self.accept_transaction(origin_public_key, inner_msg, msg.dht_header.message_trace).await;


match result {
Err(TransactionServiceError::RepeatedMessageError) => {
trace!(target: LOG_TARGET, "A repeated Transaction message was received");
trace!(target: LOG_TARGET, "A repeated Transaction message was received, Trace: Tag#{}", msg.dht_header.message_trace);
}
Err(e) => {
error!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {:?} for NodeID: {}", e, self.node_identity.node_id().short_str());
error!(target: LOG_TARGET, "Failed to handle incoming Transaction message: {:?} for NodeID: {}, Trace: Tag#{}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error(format!("Error handling Transaction Sender message: {:?}", e).to_string())));
}
_ => (),
}
},
// Incoming messages from the Comms layer
msg = transaction_reply_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Transaction Reply Message");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Transaction Reply Message, Trace: Tag#{}", msg.dht_header.message_trace);
let result = self.accept_recipient_reply(origin_public_key, inner_msg).await;

match result {
Err(TransactionServiceError::TransactionDoesNotExistError) => {
debug!(target: LOG_TARGET, "Unable to handle incoming Transaction Reply message from NodeId: {} due to Transaction not Existing. This usually means the message was a repeated message from Store and Forward", self.node_identity.node_id().short_str());
debug!(target: LOG_TARGET, "Unable to handle incoming Transaction Reply message from NodeId: {} due to Transaction not Existing. This usually means the message was a repeated message from Store and Forward, Trace: Tag#{}", self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
},
Err(e) => {
error!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {:?} for NodeId: {}", e, self.node_identity.node_id().short_str());
error!(target: LOG_TARGET, "Failed to handle incoming Transaction Reply message: {:?} for NodeId: {}, Trace: Tag#{}", e, self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
let _ = self.event_publisher.send(Arc::new(TransactionEvent::Error("Error handling Transaction Recipient Reply message".to_string())));
},
Ok(_) => (),
}
},
// Incoming messages from the Comms layer
msg = transaction_finalized_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Transaction Finalized Message");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Transaction Finalized Message, Trace: Tag#{}", msg.dht_header.message_trace);
let result = self.accept_finalized_transaction(origin_public_key, inner_msg, &mut transaction_broadcast_protocol_handles).await.or_else(|err| {
error!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {:?} for NodeID: {}", err , self.node_identity.node_id().short_str());
error!(target: LOG_TARGET, "Failed to handle incoming Transaction Finalized message: {:?} for NodeID: {}, Trace: Tag#{}", err , self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
Err(err)
});

Expand All @@ -322,19 +324,19 @@ where
},
// Incoming messages from the Comms layer
msg = mempool_response_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Mempool Response");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Mempool Response, Trace: Tag#{}", msg.dht_header.message_trace);
let _ = self.handle_mempool_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling mempool service response: {:?}", resp);
error!(target: LOG_TARGET, "Error handling mempool service response: {:?}, Trace: Tag#{}", resp, msg.dht_header.message_trace);
Err(resp)
});
}
// Incoming messages from the Comms layer
msg = base_node_response_stream.select_next_some() => {
trace!(target: LOG_TARGET, "Handling Base Node Response");
let (origin_public_key, inner_msg) = msg.into_origin_and_inner();
let (origin_public_key, inner_msg) = msg.clone().into_origin_and_inner();
trace!(target: LOG_TARGET, "Handling Base Node Response, Trace: Tag#{}", msg.dht_header.message_trace);
let _ = self.handle_base_node_response(inner_msg).await.or_else(|resp| {
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?} for NodeID: {}", origin_public_key, resp, self.node_identity.node_id().short_str());
error!(target: LOG_TARGET, "Error handling base node service response from {}: {:?} for NodeID: {}, Trace: Tag#{}", origin_public_key, resp, self.node_identity.node_id().short_str(), msg.dht_header.message_trace);
Err(resp)
});
}
Expand Down Expand Up @@ -700,6 +702,7 @@ where
&mut self,
source_pubkey: CommsPublicKey,
sender_message: proto::TransactionSenderMessage,
message_trace: u64,
) -> Result<(), TransactionServiceError>
{
let sender_message: TransactionSenderMessage = sender_message
Expand All @@ -710,17 +713,19 @@ where
if let TransactionSenderMessage::Single(data) = sender_message.clone() {
trace!(
target: LOG_TARGET,
"Transaction (TxId: {}) received from {}",
"Transaction (TxId: {}) received from {}, Trace: Tag#{}",
data.tx_id,
source_pubkey
source_pubkey,
message_trace
);
// Check this is not a repeat message i.e. tx_id doesn't already exist in our pending or completed
// transactions
if self.db.transaction_exists(data.tx_id).await? {
trace!(
target: LOG_TARGET,
"Transaction (TxId: {}) already present in database.",
data.tx_id
"Transaction (TxId: {}) already present in database, Trace: Tag#{}.",
data.tx_id,
message_trace
);
return Err(TransactionServiceError::RepeatedMessageError);
}
Expand Down Expand Up @@ -756,49 +761,73 @@ where
.await
{
None => {
self.send_transaction_reply_store_and_forward(tx_id, source_pubkey.clone(), proto_message.clone())
.await?;
debug!(
target: LOG_TARGET,
"Transaction Reply (TxId: {}) Direct Send to {} not possible, attempting Store and Forward, \
Trace: Tag#{}",
tx_id,
source_pubkey,
message_trace,
);
self.send_transaction_reply_store_and_forward(
tx_id,
source_pubkey.clone(),
proto_message.clone(),
message_trace,
)
.await?;
},
Some(send_states) => {
if send_states.len() == 1 {
debug!(
target: LOG_TARGET,
"Transaction Reply (TxId: {}) Direct Send to {} queued with Message Tag: {:?}",
"Transaction Reply (TxId: {}) Direct Send to {} queued with Message Tag: {:?}, Trace: \
Tag#{}",
tx_id,
source_pubkey,
send_states[0].tag,
message_trace,
);
match send_states.wait_single().await {
true => {
info!(
target: LOG_TARGET,
"Direct Send of Transaction Reply message for TX_ID: {} was successful", tx_id
"Direct Send of Transaction Reply message for TX_ID: {} was successful, Trace: \
Tag#{}",
tx_id,
message_trace
);
},
false => {
error!(
target: LOG_TARGET,
"Direct Send of Transaction Reply message for TX_ID: {} was unsuccessful and no \
message was sent",
tx_id
message was sent, attempting Store and Forward, Trace: Tag#{}",
tx_id,
message_trace
);
self.send_transaction_reply_store_and_forward(
tx_id,
source_pubkey.clone(),
proto_message.clone(),
message_trace,
)
.await?
},
}
} else {
error!(
target: LOG_TARGET,
"Transaction Reply message Send Direct for TxID: {} failed", tx_id
"Transaction Reply message Direct Send for TxID: {} failed, attempting Store and Forward, \
Trace: Tag#{}",
tx_id,
message_trace
);
self.send_transaction_reply_store_and_forward(
tx_id,
source_pubkey.clone(),
proto_message.clone(),
message_trace,
)
.await?
}
Expand All @@ -821,11 +850,18 @@ where

info!(
target: LOG_TARGET,
"Transaction with TX_ID = {} received from {}. Reply Sent", tx_id, source_pubkey,
"Transaction with TX_ID = {} received from {}, Trace: Tag#{}. Reply Sent",
tx_id,
source_pubkey,
message_trace
);
info!(
target: LOG_TARGET,
"Transaction (TX_ID: {}) - Amount: {} - Message: {}", tx_id, amount, data.message
"Transaction (TX_ID: {}) - Amount: {} - Message: {}, Trace: Tag#{}",
tx_id,
amount,
data.message,
message_trace
);

let _ = self
Expand All @@ -848,6 +884,7 @@ where
tx_id: TxId,
source_pubkey: CommsPublicKey,
msg: proto::RecipientSignedMessage,
message_trace: u64,
) -> Result<(), TransactionServiceError>
{
match self
Expand All @@ -864,31 +901,40 @@ where
None => {
error!(
target: LOG_TARGET,
"Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed", tx_id
"Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed, Trace: \
Tag#{}",
tx_id,
message_trace
);
},
Some(tags) if !tags.is_empty() => {
info!(
target: LOG_TARGET,
"Sending Transaction Reply (TxId: {}) to Neighbours for Store and Forward successful with \
Message Tags: {:?}",
Message Tags: {:?}, Trace: Tag#{}",
tx_id,
tags,
message_trace,
);
},
Some(_) => {
error!(
target: LOG_TARGET,
"Sending Transaction Reply to Neighbours for Store and Forward for TX_ID: {} was unsuccessful \
and no messages were sent",
tx_id
and no messages were sent, Trace: Tag#{}",
tx_id,
message_trace
);
},
},
Err(e) => {
error!(
target: LOG_TARGET,
"Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed: {:?}", tx_id, e
"Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed: {:?}, Trace: \
Tag#{}",
tx_id,
e,
message_trace
);
},
};
Expand Down
Loading

0 comments on commit f555048

Please sign in to comment.