Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
les, light: remove untrusted header retrieval in ODR (#21907)
* les, light: remove untrusted header retrieval in ODR

* les: polish

* light: check the hash equality in odr
  • Loading branch information
rjl493456442 committed Dec 10, 2020
1 parent 817a3fb commit 9f6bb49
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 82 deletions.
62 changes: 54 additions & 8 deletions les/client_handler.go
Expand Up @@ -17,6 +17,7 @@
package les

import (
"context"
"math/big"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -200,14 +201,23 @@ func (h *clientHandler) handleMsg(p *serverPeer) error {
p.fcServer.ReceivedReply(resp.ReqID, resp.BV)
p.answeredRequest(resp.ReqID)

// Filter out any explicitly requested headers, deliver the rest to the downloader
filter := len(headers) == 1
if filter {
headers = h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
}
if len(headers) != 0 || !filter {
if err := h.downloader.DeliverHeaders(p.id, headers); err != nil {
log.Debug("Failed to deliver headers", "err", err)
// Filter out the explicitly requested header by the retriever
if h.backend.retriever.requested(resp.ReqID) {
deliverMsg = &Msg{
MsgType: MsgBlockHeaders,
ReqID: resp.ReqID,
Obj: resp.Headers,
}
} else {
// Filter out any explicitly requested headers, deliver the rest to the downloader
filter := len(headers) == 1
if filter {
headers = h.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers)
}
if len(headers) != 0 || !filter {
if err := h.downloader.DeliverHeaders(p.id, headers); err != nil {
log.Debug("Failed to deliver headers", "err", err)
}
}
}
case BlockBodiesMsg:
Expand Down Expand Up @@ -394,6 +404,42 @@ func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip
return nil
}

// RetrieveSingleHeaderByNumber requests a single header by the specified block
// number. This function will wait the response until it's timeout or delivered.
func (pc *peerConnection) RetrieveSingleHeaderByNumber(context context.Context, number uint64) (*types.Header, error) {
reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*serverPeer)
return peer.getRequestCost(GetBlockHeadersMsg, 1)
},
canSend: func(dp distPeer) bool {
return dp.(*serverPeer) == pc.peer
},
request: func(dp distPeer) func() {
peer := dp.(*serverPeer)
cost := peer.getRequestCost(GetBlockHeadersMsg, 1)
peer.fcServer.QueuedRequest(reqID, cost)
return func() { peer.requestHeadersByNumber(reqID, number, 1, 0, false) }
},
}
var header *types.Header
if err := pc.handler.backend.retriever.retrieve(context, reqID, rq, func(peer distPeer, msg *Msg) error {
if msg.MsgType != MsgBlockHeaders {
return errInvalidMessageType
}
headers := msg.Obj.([]*types.Header)
if len(headers) != 1 {
return errInvalidEntryCount
}
header = headers[0]
return nil
}, nil); err != nil {
return nil, err
}
return header, nil
}

// downloaderPeerNotify implements peerSetNotify
type downloaderPeerNotify clientHandler

Expand Down
22 changes: 13 additions & 9 deletions les/odr.go
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
)

// LesOdr implements light.OdrBackend
Expand Down Expand Up @@ -83,7 +82,8 @@ func (odr *LesOdr) IndexerConfig() *light.IndexerConfig {
}

const (
MsgBlockBodies = iota
MsgBlockHeaders = iota
MsgBlockBodies
MsgCode
MsgReceipts
MsgProofsV2
Expand Down Expand Up @@ -122,13 +122,17 @@ func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err erro
return func() { lreq.Request(reqID, p) }
},
}
sent := mclock.Now()
if err = odr.retriever.retrieve(ctx, reqID, rq, func(p distPeer, msg *Msg) error { return lreq.Validate(odr.db, msg) }, odr.stop); err == nil {
// retrieved from network, store in db
req.StoreResult(odr.db)

defer func(sent mclock.AbsTime) {
if err != nil {
return
}
requestRTT.Update(time.Duration(mclock.Now() - sent))
} else {
log.Debug("Failed to retrieve data from network", "err", err)
}(mclock.Now())

if err := odr.retriever.retrieve(ctx, reqID, rq, func(p distPeer, msg *Msg) error { return lreq.Validate(odr.db, msg) }, odr.stop); err != nil {
return err
}
return
req.StoreResult(odr.db)
return nil
}
52 changes: 22 additions & 30 deletions les/odr_requests.go
Expand Up @@ -327,9 +327,6 @@ func (r *ChtRequest) CanSend(peer *serverPeer) bool {
peer.lock.RLock()
defer peer.lock.RUnlock()

if r.Untrusted {
return peer.headInfo.Number >= r.BlockNum && peer.id == r.PeerId
}
return peer.headInfo.Number >= r.Config.ChtConfirms && r.ChtNum <= (peer.headInfo.Number-r.Config.ChtConfirms)/r.Config.ChtSize
}

