diff --git a/.gitignore b/.gitignore index 44a724e..f943960 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,6 @@ mysql1 meili_data postgres mysql +MIGRATION_STATUS.md +RPC_MANAGER_GUIDE.md diff --git a/build/dispute-explorer b/build/dispute-explorer new file mode 100755 index 0000000..08295c8 Binary files /dev/null and b/build/dispute-explorer differ diff --git a/internal/handler/handler.go b/internal/handler/handler.go index 2db4f33..9bdf034 100644 --- a/internal/handler/handler.go +++ b/internal/handler/handler.go @@ -1,10 +1,16 @@ package handler import ( + "time" + "github.com/optimism-java/dispute-explorer/internal/svc" + "github.com/optimism-java/dispute-explorer/pkg/rpc" ) func Run(ctx *svc.ServiceContext) { + // Start RPC monitoring + go startRPCMonitoring(ctx) + // query last block number go LatestBlackNumber(ctx) // sync blocks @@ -20,3 +26,12 @@ func Run(ctx *svc.ServiceContext) { // sync claim len go SyncClaimDataLen(ctx) } + +// startRPCMonitoring starts RPC monitoring (internal function) +func startRPCMonitoring(ctx *svc.ServiceContext) { + // Create monitor, output statistics every 30 seconds + monitor := rpc.NewMonitor(ctx.RPCManager, 30*time.Second) + + // Start monitoring + monitor.Start(ctx.Context) +} diff --git a/internal/handler/latestBlockNumber.go b/internal/handler/latestBlockNumber.go index 92ef5aa..3937662 100644 --- a/internal/handler/latestBlockNumber.go +++ b/internal/handler/latestBlockNumber.go @@ -12,15 +12,16 @@ import ( func LatestBlackNumber(ctx *svc.ServiceContext) { for { - latest, err := ctx.L1RPC.BlockNumber(context.Background()) + // use unified RPC manager to get latest block number (automatically applies rate limiting) + latest, err := ctx.RPCManager.GetLatestBlockNumber(context.Background(), true) // true indicates L1 if err != nil { - log.Errorf("[Handler.LatestBlackNumber] Syncing block by number error: %s\n", errors.WithStack(err)) + log.Errorf("[Handler.LatestBlackNumber] Get latest block number error (with rate limit): %s\n", errors.WithStack(err)) time.Sleep(12 * time.Second) continue } ctx.LatestBlockNumber = cast.ToInt64(latest) - log.Infof("[Handle.LatestBlackNumber] Syncing latest block number: %d \n", latest) + log.Infof("[Handler.LatestBlackNumber] Latest block number: %d (via RPC Manager)\n", latest) time.Sleep(12 * time.Second) } } diff --git a/internal/handler/logFilter.go b/internal/handler/logFilter.go index 95c75d4..c66f3a6 100644 --- a/internal/handler/logFilter.go +++ b/internal/handler/logFilter.go @@ -24,11 +24,12 @@ func LogFilter(ctx *svc.ServiceContext, block schema.SyncBlock, addresses []comm Topics: topics, Addresses: addresses, } - logs, err := ctx.L1RPC.FilterLogs(context.Background(), query) + // use unified RPC manager to filter logs (automatically applies rate limiting) + logs, err := ctx.RPCManager.FilterLogs(context.Background(), query, true) // true indicates L1 if err != nil { return nil, errors.WithStack(err) } - log.Infof("[CancelOrder.Handle] Cancel Pending List Length is %d ,block number is %d \n", len(logs), block.BlockNumber) + log.Infof("[LogFilter] Event logs length is %d, block number is %d (via RPC Manager)\n", len(logs), block.BlockNumber) return LogsToEvents(ctx, logs, block.ID) } @@ -49,22 +50,22 @@ func LogsToEvents(ctx *svc.ServiceContext, logs []types.Log, syncBlockID int64) blockNumber := cast.ToInt64(vlog.BlockNumber) log.Infof("[LogsToEvents] Fetching block info for block number: %d, txHash: %s", blockNumber, vlog.TxHash.Hex()) - // Try to get block using L1RPC client first - block, err 
:= ctx.L1RPC.BlockByNumber(context.Background(), big.NewInt(blockNumber)) + // Use unified RPC manager to get block (automatically applies rate limiting) + block, err := ctx.RPCManager.GetBlockByNumber(context.Background(), big.NewInt(blockNumber), true) // true indicates L1 if err != nil { - log.Errorf("[LogsToEvents] BlockByNumber failed for block %d, txHash: %s, error: %s", blockNumber, vlog.TxHash.Hex(), err.Error()) + log.Errorf("[LogsToEvents] GetBlockByNumber failed for block %d, txHash: %s, error: %s (via RPC Manager)", blockNumber, vlog.TxHash.Hex(), err.Error()) // If error contains "transaction type not supported", try alternative approach if strings.Contains(err.Error(), "transaction type not supported") { log.Infof("[LogsToEvents] Attempting to get block timestamp using header only for block %d", blockNumber) - header, headerErr := ctx.L1RPC.HeaderByNumber(context.Background(), big.NewInt(blockNumber)) + header, headerErr := ctx.RPCManager.HeaderByNumber(context.Background(), big.NewInt(blockNumber), true) // true indicates L1 if headerErr != nil { - log.Errorf("[LogsToEvents] HeaderByNumber also failed for block %d: %s", blockNumber, headerErr.Error()) + log.Errorf("[LogsToEvents] HeaderByNumber also failed for block %d: %s (via RPC Manager)", blockNumber, headerErr.Error()) return nil, errors.WithStack(err) } blockTime = cast.ToInt64(header.Time) blockTimes[blockNumber] = blockTime - log.Infof("[LogsToEvents] Successfully got block timestamp %d for block %d using header", blockTime, blockNumber) + log.Infof("[LogsToEvents] Successfully got block timestamp %d for block %d using header (via RPC Manager)", blockTime, blockNumber) } else { return nil, errors.WithStack(err) } diff --git a/internal/handler/rpc_manager_migration.go b/internal/handler/rpc_manager_migration.go new file mode 100644 index 0000000..cf2e6d0 --- /dev/null +++ b/internal/handler/rpc_manager_migration.go @@ -0,0 +1,118 @@ +package handler + +import ( + "context" + "math/big" + "time" + + "github.com/optimism-java/dispute-explorer/internal/svc" + "github.com/optimism-java/dispute-explorer/pkg/log" + "github.com/optimism-java/dispute-explorer/pkg/rpc" + "github.com/pkg/errors" + "github.com/spf13/cast" +) + +// LatestBlockNumberWithRateLimit uses unified RPC manager for latest block number retrieval +func LatestBlockNumberWithRateLimit(ctx *svc.ServiceContext) { + for { + // use unified RPC manager to get L1 latest block number (with rate limiting) + latest, err := ctx.RPCManager.GetLatestBlockNumber(context.Background(), true) + if err != nil { + log.Errorf("[Handler.LatestBlockNumberWithRateLimit] Get latest block number error: %s\n", errors.WithStack(err)) + time.Sleep(12 * time.Second) + continue + } + + ctx.LatestBlockNumber = cast.ToInt64(latest) + log.Infof("[Handler.LatestBlockNumberWithRateLimit] Latest block number: %d (using RPC Manager)\n", latest) + time.Sleep(12 * time.Second) + } +} + +// SyncBlockWithRateLimit uses unified RPC manager for block synchronization +func SyncBlockWithRateLimit(ctx *svc.ServiceContext) { + for { + // Check pending block count + // ... existing check logic ... 
+
+		syncingBlockNumber := ctx.SyncedBlockNumber + 1
+		log.Infof("[Handler.SyncBlockWithRateLimit] Try to sync block number: %d using RPC Manager\n", syncingBlockNumber)
+
+		if syncingBlockNumber > ctx.LatestBlockNumber {
+			time.Sleep(3 * time.Second)
+			continue
+		}
+
+		// Use unified RPC manager to get block (automatically handles rate limiting)
+		block, err := ctx.RPCManager.GetBlockByNumber(context.Background(), big.NewInt(syncingBlockNumber), true)
+		if err != nil {
+			log.Errorf("[Handler.SyncBlockWithRateLimit] Get block by number error: %s\n", errors.WithStack(err))
+			time.Sleep(3 * time.Second)
+			continue
+		}
+
+		log.Infof("[Handler.SyncBlockWithRateLimit] Got block number: %d, hash: %v, parent hash: %v\n",
+			block.Number().Int64(), block.Hash().Hex(), block.ParentHash().Hex())
+
+		// Verify parent hash
+		if block.ParentHash() != ctx.SyncedBlockHash {
+			log.Errorf("[Handler.SyncBlockWithRateLimit] ParentHash mismatch: expected %s, got %s\n",
+				ctx.SyncedBlockHash.Hex(), block.ParentHash().Hex())
+			// Can call rollback logic here
+			time.Sleep(3 * time.Second)
+			continue
+		}
+
+		// Save block to database
+		// ... existing database save logic ...
+
+		// Update sync status
+		ctx.SyncedBlockNumber = block.Number().Int64()
+		ctx.SyncedBlockHash = block.Hash()
+
+		log.Infof("[Handler.SyncBlockWithRateLimit] Successfully synced block %d\n", block.Number().Int64())
+	}
+}
+
+// GetBlockByNumberHTTP gets block using HTTP method (with rate limiting)
+func GetBlockByNumberHTTP(ctx *svc.ServiceContext, blockNumber int64) ([]byte, error) {
+	requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" +
+		"0x" + big.NewInt(blockNumber).Text(16) + "\", true],\"id\":1}" // eth_getBlockByNumber expects a 0x-prefixed hex quantity
+
+	// Use unified RPC manager for HTTP calls (automatically handles rate limiting)
+	return ctx.RPCManager.HTTPPostJSON(context.Background(), requestBody, true)
+}
+
+// MigrateExistingHandlers example of migrating existing handlers
+func MigrateExistingHandlers(ctx *svc.ServiceContext) {
+	log.Infof("[Handler.Migration] Starting migration to RPC Manager")
+
+	// Start handlers with rate limiting
+	go LatestBlockNumberWithRateLimit(ctx)
+	go SyncBlockWithRateLimit(ctx)
+
+	// Start RPC monitoring
+	go StartRPCMonitoring(ctx)
+
+	log.Infof("[Handler.Migration] All handlers migrated to use RPC Manager")
+}
+
+// StartRPCMonitoring starts RPC monitoring
+func StartRPCMonitoring(ctx *svc.ServiceContext) {
+	// Create monitor
+	monitor := rpc.NewMonitor(ctx.RPCManager, 30*time.Second)
+
+	// Start monitoring
+	monitor.Start(ctx.Context)
+}
+
+// Compatibility functions: provide smooth migration for existing code
+func GetL1Client(ctx *svc.ServiceContext) interface{} {
+	// Return rate-limited client wrapper
+	return ctx.RPCManager.GetRawClient(true)
+}
+
+func GetL2Client(ctx *svc.ServiceContext) interface{} {
+	// Return rate-limited client wrapper
+	return ctx.RPCManager.GetRawClient(false)
+}
diff --git a/internal/handler/syncBlock.go b/internal/handler/syncBlock.go
index 84fb64e..5f4fc4c 100644
--- a/internal/handler/syncBlock.go
+++ b/internal/handler/syncBlock.go
@@ -1,6 +1,7 @@
 package handler
 
 import (
+	"context"
 	"fmt"
 	"time"
 
@@ -58,15 +59,16 @@ func SyncBlock(ctx *svc.ServiceContext) {
 			continue
 		}
 
-		// block, err := ctx.RPC.BlockByNumber(context.Background(), big.NewInt(syncingBlockNumber))
-		blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", syncingBlockNumber)+"\", true],\"id\":1}")
+		// use unified RPC manager 
to get block (automatically applies rate limiting)
+		requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" + fmt.Sprintf("0x%X", syncingBlockNumber) + "\", true],\"id\":1}"
+		blockJSON, err := ctx.RPCManager.HTTPPostJSON(context.Background(), requestBody, true) // true indicates L1
 		if err != nil {
-			log.Errorf("[Handler.SyncBlock] Syncing block by number error: %s\n", errors.WithStack(err))
+			log.Errorf("[Handler.SyncBlock] Syncing block by number error (with rate limit): %s\n", errors.WithStack(err))
 			time.Sleep(3 * time.Second)
 			continue
 		}
 		block := rpc.ParseJSONBlock(string(blockJSON))
