From 6a67af3c02c4617c0bb8e81134388768e9c3ace1 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Fri, 11 Nov 2022 10:18:38 -0800 Subject: [PATCH 01/18] wg/errchan handling, block batch size config param for ndexer, .default.env update, indexer folder file restructuring, nonce handling for message processor --- go.mod | 1 + go.sum | 5 + packages/relayer/.default.env | 11 +- packages/relayer/cli/cli.go | 10 + .../relayer/indexer/filter_then_subscribe.go | 196 ++---------------- packages/relayer/indexer/handle_event.go | 103 +++++++++ .../indexer/handle_no_events_in_batch.go | 39 ++++ .../indexer/handle_no_events_remaining.go | 38 ++++ packages/relayer/indexer/service.go | 10 + packages/relayer/indexer/subscribe.go | 31 +++ packages/relayer/indexer/watch_errors.go | 12 ++ packages/relayer/message/process_message.go | 75 +++++-- packages/relayer/message/processor.go | 13 ++ .../relayer/proof/encoded_signal_proof.go | 3 - 14 files changed, 338 insertions(+), 209 deletions(-) create mode 100644 packages/relayer/indexer/handle_event.go create mode 100644 packages/relayer/indexer/handle_no_events_in_batch.go create mode 100644 packages/relayer/indexer/handle_no_events_remaining.go create mode 100644 packages/relayer/indexer/subscribe.go create mode 100644 packages/relayer/indexer/watch_errors.go diff --git a/go.mod b/go.mod index ff09077ea3..455a4d2ca4 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( gorm.io/datatypes v1.0.7 gorm.io/driver/mysql v1.4.3 gorm.io/gorm v1.24.0 + modernc.org/libc v1.16.19 ) require ( diff --git a/go.sum b/go.sum index cedf6ecb41..822adf591e 100644 --- a/go.sum +++ b/go.sum @@ -723,6 +723,7 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -1045,6 +1046,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1306,8 +1308,11 @@ lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= modernc.org/cc/v3 v3.36.1 h1:CICrjwr/1M4+6OQ4HJZ/AHxjcwe67r5vPUF518MkO8A= modernc.org/ccgo/v3 v3.16.8 h1:G0QNlTqI5uVgczBWfGKs7B++EPwCfXPWGD2MdeKloDs= modernc.org/libc v1.16.19 h1:S8flPn5ZeXx6iw/8yNa986hwTQDrY8RXU7tObZuAozo= +modernc.org/libc v1.16.19/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA= modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8= +modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/memory v1.1.1 h1:bDOL0DIDLQv7bWhP3gMvIrnoFw+Eo6F7a2QK9HPDiFU= +modernc.org/memory v1.1.1/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw= modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= modernc.org/sqlite v1.18.1 h1:ko32eKt3jf7eqIkCgPAeHMBXw3riNSLhl2f3loEF7o8= modernc.org/strutil v1.1.2 h1:iFBDH6j1Z0bN/Q9udJnnFoFpENA4252qe/7/5woE5MI= diff --git a/packages/relayer/.default.env b/packages/relayer/.default.env index 3af9498791..5712a35728 100644 --- a/packages/relayer/.default.env +++ b/packages/relayer/.default.env @@ -4,7 +4,10 @@ MYSQL_PASSWORD=root MYSQL_DATABASE=relayer MYSQL_HOST=localhost:3306 RELAYER_ECDSA_KEY= -L1_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10 -L2_BRIDGE_ADDRESS=0xa566811E9E63e4F573Df89d5453bB89F239F7e10 -L1_RPC_URL="wss://eth-goerli.g.alchemy.com/v2/bPAA5rQ42Zoo4ts9TYnTB2t0cuc5lf7_" -L2_RPC_URL="wss://rinkeby-light.eth.linkpool.io/ws" \ No newline at end of file +L1_BRIDGE_ADDRESS=0xB12d6112D64B213880Fa53F815aF1F29c91CaCe9 +L2_BRIDGE_ADDRESS=0x4eA05A0f7713333AeB4bB73F17aEeFE146CF13E3 +L1_TAIKO_ADDRESS=0x9b557777Be33A8A2fE6aF93E017A0d139B439E5D +L2_TAIKO_ADDRESS=0x0027f309f7F94A8Efb6A3DBfb30827f1062803F4 +L1_RPC_URL=ws://34.132.67.34:8546 +L2_RPC_URL=ws://ws.a1.testnet.taiko.xyz +BLOCK_BATCH_SIZE=2 \ No newline at end of file diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index bd464e9acf..cdb55fcfe7 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "strconv" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" @@ -33,6 +34,8 @@ var ( "MYSQL_HOST", "RELAYER_ECDSA_KEY", } + + defaultBlockBatchSize = 2 ) // TODO: implement `resync` mode to wipe DB and restart from block 0 @@ -108,6 +111,11 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) return nil, nil, err } + blockBatchSize, err := strconv.Atoi(os.Getenv("BLOCK_BATCH_SIZE")) + if err != nil { + blockBatchSize = defaultBlockBatchSize + } + indexers := make([]*indexer.Service, 0) if layer == L1 || layer == Both { @@ -123,6 +131,8 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) BridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")), DestBridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")), DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")), + + BlockBatchSize: uint64(blockBatchSize), }) if err != nil { log.Fatal(err) diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 675bda481d..6b48068652 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -2,15 +2,12 @@ package indexer import ( "context" - "encoding/json" - "math/big" + "sync" "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/taikochain/taiko-mono/packages/relayer" - "github.com/taikochain/taiko-mono/packages/relayer/contracts" ) var ( @@ -21,6 +18,8 @@ var ( // up to the latest block. As it goes, it tries to process messages. // When it catches up, it then starts to Subscribe to latest events as they come in. func (svc *Service) FilterThenSubscribe(ctx context.Context) error { + go svc.watchErrors() + chainID, err := svc.ethClient.ChainID(ctx) if err != nil { return errors.Wrap(err, "s.ethClient.ChainID()") @@ -35,38 +34,26 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error { return errors.Wrap(err, "s.blockRepo.GetLatestBlock()") } - log.Infof("latest processed block: %v", latestProcessedBlock.Height) - - if err != nil { - return errors.Wrap(err, "bridge.FilterMessageSent") - } - header, err := svc.ethClient.HeaderByNumber(ctx, nil) if err != nil { return errors.Wrap(err, "s.ethClient.HeaderByNumber") } - // if we have already done the latest block, exit early - // TODO: call SubscribeMessageSent, as we can now just watch the chain for new blocks + // if we have already done the latest block, subscribe to new changes if latestProcessedBlock.Height == header.Number.Uint64() { return svc.subscribe(ctx, chainID) } - const batchSize = 1000 - svc.processingBlock = latestProcessedBlock log.Infof("getting events between %v and %v in batches of %v", svc.processingBlock.Height, header.Number.Int64(), - batchSize, + svc.blockBatchSize, ) - // todo: parallelize/concurrently catch up. don't think we need to do this in order. - // use WaitGroup. - // we get a timeout/EOF if we don't batch. - for i := latestProcessedBlock.Height; i < header.Number.Uint64(); i += batchSize { - var end uint64 = svc.processingBlock.Height + batchSize + for i := latestProcessedBlock.Height; i < header.Number.Uint64(); i += svc.blockBatchSize { + var end uint64 = svc.processingBlock.Height + svc.blockBatchSize // if the end of the batch is greater than the latest block number, set end // to the latest block number if end > header.Number.Uint64() { @@ -94,12 +81,14 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error { log.Info("found events") + wg := &sync.WaitGroup{} + for { - if err := svc.handleEvent(ctx, chainID, events.Event); err != nil { - return errors.Wrap(err, "svc.handleEvent") - } + go svc.handleEvent(ctx, wg, svc.errChan, chainID, events.Event) if !events.Next() { + wg.Wait() + if err := svc.handleNoEventsRemaining(ctx, chainID, events); err != nil { return errors.Wrap(err, "svc.handleNoEventsRemaining") } @@ -122,164 +111,3 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error { return svc.subscribe(ctx, chainID) } - -// subscribe subscribes to latest events -func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { - sink := make(chan *contracts.BridgeMessageSent) - - sub, err := svc.bridge.WatchMessageSent(&bind.WatchOpts{}, sink, nil) - if err != nil { - return errors.Wrap(err, "svc.bridge.WatchMessageSent") - } - - defer sub.Unsubscribe() - - for { - select { - case err := <-sub.Err(): - return err - case event := <-sink: - if err := svc.handleEvent(ctx, chainID, event); err != nil { - return errors.Wrap(err, "svc.handleEvent") - } - } - } -} - -// handleEvent handles an individual MessageSent event -func (svc *Service) handleEvent(ctx context.Context, chainID *big.Int, event *contracts.BridgeMessageSent) error { - log.Infof("event found. signal:%v for block: %v", common.Hash(event.Signal).Hex(), event.Raw.BlockNumber) - - marshaled, err := json.Marshal(event) - if err != nil { - return errors.Wrap(err, "json.Marshal(event)") - } - - raw := event.Raw - - // handle chain re-org by checking Removed property, no need to - // return error, just continue and do not process. - if raw.Removed { - return nil - } - - // save event to database for later processing outside - // the indexer - log.Info("saving event to database") - - eventStatus := relayer.EventStatusNew - // if gasLimit is 0, relayer can not process this. - if event.Message.GasLimit == nil || event.Message.GasLimit.Cmp(common.Big0) == 0 { - eventStatus = relayer.EventStatusNewOnlyOwner - } - - e, err := svc.eventRepo.Save(relayer.SaveEventOpts{ - Name: eventName, - Data: string(marshaled), - ChainID: chainID, - Status: eventStatus, - }) - if err != nil { - return errors.Wrap(err, "svc.eventRepo.Save") - } - - // we can not process, exit early - if eventStatus == relayer.EventStatusNewOnlyOwner && event.Message.Owner != svc.relayerAddr { - log.Infof("gasLimit == 0 and owner is not the current relayer key, can not process. continuing loop") - return nil - } - - messageStatus, err := svc.destBridge.GetMessageStatus(nil, event.Signal) - if err != nil { - return errors.Wrap(err, "svc.destBridge.GetMessageStatus") - } - - if messageStatus == uint8(relayer.EventStatusNew) { - log.Info("message not processed yet, attempting processing") - // process the message - if err := svc.processor.ProcessMessage(ctx, event, e); err != nil { - // TODO: handle error here, update in eventRepo, continue on in processing - return errors.Wrap(err, "svc.processMessage") - } - } - - // if the block number of the event is higher than the block we are processing, - // we can now consider that previous block processed. save it to the DB - // and bump the block number. - if raw.BlockNumber > svc.processingBlock.Height { - log.Info("raw blockNumber > processingBlock.height") - log.Infof("saving new latest processed block to DB: %v", raw.BlockNumber) - - if err := svc.blockRepo.Save(relayer.SaveBlockOpts{ - Height: svc.processingBlock.Height, - Hash: common.HexToHash(svc.processingBlock.Hash), - ChainID: chainID, - EventName: eventName, - }); err != nil { - return errors.Wrap(err, "svc.blockRepo.Save") - } - - svc.processingBlock = &relayer.Block{ - Height: raw.BlockNumber, - Hash: raw.BlockHash.Hex(), - } - } - - return nil -} - -// handleNoEventsRemaining is used when the batch had events, but is now finished, and we need to -// update the latest block processed -func (svc *Service) handleNoEventsRemaining( - ctx context.Context, - chainID *big.Int, - events *contracts.BridgeMessageSentIterator, -) error { - log.Info("no events remaining to be processed") - - if events.Error() != nil { - return errors.Wrap(events.Error(), "events.Error") - } - - log.Infof("saving new latest processed block to DB: %v", events.Event.Raw.BlockNumber) - - if err := svc.blockRepo.Save(relayer.SaveBlockOpts{ - Height: events.Event.Raw.BlockNumber, - Hash: events.Event.Raw.BlockHash, - ChainID: chainID, - EventName: eventName, - }); err != nil { - return errors.Wrap(err, "svc.blockRepo.Save") - } - - return nil -} - -// handleNoEventsInBatch is used when an entire batch call has no events in the entire response, -// and we need to update the latest block processed -func (svc *Service) handleNoEventsInBatch(ctx context.Context, chainID *big.Int, blockNumber int64) error { - log.Infof("no events in batch") - - header, err := svc.ethClient.HeaderByNumber(ctx, big.NewInt(blockNumber)) - if err != nil { - return errors.Wrap(err, "svc.ethClient.HeaderByNumber") - } - - log.Infof("setting last processed block to height: %v, hash: %v", blockNumber, header.Hash().Hex()) - - if err := svc.blockRepo.Save(relayer.SaveBlockOpts{ - Height: uint64(blockNumber), - Hash: header.Hash(), - ChainID: chainID, - EventName: eventName, - }); err != nil { - return errors.Wrap(err, "svc.blockRepo.Save") - } - - svc.processingBlock = &relayer.Block{ - Height: uint64(blockNumber), - Hash: header.Hash().Hex(), - } - - return nil -} diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go new file mode 100644 index 0000000000..f7d522ed6b --- /dev/null +++ b/packages/relayer/indexer/handle_event.go @@ -0,0 +1,103 @@ +package indexer + +import ( + "context" + "encoding/json" + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/taikochain/taiko-mono/packages/relayer" + "github.com/taikochain/taiko-mono/packages/relayer/contracts" +) + +// handleEvent handles an individual MessageSent event +func (svc *Service) handleEvent( + ctx context.Context, + wg *sync.WaitGroup, + errChan chan error, + chainID *big.Int, + event *contracts.BridgeMessageSent, +) { + if wg != nil { + wg.Add(1) + defer wg.Done() + } + + raw := event.Raw + + // handle chain re-org by checking Removed property, no need to + // return error, just continue and do not process. + if raw.Removed { + return + } + + eventStatus := relayer.EventStatusNew + // if gasLimit is 0, relayer can not process this. + if event.Message.GasLimit == nil || event.Message.GasLimit.Cmp(common.Big0) == 0 { + eventStatus = relayer.EventStatusNewOnlyOwner + } + + marshaled, err := json.Marshal(event) + if err != nil { + errChan <- errors.Wrap(err, "json.Marshal(event)") + } + + e, err := svc.eventRepo.Save(relayer.SaveEventOpts{ + Name: eventName, + Data: string(marshaled), + ChainID: chainID, + Status: eventStatus, + }) + if err != nil { + errChan <- errors.Wrap(err, "svc.eventRepo.Save") + return + } + + // we can not process, exit early + if eventStatus == relayer.EventStatusNewOnlyOwner && event.Message.Owner != svc.relayerAddr { + log.Infof("gasLimit == 0 and owner is not the current relayer key, can not process. continuing loop") + errChan <- nil + } + + messageStatus, err := svc.destBridge.GetMessageStatus(nil, event.Signal) + if err != nil { + errChan <- errors.Wrap(err, "svc.destBridge.GetMessageStatus") + return + } + + if messageStatus == uint8(relayer.EventStatusNew) { + // ctx, cancelFunc := context.WithTimeout(ctx, 90*time.Second) + // defer cancelFunc() + log.Info("message not processed yet, attempting processing") + // process the message + if err := svc.processor.ProcessMessage(ctx, event, e); err != nil { + errChan <- errors.Wrap(err, "svc.processMessage") + return + } + } + + // if the block number of the event is higher than the block we are processing, + // we can now consider that previous block processed. save it to the DB + // and bump the block number. + if raw.BlockNumber > svc.processingBlock.Height { + log.Infof("saving new latest processed block to DB: %v", raw.BlockNumber) + + if err := svc.blockRepo.Save(relayer.SaveBlockOpts{ + Height: svc.processingBlock.Height, + Hash: common.HexToHash(svc.processingBlock.Hash), + ChainID: chainID, + EventName: eventName, + }); err != nil { + errChan <- errors.Wrap(err, "svc.blockRepo.Save") + return + } + + svc.processingBlock = &relayer.Block{ + Height: raw.BlockNumber, + Hash: raw.BlockHash.Hex(), + } + } +} diff --git a/packages/relayer/indexer/handle_no_events_in_batch.go b/packages/relayer/indexer/handle_no_events_in_batch.go new file mode 100644 index 0000000000..d16a9df696 --- /dev/null +++ b/packages/relayer/indexer/handle_no_events_in_batch.go @@ -0,0 +1,39 @@ +package indexer + +import ( + "context" + "math/big" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/taikochain/taiko-mono/packages/relayer" +) + +// handleNoEventsInBatch is used when an entire batch call has no events in the entire response, +// and we need to update the latest block processed +func (svc *Service) handleNoEventsInBatch(ctx context.Context, chainID *big.Int, blockNumber int64) error { + log.Infof("no events in batch") + + header, err := svc.ethClient.HeaderByNumber(ctx, big.NewInt(blockNumber)) + if err != nil { + return errors.Wrap(err, "svc.ethClient.HeaderByNumber") + } + + log.Infof("setting last processed block to height: %v, hash: %v", blockNumber, header.Hash().Hex()) + + if err := svc.blockRepo.Save(relayer.SaveBlockOpts{ + Height: uint64(blockNumber), + Hash: header.Hash(), + ChainID: chainID, + EventName: eventName, + }); err != nil { + return errors.Wrap(err, "svc.blockRepo.Save") + } + + svc.processingBlock = &relayer.Block{ + Height: uint64(blockNumber), + Hash: header.Hash().Hex(), + } + + return nil +} diff --git a/packages/relayer/indexer/handle_no_events_remaining.go b/packages/relayer/indexer/handle_no_events_remaining.go new file mode 100644 index 0000000000..e125b29421 --- /dev/null +++ b/packages/relayer/indexer/handle_no_events_remaining.go @@ -0,0 +1,38 @@ +package indexer + +import ( + "context" + "math/big" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/taikochain/taiko-mono/packages/relayer" + "github.com/taikochain/taiko-mono/packages/relayer/contracts" +) + +// handleNoEventsRemaining is used when the batch had events, but is now finished, and we need to +// update the latest block processed +func (svc *Service) handleNoEventsRemaining( + ctx context.Context, + chainID *big.Int, + events *contracts.BridgeMessageSentIterator, +) error { + log.Info("no events remaining to be processed") + + if events.Error() != nil { + return errors.Wrap(events.Error(), "events.Error") + } + + log.Infof("saving new latest processed block to DB: %v", events.Event.Raw.BlockNumber) + + if err := svc.blockRepo.Save(relayer.SaveBlockOpts{ + Height: events.Event.Raw.BlockNumber, + Hash: events.Event.Raw.BlockHash, + ChainID: chainID, + EventName: eventName, + }); err != nil { + return errors.Wrap(err, "svc.blockRepo.Save") + } + + return nil +} diff --git a/packages/relayer/indexer/service.go b/packages/relayer/indexer/service.go index b8e0371342..6ed9ca3c69 100644 --- a/packages/relayer/indexer/service.go +++ b/packages/relayer/indexer/service.go @@ -32,6 +32,10 @@ type Service struct { processor *message.Processor relayerAddr common.Address + + errChan chan error + + blockBatchSize uint64 } type NewServiceOpts struct { @@ -45,6 +49,7 @@ type NewServiceOpts struct { BridgeAddress common.Address DestBridgeAddress common.Address DestTaikoAddress common.Address + BlockBatchSize uint64 } func NewService(opts NewServiceOpts) (*Service, error) { @@ -122,6 +127,7 @@ func NewService(opts NewServiceOpts) (*Service, error) { DestBridge: destBridge, EventRepo: opts.EventRepo, DestHeaderSyncer: destHeaderSyncer, + RelayerAddress: relayerAddr, }) if err != nil { return nil, errors.Wrap(err, "message.NewProcessor") @@ -139,5 +145,9 @@ func NewService(opts NewServiceOpts) (*Service, error) { processor: processor, relayerAddr: relayerAddr, + + errChan: make(chan error), + + blockBatchSize: opts.BlockBatchSize, }, nil } diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go new file mode 100644 index 0000000000..52bcac1b26 --- /dev/null +++ b/packages/relayer/indexer/subscribe.go @@ -0,0 +1,31 @@ +package indexer + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/pkg/errors" + "github.com/taikochain/taiko-mono/packages/relayer/contracts" +) + +// subscribe subscribes to latest events +func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { + sink := make(chan *contracts.BridgeMessageSent) + + sub, err := svc.bridge.WatchMessageSent(&bind.WatchOpts{}, sink, nil) + if err != nil { + return errors.Wrap(err, "svc.bridge.WatchMessageSent") + } + + defer sub.Unsubscribe() + + for { + select { + case err := <-sub.Err(): + svc.errChan <- err + case event := <-sink: + go svc.handleEvent(ctx, nil, svc.errChan, chainID, event) + } + } +} diff --git a/packages/relayer/indexer/watch_errors.go b/packages/relayer/indexer/watch_errors.go new file mode 100644 index 0000000000..8879af13df --- /dev/null +++ b/packages/relayer/indexer/watch_errors.go @@ -0,0 +1,12 @@ +package indexer + +import log "github.com/sirupsen/logrus" + +func (svc *Service) watchErrors() { + for { + select { + case err := <-svc.errChan: + log.Infof("svc.watchErrors: %v", err) + } + } +} diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index 5b895258b6..d9ca77575a 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -7,6 +7,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -51,19 +52,6 @@ func (p *Processor) ProcessMessage( log.Infof("processing message for signal: %v, key: %v", common.Hash(event.Signal).Hex(), key) - auth, err := bind.NewKeyedTransactorWithChainID(p.ecdsaKey, event.Message.DestChainId) - if err != nil { - return errors.Wrap(err, "bind.NewKeyedTransactorWithChainID") - } - - auth.Context = ctx - - // uncomment to skip `eth_estimateGas` - auth.GasLimit = 2000000 - auth.GasPrice = new(big.Int).SetUint64(500000000) - - log.Infof("getting proof") - encodedSignalProof, err := p.prover.EncodedSignalProof(ctx, p.rpc, event.Raw.Address, key, latestSyncedHeader) if err != nil { return errors.Wrap(err, "p.prover.GetEncodedSignalProof") @@ -79,18 +67,15 @@ func (p *Processor) ProcessMessage( return errors.Wrap(err, "p.destBridge.IsMessageReceived") } - log.Infof("isMessageReceived: %v", received) - // message will fail when we try to process is // TODO: update status in db if !received { return errors.New("message not received") } - // process the message on the destination bridge. - tx, err := p.destBridge.ProcessMessage(auth, event.Message, encodedSignalProof) + tx, err := p.sendProcessMessageCall(ctx, event, encodedSignalProof) if err != nil { - return errors.Wrap(err, "p.destBridge.ProcessMessage") + return errors.Wrap(err, "p.sendProcessMessageCall") } log.Infof("waiting for tx hash %v", hex.EncodeToString(tx.Hash().Bytes())) @@ -116,3 +101,57 @@ func (p *Processor) ProcessMessage( return nil } + +func (p *Processor) sendProcessMessageCall( + ctx context.Context, + event *contracts.BridgeMessageSent, + proof []byte, +) (*types.Transaction, error) { + auth, err := bind.NewKeyedTransactorWithChainID(p.ecdsaKey, event.Message.DestChainId) + if err != nil { + return nil, errors.Wrap(err, "bind.NewKeyedTransactorWithChainID") + } + + auth.Context = ctx + + // uncomment to skip `eth_estimateGas` + auth.GasLimit = 2000000 + auth.GasPrice = new(big.Int).SetUint64(500000000) + + p.mu.Lock() + + err = p.getLatestNonce(ctx, auth) + if err != nil { + return nil, errors.New("p.getLatestNonce") + } + // process the message on the destination bridge. + tx, err := p.destBridge.ProcessMessage(auth, event.Message, proof) + if err != nil { + return nil, errors.Wrap(err, "p.destBridge.ProcessMessage") + } + + p.setLatestNonce(tx.Nonce()) + + p.mu.Unlock() + + return tx, nil +} + +func (p *Processor) setLatestNonce(nonce uint64) { + p.destNonce = nonce +} + +func (p *Processor) getLatestNonce(ctx context.Context, auth *bind.TransactOpts) error { + pendingNonce, err := p.destEthClient.PendingNonceAt(ctx, p.relayerAddr) + if err != nil { + return err + } + + if pendingNonce > p.destNonce { + p.setLatestNonce(pendingNonce) + } + + auth.Nonce = big.NewInt(int64(p.destNonce)) + + return nil +} diff --git a/packages/relayer/message/processor.go b/packages/relayer/message/processor.go index 65ad44fff1..ad8927ecdf 100644 --- a/packages/relayer/message/processor.go +++ b/packages/relayer/message/processor.go @@ -2,7 +2,9 @@ package message import ( "crypto/ecdsa" + "sync" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/ethclient" @@ -21,6 +23,11 @@ type Processor struct { destHeaderSyncer *contracts.IHeaderSync prover *proof.Prover + + mu *sync.Mutex + + destNonce uint64 + relayerAddr common.Address } type NewProcessorOpts struct { @@ -31,6 +38,7 @@ type NewProcessorOpts struct { DestBridge *contracts.Bridge EventRepo relayer.EventRepository DestHeaderSyncer *contracts.IHeaderSync + RelayerAddress common.Address } func NewProcessor(opts NewProcessorOpts) (*Processor, error) { @@ -70,5 +78,10 @@ func NewProcessor(opts NewProcessorOpts) (*Processor, error) { destEthClient: opts.DestETHClient, destBridge: opts.DestBridge, destHeaderSyncer: opts.DestHeaderSyncer, + + mu: &sync.Mutex{}, + + destNonce: 0, + relayerAddr: opts.RelayerAddress, }, nil } diff --git a/packages/relayer/proof/encoded_signal_proof.go b/packages/relayer/proof/encoded_signal_proof.go index 8737e6eecf..766568a04f 100644 --- a/packages/relayer/proof/encoded_signal_proof.go +++ b/packages/relayer/proof/encoded_signal_proof.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rlp" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" ) // EncodedSignalProof rlp and abi encodes the SignalProof struct expected by LibBridgeSignal @@ -43,8 +42,6 @@ func (p *Prover) EncodedSignalProof( return nil, errors.Wrap(err, "enoding.EncodeSignalProof") } - log.Infof("signalProof: %s", hexutil.Encode(encodedSignalProof)) - return encodedSignalProof, nil } From 48d0a2dedf5f0a78924b731511972150efe6a183 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Fri, 11 Nov 2022 10:35:00 -0800 Subject: [PATCH 02/18] mysql conn params + block batch size param --- packages/relayer/.default.env | 5 +++- packages/relayer/cli/cli.go | 44 ++++++++++++++++++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/packages/relayer/.default.env b/packages/relayer/.default.env index 5712a35728..9b81a9af97 100644 --- a/packages/relayer/.default.env +++ b/packages/relayer/.default.env @@ -10,4 +10,7 @@ L1_TAIKO_ADDRESS=0x9b557777Be33A8A2fE6aF93E017A0d139B439E5D L2_TAIKO_ADDRESS=0x0027f309f7F94A8Efb6A3DBfb30827f1062803F4 L1_RPC_URL=ws://34.132.67.34:8546 L2_RPC_URL=ws://ws.a1.testnet.taiko.xyz -BLOCK_BATCH_SIZE=2 \ No newline at end of file +BLOCK_BATCH_SIZE=2 +MYSQL_MAX_IDLE_CONNS= +MYSQL_MAX_OPEN_CONNS= +MYSQL_CONN_MAX_LIFETIME_IN_MS= \ No newline at end of file diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index cdb55fcfe7..82e22fade2 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "strconv" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rpc" @@ -112,7 +113,7 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) } blockBatchSize, err := strconv.Atoi(os.Getenv("BLOCK_BATCH_SIZE")) - if err != nil { + if err != nil || blockBatchSize == 0 { blockBatchSize = defaultBlockBatchSize } @@ -154,6 +155,8 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) BridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")), DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")), DestTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")), + + BlockBatchSize: uint64(blockBatchSize), }) if err != nil { log.Fatal(err) @@ -198,6 +201,45 @@ func openDBConnection(opts relayer.DBConnectionOpts) *gorm.DB { log.Fatal(err) } + sqlDB, err := db.DB() + if err != nil { + log.Fatal(err) + } + + var ( + defaultMaxIdleConns = 50 + defaultMaxOpenConns = 200 + defaultConnMaxLifetime = time.Hour + ) + + maxIdleConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_IDLE_CONNS")) + if err != nil || maxIdleConns == 0 { + maxIdleConns = defaultMaxIdleConns + } + + maxOpenConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_OPEN_CONNS")) + if err != nil || maxOpenConns == 0 { + maxOpenConns = defaultMaxOpenConns + } + + var maxLifetime time.Duration + + connMaxLifetime, err := strconv.Atoi(os.Getenv("MYSQL_CONN_MAX_LIFETIME_IN_MS")) + if err != nil || connMaxLifetime == 0 { + maxLifetime = defaultConnMaxLifetime + } else { + maxLifetime = time.Duration(connMaxLifetime) + } + + // SetMaxIdleConns sets the maximum number of connections in the idle connection pool. + sqlDB.SetMaxIdleConns(maxIdleConns) + + // SetMaxOpenConns sets the maximum number of open connections to the database. + sqlDB.SetMaxOpenConns(maxOpenConns) + + // SetConnMaxLifetime sets the maximum amount of time a connection may be reused. + sqlDB.SetConnMaxLifetime(maxLifetime) + return db } From fd6f1086ac3ebb9ba092d4d5b6b8c8245fec4615 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Fri, 11 Nov 2022 10:40:56 -0800 Subject: [PATCH 03/18] wip tests fo indexer + go mod tidy --- go.mod | 1 - go.sum | 5 ----- packages/relayer/indexer/service_test.go | 17 +++++++++++++++++ packages/relayer/indexer/watch_errors_test.go | 16 ++++++++++++++++ 4 files changed, 33 insertions(+), 6 deletions(-) create mode 100644 packages/relayer/indexer/watch_errors_test.go diff --git a/go.mod b/go.mod index 455a4d2ca4..ff09077ea3 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( gorm.io/datatypes v1.0.7 gorm.io/driver/mysql v1.4.3 gorm.io/gorm v1.24.0 - modernc.org/libc v1.16.19 ) require ( diff --git a/go.sum b/go.sum index 822adf591e..cedf6ecb41 100644 --- a/go.sum +++ b/go.sum @@ -723,7 +723,6 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= -github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rjeczalik/notify v0.9.1 h1:CLCKso/QK1snAlnhNR/CNvNiFU2saUtjV0bx3EwNeCE= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= @@ -1046,7 +1045,6 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1308,11 +1306,8 @@ lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= modernc.org/cc/v3 v3.36.1 h1:CICrjwr/1M4+6OQ4HJZ/AHxjcwe67r5vPUF518MkO8A= modernc.org/ccgo/v3 v3.16.8 h1:G0QNlTqI5uVgczBWfGKs7B++EPwCfXPWGD2MdeKloDs= modernc.org/libc v1.16.19 h1:S8flPn5ZeXx6iw/8yNa986hwTQDrY8RXU7tObZuAozo= -modernc.org/libc v1.16.19/go.mod h1:p7Mg4+koNjc8jkqwcoFBJx7tXkpj00G77X7A72jXPXA= modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8= -modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= modernc.org/memory v1.1.1 h1:bDOL0DIDLQv7bWhP3gMvIrnoFw+Eo6F7a2QK9HPDiFU= -modernc.org/memory v1.1.1/go.mod h1:/0wo5ibyrQiaoUoH7f9D8dnglAmILJ5/cxZlRECf+Nw= modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4= modernc.org/sqlite v1.18.1 h1:ko32eKt3jf7eqIkCgPAeHMBXw3riNSLhl2f3loEF7o8= modernc.org/strutil v1.1.2 h1:iFBDH6j1Z0bN/Q9udJnnFoFpENA4252qe/7/5woE5MI= diff --git a/packages/relayer/indexer/service_test.go b/packages/relayer/indexer/service_test.go index 94ad7ad6e6..2a9deb9fab 100644 --- a/packages/relayer/indexer/service_test.go +++ b/packages/relayer/indexer/service_test.go @@ -14,6 +14,23 @@ import ( var dummyEcdsaKey = "8da4ef21b864d2cc526dbdb2a120bd2874c36c9d0a1fb7f8c63d7f7a8b41de8f" var dummyAddress = "0x63FaC9201494f0bd17B9892B9fae4d52fe3BD377" +func newTestService(t *testing.T) *Service { + svc, err := NewService(NewServiceOpts{ + EventRepo: &repo.EventRepository{}, + BlockRepo: &repo.BlockRepository{}, + RPCClient: &rpc.Client{}, + EthClient: ðclient.Client{}, + DestEthClient: ðclient.Client{}, + ECDSAKey: dummyEcdsaKey, + BridgeAddress: common.HexToAddress(dummyAddress), + DestBridgeAddress: common.HexToAddress(dummyAddress), + }) + + assert.Equal(t, nil, err) + + return svc +} + func Test_NewService(t *testing.T) { tests := []struct { name string diff --git a/packages/relayer/indexer/watch_errors_test.go b/packages/relayer/indexer/watch_errors_test.go new file mode 100644 index 0000000000..026d52db9a --- /dev/null +++ b/packages/relayer/indexer/watch_errors_test.go @@ -0,0 +1,16 @@ +package indexer + +import ( + "errors" + "testing" +) + +func Test_watchErrors(t *testing.T) { + svc := newTestService(t) + + go svc.watchErrors() + + err := errors.New("err") + + svc.errChan <- err +} From 5eb0d614d7ad60704d5c7ab3293769fb5917d8bf Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Fri, 11 Nov 2022 10:57:57 -0800 Subject: [PATCH 04/18] add gosimple linter --- packages/relayer/.golangci.yml | 1 + packages/relayer/cli/cli.go | 8 ++++---- packages/relayer/indexer/watch_errors.go | 7 ++----- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index d9a6b9dd7c..dd578fed41 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -19,6 +19,7 @@ linters: - gofmt - golint - gosec + - gosimple - lll - whitespace - wsl diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index 82e22fade2..d6abea5979 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -209,7 +209,7 @@ func openDBConnection(opts relayer.DBConnectionOpts) *gorm.DB { var ( defaultMaxIdleConns = 50 defaultMaxOpenConns = 200 - defaultConnMaxLifetime = time.Hour + defaultConnMaxLifetime = 10 * time.Second ) maxIdleConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_IDLE_CONNS")) @@ -231,12 +231,12 @@ func openDBConnection(opts relayer.DBConnectionOpts) *gorm.DB { maxLifetime = time.Duration(connMaxLifetime) } - // SetMaxIdleConns sets the maximum number of connections in the idle connection pool. - sqlDB.SetMaxIdleConns(maxIdleConns) - // SetMaxOpenConns sets the maximum number of open connections to the database. sqlDB.SetMaxOpenConns(maxOpenConns) + // SetMaxIdleConns sets the maximum number of connections in the idle connection pool. + sqlDB.SetMaxIdleConns(maxIdleConns) + // SetConnMaxLifetime sets the maximum amount of time a connection may be reused. sqlDB.SetConnMaxLifetime(maxLifetime) diff --git a/packages/relayer/indexer/watch_errors.go b/packages/relayer/indexer/watch_errors.go index 8879af13df..4dad58ce35 100644 --- a/packages/relayer/indexer/watch_errors.go +++ b/packages/relayer/indexer/watch_errors.go @@ -3,10 +3,7 @@ package indexer import log "github.com/sirupsen/logrus" func (svc *Service) watchErrors() { - for { - select { - case err := <-svc.errChan: - log.Infof("svc.watchErrors: %v", err) - } + for err := range svc.errChan { + log.Infof("svc.watchErrors: %v", err) } } From 6ac57c1651d69769281441528cc31dde43f6b1b1 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Mon, 14 Nov 2022 08:37:43 -0800 Subject: [PATCH 05/18] refactor handle_even with cleaner handle methods. test for canProcessMessage. error handling return in handle_event after filling errChan --- .../relayer/indexer/filter_then_subscribe.go | 3 + packages/relayer/indexer/handle_event.go | 76 +++++++++++++------ packages/relayer/indexer/handle_event_test.go | 73 ++++++++++++++++++ packages/relayer/indexer/watch_errors.go | 8 +- 4 files changed, 136 insertions(+), 24 deletions(-) create mode 100644 packages/relayer/indexer/handle_event_test.go diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 6b48068652..21c45cc080 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -83,6 +83,9 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context) error { wg := &sync.WaitGroup{} + // TODO: do we want to limit the number of possible goroutines in the waitgroup? + // right now it is dependent on how many events are found in the + // block range. the main concern would be exceeding DB connection pooling limits. for { go svc.handleEvent(ctx, wg, svc.errChan, chainID, events.Event) diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index f7d522ed6b..b48ec5e0f6 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -34,15 +34,15 @@ func (svc *Service) handleEvent( return } - eventStatus := relayer.EventStatusNew - // if gasLimit is 0, relayer can not process this. - if event.Message.GasLimit == nil || event.Message.GasLimit.Cmp(common.Big0) == 0 { - eventStatus = relayer.EventStatusNewOnlyOwner + eventStatus, err := svc.eventStatusFromSignal(ctx, event.Message.GasLimit, event.Signal) + if err != nil { + errChan <- errors.Wrap(err, "svc.eventStatusFromSignal") } marshaled, err := json.Marshal(event) if err != nil { errChan <- errors.Wrap(err, "json.Marshal(event)") + return } e, err := svc.eventRepo.Save(relayer.SaveEventOpts{ @@ -56,27 +56,14 @@ func (svc *Service) handleEvent( return } - // we can not process, exit early - if eventStatus == relayer.EventStatusNewOnlyOwner && event.Message.Owner != svc.relayerAddr { - log.Infof("gasLimit == 0 and owner is not the current relayer key, can not process. continuing loop") - errChan <- nil - } - - messageStatus, err := svc.destBridge.GetMessageStatus(nil, event.Signal) - if err != nil { - errChan <- errors.Wrap(err, "svc.destBridge.GetMessageStatus") + if !canProcessMessage(ctx, eventStatus, event.Message.Owner, svc.relayerAddr) { return } - if messageStatus == uint8(relayer.EventStatusNew) { - // ctx, cancelFunc := context.WithTimeout(ctx, 90*time.Second) - // defer cancelFunc() - log.Info("message not processed yet, attempting processing") - // process the message - if err := svc.processor.ProcessMessage(ctx, event, e); err != nil { - errChan <- errors.Wrap(err, "svc.processMessage") - return - } + // process the message + if err := svc.processor.ProcessMessage(ctx, event, e); err != nil { + errChan <- errors.Wrap(err, "svc.processMessage") + return } // if the block number of the event is higher than the block we are processing, @@ -101,3 +88,48 @@ func (svc *Service) handleEvent( } } } + +func canProcessMessage( + ctx context.Context, + eventStatus relayer.EventStatus, + messageOwner common.Address, + relayerAddress common.Address, +) bool { + // we can not process, exit early + if eventStatus == relayer.EventStatusNewOnlyOwner { + if messageOwner != relayerAddress { + log.Infof("gasLimit == 0 and owner is not the current relayer key, can not process. continuing loop") + return false + } + + return true + } + + if eventStatus == relayer.EventStatusNew { + return true + } + + return false +} + +func (svc *Service) eventStatusFromSignal( + ctx context.Context, + gasLimit *big.Int, + signal [32]byte, +) (relayer.EventStatus, error) { + var eventStatus relayer.EventStatus + + // if gasLimit is 0, relayer can not process this. + if gasLimit == nil || gasLimit.Cmp(common.Big0) == 0 { + eventStatus = relayer.EventStatusNewOnlyOwner + } else { + messageStatus, err := svc.destBridge.GetMessageStatus(nil, signal) + if err != nil { + return 0, errors.Wrap(err, "svc.destBridge.GetMessageStatus") + } + + eventStatus = relayer.EventStatus(messageStatus) + } + + return eventStatus, nil +} diff --git a/packages/relayer/indexer/handle_event_test.go b/packages/relayer/indexer/handle_event_test.go new file mode 100644 index 0000000000..dba884f22d --- /dev/null +++ b/packages/relayer/indexer/handle_event_test.go @@ -0,0 +1,73 @@ +package indexer + +import ( + "context" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/taikochain/taiko-mono/packages/relayer" + "gopkg.in/go-playground/assert.v1" +) + +var ( + relayerAddr = common.HexToAddress("0x71C7656EC7ab88b098defB751B7401B5f6d8976F") +) + +func Test_canProcessMessage(t *testing.T) { + tests := []struct { + name string + eventStatus relayer.EventStatus + messageOwner common.Address + relayerAddress common.Address + want bool + }{ + { + "canProcess, eventStatusNew", + relayer.EventStatusNew, + relayerAddr, + relayerAddr, + true, + }, + { + "cantProcess, eventStatusDone", + relayer.EventStatusDone, + relayerAddr, + relayerAddr, + false, + }, + { + "cantProcess, eventStatusRetriable", + relayer.EventStatusRetriable, + relayerAddr, + relayerAddr, + false, + }, + { + "cantProcess, eventStatusNewOnlyOwner and relayer is not owner", + relayer.EventStatusNewOnlyOwner, + common.HexToAddress("0x"), + relayerAddr, + false, + }, + { + "canProcess, eventStatusOnlyOwner and relayer address is owner", + relayer.EventStatusNewOnlyOwner, + relayerAddr, + relayerAddr, + true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + canProcess := canProcessMessage( + context.Background(), + tt.eventStatus, + tt.messageOwner, + tt.relayerAddress, + ) + + assert.Equal(t, tt.want, canProcess) + }) + } +} diff --git a/packages/relayer/indexer/watch_errors.go b/packages/relayer/indexer/watch_errors.go index 4dad58ce35..a3c9bd7e28 100644 --- a/packages/relayer/indexer/watch_errors.go +++ b/packages/relayer/indexer/watch_errors.go @@ -3,7 +3,11 @@ package indexer import log "github.com/sirupsen/logrus" func (svc *Service) watchErrors() { - for err := range svc.errChan { - log.Infof("svc.watchErrors: %v", err) + // nolint: gosimple + for { + select { + case err := <-svc.errChan: + log.Infof("svc.watchErrors: %v", err) + } } } From 1308298f6a97e0083719209622f4c060c22da303 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Mon, 14 Nov 2022 08:42:22 -0800 Subject: [PATCH 06/18] subscribe return error --- packages/relayer/indexer/subscribe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index 52bcac1b26..f2ff45cccb 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -23,7 +23,7 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { for { select { case err := <-sub.Err(): - svc.errChan <- err + return errors.Wrap(err, "sub.Err()") case event := <-sink: go svc.handleEvent(ctx, nil, svc.errChan, chainID, event) } From 3c1c78c02d8c4c7d10d1dd9e2ff1248df6e0ac4c Mon Sep 17 00:00:00 2001 From: cyberhorsey <113397187+cyberhorsey@users.noreply.github.com> Date: Tue, 15 Nov 2022 07:17:19 -0800 Subject: [PATCH 07/18] Update packages/relayer/cli/cli.go Co-authored-by: David <104078303+davidtaikocha@users.noreply.github.com> --- packages/relayer/cli/cli.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index d6abea5979..ea6841a771 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -113,7 +113,7 @@ func makeIndexers(layer Layer, db *gorm.DB) ([]*indexer.Service, func(), error) } blockBatchSize, err := strconv.Atoi(os.Getenv("BLOCK_BATCH_SIZE")) - if err != nil || blockBatchSize == 0 { + if err != nil || blockBatchSize <= 0 { blockBatchSize = defaultBlockBatchSize } From 03583c2176c33c74e4f782dcef8c21c8ea62bcba Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 15 Nov 2022 07:17:49 -0800 Subject: [PATCH 08/18] check negative ints for configs --- packages/relayer/cli/cli.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index eb0bf294f8..b332f1cae3 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -212,19 +212,19 @@ func openDBConnection(opts relayer.DBConnectionOpts) *gorm.DB { ) maxIdleConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_IDLE_CONNS")) - if err != nil || maxIdleConns == 0 { + if err != nil || maxIdleConns <= 0 { maxIdleConns = defaultMaxIdleConns } maxOpenConns, err := strconv.Atoi(os.Getenv("MYSQL_MAX_OPEN_CONNS")) - if err != nil || maxOpenConns == 0 { + if err != nil || maxOpenConns <= 0 { maxOpenConns = defaultMaxOpenConns } var maxLifetime time.Duration connMaxLifetime, err := strconv.Atoi(os.Getenv("MYSQL_CONN_MAX_LIFETIME_IN_MS")) - if err != nil || connMaxLifetime == 0 { + if err != nil || connMaxLifetime <= 0 { maxLifetime = defaultConnMaxLifetime } else { maxLifetime = time.Duration(connMaxLifetime) From 03b1aa330bdc8ebd8f959e422a81dba681a6b846 Mon Sep 17 00:00:00 2001 From: cyberhorsey <113397187+cyberhorsey@users.noreply.github.com> Date: Tue, 15 Nov 2022 07:18:55 -0800 Subject: [PATCH 09/18] Update packages/relayer/indexer/watch_errors.go Co-authored-by: David <104078303+davidtaikocha@users.noreply.github.com> --- packages/relayer/indexer/watch_errors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relayer/indexer/watch_errors.go b/packages/relayer/indexer/watch_errors.go index a3c9bd7e28..f0c8810f8c 100644 --- a/packages/relayer/indexer/watch_errors.go +++ b/packages/relayer/indexer/watch_errors.go @@ -7,7 +7,7 @@ func (svc *Service) watchErrors() { for { select { case err := <-svc.errChan: - log.Infof("svc.watchErrors: %v", err) + log.Errorf("svc.watchErrors: %v", err) } } } From 2cae802ded4c785f3a6806e434ea4a1656c93117 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 15 Nov 2022 07:19:42 -0800 Subject: [PATCH 10/18] Defer mutex unlock in process message --- packages/relayer/message/process_message.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/relayer/message/process_message.go b/packages/relayer/message/process_message.go index d9ca77575a..5b06d266c1 100644 --- a/packages/relayer/message/process_message.go +++ b/packages/relayer/message/process_message.go @@ -119,6 +119,7 @@ func (p *Processor) sendProcessMessageCall( auth.GasPrice = new(big.Int).SetUint64(500000000) p.mu.Lock() + defer p.mu.Unlock() err = p.getLatestNonce(ctx, auth) if err != nil { @@ -132,8 +133,6 @@ func (p *Processor) sendProcessMessageCall( p.setLatestNonce(tx.Nonce()) - p.mu.Unlock() - return tx, nil } From e3e6cd189efb3516ac359d6c748016cb11bc5b35 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 15 Nov 2022 07:22:23 -0800 Subject: [PATCH 11/18] lint --- packages/relayer/indexer/watch_errors_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relayer/indexer/watch_errors_test.go b/packages/relayer/indexer/watch_errors_test.go index 026d52db9a..eea6e7a454 100644 --- a/packages/relayer/indexer/watch_errors_test.go +++ b/packages/relayer/indexer/watch_errors_test.go @@ -6,7 +6,7 @@ import ( ) func Test_watchErrors(t *testing.T) { - svc := newTestService(t) + svc := newTestService() go svc.watchErrors() From 69cf8572389a64af13c2ae1e7ed7aae690af507b Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 15 Nov 2022 07:40:51 -0800 Subject: [PATCH 12/18] waitgroup => errgroup --- go.mod | 1 + go.sum | 2 ++ packages/relayer/.golangci.yml | 2 +- packages/relayer/cli/cli.go | 7 +++++ .../relayer/indexer/filter_then_subscribe.go | 22 +++++++++----- packages/relayer/indexer/handle_event.go | 30 +++++++------------ packages/relayer/indexer/service.go | 3 ++ packages/relayer/indexer/subscribe.go | 15 +++++++++- packages/relayer/indexer/watch_errors.go | 13 -------- packages/relayer/indexer/watch_errors_test.go | 16 ---------- 10 files changed, 53 insertions(+), 58 deletions(-) delete mode 100644 packages/relayer/indexer/watch_errors.go delete mode 100644 packages/relayer/indexer/watch_errors_test.go diff --git a/go.mod b/go.mod index ff09077ea3..f388a52a26 100644 --- a/go.mod +++ b/go.mod @@ -62,6 +62,7 @@ require ( go.opencensus.io v0.23.0 // indirect golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad // indirect google.golang.org/grpc v1.47.0 // indirect diff --git a/go.sum b/go.sum index cedf6ecb41..88f1db973f 100644 --- a/go.sum +++ b/go.sum @@ -970,6 +970,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index dd578fed41..49f13f6957 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -29,7 +29,7 @@ linters-settings: lines: 100 statements: 45 gocognit: - min-complexity: 32 + min-complexity: 35 issues: exclude-rules: diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index 647039a1da..ca9c3f560d 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -37,6 +37,7 @@ var ( } defaultBlockBatchSize = 2 + defaultNumGoroutines = 10 ) func Run(mode relayer.Mode, layer relayer.Layer) { @@ -116,6 +117,11 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), blockBatchSize = defaultBlockBatchSize } + numGoroutines, err := strconv.Atoi(os.Getenv("NUM_GOROUTINES")) + if err != nil || numGoroutines <= 0 { + numGoroutines = defaultNumGoroutines + } + indexers := make([]*indexer.Service, 0) if layer == relayer.L1 || layer == relayer.Both { @@ -133,6 +139,7 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")), BlockBatchSize: uint64(blockBatchSize), + NumGoroutines: numGoroutines, }) if err != nil { log.Fatal(err) diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 55d9d99df1..8a2f6bf21a 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -2,12 +2,12 @@ package indexer import ( "context" - "sync" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/taikochain/taiko-mono/packages/relayer" + "golang.org/x/sync/errgroup" ) var ( @@ -18,8 +18,6 @@ var ( // up to the latest block. As it goes, it tries to process messages. // When it catches up, it then starts to Subscribe to latest events as they come in. func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode) error { - go svc.watchErrors() - chainID, err := svc.ethClient.ChainID(ctx) if err != nil { return errors.Wrap(err, "svc.ethClient.ChainID()") @@ -72,18 +70,28 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode) continue } - log.Info("found events") + group, ctx := errgroup.WithContext(ctx) - wg := &sync.WaitGroup{} + group.SetLimit(svc.numGoroutines) // TODO: do we want to limit the number of possible goroutines in the waitgroup? // right now it is dependent on how many events are found in the // block range. the main concern would be exceeding DB connection pooling limits. for { - go svc.handleEvent(ctx, wg, svc.errChan, chainID, events.Event) + group.Go(func() error { + err := svc.handleEvent(ctx, chainID, events.Event) + if err != nil { + // log error but alwys return nil to keep other goroutines active + log.Error(err.Error()) + } + + return nil + }) if !events.Next() { - wg.Wait() + if err := group.Wait(); err != nil { + return errors.Wrap(err, "group.Wait") + } if err := svc.handleNoEventsRemaining(ctx, chainID, events); err != nil { return errors.Wrap(err, "svc.handleNoEventsRemaining") diff --git a/packages/relayer/indexer/handle_event.go b/packages/relayer/indexer/handle_event.go index b48ec5e0f6..bd98caabc8 100644 --- a/packages/relayer/indexer/handle_event.go +++ b/packages/relayer/indexer/handle_event.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "math/big" - "sync" "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" @@ -16,33 +15,25 @@ import ( // handleEvent handles an individual MessageSent event func (svc *Service) handleEvent( ctx context.Context, - wg *sync.WaitGroup, - errChan chan error, chainID *big.Int, event *contracts.BridgeMessageSent, -) { - if wg != nil { - wg.Add(1) - defer wg.Done() - } - +) error { raw := event.Raw // handle chain re-org by checking Removed property, no need to // return error, just continue and do not process. if raw.Removed { - return + return nil } eventStatus, err := svc.eventStatusFromSignal(ctx, event.Message.GasLimit, event.Signal) if err != nil { - errChan <- errors.Wrap(err, "svc.eventStatusFromSignal") + return errors.Wrap(err, "svc.eventStatusFromSignal") } marshaled, err := json.Marshal(event) if err != nil { - errChan <- errors.Wrap(err, "json.Marshal(event)") - return + return errors.Wrap(err, "json.Marshal(event)") } e, err := svc.eventRepo.Save(relayer.SaveEventOpts{ @@ -52,18 +43,16 @@ func (svc *Service) handleEvent( Status: eventStatus, }) if err != nil { - errChan <- errors.Wrap(err, "svc.eventRepo.Save") - return + return errors.Wrap(err, "svc.eventRepo.Save") } if !canProcessMessage(ctx, eventStatus, event.Message.Owner, svc.relayerAddr) { - return + return nil } // process the message if err := svc.processor.ProcessMessage(ctx, event, e); err != nil { - errChan <- errors.Wrap(err, "svc.processMessage") - return + return errors.Wrap(err, "svc.processMessage") } // if the block number of the event is higher than the block we are processing, @@ -78,8 +67,7 @@ func (svc *Service) handleEvent( ChainID: chainID, EventName: eventName, }); err != nil { - errChan <- errors.Wrap(err, "svc.blockRepo.Save") - return + return errors.Wrap(err, "svc.blockRepo.Save") } svc.processingBlock = &relayer.Block{ @@ -87,6 +75,8 @@ func (svc *Service) handleEvent( Hash: raw.BlockHash.Hex(), } } + + return nil } func canProcessMessage( diff --git a/packages/relayer/indexer/service.go b/packages/relayer/indexer/service.go index dd758112ab..c4d24b2238 100644 --- a/packages/relayer/indexer/service.go +++ b/packages/relayer/indexer/service.go @@ -44,6 +44,7 @@ type Service struct { errChan chan error blockBatchSize uint64 + numGoroutines int } type NewServiceOpts struct { @@ -58,6 +59,7 @@ type NewServiceOpts struct { DestBridgeAddress common.Address DestTaikoAddress common.Address BlockBatchSize uint64 + NumGoroutines int } func NewService(opts NewServiceOpts) (*Service, error) { @@ -157,5 +159,6 @@ func NewService(opts NewServiceOpts) (*Service, error) { errChan: make(chan error), blockBatchSize: opts.BlockBatchSize, + numGoroutines: opts.NumGoroutines, }, nil } diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index f2ff45cccb..37eec8018d 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -6,7 +6,9 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/taikochain/taiko-mono/packages/relayer/contracts" + "golang.org/x/sync/errgroup" ) // subscribe subscribes to latest events @@ -20,12 +22,23 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { defer sub.Unsubscribe() + group, ctx := errgroup.WithContext(ctx) + + group.SetLimit(svc.numGoroutines) + for { select { case err := <-sub.Err(): return errors.Wrap(err, "sub.Err()") case event := <-sink: - go svc.handleEvent(ctx, nil, svc.errChan, chainID, event) + group.Go(func() error { + err := svc.handleEvent(ctx, chainID, event) + if err != nil { + log.Errorf("svc.handleEvent: %v", err) + } + + return nil + }) } } } diff --git a/packages/relayer/indexer/watch_errors.go b/packages/relayer/indexer/watch_errors.go deleted file mode 100644 index f0c8810f8c..0000000000 --- a/packages/relayer/indexer/watch_errors.go +++ /dev/null @@ -1,13 +0,0 @@ -package indexer - -import log "github.com/sirupsen/logrus" - -func (svc *Service) watchErrors() { - // nolint: gosimple - for { - select { - case err := <-svc.errChan: - log.Errorf("svc.watchErrors: %v", err) - } - } -} diff --git a/packages/relayer/indexer/watch_errors_test.go b/packages/relayer/indexer/watch_errors_test.go deleted file mode 100644 index eea6e7a454..0000000000 --- a/packages/relayer/indexer/watch_errors_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package indexer - -import ( - "errors" - "testing" -) - -func Test_watchErrors(t *testing.T) { - svc := newTestService() - - go svc.watchErrors() - - err := errors.New("err") - - svc.errChan <- err -} From 3d0879dc7f5bca9147dc79aa1f46c95c3aa84da2 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 15 Nov 2022 07:46:31 -0800 Subject: [PATCH 13/18] use ResubscribeErr --- packages/relayer/indexer/subscribe.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index 37eec8018d..7f51417e28 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -3,8 +3,10 @@ package indexer import ( "context" "math/big" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/event" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/taikochain/taiko-mono/packages/relayer/contracts" @@ -15,10 +17,15 @@ import ( func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { sink := make(chan *contracts.BridgeMessageSent) - sub, err := svc.bridge.WatchMessageSent(&bind.WatchOpts{}, sink, nil) - if err != nil { - return errors.Wrap(err, "svc.bridge.WatchMessageSent") - } + sub := event.ResubscribeErr(3*time.Second, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + log.Errorf("svc.bridge.WatchMessageSent: %v", err) + } + + return svc.bridge.WatchMessageSent(&bind.WatchOpts{ + Context: ctx, + }, sink, nil) + }) defer sub.Unsubscribe() From a124290f6305475c018098fd76ed8739c607d312 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 15 Nov 2022 07:51:01 -0800 Subject: [PATCH 14/18] subscription backoff in seconds --- packages/relayer/.default.env | 4 ++- packages/relayer/cli/cli.go | 23 +++++++++++++---- packages/relayer/indexer/service.go | 36 +++++++++++++++------------ packages/relayer/indexer/subscribe.go | 3 +-- 4 files changed, 42 insertions(+), 24 deletions(-) diff --git a/packages/relayer/.default.env b/packages/relayer/.default.env index 9b81a9af97..35c8667b95 100644 --- a/packages/relayer/.default.env +++ b/packages/relayer/.default.env @@ -13,4 +13,6 @@ L2_RPC_URL=ws://ws.a1.testnet.taiko.xyz BLOCK_BATCH_SIZE=2 MYSQL_MAX_IDLE_CONNS= MYSQL_MAX_OPEN_CONNS= -MYSQL_CONN_MAX_LIFETIME_IN_MS= \ No newline at end of file +MYSQL_CONN_MAX_LIFETIME_IN_MS= +NUM_GOROUTINES=20 +SUBSCRIPTION_BACKOFF_IN_SECONDS=3 \ No newline at end of file diff --git a/packages/relayer/cli/cli.go b/packages/relayer/cli/cli.go index ca9c3f560d..782e33d695 100644 --- a/packages/relayer/cli/cli.go +++ b/packages/relayer/cli/cli.go @@ -36,8 +36,9 @@ var ( "RELAYER_ECDSA_KEY", } - defaultBlockBatchSize = 2 - defaultNumGoroutines = 10 + defaultBlockBatchSize = 2 + defaultNumGoroutines = 10 + defaultSubscriptionBackoff = 2 * time.Second ) func Run(mode relayer.Mode, layer relayer.Layer) { @@ -122,6 +123,15 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), numGoroutines = defaultNumGoroutines } + var subscriptionBackoff time.Duration + + subscriptionBackoffInSeconds, err := strconv.Atoi(os.Getenv("SUBSCRIPTION_BACKOFF_IN_SECONDS")) + if err != nil || numGoroutines <= 0 { + subscriptionBackoff = defaultSubscriptionBackoff + } else { + subscriptionBackoff = time.Duration(subscriptionBackoffInSeconds) * time.Second + } + indexers := make([]*indexer.Service, 0) if layer == relayer.L1 || layer == relayer.Both { @@ -138,8 +148,9 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), DestBridgeAddress: common.HexToAddress(os.Getenv("L2_BRIDGE_ADDRESS")), DestTaikoAddress: common.HexToAddress(os.Getenv("L2_TAIKO_ADDRESS")), - BlockBatchSize: uint64(blockBatchSize), - NumGoroutines: numGoroutines, + BlockBatchSize: uint64(blockBatchSize), + NumGoroutines: numGoroutines, + SubscriptionBackoff: subscriptionBackoff, }) if err != nil { log.Fatal(err) @@ -162,7 +173,9 @@ func makeIndexers(layer relayer.Layer, db *gorm.DB) ([]*indexer.Service, func(), DestBridgeAddress: common.HexToAddress(os.Getenv("L1_BRIDGE_ADDRESS")), DestTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")), - BlockBatchSize: uint64(blockBatchSize), + BlockBatchSize: uint64(blockBatchSize), + NumGoroutines: numGoroutines, + SubscriptionBackoff: subscriptionBackoff, }) if err != nil { log.Fatal(err) diff --git a/packages/relayer/indexer/service.go b/packages/relayer/indexer/service.go index c4d24b2238..564f6f769b 100644 --- a/packages/relayer/indexer/service.go +++ b/packages/relayer/indexer/service.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "math/big" + "time" "github.com/cyberhorsey/errors" "github.com/ethereum/go-ethereum/common" @@ -43,23 +44,25 @@ type Service struct { errChan chan error - blockBatchSize uint64 - numGoroutines int + blockBatchSize uint64 + numGoroutines int + subscriptionBackoff time.Duration } type NewServiceOpts struct { - EventRepo relayer.EventRepository - BlockRepo relayer.BlockRepository - EthClient *ethclient.Client - DestEthClient *ethclient.Client - RPCClient *rpc.Client - DestRPCClient *rpc.Client - ECDSAKey string - BridgeAddress common.Address - DestBridgeAddress common.Address - DestTaikoAddress common.Address - BlockBatchSize uint64 - NumGoroutines int + EventRepo relayer.EventRepository + BlockRepo relayer.BlockRepository + EthClient *ethclient.Client + DestEthClient *ethclient.Client + RPCClient *rpc.Client + DestRPCClient *rpc.Client + ECDSAKey string + BridgeAddress common.Address + DestBridgeAddress common.Address + DestTaikoAddress common.Address + BlockBatchSize uint64 + NumGoroutines int + SubscriptionBackoff time.Duration } func NewService(opts NewServiceOpts) (*Service, error) { @@ -158,7 +161,8 @@ func NewService(opts NewServiceOpts) (*Service, error) { errChan: make(chan error), - blockBatchSize: opts.BlockBatchSize, - numGoroutines: opts.NumGoroutines, + blockBatchSize: opts.BlockBatchSize, + numGoroutines: opts.NumGoroutines, + subscriptionBackoff: opts.SubscriptionBackoff, }, nil } diff --git a/packages/relayer/indexer/subscribe.go b/packages/relayer/indexer/subscribe.go index 7f51417e28..5dd65a3e4e 100644 --- a/packages/relayer/indexer/subscribe.go +++ b/packages/relayer/indexer/subscribe.go @@ -3,7 +3,6 @@ package indexer import ( "context" "math/big" - "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/event" @@ -17,7 +16,7 @@ import ( func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { sink := make(chan *contracts.BridgeMessageSent) - sub := event.ResubscribeErr(3*time.Second, func(ctx context.Context, err error) (event.Subscription, error) { + sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) { if err != nil { log.Errorf("svc.bridge.WatchMessageSent: %v", err) } From 70a1d3388dada3bc6d573217e57048d39ca77039 Mon Sep 17 00:00:00 2001 From: cyberhorsey <113397187+cyberhorsey@users.noreply.github.com> Date: Tue, 15 Nov 2022 07:51:27 -0800 Subject: [PATCH 15/18] Update packages/relayer/indexer/filter_then_subscribe.go Co-authored-by: David <104078303+davidtaikocha@users.noreply.github.com> --- packages/relayer/indexer/filter_then_subscribe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relayer/indexer/filter_then_subscribe.go b/packages/relayer/indexer/filter_then_subscribe.go index 8a2f6bf21a..8ffdabd6fc 100644 --- a/packages/relayer/indexer/filter_then_subscribe.go +++ b/packages/relayer/indexer/filter_then_subscribe.go @@ -81,7 +81,7 @@ func (svc *Service) FilterThenSubscribe(ctx context.Context, mode relayer.Mode) group.Go(func() error { err := svc.handleEvent(ctx, chainID, events.Event) if err != nil { - // log error but alwys return nil to keep other goroutines active + // log error but always return nil to keep other goroutines active log.Error(err.Error()) } From 4239be3f73848ef7b6dd29fea71e486865377304 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Tue, 15 Nov 2022 07:52:52 -0800 Subject: [PATCH 16/18] lint --- packages/relayer/.golangci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index 49f13f6957..98f04cced6 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -26,7 +26,7 @@ linters: linters-settings: funlen: - lines: 100 + lines: 110 statements: 45 gocognit: min-complexity: 35 From bf0f4e9670e63150392d16f0dca1f03319478ea5 Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Wed, 16 Nov 2022 08:49:35 -0800 Subject: [PATCH 17/18] lint --- packages/relayer/.golangci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index 98f04cced6..a2ab7eada8 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -27,7 +27,7 @@ linters: linters-settings: funlen: lines: 110 - statements: 45 + statements: 48 gocognit: min-complexity: 35 From ec80d8f6f7a3d6137d43a08998271e1c3c56600f Mon Sep 17 00:00:00 2001 From: Jeffery Walsh Date: Wed, 16 Nov 2022 09:00:30 -0800 Subject: [PATCH 18/18] bump lint funlen --- packages/relayer/.golangci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/relayer/.golangci.yml b/packages/relayer/.golangci.yml index a2ab7eada8..b2008b7e90 100644 --- a/packages/relayer/.golangci.yml +++ b/packages/relayer/.golangci.yml @@ -26,7 +26,7 @@ linters: linters-settings: funlen: - lines: 110 + lines: 116 statements: 48 gocognit: min-complexity: 35