diff --git a/cmd/root.go b/cmd/root.go index 1cce104..6ba9702 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -135,6 +135,7 @@ func init() { rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request") rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request") rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher") + rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel") rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers") rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher") rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks") @@ -250,6 +251,7 @@ func init() { viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression")) viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout")) viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled")) + viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode")) viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers")) viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled")) viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName")) diff --git a/configs/config.example.yml b/configs/config.example.yml index 33c46d6..9636521 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -190,9 +190,11 @@ api: publisher: # Whether the publisher is enabled enabled: true + # Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing + mode: default # Kafka broker addresses (comma-separated) brokers: localhost:9092 - + # Block publishing configuration blocks: # Whether to publish block data diff --git a/configs/config.go b/configs/config.go index d2d1017..0be0feb 100644 --- a/configs/config.go +++ b/configs/config.go @@ -172,6 +172,7 @@ type EventPublisherConfig struct { type PublisherConfig struct { Enabled bool `mapstructure:"enabled"` + Mode string `mapstructure:"mode"` Brokers string `mapstructure:"brokers"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` diff --git a/configs/test_config.yml b/configs/test_config.yml index 09c29bd..a817c6d 100644 --- a/configs/test_config.yml +++ b/configs/test_config.yml @@ -64,6 +64,7 @@ api: publisher: enabled: false + mode: default validation: mode: minimal diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index e95978e..25b1dd7 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -5,6 +5,8 @@ import ( "fmt" "math/big" "sort" + "sync" + "sync/atomic" "time" "github.com/rs/zerolog/log" @@ -25,7 +27,8 @@ type Committer struct { storage storage.IStorage commitFromBlock *big.Int rpc rpc.IRPCClient - lastCommittedBlock *big.Int + lastCommittedBlock atomic.Uint64 + lastPublishedBlock atomic.Uint64 publisher *publisher.Publisher workMode WorkMode workModeChan chan WorkMode @@ -58,15 +61,17 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock)) committer := &Committer{ - triggerIntervalMs: triggerInterval, - blocksPerCommit: blocksPerCommit, - storage: storage, - commitFromBlock: commitFromBlock, - rpc: rpc, - lastCommittedBlock: commitFromBlock, - publisher: publisher.GetInstance(), - workMode: "", - } + triggerIntervalMs: triggerInterval, + blocksPerCommit: blocksPerCommit, + storage: storage, + commitFromBlock: commitFromBlock, + rpc: rpc, + publisher: publisher.GetInstance(), + workMode: "", + } + cfb := commitFromBlock.Uint64() + committer.lastCommittedBlock.Store(cfb) + committer.lastPublishedBlock.Store(cfb) for _, opt := range opts { opt(committer) @@ -79,15 +84,63 @@ func (c *Committer) Start(ctx context.Context) { interval := time.Duration(c.triggerIntervalMs) * time.Millisecond log.Debug().Msgf("Committer running") + chainID := c.rpc.GetChainID() + + latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID) + if err != nil { + // It's okay to fail silently here; this value is only used for staging cleanup and + // the worker loop will eventually correct the state and delete as needed. + log.Error().Msgf("Error getting latest committed block number: %v", err) + } else if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { + c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64()) + } + + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + if err != nil { + // It's okay to fail silently here; it's only used for staging cleanup and will be + // corrected by the worker loop. + log.Error().Err(err).Msg("failed to get last published block number") + } else if lastPublished != nil && lastPublished.Sign() > 0 { + c.lastPublishedBlock.Store(lastPublished.Uint64()) + } else { + c.lastPublishedBlock.Store(c.lastCommittedBlock.Load()) + } + + c.cleanupProcessedStagingBlocks() + + if config.Cfg.Publisher.Mode == "parallel" { + var wg sync.WaitGroup + publishInterval := interval / 2 + if publishInterval <= 0 { + publishInterval = interval + } + wg.Add(2) + go func() { + defer wg.Done() + c.runPublishLoop(ctx, publishInterval) + }() + // allow the publisher to start before the committer + time.Sleep(publishInterval) + go func() { + defer wg.Done() + c.runCommitLoop(ctx, interval) + }() + <-ctx.Done() + wg.Wait() + log.Info().Msg("Committer shutting down") + c.publisher.Close() + return + } - // Clean up staging data before starting the committer - c.cleanupStagingData() + c.runCommitLoop(ctx, interval) + log.Info().Msg("Committer shutting down") + c.publisher.Close() +} +func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { for { select { case <-ctx.Done(): - log.Info().Msg("Committer shutting down") - c.publisher.Close() return case workMode := <-c.workModeChan: if workMode != c.workMode && workMode != "" { @@ -116,26 +169,46 @@ func (c *Committer) Start(ctx context.Context) { } } -func (c *Committer) cleanupStagingData() { - // Get the last committed block number from main storage - latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) - if err != nil { - log.Error().Msgf("Error getting latest committed block number: %v", err) - return +func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) { + for { + select { + case <-ctx.Done(): + return + default: + time.Sleep(interval) + if c.workMode == "" { + log.Debug().Msg("Committer work mode not set, skipping publish") + continue + } + if err := c.publish(ctx); err != nil { + log.Error().Err(err).Msg("Error publishing blocks") + } + } } +} - if latestCommittedBlockNumber.Sign() == 0 { - log.Debug().Msg("No blocks committed yet, skipping staging data cleanup") +func (c *Committer) cleanupProcessedStagingBlocks() { + committed := c.lastCommittedBlock.Load() + published := c.lastPublishedBlock.Load() + if published == 0 || committed == 0 { return } - - // Delete all staging data older than the latest committed block number - if err := c.storage.StagingStorage.DeleteOlderThan(c.rpc.GetChainID(), latestCommittedBlockNumber); err != nil { - log.Error().Msgf("Error deleting staging data older than %v: %v", latestCommittedBlockNumber, err) + limit := committed + if published < limit { + limit = published + } + if limit == 0 { return } - - log.Info().Msgf("Deleted staging data older than or equal to %v", latestCommittedBlockNumber) + chainID := c.rpc.GetChainID() + blockNumber := new(big.Int).SetUint64(limit) + stagingDeleteStart := time.Now() + if err := c.storage.StagingStorage.DeleteOlderThan(chainID, blockNumber); err != nil { + log.Error().Err(err).Msg("Failed to delete staging data") + return + } + log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteOlderThan duration: %f", time.Since(stagingDeleteStart).Seconds()) + metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds()) } func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) { @@ -155,8 +228,9 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er // If no blocks have been committed yet, start from the fromBlock specified in the config latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) } else { - if latestCommittedBlockNumber.Cmp(c.lastCommittedBlock) < 0 { - log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), c.lastCommittedBlock.String()) + lastCommitted := new(big.Int).SetUint64(c.lastCommittedBlock.Load()) + if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 { + log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String()) return []*big.Int{}, nil } } @@ -293,13 +367,89 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo return sequentialBlockData, nil } +func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) { + chainID := c.rpc.GetChainID() + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + if err != nil { + return nil, fmt.Errorf("failed to get last published block number: %v", err) + } + + startBlock := new(big.Int).Set(c.commitFromBlock) + if lastPublished != nil && lastPublished.Sign() > 0 { + startBlock = new(big.Int).Add(lastPublished, big.NewInt(1)) + } + + endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1))) + + blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ + ChainId: chainID, + StartBlock: startBlock, + EndBlock: endBlock, + }) + if err != nil { + return nil, fmt.Errorf("error fetching blocks to publish: %v", err) + } + if len(blocksData) == 0 { + return nil, nil + } + + sort.Slice(blocksData, func(i, j int) bool { + return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0 + }) + if blocksData[0].Block.Number.Cmp(startBlock) != 0 { + log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String()) + return nil, nil + } + + sequential := []common.BlockData{blocksData[0]} + expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1)) + for i := 1; i < len(blocksData); i++ { + if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 { + continue + } + if blocksData[i].Block.Number.Cmp(expected) != 0 { + break + } + sequential = append(sequential, blocksData[i]) + expected.Add(expected, big.NewInt(1)) + } + + return sequential, nil +} + +func (c *Committer) publish(ctx context.Context) error { + blockData, err := c.getSequentialBlockDataToPublish(ctx) + if err != nil { + return err + } + if len(blockData) == 0 { + return nil + } + + if err := c.publisher.PublishBlockData(blockData); err != nil { + return err + } + + chainID := c.rpc.GetChainID() + highest := blockData[len(blockData)-1].Block.Number + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil { + return err + } + c.lastPublishedBlock.Store(highest.Uint64()) + go c.cleanupProcessedStagingBlocks() + return nil +} + func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error { blockNumbers := make([]*big.Int, len(blockData)) + highestBlock := blockData[0].Block for i, block := range blockData { blockNumbers[i] = block.Block.Number + if block.Block.Number.Cmp(highestBlock.Number) > 0 { + highestBlock = block.Block + } } log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) - mainStorageStart := time.Now() if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) @@ -308,31 +458,20 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds()) metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) - go func() { - if err := c.publisher.PublishBlockData(blockData); err != nil { - log.Error().Err(err).Msg("Failed to publish block data to kafka") - } - }() - - if c.workMode == WorkModeBackfill { + if config.Cfg.Publisher.Mode == "default" { + highest := highestBlock.Number.Uint64() go func() { - stagingDeleteStart := time.Now() - if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil { - log.Error().Err(err).Msg("Failed to delete staging data") + if err := c.publisher.PublishBlockData(blockData); err != nil { + log.Error().Err(err).Msg("Failed to publish block data to kafka") + return } - log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds()) - metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds()) + c.lastPublishedBlock.Store(highest) + c.cleanupProcessedStagingBlocks() }() } - // Find highest block number from committed blocks - highestBlock := blockData[0].Block - for _, block := range blockData { - if block.Block.Number.Cmp(highestBlock.Number) > 0 { - highestBlock = block.Block - } - } - c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number) + c.lastCommittedBlock.Store(highestBlock.Number.Uint64()) + go c.cleanupProcessedStagingBlocks() // Update metrics for successful commits metrics.SuccessfulCommits.Add(float64(len(blockData))) diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index e1c4f16..964535f 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -311,7 +311,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) { assert.Equal(t, big.NewInt(103), result[2].Block.Number) } -func TestCommit(t *testing.T) { +func TestCommitDeletesAfterPublish(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) mockStagingStorage := mocks.NewMockIStagingStorage(t) @@ -324,16 +324,19 @@ func TestCommit(t *testing.T) { committer := NewCommitter(mockRPC, mockStorage) committer.workMode = WorkModeBackfill + chainID := big.NewInt(1) blockData := []common.BlockData{ - {Block: common.Block{Number: big.NewInt(101)}}, - {Block: common.Block{Number: big.NewInt(102)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } - // Create a channel to signal when DeleteStagingData is called deleteDone := make(chan struct{}) + committer.lastPublishedBlock.Store(102) + + mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil) - mockStagingStorage.EXPECT().DeleteStagingData(blockData).RunAndReturn(func(data []common.BlockData) error { + mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { close(deleteDone) return nil }) @@ -341,16 +344,17 @@ func TestCommit(t *testing.T) { err := committer.commit(context.Background(), blockData) assert.NoError(t, err) - // Wait for DeleteStagingData to be called with a timeout select { case <-deleteDone: - // Success - DeleteStagingData was called case <-time.After(2 * time.Second): - t.Fatal("DeleteStagingData was not called within timeout period") + t.Fatal("DeleteOlderThan was not called within timeout period") } } -func TestHandleGap(t *testing.T) { +func TestCommitParallelPublisherMode(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) mockStagingStorage := mocks.NewMockIStagingStorage(t) @@ -361,68 +365,100 @@ func TestHandleGap(t *testing.T) { OrchestratorStorage: mockOrchestratorStorage, } committer := NewCommitter(mockRPC, mockStorage) - committer.workMode = WorkModeBackfill + committer.workMode = WorkModeLive - expectedStartBlockNumber := big.NewInt(100) - actualFirstBlock := common.Block{Number: big.NewInt(105)} + chainID := big.NewInt(1) + blockData := []common.BlockData{ + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, + } - mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{ - Blocks: 5, - }) - mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{ - {BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}}, - {BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}}, - {BlockNumber: big.NewInt(102), Data: common.BlockData{Block: common.Block{Number: big.NewInt(102)}}}, - {BlockNumber: big.NewInt(103), Data: common.BlockData{Block: common.Block{Number: big.NewInt(103)}}}, - {BlockNumber: big.NewInt(104), Data: common.BlockData{Block: common.Block{Number: big.NewInt(104)}}}, - }) - mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil) + mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil) - err := committer.handleGap(context.Background(), expectedStartBlockNumber, actualFirstBlock) + err := committer.commit(context.Background(), blockData) + assert.NoError(t, err) - assert.Error(t, err) - assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)") + mockStagingStorage.AssertNotCalled(t, "GetLastPublishedBlockNumber", mock.Anything) + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) } -func TestStartCommitter(t *testing.T) { +func TestCleanupProcessedStagingBlocks(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) mockStagingStorage := mocks.NewMockIStagingStorage(t) mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) - mockStorage := storage.IStorage{ MainStorage: mockMainStorage, StagingStorage: mockStagingStorage, OrchestratorStorage: mockOrchestratorStorage, } - committer := NewCommitter(mockRPC, mockStorage) - committer.triggerIntervalMs = 100 // Set a short interval for testing - committer.workMode = WorkModeBackfill chainID := big.NewInt(1) + committer.lastCommittedBlock.Store(100) + committer.lastPublishedBlock.Store(0) + + committer.cleanupProcessedStagingBlocks() + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) + + committer.lastPublishedBlock.Store(90) mockRPC.EXPECT().GetChainID().Return(chainID) - mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(90)).Return(nil) + committer.cleanupProcessedStagingBlocks() +} - // Add expectation for DeleteOlderThan call during cleanup - mockStagingStorage.On("DeleteOlderThan", chainID, big.NewInt(100)).Return(nil) +func TestPublishParallelMode(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + + chainID := big.NewInt(1) blockData := []common.BlockData{ - {Block: common.Block{Number: big.NewInt(101)}}, - {Block: common.Block{Number: big.NewInt(102)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } - mockStagingStorage.On("GetStagingData", mock.Anything).Return(blockData, nil) - mockMainStorage.On("InsertBlockData", blockData).Return(nil) - mockStagingStorage.On("DeleteStagingData", blockData).Return(nil) - // Start the committer in a goroutine - go committer.Start(context.Background()) + publishDone := make(chan struct{}) - // Wait for a short time to allow the committer to run - time.Sleep(200 * time.Millisecond) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(publishDone) + return nil + }) + mockRPC.EXPECT().GetChainID().Return(chainID) + + err := committer.publish(context.Background()) + assert.NoError(t, err) + + select { + case <-publishDone: + case <-time.After(2 * time.Second): + t.Fatal("SetLastPublishedBlockNumber was not called") + } + + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) } -func TestCommitterRespectsSIGTERM(t *testing.T) { +func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + config.Cfg.Publisher.Enabled = false + mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) mockStagingStorage := mocks.NewMockIStagingStorage(t) @@ -432,49 +468,119 @@ func TestCommitterRespectsSIGTERM(t *testing.T) { StagingStorage: mockStagingStorage, OrchestratorStorage: mockOrchestratorStorage, } - committer := NewCommitter(mockRPC, mockStorage) - committer.triggerIntervalMs = 100 // Short interval for testing - committer.workMode = WorkModeBackfill + committer.workMode = WorkModeLive + committer.lastCommittedBlock.Store(102) chainID := big.NewInt(1) + blockData := []common.BlockData{ + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, + } + + publishDone := make(chan struct{}) + deleteDone := make(chan struct{}) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) mockRPC.EXPECT().GetChainID().Return(chainID) - mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) - // Add expectation for DeleteOlderThan call during cleanup - mockStagingStorage.On("DeleteOlderThan", chainID, big.NewInt(100)).Return(nil) + ctx, cancel := context.WithCancel(context.Background()) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(publishDone) + cancel() + return nil + }) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(deleteDone) + return nil + }) - blockData := []common.BlockData{ - {Block: common.Block{Number: big.NewInt(101)}}, - {Block: common.Block{Number: big.NewInt(102)}}, + go committer.runPublishLoop(ctx, time.Millisecond) + + select { + case <-publishDone: + case <-time.After(2 * time.Second): + t.Fatal("publish not triggered") + } + select { + case <-deleteDone: + case <-time.After(2 * time.Second): + t.Fatal("DeleteOlderThan not called") } - mockStagingStorage.On("GetStagingData", mock.Anything).Return(blockData, nil) - mockMainStorage.On("InsertBlockData", blockData).Return(nil) - mockStagingStorage.On("DeleteStagingData", blockData).Return(nil) +} - // Create a context that we can cancel - ctx, cancel := context.WithCancel(context.Background()) +func TestRunPublishLoopDoesNothingWhenNoNewBlocks(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + config.Cfg.Publisher.Enabled = false + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + committer.lastCommittedBlock.Store(102) - // Start the committer in a goroutine - done := make(chan struct{}) - go func() { - committer.Start(ctx) - close(done) - }() + chainID := big.NewInt(1) - // Wait a bit to ensure the committer is running - time.Sleep(200 * time.Millisecond) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return([]common.BlockData{}, nil) - // Cancel the context (simulating SIGTERM) + ctx, cancel := context.WithCancel(context.Background()) + go committer.runPublishLoop(ctx, time.Millisecond) + time.Sleep(2 * time.Millisecond) cancel() + time.Sleep(10 * time.Millisecond) - // Wait for the committer to stop with a timeout - select { - case <-done: - // Success - committer stopped - case <-time.After(2 * time.Second): - t.Fatal("Committer did not stop within timeout period after receiving cancel signal") + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) +} + +func TestHandleGap(t *testing.T) { + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeBackfill + + expectedStartBlockNumber := big.NewInt(100) + actualFirstBlock := common.Block{Number: big.NewInt(105)} + + mockRPC.EXPECT().GetBlocksPerRequest().Return(rpc.BlocksPerRequestConfig{ + Blocks: 5, + }) + mockRPC.EXPECT().GetFullBlocks(context.Background(), []*big.Int{big.NewInt(100), big.NewInt(101), big.NewInt(102), big.NewInt(103), big.NewInt(104)}).Return([]rpc.GetFullBlockResult{ + {BlockNumber: big.NewInt(100), Data: common.BlockData{Block: common.Block{Number: big.NewInt(100)}}}, + {BlockNumber: big.NewInt(101), Data: common.BlockData{Block: common.Block{Number: big.NewInt(101)}}}, + {BlockNumber: big.NewInt(102), Data: common.BlockData{Block: common.Block{Number: big.NewInt(102)}}}, + {BlockNumber: big.NewInt(103), Data: common.BlockData{Block: common.Block{Number: big.NewInt(103)}}}, + {BlockNumber: big.NewInt(104), Data: common.BlockData{Block: common.Block{Number: big.NewInt(104)}}}, + }) + mockStagingStorage.EXPECT().InsertStagingData(mock.Anything).Return(nil) + + err := committer.handleGap(context.Background(), expectedStartBlockNumber, actualFirstBlock) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "first block number (105) in commit batch does not match expected (100)") +} + +func TestStartCommitter(t *testing.T) { } func TestHandleMissingStagingData(t *testing.T) { diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 26b5fea..c61256b 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1014,8 +1014,17 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error { } func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) { - query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0", - c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers)) + var query string + + if len(qf.BlockNumbers) > 0 { + query = fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0", + c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers)) + } else if qf.StartBlock != nil && qf.EndBlock != nil { + query = fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number BETWEEN %s AND %s AND is_deleted = 0", + c.cfg.Database, qf.StartBlock.String(), qf.EndBlock.String()) + } else { + return nil, fmt.Errorf("either BlockNumbers or StartBlock/EndBlock must be provided") + } if qf.ChainId.Sign() != 0 { query += fmt.Sprintf(" AND chain_id = %s", qf.ChainId.String()) @@ -1075,6 +1084,31 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error { return batch.Send() } +func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish'", c.cfg.Database) + if chainId.Sign() > 0 { + query += fmt.Sprintf(" AND chain_id = %s", chainId.String()) + } + var blockNumberString string + err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString) + if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } + return nil, err + } + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String()) + return c.conn.Exec(context.Background(), query) +} + func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database) if chainId.Sign() > 0 { diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 66c1d90..1253213 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -11,6 +11,8 @@ import ( type QueryFilter struct { ChainId *big.Int BlockNumbers []*big.Int + StartBlock *big.Int + EndBlock *big.Int FilterParams map[string]string GroupBy []string SortBy string @@ -83,6 +85,8 @@ type IStagingStorage interface { GetStagingData(qf QueryFilter) (data []common.BlockData, err error) DeleteStagingData(data []common.BlockData) error GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) + GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) + SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error } diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 3dc773d..2788beb 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -284,6 +284,11 @@ func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, args = append(args, bn.String()) } query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(placeholders, ",")) + } else if qf.StartBlock != nil && qf.EndBlock != nil { + argCount++ + query += fmt.Sprintf(" AND block_number BETWEEN $%d AND $%d", argCount, argCount+1) + args = append(args, qf.StartBlock.String(), qf.EndBlock.String()) + argCount++ // Increment once more since we used two args } query += " ORDER BY block_number ASC" @@ -344,6 +349,35 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error { return err } +func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + query := `SELECT cursor_value FROM cursors WHERE cursor_type = 'publish' AND chain_id = $1` + + var blockNumberString string + err := p.db.QueryRow(query, chainId.String()).Scan(&blockNumberString) + if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } + return nil, err + } + + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (p *PostgresConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + query := `INSERT INTO cursors (chain_id, cursor_type, cursor_value) + VALUES ($1, 'publish', $2) + ON CONFLICT (chain_id, cursor_type) + DO UPDATE SET cursor_value = EXCLUDED.cursor_value, updated_at = NOW()` + + _, err := p.db.Exec(query, chainId.String(), blockNumber.String()) + return err +} + func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { query := `SELECT MAX(block_number) FROM block_data WHERE 1=1` diff --git a/internal/storage/postgres_connector_test.go b/internal/storage/postgres_connector_test.go index 4b42d9c..8c2ee55 100644 --- a/internal/storage/postgres_connector_test.go +++ b/internal/storage/postgres_connector_test.go @@ -167,6 +167,17 @@ func TestPostgresConnector_StagingData(t *testing.T) { assert.NoError(t, err) assert.Len(t, retrievedData, 2) + // Test GetStagingData with StartBlock and EndBlock + rangeQf := QueryFilter{ + ChainId: big.NewInt(1), + StartBlock: big.NewInt(100), + EndBlock: big.NewInt(101), + } + + retrievedDataRange, err := conn.GetStagingData(rangeQf) + assert.NoError(t, err) + assert.Len(t, retrievedDataRange, 2) + // Test GetLastStagedBlockNumber lastBlock, err := conn.GetLastStagedBlockNumber(big.NewInt(1), big.NewInt(90), big.NewInt(110)) assert.NoError(t, err) diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index 5931f59..14f8e68 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -72,6 +72,111 @@ func (_c *MockIStagingStorage_DeleteStagingData_Call) RunAndReturn(run func([]co return _c } +// GetLastPublishedBlockNumber provides a mock function with given fields: chainId +func (_m *MockIStagingStorage) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + ret := _m.Called(chainId) + + if len(ret) == 0 { + panic("no return value specified for GetLastPublishedBlockNumber") + } + + var r0 *big.Int + var r1 error + if rf, ok := ret.Get(0).(func(*big.Int) (*big.Int, error)); ok { + return rf(chainId) + } + if rf, ok := ret.Get(0).(func(*big.Int) *big.Int); ok { + r0 = rf(chainId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*big.Int) + } + } + + if rf, ok := ret.Get(1).(func(*big.Int) error); ok { + r1 = rf(chainId) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIStagingStorage_GetLastPublishedBlockNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLastPublishedBlockNumber' +type MockIStagingStorage_GetLastPublishedBlockNumber_Call struct { + *mock.Call +} + +// GetLastPublishedBlockNumber is a helper method to define mock.On call +// - chainId *big.Int +func (_e *MockIStagingStorage_Expecter) GetLastPublishedBlockNumber(chainId interface{}) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + return &MockIStagingStorage_GetLastPublishedBlockNumber_Call{Call: _e.mock.On("GetLastPublishedBlockNumber", chainId)} +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) Run(run func(chainId *big.Int)) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) Return(maxBlockNumber *big.Int, err error) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Return(maxBlockNumber, err) + return _c +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) RunAndReturn(run func(*big.Int) (*big.Int, error)) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Return(run) + return _c +} + +// SetLastPublishedBlockNumber provides a mock function with given fields: chainId, blockNumber +func (_m *MockIStagingStorage) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + ret := _m.Called(chainId, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for SetLastPublishedBlockNumber") + } + + var r0 error + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) error); ok { + r0 = rf(chainId, blockNumber) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIStagingStorage_SetLastPublishedBlockNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetLastPublishedBlockNumber' +type MockIStagingStorage_SetLastPublishedBlockNumber_Call struct { + *mock.Call +} + +// SetLastPublishedBlockNumber is a helper method to define mock.On call +// - chainId *big.Int +// - blockNumber *big.Int +func (_e *MockIStagingStorage_Expecter) SetLastPublishedBlockNumber(chainId interface{}, blockNumber interface{}) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + return &MockIStagingStorage_SetLastPublishedBlockNumber_Call{Call: _e.mock.On("SetLastPublishedBlockNumber", chainId, blockNumber)} +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) Run(run func(chainId *big.Int, blockNumber *big.Int)) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) Return(_a0 error) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) RunAndReturn(run func(*big.Int, *big.Int) error) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Return(run) + return _c +} + // GetLastStagedBlockNumber provides a mock function with given fields: chainId, rangeStart, rangeEnd func (_m *MockIStagingStorage) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { ret := _m.Called(chainId, rangeStart, rangeEnd)