YNU-839: fix blockchain listener lifecycle #658
📝 Walkthrough
The changes refactor the event listener architecture from a callback-based approach to an interface-based design. The database store methods are simplified to return only block numbers and provide presence checks rather than full event records. The event listener is reworked to support deduplication, reorg handling, and improved context cancellation, and it splits event processing into sequential historical and live phases.
Sequence Diagram
sequenceDiagram
participant Listener
participant Client as RPC Client
participant EventGetter as ContractEventGetter<br/>(DatabaseStore)
participant Handler as Event Handler
participant Sub as Live Subscription
Listener->>Sub: Subscribe to logs (non-blocking)
Listener->>Client: Get current chain tip
Listener->>EventGetter: GetLatestContractEventBlockNumber
EventGetter-->>Listener: lastBlock
Note over Listener: Phase 1: Historical Reconciliation
loop From lastBlock → currentTip
Listener->>Client: FilterLogs (batch)
Client-->>Listener: historical events
Listener->>EventGetter: IsContractEventPresent (first event)
alt Event not present
EventGetter-->>Listener: false
Listener->>Handler: Process historical event
Note over Listener: Skip further checks for remaining batch
else Event present
EventGetter-->>Listener: true
Listener->>Listener: Skip event, continue checking
end
end
Note over Listener: Phase 2: Live Subscription Processing
Listener->>Sub: Drain subscription events
loop For live events
Sub-->>Listener: event log
alt Removed flag set (reorg)
Listener->>Listener: Skip event
else Normal event
Listener->>EventGetter: IsContractEventPresent (first event)
alt Event not present
EventGetter-->>Listener: false
Listener->>Handler: Process live event
Note over Listener: Skip further checks for live phase
else Event present
EventGetter-->>Listener: true
Listener->>Listener: Skip event
end
end
end
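The live-phase branch of the diagram can be sketched in Go roughly as follows. This is a minimal illustration, not the PR's code: `Log` is a stand-in for go-ethereum's `types.Log` reduced to the fields used here, and `presenceChecker` and `filterLive` are hypothetical names that narrow `IsContractEventPresent` to a single-argument function.

```go
package main

import "fmt"

// Log is a stand-in for go-ethereum's types.Log (assumption: only these fields matter here).
type Log struct {
	TxHash  string
	Removed bool
}

// presenceChecker is a hypothetical, narrowed form of IsContractEventPresent.
type presenceChecker func(l Log) (bool, error)

// filterLive returns the logs that should reach the event handler.
// Removed logs are skipped, and the presence check runs only until
// the first unseen event is found.
func filterLive(logs []Log, isPresent presenceChecker) ([]Log, error) {
	var out []Log
	dedupDone := false
	for _, l := range logs {
		if l.Removed {
			continue // log was dropped from the canonical chain by a reorg
		}
		if !dedupDone {
			present, err := isPresent(l)
			if err != nil {
				// The PR description treats dedup-check errors as fatal.
				return nil, fmt.Errorf("dedup check: %w", err)
			}
			if present {
				continue // already stored, skip and keep checking
			}
			dedupDone = true // first unseen event: stop checking from here on
		}
		out = append(out, l)
	}
	return out, nil
}

func main() {
	stored := map[string]bool{"0xaa": true}
	calls := 0
	check := func(l Log) (bool, error) {
		calls++
		return stored[l.TxHash], nil
	}
	logs := []Log{{TxHash: "0xaa"}, {TxHash: "0xbb", Removed: true}, {TxHash: "0xcc"}, {TxHash: "0xdd"}}
	out, err := filterLive(logs, check)
	fmt.Println(len(out), calls, err)
}
```

In this sketch the store is consulted twice (for `0xaa` and `0xcc`); `0xdd` passes through unchecked because ordering is assumed to be chronological.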
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
@coderabbitai review
✅ Actions performed: Review triggered.
🧹 Nitpick comments (6)
pkg/blockchain/evm/utils.go (1)
49-59: Avoid a raw `-1` backoff sentinel.
`time.Duration(-1)` is easy to misuse because timer APIs treat negative durations as “fire immediately.” A missed guard in any caller turns “stop retrying” into a tight retry loop. Returning `(time.Duration, bool)` or an error would make the abort path explicit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/blockchain/evm/utils.go` around lines 49 - 59, The backOffDuration function currently returns a raw negative duration sentinel which is error-prone; change its signature from backOffDuration(backOffCount int) time.Duration to backOffDuration(backOffCount int) (time.Duration, bool) where the bool explicitly signals "abort" (true = abort), return (0, false) for no delay, a positive duration for delays, and (0, true) when backOffCount exceeds maxBackOffCount; update all callers of backOffDuration to branch on the abort boolean instead of testing for negative durations so the abort path is explicit and cannot be mistaken for "fire immediately."
pkg/blockchain/evm/listener_test.go (2)
215-340: Add regression coverage for the remaining audit paths.
The new suite covers dedup and cancellation well, but it still doesn’t pin the two riskier behaviors called out by this PR: `Removed` logs during reorgs, and fatal errors from `GetLatestContractEventBlockNumber`/`IsContractEventPresent`. A dedicated test for those paths would make this rewrite much harder to regress.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/blockchain/evm/listener_test.go` around lines 215 - 340, Add tests that cover the two missing regression paths: (1) reorg handling where a log with Removed=true is seen: create a test that injects a types.Log with Removed: true into the historical/current flow (use listener.processEvents or listener.reconcileBlockRange via MockEVMClient/MockSubscription) and assert that Removed logs are ignored/handled as intended; and (2) fatal error propagation from the event-checking helpers: add tests that make MockContractEventGetter return non-nil errors from GetLatestContractEventBlockNumber and IsContractEventPresent and assert listener.reconcileBlockRange/processEvents returns the expected fatal error path (or triggers reconnect behavior), referencing the MockContractEventGetter, GetLatestContractEventBlockNumber, IsContractEventPresent, processEvents, and reconcileBlockRange symbols so the test targets those code paths.
251-255: Replace fixed sleeps with event-based synchronization.
These tests depend on `time.Sleep(50 * time.Millisecond)` to reach the right phase before canceling or injecting the subscription error. That makes them timing-sensitive under slow CI or scheduler pressure. A channel/WaitGroup signal from the mocked path would be much more stable.
Also applies to: 292-295
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/blockchain/evm/listener_test.go` around lines 251 - 255, The test currently uses time.Sleep(50 * time.Millisecond) before calling cancel(), which is flaky; instead add an explicit synchronization channel or sync.WaitGroup that the mocked path signals when historical processing completes (e.g., add a doneCh on the mock subscriber or a hook in the mock method used by the test), then in the test replace the Sleep and goroutine with a wait on that channel/WaitGroup and call cancel() (and similarly replace the sleep at the other location 292-295 by waiting for the mock’s signal before injecting the subscription error); reference the existing cancel() call and the mock subscriber method used by the test to wire the signal.
clearnode/store/database/contract_event.go (1)
57-67: Prefer an existence query here over `COUNT(*)`.
`IsContractEventPresent` is now on the listener dedup hot path, and `COUNT(*)` does more work than needed when the caller only needs a yes/no answer. `SELECT 1 ... LIMIT 1`/`Take`/`EXISTS` would be a better fit.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@clearnode/store/database/contract_event.go` around lines 57 - 67, Replace the expensive COUNT(*) in IsContractEventPresent with an existence check: query the ContractEvent model with the same WHERE clause (keeping the strings.ToLower(txHash) normalization) but use a Take (or SELECT 1 LIMIT 1) to fetch a single row and treat gorm.ErrRecordNotFound as "not present" and any other error as failure; update the return to true when the record is found and false when ErrRecordNotFound is returned.
pkg/blockchain/evm/listener.go (2)
140-145: Consider propagating the parent context to `SubscribeFilterLogs`.
Using `context.Background()` here means the subscription setup cannot be cancelled if the parent context is done during the RPC call. While `Unsubscribe()` is called later when `ctx.Done()` fires, the initial subscription call could hang indefinitely if the RPC endpoint is unresponsive.
♻️ Suggested change
- eventSubscription, err := l.client.SubscribeFilterLogs(context.Background(), watchFQ, currentCh)
+ subCtx, subCancel := context.WithTimeout(ctx, rpcRequestTimeout)
+ eventSubscription, err := l.client.SubscribeFilterLogs(subCtx, watchFQ, currentCh)
+ subCancel()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/blockchain/evm/listener.go` around lines 140 - 145, The SubscribeFilterLogs call currently uses context.Background(), preventing cancellation if the parent context is done; change the call to pass the parent context (the ctx in the surrounding function) instead of context.Background() so the RPC call can be cancelled when ctx is done (i.e., update the SubscribeFilterLogs invocation that assigns to eventSubscription and err to use ctx); optionally, if desired, wrap ctx with a timeout via context.WithTimeout before calling SubscribeFilterLogs to bound the RPC hang time.
233-241: Minor: Buffered events in `currentCh` are discarded on subscription error.
When a subscription error occurs (lines 233-240), the function returns immediately without draining any buffered events in `currentCh`. These events would be recovered during historical reconciliation on reconnect, but only if they haven't been pruned from the node.
This is likely acceptable given the dedup-safe design, but worth noting in case very short RPC pruning windows are used.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/blockchain/evm/listener.go` around lines 233 - 241, Subscription error handling returns immediately and discards any buffered events in currentCh; before calling eventSubscription.Unsubscribe() and returning, non-blockingly drain currentCh to drop buffered events so they aren't lost silently. Locate the block handling case err := <-eventSubscription.Err() (using eventSubscription.Err(), eventSubscription.Unsubscribe(), and currentCh) and add a loop that repeatedly reads from currentCh in a non-blocking select (break when empty) to drain remaining items, then proceed to Unsubscribe() and return nil.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 74509782-d2e7-469f-887b-69f3c6a4f114
📒 Files selected for processing (10)
clearnode/main.go
clearnode/store/database/contract_event.go
clearnode/store/database/contract_event_test.go
clearnode/store/database/interface.go
pkg/blockchain/evm/interface.go
pkg/blockchain/evm/listener.go
pkg/blockchain/evm/listener_test.go
pkg/blockchain/evm/mock_test.go
pkg/blockchain/evm/utils.go
pkg/blockchain/evm/utils_test.go
H-H01: fix(contracts): disallow challenge with CLOSE or FIN_MIG intents (#631)
H-M01: fix(contracts/ChannelHub): remove transferred amount check (#636)
fix(contracts/ChannelHub): allow unblocking escrow ops after migration (#637)
M-C01: feat(contracts): add token check between states (#639)
H-L03: fix(contracts/ChannelHub): skip disputed escrow during purge (#640)
M-H04: feat(contracts/ChannelHub): purge during escrow challenge finalization, count both skip and purge (#641)
M-M02: fix(contracts): require zero allocs on close (#643)
M-C02: fix(rpc/core): apply finalize escrow deposit correctly (#644)
H-L02: fix(contracts/ChannelHub): revert on withdrawFromVault failure (#651)
M-H03: fix(clearnode): log correct fields on failure (#647)
M-C03: fix(pkg/core): disallow negative amount transitions (#645)
H-L01: fix(contracts/ChannelHub): add CH address to prevent val addition replay (#650)
M-H06(clearnode): restrict max channel challenge duration (#654)
M-M03(clearnode): revert EscrowLock on insufficient home user balance (#655)
M-M04(clearnode): support default signer even if it is not approved (#656)
M-M05(clearnode): require correct intent on FinalizeEscrowWithdrawal (#657)
M-M09: reject asset decimals exceeding its token's decimals (#659)
M-L01: return correct errors during state advancement validation (#660)
M-L03: limit number of related ids per session key state (#662)
M-I02: normalize input hex addresses (#663)
M-H01: feat(contracts/ChannelHub): restrict to one node (#649)
M-H07: fix(contracts/ChannelHub): emit stored, not arbitrary candidate state (#664)
M-I01: feat(contracts/ChannelHub): remove updateLastState flag (#665)
H-L06: fix(contracts/ChannelHub): remove payable from methods, clarify in create (#666)
H-L09: feat(contracts/ChannelEngine): add non-home migration version check (#669)
YNU-839: fix blockchain listener lifecycle (#658)
M-H09: use channel signer for all channel state node sigs (#667)
H-L07: fix(contracts/ChannelHub): restrict createChannel to non-existing channels (#668)
H-I02: docs(contracts): add a note that rebasing tokens are not supported (#670)
H-I03: fix(contracts/ChannelHub): use CIE pattern in depositToHub (#671)
M-H11: revert empty signatures on quorum verification (#672)
M-H11: reject issuance of receiver state during escrow ops (#674)
H-I01: docs(contracts): note that fee-on-transfer tokens are not supported (#675)
M-L04: docs: mention liquidity monitoring (#677)
fix(contracts/ChannelHub): fix initialize escrow deposit dos (#679)
M-H08: enforce strict transition ordering after MutualLock and EscrowLock (#680)
fix: run forge fmt (#681)
M-L05: docs(contracts): document native token deposits (#685)
fix(clearnode): resolve a set of audit findings (#686)
M-H13: feat(contract): add validateChallengeSignature, revert in SK validator (#688)
Description
The blockchain listener had several issues: the exponential backoff used XOR instead of bit shift (`2^n` vs `1<<n`), the event reconciliation skipped duplicates based on log index which is per-transaction not per-block, and there was a window between historical reconciliation completing and live subscription starting where events could be lost.
This rewrites the listener to start the WebSocket subscription before historical reconciliation, so events arriving during catchup are buffered and no gap exists. Both streams are deduplicated against the database via a new `ContractEventGetter` interface with `IsContractEventPresent`. Once the first unseen event is found, the check is skipped for all subsequent events, since they arrive in strict chronological order. The reconciliation goroutine now runs under a dedicated child context to prevent leaks when the subscription drops mid-catchup.
Errors from the dedup check and initial block lookup are now fatal rather than silently falling back to defaults, and the backoff was refactored into a pure duration function composable with `select`/`ctx.Done()` for clean shutdown.
Related Audit Findings
M-M06
The listener uses waitForBackOffTimeout before retrying failed processing loops. The intent is to gradually increase the delay after repeated failures, so transient errors do not immediately cause tight retry loops. The backoff calculation is incorrect because it uses Go’s bitwise XOR operator instead of exponentiation.
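To make the XOR pitfall concrete, here is a small Go sketch. The helper names `xorBackOff`, `shiftBackOff`, and `sleepCtx`, the value of `maxBackOffCount`, and the one-second base delay are assumptions for illustration; the `(time.Duration, bool)` shape follows the reviewer's suggestion and is not necessarily what the PR merged.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const maxBackOffCount = 5 // hypothetical cap, chosen for the example

// xorBackOff reproduces the bug: in Go, ^ is bitwise XOR, not exponentiation.
func xorBackOff(n int) int { return 2 ^ n }

// shiftBackOff is the intended power of two.
func shiftBackOff(n int) int { return 1 << n }

// backOffDuration is a pure function: it returns the delay for the given
// attempt and an explicit abort flag instead of a negative sentinel.
func backOffDuration(count int) (time.Duration, bool) {
	if count > maxBackOffCount {
		return 0, true // give up
	}
	if count <= 0 {
		return 0, false // first attempt: no delay
	}
	return time.Duration(1<<(count-1)) * time.Second, false
}

// sleepCtx waits for d or until ctx is cancelled, whichever comes first.
func sleepCtx(ctx context.Context, d time.Duration) error {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	for n := 0; n <= 5; n++ {
		// XOR column yields 2, 3, 0, 1, 6, 7; shift column yields 1, 2, 4, 8, 16, 32.
		fmt.Println(n, xorBackOff(n), shiftBackOff(n))
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown before the delay elapses
	fmt.Println(sleepCtx(ctx, time.Hour))
}
```

With XOR the delay for the third retry collapses to zero, which is exactly the tight loop the backoff was meant to prevent.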
M-M07
While the EVM listener is processing live logs, it advances only the block cursor. When the subscription later drops and the listener rebuilds itself, it starts historical reconciliation again from that block, but still uses the old log index.
Historical replay begins from lastBlock again and skips only logs in that block whose index is less than or equal to lastIndex. Because lastIndex was never updated on the live path, a log that was already handled through the live subscription can be replayed again after reconnect. The reactor then processes that log and only afterwards stores it in contract_events. However, that DB insert is protected by a uniqueness constraint. If the same log is replayed, that insert can fail with a duplicate key error.
If replayed log processing fails at any point, that error is returned from listenEvents. In clearnode, that is fatal because the listener closure calls logger.Fatal, so the process exits immediately when the listener returns an error.
Consequently, a normal reconnect or replay condition can cause an already processed log to be replayed, hit the DB uniqueness check, and terminate the clearnode process.
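The replay-then-crash sequence goes away if the handler is only invoked for events the store has not seen. The sketch below uses a hypothetical in-memory store; the method signatures, the `(txHash, logIndex)` key, and the `handleOnce` helper are assumptions for illustration, and only the lowercase normalization of the transaction hash is taken from the review comments.

```go
package main

import (
	"fmt"
	"strings"
)

// eventKey identifies a stored contract event (key shape is an assumption).
type eventKey struct {
	txHash   string
	logIndex uint32
}

// memoryStore is a hypothetical in-memory stand-in for the database store.
type memoryStore struct {
	seen        map[eventKey]bool
	latestBlock uint64
}

func newMemoryStore() *memoryStore {
	return &memoryStore{seen: make(map[eventKey]bool)}
}

// StoreContractEvent records an event and advances the latest block number.
func (s *memoryStore) StoreContractEvent(block uint64, txHash string, logIndex uint32) {
	s.seen[eventKey{strings.ToLower(txHash), logIndex}] = true
	if block > s.latestBlock {
		s.latestBlock = block
	}
}

// IsContractEventPresent reports whether the event was already stored.
func (s *memoryStore) IsContractEventPresent(txHash string, logIndex uint32) (bool, error) {
	return s.seen[eventKey{strings.ToLower(txHash), logIndex}], nil
}

// GetLatestContractEventBlockNumber returns the block to resume reconciliation from.
func (s *memoryStore) GetLatestContractEventBlockNumber() (uint64, error) {
	return s.latestBlock, nil
}

// handleOnce runs handle only for unseen events, so a replay never reaches
// the insert that is guarded by the uniqueness constraint.
func handleOnce(s *memoryStore, block uint64, txHash string, logIndex uint32, handle func()) (bool, error) {
	present, err := s.IsContractEventPresent(txHash, logIndex)
	if err != nil {
		return false, err
	}
	if present {
		return false, nil
	}
	handle()
	s.StoreContractEvent(block, txHash, logIndex)
	return true, nil
}

func main() {
	store := newMemoryStore()
	handled := 0
	handle := func() { handled++ }
	first, _ := handleOnce(store, 100, "0xABC", 3, handle)  // live delivery
	second, _ := handleOnce(store, 100, "0xabc", 3, handle) // replay after reconnect
	latest, _ := store.GetLatestContractEventBlockNumber()
	fmt.Println(first, second, handled, latest) // true false 1 100
}
```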
M-M08
When the EVM listener starts from an existing cursor, it launches historical reconciliation and the live subscription concurrently. Both channels are then consumed in a single select, so an older historical log and a newer live log race each other and whichever arrives first is applied first.
This is dangerous because many handlers are not monotonic. HandleHomeChannelCreated, HandleEscrowDepositInitiated and HandleEscrowWithdrawalInitiated overwrite StateVersion and set Status = Open directly, while HandleHomeChannelCheckpointed overwrites StateVersion and can clear Challenged back to Open. A newer live event can be processed first, and then a stale historical event can overwrite it, leaving the node with a regressed StateVersion and Status = Open for a channel that on-chain is already closed or challenged.
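The ordering fix described in the PR (subscribe first, finish history, then drain the live buffer) can be illustrated with plain channels. This is a simplified sketch under stated assumptions: `Log` and `run` are hypothetical, the buffered channel stands in for the live subscription, and deduplication is left out to keep the ordering visible.

```go
package main

import "fmt"

// Log is a stand-in for an on-chain event, reduced to its block number.
type Log struct{ Block uint64 }

// run processes every historical log before reading the live channel,
// so a stale historical event can never be applied after a newer live one.
func run(historical []Log, live <-chan Log, handle func(Log)) {
	for _, l := range historical { // phase 1: historical reconciliation
		handle(l)
	}
	for l := range live { // phase 2: drain buffered and new live events
		handle(l)
	}
}

func main() {
	// The subscription is opened first, so live events buffer during catch-up.
	live := make(chan Log, 8)
	live <- Log{Block: 12}
	live <- Log{Block: 13}
	close(live) // closed here only so the example terminates

	historical := []Log{{Block: 10}, {Block: 11}}
	var order []uint64
	run(historical, live, func(l Log) { order = append(order, l.Block) })
	fmt.Println(order) // [10 11 12 13]
}
```

By contrast, consuming both sources in one `select` lets the runtime pick either ready channel, which is the race M-M08 describes.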
M-M10
During a chain reorganization, go-ethereum delivers a new log entry with the same transaction data but `Removed: true` set on `types.Log` to signal that the log was removed from the canonical chain. The listener does not inspect this field, so removed logs follow the same processing path as normal logs. At the same time, the original side effects from the first delivery remain committed, and the event handler layer does not implement inverse operations. As a result, orphaned logs can leave the local database out of sync with the canonical on-chain state.