Skip to content

Commit

Permalink
Merge pull request erigontech#5 from testinprod-io/origin/relay-to-hi…
Browse files Browse the repository at this point in the history
…storical-and-sequencer

Relay to historical and sequencer
  • Loading branch information
ImTei committed Feb 24, 2023
2 parents c5ddc71 + 0b4e524 commit a339a23
Show file tree
Hide file tree
Showing 13 changed files with 376 additions and 39 deletions.
47 changes: 39 additions & 8 deletions cmd/erigon-el/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,14 @@ type Ethereum struct {

networkID uint64

lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
chainConfig *chain.Config
genesisHash libcommon.Hash
miningSealingQuit chan struct{}
pendingBlocks chan *types.Block
minedBlocks chan *types.Block
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
chainConfig *chain.Config
genesisHash libcommon.Hash
seqRPCService *rpc.Client
historicalRPCService *rpc.Client
miningSealingQuit chan struct{}
pendingBlocks chan *types.Block
minedBlocks chan *types.Block

// downloader fields
sentryCtx context.Context
Expand Down Expand Up @@ -445,6 +447,26 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
return nil, err
}

// Setup sequencer and hsistorical RPC relay services
if config.RollupSequencerHTTP != "" {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
client, err := rpc.DialContext(ctx, config.RollupSequencerHTTP)
cancel()
if err != nil {
return nil, err
}
backend.seqRPCService = client
}
if config.RollupHistoricalRPC != "" {
ctx, cancel := context.WithTimeout(context.Background(), config.RollupHistoricalRPCTimeout)
client, err := rpc.DialContext(ctx, config.RollupHistoricalRPC)
cancel()
if err != nil {
return nil, err
}
backend.historicalRPCService = client
}

var miningRPC txpool_proto.MiningServer
stateDiffClient := direct.NewStateDiffClientDirect(kvRPC)
if config.DeprecatedTxPool.Disable {
Expand Down Expand Up @@ -659,8 +681,8 @@ func NewBackend(stack *node.Node, config *ethconfig.Config, logger log.Logger) (
if casted, ok := backend.engine.(*bor.Bor); ok {
borDb = casted.DB
}
apiList := commands.APIList(chainKv, borDb, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, backend.blockReader, backend.agg, httpRpcCfg, backend.engine)
authApiList := commands.AuthAPIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, backend.blockReader, backend.agg, httpRpcCfg, backend.engine)
apiList := commands.APIList(chainKv, borDb, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, backend.blockReader, backend.agg, httpRpcCfg, backend.engine, backend.seqRPCService, backend.historicalRPCService, chainConfig)
authApiList := commands.AuthAPIList(chainKv, ethRpcClient, txPoolRpcClient, miningRpcClient, ff, stateCache, backend.blockReader, backend.agg, httpRpcCfg, backend.engine, backend.seqRPCService, backend.historicalRPCService, chainConfig)
go func() {
if err := cli.StartRpcServer(ctx, httpRpcCfg, apiList, authApiList); err != nil {
log.Error(err.Error())
Expand Down Expand Up @@ -990,6 +1012,15 @@ func (s *Ethereum) Stop() error {
if s.agg != nil {
s.agg.Close()
}

// Stop RPC services to sequencer and historical nodes
if s.seqRPCService != nil {
s.seqRPCService.Close()
}
if s.historicalRPCService != nil {
s.historicalRPCService.Close()
}

s.chainDB.Close()
return nil
}
Expand Down
11 changes: 7 additions & 4 deletions cmd/rpcdaemon/commands/daemon.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/gointerfaces/txpool"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/kvcache"
Expand All @@ -16,9 +17,10 @@ import (
func APIList(db kv.RoDB, borDb kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
filters *rpchelper.Filters, stateCache kvcache.Cache,
blockReader services.FullBlockReader, agg *libstate.AggregatorV3, cfg httpcfg.HttpCfg, engine consensus.EngineReader,
seqRPCService *rpc.Client, historicalRPCService *rpc.Client, chainConfig *chain.Config,
) (list []rpc.API) {
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine)
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit)
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, chainConfig)
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, seqRPCService, historicalRPCService)
erigonImpl := NewErigonAPI(base, db, eth)
txpoolImpl := NewTxPoolAPI(base, db, txPool)
netImpl := NewNetAPIImpl(eth)
Expand Down Expand Up @@ -127,10 +129,11 @@ func AuthAPIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClien
filters *rpchelper.Filters, stateCache kvcache.Cache, blockReader services.FullBlockReader,
agg *libstate.AggregatorV3,
cfg httpcfg.HttpCfg, engine consensus.EngineReader,
seqRPCService *rpc.Client, historicalRPCService *rpc.Client, chainConfig *chain.Config,
) (list []rpc.API) {
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine)
base := NewBaseApi(filters, stateCache, blockReader, agg, cfg.WithDatadir, cfg.EvmCallTimeout, engine, chainConfig)

ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit)
ethImpl := NewEthAPI(base, db, eth, txPool, mining, cfg.Gascap, cfg.ReturnDataLimit, seqRPCService, historicalRPCService)
engineImpl := NewEngineAPI(base, db, eth, cfg.InternalCL)

