Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]Feature/fix da exception #117

Merged
merged 5 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ var optionalFlags = []cli.Flag{
SkipSyncStartCheck,
MantleDaIndexerSocketFlag,
MantleDAIndexerEnableFlag,
RPCEnableAdmin,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
138 changes: 68 additions & 70 deletions op-node/rollup/da/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package da

import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/common/hexutil"
byteflyfunny marked this conversation as resolved.
Show resolved Hide resolved
"strconv"
"time"

"github.com/pkg/errors"
"github.com/shurcooL/graphql"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/Layr-Labs/datalayr/common/graphView"
pb "github.com/Layr-Labs/datalayr/common/interfaces/interfaceRetrieverServer"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)

Expand Down Expand Up @@ -74,34 +75,47 @@ func (mda *MantleDataStore) getDataStoreById(dataStoreId uint32) (*graphView.Dat
return dataStore, nil
}

func (mda *MantleDataStore) getFramesByDataStoreId(dataStoreId uint32) ([]byte, error) {
conn, err := grpc.Dial(mda.Cfg.RetrieverSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
func (mda *MantleDataStore) getLatestDataStoreId() (*graphView.DataStore, error) {
var query struct {
DataStores []graphView.DataStoreGql `graphql:"dataStores(first:1,orderBy:storeNumber,orderDirection:desc,where:{confirmed: true})"`
}
err := mda.GraphqlClient.Query(mda.Ctx, &query, nil)
if err != nil {
log.Error("Connect to da retriever fail", "err", err)
log.Error("failed to query lastest dataStore id", "err", err)
return nil, err
}
defer conn.Close()
client := pb.NewDataRetrievalClient(conn)
opt := grpc.MaxCallRecvMsgSize(MAX_RPC_MESSAGE_SIZE)
request := &pb.FramesAndDataRequest{
DataStoreId: dataStoreId,
if len(query.DataStores) == 0 {
return nil, errors.New("no data store found in this round")
}
reply, err := client.RetrieveFramesAndData(mda.Ctx, request, opt)
dataStore, err := query.DataStores[0].Convert()
if err != nil {
log.Error("Retrieve frames and data fail", "err", err)
log.Error("DataStoreGql convert to DataStore fail", "err", err)
return nil, err
}
log.Debug("Get reply data success", "replyLength", len(reply.GetData()))
return reply.GetData(), nil
return dataStore, nil
}

func (mda *MantleDataStore) getFramesFromIndexerByDataStoreId(dataStoreId uint32) ([]byte, error) {
conn, err := grpc.Dial(mda.Cfg.MantleDaIndexerSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to mantle da index retriever fail", "err", err)
return nil, err
func (mda *MantleDataStore) getFramesByDataStoreId(dataStoreId uint32) ([]byte, error) {

var conn *grpc.ClientConn
var err error
if mda.Cfg.MantleDAIndexerEnable {
log.Info("sync block data from mantle da indexer")
conn, err = grpc.Dial(mda.Cfg.MantleDaIndexerSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to mantle da index retriever fail", "err", err)
return nil, err
}
} else {
log.Info("sync block data from mantle da retriever")
conn, err = grpc.Dial(mda.Cfg.RetrieverSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to da retriever fail", "err", err)
return nil, err
}
}
defer conn.Close()

client := pb.NewDataRetrievalClient(conn)
opt := grpc.MaxCallRecvMsgSize(MAX_RPC_MESSAGE_SIZE)
request := &pb.FramesAndDataRequest{
Expand All @@ -112,60 +126,44 @@ func (mda *MantleDataStore) getFramesFromIndexerByDataStoreId(dataStoreId uint32
log.Error("Retrieve frames and data fail", "err", err)
return nil, err
}
log.Debug("Get reply data from mantle da success", "replyLength", len(reply.GetData()))
log.Debug("Get reply data success", "replyLength", len(reply.GetData()))
return reply.GetData(), nil
}

func (mda *MantleDataStore) RetrievalFramesFromDa(dataStoreId uint32) ([]byte, error) {
pollingTimeout := time.NewTimer(mda.Cfg.DataStorePollingDuration)
defer pollingTimeout.Stop()
intervalTicker := time.NewTicker(POLLING_INTERVAL)
defer intervalTicker.Stop()
for {
select {
case <-intervalTicker.C:
if dataStoreId <= 0 {
log.Error("DataStoreId less than zero", "dataStoreId", dataStoreId)
return nil, errors.New("dataStoreId less than 0")
}
dataStore, err := mda.getDataStoreById(dataStoreId)
if err != nil {
log.Warn("Get datastore by id fail", "err", err)
continue
}
log.Info("Get dataStore success",
"DurationDataStoreId", dataStore.DurationDataStoreId,
"Confirmed", dataStore.Confirmed,
"ConfirmTxHash", hexutil.Encode(dataStore.ConfirmTxHash[:]))
if !dataStore.Confirmed {
log.Warn("This batch is not confirmed")
continue
}
var frames []byte
if mda.Cfg.MantleDAIndexerEnable { // from mantle da indexer
log.Info("sync block data from mantle da indexer")
frames, err = mda.getFramesFromIndexerByDataStoreId(dataStoreId)
if err != nil {
log.Warn("Get frames from indexer fail", "err", err)
continue
}
} else { // from mantle da retriever
log.Info("sync block data from mantle da retriever")
frames, err = mda.getFramesByDataStoreId(dataStoreId)
if err != nil {
log.Warn("Get frames from mantle da retriever fail", "err", err)
continue
}
}
if frames == nil {
continue
}
return frames, nil
case <-pollingTimeout.C:
return nil, errors.New("Get frame ticker exit")
case err := <-mda.Ctx.Done():
log.Warn("Retrieval service shutting down", "err", err)
return nil, errors.New("Retrieval service shutting down")
}
dataStore, err := mda.getDataStoreById(dataStoreId)
if err != nil {
log.Error("get datastore by id fail", "err", err)
return nil, err
}

log.Info("get dataStore success",
"durationDataStoreId", dataStore.DurationDataStoreId,
"confirmed", dataStore.Confirmed,
"confirmTxHash", hexutil.Encode(dataStore.ConfirmTxHash[:]))
lastDataStore, err := mda.getLatestDataStoreId()
if err != nil {
log.Error("get lastest datastore fail", "err", err)
return nil, err
}
log.Info("get last dataStore success", "dataStoreId", dataStore.StoreNumber)

if !dataStore.Confirmed && dataStoreId < lastDataStore.StoreNumber {
log.Warn("this batch is not confirmed in mantle da,but new batch is confirmed,data corruption exists,need to skip this dataStoreId ", "dataStore id", dataStoreId, "latest dataStore id", lastDataStore.StoreNumber)
return nil, nil
} else if !dataStore.Confirmed {
log.Warn("this batch is not confirmed in mantle da ", "dataStore id", dataStoreId)
return nil, fmt.Errorf("this batch is not confirmed in mantle da,datastore id %d", dataStoreId)
}
var frames []byte
frames, err = mda.getFramesByDataStoreId(dataStoreId)
if err != nil {
log.Error("Get frames from indexer fail", "err", err)
return nil, err
}
if frames == nil {
log.Error("frames is nil ,waiting da indexer syncing")
return nil, fmt.Errorf("frames is nil,maybe da indexer is syncing,need to try again,dataStore id %d", dataStoreId)
}
return frames, nil
}
50 changes: 37 additions & 13 deletions op-node/rollup/derive/calldata_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,24 @@ func NewDataSource(ctx context.Context, log log.Logger, cfg *rollup.Config, fetc
batcherAddr: batcherAddr,
}
} else {
return &DataSource{
open: true,
data: dataFromMantleDa(cfg, receipts, syncer, metrics, log.New("origin", block)),
data, err := dataFromMantleDa(cfg, receipts, syncer, metrics, log.New("origin", block))
if err != nil {
return &DataSource{
open: false,
id: block,
cfg: cfg,
fetcher: fetcher,
syncer: syncer,
metrics: metrics,
log: log,
batcherAddr: batcherAddr,
}
} else {
return &DataSource{
open: true,
data: data,
}

}
}
}
Expand Down Expand Up @@ -129,8 +144,12 @@ func (ds *DataSource) Next(ctx context.Context) (eth.Data, error) {
if !ds.open {
if ds.cfg.MantleDaSwitch { // fetch data from mantleDA
if _, receipts, err := ds.fetcher.FetchReceipts(ctx, ds.id.Hash); err == nil {
data, err := dataFromMantleDa(ds.cfg, receipts, ds.syncer, ds.metrics, log.New("origin", ds.id))
if err != nil {
return nil, NewTemporaryError(fmt.Errorf("failed to open mantle da calldata source: %w", err))
}
ds.open = true
ds.data = dataFromMantleDa(ds.cfg, receipts, ds.syncer, ds.metrics, log.New("origin", ds.id))
ds.data = data
} else if errors.Is(err, ethereum.NotFound) {
return nil, NewResetError(fmt.Errorf("failed to open mantle da calldata source: %w", err))
} else {
Expand Down Expand Up @@ -178,17 +197,17 @@ func DataFromEVMTransactions(config *rollup.Config, batcherAddr common.Address,
return out
}

func dataFromMantleDa(config *rollup.Config, receipts types.Receipts, syncer MantleDaSyncer, metrics Metrics, log log.Logger) []eth.Data {
func dataFromMantleDa(config *rollup.Config, receipts types.Receipts, syncer MantleDaSyncer, metrics Metrics, log log.Logger) ([]eth.Data, error) {
var out []eth.Data
abiUint32, err := abi.NewType("uint32", "uint32", nil)
if err != nil {
log.Error("Abi new uint32 type error", "err", err)
return out
return out, err
}
abiBytes32, err := abi.NewType("bytes32", "bytes32", nil)
if err != nil {
log.Error("Abi new bytes32 type error", "err", err)
return out
return out, err
}
confirmDataStoreArgs := abi.Arguments{
{
Expand Down Expand Up @@ -217,25 +236,30 @@ func dataFromMantleDa(config *rollup.Config, receipts types.Receipts, syncer Man
continue
}
if dataStoreData != nil {
dataStoreId := dataStoreData["dataStoreId"].(uint32)
nextDataStoreId := dataStoreData["dataStoreId"].(uint32)
dataStoreId := nextDataStoreId - 1
log.Info("Parse confirmed dataStoreId success", "dataStoreId", dataStoreId, "address", rLog.Address.String())
daFrames, err := syncer.RetrievalFramesFromDa(dataStoreId - 1)
daFrames, err := syncer.RetrievalFramesFromDa(dataStoreId)
if err != nil {
log.Error("Retrieval frames from mantleDa error", "dataStoreId", dataStoreId, "err", err)
continue
return out, err
}
//skip this dataStore id
if daFrames == nil {
return out, nil
}
log.Info("Retrieval frames from mantle da success", "daFrames length", len(daFrames), "dataStoreId", dataStoreId)
err = rlp.DecodeBytes(daFrames, &out)
if err != nil {
log.Error("Decode retrieval frames in error", "err", err)
log.Error("Decode retrieval frames in error,skip wrong data", "err", err, "skip datastore id", dataStoreId)
continue
}
metrics.RecordParseDataStoreId(dataStoreId)
log.Info("Decode bytes success", "out length", len(out), "dataStoreId", dataStoreId)
}
return out
return out, nil
}
}
}
return out
return out, nil
}