From b1610aeb19ba073dc8177ccfec88f3d2a4006287 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 9 Apr 2026 20:47:46 +0545 Subject: [PATCH 01/10] reorg selected blocks for chain --- cmd/reorgapi.go | 35 ++++++ cmd/root.go | 1 + configs/config.go | 4 + internal/libs/clickhouse.go | 63 +++++++++++ internal/libs/libblockdata/getblockdata.go | 98 ++++++++++++---- internal/reorgapi/server.go | 126 +++++++++++++++++++++ 6 files changed, 306 insertions(+), 21 deletions(-) create mode 100644 cmd/reorgapi.go create mode 100644 internal/reorgapi/server.go diff --git a/cmd/reorgapi.go b/cmd/reorgapi.go new file mode 100644 index 0000000..8bd35ef --- /dev/null +++ b/cmd/reorgapi.go @@ -0,0 +1,35 @@ +package cmd + +import ( + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + "github.com/thirdweb-dev/indexer/internal/libs" + "github.com/thirdweb-dev/indexer/internal/reorgapi" +) + +var reorgAPICmd = &cobra.Command{ + Use: "reorg-api", + Short: "HTTP API to publish manual reorg fixes to Kafka", + Long: `Loads old block data from ClickHouse, fetches canonical data from RPC, and publishes +to Kafka using the same reorg semantics as automatic reorg handling (old blocks as deleted, then new blocks). + +Requires the same env as committer for RPC, ClickHouse, and Kafka (no S3). + +Example: + curl -sS -X POST http://localhost:8080/v1/reorg/publish \ + -H 'Content-Type: application/json' \ + -d '{"chain_id":8453,"block_numbers":[12345,12346]}'`, + Run: runReorgAPI, +} + +func runReorgAPI(cmd *cobra.Command, args []string) { + libs.InitRPCClient() + libs.InitNewClickHouseV2() + libs.InitKafkaV2() + + log.Info().Str("chain_id", libs.ChainIdStr).Msg("starting reorg-api") + + if err := reorgapi.RunHTTPServer(); err != nil { + log.Fatal().Err(err).Msg("reorg-api server exited with error") + } +} diff --git a/cmd/root.go b/cmd/root.go index 6440fb3..b209e4c 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -33,6 +33,7 @@ func init() { rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "optional config file path (defaults to env-only when unset)") rootCmd.AddCommand(committerCmd) rootCmd.AddCommand(backfillCmd) + rootCmd.AddCommand(reorgAPICmd) } func initConfig() { diff --git a/configs/config.go b/configs/config.go index 2987132..4df6ffa 100644 --- a/configs/config.go +++ b/configs/config.go @@ -90,6 +90,10 @@ type Config struct { RedisDB int `env:"REDIS_DB" envDefault:"0"` ValidationMode string `env:"VALIDATION_MODE" envDefault:"minimal"` EnableReorgValidation bool `env:"ENABLE_REORG_VALIDATION" envDefault:"true"` + // ReorgAPIListenAddr is the bind address for the manual reorg publish HTTP server (reorg-api command). + ReorgAPIListenAddr string `env:"REORG_API_LISTEN_ADDR" envDefault:":8080"` + // ReorgAPIKey, when non-empty, requires requests to send Authorization: Bearer . + ReorgAPIKey string `env:"REORG_API_KEY"` } var Cfg Config diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index b5419d7..8538ebc 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "math/big" + "slices" "strconv" "strings" "sync" @@ -250,6 +251,68 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl return blockData, nil } +// GetBlockDataFromClickHouseForBlockNumbers loads stored block data for specific block numbers (non-contiguous ranges are merged into efficient queries). +func GetBlockDataFromClickHouseForBlockNumbers(chainId uint64, blockNumbers []uint64) ([]*common.BlockData, error) { + if len(blockNumbers) == 0 { + return nil, nil + } + nums := slices.Clone(blockNumbers) + slices.Sort(nums) + nums = slices.Compact(nums) + + ranges := contiguousUint64Ranges(nums) + byNumber := make(map[uint64]*common.BlockData, len(nums)) + for _, r := range ranges { + start, end := r[0], r[1] + chunk, err := GetBlockDataFromClickHouseV2(chainId, start, end) + if err != nil { + return nil, fmt.Errorf("clickhouse range %d-%d: %w", start, end, err) + } + for _, bd := range chunk { + if bd == nil || bd.Block.ChainId == nil || bd.Block.ChainId.Uint64() == 0 || bd.Block.Number == nil { + continue + } + bn := bd.Block.Number.Uint64() + byNumber[bn] = bd + } + } + + out := make([]*common.BlockData, 0, len(nums)) + var missing []uint64 + for _, bn := range nums { + bd, ok := byNumber[bn] + if !ok || bd == nil { + missing = append(missing, bn) + continue + } + out = append(out, bd) + } + if len(missing) > 0 { + return nil, fmt.Errorf("missing block data in ClickHouse for blocks: %v", missing) + } + return out, nil +} + +func contiguousUint64Ranges(sorted []uint64) [][2]uint64 { + if len(sorted) == 0 { + return nil + } + var out [][2]uint64 + start := sorted[0] + end := sorted[0] + for i := 1; i < len(sorted); i++ { + if sorted[i] == end+1 { + end = sorted[i] + } else { + out = append(out, [2]uint64{start, end}) + start = sorted[i] + end = sorted[i] + } + } + out = append(out, [2]uint64{start, end}) + return out +} + // GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range, // where the stored transaction_count in the blocks table does not match the number // of transactions in the transactions table. It returns the minimum and maximum diff --git a/internal/libs/libblockdata/getblockdata.go b/internal/libs/libblockdata/getblockdata.go index e0fb992..e4c712d 100644 --- a/internal/libs/libblockdata/getblockdata.go +++ b/internal/libs/libblockdata/getblockdata.go @@ -185,17 +185,23 @@ func GetValidBlockDataFromRpc(blockNumbers []uint64) []*common.BlockData { } func getValidBlockDataFromRpcBatch(blockNumbers []uint64) []*common.BlockData { + blockData, err := fetchBlockDataFromRpcBatch(blockNumbers) + if err != nil { + log.Panic().Err(err).Msg("Failed to fetch block data from RPC") + } + return blockData +} + +// fetchBlockDataFromRpcBatch fetches full block data from RPC with retries; returns an error instead of panicking. +func fetchBlockDataFromRpcBatch(blockNumbers []uint64) ([]*common.BlockData, error) { var rpcResults []rpc.GetFullBlockResult - var fetchErr error chainIdStr := libs.ChainIdStr indexerName := config.Cfg.ZeetProjectName - // Initial fetch rpcResults = libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(blockNumbers)) metrics.CommitterRPCRowsToFetch.WithLabelValues(indexerName, chainIdStr).Set(float64(len(blockNumbers))) - // Create array of failed block numbers for retry failedBlockNumbers := make([]uint64, 0) for i, result := range rpcResults { if result.Error != nil { @@ -204,13 +210,11 @@ func getValidBlockDataFromRpcBatch(blockNumbers []uint64) []*common.BlockData { } } - // Retry only failed blocks up to 3 times for retry := range 3 { if len(failedBlockNumbers) == 0 { - break // All blocks succeeded + break } - // Track retry metric metrics.CommitterRPCRetries.WithLabelValues(indexerName, chainIdStr).Set(float64(len(failedBlockNumbers))) log.Warn(). @@ -218,21 +222,16 @@ func getValidBlockDataFromRpcBatch(blockNumbers []uint64) []*common.BlockData { Int("failed_count", len(failedBlockNumbers)). Msg("Retrying failed block fetches...") - // Retry only the failed blocks retryResults := libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(failedBlockNumbers)) - // Update rpcResults with successful ones and create new failed array newFailedBlockNumbers := make([]uint64, 0) retryIndex := 0 for i, result := range rpcResults { if result.Error != nil { - // This was a failed block, check if retry succeeded if retryIndex < len(retryResults) && retryResults[retryIndex].Error == nil { - // Retry succeeded - update the result rpcResults[i] = retryResults[retryIndex] } else { - // Still failed - add to new failed array newFailedBlockNumbers = append(newFailedBlockNumbers, blockNumbers[i]) } retryIndex++ @@ -241,34 +240,91 @@ func getValidBlockDataFromRpcBatch(blockNumbers []uint64) []*common.BlockData { failedBlockNumbers = newFailedBlockNumbers - // Add delay between retries if len(failedBlockNumbers) > 0 && retry < 2 { time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond) } } - // Check if any blocks still failed after all retries if len(failedBlockNumbers) > 0 { - fetchErr = fmt.Errorf("failed to fetch %d block(s) from RPC after 3 retries", len(failedBlockNumbers)) - } - - if fetchErr != nil { - log.Panic().Err(fetchErr).Msg("Failed to fetch block data from RPC") + return nil, fmt.Errorf("failed to fetch %d block(s) from RPC after 3 retries", len(failedBlockNumbers)) } blockData := make([]*common.BlockData, len(rpcResults)) for i, result := range rpcResults { blockData[i] = &result.Data - rpcResults[i] = rpc.GetFullBlockResult{} // free memory + rpcResults[i] = rpc.GetFullBlockResult{} } for i, block := range blockData { if isValid, _ := Validate(block); !isValid { - log.Panic().Int("index", i).Msg("Failed to validate block data from rpc") + bn := uint64(0) + if i < len(blockNumbers) { + bn = blockNumbers[i] + } + return nil, fmt.Errorf("validation failed for block %d (batch index %d)", bn, i) } } - return blockData + return blockData, nil +} + +// FetchBlockDataFromRPC fetches and validates block data from RPC, returning an error on failure (for HTTP/tools). +func FetchBlockDataFromRPC(blockNumbers []uint64) ([]*common.BlockData, error) { + rpcBatchSize := config.Cfg.RPCBatchSize + totalBlocks := len(blockNumbers) + if totalBlocks == 0 { + return nil, nil + } + blockData := make([]*common.BlockData, totalBlocks) + numBatches := (totalBlocks + int(rpcBatchSize) - 1) / int(rpcBatchSize) + + var wg sync.WaitGroup + var mu sync.Mutex + var firstErr error + + maxConcurrentBatches := 4 + semaphore := make(chan struct{}, maxConcurrentBatches) + + for batchIndex := range numBatches { + wg.Add(1) + go func(batchIdx int) { + defer wg.Done() + + semaphore <- struct{}{} + defer func() { <-semaphore }() + + start := batchIdx * int(rpcBatchSize) + end := min(start+int(rpcBatchSize), totalBlocks) + + batchBlockNumbers := blockNumbers[start:end] + batchResults, err := fetchBlockDataFromRpcBatch(batchBlockNumbers) + if err != nil { + mu.Lock() + if firstErr == nil { + firstErr = fmt.Errorf("batch starting at block %d: %w", batchBlockNumbers[0], err) + } + mu.Unlock() + return + } + + for i, result := range batchResults { + blockData[start+i] = result + } + + log.Debug(). + Int("batch", batchIdx). + Int("start", start). + Int("end", end). + Int("batch_size", len(batchBlockNumbers)). + Msg("Completed RPC batch fetch (FetchBlockDataFromRPC)") + }(batchIndex) + } + + wg.Wait() + if firstErr != nil { + return nil, firstErr + } + return blockData, nil } func blockNumbersToBigInt(blockNumbers []uint64) []*big.Int { diff --git a/internal/reorgapi/server.go b/internal/reorgapi/server.go new file mode 100644 index 0000000..dd2fad0 --- /dev/null +++ b/internal/reorgapi/server.go @@ -0,0 +1,126 @@ +package reorgapi + +import ( + "fmt" + "net/http" + "slices" + "strings" + + "github.com/gin-gonic/gin" + "github.com/rs/zerolog/log" + config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/libs" + "github.com/thirdweb-dev/indexer/internal/libs/libblockdata" +) + +// PublishReorgRequest is the JSON body for POST /v1/reorg/publish. +type PublishReorgRequest struct { + ChainID uint64 `json:"chain_id"` + BlockNumbers []uint64 `json:"block_numbers"` +} + +type PublishReorgResponse struct { + OK bool `json:"ok"` + BlocksPublished int `json:"blocks_published"` + Message string `json:"message,omitempty"` +} + +// RunHTTPServer starts a blocking HTTP server that publishes manual reorg batches to Kafka. +func RunHTTPServer() error { + gin.SetMode(gin.ReleaseMode) + r := gin.New() + r.Use(gin.Recovery()) + r.Use(gin.LoggerWithWriter(gin.DefaultWriter)) + + r.GET("/health", func(c *gin.Context) { + c.JSON(http.StatusOK, gin.H{"status": "ok"}) + }) + + v1 := r.Group("/v1") + v1.POST("/reorg/publish", authMiddleware(), handlePublishReorg) + + addr := config.Cfg.ReorgAPIListenAddr + log.Info().Str("addr", addr).Msg("reorg-api HTTP server listening") + return r.Run(addr) +} + +func authMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + key := config.Cfg.ReorgAPIKey + if key == "" { + c.Next() + return + } + auth := c.GetHeader("Authorization") + const prefix = "Bearer " + if !strings.HasPrefix(auth, prefix) || strings.TrimPrefix(auth, prefix) != key { + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + c.Abort() + return + } + c.Next() + } +} + +func handlePublishReorg(c *gin.Context) { + var req PublishReorgRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("invalid json: %v", err)}) + return + } + if req.ChainID == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "chain_id is required"}) + return + } + if libs.ChainId == nil || libs.ChainId.Uint64() != req.ChainID { + c.JSON(http.StatusBadRequest, gin.H{ + "error": fmt.Sprintf("chain_id must match this deployment's RPC chain (%s)", libs.ChainIdStr), + }) + return + } + if len(req.BlockNumbers) == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "block_numbers must be non-empty"}) + return + } + + sorted := slices.Clone(req.BlockNumbers) + slices.Sort(sorted) + sorted = slices.Compact(sorted) + + oldData, err := libs.GetBlockDataFromClickHouseForBlockNumbers(req.ChainID, sorted) + if err != nil { + log.Error().Err(err).Msg("manual reorg: clickhouse") + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + newData, err := libblockdata.FetchBlockDataFromRPC(sorted) + if err != nil { + log.Error().Err(err).Msg("manual reorg: rpc") + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + + if len(newData) != len(sorted) { + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal: rpc result length mismatch"}) + return + } + for i, bn := range sorted { + if newData[i] == nil || newData[i].Block.Number == nil || newData[i].Block.Number.Uint64() != bn { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("rpc block order mismatch at index %d", i)}) + return + } + } + + if err := libs.KafkaPublisherV2.PublishBlockDataReorg(newData, oldData); err != nil { + log.Error().Err(err).Msg("manual reorg: kafka") + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } + + c.JSON(http.StatusOK, PublishReorgResponse{ + OK: true, + BlocksPublished: len(sorted), + Message: "published old (deleted) then new blocks with reorg headers", + }) +} From f80936686ea67bec26882dd1b675109684ac55f5 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 9 Apr 2026 21:27:10 +0545 Subject: [PATCH 02/10] log fix --- internal/libs/clickhouse.go | 64 +++++++++++++++++++++---------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index 8538ebc..0fbc92a 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -8,9 +8,9 @@ import ( "slices" "strconv" "strings" - "sync" "github.com/rs/zerolog/log" + "golang.org/x/sync/errgroup" "github.com/ClickHouse/clickhouse-go/v2" config "github.com/thirdweb-dev/indexer/configs" @@ -211,34 +211,44 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl logsRaw := make([][]common.Log, length) tracesRaw := make([][]common.Trace, length) - wg := sync.WaitGroup{} - wg.Add(4) - go func() { - defer wg.Done() - blocksRaw, _ = getBlocksFromV2(chainId, startBlockNumber, endBlockNumber) - }() - - go func() { - defer wg.Done() - transactionsRaw, _ = getTransactionsFromV2(chainId, startBlockNumber, endBlockNumber) - }() - - go func() { - defer wg.Done() - logsRaw, _ = getLogsFromV2(chainId, startBlockNumber, endBlockNumber) - }() - - go func() { - defer wg.Done() - tracesRaw, _ = getTracesFromV2(chainId, startBlockNumber, endBlockNumber) - }() - wg.Wait() + g := new(errgroup.Group) + g.Go(func() (err error) { + blocksRaw, err = getBlocksFromV2(chainId, startBlockNumber, endBlockNumber) + return err + }) + g.Go(func() (err error) { + transactionsRaw, err = getTransactionsFromV2(chainId, startBlockNumber, endBlockNumber) + return err + }) + g.Go(func() (err error) { + logsRaw, err = getLogsFromV2(chainId, startBlockNumber, endBlockNumber) + return err + }) + g.Go(func() (err error) { + tracesRaw, err = getTracesFromV2(chainId, startBlockNumber, endBlockNumber) + return err + }) + if err := g.Wait(); err != nil { + return nil, err + } for i := range blockData { - if blocksRaw[i].ChainId == nil || blocksRaw[i].ChainId.Uint64() == 0 { - log.Info(). - Any("chainId", blocksRaw[i].ChainId). - Msg("skipping block because chainId is nil") + b := blocksRaw[i] + expectedBlockNumber := startBlockNumber + uint64(i) + if b.Number == nil { + // No row in ClickHouse for this index: the range is denser than stored blocks + // (e.g. catch-up gap, not yet committed, or sparse query). + log.Debug(). + Uint64("chainId", chainId). + Uint64("blockNumber", expectedBlockNumber). + Msg("skipping slot: no block row in ClickHouse for this block number") + continue + } + if b.ChainId == nil || b.ChainId.Uint64() == 0 { + log.Warn(). + Uint64("chainId", chainId). + Uint64("blockNumber", b.Number.Uint64()). + Msg("skipping block: chain_id missing or zero on ClickHouse row") continue } blockData[i] = &common.BlockData{ From 98c240ba34cf692078f55cb621697e762cfd2a22 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 9 Apr 2026 21:46:37 +0545 Subject: [PATCH 03/10] publish missing block --- internal/libs/clickhouse.go | 11 +++++++---- internal/reorgapi/server.go | 8 +++++++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index 0fbc92a..10627ef 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -236,12 +236,11 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl b := blocksRaw[i] expectedBlockNumber := startBlockNumber + uint64(i) if b.Number == nil { - // No row in ClickHouse for this index: the range is denser than stored blocks - // (e.g. catch-up gap, not yet committed, or sparse query). + // No row returned for this height in the requested range (indexer gap or not committed). log.Debug(). Uint64("chainId", chainId). Uint64("blockNumber", expectedBlockNumber). - Msg("skipping slot: no block row in ClickHouse for this block number") + Msg("skipping slot: no block row returned for this block number") continue } if b.ChainId == nil || b.ChainId.Uint64() == 0 { @@ -298,7 +297,11 @@ func GetBlockDataFromClickHouseForBlockNumbers(chainId uint64, blockNumbers []ui out = append(out, bd) } if len(missing) > 0 { - return nil, fmt.Errorf("missing block data in ClickHouse for blocks: %v", missing) + log.Info(). + Uint64("chain_id", chainId). + Interface("missing_block_numbers", missing). + Int("found_blocks", len(out)). + Msg("manual reorg: no ClickHouse rows (FINAL) for some blocks; publishing RPC data as reorg inserts only for those (no delete tombstones)") } return out, nil } diff --git a/internal/reorgapi/server.go b/internal/reorgapi/server.go index dd2fad0..bd5755b 100644 --- a/internal/reorgapi/server.go +++ b/internal/reorgapi/server.go @@ -87,6 +87,8 @@ func handlePublishReorg(c *gin.Context) { slices.Sort(sorted) sorted = slices.Compact(sorted) + // Old snapshot for tombstones: may be shorter than sorted when FINAL has no row for some + // heights (empty/partial tables). PublishBlockDataReorg still publishes RPC data as reorg inserts for every block. oldData, err := libs.GetBlockDataFromClickHouseForBlockNumbers(req.ChainID, sorted) if err != nil { log.Error().Err(err).Msg("manual reorg: clickhouse") @@ -118,9 +120,13 @@ func handlePublishReorg(c *gin.Context) { return } + msg := "published old (deleted) then new blocks with reorg headers" + if len(oldData) < len(sorted) { + msg = "published reorg inserts for all blocks; delete tombstones only for blocks found in ClickHouse (some requested heights had no FINAL row)" + } c.JSON(http.StatusOK, PublishReorgResponse{ OK: true, BlocksPublished: len(sorted), - Message: "published old (deleted) then new blocks with reorg headers", + Message: msg, }) } From 5ca3a27227126e49a89ad040287b6a2fd2c25e36 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 9 Apr 2026 22:13:20 +0545 Subject: [PATCH 04/10] batch --- configs/config.go | 2 + internal/libs/clickhouse.go | 180 +++++++++++++++++++++++++----------- internal/reorgapi/server.go | 30 ++++-- 3 files changed, 153 insertions(+), 59 deletions(-) diff --git a/configs/config.go b/configs/config.go index 4df6ffa..62a9130 100644 --- a/configs/config.go +++ b/configs/config.go @@ -94,6 +94,8 @@ type Config struct { ReorgAPIListenAddr string `env:"REORG_API_LISTEN_ADDR" envDefault:":8080"` // ReorgAPIKey, when non-empty, requires requests to send Authorization: Bearer . ReorgAPIKey string `env:"REORG_API_KEY"` + // ReorgAPIClickhouseBatchSize is how many block numbers to load from ClickHouse per reorg-api sub-request (manual reorg publish). + ReorgAPIClickhouseBatchSize uint64 `env:"REORG_API_CLICKHOUSE_BATCH_SIZE" envDefault:"10"` } var Cfg Config diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index 10627ef..f7f6122 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -234,13 +234,8 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl for i := range blockData { b := blocksRaw[i] - expectedBlockNumber := startBlockNumber + uint64(i) if b.Number == nil { - // No row returned for this height in the requested range (indexer gap or not committed). - log.Debug(). - Uint64("chainId", chainId). - Uint64("blockNumber", expectedBlockNumber). - Msg("skipping slot: no block row returned for this block number") + // No row for this index in the dense [start,end] range (gap vs FINAL). continue } if b.ChainId == nil || b.ChainId.Uint64() == 0 { @@ -260,7 +255,76 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl return blockData, nil } -// GetBlockDataFromClickHouseForBlockNumbers loads stored block data for specific block numbers (non-contiguous ranges are merged into efficient queries). +func joinUint64sForIN(nums []uint64) string { + var b strings.Builder + b.Grow(len(nums) * 12) + for i, n := range nums { + if i > 0 { + b.WriteByte(',') + } + b.WriteString(strconv.FormatUint(n, 10)) + } + return b.String() +} + +func queryBlocksByBlockNumbers(chainId uint64, nums []uint64) ([]common.Block, error) { + if len(nums) == 0 { + return nil, nil + } + q := fmt.Sprintf( + "SELECT %s FROM %s.blocks FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number", + strings.Join(defaultBlockFields, ", "), + config.Cfg.CommitterClickhouseDatabase, + chainId, + joinUint64sForIN(nums), + ) + return execQueryV2[common.Block](q) +} + +func queryTransactionsByBlockNumbers(chainId uint64, nums []uint64) ([]common.Transaction, error) { + if len(nums) == 0 { + return nil, nil + } + q := fmt.Sprintf( + "SELECT %s FROM %s.transactions FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number, transaction_index", + strings.Join(defaultTransactionFields, ", "), + config.Cfg.CommitterClickhouseDatabase, + chainId, + joinUint64sForIN(nums), + ) + return execQueryV2[common.Transaction](q) +} + +func queryLogsByBlockNumbers(chainId uint64, nums []uint64) ([]common.Log, error) { + if len(nums) == 0 { + return nil, nil + } + q := fmt.Sprintf( + "SELECT %s FROM %s.logs FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number, log_index", + strings.Join(defaultLogFields, ", "), + config.Cfg.CommitterClickhouseDatabase, + chainId, + joinUint64sForIN(nums), + ) + return execQueryV2[common.Log](q) +} + +func queryTracesByBlockNumbers(chainId uint64, nums []uint64) ([]common.Trace, error) { + if len(nums) == 0 { + return nil, nil + } + q := fmt.Sprintf( + "SELECT %s FROM %s.traces FINAL WHERE chain_id = %d AND block_number IN (%s) ORDER BY block_number, transaction_index", + strings.Join(defaultTraceFields, ", "), + config.Cfg.CommitterClickhouseDatabase, + chainId, + joinUint64sForIN(nums), + ) + return execQueryV2[common.Trace](q) +} + +// GetBlockDataFromClickHouseForBlockNumbers loads stored block data for specific block numbers using +// block_number IN (...). Callers that send very large lists should chunk requests (e.g. reorg-api batches by REORG_API_CLICKHOUSE_BATCH_SIZE). func GetBlockDataFromClickHouseForBlockNumbers(chainId uint64, blockNumbers []uint64) ([]*common.BlockData, error) { if len(blockNumbers) == 0 { return nil, nil @@ -269,63 +333,75 @@ func GetBlockDataFromClickHouseForBlockNumbers(chainId uint64, blockNumbers []ui slices.Sort(nums) nums = slices.Compact(nums) - ranges := contiguousUint64Ranges(nums) - byNumber := make(map[uint64]*common.BlockData, len(nums)) - for _, r := range ranges { - start, end := r[0], r[1] - chunk, err := GetBlockDataFromClickHouseV2(chainId, start, end) - if err != nil { - return nil, fmt.Errorf("clickhouse range %d-%d: %w", start, end, err) + var blocks []common.Block + var txs []common.Transaction + var logs []common.Log + var traces []common.Trace + g := new(errgroup.Group) + g.Go(func() (err error) { + blocks, err = queryBlocksByBlockNumbers(chainId, nums) + return err + }) + g.Go(func() (err error) { + txs, err = queryTransactionsByBlockNumbers(chainId, nums) + return err + }) + g.Go(func() (err error) { + logs, err = queryLogsByBlockNumbers(chainId, nums) + return err + }) + g.Go(func() (err error) { + traces, err = queryTracesByBlockNumbers(chainId, nums) + return err + }) + if err := g.Wait(); err != nil { + return nil, err + } + + blocksByNum := make(map[uint64]common.Block, len(blocks)) + for _, b := range blocks { + if b.Number != nil { + blocksByNum[b.Number.Uint64()] = b } - for _, bd := range chunk { - if bd == nil || bd.Block.ChainId == nil || bd.Block.ChainId.Uint64() == 0 || bd.Block.Number == nil { - continue - } - bn := bd.Block.Number.Uint64() - byNumber[bn] = bd + } + txByNum := make(map[uint64][]common.Transaction) + for _, t := range txs { + if t.BlockNumber != nil { + bn := t.BlockNumber.Uint64() + txByNum[bn] = append(txByNum[bn], t) + } + } + logsByNum := make(map[uint64][]common.Log) + for _, l := range logs { + if l.BlockNumber != nil { + bn := l.BlockNumber.Uint64() + logsByNum[bn] = append(logsByNum[bn], l) + } + } + tracesByNum := make(map[uint64][]common.Trace) + for _, tr := range traces { + if tr.BlockNumber != nil { + bn := tr.BlockNumber.Uint64() + tracesByNum[bn] = append(tracesByNum[bn], tr) } } out := make([]*common.BlockData, 0, len(nums)) - var missing []uint64 for _, bn := range nums { - bd, ok := byNumber[bn] - if !ok || bd == nil { - missing = append(missing, bn) + b, ok := blocksByNum[bn] + if !ok || b.ChainId == nil || b.Number == nil || b.ChainId.Uint64() == 0 { continue } - out = append(out, bd) - } - if len(missing) > 0 { - log.Info(). - Uint64("chain_id", chainId). - Interface("missing_block_numbers", missing). - Int("found_blocks", len(out)). - Msg("manual reorg: no ClickHouse rows (FINAL) for some blocks; publishing RPC data as reorg inserts only for those (no delete tombstones)") + out = append(out, &common.BlockData{ + Block: b, + Transactions: txByNum[bn], + Logs: logsByNum[bn], + Traces: tracesByNum[bn], + }) } return out, nil } -func contiguousUint64Ranges(sorted []uint64) [][2]uint64 { - if len(sorted) == 0 { - return nil - } - var out [][2]uint64 - start := sorted[0] - end := sorted[0] - for i := 1; i < len(sorted); i++ { - if sorted[i] == end+1 { - end = sorted[i] - } else { - out = append(out, [2]uint64{start, end}) - start = sorted[i] - end = sorted[i] - } - } - out = append(out, [2]uint64{start, end}) - return out -} - // GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range, // where the stored transaction_count in the blocks table does not match the number // of transactions in the transactions table. It returns the minimum and maximum diff --git a/internal/reorgapi/server.go b/internal/reorgapi/server.go index bd5755b..b077631 100644 --- a/internal/reorgapi/server.go +++ b/internal/reorgapi/server.go @@ -9,6 +9,7 @@ import ( "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" + "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/libs" "github.com/thirdweb-dev/indexer/internal/libs/libblockdata" ) @@ -87,13 +88,28 @@ func handlePublishReorg(c *gin.Context) { slices.Sort(sorted) sorted = slices.Compact(sorted) - // Old snapshot for tombstones: may be shorter than sorted when FINAL has no row for some - // heights (empty/partial tables). PublishBlockDataReorg still publishes RPC data as reorg inserts for every block. - oldData, err := libs.GetBlockDataFromClickHouseForBlockNumbers(req.ChainID, sorted) - if err != nil { - log.Error().Err(err).Msg("manual reorg: clickhouse") - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) - return + batchSize := config.Cfg.ReorgAPIClickhouseBatchSize + if batchSize == 0 { + batchSize = 10 + } + var oldData []*common.BlockData + for i := 0; i < len(sorted); i += int(batchSize) { + end := min(i+int(batchSize), len(sorted)) + chunk := sorted[i:end] + chunkOld, err := libs.GetBlockDataFromClickHouseForBlockNumbers(req.ChainID, chunk) + if err != nil { + log.Error().Err(err).Msg("manual reorg: clickhouse") + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + oldData = append(oldData, chunkOld...) + } + if len(oldData) < len(sorted) { + log.Info(). + Uint64("chain_id", req.ChainID). + Int("requested_blocks", len(sorted)). + Int("found_in_clickhouse", len(oldData)). + Msg("manual reorg: some blocks had no FINAL row in ClickHouse; delete tombstones only for loaded heights") } newData, err := libblockdata.FetchBlockDataFromRPC(sorted) From bdbd567ad3cf4d447747b2b7c5e49eb17cbc8fc5 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 9 Apr 2026 22:20:24 +0545 Subject: [PATCH 05/10] minor change --- internal/reorgapi/server.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/internal/reorgapi/server.go b/internal/reorgapi/server.go index b077631..55882b2 100644 --- a/internal/reorgapi/server.go +++ b/internal/reorgapi/server.go @@ -93,9 +93,22 @@ func handlePublishReorg(c *gin.Context) { batchSize = 10 } var oldData []*common.BlockData + totalBatches := (len(sorted) + int(batchSize) - 1) / int(batchSize) + batchIdx := 0 for i := 0; i < len(sorted); i += int(batchSize) { end := min(i+int(batchSize), len(sorted)) chunk := sorted[i:end] + batchIdx++ + rangeStart := chunk[0] + rangeEnd := chunk[len(chunk)-1] + log.Info(). + Uint64("chain_id", req.ChainID). + Int("batch", batchIdx). + Int("batch_total", totalBatches). + Uint64("block_range_start", rangeStart). + Uint64("block_range_end", rangeEnd). + Int("batch_block_count", len(chunk)). + Msg("manual reorg: loading ClickHouse snapshot for block range") chunkOld, err := libs.GetBlockDataFromClickHouseForBlockNumbers(req.ChainID, chunk) if err != nil { log.Error().Err(err).Msg("manual reorg: clickhouse") From bfe962eb46b4aac764b77b929e4afacb40b04f31 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 9 Apr 2026 22:56:23 +0545 Subject: [PATCH 06/10] batch fix --- internal/reorgapi/server.go | 64 +++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/internal/reorgapi/server.go b/internal/reorgapi/server.go index 55882b2..12e29ed 100644 --- a/internal/reorgapi/server.go +++ b/internal/reorgapi/server.go @@ -9,7 +9,6 @@ import ( "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" - "github.com/thirdweb-dev/indexer/internal/common" "github.com/thirdweb-dev/indexer/internal/libs" "github.com/thirdweb-dev/indexer/internal/libs/libblockdata" ) @@ -92,8 +91,8 @@ func handlePublishReorg(c *gin.Context) { if batchSize == 0 { batchSize = 10 } - var oldData []*common.BlockData totalBatches := (len(sorted) + int(batchSize) - 1) / int(batchSize) + var anyPartialOld bool batchIdx := 0 for i := 0; i < len(sorted); i += int(batchSize) { end := min(i+int(batchSize), len(sorted)) @@ -108,50 +107,47 @@ func handlePublishReorg(c *gin.Context) { Uint64("block_range_start", rangeStart). Uint64("block_range_end", rangeEnd). Int("batch_block_count", len(chunk)). - Msg("manual reorg: loading ClickHouse snapshot for block range") + Msg("manual reorg: processing batch (ClickHouse → RPC → Kafka)") + chunkOld, err := libs.GetBlockDataFromClickHouseForBlockNumbers(req.ChainID, chunk) if err != nil { log.Error().Err(err).Msg("manual reorg: clickhouse") c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - oldData = append(oldData, chunkOld...) - } - if len(oldData) < len(sorted) { - log.Info(). - Uint64("chain_id", req.ChainID). - Int("requested_blocks", len(sorted)). - Int("found_in_clickhouse", len(oldData)). - Msg("manual reorg: some blocks had no FINAL row in ClickHouse; delete tombstones only for loaded heights") - } - - newData, err := libblockdata.FetchBlockDataFromRPC(sorted) - if err != nil { - log.Error().Err(err).Msg("manual reorg: rpc") - c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) - return - } + if len(chunkOld) < len(chunk) { + anyPartialOld = true + } - if len(newData) != len(sorted) { - c.JSON(http.StatusInternalServerError, gin.H{"error": "internal: rpc result length mismatch"}) - return - } - for i, bn := range sorted { - if newData[i] == nil || newData[i].Block.Number == nil || newData[i].Block.Number.Uint64() != bn { - c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("rpc block order mismatch at index %d", i)}) + chunkNew, err := libblockdata.FetchBlockDataFromRPC(chunk) + if err != nil { + log.Error().Err(err).Msg("manual reorg: rpc") + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) return } - } + if len(chunkNew) != len(chunk) { + c.JSON(http.StatusInternalServerError, gin.H{"error": "internal: rpc result length mismatch"}) + return + } + for j, bn := range chunk { + if chunkNew[j] == nil || chunkNew[j].Block.Number == nil || chunkNew[j].Block.Number.Uint64() != bn { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("rpc block order mismatch at index %d", j)}) + return + } + } - if err := libs.KafkaPublisherV2.PublishBlockDataReorg(newData, oldData); err != nil { - log.Error().Err(err).Msg("manual reorg: kafka") - c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) - return + // PublishBlockDataReorg(new, old): tombstones for rows we had in CH, then inserts for full batch. + // Empty chunkOld → insert-only for this batch (no deletes). + if err := libs.KafkaPublisherV2.PublishBlockDataReorg(chunkNew, chunkOld); err != nil { + log.Error().Err(err).Msg("manual reorg: kafka") + c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) + return + } } - msg := "published old (deleted) then new blocks with reorg headers" - if len(oldData) < len(sorted) { - msg = "published reorg inserts for all blocks; delete tombstones only for blocks found in ClickHouse (some requested heights had no FINAL row)" + msg := "published reorg batches (per batch: delete old from CH if present, then insert new from RPC)" + if anyPartialOld { + msg = "published reorg batches; at least one batch had fewer ClickHouse rows than requested (FINAL); Kafka inserts still sent for every block in those batches" } c.JSON(http.StatusOK, PublishReorgResponse{ OK: true, From a9f63645116ec252dcd5a451cba85fd6894c326a Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 9 Apr 2026 23:00:02 +0545 Subject: [PATCH 07/10] api lock --- internal/reorgapi/server.go | 60 +++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 12 deletions(-) diff --git a/internal/reorgapi/server.go b/internal/reorgapi/server.go index 12e29ed..b278bf0 100644 --- a/internal/reorgapi/server.go +++ b/internal/reorgapi/server.go @@ -5,6 +5,7 @@ import ( "net/http" "slices" "strings" + "sync" "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" @@ -15,16 +16,21 @@ import ( // PublishReorgRequest is the JSON body for POST /v1/reorg/publish. type PublishReorgRequest struct { - ChainID uint64 `json:"chain_id"` - BlockNumbers []uint64 `json:"block_numbers"` + ChainID uint64 `json:"chain_id"` + BlockNumbers []uint64 `json:"block_numbers"` } type PublishReorgResponse struct { - OK bool `json:"ok"` - BlocksPublished int `json:"blocks_published"` - Message string `json:"message,omitempty"` + OK bool `json:"ok"` + BlocksPublished int `json:"blocks_published"` + Message string `json:"message,omitempty"` } +var ( + manualReorgMu sync.Mutex + lastPublishedMaxBlock uint64 // blocks <= this were already published successfully (in-process resume cursor) +) + // RunHTTPServer starts a blocking HTTP server that publishes manual reorg batches to Kafka. func RunHTTPServer() error { gin.SetMode(gin.ReleaseMode) @@ -83,20 +89,48 @@ func handlePublishReorg(c *gin.Context) { return } + if !manualReorgMu.TryLock() { + c.JSON(http.StatusConflict, gin.H{"error": "a manual reorg publish is already running"}) + return + } + defer manualReorgMu.Unlock() + sorted := slices.Clone(req.BlockNumbers) slices.Sort(sorted) sorted = slices.Compact(sorted) + work := make([]uint64, 0, len(sorted)) + for _, bn := range sorted { + if bn > lastPublishedMaxBlock { + work = append(work, bn) + } + } + if len(work) == 0 { + c.JSON(http.StatusOK, PublishReorgResponse{ + OK: true, + BlocksPublished: 0, + Message: fmt.Sprintf("nothing to publish: all requested blocks are at or below last published max (%d)", lastPublishedMaxBlock), + }) + return + } + if len(work) < len(sorted) { + log.Info(). + Uint64("last_published_max_block", lastPublishedMaxBlock). + Int("skipped", len(sorted)-len(work)). + Int("to_publish", len(work)). + Msg("manual reorg: skipping blocks already processed") + } + batchSize := config.Cfg.ReorgAPIClickhouseBatchSize if batchSize == 0 { batchSize = 10 } - totalBatches := (len(sorted) + int(batchSize) - 1) / int(batchSize) + totalBatches := (len(work) + int(batchSize) - 1) / int(batchSize) var anyPartialOld bool batchIdx := 0 - for i := 0; i < len(sorted); i += int(batchSize) { - end := min(i+int(batchSize), len(sorted)) - chunk := sorted[i:end] + for i := 0; i < len(work); i += int(batchSize) { + end := min(i+int(batchSize), len(work)) + chunk := work[i:end] batchIdx++ rangeStart := chunk[0] rangeEnd := chunk[len(chunk)-1] @@ -136,13 +170,15 @@ func handlePublishReorg(c *gin.Context) { } } - // PublishBlockDataReorg(new, old): tombstones for rows we had in CH, then inserts for full batch. - // Empty chunkOld → insert-only for this batch (no deletes). if err := libs.KafkaPublisherV2.PublishBlockDataReorg(chunkNew, chunkOld); err != nil { log.Error().Err(err).Msg("manual reorg: kafka") c.JSON(http.StatusBadGateway, gin.H{"error": err.Error()}) return } + + if rangeEnd > lastPublishedMaxBlock { + lastPublishedMaxBlock = rangeEnd + } } msg := "published reorg batches (per batch: delete old from CH if present, then insert new from RPC)" @@ -151,7 +187,7 @@ func handlePublishReorg(c *gin.Context) { } c.JSON(http.StatusOK, PublishReorgResponse{ OK: true, - BlocksPublished: len(sorted), + BlocksPublished: len(work), Message: msg, }) } From 293ec21659abd6d1b1377439e4496218afc01d92 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 9 Apr 2026 23:12:36 +0545 Subject: [PATCH 08/10] kafka producer id --- cmd/reorgapi.go | 2 +- configs/config.go | 9 +++++---- internal/libs/kafka.go | 13 +++++++++---- internal/storage/kafka_publisher.go | 8 ++++++-- 4 files changed, 21 insertions(+), 11 deletions(-) diff --git a/cmd/reorgapi.go b/cmd/reorgapi.go index 8bd35ef..b6b548c 100644 --- a/cmd/reorgapi.go +++ b/cmd/reorgapi.go @@ -25,7 +25,7 @@ Example: func runReorgAPI(cmd *cobra.Command, args []string) { libs.InitRPCClient() libs.InitNewClickHouseV2() - libs.InitKafkaV2() + libs.InitKafkaV2ForRole("reorg-api") log.Info().Str("chain_id", libs.ChainIdStr).Msg("starting reorg-api") diff --git a/configs/config.go b/configs/config.go index 62a9130..dac12c1 100644 --- a/configs/config.go +++ b/configs/config.go @@ -16,10 +16,11 @@ type LogConfig struct { } type KafkaConfig struct { - Brokers string `mapstructure:"brokers"` - Username string `mapstructure:"username"` - Password string `mapstructure:"password"` - EnableTLS bool `mapstructure:"enableTLS"` + Brokers string `mapstructure:"brokers"` + Username string `mapstructure:"username"` + Password string `mapstructure:"password"` + EnableTLS bool `mapstructure:"enableTLS"` + ProducerRole string } type RPCBatchRequestConfig struct { diff --git a/internal/libs/kafka.go b/internal/libs/kafka.go index ac43c0c..09933a6 100644 --- a/internal/libs/kafka.go +++ b/internal/libs/kafka.go @@ -9,12 +9,17 @@ import ( var KafkaPublisherV2 *storage.KafkaPublisher func InitKafkaV2() { + InitKafkaV2ForRole("committer") +} + +func InitKafkaV2ForRole(role string) { var err error KafkaPublisherV2, err = storage.NewKafkaPublisher(&config.KafkaConfig{ - Brokers: config.Cfg.CommitterKafkaBrokers, - Username: config.Cfg.CommitterKafkaUsername, - Password: config.Cfg.CommitterKafkaPassword, - EnableTLS: config.Cfg.CommitterKafkaEnableTLS, + Brokers: config.Cfg.CommitterKafkaBrokers, + Username: config.Cfg.CommitterKafkaUsername, + Password: config.Cfg.CommitterKafkaPassword, + EnableTLS: config.Cfg.CommitterKafkaEnableTLS, + ProducerRole: role, }) if err != nil { log.Fatal().Err(err).Msg("Failed to initialize Kafka publisher") diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index 3dabbc1..92cb3e9 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -61,13 +61,17 @@ func (b PublishableMessageRevert) GetType() MessageType { func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) { brokers := strings.Split(cfg.Brokers, ",") chainID := config.Cfg.RPC.ChainID + role := strings.TrimSpace(cfg.ProducerRole) + if role == "" { + role = "default" + } opts := []kgo.Opt{ kgo.SeedBrokers(brokers...), kgo.AllowAutoTopicCreation(), kgo.ProducerBatchCompression(kgo.ZstdCompression()), - kgo.ClientID(fmt.Sprintf("insight-indexer-kafka-storage-%s", chainID)), - kgo.TransactionalID(fmt.Sprintf("insight-producer-%s", chainID)), + kgo.ClientID(fmt.Sprintf("insight-indexer-kafka-storage-%s-%s", chainID, role)), + kgo.TransactionalID(fmt.Sprintf("insight-producer-%s-%s", chainID, role)), kgo.MaxBufferedBytes(2 * 1024 * 1024 * 1024), // 2GB kgo.MaxBufferedRecords(1_000_000), kgo.ProducerBatchMaxBytes(100 * 1024 * 1024), // 100MB From 7b407876d270f871dfaffff89e940d99a325a13c Mon Sep 17 00:00:00 2001 From: nischit Date: Fri, 10 Apr 2026 09:40:50 +0545 Subject: [PATCH 09/10] skip idempotency check --- cmd/reorgapi.go | 1 + internal/libs/redis.go | 24 ++++++++++++ internal/reorgapi/server.go | 75 +++++++++++++++++++++++++------------ 3 files changed, 76 insertions(+), 24 deletions(-) diff --git a/cmd/reorgapi.go b/cmd/reorgapi.go index b6b548c..f492e85 100644 --- a/cmd/reorgapi.go +++ b/cmd/reorgapi.go @@ -26,6 +26,7 @@ func runReorgAPI(cmd *cobra.Command, args []string) { libs.InitRPCClient() libs.InitNewClickHouseV2() libs.InitKafkaV2ForRole("reorg-api") + libs.InitRedis() log.Info().Str("chain_id", libs.ChainIdStr).Msg("starting reorg-api") diff --git a/internal/libs/redis.go b/internal/libs/redis.go index 6827282..967daae 100644 --- a/internal/libs/redis.go +++ b/internal/libs/redis.go @@ -13,6 +13,7 @@ import ( var RedisClient *redis.Client const RedisReorgLastValidBlock = "reorg_last_valid" +const RedisReorgAPIMaxProcessedBlock = "reorg_api_max_processed_block" // InitRedis initializes the Redis client func InitRedis() { @@ -51,6 +52,29 @@ func SetReorgLastValidBlock(chainID string, blockNumber int64) error { return RedisClient.HSet(context.Background(), RedisReorgLastValidBlock, chainID, blockNumber).Err() } +func GetReorgAPIMaxProcessedBlock(chainID string) (uint64, error) { + result, err := RedisClient.HGet(context.Background(), RedisReorgAPIMaxProcessedBlock, chainID).Result() + if err == redis.Nil { + return 0, nil + } + if err != nil { + return 0, err + } + n, err := strconv.ParseUint(result, 10, 64) + if err != nil { + return 0, err + } + return n, nil +} + +func SetReorgAPIMaxProcessedBlock(chainID string, blockNumber uint64) error { + return RedisClient.HSet(context.Background(), RedisReorgAPIMaxProcessedBlock, chainID, strconv.FormatUint(blockNumber, 10)).Err() +} + +func ClearReorgAPIMaxProcessedBlock(chainID string) error { + return RedisClient.HDel(context.Background(), RedisReorgAPIMaxProcessedBlock, chainID).Err() +} + // CloseRedis closes the Redis client func CloseRedis() { if RedisClient != nil { diff --git a/internal/reorgapi/server.go b/internal/reorgapi/server.go index b278bf0..37e8645 100644 --- a/internal/reorgapi/server.go +++ b/internal/reorgapi/server.go @@ -16,8 +16,9 @@ import ( // PublishReorgRequest is the JSON body for POST /v1/reorg/publish. type PublishReorgRequest struct { - ChainID uint64 `json:"chain_id"` - BlockNumbers []uint64 `json:"block_numbers"` + ChainID uint64 `json:"chain_id"` + BlockNumbers []uint64 `json:"block_numbers"` + SkipIdempotencyCheck bool `json:"skip_idempotency_check"` } type PublishReorgResponse struct { @@ -27,8 +28,7 @@ type PublishReorgResponse struct { } var ( - manualReorgMu sync.Mutex - lastPublishedMaxBlock uint64 // blocks <= this were already published successfully (in-process resume cursor) + manualReorgMu sync.Mutex ) // RunHTTPServer starts a blocking HTTP server that publishes manual reorg batches to Kafka. @@ -99,26 +99,48 @@ func handlePublishReorg(c *gin.Context) { slices.Sort(sorted) sorted = slices.Compact(sorted) - work := make([]uint64, 0, len(sorted)) - for _, bn := range sorted { - if bn > lastPublishedMaxBlock { - work = append(work, bn) - } - } - if len(work) == 0 { - c.JSON(http.StatusOK, PublishReorgResponse{ - OK: true, - BlocksPublished: 0, - Message: fmt.Sprintf("nothing to publish: all requested blocks are at or below last published max (%d)", lastPublishedMaxBlock), - }) - return - } - if len(work) < len(sorted) { + var lastPublishedMaxBlock uint64 + var work []uint64 + if req.SkipIdempotencyCheck { + work = sorted log.Info(). - Uint64("last_published_max_block", lastPublishedMaxBlock). - Int("skipped", len(sorted)-len(work)). - Int("to_publish", len(work)). - Msg("manual reorg: skipping blocks already processed") + Uint64("chain_id", req.ChainID). + Int("requested", len(sorted)). + Msg("manual reorg: skip_idempotency_check enabled; processing all requested blocks") + } else { + if libs.RedisClient == nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "redis is not initialized"}) + return + } + var err error + lastPublishedMaxBlock, err = libs.GetReorgAPIMaxProcessedBlock(libs.ChainIdStr) + if err != nil { + log.Error().Err(err).Msg("manual reorg: redis get max processed block") + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read idempotency cursor from redis"}) + return + } + + work = make([]uint64, 0, len(sorted)) + for _, bn := range sorted { + if bn > lastPublishedMaxBlock { + work = append(work, bn) + } + } + if len(work) == 0 { + c.JSON(http.StatusOK, PublishReorgResponse{ + OK: true, + BlocksPublished: 0, + Message: fmt.Sprintf("nothing to publish: all requested blocks are at or below last published max (%d)", lastPublishedMaxBlock), + }) + return + } + if len(work) < len(sorted) { + log.Info(). + Uint64("last_published_max_block", lastPublishedMaxBlock). + Int("skipped", len(sorted)-len(work)). + Int("to_publish", len(work)). + Msg("manual reorg: skipping blocks already processed") + } } batchSize := config.Cfg.ReorgAPIClickhouseBatchSize @@ -176,8 +198,13 @@ func handlePublishReorg(c *gin.Context) { return } - if rangeEnd > lastPublishedMaxBlock { + if !req.SkipIdempotencyCheck && rangeEnd > lastPublishedMaxBlock { lastPublishedMaxBlock = rangeEnd + if err := libs.SetReorgAPIMaxProcessedBlock(libs.ChainIdStr, lastPublishedMaxBlock); err != nil { + log.Error().Err(err).Msg("manual reorg: redis set max processed block") + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update idempotency cursor in redis"}) + return + } } } From 7b32a4b560e1de18a98e3579f2a481dde1fa4b7a Mon Sep 17 00:00:00 2001 From: nischit Date: Fri, 10 Apr 2026 10:05:33 +0545 Subject: [PATCH 10/10] dont touch committer flow --- internal/libs/clickhouse.go | 57 ++++++------ internal/libs/libblockdata/getblockdata.go | 100 +++++++++++++++++++-- 2 files changed, 122 insertions(+), 35 deletions(-) diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index f7f6122..21c5fef 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -8,6 +8,7 @@ import ( "slices" "strconv" "strings" + "sync" "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" @@ -211,38 +212,34 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl logsRaw := make([][]common.Log, length) tracesRaw := make([][]common.Trace, length) - g := new(errgroup.Group) - g.Go(func() (err error) { - blocksRaw, err = getBlocksFromV2(chainId, startBlockNumber, endBlockNumber) - return err - }) - g.Go(func() (err error) { - transactionsRaw, err = getTransactionsFromV2(chainId, startBlockNumber, endBlockNumber) - return err - }) - g.Go(func() (err error) { - logsRaw, err = getLogsFromV2(chainId, startBlockNumber, endBlockNumber) - return err - }) - g.Go(func() (err error) { - tracesRaw, err = getTracesFromV2(chainId, startBlockNumber, endBlockNumber) - return err - }) - if err := g.Wait(); err != nil { - return nil, err - } + wg := sync.WaitGroup{} + wg.Add(4) + go func() { + defer wg.Done() + blocksRaw, _ = getBlocksFromV2(chainId, startBlockNumber, endBlockNumber) + }() + + go func() { + defer wg.Done() + transactionsRaw, _ = getTransactionsFromV2(chainId, startBlockNumber, endBlockNumber) + }() + + go func() { + defer wg.Done() + logsRaw, _ = getLogsFromV2(chainId, startBlockNumber, endBlockNumber) + }() + + go func() { + defer wg.Done() + tracesRaw, _ = getTracesFromV2(chainId, startBlockNumber, endBlockNumber) + }() + wg.Wait() for i := range blockData { - b := blocksRaw[i] - if b.Number == nil { - // No row for this index in the dense [start,end] range (gap vs FINAL). - continue - } - if b.ChainId == nil || b.ChainId.Uint64() == 0 { - log.Warn(). - Uint64("chainId", chainId). - Uint64("blockNumber", b.Number.Uint64()). - Msg("skipping block: chain_id missing or zero on ClickHouse row") + if blocksRaw[i].ChainId == nil || blocksRaw[i].ChainId.Uint64() == 0 { + log.Info(). + Any("chainId", blocksRaw[i].ChainId). + Msg("skipping block because chainId is nil") continue } blockData[i] = &common.BlockData{ diff --git a/internal/libs/libblockdata/getblockdata.go b/internal/libs/libblockdata/getblockdata.go index e4c712d..541d13b 100644 --- a/internal/libs/libblockdata/getblockdata.go +++ b/internal/libs/libblockdata/getblockdata.go @@ -185,10 +185,89 @@ func GetValidBlockDataFromRpc(blockNumbers []uint64) []*common.BlockData { } func getValidBlockDataFromRpcBatch(blockNumbers []uint64) []*common.BlockData { - blockData, err := fetchBlockDataFromRpcBatch(blockNumbers) - if err != nil { - log.Panic().Err(err).Msg("Failed to fetch block data from RPC") + var rpcResults []rpc.GetFullBlockResult + var fetchErr error + chainIdStr := libs.ChainIdStr + indexerName := config.Cfg.ZeetProjectName + + // Initial fetch + rpcResults = libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(blockNumbers)) + + metrics.CommitterRPCRowsToFetch.WithLabelValues(indexerName, chainIdStr).Set(float64(len(blockNumbers))) + + // Create array of failed block numbers for retry + failedBlockNumbers := make([]uint64, 0) + for i, result := range rpcResults { + if result.Error != nil { + log.Error().Uint64("block_number", blockNumbers[i]).Err(result.Error).Msg("Failed to fetch block data from RPC") + failedBlockNumbers = append(failedBlockNumbers, blockNumbers[i]) + } + } + + // Retry only failed blocks up to 3 times + for retry := range 3 { + if len(failedBlockNumbers) == 0 { + break // All blocks succeeded + } + + // Track retry metric + metrics.CommitterRPCRetries.WithLabelValues(indexerName, chainIdStr).Set(float64(len(failedBlockNumbers))) + + log.Warn(). + Int("retry", retry+1). + Int("failed_count", len(failedBlockNumbers)). + Msg("Retrying failed block fetches...") + + // Retry only the failed blocks + retryResults := libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(failedBlockNumbers)) + + // Update rpcResults with successful ones and create new failed array + newFailedBlockNumbers := make([]uint64, 0) + retryIndex := 0 + + for i, result := range rpcResults { + if result.Error != nil { + // This was a failed block, check if retry succeeded + if retryIndex < len(retryResults) && retryResults[retryIndex].Error == nil { + // Retry succeeded - update the result + rpcResults[i] = retryResults[retryIndex] + } else { + // Still failed - add to new failed array + newFailedBlockNumbers = append(newFailedBlockNumbers, blockNumbers[i]) + } + retryIndex++ + } + } + + failedBlockNumbers = newFailedBlockNumbers + + // Add delay between retries + if len(failedBlockNumbers) > 0 && retry < 2 { + time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond) + } } + + // Check if any blocks still failed after all retries + if len(failedBlockNumbers) > 0 { + fetchErr = fmt.Errorf("failed to fetch %d block(s) from RPC after 3 retries", len(failedBlockNumbers)) + } + + if fetchErr != nil { + log.Panic().Err(fetchErr).Msg("Failed to fetch block data from RPC") + } + + blockData := make([]*common.BlockData, len(rpcResults)) + for i, result := range rpcResults { + blockData[i] = &result.Data + rpcResults[i] = rpc.GetFullBlockResult{} // free memory + } + + for i, block := range blockData { + if isValid, _ := Validate(block); !isValid { + log.Panic().Int("index", i).Msg("Failed to validate block data from rpc") + } + } + return blockData } @@ -198,10 +277,12 @@ func fetchBlockDataFromRpcBatch(blockNumbers []uint64) ([]*common.BlockData, err chainIdStr := libs.ChainIdStr indexerName := config.Cfg.ZeetProjectName + // Initial fetch rpcResults = libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(blockNumbers)) metrics.CommitterRPCRowsToFetch.WithLabelValues(indexerName, chainIdStr).Set(float64(len(blockNumbers))) + // Create array of failed block numbers for retry failedBlockNumbers := make([]uint64, 0) for i, result := range rpcResults { if result.Error != nil { @@ -210,11 +291,13 @@ func fetchBlockDataFromRpcBatch(blockNumbers []uint64) ([]*common.BlockData, err } } + // Retry only failed blocks up to 3 times for retry := range 3 { if len(failedBlockNumbers) == 0 { - break + break // All blocks succeeded } + // Track retry metric metrics.CommitterRPCRetries.WithLabelValues(indexerName, chainIdStr).Set(float64(len(failedBlockNumbers))) log.Warn(). @@ -222,16 +305,21 @@ func fetchBlockDataFromRpcBatch(blockNumbers []uint64) ([]*common.BlockData, err Int("failed_count", len(failedBlockNumbers)). Msg("Retrying failed block fetches...") + // Retry only the failed blocks retryResults := libs.RpcClient.GetFullBlocks(context.Background(), blockNumbersToBigInt(failedBlockNumbers)) + // Update rpcResults with successful ones and create new failed array newFailedBlockNumbers := make([]uint64, 0) retryIndex := 0 for i, result := range rpcResults { if result.Error != nil { + // This was a failed block, check if retry succeeded if retryIndex < len(retryResults) && retryResults[retryIndex].Error == nil { + // Retry succeeded - update the result rpcResults[i] = retryResults[retryIndex] } else { + // Still failed - add to new failed array newFailedBlockNumbers = append(newFailedBlockNumbers, blockNumbers[i]) } retryIndex++ @@ -240,11 +328,13 @@ func fetchBlockDataFromRpcBatch(blockNumbers []uint64) ([]*common.BlockData, err failedBlockNumbers = newFailedBlockNumbers + // Add delay between retries if len(failedBlockNumbers) > 0 && retry < 2 { time.Sleep(time.Duration(retry+1) * 100 * time.Millisecond) } } + // Check if any blocks still failed after all retries if len(failedBlockNumbers) > 0 { return nil, fmt.Errorf("failed to fetch %d block(s) from RPC after 3 retries", len(failedBlockNumbers)) } @@ -252,7 +342,7 @@ func fetchBlockDataFromRpcBatch(blockNumbers []uint64) ([]*common.BlockData, err blockData := make([]*common.BlockData, len(rpcResults)) for i, result := range rpcResults { blockData[i] = &result.Data - rpcResults[i] = rpc.GetFullBlockResult{} + rpcResults[i] = rpc.GetFullBlockResult{} // free memory } for i, block := range blockData {