diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index df3b6bdb6..c6ca9af01 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -138,7 +138,10 @@ type Downloader struct { receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks headerProcCh chan []*types.Header // [eth/62] Channel to feed the header processor new tasks - // for stateFetcher + // State sync + pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root + pivotLock sync.RWMutex // Lock protecting pivot header reads from updates + stateSyncStart chan *stateSync trackStateReq chan *stateReq stateCh chan dataPack // [eth/63] Channel receiving inbound node state data @@ -451,10 +454,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block - latest, err := d.fetchHeight(p) + latest, pivot, err := d.fetchHead(p) if err != nil { return err } + if mode == FastSync && pivot == nil { + // If no pivot block was returned, the head is below the min full block + // threshold (i.e. new chian). In that case we won't really fast sync + // anyway, but still need a valid pivot block to avoid some code hitting + // nil panics on an access. + pivot = d.blockchain.CurrentBlock().Header() + } height := latest.Number.Uint64() origin, err := d.findAncestor(p, latest) @@ -469,22 +479,21 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I d.syncStatsLock.Unlock() // Ensure our origin point is below any fast sync pivot point - pivot := uint64(0) if mode == FastSync { if height <= uint64(fsMinFullBlocks) { origin = 0 } else { - pivot = height - uint64(fsMinFullBlocks) - if pivot <= origin { - origin = pivot - 1 + pivotNumber := pivot.Number.Uint64() + if pivotNumber <= origin { + origin = pivotNumber - 1 } // Write out the pivot into the database so a rollback beyond it will // reenable fast sync - rawdb.WriteLastPivotNumber(d.stateDB, pivot) + rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber) } } d.committed = 1 - if mode == FastSync && pivot != 0 { + if mode == FastSync && pivot.Number.Uint64() != 0 { d.committed = 0 } if mode == FastSync { @@ -530,13 +539,17 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I d.syncInitHook(origin, height) } fetchers := []func() error{ - func() error { return d.fetchHeaders(p, origin+1, pivot) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync - func() error { return d.processHeaders(origin+1, pivot, td) }, + func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved + func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and fast sync + func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync + func() error { return d.processHeaders(origin+1, td) }, } if mode == FastSync { - fetchers = append(fetchers, func() error { return d.processFastSyncContent(latest) }) + d.pivotLock.Lock() + d.pivotHeader = pivot + d.pivotLock.Unlock() + + fetchers = append(fetchers, func() error { return d.processFastSyncContent() }) } else if mode == FullSync { fetchers = append(fetchers, d.processFullSyncContent) } @@ -617,22 +630,26 @@ func (d *Downloader) Terminate() { d.Cancel() } -// fetchHeight retrieves the head header of the remote peer to aid in estimating -// the total time a pending synchronisation would take. -func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { - p.log.Debug("Retrieving remote chain height") +// fetchHead retrieves the head header and prior pivot block (if available) from +// a remote peer. +func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) { + p.log.Debug("Retrieving remote chain head") + mode := d.getMode() // Request the advertised remote head block and wait for the response - head, _ := p.peer.Head() - go p.peer.RequestHeadersByHash(head, 1, 0, false) + latest, _ := p.peer.Head() + fetch := 1 + if mode == FastSync { + fetch = 2 // head + pivot headers + } + go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true) ttl := d.requestTTL() timeout := time.After(ttl) - mode := d.getMode() for { select { case <-d.cancelCh: - return nil, errCanceled + return nil, nil, errCanceled case packet := <-d.headerCh: // Discard anything not from the origin peer @@ -640,23 +657,36 @@ func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) break } - // Make sure the peer actually gave something valid + // Make sure the peer gave us at least one and at most the requested headers headers := packet.(*headerPack).headers - if len(headers) != 1 { - p.log.Warn("Multiple headers for single request", "headers", len(headers)) - return nil, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers)) + if len(headers) == 0 || len(headers) > fetch { + return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch) } + // The first header needs to be the head, validate against the checkpoint + // and request. If only 1 header was returned, make sure there's no pivot + // or there was not one requested. head := headers[0] if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint { - p.log.Warn("Remote head below checkpoint", "number", head.Number, "hash", head.Hash()) - return nil, errUnsyncedPeer + return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint) + } + if len(headers) == 1 { + if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) { + return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer) + } + p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash()) + return head, nil, nil + } + // At this point we have 2 headers in total and the first is the + // validated head of the chian. Check the pivot number and return, + pivot := headers[1] + if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) { + return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks)) } - p.log.Debug("Remote head header identified", "number", head.Number, "hash", head.Hash()) - return head, nil + return head, pivot, nil case <-timeout: p.log.Debug("Waiting for head header timed out", "elapsed", ttl) - return nil, errTimeout + return nil, nil, errTimeout case <-d.bodyCh: case <-d.receiptCh: @@ -871,14 +901,14 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) case <-d.cancelCh: return 0, errCanceled - case packer := <-d.headerCh: + case packet := <-d.headerCh: // Discard anything not from the origin peer - if packer.PeerId() != p.id { - log.Debug("Received headers from incorrect peer", "peer", packer.PeerId()) + if packet.PeerId() != p.id { + log.Debug("Received headers from incorrect peer", "peer", packet.PeerId()) break } // Make sure the peer actually gave something valid - headers := packer.(*headerPack).headers + headers := packet.(*headerPack).headers if len(headers) != 1 { p.log.Warn("Multiple headers for single request", "headers", len(headers)) return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers)) @@ -937,12 +967,13 @@ func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) // other peers are only accepted if they map cleanly to the skeleton. If no one // can fill in the skeleton - not even the origin peer - it's assumed invalid and // the origin is dropped. -func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) error { +func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { p.log.Debug("Directing header downloads", "origin", from) defer p.log.Debug("Header download terminated") // Create a timeout timer, and the associated header fetcher skeleton := true // Skeleton assembly phase or finishing up + pivoting := false // Whether the next request is pivot verification request := time.Now() // time of the last skeleton fetch request timeout := time.NewTimer(0) // timer to dump a non-responsive active peer <-timeout.C // timeout channel should be initially empty @@ -963,6 +994,20 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false) } } + getNextPivot := func() { + pivoting = true + request = time.Now() + + ttl = d.requestTTL() + timeout.Reset(ttl) + + d.pivotLock.RLock() + pivot := d.pivotHeader.Number.Uint64() + d.pivotLock.RUnlock() + + p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks)) + go p.peer.RequestHeadersByNumber(pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep + } // Start pulling the header chain skeleton until all is done ancestor := from getHeaders(from) @@ -982,8 +1027,46 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) headerReqTimer.UpdateSince(request) timeout.Stop() + // If the pivot is being checked, move if it became stale and run the real retrieval + var pivot uint64 + + d.pivotLock.RLock() + if d.pivotHeader != nil { + pivot = d.pivotHeader.Number.Uint64() + } + d.pivotLock.RUnlock() + + if pivoting { + if packet.Items() == 2 { + // Retrieve the headers and do some sanity checks, just in case + headers := packet.(*headerPack).headers + + if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want { + log.Warn("Peer sent invalid next pivot", "have", have, "want", want) + return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want) + } + if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want { + log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want) + return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want) + } + log.Warn("Pivot seemingly stale, moving", "old", pivot, "new", headers[0].Number) + pivot = headers[0].Number.Uint64() + + d.pivotLock.Lock() + d.pivotHeader = headers[0] + d.pivotLock.Unlock() + + // Write out the pivot into the database so a rollback beyond + // it will reenable fast sync and update the state root that + // the state syncer will be downloading. + rawdb.WriteLastPivotNumber(d.stateDB, pivot) + } + pivoting = false + getHeaders(from) + continue + } // If the skeleton's finished, pull any remaining head headers directly from the origin - if packet.Items() == 0 && skeleton { + if skeleton && packet.Items() == 0 { skeleton = false getHeaders(from) continue @@ -1061,7 +1144,14 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, pivot uint64) return errCanceled } from += uint64(len(headers)) - getHeaders(from) + + // If we're still skeleton filling fast sync, check pivot staleness + // before continuing to the next skeleton filling + if skeleton && pivot > 0 { + getNextPivot() + } else { + getHeaders(from) + } } else { // No headers delivered, or all of them being delayed, sleep a bit and retry p.log.Trace("All headers delayed, waiting") @@ -1390,7 +1480,7 @@ func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. -func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) error { +func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // Keep a count of uncertain headers to roll back var ( rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis) @@ -1493,6 +1583,14 @@ func (d *Downloader) processHeaders(origin uint64, pivot uint64, td *big.Int) er // In case of header only syncing, validate the chunk immediately if mode == FastSync || mode == LightSync { // If we're importing pure headers, verify based on their recentness + var pivot uint64 + + d.pivotLock.RLock() + if d.pivotHeader != nil { + pivot = d.pivotHeader.Number.Uint64() + } + d.pivotLock.RUnlock() + frequency := fsHeaderCheckFrequency if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { frequency = 1 @@ -1609,10 +1707,13 @@ func (d *Downloader) importBlockResults(results []*fetchResult) error { // processFastSyncContent takes fetch results from the queue and writes them to the // database. It also controls the synchronisation of state nodes of the pivot block. -func (d *Downloader) processFastSyncContent(latest *types.Header) error { +func (d *Downloader) processFastSyncContent() error { // Start syncing state of the reported head block. This should get us most of // the state of the pivot block. - sync := d.syncState(latest.Root) + d.pivotLock.RLock() + sync := d.syncState(d.pivotHeader.Root) + d.pivotLock.RUnlock() + defer func() { // The `sync` object is replaced every time the pivot moves. We need to // defer close the very last active one, hence the lazy evaluation vs. @@ -1627,12 +1728,6 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { } go closeOnErr(sync) - // Figure out the ideal pivot block. Note, that this goalpost may move if the - // sync takes long enough for the chain head to move significantly. - pivot := uint64(0) - if height := latest.Number.Uint64(); height > uint64(fsMinFullBlocks) { - pivot = height - uint64(fsMinFullBlocks) - } // To cater for moving pivot points, track the pivot block and subsequently // accumulated download results separately. var ( @@ -1659,22 +1754,46 @@ func (d *Downloader) processFastSyncContent(latest *types.Header) error { if d.chainInsertHook != nil { d.chainInsertHook(results) } - if oldPivot != nil { + // If we haven't downloaded the pivot block yet, check pivot staleness + // notifications from the header downloader + d.pivotLock.RLock() + pivot := d.pivotHeader + d.pivotLock.RUnlock() + + if oldPivot == nil { + if pivot.Root != sync.root { + sync.Cancel() + sync = d.syncState(pivot.Root) + + go closeOnErr(sync) + } + } else { results = append(append([]*fetchResult{oldPivot}, oldTail...), results...) } // Split around the pivot block and process the two sides via fast/full sync if atomic.LoadInt32(&d.committed) == 0 { - latest = results[len(results)-1].Header - if height := latest.Number.Uint64(); height > pivot+2*uint64(fsMinFullBlocks) { - log.Warn("Pivot became stale, moving", "old", pivot, "new", height-uint64(fsMinFullBlocks)) - pivot = height - uint64(fsMinFullBlocks) + latest := results[len(results)-1].Header + // If the height is above the pivot block by 2 sets, it means the pivot + // become stale in the network and it was garbage collected, move to a + // new pivot. + // + // Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those + // need to be taken into account, otherwise we're detecting the pivot move + // late and will drop peers due to unavailable state!!! + if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) { + log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay)) + pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted + + d.pivotLock.Lock() + d.pivotHeader = pivot + d.pivotLock.Unlock() // Write out the pivot into the database so a rollback beyond it will // reenable fast sync - rawdb.WriteLastPivotNumber(d.stateDB, pivot) + rawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64()) } } - P, beforeP, afterP := splitAroundPivot(pivot, results) + P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results) if err := d.commitFastSyncData(beforeP, sync); err != nil { return err } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 5400049cc..4bf1e4d46 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -427,11 +427,7 @@ func (dlp *downloadTesterPeer) Head() (common.Hash, *big.Int) { // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { - if reverse { - panic("reverse header requests not supported") - } - - result := dlp.chain.headersByHash(origin, amount, skip) + result := dlp.chain.headersByHash(origin, amount, skip, reverse) go dlp.dl.downloader.DeliverHeaders(dlp.id, result) return nil } @@ -440,11 +436,7 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i // origin; associated with a particular peer in the download tester. The returned // function can be used to retrieve batches of headers from the particular peer. func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { - if reverse { - panic("reverse header requests not supported") - } - - result := dlp.chain.headersByNumber(origin, amount, skip) + result := dlp.chain.headersByNumber(origin, amount, skip, reverse) go dlp.dl.downloader.DeliverHeaders(dlp.id, result) return nil } @@ -1698,7 +1690,7 @@ func testCheckpointEnforcement(t *testing.T, protocol int, mode SyncMode) { if mode == FastSync || mode == LightSync { expect = errUnsyncedPeer } - if err := tester.sync("peer", nil, mode); err != expect { + if err := tester.sync("peer", nil, mode); !errors.Is(err, expect) { t.Fatalf("block sync error mismatch: have %v, want %v", err, expect) } if mode == FastSync || mode == LightSync { diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index 66376502c..2d7b4d1f1 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -170,18 +170,27 @@ func (tc *testChain) td(hash common.Hash) *big.Int { return tc.tdm[hash] } -// headersByHash returns headers in ascending order from the given hash. -func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int) []*types.Header { +// headersByHash returns headers in order from the given hash. +func (tc *testChain) headersByHash(origin common.Hash, amount int, skip int, reverse bool) []*types.Header { num, _ := tc.hashToNumber(origin) - return tc.headersByNumber(num, amount, skip) + return tc.headersByNumber(num, amount, skip, reverse) } -// headersByNumber returns headers in ascending order from the given number. -func (tc *testChain) headersByNumber(origin uint64, amount int, skip int) []*types.Header { +// headersByNumber returns headers from the given number. +func (tc *testChain) headersByNumber(origin uint64, amount int, skip int, reverse bool) []*types.Header { result := make([]*types.Header, 0, amount) - for num := origin; num < uint64(len(tc.chain)) && len(result) < amount; num += uint64(skip) + 1 { - if header, ok := tc.headerm[tc.chain[int(num)]]; ok { - result = append(result, header) + + if !reverse { + for num := origin; num < uint64(len(tc.chain)) && len(result) < amount; num += uint64(skip) + 1 { + if header, ok := tc.headerm[tc.chain[int(num)]]; ok { + result = append(result, header) + } + } + } else { + for num := int64(origin); num >= 0 && len(result) < amount; num -= int64(skip) + 1 { + if header, ok := tc.headerm[tc.chain[int(num)]]; ok { + result = append(result, header) + } } } return result