Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve event testing #240

Merged
merged 30 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f487e82
remove uneeded
sideninja May 10, 2024
4850c98
add table tests for filter events
sideninja May 10, 2024
a769e53
add filter factory with checks
sideninja May 10, 2024
7233526
move errs and limits
sideninja May 10, 2024
d5300eb
add support for backfilling log events
sideninja May 10, 2024
4fb16be
remove the backfill and add check on filter from to on stream
sideninja May 10, 2024
5cc5e71
fix a bug in OR filter
sideninja May 10, 2024
a46e9d7
add todo
sideninja May 10, 2024
8ef5567
added more test combinations for filters
sideninja May 10, 2024
d17aea8
remove todos
sideninja May 10, 2024
984cf65
add todo
sideninja May 10, 2024
af1683e
match all no addresses
sideninja May 13, 2024
ea9f047
add special block values
sideninja May 13, 2024
d334d7d
handle get blocks with null values
sideninja May 13, 2024
575240f
add more custom tests
sideninja May 13, 2024
f508afb
improve filters with special values
sideninja May 13, 2024
9c842b1
use latest as special val
sideninja May 13, 2024
912269a
remove unneeded complexity
sideninja May 13, 2024
449f84a
add more tests
sideninja May 13, 2024
44ea942
match id again
sideninja May 13, 2024
bf68300
temp disable after test
sideninja May 13, 2024
6aa05bf
support multiple transactions in stream of pending txs
sideninja May 13, 2024
1df50b7
add test to fetch historic logs
sideninja May 14, 2024
441a3ce
clean up test
sideninja May 14, 2024
c71e144
clean up test
sideninja May 14, 2024
27b99d3
Merge branch 'main' into gregor/event-testing
sideninja May 14, 2024
8473656
update mocks
sideninja May 15, 2024
c80a52d
mod tidy
sideninja May 15, 2024
b18e802
Merge branch 'main' into gregor/event-testing
sideninja May 15, 2024
2a03886
Merge branch 'main' into gregor/event-testing
sideninja May 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
_ "embed"
"errors"
"fmt"
"math/big"

