Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-spork client access support #230

Merged
merged 61 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
fc36a20
add cross-spork client type
sideninja May 6, 2024
0eaa3d4
add cross-spork methods
sideninja May 6, 2024
e47ad92
add support to config for providing the hosts values for previous sporks
sideninja May 6, 2024
b887c37
setup cross-spork client
sideninja May 6, 2024
cf5e6ed
change name
sideninja May 6, 2024
4ca5093
add logging
sideninja May 6, 2024
ca26af2
check correct client is used
sideninja May 6, 2024
b291868
add logger
sideninja May 6, 2024
21ced97
use test logger
sideninja May 6, 2024
b42c16a
test boundaries
sideninja May 6, 2024
a7ee4db
fix test
sideninja May 6, 2024
cbb71e4
add existing err test
sideninja May 6, 2024
f6fdfbc
update readme
sideninja May 6, 2024
aee635c
add example
sideninja May 6, 2024
8479fcb
add example
sideninja May 6, 2024
d068c59
improve docs
sideninja May 6, 2024
1874e82
add past spork method
sideninja May 7, 2024
156026d
change the client to base client with options
sideninja May 7, 2024
5d7a81e
define new block event type
sideninja May 7, 2024
44078d8
introduce backfilling ability
sideninja May 7, 2024
819a84d
add factories for block events
sideninja May 7, 2024
969e9b5
add logging
sideninja May 7, 2024
0e08f32
get latest height in the spork
sideninja May 7, 2024
ccfb9e4
add logger
sideninja May 7, 2024
8dba9c2
update ingestion engine with subscriber api change
sideninja May 7, 2024
b6a680a
improve error handling
sideninja May 7, 2024
b182d07
improve err handling
sideninja May 7, 2024
d55ac59
add log
sideninja May 7, 2024
1556554
update mocks
sideninja May 7, 2024
183496a
update mocks
sideninja May 7, 2024
c37a942
fix make file
sideninja May 7, 2024
375b36c
fix test api change
sideninja May 7, 2024
15ebaff
fix test api change
sideninja May 7, 2024
4ea7f9a
fix test api change
sideninja May 7, 2024
d207e5b
fix test api change
sideninja May 7, 2024
56d6d19
fix test api change
sideninja May 7, 2024
fad7b9e
fix spork check
sideninja May 7, 2024
9a3ce74
add mock
sideninja May 7, 2024
67aceef
mod tidy
sideninja May 7, 2024
873113f
change to getting a block header only
sideninja May 8, 2024
10094b2
add commments
sideninja May 8, 2024
ffac9c3
add while context error is nil
sideninja May 8, 2024
0949ca4
typo
sideninja May 8, 2024
45e49dc
update go-sdk to specific version
sideninja May 8, 2024
689797c
change to client interface
sideninja May 8, 2024
6e4dca5
change client not requiring height provided, also change client being…
sideninja May 8, 2024
02d85ad
add test for subscriber
sideninja May 8, 2024
db9f8d7
change type
sideninja May 8, 2024
de05844
implement typed mock
sideninja May 9, 2024
3f910aa
test order of blocks
sideninja May 9, 2024
03b4666
change to accept spork clients as part of the factory
sideninja May 15, 2024
752af31
move spork client to requester package
sideninja May 15, 2024
ac66503
remove test that overlap with subscription test
sideninja May 15, 2024
b133733
Merge branch 'main' into gregor/cross-spork
sideninja May 15, 2024
e2b9adb
update go sdk
sideninja May 16, 2024
3cadf70
cache spork boundaries
sideninja May 17, 2024
e600315
Merge branch 'main' into gregor/cross-spork
sideninja May 17, 2024
68d9dff
update flow-go v0.35.6-crescendo-preview.22-atree-inlining
sideninja May 17, 2024
f1a1df5
mod tidy
sideninja May 17, 2024
1269289
Merge branch 'main' into gregor/cross-spork
sideninja May 20, 2024
14d972f
replace for loop
sideninja May 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ generate:
mockery --dir=storage --name=ReceiptIndexer --output=storage/mocks
mockery --dir=storage --name=TransactionIndexer --output=storage/mocks
mockery --dir=storage --name=AccountIndexer --output=storage/mocks
mockery --all --dir=services/events --output=services/events/mocks
mockery --all --dir=services/ingestion --output=services/ingestion/mocks
mockery --dir=models --name=Engine --output=models/mocks

