Skip to content

Commit

Permalink
feat(eventindexer): speed up sync (#14258)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberhorsey committed Jul 23, 2023
1 parent 5fbf827 commit d337174
Showing 1 changed file with 79 additions and 40 deletions.
119 changes: 79 additions & 40 deletions packages/eventindexer/indexer/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/labstack/gommon/log"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)

type FilterFunc func(
Expand All @@ -21,44 +23,73 @@ func L1FilterFunc(
svc *Service,
filterOpts *bind.FilterOpts,
) error {
blockProvenEvents, err := svc.taikol1.FilterBlockProven(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "svc.taikol1.FilterBlockProven")
}
wg, ctx := errgroup.WithContext(ctx)

err = svc.saveBlockProvenEvents(ctx, chainID, blockProvenEvents)
if err != nil {
return errors.Wrap(err, "svc.saveBlockProvenEvents")
}
wg.Go(func() error {
blockProvenEvents, err := svc.taikol1.FilterBlockProven(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "svc.taikol1.FilterBlockProven")
}

blockProposedEvents, err := svc.taikol1.FilterBlockProposed(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "svc.taikol1.FilterBlockProposed")
}
err = svc.saveBlockProvenEvents(ctx, chainID, blockProvenEvents)
if err != nil {
return errors.Wrap(err, "svc.saveBlockProvenEvents")
}

err = svc.saveBlockProposedEvents(ctx, chainID, blockProposedEvents)
if err != nil {
return errors.Wrap(err, "svc.saveBlockProposedEvents")
}
return nil
})

blockVerifiedEvents, err := svc.taikol1.FilterBlockVerified(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "svc.taikol1.FilterBlockVerified")
}
wg.Go(func() error {
blockProposedEvents, err := svc.taikol1.FilterBlockProposed(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "svc.taikol1.FilterBlockProposed")
}

err = svc.saveBlockVerifiedEvents(ctx, chainID, blockVerifiedEvents)
if err != nil {
return errors.Wrap(err, "svc.saveBlockVerifiedEvents")
}
err = svc.saveBlockProposedEvents(ctx, chainID, blockProposedEvents)
if err != nil {
return errors.Wrap(err, "svc.saveBlockProposedEvents")
}

messagesSent, err := svc.bridge.FilterMessageSent(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "svc.bridge.FilterMessageSent")
}
return nil
})

wg.Go(func() error {
blockVerifiedEvents, err := svc.taikol1.FilterBlockVerified(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "svc.taikol1.FilterBlockVerified")
}

err = svc.saveBlockVerifiedEvents(ctx, chainID, blockVerifiedEvents)
if err != nil {
return errors.Wrap(err, "svc.saveBlockVerifiedEvents")
}

return nil
})

wg.Go(func() error {
messagesSent, err := svc.bridge.FilterMessageSent(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "svc.bridge.FilterMessageSent")
}

err = svc.saveMessageSentEvents(ctx, chainID, messagesSent)
if err != nil {
return errors.Wrap(err, "svc.saveMessageSentEvents")
}

return nil
})

err := wg.Wait()

err = svc.saveMessageSentEvents(ctx, chainID, messagesSent)
if err != nil {
return errors.Wrap(err, "svc.saveMessageSentEvents")
if errors.Is(err, context.Canceled) {
log.Error("context cancelled")
return err
}

return err
}

return nil
Expand All @@ -70,18 +101,26 @@ func L2FilterFunc(
svc *Service,
filterOpts *bind.FilterOpts,
) error {
wg, ctx := errgroup.WithContext(ctx)

for _, s := range svc.swaps {
swaps, err := s.FilterSwap(filterOpts, nil, nil)
if err != nil {
return errors.Wrap(err, "svc.bridge.FilterSwap")
}
swap := s

// only save ones above 0.01 ETH, this is only for Galaxe
// and we dont care about the rest
err = svc.saveSwapEvents(ctx, chainID, swaps)
if err != nil {
return errors.Wrap(err, "svc.saveSwapEvents")
}
wg.Go(func() error {
swaps, err := swap.FilterSwap(filterOpts, nil, nil)
if err != nil {
return errors.Wrap(err, "svc.bridge.FilterSwap")
}

// only save ones above 0.01 ETH, this is only for Galaxe
// and we dont care about the rest
err = svc.saveSwapEvents(ctx, chainID, swaps)
if err != nil {
return errors.Wrap(err, "svc.saveSwapEvents")
}

return nil
})
}

return nil
Expand Down

0 comments on commit d337174

Please sign in to comment.