Skip to content

Commit

Permalink
trace enhancement (#79)
Browse files Browse the repository at this point in the history
* draft

* add round

* 100 pins limitation

* sort go imports

* refine import

* get rid of func (Mempool) LastBlockGasUsed() uint64
  • Loading branch information
zhongqiuwood committed Oct 8, 2021
1 parent aa8a44c commit c4e6ab1
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 90 deletions.
2 changes: 1 addition & 1 deletion blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ FOR_LOOP:
// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
var err error
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first)
state, _, err = bcR.blockExec.ApplyBlock(state, firstID, first) // rpc
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
Expand Down
56 changes: 41 additions & 15 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/tendermint/tendermint/trace"

"github.com/pkg/errors"

"github.com/tendermint/tendermint/crypto"
Expand Down Expand Up @@ -140,6 +142,8 @@ type State struct {

// for reporting metrics
metrics *Metrics

trc *trace.Tracer
}

// StateOption sets an optional parameter on the State.
Expand Down Expand Up @@ -170,6 +174,7 @@ func NewState(
evpool: evpool,
evsw: tmevents.NewEventSwitch(),
metrics: NopMetrics(),
trc: trace.NewTracer(),
}
// set function defaults (may be overwritten before calling Start)
cs.decideProposal = cs.defaultDecideProposal
Expand Down Expand Up @@ -772,6 +777,10 @@ func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
case cstypes.RoundStepNewHeight:
// NewRound event fired from enterNewRound.
// XXX: should we fire timeout here (for timeout commit)?
trace.GetElapsedInfo().AddInfo(trace.Produce, cs.trc.Format())
trace.GetElapsedInfo().Dump(cs.Logger.With("module", "main"))

cs.trc.Reset()
cs.enterNewRound(ti.Height, 0)
case cstypes.RoundStepNewRound:
cs.enterPropose(ti.Height, 0)
Expand Down Expand Up @@ -839,7 +848,9 @@ func (cs *State) enterNewRound(height int64, round int) {
return
}

track.set(height, cstypes.RoundStepNewRound, true)
cs.trc.Pin("NewRound-%d", round)

track.setTrace(height, cstypes.RoundStepNewRound, true)
if now := tmtime.Now(); cs.StartTime.After(now) {
logger.Info("Need to set a buffer and log message here for sanity.", "startTime", cs.StartTime, "now", now)
}
Expand Down Expand Up @@ -919,11 +930,13 @@ func (cs *State) enterPropose(height int64, round int) {
cs.Step))
return
}
cs.trc.Pin("Propose-%d", round)

logger.Info(fmt.Sprintf("enterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))

track.set(height, cstypes.RoundStepNewRound, false)
track.setTrace(height, cstypes.RoundStepNewRound, false)
cs.calcProcessingTime(height, cstypes.RoundStepNewRound)
track.set(height, cstypes.RoundStepPropose, true)
track.setTrace(height, cstypes.RoundStepPropose, true)
defer func() {
// Done enterPropose:
cs.updateRoundStep(round, cstypes.RoundStepPropose)
Expand Down Expand Up @@ -1090,11 +1103,12 @@ func (cs *State) enterPrevote(height int64, round int) {
cs.Step))
return
}
cs.trc.Pin("Prevote-%d", round)

track.set(height, cstypes.RoundStepPropose, false)
track.setTrace(height, cstypes.RoundStepPropose, false)
cs.calcProcessingTime(height, cstypes.RoundStepPropose)

track.set(height, cstypes.RoundStepPrevote, true)
track.setTrace(height, cstypes.RoundStepPrevote, true)
defer func() {
// Done enterPrevote:
cs.updateRoundStep(round, cstypes.RoundStepPrevote)
Expand Down Expand Up @@ -1157,7 +1171,9 @@ func (cs *State) enterPrevoteWait(height int64, round int) {
cs.Step))
return
}
track.set(height, cstypes.RoundStepPrevoteWait, true)
cs.trc.Pin("PrevoteWait-%d", round)

track.setTrace(height, cstypes.RoundStepPrevoteWait, true)
if !cs.Votes.Prevotes(round).HasTwoThirdsAny() {
panic(fmt.Sprintf("enterPrevoteWait(%v/%v), but Prevotes does not have any +2/3 votes", height, round))
}
Expand Down Expand Up @@ -1193,12 +1209,14 @@ func (cs *State) enterPrecommit(height int64, round int) {
return
}

cs.trc.Pin("Precommit-%d", round)