.PHONY: ci
Expand Down
37 changes: 20 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,23 +90,26 @@ it should return:

The application can be configured using the following flags at runtime:

| Flag | Default Value | Description |
|---------------------------|------------------|------------------------------------------------------------------------------------------------------------------------|
| `--database-dir` | `./db` | Path to the directory for the database. |
| `--rpc-host` | `localhost` | Host for the JSON RPC API server. |
| `--rpc-port` | `8545` | Port for the JSON RPC API server. |
| `--access-node-grpc-host` | `localhost:3569` | Host to the Flow access node (AN) gRPC API. |
| `--evm-network-id` | `testnet` | EVM network ID (options: `testnet`, `mainnet`). |
| `--flow-network-id` | `emulator` | Flow network ID (options: `emulator`, `previewnet`). |
| `--coinbase` | (required) | Coinbase address to use for fee collection. |
| `--gas-price` | `1` | Static gas price used for EVM transactions. |
| `--coa-address` | (required) | Flow address that holds COA account used for submitting transactions. |
| `--coa-key` | (required) | *WARNING*: Do not use this flag in production! Private key value for the COA address used for submitting transactions. |
| `--coa-resource-create` | `false` | Auto-create the COA resource in the Flow COA account provided if one doesn't exist. |
| `--log-level` | `debug` | Define verbosity of the log output ('debug', 'info', 'error') |
| `--stream-limit` | 10 | Rate-limits the events sent to the client within one second |
| `--stream-timeout` | 3sec | Defines the timeout in seconds the server waits for the event to be sent to the client |

| Flag | Default Value | Description |
|-----------------------------|------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
| `--database-dir` | `./db` | Path to the directory for the database. |
| `--rpc-host` | `localhost` | Host for the JSON RPC API server. |
| `--rpc-port` | `8545` | Port for the JSON RPC API server. |
| `--access-node-grpc-host` | `localhost:3569` | Host to the current spork Flow access node (AN) gRPC API. |
| `--access-node-spork-hosts` | | Previous spork AN hosts, defined following the schema: `{latest height}@{host}` as comma separated list (e.g. `"200@host-1.com,300@host2.com"`) |
sideninja marked this conversation as resolved.
Show resolved Hide resolved
sideninja marked this conversation as resolved.
Show resolved Hide resolved
| `--evm-network-id` | `testnet` | EVM network ID (options: `testnet`, `mainnet`). |
| `--flow-network-id` | `emulator` | Flow network ID (options: `emulator`, `previewnet`). |
| `--coinbase` | (required) | Coinbase address to use for fee collection. |
| `--init-cadence-height` | 0 | Define the Cadence block height at which to start the indexing. |
| `--gas-price` | `1` | Static gas price used for EVM transactions. |
| `--coa-address` | (required) | Flow address that holds COA account used for submitting transactions. |
| `--coa-key` | (required) | *WARNING*: Do not use this flag in production! Private key value for the COA address used for submitting transactions. |
| `--coa-key-file` | | File path that contains JSON array of COA keys used in key-rotation mechanism, this is exclusive with `coa-key` flag. |
| `--coa-resource-create` | `false` | Auto-create the COA resource in the Flow COA account provided if one doesn't exist. |
| `--log-level` | `debug` | Define verbosity of the log output ('debug', 'info', 'error') |
| `--stream-limit` | 10 | Rate-limits the events sent to the client within one second |
| `--stream-timeout` | 3sec | Defines the timeout in seconds the server waits for the event to be sent to the client |
| `--filter-expiry` | `5m` | Filter defines the time it takes for an idle filter to expire |

## Getting Started

