Skip to content

Commit

Permalink
Do not stall on lost transaction (#1903)
Browse files Browse the repository at this point in the history
* `select_nonces_to_deliver` is no longer `&mut self`

* reset submitted nonces on lost transaction

* clippy

* fmt
  • Loading branch information
svyatonik committed Feb 24, 2023
1 parent 2d83d63 commit db5168f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 82 deletions.
53 changes: 0 additions & 53 deletions relays/messages/src/message_lane_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,58 +1017,6 @@ pub(crate) mod tests {
assert_eq!(result.submitted_messages_proofs, vec![(1..=1, None)],);
}

#[test]
fn message_lane_loop_is_able_to_recover_from_race_stall() {
// with this configuration, both source and target clients will lose their transactions =>
// reconnect will happen
let (source_exit_sender, exit_receiver) = unbounded();
let target_exit_sender = source_exit_sender.clone();
let result = run_loop_test(
Arc::new(Mutex::new(TestClientData {
source_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
source_latest_generated_nonce: 1,
source_tracked_transaction_status: TrackedTransactionStatus::Lost,
target_state: ClientState {
best_self: HeaderId(0, 0),
best_finalized_self: HeaderId(0, 0),
best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
actual_best_finalized_peer_at_best_self: Some(HeaderId(0, 0)),
},
target_latest_received_nonce: 0,
target_tracked_transaction_status: TrackedTransactionStatus::Lost,
..Default::default()
})),
Arc::new(move |data: &mut TestClientData| {
if data.is_source_reconnected {
data.source_tracked_transaction_status =
TrackedTransactionStatus::Finalized(Default::default());
}
if data.is_source_reconnected && data.is_target_reconnected {
source_exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
Arc::new(move |data: &mut TestClientData| {
if data.is_target_reconnected {
data.target_tracked_transaction_status =
TrackedTransactionStatus::Finalized(Default::default());
}
if data.is_source_reconnected && data.is_target_reconnected {
target_exit_sender.unbounded_send(()).unwrap();
}
}),
Arc::new(|_| {}),
exit_receiver.into_future().map(|(_, _)| ()),
);

assert!(result.is_source_reconnected);
}

#[test]
fn message_lane_loop_is_able_to_recover_from_unsuccessful_transaction() {
// with this configuration, both source and target clients will mine their transactions, but
Expand Down Expand Up @@ -1146,7 +1094,6 @@ pub(crate) mod tests {
exit_receiver.into_future().map(|(_, _)| ()),
);

assert!(result.is_source_reconnected);
assert_eq!(result.submitted_messages_proofs.len(), 2);
assert_eq!(result.submitted_messages_receiving_proofs.len(), 2);
}
Expand Down
19 changes: 10 additions & 9 deletions relays/messages/src/message_race_delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,16 @@ impl<P: MessageLane, SC, TC> std::fmt::Debug for MessageDeliveryStrategy<P, SC,

impl<P: MessageLane, SC, TC> MessageDeliveryStrategy<P, SC, TC> {
/// Returns the total dispatch weight of all undelivered messages within the given range.
fn total_queued_dispatch_weight(&self) -> Weight {
fn dispatch_weight_for_range(&self, range: &RangeInclusive<MessageNonce>) -> Weight {
self.strategy
.source_queue()
.iter()
.flat_map(|(_, range)| range.values().map(|details| details.dispatch_weight))
.flat_map(|(_, subrange)| {
subrange
.iter()
.filter(|(nonce, _)| range.contains(nonce))
.map(|(_, details)| details.dispatch_weight)
})
.fold(Weight::zero(), |total, weight| total.saturating_add(weight))
}
}
Expand Down Expand Up @@ -424,7 +429,7 @@ where
}

async fn select_nonces_to_deliver(
&mut self,
&self,
race_state: RaceState<SourceHeaderIdOf<P>, TargetHeaderIdOf<P>, P::MessagesProof>,
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)> {
let best_finalized_source_header_id_at_best_target =
Expand Down Expand Up @@ -526,7 +531,6 @@ where

let maximal_source_queue_index =
self.strategy.maximal_available_source_queue_index(race_state)?;
let previous_total_dispatch_weight = self.total_queued_dispatch_weight();
let source_queue = self.strategy.source_queue();

let reference = RelayMessagesBatchReference {
Expand All @@ -544,10 +548,7 @@ where

let range_begin = source_queue[0].1.begin();
let selected_nonces = range_begin..=range_end;
self.strategy.remove_le_nonces_from_source_queue(range_end);

let new_total_dispatch_weight = self.total_queued_dispatch_weight();
let dispatch_weight = previous_total_dispatch_weight - new_total_dispatch_weight;
let dispatch_weight = self.dispatch_weight_for_range(&selected_nonces);

Some((
selected_nonces,
Expand Down Expand Up @@ -707,7 +708,7 @@ mod tests {

#[async_std::test]
async fn message_delivery_strategy_selects_messages_to_deliver() {
let (state, mut strategy) = prepare_strategy();
let (state, strategy) = prepare_strategy();

// both sides are ready to relay new messages
assert_eq!(
Expand Down
28 changes: 17 additions & 11 deletions relays/messages/src/message_race_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ pub trait RaceStrategy<SourceHeaderId, TargetHeaderId, Proof>: Debug {
/// data) from source to target node.
/// Additionally, parameters required to generate proof are returned.
async fn select_nonces_to_deliver(
&mut self,
&self,
race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>,
) -> Option<(RangeInclusive<MessageNonce>, Self::ProofParameters)>;
}
Expand All @@ -234,6 +234,13 @@ pub struct RaceState<SourceHeaderId, TargetHeaderId, Proof> {
pub nonces_submitted: Option<RangeInclusive<MessageNonce>>,
}

/// Helper methods on the shared race state.
impl<SourceHeaderId, TargetHeaderId, Proof> RaceState<SourceHeaderId, TargetHeaderId, Proof> {
	/// Reset `nonces_submitted` to `None`.
	///
	/// Called by the race loop after a submitted transaction is reported lost or failed,
	/// so the same nonces become selectable for resubmission instead of the race stalling.
	fn reset_submitted(&mut self) {
		self.nonces_submitted = None;
	}
}

/// Run race loop until connection with target or source node is lost.
pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
race_source: SC,
Expand Down Expand Up @@ -460,7 +467,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
(TrackedTransactionStatus::Finalized(at_block), Some(nonces_submitted)) => {
// our transaction has been mined, but was it successful or not? let's check the best
// nonce at the target node.
race_target.nonces(at_block, false)
let _ = race_target.nonces(at_block, false)
.await
.map_err(|e| format!("failed to read nonces from target node: {e:?}"))
.and_then(|(_, nonces_at_target)| {
Expand All @@ -477,26 +484,26 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
.map_err(|e| {
log::error!(
target: "bridge",
"{} -> {} race has stalled. Transaction failed: {}. Going to restart",
"{} -> {} race transaction failed: {}",
P::source_name(),
P::target_name(),
e,
);

FailedClient::Both
})?;
race_state.reset_submitted();
});
},
(TrackedTransactionStatus::Lost, _) => {
log::warn!(
target: "bridge",
"{} -> {} race has stalled. State: {:?}. Strategy: {:?}",
"{} -> {} race transaction has been lost. State: {:?}. Strategy: {:?}",
P::source_name(),
P::target_name(),
race_state,
strategy,
);

return Err(FailedClient::Both);
race_state.reset_submitted();
},
_ => (),
}
Expand Down Expand Up @@ -531,8 +538,7 @@ pub async fn run<P: MessageRace, SC: SourceClient<P>, TC: TargetClient<P>>(
race_state.clone()
};

let nonces_to_deliver =
select_nonces_to_deliver(expected_race_state, &mut strategy).await;
let nonces_to_deliver = select_nonces_to_deliver(expected_race_state, &strategy).await;
let best_at_source = strategy.best_at_source();

if let Some((at_block, nonces_range, proof_parameters)) = nonces_to_deliver {
Expand Down Expand Up @@ -665,7 +671,7 @@ where

async fn select_nonces_to_deliver<SourceHeaderId, TargetHeaderId, Proof, Strategy>(
race_state: RaceState<SourceHeaderId, TargetHeaderId, Proof>,
strategy: &mut Strategy,
strategy: &Strategy,
) -> Option<(SourceHeaderId, RangeInclusive<MessageNonce>, Strategy::ProofParameters)>
where
SourceHeaderId: Clone,
Expand Down Expand Up @@ -723,7 +729,7 @@ mod tests {

// the proof will be generated on source, but using BEST_AT_TARGET block
assert_eq!(
select_nonces_to_deliver(race_state, &mut strategy).await,
select_nonces_to_deliver(race_state, &strategy).await,
Some((HeaderId(BEST_AT_TARGET, BEST_AT_TARGET), 6..=10, (),))
);
}
Expand Down
18 changes: 9 additions & 9 deletions relays/messages/src/message_race_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl<
}

/// Remove all nonces that are less than or equal to given nonce from the source queue.
pub fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) {
fn remove_le_nonces_from_source_queue(&mut self, nonce: MessageNonce) {
while let Some((queued_at, queued_range)) = self.source_queue.pop_front() {
if let Some(range_to_requeue) = queued_range.greater_than(nonce) {
self.source_queue.push_front((queued_at, range_to_requeue));
Expand Down Expand Up @@ -168,12 +168,12 @@ impl<
SourceNoncesRange,
Proof,
> where
SourceHeaderHash: Clone + Debug + Send,
SourceHeaderNumber: Clone + Ord + Debug + Send,
SourceNoncesRange: NoncesRange + Debug + Send,
TargetHeaderHash: Debug + Send,
TargetHeaderNumber: Debug + Send,
Proof: Debug + Send,
SourceHeaderHash: Clone + Debug + Send + Sync,
SourceHeaderNumber: Clone + Ord + Debug + Send + Sync,
SourceNoncesRange: NoncesRange + Debug + Send + Sync,
TargetHeaderHash: Debug + Send + Sync,
TargetHeaderNumber: Debug + Send + Sync,
Proof: Debug + Send + Sync,
{
type SourceNoncesRange = SourceNoncesRange;
type ProofParameters = ();
Expand Down Expand Up @@ -284,14 +284,15 @@ impl<
Proof,
>,
) {
self.remove_le_nonces_from_source_queue(nonces.latest_nonce); // TODO: does this mean that we'll try to submit old nonces in the next transaction?
self.best_target_nonce = Some(std::cmp::max(
self.best_target_nonce.unwrap_or(nonces.latest_nonce),
nonces.latest_nonce,
));
}

async fn select_nonces_to_deliver(
&mut self,
&self,
race_state: RaceState<
HeaderId<SourceHeaderHash, SourceHeaderNumber>,
HeaderId<TargetHeaderHash, TargetHeaderNumber>,
Expand All @@ -301,7 +302,6 @@ impl<
let maximal_source_queue_index = self.maximal_available_source_queue_index(race_state)?;
let range_begin = self.source_queue[0].1.begin();
let range_end = self.source_queue[maximal_source_queue_index].1.end();
self.remove_le_nonces_from_source_queue(range_end);
Some((range_begin..=range_end, ()))
}
}
Expand Down

0 comments on commit db5168f

Please sign in to comment.