Skip to content

Commit

Permalink
Merge pull request #125 from renproject/chore(test)/watcher
Browse files Browse the repository at this point in the history
chore(watcher): test burn log handling via mock
  • Loading branch information
tok-kkk committed Mar 9, 2021
2 parents 2ffca4b + 835dde4 commit 96bbd68
Show file tree
Hide file tree
Showing 6 changed files with 459 additions and 67 deletions.
1 change: 0 additions & 1 deletion compat/v0/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ func (store Store) GetV1TxFromUTXO(utxo ExtBtcCompatUTXO) (tx.Tx, error) {

func (store Store) GetV0BurnTxHashFromRef(sel tx.Selector, ref uint64) (B32, error) {
key := fmt.Sprintf("%s_%v", sel.String(), ref)
fmt.Println("checking " + key)
hashS, err := store.client.Get(key).Result()
if err != nil {
if err == redis.Nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.15
require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/miniredis v2.5.0+incompatible
github.com/alicebob/miniredis/v2 v2.14.3
github.com/btcsuite/btcd v0.21.0-beta
github.com/dgryski/go-farm v0.0.0-20191112170834-c2139c5d712b // indirect
github.com/ethereum/go-ethereum v1.9.20
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZp
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible h1:yBHoLpsyjupjz3NL3MhKMVkR41j82Yjf3KFv7ApYzUI=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/alicebob/miniredis/v2 v2.14.3 h1:QWoo2wchYmLgOB6ctlTt2dewQ1Vu6phl+iQbwT8SYGo=
github.com/alicebob/miniredis/v2 v2.14.3/go.mod h1:gquAfGbzn92jvtrSC69+6zZnwSODVXVpYDRaGhWaL6I=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
Expand Down Expand Up @@ -398,6 +400,7 @@ github.com/filecoin-project/test-vectors v0.0.0-20200819133914-e20cc29cc926/go.m
github.com/filecoin-project/test-vectors v0.0.0-20200826113833-9ffe6524729d/go.mod h1:hY/JN3OFRtykBrMByFjTonUFEOW2bRJjyR5YMUh3jLw=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/fjl/memsize v0.0.0-20180418122429-ca190fb6ffbc/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0=
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlKLDt+S+6hbjVMEW6RGQ7aUf7c=
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6/go.mod h1:1i71OnUq3iUe1ma7Lr6yG6/rjvM3emb6yoL7xLFzcVQ=
Expand Down Expand Up @@ -1149,6 +1152,7 @@ github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcncea
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-ieproxy v0.0.0-20190610004146-91bb50d98149/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc=
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc=
Expand Down Expand Up @@ -1442,6 +1446,7 @@ github.com/renproject/mercury v0.3.16 h1:vbD0DJOJK+ETIwMRDjfEOdUoPzYPWkaIiq/adqe
github.com/renproject/mercury v0.3.16/go.mod h1:+1iE/uf04pXGSk0lNTdGjs4w+MUyKs4R3o3xK2Bko3Y=
github.com/renproject/multichain v0.2.21 h1:Sd/MdBvr3IZPm9W8XdmbZo0X0K6CAsM8l8TCGFIoAy0=
github.com/renproject/multichain v0.2.21/go.mod h1:HexRygMyMZAnXUTiHT6egxJaC4XWnMFBgBBMU0vCw5g=
github.com/renproject/multichain v0.2.22/go.mod h1:HexRygMyMZAnXUTiHT6egxJaC4XWnMFBgBBMU0vCw5g=
github.com/renproject/multichain v0.2.23 h1:drGptZKlqlnQIwP03S3O7Go8jBmeS5AupXyUU1HI3WI=
github.com/renproject/multichain v0.2.23/go.mod h1:HexRygMyMZAnXUTiHT6egxJaC4XWnMFBgBBMU0vCw5g=
github.com/renproject/multichain v0.2.24 h1:8YGqVqA51QI7teVHGdOssJ/ZzPujaNc1otrz5AYqxuU=
Expand Down Expand Up @@ -2144,6 +2149,7 @@ gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce h1:+JknDZhAj8YMt7
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c=
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20190213234257-ec84240a7772/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns=
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200603215123-a4a8cb9d2cbc/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns=
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6 h1:a6cXbcDDUkSBlpnkWV1bJ+vv3mOgQEltEJ2rPxroVu0=
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns=
gopkg.in/redis.v4 v4.2.4/go.mod h1:8KREHdypkCEojGKQcjMqAODMICIVwZAONWq8RowTITA=
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
Expand All @@ -2152,6 +2158,7 @@ gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVY
gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/urfave/cli.v1 v1.20.0 h1:NdAVW6RYxDif9DhDHaAortIu956m2c0v+09AZBPTbE0=
gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
Expand Down
3 changes: 2 additions & 1 deletion lightnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func New(options Options, ctx context.Context, logger logrus.FieldLogger, sqlDB
if watchers[chain] == nil {
watchers[chain] = map[multichain.Asset]watcher.Watcher{}
}
watchers[chain][asset] = watcher.NewWatcher(logger, options.Network, selector, verifierBindings, ethClients[chain], bindings, resolverI, client, options.DistPubKey, options.WatcherPollRate)
burnLogFetcher := watcher.NewBurnLogFetcher(bindings)
watchers[chain][asset] = watcher.NewWatcher(logger, options.Network, selector, verifierBindings, ethClients[chain], burnLogFetcher, resolverI, client, options.DistPubKey, options.WatcherPollRate)
}
}

Expand Down
152 changes: 102 additions & 50 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,35 +25,100 @@ import (
"github.com/sirupsen/logrus"
)

type BurnLogResult struct {
Result ethereumbindings.MintGatewayLogicV1LogBurn
Error error
}

type BurnLogFetcher interface {
FetchBurnLogs(ctx context.Context, from uint64, to uint64) (chan BurnLogResult, error)
}

type EthBurnLogFetcher struct {
bindings *ethereumbindings.MintGatewayLogicV1
}

func NewBurnLogFetcher(bindings *ethereumbindings.MintGatewayLogicV1) EthBurnLogFetcher {
return EthBurnLogFetcher{
bindings: bindings,
}
}

// This will fetch the burn event logs using the ethereum bindings and emit them via a channel
// We do this so that we can unit test the log handling without calling ethereum
func (fetcher EthBurnLogFetcher) FetchBurnLogs(ctx context.Context, from uint64, to uint64) (chan BurnLogResult, error) {
iter, err := fetcher.bindings.FilterLogBurn(
&bind.FilterOpts{
Context: ctx,
Start: from,
End: &to,
},
nil,
nil,
)
if err != nil {
return nil, err
}
resultChan := make(chan BurnLogResult)

go func() {
func() {
for iter.Next() {
resultChan <- BurnLogResult{Result: *iter.Event}
select {
case <-ctx.Done():
return
}
}
}()
// Iter should stop if an error occurs,
// so no need to check on each iteration
err := iter.Error()
if err != nil {
resultChan <- BurnLogResult{Error: err}
}
// Always close the iter because apparently
// it doesn't close its subscription?
err = iter.Close()
if err != nil {
resultChan <- BurnLogResult{Error: err}
}

close(resultChan)
}()

return resultChan, iter.Error()
}

// Watcher watches for event logs for burn transactions. These transactions are
// then forwarded to the cacher.
type Watcher struct {
network multichain.Network
logger logrus.FieldLogger
gpubkey pack.Bytes
selector tx.Selector
bindings txengine.Bindings
ethClient *ethclient.Client
ethBindings *ethereumbindings.MintGatewayLogicV1
resolver jsonrpc.Resolver
cache redis.Cmdable
pollInterval time.Duration
network multichain.Network
logger logrus.FieldLogger
gpubkey pack.Bytes
selector tx.Selector
bindings txengine.Bindings
ethClient *ethclient.Client
burnLogFetcher BurnLogFetcher
resolver jsonrpc.Resolver
cache redis.Cmdable
pollInterval time.Duration
}

// NewWatcher returns a new Watcher.
func NewWatcher(logger logrus.FieldLogger, network multichain.Network, selector tx.Selector, bindings txengine.Bindings, ethClient *ethclient.Client, ethBindings *ethereumbindings.MintGatewayLogicV1, resolver jsonrpc.Resolver, cache redis.Cmdable, distPubKey *id.PubKey, pollInterval time.Duration) Watcher {
func NewWatcher(logger logrus.FieldLogger, network multichain.Network, selector tx.Selector, bindings txengine.Bindings, ethClient *ethclient.Client, burnLogFetcher BurnLogFetcher, resolver jsonrpc.Resolver, cache redis.Cmdable, distPubKey *id.PubKey, pollInterval time.Duration) Watcher {
gpubkey := (*btcec.PublicKey)(distPubKey).SerializeCompressed()
return Watcher{
logger: logger,
network: network,
gpubkey: gpubkey,
selector: selector,
bindings: bindings,
ethClient: ethClient,
ethBindings: ethBindings,
resolver: resolver,
cache: cache,
pollInterval: pollInterval,
logger: logger,
network: network,
gpubkey: gpubkey,
selector: selector,
bindings: bindings,
ethClient: ethClient,
burnLogFetcher: burnLogFetcher,
resolver: resolver,
cache: cache,
pollInterval: pollInterval,
}
}

Expand Down Expand Up @@ -92,6 +157,7 @@ func (watcher Watcher) watchLogShiftOuts(parent context.Context) {
}

if cur <= last {
watcher.logger.Warnf("[watcher] tried to process old blocks")
// Make sure we do not process old events. This could occur if there is
// an issue with the underlying blockchain node, for example if it needs
// to resync.
Expand All @@ -104,34 +170,33 @@ func (watcher Watcher) watchLogShiftOuts(parent context.Context) {
return
}

// Filter for all burn events in this range of blocks.
iter, err := watcher.ethBindings.FilterLogBurn(
&bind.FilterOpts{
Context: ctx,
Start: last + 1, // Add one to avoid duplication.
End: &cur,
},
nil,
nil,
)
// Fetch logs
// Add 1 to last so that we don't process duplicates
c, err := watcher.burnLogFetcher.FetchBurnLogs(ctx, last+1, cur)
if err != nil {
watcher.logger.Errorf("[watcher] error filtering LogBurn events from=%v to=%v: %v", last, cur, err)
watcher.logger.Errorf("[watcher] error iterating LogBurn events from=%v to=%v: %v", last, cur, err)
return
}

// Loop through the logs and check if there are burn events.
for iter.Next() {
to := iter.Event.To
for res := range c {
if res.Error != nil {
watcher.logger.Errorf("[watcher] error iterating LogBurn events from=%v to=%v: %v", last, cur, res.Error)
return
}
event := res.Result

to := event.To

amount := iter.Event.Amount.Uint64()
nonce := iter.Event.N.Uint64()
amount := event.Amount.Uint64()
nonce := event.N.Uint64()
watcher.logger.Infof("[watcher] detected burn for %v (to=%v, amount=%v, nonce=%v)", watcher.selector.String(), string(to), amount, nonce)

var nonceBytes pack.Bytes32
copy(nonceBytes[:], pack.NewU256FromU64(pack.NewU64(nonce)).Bytes())

// Send the burn transaction to the resolver.
params, err := watcher.burnToParams(iter.Event.Raw.TxHash.Bytes(), pack.NewU256FromU64(pack.NewU64(amount)), to, nonceBytes, watcher.gpubkey)
params, err := watcher.burnToParams(event.Raw.TxHash.Bytes(), pack.NewU256FromU64(pack.NewU64(amount)), to, nonceBytes, watcher.gpubkey)
if err != nil {
watcher.logger.Errorf("[watcher] cannot get params from burn transaction (to=%v, amount=%v, nonce=%v): %v", to, amount, nonce, err)
continue
Expand All @@ -142,10 +207,6 @@ func (watcher Watcher) watchLogShiftOuts(parent context.Context) {
continue
}
}
if err := iter.Error(); err != nil {
watcher.logger.Errorf("[watcher] error iterating LogBurn events from=%v to=%v: %v", last, cur, err)
return
}

if err := watcher.cache.Set(watcher.key(), cur, 0).Err(); err != nil {
watcher.logger.Errorf("[watcher] error setting last checked block number in redis: %v", err)
Expand Down Expand Up @@ -291,15 +352,6 @@ func NetParams(network multichain.Network, chain multichain.Chain) *chaincfg.Par
default:
return &chaincfg.RegressionNetParams
}
case multichain.Zcash:
switch network {
case multichain.NetworkMainnet:
return zcash.MainNetParams.Params
case multichain.NetworkDevnet, multichain.NetworkTestnet:
return zcash.TestNet3Params.Params
default:
return zcash.RegressionNetParams.Params
}
default:
panic(fmt.Errorf("cannot get network params: unknown chain %v", chain))
}
Expand Down
Loading

0 comments on commit 96bbd68

Please sign in to comment.