Skip to content

Commit

Permalink
[R4R]-{release v0.5.0}: Feature/mantle da indexer to v0.5.0 (#91)
Browse files Browse the repository at this point in the history
* dock to mantle da indexer service

* change config flags

* change flags

* add debug log

* fix review issure
  • Loading branch information
guoshijiang committed Jan 23, 2024
1 parent 9d2f150 commit bb0ff70
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 6 deletions.
2 changes: 1 addition & 1 deletion datalayr
12 changes: 12 additions & 0 deletions op-node/flags/flags.go
Expand Up @@ -82,6 +82,16 @@ var (
Value: 120 * time.Second,
EnvVar: prefixEnvVar("DATASTORE_POLLING_DURATION"),
}
MantleDaIndexerSocketFlag = cli.StringFlag{
Name: "da.indexer-socket",
Usage: "mantle da indexer socket",
EnvVar: prefixEnvVar("INDEXER_SOCKET"),
}
MantleDAIndexerEnableFlag = cli.BoolFlag{
Name: "da.indexer-enable",
Usage: "mantle da enable",
EnvVar: prefixEnvVar("INDEXER_ENABLE"),
}

/* Optional Flags */
L1TrustRPC = cli.BoolFlag{
Expand Down Expand Up @@ -286,6 +296,8 @@ var optionalFlags = []cli.Flag{
BackupL2UnsafeSyncRPCTrustRPC,
L2EngineSyncEnabled,
SkipSyncStartCheck,
MantleDaIndexerSocketFlag,
MantleDAIndexerEnableFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
46 changes: 41 additions & 5 deletions op-node/rollup/da/datastore.go
Expand Up @@ -27,6 +27,8 @@ type MantleDataStoreConfig struct {
RetrieverTimeout time.Duration
GraphProvider string
DataStorePollingDuration time.Duration
MantleDaIndexerSocket string
MantleDAIndexerEnable bool
}

type MantleDataStore struct {
Expand Down Expand Up @@ -89,7 +91,28 @@ func (mda *MantleDataStore) getFramesByDataStoreId(dataStoreId uint32) ([]byte,
log.Error("Retrieve frames and data fail", "err", err)
return nil, err
}
log.Debug("Get reply data success", "reply length", len(reply.GetData()))
log.Debug("Get reply data success", "replyLength", len(reply.GetData()))
return reply.GetData(), nil
}

func (mda *MantleDataStore) getFramesFromIndexerByDataStoreId(dataStoreId uint32) ([]byte, error) {
conn, err := grpc.Dial(mda.Cfg.MantleDaIndexerSocket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Error("Connect to mantle da index retriever fail", "err", err)
return nil, err
}
defer conn.Close()
client := pb.NewDataRetrievalClient(conn)
opt := grpc.MaxCallRecvMsgSize(MAX_RPC_MESSAGE_SIZE)
request := &pb.FramesAndDataRequest{
DataStoreId: dataStoreId,
}
reply, err := client.RetrieveFramesAndData(mda.Ctx, request, opt)
if err != nil {
log.Error("Retrieve frames and data fail", "err", err)
return nil, err
}
log.Debug("Get reply data from mantle da success", "replyLength", len(reply.GetData()))
return reply.GetData(), nil
}

Expand Down Expand Up @@ -118,14 +141,27 @@ func (mda *MantleDataStore) RetrievalFramesFromDa(dataStoreId uint32) ([]byte, e
log.Warn("This batch is not confirmed")
continue
}
frames, err := mda.getFramesByDataStoreId(dataStoreId)
if err != nil {
log.Warn("Get frames fail", "err", err)
var frames []byte
if mda.Cfg.MantleDAIndexerEnable { // from mantle da indexer
log.Info("sync block data from mantle da indexer")
frames, err = mda.getFramesFromIndexerByDataStoreId(dataStoreId)
if err != nil {
log.Warn("Get frames from indexer fail", "err", err)
continue
}
} else { // from mantle da retriever
log.Info("sync block data from mantle da retriever")
frames, err = mda.getFramesByDataStoreId(dataStoreId)
if err != nil {
log.Warn("Get frames from mantle da retriever fail", "err", err)
continue
}
}
if frames == nil {
continue
}
return frames, nil
case <-pollingTimeout.C:
// todo: add metrics in the future
return nil, errors.New("Get frame ticker exit")
case err := <-mda.Ctx.Done():
log.Warn("Retrieval service shutting down", "err", err)
Expand Down
4 changes: 4 additions & 0 deletions op-node/service.go
Expand Up @@ -213,11 +213,15 @@ func NewMantleDataStoreConfig(ctx *cli.Context) (datastore.MantleDataStoreConfig
retrieverTimeout := ctx.GlobalDuration(flags.RetrieverTimeoutFlag.Name)
graphProvider := ctx.GlobalString(flags.GraphProviderFlag.Name)
dataStorePollingDuration := ctx.GlobalDuration(flags.DataStorePollingDurationFlag.Name)
mantleDaIndexerSocket := ctx.GlobalString(flags.MantleDaIndexerSocketFlag.Name)
mantleDAIndexerEnable := ctx.GlobalBool(flags.MantleDAIndexerEnableFlag.Name)
return datastore.MantleDataStoreConfig{
RetrieverSocket: retrieverSocket,
RetrieverTimeout: retrieverTimeout,
GraphProvider: graphProvider,
DataStorePollingDuration: dataStorePollingDuration,
MantleDaIndexerSocket: mantleDaIndexerSocket,
MantleDAIndexerEnable: mantleDAIndexerEnable,
}, nil
}

Expand Down

0 comments on commit bb0ff70

Please sign in to comment.