Expand Down Expand Up @@ -369,39 +366,34 @@ func (r *ChtRequest) Validate(db ethdb.Database, msg *Msg) error {
if err := rlp.DecodeBytes(headerEnc, header); err != nil {
return errHeaderUnavailable
}

// Verify the CHT
// Note: For untrusted CHT request, there is no proof response but
// header data.
var node light.ChtNode
if !r.Untrusted {
var encNumber [8]byte
binary.BigEndian.PutUint64(encNumber[:], r.BlockNum)

reads := &readTraceDB{db: nodeSet}
value, err := trie.VerifyProof(r.ChtRoot, encNumber[:], reads)
if err != nil {
return fmt.Errorf("merkle proof verification failed: %v", err)
}
if len(reads.reads) != nodeSet.KeyCount() {
return errUselessNodes
}
var (
node light.ChtNode
encNumber [8]byte
)
binary.BigEndian.PutUint64(encNumber[:], r.BlockNum)

if err := rlp.DecodeBytes(value, &node); err != nil {
return err
}
if node.Hash != header.Hash() {
return errCHTHashMismatch
}
if r.BlockNum != header.Number.Uint64() {
return errCHTNumberMismatch
}
reads := &readTraceDB{db: nodeSet}
value, err := trie.VerifyProof(r.ChtRoot, encNumber[:], reads)
if err != nil {
return fmt.Errorf("merkle proof verification failed: %v", err)
}
if len(reads.reads) != nodeSet.KeyCount() {
return errUselessNodes
}
if err := rlp.DecodeBytes(value, &node); err != nil {
return err
}
if node.Hash != header.Hash() {
return errCHTHashMismatch
}
if r.BlockNum != header.Number.Uint64() {
return errCHTNumberMismatch
}
// Verifications passed, store and return
r.Header = header
r.Proof = nodeSet
r.Td = node.Td // For untrusted request, td here is nil, todo improve the les/2 protocol

r.Td = node.Td
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions les/retrieve.go
Expand Up @@ -155,6 +155,15 @@ func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc
return r
}

// requested reports whether the request with given reqid is sent by the retriever.
func (rm *retrieveManager) requested(reqId uint64) bool {
rm.lock.RLock()
defer rm.lock.RUnlock()

_, ok := rm.sentReqs[reqId]
return ok
}

// deliver is called by the LES protocol manager to deliver reply messages to waiting requests
func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
rm.lock.RLock()
Expand Down
6 changes: 3 additions & 3 deletions les/sync.go
Expand Up @@ -56,8 +56,8 @@ func (h *clientHandler) validateCheckpoint(peer *serverPeer) error {
defer cancel()

// Fetch the block header corresponding to the checkpoint registration.
cp := peer.checkpoint
header, err := light.GetUntrustedHeaderByNumber(ctx, h.backend.odr, peer.checkpointNumber, peer.id)
wrapPeer := &peerConnection{handler: h, peer: peer}
header, err := wrapPeer.RetrieveSingleHeaderByNumber(ctx, peer.checkpointNumber)
if err != nil {
return err
}
Expand All @@ -66,7 +66,7 @@ func (h *clientHandler) validateCheckpoint(peer *serverPeer) error {
if err != nil {
return err
}
events := h.backend.oracle.Contract().LookupCheckpointEvents(logs, cp.SectionIndex, cp.Hash())
events := h.backend.oracle.Contract().LookupCheckpointEvents(logs, peer.checkpoint.SectionIndex, peer.checkpoint.Hash())
if len(events) == 0 {
return errInvalidCheckpoint
}
Expand Down
2 changes: 1 addition & 1 deletion les/sync_test.go
Expand Up @@ -53,7 +53,7 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
time.Sleep(10 * time.Millisecond)
}
}
// Generate 512+4 blocks (totally 1 CHT sections)
// Generate 128+1 blocks (totally 1 CHT sections)
server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, nil, 0, false, false, true)
defer tearDown()

