Skip to content

Commit

Permalink
Return Pending rather than loop around if no new finalized hash in su…
Browse files Browse the repository at this point in the history
…bmit_transaction (#1378)

* Return Pending rather than loop around if no new finalized hash in submit_transaction

* uptodate => up-to-date

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>

---------

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
  • Loading branch information
jsdw and niklasad1 authored Jan 18, 2024
1 parent 6b065eb commit 0f48d54
Showing 1 changed file with 38 additions and 58 deletions.
96 changes: 38 additions & 58 deletions subxt/src/backend/unstable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,61 +480,43 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// with chainHead_follow.
let mut finalized_hash: Option<T::Hash> = None;

// Monitor usage to help track down an obscure issue.
// Record the start time so that we can time out if things appear to take too long.
let start_instant = instant::Instant::now();
let mut last_seen_follow_event_str = None;
let mut last_seen_follow_event_time = None;
let mut last_seen_tx_event_str = None;
let mut last_seen_tx_event_time = None;
let mut num_polls: usize = 0;
let mut num_loops: usize = 0;

macro_rules! bail_with_stats {
($msg:literal) => {{
let msg = $msg;
let err = format!(
"submit_transaction error: {msg}. Please raise an issue. Details: \
start_time={start_instant:?}, \
num_polls={num_polls}, \
num_loops={num_loops}, \
last_follow_event_time={last_seen_follow_event_time:?}, \
last_follow_event={last_seen_follow_event_str:?}, \
last_tx_event_time={last_seen_tx_event_time:?}, \
last_tx_event={last_seen_tx_event_str:?}",
);
return Poll::Ready(Some(Err(Error::Other(err))));
}};
}

// A quick helper to return a generic error.
let err_other = |s: &str| Some(Err(Error::Other(s.into())));

// Now we can attempt to associate tx events with pinned blocks.
let tx_stream = futures::stream::poll_fn(move |cx| {
num_polls = num_polls.saturating_add(1);

loop {
num_loops = num_loops.saturating_add(1);

// Bail early if no more tx events; we don't want to keep polling for pinned blocks.
// Bail early if we're finished; nothing else to do.
if done {
return Poll::Ready(None);
}

// Bail if we exceed 4 mins; something very likely went wrong. We'll remove this
// once we get to the bottom of a spurious error that causes this to hang.
// Bail if we exceed 4 mins; something very likely went wrong.
if start_instant.elapsed().as_secs() > 240 {
bail_with_stats!("finalized block expected by now");
return Poll::Ready(err_other(
"Timeout waiting for the transaction to be finalized",
));
}

if let Poll::Ready(maybe_follow_event) = seen_blocks_sub.poll_next_unpin(cx) {
// Bail if we get back `None`, and we didn't previously see a `Stop`, which would be
// caught first below. Something unexpected has happened here.
let Some(follow_event) = maybe_follow_event else {
bail_with_stats!("chainHead_follow stream ended unexpectedly");
};

last_seen_follow_event_str = Some(format!("{follow_event:?}"));
last_seen_follow_event_time = Some(instant::Instant::now());

match follow_event {
// Poll for a follow event, and error if the stream has unexpectedly ended.
let follow_ev_poll = match seen_blocks_sub.poll_next_unpin(cx) {
Poll::Ready(None) => {
return Poll::Ready(err_other("chainHead_follow stream ended unexpectedly"))
}
Poll::Ready(Some(follow_ev)) => Poll::Ready(follow_ev),
Poll::Pending => Poll::Pending,
};
let follow_ev_is_pending = follow_ev_poll.is_pending();

// If there was a follow event, then handle it and loop around to see if there are more.
// We want to buffer follow events until we hit Pending, so that we are as up-to-date as possible
// for when we see a BestBlockChanged event, so that we have the best change of already having
// seen the block that it mentions and returning a proper pinned block.
if let Poll::Ready(follow_ev) = follow_ev_poll {
match follow_ev {
FollowEvent::NewBlock(ev) => {
// Optimization: once we have a `finalized_hash`, we only care about finalized
// block refs now and can avoid bothering to save new blocks.
Expand All @@ -558,7 +540,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
// in which we may lose the finaliuzed block that the TX is in. For now, just error if
// this happens, to prevent the case in which we never see a finalized block and wait
// forever.
return Poll::Ready(Some(Err(Error::Other("chainHead_follow emitted 'stop' event during transaction submission".into()))));
return Poll::Ready(err_other("chainHead_follow emitted 'stop' event during transaction submission"));
}
_ => {}
}
Expand All @@ -577,29 +559,27 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
};
return Poll::Ready(Some(Ok(ev)));
} else {
// Keep waiting for more finalized blocks until we find it (get rid of any other block refs
// now, since none of them were what we were looking for anyway).
// Not found it! If follow ev is pending, then return pending here and wait for
// a new one to come in, else loop around and see if we get another one immediately.
seen_blocks.clear();
continue;
if follow_ev_is_pending {
return Poll::Pending;
} else {
continue;
}
}
}

// Otherwise, we are still watching for tx events:
let ev = match tx_progress.poll_next_unpin(cx) {
// If we don't have a finalized block yet, we keep polling for tx progress events.
let tx_progress_ev = match tx_progress.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
done = true;
return Poll::Ready(None);
}
Poll::Ready(None) => return Poll::Ready(err_other("No more transaction progress events, but we haven't seen a Finalized one yet")),
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(ev))) => ev,
};

last_seen_tx_event_str = Some(format!("{ev:?}"));
last_seen_tx_event_time = Some(instant::Instant::now());

// When we get one, map it to the correct format (or for finalized ev, wait for the pinned block):
let ev = match ev {
let tx_progress_ev = match tx_progress_ev {
rpc_methods::TransactionStatus::Finalized { block } => {
// We'll wait until we have seen this hash, to try to guarantee
// that when we return this event, the corresponding block is
Expand Down Expand Up @@ -637,7 +617,7 @@ impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
}
rpc_methods::TransactionStatus::Validated => TransactionStatus::Validated,
};
return Poll::Ready(Some(Ok(ev)));
return Poll::Ready(Some(Ok(tx_progress_ev)));
}
});

Expand Down

0 comments on commit 0f48d54

Please sign in to comment.