Skip to content

Commit

Permalink
Handshake in tests, correct ENR updates, etc. (#1578)
Browse files Browse the repository at this point in the history
* Remove Blockchain dependency from forkID, fix ENR update

* Fix handshake_test

* Remove db access from handshake

* Undo

* Use StagedSync in test handlers

* Compile fix

* Debugging

* dependency fixes

* More info

* Print test name

* Increase timeout

* Disable checkpoint test

* Optimise RW message pipe

* Fix test

* Print handshake errors

* See where the pipe is closing

* Remove checkpoints

* Remove printouts

* Revert "Fix test"

This reverts commit d154e07.

* Revert "Optimise RW message pipe"

This reverts commit 6936111.

* Revert "Increase timeout"

This reverts commit 9dc0e23.

* Revert "See where the pipe is closing"

This reverts commit 3cf22af.

* Remove printing

* Relax peerEventCh

Co-authored-by: Alexey Sharp <alexeysharp@Alexeys-iMac.local>
  • Loading branch information
2 people authored and AskAlexSharov committed Apr 6, 2021
1 parent a045848 commit fa2fe13
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 140 deletions.
40 changes: 3 additions & 37 deletions core/forkid/forkid.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"strings"

"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/log"
"github.com/ledgerwatch/turbo-geth/params"
)
Expand All @@ -45,18 +44,6 @@ var (
ErrLocalIncompatibleOrStale = errors.New("local incompatible or needs update")
)

// Blockchain defines all necessary method to build a forkID.
type Blockchain interface {
// Config retrieves the chain's fork configuration.
Config() *params.ChainConfig

// Genesis retrieves the chain's genesis block.
Genesis() *types.Block

// CurrentHeader retrieves the current head header of the canonical chain.
CurrentHeader() *types.Header
}

