diff --git a/api/api.go b/api/api.go index d7085ab6..0838582d 100644 --- a/api/api.go +++ b/api/api.go @@ -5,6 +5,7 @@ import ( _ "embed" "errors" "fmt" + "math/big" evmTypes "github.com/onflow/flow-go/fvm/evm/types" "github.com/onflow/go-ethereum/common" @@ -435,31 +436,50 @@ func (b *BlockChainAPI) GetLogs( ctx context.Context, criteria filters.FilterCriteria, ) ([]*types.Log, error) { - filter := logs.FilterCriteria{ Addresses: criteria.Addresses, Topics: criteria.Topics, } + // if filter provided specific block ID if criteria.BlockHash != nil { return logs. NewIDFilter(*criteria.BlockHash, filter, b.blocks, b.receipts). Match() } - if criteria.FromBlock != nil && criteria.ToBlock != nil { - f, err := logs. - NewRangeFilter(*criteria.FromBlock, *criteria.ToBlock, filter, b.receipts) - if err != nil { - return nil, err - } - return f.Match() + // otherwise we use the block range as the filter + + // assign default values to latest block number, unless provided + from := models.LatestBlockNumber + if criteria.FromBlock != nil { + from = criteria.FromBlock + } + to := models.LatestBlockNumber + if criteria.ToBlock != nil { + to = criteria.ToBlock } - return nil, errors.Join( - errs.ErrInvalid, - fmt.Errorf("must provide either block ID or 'from' and 'to' block nubmers, to filter events"), - ) + l, err := b.blocks.LatestEVMHeight() + if err != nil { + return nil, err + } + latest := big.NewInt(int64(l)) + + // if special value, use latest block number + if from.Cmp(models.EarliestBlockNumber) < 0 { + from = latest + } + if to.Cmp(models.EarliestBlockNumber) < 0 { + to = latest + } + + f, err := logs.NewRangeFilter(*from, *to, filter, b.receipts) + if err != nil { + return nil, err + } + + return f.Match() } // GetTransactionCount returns the number of transactions the given address diff --git a/api/models.go b/api/models.go index 399aafce..caa32f6a 100644 --- a/api/models.go +++ b/api/models.go @@ -1,8 +1,6 @@ package api import ( - "errors" - errs "github.com/onflow/flow-evm-gateway/api/errors" "github.com/onflow/flow-evm-gateway/models" @@ -11,17 +9,6 @@ import ( "github.com/onflow/go-ethereum/core/types" ) -var ( - errExceedMaxTopics = errors.New("exceed max topics") - errExceedMaxAddresses = errors.New("exceed max addresses") -) - -// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0 -const maxTopics = 4 - -// The maximum number of addresses allowed -const maxAddresses = 6 - // TransactionArgs represents the arguments to construct a new transaction // or a message call. 
type TransactionArgs struct { diff --git a/api/stream.go b/api/stream.go index 8a135a23..ed475c91 100644 --- a/api/stream.go +++ b/api/stream.go @@ -6,17 +6,19 @@ import ( "fmt" "reflect" - errs "github.com/onflow/flow-evm-gateway/api/errors" - "github.com/onflow/flow-evm-gateway/config" - "github.com/onflow/flow-evm-gateway/services/logs" - "github.com/onflow/flow-evm-gateway/storage" - storageErrs "github.com/onflow/flow-evm-gateway/storage/errors" "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/access/subscription" + "github.com/onflow/go-ethereum/common" "github.com/onflow/go-ethereum/common/hexutil" "github.com/onflow/go-ethereum/eth/filters" "github.com/onflow/go-ethereum/rpc" "github.com/rs/zerolog" + + errs "github.com/onflow/flow-evm-gateway/api/errors" + "github.com/onflow/flow-evm-gateway/config" + "github.com/onflow/flow-evm-gateway/services/logs" + "github.com/onflow/flow-evm-gateway/storage" + storageErrs "github.com/onflow/flow-evm-gateway/storage/errors" ) // subscriptionBufferLimit is a constant that represents the buffer limit for subscriptions. @@ -111,39 +113,47 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (* return nil, fmt.Errorf("failed to get block at height: %d: %w", height, err) } - // todo once a block can contain multiple transactions this needs to be refactored - if len(block.TransactionHashes) != 1 { - return nil, fmt.Errorf("block contains more than a single transaction") - } - hash := block.TransactionHashes[0] + hashes := make([]common.Hash, len(block.TransactionHashes)) + txs := make([]*Transaction, len(block.TransactionHashes)) - tx, err := s.transactions.Get(hash) - if err != nil { - if errors.Is(err, storageErrs.ErrNotFound) { - // make sure to wrap in not found error as the streamer expects it - return nil, errors.Join(subscription.ErrBlockNotReady, err) + for i, hash := range block.TransactionHashes { + + tx, err := s.transactions.Get(hash) + if err != nil { + if errors.Is(err, storageErrs.ErrNotFound) { + // make sure to wrap in not found error as the streamer expects it + return nil, errors.Join(subscription.ErrBlockNotReady, err) + } + return nil, fmt.Errorf("failed to get tx with hash: %s at height: %d: %w", hash, height, err) } - return nil, fmt.Errorf("failed to get tx with hash: %s at height: %d: %w", hash, height, err) - } - rcp, err := s.receipts.GetByTransactionID(hash) - if err != nil { - if errors.Is(err, storageErrs.ErrNotFound) { - // make sure to wrap in not found error as the streamer expects it - return nil, errors.Join(subscription.ErrBlockNotReady, err) + rcp, err := s.receipts.GetByTransactionID(hash) + if err != nil { + if errors.Is(err, storageErrs.ErrNotFound) { + // make sure to wrap in not found error as the streamer expects it + return nil, errors.Join(subscription.ErrBlockNotReady, err) + } + return nil, fmt.Errorf("failed to get receipt with hash: %s at height: %d: %w", hash, height, err) } - return nil, fmt.Errorf("failed to get receipt with hash: %s at height: %d: %w", hash, height, err) - } - h, err := tx.Hash() - if err != nil { - return nil, fmt.Errorf("failed to compute tx hash: %w", err) + h, err := tx.Hash() + if err != nil { + return nil, fmt.Errorf("failed to compute tx hash: %w", err) + } + + t, err := NewTransaction(tx, *rcp) + if err != nil { + return nil, err + } + + hashes[i] = h + txs[i] = t } if fullTx != nil && *fullTx { - return NewTransaction(tx, *rcp) + return txs, nil } - return h, nil + return hashes, nil }, ) l := 
s.logger.With().Str("subscription-id", string(sub.ID)).Logger() @@ -158,17 +168,22 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (* // Logs creates a subscription that fires for all new log that match the given filter criteria. func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (*rpc.Subscription, error) { - if len(criteria.Topics) > maxTopics { - return nil, errExceedMaxTopics - } - if len(criteria.Addresses) > maxAddresses { - return nil, errExceedMaxAddresses + filter, err := logs.NewFilterCriteria(criteria.Addresses, criteria.Topics) + if err != nil { + return nil, fmt.Errorf("failed to create log subscription filter: %w", err) } sub, err := s.newSubscription( ctx, s.logsBroadcaster, func(ctx context.Context, height uint64) (interface{}, error) { + if criteria.ToBlock != nil && height > criteria.ToBlock.Uint64() { + return nil, nil + } + if criteria.FromBlock != nil && height < criteria.FromBlock.Uint64() { + return nil, nil + } + block, err := s.blocks.GetByHeight(height) if err != nil { if errors.Is(err, storageErrs.ErrNotFound) { @@ -183,12 +198,9 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) ( return nil, err } - // convert from the API type - f := logs.FilterCriteria{ - Addresses: criteria.Addresses, - Topics: criteria.Topics, - } - return logs.NewIDFilter(id, f, s.blocks, s.receipts).Match() + // todo change this to height filter so we don't have to get the same block twice, once for height and then for id + + return logs.NewIDFilter(id, *filter, s.blocks, s.receipts).Match() }, ) l := s.logger.With().Str("subscription-id", string(sub.ID)).Logger() @@ -198,6 +210,7 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) ( return nil, err } l.Info().Msg("new logs subscription created") + return sub, nil } @@ -273,7 +286,6 @@ func streamData( } case err := <-rpcSub.Err(): - // todo maybe handle nil err, this is when client disconnects unexpectedly l.Debug().Err(err).Msg("client unsubscribed") return case <-notifier.Closed(): diff --git a/models/block.go b/models/block.go index 415793aa..ef06007b 100644 --- a/models/block.go +++ b/models/block.go @@ -2,12 +2,21 @@ package models import ( "fmt" + "math/big" "github.com/onflow/cadence" "github.com/onflow/flow-go/fvm/evm/types" "github.com/onflow/go-ethereum/common" ) +var ( + SafeBlockNumber = big.NewInt(-4) + FinalizedBlockNumber = big.NewInt(-3) + LatestBlockNumber = big.NewInt(-2) + PendingBlockNumber = big.NewInt(-1) + EarliestBlockNumber = big.NewInt(0) +) + // decodeBlock takes a cadence event that contains executed block payload and // decodes it into the Block type. func decodeBlock(event cadence.Event) (*types.Block, error) { diff --git a/models/mocks/Engine.go b/models/mocks/Engine.go index 2954041b..2127b294 100644 --- a/models/mocks/Engine.go +++ b/models/mocks/Engine.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.21.4. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. 
package mocks @@ -17,6 +17,10 @@ type Engine struct { func (_m *Engine) Done() <-chan struct{} { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Done") + } + var r0 <-chan struct{} if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { r0 = rf() @@ -33,6 +37,10 @@ func (_m *Engine) Done() <-chan struct{} { func (_m *Engine) Ready() <-chan struct{} { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Ready") + } + var r0 <-chan struct{} if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { r0 = rf() @@ -49,6 +57,10 @@ func (_m *Engine) Ready() <-chan struct{} { func (_m *Engine) Run(ctx context.Context) error { ret := _m.Called(ctx) + if len(ret) == 0 { + panic("no return value specified for Run") + } + var r0 error if rf, ok := ret.Get(0).(func(context.Context) error); ok { r0 = rf(ctx) @@ -64,13 +76,12 @@ func (_m *Engine) Stop() { _m.Called() } -type mockConstructorTestingTNewEngine interface { +// NewEngine creates a new instance of Engine. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEngine(t interface { mock.TestingT Cleanup(func()) -} - -// NewEngine creates a new instance of Engine. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewEngine(t mockConstructorTestingTNewEngine) *Engine { +}) *Engine { mock := &Engine{} mock.Mock.Test(t) diff --git a/services/logs/filter.go b/services/logs/filter.go index 5ef41e72..5c26884e 100644 --- a/services/logs/filter.go +++ b/services/logs/filter.go @@ -1,15 +1,24 @@ package logs import ( + "errors" "fmt" "math/big" - "github.com/onflow/flow-evm-gateway/storage" "github.com/onflow/go-ethereum/common" gethTypes "github.com/onflow/go-ethereum/core/types" "golang.org/x/exp/slices" + + errs "github.com/onflow/flow-evm-gateway/api/errors" + "github.com/onflow/flow-evm-gateway/storage" ) +// The maximum number of topic criteria allowed +const maxTopics = 4 + +// The maximum number of addresses allowed +const maxAddresses = 6 + // FilterCriteria for log filtering. // Address of the contract emitting the log. // Topics that match the log topics, following the format: @@ -23,6 +32,20 @@ type FilterCriteria struct { Topics [][]common.Hash } +func NewFilterCriteria(addresses []common.Address, topics [][]common.Hash) (*FilterCriteria, error) { + if len(topics) > maxTopics { + return nil, fmt.Errorf("max topics exceeded, only %d allowed", maxTopics) + } + if len(addresses) > maxAddresses { + return nil, fmt.Errorf("max addresses exceeded, only %d allowed", maxAddresses) + } + + return &FilterCriteria{ + Addresses: addresses, + Topics: topics, + }, nil +} + // RangeFilter matches all the indexed logs within the range defined as // start and end block height. The start must be strictly smaller or equal than end value. type RangeFilter struct { @@ -36,8 +59,13 @@ func NewRangeFilter( criteria FilterCriteria, receipts storage.ReceiptIndexer, ) (*RangeFilter, error) { - if start.Cmp(&end) > 0 { - return nil, fmt.Errorf("invalid start and end block height, start must be smaller or equal than end value") + // check if both start and end don't have special values (negative values representing last block etc.) 
+ // if so, make sure that beginning number is not bigger than end + if start.Cmp(big.NewInt(0)) > 0 && end.Cmp(big.NewInt(0)) > 0 && start.Cmp(&end) > 0 { + return nil, errors.Join( + errs.ErrInvalid, + fmt.Errorf("start block number must be smaller or equal to end block number"), + ) } return &RangeFilter{ @@ -82,6 +110,8 @@ func (r *RangeFilter) Match() ([]*gethTypes.Log, error) { return logs, nil } +// todo add HeightFilter + // IDFilter matches all logs against the criteria found in a single block identified // by the provided block ID. type IDFilter struct { @@ -128,61 +158,27 @@ func (i *IDFilter) Match() ([]*gethTypes.Log, error) { return logs, nil } -// StreamFilter matches all the logs against the criteria from the receipt channel. -type StreamFilter struct { - criteria FilterCriteria - receiptStream chan *gethTypes.Receipt -} - -func NewStreamFilter(criteria FilterCriteria, receipts chan *gethTypes.Receipt) *StreamFilter { - return &StreamFilter{ - criteria: criteria, - receiptStream: receipts, - } -} - -func (s *StreamFilter) Match() (<-chan *gethTypes.Log, error) { - logs := make(chan *gethTypes.Log) - - go func() { - defer close(logs) - - for { - receipt, ok := <-s.receiptStream - if !ok { - return // exit the goroutine if receiptStream is closed - } - - if !bloomMatch(receipt.Bloom, s.criteria) { - continue - } - - for _, log := range receipt.Logs { - if exactMatch(log, s.criteria) { - logs <- log - } - } - } - }() - - return logs, nil -} - // exactMatch checks the topic and address values of the log match the filter exactly. func exactMatch(log *gethTypes.Log, criteria FilterCriteria) bool { - // todo support no address matching all - // check criteria doesn't have more topics than the log, but it can have less due to wildcards if len(criteria.Topics) > len(log.Topics) { return false } - for _, sub := range criteria.Topics { - for _, topic := range sub { - if !slices.Contains(log.Topics, topic) { - return false - } + for i, sub := range criteria.Topics { + // wildcard matching all + if len(sub) == 0 { + continue } + + if !slices.Contains(sub, log.Topics[i]) { + return false + } + } + + // no addresses is a wildcard to match all + if len(criteria.Addresses) == 0 { + return true } return slices.Contains(criteria.Addresses, log.Address) diff --git a/storage/mocks/AccountIndexer.go b/storage/mocks/AccountIndexer.go index fe241914..e949b1ee 100644 --- a/storage/mocks/AccountIndexer.go +++ b/storage/mocks/AccountIndexer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.21.4. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. 
package mocks @@ -22,6 +22,10 @@ type AccountIndexer struct { func (_m *AccountIndexer) GetBalance(address *common.Address) (*big.Int, error) { ret := _m.Called(address) + if len(ret) == 0 { + panic("no return value specified for GetBalance") + } + var r0 *big.Int var r1 error if rf, ok := ret.Get(0).(func(*common.Address) (*big.Int, error)); ok { @@ -48,6 +52,10 @@ func (_m *AccountIndexer) GetBalance(address *common.Address) (*big.Int, error) func (_m *AccountIndexer) GetNonce(address *common.Address) (uint64, error) { ret := _m.Called(address) + if len(ret) == 0 { + panic("no return value specified for GetNonce") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(*common.Address) (uint64, error)); ok { @@ -72,6 +80,10 @@ func (_m *AccountIndexer) GetNonce(address *common.Address) (uint64, error) { func (_m *AccountIndexer) Update(tx models.Transaction, receipt *types.Receipt) error { ret := _m.Called(tx, receipt) + if len(ret) == 0 { + panic("no return value specified for Update") + } + var r0 error if rf, ok := ret.Get(0).(func(models.Transaction, *types.Receipt) error); ok { r0 = rf(tx, receipt) @@ -82,13 +94,12 @@ func (_m *AccountIndexer) Update(tx models.Transaction, receipt *types.Receipt) return r0 } -type mockConstructorTestingTNewAccountIndexer interface { +// NewAccountIndexer creates a new instance of AccountIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewAccountIndexer(t interface { mock.TestingT Cleanup(func()) -} - -// NewAccountIndexer creates a new instance of AccountIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewAccountIndexer(t mockConstructorTestingTNewAccountIndexer) *AccountIndexer { +}) *AccountIndexer { mock := &AccountIndexer{} mock.Mock.Test(t) diff --git a/storage/mocks/BlockIndexer.go b/storage/mocks/BlockIndexer.go index ee82d7cd..9a77ee5f 100644 --- a/storage/mocks/BlockIndexer.go +++ b/storage/mocks/BlockIndexer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.21.4. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. 
package mocks @@ -18,6 +18,10 @@ type BlockIndexer struct { func (_m *BlockIndexer) GetByHeight(height uint64) (*types.Block, error) { ret := _m.Called(height) + if len(ret) == 0 { + panic("no return value specified for GetByHeight") + } + var r0 *types.Block var r1 error if rf, ok := ret.Get(0).(func(uint64) (*types.Block, error)); ok { @@ -44,6 +48,10 @@ func (_m *BlockIndexer) GetByHeight(height uint64) (*types.Block, error) { func (_m *BlockIndexer) GetByID(ID common.Hash) (*types.Block, error) { ret := _m.Called(ID) + if len(ret) == 0 { + panic("no return value specified for GetByID") + } + var r0 *types.Block var r1 error if rf, ok := ret.Get(0).(func(common.Hash) (*types.Block, error)); ok { @@ -70,6 +78,10 @@ func (_m *BlockIndexer) GetByID(ID common.Hash) (*types.Block, error) { func (_m *BlockIndexer) GetCadenceHeight(evmHeight uint64) (uint64, error) { ret := _m.Called(evmHeight) + if len(ret) == 0 { + panic("no return value specified for GetCadenceHeight") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(uint64) (uint64, error)); ok { @@ -94,6 +106,10 @@ func (_m *BlockIndexer) GetCadenceHeight(evmHeight uint64) (uint64, error) { func (_m *BlockIndexer) GetHeightByID(ID common.Hash) (uint64, error) { ret := _m.Called(ID) + if len(ret) == 0 { + panic("no return value specified for GetHeightByID") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func(common.Hash) (uint64, error)); ok { @@ -118,6 +134,10 @@ func (_m *BlockIndexer) GetHeightByID(ID common.Hash) (uint64, error) { func (_m *BlockIndexer) LatestCadenceHeight() (uint64, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for LatestCadenceHeight") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func() (uint64, error)); ok { @@ -142,6 +162,10 @@ func (_m *BlockIndexer) LatestCadenceHeight() (uint64, error) { func (_m *BlockIndexer) LatestEVMHeight() (uint64, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for LatestEVMHeight") + } + var r0 uint64 var r1 error if rf, ok := ret.Get(0).(func() (uint64, error)); ok { @@ -166,6 +190,10 @@ func (_m *BlockIndexer) LatestEVMHeight() (uint64, error) { func (_m *BlockIndexer) SetLatestCadenceHeight(height uint64) error { ret := _m.Called(height) + if len(ret) == 0 { + panic("no return value specified for SetLatestCadenceHeight") + } + var r0 error if rf, ok := ret.Get(0).(func(uint64) error); ok { r0 = rf(height) @@ -180,6 +208,10 @@ func (_m *BlockIndexer) SetLatestCadenceHeight(height uint64) error { func (_m *BlockIndexer) Store(cadenceHeight uint64, block *types.Block) error { ret := _m.Called(cadenceHeight, block) + if len(ret) == 0 { + panic("no return value specified for Store") + } + var r0 error if rf, ok := ret.Get(0).(func(uint64, *types.Block) error); ok { r0 = rf(cadenceHeight, block) @@ -190,13 +222,12 @@ func (_m *BlockIndexer) Store(cadenceHeight uint64, block *types.Block) error { return r0 } -type mockConstructorTestingTNewBlockIndexer interface { +// NewBlockIndexer creates a new instance of BlockIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBlockIndexer(t interface { mock.TestingT Cleanup(func()) -} - -// NewBlockIndexer creates a new instance of BlockIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
-func NewBlockIndexer(t mockConstructorTestingTNewBlockIndexer) *BlockIndexer { +}) *BlockIndexer { mock := &BlockIndexer{} mock.Mock.Test(t) diff --git a/storage/mocks/ReceiptIndexer.go b/storage/mocks/ReceiptIndexer.go index 638b369b..1f94595f 100644 --- a/storage/mocks/ReceiptIndexer.go +++ b/storage/mocks/ReceiptIndexer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.21.4. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. package mocks @@ -20,6 +20,10 @@ type ReceiptIndexer struct { func (_m *ReceiptIndexer) BloomsForBlockRange(start *big.Int, end *big.Int) ([]*types.Bloom, []*big.Int, error) { ret := _m.Called(start, end) + if len(ret) == 0 { + panic("no return value specified for BloomsForBlockRange") + } + var r0 []*types.Bloom var r1 []*big.Int var r2 error @@ -55,6 +59,10 @@ func (_m *ReceiptIndexer) BloomsForBlockRange(start *big.Int, end *big.Int) ([]* func (_m *ReceiptIndexer) GetByBlockHeight(height *big.Int) ([]*types.Receipt, error) { ret := _m.Called(height) + if len(ret) == 0 { + panic("no return value specified for GetByBlockHeight") + } + var r0 []*types.Receipt var r1 error if rf, ok := ret.Get(0).(func(*big.Int) ([]*types.Receipt, error)); ok { @@ -81,6 +89,10 @@ func (_m *ReceiptIndexer) GetByBlockHeight(height *big.Int) ([]*types.Receipt, e func (_m *ReceiptIndexer) GetByTransactionID(ID common.Hash) (*types.Receipt, error) { ret := _m.Called(ID) + if len(ret) == 0 { + panic("no return value specified for GetByTransactionID") + } + var r0 *types.Receipt var r1 error if rf, ok := ret.Get(0).(func(common.Hash) (*types.Receipt, error)); ok { @@ -107,6 +119,10 @@ func (_m *ReceiptIndexer) GetByTransactionID(ID common.Hash) (*types.Receipt, er func (_m *ReceiptIndexer) Store(receipt *types.Receipt) error { ret := _m.Called(receipt) + if len(ret) == 0 { + panic("no return value specified for Store") + } + var r0 error if rf, ok := ret.Get(0).(func(*types.Receipt) error); ok { r0 = rf(receipt) @@ -117,13 +133,12 @@ func (_m *ReceiptIndexer) Store(receipt *types.Receipt) error { return r0 } -type mockConstructorTestingTNewReceiptIndexer interface { +// NewReceiptIndexer creates a new instance of ReceiptIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReceiptIndexer(t interface { mock.TestingT Cleanup(func()) -} - -// NewReceiptIndexer creates a new instance of ReceiptIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewReceiptIndexer(t mockConstructorTestingTNewReceiptIndexer) *ReceiptIndexer { +}) *ReceiptIndexer { mock := &ReceiptIndexer{} mock.Mock.Test(t) diff --git a/storage/mocks/TransactionIndexer.go b/storage/mocks/TransactionIndexer.go index 0273b005..f3d23e8a 100644 --- a/storage/mocks/TransactionIndexer.go +++ b/storage/mocks/TransactionIndexer.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.21.4. DO NOT EDIT. +// Code generated by mockery v2.38.0. DO NOT EDIT. 
package mocks @@ -18,6 +18,10 @@ type TransactionIndexer struct { func (_m *TransactionIndexer) Get(ID common.Hash) (models.Transaction, error) { ret := _m.Called(ID) + if len(ret) == 0 { + panic("no return value specified for Get") + } + var r0 models.Transaction var r1 error if rf, ok := ret.Get(0).(func(common.Hash) (models.Transaction, error)); ok { @@ -44,6 +48,10 @@ func (_m *TransactionIndexer) Get(ID common.Hash) (models.Transaction, error) { func (_m *TransactionIndexer) Store(tx models.Transaction) error { ret := _m.Called(tx) + if len(ret) == 0 { + panic("no return value specified for Store") + } + var r0 error if rf, ok := ret.Get(0).(func(models.Transaction) error); ok { r0 = rf(tx) @@ -54,13 +62,12 @@ func (_m *TransactionIndexer) Store(tx models.Transaction) error { return r0 } -type mockConstructorTestingTNewTransactionIndexer interface { +// NewTransactionIndexer creates a new instance of TransactionIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewTransactionIndexer(t interface { mock.TestingT Cleanup(func()) -} - -// NewTransactionIndexer creates a new instance of TransactionIndexer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -func NewTransactionIndexer(t mockConstructorTestingTNewTransactionIndexer) *TransactionIndexer { +}) *TransactionIndexer { mock := &TransactionIndexer{} mock.Mock.Test(t) diff --git a/storage/pebble/receipts.go b/storage/pebble/receipts.go index a46986fa..d0753809 100644 --- a/storage/pebble/receipts.go +++ b/storage/pebble/receipts.go @@ -229,9 +229,8 @@ func (r *Receipts) BloomsForBlockRange(start, end *big.Int) ([]*gethTypes.Bloom, } }() - caps := end.Div(end, start).Uint64() // max capacity for slices - blooms := make([]*gethTypes.Bloom, 0, caps) - heights := make([]*big.Int, 0, caps) + blooms := make([]*gethTypes.Bloom, 0) + heights := make([]*big.Int, 0) for iterator.First(); iterator.Valid(); iterator.Next() { val, err := iterator.ValueAndErr() diff --git a/tests/e2e_web3js_test.go b/tests/e2e_web3js_test.go index 1c21f5ae..0efa1f3f 100644 --- a/tests/e2e_web3js_test.go +++ b/tests/e2e_web3js_test.go @@ -1,9 +1,10 @@ package tests import ( + "testing" + "github.com/onflow/cadence" "github.com/onflow/flow-emulator/emulator" - "testing" ) func TestWeb3_E2E(t *testing.T) { @@ -36,6 +37,10 @@ func TestWeb3_E2E(t *testing.T) { runWeb3Test(t, "eth_streaming_test") }) + t.Run("streaming of entities and subscription with filters", func(t *testing.T) { + runWeb3Test(t, "eth_streaming_filters_test") + }) + t.Run("batch run transactions", func(t *testing.T) { runWeb3TestWithSetup(t, "eth_batch_retrieval_test", func(emu emulator.Emulator) error { tx1, err := cadence.NewString("f9015880808301e8488080b901086060604052341561000f57600080fd5b60eb8061001d6000396000f300606060405260043610603f576000357c0100000000000000000000000000000000000000000000000000000000900463ffffffff168063c6888fa1146044575b600080fd5b3415604e57600080fd5b606260048080359060200190919050506078565b6040518082815260200191505060405180910390f35b60007f24abdb5865df5079dcc5ac590ff6f01d5c16edbc5fab4e195d9febd1114503da600783026040518082815260200191505060405180910390a16007820290509190505600a165627a7a7230582040383f19d9f65246752244189b02f56e8d0980ed44e7a56c0b200458caad20bb002982052fa09c05a7389284dc02b356ec7dee8a023c5efd3a9d844fa3c481882684b0640866a057e96d0a71a857ed509bb2b7333e78b2408574b8cc7f51238f25c58812662653") 
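Note on the storage/pebble/receipts.go hunk above: the removed capacity hint divided end by start (which is not the size of the range) and, since big.Int.Div writes into its receiver, it also mutated end in place, so dropping it is correct. If a capacity hint is ever reintroduced, a non-mutating sketch could look like the following (illustrative only, not part of this diff; assumes math/big is imported):

// capacityHint returns the number of heights in [start, end] as a slice
// capacity hint, without mutating either argument; 0 means "no hint".
func capacityHint(start, end *big.Int) int {
	d := new(big.Int).Sub(end, start)
	if d.Sign() < 0 || !d.IsInt64() {
		return 0
	}
	return int(d.Int64()) + 1
}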
diff --git a/tests/web3js/eth_batch_retrieval_test.js b/tests/web3js/eth_batch_retrieval_test.js index db3b556c..1235fa06 100644 --- a/tests/web3js/eth_batch_retrieval_test.js +++ b/tests/web3js/eth_batch_retrieval_test.js @@ -1,8 +1,8 @@ const { assert } = require('chai') const conf = require('./config') -const helpers = require('./helpers') const web3 = conf.web3 + it('retrieve batch transactions', async() => { let latestHeight = await web3.eth.getBlockNumber() let block = await web3.eth.getBlock(latestHeight) @@ -19,4 +19,4 @@ it('retrieve batch transactions', async() => { assert.equal(block.hash, callTx.blockHash) assert.equal(0, callTx.type) assert.equal(1, callTx.transactionIndex) -}) \ No newline at end of file +}) diff --git a/tests/web3js/eth_streaming_filters.js b/tests/web3js/eth_streaming_filters.js deleted file mode 100644 index 6716ab07..00000000 --- a/tests/web3js/eth_streaming_filters.js +++ /dev/null @@ -1,6 +0,0 @@ -// todo add test that uses different log filters -// - specified address and all topics -// - not specified address (all logs) -// - specified address and single topic -// - specified address and multiple topics -// - combinations of from / to blocks diff --git a/tests/web3js/eth_streaming_filters_test.js b/tests/web3js/eth_streaming_filters_test.js new file mode 100644 index 00000000..38cdbc4a --- /dev/null +++ b/tests/web3js/eth_streaming_filters_test.js @@ -0,0 +1,178 @@ +const helpers = require("./helpers"); +const {Web3} = require("web3"); +const conf = require("./config"); +const {assert} = require("chai"); +const storageABI = require('../fixtures/storageABI.json') + +const timeout = 20 + +async function assertFilterLogs(subscription, expectedLogs) { + let allLogs = [] + return new Promise((res, rej) => { + subscription.on("error", err => { + rej(err) + }) + + subscription.on("data", async data => { + allLogs.push(data) + + if (allLogs.length !== expectedLogs.length) { + return + } + + // if logs matches expected logs length, + // wait for a bit and re-check, so there's no new logs that came in after delay + await new Promise(res => setTimeout(() => res(), 1000)) + assert.equal(allLogs.length, expectedLogs.length) + + console.log("## unsubscribe", subscription.id) + + subscription.unsubscribe() + + // after we receive all logs, we make sure each received + // logs matches the entry in the expected log by all the values + for (let i = 0; i < expectedLogs.length; i++) { + let expected = expectedLogs[i] + + // if we have ABI decoded event values as return values + if (allLogs[i].returnValues != undefined) { + for (const key in expected) { + let expectedVal = expected[key] + assert.isDefined(allLogs[i].returnValues) + let actualVal = allLogs[i].returnValues[key] + assert.equal(actualVal, expectedVal) + } + } else { // otherwise compare by position + let position = 2 // we start at 2 since first two topics are address and event name + for (const key in expected) { + let expectedVal = expected[key] + assert.isDefined(allLogs[i].topics) + // convert big int hex values + let actualVal = BigInt(allLogs[i].topics[position]) + if (actualVal & (1n << 255n)) { + actualVal -= (1n << 256n) // convert as signed int256 number + } + + assert.equal(actualVal, expectedVal) + position++ + } + } + } + + res(allLogs) + }) + }) +} + +it('streaming of logs using filters', async() => { + let contractDeployment = await helpers.deployContract("storage") + let contractAddress = contractDeployment.receipt.contractAddress + + // we deploy another contract to use for filtering by 
address + let contractDeployment2 = await helpers.deployContract("storage") + let contractAddress2 = contractDeployment2.receipt.contractAddress + + let repeatA = 10 + const testValues = [ + { numA: 1, numB: 2 }, + { numA: -1, numB: -2 }, + { numA: repeatA, numB: 200 }, + { numA: repeatA, numB: 300 }, + { numA: repeatA, numB: 400 }, + ] + + let ws = new Web3("ws://127.0.0.1:8545") + + let storageContract = new ws.eth.Contract(storageABI, contractAddress) + let storageContract2 = new ws.eth.Contract(storageABI, contractAddress2) + let calculatedEvent = storageContract.events.Calculated + + let rawSubscribe = filter => ws.eth.subscribe('logs', filter) + + // wait for subscription for a bit + await new Promise((res, rej) => setTimeout(() => res(), 500)) + + let allTests = [ + // stream all events + assertFilterLogs(calculatedEvent({ }), testValues), + // stream only one event that has numA set to -1 + assertFilterLogs( + calculatedEvent({ filter: {numA: -1} }), + testValues.filter(v => v.numA === -1) + ), + // stream only events that have numB set to 200 + assertFilterLogs( + calculatedEvent({ filter: {numB: 200} }), + testValues.filter(v => v.numB === 200) + ), + // stream events that have numA set to 10 and numB set to 200 + assertFilterLogs( + calculatedEvent({ filter: {numA: repeatA, numB: 200} }), + testValues.filter(v => v.numB === 200 && v.numA === repeatA) + ), + // stream only events that have numA value set to 10 + assertFilterLogs( + calculatedEvent({ filter: {numA: repeatA} }), + testValues.filter(v => v.numA === repeatA) + ), + // stream events that have numB 200 OR 300 value + assertFilterLogs( + calculatedEvent({ filter: {numB: [200, 300]} }), + testValues.filter(v => v.numB === 200 || v.numB === 300) + ), + + // we also test the raw subscriptions since they allow for specifying raw values + + // stream all events from any contract; we deployed two identical contracts, so we expect the test values twice, in order + assertFilterLogs( + await rawSubscribe({}), + testValues.concat(testValues) + ), + + // return all values by only a single contract + assertFilterLogs( + await rawSubscribe({ address: contractAddress }), + testValues + ), + + // get all events and handle from block provided + assertFilterLogs( + await rawSubscribe({ address: contractAddress, fromBlock: "0x0" }), + testValues, + ) + ] + + + // wait for subscription for a bit + await new Promise((res, rej) => setTimeout(() => res(), 500)) + + // produce events by submitting transactions + for (const { numA, numB } of testValues) { + let res = await helpers.signAndSend({ + from: conf.eoa.address, + to: contractAddress, + data: storageContract.methods.sum(numA, numB).encodeABI(), + gas: 1000000, + gasPrice: 0 + }) + assert.equal(res.receipt.status, conf.successStatus) + } + + for (const { numA, numB } of testValues) { + let res = await helpers.signAndSend({ + from: conf.eoa.address, + to: contractAddress2, + data: storageContract2.methods.sum(numA, numB).encodeABI(), + gas: 1000000, + gasPrice: 0 + }) + assert.equal(res.receipt.status, conf.successStatus) + } + + await Promise.all(allTests) + + // make sure we can also get logs streamed after the transactions were executed (historic) + await assertFilterLogs(await rawSubscribe({ address: contractAddress, fromBlock: "0x0" }), testValues) + + process.exit(0) +}).timeout(timeout*1000) diff --git a/tests/web3js/eth_streaming_test.js b/tests/web3js/eth_streaming_test.js index b0cfe8ed..6c3d94ec 100644 --- a/tests/web3js/eth_streaming_test.js +++ b/tests/web3js/eth_streaming_test.js @@ 
-58,7 +58,7 @@ it('streaming of logs using filters', async() => { let logCount = 0 let logHashes = [] // subscribe to events being emitted by a deployed contract and bellow transaction interactions - let doneLogs = new Promise(async (res, rej) => { + let doneAddressLogs = new Promise(async (res, rej) => { let subLog = await ws.eth.subscribe('logs', { address: contractAddress, }) @@ -90,7 +90,7 @@ it('streaming of logs using filters', async() => { } // wait for all events to be received - await Promise.all([doneTxs, doneBlocks, doneLogs]) + await Promise.all([doneTxs, doneBlocks, doneAddressLogs]) // check that transaction hashes we received when submitting transactions above // match array of transaction hashes received from events for blocks and txs
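For context on the sentinel block numbers added in models/block.go and how the reworked GetLogs uses them: the negative values mirror the standard JSON-RPC block tags ("safe", "finalized", "latest", "pending"), and any negative (special) value is resolved to the latest indexed EVM height before the range filter is built. A minimal standalone sketch of that resolution follows (the resolve helper is hypothetical, not code from this diff):

package main

import (
	"fmt"
	"math/big"
)

// Sentinels mirroring models/block.go; negative values stand in for the
// JSON-RPC block tags rather than real heights.
var (
	SafeBlockNumber      = big.NewInt(-4)
	FinalizedBlockNumber = big.NewInt(-3)
	LatestBlockNumber    = big.NewInt(-2)
	PendingBlockNumber   = big.NewInt(-1)
	EarliestBlockNumber  = big.NewInt(0)
)

// resolve turns a possibly-special block number into a concrete height the way
// the updated GetLogs does: a nil bound defaults to latest, and any negative
// sentinel likewise falls back to the latest indexed EVM height.
func resolve(n *big.Int, latestHeight uint64) *big.Int {
	if n == nil || n.Cmp(EarliestBlockNumber) < 0 {
		return new(big.Int).SetUint64(latestHeight)
	}
	return n
}

func main() {
	latest := uint64(42)
	fmt.Println(resolve(nil, latest))               // 42 (no bound provided)
	fmt.Println(resolve(LatestBlockNumber, latest)) // 42 ("latest")
	fmt.Println(resolve(big.NewInt(7), latest))     // 7  (concrete height)
}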