From 3f9338e15b866fe056756919c25b5635eaad34e8 Mon Sep 17 00:00:00 2001 From: Philip Robinson Date: Tue, 19 May 2020 17:16:28 +0200 Subject: [PATCH] Update sending of Tx Reply & Finalise msgs to send SAF if Direct fails This PR updates the Transaction Service to first attempt to send the Transaction Reply and Transaction Finalise message Directly and only if that process fails to send the messages via SAF. Previously, both these message with sent in a Fire and Forget manner where both would be sent simultaneously without feedback. Logging has also been introduced for both these processes. --- .../protocols/transaction_send_protocol.rs | 148 ++++++---- .../wallet/src/transaction_service/service.rs | 119 +++++++- .../tests/transaction_service/service.rs | 260 ++++++++++++++++-- 3 files changed, 449 insertions(+), 78 deletions(-) diff --git a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs index da6b9cd4ee..39f8cf10eb 100644 --- a/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs +++ b/base_layer/wallet/src/transaction_service/protocols/transaction_send_protocol.rs @@ -333,8 +333,8 @@ where TBackend: TransactionBackend + Clone + 'static e }); - // TODO Actually monitor the send status of this message - self.resources + match self + .resources .outbound_message_service .send_direct( outbound_tx.destination_public_key.clone(), @@ -345,59 +345,59 @@ where TBackend: TransactionBackend + Clone + 'static ), ) .await - .map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?; - - // TODO Monitor the final send result of this process - match self - .resources - .outbound_message_service - .broadcast( - NodeDestination::NodeId(Box::new(NodeId::from_key(&self.dest_pubkey).map_err(|e| { - TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)) - })?)), - OutboundEncryption::EncryptFor(Box::new(self.dest_pubkey.clone())), - vec![], - OutboundDomainMessage::new( - TariMessageType::TransactionFinalized, - finalized_transaction_message.clone(), - ), - ) - .await { Ok(result) => match result.resolve_ok().await { None => { - error!( - target: LOG_TARGET, - "Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed", self.id - ); - }, - Some(tags) if !tags.is_empty() => { - info!( - target: LOG_TARGET, - "Sending Finalized Transaction (TxId: {}) to Neighbours for Store and Forward successful with \ - Message Tags: {:?}", - tx_id, - tags, - ); + self.send_transaction_finalized_message_store_and_forward(finalized_transaction_message.clone()) + .await? }, - Some(_) => { - error!( - target: LOG_TARGET, - "Sending Finalized Transaction to Neighbours for Store and Forward for TX_ID: {} was \ - unsuccessful and no messages were sent", - tx_id - ); + Some(send_states) => { + if send_states.len() == 1 { + debug!( + target: LOG_TARGET, + "Transaction Finalized (TxId: {}) Direct Send to {} queued with Message Tag: {:?}", + self.id, + self.dest_pubkey, + send_states[0].tag, + ); + match send_states.wait_single().await { + true => { + info!( + target: LOG_TARGET, + "Direct Send of Transaction Finalized message for TX_ID: {} was successful", + self.id + ); + }, + false => { + error!( + target: LOG_TARGET, + "Direct Send of Transaction Finalized message for TX_ID: {} was unsuccessful and \ + no message was sent", + self.id + ); + self.send_transaction_finalized_message_store_and_forward( + finalized_transaction_message.clone(), + ) + .await? + }, + } + } else { + error!( + target: LOG_TARGET, + "Transaction Finalized message Send Direct for TxID: {} failed", self.id + ); + self.send_transaction_finalized_message_store_and_forward(finalized_transaction_message.clone()) + .await? + } }, }, Err(e) => { - error!( - target: LOG_TARGET, - "Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed: {:?}", + return Err(TransactionServiceProtocolError::new( self.id, - e - ); + TransactionServiceError::from(e), + )) }, - }; + } Ok(self.id) } @@ -662,6 +662,62 @@ where TBackend: TransactionBackend + Clone + 'static } Ok(true) } + + async fn send_transaction_finalized_message_store_and_forward( + &mut self, + msg: proto::TransactionFinalizedMessage, + ) -> Result<(), TransactionServiceProtocolError> + { + match self + .resources + .outbound_message_service + .broadcast( + NodeDestination::NodeId(Box::new(NodeId::from_key(&self.dest_pubkey).map_err(|e| { + TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)) + })?)), + OutboundEncryption::EncryptFor(Box::new(self.dest_pubkey.clone())), + vec![], + OutboundDomainMessage::new(TariMessageType::TransactionFinalized, msg.clone()), + ) + .await + { + Ok(result) => match result.resolve_ok().await { + None => { + error!( + target: LOG_TARGET, + "Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed", self.id + ); + }, + Some(tags) if !tags.is_empty() => { + info!( + target: LOG_TARGET, + "Sending Finalized Transaction (TxId: {}) to Neighbours for Store and Forward successful with \ + Message Tags: {:?}", + self.id, + tags, + ); + }, + Some(_) => { + error!( + target: LOG_TARGET, + "Sending Finalized Transaction to Neighbours for Store and Forward for TX_ID: {} was \ + unsuccessful and no messages were sent", + self.id + ); + }, + }, + Err(e) => { + error!( + target: LOG_TARGET, + "Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed: {:?}", + self.id, + e + ); + }, + }; + + Ok(()) + } } struct SendResult { diff --git a/base_layer/wallet/src/transaction_service/service.rs b/base_layer/wallet/src/transaction_service/service.rs index ef4c7fedfb..8d303c83c0 100644 --- a/base_layer/wallet/src/transaction_service/service.rs +++ b/base_layer/wallet/src/transaction_service/service.rs @@ -744,22 +744,66 @@ where let tx_id = recipient_reply.tx_id; let proto_message: proto::RecipientSignedMessage = recipient_reply.into(); - self.outbound_message_service + match self + .outbound_message_service .send_direct( source_pubkey.clone(), OutboundEncryption::None, OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, proto_message.clone()), ) - .await?; - - self.outbound_message_service - .broadcast( - NodeDestination::NodeId(Box::new(NodeId::from_key(&source_pubkey)?)), - OutboundEncryption::EncryptFor(Box::new(source_pubkey.clone())), - vec![], - OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, proto_message), - ) - .await?; + .await? + .resolve_ok() + .await + { + None => { + self.send_transaction_reply_store_and_forward(tx_id, source_pubkey.clone(), proto_message.clone()) + .await?; + }, + Some(send_states) => { + if send_states.len() == 1 { + debug!( + target: LOG_TARGET, + "Transaction Reply (TxId: {}) Direct Send to {} queued with Message Tag: {:?}", + tx_id, + source_pubkey, + send_states[0].tag, + ); + match send_states.wait_single().await { + true => { + info!( + target: LOG_TARGET, + "Direct Send of Transaction Reply message for TX_ID: {} was successful", tx_id + ); + }, + false => { + error!( + target: LOG_TARGET, + "Direct Send of Transaction Reply message for TX_ID: {} was unsuccessful and no \ + message was sent", + tx_id + ); + self.send_transaction_reply_store_and_forward( + tx_id, + source_pubkey.clone(), + proto_message.clone(), + ) + .await? + }, + } + } else { + error!( + target: LOG_TARGET, + "Transaction Reply message Send Direct for TxID: {} failed", tx_id + ); + self.send_transaction_reply_store_and_forward( + tx_id, + source_pubkey.clone(), + proto_message.clone(), + ) + .await? + } + }, + } // Otherwise add it to our pending transaction list and return reply let inbound_transaction = InboundTransaction::new( @@ -799,6 +843,59 @@ where Ok(()) } + async fn send_transaction_reply_store_and_forward( + &mut self, + tx_id: TxId, + source_pubkey: CommsPublicKey, + msg: proto::RecipientSignedMessage, + ) -> Result<(), TransactionServiceError> + { + match self + .outbound_message_service + .broadcast( + NodeDestination::NodeId(Box::new(NodeId::from_key(&source_pubkey)?)), + OutboundEncryption::EncryptFor(Box::new(source_pubkey.clone())), + vec![], + OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, msg), + ) + .await + { + Ok(result) => match result.resolve_ok().await { + None => { + error!( + target: LOG_TARGET, + "Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed", tx_id + ); + }, + Some(tags) if !tags.is_empty() => { + info!( + target: LOG_TARGET, + "Sending Transaction Reply (TxId: {}) to Neighbours for Store and Forward successful with \ + Message Tags: {:?}", + tx_id, + tags, + ); + }, + Some(_) => { + error!( + target: LOG_TARGET, + "Sending Transaction Reply to Neighbours for Store and Forward for TX_ID: {} was unsuccessful \ + and no messages were sent", + tx_id + ); + }, + }, + Err(e) => { + error!( + target: LOG_TARGET, + "Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed: {:?}", tx_id, e + ); + }, + }; + + Ok(()) + } + /// Accept a new transaction from a sender by handling a public SenderMessage. The reply is generated and sent. /// # Arguments /// 'source_pubkey' - The pubkey from which the message was sent and to which the reply will be sent. diff --git a/base_layer/wallet/tests/transaction_service/service.rs b/base_layer/wallet/tests/transaction_service/service.rs index 389e06aa90..f6489237e9 100644 --- a/base_layer/wallet/tests/transaction_service/service.rs +++ b/base_layer/wallet/tests/transaction_service/service.rs @@ -912,10 +912,9 @@ fn finalize_tx_with_incorrect_pubkey(al runtime.block_on(alice_tx_sender.send(tx_message.clone())).unwrap(); alice_outbound_service - .wait_call_count(2, Duration::from_secs(10)) + .wait_call_count(1, Duration::from_secs(10)) .unwrap(); let (_, body) = alice_outbound_service.pop_call().unwrap(); - let _ = alice_outbound_service.pop_call().unwrap(); // burn SAF message let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); let recipient_reply: RecipientSignedMessage = envelope_body @@ -1020,10 +1019,9 @@ fn finalize_tx_with_missing_output(alic runtime.block_on(alice_tx_sender.send(tx_message.clone())).unwrap(); alice_outbound_service - .wait_call_count(2, Duration::from_secs(10)) + .wait_call_count(1, Duration::from_secs(10)) .unwrap(); let (_, body) = alice_outbound_service.pop_call().unwrap(); - let _ = alice_outbound_service.pop_call().unwrap(); // burn SAF message let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); let recipient_reply: RecipientSignedMessage = envelope_body @@ -1351,11 +1349,11 @@ fn transaction_mempool_broadcast() { ))) .unwrap(); bob_outbound_service - .wait_call_count(2, Duration::from_secs(60)) + .wait_call_count(1, Duration::from_secs(60)) .expect("bob call wait 1"); let call = bob_outbound_service.pop_call().unwrap(); - let _ = bob_outbound_service.pop_call().unwrap(); // Burn the SAF version of the message + let envelope_body = EnvelopeBody::decode(&mut call.1.to_vec().as_slice()).unwrap(); let bob_tx_reply_msg1: RecipientSignedMessage = envelope_body .decode_part::(1) @@ -1394,11 +1392,10 @@ fn transaction_mempool_broadcast() { ))) .unwrap(); bob_outbound_service - .wait_call_count(2, Duration::from_secs(60)) + .wait_call_count(1, Duration::from_secs(60)) .expect("Bob call wait 2"); let (_, body) = bob_outbound_service.pop_call().unwrap(); - let _ = bob_outbound_service.pop_call().unwrap(); // Burn the SAF version of the message let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); let bob_tx_reply_msg2: RecipientSignedMessage = envelope_body @@ -1849,10 +1846,9 @@ fn transaction_base_node_monitoring() { .unwrap(); bob_outbound_service - .wait_call_count(2, Duration::from_secs(60)) + .wait_call_count(1, Duration::from_secs(60)) .unwrap(); let (_, body) = bob_outbound_service.pop_call().unwrap(); - let _ = bob_outbound_service.pop_call().unwrap(); // burn SAF message let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); let bob_tx_reply_msg1: RecipientSignedMessage = envelope_body @@ -1894,10 +1890,9 @@ fn transaction_base_node_monitoring() { ))) .unwrap(); bob_outbound_service - .wait_call_count(2, Duration::from_secs(60)) + .wait_call_count(1, Duration::from_secs(60)) .unwrap(); let (_, body) = bob_outbound_service.pop_call().unwrap(); - let _ = bob_outbound_service.pop_call().unwrap(); // burn SAF message let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); let bob_tx_reply_msg2: RecipientSignedMessage = envelope_body @@ -2464,10 +2459,9 @@ fn transaction_cancellation_when_not_in_mempool() { ))) .unwrap(); bob_outbound_service - .wait_call_count(2, Duration::from_secs(60)) + .wait_call_count(1, Duration::from_secs(60)) .unwrap(); let (_, body) = bob_outbound_service.pop_call().unwrap(); - let _ = bob_outbound_service.pop_call().unwrap(); // burn SAF message let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); let tx_reply_msg: RecipientSignedMessage = envelope_body @@ -2484,9 +2478,8 @@ fn transaction_cancellation_when_not_in_mempool() { ))) .unwrap(); - let _ = alice_outbound_service.wait_call_count(2, Duration::from_secs(60)); - let _ = alice_outbound_service.pop_call().unwrap(); // Burn finalize message - let _ = alice_outbound_service.pop_call().unwrap(); // burn SAF message + let _ = alice_outbound_service.wait_call_count(1, Duration::from_secs(60)); + let _ = alice_outbound_service.pop_call().unwrap(); // Burn finalize messageSAF message runtime.block_on(async { let mut delay = delay_for(Duration::from_secs(60)).fuse(); @@ -2914,10 +2907,9 @@ fn test_resend_of_tx_on_pong_event(back ))) .unwrap(); bob_outbound_service - .wait_call_count(2, Duration::from_secs(60)) + .wait_call_count(1, Duration::from_secs(60)) .unwrap(); let (_, body) = bob_outbound_service.pop_call().unwrap(); - let _ = bob_outbound_service.pop_call().unwrap(); // burn SAF message let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); let tx_reply_msg: RecipientSignedMessage = envelope_body @@ -2934,9 +2926,8 @@ fn test_resend_of_tx_on_pong_event(back ))) .unwrap(); - let _ = alice_outbound_service.wait_call_count(2, Duration::from_secs(60)); + let _ = alice_outbound_service.wait_call_count(1, Duration::from_secs(60)); let _ = alice_outbound_service.pop_call().unwrap(); // Burn finalize message - let _ = alice_outbound_service.pop_call().unwrap(); // burn SAF message for i in 1..=20 { let call_count = liveness_mock_state.call_count(); @@ -2984,3 +2975,230 @@ fn test_resend_of_tx_on_pong_event_sqlite_db() { test_resend_of_tx_on_pong_event(TransactionServiceSqliteDatabase::new(connection)); } + +#[test] +fn test_direct_vs_saf_send_of_tx_reply_and_finalize() { + let factories = CryptoFactories::default(); + let mut runtime = Runtime::new().unwrap(); + + let alice_node_identity = + NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE).unwrap(); + + let bob_node_identity = + NodeIdentity::random(&mut OsRng, get_next_memory_address(), PeerFeatures::COMMUNICATION_NODE).unwrap(); + + let ( + mut alice_ts, + mut alice_output_manager, + alice_outbound_service, + mut _alice_tx_sender, + mut alice_tx_ack_sender, + _, + _, + _, + _, + _, + _, + ) = setup_transaction_service_no_comms( + &mut runtime, + factories.clone(), + TransactionMemoryDatabase::new(), + Some(Duration::from_secs(5)), + ); + + let alice_total_available = 250000 * uT; + let (_utxo, uo) = make_input(&mut OsRng, alice_total_available, &factories.commitment); + runtime.block_on(alice_output_manager.add_output(uo)).unwrap(); + + let amount_sent = 10000 * uT; + + let tx_id = runtime + .block_on(alice_ts.send_transaction( + bob_node_identity.public_key().clone(), + amount_sent, + 100 * uT, + "Testing Message".to_string(), + )) + .unwrap(); + + alice_outbound_service + .wait_call_count(1, Duration::from_secs(60)) + .unwrap(); + + let (_, body) = alice_outbound_service.pop_call().unwrap(); + + let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); + let tx_sender_msg: TransactionSenderMessage = envelope_body + .decode_part::(1) + .unwrap() + .unwrap() + .try_into() + .unwrap(); + let msg_tx_id = match tx_sender_msg.clone() { + TransactionSenderMessage::Single(s) => s.tx_id, + _ => { + assert!(false, "Transaction is the not a single rounder sender variant"); + 0 + }, + }; + assert_eq!(tx_id, msg_tx_id); + + // Test sending the Reply to a receiver with Direct and then with SAF and never both + let (_bob_ts, _, bob_outbound_service, mut bob_tx_sender, _, _, _, _, _, _, _) = setup_transaction_service_no_comms( + &mut runtime, + factories.clone(), + TransactionMemoryDatabase::new(), + Some(Duration::from_secs(20)), + ); + + bob_outbound_service.set_behaviour(MockBehaviour { + direct: ResponseType::Queued, + broadcast: ResponseType::Failed, + }); + + runtime + .block_on(bob_tx_sender.send(create_dummy_message( + tx_sender_msg.clone().into(), + alice_node_identity.public_key(), + ))) + .unwrap(); + bob_outbound_service + .wait_call_count(1, Duration::from_secs(60)) + .unwrap(); + let (_, body) = bob_outbound_service.pop_call().unwrap(); + + let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); + let _: RecipientSignedMessage = envelope_body + .decode_part::(1) + .unwrap() + .unwrap() + .try_into() + .unwrap(); + + for _ in 0..3 { + assert_eq!(bob_outbound_service.call_count(), 0, "Should be no more calls"); + runtime.block_on(async { delay_for(Duration::from_secs(5)).await }); + } + + let (_bob2_ts, _, bob2_outbound_service, mut bob2_tx_sender, _, _, _, _, _, _, _) = + setup_transaction_service_no_comms( + &mut runtime, + factories.clone(), + TransactionMemoryDatabase::new(), + Some(Duration::from_secs(20)), + ); + bob2_outbound_service.set_behaviour(MockBehaviour { + direct: ResponseType::Failed, + broadcast: ResponseType::Queued, + }); + + runtime + .block_on(bob2_tx_sender.send(create_dummy_message( + tx_sender_msg.into(), + alice_node_identity.public_key(), + ))) + .unwrap(); + + bob2_outbound_service + .wait_call_count(1, Duration::from_secs(60)) + .unwrap(); + let (_, body) = bob2_outbound_service.pop_call().unwrap(); + + let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); + let tx_reply_msg: RecipientSignedMessage = envelope_body + .decode_part::(1) + .unwrap() + .unwrap() + .try_into() + .unwrap(); + + for _ in 0..3 { + assert_eq!(bob2_outbound_service.call_count(), 0, "Should be no more calls"); + runtime.block_on(async { delay_for(Duration::from_secs(5)).await }); + } + + // Test finalize is sent Direct Only. + alice_outbound_service.set_behaviour(MockBehaviour { + direct: ResponseType::Queued, + broadcast: ResponseType::Queued, + }); + + runtime + .block_on(alice_tx_ack_sender.send(create_dummy_message( + tx_reply_msg.into(), + bob_node_identity.public_key(), + ))) + .unwrap(); + + let _ = alice_outbound_service.wait_call_count(1, Duration::from_secs(60)); + let _ = alice_outbound_service.pop_call().unwrap(); + for _ in 0..3 { + assert_eq!(alice_outbound_service.call_count(), 0, "Should be no more calls"); + runtime.block_on(async { delay_for(Duration::from_secs(5)).await }); + } + + // Now to repeat sending so we can test the SAF send of the finalize message + let alice_total_available = 250000 * uT; + let (_utxo, uo) = make_input(&mut OsRng, alice_total_available, &factories.commitment); + runtime.block_on(alice_output_manager.add_output(uo)).unwrap(); + + let amount_sent = 20000 * uT; + + let _tx_id2 = runtime + .block_on(alice_ts.send_transaction( + bob_node_identity.public_key().clone(), + amount_sent, + 100 * uT, + "Testing Message".to_string(), + )) + .unwrap(); + + alice_outbound_service + .wait_call_count(1, Duration::from_secs(60)) + .unwrap(); + + let (_, body) = alice_outbound_service.pop_call().unwrap(); + + let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); + let tx_sender_msg: TransactionSenderMessage = envelope_body + .decode_part::(1) + .unwrap() + .unwrap() + .try_into() + .unwrap(); + + runtime + .block_on(bob_tx_sender.send(create_dummy_message( + tx_sender_msg.into(), + alice_node_identity.public_key(), + ))) + .unwrap(); + + bob_outbound_service + .wait_call_count(1, Duration::from_secs(60)) + .unwrap(); + let (_, body) = bob_outbound_service.pop_call().unwrap(); + + let envelope_body = EnvelopeBody::decode(body.to_vec().as_slice()).unwrap(); + let tx_reply_msg: RecipientSignedMessage = envelope_body + .decode_part::(1) + .unwrap() + .unwrap() + .try_into() + .unwrap(); + + alice_outbound_service.set_behaviour(MockBehaviour { + direct: ResponseType::Queued, + broadcast: ResponseType::Queued, + }); + + runtime + .block_on(alice_tx_ack_sender.send(create_dummy_message( + tx_reply_msg.into(), + bob_node_identity.public_key(), + ))) + .unwrap(); + + // Should be 2 messages sent, Direct and SAF + let _ = alice_outbound_service.wait_call_count(2, Duration::from_secs(60)); +}