Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Krishna/add graph to self heal #1037

Merged
merged 8 commits into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions bridge/setu/listener/rootchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type RootChainListener struct {

stakingInfoAbi *abi.ABI
stateSenderAbi *abi.ABI

// For self-heal, Will be only initialised if sub_graph_url is provided
subGraphClient *subGraphClient
}

const (
Expand Down
221 changes: 0 additions & 221 deletions bridge/setu/listener/rootchain_events.go

This file was deleted.

69 changes: 39 additions & 30 deletions bridge/setu/listener/rootchain_selfheal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package listener
import (
"context"
"fmt"
"net/http"
"sync"
"time"

Expand All @@ -11,8 +12,6 @@ import (

"github.com/ethereum/go-ethereum/core/types"
"github.com/maticnetwork/heimdall/bridge/setu/util"
"github.com/maticnetwork/heimdall/contracts/stakinginfo"
"github.com/maticnetwork/heimdall/contracts/statesender"
"github.com/maticnetwork/heimdall/helper"
)

Expand All @@ -32,13 +31,23 @@ var (
}, []string{"id", "nonce", "contract_address", "block_number", "tx_hash"})
)

type subGraphClient struct {
graphUrl string
httpClient *http.Client
}

// startSelfHealing starts self-healing processes for all required events
func (rl *RootChainListener) startSelfHealing(ctx context.Context) {
if !helper.GetConfig().EnableSH {
if !helper.GetConfig().EnableSH || helper.GetConfig().SubGraphUrl == "" {
rl.Logger.Info("Self-healing disabled")
return
}

rl.subGraphClient = &subGraphClient{
graphUrl: helper.GetConfig().SubGraphUrl,
httpClient: &http.Client{Timeout: 5 * time.Second},
}

stakeUpdateTicker := time.NewTicker(helper.GetConfig().SHStakeUpdateInterval)
stateSyncedTicker := time.NewTicker(helper.GetConfig().SHStateSyncedInterval)

Expand Down Expand Up @@ -76,9 +85,15 @@ func (rl *RootChainListener) processStakeUpdate(ctx context.Context) {
for _, validator := range validatorSet.Validators {
wg.Add(1)

go func(id, nonce uint64) {
go func(id uint64) {
defer wg.Done()

nonce, _, err := util.GetValidatorNonce(rl.cliCtx, id)
if err != nil {
rl.Logger.Error("Error getting nonce for validator from heimdall", "error", err, "id", id)
return
}

var ethereumNonce uint64

if err = helper.ExponentialBackoff(func() error {
Expand All @@ -95,9 +110,9 @@ func (rl *RootChainListener) processStakeUpdate(ctx context.Context) {

nonce++

rl.Logger.Info("Processing stake update for validator", "id", id, "nonce", nonce)
rl.Logger.Info("Processing stake update for validator", "id", id, "ethereumNonce", ethereumNonce, "nonce", nonce)

var stakeUpdate *stakinginfo.StakinginfoStakeUpdate
var stakeUpdate *types.Log

if err = helper.ExponentialBackoff(func() error {
stakeUpdate, err = rl.getStakeUpdate(ctx, id, nonce)
Expand All @@ -108,19 +123,19 @@ func (rl *RootChainListener) processStakeUpdate(ctx context.Context) {
}

stakeUpdateCounter.WithLabelValues(
stakeUpdate.ValidatorId.String(),
stakeUpdate.Nonce.String(),
stakeUpdate.Raw.Address.String(),
fmt.Sprintf("%d", stakeUpdate.Raw.BlockNumber),
stakeUpdate.Raw.TxHash.String(),
fmt.Sprintf("%d", id),
fmt.Sprintf("%d", nonce),
stakeUpdate.Address.Hex(),
fmt.Sprintf("%d", stakeUpdate.BlockNumber),
stakeUpdate.TxHash.Hex(),
).Add(1)

if _, err = rl.processEvent(ctx, stakeUpdate.Raw); err != nil {
if _, err = rl.processEvent(ctx, stakeUpdate); err != nil {
rl.Logger.Error("Error processing stake update for validator", "error", err, "id", id)
} else {
rl.Logger.Info("Processed stake update for validator", "id", id, "nonce", nonce)
}
}(validator.ID.Uint64(), validator.Nonce)
}(validator.ID.Uint64())
}

wg.Wait()
Expand Down Expand Up @@ -152,7 +167,7 @@ func (rl *RootChainListener) processStateSynced(ctx context.Context) {

rl.Logger.Info("Processing state sync", "id", i)

var stateSynced *statesender.StatesenderStateSynced
var stateSynced *types.Log

if err = helper.ExponentialBackoff(func() error {
stateSynced, err = rl.getStateSync(ctx, i)
Expand All @@ -163,13 +178,13 @@ func (rl *RootChainListener) processStateSynced(ctx context.Context) {
}

stateSyncedCounter.WithLabelValues(
stateSynced.Id.String(),
stateSynced.Raw.Address.String(),
fmt.Sprintf("%d", stateSynced.Raw.BlockNumber),
stateSynced.Raw.TxHash.String(),
fmt.Sprintf("%d", i),
stateSynced.Address.Hex(),
fmt.Sprintf("%d", stateSynced.BlockNumber),
stateSynced.TxHash.Hex(),
).Add(1)

ignore, err := rl.processEvent(ctx, stateSynced.Raw)
ignore, err := rl.processEvent(ctx, stateSynced)
if err != nil {
rl.Logger.Error("Unable to update state id on heimdall", "error", err)
i--
Expand Down Expand Up @@ -199,32 +214,26 @@ func (rl *RootChainListener) processStateSynced(ctx context.Context) {
}
}

func (rl *RootChainListener) processEvent(ctx context.Context, event types.Log) (bool, error) {
// Check existence of topics beforehand and ignore if no topic exists
// (TODO): Identify issue of empty events: See Jira POS-818
if len(event.Topics) == 0 {
return true, nil
}

blockTime, err := rl.contractConnector.GetMainChainBlockTime(ctx, event.BlockNumber)
func (rl *RootChainListener) processEvent(ctx context.Context, vLog *types.Log) (bool, error) {
blockTime, err := rl.contractConnector.GetMainChainBlockTime(ctx, vLog.BlockNumber)
if err != nil {
rl.Logger.Error("Unable to get block time", "error", err)
return false, err
}

if time.Since(blockTime) < helper.GetConfig().SHMaxDepthDuration {
rl.Logger.Info("Block time is less than an hour, skipping state sync")
rl.Logger.Info("Block time is less than max time depth, skipping event")
return true, err
}

topic := event.Topics[0].Bytes()
topic := vLog.Topics[0].Bytes()
0xKrishna marked this conversation as resolved.
Show resolved Hide resolved
for _, abiObject := range rl.abis {
selectedEvent := helper.EventByID(abiObject, topic)
if selectedEvent == nil {
continue
}

rl.handleLog(event, selectedEvent)
rl.handleLog(*vLog, selectedEvent)
}

return false, nil
Expand Down