Align parquet file rotation with cache chunk boundaries #3280
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3280 +/- ##
==========================================
+ Coverage 59.19% 59.21% +0.01%
==========================================
Files 2091 2091
Lines 171432 171462 +30
==========================================
+ Hits 101481 101526 +45
+ Misses 61152 61129 -23
- Partials 8799 8807 +8
cody-littley
left a comment
I asked an LLM to do an audit, and it produced the following findings. Please address each one, or explain why you think the finding is incorrect.
1. Race window: cache claims coverage of a block before its receipts are inserted (HIGH)
In cachedReceiptStore.SetReceipts (sei-db/ledger_db/receipt/cached_receipt_store.go):
    func (s *cachedReceiptStore) SetReceipts(ctx sdk.Context, receipts []ReceiptRecord) error {
    	if err := s.backend.SetReceipts(ctx, receipts); err != nil {
    		return err
    	}
    	if ctx.BlockHeight() > 0 {
    		s.cacheMu.Lock()
    		s.maybeRotateCacheLocked(uint64(ctx.BlockHeight()))
    		s.cacheMu.Unlock()
    	}
    	s.cacheReceipts(receipts)
    	return nil
    }

For a boundary block N (e.g., 500, 1000, ...):
1. backend.SetReceipts updates backend.LatestVersion = N.
2. The outer maybeRotateCacheLocked(N) rotates the in-memory ledgerCache (current chunk swaps to "previous", new current chunk is empty) and advances cacheNextRotate.
3. cacheMu is released.
4. cacheReceipts(receipts) re-acquires cacheMu and finally inserts block N's receipts/logs into the new current chunk.
Between steps 3 and 4, a concurrent FilterLogs call sees:
- cache.FilterLogsWithMinBlock(...) → empty (nothing in current chunk for block N yet, prev chunk lost block N because rotation already happened).
- coverageWindow() → reads cacheNextRotate (already advanced) and backend.LatestVersion = N, so it returns from = floor(N/interval)*interval, to = N, hasCoverage = true.
- The early-return hasCoverage && fromBlock >= coveredFrom && toBlock <= coveredTo fires for any query inside [N, N] (or fully inside the new chunk's range), and the function returns empty cacheLogs without ever consulting the backend.
The on-disk parquet file for block N is also still open (not in closedReceiptFiles), so the backend wouldn't have it either. Net effect: eth_getLogs for the just-committed block N can transiently return empty when it should return the block's logs.
The previous logic computed cacheMin from the actual cache contents under logMu, so it could not claim coverage of a block that wasn't in the cache snapshot. The new boundary-aligned, version-driven coverage window widens the claim and opens this TOCTOU.
Fixes worth considering:
- Drop the outer maybeRotateCacheLocked and let cacheReceipts (which already calls it under cacheMu) do the rotation atomically with the insertion. For empty receipts, special-case rotation inside cacheReceipts instead of doing it outside.
- Or hold cacheMu across both the rotate and the insert (move cacheReceipts's body inline under the same lock).
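
The second option can be pictured with the toy below. It is a minimal sketch, not the store's code: chunkCache, addBlock, and logsAt are hypothetical names, logs are plain strings, and the only point it makes is that rotation and insertion share one critical section, so a reader can never observe the rotated-but-empty state.

```go
package main

import (
	"fmt"
	"sync"
)

// chunkCache is a hypothetical stand-in for the two-chunk ledger cache.
type chunkCache struct {
	mu         sync.Mutex
	interval   uint64
	nextRotate uint64              // 0 until the first write is observed
	prev, cur  map[uint64][]string // block height -> logs
}

func newChunkCache(interval uint64) *chunkCache {
	return &chunkCache{
		interval: interval,
		prev:     map[uint64][]string{},
		cur:      map[uint64][]string{},
	}
}

// addBlock rotates (when height reaches the next boundary) and inserts the
// block's logs while holding the lock once, so both changes become visible
// to readers together.
func (c *chunkCache) addBlock(height uint64, logs []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.nextRotate == 0 {
		c.nextRotate = (height/c.interval + 1) * c.interval
	} else if height >= c.nextRotate {
		c.prev, c.cur = c.cur, map[uint64][]string{}
		c.nextRotate = (height/c.interval + 1) * c.interval
	}
	c.cur[height] = logs
}

// logsAt reads under the same lock that addBlock uses.
func (c *chunkCache) logsAt(height uint64) ([]string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if l, ok := c.cur[height]; ok {
		return l, true
	}
	l, ok := c.prev[height]
	return l, ok
}

func main() {
	c := newChunkCache(500)
	c.addBlock(499, []string{"a"})
	c.addBlock(500, []string{"b"}) // boundary block: rotate and insert atomically
	logs, ok := c.logsAt(500)
	fmt.Println(logs, ok)
}
```

The sketch ignores the empty-receipts case and multi-chunk jumps; both would need the special-casing mentioned in the first option.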
2. Race window: empty-block coverage after a cold reopen with no warmup (MEDIUM)
The coverageWindow doc comment promises:
Coverage only applies once the cache has observed at least one write, so a cold-reopen where WAL replay produced no warmup records reports no coverage and lets FilterLogs fall through to the backend.
But the new SetReceipts empty-block path defeats that:
    if ctx.BlockHeight() > 0 {
    	s.cacheMu.Lock()
    	s.maybeRotateCacheLocked(uint64(ctx.BlockHeight()))
    	s.cacheMu.Unlock()
    }

After a cold reopen where WAL replay produced no warmup records, the first committed block (even if empty, even if mid-window) sets cacheNextRotate away from 0, immediately enabling coverageWindow to claim coverage of [floor(latest/interval)*interval, latest].
In most realistic chains the block at latest is the just-committed empty block (so claiming "no logs there" is correct), and historical blocks below floor(latest/interval)*interval correctly fall through to the backend. So this isn't catastrophically broken — but it does invalidate the documented invariant and is a foot-gun if the assumption ever drifts (e.g., if a later change starts using coveredFrom < latest to short-circuit older blocks).
Same fix as #1 (maybeRotateCacheLocked should also gate on receipts being non-empty, or coverage must be derived from actual cache contents).
3. New crash window in WriteReceipts at rotation boundaries (LOW)
WriteReceipts now rotates before writing the boundary block's WAL entry:
    for _, b := range batches {
    	if s.receiptWriter != nil && b.blockNumber != s.lastSeenBlock && s.IsRotationBoundary(b.blockNumber) {
    		if err := s.rotateFileLocked(b.blockNumber); err != nil {
    			return err
    		}
    	}
    	entry := WALEntry{
    		BlockNumber: b.blockNumber,
    		Receipts:    b.receipts,
    	}
    	if err := s.wal.Write(entry); err != nil {
    		return err
    	}
    	...

rotateFileLocked flushes, closes the file, clears the WAL, then returns. A crash between ClearWAL and s.wal.Write(entry) loses the boundary block's WAL entry; on restart, the boundary block must be re-applied by Cosmos (it is in neither the WAL nor the just-rotated file).
The crash test was updated to assert exactly this: TestCrashRecoveryAtEachHookPoint now asserts that for needsRotation hooks the boundary block is not recovered and the caller must retry.
Note from Cody: are we expecting the outer context to replay something here in order to recover? Does it currently do that, or is that something we'd have to implement? In general, I think it's better to make this data store fully crash durable in order to simplify the mental model needed to interact with the store.
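
To make the ordering question concrete, here is a minimal in-memory sketch of one possible alternative: write the boundary block's WAL entry first, then rotate, and have the truncation keep the last entry. Everything in it (wal, walEntry, truncateKeepLast, commitBoundary, the flushFile callback) is hypothetical and only models the ordering; it is not the store's WAL API, and it does not address how replay would route the preserved entry into the new file.

```go
package main

import "fmt"

// walEntry and wal are hypothetical, in-memory stand-ins for the real WAL.
type walEntry struct {
	block    uint64
	receipts []string
}

type wal struct{ entries []walEntry }

func (w *wal) write(e walEntry) { w.entries = append(w.entries, e) }

// truncateKeepLast drops every entry except the most recent one.
func (w *wal) truncateKeepLast() {
	if n := len(w.entries); n > 1 {
		w.entries = append([]walEntry(nil), w.entries[n-1:]...)
	}
}

func (w *wal) has(block uint64) bool {
	for _, e := range w.entries {
		if e.block == block {
			return true
		}
	}
	return false
}

// commitBoundary logs the boundary block before rotating. flushFile is a
// placeholder for flushing and closing the old parquet file; if it fails,
// the WAL is left untouched.
func commitBoundary(w *wal, e walEntry, flushFile func() error) error {
	w.write(e) // boundary block reaches the WAL before rotation starts
	if err := flushFile(); err != nil {
		return err
	}
	w.truncateKeepLast() // older entries now live in the closed file
	return nil
}

func main() {
	w := &wal{}
	w.write(walEntry{block: 499, receipts: []string{"r499"}})
	err := commitBoundary(w, walEntry{block: 500, receipts: []string{"r500"}},
		func() error { return nil })
	fmt.Println(err, w.has(500), len(w.entries))
}
```

With this ordering the boundary entry is present in the WAL at every point after the first write, so a crash during rotation would not require the caller to replay the block.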
4. File-name misalignment after lazy init at a non-boundary block (LOW)
When the first receipt after a fresh start lands at, say, block 1234, applyReceiptLocked lazily initializes with fileStartBlock = 1234. The next rotation fires at 1500, producing receipts_1234.parquet (containing 1234–1499) instead of the expected receipts_1000.parquet. The reader's fileForBlock and GetFilesBeforeBlock both compute file coverage as [startBlock, startBlock+maxBlocksPerFile), so:
- Pruning is delayed by up to (maxBlocksPerFile − offset) blocks (file "looks like" it can hold up to 1733).
- Targeted lookups for block 1500 may pick receipts_1234.parquet first (since 1234 ≤ 1500), find nothing, and fall back to a full scan. Correct, but slower.
This is the same pattern that arises when an empty boundary block is observed before any writer exists (ObserveEmptyBlock updates lastSeenBlock without rotating, and the next non-boundary write lazy-inits at that height). Worth noting in operations docs and possibly fixed by snapping fileStartBlock to floor(input.BlockNumber/MaxBlocksPerFile) * MaxBlocksPerFile at lazy init.
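
The snapping arithmetic is small enough to show directly. The helper below is a sketch with a hypothetical name (alignedFileStart); only the floor formula comes from the finding above.

```go
package main

import "fmt"

// alignedFileStart snaps a block height down to the start of the rotation
// window that contains it. Hypothetical helper, not part of the store.
func alignedFileStart(block, maxBlocksPerFile uint64) uint64 {
	if maxBlocksPerFile == 0 {
		return block // no rotation interval configured; nothing to align to
	}
	return (block / maxBlocksPerFile) * maxBlocksPerFile
}

func main() {
	// With 500 blocks per file, a first receipt at block 1234 would name the
	// file after block 1000 rather than 1234.
	fmt.Printf("receipts_%d.parquet\n", alignedFileStart(1234, 500))
	fmt.Printf("receipts_%d.parquet\n", alignedFileStart(1500, 500))
}
```

With an aligned start, the reader's coverage rule [startBlock, startBlock+maxBlocksPerFile) matches what the file actually holds, so pruning and targeted lookups line up again.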
5. SetMaxBlocksPerFile writes Reader.maxBlocksPerFile racily (LOW, test-only)
    func (s *Store) SetMaxBlocksPerFile(n uint64) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	s.config.MaxBlocksPerFile = n
    	if s.Reader != nil {
    		s.Reader.maxBlocksPerFile = n
    	}
    }

Holding the store's s.mu does not synchronize with the Reader's reads of r.maxBlocksPerFile (which happen under r.mu at most). The doc-comment marks this as test-only ("Not safe to call while writes are in flight"), so it's acceptable, but if it ever leaks into production code it'll be a data race under -race.
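
If the setter ever did need to be safe against concurrent reads, one option would be to keep the value in an atomic field on both sides. The sketch below assumes Go 1.19+ for atomic.Uint64; the reader and store types and the covers method are hypothetical stand-ins, not the real Reader and Store.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// reader is a hypothetical stand-in for the parquet Reader.
type reader struct {
	maxBlocksPerFile atomic.Uint64
}

// covers reports whether the file starting at startBlock would contain block,
// using the coverage rule [startBlock, startBlock+maxBlocksPerFile).
func (r *reader) covers(startBlock, block uint64) bool {
	n := r.maxBlocksPerFile.Load()
	return block >= startBlock && block-startBlock < n
}

// store is a hypothetical stand-in for the parquet Store.
type store struct {
	maxBlocksPerFile atomic.Uint64
	reader           *reader
}

// setMaxBlocksPerFile publishes the new value with atomic stores, so readers
// calling Load never race with the write.
func (s *store) setMaxBlocksPerFile(n uint64) {
	s.maxBlocksPerFile.Store(n)
	if s.reader != nil {
		s.reader.maxBlocksPerFile.Store(n)
	}
}

func main() {
	s := &store{reader: &reader{}}
	s.setMaxBlocksPerFile(500)
	fmt.Println(s.reader.covers(1234, 1500), s.reader.covers(1234, 1734))
}
```

This removes the data race on the field itself; the two stores are still separate steps, so a reader could briefly see the old value while the store already holds the new one.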
6. ObserveEmptyBlock allows out-of-order updates of lastSeenBlock (LOW)
    func (s *Store) ObserveEmptyBlock(height uint64) error {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	if height == s.lastSeenBlock {
    		return nil
    	}
    	if s.receiptWriter == nil || !s.IsRotationBoundary(height) {
    		s.lastSeenBlock = height
    		return nil
    	}
    	...

If height < s.lastSeenBlock (out-of-order observation, e.g., a buggy caller or a test), lastSeenBlock moves backwards. The very next WriteReceipts could then mis-evaluate b.blockNumber != s.lastSeenBlock for a block already seen. Cosmos commits in order so this isn't expected in production, but a height > s.lastSeenBlock guard would make the contract explicit and cheap to enforce.
    for _, b := range batches {
    	if s.receiptWriter != nil && b.blockNumber != s.lastSeenBlock && s.IsRotationBoundary(b.blockNumber) {
    		if err := s.rotateFileLocked(b.blockNumber); err != nil {
The godoc on rotateFileLocked() says the function is used during WAL replay, but this call is not being made during WAL replay.
The way the parquet receipt store does locking sometimes makes thread safety and crash-recovery safety tricky to reason about. When code gets like this, race conditions sneak into the codebase, and it's a lot harder to be confident that you've found and fixed them all. As a follow-up task, perhaps we can discuss how to restructure the code to make these things simpler to reason about. (Not a request for change in this PR.)
Another batch of race conditions identified by an LLM audit:
| # | Severity | Item |
|---|---|---|
| 1 | Critical | FilterLogs stale-read race at chain tip |
| 2 | High (latent) | Tx-hash index can desync after a rotation + crash |
| 3 | Medium | Non-monotonic SetLatestVersion on empty-block path |
| 4 | Low | UpdateLatestVersion uses non-atomic Load-then-Store |
Item 1 is a live correctness bug that is already reachable in production under concurrent eth_getLogs load. Item 2 is a latent corruption-class bug that becomes reachable as soon as multi-block writes are introduced. Recommend blocking on item 1 before merging.
1. CRITICAL — Stale-read race in cachedReceiptStore.FilterLogs at the chain tip
File: sei-db/ledger_db/receipt/cached_receipt_store.go
SetReceipts updates state in two steps, in this order:
    // sei-db/ledger_db/receipt/cached_receipt_store.go:111-117
    func (s *cachedReceiptStore) SetReceipts(ctx sdk.Context, receipts []ReceiptRecord) error {
    	if err := s.backend.SetReceipts(ctx, receipts); err != nil {
    		return err
    	}
    	s.cacheReceipts(receipts, ctx.BlockHeight())
    	return nil
    }

- backend.SetReceipts promotes backend.LatestVersion() to block N (via UpdateLatestVersion inside parquetReceiptStore.SetReceipts).
- Only afterwards does cacheReceipts run AddReceiptsBatch / AddLogsForBlock on the cache.
FilterLogs uses coverageWindow to decide whether to skip the backend:
    // sei-db/ledger_db/receipt/cached_receipt_store.go:128-145
    func (s *cachedReceiptStore) coverageWindow() (uint64, uint64, bool) {
    	...
    	latest := s.backend.LatestVersion()
    	...
    	latestU := uint64(latest)
    	from := (latestU / s.cacheRotateInterval) * s.cacheRotateInterval
    	return from, latestU, true
    }

    // sei-db/ledger_db/receipt/cached_receipt_store.go:160-165
    coveredFrom, coveredTo, hasCoverage := s.coverageWindow()
    if hasCoverage && fromBlock >= coveredFrom && toBlock <= coveredTo {
    	s.reportLogFilterCacheHit()
    	sortLogs(cacheLogs)
    	return cacheLogs, nil
    }

Race sequence
Writer commits block N; reader polls for N concurrently.
- Writer: backend.SetReceipts returns → backend.LatestVersion() == N.
- Reader: FilterLogs(N, N, crit).
  - s.cache.FilterLogsWithMinBlock(N, N, crit) → empty (cache not yet populated).
  - coverageWindow() sees LatestVersion == N and a non-zero cacheNextRotate, computes coveredFrom = floor(N/interval)*interval, coveredTo = N.
  - Query [N, N] is declared fully covered → backend skipped → returns [].
- Writer: cacheReceipts runs AddReceiptsBatch / AddLogsForBlock for block N.
Impact
eth_getLogs and filter subscriptions silently drop all logs for the newest block under load. Data is durable in the backend; the query path returns an incomplete (empty) answer. This is a correctness regression the existing crash tests cannot catch because TestSlowFlushWithConcurrentReads gates readers on a post-write committed counter.
Rotation makes it slightly worse: if block N is a rotation boundary, the rotate + reset of cacheNextRotate happens inside cacheReceipts but after backend.SetReceipts. During the window, coverageWindow with either the new or the old cacheNextRotate still computes a window ending at N while the cache is empty for N.
Fix options
- Populate the cache first, then call backend.SetReceipts. Tears crash consistency in the other direction (cache exposes a block not yet in the backend), so it also needs the LatestVersion gate inverted.
- Preferred: track the highest block the cache has observed (cachedLatest uint64 bumped under cacheMu after AddReceiptsBatch / AddLogsForBlock) and use min(cachedLatest, backend.LatestVersion()) in coverageWindow. Coverage only advances to N once the cache is guaranteed populated for N.
- Failing that, require FilterLogs to also consult the backend for max(coveredTo-1, backendTo) instead of trusting coverage alone.
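
The preferred option can be sketched in isolation as follows. The coverage type, markCached, and window are hypothetical names, and the backend's latest version is passed in as a plain argument; the sketch only shows how capping the window at the cache's own high-water mark keeps a query for block N out of the fast path until N has been cached.

```go
package main

import (
	"fmt"
	"sync"
)

// coverage is a hypothetical stand-in for the cache's coverage bookkeeping.
type coverage struct {
	mu           sync.Mutex
	interval     uint64
	cachedLatest uint64 // highest block whose receipts/logs are in the cache
	hasWrite     bool   // false until the cache has observed a write
}

// markCached is meant to be called after a block's logs have been inserted.
func (c *coverage) markCached(height uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.hasWrite || height > c.cachedLatest {
		c.cachedLatest = height
	}
	c.hasWrite = true
}

// window returns the block range the cache may answer without the backend.
// The upper bound is min(cachedLatest, backendLatest).
func (c *coverage) window(backendLatest int64) (from, to uint64, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.hasWrite || backendLatest <= 0 || c.interval == 0 {
		return 0, 0, false
	}
	to = uint64(backendLatest)
	if c.cachedLatest < to {
		to = c.cachedLatest
	}
	from = (to / c.interval) * c.interval
	return from, to, true
}

func main() {
	c := &coverage{interval: 500}
	c.markCached(499)
	// Backend already reports 500, but the cache has not inserted it yet.
	fmt.Println(c.window(500))
	c.markCached(500)
	fmt.Println(c.window(500))
}
```

In the first call the window ends at 499, so a FilterLogs query for [500, 500] would fall through to the backend instead of returning an empty cache result.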
Test gap
No existing test exercises this race. Add a test that spawns a reader polling FilterLogs(latest, latest, ...) concurrently with many single-block SetReceipts calls and asserts no logs are dropped.
2. HIGH (latent) — Tx-hash index can desync from the parquet store across rotations
File: sei-db/ledger_db/receipt/parquet_store.go
Write order is:
    // sei-db/ledger_db/receipt/parquet_store.go:251-266
    if err := s.store.WriteReceipts(inputs); err != nil {
    	return err
    }
    if s.txHashIndex != nil {
    	if err := s.indexReceiptInputs(inputs); err != nil {
    		return fmt.Errorf("tx hash index write failed: %w", err)
    	}
    }
    if maxBlock > 0 {
    	s.store.UpdateLatestVersion(int64(maxBlock))
    }

WriteReceipts may rotate when the batch contains a boundary block. Rotation calls ClearWAL, which preserves only the last WAL entry:
    // sei-db/ledger_db/parquet/store.go:437-462
    // ClearWAL truncates the WAL after rotation, preserving the last entry.
    func (s *Store) ClearWAL() error {
    	...
    	if err := s.wal.TruncateBefore(lastOffset); err != nil {

And replayWAL drops WAL entries whose block is below fileStartBlock:
    // sei-db/ledger_db/receipt/parquet_store.go:389-393
    blockNumber := entry.BlockNumber
    if blockNumber < s.store.FileStartBlock() {
    	dropOffset = offset
    	return nil
    }

Crash scenario
Single SetReceipts whose inputs span more than one block and cross a rotation boundary:
- WriteReceipts writes all blocks, rotates at the boundary, flushes pre-boundary blocks to the closed parquet file, truncates WAL keeping only the boundary block.
- indexReceiptInputs starts iterating batches and crashes after indexing block k but before block k+1, where k+1 is non-boundary and already flushed to disk.
- Restart: replayWAL sees only the boundary block's WAL entry; all non-boundary blocks fail the blockNumber < FileStartBlock() check and are dropped. They are re-indexed only by the tx-hash index path, which never got to them.
Result: receipts are durable in parquet but permanently unreachable by tx-hash lookup (the tx-index code path returns "not found"; receipt_config.go documents that no full-scan fallback is offered when the index is the configured backend). Durable data corruption from the RPC consumer's perspective.
Mitigating factors
In production the Cosmos commit path calls SetReceipts with a single block's receipts, so inputs never spans multiple blocks and the race is not triggered today. This is architectural — nothing in the type system prevents multi-block inputs, and warmupReceipts / replayWAL paths do batch across blocks. A future refactor that batches commits (fast-forward sync, snapshot restore, etc.) would start losing tx-index entries.
Recommendations
- Either: index before WriteReceipts. WAL replay then catches any missed index writes: WAL preserves the boundary block, and all pre-boundary blocks are already in disk files by replay time.
- Or: make the index write idempotent and re-derive from parquet on startup (scan files for blocks whose fileStartBlock >= indexHighWatermark).
- At minimum, add a FaultHooks.AfterWALClear or new AfterIndex injection point and a crash test that asserts every pre-boundary tx is still reachable by hash after an index-time crash.
3. MEDIUM — SetLatestVersion on the empty-block path is non-monotonic
    // sei-db/ledger_db/receipt/parquet_store.go:189-191
    if ctx.BlockHeight() > s.store.LatestVersion() {
    	s.store.SetLatestVersion(ctx.BlockHeight())
    }

The non-empty path uses UpdateLatestVersion, which is guarded monotonically:

    // sei-db/ledger_db/parquet/store.go:308-313
    func (s *Store) UpdateLatestVersion(version int64) {
    	if version > s.latestVersion.Load() {
    		s.latestVersion.Store(version)
    	}
    }

The empty path does an unlocked Load-then-SetLatestVersion (unconditional Store). A concurrent non-empty SetReceipts could land a higher value in between; the subsequent SetLatestVersion(ctx.BlockHeight()) then rolls the counter back.
In production, Cosmos commits are serialized, so this is latent rather than active. Still worth swapping to UpdateLatestVersion for symmetry — the current asymmetry is a footgun for any future concurrent commit path.
4. LOW — UpdateLatestVersion itself is racy between writers
    // sei-db/ledger_db/parquet/store.go:308-313
    func (s *Store) UpdateLatestVersion(version int64) {
    	if version > s.latestVersion.Load() {
    		s.latestVersion.Store(version)
    	}
    }

Classic non-atomic compare-then-store. With two concurrent writers carrying versions V1 < V2, the V1 writer can Load a value below V1 and pass the check, the V2 writer can then store V2, and the V1 writer's Store overwrites V2 with V1. The overwrite only happens when another writer stores a higher value between this goroutine's Load and Store.
Single writer in Cosmos, so latent. Replace with a CompareAndSwap loop.
cody-littley
left a comment
Approved conditional on refactor we discussed, prior to release to production environments.