Skip to content

Commit

Permalink
compatible matnel da indxer
Browse files Browse the repository at this point in the history
  • Loading branch information
byteflyfunny committed Mar 5, 2024
1 parent a43acc9 commit dcfc4b3
Show file tree
Hide file tree
Showing 8 changed files with 595 additions and 22 deletions.
70 changes: 49 additions & 21 deletions op-node/rollup/da/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/Layr-Labs/datalayr/common/graphView"
pb "github.com/Layr-Labs/datalayr/common/interfaces/interfaceRetrieverServer"
mdar "github.com/ethereum-optimism/optimism/op-node/rollup/da/interfaceRetrieverServer"

"github.com/Layr-Labs/datalayr/common/graphView"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
)
Expand Down Expand Up @@ -96,29 +96,17 @@ func (mda *MantleDataStore) getLatestDataStoreId() (*graphView.DataStore, error)
}

func (mda *MantleDataStore) getFramesByDataStoreId(dataStoreId uint32) ([]byte, error) {

var conn *grpc.ClientConn
var err error
if mda.Cfg.MantleDAIndexerEnable {
log.Info("sync block data from mantle da indexer")
conn, err = grpc.Dial(mda.Cfg.MantleDaIndexerSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to mantle da index retriever fail", "err", err)
return nil, err
}
} else {
log.Info("sync block data from mantle da retriever")
conn, err = grpc.Dial(mda.Cfg.RetrieverSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to da retriever fail", "err", err)
return nil, err
}
log.Info("sync block data from mantle da retriever")
conn, err := grpc.Dial(mda.Cfg.RetrieverSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to da retriever fail", "err", err)
return nil, err
}
defer conn.Close()

client := pb.NewDataRetrievalClient(conn)
client := mdar.NewDataRetrievalClient(conn)
opt := grpc.MaxCallRecvMsgSize(MAX_RPC_MESSAGE_SIZE)
request := &pb.FramesAndDataRequest{
request := &mdar.FramesAndDataRequest{
DataStoreId: dataStoreId,
}
reply, err := client.RetrieveFramesAndData(mda.Ctx, request, opt)
Expand Down Expand Up @@ -167,3 +155,43 @@ func (mda *MantleDataStore) RetrievalFramesFromDa(dataStoreId uint32) ([]byte, e
}
return frames, nil
}

func (mda *MantleDataStore) RetrievalFramesFromDaIndexer(dataStoreId uint32) ([]byte, error) {
log.Info("sync block data from mantle da indexer")
conn, err := grpc.Dial(mda.Cfg.MantleDaIndexerSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to mantle da index retriever fail", "err", err)
return nil, err
}
defer conn.Close()

client := mdar.NewDataRetrievalClient(conn)
opt := grpc.MaxCallRecvMsgSize(MAX_RPC_MESSAGE_SIZE)
request := &mdar.FramesAndDataRequest{
DataStoreId: dataStoreId,
}
reply, err := client.RetrieveFramesAndData(mda.Ctx, request, opt)
if err != nil {
log.Error("Retrieve frames and data fail", "err", err)
return nil, err
}
if reply.GetData() == nil {
lastDataStoreId, err := client.RetrieveLastConfirmDataStoreId(mda.Ctx, &mdar.LastDataStoreIdRequest{}, opt)
if err != nil {
log.Error("Retrieve last confirmed data store id fail", "err", err)
return nil, err
}
if dataStoreId < lastDataStoreId.GetDataStoreId() {
log.Warn("this batch is not confirmed in mantle da,but new batch is confirmed,data corruption exists,need to skip this dataStoreId ", "dataStore id", dataStoreId, "latest dataStore id", lastDataStoreId.GetDataStoreId())
return nil, nil
}
log.Error("frames is nil ,waiting da indexer syncing")
return nil, fmt.Errorf("frames is nil,maybe da indexer is syncing,need to try again,dataStore id %d", dataStoreId)
}
log.Debug("Get reply data success", "replyLength", len(reply.GetData()))
return reply.GetData(), nil
}

func (mda *MantleDataStore) IsDaIndexer() bool {
return mda.Cfg.MantleDAIndexerEnable
}

0 comments on commit dcfc4b3

Please sign in to comment.