From 40d9c41a94d567b4f0a7faeb87b3348c85bcb32f Mon Sep 17 00:00:00 2001 From: Anton Filonenko Date: Wed, 8 Apr 2026 14:57:25 +0300 Subject: [PATCH 1/3] YNU-839: fix blockchain listener lifecycle --- clearnode/main.go | 4 +- clearnode/store/database/contract_event.go | 44 ++- .../store/database/contract_event_test.go | 154 ++++------ clearnode/store/database/interface.go | 7 +- pkg/blockchain/evm/interface.go | 9 +- pkg/blockchain/evm/listener.go | 290 ++++++++++++------ pkg/blockchain/evm/listener_test.go | 171 +++++++++-- pkg/blockchain/evm/mock_test.go | 15 + pkg/blockchain/evm/utils.go | 17 +- pkg/blockchain/evm/utils_test.go | 34 ++ 10 files changed, 511 insertions(+), 234 deletions(-) diff --git a/clearnode/main.go b/clearnode/main.go index 3980fb3b1..dc3b9b695 100644 --- a/clearnode/main.go +++ b/clearnode/main.go @@ -113,7 +113,7 @@ func main() { reactor := evm.NewChannelHubReactor(b.ID, eventHandlerService, bb.DbStore.StoreContractEvent) reactor.SetOnEventProcessed(bb.RuntimeMetrics.IncBlockchainEvent) - l := evm.NewListener(common.HexToAddress(b.ChannelHubAddress), client, b.ID, b.BlockStep, logger, reactor.HandleEvent, bb.DbStore.GetLatestEvent) + l := evm.NewListener(common.HexToAddress(b.ChannelHubAddress), client, b.ID, b.BlockStep, logger, reactor.HandleEvent, bb.DbStore) l.Listen(blockchainCtx, func(err error) { if err != nil { logger.Fatal("blockchain listener stopped", "error", err, "blockchainID", b.ID) @@ -142,7 +142,7 @@ func main() { } reactor.SetOnEventProcessed(bb.RuntimeMetrics.IncBlockchainEvent) - l := evm.NewListener(common.HexToAddress(b.LockingContractAddress), client, b.ID, b.BlockStep, logger, reactor.HandleEvent, bb.DbStore.GetLatestEvent) + l := evm.NewListener(common.HexToAddress(b.LockingContractAddress), client, b.ID, b.BlockStep, logger, reactor.HandleEvent, bb.DbStore) l.Listen(blockchainCtx, func(err error) { if err != nil { logger.Fatal("blockchain listener stopped", "error", err, "blockchainID", b.ID) diff --git 
a/clearnode/store/database/contract_event.go b/clearnode/store/database/contract_event.go index 955feb58d..5a997e559 100644 --- a/clearnode/store/database/contract_event.go +++ b/clearnode/store/database/contract_event.go @@ -6,7 +6,6 @@ import ( "time" "github.com/layer-3/nitrolite/pkg/core" - "gorm.io/gorm" ) var ErrEventHasAlreadyBeenProcessed = errors.New("contract event has already been processed") @@ -34,7 +33,7 @@ func (s *DBStore) StoreContractEvent(ev core.BlockchainEvent) error { BlockchainID: ev.BlockchainID, Name: ev.Name, BlockNumber: ev.BlockNumber, - TransactionHash: ev.TransactionHash, + TransactionHash: strings.ToLower(ev.TransactionHash), LogIndex: ev.LogIndex, CreatedAt: time.Now(), } @@ -42,29 +41,28 @@ func (s *DBStore) StoreContractEvent(ev core.BlockchainEvent) error { return s.db.Create(contractEvent).Error } -// GetLatestEvent returns the latest block number and log index for a given contract. -// This function matches the signature required by pkg/blockchain/evm.GetLatestEvent. -func (s *DBStore) GetLatestEvent(contractAddress string, blockchainID uint64) (core.BlockchainEvent, error) { - var ev ContractEvent - err := s.db.Where("blockchain_id = ? AND contract_address = ?", blockchainID, strings.ToLower(contractAddress)). - Order("block_number DESC, log_index DESC"). - First(&ev).Error - - if errors.Is(err, gorm.ErrRecordNotFound) { - // No events found, return zeros (will start from beginning) - return core.BlockchainEvent{}, nil +// GetLatestContractEventBlockNumber returns the highest block number stored for a given contract. +func (s *DBStore) GetLatestContractEventBlockNumber(contractAddress string, blockchainID uint64) (uint64, error) { + var blockNumber uint64 + err := s.db.Model(&ContractEvent{}). + Where("blockchain_id = ? AND contract_address = ?", blockchainID, strings.ToLower(contractAddress)). + Select("COALESCE(MAX(block_number), 0)"). 
+ Scan(&blockNumber).Error + if err != nil { + return 0, err } + return blockNumber, nil +} +// IsContractEventPresent checks whether a specific contract event has already been stored. +func (s *DBStore) IsContractEventPresent(blockchainID, blockNumber uint64, txHash string, logIndex uint32) (bool, error) { + var count int64 + err := s.db.Model(&ContractEvent{}). + Where("blockchain_id = ? AND block_number = ? AND transaction_hash = ? AND log_index = ?", + blockchainID, blockNumber, strings.ToLower(txHash), logIndex). + Count(&count).Error if err != nil { - return core.BlockchainEvent{}, err + return false, err } - - return core.BlockchainEvent{ - BlockNumber: ev.BlockNumber, - BlockchainID: ev.BlockchainID, - Name: ev.Name, - ContractAddress: ev.ContractAddress, - TransactionHash: ev.TransactionHash, - LogIndex: ev.LogIndex, - }, nil + return count > 0, nil } diff --git a/clearnode/store/database/contract_event_test.go b/clearnode/store/database/contract_event_test.go index 72e054d30..11a28ab02 100644 --- a/clearnode/store/database/contract_event_test.go +++ b/clearnode/store/database/contract_event_test.go @@ -39,124 +39,100 @@ func TestStoreContractEvent(t *testing.T) { assert.Equal(t, event.LogIndex, storedEvent.LogIndex) } -func TestGetLatestEvent(t *testing.T) { +func TestGetLatestContractEventBlockNumber(t *testing.T) { db, cleanup := SetupTestDB(t) defer cleanup() store := NewDBStore(db) contractAddress := "0x1234567890123456789012345678901234567890" - networkID := uint64(1) + blockchainID := uint64(1) - t.Run("no events returns empty event", func(t *testing.T) { - event, err := store.GetLatestEvent(contractAddress, networkID) + t.Run("no events returns zero", func(t *testing.T) { + block, err := store.GetLatestContractEventBlockNumber(contractAddress, blockchainID) require.NoError(t, err) - assert.Equal(t, core.BlockchainEvent{}, event) + assert.Equal(t, uint64(0), block) }) - t.Run("returns latest event", func(t *testing.T) { - // Store multiple events + 
t.Run("returns max block number across multiple events", func(t *testing.T) { events := []core.BlockchainEvent{ - { - ContractAddress: contractAddress, - BlockchainID: networkID, - Name: "Event1", - BlockNumber: 100, - TransactionHash: "0xaaa", - LogIndex: 1, - }, - { - ContractAddress: contractAddress, - BlockchainID: networkID, - Name: "Event2", - BlockNumber: 100, - TransactionHash: "0xbbb", - LogIndex: 2, - }, - { - ContractAddress: contractAddress, - BlockchainID: networkID, - Name: "Event3", - BlockNumber: 150, - TransactionHash: "0xccc", - LogIndex: 0, - }, + {ContractAddress: contractAddress, BlockchainID: blockchainID, Name: "E1", BlockNumber: 100, TransactionHash: "0xaaa", LogIndex: 0}, + {ContractAddress: contractAddress, BlockchainID: blockchainID, Name: "E2", BlockNumber: 200, TransactionHash: "0xbbb", LogIndex: 0}, + {ContractAddress: contractAddress, BlockchainID: blockchainID, Name: "E3", BlockNumber: 150, TransactionHash: "0xccc", LogIndex: 0}, } - for _, ev := range events { - err := store.StoreContractEvent(ev) - require.NoError(t, err) + require.NoError(t, store.StoreContractEvent(ev)) } - // Get latest event - latestEvent, err := store.GetLatestEvent(contractAddress, networkID) + block, err := store.GetLatestContractEventBlockNumber(contractAddress, blockchainID) require.NoError(t, err) + assert.Equal(t, uint64(200), block) + }) - // Should return the event with highest block number - assert.Equal(t, uint64(150), latestEvent.BlockNumber) - assert.Equal(t, uint32(0), latestEvent.LogIndex) - assert.Equal(t, "Event3", latestEvent.Name) - assert.Equal(t, contractAddress, latestEvent.ContractAddress) - assert.Equal(t, networkID, latestEvent.BlockchainID) + t.Run("different contract returns zero", func(t *testing.T) { + block, err := store.GetLatestContractEventBlockNumber("0x9999999999999999999999999999999999999999", blockchainID) + require.NoError(t, err) + assert.Equal(t, uint64(0), block) }) - t.Run("different contract returns empty event", 
func(t *testing.T) { - differentContract := "0x9999999999999999999999999999999999999999" - event, err := store.GetLatestEvent(differentContract, networkID) + t.Run("different blockchain returns zero", func(t *testing.T) { + block, err := store.GetLatestContractEventBlockNumber(contractAddress, 999) require.NoError(t, err) - assert.Equal(t, core.BlockchainEvent{}, event) + assert.Equal(t, uint64(0), block) }) +} + +func TestIsContractEventPresent(t *testing.T) { + db, cleanup := SetupTestDB(t) + defer cleanup() + + store := NewDBStore(db) + + // Store a known event + ev := core.BlockchainEvent{ + ContractAddress: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + BlockchainID: 1, + Name: "TestEvent", + BlockNumber: 500, + TransactionHash: "0xAbCdEf1234567890AbCdEf1234567890AbCdEf1234567890AbCdEf1234567890", + LogIndex: 3, + } + require.NoError(t, store.StoreContractEvent(ev)) - t.Run("different network returns empty event", func(t *testing.T) { - differentNetwork := uint64(999) - event, err := store.GetLatestEvent(contractAddress, differentNetwork) + t.Run("existing event returns true", func(t *testing.T) { + present, err := store.IsContractEventPresent(1, 500, ev.TransactionHash, 3) require.NoError(t, err) - assert.Equal(t, core.BlockchainEvent{}, event) + assert.True(t, present) }) - t.Run("returns highest log index when same block", func(t *testing.T) { - contractAddr := "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - chainID := uint64(42) + t.Run("case-insensitive txHash match", func(t *testing.T) { + // Query with uppercase — stored value was lowercased by StoreContractEvent + present, err := store.IsContractEventPresent(1, 500, "0xABCDEF1234567890ABCDEF1234567890ABCDEF1234567890ABCDEF1234567890", 3) + require.NoError(t, err) + assert.True(t, present) + }) - // Store events in same block with different log indices - events := []core.BlockchainEvent{ - { - ContractAddress: contractAddr, - BlockchainID: chainID, - Name: "EventA", - BlockNumber: 200, - 
TransactionHash: "0x111", - LogIndex: 5, - }, - { - ContractAddress: contractAddr, - BlockchainID: chainID, - Name: "EventB", - BlockNumber: 200, - TransactionHash: "0x222", - LogIndex: 10, - }, - { - ContractAddress: contractAddr, - BlockchainID: chainID, - Name: "EventC", - BlockNumber: 200, - TransactionHash: "0x333", - LogIndex: 3, - }, - } + t.Run("wrong block number returns false", func(t *testing.T) { + present, err := store.IsContractEventPresent(1, 501, ev.TransactionHash, 3) + require.NoError(t, err) + assert.False(t, present) + }) - for _, ev := range events { - err := store.StoreContractEvent(ev) - require.NoError(t, err) - } + t.Run("wrong log index returns false", func(t *testing.T) { + present, err := store.IsContractEventPresent(1, 500, ev.TransactionHash, 4) + require.NoError(t, err) + assert.False(t, present) + }) - // Get latest event - should return highest log index for the block - latestEvent, err := store.GetLatestEvent(contractAddr, chainID) + t.Run("wrong blockchain returns false", func(t *testing.T) { + present, err := store.IsContractEventPresent(2, 500, ev.TransactionHash, 3) require.NoError(t, err) + assert.False(t, present) + }) - assert.Equal(t, uint64(200), latestEvent.BlockNumber) - assert.Equal(t, uint32(10), latestEvent.LogIndex) - assert.Equal(t, "EventB", latestEvent.Name) + t.Run("wrong txHash returns false", func(t *testing.T) { + present, err := store.IsContractEventPresent(1, 500, "0x0000000000000000000000000000000000000000000000000000000000000000", 3) + require.NoError(t, err) + assert.False(t, present) }) } diff --git a/clearnode/store/database/interface.go b/clearnode/store/database/interface.go index 06bb4cbba..dec4cec8a 100644 --- a/clearnode/store/database/interface.go +++ b/clearnode/store/database/interface.go @@ -237,6 +237,9 @@ type DatabaseStore interface { // StoreContractEvent stores a blockchain event to prevent duplicate processing. 
StoreContractEvent(ev core.BlockchainEvent) error - // GetLatestEvent returns the latest block number and log index for a given contract. - GetLatestEvent(contractAddress string, blockchainID uint64) (core.BlockchainEvent, error) + // GetLatestContractEventBlockNumber returns the highest block number for a given contract. + GetLatestContractEventBlockNumber(contractAddress string, blockchainID uint64) (lastBlock uint64, err error) + + // IsContractEventPresent checks if a specific contract event has already been stored. + IsContractEventPresent(blockchainID, blockNumber uint64, txHash string, logIndex uint32) (isPresent bool, err error) } diff --git a/pkg/blockchain/evm/interface.go b/pkg/blockchain/evm/interface.go index 9a28ccd97..b9477c8c4 100644 --- a/pkg/blockchain/evm/interface.go +++ b/pkg/blockchain/evm/interface.go @@ -11,7 +11,14 @@ import ( type HandleEvent func(ctx context.Context, eventLog types.Log) error type StoreContractEvent func(ev core.BlockchainEvent) error -type LatestEventGetter func(contractAddress string, blockchainID uint64) (ev core.BlockchainEvent, err error) + +// ContractEventGetter is used by Listener for resumption and deduplication. +type ContractEventGetter interface { + // GetLatestContractEventBlockNumber returns the block to resume from (0 = start fresh). + GetLatestContractEventBlockNumber(contractAddress string, blockchainID uint64) (lastBlock uint64, err error) + // IsContractEventPresent checks whether a specific event was already processed. 
+ IsContractEventPresent(blockchainID, blockNumber uint64, txHash string, logIndex uint32) (isPresent bool, err error) +} type AssetStore interface { // GetAssetDecimals checks if an asset exists and returns its decimals in YN diff --git a/pkg/blockchain/evm/listener.go b/pkg/blockchain/evm/listener.go index 1fcf9efc2..44239a411 100644 --- a/pkg/blockchain/evm/listener.go +++ b/pkg/blockchain/evm/listener.go @@ -2,6 +2,7 @@ package evm import ( "context" + "fmt" "math/big" "sync" "sync/atomic" @@ -11,34 +12,31 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/event" - "github.com/layer-3/nitrolite/pkg/core" "github.com/layer-3/nitrolite/pkg/log" ) const ( - maxBackOffCount = 5 + maxBackOffCount = 5 + rpcRequestTimeout = 1 * time.Minute ) +// Listener watches a single contract for on-chain events, combining historical +// log reconciliation with a live WebSocket subscription to guarantee gap-free, +// deduplicated delivery even across restarts. Cancel the context passed to Listen +// for graceful shutdown. type Listener struct { contractAddress common.Address client bind.ContractBackend blockchainID uint64 - blockStep uint64 + blockStep uint64 // max blocks per FilterLogs call during reconciliation logger log.Logger handleEvent HandleEvent - getLatestEvent LatestEventGetter + eventGetter ContractEventGetter } -func NewListener(contractAddress common.Address, client bind.ContractBackend, blockchainID uint64, blockStep uint64, logger log.Logger, eventHandler HandleEvent, getLatestEvent LatestEventGetter) *Listener { - if getLatestEvent == nil { - getLatestEvent = func(contractAddress string, networkID uint64) (core.BlockchainEvent, error) { - return core.BlockchainEvent{ - BlockNumber: 0, - LogIndex: 0, - }, nil - } - } +// NewListener creates a Listener. 
blockStep controls how many blocks are fetched +// per RPC call during historical reconciliation. +func NewListener(contractAddress common.Address, client bind.ContractBackend, blockchainID uint64, blockStep uint64, logger log.Logger, eventHandler HandleEvent, eventGetter ContractEventGetter) *Listener { return &Listener{ contractAddress: contractAddress, client: client, @@ -46,12 +44,12 @@ func NewListener(contractAddress common.Address, client bind.ContractBackend, bl blockStep: blockStep, logger: logger.WithName("evm"), handleEvent: eventHandler, - getLatestEvent: getLatestEvent, + eventGetter: eventGetter, } } -// Listen starts the event listener in a background goroutine. -// The handleClosure callback is invoked when the listener exits, with an error if any. +// Listen starts the listener in a background goroutine. handleClosure is called +// exactly once after the listener stops; err is non-nil only if the handler failed. func (l *Listener) Listen(ctx context.Context, handleClosure func(err error)) { childCtx, cancel := context.WithCancel(ctx) wg := sync.WaitGroup{} @@ -85,105 +83,211 @@ func (l *Listener) Listen(ctx context.Context, handleClosure func(err error)) { }() } -// listenEvents listens for blockchain events and processes them with the provided handler +// logBackOff computes the backoff duration and logs accordingly. +// Returns the duration and true if the caller should proceed, or false if the limit was exceeded (fatal logged). +func (l *Listener) logBackOff(count uint64, originator string) (time.Duration, bool) { + d := backOffDuration(int(count)) + if d < 0 { + l.logger.Fatal("back off limit reached, exiting", "originator", originator, "backOffCollisionCount", count) + return 0, false + } + if d > 0 { + l.logger.Info("backing off", "originator", originator, "backOffCollisionCount", count) + } + return d, true +} + +// listenEvents is the main loop. Each iteration: +// 1. Subscribes to live events (buffered in currentCh). +// 2. 
Fetches the chain tip — done after subscribing so no events fall through the gap. +// 3. Launches reconcileBlockRange in a goroutine (lastBlock → chain tip → historicalCh). +// 4. Calls processEvents: drains historicalCh first, then switches to currentCh. +// +// On subscription failure it retries with exponential backoff. Returns non-nil only +// when the handler or the event-presence check fails. func (l *Listener) listenEvents(ctx context.Context) error { - ev, err := l.getLatestEvent(l.contractAddress.String(), l.blockchainID) + lastBlock, err := l.eventGetter.GetLatestContractEventBlockNumber(l.contractAddress.String(), l.blockchainID) if err != nil { - l.logger.Error("failed to get latest processed event", "error", err, "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + return fmt.Errorf("failed to get latest processed block: %w", err) } - lastBlock := ev.BlockNumber - lastIndex := ev.LogIndex var backOffCount atomic.Uint64 - var historicalCh, currentCh chan types.Log - var eventSubscription event.Subscription l.logger.Info("starting listening events", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) for { - if eventSubscription == nil { - waitForBackOffTimeout(l.logger, int(backOffCount.Load()), "event subscription") - - historicalCh = make(chan types.Log, 1) - currentCh = make(chan types.Log, 100) + d, ok := l.logBackOff(backOffCount.Load(), "event subscription") + if !ok { + return nil + } + select { + case <-ctx.Done(): + l.logger.Info("stopping event listener", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + return nil + case <-time.After(d): + } + if ctx.Err() != nil { + l.logger.Info("stopping event listener", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + return nil + } - if lastBlock == 0 { - l.logger.Info("skipping historical logs fetching", - "blockchainID", l.blockchainID, - "contractAddress", l.contractAddress.String()) - 
} else { - var header *types.Header - var err error - headerCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - header, err = l.client.HeaderByNumber(headerCtx, nil) - cancel() - if err != nil { - l.logger.Error("failed to get latest block", "error", err, "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) - backOffCount.Add(1) - continue - } + historicalCh := make(chan types.Log, 1) + currentCh := make(chan types.Log, 100) - // TODO: ensure that new events start to be processed only after all historical ones are processed - go l.reconcileBlockRange( - header.Number.Uint64(), - lastBlock, - lastIndex, - historicalCh, - ) - } + // Subscribe to live events first so nothing is missed while reconciling. + watchFQ := ethereum.FilterQuery{ + Addresses: []common.Address{l.contractAddress}, + } + eventSubscription, err := l.client.SubscribeFilterLogs(context.Background(), watchFQ, currentCh) + if err != nil { + l.logger.Error("failed to subscribe on events", "error", err, "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + backOffCount.Add(1) + continue + } - watchFQ := ethereum.FilterQuery{ - Addresses: []common.Address{l.contractAddress}, - } - eventSub, err := l.client.SubscribeFilterLogs(context.Background(), watchFQ, currentCh) + // Fetch current block height after subscribing to avoid a gap. 
+ var cancelReconcile context.CancelFunc + if lastBlock == 0 { + l.logger.Info("skipping historical logs fetching", + "blockchainID", l.blockchainID, + "contractAddress", l.contractAddress.String()) + close(historicalCh) + } else { + headerCtx, headerCancel := context.WithTimeout(context.Background(), rpcRequestTimeout) + header, err := l.client.HeaderByNumber(headerCtx, nil) + headerCancel() if err != nil { - l.logger.Error("failed to subscribe on events", "error", err, "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + l.logger.Error("failed to get latest block", "error", err, "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + eventSubscription.Unsubscribe() backOffCount.Add(1) continue } - eventSubscription = eventSub - l.logger.Info("watching events", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) - backOffCount.Store(0) + var reconcileCtx context.Context + reconcileCtx, cancelReconcile = context.WithCancel(ctx) + currentBlock := header.Number.Uint64() + go func() { + l.reconcileBlockRange(reconcileCtx, currentBlock, lastBlock, historicalCh) + close(historicalCh) + }() + } + + l.logger.Info("watching events", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + backOffCount.Store(0) + + err = l.processEvents(ctx, eventSubscription, historicalCh, currentCh, &lastBlock) + if cancelReconcile != nil { + cancelReconcile() } + if err != nil { + return err + } + } +} +// processEvents runs two sequential phases: historical (historicalCh until closed), +// then live (currentCh until ctx or subscription death). In each phase the first +// events are checked via IsContractEventPresent; once a non-present event is found +// the check is skipped for the rest of that phase (events are strictly ordered). +// Returns nil on subscription loss (reconnect), non-nil on handler/check failure. 
+func (l *Listener) processEvents( + ctx context.Context, + eventSubscription interface { + Unsubscribe() + Err() <-chan error + }, + historicalCh <-chan types.Log, + currentCh <-chan types.Log, + lastBlock *uint64, +) error { + // Phase 1: drain all historical events before processing live ones. + historicalCheckDone := false + for historicalCh != nil { select { case <-ctx.Done(): l.logger.Info("stopping event listener", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) eventSubscription.Unsubscribe() return nil - case eventLog := <-historicalCh: - l.logger.Debug("received new event", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String(), "blockNumber", lastBlock, "logIndex", eventLog.Index) - - ctx := log.SetContextLogger(context.Background(), l.logger) - if err := l.handleEvent(ctx, eventLog); err != nil { + case eventLog, ok := <-historicalCh: + if !ok { + historicalCh = nil + break + } + if !historicalCheckDone { + present, err := l.eventGetter.IsContractEventPresent(l.blockchainID, eventLog.BlockNumber, eventLog.TxHash.Hex(), uint32(eventLog.Index)) + if err != nil { + eventSubscription.Unsubscribe() + return fmt.Errorf("failed to check historical event presence: %w", err) + } + if present { + l.logger.Debug("skipping already present historical event", "blockchainID", l.blockchainID, "blockNumber", eventLog.BlockNumber, "logIndex", eventLog.Index) + continue + } + historicalCheckDone = true + } + l.logger.Debug("received historical event", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String(), "blockNumber", eventLog.BlockNumber, "logIndex", eventLog.Index) + evCtx := log.SetContextLogger(context.Background(), l.logger) + if err := l.handleEvent(evCtx, eventLog); err != nil { + eventSubscription.Unsubscribe() return err } - case eventLog := <-currentCh: - lastBlock = eventLog.BlockNumber - l.logger.Debug("received new event", "blockchainID", l.blockchainID, "contractAddress", 
l.contractAddress.String(), "blockNumber", lastBlock, "logIndex", eventLog.Index) + case err := <-eventSubscription.Err(): + if err != nil { + l.logger.Error("event subscription error", "error", err, "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + } else { + l.logger.Debug("subscription closed, resubscribing", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + } + eventSubscription.Unsubscribe() + return nil + } + } - ctx := log.SetContextLogger(context.Background(), l.logger) - if err := l.handleEvent(ctx, eventLog); err != nil { + // Phase 2: process live events from subscription. + currentCheckDone := false + for { + select { + case <-ctx.Done(): + l.logger.Info("stopping event listener", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) + eventSubscription.Unsubscribe() + return nil + case eventLog := <-currentCh: + *lastBlock = eventLog.BlockNumber + if !currentCheckDone { + present, err := l.eventGetter.IsContractEventPresent(l.blockchainID, eventLog.BlockNumber, eventLog.TxHash.Hex(), uint32(eventLog.Index)) + if err != nil { + eventSubscription.Unsubscribe() + return fmt.Errorf("failed to check current event presence: %w", err) + } + if present { + l.logger.Debug("skipping already present current event", "blockchainID", l.blockchainID, "blockNumber", eventLog.BlockNumber, "logIndex", eventLog.Index) + continue + } + currentCheckDone = true + } + l.logger.Debug("received current event", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String(), "blockNumber", eventLog.BlockNumber, "logIndex", eventLog.Index) + evCtx := log.SetContextLogger(context.Background(), l.logger) + if err := l.handleEvent(evCtx, eventLog); err != nil { + eventSubscription.Unsubscribe() return err } case err := <-eventSubscription.Err(): if err != nil { l.logger.Error("event subscription error", "error", err, "blockchainID", l.blockchainID, "contractAddress", 
l.contractAddress.String()) - eventSubscription.Unsubscribe() - // NOTE: do not increment backOffCount here, as connection errors on continuous subscriptions are normal } else { l.logger.Debug("subscription closed, resubscribing", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String()) } - - eventSubscription = nil + eventSubscription.Unsubscribe() + return nil } } } +// reconcileBlockRange fetches logs from lastBlock to currentBlock in blockStep-sized +// windows, sending each log to historicalCh. Caller closes historicalCh after return. +// Uses a dedicated context so it can be cancelled when the subscription drops. func (l *Listener) reconcileBlockRange( + ctx context.Context, currentBlock uint64, lastBlock uint64, - lastIndex uint32, historicalCh chan types.Log, ) { var backOffCount atomic.Uint64 @@ -191,12 +295,19 @@ func (l *Listener) reconcileBlockRange( endBlock := startBlock + l.blockStep for currentBlock > startBlock { - waitForBackOffTimeout(l.logger, int(backOffCount.Load()), "reconcile block range") + d, ok := l.logBackOff(backOffCount.Load(), "reconcile block range") + if !ok { + return + } + select { + case <-ctx.Done(): + return + case <-time.After(d): + } + if ctx.Err() != nil { + return + } - // We need to refetch events starting from last known block without adding 1 to it - // because it's possible that block includes more than 1 event, and some may be still unprocessed. - // - // This will cause duplicate key error in logs, but it's completely fine. 
if endBlock > currentBlock { endBlock = currentBlock } @@ -205,14 +316,15 @@ func (l *Listener) reconcileBlockRange( Addresses: []common.Address{l.contractAddress}, FromBlock: new(big.Int).SetUint64(startBlock), ToBlock: new(big.Int).SetUint64(endBlock), - // Topics: topics, } - logsCtx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + logsCtx, cancel := context.WithTimeout(ctx, rpcRequestTimeout) logs, err := l.client.FilterLogs(logsCtx, fetchFQ) cancel() if err != nil { - // TODO: divide previous block range by 2 + if ctx.Err() != nil { + return + } backOffCount.Add(1) l.logger.Error("failed to filter logs", "error", err, @@ -220,7 +332,7 @@ func (l *Listener) reconcileBlockRange( "contractAddress", l.contractAddress.String(), "startBlock", startBlock, "endBlock", endBlock) - continue // retry with the advised block range + continue } l.logger.Info("fetched historical logs", "blockchainID", l.blockchainID, @@ -230,13 +342,11 @@ func (l *Listener) reconcileBlockRange( "endBlock", endBlock) for _, ethLog := range logs { - // Filter out previously known events - if ethLog.BlockNumber == lastBlock && ethLog.Index <= uint(lastIndex) { - l.logger.Info("skipping previously known event", "blockchainID", l.blockchainID, "contractAddress", l.contractAddress.String(), "blockNumber", ethLog.BlockNumber, "logIndex", ethLog.Index) - continue + select { + case <-ctx.Done(): + return + case historicalCh <- ethLog: } - - historicalCh <- ethLog } startBlock = endBlock + 1 diff --git a/pkg/blockchain/evm/listener_test.go b/pkg/blockchain/evm/listener_test.go index 3e787b254..8b339dc36 100644 --- a/pkg/blockchain/evm/listener_test.go +++ b/pkg/blockchain/evm/listener_test.go @@ -2,6 +2,7 @@ package evm import ( "context" + "fmt" "math/big" "sync" "sync/atomic" @@ -11,7 +12,6 @@ import ( ethereum "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - "github.com/layer-3/nitrolite/pkg/core" 
"github.com/layer-3/nitrolite/pkg/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -44,7 +44,8 @@ func TestNewListener(t *testing.T) { logger := log.NewNoopLogger() addr := common.HexToAddress("0x123") - l := NewListener(addr, mockClient, 1, 100, logger, nil, nil) + eventGetter := new(MockContractEventGetter) + l := NewListener(addr, mockClient, 1, 100, logger, nil, eventGetter) require.NotNil(t, l) assert.Equal(t, addr, l.contractAddress) assert.Equal(t, uint64(1), l.blockchainID) @@ -57,19 +58,21 @@ func TestListener_Listen_CurrentEvents(t *testing.T) { logger := log.NewNoopLogger() addr := common.HexToAddress("0x123") - // Setup latest event getter (start from 0) - getLatestEvent := func(contractAddress string, networkID uint64) (core.BlockchainEvent, error) { - return core.BlockchainEvent{BlockNumber: 0, LogIndex: 0}, nil - } + eventGetter := new(MockContractEventGetter) + eventGetter.On("GetLatestContractEventBlockNumber", addr.String(), uint64(1)).Return(uint64(0), nil) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) // Channel to signal event handling eventHandled := make(chan struct{}) handleEvent := func(ctx context.Context, log types.Log) error { + cancel() close(eventHandled) return nil } - listener := NewListener(addr, mockClient, 1, 100, logger, handleEvent, getLatestEvent) + listener := NewListener(addr, mockClient, 1, 100, logger, handleEvent, eventGetter) // Mock SubscribeFilterLogs sub := &MockSubscription{ @@ -86,8 +89,8 @@ func TestListener_Listen_CurrentEvents(t *testing.T) { }). 
Return(sub, nil) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) + // The first current event will trigger IsContractEventPresent check + eventGetter.On("IsContractEventPresent", uint64(1), uint64(10), mock.Anything, uint32(1)).Return(false, nil) go listener.Listen(ctx, func(err error) {}) @@ -105,7 +108,8 @@ func TestListener_ReconcileBlockRange(t *testing.T) { logger := log.NewNoopLogger() addr := common.HexToAddress("0x123") - listener := NewListener(addr, mockClient, 1, 10, logger, nil, nil) + eventGetter := new(MockContractEventGetter) + listener := NewListener(addr, mockClient, 1, 10, logger, nil, eventGetter) // Setup FilterLogs mock // We expect a range fetch. start=100, step=10 -> end=110. current=120. @@ -129,7 +133,7 @@ func TestListener_ReconcileBlockRange(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - listener.reconcileBlockRange(120, 100, 0, historicalCh) + listener.reconcileBlockRange(context.Background(), 120, 100, historicalCh) close(historicalCh) }() @@ -151,9 +155,15 @@ func TestListener_Listen_HistoricalAndCurrent(t *testing.T) { addr := common.HexToAddress("0x123") // Start from block 100 - getLatestEvent := func(contractAddress string, networkID uint64) (core.BlockchainEvent, error) { - return core.BlockchainEvent{BlockNumber: 100, LogIndex: 0}, nil - } + eventGetter := new(MockContractEventGetter) + eventGetter.On("GetLatestContractEventBlockNumber", addr.String(), uint64(1)).Return(uint64(100), nil) + // Historical event at block 105 is not present + eventGetter.On("IsContractEventPresent", uint64(1), uint64(105), mock.Anything, uint32(0)).Return(false, nil) + // Current event at block 111 — after historical is done, first current event triggers check + eventGetter.On("IsContractEventPresent", uint64(1), uint64(111), mock.Anything, uint32(0)).Return(false, nil) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) var receivedCount int64 doneCh := make(chan struct{}) @@ -161,6 
+171,7 @@ func TestListener_Listen_HistoricalAndCurrent(t *testing.T) { handleEvent := func(ctx context.Context, log types.Log) error { count := atomic.AddInt64(&receivedCount, 1) if count >= 2 { // Expect 1 historical + 1 current + cancel() select { case <-doneCh: default: @@ -171,7 +182,7 @@ func TestListener_Listen_HistoricalAndCurrent(t *testing.T) { return nil } - listener := NewListener(addr, mockClient, 1, 10, logger, handleEvent, getLatestEvent) + listener := NewListener(addr, mockClient, 1, 10, logger, handleEvent, eventGetter) // Mock HeaderByNumber (current tip is 110) currentHeader := &types.Header{Number: big.NewInt(110)} @@ -191,9 +202,6 @@ func TestListener_Listen_HistoricalAndCurrent(t *testing.T) { }). Return(sub, nil) - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - go listener.Listen(ctx, func(err error) {}) select { @@ -203,3 +211,130 @@ func TestListener_Listen_HistoricalAndCurrent(t *testing.T) { t.Fatal("timeout waiting for events") } } + +func TestProcessEvents_DedupSkipsPresent(t *testing.T) { + t.Parallel() + logger := log.NewNoopLogger() + addr := common.HexToAddress("0x123") + eventGetter := new(MockContractEventGetter) + + var handledBlocks []uint64 + handleEvent := func(ctx context.Context, eventLog types.Log) error { + handledBlocks = append(handledBlocks, eventLog.BlockNumber) + return nil + } + + listener := NewListener(addr, new(MockEVMClient), 1, 10, logger, handleEvent, eventGetter) + + // Historical: 5 events. The first 2 are already present (skipped); the 3rd is not (handled). + // After the 3rd, the presence check stops — events 4 and 5 are handled without an IsContractEventPresent call. 
+ historicalCh := make(chan types.Log, 5) + historicalCh <- types.Log{BlockNumber: 100, Index: 0, TxHash: common.HexToHash("0xaa")} + historicalCh <- types.Log{BlockNumber: 101, Index: 0, TxHash: common.HexToHash("0xbb")} + historicalCh <- types.Log{BlockNumber: 102, Index: 0, TxHash: common.HexToHash("0xcc")} + historicalCh <- types.Log{BlockNumber: 103, Index: 0, TxHash: common.HexToHash("0xdd")} + historicalCh <- types.Log{BlockNumber: 104, Index: 0, TxHash: common.HexToHash("0xee")} + close(historicalCh) + + // First two are present, third is not + eventGetter.On("IsContractEventPresent", uint64(1), uint64(100), mock.Anything, uint32(0)).Return(true, nil).Once() + eventGetter.On("IsContractEventPresent", uint64(1), uint64(101), mock.Anything, uint32(0)).Return(true, nil).Once() + eventGetter.On("IsContractEventPresent", uint64(1), uint64(102), mock.Anything, uint32(0)).Return(false, nil).Once() + // No mock for 103, 104 — if called, mock will panic, proving the check stopped + + sub := &MockSubscription{errChan: make(chan error)} + currentCh := make(chan types.Log) + + // processEvents will drain historical, then block on currentCh. Cancel via ctx. 
+ ctx, cancel := context.WithCancel(context.Background()) + + go func() { + // Wait for historical processing, then cancel + time.Sleep(50 * time.Millisecond) + cancel() + }() + + var lastBlock uint64 + err := listener.processEvents(ctx, sub, historicalCh, currentCh, &lastBlock) + require.NoError(t, err) + + // Only events 102, 103, 104 should have been handled (100, 101 skipped as present) + assert.Equal(t, []uint64{102, 103, 104}, handledBlocks) + eventGetter.AssertExpectations(t) +} + +func TestProcessEvents_SubscriptionErrorDuringPhase1(t *testing.T) { + t.Parallel() + logger := log.NewNoopLogger() + addr := common.HexToAddress("0x123") + eventGetter := new(MockContractEventGetter) + + var handledBlocks []uint64 + handleEvent := func(ctx context.Context, eventLog types.Log) error { + handledBlocks = append(handledBlocks, eventLog.BlockNumber) + return nil + } + + listener := NewListener(addr, new(MockEVMClient), 1, 10, logger, handleEvent, eventGetter) + + // Historical channel with events that will block (not closed yet) + historicalCh := make(chan types.Log, 2) + historicalCh <- types.Log{BlockNumber: 100, Index: 0, TxHash: common.HexToHash("0xaa")} + + eventGetter.On("IsContractEventPresent", uint64(1), uint64(100), mock.Anything, uint32(0)).Return(false, nil) + + // Subscription that will error shortly + subErrCh := make(chan error, 1) + sub := &MockSubscription{errChan: subErrCh, unsub: func() {}} + currentCh := make(chan types.Log) + + // Send subscription error after a short delay (while historical is still open) + go func() { + time.Sleep(50 * time.Millisecond) + subErrCh <- fmt.Errorf("connection lost") + }() + + var lastBlock uint64 + err := listener.processEvents(context.Background(), sub, historicalCh, currentCh, &lastBlock) + + // Should return nil (reconnect signal), not an error + require.NoError(t, err) + // The first historical event should have been handled before the subscription error + assert.Equal(t, []uint64{100}, handledBlocks) +} + 
+func TestReconcileBlockRange_ContextCancellation(t *testing.T) { + t.Parallel() + mockClient := new(MockEVMClient) + logger := log.NewNoopLogger() + addr := common.HexToAddress("0x123") + eventGetter := new(MockContractEventGetter) + + listener := NewListener(addr, mockClient, 1, 10, logger, nil, eventGetter) + + ctx, cancel := context.WithCancel(context.Background()) + + // First batch succeeds but cancels the context during the call. + // The second batch should never be reached. + logs1 := []types.Log{{BlockNumber: 105, Index: 0}} + mockClient.On("FilterLogs", mock.Anything, mock.MatchedBy(func(q ethereum.FilterQuery) bool { + return q.FromBlock.Uint64() == 100 && q.ToBlock.Uint64() == 110 + })).Run(func(args mock.Arguments) { + cancel() + }).Return(logs1, nil) + + historicalCh := make(chan types.Log, 10) + listener.reconcileBlockRange(ctx, 200, 100, historicalCh) + close(historicalCh) + + // Drain whatever was sent before cancellation took effect + var received []types.Log + for l := range historicalCh { + received = append(received, l) + } + + // The event from the first batch may or may not have been sent (race between + // the ctx.Done select and the historicalCh send), but the second batch must not run. + assert.LessOrEqual(t, len(received), 1) + mockClient.AssertNumberOfCalls(t, "FilterLogs", 1) +} diff --git a/pkg/blockchain/evm/mock_test.go b/pkg/blockchain/evm/mock_test.go index 4f72abbbf..1f044f3b2 100644 --- a/pkg/blockchain/evm/mock_test.go +++ b/pkg/blockchain/evm/mock_test.go @@ -120,6 +120,21 @@ func (m *MockEVMClient) SubscribeFilterLogs(ctx context.Context, query ethereum. 
return args.Get(0).(ethereum.Subscription), args.Error(1) } +// MockContractEventGetter implements ContractEventGetter interface +type MockContractEventGetter struct { + mock.Mock +} + +func (m *MockContractEventGetter) GetLatestContractEventBlockNumber(contractAddress string, blockchainID uint64) (uint64, error) { + args := m.Called(contractAddress, blockchainID) + return args.Get(0).(uint64), args.Error(1) +} + +func (m *MockContractEventGetter) IsContractEventPresent(blockchainID, blockNumber uint64, txHash string, logIndex uint32) (bool, error) { + args := m.Called(blockchainID, blockNumber, txHash, logIndex) + return args.Bool(0), args.Error(1) +} + // MockAssetStore implements AssetStore interface type MockAssetStore struct { mock.Mock diff --git a/pkg/blockchain/evm/utils.go b/pkg/blockchain/evm/utils.go index c118d5311..7b0ce383e 100644 --- a/pkg/blockchain/evm/utils.go +++ b/pkg/blockchain/evm/utils.go @@ -10,7 +10,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/layer-3/nitrolite/pkg/core" - "github.com/layer-3/nitrolite/pkg/log" "github.com/layer-3/nitrolite/pkg/sign" "github.com/pkg/errors" "github.com/shopspring/decimal" @@ -47,17 +46,17 @@ func signerTxOpts(signer sign.Signer, blockchainID uint64) *bind.TransactOpts { } } -// waitForBackOffTimeout implements exponential backoff between retries -func waitForBackOffTimeout(logger log.Logger, backOffCount int, originator string) { +// backOffDuration returns the exponential backoff delay for the given attempt count. +// Returns 0 when backOffCount is 0 (no delay). +// Returns -1 when backOffCount exceeds maxBackOffCount (caller should abort). 
+func backOffDuration(backOffCount int) time.Duration { if backOffCount > maxBackOffCount { - logger.Fatal("back off limit reached, exiting", "originator", originator, "backOffCollisionCount", backOffCount) - return + return -1 } - - if backOffCount > 0 { - logger.Info("backing off", "originator", originator, "backOffCollisionCount", backOffCount) - <-time.After(time.Duration(2^backOffCount-1) * time.Second) + if backOffCount == 0 { + return 0 } + return time.Duration((1< Date: Wed, 8 Apr 2026 15:25:57 +0300 Subject: [PATCH 2/3] address chain reorg in a simple way --- pkg/blockchain/evm/listener.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/blockchain/evm/listener.go b/pkg/blockchain/evm/listener.go index 44239a411..e20f37978 100644 --- a/pkg/blockchain/evm/listener.go +++ b/pkg/blockchain/evm/listener.go @@ -250,6 +250,12 @@ func (l *Listener) processEvents( eventSubscription.Unsubscribe() return nil case eventLog := <-currentCh: + // During a chain reorganization geth re-delivers orphaned logs with + // Removed: true. Skip them to avoid applying phantom state changes. + if eventLog.Removed { + l.logger.Warn("skipping removed log from reorg", "blockchainID", l.blockchainID, "blockNumber", eventLog.BlockNumber, "logIndex", eventLog.Index, "txHash", eventLog.TxHash.Hex()) + continue + } *lastBlock = eventLog.BlockNumber if !currentCheckDone { present, err := l.eventGetter.IsContractEventPresent(l.blockchainID, eventLog.BlockNumber, eventLog.TxHash.Hex(), uint32(eventLog.Index)) @@ -353,3 +359,12 @@ func (l *Listener) reconcileBlockRange( endBlock += l.blockStep } } + +// TODO: the current reorg handling (skipping Removed logs) prevents new damage but +// does not undo side effects from the original delivery if it was already processed. 
+// A more robust approach is a confirmation buffer: hold live logs in memory keyed by +// block number, only apply them after N confirmations (new blocks on top), and discard +// any log that arrives with Removed: true while still in the buffer. This adds N blocks +// of latency (~12s × N on mainnet) but guarantees that only finalized events reach the +// handler. On L2s where reorgs are near-zero, the latency trade-off may not be worth it, +// so this should be configurable per chain. From 7b322050aa1828da6a73cbeda1efd5b78e5c9d3f Mon Sep 17 00:00:00 2001 From: Anton Filonenko Date: Thu, 9 Apr 2026 14:36:49 +0300 Subject: [PATCH 3/3] address comments --- clearnode/store/database/contract_event.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/clearnode/store/database/contract_event.go b/clearnode/store/database/contract_event.go index 5a997e559..cdc5af2db 100644 --- a/clearnode/store/database/contract_event.go +++ b/clearnode/store/database/contract_event.go @@ -6,6 +6,7 @@ import ( "time" "github.com/layer-3/nitrolite/pkg/core" + "gorm.io/gorm" ) var ErrEventHasAlreadyBeenProcessed = errors.New("contract event has already been processed") @@ -56,13 +57,15 @@ func (s *DBStore) GetLatestContractEventBlockNumber(contractAddress string, bloc // IsContractEventPresent checks whether a specific contract event has already been stored. func (s *DBStore) IsContractEventPresent(blockchainID, blockNumber uint64, txHash string, logIndex uint32) (bool, error) { - var count int64 - err := s.db.Model(&ContractEvent{}). - Where("blockchain_id = ? AND block_number = ? AND transaction_hash = ? AND log_index = ?", - blockchainID, blockNumber, strings.ToLower(txHash), logIndex). - Count(&count).Error + var ev ContractEvent + err := s.db.Where("blockchain_id = ? AND block_number = ? AND transaction_hash = ? AND log_index = ?", + blockchainID, blockNumber, strings.ToLower(txHash), logIndex). 
+ Take(&ev).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return false, nil + } if err != nil { return false, err } - return count > 0, nil + return true, nil }