Skip to content

Commit

Permalink
move remote eth backend from ethdb to core package (#1567)
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Mar 20, 2021
1 parent 81ea5ba commit ba37706
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 81 deletions.
10 changes: 6 additions & 4 deletions cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ledgerwatch/turbo-geth/cmd/utils"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/internal/debug"
"github.com/ledgerwatch/turbo-geth/log"
Expand Down Expand Up @@ -110,13 +111,14 @@ func OpenDB(cfg Flags) (ethdb.KV, ethdb.Backend, error) {
}
}
if cfg.PrivateApiAddr != "" {
var remoteDb ethdb.KV
remoteDb, ethBackend, err = ethdb.NewRemote().Path(cfg.PrivateApiAddr).Open(cfg.TLSCertfile, cfg.TLSKeyFile, cfg.TLSCACert)
var remoteKv ethdb.KV
remoteKv, err = ethdb.NewRemote().Path(cfg.PrivateApiAddr).Open(cfg.TLSCertfile, cfg.TLSKeyFile, cfg.TLSCACert)
if err != nil {
return nil, nil, fmt.Errorf("could not connect to remoteDb: %w", err)
return nil, nil, fmt.Errorf("could not connect to remoteKv: %w", err)
}
core.NewRemoteBackend(remoteKv)
if db == nil {
db = remoteDb
db = remoteKv
}
} else {
return nil, nil, fmt.Errorf("either remote db or lmdb must be specified")
Expand Down
2 changes: 1 addition & 1 deletion cmd/state/commands/gas_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var gasLimitsCmd = &cobra.Command{
}
}).MustOpen()

remoteDB, _, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "")
remoteDB, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "")
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/state/commands/state_growth.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func init() {
}
}).MustOpen()

remoteDB, _, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "")
remoteDB, err := ethdb.NewRemote().Path(privateApiAddr).Open("", "", "")
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion common/dbutils/composite_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ func HeaderKey(number uint64, hash common.Hash) []byte {
return append(EncodeBlockNumber(number), hash.Bytes()...)
}


// blockBodyKey = blockBodyPrefix + num (uint64 big endian) + hash
func BlockBodyKey(number uint64, hash common.Hash) []byte {
return append(EncodeBlockNumber(number), hash.Bytes()...)
Expand Down
2 changes: 0 additions & 2 deletions common/dbutils/composite_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/stretchr/testify/assert"
)



