
feat: replace TxPoller polling with SSE streaming#259

Open

Evalir wants to merge 3 commits into main from evalir/eop/sse-tx-poller

Conversation


Evalir (Member) commented Apr 2, 2026

Summary

Replaces the 1s timer-based polling loop in TxPoller with SSE streaming for real-time transaction delivery from the tx-pool. The new task lifecycle:

  1. Startup: full paginated fetch of all transactions currently in the cache
  2. Steady state: SSE stream (/transactions/feed) pushes new transactions as they arrive — no more redundant refetches
  3. Block env change: full refetch to ensure consistency (covers any items the SSE stream may have missed)

On SSE disconnect or error, the poller reconnects with exponential backoff (1s initial, doubling up to 30s cap) and does a full refetch to cover the gap. Backoff resets on each successfully received transaction.
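The doubling-with-cap step described above can be sketched as a small pure function (the constants match the description; the function name is illustrative, not the PR's actual API):

```rust
use std::time::Duration;

const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1);
const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30);

/// Next delay after a failed reconnect attempt: double the current
/// delay, capped at 30s. A successfully received transaction resets
/// the delay back to the initial 1s.
fn next_backoff(current: Duration) -> Duration {
    (current * 2).min(MAX_RECONNECT_BACKOFF)
}
```

Starting from 1s this yields 2s, 4s, 8s, 16s, then 30s for every subsequent failure until a transaction arrives.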

Changes

  • Cargo.toml: enable sse feature on init4-bin-base (transitively enables signet-tx-cache/sse)
  • src/tasks/cache/tx.rs: rewrite TxPoller — replace poll loop with full_fetch() + subscribe() + select! over SSE items and block env changes. Add reconnect() with exponential backoff. Remove poll_interval_ms, poll_duration(), Default impl.
  • src/tasks/cache/system.rs: pass block_env watch receiver to TxPoller::new()
  • tests/tx_poller_test.rs: update integration test to use TxCache directly (no more check_tx_cache() method)

BundlePoller is unchanged — the /bundles/feed server endpoint is not yet available.

Test plan

  • make clippy passes clean
  • make test — all 8 unit tests pass, integration tests correctly ignored
  • Manual test against tx-pool: verify SSE subscription log, real-time tx delivery, refetch on block env change, reconnect with backoff on disconnect

🤖 Generated with Claude Code

Evalir and others added 2 commits April 2, 2026 19:52
Switch TxPoller from 1s timer-based polling to SSE streaming for
real-time transaction delivery. The new lifecycle:
1. Full fetch of all transactions at startup
2. SSE stream for real-time new transaction delivery
3. Full refetch on each block environment change

Adds exponential backoff (1s-30s) on SSE reconnection to prevent
tight loops when the endpoint is unavailable.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Expand tokio import for nightly rustfmt, remove unresolved
`CacheTask` rustdoc link.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Evalir marked this pull request as ready for review April 15, 2026 16:07
Race the backoff sleep against envs.changed() so a block env change
arriving during reconnect cuts the sleep short, instead of buffering
up to 30s while the simulator operates on a stale cache.

Also replace the nested let-else + unwrap_err in the SSE arm with a
single match — no behavior change, drops the double-unwrap.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Evalir (Member, Author) commented Apr 15, 2026

This stack of pull requests is managed by Graphite.

Comment thread src/tasks/cache/tx.rs
```rust
loop {
    tokio::select! {
        item = sse_stream.next() => {
            match item {
```

too nested

Comment thread src/tasks/cache/tx.rs
```rust
    backoff: &mut Duration,
) -> SseStream {
    tokio::select! {
        _ = time::sleep(*backoff) => {}
```

this should be biased with an explanation of the choice of bias
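For context, tokio's `select!` supports a `biased;` mode that polls arms in declaration order instead of pseudo-randomly. A sketch of what the reviewer is asking for might look like the following (the ordering and its rationale here are illustrative, not the PR's actual choice):

```rust
// Sketch only: `biased;` makes select! poll arms top to bottom.
tokio::select! {
    biased;
    // Prefer reacting to a block env change over finishing the sleep:
    // operating on a stale cache is worse than reconnecting early.
    _ = self.envs.changed() => {}
    _ = time::sleep(*backoff) => {}
}
```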

Comment thread src/tasks/cache/tx.rs
```rust
/// stream on connection failure so the caller can handle reconnection
/// uniformly.
async fn subscribe(&self) -> SseStream {
    match self.tx_cache.subscribe_transactions().await {
```

rewrite functionally
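As an illustration of that suggestion (names hypothetical, and the real `SseStream` type and retry loop elided): a `match` that logs on `Err` and continues can often collapse into combinators on the `Result` itself:

```rust
/// Sketch of a "functional" rewrite: log the error via a combinator
/// instead of an explicit match arm, then convert to Option.
fn try_subscribe(res: Result<u32, String>) -> Option<u32> {
    res.inspect_err(|e| eprintln!("subscribe failed: {e}")).ok()
}
```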

Comment thread src/tasks/cache/tx.rs
```rust
        _ = self.envs.changed() => {}
    }
    *backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF);
    self.full_fetch(outbound).await;
```

these can be joined, no?

Comment thread src/tasks/cache/tx.rs
```rust
    // full_fetch below serves the same purpose the env arm would have.
    _ = self.envs.changed() => {}
}
*backoff = (*backoff * 2).min(Self::MAX_RECONNECT_BACKOFF);
```

I don't love putting the exponential backoff inline instead of using an existing implementation, or having it be an unbounded number of attempts. At what point is a failure deemed permanent?
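One way to bound the retries the reviewer asks about, sketched with hypothetical names: generate a finite backoff schedule up front, and treat exhausting it as the permanent-failure point.

```rust
use std::time::Duration;

/// Hypothetical bounded schedule: double from `initial` up to `max`,
/// for at most `attempts` tries. A caller that drains the whole
/// schedule without reconnecting would treat the failure as permanent.
fn backoff_schedule(initial: Duration, max: Duration, attempts: usize) -> Vec<Duration> {
    std::iter::successors(Some(initial), |d| Some((*d * 2).min(max)))
        .take(attempts)
        .collect()
}
```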

Comment thread src/tasks/cache/tx.rs
```rust
pub fn new() -> Self {
    Self::new_with_poll_interval_ms(POLL_INTERVAL_MS)
}
const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1);
```

top of file instead of assoc consts?

Comment thread src/tasks/cache/tx.rs
```rust
    .stream_transactions()
    .try_collect::<Vec<_>>()
    .inspect_err(|error| {
        counter!("signet.builder.cache.tx_poll_errors").increment(1);
```

in #263 we reified metrics under new patterns. these should match

Comment thread src/tasks/cache/tx.rs
```rust
counter!("signet.builder.cache.tx_poll_count").increment(1);
if let Ok(transactions) = self
    .tx_cache
    .stream_transactions()
```

sdk API thing. we now have "stream transactions" and "subscribe", which are not clear about their behavior

Comment thread src/tasks/cache/tx.rs
```rust
self.tx_cache.stream_transactions().try_collect().await
/// Fetches all transactions from the cache, forwarding each to nonce
/// checking before it reaches the cache task.
async fn full_fetch(&self, outbound: &mpsc::UnboundedSender<ReceivedTx>) {
```

architectural:
why was check_tx_cache deleted if its logic is repeated inline here?

This function also does more than fetch; it dispatches tasks, so its name should reflect that.

