fix(l1): batch GetPooledTransactions requests and skip when not synced #6443
azteca1998 merged 5 commits into main
Conversation
force-pushed from 392bd36 to 8d1dde9
🤖 Codex Code Review
I did not run tests. I also did not find coverage for the new batching/flush behavior in transactions_tests.rs.

Automated review by OpenAI Codex · gpt-5.4 · custom prompt
🤖 Kimi Code Review

Critical Issues

1. Data loss on partial send failure in transaction batching (…). Fix: either re-buffer unsent chunks on failure, or only remove successfully sent items from the buffer.
2. Mismatch between reserved in-flight state and actual send (…). Recommendation: consider a shorter timeout for buffered (not yet sent) requests, or send an empty …

Code Quality & Rust Best Practices

3. Inefficient O(n×m) lookup in …
4. Unnecessary vector allocations in chunking (…)
5. Inconsistent visibility in …

Test Infrastructure

6. Fragile Dockerfile workspace mutation (…)
7. Large generated file committed (…)

DB Explorer Tooling

8. Blocking runtime in library code (…)
9. Potential database reentrancy (…)

Minor Suggestions …

Summary …

Automated review by Kimi (Moonshot AI) · kimi-k2.5 · custom prompt
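Kimi's point 3 is cut off above, but the generic shape of an O(n×m) membership fix is easy to pin down. The sketch below is illustrative only; the type alias and function name are invented here, not taken from the PR.

```rust
use std::collections::HashSet;

// Illustrative only: `TxHash` and `filter_requested` are invented names.
type TxHash = [u8; 32];

// O(n*m): calling Vec::contains per announced hash scans linearly each time.
// O(n+m): build a HashSet once, then each membership test is O(1) on average.
fn filter_requested(announced: &[TxHash], requested: &[TxHash]) -> Vec<TxHash> {
    let requested: HashSet<&TxHash> = requested.iter().collect();
    announced
        .iter()
        .filter(|h| requested.contains(*h))
        .copied()
        .collect()
}

fn main() {
    let announced = [[1u8; 32], [2u8; 32], [3u8; 32]];
    let requested = [[2u8; 32], [3u8; 32]];
    assert_eq!(filter_requested(&announced, &requested).len(), 2);
}
```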
🤖 Codex Code Review

Findings: …

Aside from those points, the RLPx cleanup path for pending/in-flight tx requests looks structurally sound. I couldn't run …

Automated review by OpenAI Codex · gpt-5.4 · custom prompt
Greptile Summary

Implements two medium-priority P2P efficiency improvements: an `is_synced` guard that skips `NewPooledTransactionHashes` handling while the node is still syncing, and a `pending_tx_requests` buffer that batches `GetPooledTransactions` requests on a 50ms flush.

Confidence Score: 5/5

Safe to merge; both remaining findings are P2 style suggestions with no blocking correctness issues. The core logic — deduplication via `reserve_unknown_hashes` …
| Filename | Overview |
|---|---|
| crates/networking/p2p/rlpx/connection/server.rs | Adds pending_tx_requests buffer with 50ms periodic flush and is_synced guard; teardown correctly drains both in-flight maps |
| crates/networking/p2p/rlpx/connection/handshake.rs | Trivially initializes the new pending_tx_requests field to an empty Vec at connection setup |
| crates/networking/p2p/rlpx/eth/transactions.rs | Adds filter_to and from_raw helpers on NewPooledTransactionHashes for batched-request construction |
| test/packet-storm/docker-compose.yaml | Adds a 25+ node packet-storm stress-test cluster for validating reduced GetPooledTransactions message volume |
| tooling/db_explorer/src/lib.rs | New standalone read-only DB explorer library; unrelated to core p2p transaction batching changes |
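A minimal sketch of the per-connection state the table describes, assuming only the two field names it mentions; the surrounding types are stand-ins, not the crate's real definitions.

```rust
use std::collections::HashMap;
use std::time::Instant;

// Stand-ins for the crate's real types; only the two field names below
// come from the PR's file overview.
type TxHash = [u8; 32];
#[derive(Clone)]
struct NewPooledTransactionHashes;

struct Established {
    // Buffered (announcement, reserved hashes) pairs awaiting the 50ms flush.
    pending_tx_requests: Vec<(NewPooledTransactionHashes, Vec<TxHash>)>,
    // In-flight requests keyed by request id, timestamped for the sweep timer.
    requested_pooled_txs: HashMap<u64, (NewPooledTransactionHashes, Vec<TxHash>, Instant)>,
}

impl Established {
    // handshake.rs initializes the buffer empty at connection setup.
    fn new() -> Self {
        Self {
            pending_tx_requests: Vec::new(),
            requested_pooled_txs: HashMap::new(),
        }
    }
}

fn main() {
    let state = Established::new();
    assert!(state.pending_tx_requests.is_empty());
    assert!(state.requested_pooled_txs.is_empty());
}
```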
Sequence Diagram
```mermaid
sequenceDiagram
participant Peer
participant Handler as handle_incoming_message
participant Buffer as pending_tx_requests
participant Flush as flush_pending_tx_requests (50ms tick)
participant InFlight as requested_pooled_txs
participant Mempool
Peer->>Handler: NewPooledTransactionHashes
Handler->>Handler: is_synced()?
alt not synced
Handler-->>Peer: (skip silently)
else synced
Handler->>Mempool: reserve_unknown_hashes(hashes)
Mempool-->>Handler: reserved hashes
Handler->>Buffer: push (announcement, hashes)
end
Note over Flush: Every 50 ms
Flush->>Buffer: std::mem::take()
Buffer-->>Flush: pending entries
loop each ≤256-hash chunk
Flush->>InFlight: insert (announcement, hashes, now)
Flush->>Peer: GetPooledTransactions(chunk)
end
Peer->>Handler: PooledTransactions(id, txs)
Handler->>InFlight: remove(id)
InFlight-->>Handler: (announcement, requested_hashes)
Handler->>Mempool: clear_in_flight_txs(requested_hashes)
Handler->>Handler: is_synced()?
alt synced
Handler->>Handler: validate_requested()
Handler->>Mempool: add transactions
end
Note over Handler: On teardown
Handler->>InFlight: drain → clear_in_flight_txs
Handler->>Buffer: drain → clear_in_flight_txs
```
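To make the diagram concrete, here is a self-contained model of the flush step. The field names mirror the diagram, but the send is simulated and error handling is omitted, so treat it as a sketch of the control flow, not the PR's actual code.

```rust
use std::collections::HashMap;
use std::time::Instant;

type TxHash = [u8; 32];

const MAX_HASHES_PER_REQUEST: usize = 256; // devp2p limit per GetPooledTransactions

struct Established {
    pending_tx_requests: Vec<(String, Vec<TxHash>)>, // (announcement tag, hashes)
    requested_pooled_txs: HashMap<u64, (String, Vec<TxHash>, Instant)>,
    next_id: u64,
}

impl Established {
    // Called on every 50ms tick: drain the buffer, then issue one
    // GetPooledTransactions per chunk of at most 256 hashes.
    fn flush_pending_tx_requests(&mut self) {
        if self.pending_tx_requests.is_empty() {
            return;
        }
        let pending = std::mem::take(&mut self.pending_tx_requests);
        for (announcement, hashes) in pending {
            for chunk in hashes.chunks(MAX_HASHES_PER_REQUEST) {
                let id = self.next_id;
                self.next_id += 1;
                // Simulated network send; the real code awaits an RLPx write here.
                println!("GetPooledTransactions(id={id}, {} hashes)", chunk.len());
                // Register in-flight only after the (simulated) send succeeds.
                self.requested_pooled_txs
                    .insert(id, (announcement.clone(), chunk.to_vec(), Instant::now()));
            }
        }
    }
}

fn main() {
    let mut state = Established {
        pending_tx_requests: vec![("peer-A".into(), vec![[0u8; 32]; 300])],
        requested_pooled_txs: HashMap::new(),
        next_id: 0,
    };
    state.flush_pending_tx_requests();
    // 300 hashes split into chunks of 256 and 44 -> two in-flight entries.
    assert_eq!(state.requested_pooled_txs.len(), 2);
}
```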
In crates/networking/p2p/rlpx/connection/server.rs (lines 1491-1495):

```rust
let request = GetPooledTransactions::new(random(), chunk.to_vec());
state
    .requested_pooled_txs
    .insert(request.id, (announcement, chunk.to_vec(), Instant::now()));
send(state, Message::GetPooledTransactions(request)).await?;
```
**Insert-before-send ordering leaves stale entries on send failure**

The entry is inserted into `requested_pooled_txs` before `send` is awaited. If `send` errors on any chunk in a multi-chunk batch (>256 hashes), subsequent chunks are already registered in the map but no message was ever transmitted. They remain as stale in-flight entries until the 30-second sweep timer clears them. Swapping the order — send first, then insert — eliminates this window at the cost of one extra clone of `request.id`.
Suggested change:

```rust
send(state, Message::GetPooledTransactions(request.clone())).await?;
state
    .requested_pooled_txs
    .insert(request.id, (announcement, chunk.to_vec(), Instant::now()));
```
In crates/networking/p2p/rlpx/connection/server.rs (lines 1459-1462):

```rust
async fn flush_pending_tx_requests(state: &mut Established) -> Result<(), PeerConnectionError> {
    if state.pending_tx_requests.is_empty() {
        return Ok(());
    }
```
**Sync guard not rechecked at flush time**

`is_synced()` gates buffering in `handle_incoming_message`, but `flush_pending_tx_requests` sends whatever is in the buffer unconditionally. If the node transitions synced→non-synced within the 50ms window, the buffered hashes are still dispatched. The hashes are already reserved as in-flight and will be cleaned up on response or by the sweep timer, so this is not a correctness issue — but adding an early-return guard keeps the two paths consistent and avoids unnecessary network traffic.
Suggested change:

```rust
async fn flush_pending_tx_requests(state: &mut Established) -> Result<(), PeerConnectionError> {
    if state.pending_tx_requests.is_empty() || !state.blockchain.is_synced() {
        state.pending_tx_requests.clear();
        return Ok(());
    }
```
🤖 Claude Code Review
```rust
    .insert(request.id, (announcement, chunk.to_vec(), Instant::now()));
send(state, Message::GetPooledTransactions(request)).await?;
}
```
If `send()` fails on, say, the 2nd chunk, the `?` propagates the error and drops the local `all_hashes`/`all_types`/`all_sizes` vectors. Hashes from un-sent chunks were already removed from `pending_tx_requests` by `std::mem::take` (line 1464) but never inserted into `requested_pooled_txs`, so `teardown()` won't know to clear them from the mempool's `in_flight_txs` set. Those hashes become permanently stuck, preventing any peer from re-requesting them.

One fix: before the early `?` return, clear in-flight for all hashes that weren't sent. For example, you could collect all hashes into a flat vec up front, track how many were successfully sent, and on error clear the remainder:
```rust
let result = send(state, Message::GetPooledTransactions(request)).await;
if result.is_err() {
    // Clear in-flight for all remaining unsent hashes
    let unsent = &all_hashes[offset + chunk.len()..];
    let _ = state.blockchain.mempool.clear_in_flight_txs(unsent);
    return result;
}
```

Addresses the medium priority items from #6435:

- is_synced guard: skip `NewPooledTransactionHashes` processing when the node is still syncing, since we won't be building blocks soon.
- Rate limiting/batching: instead of sending a `GetPooledTransactions` request immediately for each `NewPooledTransactionHashes` announcement, buffer the hashes and flush them as a single batched request every 50ms. This reduces message volume when many peers announce overlapping transaction sets.
…drop merge()

- Enforce the devp2p 256-hash-per-request limit by chunking batched GetPooledTransactions requests.
- Build trimmed announcements containing only the actually-requested hashes (with matching types/sizes), fixing the validation divergence where the merged announcement could contain un-requested entries.
- Replace the allocation-heavy merge() with filter_to() + from_raw(), which builds the final vectors once without repeated reallocs.
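As a rough sketch of what `filter_to` and `from_raw` might look like: only the helper names and their purpose come from the commit and file overview; the struct fields and signatures here are guesses.

```rust
use std::collections::HashSet;

// Speculative sketch: fields and signatures are illustrative guesses.
type TxHash = [u8; 32];

struct NewPooledTransactionHashes {
    types: Vec<u8>,
    sizes: Vec<usize>,
    hashes: Vec<TxHash>,
}

impl NewPooledTransactionHashes {
    // Build an announcement from pre-assembled vectors in one shot, avoiding
    // the repeated reallocations of the old merge()-based approach.
    fn from_raw(types: Vec<u8>, sizes: Vec<usize>, hashes: Vec<TxHash>) -> Self {
        Self { types, sizes, hashes }
    }

    // Keep only entries whose hash was actually requested, so the trimmed
    // announcement's types/sizes line up with the requested hash list.
    fn filter_to(&self, requested: &HashSet<TxHash>) -> Self {
        let mut types = Vec::new();
        let mut sizes = Vec::new();
        let mut hashes = Vec::new();
        for i in 0..self.hashes.len() {
            if requested.contains(&self.hashes[i]) {
                types.push(self.types[i]);
                sizes.push(self.sizes[i]);
                hashes.push(self.hashes[i]);
            }
        }
        Self::from_raw(types, sizes, hashes)
    }
}

fn main() {
    let ann = NewPooledTransactionHashes::from_raw(
        vec![2, 2],
        vec![100, 200],
        vec![[1u8; 32], [2u8; 32]],
    );
    let requested: HashSet<TxHash> = [[2u8; 32]].into_iter().collect();
    let trimmed = ann.filter_to(&requested);
    assert_eq!(trimmed.hashes.len(), 1);
    assert_eq!(trimmed.sizes, vec![200]); // sizes stay aligned with hashes
}
```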
…stuck in-flight hashes

In flush_pending_tx_requests, the entry was inserted into requested_pooled_txs before send() was awaited. If send() failed on any chunk, the current chunk's hashes were already registered but no message was transmitted, and the cleanup only covered hashes after the failed chunk (offset + chunk.len()..), missing the current chunk itself. Fix both issues:

1. Move the insert after a successful send, so only actually-sent hashes get tracked in requested_pooled_txs.
2. On send failure, clear in-flight from offset.. (including the current chunk) rather than offset + chunk.len().., so no hashes become permanently stuck in the mempool's in_flight_txs set.
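A simplified, self-contained model of the corrected loop, with `send` and `clear_in_flight` standing in for the real async network call and mempool API:

```rust
// Send first, register in-flight only on success, and on failure clear
// reservations from `offset..`, which includes the chunk that failed.
type TxHash = [u8; 32];

fn flush(
    all_hashes: &[TxHash],
    mut send: impl FnMut(&[TxHash]) -> Result<(), ()>,
    clear_in_flight: impl Fn(&[TxHash]),
) -> Result<usize, ()> {
    let mut sent_chunks = 0;
    let mut offset = 0;
    for chunk in all_hashes.chunks(256) {
        if send(chunk).is_err() {
            // Nothing from this chunk onward was transmitted: release the
            // reservations so other peers may re-request these hashes.
            clear_in_flight(&all_hashes[offset..]);
            return Err(());
        }
        // Only actually-sent chunks get tracked as in-flight.
        sent_chunks += 1;
        offset += chunk.len();
    }
    Ok(sent_chunks)
}

fn main() {
    let hashes = vec![[0u8; 32]; 600];
    let mut calls = 0;
    // Fail on the second chunk; 600 - 256 = 344 hashes must be released.
    let result = flush(
        &hashes,
        |_| {
            calls += 1;
            if calls == 2 { Err(()) } else { Ok(()) }
        },
        |unsent| assert_eq!(unsent.len(), 344),
    );
    assert!(result.is_err());
}
```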
force-pushed from 9a7aad2 to cc7c2f9
Summary
Addresses the medium priority items from #6435:
- is_synced guard: skip `NewPooledTransactionHashes` processing when the node is still syncing — no point requesting transactions we can't use yet.
- Rate limiting/batching: instead of sending a `GetPooledTransactions` request immediately for each `NewPooledTransactionHashes` announcement, buffer the hashes and flush them as a single batched request every 50ms. This reduces message volume when many peers announce overlapping transaction sets in a short window.

Implementation details
- A `pending_tx_requests` buffer on the per-connection `Established` state accumulates `(announcement, hashes)` pairs.
- A `FlushPendingTxRequests` message (every 50ms) drains the buffer, merges announcements via `NewPooledTransactionHashes::merge()`, and sends a single `GetPooledTransactions`.

Test plan
- `cargo build -p ethrex-p2p` compiles cleanly
- `cargo build -p ethrex-p2p --features l2` compiles cleanly
- `cargo test -p ethrex-p2p` — all 21 tests pass
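Codex noted there is no test coverage for the new batching/flush behavior. A test along these lines, written here against plain slice chunking rather than the crate's real API, would at least pin down the 256-hash invariant:

```rust
#[cfg(test)]
mod tests {
    // Hypothetical test shape for the 256-hash chunking invariant; it targets
    // plain slice chunking, not ethrex-p2p's real types.
    #[test]
    fn batched_requests_respect_devp2p_limit() {
        let hashes = vec![[0u8; 32]; 600];
        let chunks: Vec<_> = hashes.chunks(256).collect();
        assert_eq!(chunks.len(), 3); // 256 + 256 + 88
        assert!(chunks.iter().all(|c| c.len() <= 256));
        assert_eq!(chunks.iter().map(|c| c.len()).sum::<usize>(), 600);
    }
}
```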