From 14cc48538d4476ef9a7b9a2380f168d76879fd05 Mon Sep 17 00:00:00 2001 From: nischit Date: Tue, 14 Oct 2025 23:23:02 +0545 Subject: [PATCH 1/8] handle reorg --- cmd/reorg.go | 52 +++++++ configs/config.go | 5 + internal/committer/poollatest.go | 9 +- internal/committer/reorg.go | 165 +++++++++++++++++++++ internal/libs/clickhouse.go | 47 ++++++ internal/libs/libblockdata/getblockdata.go | 8 +- internal/libs/redis.go | 59 ++++++++ internal/metrics/metrics.go | 5 + internal/storage/kafka_publisher.go | 24 +-- 9 files changed, 353 insertions(+), 21 deletions(-) create mode 100644 cmd/reorg.go create mode 100644 internal/committer/reorg.go create mode 100644 internal/libs/redis.go diff --git a/cmd/reorg.go b/cmd/reorg.go new file mode 100644 index 0000000..6917fb1 --- /dev/null +++ b/cmd/reorg.go @@ -0,0 +1,52 @@ +package cmd + +import ( + "context" + "os" + "os/signal" + "syscall" + + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "github.com/thirdweb-dev/indexer/internal/reorg" +) + +var ( + reorgCmd = &cobra.Command{ + Use: "reorg", + Short: "Run reorg detection and handling", + Long: "Continuously monitor blockchain for reorganizations and automatically fix them by refetching affected blocks", + Run: func(cmd *cobra.Command, args []string) { + RunReorg(cmd, args) + }, + } +) + +func RunReorg(cmd *cobra.Command, args []string) { + log.Info().Msg("Starting reorg validator") + + // Initialize reorg package + reorg.Init() + + // Create context with cancellation + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle graceful shutdown + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + <-sigChan + log.Info().Msg("Received shutdown signal, stopping reorg validator") + cancel() + }() + + // Run the reorg validator + err := reorg.RunReorgValidator(ctx) + if err != nil && err != context.Canceled { + log.Fatal().Err(err).Msg("Reorg validator failed") + } + + log.Info().Msg("Reorg validator stopped") +} diff --git a/configs/config.go b/configs/config.go index 2dad77c..5bfd610 100644 --- a/configs/config.go +++ b/configs/config.go @@ -307,6 +307,11 @@ type Config struct { ParquetMaxFileSizeMB int64 `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"` InsightServiceUrl string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"` InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"` + RedisAddr string `env:"REDIS_ADDR"` + RedisUsername string `env:"REDIS_USERNAME"` + RedisPassword string `env:"REDIS_PASSWORD"` + RedisDB int `env:"REDIS_DB"` + RedisEnableTLS bool `env:"REDIS_ENABLE_TLS" envDefault:"true"` } var Cfg Config diff --git a/internal/committer/poollatest.go b/internal/committer/poollatest.go index 0c0538f..a72c4ac 100644 --- a/internal/committer/poollatest.go +++ b/internal/committer/poollatest.go @@ -6,7 +6,6 @@ import ( "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" - "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/libs" "github.com/thirdweb-dev/indexer/internal/libs/libblockdata" "github.com/thirdweb-dev/indexer/internal/metrics" @@ -59,13 +58,7 @@ func pollLatest() error { Uint64("end_block", expectedBlockNumber-1). Msg("All blocks validated successfully. 
Publishing blocks to Kafka") - // Convert slice of BlockData to slice of *BlockData for Kafka publisher - blockDataPointers := make([]*common.BlockData, len(blockDataArray)) - for i, block := range blockDataArray { - blockDataPointers[i] = &block - } - - if err := libs.KafkaPublisherV2.PublishBlockData(blockDataPointers); err != nil { + if err := libs.KafkaPublisherV2.PublishBlockData(blockDataArray); err != nil { log.Panic(). Err(err). Int("blocks_count", len(blockDataArray)). diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go new file mode 100644 index 0000000..50aefe9 --- /dev/null +++ b/internal/committer/reorg.go @@ -0,0 +1,165 @@ +package committer + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog/log" + config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/libs" + "github.com/thirdweb-dev/indexer/internal/libs/libblockdata" + "github.com/thirdweb-dev/indexer/internal/metrics" +) + +func RunReorgValidator(ctx context.Context) error { + for { + startBlock, endBlock, err := getReorgRange() + if err != nil { + log.Error().Err(err).Msg("Failed to get reorg range") + time.Sleep(2 * time.Second) + continue + } + + // Detect reorgs and handle them + err = detectAndHandleReorgs(startBlock, endBlock) + if err != nil { + log.Error().Err(err).Msg("Failed to detect and handle reorgs") + time.Sleep(2 * time.Second) + continue + } + } +} + +func getReorgRange() (int64, int64, error) { + lastValidBlock, err := getLastValidBlock() + if err != nil { + return 0, 0, fmt.Errorf("failed to get last valid block: %w", err) + } + + startBlock := min(lastValidBlock-1, 1) + endBlock, err := libs.GetMaxBlockNumberFromClickHouseV2(libs.ChainId) + if err != nil { + return 0, 0, fmt.Errorf("failed to get max block number: %w", err) + } + + endBlock = endBlock - 5 // lag by some blocks for safety + + if startBlock >= endBlock { + return 0, 0, fmt.Errorf("start block is greater than end block") + } + + return startBlock, endBlock, nil +} + +func getLastValidBlock() (int64, error) { + // Try to get last reorg checked block number + lastReorgBlock, err := libs.GetReorgLastValidBlock(libs.ChainIdStr) + if err != nil { + return 0, fmt.Errorf("failed to get last reorg checked block: %w", err) + } + + if lastReorgBlock > 0 { + return lastReorgBlock, nil + } + + // get block number 1 day ago + lastValidBlock, err := libs.GetBlockNumberFromClickHouseV2DaysAgo(libs.ChainId, 1) + if err != nil { + return 0, fmt.Errorf("failed to get block number 1 day ago: %w", err) + } + + return lastValidBlock, nil +} + +func detectAndHandleReorgs(startBlock int64, endBlock int64) error { + log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock) + + // Fetch block headers for the range + blockHeaders, err := libs.GetBlockReorgDataFromClickHouseV2(libs.ChainId, startBlock, endBlock) + if err != nil { + return fmt.Errorf("failed to get block headers: %w", err) + } + + if len(blockHeaders) == 0 { + log.Debug().Msg("No block headers found in range") + return nil + } + + // finding the reorg start and end block + reorgStartBlock := int64(-1) + reorgEndBlock := int64(-1) + for i := 1; i < len(blockHeaders); i++ { + if blockHeaders[i].Number.Int64() != blockHeaders[i-1].Number.Int64()+1 { + // non-sequential block numbers + reorgStartBlock = blockHeaders[i-1].Number.Int64() + reorgEndBlock = blockHeaders[i].Number.Int64() + break + } + if blockHeaders[i].ParentHash != blockHeaders[i-1].Hash { + // hash mismatch start + if reorgStartBlock == -1 { 
+ reorgStartBlock = blockHeaders[i-1].Number.Int64() + } + continue + } else { + // hash matches end + if reorgStartBlock != -1 { + reorgEndBlock = blockHeaders[i].Number.Int64() + break + } + } + } + + // set end to the last block if not set + if reorgEndBlock == -1 { + reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64() + } + + if reorgStartBlock > -1 { + if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil { + return err + } + } + + // update last valid block. if there was no reorg, this will update to the last block + libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock) + + return nil +} + +func handleReorgForRange(startBlock uint64, endBlock uint64) error { + // nothing to do + if startBlock == 0 { + return nil + } + + // will panic if any block is invalid + blockDataArray := libblockdata.GetValidBlockDataInBatch(endBlock, startBlock) + expectedBlockNumber := startBlock + for i, blockData := range blockDataArray { + if blockData.Block.Number.Uint64() != expectedBlockNumber { + log.Error(). + Int("index", i). + Uint64("expected_block", expectedBlockNumber). + Uint64("actual_block", blockData.Block.Number.Uint64()). + Msg("Reorg: Block sequence mismatch - missing or out of order block") + + return fmt.Errorf("reorg: block sequence mismatch - missing or out of order block") + } + expectedBlockNumber++ + } + if err := libs.KafkaPublisherV2.PublishBlockDataReorg(blockDataArray); err != nil { + log.Error(). + Err(err). + Int("blocks_count", len(blockDataArray)). + Msg("Reorg: Failed to publish blocks to Kafka") + return fmt.Errorf("reorg: failed to publish blocks to kafka") + } + + for _, blockData := range blockDataArray { + metrics.CommitterLastPublishedReorgBlockNumber.WithLabelValues(config.Cfg.ZeetProjectName, libs.ChainIdStr).Set(float64(blockData.Block.Number.Uint64())) + } + + return nil +} diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index f377c97..92844b7 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -100,6 +100,32 @@ func initClickhouse(host string, port int, username string, password string, dat return clickhouseConn } +func GetBlockNumberFromClickHouseV2DaysAgo(chainId *big.Int, daysAgo int) (int64, error) { + query := fmt.Sprintf(`SELECT toString(max(block_number)) + FROM default.blocks WHERE chain_id = %d AND block_timestamp <= now() - INTERVAL %d DAY ;`, chainId.Uint64(), daysAgo) + rows, err := ClickhouseConnV2.Query(context.Background(), query) + if err != nil { + return -1, err + } + defer rows.Close() + + if !rows.Next() { + return -1, nil + } + + var blockNumberStr string + if err := rows.Scan(&blockNumberStr); err != nil { + return -1, err + } + + blockNumber, err := strconv.ParseInt(blockNumberStr, 10, 64) + if err != nil { + return -1, fmt.Errorf("failed to parse block number: %s", blockNumberStr) + } + + return blockNumber, nil +} + func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) { // Use toString() to convert UInt256 to string, then parse to int64 query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d HAVING count() > 0", chainId.Uint64()) @@ -126,6 +152,27 @@ func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) { return maxBlockNumber, nil } +func GetBlockReorgDataFromClickHouseV2(chainId *big.Int, startBlockNumber int64, endBlockNumber int64) ([]*common.Block, error) { + query := fmt.Sprintf(`SELECT block_number, hash, parent_hash + FROM default.blocks WHERE chain_id = %d 
AND block_number BETWEEN %d AND %d order by block_number`, chainId.Uint64(), startBlockNumber, endBlockNumber) + rows, err := ClickhouseConnV2.Query(context.Background(), query) + if err != nil { + return nil, err + } + defer rows.Close() + + blocks := make([]*common.Block, 0) + for rows.Next() { + var block common.Block + err := rows.Scan(&block.Number, &block.Hash, &block.ParentHash) + if err != nil { + return nil, err + } + blocks = append(blocks, &block) + } + return blocks, nil +} + func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) { length := endBlockNumber - startBlockNumber + 1 diff --git a/internal/libs/libblockdata/getblockdata.go b/internal/libs/libblockdata/getblockdata.go index 9db558f..90e7738 100644 --- a/internal/libs/libblockdata/getblockdata.go +++ b/internal/libs/libblockdata/getblockdata.go @@ -15,7 +15,7 @@ import ( "github.com/thirdweb-dev/indexer/internal/rpc" ) -func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) []common.BlockData { +func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) []*common.BlockData { rpcNumParallelCalls := config.Cfg.RPCNumParallelCalls rpcBatchSize := config.Cfg.RPCBatchSize maxBlocksPerFetch := rpcBatchSize * rpcNumParallelCalls @@ -35,7 +35,7 @@ func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) Msg("Starting to fetch latest blocks") // Precreate array of block data - blockDataArray := make([]common.BlockData, blocksToFetch) + blockDataArray := make([]*common.BlockData, blocksToFetch) // Create batches and calculate number of parallel calls needed numBatches := min((blocksToFetch+rpcBatchSize-1)/rpcBatchSize, rpcNumParallelCalls) @@ -74,8 +74,8 @@ func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) for i, bd := range batchResults { arrayIndex := batchIdx*rpcBatchSize + uint64(i) if arrayIndex < uint64(len(blockDataArray)) { - blockDataArray[arrayIndex] = *bd // todo: update to use pointer, kafka is using normal block data - batchResults[i] = nil // free memory + blockDataArray[arrayIndex] = bd + batchResults[i] = nil // free memory } } mu.Unlock() diff --git a/internal/libs/redis.go b/internal/libs/redis.go new file mode 100644 index 0000000..6827282 --- /dev/null +++ b/internal/libs/redis.go @@ -0,0 +1,59 @@ +package libs + +import ( + "context" + "crypto/tls" + "strconv" + + "github.com/redis/go-redis/v9" + "github.com/rs/zerolog/log" + config "github.com/thirdweb-dev/indexer/configs" +) + +var RedisClient *redis.Client + +const RedisReorgLastValidBlock = "reorg_last_valid" + +// InitRedis initializes the Redis client +func InitRedis() { + RedisClient = redis.NewClient(&redis.Options{ + Addr: config.Cfg.RedisAddr, + Username: config.Cfg.RedisUsername, + Password: config.Cfg.RedisPassword, + DB: config.Cfg.RedisDB, + TLSConfig: &tls.Config{ + MinVersion: tls.VersionTLS12, + }, + }) + + // Test the connection + ctx := context.Background() + _, err := RedisClient.Ping(ctx).Result() + if err != nil { + log.Panic().Err(err).Msg("Failed to connect to Redis") + } + + log.Info().Msg("Redis client initialized successfully") +} + +func GetReorgLastValidBlock(chainID string) (int64, error) { + result, err := RedisClient.HGet(context.Background(), RedisReorgLastValidBlock, chainID).Result() + if err == redis.Nil { + return 0, nil + } + if err != nil { + return 0, err + } + return strconv.ParseInt(result, 10, 64) +} + +func SetReorgLastValidBlock(chainID string, 
blockNumber int64) error { + return RedisClient.HSet(context.Background(), RedisReorgLastValidBlock, chainID, blockNumber).Err() +} + +// CloseRedis closes the Redis client +func CloseRedis() { + if RedisClient != nil { + RedisClient.Close() + } +} diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index e7fdf15..a751e6a 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -256,6 +256,11 @@ var ( Help: "The last published block number to Kafka", }, []string{"project_name", "chain_id"}) + CommitterLastPublishedReorgBlockNumber = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "committer_last_published_reorg_block_number", + Help: "The last published reorg block number to Kafka", + }, []string{"project_name", "chain_id"}) + CommitterRPCRowsToFetch = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: "committer_rpc_rows_to_fetch", Help: "The total number of rows to fetch from RPC", diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index 4601131..0d92e64 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -38,7 +38,8 @@ type PublishableMessagePayload struct { type PublishableMessageBlockData struct { *common.BlockData ChainId uint64 `json:"chain_id"` - IsDeleted int8 `json:"is_deleted"` + IsDeleted int8 `json:"is_deleted"` // deprecated + IsReorg int8 `json:"is_reorg"` InsertTimestamp time.Time `json:"insert_timestamp"` } @@ -115,6 +116,10 @@ func (p *KafkaPublisher) PublishBlockData(blockData []*common.BlockData) error { return p.publishBlockData(blockData, false) } +func (p *KafkaPublisher) PublishBlockDataReorg(blockData []*common.BlockData) error { + return p.publishBlockData(blockData, true) +} + func (p *KafkaPublisher) PublishReorg(oldData []*common.BlockData, newData []*common.BlockData) error { chainId := newData[0].Block.ChainId.Uint64() newHead := uint64(newData[0].Block.Number.Uint64()) @@ -228,7 +233,7 @@ func (p *KafkaPublisher) publishBlockRevert(chainId uint64, blockNumber uint64) return nil } -func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDeleted bool) error { +func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isReorg bool) error { if len(blockData) == 0 { return nil } @@ -240,7 +245,7 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDelet for i, data := range blockData { // Block message - if blockMsg, err := p.createBlockDataMessage(data, isDeleted); err == nil { + if blockMsg, err := p.createBlockDataMessage(data, isReorg); err == nil { blockMessages[i] = blockMsg } else { return fmt.Errorf("failed to create block message: %v", err) @@ -255,7 +260,7 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDelet return nil } -func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDeleted bool) (*kgo.Record, error) { +func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isReorg bool) (*kgo.Record, error) { timestamp := time.Now() data := PublishableMessageBlockData{ @@ -264,8 +269,8 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDelet IsDeleted: 0, InsertTimestamp: timestamp, } - if isDeleted { - data.IsDeleted = 1 + if isReorg { + data.IsReorg = 1 } msg := PublishableMessagePayload{ @@ -279,7 +284,7 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDelet return nil, fmt.Errorf("failed to marshal block data: %v", err) } - return 
p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, msgJson) + return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isReorg, msgJson) } func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber uint64) (*kgo.Record, error) { @@ -303,10 +308,10 @@ func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber ui return nil, fmt.Errorf("failed to marshal block data: %v", err) } - return p.createRecord(data.GetType(), chainId, blockNumber, timestamp, msgJson) + return p.createRecord(data.GetType(), chainId, blockNumber, timestamp, false, msgJson) } -func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, msgJson []byte) (*kgo.Record, error) { +func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, isReorg bool, msgJson []byte) (*kgo.Record, error) { encoder, err := zstd.NewWriter(nil) if err != nil { log.Fatal().Err(err).Msg("failed to create zstd encoder") @@ -322,6 +327,7 @@ func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, block {Key: "type", Value: []byte(fmt.Sprintf("%s", msgType))}, {Key: "timestamp", Value: []byte(timestamp.Format(time.RFC3339Nano))}, {Key: "schema_version", Value: []byte("1")}, + {Key: "is_reorg", Value: []byte(fmt.Sprintf("%t", isReorg))}, {Key: "content-type", Value: []byte("zstd")}, } From d88e7917f1318562fb2084fee9d427224969430b Mon Sep 17 00:00:00 2001 From: nischit Date: Tue, 14 Oct 2025 23:34:32 +0545 Subject: [PATCH 2/8] minor change --- internal/storage/kafka_publisher.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index 0d92e64..4c7036a 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -39,7 +39,7 @@ type PublishableMessageBlockData struct { *common.BlockData ChainId uint64 `json:"chain_id"` IsDeleted int8 `json:"is_deleted"` // deprecated - IsReorg int8 `json:"is_reorg"` + IsReorg bool `json:"is_reorg"` InsertTimestamp time.Time `json:"insert_timestamp"` } @@ -270,7 +270,7 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isReorg InsertTimestamp: timestamp, } if isReorg { - data.IsReorg = 1 + data.IsReorg = true } msg := PublishableMessagePayload{ @@ -322,12 +322,12 @@ func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, block // Create headers with metadata headers := []kgo.RecordHeader{ - {Key: "chain_id", Value: []byte(fmt.Sprintf("%d", chainId))}, - {Key: "block_number", Value: []byte(fmt.Sprintf("%d", blockNumber))}, + {Key: "chain_id", Value: []byte(fmt.Sprintf("%d", chainId))}, // order is important. always 0 + {Key: "block_number", Value: []byte(fmt.Sprintf("%d", blockNumber))}, // order is important. always 1 + {Key: "is_reorg", Value: []byte(fmt.Sprintf("%t", isReorg))}, // order is important. 
always 2 {Key: "type", Value: []byte(fmt.Sprintf("%s", msgType))}, {Key: "timestamp", Value: []byte(timestamp.Format(time.RFC3339Nano))}, {Key: "schema_version", Value: []byte("1")}, - {Key: "is_reorg", Value: []byte(fmt.Sprintf("%t", isReorg))}, {Key: "content-type", Value: []byte("zstd")}, } From 0e5315e8cec691f3ea702b71023422edce6cf5ef Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 15 Oct 2025 23:53:05 +0545 Subject: [PATCH 3/8] minor change --- cmd/reorg.go | 52 ----------------------------- internal/committer/reorg.go | 50 ++++++++++++++++----------- internal/libs/clickhouse.go | 36 ++++++++++++-------- internal/storage/kafka_publisher.go | 38 ++++++++++++--------- 4 files changed, 76 insertions(+), 100 deletions(-) delete mode 100644 cmd/reorg.go diff --git a/cmd/reorg.go b/cmd/reorg.go deleted file mode 100644 index 6917fb1..0000000 --- a/cmd/reorg.go +++ /dev/null @@ -1,52 +0,0 @@ -package cmd - -import ( - "context" - "os" - "os/signal" - "syscall" - - "github.com/rs/zerolog/log" - "github.com/spf13/cobra" - "github.com/thirdweb-dev/indexer/internal/reorg" -) - -var ( - reorgCmd = &cobra.Command{ - Use: "reorg", - Short: "Run reorg detection and handling", - Long: "Continuously monitor blockchain for reorganizations and automatically fix them by refetching affected blocks", - Run: func(cmd *cobra.Command, args []string) { - RunReorg(cmd, args) - }, - } -) - -func RunReorg(cmd *cobra.Command, args []string) { - log.Info().Msg("Starting reorg validator") - - // Initialize reorg package - reorg.Init() - - // Create context with cancellation - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Handle graceful shutdown - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) - - go func() { - <-sigChan - log.Info().Msg("Received shutdown signal, stopping reorg validator") - cancel() - }() - - // Run the reorg validator - err := reorg.RunReorgValidator(ctx) - if err != nil && err != context.Canceled { - log.Fatal().Err(err).Msg("Reorg validator failed") - } - - log.Info().Msg("Reorg validator stopped") -} diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 50aefe9..57f1770 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -7,6 +7,7 @@ import ( "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/libs" "github.com/thirdweb-dev/indexer/internal/libs/libblockdata" "github.com/thirdweb-dev/indexer/internal/metrics" @@ -76,36 +77,36 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock) // Fetch block headers for the range - blockHeaders, err := libs.GetBlockReorgDataFromClickHouseV2(libs.ChainId, startBlock, endBlock) + blockData, err := libs.GetBlockDataFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) if err != nil { - return fmt.Errorf("failed to get block headers: %w", err) + return fmt.Errorf("detectAndHandleReorgs: failed to get block data: %w", err) } - if len(blockHeaders) == 0 { - log.Debug().Msg("No block headers found in range") + if len(blockData) == 0 { + log.Debug().Msg("detectAndHandleReorgs: No block data found in range") return nil } // finding the reorg start and end block reorgStartBlock := int64(-1) reorgEndBlock := int64(-1) - for i := 1; i < len(blockHeaders); i++ { - if blockHeaders[i].Number.Int64() != 
blockHeaders[i-1].Number.Int64()+1 { + for i := 1; i < len(blockData); i++ { + if blockData[i].Block.Number.Int64() != blockData[i-1].Block.Number.Int64()+1 { // non-sequential block numbers - reorgStartBlock = blockHeaders[i-1].Number.Int64() - reorgEndBlock = blockHeaders[i].Number.Int64() + reorgStartBlock = blockData[i-1].Block.Number.Int64() + reorgEndBlock = blockData[i].Block.Number.Int64() break } - if blockHeaders[i].ParentHash != blockHeaders[i-1].Hash { + if blockData[i].Block.ParentHash != blockData[i-1].Block.Hash { // hash mismatch start if reorgStartBlock == -1 { - reorgStartBlock = blockHeaders[i-1].Number.Int64() + reorgStartBlock = blockData[i-1].Block.Number.Int64() } continue } else { // hash matches end if reorgStartBlock != -1 { - reorgEndBlock = blockHeaders[i].Number.Int64() + reorgEndBlock = blockData[i].Block.Number.Int64() break } } @@ -113,11 +114,11 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { // set end to the last block if not set if reorgEndBlock == -1 { - reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64() + reorgEndBlock = blockData[len(blockData)-1].Block.Number.Int64() } if reorgStartBlock > -1 { - if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil { + if err := handleReorgForRange(blockData, uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil { return err } } @@ -128,16 +129,16 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { return nil } -func handleReorgForRange(startBlock uint64, endBlock uint64) error { +func handleReorgForRange(oldblockData []*common.BlockData, startBlock uint64, endBlock uint64) error { // nothing to do if startBlock == 0 { return nil } // will panic if any block is invalid - blockDataArray := libblockdata.GetValidBlockDataInBatch(endBlock, startBlock) + newblockDataArray := libblockdata.GetValidBlockDataInBatch(endBlock, startBlock) expectedBlockNumber := startBlock - for i, blockData := range blockDataArray { + for i, blockData := range newblockDataArray { if blockData.Block.Number.Uint64() != expectedBlockNumber { log.Error(). Int("index", i). @@ -149,15 +150,26 @@ func handleReorgForRange(startBlock uint64, endBlock uint64) error { } expectedBlockNumber++ } - if err := libs.KafkaPublisherV2.PublishBlockDataReorg(blockDataArray); err != nil { + + oldblockDataArray := make([]*common.BlockData, 0, len(oldblockData)) + for _, bd := range oldblockData { + if bd.Block.Number.Uint64() < startBlock || bd.Block.Number.Uint64() > endBlock { + continue + } + oldblockDataArray = append(oldblockDataArray, bd) + } + // order of deletion is important + // need to first delete then add so list should contain old data first + + if err := libs.KafkaPublisherV2.PublishBlockDataReorg(newblockDataArray, oldblockDataArray); err != nil { log.Error(). Err(err). - Int("blocks_count", len(blockDataArray)). + Int("blocks_count", len(newblockDataArray)). 
Msg("Reorg: Failed to publish blocks to Kafka") return fmt.Errorf("reorg: failed to publish blocks to kafka") } - for _, blockData := range blockDataArray { + for _, blockData := range newblockDataArray { metrics.CommitterLastPublishedReorgBlockNumber.WithLabelValues(config.Cfg.ZeetProjectName, libs.ChainIdStr).Set(float64(blockData.Block.Number.Uint64())) } diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index 92844b7..7ebfe55 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -173,7 +173,15 @@ func GetBlockReorgDataFromClickHouseV2(chainId *big.Int, startBlockNumber int64, return blocks, nil } +func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) { + return getBlockDataFromClickhouse(ClickhouseConnV2, chainId, startBlockNumber, endBlockNumber) +} + func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) { + return getBlockDataFromClickhouse(ClickhouseConnV1, chainId, startBlockNumber, endBlockNumber) +} + +func getBlockDataFromClickhouse(clickhouseConn clickhouse.Conn, chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) { length := endBlockNumber - startBlockNumber + 1 blockData := make([]*common.BlockData, length) @@ -186,22 +194,22 @@ func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBl wg.Add(4) go func() { defer wg.Done() - blocksRaw, _ = getBlocksFromV1(chainId, startBlockNumber, endBlockNumber) + blocksRaw, _ = getBlocksFrom(clickhouseConn, chainId, startBlockNumber, endBlockNumber) }() go func() { defer wg.Done() - transactionsRaw, _ = getTransactionsFromV1(chainId, startBlockNumber, endBlockNumber) + transactionsRaw, _ = getTransactionsFrom(clickhouseConn, chainId, startBlockNumber, endBlockNumber) }() go func() { defer wg.Done() - logsRaw, _ = getLogsFromV1(chainId, startBlockNumber, endBlockNumber) + logsRaw, _ = getLogsFrom(clickhouseConn, chainId, startBlockNumber, endBlockNumber) }() go func() { defer wg.Done() - tracesRaw, _ = getTracesFromV1(chainId, startBlockNumber, endBlockNumber) + tracesRaw, _ = getTracesFrom(clickhouseConn, chainId, startBlockNumber, endBlockNumber) }() wg.Wait() @@ -236,7 +244,7 @@ func GetBlockDataFromClickHouseV1(chainId uint64, startBlockNumber uint64, endBl return blockData, nil } -func getBlocksFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) { +func getBlocksFrom(clickhouseConn clickhouse.Conn, chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) { sb := startBlockNumber length := endBlockNumber - startBlockNumber + 1 blocksRaw := make([]common.Block, length) @@ -248,7 +256,7 @@ func getBlocksFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uin startBlockNumber, endBlockNumber, ) - blocks, err := execQueryV1[common.Block](query) + blocks, err := execQuery[common.Block](clickhouseConn, query) if err != nil { return blocksRaw, err } @@ -265,7 +273,7 @@ func getBlocksFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uin return blocksRaw, nil } -func getTransactionsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Transaction, error) { +func getTransactionsFrom(clickhouseConn clickhouse.Conn, chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Transaction, error) { sb := startBlockNumber length := endBlockNumber - 
startBlockNumber + 1 transactionsRaw := make([][]common.Transaction, length) @@ -277,7 +285,7 @@ func getTransactionsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumb startBlockNumber, endBlockNumber, ) - transactions, err := execQueryV1[common.Transaction](query) + transactions, err := execQuery[common.Transaction](clickhouseConn, query) if err != nil { return transactionsRaw, err } @@ -294,7 +302,7 @@ func getTransactionsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumb return transactionsRaw, nil } -func getLogsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Log, error) { +func getLogsFrom(clickhouseConn clickhouse.Conn, chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Log, error) { sb := startBlockNumber length := endBlockNumber - startBlockNumber + 1 logsRaw := make([][]common.Log, length) @@ -306,7 +314,7 @@ func getLogsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint6 startBlockNumber, endBlockNumber, ) - logs, err := execQueryV1[common.Log](query) + logs, err := execQuery[common.Log](clickhouseConn, query) if err != nil { return logsRaw, err } @@ -323,7 +331,7 @@ func getLogsFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint6 return logsRaw, nil } -func getTracesFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Trace, error) { +func getTracesFrom(clickhouseConn clickhouse.Conn, chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([][]common.Trace, error) { sb := startBlockNumber length := endBlockNumber - startBlockNumber + 1 tracesRaw := make([][]common.Trace, length) @@ -335,7 +343,7 @@ func getTracesFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uin startBlockNumber, endBlockNumber, ) - traces, err := execQueryV1[common.Trace](query) + traces, err := execQuery[common.Trace](clickhouseConn, query) if err != nil { return tracesRaw, err } @@ -352,9 +360,9 @@ func getTracesFromV1(chainId uint64, startBlockNumber uint64, endBlockNumber uin return tracesRaw, nil } -func execQueryV1[T any](query string) ([]T, error) { +func execQuery[T any](clickhouseConn clickhouse.Conn, query string) ([]T, error) { var out []T - if err := ClickhouseConnV1.Select(context.Background(), &out, query); err != nil { + if err := clickhouseConn.Select(context.Background(), &out, query); err != nil { return nil, err } return out, nil diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index 45ce9bd..e1a86bf 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -38,8 +38,7 @@ type PublishableMessagePayload struct { type PublishableMessageBlockData struct { *common.BlockData ChainId uint64 `json:"chain_id"` - IsDeleted int8 `json:"is_deleted"` // deprecated - IsReorg bool `json:"is_reorg"` + IsDeleted int8 `json:"is_deleted"` InsertTimestamp time.Time `json:"insert_timestamp"` } @@ -113,11 +112,19 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) { } func (p *KafkaPublisher) PublishBlockData(blockData []*common.BlockData) error { - return p.publishBlockData(blockData, false) + return p.publishBlockData(blockData, false, false) } -func (p *KafkaPublisher) PublishBlockDataReorg(blockData []*common.BlockData) error { - return p.publishBlockData(blockData, true) +func (p *KafkaPublisher) PublishBlockDataReorg(newBlockData []*common.BlockData, oldBlockData []*common.BlockData) error { + if err := p.publishBlockData(oldBlockData, true, 
true); err != nil { + return fmt.Errorf("failed to publish old block data: %v", err) + } + + if err := p.publishBlockData(newBlockData, false, true); err != nil { + return fmt.Errorf("failed to publish new block data: %v", err) + } + + return nil } func (p *KafkaPublisher) PublishReorg(oldData []*common.BlockData, newData []*common.BlockData) error { @@ -128,11 +135,11 @@ func (p *KafkaPublisher) PublishReorg(oldData []*common.BlockData, newData []*co return fmt.Errorf("failed to revert: %v", err) } - if err := p.publishBlockData(oldData, true); err != nil { + if err := p.publishBlockData(oldData, true, true); err != nil { return fmt.Errorf("failed to publish old block data: %v", err) } - if err := p.publishBlockData(newData, false); err != nil { + if err := p.publishBlockData(newData, false, true); err != nil { return fmt.Errorf("failed to publish new block data: %v", err) } return nil @@ -233,7 +240,7 @@ func (p *KafkaPublisher) publishBlockRevert(chainId uint64, blockNumber uint64) return nil } -func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isReorg bool) error { +func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDeleted bool, isReorg bool) error { if len(blockData) == 0 { return nil } @@ -245,7 +252,7 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isReorg for i, data := range blockData { // Block message - if blockMsg, err := p.createBlockDataMessage(data, isReorg); err == nil { + if blockMsg, err := p.createBlockDataMessage(data, isDeleted, isReorg); err == nil { blockMessages[i] = blockMsg } else { return fmt.Errorf("failed to create block message: %v", err) @@ -260,7 +267,7 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isReorg return nil } -func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isReorg bool) (*kgo.Record, error) { +func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDeleted bool, isReorg bool) (*kgo.Record, error) { timestamp := time.Now() data := PublishableMessageBlockData{ @@ -269,8 +276,8 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isReorg IsDeleted: 0, InsertTimestamp: timestamp, } - if isReorg { - data.IsReorg = true + if isDeleted { + data.IsDeleted = 1 } msg := PublishableMessagePayload{ @@ -284,7 +291,7 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isReorg return nil, fmt.Errorf("failed to marshal block data: %v", err) } - return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isReorg, msgJson) + return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isDeleted, isReorg, msgJson) } func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber uint64) (*kgo.Record, error) { @@ -308,10 +315,10 @@ func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber ui return nil, fmt.Errorf("failed to marshal block data: %v", err) } - return p.createRecord(data.GetType(), chainId, blockNumber, timestamp, false, msgJson) + return p.createRecord(data.GetType(), chainId, blockNumber, timestamp, false, false, msgJson) } -func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, isReorg bool, msgJson []byte) (*kgo.Record, error) { +func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, isDeleted bool, isReorg bool, msgJson []byte) 
(*kgo.Record, error) { compressionThreshold := config.Cfg.CommitterCompressionThresholdMB * 1024 * 1024 var value []byte @@ -336,6 +343,7 @@ func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, block {Key: "chain_id", Value: []byte(fmt.Sprintf("%d", chainId))}, // order is important. always 0 {Key: "block_number", Value: []byte(fmt.Sprintf("%d", blockNumber))}, // order is important. always 1 {Key: "is_reorg", Value: []byte(fmt.Sprintf("%t", isReorg))}, // order is important. always 2 + {Key: "is_deleted", Value: []byte(fmt.Sprintf("%t", isDeleted))}, // order is important. always 3 {Key: "type", Value: []byte(fmt.Sprintf("%s", msgType))}, {Key: "timestamp", Value: []byte(timestamp.Format(time.RFC3339Nano))}, {Key: "schema_version", Value: []byte("1")}, From 53e49f6a6163481b9f00e79106f45e51934f33f5 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 16 Oct 2025 21:58:46 +0545 Subject: [PATCH 4/8] minor fixes --- cmd/committer.go | 2 ++ internal/committer/reorg.go | 5 ++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/committer.go b/cmd/committer.go index 989a700..568ae96 100644 --- a/cmd/committer.go +++ b/cmd/committer.go @@ -30,5 +30,7 @@ func RunCommitter(cmd *cobra.Command, args []string) { }() committer.Init() + + go committer.RunReorgValidator() committer.CommitStreaming() } diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 57f1770..2f1cf7a 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -1,7 +1,6 @@ package committer import ( - "context" "fmt" "time" @@ -13,7 +12,7 @@ import ( "github.com/thirdweb-dev/indexer/internal/metrics" ) -func RunReorgValidator(ctx context.Context) error { +func RunReorgValidator() { for { startBlock, endBlock, err := getReorgRange() if err != nil { @@ -44,7 +43,7 @@ func getReorgRange() (int64, int64, error) { return 0, 0, fmt.Errorf("failed to get max block number: %w", err) } - endBlock = endBlock - 5 // lag by some blocks for safety + endBlock = min(endBlock-5, startBlock+100) // lag by some blocks for safety if startBlock >= endBlock { return 0, 0, fmt.Errorf("start block is greater than end block") From 9c1c1387e19a622c2b329e1d9a4078beccf57c60 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 16 Oct 2025 22:12:08 +0545 Subject: [PATCH 5/8] minor change --- internal/committer/reorg.go | 40 ++++++++++++++++--------------------- internal/libs/clickhouse.go | 28 ++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 2f1cf7a..ce31924 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -6,7 +6,6 @@ import ( "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" - "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/libs" "github.com/thirdweb-dev/indexer/internal/libs/libblockdata" "github.com/thirdweb-dev/indexer/internal/metrics" @@ -76,36 +75,36 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock) // Fetch block headers for the range - blockData, err := libs.GetBlockDataFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) + blockHeaders, err := libs.GetBlockHeadersForReorgCheck(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) if err != nil { - return fmt.Errorf("detectAndHandleReorgs: failed to get block data: %w", err) + return 
fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err) } - if len(blockData) == 0 { - log.Debug().Msg("detectAndHandleReorgs: No block data found in range") + if len(blockHeaders) == 0 { + log.Debug().Msg("detectAndHandleReorgs: No block headers found in range") return nil } // finding the reorg start and end block reorgStartBlock := int64(-1) reorgEndBlock := int64(-1) - for i := 1; i < len(blockData); i++ { - if blockData[i].Block.Number.Int64() != blockData[i-1].Block.Number.Int64()+1 { + for i := 1; i < len(blockHeaders); i++ { + if blockHeaders[i].Number.Int64() != blockHeaders[i-1].Number.Int64()+1 { // non-sequential block numbers - reorgStartBlock = blockData[i-1].Block.Number.Int64() - reorgEndBlock = blockData[i].Block.Number.Int64() + reorgStartBlock = blockHeaders[i-1].Number.Int64() + reorgEndBlock = blockHeaders[i].Number.Int64() break } - if blockData[i].Block.ParentHash != blockData[i-1].Block.Hash { + if blockHeaders[i].ParentHash != blockHeaders[i-1].Hash { // hash mismatch start if reorgStartBlock == -1 { - reorgStartBlock = blockData[i-1].Block.Number.Int64() + reorgStartBlock = blockHeaders[i-1].Number.Int64() } continue } else { // hash matches end if reorgStartBlock != -1 { - reorgEndBlock = blockData[i].Block.Number.Int64() + reorgEndBlock = blockHeaders[i].Number.Int64() break } } @@ -113,11 +112,11 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { // set end to the last block if not set if reorgEndBlock == -1 { - reorgEndBlock = blockData[len(blockData)-1].Block.Number.Int64() + reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64() } if reorgStartBlock > -1 { - if err := handleReorgForRange(blockData, uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil { + if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil { return err } } @@ -128,7 +127,7 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { return nil } -func handleReorgForRange(oldblockData []*common.BlockData, startBlock uint64, endBlock uint64) error { +func handleReorgForRange(startBlock uint64, endBlock uint64) error { // nothing to do if startBlock == 0 { return nil @@ -150,15 +149,10 @@ func handleReorgForRange(oldblockData []*common.BlockData, startBlock uint64, en expectedBlockNumber++ } - oldblockDataArray := make([]*common.BlockData, 0, len(oldblockData)) - for _, bd := range oldblockData { - if bd.Block.Number.Uint64() < startBlock || bd.Block.Number.Uint64() > endBlock { - continue - } - oldblockDataArray = append(oldblockDataArray, bd) + oldblockDataArray, err := libs.GetBlockDataFromClickHouseV2(libs.ChainId.Uint64(), startBlock, endBlock) + if err != nil { + return fmt.Errorf("handleReorgForRange: failed to get old block data: %w", err) } - // order of deletion is important - // need to first delete then add so list should contain old data first if err := libs.KafkaPublisherV2.PublishBlockDataReorg(newblockDataArray, oldblockDataArray); err != nil { log.Error(). 
diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index c41bd96..63afd8f 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -162,6 +162,34 @@ func GetBlockReorgDataFromClickHouseV2(chainId *big.Int, startBlockNumber int64, return blocks, nil } +func GetBlockHeadersForReorgCheck(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.Block, error) { + sb := startBlockNumber + length := endBlockNumber - startBlockNumber + 1 + blocksRaw := make([]*common.Block, length) + + query := fmt.Sprintf("SELECT block_number, hash, parent_hash FROM %s.blocks FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d order by block_number", + config.Cfg.CommitterClickhouseDatabase, + chainId, + startBlockNumber, + endBlockNumber, + ) + blocks, err := execQueryV2[common.Block](query) + if err != nil { + return blocksRaw, err + } + + // just to make sure the blocks are in the correct order + for _, block := range blocks { + idx := block.Number.Uint64() - sb + if idx >= length { + log.Error().Msgf("Block number %s is out of range", block.Number.String()) + continue + } + blocksRaw[idx] = &block + } + return blocksRaw, nil +} + func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) { length := endBlockNumber - startBlockNumber + 1 From a50b9b67b9dca30f9abdf105c4574238281d582d Mon Sep 17 00:00:00 2001 From: nischit Date: Fri, 17 Oct 2025 02:54:53 +0545 Subject: [PATCH 6/8] minor change --- configs/config.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/configs/config.go b/configs/config.go index 0eb2655..da95539 100644 --- a/configs/config.go +++ b/configs/config.go @@ -302,11 +302,10 @@ type Config struct { ParquetMaxFileSizeMB int64 `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"` InsightServiceUrl string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"` InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"` - RedisAddr string `env:"REDIS_ADDR"` + RedisAddr string `env:"REDIS_ADDR" envDefault:"localhost:6379"` RedisUsername string `env:"REDIS_USERNAME"` RedisPassword string `env:"REDIS_PASSWORD"` - RedisDB int `env:"REDIS_DB"` - RedisEnableTLS bool `env:"REDIS_ENABLE_TLS" envDefault:"true"` + RedisDB int `env:"REDIS_DB" envDefault:"0"` } var Cfg Config From 5c5ae91c7eeedbe42d256c9335793189e107841d Mon Sep 17 00:00:00 2001 From: nischit Date: Fri, 17 Oct 2025 14:35:40 +0545 Subject: [PATCH 7/8] init redis --- cmd/committer.go | 1 + internal/committer/reorg.go | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/cmd/committer.go b/cmd/committer.go index 568ae96..eef0bba 100644 --- a/cmd/committer.go +++ b/cmd/committer.go @@ -30,6 +30,7 @@ func RunCommitter(cmd *cobra.Command, args []string) { }() committer.Init() + committer.InitReorg() go committer.RunReorgValidator() committer.CommitStreaming() diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index ce31924..9ef2ee0 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -11,6 +11,10 @@ import ( "github.com/thirdweb-dev/indexer/internal/metrics" ) +func InitReorg() { + libs.InitRedis() +} + func RunReorgValidator() { for { startBlock, endBlock, err := getReorgRange() From 9c01c5465d5dac7f0e61c00d807781d52c1ae1e9 Mon Sep 17 00:00:00 2001 From: nischit Date: Fri, 17 Oct 2025 14:39:46 +0545 Subject: [PATCH 8/8] minor fixes --- internal/committer/reorg.go | 10 +++++++++- internal/libs/libblockdata/getblockdata.go | 2 +- 2 files 
changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 9ef2ee0..2a03e01 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -16,6 +16,7 @@ func InitReorg() { } func RunReorgValidator() { + lastBlockCheck := int64(0) for { startBlock, endBlock, err := getReorgRange() if err != nil { @@ -24,6 +25,12 @@ func RunReorgValidator() { continue } + if endBlock == lastBlockCheck || endBlock-startBlock < 5 { + log.Debug().Msg("Not enough new blocks to check. Sleeping for 1 minute.") + time.Sleep(1 * time.Minute) + continue + } + // Detect reorgs and handle them err = detectAndHandleReorgs(startBlock, endBlock) if err != nil { @@ -31,6 +38,7 @@ func RunReorgValidator() { time.Sleep(2 * time.Second) continue } + lastBlockCheck = endBlock } } @@ -40,7 +48,7 @@ func getReorgRange() (int64, int64, error) { return 0, 0, fmt.Errorf("failed to get last valid block: %w", err) } - startBlock := min(lastValidBlock-1, 1) + startBlock := max(lastValidBlock-1, 1) endBlock, err := libs.GetMaxBlockNumberFromClickHouseV2(libs.ChainId) if err != nil { return 0, 0, fmt.Errorf("failed to get max block number: %w", err) diff --git a/internal/libs/libblockdata/getblockdata.go b/internal/libs/libblockdata/getblockdata.go index 7845f59..81811ba 100644 --- a/internal/libs/libblockdata/getblockdata.go +++ b/internal/libs/libblockdata/getblockdata.go @@ -21,7 +21,7 @@ func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) maxBlocksPerFetch := rpcBatchSize * rpcNumParallelCalls // Calculate the range of blocks to fetch - blocksToFetch := latestBlock - nextCommitBlockNumber + blocksToFetch := latestBlock - nextCommitBlockNumber + 1 if blocksToFetch > maxBlocksPerFetch { blocksToFetch = maxBlocksPerFetch }
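
Taken together, patches 4/8 and 8/8 settle the scan window: start one block behind the Redis checkpoint (floored at 1, after the min→max fix), end lagged five blocks behind the ClickHouse head and capped at 100 blocks per pass. A compact restatement with worked numbers — not repo code, and it folds the "not enough new blocks" gate from RunReorgValidator into one helper purely for illustration (requires Go 1.21+ for the min/max builtins, which the series already uses):

```go
package main

import "fmt"

// reorgWindow restates the windowing that getReorgRange converges on
// after patches 4/8 and 8/8: start just behind the checkpoint, end
// lagged five blocks behind the chain head and capped at 100 blocks.
// The skip gate lives in RunReorgValidator in the series; it is
// folded in here only for illustration.
func reorgWindow(lastValid, chainHead int64) (start, end int64, ok bool) {
	start = max(lastValid-1, 1)
	end = min(chainHead-5, start+100)
	if start >= end || end-start < 5 {
		return 0, 0, false // caller sleeps and retries
	}
	return start, end, true
}

func main() {
	// Checkpoint at 9000, chain head at 9200: the pass scans
	// 8999..9099 (capped at 100 blocks), keeping a 5-block safety lag.
	fmt.Println(reorgWindow(9000, 9200)) // 8999 9099 true

	// Caught up: head only 4 blocks past the checkpoint, so skip.
	fmt.Println(reorgWindow(9200, 9204)) // 0 0 false
}
```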
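The detection scan that patch 1/8 introduces and patch 5/8 retargets at GetBlockHeadersForReorgCheck is easier to verify outside the diff context. Below is a minimal, self-contained sketch of the same parent-hash continuity walk; `Header` is a hypothetical stand-in for the repo's common.Block, and the input is assumed sorted by block number, as the `ORDER BY block_number` in the query guarantees:

```go
package main

import "fmt"

// Header is a stand-in for the repo's common.Block; only the three
// fields the scan needs.
type Header struct {
	Number     int64
	Hash       string
	ParentHash string
}

// findReorgRange mirrors the scan in detectAndHandleReorgs: walk the
// headers in order and return the first span whose parent-hash chain
// is broken. A gap in block numbers is also treated as a reorg range.
// Returns (-1, -1) when the chain is intact.
func findReorgRange(headers []Header) (start, end int64) {
	start, end = -1, -1
	for i := 1; i < len(headers); i++ {
		if headers[i].Number != headers[i-1].Number+1 {
			// Non-sequential block numbers: refetch the whole gap.
			return headers[i-1].Number, headers[i].Number
		}
		if headers[i].ParentHash != headers[i-1].Hash {
			// Hash mismatch: mark where the fork begins.
			if start == -1 {
				start = headers[i-1].Number
			}
		} else if start != -1 {
			// Chain links up again: the mismatched span is over.
			return start, headers[i].Number
		}
	}
	if start != -1 {
		// Mismatch ran to the end of the window.
		end = headers[len(headers)-1].Number
	}
	return start, end
}

func main() {
	headers := []Header{
		{100, "a", "p"},
		{101, "b", "a"},
		{102, "c2", "b2"}, // fork: parent doesn't match block 101's hash
		{103, "d", "c2"},  // links to the forked block, so the range closes here
	}
	fmt.Println(findReorgRange(headers)) // 101 103
}
```

The returned range is what handleReorgForRange refetches from RPC and republishes; SetReorgLastValidBlock then advances the Redis checkpoint to reorgEndBlock, so the next pass resumes just behind it.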
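Patches 2/8 and 3/8 pin the first four record headers by position ("order is important. always 0/1/2/3"), so a consumer can read metadata without scanning keys. A sketch of such a consumer using franz-go, the same client library the publisher uses — no consumer is part of this series, and the broker address and topic name are placeholders, with error handling pared down:

```go
package main

import (
	"context"
	"fmt"
	"strconv"

	"github.com/twmb/franz-go/pkg/kgo"
)

// headerMeta captures the fixed-position header contract from the
// publisher: chain_id at 0, block_number at 1, is_reorg at 2, and
// is_deleted at 3 (after patch 3/8).
type headerMeta struct {
	chainID     uint64
	blockNumber uint64
	isReorg     bool
	isDeleted   bool
}

func parseHeaders(r *kgo.Record) (headerMeta, error) {
	var m headerMeta
	if len(r.Headers) < 4 {
		return m, fmt.Errorf("expected >=4 headers, got %d", len(r.Headers))
	}
	var err error
	if m.chainID, err = strconv.ParseUint(string(r.Headers[0].Value), 10, 64); err != nil {
		return m, fmt.Errorf("chain_id: %w", err)
	}
	if m.blockNumber, err = strconv.ParseUint(string(r.Headers[1].Value), 10, 64); err != nil {
		return m, fmt.Errorf("block_number: %w", err)
	}
	m.isReorg = string(r.Headers[2].Value) == "true"
	m.isDeleted = string(r.Headers[3].Value) == "true"
	return m, nil
}

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),      // placeholder broker
		kgo.ConsumeTopics("insight.blocks.v2"), // hypothetical topic name
	)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	for {
		fetches := client.PollFetches(context.Background())
		fetches.EachRecord(func(r *kgo.Record) {
			m, err := parseHeaders(r)
			if err != nil {
				fmt.Println("skipping record:", err)
				return
			}
			// Note: r.Value may be zstd-compressed; the publisher's
			// content-type header indicates the encoding.
			// PublishBlockDataReorg publishes old rows (is_deleted=true)
			// before the replacement rows, so deletes apply first.
			fmt.Printf("chain=%d block=%d reorg=%t deleted=%t\n",
				m.chainID, m.blockNumber, m.isReorg, m.isDeleted)
		})
	}
}
```

Positional access is cheap but couples every consumer to the publisher's ordering, which is presumably why the inline comments pin the indices; a key-scan fallback would tolerate reordering at the cost of a loop over the headers.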