list = append(list, rpc.API{
Expand Down
111 changes: 111 additions & 0 deletions cmd/rpcdaemon/commands/eth_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,34 @@ func (api *APIImpl) GetBalance(ctx context.Context, address libcommon.Address, b
return nil, fmt.Errorf("getBalance cannot open tx: %w", err1)
}
defer tx.Rollback()

// Handle pre-bedrock blocks
var blockNum uint64
if number, ok := blockNrOrHash.Number(); ok {
blockNum = uint64(number)
} else if hash, ok := blockNrOrHash.Hash(); ok {
block, err := api.blockByHashWithSenders(tx, hash)
if err != nil {
return nil, fmt.Errorf("invalid hash: %w", err)
}
blockNum = block.NumberU64()
} else {
return nil, fmt.Errorf("invalid block number of hash")
}

if api._chainConfig.IsOptimismPreBedrock(blockNum) {
if api.historicalRPCService != nil {
var res hexutil.Big
err := api.historicalRPCService.CallContext(ctx, &res, "eth_getBalance", address, fmt.Sprintf("0x%x", blockNum))
if err != nil {
return nil, fmt.Errorf("historical backend error: %w", err)
}
return &res, nil
} else {
return nil, rpc.ErrNoHistoricalFallback
}
}

reader, err := rpchelper.CreateStateReader(ctx, tx, blockNrOrHash, 0, api.filters, api.stateCache, api.historyV3(tx), "")
if err != nil {
return nil, err
Expand Down Expand Up @@ -62,6 +90,34 @@ func (api *APIImpl) GetTransactionCount(ctx context.Context, address libcommon.A
return nil, fmt.Errorf("getTransactionCount cannot open tx: %w", err1)
}
defer tx.Rollback()

// Handle pre-bedrock blocks
var blockNum uint64
if number, ok := blockNrOrHash.Number(); ok {
blockNum = uint64(number)
} else if hash, ok := blockNrOrHash.Hash(); ok {
block, err := api.blockByHashWithSenders(tx, hash)
if err != nil {
return nil, fmt.Errorf("invalid hash: %w", err)
}
blockNum = block.NumberU64()
} else {
return nil, fmt.Errorf("invalid block number of hash")
}

if api._chainConfig.IsOptimismPreBedrock(blockNum) {
if api.historicalRPCService != nil {
var res hexutil.Uint64
err := api.historicalRPCService.CallContext(ctx, &res, "eth_getTransactionCount", address, fmt.Sprintf("0x%x", blockNum))
if err != nil {
return nil, fmt.Errorf("historical backend error: %w", err)
}
return &res, nil
} else {
return nil, rpc.ErrNoHistoricalFallback
}
}

reader, err := rpchelper.CreateStateReader(ctx, tx, blockNrOrHash, 0, api.filters, api.stateCache, api.historyV3(tx), "")
if err != nil {
return nil, err
Expand All @@ -80,6 +136,34 @@ func (api *APIImpl) GetCode(ctx context.Context, address libcommon.Address, bloc
if err1 != nil {
return nil, fmt.Errorf("getCode cannot open tx: %w", err1)
}

// Handle pre-bedrock blocks
var blockNum uint64
if number, ok := blockNrOrHash.Number(); ok {
blockNum = uint64(number)
} else if hash, ok := blockNrOrHash.Hash(); ok {
block, err := api.blockByHashWithSenders(tx, hash)
if err != nil {
return nil, fmt.Errorf("invalid hash: %w", err)
}
blockNum = block.NumberU64()
} else {
return nil, fmt.Errorf("invalid block number of hash")
}

if api._chainConfig.IsOptimismPreBedrock(blockNum) {
if api.historicalRPCService != nil {
var res hexutil.Bytes
err := api.historicalRPCService.CallContext(ctx, &res, "eth_getCode", address, fmt.Sprintf("0x%x", blockNum))
if err != nil {
return nil, fmt.Errorf("historical backend error: %w", err)
}
return res, nil
} else {
return nil, rpc.ErrNoHistoricalFallback
}
}

defer tx.Rollback()
chainConfig, err := api.chainConfig(tx)
if err != nil {
Expand Down Expand Up @@ -111,6 +195,33 @@ func (api *APIImpl) GetStorageAt(ctx context.Context, address libcommon.Address,
}
defer tx.Rollback()

// Handle pre-bedrock blocks
var blockNum uint64
if number, ok := blockNrOrHash.Number(); ok {
blockNum = uint64(number)
} else if hash, ok := blockNrOrHash.Hash(); ok {
block, err := api.blockByHashWithSenders(tx, hash)
if err != nil {
return hexutility.Encode(common.LeftPadBytes(empty, 32)), fmt.Errorf("invalid hash: %w", err)
}
blockNum = block.NumberU64()
} else {
return hexutility.Encode(common.LeftPadBytes(empty, 32)), fmt.Errorf("invalid block number of hash")
}

if api._chainConfig.IsOptimismPreBedrock(blockNum) {
if api.historicalRPCService != nil {
var res hexutil.Bytes
err := api.historicalRPCService.CallContext(ctx, &res, "eth_getStorageAt", address, fmt.Sprintf("0x%x", blockNum))
if err != nil {
return hexutility.Encode(common.LeftPadBytes(empty, 32)), fmt.Errorf("historical backend error: %w", err)
}
return hexutility.Encode(common.LeftPadBytes(res, 32)), nil
} else {
return hexutility.Encode(common.LeftPadBytes(empty, 32)), rpc.ErrNoHistoricalFallback
}
}

reader, err := rpchelper.CreateStateReader(ctx, tx, blockNrOrHash, 0, api.filters, api.stateCache, api.historyV3(tx), "")
if err != nil {
return hexutility.Encode(common.LeftPadBytes(empty, 32)), err
Expand Down
49 changes: 31 additions & 18 deletions cmd/rpcdaemon/commands/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ type BaseAPI struct {
evmCallTimeout time.Duration
}

func NewBaseApi(f *rpchelper.Filters, stateCache kvcache.Cache, blockReader services.FullBlockReader, agg *libstate.AggregatorV3, singleNodeMode bool, evmCallTimeout time.Duration, engine consensus.EngineReader) *BaseAPI {
func NewBaseApi(
f *rpchelper.Filters, stateCache kvcache.Cache, blockReader services.FullBlockReader, agg *libstate.AggregatorV3,
singleNodeMode bool, evmCallTimeout time.Duration, engine consensus.EngineReader, chainConfig *chain.Config,
) *BaseAPI {
blocksLRUSize := 128 // ~32Mb
if !singleNodeMode {
blocksLRUSize = 512
Expand All @@ -129,7 +132,10 @@ func NewBaseApi(f *rpchelper.Filters, stateCache kvcache.Cache, blockReader serv
panic(err)
}

return &BaseAPI{filters: f, stateCache: stateCache, blocksLRU: blocksLRU, _blockReader: blockReader, _txnReader: blockReader, _agg: agg, evmCallTimeout: evmCallTimeout, _engine: engine}
return &BaseAPI{
filters: f, stateCache: stateCache, blocksLRU: blocksLRU, _blockReader: blockReader, _txnReader: blockReader,
_agg: agg, evmCallTimeout: evmCallTimeout, _engine: engine, _chainConfig: chainConfig,
}
}

func (api *BaseAPI) chainConfig(tx kv.Tx) (*chain.Config, error) {
Expand Down Expand Up @@ -271,30 +277,37 @@ func (api *BaseAPI) headerByRPCNumber(number rpc.BlockNumber, tx kv.Tx) (*types.
// APIImpl is implementation of the EthAPI interface based on remote Db access
type APIImpl struct {
*BaseAPI
ethBackend rpchelper.ApiBackend
txPool txpool.TxpoolClient
mining txpool.MiningClient
gasCache *GasPriceCache
db kv.RoDB
GasCap uint64
ReturnDataLimit int
ethBackend rpchelper.ApiBackend
txPool txpool.TxpoolClient
mining txpool.MiningClient
gasCache *GasPriceCache
db kv.RoDB
GasCap uint64
ReturnDataLimit int
seqRPCService *rpc.Client
historicalRPCService *rpc.Client
}

// NewEthAPI returns APIImpl instance
func NewEthAPI(base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient, gascap uint64, returnDataLimit int) *APIImpl {
func NewEthAPI(
base *BaseAPI, db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, mining txpool.MiningClient,
gascap uint64, returnDataLimit int, seqRPCService *rpc.Client, histRPCService *rpc.Client,
) *APIImpl {
if gascap == 0 {
gascap = uint64(math.MaxUint64 / 2)
}

return &APIImpl{
BaseAPI: base,
db: db,
ethBackend: eth,
txPool: txPool,
mining: mining,
gasCache: NewGasPriceCache(),
GasCap: gascap,
ReturnDataLimit: returnDataLimit,
BaseAPI: base,
db: db,
ethBackend: eth,
txPool: txPool,
mining: mining,
gasCache: NewGasPriceCache(),
GasCap: gascap,
ReturnDataLimit: returnDataLimit,
seqRPCService: seqRPCService,
historicalRPCService: histRPCService,
}
}

Expand Down
Loading

0 comments on commit a339a23

Please sign in to comment.