// ID is a fork identifier as defined by EIP-2124.
type ID struct {
Hash [4]byte // CRC32 checksum of the genesis block and passed fork block numbers
Expand Down Expand Up @@ -85,15 +72,6 @@ func NewID(config *params.ChainConfig, genesis common.Hash, head uint64) ID {
return ID{Hash: checksumToBytes(hash), Next: next}
}

// NewIDWithChain calculates the Ethereum fork ID from an existing chain instance.
func NewIDWithChain(chain Blockchain) ID {
return NewID(
chain.Config(),
chain.Genesis().Hash(),
chain.CurrentHeader().Number.Uint64(),
)
}

func NewIDFromForks(forks []uint64, genesis common.Hash) ID {
// Calculate the starting checksum from the genesis hash
hash := crc32.ChecksumIEEE(genesis[:])
Expand All @@ -105,26 +83,14 @@ func NewIDFromForks(forks []uint64, genesis common.Hash) ID {
return ID{Hash: checksumToBytes(hash), Next: 0}
}

// NewFilter creates a filter that returns if a fork ID should be rejected or not
// based on the local chain's status.
func NewFilter(chain Blockchain) Filter {
return NewFilterAutofork(
chain.Config(),
chain.Genesis().Hash(),
chain.CurrentHeader().Number.Uint64(),
)
}

// NewFilterAutofork creates a filter that returns if a fork ID should be rejected or notI
// NewFilter creates a filter that returns if a fork ID should be rejected or notI
// based on the local chain's status.
func NewFilterAutofork(config *params.ChainConfig, genesis common.Hash, head uint64) Filter {
func NewFilter(config *params.ChainConfig, genesis common.Hash, head func() uint64) Filter {
forks := GatherForks(config)
return newFilter(
forks,
genesis,
func() uint64 {
return head
},
head,
)
}

Expand Down
3 changes: 1 addition & 2 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,7 @@ func (hc *HeaderChain) GetCanonicalHash(number uint64) common.Hash {
return h
}

// CurrentHeader retrieves the current head header of the canonical chain. The
// header is retrieved from the HeaderChain's internal cache.
// CurrentHeader retrieves the current head header of the canonical chain.
func (hc *HeaderChain) CurrentHeader() *types.Header {
headHash := rawdb.ReadHeadHeaderHash(hc.chainDb)
headNumber := rawdb.ReadHeaderNumber(hc.chainDb, headHash)
Expand Down
23 changes: 15 additions & 8 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/ledgerwatch/turbo-geth/eth/gasprice"
"github.com/ledgerwatch/turbo-geth/eth/protocols/eth"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/ethdb/remote/remotedbserver"
"github.com/ledgerwatch/turbo-geth/event"
Expand Down Expand Up @@ -110,7 +111,10 @@ type Ethereum struct {

torrentClient *bittorrent.Client

lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
events *remotedbserver.Events
chainConfig *params.ChainConfig
genesisHash common.Hash
}

// New creates a new Ethereum object (including the
Expand Down Expand Up @@ -291,6 +295,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
bloomRequests: make(chan chan *bloombits.Retrieval),
p2pServer: stack.Server(),
torrentClient: torrentClient,
chainConfig: chainConfig,
genesisHash: genesisHash,
}
eth.gasPrice, _ = uint256.FromBig(config.Miner.GasPrice)

Expand Down Expand Up @@ -364,14 +370,14 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
stagedSync := config.StagedSync

// setting notifier to support streaming events to rpc daemon
remoteEvents := remotedbserver.NewEvents()
eth.events = remotedbserver.NewEvents()
if stagedSync == nil {
// if there is not stagedsync, we create one with the custom notifier
stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{Notifier: remoteEvents})
stagedSync = stagedsync.New(stagedsync.DefaultStages(), stagedsync.DefaultUnwindOrder(), stagedsync.OptionalParameters{Notifier: eth.events})
} else {
// otherwise we add one if needed
if stagedSync.Notifier == nil {
stagedSync.Notifier = remoteEvents
stagedSync.Notifier = eth.events
}
}

Expand Down Expand Up @@ -407,12 +413,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, &creds, remoteEvents)
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, &creds, eth.events)
if err != nil {
return nil, err
}
} else {
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, nil, remoteEvents)
eth.privateAPI, err = remotedbserver.StartGrpc(chainDb.(ethdb.HasKV).KV(), eth, stack.Config().PrivateApiAddr, nil, eth.events)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -768,14 +774,15 @@ func (s *Ethereum) ArchiveMode() bool { return !s.config.Pruning }
// Protocols returns all the currently configured
// network protocols to start.
func (s *Ethereum) Protocols() []p2p.Protocol {
protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates)
headHeight, _ := stages.GetStageProgress(s.chainDb, stages.Finish)
protos := eth.MakeProtocols((*ethHandler)(s.handler), s.networkID, s.ethDialCandidates, s.chainConfig, s.genesisHash, headHeight)
return protos
}

