diff --git a/protocol/app/app.go b/protocol/app/app.go index 57626186ec..2ac4d17519 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -198,6 +198,10 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/indexer" "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" "github.com/dydxprotocol/v4-chain/protocol/indexer/msgsender" + + // Grpc Streaming + streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc" + streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" ) var ( @@ -298,8 +302,9 @@ type App struct { // module configurator configurator module.Configurator - IndexerEventManager indexer_manager.IndexerEventManager - Server *daemonserver.Server + IndexerEventManager indexer_manager.IndexerEventManager + GrpcStreamingManager streamingtypes.GrpcStreamingManager + Server *daemonserver.Server // startDaemons encapsulates the logic that starts all daemons and daemon services. This function contains a // closure of all relevant data structures that are shared with various keepers. Daemon services startup is @@ -679,6 +684,9 @@ func New( tkeys[indexer_manager.TransientStoreKey], indexerFlags.SendOffchainData, ) + + app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, appOpts, logger) + timeProvider := &timelib.TimeProviderImpl{} app.EpochsKeeper = *epochsmodulekeeper.NewKeeper( @@ -976,6 +984,7 @@ func New( app.StatsKeeper, app.RewardsKeeper, app.IndexerEventManager, + app.GrpcStreamingManager, txConfig.TxDecoder(), clobFlags, rate_limit.NewPanicRateLimiter[*clobmoduletypes.MsgPlaceOrder](), @@ -1740,3 +1749,17 @@ func getIndexerFromOptions( } return indexerMessageSender, indexerFlags } + +// getGrpcStreamingManagerFromOptions returns an instance of a streamingtypes.GrpcStreamingManager from the specified +// options. This function will default to returning a no-op instance. +func getGrpcStreamingManagerFromOptions( + appFlags flags.Flags, + appOpts servertypes.AppOptions, + logger log.Logger, +) (manager streamingtypes.GrpcStreamingManager) { + // TODO(CT-625): add command line flags for full node streaming. + if appFlags.NonValidatingFullNode { + return streaming.NewGrpcStreamingManager() + } + return streaming.NewNoopGrpcStreamingManager() +} diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go new file mode 100644 index 0000000000..6b9250145f --- /dev/null +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -0,0 +1,38 @@ +package grpc + +import ( + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" +) + +var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil) + +// GrpcStreamingManagerImpl is an implementation for managing gRPC streaming subscriptions. +type GrpcStreamingManagerImpl struct { +} + +func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { + return &GrpcStreamingManagerImpl{} +} + +func (sm *GrpcStreamingManagerImpl) Enabled() bool { + return true +} + +// Subscribe subscribes to the orderbook updates stream. +func (sm *GrpcStreamingManagerImpl) Subscribe( + req clobtypes.StreamOrderbookUpdatesRequest, + srv clobtypes.Query_StreamOrderbookUpdatesServer, +) ( + finished chan bool, + err error, +) { + return nil, nil +} + +// SendOrderbookUpdates groups updates by their clob pair ids and +// sends messages to the subscribers. +func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( + updates *clobtypes.OffchainUpdates, +) { +} diff --git a/protocol/streaming/grpc/noop_streaming_manager.go b/protocol/streaming/grpc/noop_streaming_manager.go new file mode 100644 index 0000000000..b4670ba66f --- /dev/null +++ b/protocol/streaming/grpc/noop_streaming_manager.go @@ -0,0 +1,33 @@ +package grpc + +import ( + "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" +) + +var _ types.GrpcStreamingManager = (*NoopGrpcStreamingManager)(nil) + +type NoopGrpcStreamingManager struct{} + +func NewNoopGrpcStreamingManager() *NoopGrpcStreamingManager { + return &NoopGrpcStreamingManager{} +} + +func (sm *NoopGrpcStreamingManager) Enabled() bool { + return false +} + +func (sm *NoopGrpcStreamingManager) Subscribe( + req clobtypes.StreamOrderbookUpdatesRequest, + srv clobtypes.Query_StreamOrderbookUpdatesServer, +) ( + finished chan bool, + err error, +) { + return nil, nil +} + +func (sm *NoopGrpcStreamingManager) SendOrderbookUpdates( + updates *clobtypes.OffchainUpdates, +) { +} diff --git a/protocol/streaming/grpc/types/manager.go b/protocol/streaming/grpc/types/manager.go new file mode 100644 index 0000000000..7bf043c41d --- /dev/null +++ b/protocol/streaming/grpc/types/manager.go @@ -0,0 +1,19 @@ +package types + +import ( + clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" +) + +type GrpcStreamingManager interface { + Enabled() bool + + // L3+ Orderbook updates. + Subscribe( + req clobtypes.StreamOrderbookUpdatesRequest, + srv clobtypes.Query_StreamOrderbookUpdatesServer, + ) ( + finished chan bool, + err error, + ) + SendOrderbookUpdates(*clobtypes.OffchainUpdates) +} diff --git a/protocol/testutil/keeper/clob.go b/protocol/testutil/keeper/clob.go index 30db24e498..506ebe114a 100644 --- a/protocol/testutil/keeper/clob.go +++ b/protocol/testutil/keeper/clob.go @@ -14,6 +14,7 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/mocks" + streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc" clobtest "github.com/dydxprotocol/v4-chain/protocol/testutil/clob" "github.com/dydxprotocol/v4-chain/protocol/testutil/constants" asskeeper "github.com/dydxprotocol/v4-chain/protocol/x/assets/keeper" @@ -214,6 +215,7 @@ func createClobKeeper( statsKeeper, rewardsKeeper, indexerEventManager, + streaming.NewNoopGrpcStreamingManager(), constants.TestEncodingCfg.TxConfig.TxDecoder(), flags.GetDefaultClobFlags(), rate_limit.NewNoOpRateLimiter[*types.MsgPlaceOrder](), diff --git a/protocol/x/clob/keeper/keeper.go b/protocol/x/clob/keeper/keeper.go index e79827f1f1..13af8452b5 100644 --- a/protocol/x/clob/keeper/keeper.go +++ b/protocol/x/clob/keeper/keeper.go @@ -14,6 +14,7 @@ import ( "github.com/dydxprotocol/v4-chain/protocol/indexer/indexer_manager" "github.com/dydxprotocol/v4-chain/protocol/lib" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" + streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types" flags "github.com/dydxprotocol/v4-chain/protocol/x/clob/flags" "github.com/dydxprotocol/v4-chain/protocol/x/clob/rate_limit" "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" @@ -31,16 +32,18 @@ type ( UntriggeredConditionalOrders map[types.ClobPairId]*UntriggeredConditionalOrders PerpetualIdToClobPairId map[uint32][]types.ClobPairId - subaccountsKeeper types.SubaccountsKeeper - assetsKeeper types.AssetsKeeper - bankKeeper types.BankKeeper - blockTimeKeeper types.BlockTimeKeeper - feeTiersKeeper types.FeeTiersKeeper - perpetualsKeeper types.PerpetualsKeeper - pricesKeeper types.PricesKeeper - statsKeeper types.StatsKeeper - rewardsKeeper types.RewardsKeeper + subaccountsKeeper types.SubaccountsKeeper + assetsKeeper types.AssetsKeeper + bankKeeper types.BankKeeper + blockTimeKeeper types.BlockTimeKeeper + feeTiersKeeper types.FeeTiersKeeper + perpetualsKeeper types.PerpetualsKeeper + pricesKeeper types.PricesKeeper + statsKeeper types.StatsKeeper + rewardsKeeper types.RewardsKeeper + indexerEventManager indexer_manager.IndexerEventManager + streamingManager streamingtypes.GrpcStreamingManager memStoreInitialized *atomic.Bool @@ -82,6 +85,7 @@ func NewKeeper( statsKeeper types.StatsKeeper, rewardsKeeper types.RewardsKeeper, indexerEventManager indexer_manager.IndexerEventManager, + grpcStreamingManager streamingtypes.GrpcStreamingManager, txDecoder sdk.TxDecoder, clobFlags flags.ClobFlags, placeOrderRateLimiter rate_limit.RateLimiter[*types.MsgPlaceOrder], @@ -107,6 +111,7 @@ func NewKeeper( statsKeeper: statsKeeper, rewardsKeeper: rewardsKeeper, indexerEventManager: indexerEventManager, + streamingManager: grpcStreamingManager, memStoreInitialized: &atomic.Bool{}, txDecoder: txDecoder, mevTelemetryConfig: MevTelemetryConfig{ @@ -136,6 +141,10 @@ func (k Keeper) GetIndexerEventManager() indexer_manager.IndexerEventManager { return k.indexerEventManager } +func (k Keeper) GetGrpcStreamingManager() streamingtypes.GrpcStreamingManager { + return k.streamingManager +} + func (k Keeper) Logger(ctx sdk.Context) log.Logger { return ctx.Logger().With( log.ModuleKey, "x/clob",