Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update sending of Tx Reply & Finalise msgs to send via Store and Forward only if Direct fails #1876

Merged
merged 1 commit into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ where TBackend: TransactionBackend + Clone + 'static
e
});

// TODO Actually monitor the send status of this message
self.resources
match self
.resources
.outbound_message_service
.send_direct(
outbound_tx.destination_public_key.clone(),
Expand All @@ -345,59 +345,59 @@ where TBackend: TransactionBackend + Clone + 'static
),
)
.await
.map_err(|e| TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e)))?;

// TODO Monitor the final send result of this process
match self
.resources
.outbound_message_service
.broadcast(
NodeDestination::NodeId(Box::new(NodeId::from_key(&self.dest_pubkey).map_err(|e| {
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?)),
OutboundEncryption::EncryptFor(Box::new(self.dest_pubkey.clone())),
vec![],
OutboundDomainMessage::new(
TariMessageType::TransactionFinalized,
finalized_transaction_message.clone(),
),
)
.await
{
Ok(result) => match result.resolve_ok().await {
None => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed", self.id
);
},
Some(tags) if !tags.is_empty() => {
info!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to Neighbours for Store and Forward successful with \
Message Tags: {:?}",
tx_id,
tags,
);
self.send_transaction_finalized_message_store_and_forward(finalized_transaction_message.clone())
.await?
},
Some(_) => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction to Neighbours for Store and Forward for TX_ID: {} was \
unsuccessful and no messages were sent",
tx_id
);
Some(send_states) => {
if send_states.len() == 1 {
debug!(
target: LOG_TARGET,
"Transaction Finalized (TxId: {}) Direct Send to {} queued with Message Tag: {:?}",
self.id,
self.dest_pubkey,
send_states[0].tag,
);
match send_states.wait_single().await {
true => {
info!(
target: LOG_TARGET,
"Direct Send of Transaction Finalized message for TX_ID: {} was successful",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Direct Send of Transaction Finalized message for TX_ID: {} was successful",
"Direct Send of Transaction Finalized message for TX_ID: {} was successful ({:?})",

self.id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.id
self.id,
send_states[0].tag

);
},
false => {
error!(
target: LOG_TARGET,
"Direct Send of Transaction Finalized message for TX_ID: {} was unsuccessful and \
no message was sent",
self.id
);
self.send_transaction_finalized_message_store_and_forward(
finalized_transaction_message.clone(),
)
.await?
},
}
} else {
error!(
target: LOG_TARGET,
"Transaction Finalized message Send Direct for TxID: {} failed", self.id
);
self.send_transaction_finalized_message_store_and_forward(finalized_transaction_message.clone())
.await?
}
},
},
Err(e) => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed: {:?}",
return Err(TransactionServiceProtocolError::new(
self.id,
e
);
TransactionServiceError::from(e),
))
},
};
}

Ok(self.id)
}
Expand Down Expand Up @@ -662,6 +662,62 @@ where TBackend: TransactionBackend + Clone + 'static
}
Ok(true)
}

async fn send_transaction_finalized_message_store_and_forward(
&mut self,
msg: proto::TransactionFinalizedMessage,
) -> Result<(), TransactionServiceProtocolError>
{
match self
.resources
.outbound_message_service
.broadcast(
NodeDestination::NodeId(Box::new(NodeId::from_key(&self.dest_pubkey).map_err(|e| {
TransactionServiceProtocolError::new(self.id, TransactionServiceError::from(e))
})?)),
OutboundEncryption::EncryptFor(Box::new(self.dest_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::TransactionFinalized, msg.clone()),
)
.await
{
Ok(result) => match result.resolve_ok().await {
None => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed", self.id
);
},
Some(tags) if !tags.is_empty() => {
info!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to Neighbours for Store and Forward successful with \
Message Tags: {:?}",
self.id,
tags,
);
},
Some(_) => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction to Neighbours for Store and Forward for TX_ID: {} was \
unsuccessful and no messages were sent",
self.id
);
},
},
Err(e) => {
error!(
target: LOG_TARGET,
"Sending Finalized Transaction (TxId: {}) to neighbours for Store and Forward failed: {:?}",
self.id,
e
);
},
};

Ok(())
}
}