// Start implements node.Lifecycle, starting all internal goroutines needed by the
// Ethereum protocol implementation.
func (s *Ethereum) Start() error {
eth.StartENRUpdater(s.blockchain, s.p2pServer.LocalNode())
eth.StartENRUpdater(s.chainConfig, s.genesisHash, s.events, s.p2pServer.LocalNode())

// Figure out a max peers count based on the server limits
maxPeers := s.p2pServer.MaxPeers
Expand Down
56 changes: 32 additions & 24 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package eth

import (
"errors"
"fmt"
"math"
"math/big"
"sync"
Expand All @@ -28,11 +29,13 @@ import (
"github.com/ledgerwatch/turbo-geth/common"
"github.com/ledgerwatch/turbo-geth/core"
"github.com/ledgerwatch/turbo-geth/core/forkid"
"github.com/ledgerwatch/turbo-geth/core/rawdb"
"github.com/ledgerwatch/turbo-geth/core/types"
"github.com/ledgerwatch/turbo-geth/eth/downloader"
"github.com/ledgerwatch/turbo-geth/eth/fetcher"
"github.com/ledgerwatch/turbo-geth/eth/protocols/eth"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync"
"github.com/ledgerwatch/turbo-geth/eth/stagedsync/stages"
"github.com/ledgerwatch/turbo-geth/ethdb"
"github.com/ledgerwatch/turbo-geth/event"
"github.com/ledgerwatch/turbo-geth/log"
Expand Down Expand Up @@ -137,22 +140,25 @@ func newHandler(config *handlerConfig) (*handler, error) { //nolint:unparam
config.EventMux = new(event.TypeMux) // Nicety initialization for tests
}
h := &handler{
networkID: config.Network,
forkFilter: forkid.NewFilterAutofork(config.Chain.Config(), config.Chain.Genesis().Hash(), config.Chain.CurrentHeader().Number.Uint64()),
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
networkID: config.Network,
eventMux: config.EventMux,
database: config.Database,
txpool: config.TxPool,
chain: config.Chain,
peers: newPeerSet(),
whitelist: config.Whitelist,
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
// If we have trusted checkpoints, enforce them on the chain
if config.Checkpoint != nil {
h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1
h.checkpointHash = config.Checkpoint.SectionHead
if headHeight, err := stages.GetStageProgress(config.Database, stages.Finish); err == nil {
h.currentHeight = headHeight
} else {
return nil, fmt.Errorf("could not get Finish stage progress: %v", err)
}
heighter := func() uint64 {
return atomic.LoadUint64(&h.currentHeight)
}
h.forkFilter = forkid.NewFilter(config.Chain.Config(), config.Chain.Genesis().Hash(), heighter)
// Construct the downloader (long sync) and its backing state bloom if fast
// sync is requested. The downloader is responsible for deallocating the state
// bloom when it's done.
Expand All @@ -168,9 +174,6 @@ func newHandler(config *handlerConfig) (*handler, error) { //nolint:unparam
validator := func(header *types.Header) error {
return h.chain.Engine().VerifyHeader(h.chain, header, true)
}
heighter := func() uint64 {
return atomic.LoadUint64(&h.currentHeight)
}
inserter := func(blocks types.Blocks) (int, error) {
if err == nil {
atomic.StoreUint32(&h.acceptTxs, 1) // Mark initial sync done on any fetcher import
Expand Down Expand Up @@ -217,7 +220,7 @@ func (h *handler) SetStagedSync(stagedSync *stagedsync.StagedSync) {
// various subsistems and starts handling messages.
func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// TODO(karalabe): Not sure why this is needed
if !h.chainSync.handlePeerEvent(peer) {
if !h.chainSync.handlePeerEvent() {
return p2p.DiscQuitting
}
h.peerWG.Add(1)
Expand All @@ -226,12 +229,17 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// Execute the Ethereum handshake
var (
genesis = h.chain.Genesis()
head = h.chain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = h.chain.GetTd(hash, number)
number = atomic.LoadUint64(&h.currentHeight)
)
forkID := forkid.NewID(h.chain.Config(), h.chain.Genesis().Hash(), h.chain.CurrentHeader().Number.Uint64())
hash, err := rawdb.ReadCanonicalHash(h.database, number)
if err != nil {
return fmt.Errorf("reading canonical hash for %d: %v", number, err)
}
td, err1 := rawdb.ReadTd(h.database, hash, number)
if err1 != nil {
return fmt.Errorf("reading td for %d %x: %v", number, hash, err1)
}
forkID := forkid.NewID(h.chain.Config(), genesis.Hash(), number)
if err := peer.Handshake(h.networkID, td, hash, genesis.Hash(), forkID, h.forkFilter); err != nil {
peer.Log().Debug("Ethereum handshake failed", "err", err)
return err
Expand Down Expand Up @@ -261,7 +269,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Error("Failed to register peer in eth syncer", "err", err)
return err
}
h.chainSync.handlePeerEvent(peer)
h.chainSync.handlePeerEvent()

// Propagate existing transactions. new transactions appearing
// after this will be sent via broadcasts.
Expand Down
2 changes: 1 addition & 1 deletion eth/handler_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (h *ethHandler) handleBlockBroadcast(peer *eth.Peer, block *types.Block, td
// Update the peer's total difficulty if better than the previous
if _, headNumber := peer.Head(); block.NumberU64() > headNumber {
peer.SetHead(trueHead, block.NumberU64())
h.chainSync.handlePeerEvent(peer)
h.chainSync.handlePeerEvent()
}
return nil
}
Loading

0 comments on commit fa2fe13

Please sign in to comment.