Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node/watchers: address ToB findings #2850

Merged
merged 4 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions node/pkg/watchers/evm/connectors/common.go
Expand Up @@ -37,9 +37,9 @@ type Connector interface {

type PollSubscription struct {
errOnce sync.Once
err chan error
quit chan error
unsubDone chan struct{}
err chan error // subscription consumer reads, subscription fulfiller writes. used to propagate errors.
quit chan error // subscription consumer writes, subscription fulfiller reads. used to signal that consumer wants to cancel the subscription.
unsubDone chan struct{} // subscription consumer reads, subscription fulfiller writes. used to signal that the subscription was successfully canceled
}

func NewPollSubscription() *PollSubscription {
Expand All @@ -63,6 +63,6 @@ func (sub *PollSubscription) Unsubscribe() {
<-sub.unsubDone
case <-sub.unsubDone:
}
close(sub.err)
close(sub.err) // TODO FIXME this violates golang guidelines “Only the sender should close a channel, never the receiver. Sending on a closed channel will cause a panic.”
})
}
21 changes: 11 additions & 10 deletions node/pkg/watchers/evm/connectors/poller.go
Expand Up @@ -37,6 +37,9 @@ func NewBlockPollConnector(ctx context.Context, baseConnector Connector, finaliz
if publishSafeBlocks && !useFinalized {
return nil, fmt.Errorf("publishSafeBlocks may only be enabled if useFinalized is enabled")
}
if finalizer == nil {
return nil, fmt.Errorf("finalizer must not be nil; Use finalizers.NewDefaultFinalizer() if you want to have no finalizer.")
}
connector := &BlockPollConnector{
Connector: baseConnector,
Delay: delay,
Expand Down Expand Up @@ -143,17 +146,15 @@ func (b *BlockPollConnector) pollBlocks(ctx context.Context, logger *zap.Logger,
return lastPublishedBlock, fmt.Errorf("failed to fetch next block (%d): %w", nextBlockNumber.Uint64(), err)
}

if b.finalizer != nil {
finalized, err := b.isBlockFinalizedWithTimeout(ctx, block)
if err != nil {
logger.Error("failed to check block finalization",
zap.Uint64("block", block.Number.Uint64()), zap.Error(err))
return lastPublishedBlock, fmt.Errorf("failed to check block finalization (%d): %w", block.Number.Uint64(), err)
}
finalized, err := b.isBlockFinalizedWithTimeout(ctx, block)
if err != nil {
logger.Error("failed to check block finalization",
zap.Uint64("block", block.Number.Uint64()), zap.Error(err))
return lastPublishedBlock, fmt.Errorf("failed to check block finalization (%d): %w", block.Number.Uint64(), err)
}

if !finalized {
break
}
if !finalized {
break
}

b.blockFeed.Send(block)
Expand Down
6 changes: 6 additions & 0 deletions node/pkg/watchers/evm/connectors/polygon.go
Expand Up @@ -119,13 +119,19 @@ func (c *PolygonConnector) SubscribeForBlocks(ctx context.Context, errC chan err
for {
select {
case <-ctx.Done():
messageSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return nil
case err := <-messageSub.Err():
sub.err <- err
case checkpoint := <-messageC:
if err := c.processCheckpoint(ctx, sink, checkpoint); err != nil {
sub.err <- fmt.Errorf("failed to process checkpoint: %w", err)
}
case <-sub.quit:
messageSub.Unsubscribe()
sub.unsubDone <- struct{}{}
return nil
}
}
})
Expand Down
10 changes: 8 additions & 2 deletions node/pkg/watchers/evm/watcher.go
Expand Up @@ -162,8 +162,14 @@ func NewEthWatcher(
}
}

func (w *Watcher) Run(ctx context.Context) error {
logger := supervisor.Logger(ctx)
func (w *Watcher) Run(parentCtx context.Context) error {
logger := supervisor.Logger(parentCtx)

// later on we will spawn multiple go-routines through `RunWithScissors`, i.e. catching panics.
// If any of them panic, this function will return, causing this child context to be canceled
// such that the other go-routines can free up resources
ctx, watcherContextCancelFunc := context.WithCancel(parentCtx)
defer watcherContextCancelFunc()

useFinalizedBlocks := ((w.chainID == vaa.ChainIDEthereum || w.chainID == vaa.ChainIDSepolia) && (!w.unsafeDevMode))
if (w.chainID == vaa.ChainIDKarura || w.chainID == vaa.ChainIDAcala) && (!w.unsafeDevMode) {
Expand Down