From 22592de81f1f5f5d530f1f5f6461cb8bccb1e6af Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Sat, 9 Aug 2025 03:23:06 -0400 Subject: [PATCH 1/8] fix: initialize publish cursor in parallel mode --- cmd/root.go | 2 + configs/config.example.yml | 4 +- configs/config.go | 1 + configs/test_config.yml | 1 + internal/orchestrator/committer.go | 170 +++++++++++++++++++++--- internal/orchestrator/committer_test.go | 155 ++++++++++++++++++++- internal/storage/clickhouse.go | 25 ++++ internal/storage/connector.go | 2 + internal/storage/postgres.go | 29 ++++ test/mocks/MockIStagingStorage.go | 105 +++++++++++++++ 10 files changed, 476 insertions(+), 18 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 1cce104..6ba9702 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -135,6 +135,7 @@ func init() { rootCmd.PersistentFlags().Bool("api-contractApiRequest-disableCompression", false, "Disable compression for contract API request") rootCmd.PersistentFlags().Int("api-contractApiRequest-timeout", 10, "Timeout in seconds for contract API request") rootCmd.PersistentFlags().Bool("publisher-enabled", false, "Toggle publisher") + rootCmd.PersistentFlags().String("publisher-mode", "default", "Publisher mode: default or parallel") rootCmd.PersistentFlags().String("publisher-brokers", "", "Kafka brokers") rootCmd.PersistentFlags().Bool("publisher-blocks-enabled", false, "Toggle block publisher") rootCmd.PersistentFlags().String("publisher-blocks-topicName", "", "Kafka topic name for blocks") @@ -250,6 +251,7 @@ func init() { viper.BindPFlag("api.contractApiRequest.disableCompression", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-disableCompression")) viper.BindPFlag("api.contractApiRequest.timeout", rootCmd.PersistentFlags().Lookup("api-contractApiRequest-timeout")) viper.BindPFlag("publisher.enabled", rootCmd.PersistentFlags().Lookup("publisher-enabled")) + viper.BindPFlag("publisher.mode", rootCmd.PersistentFlags().Lookup("publisher-mode")) viper.BindPFlag("publisher.brokers", rootCmd.PersistentFlags().Lookup("publisher-brokers")) viper.BindPFlag("publisher.blocks.enabled", rootCmd.PersistentFlags().Lookup("publisher-blocks-enabled")) viper.BindPFlag("publisher.blocks.topicName", rootCmd.PersistentFlags().Lookup("publisher-blocks-topicName")) diff --git a/configs/config.example.yml b/configs/config.example.yml index 33c46d6..9636521 100644 --- a/configs/config.example.yml +++ b/configs/config.example.yml @@ -190,9 +190,11 @@ api: publisher: # Whether the publisher is enabled enabled: true + # Publisher mode: "default" publishes after storage commit, "parallel" runs publishing alongside committing + mode: default # Kafka broker addresses (comma-separated) brokers: localhost:9092 - + # Block publishing configuration blocks: # Whether to publish block data diff --git a/configs/config.go b/configs/config.go index d2d1017..0be0feb 100644 --- a/configs/config.go +++ b/configs/config.go @@ -172,6 +172,7 @@ type EventPublisherConfig struct { type PublisherConfig struct { Enabled bool `mapstructure:"enabled"` + Mode string `mapstructure:"mode"` Brokers string `mapstructure:"brokers"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` diff --git a/configs/test_config.yml b/configs/test_config.yml index 09c29bd..a817c6d 100644 --- a/configs/test_config.yml +++ b/configs/test_config.yml @@ -64,6 +64,7 @@ api: publisher: enabled: false + mode: default validation: mode: minimal diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index e95978e..1bdaceb 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "sort" + "sync" "time" "github.com/rs/zerolog/log" @@ -75,6 +76,34 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe return committer } +func (c *Committer) initializeParallelPublisher() { + chainID := c.rpc.GetChainID() + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + if err != nil { + log.Error().Err(err).Msg("failed to get last published block number") + return + } + mainMax, err := c.storage.MainStorage.GetMaxBlockNumber(chainID) + if err != nil { + log.Error().Err(err).Msg("failed to get max block number from main storage") + return + } + if lastPublished == nil || lastPublished.Sign() == 0 { + if mainMax != nil && mainMax.Sign() > 0 { + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil { + log.Error().Err(err).Msg("failed to set last published block number") + } + } + return + } + if lastPublished.Cmp(mainMax) < 0 { + log.Warn().Msgf("Publish block number seek ahead from %s to %s", lastPublished.String(), mainMax.String()) + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil { + log.Error().Err(err).Msg("failed to set last published block number") + } + } +} + func (c *Committer) Start(ctx context.Context) { interval := time.Duration(c.triggerIntervalMs) * time.Millisecond @@ -83,11 +112,34 @@ func (c *Committer) Start(ctx context.Context) { // Clean up staging data before starting the committer c.cleanupStagingData() + if config.Cfg.Publisher.Mode == "parallel" { + c.initializeParallelPublisher() + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + c.runCommitLoop(ctx, interval) + }() + go func() { + defer wg.Done() + c.runPublishLoop(ctx, interval) + }() + <-ctx.Done() + wg.Wait() + log.Info().Msg("Committer shutting down") + c.publisher.Close() + return + } + + c.runCommitLoop(ctx, interval) + log.Info().Msg("Committer shutting down") + c.publisher.Close() +} + +func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { for { select { case <-ctx.Done(): - log.Info().Msg("Committer shutting down") - c.publisher.Close() return case workMode := <-c.workModeChan: if workMode != c.workMode && workMode != "" { @@ -116,6 +168,24 @@ func (c *Committer) Start(ctx context.Context) { } } +func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) { + for { + select { + case <-ctx.Done(): + return + default: + time.Sleep(interval) + if c.workMode == "" { + log.Debug().Msg("Committer work mode not set, skipping publish") + continue + } + if err := c.publish(ctx); err != nil { + log.Error().Err(err).Msg("Error publishing blocks") + } + } + } +} + func (c *Committer) cleanupStagingData() { // Get the last committed block number from main storage latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) @@ -293,13 +363,88 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo return sequentialBlockData, nil } +func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]common.BlockData, error) { + chainID := c.rpc.GetChainID() + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + if err != nil { + return nil, fmt.Errorf("failed to get last published block number: %v", err) + } + + startBlock := new(big.Int).Set(c.commitFromBlock) + if lastPublished != nil && lastPublished.Sign() > 0 { + startBlock = new(big.Int).Add(lastPublished, big.NewInt(1)) + } + + endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1))) + blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 + blockNumbers := make([]*big.Int, blockCount) + for i := int64(0); i < blockCount; i++ { + blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i)) + } + + blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers}) + if err != nil { + return nil, fmt.Errorf("error fetching blocks to publish: %v", err) + } + if len(blocksData) == 0 { + return nil, nil + } + + sort.Slice(blocksData, func(i, j int) bool { + return blocksData[i].Block.Number.Cmp(blocksData[j].Block.Number) < 0 + }) + if blocksData[0].Block.Number.Cmp(startBlock) != 0 { + log.Debug().Msgf("First block to publish %s does not match expected %s", blocksData[0].Block.Number.String(), startBlock.String()) + return nil, nil + } + + sequential := []common.BlockData{blocksData[0]} + expected := new(big.Int).Add(blocksData[0].Block.Number, big.NewInt(1)) + for i := 1; i < len(blocksData); i++ { + if blocksData[i].Block.Number.Cmp(blocksData[i-1].Block.Number) == 0 { + continue + } + if blocksData[i].Block.Number.Cmp(expected) != 0 { + break + } + sequential = append(sequential, blocksData[i]) + expected.Add(expected, big.NewInt(1)) + } + + return sequential, nil +} + +func (c *Committer) publish(ctx context.Context) error { + blockData, err := c.getSequentialBlockDataToPublish(ctx) + if err != nil { + return err + } + if len(blockData) == 0 { + return nil + } + + if err := c.publisher.PublishBlockData(blockData); err != nil { + return err + } + + chainID := c.rpc.GetChainID() + highest := blockData[len(blockData)-1].Block.Number + if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil { + return err + } + return nil +} + func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) error { blockNumbers := make([]*big.Int, len(blockData)) + highestBlock := blockData[0].Block for i, block := range blockData { blockNumbers[i] = block.Block.Number + if block.Block.Number.Cmp(highestBlock.Number) > 0 { + highestBlock = block.Block + } } log.Debug().Msgf("Committing %d blocks", len(blockNumbers)) - mainStorageStart := time.Now() if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil { log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers) @@ -308,11 +453,13 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds()) metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) - go func() { - if err := c.publisher.PublishBlockData(blockData); err != nil { - log.Error().Err(err).Msg("Failed to publish block data to kafka") - } - }() + if config.Cfg.Publisher.Mode == "default" { + go func() { + if err := c.publisher.PublishBlockData(blockData); err != nil { + log.Error().Err(err).Msg("Failed to publish block data to kafka") + } + }() + } if c.workMode == WorkModeBackfill { go func() { @@ -325,13 +472,6 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er }() } - // Find highest block number from committed blocks - highestBlock := blockData[0].Block - for _, block := range blockData { - if block.Block.Number.Cmp(highestBlock.Number) > 0 { - highestBlock = block.Block - } - } c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number) // Update metrics for successful commits diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index e1c4f16..74b4d2f 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -324,9 +324,10 @@ func TestCommit(t *testing.T) { committer := NewCommitter(mockRPC, mockStorage) committer.workMode = WorkModeBackfill + chainID := big.NewInt(1) blockData := []common.BlockData{ - {Block: common.Block{Number: big.NewInt(101)}}, - {Block: common.Block{Number: big.NewInt(102)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } // Create a channel to signal when DeleteStagingData is called @@ -350,6 +351,156 @@ func TestCommit(t *testing.T) { } } +func TestCommitParallelPublisherMode(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + + chainID := big.NewInt(1) + blockData := []common.BlockData{ + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, + } + + mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil) + + err := committer.commit(context.Background(), blockData) + assert.NoError(t, err) + + mockStagingStorage.AssertNotCalled(t, "GetLastPublishedBlockNumber", mock.Anything) + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) +} + +func TestPublishParallelMode(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + + chainID := big.NewInt(1) + blockData := []common.BlockData{ + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, + } + + publishDone := make(chan struct{}) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(publishDone) + return nil + }) + + err := committer.publish(context.Background()) + assert.NoError(t, err) + + select { + case <-publishDone: + case <-time.After(2 * time.Second): + t.Fatal("SetLastPublishedBlockNumber was not called") + } +} + +func TestInitializeParallelPublisherZero(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + + chainID := big.NewInt(1) + last := big.NewInt(0) + max := big.NewInt(100) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(last, nil) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(max, nil) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, max).Return(nil) + + committer.initializeParallelPublisher() +} + +func TestInitializeParallelPublisherSeekAhead(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + + chainID := big.NewInt(1) + last := big.NewInt(50) + max := big.NewInt(100) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(last, nil) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(max, nil) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, max).Return(nil) + + committer.initializeParallelPublisher() +} + +func TestInitializeParallelPublisherAhead(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + + chainID := big.NewInt(1) + last := big.NewInt(150) + max := big.NewInt(100) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(last, nil) + mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(max, nil) + + committer.initializeParallelPublisher() + + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) +} + func TestHandleGap(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 26b5fea..37e6664 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1075,6 +1075,31 @@ func (c *ClickHouseConnector) DeleteStagingData(data []common.BlockData) error { return batch.Send() } +func (c *ClickHouseConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'publish'", c.cfg.Database) + if chainId.Sign() > 0 { + query += fmt.Sprintf(" AND chain_id = %s", chainId.String()) + } + var blockNumberString string + err := c.conn.QueryRow(context.Background(), query).Scan(&blockNumberString) + if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } + return nil, err + } + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (c *ClickHouseConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + query := fmt.Sprintf("INSERT INTO %s.cursors (chain_id, cursor_type, cursor_value) VALUES (%s, 'publish', '%s')", c.cfg.Database, chainId, blockNumber.String()) + return c.conn.Exec(context.Background(), query) +} + func (c *ClickHouseConnector) GetLastReorgCheckedBlockNumber(chainId *big.Int) (*big.Int, error) { query := fmt.Sprintf("SELECT cursor_value FROM %s.cursors FINAL WHERE cursor_type = 'reorg'", c.cfg.Database) if chainId.Sign() > 0 { diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 66c1d90..6286713 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -83,6 +83,8 @@ type IStagingStorage interface { GetStagingData(qf QueryFilter) (data []common.BlockData, err error) DeleteStagingData(data []common.BlockData) error GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) + GetLastPublishedBlockNumber(chainId *big.Int) (maxBlockNumber *big.Int, err error) + SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error } diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 3dc773d..c28c016 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -344,6 +344,35 @@ func (p *PostgresConnector) DeleteStagingData(data []common.BlockData) error { return err } +func (p *PostgresConnector) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + query := `SELECT cursor_value FROM cursors WHERE cursor_type = 'publish' AND chain_id = $1` + + var blockNumberString string + err := p.db.QueryRow(query, chainId.String()).Scan(&blockNumberString) + if err != nil { + if err == sql.ErrNoRows { + return big.NewInt(0), nil + } + return nil, err + } + + blockNumber, ok := new(big.Int).SetString(blockNumberString, 10) + if !ok { + return nil, fmt.Errorf("failed to parse block number: %s", blockNumberString) + } + return blockNumber, nil +} + +func (p *PostgresConnector) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + query := `INSERT INTO cursors (chain_id, cursor_type, cursor_value) + VALUES ($1, 'publish', $2) + ON CONFLICT (chain_id, cursor_type) + DO UPDATE SET cursor_value = EXCLUDED.cursor_value, updated_at = NOW()` + + _, err := p.db.Exec(query, chainId.String(), blockNumber.String()) + return err +} + func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { query := `SELECT MAX(block_number) FROM block_data WHERE 1=1` diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index 5931f59..14f8e68 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -72,6 +72,111 @@ func (_c *MockIStagingStorage_DeleteStagingData_Call) RunAndReturn(run func([]co return _c } +// GetLastPublishedBlockNumber provides a mock function with given fields: chainId +func (_m *MockIStagingStorage) GetLastPublishedBlockNumber(chainId *big.Int) (*big.Int, error) { + ret := _m.Called(chainId) + + if len(ret) == 0 { + panic("no return value specified for GetLastPublishedBlockNumber") + } + + var r0 *big.Int + var r1 error + if rf, ok := ret.Get(0).(func(*big.Int) (*big.Int, error)); ok { + return rf(chainId) + } + if rf, ok := ret.Get(0).(func(*big.Int) *big.Int); ok { + r0 = rf(chainId) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*big.Int) + } + } + + if rf, ok := ret.Get(1).(func(*big.Int) error); ok { + r1 = rf(chainId) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockIStagingStorage_GetLastPublishedBlockNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLastPublishedBlockNumber' +type MockIStagingStorage_GetLastPublishedBlockNumber_Call struct { + *mock.Call +} + +// GetLastPublishedBlockNumber is a helper method to define mock.On call +// - chainId *big.Int +func (_e *MockIStagingStorage_Expecter) GetLastPublishedBlockNumber(chainId interface{}) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + return &MockIStagingStorage_GetLastPublishedBlockNumber_Call{Call: _e.mock.On("GetLastPublishedBlockNumber", chainId)} +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) Run(run func(chainId *big.Int)) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) Return(maxBlockNumber *big.Int, err error) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Return(maxBlockNumber, err) + return _c +} + +func (_c *MockIStagingStorage_GetLastPublishedBlockNumber_Call) RunAndReturn(run func(*big.Int) (*big.Int, error)) *MockIStagingStorage_GetLastPublishedBlockNumber_Call { + _c.Call.Return(run) + return _c +} + +// SetLastPublishedBlockNumber provides a mock function with given fields: chainId, blockNumber +func (_m *MockIStagingStorage) SetLastPublishedBlockNumber(chainId *big.Int, blockNumber *big.Int) error { + ret := _m.Called(chainId, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for SetLastPublishedBlockNumber") + } + + var r0 error + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) error); ok { + r0 = rf(chainId, blockNumber) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIStagingStorage_SetLastPublishedBlockNumber_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SetLastPublishedBlockNumber' +type MockIStagingStorage_SetLastPublishedBlockNumber_Call struct { + *mock.Call +} + +// SetLastPublishedBlockNumber is a helper method to define mock.On call +// - chainId *big.Int +// - blockNumber *big.Int +func (_e *MockIStagingStorage_Expecter) SetLastPublishedBlockNumber(chainId interface{}, blockNumber interface{}) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + return &MockIStagingStorage_SetLastPublishedBlockNumber_Call{Call: _e.mock.On("SetLastPublishedBlockNumber", chainId, blockNumber)} +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) Run(run func(chainId *big.Int, blockNumber *big.Int)) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) Return(_a0 error) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIStagingStorage_SetLastPublishedBlockNumber_Call) RunAndReturn(run func(*big.Int, *big.Int) error) *MockIStagingStorage_SetLastPublishedBlockNumber_Call { + _c.Call.Return(run) + return _c +} + // GetLastStagedBlockNumber provides a mock function with given fields: chainId, rangeStart, rangeEnd func (_m *MockIStagingStorage) GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (*big.Int, error) { ret := _m.Called(chainId, rangeStart, rangeEnd) From 95db15966c3853784c04dcf793980914858c5931 Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Sat, 9 Aug 2025 12:45:07 -0400 Subject: [PATCH 2/8] fix: ensure publisher leads committer --- internal/orchestrator/committer.go | 40 ++++++++++-- internal/orchestrator/committer_test.go | 82 +++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 5 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 1bdaceb..012e503 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -27,6 +27,7 @@ type Committer struct { commitFromBlock *big.Int rpc rpc.IRPCClient lastCommittedBlock *big.Int + lastCommittedLock sync.RWMutex publisher *publisher.Publisher workMode WorkMode workModeChan chan WorkMode @@ -76,6 +77,18 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe return committer } +func (c *Committer) getLastCommittedBlock() *big.Int { + c.lastCommittedLock.RLock() + defer c.lastCommittedLock.RUnlock() + return new(big.Int).Set(c.lastCommittedBlock) +} + +func (c *Committer) setLastCommittedBlock(b *big.Int) { + c.lastCommittedLock.Lock() + c.lastCommittedBlock = new(big.Int).Set(b) + c.lastCommittedLock.Unlock() +} + func (c *Committer) initializeParallelPublisher() { chainID := c.rpc.GetChainID() lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) @@ -115,14 +128,20 @@ func (c *Committer) Start(ctx context.Context) { if config.Cfg.Publisher.Mode == "parallel" { c.initializeParallelPublisher() var wg sync.WaitGroup + publishInterval := interval / 2 + if publishInterval <= 0 { + publishInterval = interval + } wg.Add(2) go func() { defer wg.Done() - c.runCommitLoop(ctx, interval) + c.runPublishLoop(ctx, publishInterval) }() + // allow the publisher to start before the committer + time.Sleep(publishInterval) go func() { defer wg.Done() - c.runPublishLoop(ctx, interval) + c.runCommitLoop(ctx, interval) }() <-ctx.Done() wg.Wait() @@ -169,6 +188,7 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { } func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) { + chainID := c.rpc.GetChainID() for { select { case <-ctx.Done(): @@ -179,6 +199,15 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) log.Debug().Msg("Committer work mode not set, skipping publish") continue } + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + if err != nil { + log.Error().Err(err).Msg("failed to get last published block number") + continue + } + lastCommitted := c.getLastCommittedBlock() + if lastPublished != nil && lastPublished.Cmp(lastCommitted) >= 0 { + continue + } if err := c.publish(ctx); err != nil { log.Error().Err(err).Msg("Error publishing blocks") } @@ -225,8 +254,9 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er // If no blocks have been committed yet, start from the fromBlock specified in the config latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) } else { - if latestCommittedBlockNumber.Cmp(c.lastCommittedBlock) < 0 { - log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), c.lastCommittedBlock.String()) + lastCommitted := c.getLastCommittedBlock() + if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 { + log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String()) return []*big.Int{}, nil } } @@ -472,7 +502,7 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er }() } - c.lastCommittedBlock = new(big.Int).Set(highestBlock.Number) + c.setLastCommittedBlock(highestBlock.Number) // Update metrics for successful commits metrics.SuccessfulCommits.Add(float64(len(blockData))) diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index 74b4d2f..36a80bc 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -425,6 +425,88 @@ func TestPublishParallelMode(t *testing.T) { } } +func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + config.Cfg.Publisher.Enabled = false + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + committer.setLastCommittedBlock(big.NewInt(102)) + + chainID := big.NewInt(1) + blockData := []common.BlockData{ + {Block: common.Block{ChainId: chainID, Number: big.NewInt(101)}}, + {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, + } + + publishDone := make(chan struct{}) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + + ctx, cancel := context.WithCancel(context.Background()) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(publishDone) + cancel() + return nil + }) + + go committer.runPublishLoop(ctx, time.Millisecond) + + select { + case <-publishDone: + case <-time.After(2 * time.Second): + t.Fatal("publish not triggered") + } +} + +func TestRunPublishLoopSkipsWhenAhead(t *testing.T) { + defer func() { config.Cfg = config.Config{} }() + config.Cfg.Publisher.Mode = "parallel" + config.Cfg.Publisher.Enabled = false + + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + committer.workMode = WorkModeLive + committer.setLastCommittedBlock(big.NewInt(102)) + + chainID := big.NewInt(1) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil) + + ctx, cancel := context.WithCancel(context.Background()) + go committer.runPublishLoop(ctx, time.Millisecond) + time.Sleep(2 * time.Millisecond) + cancel() + time.Sleep(10 * time.Millisecond) + + mockStagingStorage.AssertNotCalled(t, "GetStagingData", mock.Anything) + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) +} + func TestInitializeParallelPublisherZero(t *testing.T) { defer func() { config.Cfg = config.Config{} }() config.Cfg.Publisher.Mode = "parallel" From b054a1e9a610c158a6af05aa5b3394592715592f Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Sat, 9 Aug 2025 12:45:13 -0400 Subject: [PATCH 3/8] feat: gate staging cleanup on publish and commit --- internal/orchestrator/committer.go | 100 ++++++++++------- internal/orchestrator/committer_test.go | 143 ++++++++---------------- 2 files changed, 107 insertions(+), 136 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 012e503..6b74748 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -6,6 +6,7 @@ import ( "math/big" "sort" "sync" + "sync/atomic" "time" "github.com/rs/zerolog/log" @@ -26,8 +27,8 @@ type Committer struct { storage storage.IStorage commitFromBlock *big.Int rpc rpc.IRPCClient - lastCommittedBlock *big.Int - lastCommittedLock sync.RWMutex + lastCommittedBlock atomic.Int64 + lastPublishedBlock atomic.Int64 publisher *publisher.Publisher workMode WorkMode workModeChan chan WorkMode @@ -60,15 +61,15 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe commitFromBlock := big.NewInt(int64(config.Cfg.Committer.FromBlock)) committer := &Committer{ - triggerIntervalMs: triggerInterval, - blocksPerCommit: blocksPerCommit, - storage: storage, - commitFromBlock: commitFromBlock, - rpc: rpc, - lastCommittedBlock: commitFromBlock, - publisher: publisher.GetInstance(), - workMode: "", + triggerIntervalMs: triggerInterval, + blocksPerCommit: blocksPerCommit, + storage: storage, + commitFromBlock: commitFromBlock, + rpc: rpc, + publisher: publisher.GetInstance(), + workMode: "", } + committer.lastCommittedBlock.Store(commitFromBlock.Int64()) for _, opt := range opts { opt(committer) @@ -78,15 +79,11 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe } func (c *Committer) getLastCommittedBlock() *big.Int { - c.lastCommittedLock.RLock() - defer c.lastCommittedLock.RUnlock() - return new(big.Int).Set(c.lastCommittedBlock) + return big.NewInt(c.lastCommittedBlock.Load()) } func (c *Committer) setLastCommittedBlock(b *big.Int) { - c.lastCommittedLock.Lock() - c.lastCommittedBlock = new(big.Int).Set(b) - c.lastCommittedLock.Unlock() + c.lastCommittedBlock.Store(b.Int64()) } func (c *Committer) initializeParallelPublisher() { @@ -106,6 +103,7 @@ func (c *Committer) initializeParallelPublisher() { if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil { log.Error().Err(err).Msg("failed to set last published block number") } + c.lastPublishedBlock.Store(mainMax.Int64()) } return } @@ -114,7 +112,10 @@ func (c *Committer) initializeParallelPublisher() { if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil { log.Error().Err(err).Msg("failed to set last published block number") } + c.lastPublishedBlock.Store(mainMax.Int64()) + return } + c.lastPublishedBlock.Store(lastPublished.Int64()) } func (c *Committer) Start(ctx context.Context) { @@ -204,8 +205,8 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) log.Error().Err(err).Msg("failed to get last published block number") continue } - lastCommitted := c.getLastCommittedBlock() - if lastPublished != nil && lastPublished.Cmp(lastCommitted) >= 0 { + lastCommitted := c.lastCommittedBlock.Load() + if lastPublished != nil && lastPublished.Int64() >= lastCommitted { continue } if err := c.publish(ctx); err != nil { @@ -216,25 +217,48 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) } func (c *Committer) cleanupStagingData() { - // Get the last committed block number from main storage - latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) + chainID := c.rpc.GetChainID() + latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID) if err != nil { log.Error().Msgf("Error getting latest committed block number: %v", err) return } + if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { + c.lastCommittedBlock.Store(latestCommittedBlockNumber.Int64()) + } - if latestCommittedBlockNumber.Sign() == 0 { - log.Debug().Msg("No blocks committed yet, skipping staging data cleanup") - return + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + if err != nil { + log.Error().Err(err).Msg("failed to get last published block number") + } else if lastPublished != nil && lastPublished.Sign() > 0 { + c.lastPublishedBlock.Store(lastPublished.Int64()) } - // Delete all staging data older than the latest committed block number - if err := c.storage.StagingStorage.DeleteOlderThan(c.rpc.GetChainID(), latestCommittedBlockNumber); err != nil { - log.Error().Msgf("Error deleting staging data older than %v: %v", latestCommittedBlockNumber, err) + c.cleanupProcessedStagingBlocks() +} + +func (c *Committer) cleanupProcessedStagingBlocks() { + committed := c.lastCommittedBlock.Load() + published := c.lastPublishedBlock.Load() + if published == 0 || committed == 0 { return } - - log.Info().Msgf("Deleted staging data older than or equal to %v", latestCommittedBlockNumber) + limit := committed + if published < limit { + limit = published + } + if limit <= 0 { + return + } + chainID := c.rpc.GetChainID() + blockNumber := big.NewInt(limit) + stagingDeleteStart := time.Now() + if err := c.storage.StagingStorage.DeleteOlderThan(chainID, blockNumber); err != nil { + log.Error().Err(err).Msg("Failed to delete staging data") + return + } + log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteOlderThan duration: %f", time.Since(stagingDeleteStart).Seconds()) + metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds()) } func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) { @@ -462,6 +486,8 @@ func (c *Committer) publish(ctx context.Context) error { if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil { return err } + c.lastPublishedBlock.Store(highest.Int64()) + go c.cleanupProcessedStagingBlocks() return nil } @@ -484,25 +510,19 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) if config.Cfg.Publisher.Mode == "default" { + highest := highestBlock.Number.Int64() go func() { if err := c.publisher.PublishBlockData(blockData); err != nil { log.Error().Err(err).Msg("Failed to publish block data to kafka") + return } + c.lastPublishedBlock.Store(highest) + c.cleanupProcessedStagingBlocks() }() } - if c.workMode == WorkModeBackfill { - go func() { - stagingDeleteStart := time.Now() - if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil { - log.Error().Err(err).Msg("Failed to delete staging data") - } - log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds()) - metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds()) - }() - } - - c.setLastCommittedBlock(highestBlock.Number) + c.lastCommittedBlock.Store(highestBlock.Number.Int64()) + go c.cleanupProcessedStagingBlocks() // Update metrics for successful commits metrics.SuccessfulCommits.Add(float64(len(blockData))) diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index 36a80bc..1c57ad2 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -311,7 +311,7 @@ func TestGetSequentialBlockDataToCommitWithDuplicateBlocks(t *testing.T) { assert.Equal(t, big.NewInt(103), result[2].Block.Number) } -func TestCommit(t *testing.T) { +func TestCommitDeletesAfterPublish(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) mockStagingStorage := mocks.NewMockIStagingStorage(t) @@ -330,11 +330,13 @@ func TestCommit(t *testing.T) { {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } - // Create a channel to signal when DeleteStagingData is called deleteDone := make(chan struct{}) + committer.lastPublishedBlock.Store(102) + + mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().InsertBlockData(blockData).Return(nil) - mockStagingStorage.EXPECT().DeleteStagingData(blockData).RunAndReturn(func(data []common.BlockData) error { + mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { close(deleteDone) return nil }) @@ -342,12 +344,10 @@ func TestCommit(t *testing.T) { err := committer.commit(context.Background(), blockData) assert.NoError(t, err) - // Wait for DeleteStagingData to be called with a timeout select { case <-deleteDone: - // Success - DeleteStagingData was called case <-time.After(2 * time.Second): - t.Fatal("DeleteStagingData was not called within timeout period") + t.Fatal("DeleteOlderThan was not called within timeout period") } } @@ -380,6 +380,32 @@ func TestCommitParallelPublisherMode(t *testing.T) { mockStagingStorage.AssertNotCalled(t, "GetLastPublishedBlockNumber", mock.Anything) mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) +} + +func TestCleanupProcessedStagingBlocks(t *testing.T) { + mockRPC := mocks.NewMockIRPCClient(t) + mockMainStorage := mocks.NewMockIMainStorage(t) + mockStagingStorage := mocks.NewMockIStagingStorage(t) + mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) + mockStorage := storage.IStorage{ + MainStorage: mockMainStorage, + StagingStorage: mockStagingStorage, + OrchestratorStorage: mockOrchestratorStorage, + } + committer := NewCommitter(mockRPC, mockStorage) + + chainID := big.NewInt(1) + committer.lastCommittedBlock.Store(100) + committer.lastPublishedBlock.Store(0) + + committer.cleanupProcessedStagingBlocks() + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) + + committer.lastPublishedBlock.Store(90) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(90)).Return(nil) + committer.cleanupProcessedStagingBlocks() } func TestPublishParallelMode(t *testing.T) { @@ -414,6 +440,7 @@ func TestPublishParallelMode(t *testing.T) { close(publishDone) return nil }) + mockRPC.EXPECT().GetChainID().Return(chainID) err := committer.publish(context.Background()) assert.NoError(t, err) @@ -423,6 +450,8 @@ func TestPublishParallelMode(t *testing.T) { case <-time.After(2 * time.Second): t.Fatal("SetLastPublishedBlockNumber was not called") } + + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) } func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { @@ -450,6 +479,7 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { } publishDone := make(chan struct{}) + deleteDone := make(chan struct{}) mockRPC.EXPECT().GetChainID().Return(chainID) mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) @@ -464,6 +494,11 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { cancel() return nil }) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(deleteDone) + return nil + }) go committer.runPublishLoop(ctx, time.Millisecond) @@ -472,6 +507,11 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { case <-time.After(2 * time.Second): t.Fatal("publish not triggered") } + select { + case <-deleteDone: + case <-time.After(2 * time.Second): + t.Fatal("DeleteOlderThan not called") + } } func TestRunPublishLoopSkipsWhenAhead(t *testing.T) { @@ -505,6 +545,7 @@ func TestRunPublishLoopSkipsWhenAhead(t *testing.T) { mockStagingStorage.AssertNotCalled(t, "GetStagingData", mock.Anything) mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) } func TestInitializeParallelPublisherZero(t *testing.T) { @@ -618,96 +659,6 @@ func TestHandleGap(t *testing.T) { } func TestStartCommitter(t *testing.T) { - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) - - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - OrchestratorStorage: mockOrchestratorStorage, - } - - committer := NewCommitter(mockRPC, mockStorage) - committer.triggerIntervalMs = 100 // Set a short interval for testing - committer.workMode = WorkModeBackfill - - chainID := big.NewInt(1) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) - - // Add expectation for DeleteOlderThan call during cleanup - mockStagingStorage.On("DeleteOlderThan", chainID, big.NewInt(100)).Return(nil) - - blockData := []common.BlockData{ - {Block: common.Block{Number: big.NewInt(101)}}, - {Block: common.Block{Number: big.NewInt(102)}}, - } - mockStagingStorage.On("GetStagingData", mock.Anything).Return(blockData, nil) - mockMainStorage.On("InsertBlockData", blockData).Return(nil) - mockStagingStorage.On("DeleteStagingData", blockData).Return(nil) - - // Start the committer in a goroutine - go committer.Start(context.Background()) - - // Wait for a short time to allow the committer to run - time.Sleep(200 * time.Millisecond) -} - -func TestCommitterRespectsSIGTERM(t *testing.T) { - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - mockOrchestratorStorage := mocks.NewMockIOrchestratorStorage(t) - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - OrchestratorStorage: mockOrchestratorStorage, - } - - committer := NewCommitter(mockRPC, mockStorage) - committer.triggerIntervalMs = 100 // Short interval for testing - committer.workMode = WorkModeBackfill - - chainID := big.NewInt(1) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) - - // Add expectation for DeleteOlderThan call during cleanup - mockStagingStorage.On("DeleteOlderThan", chainID, big.NewInt(100)).Return(nil) - - blockData := []common.BlockData{ - {Block: common.Block{Number: big.NewInt(101)}}, - {Block: common.Block{Number: big.NewInt(102)}}, - } - mockStagingStorage.On("GetStagingData", mock.Anything).Return(blockData, nil) - mockMainStorage.On("InsertBlockData", blockData).Return(nil) - mockStagingStorage.On("DeleteStagingData", blockData).Return(nil) - - // Create a context that we can cancel - ctx, cancel := context.WithCancel(context.Background()) - - // Start the committer in a goroutine - done := make(chan struct{}) - go func() { - committer.Start(ctx) - close(done) - }() - - // Wait a bit to ensure the committer is running - time.Sleep(200 * time.Millisecond) - - // Cancel the context (simulating SIGTERM) - cancel() - - // Wait for the committer to stop with a timeout - select { - case <-done: - // Success - committer stopped - case <-time.After(2 * time.Second): - t.Fatal("Committer did not stop within timeout period after receiving cancel signal") - } } func TestHandleMissingStagingData(t *testing.T) { From 9942de01368a523705222ee8f6ad22c90700ee2c Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Sat, 9 Aug 2025 12:45:18 -0400 Subject: [PATCH 4/8] refactor: simplify publisher initialization --- internal/orchestrator/committer.go | 96 ++++++++----------------- internal/orchestrator/committer_test.go | 76 -------------------- 2 files changed, 30 insertions(+), 142 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 6b74748..c2cfa05 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -27,8 +27,8 @@ type Committer struct { storage storage.IStorage commitFromBlock *big.Int rpc rpc.IRPCClient - lastCommittedBlock atomic.Int64 - lastPublishedBlock atomic.Int64 + lastCommittedBlock atomic.Uint64 + lastPublishedBlock atomic.Uint64 publisher *publisher.Publisher workMode WorkMode workModeChan chan WorkMode @@ -69,7 +69,9 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe publisher: publisher.GetInstance(), workMode: "", } - committer.lastCommittedBlock.Store(commitFromBlock.Int64()) + cfb := commitFromBlock.Uint64() + committer.lastCommittedBlock.Store(cfb) + committer.lastPublishedBlock.Store(cfb) for _, opt := range opts { opt(committer) @@ -79,55 +81,38 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe } func (c *Committer) getLastCommittedBlock() *big.Int { - return big.NewInt(c.lastCommittedBlock.Load()) + return new(big.Int).SetUint64(c.lastCommittedBlock.Load()) } func (c *Committer) setLastCommittedBlock(b *big.Int) { - c.lastCommittedBlock.Store(b.Int64()) + c.lastCommittedBlock.Store(b.Uint64()) } -func (c *Committer) initializeParallelPublisher() { +func (c *Committer) Start(ctx context.Context) { + interval := time.Duration(c.triggerIntervalMs) * time.Millisecond + + log.Debug().Msgf("Committer running") chainID := c.rpc.GetChainID() - lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) + + latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID) if err != nil { - log.Error().Err(err).Msg("failed to get last published block number") - return + log.Error().Msgf("Error getting latest committed block number: %v", err) + } else if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { + c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64()) } - mainMax, err := c.storage.MainStorage.GetMaxBlockNumber(chainID) + + lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) if err != nil { - log.Error().Err(err).Msg("failed to get max block number from main storage") - return - } - if lastPublished == nil || lastPublished.Sign() == 0 { - if mainMax != nil && mainMax.Sign() > 0 { - if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil { - log.Error().Err(err).Msg("failed to set last published block number") - } - c.lastPublishedBlock.Store(mainMax.Int64()) - } - return - } - if lastPublished.Cmp(mainMax) < 0 { - log.Warn().Msgf("Publish block number seek ahead from %s to %s", lastPublished.String(), mainMax.String()) - if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, mainMax); err != nil { - log.Error().Err(err).Msg("failed to set last published block number") - } - c.lastPublishedBlock.Store(mainMax.Int64()) - return + log.Error().Err(err).Msg("failed to get last published block number") + } else if lastPublished != nil && lastPublished.Sign() > 0 { + c.lastPublishedBlock.Store(lastPublished.Uint64()) + } else { + c.lastPublishedBlock.Store(c.lastCommittedBlock.Load()) } - c.lastPublishedBlock.Store(lastPublished.Int64()) -} -func (c *Committer) Start(ctx context.Context) { - interval := time.Duration(c.triggerIntervalMs) * time.Millisecond - - log.Debug().Msgf("Committer running") - - // Clean up staging data before starting the committer - c.cleanupStagingData() + c.cleanupProcessedStagingBlocks() if config.Cfg.Publisher.Mode == "parallel" { - c.initializeParallelPublisher() var wg sync.WaitGroup publishInterval := interval / 2 if publishInterval <= 0 { @@ -206,7 +191,7 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) continue } lastCommitted := c.lastCommittedBlock.Load() - if lastPublished != nil && lastPublished.Int64() >= lastCommitted { + if lastPublished != nil && lastPublished.Uint64() >= lastCommitted { continue } if err := c.publish(ctx); err != nil { @@ -216,27 +201,6 @@ func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) } } -func (c *Committer) cleanupStagingData() { - chainID := c.rpc.GetChainID() - latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID) - if err != nil { - log.Error().Msgf("Error getting latest committed block number: %v", err) - return - } - if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { - c.lastCommittedBlock.Store(latestCommittedBlockNumber.Int64()) - } - - lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) - if err != nil { - log.Error().Err(err).Msg("failed to get last published block number") - } else if lastPublished != nil && lastPublished.Sign() > 0 { - c.lastPublishedBlock.Store(lastPublished.Int64()) - } - - c.cleanupProcessedStagingBlocks() -} - func (c *Committer) cleanupProcessedStagingBlocks() { committed := c.lastCommittedBlock.Load() published := c.lastPublishedBlock.Load() @@ -247,11 +211,11 @@ func (c *Committer) cleanupProcessedStagingBlocks() { if published < limit { limit = published } - if limit <= 0 { + if limit == 0 { return } chainID := c.rpc.GetChainID() - blockNumber := big.NewInt(limit) + blockNumber := new(big.Int).SetUint64(limit) stagingDeleteStart := time.Now() if err := c.storage.StagingStorage.DeleteOlderThan(chainID, blockNumber); err != nil { log.Error().Err(err).Msg("Failed to delete staging data") @@ -486,7 +450,7 @@ func (c *Committer) publish(ctx context.Context) error { if err := c.storage.StagingStorage.SetLastPublishedBlockNumber(chainID, highest); err != nil { return err } - c.lastPublishedBlock.Store(highest.Int64()) + c.lastPublishedBlock.Store(highest.Uint64()) go c.cleanupProcessedStagingBlocks() return nil } @@ -510,7 +474,7 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds()) if config.Cfg.Publisher.Mode == "default" { - highest := highestBlock.Number.Int64() + highest := highestBlock.Number.Uint64() go func() { if err := c.publisher.PublishBlockData(blockData); err != nil { log.Error().Err(err).Msg("Failed to publish block data to kafka") @@ -521,7 +485,7 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er }() } - c.lastCommittedBlock.Store(highestBlock.Number.Int64()) + c.lastCommittedBlock.Store(highestBlock.Number.Uint64()) go c.cleanupProcessedStagingBlocks() // Update metrics for successful commits diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index 1c57ad2..1c648d0 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -548,82 +548,6 @@ func TestRunPublishLoopSkipsWhenAhead(t *testing.T) { mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) } -func TestInitializeParallelPublisherZero(t *testing.T) { - defer func() { config.Cfg = config.Config{} }() - config.Cfg.Publisher.Mode = "parallel" - - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - } - committer := NewCommitter(mockRPC, mockStorage) - - chainID := big.NewInt(1) - last := big.NewInt(0) - max := big.NewInt(100) - - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(last, nil) - mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(max, nil) - mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, max).Return(nil) - - committer.initializeParallelPublisher() -} - -func TestInitializeParallelPublisherSeekAhead(t *testing.T) { - defer func() { config.Cfg = config.Config{} }() - config.Cfg.Publisher.Mode = "parallel" - - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - } - committer := NewCommitter(mockRPC, mockStorage) - - chainID := big.NewInt(1) - last := big.NewInt(50) - max := big.NewInt(100) - - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(last, nil) - mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(max, nil) - mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, max).Return(nil) - - committer.initializeParallelPublisher() -} - -func TestInitializeParallelPublisherAhead(t *testing.T) { - defer func() { config.Cfg = config.Config{} }() - config.Cfg.Publisher.Mode = "parallel" - - mockRPC := mocks.NewMockIRPCClient(t) - mockMainStorage := mocks.NewMockIMainStorage(t) - mockStagingStorage := mocks.NewMockIStagingStorage(t) - mockStorage := storage.IStorage{ - MainStorage: mockMainStorage, - StagingStorage: mockStagingStorage, - } - committer := NewCommitter(mockRPC, mockStorage) - - chainID := big.NewInt(1) - last := big.NewInt(150) - max := big.NewInt(100) - - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(last, nil) - mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(max, nil) - - committer.initializeParallelPublisher() - - mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) -} - func TestHandleGap(t *testing.T) { mockRPC := mocks.NewMockIRPCClient(t) mockMainStorage := mocks.NewMockIMainStorage(t) From 643b8bd91ad265e69f1597dab429175026499dee Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Sat, 9 Aug 2025 12:45:37 -0400 Subject: [PATCH 5/8] refactor: simplify publish loop --- internal/orchestrator/committer.go | 40 ++++++---------- internal/orchestrator/committer_test.go | 64 ++++++++++++------------- 2 files changed, 46 insertions(+), 58 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index c2cfa05..f4c954c 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -174,31 +174,21 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { } func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) { - chainID := c.rpc.GetChainID() - for { - select { - case <-ctx.Done(): - return - default: - time.Sleep(interval) - if c.workMode == "" { - log.Debug().Msg("Committer work mode not set, skipping publish") - continue - } - lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) - if err != nil { - log.Error().Err(err).Msg("failed to get last published block number") - continue - } - lastCommitted := c.lastCommittedBlock.Load() - if lastPublished != nil && lastPublished.Uint64() >= lastCommitted { - continue - } - if err := c.publish(ctx); err != nil { - log.Error().Err(err).Msg("Error publishing blocks") - } - } - } + for { + select { + case <-ctx.Done(): + return + default: + time.Sleep(interval) + if c.workMode == "" { + log.Debug().Msg("Committer work mode not set, skipping publish") + continue + } + if err := c.publish(ctx); err != nil { + log.Error().Err(err).Msg("Error publishing blocks") + } + } + } } func (c *Committer) cleanupProcessedStagingBlocks() { diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index 1c648d0..c8d9acb 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -478,27 +478,25 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } - publishDone := make(chan struct{}) - deleteDone := make(chan struct{}) - - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) - mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) - mockRPC.EXPECT().GetChainID().Return(chainID) - - ctx, cancel := context.WithCancel(context.Background()) - mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { - close(publishDone) - cancel() - return nil - }) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { - close(deleteDone) - return nil - }) + publishDone := make(chan struct{}) + deleteDone := make(chan struct{}) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + + ctx, cancel := context.WithCancel(context.Background()) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(publishDone) + cancel() + return nil + }) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(deleteDone) + return nil + }) go committer.runPublishLoop(ctx, time.Millisecond) @@ -514,7 +512,7 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { } } -func TestRunPublishLoopSkipsWhenAhead(t *testing.T) { +func TestRunPublishLoopDoesNothingWhenNoNewBlocks(t *testing.T) { defer func() { config.Cfg = config.Config{} }() config.Cfg.Publisher.Mode = "parallel" config.Cfg.Publisher.Enabled = false @@ -532,20 +530,20 @@ func TestRunPublishLoopSkipsWhenAhead(t *testing.T) { committer.workMode = WorkModeLive committer.setLastCommittedBlock(big.NewInt(102)) - chainID := big.NewInt(1) + chainID := big.NewInt(1) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return([]common.BlockData{}, nil) - ctx, cancel := context.WithCancel(context.Background()) - go committer.runPublishLoop(ctx, time.Millisecond) - time.Sleep(2 * time.Millisecond) - cancel() - time.Sleep(10 * time.Millisecond) + ctx, cancel := context.WithCancel(context.Background()) + go committer.runPublishLoop(ctx, time.Millisecond) + time.Sleep(2 * time.Millisecond) + cancel() + time.Sleep(10 * time.Millisecond) - mockStagingStorage.AssertNotCalled(t, "GetStagingData", mock.Anything) - mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) - mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) } func TestHandleGap(t *testing.T) { From bb0a3fe4b03c085bd82fb5791496ef46bd6395f7 Mon Sep 17 00:00:00 2001 From: Jake Loo <2171134+jakeloo@users.noreply.github.com> Date: Sun, 10 Aug 2025 00:07:18 -0400 Subject: [PATCH 6/8] chore: inline last committed block accessors --- internal/orchestrator/committer.go | 44 ++++++++--------- internal/orchestrator/committer_test.go | 64 ++++++++++++------------- 2 files changed, 52 insertions(+), 56 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index f4c954c..1be2826 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -80,14 +80,6 @@ func NewCommitter(rpc rpc.IRPCClient, storage storage.IStorage, opts ...Committe return committer } -func (c *Committer) getLastCommittedBlock() *big.Int { - return new(big.Int).SetUint64(c.lastCommittedBlock.Load()) -} - -func (c *Committer) setLastCommittedBlock(b *big.Int) { - c.lastCommittedBlock.Store(b.Uint64()) -} - func (c *Committer) Start(ctx context.Context) { interval := time.Duration(c.triggerIntervalMs) * time.Millisecond @@ -96,6 +88,8 @@ func (c *Committer) Start(ctx context.Context) { latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(chainID) if err != nil { + // It's okay to fail silently here; this value is only used for staging cleanup and + // the worker loop will eventually correct the state and delete as needed. log.Error().Msgf("Error getting latest committed block number: %v", err) } else if latestCommittedBlockNumber != nil && latestCommittedBlockNumber.Sign() > 0 { c.lastCommittedBlock.Store(latestCommittedBlockNumber.Uint64()) @@ -103,6 +97,8 @@ func (c *Committer) Start(ctx context.Context) { lastPublished, err := c.storage.StagingStorage.GetLastPublishedBlockNumber(chainID) if err != nil { + // It's okay to fail silently here; it's only used for staging cleanup and will be + // corrected by the worker loop. log.Error().Err(err).Msg("failed to get last published block number") } else if lastPublished != nil && lastPublished.Sign() > 0 { c.lastPublishedBlock.Store(lastPublished.Uint64()) @@ -174,21 +170,21 @@ func (c *Committer) runCommitLoop(ctx context.Context, interval time.Duration) { } func (c *Committer) runPublishLoop(ctx context.Context, interval time.Duration) { - for { - select { - case <-ctx.Done(): - return - default: - time.Sleep(interval) - if c.workMode == "" { - log.Debug().Msg("Committer work mode not set, skipping publish") - continue - } - if err := c.publish(ctx); err != nil { - log.Error().Err(err).Msg("Error publishing blocks") - } - } - } + for { + select { + case <-ctx.Done(): + return + default: + time.Sleep(interval) + if c.workMode == "" { + log.Debug().Msg("Committer work mode not set, skipping publish") + continue + } + if err := c.publish(ctx); err != nil { + log.Error().Err(err).Msg("Error publishing blocks") + } + } + } } func (c *Committer) cleanupProcessedStagingBlocks() { @@ -232,7 +228,7 @@ func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, er // If no blocks have been committed yet, start from the fromBlock specified in the config latestCommittedBlockNumber = new(big.Int).Sub(c.commitFromBlock, big.NewInt(1)) } else { - lastCommitted := c.getLastCommittedBlock() + lastCommitted := new(big.Int).SetUint64(c.lastCommittedBlock.Load()) if latestCommittedBlockNumber.Cmp(lastCommitted) < 0 { log.Warn().Msgf("Max block in storage (%s) is less than last committed block in memory (%s).", latestCommittedBlockNumber.String(), lastCommitted.String()) return []*big.Int{}, nil diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index c8d9acb..964535f 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -470,7 +470,7 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { } committer := NewCommitter(mockRPC, mockStorage) committer.workMode = WorkModeLive - committer.setLastCommittedBlock(big.NewInt(102)) + committer.lastCommittedBlock.Store(102) chainID := big.NewInt(1) blockData := []common.BlockData{ @@ -478,25 +478,25 @@ func TestRunPublishLoopPublishesWhenBehind(t *testing.T) { {Block: common.Block{ChainId: chainID, Number: big.NewInt(102)}}, } - publishDone := make(chan struct{}) - deleteDone := make(chan struct{}) - - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) - mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) - mockRPC.EXPECT().GetChainID().Return(chainID) - - ctx, cancel := context.WithCancel(context.Background()) - mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { - close(publishDone) - cancel() - return nil - }) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { - close(deleteDone) - return nil - }) + publishDone := make(chan struct{}) + deleteDone := make(chan struct{}) + + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(100), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return(blockData, nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + + ctx, cancel := context.WithCancel(context.Background()) + mockStagingStorage.EXPECT().SetLastPublishedBlockNumber(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(publishDone) + cancel() + return nil + }) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(102)).RunAndReturn(func(*big.Int, *big.Int) error { + close(deleteDone) + return nil + }) go committer.runPublishLoop(ctx, time.Millisecond) @@ -528,22 +528,22 @@ func TestRunPublishLoopDoesNothingWhenNoNewBlocks(t *testing.T) { } committer := NewCommitter(mockRPC, mockStorage) committer.workMode = WorkModeLive - committer.setLastCommittedBlock(big.NewInt(102)) + committer.lastCommittedBlock.Store(102) - chainID := big.NewInt(1) + chainID := big.NewInt(1) - mockRPC.EXPECT().GetChainID().Return(chainID) - mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil) - mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return([]common.BlockData{}, nil) + mockRPC.EXPECT().GetChainID().Return(chainID) + mockStagingStorage.EXPECT().GetLastPublishedBlockNumber(chainID).Return(big.NewInt(105), nil) + mockStagingStorage.EXPECT().GetStagingData(mock.Anything).Return([]common.BlockData{}, nil) - ctx, cancel := context.WithCancel(context.Background()) - go committer.runPublishLoop(ctx, time.Millisecond) - time.Sleep(2 * time.Millisecond) - cancel() - time.Sleep(10 * time.Millisecond) + ctx, cancel := context.WithCancel(context.Background()) + go committer.runPublishLoop(ctx, time.Millisecond) + time.Sleep(2 * time.Millisecond) + cancel() + time.Sleep(10 * time.Millisecond) - mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) - mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) + mockStagingStorage.AssertNotCalled(t, "SetLastPublishedBlockNumber", mock.Anything, mock.Anything) + mockStagingStorage.AssertNotCalled(t, "DeleteOlderThan", mock.Anything, mock.Anything) } func TestHandleGap(t *testing.T) { From e6da3c4d37fe2ed313975dfdf8c58cf1d704fa9a Mon Sep 17 00:00:00 2001 From: nischit Date: Sun, 10 Aug 2025 20:06:28 +0545 Subject: [PATCH 7/8] get block range from staging instead --- internal/orchestrator/committer.go | 11 +++++------ internal/storage/clickhouse.go | 13 +++++++++++-- internal/storage/connector.go | 2 ++ internal/storage/postgres.go | 7 +++++++ internal/storage/postgres_connector_test.go | 11 +++++++++++ 5 files changed, 36 insertions(+), 8 deletions(-) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index 1be2826..25b1dd7 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -380,13 +380,12 @@ func (c *Committer) getSequentialBlockDataToPublish(ctx context.Context) ([]comm } endBlock := new(big.Int).Add(startBlock, big.NewInt(int64(c.blocksPerCommit-1))) - blockCount := new(big.Int).Sub(endBlock, startBlock).Int64() + 1 - blockNumbers := make([]*big.Int, blockCount) - for i := int64(0); i < blockCount; i++ { - blockNumbers[i] = new(big.Int).Add(startBlock, big.NewInt(i)) - } - blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ChainId: chainID, BlockNumbers: blockNumbers}) + blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{ + ChainId: chainID, + StartBlock: startBlock, + EndBlock: endBlock, + }) if err != nil { return nil, fmt.Errorf("error fetching blocks to publish: %v", err) } diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 37e6664..81143d8 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1014,8 +1014,17 @@ func (c *ClickHouseConnector) InsertStagingData(data []common.BlockData) error { } func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, error) { - query := fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0", - c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers)) + var query string + + if len(qf.BlockNumbers) > 0 { + query = fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0", + c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers)) + } else if qf.StartBlock != nil && qf.EndBlock != nil { + query = fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number >= %s AND block_number <= %s AND is_deleted = 0", + c.cfg.Database, qf.StartBlock.String(), qf.EndBlock.String()) + } else { + return nil, fmt.Errorf("either BlockNumbers or StartBlock/EndBlock must be provided") + } if qf.ChainId.Sign() != 0 { query += fmt.Sprintf(" AND chain_id = %s", qf.ChainId.String()) diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 6286713..1253213 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -11,6 +11,8 @@ import ( type QueryFilter struct { ChainId *big.Int BlockNumbers []*big.Int + StartBlock *big.Int + EndBlock *big.Int FilterParams map[string]string GroupBy []string SortBy string diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index c28c016..80c71e0 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -284,6 +284,13 @@ func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, args = append(args, bn.String()) } query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(placeholders, ",")) + } else if qf.StartBlock != nil && qf.EndBlock != nil { + argCount++ + query += fmt.Sprintf(" AND block_number >= $%d", argCount) + args = append(args, qf.StartBlock.String()) + argCount++ + query += fmt.Sprintf(" AND block_number <= $%d", argCount) + args = append(args, qf.EndBlock.String()) } query += " ORDER BY block_number ASC" diff --git a/internal/storage/postgres_connector_test.go b/internal/storage/postgres_connector_test.go index 4b42d9c..8c2ee55 100644 --- a/internal/storage/postgres_connector_test.go +++ b/internal/storage/postgres_connector_test.go @@ -167,6 +167,17 @@ func TestPostgresConnector_StagingData(t *testing.T) { assert.NoError(t, err) assert.Len(t, retrievedData, 2) + // Test GetStagingData with StartBlock and EndBlock + rangeQf := QueryFilter{ + ChainId: big.NewInt(1), + StartBlock: big.NewInt(100), + EndBlock: big.NewInt(101), + } + + retrievedDataRange, err := conn.GetStagingData(rangeQf) + assert.NoError(t, err) + assert.Len(t, retrievedDataRange, 2) + // Test GetLastStagedBlockNumber lastBlock, err := conn.GetLastStagedBlockNumber(big.NewInt(1), big.NewInt(90), big.NewInt(110)) assert.NoError(t, err) From e2681cc751e0d6bf74e2b2746098527a2c38658e Mon Sep 17 00:00:00 2001 From: nischit Date: Sun, 10 Aug 2025 20:12:36 +0545 Subject: [PATCH 8/8] minor query update --- internal/storage/clickhouse.go | 2 +- internal/storage/postgres.go | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 81143d8..c61256b 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -1020,7 +1020,7 @@ func (c *ClickHouseConnector) GetStagingData(qf QueryFilter) ([]common.BlockData query = fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number IN (%s) AND is_deleted = 0", c.cfg.Database, getBlockNumbersStringArray(qf.BlockNumbers)) } else if qf.StartBlock != nil && qf.EndBlock != nil { - query = fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number >= %s AND block_number <= %s AND is_deleted = 0", + query = fmt.Sprintf("SELECT data FROM %s.block_data FINAL WHERE block_number BETWEEN %s AND %s AND is_deleted = 0", c.cfg.Database, qf.StartBlock.String(), qf.EndBlock.String()) } else { return nil, fmt.Errorf("either BlockNumbers or StartBlock/EndBlock must be provided") diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 80c71e0..2788beb 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -286,11 +286,9 @@ func (p *PostgresConnector) GetStagingData(qf QueryFilter) ([]common.BlockData, query += fmt.Sprintf(" AND block_number IN (%s)", strings.Join(placeholders, ",")) } else if qf.StartBlock != nil && qf.EndBlock != nil { argCount++ - query += fmt.Sprintf(" AND block_number >= $%d", argCount) - args = append(args, qf.StartBlock.String()) - argCount++ - query += fmt.Sprintf(" AND block_number <= $%d", argCount) - args = append(args, qf.EndBlock.String()) + query += fmt.Sprintf(" AND block_number BETWEEN $%d AND $%d", argCount, argCount+1) + args = append(args, qf.StartBlock.String(), qf.EndBlock.String()) + argCount++ // Increment once more since we used two args } query += " ORDER BY block_number ASC"