diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go
index ded24b2..e95978e 100644
--- a/internal/orchestrator/committer.go
+++ b/internal/orchestrator/committer.go
@@ -79,6 +79,10 @@ func (c *Committer) Start(ctx context.Context) {
 	interval := time.Duration(c.triggerIntervalMs) * time.Millisecond
 
 	log.Debug().Msgf("Committer running")
+
+	// Clean up staging data before starting the committer
+	c.cleanupStagingData()
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -112,6 +116,32 @@ func (c *Committer) Start(ctx context.Context) {
 	}
 }
 
+// cleanupStagingData deletes staging data for every block at or below the
+// highest block number found in main storage. It returns nothing: any
+// failure is logged and the caller carries on.
+func (c *Committer) cleanupStagingData() {
+	// Get the last committed block number from main storage
+	latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
+	if err != nil {
+		log.Error().Msgf("Error getting latest committed block number: %v", err)
+		return
+	}
+
+	// A nil result is treated like zero so Sign() is never called on a nil *big.Int
+	if latestCommittedBlockNumber == nil || latestCommittedBlockNumber.Sign() == 0 {
+		log.Debug().Msg("No blocks committed yet, skipping staging data cleanup")
+		return
+	}
+
+	// Delete all staging data at or below the latest committed block number
+	if err := c.storage.StagingStorage.DeleteOlderThan(c.rpc.GetChainID(), latestCommittedBlockNumber); err != nil {
+		log.Error().Msgf("Error deleting staging data older than or equal to %v: %v", latestCommittedBlockNumber, err)
+		return
+	}
+
+	log.Info().Msgf("Deleted staging data older than or equal to %v", latestCommittedBlockNumber)
+}
+
 func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) {
 	startTime := time.Now()
 	defer func() {
diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go
index ddcbddd..e1c4f16 100644
--- a/internal/orchestrator/committer_test.go
+++ b/internal/orchestrator/committer_test.go
@@ -404,6 +404,9 @@ func TestStartCommitter(t *testing.T) {
 	mockRPC.EXPECT().GetChainID().Return(chainID)
 	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
 
+	// Start-up cleanup deletes staging data up to the committed max block (100)
+	mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(100)).Return(nil)
+
 	blockData := []common.BlockData{
 		{Block: common.Block{Number: big.NewInt(101)}},
 		{Block: common.Block{Number: big.NewInt(102)}},
@@ -438,6 +441,9 @@ func TestCommitterRespectsSIGTERM(t *testing.T) {
 	mockRPC.EXPECT().GetChainID().Return(chainID)
 	mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil)
 
+	// Start-up cleanup deletes staging data up to the committed max block (100)
+	mockStagingStorage.EXPECT().DeleteOlderThan(chainID, big.NewInt(100)).Return(nil)
+
 	blockData := []common.BlockData{
 		{Block: common.Block{Number: big.NewInt(101)}},
 		{Block: common.Block{Number: big.NewInt(102)}},
diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go
index 653090f..26b5fea 100644
--- a/internal/storage/clickhouse.go
+++ b/internal/storage/clickhouse.go
@@ -2114,6 +2114,20 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers []
 	return blockData, nil
 }
 
+// DeleteOlderThan flags staging rows of chainId as deleted for every block
+// number <= blockNumber (the bound is inclusive) by inserting one row with
+// is_deleted = 1 per matching (chain_id, block_number) pair.
+func (c *ClickHouseConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error {
+	query := fmt.Sprintf(`
+		INSERT INTO %s.block_data (chain_id, block_number, is_deleted)
+		SELECT chain_id, block_number, 1
+		FROM %s.block_data
+		WHERE chain_id = ? AND block_number <= ? AND is_deleted = 0
+		GROUP BY chain_id, block_number
+	`, c.cfg.Database, c.cfg.Database)
+	return c.conn.Exec(context.Background(), query, chainId, blockNumber)
+}
+
 // Helper function to test query generation
 func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string {
 	return c.buildQuery(table, columns, qf)
diff --git a/internal/storage/connector.go b/internal/storage/connector.go
index 15813d0..66c1d90 100644
--- a/internal/storage/connector.go
+++ b/internal/storage/connector.go
@@ -83,6 +83,8 @@ type IStagingStorage interface {
 	GetStagingData(qf QueryFilter) (data []common.BlockData, err error)
 	DeleteStagingData(data []common.BlockData) error
 	GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error)
+	// DeleteOlderThan removes staged blocks of chainId with block_number <= blockNumber (inclusive).
+	DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error
 }
 
 type IMainStorage interface {
diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go
index 9498e8d..3dc773d 100644
--- a/internal/storage/postgres.go
+++ b/internal/storage/postgres.go
@@ -387,6 +387,14 @@ func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStar
 	return blockNumber, nil
 }
 
+// DeleteOlderThan deletes the block_data rows of chainId whose block_number
+// is <= blockNumber (the bound is inclusive).
+func (p *PostgresConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error {
+	query := `DELETE FROM block_data WHERE chain_id = $1 AND block_number <= $2`
+	_, err := p.db.Exec(query, chainId.String(), blockNumber.String())
+	return err
+}
+
 // Close closes the database connection
 func (p *PostgresConnector) Close() error {
 	return p.db.Close()
diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go
index 090f8f2..5931f59 100644
--- a/test/mocks/MockIStagingStorage.go
+++ b/test/mocks/MockIStagingStorage.go
@@ -236,6 +236,53 @@ func (_c *MockIStagingStorage_InsertStagingData_Call) RunAndReturn(run func([]co
 	return _c
 }
 
+// DeleteOlderThan provides a mock function with given fields: chainId, blockNumber
+func (_m *MockIStagingStorage) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error {
+	ret := _m.Called(chainId, blockNumber)
+
+	if len(ret) == 0 {
+		panic("no return value specified for DeleteOlderThan")
+	}
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) error); ok {
+		r0 = rf(chainId, blockNumber)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// MockIStagingStorage_DeleteOlderThan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteOlderThan'
+type MockIStagingStorage_DeleteOlderThan_Call struct {
+	*mock.Call
+}
+
+// DeleteOlderThan is a helper method to define mock.On call
+//   - chainId *big.Int
+//   - blockNumber *big.Int
+func (_e *MockIStagingStorage_Expecter) DeleteOlderThan(chainId interface{}, blockNumber interface{}) *MockIStagingStorage_DeleteOlderThan_Call {
+	return &MockIStagingStorage_DeleteOlderThan_Call{Call: _e.mock.On("DeleteOlderThan", chainId, blockNumber)}
+}
+
+func (_c *MockIStagingStorage_DeleteOlderThan_Call) Run(run func(chainId *big.Int, blockNumber *big.Int)) *MockIStagingStorage_DeleteOlderThan_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(*big.Int), args[1].(*big.Int))
+	})
+	return _c
+}
+
+func (_c *MockIStagingStorage_DeleteOlderThan_Call) Return(_a0 error) *MockIStagingStorage_DeleteOlderThan_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockIStagingStorage_DeleteOlderThan_Call) RunAndReturn(run func(*big.Int, *big.Int) error) *MockIStagingStorage_DeleteOlderThan_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // NewMockIStagingStorage creates a new instance of MockIStagingStorage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
 // The first argument is typically a *testing.T value.
 func NewMockIStagingStorage(t interface {