From 2f4a0beb1a431c5c7ff40c3c4b7fcecb094d2e52 Mon Sep 17 00:00:00 2001 From: jeff <113397187+cyberhorsey@users.noreply.github.com> Date: Sun, 9 Jul 2023 07:39:33 -0700 Subject: [PATCH] feat(eventindexer): support multiple swap pairs (#14130) --- packages/eventindexer/.l2.env | 2 +- packages/eventindexer/cli/cli.go | 14 +++++++++++- packages/eventindexer/indexer/filter.go | 20 +++++++++-------- .../indexer/save_message_sent_event.go | 22 +++++++++++++++++++ packages/eventindexer/indexer/service.go | 20 ++++++++++------- packages/eventindexer/indexer/subscribe.go | 12 +++++----- 6 files changed, 66 insertions(+), 24 deletions(-) diff --git a/packages/eventindexer/.l2.env b/packages/eventindexer/.l2.env index 0823cf0143..daff48fb32 100644 --- a/packages/eventindexer/.l2.env +++ b/packages/eventindexer/.l2.env @@ -8,7 +8,7 @@ MYSQL_MAX_IDLE_CONNS=50 MYSQL_MAX_OPEN_CONNS=3000 MYSQL_CONN_MAX_LIFETIME_IN_MS=100000 PROVER_POOL_ADDRESS=0x7D992599E1B8b4508Ba6E2Ba97893b4C36C23A28 -SWAP_ADDRESS=0x501f63210aE6D7Eeb50DaE74DA5Ae407515ee246 +SWAP_ADDRESSES=0x501f63210aE6D7Eeb50DaE74DA5Ae407515ee246,0x926815A3fb587DDF5e2d2A03ea235630c0A53a16,0x2223D60359736532958DF6a4E9A5e4A5a71729A1 RPC_URL=wss://ws.test.taiko.xyz CORS_ORIGINS=* BLOCK_BATCH_SIZE=1000 diff --git a/packages/eventindexer/cli/cli.go b/packages/eventindexer/cli/cli.go index b4766d3a2b..c8b9dbe283 100644 --- a/packages/eventindexer/cli/cli.go +++ b/packages/eventindexer/cli/cli.go @@ -134,7 +134,7 @@ func Run( RPCClient: rpcClient, SrcTaikoAddress: common.HexToAddress(os.Getenv("L1_TAIKO_ADDRESS")), SrcBridgeAddress: common.HexToAddress(os.Getenv("BRIDGE_ADDRESS")), - SrcSwapAddress: common.HexToAddress(os.Getenv("SWAP_ADDRESS")), + SrcSwapAddresses: stringsToAddresses(strings.Split(os.Getenv("SWAP_ADDRESSES"), ",")), BlockBatchSize: uint64(blockBatchSize), SubscriptionBackoff: subscriptionBackoff, }) @@ -158,6 +158,18 @@ func Run( <-forever } +func stringsToAddresses(s []string) []common.Address { + a := []common.Address{} + + for _, v := range s { + if v != "" { + a = append(a, common.HexToAddress(v)) + } + } + + return a +} + func openDBConnection(opts eventindexer.DBConnectionOpts) (eventindexer.DB, error) { dsn := "" if opts.Password == "" { diff --git a/packages/eventindexer/indexer/filter.go b/packages/eventindexer/indexer/filter.go index 2d3b21141c..d8eef5875d 100644 --- a/packages/eventindexer/indexer/filter.go +++ b/packages/eventindexer/indexer/filter.go @@ -70,16 +70,18 @@ func L2FilterFunc( svc *Service, filterOpts *bind.FilterOpts, ) error { - swaps, err := svc.swap.FilterSwap(filterOpts, nil, nil) - if err != nil { - return errors.Wrap(err, "svc.bridge.FilterSwap") - } + for _, s := range svc.swaps { + swaps, err := s.FilterSwap(filterOpts, nil, nil) + if err != nil { + return errors.Wrap(err, "svc.bridge.FilterSwap") + } - // only save ones above 0.01 ETH, this is only for Galaxe - // and we dont care about the rest - err = svc.saveSwapEvents(ctx, chainID, swaps) - if err != nil { - return errors.Wrap(err, "svc.saveSwapEvents") + // only save ones above 0.01 ETH, this is only for Galaxe + // and we dont care about the rest + err = svc.saveSwapEvents(ctx, chainID, swaps) + if err != nil { + return errors.Wrap(err, "svc.saveSwapEvents") + } } return nil diff --git a/packages/eventindexer/indexer/save_message_sent_event.go b/packages/eventindexer/indexer/save_message_sent_event.go index a78ca96944..2d9eb2d6fd 100644 --- a/packages/eventindexer/indexer/save_message_sent_event.go +++ b/packages/eventindexer/indexer/save_message_sent_event.go @@ -5,12 +5,18 @@ import ( "encoding/json" "math/big" + "github.com/ethereum/go-ethereum/common" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/taikoxyz/taiko-mono/packages/eventindexer" "github.com/taikoxyz/taiko-mono/packages/eventindexer/contracts/bridge" ) +var ( + minEthAmount = new(big.Int).SetUint64(150000000000000000) + zeroHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") +) + func (svc *Service) saveMessageSentEvents( ctx context.Context, chainID *big.Int, @@ -43,6 +49,22 @@ func (svc *Service) saveMessageSentEvent( chainID *big.Int, event *bridge.BridgeMessageSent, ) error { + // only save eth transfers + if event.Message.Data != nil && common.BytesToHash(event.Message.Data) != zeroHash { + log.Info("skipping message sent event, is not eth transfer") + return nil + } + + // amount must be >= 0.15 eth + if event.Message.DepositValue.Cmp(minEthAmount) < 0 { + log.Infof("skipping message sent event, value: %v, requiredValue: %v", + event.Message.DepositValue.String(), + minEthAmount.String(), + ) + + return nil + } + marshaled, err := json.Marshal(event) if err != nil { return errors.Wrap(err, "json.Marshal(event)") diff --git a/packages/eventindexer/indexer/service.go b/packages/eventindexer/indexer/service.go index dd85ed6f60..14304f27f3 100644 --- a/packages/eventindexer/indexer/service.go +++ b/packages/eventindexer/indexer/service.go @@ -30,7 +30,7 @@ type Service struct { taikol1 *taikol1.TaikoL1 bridge *bridge.Bridge - swap *swap.Swap + swaps []*swap.Swap } type NewServiceOpts struct { @@ -41,7 +41,7 @@ type NewServiceOpts struct { RPCClient *rpc.Client SrcTaikoAddress common.Address SrcBridgeAddress common.Address - SrcSwapAddress common.Address + SrcSwapAddresses []common.Address BlockBatchSize uint64 SubscriptionBackoff time.Duration } @@ -79,12 +79,16 @@ func NewService(opts NewServiceOpts) (*Service, error) { } } - var swapContract *swap.Swap + var swapContracts []*swap.Swap - if opts.SrcSwapAddress.Hex() != ZeroAddress.Hex() { - swapContract, err = swap.NewSwap(opts.SrcSwapAddress, opts.EthClient) - if err != nil { - return nil, errors.Wrap(err, "contracts.NewBridge") + if opts.SrcSwapAddresses != nil && len(opts.SrcSwapAddresses) > 0 { + for _, v := range opts.SrcSwapAddresses { + swapContract, err := swap.NewSwap(v, opts.EthClient) + if err != nil { + return nil, errors.Wrap(err, "contracts.NewBridge") + } + + swapContracts = append(swapContracts, swapContract) } } @@ -95,7 +99,7 @@ func NewService(opts NewServiceOpts) (*Service, error) { ethClient: opts.EthClient, taikol1: taikoL1, bridge: bridgeContract, - swap: swapContract, + swaps: swapContracts, blockBatchSize: opts.BlockBatchSize, subscriptionBackoff: opts.SubscriptionBackoff, diff --git a/packages/eventindexer/indexer/subscribe.go b/packages/eventindexer/indexer/subscribe.go index 9b181a79aa..1357b713a8 100644 --- a/packages/eventindexer/indexer/subscribe.go +++ b/packages/eventindexer/indexer/subscribe.go @@ -30,8 +30,10 @@ func (svc *Service) subscribe(ctx context.Context, chainID *big.Int) error { go svc.subscribeMessageSent(ctx, chainID, errChan) } - if svc.swap != nil { - go svc.subscribeSwap(ctx, chainID, errChan) + if svc.swaps != nil { + for _, swap := range svc.swaps { + go svc.subscribeSwap(ctx, swap, chainID, errChan) + } } // nolint: gosimple @@ -306,16 +308,16 @@ func (svc *Service) subscribeMessageSent(ctx context.Context, chainID *big.Int, } } -func (svc *Service) subscribeSwap(ctx context.Context, chainID *big.Int, errChan chan error) { +func (svc *Service) subscribeSwap(ctx context.Context, s *swap.Swap, chainID *big.Int, errChan chan error) { sink := make(chan *swap.SwapSwap) sub := event.ResubscribeErr(svc.subscriptionBackoff, func(ctx context.Context, err error) (event.Subscription, error) { if err != nil { - log.Errorf("svc.swap.WatchSwap: %v", err) + log.Errorf("s.WatchSwap: %v", err) } log.Info("resubscribing to Swap events") - return svc.swap.WatchSwap(&bind.WatchOpts{ + return s.WatchSwap(&bind.WatchOpts{ Context: ctx, }, sink, nil, nil) })