logger.Info(fmt.Sprintf("enterPrecommit(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))

track.set(height, cstypes.RoundStepPrevote, false)
track.setTrace(height, cstypes.RoundStepPrevote, false)
cs.calcProcessingTime(height, cstypes.RoundStepPrevote)

track.set(height, cstypes.RoundStepPrecommit, true)
track.setTrace(height, cstypes.RoundStepPrecommit, true)
defer func() {
// Done enterPrecommit:
cs.updateRoundStep(round, cstypes.RoundStepPrecommit)
Expand Down Expand Up @@ -1296,7 +1314,9 @@ func (cs *State) enterPrecommitWait(height int64, round int) {
height, round, cs.Height, cs.Round, cs.TriggeredTimeoutPrecommit))
return
}
track.set(height, cstypes.RoundStepPrecommitWait, true)
cs.trc.Pin("PrecommitWait-%d", round)

track.setTrace(height, cstypes.RoundStepPrecommitWait, true)
if !cs.Votes.Precommits(round).HasTwoThirdsAny() {
panic(fmt.Sprintf("enterPrecommitWait(%v/%v), but Precommits does not have any +2/3 votes", height, round))
}
Expand Down Expand Up @@ -1327,12 +1347,14 @@ func (cs *State) enterCommit(height int64, commitRound int) {
cs.Step))
return
}
cs.trc.Pin("%s-%d-%d", trace.RunTx, cs.Round, commitRound)

