diff --git a/cmd/committer.go b/cmd/committer.go
index 989a700..eef0bba 100644
--- a/cmd/committer.go
+++ b/cmd/committer.go
@@ -30,5 +30,8 @@ func RunCommitter(cmd *cobra.Command, args []string) {
 	}()
 
 	committer.Init()
+	committer.InitReorg()
+
+	go committer.RunReorgValidator()
 	committer.CommitStreaming()
 }
diff --git a/configs/config.go b/configs/config.go
index 691fb47..da95539 100644
--- a/configs/config.go
+++ b/configs/config.go
@@ -302,6 +302,10 @@ type Config struct {
 	ParquetMaxFileSizeMB int64  `env:"PARQUET_MAX_FILE_SIZE_MB" envDefault:"512"`
 	InsightServiceUrl    string `env:"INSIGHT_SERVICE_URL" envDefault:"https://insight.thirdweb.com"`
 	InsightServiceApiKey string `env:"INSIGHT_SERVICE_API_KEY"`
+	RedisAddr            string `env:"REDIS_ADDR" envDefault:"localhost:6379"`
+	RedisUsername        string `env:"REDIS_USERNAME"`
+	RedisPassword        string `env:"REDIS_PASSWORD"`
+	RedisDB              int    `env:"REDIS_DB" envDefault:"0"`
 }
 
 var Cfg Config
diff --git a/internal/committer/poollatest.go b/internal/committer/poollatest.go
index 0c0538f..a72c4ac 100644
--- a/internal/committer/poollatest.go
+++ b/internal/committer/poollatest.go
@@ -6,7 +6,6 @@ import (
 
 	"github.com/rs/zerolog/log"
 	config "github.com/thirdweb-dev/indexer/configs"
-	"github.com/thirdweb-dev/indexer/internal/common"
 	"github.com/thirdweb-dev/indexer/internal/libs"
 	"github.com/thirdweb-dev/indexer/internal/libs/libblockdata"
 	"github.com/thirdweb-dev/indexer/internal/metrics"
@@ -59,13 +58,7 @@ func pollLatest() error {
 			Uint64("end_block", expectedBlockNumber-1).
 			Msg("All blocks validated successfully. Publishing blocks to Kafka")
 
-		// Convert slice of BlockData to slice of *BlockData for Kafka publisher
-		blockDataPointers := make([]*common.BlockData, len(blockDataArray))
-		for i, block := range blockDataArray {
-			blockDataPointers[i] = &block
-		}
-
-		if err := libs.KafkaPublisherV2.PublishBlockData(blockDataPointers); err != nil {
+		if err := libs.KafkaPublisherV2.PublishBlockData(blockDataArray); err != nil {
 			log.Panic().
 				Err(err).
 				Int("blocks_count", len(blockDataArray)).
diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go
new file mode 100644
index 0000000..2a03e01
--- /dev/null
+++ b/internal/committer/reorg.go
@@ -0,0 +1,182 @@
+package committer
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/rs/zerolog/log"
+	config "github.com/thirdweb-dev/indexer/configs"
+	"github.com/thirdweb-dev/indexer/internal/libs"
+	"github.com/thirdweb-dev/indexer/internal/libs/libblockdata"
+	"github.com/thirdweb-dev/indexer/internal/metrics"
+)
+
+func InitReorg() {
+	libs.InitRedis()
+}
+
+func RunReorgValidator() {
+	lastBlockCheck := int64(0)
+	for {
+		startBlock, endBlock, err := getReorgRange()
+		if err != nil {
+			log.Error().Err(err).Msg("Failed to get reorg range")
+			time.Sleep(2 * time.Second)
+			continue
+		}
+
+		if endBlock == lastBlockCheck || endBlock-startBlock < 5 {
+			log.Debug().Msg("Not enough new blocks to check. Sleeping for 1 minute.")
+			time.Sleep(1 * time.Minute)
+			continue
+		}
+
+		// Detect reorgs and handle them
+		err = detectAndHandleReorgs(startBlock, endBlock)
+		if err != nil {
+			log.Error().Err(err).Msg("Failed to detect and handle reorgs")
+			time.Sleep(2 * time.Second)
+			continue
+		}
+		lastBlockCheck = endBlock
+	}
+}
+
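+// getReorgRange resumes one block before the last validated block and caps the
+// window at 100 blocks, staying 5 blocks behind the newest stored block for safety.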
+func getReorgRange() (int64, int64, error) {
+	lastValidBlock, err := getLastValidBlock()
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to get last valid block: %w", err)
+	}
+
+	startBlock := max(lastValidBlock-1, 1)
+	endBlock, err := libs.GetMaxBlockNumberFromClickHouseV2(libs.ChainId)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to get max block number: %w", err)
+	}
+
+	endBlock = min(endBlock-5, startBlock+100) // lag behind the head by a few blocks for safety
+
+	if startBlock >= endBlock {
+		return 0, 0, fmt.Errorf("start block %d is not below end block %d", startBlock, endBlock)
+	}
+
+	return startBlock, endBlock, nil
+}
+
+func getLastValidBlock() (int64, error) {
+	// Try the last reorg-checked block number first
+	lastReorgBlock, err := libs.GetReorgLastValidBlock(libs.ChainIdStr)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get last reorg checked block: %w", err)
+	}
+
+	if lastReorgBlock > 0 {
+		return lastReorgBlock, nil
+	}
+
+	// Fall back to the block number from 1 day ago
+	lastValidBlock, err := libs.GetBlockNumberFromClickHouseV2DaysAgo(libs.ChainId, 1)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get block number 1 day ago: %w", err)
+	}
+
+	return lastValidBlock, nil
+}
+
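+// detectAndHandleReorgs compares each stored block's parent_hash with the
+// previous block's hash over [startBlock, endBlock]. A mismatch marks the start
+// of a reorged range; the range ends where the hashes link up again (or at the
+// end of the window).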
+func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
+	log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock)
+
+	// Fetch block headers for the range
+	blockHeaders, err := libs.GetBlockHeadersForReorgCheck(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
+	if err != nil {
+		return fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err)
+	}
+
+	if len(blockHeaders) == 0 {
+		log.Debug().Msg("detectAndHandleReorgs: No block headers found in range")
+		return nil
+	}
+
+	// Find the start and end of the reorged range
+	reorgStartBlock := int64(-1)
+	reorgEndBlock := int64(-1)
+	for i := 1; i < len(blockHeaders); i++ {
+		if blockHeaders[i].Number.Int64() != blockHeaders[i-1].Number.Int64()+1 {
+			// non-sequential block numbers
+			reorgStartBlock = blockHeaders[i-1].Number.Int64()
+			reorgEndBlock = blockHeaders[i].Number.Int64()
+			break
+		}
+		if blockHeaders[i].ParentHash != blockHeaders[i-1].Hash {
+			// hash mismatch: the reorged range starts here
+			if reorgStartBlock == -1 {
+				reorgStartBlock = blockHeaders[i-1].Number.Int64()
+			}
+			continue
+		}
+		// hashes match again: the reorged range ends here
+		if reorgStartBlock != -1 {
+			reorgEndBlock = blockHeaders[i].Number.Int64()
+			break
+		}
+	}
+
+	// Default the end to the last block if it was never set
+	if reorgEndBlock == -1 {
+		reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64()
+	}
+
+	if reorgStartBlock > -1 {
+		if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil {
+			return err
+		}
+	}
+
+	// Update the last valid block; if there was no reorg, this advances it to the last checked block
+	if err := libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock); err != nil {
+		return fmt.Errorf("detectAndHandleReorgs: failed to set last valid block: %w", err)
+	}
+
+	return nil
+}
+
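+// handleReorgForRange refetches [startBlock, endBlock] from the RPC, then
+// publishes the previously stored rows as deletions followed by the refetched
+// blocks, so downstream consumers can replace the reorged range.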
+func handleReorgForRange(startBlock uint64, endBlock uint64) error {
+	// nothing to do
+	if startBlock == 0 {
+		return nil
+	}
+
+	// will panic if any block is invalid
+	newBlockDataArray := libblockdata.GetValidBlockDataInBatch(endBlock, startBlock)
+	expectedBlockNumber := startBlock
+	for i, blockData := range newBlockDataArray {
+		if blockData.Block.Number.Uint64() != expectedBlockNumber {
+			log.Error().
+				Int("index", i).
+				Uint64("expected_block", expectedBlockNumber).
+				Uint64("actual_block", blockData.Block.Number.Uint64()).
+				Msg("Reorg: Block sequence mismatch - missing or out of order block")
+
+			return fmt.Errorf("reorg: block sequence mismatch - missing or out of order block")
+		}
+		expectedBlockNumber++
+	}
+
+	oldBlockDataArray, err := libs.GetBlockDataFromClickHouseV2(libs.ChainId.Uint64(), startBlock, endBlock)
+	if err != nil {
+		return fmt.Errorf("handleReorgForRange: failed to get old block data: %w", err)
+	}
+
+	if err := libs.KafkaPublisherV2.PublishBlockDataReorg(newBlockDataArray, oldBlockDataArray); err != nil {
+		log.Error().
+			Err(err).
+			Int("blocks_count", len(newBlockDataArray)).
+			Msg("Reorg: Failed to publish blocks to Kafka")
+		return fmt.Errorf("reorg: failed to publish blocks to kafka: %w", err)
+	}
+
+	for _, blockData := range newBlockDataArray {
+		metrics.CommitterLastPublishedReorgBlockNumber.WithLabelValues(config.Cfg.ZeetProjectName, libs.ChainIdStr).Set(float64(blockData.Block.Number.Uint64()))
+	}
+
+	return nil
+}
diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go
index 8eb2e5d..63afd8f 100644
--- a/internal/libs/clickhouse.go
+++ b/internal/libs/clickhouse.go
@@ -89,6 +89,32 @@ func initClickhouse(host string, port int, username string, password string, dat
 	return clickhouseConn
 }
 
+func GetBlockNumberFromClickHouseV2DaysAgo(chainId *big.Int, daysAgo int) (int64, error) {
+	query := fmt.Sprintf(`SELECT toString(max(block_number))
+		FROM default.blocks WHERE chain_id = %d AND block_timestamp <= now() - INTERVAL %d DAY;`, chainId.Uint64(), daysAgo)
+	rows, err := ClickhouseConnV2.Query(context.Background(), query)
+	if err != nil {
+		return -1, err
+	}
+	defer rows.Close()
+
+	if !rows.Next() {
+		return -1, nil
+	}
+
+	var blockNumberStr string
+	if err := rows.Scan(&blockNumberStr); err != nil {
+		return -1, err
+	}
+
+	blockNumber, err := strconv.ParseInt(blockNumberStr, 10, 64)
+	if err != nil {
+		return -1, fmt.Errorf("failed to parse block number: %s", blockNumberStr)
+	}
+
+	return blockNumber, nil
+}
+
 func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) {
 	// Use toString() to convert UInt256 to string, then parse to int64
 	query := fmt.Sprintf("SELECT toString(max(block_number)) FROM blocks WHERE chain_id = %d HAVING count() > 0", chainId.Uint64())
@@ -115,6 +141,55 @@ func GetMaxBlockNumberFromClickHouseV2(chainId *big.Int) (int64, error) {
 	return maxBlockNumber, nil
 }
 
+func GetBlockReorgDataFromClickHouseV2(chainId *big.Int, startBlockNumber int64, endBlockNumber int64) ([]*common.Block, error) {
+	query := fmt.Sprintf(`SELECT block_number, hash, parent_hash
+		FROM default.blocks WHERE chain_id = %d AND block_number BETWEEN %d AND %d ORDER BY block_number`, chainId.Uint64(), startBlockNumber, endBlockNumber)
+	rows, err := ClickhouseConnV2.Query(context.Background(), query)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	blocks := make([]*common.Block, 0)
+	for rows.Next() {
+		var block common.Block
+		err := rows.Scan(&block.Number, &block.Hash, &block.ParentHash)
+		if err != nil {
+			return nil, err
+		}
+		blocks = append(blocks, &block)
+	}
+	return blocks, nil
+}
+
+func GetBlockHeadersForReorgCheck(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.Block, error) {
+	sb := startBlockNumber
+	length := endBlockNumber - startBlockNumber + 1
+	blocksRaw := make([]*common.Block, length)
+
+	query := fmt.Sprintf("SELECT block_number, hash, parent_hash FROM %s.blocks FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d ORDER BY block_number",
+		config.Cfg.CommitterClickhouseDatabase,
+		chainId,
+		startBlockNumber,
+		endBlockNumber,
+	)
+	blocks, err := execQueryV2[common.Block](query)
+	if err != nil {
+		return blocksRaw, err
+	}
+
+	// place each block at its expected offset; index into the slice rather than aliasing the loop variable
+	for i := range blocks {
+		idx := blocks[i].Number.Uint64() - sb
+		if idx >= length {
+			log.Error().Msgf("Block number %s is out of range", blocks[i].Number.String())
+			continue
+		}
+		blocksRaw[idx] = &blocks[i]
+	}
+	return blocksRaw, nil
+}
+
 func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]*common.BlockData, error) {
 	length := endBlockNumber - startBlockNumber + 1
 
diff --git a/internal/libs/libblockdata/getblockdata.go b/internal/libs/libblockdata/getblockdata.go
index a28df9c..81811ba 100644
--- a/internal/libs/libblockdata/getblockdata.go
+++ b/internal/libs/libblockdata/getblockdata.go
@@ -15,13 +15,13 @@ import (
 	"github.com/thirdweb-dev/indexer/internal/rpc"
 )
 
-func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) []common.BlockData {
+func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64) []*common.BlockData {
 	rpcNumParallelCalls := config.Cfg.RPCNumParallelCalls
 	rpcBatchSize := config.Cfg.RPCBatchSize
 	maxBlocksPerFetch := rpcBatchSize * rpcNumParallelCalls
 
 	// Calculate the range of blocks to fetch
-	blocksToFetch := latestBlock - nextCommitBlockNumber
+	blocksToFetch := latestBlock - nextCommitBlockNumber + 1
 	if blocksToFetch > maxBlocksPerFetch {
 		blocksToFetch = maxBlocksPerFetch
 	}
@@ -35,7 +35,7 @@ func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64)
 		Msg("Starting to fetch latest blocks")
 
 	// Precreate array of block data
-	blockDataArray := make([]common.BlockData, blocksToFetch)
+	blockDataArray := make([]*common.BlockData, blocksToFetch)
 
 	// Create batches and calculate number of parallel calls needed
 	numBatches := min((blocksToFetch+rpcBatchSize-1)/rpcBatchSize, rpcNumParallelCalls)
@@ -74,8 +74,8 @@ func GetValidBlockDataInBatch(latestBlock uint64, nextCommitBlockNumber uint64)
 			for i, bd := range batchResults {
 				arrayIndex := batchIdx*rpcBatchSize + uint64(i)
 				if arrayIndex < uint64(len(blockDataArray)) {
-					blockDataArray[arrayIndex] = *bd // todo: update to use pointer, kafka is using normal block data
-					batchResults[i] = nil            // free memory
+					blockDataArray[arrayIndex] = bd
+					batchResults[i] = nil // free memory
 				}
 			}
 			mu.Unlock()
diff --git a/internal/libs/redis.go b/internal/libs/redis.go
new file mode 100644
index 0000000..6827282
--- /dev/null
+++ b/internal/libs/redis.go
@@ -0,0 +1,59 @@
+package libs
+
+import (
+	"context"
+	"crypto/tls"
+	"strconv"
+
+	"github.com/redis/go-redis/v9"
+	"github.com/rs/zerolog/log"
+	config "github.com/thirdweb-dev/indexer/configs"
+)
+
+var RedisClient *redis.Client
+
+const RedisReorgLastValidBlock = "reorg_last_valid"
+
+// InitRedis initializes the Redis client
+func InitRedis() {
+	RedisClient = redis.NewClient(&redis.Options{
+		Addr:     config.Cfg.RedisAddr,
+		Username: config.Cfg.RedisUsername,
+		Password: config.Cfg.RedisPassword,
+		DB:       config.Cfg.RedisDB,
+		TLSConfig: &tls.Config{
+			MinVersion: tls.VersionTLS12,
+		},
+	})
+
+	// Test the connection
+	ctx := context.Background()
+	_, err := RedisClient.Ping(ctx).Result()
+	if err != nil {
+		log.Panic().Err(err).Msg("Failed to connect to Redis")
+	}
+
+	log.Info().Msg("Redis client initialized successfully")
+}
+
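+// The last validated block per chain is stored in a single Redis hash:
+// key "reorg_last_valid", field = chain ID, value = block number.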
+func GetReorgLastValidBlock(chainID string) (int64, error) {
+	result, err := RedisClient.HGet(context.Background(), RedisReorgLastValidBlock, chainID).Result()
+	if err == redis.Nil {
+		return 0, nil
+	}
+	if err != nil {
+		return 0, err
+	}
+	return strconv.ParseInt(result, 10, 64)
+}
+
+func SetReorgLastValidBlock(chainID string, blockNumber int64) error {
+	return RedisClient.HSet(context.Background(), RedisReorgLastValidBlock, chainID, blockNumber).Err()
+}
+
+// CloseRedis closes the Redis client
+func CloseRedis() {
+	if RedisClient != nil {
+		RedisClient.Close()
+	}
+}
diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go
index e7fdf15..a751e6a 100644
--- a/internal/metrics/metrics.go
+++ b/internal/metrics/metrics.go
@@ -256,6 +256,11 @@ var (
 		Help: "The last published block number to Kafka",
 	}, []string{"project_name", "chain_id"})
 
+	CommitterLastPublishedReorgBlockNumber = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "committer_last_published_reorg_block_number",
+		Help: "The last published reorg block number to Kafka",
+	}, []string{"project_name", "chain_id"})
+
 	CommitterRPCRowsToFetch = promauto.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "committer_rpc_rows_to_fetch",
 		Help: "The total number of rows to fetch from RPC",
diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go
index 83b4ac0..e1a86bf 100644
--- a/internal/storage/kafka_publisher.go
+++ b/internal/storage/kafka_publisher.go
@@ -112,7 +112,19 @@ func NewKafkaPublisher(cfg *config.KafkaConfig) (*KafkaPublisher, error) {
 }
 
 func (p *KafkaPublisher) PublishBlockData(blockData []*common.BlockData) error {
-	return p.publishBlockData(blockData, false)
+	return p.publishBlockData(blockData, false, false)
+}
+
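+// PublishBlockDataReorg publishes the old (reorged) blocks flagged as deleted,
+// then the replacement blocks, both marked with the is_reorg header.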
+func (p *KafkaPublisher) PublishBlockDataReorg(newBlockData []*common.BlockData, oldBlockData []*common.BlockData) error {
+	if err := p.publishBlockData(oldBlockData, true, true); err != nil {
+		return fmt.Errorf("failed to publish old block data: %v", err)
+	}
+
+	if err := p.publishBlockData(newBlockData, false, true); err != nil {
+		return fmt.Errorf("failed to publish new block data: %v", err)
+	}
+
+	return nil
 }
 
 func (p *KafkaPublisher) PublishReorg(oldData []*common.BlockData, newData []*common.BlockData) error {
@@ -123,11 +135,11 @@ func (p *KafkaPublisher) PublishReorg(oldData []*common.BlockData, newData []*co
 		return fmt.Errorf("failed to revert: %v", err)
 	}
 
-	if err := p.publishBlockData(oldData, true); err != nil {
+	if err := p.publishBlockData(oldData, true, true); err != nil {
 		return fmt.Errorf("failed to publish old block data: %v", err)
 	}
 
-	if err := p.publishBlockData(newData, false); err != nil {
+	if err := p.publishBlockData(newData, false, true); err != nil {
 		return fmt.Errorf("failed to publish new block data: %v", err)
 	}
 	return nil
@@ -228,7 +240,7 @@ func (p *KafkaPublisher) publishBlockRevert(chainId uint64, blockNumber uint64)
 	return nil
 }
 
-func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDeleted bool) error {
+func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDeleted bool, isReorg bool) error {
 	if len(blockData) == 0 {
 		return nil
 	}
@@ -240,7 +252,7 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDelet
 
 	for i, data := range blockData {
 		// Block message
-		if blockMsg, err := p.createBlockDataMessage(data, isDeleted); err == nil {
+		if blockMsg, err := p.createBlockDataMessage(data, isDeleted, isReorg); err == nil {
 			blockMessages[i] = blockMsg
 		} else {
 			return fmt.Errorf("failed to create block message: %v", err)
@@ -255,7 +267,7 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDelet
 	return nil
 }
 
-func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDeleted bool) (*kgo.Record, error) {
+func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDeleted bool, isReorg bool) (*kgo.Record, error) {
 	timestamp := time.Now()
 
 	data := PublishableMessageBlockData{
@@ -279,7 +291,7 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDelet
 		return nil, fmt.Errorf("failed to marshal block data: %v", err)
 	}
 
-	return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, msgJson)
+	return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isDeleted, isReorg, msgJson)
 }
 
 func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber uint64) (*kgo.Record, error) {
@@ -303,10 +315,10 @@ func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber ui
 		return nil, fmt.Errorf("failed to marshal block data: %v", err)
 	}
 
-	return p.createRecord(data.GetType(), chainId, blockNumber, timestamp, msgJson)
+	return p.createRecord(data.GetType(), chainId, blockNumber, timestamp, false, false, msgJson)
 }
 
-func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, msgJson []byte) (*kgo.Record, error) {
+func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, isDeleted bool, isReorg bool, msgJson []byte) (*kgo.Record, error) {
 	compressionThreshold := config.Cfg.CommitterCompressionThresholdMB * 1024 * 1024
 
 	var value []byte
@@ -328,8 +340,10 @@ func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, block
 
 	// Create headers with metadata
 	headers := []kgo.RecordHeader{
-		{Key: "chain_id", Value: []byte(fmt.Sprintf("%d", chainId))},
-		{Key: "block_number", Value: []byte(fmt.Sprintf("%d", blockNumber))},
+		{Key: "chain_id", Value: []byte(fmt.Sprintf("%d", chainId))},         // order is important: always index 0
+		{Key: "block_number", Value: []byte(fmt.Sprintf("%d", blockNumber))}, // order is important: always index 1
+		{Key: "is_reorg", Value: []byte(fmt.Sprintf("%t", isReorg))},         // order is important: always index 2
+		{Key: "is_deleted", Value: []byte(fmt.Sprintf("%t", isDeleted))},     // order is important: always index 3
 		{Key: "type", Value: []byte(fmt.Sprintf("%s", msgType))},
 		{Key: "timestamp", Value: []byte(timestamp.Format(time.RFC3339Nano))},
 		{Key: "schema_version", Value: []byte("1")},