Skip to content

Commit

Permalink
Shivam/ethstats backend fix (ethereum#341)
Browse files Browse the repository at this point in the history
* Emit events for all blocks insertChain()

* small fix

* test-case-fix

* Fix tests

Co-authored-by: Ferran <ferranbt@protonmail.com>
  • Loading branch information
0xsharma and ferranbt committed Apr 20, 2022
1 parent 44603db commit 65f4d27
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
39 changes: 28 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1641,12 +1641,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
for _, data := range bc.stateSyncData {
bc.stateSyncFeed.Send(StateSyncEvent{Data: data})
}

bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: Chain2HeadCanonicalEvent,
NewChain: []*types.Block{block},
})

// BOR
}
} else {
Expand Down Expand Up @@ -1749,11 +1743,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
defer func() {
if lastCanon != nil && bc.CurrentBlock().Hash() == lastCanon.Hash() {
bc.chainHeadFeed.Send(ChainHeadEvent{lastCanon})

bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: Chain2HeadCanonicalEvent,
NewChain: []*types.Block{lastCanon},
})
}
}()
// Start the parallel header verifier
Expand Down Expand Up @@ -1853,6 +1842,22 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
}
}()

// accumulator for canonical blocks
var canonAccum []*types.Block

emitAccum := func() {
size := len(canonAccum)
if size == 0 || size > 5 {
// avoid reporting events for large sync events
return
}
bc.chain2HeadFeed.Send(Chain2HeadEvent{
Type: Chain2HeadCanonicalEvent,
NewChain: canonAccum,
})
canonAccum = canonAccum[:0]
}

for ; block != nil && err == nil || err == ErrKnownBlock; block, err = it.next() {
// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
Expand Down Expand Up @@ -1993,6 +1998,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
blockWriteTimer.Update(time.Since(substart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits)
blockInsertTimer.UpdateSince(start)

// BOR
if status == CanonStatTy {
canonAccum = append(canonAccum, block)
} else {
emitAccum()
}
// BOR

switch status {
case CanonStatTy:
log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(),
Expand Down Expand Up @@ -2026,6 +2039,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
stats.report(chain, it.index, dirty)
}

// BOR
emitAccum()
// BOR

// Any blocks remaining here? The only ones we care about are the future ones
if block != nil && errors.Is(err, consensus.ErrFutureBlock) {
if err := bc.addFutureBlock(block); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion core/blockchain_bor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestChain2HeadEvent(t *testing.T) {
}
for j := 0; j < len(ev.NewChain); j++ {
if ev.NewChain[j].Hash() != expect.Added[j] {
t.Fatal("Newchain hashes Do Not Match")
t.Fatalf("Newchain hashes Do Not Match %s %s", ev.NewChain[j].Hash(), expect.Added[j])
}
}
case <-time.After(2 * time.Second):
Expand All @@ -92,6 +92,8 @@ func TestChain2HeadEvent(t *testing.T) {
readEvent(&eventTest{
Type: Chain2HeadCanonicalEvent,
Added: []common.Hash{
chain[0].Hash(),
chain[1].Hash(),
chain[2].Hash(),
}})

Expand Down Expand Up @@ -129,6 +131,8 @@ func TestChain2HeadEvent(t *testing.T) {
readEvent(&eventTest{
Type: Chain2HeadCanonicalEvent,
Added: []common.Hash{
replacementBlocks[2].Hash(),
replacementBlocks[3].Hash(),
}})

}

0 comments on commit 65f4d27

Please sign in to comment.