Expand Down
36 changes: 27 additions & 9 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@ import (
"errors"
"fmt"

"github.com/onflow/flow-go-sdk/access"
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"
broadcast "github.com/onflow/flow-go/engine"
"github.com/onflow/go-ethereum/rpc"
"github.com/rs/zerolog"

"github.com/onflow/flow-evm-gateway/api"
"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/models"
Expand All @@ -13,11 +20,6 @@ import (
"github.com/onflow/flow-evm-gateway/storage"
storageErrs "github.com/onflow/flow-evm-gateway/storage/errors"
"github.com/onflow/flow-evm-gateway/storage/pebble"
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"
broadcast "github.com/onflow/flow-go/engine"
"github.com/onflow/go-ethereum/rpc"
"github.com/rs/zerolog"
)

func Start(ctx context.Context, cfg *config.Config) error {
Expand Down Expand Up @@ -107,7 +109,23 @@ func startIngestion(
) error {
logger.Info().Msg("starting up event ingestion")

client, err := grpc.NewClient(cfg.AccessNodeGRPCHost)
currentSporkClient, err := grpc.NewClient(cfg.AccessNodeHost)
if err != nil {
return fmt.Errorf("failed to create client connection for host: %s, with error: %w", cfg.AccessNodeHost, err)
}

// if we provided access node previous spork hosts add them to the client
pastSporkClients := make([]access.Client, len(cfg.AccessNodePreviousSporkHosts))
for i, host := range cfg.AccessNodePreviousSporkHosts {
grpcClient, err := grpc.NewClient(host)
if err != nil {
return fmt.Errorf("failed to create client connection for host: %s, with error: %w", host, err)
}

pastSporkClients[i] = grpcClient
}

client, err := requester.NewCrossSporkClient(currentSporkClient, pastSporkClients, logger)
if err != nil {
return err
}
Expand All @@ -123,7 +141,7 @@ func startIngestion(
}

// make sure the provided block to start the indexing can be loaded
_, err = client.GetBlockByHeight(context.Background(), latestCadenceHeight)
_, err = client.GetBlockHeaderByHeight(context.Background(), latestCadenceHeight)
if err != nil {
return fmt.Errorf("failed to get provided cadence height: %w", err)
}
Expand All @@ -134,7 +152,7 @@ func startIngestion(
Uint64("missed-heights", blk.Height-latestCadenceHeight).
Msg("indexing cadence height information")

subscriber := ingestion.NewRPCSubscriber(client, cfg.FlowNetworkID)
subscriber := ingestion.NewRPCSubscriber(client, cfg.FlowNetworkID, logger)
engine := ingestion.NewEventIngestionEngine(
subscriber,
blocks,
Expand Down Expand Up @@ -180,7 +198,7 @@ func startServer(

srv := api.NewHTTPServer(l, rpc.DefaultHTTPTimeouts)

client, err := grpc.NewClient(cfg.AccessNodeGRPCHost)
client, err := grpc.NewClient(cfg.AccessNodeHost)
sideninja marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
Expand Down
17 changes: 13 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"math/big"
"os"
"strings"
"time"

"github.com/goccy/go-json"
Expand All @@ -29,8 +30,10 @@ const LiveNetworkInitCadenceHeght = uint64(1)
type Config struct {
// DatabaseDir is where the database should be stored.
DatabaseDir string
// AccessNodeGRPCHost defines the Flow network AN host.
AccessNodeGRPCHost string
// AccessNodeHost defines the current spork Flow network AN host.
AccessNodeHost string
// AccessNodePreviousSporkHosts contains a list of the ANs hosts for each spork
AccessNodePreviousSporkHosts []string
// GRPCPort for the RPC API server
RPCPort int
// GRPCHost for the RPC API server
Expand Down Expand Up @@ -71,15 +74,16 @@ type Config struct {

func FromFlags() (*Config, error) {
cfg := &Config{}
var evmNetwork, coinbase, gas, coa, key, keysPath, flowNetwork, logLevel, filterExpiry string
var evmNetwork, coinbase, gas, coa, key, keysPath, flowNetwork, logLevel, filterExpiry, accessSporkHosts string
var streamTimeout int
var initHeight, forceStartHeight uint64

// parse from flags
flag.StringVar(&cfg.DatabaseDir, "database-dir", "./db", "Path to the directory for the database")
flag.StringVar(&cfg.RPCHost, "rpc-host", "", "Host for the RPC API server")
flag.IntVar(&cfg.RPCPort, "rpc-port", 8545, "Port for the RPC API server")
flag.StringVar(&cfg.AccessNodeGRPCHost, "access-node-grpc-host", "localhost:3569", "Host to the flow access node gRPC API")
flag.StringVar(&cfg.AccessNodeHost, "access-node-grpc-host", "localhost:3569", "Host to the flow access node gRPC API")
flag.StringVar(&accessSporkHosts, "access-node-spork-hosts", "", `Previous spork AN hosts, defined following the schema: {host1},{host2} as a comma separated list (e.g. "host-1.com,host2.com")`)
flag.StringVar(&evmNetwork, "evm-network-id", "previewnet", "EVM network ID (previewnet, testnet, mainnet)")
flag.StringVar(&flowNetwork, "flow-network-id", "flow-emulator", "Flow network ID (flow-emulator, flow-previewnet)")
flag.StringVar(&coinbase, "coinbase", "", "Coinbase address to use for fee collection")
Expand Down Expand Up @@ -184,6 +188,11 @@ func FromFlags() (*Config, error) {
}
cfg.FilterExpiry = exp

if accessSporkHosts != "" {
heightHosts := strings.Split(accessSporkHosts, ",")
cfg.AccessNodePreviousSporkHosts = append(cfg.AccessNodePreviousSporkHosts, heightHosts...)
}

if forceStartHeight != 0 {
cfg.ForceStartCadenceHeight = forceStartHeight
}
Expand Down
18 changes: 18 additions & 0 deletions models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,21 @@ func (c *CadenceEvents) CadenceHeight() uint64 {
func (c *CadenceEvents) Length() int {
return len(c.events.Events)
}

// BlockEvents is a wrapper around events streamed, and it also contains an error
type BlockEvents struct {
Events *CadenceEvents
Err error
}

func NewBlockEvents(events flow.BlockEvents) BlockEvents {
return BlockEvents{
Events: NewCadenceEvents(events),
}
}

func NewBlockEventsError(err error) BlockEvents {
return BlockEvents{
Err: err,
}
}
sideninja marked this conversation as resolved.
Show resolved Hide resolved
56 changes: 14 additions & 42 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package ingestion

import (
"context"
"errors"
"fmt"

"github.com/onflow/flow-evm-gateway/models"
"github.com/onflow/flow-evm-gateway/storage"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/fvm/evm/types"
gethTypes "github.com/onflow/go-ethereum/core/types"
Expand Down Expand Up @@ -96,45 +94,21 @@ func (e *Engine) Run(ctx context.Context) error {

e.log.Info().Uint64("start-cadence-height", latestCadence).Msg("starting ingestion")

events, errs, err := e.subscriber.Subscribe(ctx, latestCadence)
if err != nil {
return fmt.Errorf("failed to subscribe to events: %w", err)
}

e.status.MarkReady()

for {
select {
case <-ctx.Done():
e.log.Info().Msg("event ingestion received done signal")
return nil

case blockEvents, ok := <-events:
if !ok {
if ctx.Err() != nil {
return ctx.Err()
}
return models.ErrDisconnected
}

err = e.processEvents(blockEvents)
if err != nil {
e.log.Error().Err(err).Msg("failed to process EVM events")
return err
}

case err, ok := <-errs:
if !ok {
if ctx.Err() != nil {
return ctx.Err()
}

return models.ErrDisconnected
}

return errors.Join(err, models.ErrDisconnected)
for events := range e.subscriber.Subscribe(ctx, latestCadence) {
if events.Err != nil {
return fmt.Errorf("failure in event subscription: %w", events.Err)
}

err = e.processEvents(events.Events)
if err != nil {
e.log.Error().Err(err).Msg("failed to process EVM events")
return err
}
}

return nil
sideninja marked this conversation as resolved.
Show resolved Hide resolved
}

// processEvents converts the events to block and transactions and indexes them.
Expand All @@ -149,14 +123,12 @@ func (e *Engine) Run(ctx context.Context) error {
// https://github.com/onflow/flow-go/blob/master/fvm/evm/types/events.go
//
// Any error is unexpected and fatal.
func (e *Engine) processEvents(blockEvents flow.BlockEvents) error {
func (e *Engine) processEvents(events *models.CadenceEvents) error {
e.log.Debug().
Uint64("cadence-height", blockEvents.Height).
Int("cadence-event-length", len(blockEvents.Events)).
Uint64("cadence-height", events.CadenceHeight()).
Int("cadence-event-length", events.Length()).
Msg("received new cadence evm events")

events := models.NewCadenceEvents(blockEvents)

// if heartbeat interval with no data still update the cadence height
if events.Empty() {
if err := e.blocks.SetLatestCadenceHeight(events.CadenceHeight()); err != nil {
Expand Down
Loading
Loading