func TestPlainParseStoragePrefix(t *testing.T) {
expectedAddr := common.HexToAddress("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c")
expectedIncarnation := uint64(999000999)
Expand Down
64 changes: 64 additions & 0 deletions core/eth_backend.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package core

import (
"context"
"io"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/gointerfaces"
"github.com/ledgerwatch/turbo-geth/gointerfaces/remote"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/rlp"
)

Expand Down Expand Up @@ -34,3 +40,61 @@ func (back *EthBackend) Subscribe(func(*remote.SubscribeReply)) error {
// do nothing
return nil
}

type RemoteBackend struct {
remoteEthBackend remote.ETHBACKENDClient
log log.Logger
}

func NewRemoteBackend(kv ethdb.KV) *RemoteBackend {
return &RemoteBackend{
remoteEthBackend: remote.NewETHBACKENDClient(kv.(*ethdb.RemoteKV).GrpcConn()),
log: log.New("remote_db"),
}
}

func (back *RemoteBackend) AddLocal(signedTx []byte) ([]byte, error) {
res, err := back.remoteEthBackend.Add(context.Background(), &remote.TxRequest{Signedtx: signedTx})
if err != nil {
return common.Hash{}.Bytes(), err
}
return gointerfaces.ConvertH256ToHash(res.Hash).Bytes(), nil
}

func (back *RemoteBackend) Etherbase() (common.Address, error) {
res, err := back.remoteEthBackend.Etherbase(context.Background(), &remote.EtherbaseRequest{})
if err != nil {
return common.Address{}, err
}

return gointerfaces.ConvertH160toAddress(res.Address), nil
}

func (back *RemoteBackend) NetVersion() (uint64, error) {
res, err := back.remoteEthBackend.NetVersion(context.Background(), &remote.NetVersionRequest{})
if err != nil {
return 0, err
}

return res.Id, nil
}

func (back *RemoteBackend) Subscribe(onNewEvent func(*remote.SubscribeReply)) error {
subscription, err := back.remoteEthBackend.Subscribe(context.Background(), &remote.SubscribeRequest{})
if err != nil {
return err
}
for {
event, err := subscription.Recv()
if err == io.EOF {
log.Info("rpcdaemon: the subscription channel was closed")
break
}
if err != nil {
return err
}

onNewEvent(event)
}
return nil
}
2 changes: 1 addition & 1 deletion ethdb/kv_abstract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func setupDatabases(f ethdb.BucketConfigsFunc) (writeDBs []ethdb.KV, readDBs []e

conn := bufconn.Listen(1024 * 1024)

rdb, _ := ethdb.NewRemote().InMem(conn).MustOpen()
rdb := ethdb.NewRemote().InMem(conn).MustOpen()
readDBs = []ethdb.KV{
writeDBs[0],
writeDBs[1],
Expand Down
84 changes: 13 additions & 71 deletions ethdb/kv_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (
"unsafe"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/common/dbutils"
"github.com/ledgerwatch/turbo-geth/gointerfaces"
"github.com/ledgerwatch/turbo-geth/gointerfaces/remote"
"github.com/ledgerwatch/turbo-geth/log"
"google.golang.org/grpc"
Expand Down Expand Up @@ -88,14 +86,7 @@ func (opts remoteOpts) InMem(listener *bufconn.Listener) remoteOpts {
return opts
}

type RemoteBackend struct {
opts remoteOpts
remoteEthBackend remote.ETHBACKENDClient
conn *grpc.ClientConn
log log.Logger
}

func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, error) {
func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, error) {
var dialOpts []grpc.DialOption
dialOpts = []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{Backoff: backoff.DefaultConfig, MinConnectTimeout: 10 * time.Minute}),
Expand All @@ -111,19 +102,19 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro
creds, err = credentials.NewClientTLSFromFile(certFile, "")

if err != nil {
return nil, nil, err
return nil, err
}
} else {
// load peer cert/key, ca cert
peerCert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
log.Error("load peer cert/key error:%v", err)
return nil, nil, err
return nil, err
}
caCert, err := ioutil.ReadFile(caCert)
if err != nil {
log.Error("read ca cert file error:%v", err)
return nil, nil, err
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
Expand All @@ -150,7 +141,7 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro

conn, err := grpc.DialContext(ctx, opts.DialAddress, dialOpts...)
if err != nil {
return nil, nil, err
return nil, err
}

db := &RemoteKV{
Expand All @@ -166,22 +157,15 @@ func (opts remoteOpts) Open(certFile, keyFile, caCert string) (KV, Backend, erro
db.buckets[name] = cfg
}

eth := &RemoteBackend{
opts: opts,
remoteEthBackend: remote.NewETHBACKENDClient(conn),
conn: conn,
log: log.New("remote_db", opts.DialAddress),
}

return db, eth, nil
return db, nil
}

func (opts remoteOpts) MustOpen() (KV, Backend) {
db, txPool, err := opts.Open("", "", "")
func (opts remoteOpts) MustOpen() KV {
db, err := opts.Open("", "", "")
if err != nil {
panic(err)
}
return db, txPool
return db
}

func NewRemote() remoteOpts {
Expand All @@ -192,6 +176,10 @@ func (db *RemoteKV) AllBuckets() dbutils.BucketsCfg {
return db.buckets
}

func (db *RemoteKV) GrpcConn() *grpc.ClientConn {
return db.conn
}

// Close
// All transactions must be closed before closing the database.
func (db *RemoteKV) Close() {
Expand Down Expand Up @@ -642,49 +630,3 @@ func (c *remoteCursorDupSort) LastDup() ([]byte, error) {
}
return c.lastDup()
}

func (back *RemoteBackend) AddLocal(signedTx []byte) ([]byte, error) {
res, err := back.remoteEthBackend.Add(context.Background(), &remote.TxRequest{Signedtx: signedTx})
if err != nil {
return common.Hash{}.Bytes(), err
}
return gointerfaces.ConvertH256ToHash(res.Hash).Bytes(), nil
}

func (back *RemoteBackend) Etherbase() (common.Address, error) {
res, err := back.remoteEthBackend.Etherbase(context.Background(), &remote.EtherbaseRequest{})
if err != nil {
return common.Address{}, err
}

return gointerfaces.ConvertH160toAddress(res.Address), nil
}

func (back *RemoteBackend) NetVersion() (uint64, error) {
res, err := back.remoteEthBackend.NetVersion(context.Background(), &remote.NetVersionRequest{})
if err != nil {
return 0, err
}

return res.Id, nil
}

func (back *RemoteBackend) Subscribe(onNewEvent func(*remote.SubscribeReply)) error {
subscription, err := back.remoteEthBackend.Subscribe(context.Background(), &remote.SubscribeRequest{})
if err != nil {
return err
}
for {
event, err := subscription.Recv()
if err == io.EOF {
log.Info("rpcdaemon: the subscription channel was closed")
break
}
if err != nil {
return err
}

onNewEvent(event)
}
return nil
}

0 comments on commit ba37706

Please sign in to comment.