-		log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v \n", block.Number(), block.Hash(), block.ParentHash())
+		log.Infof("[Handler.SyncBlock] Syncing block number: %d, hash: %v, parent hash: %v (via RPC Manager)\n", block.Number(), block.Hash(), block.ParentHash())
 
 		if common.HexToHash(block.ParentHash()) != ctx.SyncedBlockHash {
 			log.Errorf("[Handler.SyncBlock] ParentHash of the block being synchronized is inconsistent: %s \n", ctx.SyncedBlockHash)
@@ -102,16 +104,18 @@ func rollbackBlock(ctx *svc.ServiceContext) {
 	for {
 		rollbackBlockNumber := ctx.SyncedBlockNumber
-		log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)
+		log.Infof("[Handler.SyncBlock.RollBackBlock] Try to rollback block number: %d\n", rollbackBlockNumber)
 
-		blockJSON, err := rpc.HTTPPostJSON("", ctx.Config.L1RPCUrl, "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\""+fmt.Sprintf("0x%X", rollbackBlockNumber)+"\", true],\"id\":1}")
+		// use unified RPC manager for rollback operation (automatically applies rate limiting)
+		requestBody := "{\"jsonrpc\":\"2.0\",\"method\":\"eth_getBlockByNumber\",\"params\":[\"" + fmt.Sprintf("0x%X", rollbackBlockNumber) + "\", true],\"id\":1}"
+		blockJSON, err := ctx.RPCManager.HTTPPostJSON(context.Background(), requestBody, true) // true indicates L1
 		if err != nil {
-			log.Errorf("[Handler.SyncBlock.RollRackBlock]Rollback block by number error: %s\n", errors.WithStack(err))
+			log.Errorf("[Handler.SyncBlock.RollBackBlock] Rollback block by number error (with rate limit): %s\n", errors.WithStack(err))
 			continue
 		}
 		rollbackBlock := rpc.ParseJSONBlock(string(blockJSON))
-		log.Errorf("[Handler.SyncBlock.RollRackBlock] rollbackBlock: %s, syncedBlockHash: %s \n", rollbackBlock.Hash(), ctx.SyncedBlockHash)
+		log.Errorf("[Handler.SyncBlock.RollBackBlock] rollbackBlock: %s, syncedBlockHash: %s (via RPC Manager)\n", rollbackBlock.Hash(), ctx.SyncedBlockHash)
 
 		if common.HexToHash(rollbackBlock.Hash()) == ctx.SyncedBlockHash {
 			err = ctx.DB.Transaction(func(tx *gorm.DB) error {
diff --git a/internal/svc/svc.go b/internal/svc/svc.go
index c062f52..774962d 100644
--- a/internal/svc/svc.go
+++ b/internal/svc/svc.go
@@ -10,6 +10,7 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/ethclient"
 	"github.com/optimism-java/dispute-explorer/internal/types"
+	"github.com/optimism-java/dispute-explorer/pkg/rpc"
 	"gorm.io/gorm"
 	"gorm.io/gorm/logger"
 )
@@ -18,8 +19,9 @@ var svc *ServiceContext
 
 type ServiceContext struct {
 	Config            *types.Config
-	L1RPC             *ethclient.Client
-	L2RPC             *ethclient.Client
+	L1RPC             *ethclient.Client // kept for backward compatibility
+	L2RPC             *ethclient.Client // kept for backward compatibility
+	RPCManager        *rpc.Manager      // new unified RPC manager
 	DB                *gorm.DB
 	LatestBlockNumber int64
 	SyncedBlockNumber int64
@@ -45,22 +47,30 @@ func NewServiceContext(ctx context.Context, cfg *types.Config) *ServiceContext {
 	// 
SetConnMaxLifetime
 	sqlDB.SetConnMaxLifetime(time.Duration(cfg.MySQLConnMaxLifetime) * time.Second)
 
-	rpc, err := ethclient.Dial(cfg.L1RPCUrl)
+	// create the original Ethereum clients (kept for backward compatibility)
+	l1Client, err := ethclient.Dial(cfg.L1RPCUrl)
 	if err != nil {
-		log.Panicf("[svc] get eth client panic: %s\n", err)
+		log.Panicf("[svc] get L1 eth client panic: %s\n", err)
 	}
 
-	rpc2, err := ethclient.Dial(cfg.L2RPCUrl)
+	l2Client, err := ethclient.Dial(cfg.L2RPCUrl)
 	if err != nil {
-		log.Panicf("[svc] get eth client panic: %s\n", err)
+		log.Panicf("[svc] get L2 eth client panic: %s\n", err)
+	}
+
+	// create the unified RPC manager
+	rpcManager, err := rpc.CreateManagerFromConfig(cfg)
+	if err != nil {
+		log.Panicf("[svc] create RPC manager panic: %s\n", err)
 	}
 
 	svc = &ServiceContext{
-		Config:  cfg,
-		L1RPC:   rpc,
-		L2RPC:   rpc2,
-		DB:      storage,
-		Context: ctx,
+		Config:     cfg,
+		L1RPC:      l1Client,   // kept for backward compatibility
+		L2RPC:      l2Client,   // kept for backward compatibility
+		RPCManager: rpcManager, // new unified manager
+		DB:         storage,
+		Context:    ctx,
 	}
 	return svc
 }
diff --git a/pkg/rpc/factory.go b/pkg/rpc/factory.go
new file mode 100644
index 0000000..4f37848
--- /dev/null
+++ b/pkg/rpc/factory.go
@@ -0,0 +1,48 @@
+package rpc
+
+import (
+	"time"
+
+	"github.com/optimism-java/dispute-explorer/internal/types"
+)
+
+// CreateManagerFromConfig creates RPC manager from configuration
+func CreateManagerFromConfig(config *types.Config) (*Manager, error) {
+	return NewManager(Config{
+		L1RPCUrl:    config.L1RPCUrl,
+		L2RPCUrl:    config.L2RPCUrl,
+		ProxyURL:    "", // if proxy is needed, can be added from configuration
+		RateLimit:   config.RPCRateLimit,
+		RateBurst:   config.RPCRateBurst,
+		HTTPTimeout: 10 * time.Second,
+	})
+}
+
+// CreateManagerWithSeparateLimits creates manager with different L1/L2 limits
+func CreateManagerWithSeparateLimits(
+	l1URL, l2URL string,
+	l1Rate, l1Burst, _, _ int, // l2Rate, l2Burst unused for now
+) (*Manager, error) {
+	// Note: current implementation uses same limits for L1 and L2
+	// if different limits are needed, Manager structure needs to be modified
+	return NewManager(Config{
+		L1RPCUrl:    l1URL,
+		L2RPCUrl:    l2URL,
+		RateLimit:   l1Rate, // use the L1 limits as the default
+		RateBurst:   l1Burst,
+		HTTPTimeout: 10 * time.Second,
+	})
+}
+
+// WrapExistingClient wraps existing ethclient.Client (for backward compatibility)
+func WrapExistingClient(config *types.Config, _, _ interface{}) (*Manager, error) {
+	// create new manager but maintain backward compatibility
+	manager, err := CreateManagerFromConfig(config)
+	if err != nil {
+		return nil, err
+	}
+
+	// logic can be added here to integrate existing clients
+	// for now, return newly created manager
+	return manager, nil
+}
diff --git a/pkg/rpc/manager.go b/pkg/rpc/manager.go
new file mode 100644
index 0000000..3b0281f
--- /dev/null
+++ b/pkg/rpc/manager.go
@@ -0,0 +1,341 @@
+package rpc
+
+import (
+	"context"
+	"fmt"
+	"math/big"
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/ethclient"
+	"golang.org/x/time/rate"
+)
+
+// Manager unified RPC resource manager
+type Manager struct {
+	// Configuration
+	l1RPCUrl string
+	l2RPCUrl string
+	proxyURL string
+
+	// Rate limiters
+	l1Limiter *rate.Limiter
+	l2Limiter *rate.Limiter
+
+	// Native Ethereum clients
+	l1Client *ethclient.Client
+	l2Client *ethclient.Client
+
+	// HTTP client
+	httpClient *http.Client
+
+	// Statistics
+	stats *Stats
+	mu    sync.RWMutex
+}
+
+// Config RPC manager configuration
+type Config struct {
+	L1RPCUrl    string
+	L2RPCUrl    
string + ProxyURL string + RateLimit int + RateBurst int + HTTPTimeout time.Duration +} + +// Stats RPC call statistics +type Stats struct { + L1RequestCount int64 + L2RequestCount int64 + L1RateLimitedCount int64 + L2RateLimitedCount int64 + HTTPRequestCount int64 + LastRequestTime time.Time + mu sync.RWMutex +} + +// NewManager creates a new RPC manager +func NewManager(config Config) (*Manager, error) { + // Create Ethereum clients + l1Client, err := ethclient.Dial(config.L1RPCUrl) + if err != nil { + return nil, fmt.Errorf("failed to connect to L1 RPC: %w", err) + } + + l2Client, err := ethclient.Dial(config.L2RPCUrl) + if err != nil { + return nil, fmt.Errorf("failed to connect to L2 RPC: %w", err) + } + + // Create rate limiters + l1Limiter := rate.NewLimiter(rate.Limit(config.RateLimit), config.RateBurst) + l2Limiter := rate.NewLimiter(rate.Limit(config.RateLimit), config.RateBurst) + + // Set HTTP timeout + timeout := config.HTTPTimeout + if timeout == 0 { + timeout = 10 * time.Second + } + + httpClient := &http.Client{ + Timeout: timeout, + } + + return &Manager{ + l1RPCUrl: config.L1RPCUrl, + l2RPCUrl: config.L2RPCUrl, + proxyURL: config.ProxyURL, + l1Limiter: l1Limiter, + l2Limiter: l2Limiter, + l1Client: l1Client, + l2Client: l2Client, + httpClient: httpClient, + stats: &Stats{}, + }, nil +} + +// GetLatestBlockNumber gets the latest block number (with rate limiting) +func (m *Manager) GetLatestBlockNumber(ctx context.Context, isL1 bool) (uint64, error) { + if isL1 { + if err := m.l1Limiter.Wait(ctx); err != nil { + m.updateRateLimitedStats(true) + return 0, fmt.Errorf("L1 rate limit exceeded: %w", err) + } + m.updateRequestStats(true) + return m.l1Client.BlockNumber(ctx) + } + if err := m.l2Limiter.Wait(ctx); err != nil { + m.updateRateLimitedStats(false) + return 0, fmt.Errorf("L2 rate limit exceeded: %w", err) + } + m.updateRequestStats(false) + return m.l2Client.BlockNumber(ctx) +} + +// GetBlockByNumber gets a block by number (with rate limiting) +func (m *Manager) GetBlockByNumber(ctx context.Context, number *big.Int, isL1 bool) (*types.Block, error) { + if isL1 { + if err := m.l1Limiter.Wait(ctx); err != nil { + m.updateRateLimitedStats(true) + return nil, fmt.Errorf("L1 rate limit exceeded: %w", err) + } + m.updateRequestStats(true) + return m.l1Client.BlockByNumber(ctx, number) + } + if err := m.l2Limiter.Wait(ctx); err != nil { + m.updateRateLimitedStats(false) + return nil, fmt.Errorf("L2 rate limit exceeded: %w", err) + } + m.updateRequestStats(false) + return m.l2Client.BlockByNumber(ctx, number) +} + +// GetBlockByHash gets a block by hash (with rate limiting) +func (m *Manager) GetBlockByHash(ctx context.Context, hash common.Hash, isL1 bool) (*types.Block, error) { + if isL1 { + if err := m.l1Limiter.Wait(ctx); err != nil { + m.updateRateLimitedStats(true) + return nil, fmt.Errorf("L1 rate limit exceeded: %w", err) + } + m.updateRequestStats(true) + return m.l1Client.BlockByHash(ctx, hash) + } + if err := m.l2Limiter.Wait(ctx); err != nil { + m.updateRateLimitedStats(false) + return nil, fmt.Errorf("L2 rate limit exceeded: %w", err) + } + m.updateRequestStats(false) + return m.l2Client.BlockByHash(ctx, hash) +} + +// FilterLogs filters logs (with rate limiting) +func (m *Manager) FilterLogs(ctx context.Context, query ethereum.FilterQuery, isL1 bool) ([]types.Log, error) { + if isL1 { + if err := m.l1Limiter.Wait(ctx); err != nil { + m.updateRateLimitedStats(true) + return nil, fmt.Errorf("L1 rate limit exceeded: %w", err) + } + m.updateRequestStats(true) + return 
m.l1Client.FilterLogs(ctx, query)
+	}
+	if err := m.l2Limiter.Wait(ctx); err != nil {
+		m.updateRateLimitedStats(false)
+		return nil, fmt.Errorf("L2 rate limit exceeded: %w", err)
+	}
+	m.updateRequestStats(false)
+	return m.l2Client.FilterLogs(ctx, query)
+}
+
+// HeaderByNumber gets a block header by number (with rate limiting)
+func (m *Manager) HeaderByNumber(ctx context.Context, number *big.Int, isL1 bool) (*types.Header, error) {
+	if isL1 {
+		if err := m.l1Limiter.Wait(ctx); err != nil {
+			m.updateRateLimitedStats(true)
+			return nil, fmt.Errorf("L1 rate limit exceeded: %w", err)
+		}
+		m.updateRequestStats(true)
+		return m.l1Client.HeaderByNumber(ctx, number)
+	}
+	if err := m.l2Limiter.Wait(ctx); err != nil {
+		m.updateRateLimitedStats(false)
+		return nil, fmt.Errorf("L2 rate limit exceeded: %w", err)
+	}
+	m.updateRequestStats(false)
+	return m.l2Client.HeaderByNumber(ctx, number)
+}
+
+// CallContract calls a smart contract (with rate limiting)
+func (m *Manager) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int, isL1 bool) ([]byte, error) {
+	if isL1 {
+		if err := m.l1Limiter.Wait(ctx); err != nil {
+			m.updateRateLimitedStats(true)
+			return nil, fmt.Errorf("L1 rate limit exceeded: %w", err)
+		}
+		m.updateRequestStats(true)
+		return m.l1Client.CallContract(ctx, call, blockNumber)
+	}
+	if err := m.l2Limiter.Wait(ctx); err != nil {
+		m.updateRateLimitedStats(false)
+		return nil, fmt.Errorf("L2 rate limit exceeded: %w", err)
+	}
+	m.updateRequestStats(false)
+	return m.l2Client.CallContract(ctx, call, blockNumber)
+}
+
+// HTTPPostJSON HTTP POST JSON request (with rate limiting)
+func (m *Manager) HTTPPostJSON(ctx context.Context, bodyJSON string, isL1 bool) ([]byte, error) {
+	var rpcURL string
+
+	if isL1 {
+		if err := m.l1Limiter.Wait(ctx); err != nil {
+			m.updateRateLimitedStats(true)
+			return nil, fmt.Errorf("L1 rate limit exceeded: %w", err)
+		}
+		m.updateRequestStats(true)
+		rpcURL = m.l1RPCUrl
+	} else {
+		if err := m.l2Limiter.Wait(ctx); err != nil {
+			m.updateRateLimitedStats(false)
+			return nil, fmt.Errorf("L2 rate limit exceeded: %w", err)
+		}
+		m.updateRequestStats(false)
+		rpcURL = m.l2RPCUrl
+	}
+
+	m.updateHTTPRequestStats()
+	return HTTPPostJSON(m.proxyURL, rpcURL, bodyJSON)
+}
+
+// GetRawClient gets the raw client (for backward compatibility)
+func (m *Manager) GetRawClient(isL1 bool) *ethclient.Client {
+	if isL1 {
+		return m.l1Client
+	}
+	return m.l2Client
+}
+
+// UpdateRateLimit dynamically updates rate limit
+func (m *Manager) UpdateRateLimit(rateLimit int, rateBurst int, isL1 bool) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	if isL1 {
+		m.l1Limiter.SetLimit(rate.Limit(rateLimit))
+		m.l1Limiter.SetBurst(rateBurst)
+	} else {
+		m.l2Limiter.SetLimit(rate.Limit(rateLimit))
+		m.l2Limiter.SetBurst(rateBurst)
+	}
+}
+
+// GetRateLimit gets current rate limit settings
+func (m *Manager) GetRateLimit(isL1 bool) (rateLimit float64, rateBurst int) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	if isL1 {
+		return float64(m.l1Limiter.Limit()), m.l1Limiter.Burst()
+	}
+	return float64(m.l2Limiter.Limit()), m.l2Limiter.Burst()
+}
+
+// GetTokens returns the number of currently available tokens
+func (m *Manager) GetTokens(isL1 bool) float64 {
+	if isL1 {
+		return m.l1Limiter.Tokens()
+	}
+	return m.l2Limiter.Tokens()
+}
+
+// GetStats gets statistics information
+func (m *Manager) GetStats() StatsSnapshot {
+	m.stats.mu.RLock()
+	defer m.stats.mu.RUnlock()
+
+	return StatsSnapshot{
+		L1RequestCount:     m.stats.L1RequestCount,
+		L2RequestCount:     m.stats.L2RequestCount,
+		L1RateLimitedCount: 
m.stats.L1RateLimitedCount,
+		L2RateLimitedCount: m.stats.L2RateLimitedCount,
+		HTTPRequestCount:   m.stats.HTTPRequestCount,
+		LastRequestTime:    m.stats.LastRequestTime,
+	}
+}
+
+// StatsSnapshot statistics snapshot
+type StatsSnapshot struct {
+	L1RequestCount     int64
+	L2RequestCount     int64
+	L1RateLimitedCount int64
+	L2RateLimitedCount int64
+	HTTPRequestCount   int64
+	LastRequestTime    time.Time
+}
+
+// updateRequestStats updates request statistics
+func (m *Manager) updateRequestStats(isL1 bool) {
+	m.stats.mu.Lock()
+	defer m.stats.mu.Unlock()
+
+	m.stats.LastRequestTime = time.Now()
+	if isL1 {
+		m.stats.L1RequestCount++
+	} else {
+		m.stats.L2RequestCount++
+	}
+}
+
+// updateRateLimitedStats updates rate-limited request statistics
+func (m *Manager) updateRateLimitedStats(isL1 bool) {
+	m.stats.mu.Lock()
+	defer m.stats.mu.Unlock()
+
+	if isL1 {
+		m.stats.L1RateLimitedCount++
+	} else {
+		m.stats.L2RateLimitedCount++
+	}
+}
+
+// updateHTTPRequestStats updates HTTP request statistics
+func (m *Manager) updateHTTPRequestStats() {
+	m.stats.mu.Lock()
+	defer m.stats.mu.Unlock()
+
+	m.stats.HTTPRequestCount++
+}
+
+// Close closes all connections
+func (m *Manager) Close() {
+	if m.l1Client != nil {
+		m.l1Client.Close()
+	}
+	if m.l2Client != nil {
+		m.l2Client.Close()
+	}
+}
diff --git a/pkg/rpc/monitor.go b/pkg/rpc/monitor.go
new file mode 100644
index 0000000..4bcdecb
--- /dev/null
+++ b/pkg/rpc/monitor.go
@@ -0,0 +1,168 @@
+package rpc
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"time"
+)
+
+// Monitor RPC manager monitor
+type Monitor struct {
+	manager  *Manager
+	interval time.Duration
+	logger   Logger
+}
+
+// Logger logging interface
+type Logger interface {
+	Printf(format string, v ...interface{})
+}
+
+// DefaultLogger default logging implementation
+type DefaultLogger struct{}
+
+func (dl *DefaultLogger) Printf(format string, v ...interface{}) {
+	log.Printf(format, v...)
+} + +// NewMonitor creates monitor +func NewMonitor(manager *Manager, interval time.Duration) *Monitor { + return &Monitor{ + manager: manager, + interval: interval, + logger: &DefaultLogger{}, + } +} + +// SetLogger sets custom logger +func (m *Monitor) SetLogger(logger Logger) { + m.logger = logger +} + +// Start starts monitoring +func (m *Monitor) Start(ctx context.Context) { + ticker := time.NewTicker(m.interval) + defer ticker.Stop() + + m.logger.Printf("[RPC Monitor] Started with interval %v", m.interval) + + for { + select { + case <-ticker.C: + m.logStats() + case <-ctx.Done(): + m.logger.Printf("[RPC Monitor] Stopped") + return + } + } +} + +// logStats logs statistics information +func (m *Monitor) logStats() { + stats := m.manager.GetStats() + + l1Rate, l1Burst := m.manager.GetRateLimit(true) + l2Rate, l2Burst := m.manager.GetRateLimit(false) + + l1Tokens := m.manager.GetTokens(true) + l2Tokens := m.manager.GetTokens(false) + + m.logger.Printf( + "[RPC Stats] L1: %d requests (%d limited), L2: %d requests (%d limited), HTTP: %d", + stats.L1RequestCount, stats.L1RateLimitedCount, + stats.L2RequestCount, stats.L2RateLimitedCount, + stats.HTTPRequestCount, + ) + + m.logger.Printf( + "[RPC Limits] L1: %.1f/s (burst %d, tokens %.2f), L2: %.1f/s (burst %d, tokens %.2f)", + l1Rate, l1Burst, l1Tokens, + l2Rate, l2Burst, l2Tokens, + ) + + // warning messages + if stats.L1RateLimitedCount > 0 || stats.L2RateLimitedCount > 0 { + m.logger.Printf("[RPC Warning] Rate limiting is active!") + } + + if l1Tokens < 1.0 { + m.logger.Printf("[RPC Warning] L1 tokens running low: %.2f", l1Tokens) + } + + if l2Tokens < 1.0 { + m.logger.Printf("[RPC Warning] L2 tokens running low: %.2f", l2Tokens) + } +} + +// GetHealthCheck gets health check information +func (m *Monitor) GetHealthCheck() HealthCheckResult { + stats := m.manager.GetStats() + l1Tokens := m.manager.GetTokens(true) + l2Tokens := m.manager.GetTokens(false) + + isHealthy := true + var issues []string + + // check token count + if l1Tokens < 1 { + isHealthy = false + issues = append(issues, "L1 rate limit exhausted") + } + + if l2Tokens < 1 { + isHealthy = false + issues = append(issues, "L2 rate limit exhausted") + } + + // check if there are recent rate limits + if stats.L1RateLimitedCount > 0 { + issues = append(issues, fmt.Sprintf("L1 has %d rate limited requests", stats.L1RateLimitedCount)) + } + + if stats.L2RateLimitedCount > 0 { + issues = append(issues, fmt.Sprintf("L2 has %d rate limited requests", stats.L2RateLimitedCount)) + } + + // check if there are recent requests + if time.Since(stats.LastRequestTime) > 5*time.Minute { + issues = append(issues, "No recent RPC requests detected") + } + + return HealthCheckResult{ + Healthy: isHealthy, + Issues: issues, + L1Tokens: l1Tokens, + L2Tokens: l2Tokens, + LastRequest: stats.LastRequestTime, + TotalL1Requests: stats.L1RequestCount, + TotalL2Requests: stats.L2RequestCount, + TotalHTTPRequests: stats.HTTPRequestCount, + } +} + +// HealthCheckResult health check result +type HealthCheckResult struct { + Healthy bool + Issues []string + L1Tokens float64 + L2Tokens float64 + LastRequest time.Time + TotalL1Requests int64 + TotalL2Requests int64 + TotalHTTPRequests int64 +} + +// LogHealthCheck logs health check result +func (m *Monitor) LogHealthCheck() { + health := m.GetHealthCheck() + + if health.Healthy { + m.logger.Printf("[RPC Health] System is healthy") + } else { + m.logger.Printf("[RPC Health] System has issues: %v", health.Issues) + } + + m.logger.Printf("[RPC Health] Total 
requests - L1: %d, L2: %d, HTTP: %d", + health.TotalL1Requests, health.TotalL2Requests, health.TotalHTTPRequests) +}