logger.Info(fmt.Sprintf("enterCommit(%v/%v). Current: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step))

track.set(height, cstypes.RoundStepPrecommit, false)
track.setTrace(height, cstypes.RoundStepPrecommit, false)
cs.calcProcessingTime(height, cstypes.RoundStepPrecommit)

track.set(height, cstypes.RoundStepCommit, true)
track.setTrace(height, cstypes.RoundStepCommit, true)
defer func() {
// Done enterCommit:
// keep cs.Round the same, commitRound points to the right Precommits set.
Expand Down Expand Up @@ -1488,10 +1510,6 @@ func (cs *State) finalizeCommit(height int64) {
// Execute and commit the block, update and save the state, and update the mempool.
// NOTE The block.AppHash wont reflect these txs until the next block.

track.set(height, cstypes.RoundStepCommit, false)
cs.calcProcessingTime(height, cstypes.RoundStepCommit)
track.display(height)

var err error
var retainHeight int64
stateCopy, retainHeight, err = cs.blockExec.ApplyBlock(
Expand All @@ -1507,6 +1525,10 @@ func (cs *State) finalizeCommit(height int64) {
return
}

track.setTrace(height, cstypes.RoundStepCommit, false)
cs.calcProcessingTime(height, cstypes.RoundStepCommit)
track.display(height)

fail.Fail() // XXX

// Prune old heights, if requested by ABCI app.
Expand All @@ -1522,6 +1544,9 @@ func (cs *State) finalizeCommit(height int64) {
// must be called before we update state
cs.recordMetrics(height, block)

trace.GetElapsedInfo().AddInfo(trace.CommitRound, fmt.Sprintf("%d", cs.CommitRound))
trace.GetElapsedInfo().AddInfo(trace.Round, fmt.Sprintf("%d", cs.Round))

// NewHeightStep!
cs.updateToState(stateCopy)

Expand All @@ -1532,6 +1557,7 @@ func (cs *State) finalizeCommit(height int64) {
cs.Logger.Error("Can't get private validator pubkey", "err", err)
}

cs.trc.Pin("Waiting")
// cs.StartTime is already set.
// Schedule Round0 to start soon.
cs.scheduleRound0(&cs.RoundState)
Expand Down
3 changes: 2 additions & 1 deletion consensus/state_exchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func (c *consensusTrack) setIsProposer(height int64, bTurn bool) {
}
}

func (c *consensusTrack) set(height int64, r cstypes.RoundStepType, begin bool) {
func (c *consensusTrack) setTrace(height int64, r cstypes.RoundStepType, begin bool) {

if !c.trackSwitch {
return
}
Expand Down
8 changes: 3 additions & 5 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
tmos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/trace"
"github.com/tendermint/tendermint/types"
)

Expand Down Expand Up @@ -88,7 +89,6 @@ type CListMempool struct {
pendingPoolNotify chan map[string]uint64

txInfoparser TxInfoParser
lastBlockGasConsumed uint64
}

var _ Mempool = &CListMempool{}
Expand Down Expand Up @@ -889,7 +889,7 @@ func (mem *CListMempool) Update(
}
}
mem.metrics.GasUsed.Set(float64(gasUsed))
mem.lastBlockGasConsumed = gasUsed
trace.GetElapsedInfo().AddInfo(trace.GasUsed, fmt.Sprintf("%d", gasUsed))

for accAddr, accMaxNonce := range toCleanAccMap {
if txsRecord, ok := mem.addressRecord[accAddr]; ok {
Expand Down Expand Up @@ -1167,9 +1167,7 @@ func (mem *CListMempool) SetAccountRetriever(retriever AccountRetriever) {
func (mem *CListMempool) SetTxInfoParser(parser TxInfoParser) {
mem.txInfoparser = parser
}
func (mem *CListMempool) LastBlockGasUsed() uint64 {
return mem.lastBlockGasConsumed
}

func (mem *CListMempool) pendingPoolJob() {
for addressNonce := range mem.pendingPoolNotify {
timeStart := time.Now()
Expand Down
1 change: 0 additions & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ type Mempool interface {
SetAccountRetriever(retriever AccountRetriever)

SetTxInfoParser(parser TxInfoParser)
LastBlockGasUsed() uint64
}

//--------------------------------------------------------------------------------
Expand Down
1 change: 0 additions & 1 deletion mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,3 @@ func (Mempool) SetAccountRetriever(_ mempl.AccountRetriever) {
func (Mempool) SetTxInfoParser(_ mempl.TxInfoParser) {

}
func (Mempool) LastBlockGasUsed() uint64 { return 0 }
23 changes: 11 additions & 12 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"time"

"github.com/tendermint/tendermint/trace"

abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/fail"
Expand Down Expand Up @@ -131,23 +133,20 @@ func (blockExec *BlockExecutor) ValidateBlock(state State, block *types.Block) e
func (blockExec *BlockExecutor) ApplyBlock(
state State, blockID types.BlockID, block *types.Block,
) (State, int64, error) {
trc := &Tracer{}

trc := trace.NewTracer()

defer func() {
trc.dump(
fmt.Sprintf("ApplyBlock<%d>, tx<%d>, gasUsed<%d>",
block.Height,
len(block.Data.Txs),
blockExec.mempool.LastBlockGasUsed(),
),
blockExec.logger.With("module", "main"),
)
trace.GetElapsedInfo().AddInfo(trace.Height, fmt.Sprintf("%d", block.Height))
trace.GetElapsedInfo().AddInfo(trace.Tx, fmt.Sprintf("%d", len(block.Data.Txs)))
trace.GetElapsedInfo().AddInfo(trace.RunTx, trc.Format())

now := time.Now().UnixNano()
blockExec.metrics.IntervalTime.Set(float64(now-blockExec.metrics.lastBlockTime) / 1e6)
blockExec.metrics.lastBlockTime = now
}()

trc.pin("abci")
trc.Pin("abci")
if err := blockExec.ValidateBlock(state, block); err != nil {
return state, 0, ErrInvalidBlock(err)
}
Expand Down Expand Up @@ -188,7 +187,7 @@ func (blockExec *BlockExecutor) ApplyBlock(
return state, 0, fmt.Errorf("commit failed for application: %v", err)
}

trc.pin("commit")
trc.Pin("persist")
startTime = time.Now().UnixNano()

// Lock mempool, commit app state, update mempoool.
Expand All @@ -204,7 +203,7 @@ func (blockExec *BlockExecutor) ApplyBlock(

fail.Fail() // XXX

trc.pin("saveState")
trc.Pin("saveState")

// Update the app hash and save the state.
state.AppHash = appHash
Expand Down
54 changes: 0 additions & 54 deletions state/execution_exchain.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,8 @@
package state

import (
"fmt"
"time"

"github.com/tendermint/tendermint/libs/log"
)

var IgnoreSmbCheck bool = false

func SetIgnoreSmbCheck(check bool) {
IgnoreSmbCheck = check
}

var lastDump int64 = time.Now().UnixNano()

type Tracer struct {
startTime int64

lastPin string
lastTime int64

pins []string
times []int64
}

func (t *Tracer) pin(tag string) {
if len(tag) == 0 {
//panic("invalid tag")
return
}

now := time.Now().UnixNano()

if t.startTime == 0 {
t.startTime = now
}

if len(t.lastPin) > 0 {
t.pins = append(t.pins, t.lastPin)
t.times = append(t.times, (now-t.lastTime)/1e6)
}
t.lastTime = now
t.lastPin = tag
}

func (t *Tracer) dump(caller string, logger log.Logger) {
t.pin("_")

now := time.Now().UnixNano()
info := fmt.Sprintf("Interval<%dms>, %s, elapsed<%dms>",
(now-lastDump)/1e6,
caller,
(now-t.startTime)/1e6,
)
for i := range t.pins {
info += fmt.Sprintf(", %s<%dms>", t.pins[i], t.times[i])
}
logger.Info(info)
lastDump = now
}

0 comments on commit c4e6ab1

Please sign in to comment.