diff --git a/core/blockchain.go b/core/blockchain.go index 5bd2f9356145f..76c4b3cd3c407 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1641,12 +1641,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. for _, data := range bc.stateSyncData { bc.stateSyncFeed.Send(StateSyncEvent{Data: data}) } - - bc.chain2HeadFeed.Send(Chain2HeadEvent{ - Type: Chain2HeadCanonicalEvent, - NewChain: []*types.Block{block}, - }) - // BOR } } else { @@ -1749,11 +1743,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er defer func() { if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() { bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon}) - - bc.chain2HeadFeed.Send(Chain2HeadEvent{ - Type: Chain2HeadCanonicalEvent, - NewChain: []*types.Block{lastCanon}, - }) } }() // Start the parallel header verifier @@ -1853,6 +1842,22 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er } }() + // accumulator for canonical blocks + var canonAccum []*types.Block + + emitAccum := func() { + size := len(canonAccum) + if size == 0 || size > 5 { + // avoid reporting events for large sync events + return + } + bc.chain2HeadFeed.Send(Chain2HeadEvent{ + Type: Chain2HeadCanonicalEvent, + NewChain: canonAccum, + }) + canonAccum = canonAccum[:0] + } + for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() { // If the chain is terminating, stop processing blocks if bc.insertStopped() { @@ -1993,6 +1998,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits) blockInsertTimer.UpdateSince(start) + // BOR + if status == CanonStatTy { + canonAccum = append(canonAccum, block) + } else { + emitAccum() + } + // BOR + switch status { case CanonStatTy: log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), @@ -2026,6 +2039,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er stats.report(chain, it.index, dirty) } + // BOR + emitAccum() + // BOR + // Any blocks remaining here? The only ones we care about are the future ones if block != nil && errors.Is(err, consensus.ErrFutureBlock) { if err := bc.addFutureBlock(block); err != nil { diff --git a/core/blockchain_bor_test.go b/core/blockchain_bor_test.go index 52f4f7cd83251..1b0373c41a8e8 100644 --- a/core/blockchain_bor_test.go +++ b/core/blockchain_bor_test.go @@ -80,7 +80,7 @@ func TestChain2HeadEvent(t *testing.T) { } for j := 0; j < len(ev.NewChain); j++ { if ev.NewChain[j].Hash() != expect.Added[j] { - t.Fatal("Newchain hashes Do Not Match") + t.Fatalf("Newchain hashes Do Not Match %s %s", ev.NewChain[j].Hash(), expect.Added[j]) } } case <-time.After(2 * time.Second): @@ -92,6 +92,8 @@ func TestChain2HeadEvent(t *testing.T) { readEvent(&eventTest{ Type: Chain2HeadCanonicalEvent, Added: []common.Hash{ + chain[0].Hash(), + chain[1].Hash(), chain[2].Hash(), }}) @@ -129,6 +131,8 @@ func TestChain2HeadEvent(t *testing.T) { readEvent(&eventTest{ Type: Chain2HeadCanonicalEvent, Added: []common.Hash{ + replacementBlocks[2].Hash(), replacementBlocks[3].Hash(), }}) + }