Expand Down
11 changes: 3 additions & 8 deletions light/odr.go
Expand Up @@ -135,8 +135,6 @@ func (req *ReceiptsRequest) StoreResult(db ethdb.Database) {

// ChtRequest is the ODR request type for retrieving header by Canonical Hash Trie
type ChtRequest struct {
Untrusted bool // Indicator whether the result retrieved is trusted or not
PeerId string // The specified peer id from which to retrieve data.
Config *IndexerConfig
ChtNum, BlockNum uint64
ChtRoot common.Hash
Expand All @@ -148,12 +146,9 @@ type ChtRequest struct {
// StoreResult stores the retrieved data in local database
func (req *ChtRequest) StoreResult(db ethdb.Database) {
hash, num := req.Header.Hash(), req.Header.Number.Uint64()

if !req.Untrusted {
rawdb.WriteHeader(db, req.Header)
rawdb.WriteTd(db, hash, num, req.Td)
rawdb.WriteCanonicalHash(db, hash, num)
}
rawdb.WriteHeader(db, req.Header)
rawdb.WriteTd(db, hash, num, req.Td)
rawdb.WriteCanonicalHash(db, hash, num)
}

// BloomRequest is the ODR request type for retrieving bloom filters from a CHT structure
Expand Down
39 changes: 16 additions & 23 deletions light/odr_util.go
Expand Up @@ -19,20 +19,23 @@ package light
import (
"bytes"
"context"
"errors"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
)

var sha3Nil = crypto.Keccak256Hash(nil)
// errNonCanonicalHash is returned if the requested chain data doesn't belong
// to the canonical chain. ODR can only retrieve the canonical chain data covered
// by the CHT or Bloom trie for verification.
var errNonCanonicalHash = errors.New("hash is not currently canonical")

// GetHeaderByNumber retrieves the canonical block header corresponding to the
// given number.
// given number. The returned header is proven by local CHT.
func GetHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64) (*types.Header, error) {
// Try to find it in the local database first.
db := odr.Database()
Expand Down Expand Up @@ -63,25 +66,6 @@ func GetHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64) (*typ
return r.Header, nil
}

// GetUntrustedHeaderByNumber retrieves specified block header without
// correctness checking. Note this function should only be used in light
// client checkpoint syncing.
func GetUntrustedHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64, peerId string) (*types.Header, error) {
// todo(rjl493456442) it's a hack to retrieve headers which is not covered
// by CHT. Fix it in LES4
r := &ChtRequest{
BlockNum: number,
ChtNum: number / odr.IndexerConfig().ChtSize,
Untrusted: true,
PeerId: peerId,
Config: odr.IndexerConfig(),
}
if err := odr.Retrieve(ctx, r); err != nil {
return nil, err
}
return r.Header, nil
}

// GetCanonicalHash retrieves the canonical block hash corresponding to the number.
func GetCanonicalHash(ctx context.Context, odr OdrBackend, number uint64) (common.Hash, error) {
hash := rawdb.ReadCanonicalHash(odr.Database(), number)
Expand All @@ -102,10 +86,13 @@ func GetTd(ctx context.Context, odr OdrBackend, hash common.Hash, number uint64)
if td != nil {
return td, nil
}
_, err := GetHeaderByNumber(ctx, odr, number)
header, err := GetHeaderByNumber(ctx, odr, number)
if err != nil {
return nil, err
}
if header.Hash() != hash {
return nil, errNonCanonicalHash
}
// <hash, number> -> td mapping already be stored in db, get it.
return rawdb.ReadTd(odr.Database(), hash, number), nil
}
Expand All @@ -120,6 +107,9 @@ func GetBodyRLP(ctx context.Context, odr OdrBackend, hash common.Hash, number ui
if err != nil {
return nil, errNoHeader
}
if header.Hash() != hash {
return nil, errNonCanonicalHash
}
r := &BlockRequest{Hash: hash, Number: number, Header: header}
if err := odr.Retrieve(ctx, r); err != nil {
return nil, err
Expand Down Expand Up @@ -167,6 +157,9 @@ func GetBlockReceipts(ctx context.Context, odr OdrBackend, hash common.Hash, num
if err != nil {
return nil, errNoHeader
}
if header.Hash() != hash {
return nil, errNonCanonicalHash
}
r := &ReceiptsRequest{Hash: hash, Number: number, Header: header}
if err := odr.Retrieve(ctx, r); err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions light/trie.go
Expand Up @@ -30,6 +30,10 @@ import (
"github.com/ethereum/go-ethereum/trie"
)

var (
sha3Nil = crypto.Keccak256Hash(nil)
)

func NewState(ctx context.Context, head *types.Header, odr OdrBackend) *state.StateDB {
state, _ := state.New(head.Root, NewStateDatabase(ctx, head, odr), nil)
return state
Expand Down

0 comments on commit 9f6bb49

Please sign in to comment.