Skip to content

Commit

Permalink
Update sending of Tx Reply & Finalise msgs to send SAF if Direct fails
Browse files Browse the repository at this point in the history
This PR updates the Transaction Service to first attempt to send the Transaction Reply and Transaction Finalise message Directly and only if that process fails to send the messages via SAF. Previously, both these message with sent in a Fire and Forget manner where both would be sent simultaneously without feedback.

Logging has also been introduced for both these processes.
  • Loading branch information
philipr-za committed May 15, 2020
1 parent 0d60fff commit 0a77747
Show file tree
Hide file tree
Showing 3 changed files with 449 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ where TBackend: TransactionBackend + Clone + 'static
e
});

// TODO Actually monitor the send status of this message
self.resources
match self
.resources
.outbound_message_service
.send_direct(
outbound_tx.destination_public_key.clone(),
Expand All @@ -345,59 +345,59 @@ where TBackend: TransactionBackend + Clone + 'static
),
)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

// TODO Monitor the final send result of this process
match self
.resources
.outbound_message_service
.broadcast(
NodeDestination::NodeId(Box::new(NodeId::from_key(&self.dest_pubkey).map_err(|e| {
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?)),
OutboundEncryption::EncryptFor(Box::new(self.dest_pubkey.clone())),
vec![],
OutboundDomainMessage::new(
TariMessageType::TransactionFinalized,
finalized_transaction_message.clone(),
),
)
.await
{
Ok(result) => match result.resolve_ok().await {
None => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed", self.id
);
},
Some(tags) if !tags.is_empty() => {
info!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to Neighbours for Store and Forward successful with \
Message Tags: {:?}",
tx_id,
tags,
);
self.send_transaction_finalized_message_store_and_forward(finalized_transaction_message.clone())
.await?
},
Some(_) => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction to Neighbours for Store and Forward for TX_ID: {} was \
unsuccessful and no messages were sent",
tx_id
);
Some(send_states) => {
if send_states.len() == 1 {
debug!(
target: LOG_TARGET,
"Transaction Finalized (TxId: {}) Direct Send to {} queued with Message Tag: {:?}",
self.id,
self.dest_pubkey,
send_states[0].tag,
);
match send_states.wait_single().await {
true => {
info!(
target: LOG_TARGET,
"Direct Send of Transaction Finalized message for TX_ID: {} was successful",
self.id
);
},
false => {
error!(
target: LOG_TARGET,
"Direct Send of Transaction Finalized message for TX_ID: {} was unsuccessful and \
no message was sent",
self.id
);
self.send_transaction_finalized_message_store_and_forward(
finalized_transaction_message.clone(),
)
.await?
},
}
} else {
error!(
target: LOG_TARGET,
"Transaction Finalized message Send Direct for TxID: {} failed", self.id
);
self.send_transaction_finalized_message_store_and_forward(finalized_transaction_message.clone())
.await?
}
},
},
Err(e) => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed: {:?}",
return Err(TransactionServiceProtocolError::new(
self.id,
e
);
TransactionServiceError::from(e),
))
},
};
}

Ok(self.id)
}
Expand Down Expand Up @@ -662,6 +662,62 @@ where TBackend: TransactionBackend + Clone + 'static
}
Ok(true)
}

async fn send_transaction_finalized_message_store_and_forward(
&mut self,
msg: proto::TransactionFinalizedMessage,
) -> Result<(), TransactionServiceProtocolError>
{
match self
.resources
.outbound_message_service
.broadcast(
NodeDestination::NodeId(Box::new(NodeId::from_key(&self.dest_pubkey).map_err(|e| {
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?)),
OutboundEncryption::EncryptFor(Box::new(self.dest_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::TransactionFinalized, msg.clone()),
)
.await
{
Ok(result) => match result.resolve_ok().await {
None => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed", self.id
);
},
Some(tags) if !tags.is_empty() => {
info!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to Neighbours for Store and Forward successful with \
Message Tags: {:?}",
self.id,
tags,
);
},
Some(_) => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction to Neighbours for Store and Forward for TX_ID: {} was \
unsuccessful and no messages were sent",
self.id
);
},
},
Err(e) => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed: {:?}",
self.id,
e
);
},
};

Ok(())
}
}

struct SendResult {
Expand Down
119 changes: 108 additions & 11 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,22 +744,66 @@ where

let tx_id = recipient_reply.tx_id;
let proto_message: proto::RecipientSignedMessage = recipient_reply.into();
self.outbound_message_service
match self
.outbound_message_service
.send_direct(
source_pubkey.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, proto_message.clone()),
)
.await?;

self.outbound_message_service
.broadcast(
NodeDestination::NodeId(Box::new(NodeId::from_key(&source_pubkey)?)),
OutboundEncryption::EncryptFor(Box::new(source_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, proto_message),
)
.await?;
.await?
.resolve_ok()
.await
{
None => {
self.send_transaction_reply_store_and_forward(tx_id, source_pubkey.clone(), proto_message.clone())
.await?;
},
Some(send_states) => {
if send_states.len() == 1 {
debug!(
target: LOG_TARGET,
"Transaction Reply (TxId: {}) Direct Send to {} queued with Message Tag: {:?}",
tx_id,
source_pubkey,
send_states[0].tag,
);
match send_states.wait_single().await {
true => {
info!(
target: LOG_TARGET,
"Direct Send of Transaction Reply message for TX_ID: {} was successful", tx_id
);
},
false => {
error!(
target: LOG_TARGET,
"Direct Send of Transaction Reply message for TX_ID: {} was unsuccessful and no \
message was sent",
tx_id
);
self.send_transaction_reply_store_and_forward(
tx_id,
source_pubkey.clone(),
proto_message.clone(),
)
.await?
},
}
} else {
error!(
target: LOG_TARGET,
"Transaction Reply message Send Direct for TxID: {} failed", tx_id
);
self.send_transaction_reply_store_and_forward(
tx_id,
source_pubkey.clone(),
proto_message.clone(),
)
.await?
}
},
}

// Otherwise add it to our pending transaction list and return reply
let inbound_transaction = InboundTransaction::new(
Expand Down Expand Up @@ -799,6 +843,59 @@ where
Ok(())
}

async fn send_transaction_reply_store_and_forward(
&mut self,
tx_id: TxId,
source_pubkey: CommsPublicKey,
msg: proto::RecipientSignedMessage,
) -> Result<(), TransactionServiceError>
{
match self
.outbound_message_service
.broadcast(
NodeDestination::NodeId(Box::new(NodeId::from_key(&source_pubkey)?)),
OutboundEncryption::EncryptFor(Box::new(source_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, msg),
)
.await
{
Ok(result) => match result.resolve_ok().await {
None => {
error!(
target: LOG_TARGET,
"Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed", tx_id
);
},
Some(tags) if !tags.is_empty() => {
info!(
target: LOG_TARGET,
"Sending Transaction Reply (TxId: {}) to Neighbours for Store and Forward successful with \
Message Tags: {:?}",
tx_id,
tags,
);
},
Some(_) => {
error!(
target: LOG_TARGET,
"Sending Transaction Reply to Neighbours for Store and Forward for TX_ID: {} was unsuccessful \
and no messages were sent",
tx_id
);
},
},
Err(e) => {
error!(
target: LOG_TARGET,
"Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed: {:?}", tx_id, e
);
},
};

Ok(())
}

/// Accept a new transaction from a sender by handling a public SenderMessage. The reply is generated and sent.
/// # Arguments
/// 'source_pubkey' - The pubkey from which the message was sent and to which the reply will be sent.
Expand Down
Loading

0 comments on commit 0a77747

Please sign in to comment.