Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,6 @@ mysql1
meili_data
postgres
mysql
MIGRATION_STATUS.md
RPC_MANAGER_GUIDE.md

Binary file added build/dispute-explorer
Binary file not shown.
15 changes: 15 additions & 0 deletions internal/handler/handler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package handler

import (
"time"

"github.com/optimism-java/dispute-explorer/internal/svc"
"github.com/optimism-java/dispute-explorer/pkg/rpc"
)

func Run(ctx *svc.ServiceContext) {
// Start RPC monitoring
go startRPCMonitoring(ctx)

// query last block number
go LatestBlackNumber(ctx)
// sync blocks
Expand All @@ -20,3 +26,12 @@ func Run(ctx *svc.ServiceContext) {
// sync claim len
go SyncClaimDataLen(ctx)
}

// startRPCMonitoring wires the RPC usage monitor into the handler startup
// path. The monitor reports statistics for the shared RPC manager every
// 30 seconds; Start receives ctx.Context, which presumably stops the
// monitor on cancellation — confirm against rpc.Monitor.
func startRPCMonitoring(ctx *svc.ServiceContext) {
	const statsInterval = 30 * time.Second
	rpc.NewMonitor(ctx.RPCManager, statsInterval).Start(ctx.Context)
}
7 changes: 4 additions & 3 deletions internal/handler/latestBlockNumber.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ import (

func LatestBlackNumber(ctx *svc.ServiceContext) {
for {
latest, err := ctx.L1RPC.BlockNumber(context.Background())
// use unified RPC manager to get latest block number (automatically applies rate limiting)
latest, err := ctx.RPCManager.GetLatestBlockNumber(context.Background(), true) // true indicates L1
if err != nil {
log.Errorf("[Handler.LatestBlackNumber] Syncing block by number error: %s\n", errors.WithStack(err))
log.Errorf("[Handler.LatestBlackNumber] Get latest block number error (with rate limit): %s\n", errors.WithStack(err))
time.Sleep(12 * time.Second)
continue
}

ctx.LatestBlockNumber = cast.ToInt64(latest)
log.Infof("[Handle.LatestBlackNumber] Syncing latest block number: %d \n", latest)
log.Infof("[Handler.LatestBlackNumber] Latest block number: %d (via RPC Manager)\n", latest)
time.Sleep(12 * time.Second)
}
}
17 changes: 9 additions & 8 deletions internal/handler/logFilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ func LogFilter(ctx *svc.ServiceContext, block schema.SyncBlock, addresses []comm
Topics: topics,
Addresses: addresses,
}
logs, err := ctx.L1RPC.FilterLogs(context.Background(), query)
// use unified RPC manager to filter logs (automatically applies rate limiting)
logs, err := ctx.RPCManager.FilterLogs(context.Background(), query, true) // true indicates L1
if err != nil {
return nil, errors.WithStack(err)
}
log.Infof("[CancelOrder.Handle] Cancel Pending List Length is %d ,block number is %d \n", len(logs), block.BlockNumber)
log.Infof("[LogFilter] Event logs length is %d, block number is %d (via RPC Manager)\n", len(logs), block.BlockNumber)
return LogsToEvents(ctx, logs, block.ID)
}

Expand All @@ -49,22 +50,22 @@ func LogsToEvents(ctx *svc.ServiceContext, logs []types.Log, syncBlockID int64)
blockNumber := cast.ToInt64(vlog.BlockNumber)
log.Infof("[LogsToEvents] Fetching block info for block number: %d, txHash: %s", blockNumber, vlog.TxHash.Hex())

// Try to get block using L1RPC client first
block, err := ctx.L1RPC.BlockByNumber(context.Background(), big.NewInt(blockNumber))
// Use unified RPC manager to get block (automatically applies rate limiting)
block, err := ctx.RPCManager.GetBlockByNumber(context.Background(), big.NewInt(blockNumber), true) // true indicates L1
if err != nil {
log.Errorf("[LogsToEvents] BlockByNumber failed for block %d, txHash: %s, error: %s", blockNumber, vlog.TxHash.Hex(), err.Error())
log.Errorf("[LogsToEvents] GetBlockByNumber failed for block %d, txHash: %s, error: %s (via RPC Manager)", blockNumber, vlog.TxHash.Hex(), err.Error())

// If error contains "transaction type not supported", try alternative approach
if strings.Contains(err.Error(), "transaction type not supported") {
log.Infof("[LogsToEvents] Attempting to get block timestamp using header only for block %d", blockNumber)
header, headerErr := ctx.L1RPC.HeaderByNumber(context.Background(), big.NewInt(blockNumber))
header, headerErr := ctx.RPCManager.HeaderByNumber(context.Background(), big.NewInt(blockNumber), true) // true indicates L1
if headerErr != nil {
log.Errorf("[LogsToEvents] HeaderByNumber also failed for block %d: %s", blockNumber, headerErr.Error())
log.Errorf("[LogsToEvents] HeaderByNumber also failed for block %d: %s (via RPC Manager)", blockNumber, headerErr.Error())
return nil, errors.WithStack(err)
}
blockTime = cast.ToInt64(header.Time)
blockTimes[blockNumber] = blockTime
log.Infof("[LogsToEvents] Successfully got block timestamp %d for block %d using header", blockTime, blockNumber)
log.Infof("[LogsToEvents] Successfully got block timestamp %d for block %d using header (via RPC Manager)", blockTime, blockNumber)
} else {
return nil, errors.WithStack(err)
}
Expand Down
118 changes: 118 additions & 0 deletions internal/handler/rpc_manager_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package handler

import (
	"context"
	"fmt"
	"math/big"
	"time"

	"github.com/optimism-java/dispute-explorer/internal/svc"
	"github.com/optimism-java/dispute-explorer/pkg/log"
	"github.com/optimism-java/dispute-explorer/pkg/rpc"
	"github.com/pkg/errors"
	"github.com/spf13/cast"
)

// LatestBlockNumberWithRateLimit polls the L1 latest block number through the
// unified RPC manager (which applies rate limiting) every 12 seconds and
// stores the result in ctx.LatestBlockNumber.
//
// Improvement over the original: the loop exits when ctx.Context is
// cancelled, so the goroutine no longer leaks after service shutdown.
// On success and on error alike the next attempt happens after pollInterval,
// matching the original 12-second cadence.
func LatestBlockNumberWithRateLimit(ctx *svc.ServiceContext) {
	const pollInterval = 12 * time.Second
	for {
		// true selects the L1 endpoint.
		latest, err := ctx.RPCManager.GetLatestBlockNumber(context.Background(), true)
		if err != nil {
			log.Errorf("[Handler.LatestBlockNumberWithRateLimit] Get latest block number error: %s\n", errors.WithStack(err))
		} else {
			ctx.LatestBlockNumber = cast.ToInt64(latest)
			log.Infof("[Handler.LatestBlockNumberWithRateLimit] Latest block number: %d (using RPC Manager)\n", latest)
		}

		// Sleep with a cancellation path instead of a bare time.Sleep so the
		// goroutine terminates promptly when the service context is done.
		select {
		case <-ctx.Context.Done():
			return
		case <-time.After(pollInterval):
		}
	}
}

// SyncBlockWithRateLimit uses unified RPC manager for block synchronization.
// It advances ctx.SyncedBlockNumber one block at a time, fetching each block
// through the rate-limited RPC manager and verifying the parent-hash chain.
//
// NOTE(review): this function contains placeholder sections ("existing check
// logic", "existing database save logic") — it updates the in-memory sync
// cursor without persisting blocks. Confirm the placeholders are filled in
// before wiring this into production startup.
// NOTE(review): the loop has no cancellation path; once started, the
// goroutine runs for the lifetime of the process.
func SyncBlockWithRateLimit(ctx *svc.ServiceContext) {
	for {
		// Check pending block count
		// ... existing check logic ...

		// Next block to fetch is one past the last successfully synced block.
		syncingBlockNumber := ctx.SyncedBlockNumber + 1
		log.Infof("[Handler.SyncBlockWithRateLimit] Try to sync block number: %d using RPC Manager\n", syncingBlockNumber)

		// Caught up with the chain head: wait before re-checking.
		if syncingBlockNumber > ctx.LatestBlockNumber {
			time.Sleep(3 * time.Second)
			continue
		}

		// Use unified RPC manager to get block (automatically handles rate limiting);
		// true selects the L1 endpoint.
		block, err := ctx.RPCManager.GetBlockByNumber(context.Background(), big.NewInt(syncingBlockNumber), true)
		if err != nil {
			log.Errorf("[Handler.SyncBlockWithRateLimit] Get block by number error: %s\n", errors.WithStack(err))
			time.Sleep(3 * time.Second)
			continue
		}

		log.Infof("[Handler.SyncBlockWithRateLimit] Got block number: %d, hash: %v, parent hash: %v\n",
			block.Number().Int64(), block.Hash().Hex(), block.ParentHash().Hex())

		// Verify parent hash: detects reorgs relative to the last synced block.
		// NOTE(review): on mismatch this only logs and retries every 3s — with no
		// rollback implemented, a reorg makes this loop spin indefinitely.
		if block.ParentHash() != ctx.SyncedBlockHash {
			log.Errorf("[Handler.SyncBlockWithRateLimit] ParentHash mismatch: expected %s, got %s\n",
				ctx.SyncedBlockHash.Hex(), block.ParentHash().Hex())
			// Can call rollback logic here
			time.Sleep(3 * time.Second)
			continue
		}

		// Save block to database
		// ... existing database save logic ...

		// Update sync status
		ctx.SyncedBlockNumber = block.Number().Int64()
		ctx.SyncedBlockHash = block.Hash()

		log.Infof("[Handler.SyncBlockWithRateLimit] Successfully synced block %d\n", block.Number().Int64())
	}
}

// GetBlockByNumberHTTP fetches a block (with full transaction objects) via a
// raw eth_getBlockByNumber JSON-RPC call routed through the unified RPC
// manager, which applies rate limiting. Returns the raw JSON response body.
func GetBlockByNumberHTTP(ctx *svc.ServiceContext, blockNumber int64) ([]byte, error) {
	// Bug fix: eth_getBlockByNumber requires the block number as a
	// 0x-prefixed hex quantity. cast.ToString produced a decimal string
	// (e.g. "12345" instead of "0x3039"), which compliant nodes reject.
	// The encoding now matches the hand-built requests in syncBlock.go.
	requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" +
		fmt.Sprintf("0x%X", blockNumber) + "\", true],\"id\":1}"

	// true selects the L1 endpoint; the manager handles rate limiting.
	return ctx.RPCManager.HTTPPostJSON(context.Background(), requestBody, true)
}

// MigrateExistingHandlers demonstrates switching the legacy handlers over to
// the rate-limited RPC manager: it launches the manager-backed pollers and
// the RPC usage monitor as background goroutines.
func MigrateExistingHandlers(ctx *svc.ServiceContext) {
	log.Infof("[Handler.Migration] Starting migration to RPC Manager")

	// Each worker runs for the lifetime of the process.
	workers := []func(*svc.ServiceContext){
		LatestBlockNumberWithRateLimit, // rate-limited head poller
		SyncBlockWithRateLimit,         // rate-limited block sync
		StartRPCMonitoring,             // periodic RPC statistics
	}
	for _, worker := range workers {
		go worker(ctx)
	}

	log.Infof("[Handler.Migration] All handlers migrated to use RPC Manager")
}

// StartRPCMonitoring launches the RPC usage monitor for the shared RPC
// manager, emitting statistics every 30 seconds. The service context is
// passed to Start, which presumably stops the monitor on cancellation —
// confirm against rpc.Monitor.
func StartRPCMonitoring(ctx *svc.ServiceContext) {
	reportEvery := 30 * time.Second
	rpc.NewMonitor(ctx.RPCManager, reportEvery).Start(ctx.Context)
}

// Compatibility functions: provide smooth migration for existing code

// GetL1Client returns the manager's underlying L1 client for callers that
// still expect a raw client handle. The static type is interface{}, so
// callers must type-assert the concrete client type.
// NOTE(review): the original comment claims a "rate-limited client wrapper",
// but the method name GetRawClient suggests an unwrapped client — confirm
// whether calls through this handle bypass rate limiting.
func GetL1Client(ctx *svc.ServiceContext) interface{} {
	// Return rate-limited client wrapper; true selects the L1 endpoint.
	return ctx.RPCManager.GetRawClient(true)
}

// GetL2Client returns the manager's underlying L2 client for callers that
// still expect a raw client handle; see GetL1Client for caveats about the
// interface{} return and rate limiting.
func GetL2Client(ctx *svc.ServiceContext) interface{} {
	// Return rate-limited client wrapper; false selects the L2 endpoint.
	return ctx.RPCManager.GetRawClient(false)
}
20 changes: 12 additions & 8 deletions internal/handler/syncBlock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package handler

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -58,15 +59,16 @@ func SyncBlock(ctx *svc.ServiceContext) {
continue
}

// block, err := ctx.RPC.BlockByNumber(context.Background(), big.NewInt(syncingBlockNumber))
blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", syncingBlockNumber)+"\", true],\"id\":1}")
// use unified RPC manager to get block (automatically applies rate limiting)
requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" + fmt.Sprintf("0x%X", syncingBlockNumber) + "\", true],\"id\":1}"
blockJSON, err := ctx.RPCManager.HTTPPostJSON(context.Background(), requestBody, true) // true indicates L1
if err != nil {
log.Errorf("[Handler.SyncBlock] Syncing block by number error: %s\n", errors.WithStack(err))
log.Errorf("[Handler.SyncBlock] Syncing block by number error (with rate limit): %s\n", errors.WithStack(err))
time.Sleep(3 * time.Second)
continue
}
block := rpc.ParseJSONBlock(string(blockJSON))
log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v \n", block.Number(), block.Hash(), block.ParentHash())
log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v (via RPC Manager)\n", block.Number(), block.Hash(), block.ParentHash())

if common.HexToHash(block.ParentHash()) != ctx.SyncedBlockHash {
log.Errorf("[Handler.SyncBlock] ParentHash of the block being synchronized is inconsistent: %s \n", ctx.SyncedBlockHash)
Expand Down Expand Up @@ -102,16 +104,18 @@ func rollbackBlock(ctx *svc.ServiceContext) {
for {
rollbackBlockNumber := ctx.SyncedBlockNumber

log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)
log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)

blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", rollbackBlockNumber)+"\", true],\"id\":1}")
// use unified RPC manager for rollback operation (automatically applies rate limiting)
requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" + fmt.Sprintf("0x%X", rollbackBlockNumber) + "\", true],\"id\":1}"
blockJSON, err := ctx.RPCManager.HTTPPostJSON(context.Background(), requestBody, true) // true indicates L1
if err != nil {
log.Errorf("[Handler.SyncBlock.RollRackBlock]Rollback block by number error: %s\n", errors.WithStack(err))
log.Errorf("[Handler.SyncBlock.RollBackBlock] Rollback block by number error (with rate limit): %s\n", errors.WithStack(err))
continue
}

rollbackBlock := rpc.ParseJSONBlock(string(blockJSON))
log.Errorf("[Handler.SyncBlock.RollRackBlock] rollbackBlock: %s, syncedBlockHash: %s \n", rollbackBlock.Hash(), ctx.SyncedBlockHash)
log.Errorf("[Handler.SyncBlock.RollBackBlock] rollbackBlock: %s, syncedBlockHash: %s (via RPC Manager)\n", rollbackBlock.Hash(), ctx.SyncedBlockHash)

if common.HexToHash(rollbackBlock.Hash()) == ctx.SyncedBlockHash {
err = ctx.DB.Transaction(func(tx *gorm.DB) error {
Expand Down
32 changes: 21 additions & 11 deletions internal/svc/svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/optimism-java/dispute-explorer/internal/types"
"github.com/optimism-java/dispute-explorer/pkg/rpc"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
Expand All @@ -18,8 +19,9 @@ var svc *ServiceContext

type ServiceContext struct {
Config *types.Config
L1RPC *ethclient.Client
L2RPC *ethclient.Client
L1RPC *ethclient.Client // 保留向后兼容
L2RPC *ethclient.Client // 保留向后兼容
RPCManager *rpc.Manager // 新增统一RPC管理器
DB *gorm.DB
LatestBlockNumber int64
SyncedBlockNumber int64
Expand All @@ -45,22 +47,30 @@ func NewServiceContext(ctx context.Context, cfg *types.Config) *ServiceContext {
// SetConnMaxLifetime
sqlDB.SetConnMaxLifetime(time.Duration(cfg.MySQLConnMaxLifetime) * time.Second)

rpc, err := ethclient.Dial(cfg.L1RPCUrl)
// 创建原有的以太坊客户端(保持向后兼容)
l1Client, err := ethclient.Dial(cfg.L1RPCUrl)
if err != nil {
log.Panicf("[svc] get eth client panic: %s\n", err)
log.Panicf("[svc] get L1 eth client panic: %s\n", err)
}

rpc2, err := ethclient.Dial(cfg.L2RPCUrl)
l2Client, err := ethclient.Dial(cfg.L2RPCUrl)
if err != nil {
log.Panicf("[svc] get eth client panic: %s\n", err)
log.Panicf("[svc] get L2 eth client panic: %s\n", err)
}

// 创建统一的RPC管理器
rpcManager, err := rpc.CreateManagerFromConfig(cfg)
if err != nil {
log.Panicf("[svc] create RPC manager panic: %s\n", err)
}

svc = &ServiceContext{
Config: cfg,
L1RPC: rpc,
L2RPC: rpc2,
DB: storage,
Context: ctx,
Config: cfg,
L1RPC: l1Client, // 保留向后兼容
L2RPC: l2Client, // 保留向后兼容
RPCManager: rpcManager, // 新的统一管理器
DB: storage,
Context: ctx,
}
return svc
}
48 changes: 48 additions & 0 deletions pkg/rpc/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package rpc

import (
"time"

"github.com/optimism-java/dispute-explorer/internal/types"
)

// CreateManagerFromConfig builds an RPC manager from the application
// configuration, carrying over the L1/L2 endpoints and the shared
// rate-limit settings. The HTTP timeout is fixed at 10 seconds.
func CreateManagerFromConfig(config *types.Config) (*Manager, error) {
	cfg := Config{
		L1RPCUrl:    config.L1RPCUrl,
		L2RPCUrl:    config.L2RPCUrl,
		ProxyURL:    "", // no proxy by default; extend the config if one is needed
		RateLimit:   config.RPCRateLimit,
		RateBurst:   config.RPCRateBurst,
		HTTPTimeout: 10 * time.Second,
	}
	return NewManager(cfg)
}

// CreateManagerWithSeparateLimits creates manager with different L1/L2 limits.
//
// NOTE(review): the L2 rate/burst parameters are accepted but ignored — the
// current Manager applies one shared limit, so both endpoints use the L1
// values. Extend Manager before relying on per-endpoint limits.
func CreateManagerWithSeparateLimits(
	l1URL, l2URL string,
	l1Rate, l1Burst, _, _ int, // l2Rate, l2Burst unused for now
) (*Manager, error) {
	// Note: current implementation uses same limits for L1 and L2
	// if different limits are needed, Manager structure needs to be modified
	return NewManager(Config{
		L1RPCUrl:    l1URL,
		L2RPCUrl:    l2URL,
		RateLimit:   l1Rate, // use the L1 limit as the default for both endpoints
		RateBurst:   l1Burst,
		HTTPTimeout: 10 * time.Second,
	})
}

// WrapExistingClient builds a Manager from configuration for callers that
// previously constructed their own ethclient.Client instances. The two
// client arguments are currently ignored; the Manager dials its own
// connections, so callers keep working without code changes.
func WrapExistingClient(config *types.Config, _, _ interface{}) (*Manager, error) {
	m, err := CreateManagerFromConfig(config)
	if err != nil {
		return nil, err
	}
	// Integration of the caller-supplied clients can be added here later;
	// for now the freshly created manager is returned as-is.
	return m, nil
}
Loading
Loading