From 17757d36cd3be5cb7dc1f96bc70ff9ead218c82d Mon Sep 17 00:00:00 2001 From: nischit Date: Sat, 9 Aug 2025 01:12:31 +0545 Subject: [PATCH 1/3] cleanup blocks on committer start --- internal/orchestrator/committer.go | 26 +++++++++++++++++ internal/storage/clickhouse.go | 47 ++++++++++++++++++++++++++++++ internal/storage/connector.go | 1 + internal/storage/postgres.go | 6 ++++ test/mocks/MockIStagingStorage.go | 47 ++++++++++++++++++++++++++++++ 5 files changed, 127 insertions(+) diff --git a/internal/orchestrator/committer.go b/internal/orchestrator/committer.go index ded24b2..e95978e 100644 --- a/internal/orchestrator/committer.go +++ b/internal/orchestrator/committer.go @@ -79,6 +79,10 @@ func (c *Committer) Start(ctx context.Context) { interval := time.Duration(c.triggerIntervalMs) * time.Millisecond log.Debug().Msgf("Committer running") + + // Clean up staging data before starting the committer + c.cleanupStagingData() + for { select { case <-ctx.Done(): @@ -112,6 +116,28 @@ func (c *Committer) Start(ctx context.Context) { } } +func (c *Committer) cleanupStagingData() { + // Get the last committed block number from main storage + latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID()) + if err != nil { + log.Error().Msgf("Error getting latest committed block number: %v", err) + return + } + + if latestCommittedBlockNumber.Sign() == 0 { + log.Debug().Msg("No blocks committed yet, skipping staging data cleanup") + return + } + + // Delete all staging data older than the latest committed block number + if err := c.storage.StagingStorage.DeleteOlderThan(c.rpc.GetChainID(), latestCommittedBlockNumber); err != nil { + log.Error().Msgf("Error deleting staging data older than %v: %v", latestCommittedBlockNumber, err) + return + } + + log.Info().Msgf("Deleted staging data older than or equal to %v", latestCommittedBlockNumber) +} + func (c *Committer) getBlockNumbersToCommit(ctx context.Context) ([]*big.Int, error) { startTime := time.Now() defer func() { diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 653090f..4b6c7aa 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -2114,6 +2114,53 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers [] return blockData, nil } +func (c *ClickHouseConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error { + // First, get all the block numbers that need to be deleted + query := fmt.Sprintf(` + SELECT DISTINCT chain_id, block_number + FROM %s.block_data + WHERE chain_id = ? AND block_number <= ? AND is_deleted = 0 + `, c.cfg.Database) + + rows, err := c.conn.Query(context.Background(), query, chainId, blockNumber) + if err != nil { + return err + } + defer rows.Close() + + // Prepare batch for deletion + deleteQuery := fmt.Sprintf(` + INSERT INTO %s.block_data ( + chain_id, block_number, is_deleted + ) VALUES (?, ?, ?) + `, c.cfg.Database) + + batch, err := c.conn.PrepareBatch(context.Background(), deleteQuery) + if err != nil { + return err + } + defer batch.Close() + + // Add each block to the deletion batch + for rows.Next() { + var chainIdVal, blockNumberVal *big.Int + if err := rows.Scan(&chainIdVal, &blockNumberVal); err != nil { + return err + } + + err := batch.Append( + chainIdVal, + blockNumberVal, + 1, // is_deleted = 1 + ) + if err != nil { + return err + } + } + + return batch.Send() +} + // Helper function to test query generation func (c *ClickHouseConnector) TestQueryGeneration(table, columns string, qf QueryFilter) string { return c.buildQuery(table, columns, qf) diff --git a/internal/storage/connector.go b/internal/storage/connector.go index 15813d0..66c1d90 100644 --- a/internal/storage/connector.go +++ b/internal/storage/connector.go @@ -83,6 +83,7 @@ type IStagingStorage interface { GetStagingData(qf QueryFilter) (data []common.BlockData, err error) DeleteStagingData(data []common.BlockData) error GetLastStagedBlockNumber(chainId *big.Int, rangeStart *big.Int, rangeEnd *big.Int) (maxBlockNumber *big.Int, err error) + DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error } type IMainStorage interface { diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 9498e8d..2421576 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -387,6 +387,12 @@ func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStar return blockNumber, nil } +func (p *PostgresConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error { + query := `DELETE FROM block_data WHERE chain_id = $1 AND block_number < $2` + _, err := p.db.Exec(query, chainId.String(), blockNumber.String()) + return err +} + // Close closes the database connection func (p *PostgresConnector) Close() error { return p.db.Close() diff --git a/test/mocks/MockIStagingStorage.go b/test/mocks/MockIStagingStorage.go index 090f8f2..5931f59 100644 --- a/test/mocks/MockIStagingStorage.go +++ b/test/mocks/MockIStagingStorage.go @@ -236,6 +236,53 @@ func (_c *MockIStagingStorage_InsertStagingData_Call) RunAndReturn(run func([]co return _c } +// DeleteOlderThan provides a mock function with given fields: chainId, blockNumber +func (_m *MockIStagingStorage) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error { + ret := _m.Called(chainId, blockNumber) + + if len(ret) == 0 { + panic("no return value specified for DeleteOlderThan") + } + + var r0 error + if rf, ok := ret.Get(0).(func(*big.Int, *big.Int) error); ok { + r0 = rf(chainId, blockNumber) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockIStagingStorage_DeleteOlderThan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteOlderThan' +type MockIStagingStorage_DeleteOlderThan_Call struct { + *mock.Call +} + +// DeleteOlderThan is a helper method to define mock.On call +// - chainId *big.Int +// - blockNumber *big.Int +func (_e *MockIStagingStorage_Expecter) DeleteOlderThan(chainId interface{}, blockNumber interface{}) *MockIStagingStorage_DeleteOlderThan_Call { + return &MockIStagingStorage_DeleteOlderThan_Call{Call: _e.mock.On("DeleteOlderThan", chainId, blockNumber)} +} + +func (_c *MockIStagingStorage_DeleteOlderThan_Call) Run(run func(chainId *big.Int, blockNumber *big.Int)) *MockIStagingStorage_DeleteOlderThan_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(*big.Int), args[1].(*big.Int)) + }) + return _c +} + +func (_c *MockIStagingStorage_DeleteOlderThan_Call) Return(_a0 error) *MockIStagingStorage_DeleteOlderThan_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockIStagingStorage_DeleteOlderThan_Call) RunAndReturn(run func(*big.Int, *big.Int) error) *MockIStagingStorage_DeleteOlderThan_Call { + _c.Call.Return(run) + return _c +} + // NewMockIStagingStorage creates a new instance of MockIStagingStorage. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockIStagingStorage(t interface { From 1109c089074cb2575d4c375be7f6a46afc99dc67 Mon Sep 17 00:00:00 2001 From: nischit Date: Sat, 9 Aug 2025 01:17:01 +0545 Subject: [PATCH 2/3] minor change --- internal/storage/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/storage/postgres.go b/internal/storage/postgres.go index 2421576..3dc773d 100644 --- a/internal/storage/postgres.go +++ b/internal/storage/postgres.go @@ -388,7 +388,7 @@ func (p *PostgresConnector) GetLastStagedBlockNumber(chainId *big.Int, rangeStar } func (p *PostgresConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error { - query := `DELETE FROM block_data WHERE chain_id = $1 AND block_number < $2` + query := `DELETE FROM block_data WHERE chain_id = $1 AND block_number <= $2` _, err := p.db.Exec(query, chainId.String(), blockNumber.String()) return err } From 60f778864fa75c2dad8c6239356f50d7c6825422 Mon Sep 17 00:00:00 2001 From: nischit Date: Sat, 9 Aug 2025 01:24:00 +0545 Subject: [PATCH 3/3] changes from comment --- internal/orchestrator/committer_test.go | 6 ++++ internal/storage/clickhouse.go | 48 ++++--------------------- 2 files changed, 12 insertions(+), 42 deletions(-) diff --git a/internal/orchestrator/committer_test.go b/internal/orchestrator/committer_test.go index ddcbddd..e1c4f16 100644 --- a/internal/orchestrator/committer_test.go +++ b/internal/orchestrator/committer_test.go @@ -404,6 +404,9 @@ func TestStartCommitter(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) + // Add expectation for DeleteOlderThan call during cleanup + mockStagingStorage.On("DeleteOlderThan", chainID, big.NewInt(100)).Return(nil) + blockData := []common.BlockData{ {Block: common.Block{Number: big.NewInt(101)}}, {Block: common.Block{Number: big.NewInt(102)}}, @@ -438,6 +441,9 @@ func TestCommitterRespectsSIGTERM(t *testing.T) { mockRPC.EXPECT().GetChainID().Return(chainID) mockMainStorage.EXPECT().GetMaxBlockNumber(chainID).Return(big.NewInt(100), nil) + // Add expectation for DeleteOlderThan call during cleanup + mockStagingStorage.On("DeleteOlderThan", chainID, big.NewInt(100)).Return(nil) + blockData := []common.BlockData{ {Block: common.Block{Number: big.NewInt(101)}}, {Block: common.Block{Number: big.NewInt(102)}}, diff --git a/internal/storage/clickhouse.go b/internal/storage/clickhouse.go index 4b6c7aa..26b5fea 100644 --- a/internal/storage/clickhouse.go +++ b/internal/storage/clickhouse.go @@ -2115,50 +2115,14 @@ func (c *ClickHouseConnector) GetFullBlockData(chainId *big.Int, blockNumbers [] } func (c *ClickHouseConnector) DeleteOlderThan(chainId *big.Int, blockNumber *big.Int) error { - // First, get all the block numbers that need to be deleted query := fmt.Sprintf(` - SELECT DISTINCT chain_id, block_number - FROM %s.block_data + INSERT INTO %s.block_data (chain_id, block_number, is_deleted) + SELECT chain_id, block_number, 1 + FROM %s.block_data WHERE chain_id = ? AND block_number <= ? AND is_deleted = 0 - `, c.cfg.Database) - - rows, err := c.conn.Query(context.Background(), query, chainId, blockNumber) - if err != nil { - return err - } - defer rows.Close() - - // Prepare batch for deletion - deleteQuery := fmt.Sprintf(` - INSERT INTO %s.block_data ( - chain_id, block_number, is_deleted - ) VALUES (?, ?, ?) - `, c.cfg.Database) - - batch, err := c.conn.PrepareBatch(context.Background(), deleteQuery) - if err != nil { - return err - } - defer batch.Close() - - // Add each block to the deletion batch - for rows.Next() { - var chainIdVal, blockNumberVal *big.Int - if err := rows.Scan(&chainIdVal, &blockNumberVal); err != nil { - return err - } - - err := batch.Append( - chainIdVal, - blockNumberVal, - 1, // is_deleted = 1 - ) - if err != nil { - return err - } - } - - return batch.Send() + GROUP BY chain_id, block_number + `, c.cfg.Database, c.cfg.Database) + return c.conn.Exec(context.Background(), query, chainId, blockNumber) } // Helper function to test query generation