reorg #294 (Merged)
3 changes: 3 additions & 0 deletions cmd/committer.go
@@ -30,5 +30,8 @@ func RunCommitter(cmd *cobra.Command, args []string) {
}()

committer.Init()
committer.InitReorg()

go committer.RunReorgValidator()
committer.CommitStreaming()
}
4 changes: 4 additions & 0 deletions configs/config.go
@@ -302,6 +302,10 @@ type Config struct {
ParquetMaxFileSizeMB int64 `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"`
InsightServiceUrl string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"`
InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"`
RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"`
RedisUsername string `env:"REDIS_USERNAME"`
RedisPassword string `env:"REDIS_PASSWORD"`
RedisDB int `env:"REDIS_DB" envDefault:"0"`
}

var Cfg Config
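Note: libs.InitRedis (called from InitReorg in reorg.go below) is not included in this diff; the reorg validator uses Redis to checkpoint its last validated block via GetReorgLastValidBlock/SetReorgLastValidBlock. A minimal sketch of what the initializer could look like given these new fields, assuming the github.com/redis/go-redis/v9 client — the names and panic-on-init behavior here are illustrative, not the repo's actual code:

package libs

import (
	"context"

	"github.com/redis/go-redis/v9"
	"github.com/rs/zerolog/log"

	config "github.com/thirdweb-dev/indexer/configs"
)

var RedisClient *redis.Client

// InitRedis wires the four new env-driven settings into a shared client.
func InitRedis() {
	RedisClient = redis.NewClient(&redis.Options{
		Addr:     config.Cfg.RedisAddr,     // REDIS_ADDR, defaults to localhost:6379
		Username: config.Cfg.RedisUsername, // REDIS_USERNAME, optional
		Password: config.Cfg.RedisPassword, // REDIS_PASSWORD, optional
		DB:       config.Cfg.RedisDB,       // REDIS_DB, defaults to 0
	})
	// fail fast if Redis is unreachable, matching the committer's init style
	if err := RedisClient.Ping(context.Background()).Err(); err != nil {
		log.Panic().Err(err).Msg("failed to connect to Redis")
	}
}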
9 changes: 1 addition & 8 deletions internal/committer/poollatest.go
@@ -6,7 +6,6 @@ import (

"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/common"
"github.com/thirdweb-dev/indexer/internal/libs"
"github.com/thirdweb-dev/indexer/internal/libs/libblockdata"
"github.com/thirdweb-dev/indexer/internal/metrics"
@@ -59,13 +58,7 @@ func pollLatest() error {
Uint64("end_block", expectedBlockNumber-1).
Msg("All blocks validated successfully. Publishing blocks to Kafka")

- // Convert slice of BlockData to slice of *BlockData for Kafka publisher
- blockDataPointers := make([]*common.BlockData, len(blockDataArray))
- for i, block := range blockDataArray {
- blockDataPointers[i] = &block
- }
-
- if err := libs.KafkaPublisherV2.PublishBlockData(blockDataPointers); err != nil {
+ if err := libs.KafkaPublisherV2.PublishBlockData(blockDataArray); err != nil {
log.Panic().
Err(err).
Int("blocks_count", len(blockDataArray)).
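Worth noting: the deleted conversion loop stored &block, the address of the range variable, so before Go 1.22 every element of blockDataPointers would alias the same (final) block. Having GetValidBlockDataInBatch return []*common.BlockData directly (see the getblockdata.go changes below) removes both the per-element copy and that hazard.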
182 changes: 182 additions & 0 deletions internal/committer/reorg.go
@@ -0,0 +1,182 @@
package committer

import (
"fmt"
"time"

"github.com/rs/zerolog/log"
config "github.com/thirdweb-dev/indexer/configs"
"github.com/thirdweb-dev/indexer/internal/libs"
"github.com/thirdweb-dev/indexer/internal/libs/libblockdata"
"github.com/thirdweb-dev/indexer/internal/metrics"
)

func InitReorg() {
libs.InitRedis()
}

func RunReorgValidator() {
lastBlockCheck := int64(0)
for {
startBlock, endBlock, err := getReorgRange()
if err != nil {
log.Error().Err(err).Msg("Failed to get reorg range")
time.Sleep(2 * time.Second)
continue
}

if endBlock == lastBlockCheck || endBlock-startBlock < 5 {
log.Debug().Msg("Not enough new blocks to check. Sleeping for 1 minute.")
time.Sleep(1 * time.Minute)
continue
}

// Detect reorgs and handle them
err = detectAndHandleReorgs(startBlock, endBlock)
if err != nil {
log.Error().Err(err).Msg("Failed to detect and handle reorgs")
time.Sleep(2 * time.Second)
continue
}
lastBlockCheck = endBlock
}
}

func getReorgRange() (int64, int64, error) {
lastValidBlock, err := getLastValidBlock()
if err != nil {
return 0, 0, fmt.Errorf("failed to get last valid block: %w", err)
}

startBlock := max(lastValidBlock-1, 1)
endBlock, err := libs.GetMaxBlockNumberFromClickHouseV2(libs.ChainId)
if err != nil {
return 0, 0, fmt.Errorf("failed to get max block number: %w", err)
}

endBlock = min(endBlock-5, startBlock+100) // stay a few blocks behind the tip for safety, and cap the scan at 100 blocks

if startBlock >= endBlock {
return 0, 0, fmt.Errorf("start block is greater than end block")
}

return startBlock, endBlock, nil
}

func getLastValidBlock() (int64, error) {
// Try to get last reorg checked block number
lastReorgBlock, err := libs.GetReorgLastValidBlock(libs.ChainIdStr)
if err != nil {
return 0, fmt.Errorf("failed to get last reorg checked block: %w", err)
}

if lastReorgBlock > 0 {
return lastReorgBlock, nil
}

// get block number 1 day ago
lastValidBlock, err := libs.GetBlockNumberFromClickHouseV2DaysAgo(libs.ChainId, 1)
if err != nil {
return 0, fmt.Errorf("failed to get block number 1 day ago: %w", err)
}

return lastValidBlock, nil
}

func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock)

// Fetch block headers for the range
blockHeaders, err := libs.GetBlockHeadersForReorgCheck(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
if err != nil {
return fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err)
}

if len(blockHeaders) == 0 {
log.Debug().Msg("detectAndHandleReorgs: No block headers found in range")
return nil
}

// find the reorg start and end blocks
reorgStartBlock := int64(-1)
reorgEndBlock := int64(-1)
for i := 1; i < len(blockHeaders); i++ {
if blockHeaders[i].Number.Int64() != blockHeaders[i-1].Number.Int64()+1 {
// non-sequential block numbers: treat the whole gap as the reorg range
reorgStartBlock = blockHeaders[i-1].Number.Int64()
reorgEndBlock = blockHeaders[i].Number.Int64()
break
}
if blockHeaders[i].ParentHash != blockHeaders[i-1].Hash {
// parent hash does not link to the previous header: the reorg starts at the previous block
if reorgStartBlock == -1 {
reorgStartBlock = blockHeaders[i-1].Number.Int64()
}
} else if reorgStartBlock != -1 {
// hashes link up again: the reorg ends at this block
reorgEndBlock = blockHeaders[i].Number.Int64()
break
}
}

// set end to the last block if not set
if reorgEndBlock == -1 {
reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64()
}

if reorgStartBlock > -1 {
if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil {
return err
}
}

// update last valid block. if there was no reorg, this will update to the last block
libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock)

return nil
}

func handleReorgForRange(startBlock uint64, endBlock uint64) error {
// nothing to do
if startBlock == 0 {
return nil
}

// will panic if any block is invalid
newblockDataArray := libblockdata.GetValidBlockDataInBatch(endBlock, startBlock)
expectedBlockNumber := startBlock
for i, blockData := range newblockDataArray {
if blockData.Block.Number.Uint64() != expectedBlockNumber {
log.Error().
Int("index", i).
Uint64("expected_block", expectedBlockNumber).
Uint64("actual_block", blockData.Block.Number.Uint64()).
Msg("Reorg: Block sequence mismatch - missing or out of order block")

return fmt.Errorf("reorg: block sequence mismatch - missing or out of order block")
}
expectedBlockNumber++
}

oldblockDataArray, err := libs.GetBlockDataFromClickHouseV2(libs.ChainId.Uint64(), startBlock, endBlock)
if err != nil {
return fmt.Errorf("handleReorgForRange: failed to get old block data: %w", err)
}

if err := libs.KafkaPublisherV2.PublishBlockDataReorg(newblockDataArray, oldblockDataArray); err != nil {
log.Error().
Err(err).
Int("blocks_count", len(newblockDataArray)).
Msg("Reorg: Failed to publish blocks to Kafka")
return fmt.Errorf("reorg: failed to publish blocks to kafka")
}

for _, blockData := range newblockDataArray {
metrics.CommitterLastPublishedReorgBlockNumber.WithLabelValues(config.Cfg.ZeetProjectName, libs.ChainIdStr).Set(float64(blockData.Block.Number.Uint64()))
}

return nil
}
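The detection pass in detectAndHandleReorgs only ever compares adjacent headers, so one linear scan finds both edges of a fork. A self-contained toy illustration of that loop's logic (simplified types, not code from this PR):

package main

import (
	"fmt"
	"math/big"
)

type header struct {
	Number     *big.Int
	Hash       string
	ParentHash string
}

// findReorgRange mirrors the loop above: the reorg starts at the last block
// whose child fails to reference its hash, and ends where parent/child
// hashes link up again (or at the last header if they never do).
func findReorgRange(h []header) (start, end int64) {
	start, end = -1, -1
	for i := 1; i < len(h); i++ {
		if h[i].ParentHash != h[i-1].Hash {
			if start == -1 {
				start = h[i-1].Number.Int64()
			}
		} else if start != -1 {
			end = h[i].Number.Int64()
			break
		}
	}
	if end == -1 {
		end = h[len(h)-1].Number.Int64() // open-ended: fork runs to the last header
	}
	return start, end
}

func main() {
	h := []header{
		{Number: big.NewInt(100), Hash: "a", ParentHash: "z"},
		{Number: big.NewInt(101), Hash: "b2", ParentHash: "a"},  // stored hash is stale...
		{Number: big.NewInt(102), Hash: "c2", ParentHash: "b9"}, // ...so this parent ref mismatches
		{Number: big.NewInt(103), Hash: "d", ParentHash: "c2"},  // links up again
	}
	fmt.Println(findReorgRange(h)) // prints: 101 103
}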
75 changes: 75 additions & 0 deletions internal/libs/clickhouse.go
@@ -89,6 +89,32 @@ func initClickhouse(host string, port int, username string, password string, datab
return clickhouseConn
}

func GetBlockNumberFromClickHouseV2DaysAgo(chainId *big.Int, daysAgo int) (int64, error) {
query := fmt.Sprintf(`SELECT toString(max(block_number))
FROM default.blocks WHERE chain_id = %d AND block_timestamp <= now() - INTERVAL %d DAY;`, chainId.Uint64(), daysAgo)
rows, err := ClickhouseConnV2.Query(context.Background(), query)
if err != nil {
return -1, err
}
defer rows.Close()

if !rows.Next() {
return -1, nil
}

var blockNumberStr string
if err := rows.Scan(&blockNumberStr); err != nil {
return -1, err
}

blockNumber, err := strconv.ParseInt(blockNumberStr, 10, 64)
if err != nil {
return -1, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
}

return blockNumber, nil
}

func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) {
// Use toString() to convert UInt256 to string, then parse to int64
query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d HAVING count() > 0", chainId.Uint64())
@@ -115,6 +141,55 @@ func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) {
return maxBlockNumber, nil
}

func GetBlockReorgDataFromClickHouseV2(chainId *big.Int, startBlockNumber int64, endBlockNumber int64) ([]*common.Block, error) {
query := fmt.Sprintf(`SELECT block_number, hash, parent_hash
FROM default.blocks WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number`, chainId.Uint64(), startBlockNumber, endBlockNumber)
rows, err := ClickhouseConnV2.Query(context.Background(), query)
if err != nil {
return nil, err
}
defer rows.Close()

blocks := make([]*common.Block, 0)
for rows.Next() {
var block common.Block
err := rows.Scan(&block.Number, &block.Hash, &block.ParentHash)
if err != nil {
return nil, err
}
blocks = append(blocks, &block)
}
return blocks, nil
}

func GetBlockHeadersForReorgCheck(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.Block, error) {
sb := startBlockNumber
length := endBlockNumber - startBlockNumber + 1
blocksRaw := make([]*common.Block, length)

query := fmt.Sprintf("SELECT block_number, hash, parent_hash FROM %s.blocks FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number",
config.Cfg.CommitterClickhouseDatabase,
chainId,
startBlockNumber,
endBlockNumber,
)
blocks, err := execQueryV2[common.Block](query)
if err != nil {
return blocksRaw, err
}

// place each block at its offset so the result is dense and in order
for i := range blocks {
block := &blocks[i] // index into the slice; &block of the range variable would alias one value before Go 1.22
idx := block.Number.Uint64() - sb
if idx >= length {
log.Error().Msgf("Block number %s is out of range", block.Number.String())
continue
}
blocksRaw[idx] = block
}
return blocksRaw, nil
}

func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) {
length := endBlockNumber - startBlockNumber + 1

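Note: execQueryV2, used by GetBlockHeadersForReorgCheck above, is not part of this diff. A plausible minimal shape for such a helper on top of clickhouse-go v2's Select scanning, offered only as an assumption about the existing codebase (it also presumes common.Block carries the matching ch column tags):

func execQueryV2[T any](query string) ([]T, error) {
	var out []T
	// driver.Conn.Select scans every row of the result set into a slice of T
	if err := ClickhouseConnV2.Select(context.Background(), &out, query); err != nil {
		return nil, err
	}
	return out, nil
}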
10 changes: 5 additions & 5 deletions internal/libs/libblockdata/getblockdata.go
@@ -15,13 +15,13 @@ import (
"github.com/thirdweb-dev/indexer/internal/rpc"
)

- func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) []common.BlockData {
+ func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) []*common.BlockData {
rpcNumParallelCalls := config.Cfg.RPCNumParallelCalls
rpcBatchSize := config.Cfg.RPCBatchSize
maxBlocksPerFetch := rpcBatchSize * rpcNumParallelCalls

// Calculate the range of blocks to fetch
- blocksToFetch := latestBlock - nextCommitBlockNumber
+ blocksToFetch := latestBlock - nextCommitBlockNumber + 1
if blocksToFetch > maxBlocksPerFetch {
blocksToFetch = maxBlocksPerFetch
}
@@ -35,7 +35,7 @@ func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64)
Msg("Starting to fetch latest blocks")

// Precreate array of block data
- blockDataArray := make([]common.BlockData, blocksToFetch)
+ blockDataArray := make([]*common.BlockData, blocksToFetch)

// Create batches and calculate number of parallel calls needed
numBatches := min((blocksToFetch+rpcBatchSize-1)/rpcBatchSize, rpcNumParallelCalls)
@@ -74,8 +74,8 @@
for i, bd := range batchResults {
arrayIndex := batchIdx*rpcBatchSize + uint64(i)
if arrayIndex < uint64(len(blockDataArray)) {
- blockDataArray[arrayIndex] = *bd // todo: update to use pointer, kafka is using normal block data
- batchResults[i] = nil // free memory
+ blockDataArray[arrayIndex] = bd
+ batchResults[i] = nil // free memory
}
}
mu.Unlock()
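Note on the off-by-one fix above: both bounds appear to be inclusive, so the block count is latestBlock - nextCommitBlockNumber + 1. For example, with nextCommitBlockNumber = 101 and latestBlock = 110, the old expression yielded 9 and the tip block 110 was never fetched; the corrected expression yields 10, covering blocks 101 through 110.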