Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R]Feature/fix da exception #117

Merged
merged 5 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions op-node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ var optionalFlags = []cli.Flag{
SkipSyncStartCheck,
MantleDaIndexerSocketFlag,
MantleDAIndexerEnableFlag,
RPCEnableAdmin,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
148 changes: 87 additions & 61 deletions op-node/rollup/da/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@ package da

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/pkg/errors"
"github.com/shurcooL/graphql"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/Layr-Labs/datalayr/common/graphView"
pb "github.com/Layr-Labs/datalayr/common/interfaces/interfaceRetrieverServer"
mdar "github.com/ethereum-optimism/optimism/op-node/rollup/da/interfaceRetrieverServer"

"github.com/Layr-Labs/datalayr/common/graphView"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
Expand Down Expand Up @@ -74,16 +75,38 @@ func (mda *MantleDataStore) getDataStoreById(dataStoreId uint32) (*graphView.Dat
return dataStore, nil
}

func (mda *MantleDataStore) getLatestDataStoreId() (*graphView.DataStore, error) {
var query struct {
DataStores []graphView.DataStoreGql `graphql:"dataStores(first:1,orderBy:storeNumber,orderDirection:desc,where:{confirmed: true})"`
}
err := mda.GraphqlClient.Query(mda.Ctx, &query, nil)
if err != nil {
log.Error("failed to query lastest dataStore id", "err", err)
return nil, err
}
if len(query.DataStores) == 0 {
return nil, errors.New("no data store found in this round")
}
dataStore, err := query.DataStores[0].Convert()
if err != nil {
log.Error("DataStoreGql convert to DataStore fail", "err", err)
return nil, err
}
return dataStore, nil
}

func (mda *MantleDataStore) getFramesByDataStoreId(dataStoreId uint32) ([]byte, error) {
log.Info("sync block data from mantle da retriever")
conn, err := grpc.Dial(mda.Cfg.RetrieverSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to da retriever fail", "err", err)
return nil, err
}
defer conn.Close()
client := pb.NewDataRetrievalClient(conn)

client := mdar.NewDataRetrievalClient(conn)
opt := grpc.MaxCallRecvMsgSize(MAX_RPC_MESSAGE_SIZE)
request := &pb.FramesAndDataRequest{
request := &mdar.FramesAndDataRequest{
DataStoreId: dataStoreId,
}
reply, err := client.RetrieveFramesAndData(mda.Ctx, request, opt)
Expand All @@ -95,77 +118,80 @@ func (mda *MantleDataStore) getFramesByDataStoreId(dataStoreId uint32) ([]byte,
return reply.GetData(), nil
}

func (mda *MantleDataStore) getFramesFromIndexerByDataStoreId(dataStoreId uint32) ([]byte, error) {
func (mda *MantleDataStore) RetrievalFramesFromDa(dataStoreId uint32) ([]byte, error) {
dataStore, err := mda.getDataStoreById(dataStoreId)
if err != nil {
log.Error("get datastore by id fail", "err", err)
return nil, err
}

log.Info("get dataStore success",
"durationDataStoreId", dataStore.DurationDataStoreId,
"confirmed", dataStore.Confirmed,
"confirmTxHash", hexutil.Encode(dataStore.ConfirmTxHash[:]))
lastDataStore, err := mda.getLatestDataStoreId()
if err != nil {
log.Error("get lastest datastore fail", "err", err)
return nil, err
}
log.Info("get last dataStore success", "dataStoreId", dataStore.StoreNumber)

if !dataStore.Confirmed && dataStoreId < lastDataStore.StoreNumber {
log.Warn("this batch is not confirmed in mantle da,but new batch is confirmed,data corruption exists,need to skip this dataStoreId ", "dataStore id", dataStoreId, "latest dataStore id", lastDataStore.StoreNumber)
return nil, nil
} else if !dataStore.Confirmed {
log.Warn("this batch is not confirmed in mantle da ", "dataStore id", dataStoreId)
return nil, fmt.Errorf("this batch is not confirmed in mantle da,datastore id %d", dataStoreId)
}
var frames []byte
frames, err = mda.getFramesByDataStoreId(dataStoreId)
if err != nil {
log.Error("Get frames from indexer fail", "err", err)
return nil, err
}
if frames == nil {
log.Error("frames is nil ,waiting da indexer syncing")
return nil, fmt.Errorf("frames is nil,maybe da indexer is syncing,need to try again,dataStore id %d", dataStoreId)
}
return frames, nil
}

func (mda *MantleDataStore) RetrievalFramesFromDaIndexer(dataStoreId uint32) ([]byte, error) {
log.Info("sync block data from mantle da indexer")
conn, err := grpc.Dial(mda.Cfg.MantleDaIndexerSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to mantle da index retriever fail", "err", err)
return nil, err
}
defer conn.Close()
client := pb.NewDataRetrievalClient(conn)

client := mdar.NewDataRetrievalClient(conn)
opt := grpc.MaxCallRecvMsgSize(MAX_RPC_MESSAGE_SIZE)
request := &pb.FramesAndDataRequest{
request := &mdar.FramesAndDataRequest{
DataStoreId: dataStoreId,
}
reply, err := client.RetrieveFramesAndData(mda.Ctx, request, opt)
if err != nil {
log.Error("Retrieve frames and data fail", "err", err)
return nil, err
}
log.Debug("Get reply data from mantle da success", "replyLength", len(reply.GetData()))
if reply.GetData() == nil {
lastDataStoreId, err := client.RetrieveLastConfirmDataStoreId(mda.Ctx, &mdar.LastDataStoreIdRequest{}, opt)
if err != nil {
log.Error("Retrieve last confirmed data store id fail", "err", err)
return nil, err
}
if dataStoreId < lastDataStoreId.GetDataStoreId() {
log.Warn("this batch is not confirmed in mantle da,but new batch is confirmed,data corruption exists,need to skip this dataStoreId ", "dataStore id", dataStoreId, "latest dataStore id", lastDataStoreId.GetDataStoreId())
return nil, nil
}
log.Error("frames is nil ,waiting da indexer syncing")
return nil, fmt.Errorf("frames is nil,maybe da indexer is syncing,need to try again,dataStore id %d", dataStoreId)
}
log.Debug("Get reply data success", "replyLength", len(reply.GetData()))
return reply.GetData(), nil
}

func (mda *MantleDataStore) RetrievalFramesFromDa(dataStoreId uint32) ([]byte, error) {
pollingTimeout := time.NewTimer(mda.Cfg.DataStorePollingDuration)
defer pollingTimeout.Stop()
intervalTicker := time.NewTicker(POLLING_INTERVAL)
defer intervalTicker.Stop()
for {
select {
case <-intervalTicker.C:
if dataStoreId <= 0 {
log.Error("DataStoreId less than zero", "dataStoreId", dataStoreId)
return nil, errors.New("dataStoreId less than 0")
}
dataStore, err := mda.getDataStoreById(dataStoreId)
if err != nil {
log.Warn("Get datastore by id fail", "err", err)
continue
}
log.Info("Get dataStore success",
"DurationDataStoreId", dataStore.DurationDataStoreId,
"Confirmed", dataStore.Confirmed,
"ConfirmTxHash", hexutil.Encode(dataStore.ConfirmTxHash[:]))
if !dataStore.Confirmed {
log.Warn("This batch is not confirmed")
continue
}
var frames []byte
if mda.Cfg.MantleDAIndexerEnable { // from mantle da indexer
log.Info("sync block data from mantle da indexer")
frames, err = mda.getFramesFromIndexerByDataStoreId(dataStoreId)
if err != nil {
log.Warn("Get frames from indexer fail", "err", err)
continue
}
} else { // from mantle da retriever
log.Info("sync block data from mantle da retriever")
frames, err = mda.getFramesByDataStoreId(dataStoreId)
if err != nil {
log.Warn("Get frames from mantle da retriever fail", "err", err)
continue
}
}
if frames == nil {
continue
}
return frames, nil
case <-pollingTimeout.C:
return nil, errors.New("Get frame ticker exit")
case err := <-mda.Ctx.Done():
log.Warn("Retrieval service shutting down", "err", err)
return nil, errors.New("Retrieval service shutting down")
}
}
func (mda *MantleDataStore) IsDaIndexer() bool {
return mda.Cfg.MantleDAIndexerEnable
}