Skip to content

Commit

Permalink
feat: add heartbeat to eth-forwarder so last-seen block heights stay …
Browse files Browse the repository at this point in the history
…recent
  • Loading branch information
wwestgarth committed Jun 26, 2024
1 parent a40ef07 commit e817e7e
Show file tree
Hide file tree
Showing 58 changed files with 5,054 additions and 3,986 deletions.
4 changes: 2 additions & 2 deletions core/banking/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func (e *Engine) Load(ctx context.Context, data []byte) error {
e.lastSeenPrimaryEthBlock = b.LastSeenPrimaryEthBlock
if e.lastSeenPrimaryEthBlock != 0 {
e.log.Info("restoring primary collateral bridge starting block", logging.Uint64("block", e.lastSeenPrimaryEthBlock))
e.primaryEthEventSource.UpdateCollateralStartingBlock(e.lastSeenPrimaryEthBlock)
e.ethEventSource.UpdateContractBlock(e.primaryBridgeView.CollateralBridgeAddress(), e.primaryEthChainID, e.lastSeenPrimaryEthBlock)
}

e.lastSeenSecondaryEthBlock = b.LastSeenSecondaryEthBlock
if e.lastSeenSecondaryEthBlock != 0 {
e.log.Info("restoring secondary collateral bridge starting block", logging.Uint64("block", e.lastSeenSecondaryEthBlock))
e.secondaryEthEventSource.UpdateCollateralStartingBlock(e.lastSeenSecondaryEthBlock)
e.ethEventSource.UpdateContractBlock(e.secondaryBridgeView.CollateralBridgeAddress(), e.secondaryEthChainID, e.lastSeenSecondaryEthBlock)
}

aa := make([]*types.AssetAction, 0, len(b.AssetActions))
Expand Down
8 changes: 5 additions & 3 deletions core/banking/deduplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import (
"code.vegaprotocol.io/vega/core/types"
vgrand "code.vegaprotocol.io/vega/libs/rand"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)

func TestAssetActionDeduplication(t *testing.T) {
ctx := context.Background()

eng := getTestEngine(t)
eng.OnPrimaryEthChainIDUpdated("1")
eng.OnPrimaryEthChainIDUpdated("1", "hello")

id1 := vgrand.RandomStr(5)
txHash1 := vgrand.RandomStr(5)
Expand All @@ -55,7 +56,8 @@ func TestAssetActionDeduplication(t *testing.T) {

t.Run("Generate asset list", func(t *testing.T) {
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, ""))
eng.ethSource.EXPECT().UpdateContractBlock(gomock.Any(), gomock.Any(), gomock.Any()).Times(1)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, "1"))

// Validate the asset list.
eng.witness.f(eng.witness.r, true)
Expand All @@ -71,7 +73,7 @@ func TestAssetActionDeduplication(t *testing.T) {

t.Run("Generate duplicated asset list and ", func(t *testing.T) {
eng.assets.EXPECT().Get(assetID1).Times(1).Return(asset1, nil)
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, ""))
require.NoError(t, eng.EnableERC20(ctx, assetList1, id1, 1000, 1000, txHash1, "1"))

// Validate the asset list.
eng.witness.f(eng.witness.r, true)
Expand Down
27 changes: 18 additions & 9 deletions core/banking/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type MarketActivityTracker interface {
}

type EthereumEventSource interface {
UpdateCollateralStartingBlock(uint64)
UpdateContractBlock(string, string, uint64)
}

type Parties interface {
Expand Down Expand Up @@ -164,15 +164,18 @@ type Engine struct {
lastSeenPrimaryEthBlock uint64
primaryBridgeState *bridgeState
primaryBridgeView ERC20BridgeView
primaryEthEventSource EthereumEventSource

// lastSeenSecondaryEthBlock holds the block height of the latest ERC20 chain
// event, from the secondary chain, processed by the engine.
lastSeenSecondaryEthBlock uint64
secondaryEthChainID string
secondaryBridgeState *bridgeState
secondaryBridgeView ERC20BridgeView
secondaryEthEventSource EthereumEventSource

// map from chain-id -> collateral contract address
bridgeAddresses map[string]string

ethEventSource EthereumEventSource

withdrawals map[string]withdrawalRef
withdrawalCnt *big.Int
Expand Down Expand Up @@ -237,8 +240,7 @@ func New(log *logging.Logger,
marketActivityTracker MarketActivityTracker,
primaryBridgeView ERC20BridgeView,
secondaryBridgeView ERC20BridgeView,
primaryEthEventSource EthereumEventSource,
secondaryEthEventSource EthereumEventSource,
ethEventSource EthereumEventSource,
parties Parties,
) (e *Engine) {
log = log.Named(namedLogger)
Expand All @@ -254,8 +256,7 @@ func New(log *logging.Logger,
assets: assets,
notary: notary,
top: top,
primaryEthEventSource: primaryEthEventSource,
secondaryEthEventSource: secondaryEthEventSource,
ethEventSource: ethEventSource,
parties: parties,
assetActions: map[string]*assetAction{},
seenAssetActions: treeset.NewWithStringComparator(),
Expand All @@ -281,6 +282,7 @@ func New(log *logging.Logger,
secondaryBridgeState: &bridgeState{
active: true,
},
bridgeAddresses: map[string]string{},
feeDiscountPerPartyAndAsset: map[partyAssetKey]*num.Uint{},
pendingPerAssetAndPartyFeeDiscountUpdates: map[string]map[string]*num.Uint{},
primaryBridgeView: primaryBridgeView,
Expand All @@ -304,12 +306,14 @@ func (e *Engine) OnMinWithdrawQuantumMultiple(ctx context.Context, f num.Decimal
return nil
}

func (e *Engine) OnPrimaryEthChainIDUpdated(chainID string) {
func (e *Engine) OnPrimaryEthChainIDUpdated(chainID, collateralAddress string) {
e.primaryEthChainID = chainID
e.bridgeAddresses[chainID] = collateralAddress
}

func (e *Engine) OnSecondaryEthChainIDUpdated(chainID string) {
func (e *Engine) OnSecondaryEthChainIDUpdated(chainID, collateralAddress string) {
e.secondaryEthChainID = chainID
e.bridgeAddresses[chainID] = collateralAddress
}

// ReloadConf updates the internal configuration.
Expand Down Expand Up @@ -466,6 +470,11 @@ func (e *Engine) dedupAction(ctx context.Context, aa *assetAction) error {
}

func (e *Engine) finalizeAction(ctx context.Context, aa *assetAction, now time.Time) error {
// tell the evt forwarder tracker about this block height
if addr, ok := e.bridgeAddresses[aa.chainID]; ok {
e.ethEventSource.UpdateContractBlock(addr, aa.chainID, aa.blockHeight)
}

switch {
case aa.IsBuiltinAssetDeposit():
dep := e.deposits[aa.id]
Expand Down
5 changes: 3 additions & 2 deletions core/banking/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ func getTestEngine(t *testing.T) *testEngine {
notary.EXPECT().OfferSignatures(gomock.Any(), gomock.Any()).AnyTimes()
epoch.EXPECT().NotifyOnEpoch(gomock.Any(), gomock.Any()).AnyTimes()
parties := mocks.NewMockParties(ctrl)
eng := banking.New(logging.NewTestLogger(), banking.NewDefaultConfig(), col, witness, tsvc, assets, notary, broker, top, marketActivityTracker, primaryBridgeView, secondaryBridgeView, ethSource, nil, parties)
eng := banking.New(logging.NewTestLogger(), banking.NewDefaultConfig(), col, witness, tsvc, assets, notary, broker, top, marketActivityTracker, primaryBridgeView, secondaryBridgeView, ethSource, parties)

require.NoError(t, eng.OnMaxQuantumAmountUpdate(context.Background(), num.DecimalOne()))
eng.OnPrimaryEthChainIDUpdated("1")
eng.OnPrimaryEthChainIDUpdated("1", "hello")
eng.OnSecondaryEthChainIDUpdated("2", "hello2")

return &testEngine{
Engine: eng,
Expand Down
6 changes: 1 addition & 5 deletions core/banking/erc20.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ERC20BridgeView interface {
FindBridgeResumed(al *types.ERC20EventBridgeResumed, blockNumber, logIndex uint64, txHash string) error
FindDeposit(d *types.ERC20Deposit, blockNumber, logIndex uint64, ethAssetAddress string, txHash string) error
FindAssetLimitsUpdated(update *types.ERC20AssetLimitsUpdated, blockNumber uint64, logIndex uint64, ethAssetAddress string, txHash string) error
CollateralBridgeAddress() string
}

func (e *Engine) EnableERC20(
Expand Down Expand Up @@ -348,9 +349,4 @@ func (e *Engine) offerERC20NotarySignatures(resource string) []byte {

func (e *Engine) addAction(aa *assetAction) {
e.assetActions[aa.id] = aa
if aa.chainID == e.primaryEthChainID && aa.blockHeight > e.lastSeenPrimaryEthBlock {
e.lastSeenPrimaryEthBlock = aa.blockHeight
} else if aa.chainID == e.secondaryEthChainID && aa.blockHeight > e.lastSeenSecondaryEthBlock {
e.lastSeenSecondaryEthBlock = aa.blockHeight
}
}
26 changes: 20 additions & 6 deletions core/banking/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 18 additions & 15 deletions core/banking/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"code.vegaprotocol.io/vega/core/types"
vgcontext "code.vegaprotocol.io/vega/libs/context"
"code.vegaprotocol.io/vega/libs/num"
"code.vegaprotocol.io/vega/libs/proto"
"code.vegaprotocol.io/vega/logging"
Expand Down Expand Up @@ -312,7 +313,7 @@ func (e *Engine) LoadState(ctx context.Context, p *types.Payload) ([]types.State
case *types.PayloadBankingWithdrawals:
return nil, e.restoreWithdrawals(pl.BankingWithdrawals, p)
case *types.PayloadBankingSeen:
return nil, e.restoreSeen(pl.BankingSeen, p)
return nil, e.restoreSeen(ctx, pl.BankingSeen, p)
case *types.PayloadBankingAssetActions:
return nil, e.restoreAssetActions(pl.BankingAssetActions, p)
case *types.PayloadBankingRecurringTransfers:
Expand Down Expand Up @@ -423,13 +424,28 @@ func (e *Engine) restoreWithdrawals(withdrawals *types.BankingWithdrawals, p *ty
return err
}

func (e *Engine) restoreSeen(seen *types.BankingSeen, p *types.Payload) error {
func (e *Engine) restoreSeen(ctx context.Context, seen *types.BankingSeen, p *types.Payload) error {
var err error
e.log.Info("restoring seen", logging.Int("n", len(seen.Refs)))
e.seenAssetActions = treeset.NewWithStringComparator()
for _, v := range seen.Refs {
e.seenAssetActions.Add(v)
}

if vgcontext.InProgressUpgradeFrom(ctx, "v0.76.8") {
e.ethEventSource.UpdateContractBlock(
e.primaryBridgeView.CollateralBridgeAddress(),
e.primaryEthChainID,
seen.LastSeenPrimaryEthBlock,
)

e.ethEventSource.UpdateContractBlock(
e.secondaryBridgeView.CollateralBridgeAddress(),
e.secondaryEthChainID,
seen.LastSeenSecondaryEthBlock,
)
}

e.lastSeenPrimaryEthBlock = seen.LastSeenPrimaryEthBlock
e.lastSeenSecondaryEthBlock = seen.LastSeenSecondaryEthBlock
e.bss.serialisedSeen, err = proto.Marshal(p.IntoProto())
Expand Down Expand Up @@ -475,16 +491,3 @@ func (e *Engine) OnEpochRestore(_ context.Context, ep types.Epoch) {
e.log.Debug("epoch restoration notification received", logging.String("epoch", ep.String()))
e.currentEpoch = ep.Seq
}

func (e *Engine) OnStateLoaded(_ context.Context) error {
if e.lastSeenPrimaryEthBlock != 0 {
e.log.Info("restoring primary collateral bridge starting block", logging.Uint64("block", e.lastSeenPrimaryEthBlock))
e.primaryEthEventSource.UpdateCollateralStartingBlock(e.lastSeenPrimaryEthBlock)
}

if e.lastSeenSecondaryEthBlock != 0 {
e.log.Info("restoring secondary collateral bridge starting block", logging.Uint64("block", e.lastSeenSecondaryEthBlock))
e.secondaryEthEventSource.UpdateCollateralStartingBlock(e.lastSeenSecondaryEthBlock)
}
return nil
}
4 changes: 4 additions & 0 deletions core/bridges/erc20_logic_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func NewERC20LogicView(
}
}

func (e *ERC20LogicView) CollateralBridgeAddress() string {
return e.clt.CollateralBridgeAddress().Hex()
}

// FindAsset will try to find an asset and validate it's details on ethereum.
func (e *ERC20LogicView) FindAsset(
asset *types.AssetDetails,
Expand Down
18 changes: 18 additions & 0 deletions core/evtforward/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ func (e *Engine) UpdateMultisigControlStartingBlock(b uint64) {
e.ethEngine.UpdateMultiSigControlStartingBlock(b)
}

func (e *Engine) VerifyHeartbeat(ctx context.Context, height uint64, chainID string, contract string, blockTime uint64) error {
return e.ethEngine.VerifyHeartbeat(ctx, height, chainID, contract, blockTime)
}

func (e *Engine) UpdateStartingBlock(address string, block uint64) {
e.ethEngine.UpdateStartingBlock(address, block)
}

func (e *Engine) SetupEthereumEngine(
client ethereum.Client,
forwarder ethereum.Forwarder,
Expand Down Expand Up @@ -127,7 +135,9 @@ func (e *Engine) SetupEthereumEngine(
ethCfg.StakingBridge(),
ethCfg.VestingBridge(),
ethCfg.MultiSigControl(),
ethCfg.CollateralBridge(),
ethCfg.ChainID(),
ethCfg.BlockTime(),
)

e.UpdateCollateralStartingBlock(filterer.CurrentHeight(context.Background()))
Expand Down Expand Up @@ -196,7 +206,9 @@ func (e *Engine) SetupSecondaryEthereumEngine(
types.EthereumContract{},
types.EthereumContract{},
ethCfg.MultiSigControl(),
ethCfg.CollateralBridge(),
ethCfg.ChainID(),
ethCfg.BlockTime(),
)

e.UpdateCollateralStartingBlock(filterer.CurrentHeight(context.Background()))
Expand Down Expand Up @@ -260,6 +272,12 @@ func (e *NoopEngine) UpdateStakingStartingBlock(b uint64) {}

func (e *NoopEngine) UpdateMultisigControlStartingBlock(b uint64) {}

func (e *NoopEngine) VerifyHeartbeat(_ context.Context, _ uint64, _ string, _ string, _ uint64) error {
return nil
}

func (e *NoopEngine) UpdateStartingBlock(_ string, _ uint64) {}

func (e *NoopEngine) SetupEthereumEngine(
_ ethereum.Client,
_ ethereum.Forwarder,
Expand Down
23 changes: 15 additions & 8 deletions core/evtforward/ethereum/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,28 @@ import (
)

const (
defaultHeartbeatInterval = 1 * time.Hour
defaultDurationBetweenTwoRetry = 20 * time.Second
maxEthereumBlocks = 10000 // chosen because one of the validators wanted to use quicknode, and this is their limit
)

type Config struct {
// Level specifies the logging level of the Ethereum implementation of the
// Event Forwarder.
Level encoding.LogLevel `long:"log-level"`
MaxEthereumBlocks uint64 `long:"max-ethereum-blocks"`
PollEventRetryDuration encoding.Duration
ChainID string
SkipClientVerification bool
Level encoding.LogLevel `long:"log-level"`
MaxEthereumBlocks uint64 `long:"max-ethereum-blocks"`
PollEventRetryDuration encoding.Duration
ChainID string
SkipClientVerification bool
HeartbeatIntervalForTestOnlyDoNotChange encoding.Duration
}

func NewDefaultConfig() Config {
return Config{
Level: encoding.LogLevel{Level: logging.InfoLevel},
PollEventRetryDuration: encoding.Duration{Duration: defaultDurationBetweenTwoRetry},
MaxEthereumBlocks: maxEthereumBlocks,
Level: encoding.LogLevel{Level: logging.InfoLevel},
PollEventRetryDuration: encoding.Duration{Duration: defaultDurationBetweenTwoRetry},
MaxEthereumBlocks: maxEthereumBlocks,
HeartbeatIntervalForTestOnlyDoNotChange: encoding.Duration{Duration: defaultHeartbeatInterval},
}
}

Expand All @@ -53,4 +56,8 @@ func (c *Config) setDefaults() {
if c.PollEventRetryDuration.Duration == 0 {
c.PollEventRetryDuration.Duration = defaultDurationBetweenTwoRetry
}

if c.HeartbeatIntervalForTestOnlyDoNotChange.Duration == 0 {
c.HeartbeatIntervalForTestOnlyDoNotChange.Duration = defaultHeartbeatInterval
}
}
Loading

0 comments on commit e817e7e

Please sign in to comment.