diff --git a/datalayr b/datalayr index dffa26d8d..7998b0656 160000 --- a/datalayr +++ b/datalayr @@ -1 +1 @@ -Subproject commit dffa26d8d23c26d86a961f95ac7fc4adbcc7761f +Subproject commit 7998b0656221043aa2e0fbe190c797ecdee662bd diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index cb7458ac4..8b16d6c48 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -82,6 +82,16 @@ var ( Value: 120 * time.Second, EnvVar: prefixEnvVar("DATASTORE_POLLING_DURATION"), } + MantleDaIndexerSocketFlag = cli.StringFlag{ + Name: "da.indexer-socket", + Usage: "mantle da indexer socket", + EnvVar: prefixEnvVar("INDEXER_SOCKET"), + } + MantleDAIndexerEnableFlag = cli.BoolFlag{ + Name: "da.indexer-enable", + Usage: "mantle da enable", + EnvVar: prefixEnvVar("INDEXER_ENABLE"), + } /* Optional Flags */ L1TrustRPC = cli.BoolFlag{ @@ -286,6 +296,8 @@ var optionalFlags = []cli.Flag{ BackupL2UnsafeSyncRPCTrustRPC, L2EngineSyncEnabled, SkipSyncStartCheck, + MantleDaIndexerSocketFlag, + MantleDAIndexerEnableFlag, } // Flags contains the list of configuration options available to the binary. diff --git a/op-node/rollup/da/datastore.go b/op-node/rollup/da/datastore.go index 75dcb6fc3..9151fbef0 100644 --- a/op-node/rollup/da/datastore.go +++ b/op-node/rollup/da/datastore.go @@ -27,6 +27,8 @@ type MantleDataStoreConfig struct { RetrieverTimeout time.Duration GraphProvider string DataStorePollingDuration time.Duration + MantleDaIndexerSocket string + MantleDAIndexerEnable bool } type MantleDataStore struct { @@ -89,7 +91,28 @@ func (mda *MantleDataStore) getFramesByDataStoreId(dataStoreId uint32) ([]byte, log.Error("Retrieve frames and data fail", "err", err) return nil, err } - log.Debug("Get reply data success", "reply length", len(reply.GetData())) + log.Debug("Get reply data success", "replyLength", len(reply.GetData())) + return reply.GetData(), nil +} + +func (mda *MantleDataStore) getFramesFromIndexerByDataStoreId(dataStoreId uint32) ([]byte, error) { + conn, err := grpc.Dial(mda.Cfg.MantleDaIndexerSocket, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Error("Connect to mantle da index retriever fail", "err", err) + return nil, err + } + defer conn.Close() + client := pb.NewDataRetrievalClient(conn) + opt := grpc.MaxCallRecvMsgSize(MAX_RPC_MESSAGE_SIZE) + request := &pb.FramesAndDataRequest{ + DataStoreId: dataStoreId, + } + reply, err := client.RetrieveFramesAndData(mda.Ctx, request, opt) + if err != nil { + log.Error("Retrieve frames and data fail", "err", err) + return nil, err + } + log.Debug("Get reply data from mantle da success", "replyLength", len(reply.GetData())) return reply.GetData(), nil } @@ -118,14 +141,27 @@ func (mda *MantleDataStore) RetrievalFramesFromDa(dataStoreId uint32) ([]byte, e log.Warn("This batch is not confirmed") continue } - frames, err := mda.getFramesByDataStoreId(dataStoreId) - if err != nil { - log.Warn("Get frames fail", "err", err) + var frames []byte + if mda.Cfg.MantleDAIndexerEnable { // from mantle da indexer + log.Info("sync block data from mantle da indexer") + frames, err = mda.getFramesFromIndexerByDataStoreId(dataStoreId) + if err != nil { + log.Warn("Get frames from indexer fail", "err", err) + continue + } + } else { // from mantle da retriever + log.Info("sync block data from mantle da retriever") + frames, err = mda.getFramesByDataStoreId(dataStoreId) + if err != nil { + log.Warn("Get frames from mantle da retriever fail", "err", err) + continue + } + } + if frames == nil { continue } return frames, nil case <-pollingTimeout.C: - // todo: add metrics in the future return nil, errors.New("Get frame ticker exit") case err := <-mda.Ctx.Done(): log.Warn("Retrieval service shutting down", "err", err) diff --git a/op-node/service.go b/op-node/service.go index 08d001290..8b0d2287f 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -213,11 +213,15 @@ func NewMantleDataStoreConfig(ctx *cli.Context) (datastore.MantleDataStoreConfig retrieverTimeout := ctx.GlobalDuration(flags.RetrieverTimeoutFlag.Name) graphProvider := ctx.GlobalString(flags.GraphProviderFlag.Name) dataStorePollingDuration := ctx.GlobalDuration(flags.DataStorePollingDurationFlag.Name) + mantleDaIndexerSocket := ctx.GlobalString(flags.MantleDaIndexerSocketFlag.Name) + mantleDAIndexerEnable := ctx.GlobalBool(flags.MantleDAIndexerEnableFlag.Name) return datastore.MantleDataStoreConfig{ RetrieverSocket: retrieverSocket, RetrieverTimeout: retrieverTimeout, GraphProvider: graphProvider, DataStorePollingDuration: dataStorePollingDuration, + MantleDaIndexerSocket: mantleDaIndexerSocket, + MantleDAIndexerEnable: mantleDAIndexerEnable, }, nil }