diff --git a/applications/tari_base_node/log4rs_sample.yml b/applications/tari_base_node/log4rs_sample.yml index 54ea988c48..ee6d51e03f 100644 --- a/applications/tari_base_node/log4rs_sample.yml +++ b/applications/tari_base_node/log4rs_sample.yml @@ -36,7 +36,24 @@ appenders: count: 5 pattern: "log/base-node/network.{}.log" encoder: - pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m}{n} // {f}:{L}" + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m} // {f}:{L}{n}" + # An appender named "network" that writes to a file with a custom pattern encoder + message_logging: + kind: rolling_file + path: "log/base-node/messages.log" + policy: + kind: compound + trigger: + kind: size + limit: 10mb + roller: + kind: fixed_window + base: 1 + count: 5 + pattern: "log/base-node/messages.{}.log" + encoder: + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [Thread:{I}] {l:5} {m} // {f}:{L}{n}" + # An appender named "base_layer" that writes to a file with a custom pattern encoder base_layer: @@ -53,7 +70,7 @@ appenders: count: 5 pattern: "log/base-node/base_layer.{}.log" encoder: - pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [{X(node-public-key)},{X(node-id)}] {l:5} {m}{n} // {f}:{L} " + pattern: "{d(%Y-%m-%d %H:%M:%S.%f)} [{t}] [{X(node-public-key)},{X(node-id)}] {l:5} {m} // {f}:{L}{n}" # An appender named "other" that writes to a file with a custom pattern encoder other: @@ -152,3 +169,9 @@ loggers: appenders: - other additive: false + + comms::middleware::message_logging: + # Set to `trace` to retrieve message logging + level: warn + appenders: + - message_logging diff --git a/base_layer/core/src/base_node/service/service.rs b/base_layer/core/src/base_node/service/service.rs index cfb5817912..649b156c0a 100644 --- a/base_layer/core/src/base_node/service/service.rs +++ b/base_layer/core/src/base_node/service/service.rs @@ -389,6 +389,7 @@ async fn handle_incoming_request( .send_direct( origin_public_key, OutboundDomainMessage::new(&TariMessageType::BaseNodeResponse, message), + "Outbound response message from base node".to_string(), ) .await?; @@ -473,6 +474,14 @@ async fn handle_outbound_request( node_id: Option, service_request_timeout: Duration, ) -> Result<(), CommsInterfaceError> { + let debug_info = format!( + "Node request:{} to {}", + &request, + node_id + .as_ref() + .map(|n| n.short_str()) + .unwrap_or_else(|| "random".to_string()) + ); let request_key = generate_request_key(&mut OsRng); let service_request = proto::BaseNodeServiceRequest { request_key, @@ -480,6 +489,7 @@ async fn handle_outbound_request( }; let mut send_msg_params = SendMessageParams::new(); + send_msg_params.with_debug_info(debug_info); match node_id { Some(node_id) => send_msg_params.direct_node_id(node_id), None => send_msg_params.random(1), @@ -565,6 +575,7 @@ async fn handle_outbound_block( &TariMessageType::NewBlock, shared_protos::core::NewBlock::from(new_block), ), + "Outbound new block from base node".to_string(), ) .await; if let Err(e) = result { diff --git a/base_layer/core/src/mempool/mempool.rs b/base_layer/core/src/mempool/mempool.rs index 46f7f43b22..95495634a4 100644 --- a/base_layer/core/src/mempool/mempool.rs +++ b/base_layer/core/src/mempool/mempool.rs @@ -63,14 +63,14 @@ impl Mempool { /// Insert an unconfirmed transaction into the Mempool. pub async fn insert(&self, tx: Arc) -> Result { - self.with_write_access(|storage| storage.insert(tx)).await + self.with_write_access(|storage| Ok(storage.insert(tx))).await } /// Inserts all transactions into the mempool. pub async fn insert_all(&self, transactions: Vec>) -> Result<(), MempoolError> { self.with_write_access(|storage| { for tx in transactions { - storage.insert(tx)?; + storage.insert(tx); } Ok(()) diff --git a/base_layer/core/src/mempool/mempool_storage.rs b/base_layer/core/src/mempool/mempool_storage.rs index 2313ec13e7..31afcd356b 100644 --- a/base_layer/core/src/mempool/mempool_storage.rs +++ b/base_layer/core/src/mempool/mempool_storage.rs @@ -72,7 +72,7 @@ impl MempoolStorage { /// Insert an unconfirmed transaction into the Mempool. The transaction *MUST* have passed through the validation /// pipeline already and will thus always be internally consistent by this stage - pub fn insert(&mut self, tx: Arc) -> Result { + pub fn insert(&mut self, tx: Arc) -> TxStorageResponse { let tx_id = tx .body .kernels() @@ -87,41 +87,41 @@ impl MempoolStorage { "Transaction {} is VALID, inserting in unconfirmed pool", tx_id ); let weight = self.get_transaction_weighting(0); - self.unconfirmed_pool.insert(tx, None, &weight)?; - Ok(TxStorageResponse::UnconfirmedPool) + self.unconfirmed_pool.insert(tx, None, &weight); + TxStorageResponse::UnconfirmedPool }, Err(ValidationError::UnknownInputs(dependent_outputs)) => { if self.unconfirmed_pool.contains_all_outputs(&dependent_outputs) { let weight = self.get_transaction_weighting(0); - self.unconfirmed_pool.insert(tx, Some(dependent_outputs), &weight)?; - Ok(TxStorageResponse::UnconfirmedPool) + self.unconfirmed_pool.insert(tx, Some(dependent_outputs), &weight); + TxStorageResponse::UnconfirmedPool } else { warn!(target: LOG_TARGET, "Validation failed due to unknown inputs"); - Ok(TxStorageResponse::NotStoredOrphan) + TxStorageResponse::NotStoredOrphan } }, Err(ValidationError::ContainsSTxO) => { warn!(target: LOG_TARGET, "Validation failed due to already spent input"); - Ok(TxStorageResponse::NotStoredAlreadySpent) + TxStorageResponse::NotStoredAlreadySpent }, Err(ValidationError::MaturityError) => { warn!(target: LOG_TARGET, "Validation failed due to maturity error"); - Ok(TxStorageResponse::NotStoredTimeLocked) + TxStorageResponse::NotStoredTimeLocked }, Err(ValidationError::ConsensusError(msg)) => { warn!(target: LOG_TARGET, "Validation failed due to consensus rule: {}", msg); - Ok(TxStorageResponse::NotStoredConsensus) + TxStorageResponse::NotStoredConsensus }, Err(ValidationError::DuplicateKernelError(msg)) => { debug!( target: LOG_TARGET, "Validation failed due to already mined kernel: {}", msg ); - Ok(TxStorageResponse::NotStoredConsensus) + TxStorageResponse::NotStoredConsensus }, Err(e) => { warn!(target: LOG_TARGET, "Validation failed due to error: {}", e); - Ok(TxStorageResponse::NotStored) + TxStorageResponse::NotStored }, } } @@ -131,11 +131,10 @@ impl MempoolStorage { } // Insert a set of new transactions into the UTxPool. - fn insert_txs(&mut self, txs: Vec>) -> Result<(), MempoolError> { + fn insert_txs(&mut self, txs: Vec>) { for tx in txs { - self.insert(tx)?; + self.insert(tx); } - Ok(()) } /// Update the Mempool based on the received published block. @@ -168,10 +167,14 @@ impl MempoolStorage { failed_block.header.height, failed_block.hash().to_hex() ); - self.unconfirmed_pool + let txs = self + .unconfirmed_pool .remove_published_and_discard_deprecated_transactions(failed_block); + + // Reinsert them to validate if they are still valid + self.insert_txs(txs); self.unconfirmed_pool.compact(); - debug!(target: LOG_TARGET, "{}", self.stats()); + Ok(()) } @@ -190,12 +193,12 @@ impl MempoolStorage { // validation. This is important as invalid transactions that have not been mined yet may remain in the mempool // after a reorg. let removed_txs = self.unconfirmed_pool.drain_all_mempool_transactions(); - self.insert_txs(removed_txs)?; + self.insert_txs(removed_txs); // Remove re-orged transactions from reorg pool and re-submit them to the unconfirmed mempool let removed_txs = self .reorg_pool .remove_reorged_txs_and_discard_double_spends(removed_blocks, new_blocks); - self.insert_txs(removed_txs)?; + self.insert_txs(removed_txs); // Update the Mempool based on the received set of new blocks. for block in new_blocks { self.process_published_block(block)?; @@ -235,7 +238,7 @@ impl MempoolStorage { /// Will only return transactions that will fit into the given weight pub fn retrieve_and_revalidate(&mut self, total_weight: u64) -> Result>, MempoolError> { let results = self.unconfirmed_pool.fetch_highest_priority_txs(total_weight)?; - self.insert_txs(results.transactions_to_insert)?; + self.insert_txs(results.transactions_to_insert); Ok(results.retrieved_transactions) } diff --git a/base_layer/core/src/mempool/service/initializer.rs b/base_layer/core/src/mempool/service/initializer.rs index 83431f8f19..7ba3a67d9e 100644 --- a/base_layer/core/src/mempool/service/initializer.rs +++ b/base_layer/core/src/mempool/service/initializer.rs @@ -40,7 +40,7 @@ use tari_service_framework::{ use tokio::sync::mpsc; use crate::{ - base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle}, + base_node::comms_interface::LocalNodeCommsInterface, mempool::{ mempool::Mempool, service::{ @@ -135,7 +135,6 @@ impl ServiceInitializer for MempoolServiceInitializer { context.spawn_until_shutdown(move |handles| { let outbound_message_service = handles.expect_handle::().outbound_requester(); - let state_machine = handles.expect_handle::(); let base_node = handles.expect_handle::(); let streams = MempoolStreams { @@ -146,7 +145,7 @@ impl ServiceInitializer for MempoolServiceInitializer { request_receiver, }; debug!(target: LOG_TARGET, "Mempool service started"); - MempoolService::new(outbound_message_service, inbound_handlers, state_machine).start(streams) + MempoolService::new(outbound_message_service, inbound_handlers).start(streams) }); Ok(()) diff --git a/base_layer/core/src/mempool/service/service.rs b/base_layer/core/src/mempool/service/service.rs index 2eddbd3975..f747484615 100644 --- a/base_layer/core/src/mempool/service/service.rs +++ b/base_layer/core/src/mempool/service/service.rs @@ -36,10 +36,7 @@ use tari_utilities::hex::Hex; use tokio::{sync::mpsc, task}; use crate::{ - base_node::{ - comms_interface::{BlockEvent, BlockEventReceiver}, - StateMachineHandle, - }, + base_node::comms_interface::{BlockEvent, BlockEventReceiver}, mempool::service::{ error::MempoolServiceError, inbound_handlers::MempoolInboundHandlers, @@ -66,19 +63,13 @@ pub struct MempoolStreams { pub struct MempoolService { outbound_message_service: OutboundMessageRequester, inbound_handlers: MempoolInboundHandlers, - state_machine: StateMachineHandle, } impl MempoolService { - pub fn new( - outbound_message_service: OutboundMessageRequester, - inbound_handlers: MempoolInboundHandlers, - state_machine: StateMachineHandle, - ) -> Self { + pub fn new(outbound_message_service: OutboundMessageRequester, inbound_handlers: MempoolInboundHandlers) -> Self { Self { outbound_message_service, inbound_handlers, - state_machine, } } @@ -108,12 +99,20 @@ impl MempoolService { // Outbound tx messages from the OutboundMempoolServiceInterface Some((txn, excluded_peers)) = outbound_tx_stream.recv() => { - self.spawn_handle_outbound_tx(txn, excluded_peers); + let _res = handle_outbound_tx(&mut self.outbound_message_service, txn, excluded_peers).await.map_err(|e| + error!(target: LOG_TARGET, "Error sending outbound tx message: {}", e) + ); }, // Incoming transaction messages from the Comms layer Some(transaction_msg) = inbound_transaction_stream.next() => { - self.spawn_handle_incoming_tx(transaction_msg); + let result = handle_incoming_tx(&mut self.inbound_handlers, transaction_msg).await; + if let Err(e) = result { + error!( + target: LOG_TARGET, + "Failed to handle incoming transaction message: {:?}", e + ); + } } // Incoming local request messages from the LocalMempoolServiceInterface and other local services @@ -144,41 +143,6 @@ impl MempoolService { self.inbound_handlers.handle_request(request).await } - fn spawn_handle_outbound_tx(&self, tx: Arc, excluded_peers: Vec) { - let outbound_message_service = self.outbound_message_service.clone(); - task::spawn(async move { - let result = handle_outbound_tx(outbound_message_service, tx, excluded_peers).await; - if let Err(e) = result { - error!(target: LOG_TARGET, "Failed to handle outbound tx message {:?}", e); - } - }); - } - - fn spawn_handle_incoming_tx(&self, tx_msg: DomainMessage) { - // Determine if we are bootstrapped - let status_watch = self.state_machine.get_status_info_watch(); - - if !(*status_watch.borrow()).bootstrapped { - debug!( - target: LOG_TARGET, - "Transaction with Message {} from peer `{}` not processed while busy with initial sync.", - tx_msg.dht_header.message_tag, - tx_msg.source_peer.node_id.short_str(), - ); - return; - } - let inbound_handlers = self.inbound_handlers.clone(); - task::spawn(async move { - let result = handle_incoming_tx(inbound_handlers, tx_msg).await; - if let Err(e) = result { - error!( - target: LOG_TARGET, - "Failed to handle incoming transaction message: {:?}", e - ); - } - }); - } - fn spawn_handle_local_request( &self, request_context: RequestContext>, @@ -209,7 +173,7 @@ impl MempoolService { } async fn handle_incoming_tx( - mut inbound_handlers: MempoolInboundHandlers, + inbound_handlers: &mut MempoolInboundHandlers, domain_transaction_msg: DomainMessage, ) -> Result<(), MempoolServiceError> { let DomainMessage::<_> { source_peer, inner, .. } = domain_transaction_msg; @@ -236,7 +200,7 @@ async fn handle_incoming_tx( } async fn handle_outbound_tx( - mut outbound_message_service: OutboundMessageRequester, + outbound_message_service: &mut OutboundMessageRequester, tx: Arc, exclude_peers: Vec, ) -> Result<(), MempoolServiceError> { @@ -247,7 +211,13 @@ async fn handle_outbound_tx( exclude_peers, OutboundDomainMessage::new( &TariMessageType::NewTransaction, - proto::types::Transaction::try_from(tx).map_err(MempoolServiceError::ConversionError)?, + proto::types::Transaction::try_from(tx.clone()).map_err(MempoolServiceError::ConversionError)?, + ), + format!( + "Outbound mempool tx: {}", + tx.first_kernel_excess_sig() + .map(|s| s.get_signature().to_hex()) + .unwrap_or_else(|| "No kernels!".to_string()) ), ) .await; diff --git a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs index 3656ac51ad..bf59316dc8 100644 --- a/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs +++ b/base_layer/core/src/mempool/unconfirmed_pool/unconfirmed_pool.rs @@ -110,21 +110,21 @@ impl UnconfirmedPool { tx: Arc, dependent_outputs: Option>, transaction_weighting: &TransactionWeight, - ) -> Result<(), UnconfirmedPoolError> { + ) { if tx .body .kernels() .iter() .all(|k| self.txs_by_signature.contains_key(k.excess_sig.get_signature())) { - return Ok(()); + return; } let new_key = self.get_next_key(); let prioritized_tx = PrioritizedTransaction::new(new_key, transaction_weighting, tx, dependent_outputs); if self.tx_by_key.len() >= self.config.storage_capacity { if prioritized_tx.priority < *self.lowest_priority() { - return Ok(()); + return; } self.remove_lowest_priority_tx(); } @@ -143,8 +143,6 @@ impl UnconfirmedPool { "Inserted transaction {} into unconfirmed pool:", prioritized_tx ); self.tx_by_key.insert(new_key, prioritized_tx); - - Ok(()) } /// TThis will search the unconfirmed pool for the set of outputs and return true if all of them are found @@ -158,11 +156,10 @@ impl UnconfirmedPool { &mut self, txs: I, transaction_weighting: &TransactionWeight, - ) -> Result<(), UnconfirmedPoolError> { + ) { for tx in txs { - self.insert(tx, None, transaction_weighting)?; + self.insert(tx, None, transaction_weighting); } - Ok(()) } /// Check if a transaction is available in the UnconfirmedPool @@ -668,12 +665,10 @@ mod test { }); let tx_weight = TransactionWeight::latest(); - unconfirmed_pool - .insert_many( - [tx1.clone(), tx2.clone(), tx3.clone(), tx4.clone(), tx5.clone()], - &tx_weight, - ) - .unwrap(); + unconfirmed_pool.insert_many( + [tx1.clone(), tx2.clone(), tx3.clone(), tx4.clone(), tx5.clone()], + &tx_weight, + ); // Check that lowest priority tx was removed to make room for new incoming transactions assert!(unconfirmed_pool.has_tx_with_excess_sig(&tx1.body.kernels()[0].excess_sig)); assert!(!unconfirmed_pool.has_tx_with_excess_sig(&tx2.body.kernels()[0].excess_sig)); @@ -747,9 +742,7 @@ mod test { }); let tx_weight = TransactionWeight::latest(); - unconfirmed_pool - .insert_many(vec![tx1.clone(), tx2.clone(), tx3.clone()], &tx_weight) - .unwrap(); + unconfirmed_pool.insert_many(vec![tx1.clone(), tx2.clone(), tx3.clone()], &tx_weight); assert_eq!(unconfirmed_pool.len(), 3); let desired_weight = tx1.calculate_weight(&tx_weight) + @@ -779,12 +772,10 @@ mod test { storage_capacity: 10, weight_tx_skip_count: 3, }); - unconfirmed_pool - .insert_many( - vec![tx1.clone(), tx2.clone(), tx3.clone(), tx4.clone(), tx5.clone()], - &tx_weight, - ) - .unwrap(); + unconfirmed_pool.insert_many( + vec![tx1.clone(), tx2.clone(), tx3.clone(), tx4.clone(), tx5.clone()], + &tx_weight, + ); // utx6 should not be added to unconfirmed_pool as it is an unknown transactions that was included in the block // by another node @@ -829,19 +820,17 @@ mod test { storage_capacity: 10, weight_tx_skip_count: 3, }); - unconfirmed_pool - .insert_many( - vec![ - tx1.clone(), - tx2.clone(), - tx3.clone(), - tx4.clone(), - tx5.clone(), - tx6.clone(), - ], - &tx_weight, - ) - .unwrap(); + unconfirmed_pool.insert_many( + vec![ + tx1.clone(), + tx2.clone(), + tx3.clone(), + tx4.clone(), + tx5.clone(), + tx6.clone(), + ], + &tx_weight, + ); // The publishing of tx1 and tx3 will be double-spends and orphan tx5 and tx6 let published_block = create_orphan_block(0, vec![(*tx1).clone(), (*tx2).clone(), (*tx3).clone()], &consensus); @@ -885,7 +874,7 @@ mod test { Arc::new(tx3.clone()), Arc::new(tx4.clone()), ]; - unconfirmed_pool.insert_many(txns.clone(), &tx_weight).unwrap(); + unconfirmed_pool.insert_many(txns.clone(), &tx_weight); for txn in txns { for output in txn.as_ref().body.outputs() { @@ -967,9 +956,7 @@ mod test { let tx2 = Arc::new(tx2); let tx3 = Arc::new(tx3); let tx4 = Arc::new(tx4); - unconfirmed_pool - .insert_many(vec![tx1, tx2, tx3, tx4], &tx_weight) - .unwrap(); + unconfirmed_pool.insert_many(vec![tx1, tx2, tx3, tx4], &tx_weight); let stats = unconfirmed_pool.get_fee_per_gram_stats(1, 19500).unwrap(); assert_eq!(stats[0].order, 0); @@ -1007,7 +994,7 @@ mod test { let tx_weight = TransactionWeight::latest(); let mut unconfirmed_pool = UnconfirmedPool::new(UnconfirmedPoolConfig::default()); - unconfirmed_pool.insert_many(transactions, &tx_weight).unwrap(); + unconfirmed_pool.insert_many(transactions, &tx_weight); let stats = unconfirmed_pool.get_fee_per_gram_stats(2, 2000).unwrap(); assert_eq!(stats, expected_stats); diff --git a/base_layer/core/tests/mempool.rs b/base_layer/core/tests/mempool.rs index 74f4dfe3ac..30a7fa818f 100644 --- a/base_layer/core/tests/mempool.rs +++ b/base_layer/core/tests/mempool.rs @@ -846,6 +846,7 @@ async fn receive_and_propagate_transaction() { &TariMessageType::NewTransaction, proto::types::Transaction::try_from(tx).unwrap(), ), + "mempool tests".to_string(), ) .await .unwrap(); @@ -857,6 +858,7 @@ async fn receive_and_propagate_transaction() { &TariMessageType::NewTransaction, proto::types::Transaction::try_from(orphan).unwrap(), ), + "mempool tests".to_string(), ) .await .unwrap(); diff --git a/base_layer/p2p/src/services/liveness/service.rs b/base_layer/p2p/src/services/liveness/service.rs index 09f556dfe5..def15f5116 100644 --- a/base_layer/p2p/src/services/liveness/service.rs +++ b/base_layer/p2p/src/services/liveness/service.rs @@ -212,7 +212,11 @@ where debug!(target: LOG_TARGET, "Sending ping to peer '{}'", node_id.short_str(),); self.outbound_messaging - .send_direct_node_id(node_id, OutboundDomainMessage::new(&TariMessageType::PingPong, msg)) + .send_direct_node_id( + node_id, + OutboundDomainMessage::new(&TariMessageType::PingPong, msg), + "Send ping".to_string(), + ) .await .map_err(Into::::into)?; @@ -222,7 +226,11 @@ where async fn send_pong(&mut self, nonce: u64, dest: CommsPublicKey) -> Result<(), LivenessError> { let msg = PingPongMessage::pong_with_metadata(nonce, self.state.metadata().clone()); self.outbound_messaging - .send_direct(dest, OutboundDomainMessage::new(&TariMessageType::PingPong, msg)) + .send_direct( + dest, + OutboundDomainMessage::new(&TariMessageType::PingPong, msg), + "Sending pong".to_string(), + ) .await .map(|_| ()) .map_err(Into::into) @@ -302,7 +310,11 @@ where let msg = PingPongMessage::ping_with_metadata(self.state.metadata().clone()); self.state.add_inflight_ping(msg.nonce, peer.clone()); self.outbound_messaging - .send_direct_node_id(peer, OutboundDomainMessage::new(&TariMessageType::PingPong, msg)) + .send_direct_node_id( + peer, + OutboundDomainMessage::new(&TariMessageType::PingPong, msg), + "Start ping round".to_string(), + ) .await?; } diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs index 34f5ffb205..7d498b139e 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs @@ -679,6 +679,7 @@ where .send_direct( self.dest_pubkey.clone(), OutboundDomainMessage::new(&TariMessageType::SenderPartialTransaction, proto_message.clone()), + "transaction send".to_string(), ) .await { diff --git a/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs b/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs index 8bcfe73d10..66ca04aab2 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_finalized_transaction.rs @@ -114,6 +114,7 @@ pub async fn send_finalized_transaction_message_direct( &TariMessageType::TransactionFinalized, finalized_transaction_message.clone(), ), + "transaction finalized".to_string(), ) .await { diff --git a/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs b/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs index 0983842521..5cac558ee4 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_transaction_cancelled.rs @@ -43,6 +43,7 @@ pub async fn send_transaction_cancelled_message( .send_direct( destination_public_key.clone(), OutboundDomainMessage::new(&TariMessageType::TransactionCancelled, proto_message.clone()), + "transaction cancelled".to_string(), ) .await?; diff --git a/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs b/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs index 2e7bcb981e..9c81ba5255 100644 --- a/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs +++ b/base_layer/wallet/src/transaction_service/tasks/send_transaction_reply.rs @@ -95,6 +95,7 @@ pub async fn send_transaction_reply_direct( .send_direct( inbound_transaction.source_public_key.clone(), OutboundDomainMessage::new(&TariMessageType::ReceiverPartialTransactionReply, proto_message.clone()), + "wallet transaction reply".to_string(), ) .await { diff --git a/comms/dht/examples/memory_net/utilities.rs b/comms/dht/examples/memory_net/utilities.rs index 7f33285172..35271742dd 100644 --- a/comms/dht/examples/memory_net/utilities.rs +++ b/comms/dht/examples/memory_net/utilities.rs @@ -275,6 +275,7 @@ pub async fn do_network_wide_propagation(nodes: &mut [TestNode], origin_node_ind OutboundEncryption::ClearText, vec![], OutboundDomainMessage::new(&0i32, PUBLIC_MESSAGE.to_string()), + "Memory net example".to_string(), ) .await .unwrap(); diff --git a/comms/dht/examples/propagation_stress.rs b/comms/dht/examples/propagation_stress.rs index e15d820315..865a5b45d1 100644 --- a/comms/dht/examples/propagation_stress.rs +++ b/comms/dht/examples/propagation_stress.rs @@ -118,7 +118,7 @@ async fn prompt(node: &CommsNode, dht: &Dht) -> anyhow::Result<()> { let msg = OutboundDomainMessage::new(&999, PropagationMessage::new(u32::try_from(i).unwrap(), opts.msg_size)); let states = match opts.send_method { SendMethod::Direct => outbound - .send_direct_node_id(opts.peer.node_id.clone(), msg) + .send_direct_node_id(opts.peer.node_id.clone(), msg, "Example stress".to_string()) .await .map(MessageSendStates::from)?, SendMethod::Propagated => { diff --git a/comms/dht/src/actor.rs b/comms/dht/src/actor.rs index 1cafa81270..bed2b99e9e 100644 --- a/comms/dht/src/actor.rs +++ b/comms/dht/src/actor.rs @@ -452,6 +452,7 @@ impl DhtActor { .closest(node_identity.node_id().clone(), vec![]) .with_destination(node_identity.public_key().clone().into()) .with_dht_message_type(DhtMessageType::Join) + .with_debug_info("Broadcast join".to_string()) .force_origin() .finish(), message, diff --git a/comms/dht/src/dht.rs b/comms/dht/src/dht.rs index f9a00d4387..5361665f42 100644 --- a/comms/dht/src/dht.rs +++ b/comms/dht/src/dht.rs @@ -358,6 +358,10 @@ impl Dht { S::Future: Send, { ServiceBuilder::new() + .layer(MessageLoggingLayer::new(format!( + "Pre Broadcast [{}]", + self.node_identity.node_id().short_str() + ))) .layer(outbound::BroadcastLayer::new( Arc::clone(&self.node_identity), self.dht_requester(), diff --git a/comms/dht/src/discovery/service.rs b/comms/dht/src/discovery/service.rs index 63a7009a85..b6aeef7d31 100644 --- a/comms/dht/src/discovery/service.rs +++ b/comms/dht/src/discovery/service.rs @@ -325,7 +325,7 @@ impl DhtDiscoveryService { }; debug!( target: LOG_TARGET, - "Sending Discovery message for peer public key '{}' with destination {}", dest_public_key, destination + "Sending Discovery message for peer public key '{}' with destination {}", &dest_public_key, destination ); self.outbound_requester @@ -333,6 +333,7 @@ impl DhtDiscoveryService { SendMessageParams::new() .broadcast(Vec::new()) .with_destination(destination) + .with_debug_info(format!("discover: {}", &dest_public_key)) .with_encryption(OutboundEncryption::EncryptFor(dest_public_key)) .with_dht_message_type(DhtMessageType::Discovery) .finish(), diff --git a/comms/dht/src/inbound/dht_handler/task.rs b/comms/dht/src/inbound/dht_handler/task.rs index c4c0e52f84..1760b47295 100644 --- a/comms/dht/src/inbound/dht_handler/task.rs +++ b/comms/dht/src/inbound/dht_handler/task.rs @@ -231,6 +231,7 @@ where S: Service origin_peer.node_id, source_peer.node_id.clone(), ]) + .with_debug_info("Propagating join message".to_string()) .with_dht_header(dht_header) .finish(), body.to_encoded_bytes(), @@ -352,6 +353,7 @@ where S: Service .send_message_no_header_no_wait( SendMessageParams::new() .direct_public_key(dest_public_key) + .with_debug_info("Sending discovery response".to_string()) .with_destination(NodeDestination::Unknown) .with_dht_message_type(DhtMessageType::DiscoveryResponse) .finish(), diff --git a/comms/dht/src/inbound/forward.rs b/comms/dht/src/inbound/forward.rs index 2bb455b67e..7ddd9e4fa7 100644 --- a/comms/dht/src/inbound/forward.rs +++ b/comms/dht/src/inbound/forward.rs @@ -217,20 +217,21 @@ where S: Service let mut send_params = SendMessageParams::new(); match (dest_node_id, is_saf_stored) { (Some(node_id), Some(true)) => { - debug!( - target: LOG_TARGET, - "Forwarding SAF message directly to node: {}, {}", node_id, dht_header.message_tag + let debug_info = format!( + "Forwarding SAF message directly to node: {}, {}", + node_id, dht_header.message_tag ); + debug!(target: LOG_TARGET, "{}", &debug_info); + send_params.with_debug_info(debug_info); send_params.direct_or_closest_connected(node_id, excluded_peers); }, _ => { - debug!( - target: LOG_TARGET, + let debug_info = format!( "Propagating SAF message for {}, propagating it. {}", - dht_header.destination, - dht_header.message_tag + dht_header.destination, dht_header.message_tag ); - + debug!(target: LOG_TARGET, "{}", debug_info); + send_params.with_debug_info(debug_info); send_params.propagate(dht_header.destination.clone(), excluded_peers); }, }; diff --git a/comms/dht/src/logging_middleware.rs b/comms/dht/src/logging_middleware.rs index 5789457095..9cfe176692 100644 --- a/comms/dht/src/logging_middleware.rs +++ b/comms/dht/src/logging_middleware.rs @@ -20,7 +20,12 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use std::{borrow::Cow, fmt::Display, marker::PhantomData, task::Poll}; +use std::{ + borrow::Cow, + fmt::{Debug, Display}, + marker::PhantomData, + task::Poll, +}; use futures::task::Context; use log::*; @@ -75,7 +80,7 @@ impl<'a, S> MessageLoggingService<'a, S> { impl Service for MessageLoggingService<'_, S> where S: Service, - R: Display, + R: Display + Debug, { type Error = S::Error; type Future = S::Future; @@ -86,7 +91,7 @@ where } fn call(&mut self, msg: R) -> Self::Future { - trace!(target: LOG_TARGET, "{}{}", self.prefix_msg, msg); + debug!(target: LOG_TARGET, "{}{:?}", self.prefix_msg, msg); self.inner.call(msg) } } diff --git a/comms/dht/src/outbound/broadcast.rs b/comms/dht/src/outbound/broadcast.rs index 51c8dc37ab..8999d2fd41 100644 --- a/comms/dht/src/outbound/broadcast.rs +++ b/comms/dht/src/outbound/broadcast.rs @@ -262,6 +262,7 @@ where S: Service is_discovery_enabled, force_origin, dht_header, + debug_info: _, tag, } = params; @@ -582,7 +583,7 @@ mod test { }; #[runtime::test] - async fn send_message_flood() { + async fn test_send_message_flood() { let pk = CommsPublicKey::default(); let example_peer = Peer::new( pk.clone(), @@ -647,7 +648,7 @@ mod test { } #[runtime::test] - async fn send_message_direct_not_found() { + async fn test_send_message_direct_not_found() { // Test for issue https://github.com/tari-project/tari/issues/959 let pk = CommsPublicKey::default(); @@ -692,7 +693,7 @@ mod test { } #[runtime::test] - async fn send_message_direct_dht_discovery() { + async fn test_send_message_direct_dht_discovery() { let node_identity = NodeIdentity::random( &mut OsRng, "/ip4/127.0.0.1/tcp/9000".parse().unwrap(), diff --git a/comms/dht/src/outbound/message.rs b/comms/dht/src/outbound/message.rs index 237aa08afc..544287e090 100644 --- a/comms/dht/src/outbound/message.rs +++ b/comms/dht/src/outbound/message.rs @@ -191,12 +191,13 @@ impl fmt::Display for DhtOutboundMessage { }); write!( f, - "\n---- Outgoing message ---- \nSize: {} byte(s)\nType: {}\nPeer: {}\nHeader: {}\n{}\n----", + "\n---- Outgoing message ---- \nSize: {} byte(s)\nType: {}\nPeer: {}\nHeader: {}\n{}\n----\n{:?}\n", self.body.len(), self.dht_message_type, - self.destination_node_id, + self.destination, header_str, self.tag, + self.body ) } } diff --git a/comms/dht/src/outbound/message_params.rs b/comms/dht/src/outbound/message_params.rs index 1bd28ad766..2fb1aabf0e 100644 --- a/comms/dht/src/outbound/message_params.rs +++ b/comms/dht/src/outbound/message_params.rs @@ -68,6 +68,7 @@ pub struct FinalSendMessageParams { pub dht_message_type: DhtMessageType, pub dht_message_flags: DhtMessageFlags, pub dht_header: Option, + pub debug_info: Option, pub tag: Option, } @@ -82,6 +83,7 @@ impl Default for FinalSendMessageParams { force_origin: false, is_discovery_enabled: false, dht_header: None, + debug_info: None, tag: None, } } @@ -102,6 +104,11 @@ impl SendMessageParams { Default::default() } + pub fn with_debug_info(&mut self, debug_info: String) -> &mut Self { + self.params_mut().debug_info = Some(debug_info); + self + } + /// Set broadcast_strategy to DirectPublicKey pub fn direct_public_key(&mut self, public_key: CommsPublicKey) -> &mut Self { self.params_mut().broadcast_strategy = BroadcastStrategy::DirectPublicKey(Box::new(public_key)); diff --git a/comms/dht/src/outbound/requester.rs b/comms/dht/src/outbound/requester.rs index 945b64bc8b..0b1e38e9ee 100644 --- a/comms/dht/src/outbound/requester.rs +++ b/comms/dht/src/outbound/requester.rs @@ -54,12 +54,14 @@ impl OutboundMessageRequester { &mut self, dest_public_key: CommsPublicKey, message: OutboundDomainMessage, + source_info: String, ) -> Result where T: prost::Message, { self.send_message( SendMessageParams::new() + .with_debug_info(format!("Send direct to {} from {}", &dest_public_key, source_info)) .direct_public_key(dest_public_key) .with_discovery(true) .finish(), @@ -73,13 +75,17 @@ impl OutboundMessageRequester { &mut self, dest_node_id: NodeId, message: OutboundDomainMessage, + source_info: String, ) -> Result where T: prost::Message, { let resp = self .send_message( - SendMessageParams::new().direct_node_id(dest_node_id.clone()).finish(), + SendMessageParams::new() + .direct_node_id(dest_node_id.clone()) + .with_debug_info(format!("Send direct to {}. Source: {}", dest_node_id, source_info)) + .finish(), message, ) .await?; @@ -132,6 +138,7 @@ impl OutboundMessageRequester { encryption: OutboundEncryption, exclude_peers: Vec, message: OutboundDomainMessage, + source_info: String, ) -> Result where T: prost::Message, @@ -139,6 +146,7 @@ impl OutboundMessageRequester { self.send_message( SendMessageParams::new() .broadcast(exclude_peers) + .with_debug_info(format!("broadcast requested from {}", source_info)) .with_encryption(encryption) .with_destination(destination) .finish(), @@ -184,12 +192,14 @@ impl OutboundMessageRequester { encryption: OutboundEncryption, exclude_peers: Vec, message: OutboundDomainMessage, + source_info: String, ) -> Result where T: prost::Message, { self.send_message( SendMessageParams::new() + .with_debug_info(source_info) .flood(exclude_peers) .with_destination(destination) .with_encryption(encryption) diff --git a/infrastructure/libtor/Cargo.toml b/infrastructure/libtor/Cargo.toml index eb625febe2..a46d9283f4 100644 --- a/infrastructure/libtor/Cargo.toml +++ b/infrastructure/libtor/Cargo.toml @@ -16,4 +16,6 @@ multiaddr = { version = "0.14.0" } rand = "0.8" tempfile = "3.1.0" tor-hash-passwd = "1.0.1" -libtor = "46.9.0" + +[target.'cfg(unix)'.dependencies] +libtor = { version="46.9.0"} diff --git a/infrastructure/libtor/src/lib.rs b/infrastructure/libtor/src/lib.rs index 1327bac9ed..e632da607f 100644 --- a/infrastructure/libtor/src/lib.rs +++ b/infrastructure/libtor/src/lib.rs @@ -22,7 +22,8 @@ // For some inexplicable reason if we don't include extern crate then we get libtor not defined errors in this crate on // matrix builds +#[cfg(unix)] #[allow(unused_extern_crates)] extern crate libtor; - +#[cfg(unix)] pub mod tor;