evmTypes "github.com/onflow/flow-go/fvm/evm/types"
"github.com/onflow/go-ethereum/common"
Expand Down Expand Up @@ -435,31 +436,50 @@ func (b *BlockChainAPI) GetLogs(
ctx context.Context,
criteria filters.FilterCriteria,
) ([]*types.Log, error) {

filter := logs.FilterCriteria{
Addresses: criteria.Addresses,
Topics: criteria.Topics,
}

// if filter provided specific block ID
if criteria.BlockHash != nil {
return logs.
NewIDFilter(*criteria.BlockHash, filter, b.blocks, b.receipts).
Match()
}
if criteria.FromBlock != nil && criteria.ToBlock != nil {
f, err := logs.
NewRangeFilter(*criteria.FromBlock, *criteria.ToBlock, filter, b.receipts)
if err != nil {
return nil, err
}

return f.Match()
// otherwise we use the block range as the filter

// assign default values to latest block number, unless provided
from := models.LatestBlockNumber
if criteria.FromBlock != nil {
from = criteria.FromBlock
}
to := models.LatestBlockNumber
if criteria.ToBlock != nil {
to = criteria.ToBlock
}

return nil, errors.Join(
errs.ErrInvalid,
fmt.Errorf("must provide either block ID or 'from' and 'to' block nubmers, to filter events"),
)
l, err := b.blocks.LatestEVMHeight()
if err != nil {
return nil, err
}
latest := big.NewInt(int64(l))

// if special value, use latest block number
if from.Cmp(models.EarliestBlockNumber) < 0 {
from = latest
}
if to.Cmp(models.EarliestBlockNumber) < 0 {
to = latest
}

f, err := logs.NewRangeFilter(*from, *to, filter, b.receipts)
if err != nil {
return nil, err
}

return f.Match()
}

// GetTransactionCount returns the number of transactions the given address
Expand Down
13 changes: 0 additions & 13 deletions api/models.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package api

import (
"errors"

errs "github.com/onflow/flow-evm-gateway/api/errors"
"github.com/onflow/flow-evm-gateway/models"

Expand All @@ -11,17 +9,6 @@ import (
"github.com/onflow/go-ethereum/core/types"
)

var (
errExceedMaxTopics = errors.New("exceed max topics")
errExceedMaxAddresses = errors.New("exceed max addresses")
)

// The maximum number of topic criteria allowed, vm.LOG4 - vm.LOG0
const maxTopics = 4

// The maximum number of addresses allowed
const maxAddresses = 6

// TransactionArgs represents the arguments to construct a new transaction
// or a message call.
type TransactionArgs struct {
Expand Down
94 changes: 53 additions & 41 deletions api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ import (
"fmt"
"reflect"

errs "github.com/onflow/flow-evm-gateway/api/errors"
"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/services/logs"
"github.com/onflow/flow-evm-gateway/storage"
storageErrs "github.com/onflow/flow-evm-gateway/storage/errors"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/subscription"
"github.com/onflow/go-ethereum/common"
"github.com/onflow/go-ethereum/common/hexutil"
"github.com/onflow/go-ethereum/eth/filters"
"github.com/onflow/go-ethereum/rpc"
"github.com/rs/zerolog"

errs "github.com/onflow/flow-evm-gateway/api/errors"
"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/services/logs"
"github.com/onflow/flow-evm-gateway/storage"
storageErrs "github.com/onflow/flow-evm-gateway/storage/errors"
)

// subscriptionBufferLimit is a constant that represents the buffer limit for subscriptions.
Expand Down Expand Up @@ -111,39 +113,47 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*
return nil, fmt.Errorf("failed to get block at height: %d: %w", height, err)
}

// todo once a block can contain multiple transactions this needs to be refactored
if len(block.TransactionHashes) != 1 {
return nil, fmt.Errorf("block contains more than a single transaction")
}
hash := block.TransactionHashes[0]
hashes := make([]common.Hash, len(block.TransactionHashes))
txs := make([]*Transaction, len(block.TransactionHashes))

tx, err := s.transactions.Get(hash)
if err != nil {
if errors.Is(err, storageErrs.ErrNotFound) {
// make sure to wrap in not found error as the streamer expects it
return nil, errors.Join(subscription.ErrBlockNotReady, err)
for i, hash := range block.TransactionHashes {

tx, err := s.transactions.Get(hash)
if err != nil {
if errors.Is(err, storageErrs.ErrNotFound) {
// make sure to wrap in not found error as the streamer expects it
return nil, errors.Join(subscription.ErrBlockNotReady, err)
}
return nil, fmt.Errorf("failed to get tx with hash: %s at height: %d: %w", hash, height, err)
}
return nil, fmt.Errorf("failed to get tx with hash: %s at height: %d: %w", hash, height, err)
}

rcp, err := s.receipts.GetByTransactionID(hash)
if err != nil {
if errors.Is(err, storageErrs.ErrNotFound) {
// make sure to wrap in not found error as the streamer expects it
return nil, errors.Join(subscription.ErrBlockNotReady, err)
rcp, err := s.receipts.GetByTransactionID(hash)
if err != nil {
if errors.Is(err, storageErrs.ErrNotFound) {
// make sure to wrap in not found error as the streamer expects it
return nil, errors.Join(subscription.ErrBlockNotReady, err)
}
return nil, fmt.Errorf("failed to get receipt with hash: %s at height: %d: %w", hash, height, err)
}
return nil, fmt.Errorf("failed to get receipt with hash: %s at height: %d: %w", hash, height, err)
}

h, err := tx.Hash()
if err != nil {
return nil, fmt.Errorf("failed to compute tx hash: %w", err)
h, err := tx.Hash()
if err != nil {
return nil, fmt.Errorf("failed to compute tx hash: %w", err)
}

t, err := NewTransaction(tx, *rcp)
if err != nil {
return nil, err
}

hashes[i] = h
txs[i] = t
}

if fullTx != nil && *fullTx {
return NewTransaction(tx, *rcp)
return txs, nil
}
return h, nil
return hashes, nil
},
)
l := s.logger.With().Str("subscription-id", string(sub.ID)).Logger()
Expand All @@ -158,17 +168,22 @@ func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*

// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (*rpc.Subscription, error) {
if len(criteria.Topics) > maxTopics {
return nil, errExceedMaxTopics
}
if len(criteria.Addresses) > maxAddresses {
return nil, errExceedMaxAddresses
filter, err := logs.NewFilterCriteria(criteria.Addresses, criteria.Topics)
if err != nil {
return nil, fmt.Errorf("failed to crete log subscription filter: %w", err)
}

sub, err := s.newSubscription(
ctx,
s.logsBroadcaster,
func(ctx context.Context, height uint64) (interface{}, error) {
if criteria.ToBlock != nil && height > criteria.ToBlock.Uint64() {
return nil, nil
}
if criteria.FromBlock != nil && height < criteria.FromBlock.Uint64() {
return nil, nil
}

block, err := s.blocks.GetByHeight(height)
if err != nil {
if errors.Is(err, storageErrs.ErrNotFound) {
Expand All @@ -183,12 +198,9 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
return nil, err
}

// convert from the API type
f := logs.FilterCriteria{
Addresses: criteria.Addresses,
Topics: criteria.Topics,
}
return logs.NewIDFilter(id, f, s.blocks, s.receipts).Match()
// todo change this to height filter so we don't have to get the same block twice, once for height and then for id

return logs.NewIDFilter(id, *filter, s.blocks, s.receipts).Match()
},
)
l := s.logger.With().Str("subscription-id", string(sub.ID)).Logger()
Expand All @@ -198,6 +210,7 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
}

l.Info().Msg("new logs subscription created")

return sub, nil
}

Expand Down Expand Up @@ -273,7 +286,6 @@ func streamData(
}

case err := <-rpcSub.Err():
// todo maybe handle nil err, this is when client disconnects unexpectedly
l.Debug().Err(err).Msg("client unsubscribed")
return
case <-notifier.Closed():
Expand Down
9 changes: 9 additions & 0 deletions models/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,21 @@ package models

import (
"fmt"
"math/big"

"github.com/onflow/cadence"
"github.com/onflow/flow-go/fvm/evm/types"
"github.com/onflow/go-ethereum/common"
)

var (
SafeBlockNumber = big.NewInt(-4)
FinalizedBlockNumber = big.NewInt(-3)
LatestBlockNumber = big.NewInt(-2)
PendingBlockNumber = big.NewInt(-1)
EarliestBlockNumber = big.NewInt(0)
)

// decodeBlock takes a cadence event that contains executed block payload and
// decodes it into the Block type.
func decodeBlock(event cadence.Event) (*types.Block, error) {
Expand Down
23 changes: 17 additions & 6 deletions models/mocks/Engine.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading