Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions clearnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func main() {

reactor := evm.NewChannelHubReactor(b.ID, eventHandlerService, bb.DbStore.StoreContractEvent)
reactor.SetOnEventProcessed(bb.RuntimeMetrics.IncBlockchainEvent)
l := evm.NewListener(common.HexToAddress(b.ChannelHubAddress), client, b.ID, b.BlockStep, logger, reactor.HandleEvent, bb.DbStore.GetLatestEvent)
l := evm.NewListener(common.HexToAddress(b.ChannelHubAddress), client, b.ID, b.BlockStep, logger, reactor.HandleEvent, bb.DbStore)
l.Listen(blockchainCtx, func(err error) {
if err != nil {
logger.Fatal("blockchain listener stopped", "error", err, "blockchainID", b.ID)
Expand Down Expand Up @@ -142,7 +142,7 @@ func main() {
}

reactor.SetOnEventProcessed(bb.RuntimeMetrics.IncBlockchainEvent)
l := evm.NewListener(common.HexToAddress(b.LockingContractAddress), client, b.ID, b.BlockStep, logger, reactor.HandleEvent, bb.DbStore.GetLatestEvent)
l := evm.NewListener(common.HexToAddress(b.LockingContractAddress), client, b.ID, b.BlockStep, logger, reactor.HandleEvent, bb.DbStore)
l.Listen(blockchainCtx, func(err error) {
if err != nil {
logger.Fatal("blockchain listener stopped", "error", err, "blockchainID", b.ID)
Expand Down
43 changes: 22 additions & 21 deletions clearnode/store/database/contract_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,37 +34,38 @@ func (s *DBStore) StoreContractEvent(ev core.BlockchainEvent) error {
BlockchainID: ev.BlockchainID,
Name: ev.Name,
BlockNumber: ev.BlockNumber,
TransactionHash: ev.TransactionHash,
TransactionHash: strings.ToLower(ev.TransactionHash),
LogIndex: ev.LogIndex,
CreatedAt: time.Now(),
}

return s.db.Create(contractEvent).Error
}

// GetLatestEvent returns the latest block number and log index for a given contract.
// This function matches the signature required by pkg/blockchain/evm.GetLatestEvent.
func (s *DBStore) GetLatestEvent(contractAddress string, blockchainID uint64) (core.BlockchainEvent, error) {
var ev ContractEvent
err := s.db.Where("blockchain_id = ? AND contract_address = ?", blockchainID, strings.ToLower(contractAddress)).
Order("block_number DESC, log_index DESC").
First(&ev).Error
// GetLatestContractEventBlockNumber returns the highest block number stored for a given contract.
func (s *DBStore) GetLatestContractEventBlockNumber(contractAddress string, blockchainID uint64) (uint64, error) {
var blockNumber uint64
err := s.db.Model(&ContractEvent{}).
Where("blockchain_id = ? AND contract_address = ?", blockchainID, strings.ToLower(contractAddress)).
Select("COALESCE(MAX(block_number), 0)").
Scan(&blockNumber).Error
if err != nil {
return 0, err
}
return blockNumber, nil
}

// IsContractEventPresent checks whether a specific contract event has already been stored.
func (s *DBStore) IsContractEventPresent(blockchainID, blockNumber uint64, txHash string, logIndex uint32) (bool, error) {
var ev ContractEvent
err := s.db.Where("blockchain_id = ? AND block_number = ? AND transaction_hash = ? AND log_index = ?",
blockchainID, blockNumber, strings.ToLower(txHash), logIndex).
Take(&ev).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
// No events found, return zeros (will start from beginning)
return core.BlockchainEvent{}, nil
return false, nil
}

if err != nil {
return core.BlockchainEvent{}, err
return false, err
}

return core.BlockchainEvent{
BlockNumber: ev.BlockNumber,
BlockchainID: ev.BlockchainID,
Name: ev.Name,
ContractAddress: ev.ContractAddress,
TransactionHash: ev.TransactionHash,
LogIndex: ev.LogIndex,
}, nil
return true, nil
}
154 changes: 65 additions & 89 deletions clearnode/store/database/contract_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,124 +39,100 @@ func TestStoreContractEvent(t *testing.T) {
assert.Equal(t, event.LogIndex, storedEvent.LogIndex)
}

func TestGetLatestEvent(t *testing.T) {
func TestGetLatestContractEventBlockNumber(t *testing.T) {
db, cleanup := SetupTestDB(t)
defer cleanup()

store := NewDBStore(db)

contractAddress := "0x1234567890123456789012345678901234567890"
networkID := uint64(1)
blockchainID := uint64(1)

t.Run("no events returns empty event", func(t *testing.T) {
event, err := store.GetLatestEvent(contractAddress, networkID)
t.Run("no events returns zero", func(t *testing.T) {
block, err := store.GetLatestContractEventBlockNumber(contractAddress, blockchainID)
require.NoError(t, err)
assert.Equal(t, core.BlockchainEvent{}, event)
assert.Equal(t, uint64(0), block)
})

t.Run("returns latest event", func(t *testing.T) {
// Store multiple events
t.Run("returns max block number across multiple events", func(t *testing.T) {
events := []core.BlockchainEvent{
{
ContractAddress: contractAddress,
BlockchainID: networkID,
Name: "Event1",
BlockNumber: 100,
TransactionHash: "0xaaa",
LogIndex: 1,
},
{
ContractAddress: contractAddress,
BlockchainID: networkID,
Name: "Event2",
BlockNumber: 100,
TransactionHash: "0xbbb",
LogIndex: 2,
},
{
ContractAddress: contractAddress,
BlockchainID: networkID,
Name: "Event3",
BlockNumber: 150,
TransactionHash: "0xccc",
LogIndex: 0,
},
{ContractAddress: contractAddress, BlockchainID: blockchainID, Name: "E1", BlockNumber: 100, TransactionHash: "0xaaa", LogIndex: 0},
{ContractAddress: contractAddress, BlockchainID: blockchainID, Name: "E2", BlockNumber: 200, TransactionHash: "0xbbb", LogIndex: 0},
{ContractAddress: contractAddress, BlockchainID: blockchainID, Name: "E3", BlockNumber: 150, TransactionHash: "0xccc", LogIndex: 0},
}

for _, ev := range events {
err := store.StoreContractEvent(ev)
require.NoError(t, err)
require.NoError(t, store.StoreContractEvent(ev))
}

// Get latest event
latestEvent, err := store.GetLatestEvent(contractAddress, networkID)
block, err := store.GetLatestContractEventBlockNumber(contractAddress, blockchainID)
require.NoError(t, err)
assert.Equal(t, uint64(200), block)
})

// Should return the event with highest block number
assert.Equal(t, uint64(150), latestEvent.BlockNumber)
assert.Equal(t, uint32(0), latestEvent.LogIndex)
assert.Equal(t, "Event3", latestEvent.Name)
assert.Equal(t, contractAddress, latestEvent.ContractAddress)
assert.Equal(t, networkID, latestEvent.BlockchainID)
t.Run("different contract returns zero", func(t *testing.T) {
block, err := store.GetLatestContractEventBlockNumber("0x9999999999999999999999999999999999999999", blockchainID)
require.NoError(t, err)
assert.Equal(t, uint64(0), block)
})

t.Run("different contract returns empty event", func(t *testing.T) {
differentContract := "0x9999999999999999999999999999999999999999"
event, err := store.GetLatestEvent(differentContract, networkID)
t.Run("different blockchain returns zero", func(t *testing.T) {
block, err := store.GetLatestContractEventBlockNumber(contractAddress, 999)
require.NoError(t, err)
assert.Equal(t, core.BlockchainEvent{}, event)
assert.Equal(t, uint64(0), block)
})
}

func TestIsContractEventPresent(t *testing.T) {
db, cleanup := SetupTestDB(t)
defer cleanup()

store := NewDBStore(db)

// Store a known event
ev := core.BlockchainEvent{
ContractAddress: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
BlockchainID: 1,
Name: "TestEvent",
BlockNumber: 500,
TransactionHash: "0xAbCdEf1234567890AbCdEf1234567890AbCdEf1234567890AbCdEf1234567890",
LogIndex: 3,
}
require.NoError(t, store.StoreContractEvent(ev))

t.Run("different network returns empty event", func(t *testing.T) {
differentNetwork := uint64(999)
event, err := store.GetLatestEvent(contractAddress, differentNetwork)
t.Run("existing event returns true", func(t *testing.T) {
present, err := store.IsContractEventPresent(1, 500, ev.TransactionHash, 3)
require.NoError(t, err)
assert.Equal(t, core.BlockchainEvent{}, event)
assert.True(t, present)
})

t.Run("returns highest log index when same block", func(t *testing.T) {
contractAddr := "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
chainID := uint64(42)
t.Run("case-insensitive txHash match", func(t *testing.T) {
// Query with uppercase — stored value was lowercased by StoreContractEvent
present, err := store.IsContractEventPresent(1, 500, "0xABCDEF1234567890ABCDEF1234567890ABCDEF1234567890ABCDEF1234567890", 3)
require.NoError(t, err)
assert.True(t, present)
})

// Store events in same block with different log indices
events := []core.BlockchainEvent{
{
ContractAddress: contractAddr,
BlockchainID: chainID,
Name: "EventA",
BlockNumber: 200,
TransactionHash: "0x111",
LogIndex: 5,
},
{
ContractAddress: contractAddr,
BlockchainID: chainID,
Name: "EventB",
BlockNumber: 200,
TransactionHash: "0x222",
LogIndex: 10,
},
{
ContractAddress: contractAddr,
BlockchainID: chainID,
Name: "EventC",
BlockNumber: 200,
TransactionHash: "0x333",
LogIndex: 3,
},
}
t.Run("wrong block number returns false", func(t *testing.T) {
present, err := store.IsContractEventPresent(1, 501, ev.TransactionHash, 3)
require.NoError(t, err)
assert.False(t, present)
})

for _, ev := range events {
err := store.StoreContractEvent(ev)
require.NoError(t, err)
}
t.Run("wrong log index returns false", func(t *testing.T) {
present, err := store.IsContractEventPresent(1, 500, ev.TransactionHash, 4)
require.NoError(t, err)
assert.False(t, present)
})

// Get latest event - should return highest log index for the block
latestEvent, err := store.GetLatestEvent(contractAddr, chainID)
t.Run("wrong blockchain returns false", func(t *testing.T) {
present, err := store.IsContractEventPresent(2, 500, ev.TransactionHash, 3)
require.NoError(t, err)
assert.False(t, present)
})

assert.Equal(t, uint64(200), latestEvent.BlockNumber)
assert.Equal(t, uint32(10), latestEvent.LogIndex)
assert.Equal(t, "EventB", latestEvent.Name)
t.Run("wrong txHash returns false", func(t *testing.T) {
present, err := store.IsContractEventPresent(1, 500, "0x0000000000000000000000000000000000000000000000000000000000000000", 3)
require.NoError(t, err)
assert.False(t, present)
})
}
7 changes: 5 additions & 2 deletions clearnode/store/database/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,9 @@ type DatabaseStore interface {
// StoreContractEvent stores a blockchain event to prevent duplicate processing.
StoreContractEvent(ev core.BlockchainEvent) error

// GetLatestEvent returns the latest block number and log index for a given contract.
GetLatestEvent(contractAddress string, blockchainID uint64) (core.BlockchainEvent, error)
// GetLatestContractEventBlockNumber returns the highest block number for a given contract.
GetLatestContractEventBlockNumber(contractAddress string, blockchainID uint64) (lastBlock uint64, err error)

// IsContractEventPresent checks if a specific contract event has already been stored.
IsContractEventPresent(blockchainID, blockNumber uint64, txHash string, logIndex uint32) (isPresent bool, err error)
}
9 changes: 8 additions & 1 deletion pkg/blockchain/evm/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@ import (

type HandleEvent func(ctx context.Context, eventLog types.Log) error
type StoreContractEvent func(ev core.BlockchainEvent) error
type LatestEventGetter func(contractAddress string, blockchainID uint64) (ev core.BlockchainEvent, err error)

// ContractEventGetter is used by Listener for resumption and deduplication.
type ContractEventGetter interface {
// GetLatestContractEventBlockNumber returns the block to resume from (0 = start fresh).
GetLatestContractEventBlockNumber(contractAddress string, blockchainID uint64) (lastBlock uint64, err error)
// IsContractEventPresent checks whether a specific event was already processed.
IsContractEventPresent(blockchainID, blockNumber uint64, txHash string, logIndex uint32) (isPresent bool, err error)
}

type AssetStore interface {
// GetAssetDecimals checks if an asset exists and returns its decimals in YN
Expand Down
Loading