struct SendResult {
Expand Down
119 changes: 108 additions & 11 deletions base_layer/wallet/src/transaction_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,22 +744,66 @@ where

let tx_id = recipient_reply.tx_id;
let proto_message: proto::RecipientSignedMessage = recipient_reply.into();
self.outbound_message_service
match self
.outbound_message_service
.send_direct(
source_pubkey.clone(),
OutboundEncryption::None,
OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, proto_message.clone()),
)
.await?;

self.outbound_message_service
.broadcast(
NodeDestination::NodeId(Box::new(NodeId::from_key(&source_pubkey)?)),
OutboundEncryption::EncryptFor(Box::new(source_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, proto_message),
)
.await?;
.await?
.resolve_ok()
.await
{
None => {
self.send_transaction_reply_store_and_forward(tx_id, source_pubkey.clone(), proto_message.clone())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.send_transaction_reply_store_and_forward(tx_id, source_pubkey.clone(), proto_message.clone())
debug!(
target: LOG_TARGET,
"Transaction Reply (TxId: {}) Direct Send to {} not possible, attempting Store and Forward",
tx_id,
source_pubkey,
);
self.send_transaction_reply_store_and_forward(tx_id, source_pubkey.clone(), proto_message.clone())

.await?;
},
Some(send_states) => {
if send_states.len() == 1 {
debug!(
target: LOG_TARGET,
"Transaction Reply (TxId: {}) Direct Send to {} queued with Message Tag: {:?}",
tx_id,
source_pubkey,
send_states[0].tag,
);
match send_states.wait_single().await {
true => {
info!(
target: LOG_TARGET,
"Direct Send of Transaction Reply message for TX_ID: {} was successful", tx_id
);
},
false => {
error!(
target: LOG_TARGET,
"Direct Send of Transaction Reply message for TX_ID: {} was unsuccessful and no \
message was sent",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
message was sent",
message was sent, attempting Store and Forward",

tx_id
);
self.send_transaction_reply_store_and_forward(
tx_id,
source_pubkey.clone(),
proto_message.clone(),
)
.await?
},
}
} else {
error!(
target: LOG_TARGET,
"Transaction Reply message Send Direct for TxID: {} failed", tx_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Transaction Reply message Send Direct for TxID: {} failed", tx_id
"Transaction Reply message Send Direct for TxID: {} failed, attempting Store and Forward", tx_id

);
self.send_transaction_reply_store_and_forward(
tx_id,
source_pubkey.clone(),
proto_message.clone(),
)
.await?
}
},
}

// Otherwise add it to our pending transaction list and return reply
let inbound_transaction = InboundTransaction::new(
Expand Down Expand Up @@ -799,6 +843,59 @@ where
Ok(())
}

async fn send_transaction_reply_store_and_forward(
&mut self,
tx_id: TxId,
source_pubkey: CommsPublicKey,
msg: proto::RecipientSignedMessage,
) -> Result<(), TransactionServiceError>
{
match self
.outbound_message_service
.broadcast(
NodeDestination::NodeId(Box::new(NodeId::from_key(&source_pubkey)?)),
OutboundEncryption::EncryptFor(Box::new(source_pubkey.clone())),
vec![],
OutboundDomainMessage::new(TariMessageType::ReceiverPartialTransactionReply, msg),
)
.await
{
Ok(result) => match result.resolve_ok().await {
None => {
error!(
target: LOG_TARGET,
"Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed", tx_id
);
},
Some(tags) if !tags.is_empty() => {
info!(
target: LOG_TARGET,
"Sending Transaction Reply (TxId: {}) to Neighbours for Store and Forward successful with \
Message Tags: {:?}",
tx_id,
tags,
);
},
Some(_) => {
error!(
target: LOG_TARGET,
"Sending Transaction Reply to Neighbours for Store and Forward for TX_ID: {} was unsuccessful \
and no messages were sent",
tx_id
);
},
},
Err(e) => {
error!(
target: LOG_TARGET,
"Sending Transaction Reply (TxId: {}) to neighbours for Store and Forward failed: {:?}", tx_id, e
);
},
};

Ok(())
}

/// Accept a new transaction from a sender by handling a public SenderMessage. The reply is generated and sent.
/// # Arguments
/// 'source_pubkey' - The pubkey from which the message was sent and to which the reply will be sent.
Expand Down
Loading