Skip to content

Commit

Permalink
Merge pull request #1243 from onflow/ramtin/entity-life-cycle-tracing
Browse files Browse the repository at this point in the history
[All] Entity-centric tracing
  • Loading branch information
ramtinms committed Sep 20, 2021
2 parents 74c648c + 30d9c66 commit a783037
Show file tree
Hide file tree
Showing 71 changed files with 896 additions and 845 deletions.
3 changes: 2 additions & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
builder.Component("follower core", func(_ cmd.NodeBuilder, node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// create a finalizer that will handle updating the protocol
// state when the follower detects newly finalized blocks
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, builder.FollowerState)
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, builder.FollowerState, node.Tracer)

// initialize the staking & beacon verifiers, signature joiner
staking := signature.NewAggregationVerifier(encoding.ConsensusVoteTag)
Expand Down Expand Up @@ -282,6 +282,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuild
conCache,
builder.FollowerCore,
builder.SyncCore,
node.Tracer,
)
if err != nil {
return nil, fmt.Errorf("could not create follower engine: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func main() {

// create a finalizer that will handling updating the protocol
// state when the follower detects newly finalized blocks
finalizer := confinalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState)
finalizer := confinalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState, node.Tracer)

// initialize the staking & beacon verifiers, signature joiner
staking := signature.NewAggregationVerifier(encoding.ConsensusVoteTag)
Expand Down Expand Up @@ -243,6 +243,7 @@ func main() {
followerBuffer,
followerCore,
mainChainSyncCore,
node.Tracer,
)
if err != nil {
return nil, fmt.Errorf("could not create follower engine: %w", err)
Expand Down
1 change: 1 addition & 0 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func main() {
node.DB,
node.Storage.Headers,
mutableState,
node.Tracer,
finalizer.WithCleanup(finalizer.CleanupMempools(
node.Metrics.Mempool,
conMetrics,
Expand Down
2 changes: 0 additions & 2 deletions cmd/consensus/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ import (
func createNotifier(log zerolog.Logger, metrics module.HotstuffMetrics, tracer module.Tracer, index storage.Index, chain flow.ChainID,
) *pubsub.Distributor {
telemetryConsumer := notifications.NewTelemetryConsumer(log, chain)
tracingConsumer := notifications.NewConsensusTracingConsumer(log, tracer, index)
metricsConsumer := metricsconsumer.NewMetricsConsumer(metrics)
dis := pubsub.NewDistributor()
dis.AddConsumer(telemetryConsumer)
dis.AddConsumer(tracingConsumer)
dis.AddConsumer(metricsConsumer)
return dis
}
3 changes: 2 additions & 1 deletion cmd/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func main() {

// create a finalizer that handles updating the protocol
// state when the follower detects newly finalized blocks
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState)
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState, node.Tracer)

// initialize the staking & beacon verifiers, signature joiner
staking := signature.NewAggregationVerifier(encoding.ConsensusVoteTag)
Expand Down Expand Up @@ -461,6 +461,7 @@ func main() {
pendingBlocks,
followerCore,
syncCore,
node.Tracer,
)
if err != nil {
return nil, fmt.Errorf("could not create follower engine: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type BaseConfig struct {
profilerInterval time.Duration
profilerDuration time.Duration
tracerEnabled bool
tracerSensitivity uint
metricsEnabled bool
guaranteesCacheSize uint
receiptsCacheSize uint
Expand Down Expand Up @@ -177,6 +178,7 @@ func DefaultBaseConfig() *BaseConfig {
profilerInterval: 15 * time.Minute,
profilerDuration: 10 * time.Second,
tracerEnabled: false,
tracerSensitivity: 4,
metricsEnabled: true,
receiptsCacheSize: bstorage.DefaultCacheSize,
guaranteesCacheSize: bstorage.DefaultCacheSize,
Expand Down
10 changes: 8 additions & 2 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,12 @@ func (fnb *FlowNodeBuilder) BaseFlags() {
"the duration to run the auto-profile for")
fnb.flags.BoolVar(&fnb.BaseConfig.tracerEnabled, "tracer-enabled", defaultConfig.tracerEnabled,
"whether to enable tracer")
fnb.flags.UintVar(&fnb.BaseConfig.tracerSensitivity, "tracer-sensitivity", defaultConfig.tracerSensitivity,
"adjusts the level of sampling when tracing is enabled. 0 means capture everything, higher value results in less samples")
fnb.flags.DurationVar(&fnb.BaseConfig.DNSCacheTTL, "dns-cache-ttl", dns.DefaultTimeToLive, "time-to-live for dns cache")

fnb.flags.UintVar(&fnb.BaseConfig.guaranteesCacheSize, "guarantees-cache-size", bstorage.DefaultCacheSize, "collection guarantees cache size")
fnb.flags.UintVar(&fnb.BaseConfig.receiptsCacheSize, "receipts-cache-size", bstorage.DefaultCacheSize, "receipts cache size")

}

func (fnb *FlowNodeBuilder) EnqueueNetworkInit(ctx context.Context) {
Expand Down Expand Up @@ -322,7 +324,11 @@ func (fnb *FlowNodeBuilder) initMetrics() {

fnb.Tracer = trace.NewNoopTracer()
if fnb.BaseConfig.tracerEnabled {
tracer, err := trace.NewTracer(fnb.Logger, fnb.BaseConfig.NodeRole)
serviceName := fnb.BaseConfig.NodeRole + "-" + fnb.BaseConfig.nodeIDHex[:8]
tracer, err := trace.NewTracer(fnb.Logger,
serviceName,
fnb.RootChainID.String(),
fnb.tracerSensitivity)
fnb.MustNot(err).Msg("could not initialize tracer")
fnb.Logger.Info().Msg("Tracer Started")
fnb.Tracer = tracer
Expand Down
3 changes: 2 additions & 1 deletion cmd/verification/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func main() {

// create a finalizer that handles updating the protocol
// state when the follower detects newly finalized blocks
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState)
final := finalizer.NewFinalizer(node.DB, node.Storage.Headers, followerState, node.Tracer)

// initialize the staking & beacon verifiers, signature joiner
staking := signature.NewAggregationVerifier(encoding.ConsensusVoteTag)
Expand Down Expand Up @@ -327,6 +327,7 @@ func main() {
pendingBlocks,
followerCore,
syncCore,
node.Tracer,
)
if err != nil {
return nil, fmt.Errorf("could not create follower engine: %w", err)
Expand Down
85 changes: 0 additions & 85 deletions consensus/hotstuff/notifications/tracing_consumer.go

This file was deleted.

2 changes: 1 addition & 1 deletion consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func createNode(
require.NoError(t, err)

// initialize the block finalizer
final := finalizer.NewFinalizer(db, headersDB, fullState)
final := finalizer.NewFinalizer(db, headersDB, fullState, trace.NewNoopTracer())

// initialize the persister
persist := persister.New(db, rootHeader.ChainID)
Expand Down
7 changes: 4 additions & 3 deletions consensus/recovery/protocol/state_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package protocol_test

import (
"context"
"testing"

"github.com/dgraph-io/badger/v2"
Expand All @@ -26,19 +27,19 @@ func TestSaveBlockAsReplica(t *testing.T) {
b1 := unittest.BlockWithParentFixture(b0)
b1.SetPayload(flow.Payload{})

err = state.Extend(&b1)
err = state.Extend(context.Background(), &b1)
require.NoError(t, err)

b2 := unittest.BlockWithParentFixture(b1.Header)
b2.SetPayload(flow.Payload{})

err = state.Extend(&b2)
err = state.Extend(context.Background(), &b2)
require.NoError(t, err)

b3 := unittest.BlockWithParentFixture(b2.Header)
b3.SetPayload(flow.Payload{})

err = state.Extend(&b3)
err = state.Extend(context.Background(), &b3)
require.NoError(t, err)

metrics := metrics.NewNoopCollector()
Expand Down
27 changes: 21 additions & 6 deletions engine/common/follower/engine.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package follower

import (
"context"
"errors"
"fmt"

Expand All @@ -13,6 +14,7 @@ import (
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/trace"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/state"
"github.com/onflow/flow-go/state/protocol"
Expand All @@ -34,6 +36,7 @@ type Engine struct {
follower module.HotStuffFollower
con network.Conduit
sync module.BlockRequester
tracer module.Tracer
}

func New(
Expand All @@ -49,6 +52,7 @@ func New(
pending module.PendingBlockBuffer,
follower module.HotStuffFollower,
sync module.BlockRequester,
tracer module.Tracer,
) (*Engine, error) {

e := &Engine{
Expand All @@ -64,6 +68,7 @@ func New(
pending: pending,
follower: follower,
sync: sync,
tracer: tracer,
}

con, err := net.Register(engine.ReceiveBlocks, e)
Expand Down Expand Up @@ -167,6 +172,9 @@ func (e *Engine) onSyncedBlock(originID flow.Identifier, synced *events.SyncedBl
// onBlockProposal handles incoming block proposals.
func (e *Engine) onBlockProposal(originID flow.Identifier, proposal *messages.BlockProposal) error {

span, ctx, _ := e.tracer.StartBlockSpan(context.Background(), proposal.Header.ID(), trace.FollowerOnBlockProposal)
defer span.Finish()

header := proposal.Header

log := e.log.With().
Expand Down Expand Up @@ -264,7 +272,7 @@ func (e *Engine) onBlockProposal(originID flow.Identifier, proposal *messages.Bl

// at this point, we should be able to connect the proposal to the finalized
// state and should process it to see whether to forward to hotstuff or not
err = e.processBlockProposal(proposal)
err = e.processBlockProposal(ctx, proposal)
if err != nil {
return fmt.Errorf("could not process block proposal: %w", err)
}
Expand All @@ -282,7 +290,10 @@ func (e *Engine) onBlockProposal(originID flow.Identifier, proposal *messages.Bl
// the finalized state; if a parent of children is validly processed, it means
// the children are also still on a valid chain and all missing links are there;
// no need to do all the processing again.
func (e *Engine) processBlockProposal(proposal *messages.BlockProposal) error {
func (e *Engine) processBlockProposal(ctx context.Context, proposal *messages.BlockProposal) error {

span, ctx := e.tracer.StartSpanFromContext(ctx, trace.FollowerProcessBlockProposal)
defer span.Finish()

header := proposal.Header

Expand All @@ -309,7 +320,7 @@ func (e *Engine) processBlockProposal(proposal *messages.BlockProposal) error {
// check whether the block is a valid extension of the chain.
// it only checks the block header, since checking block body is expensive.
// The full block check is done by the consensus participants.
err := e.state.Extend(block)
err := e.state.Extend(ctx, block)
// if the error is a known invalid extension of the protocol state, then
// the input is invalid
if state.IsInvalidExtensionError(err) {
Expand Down Expand Up @@ -338,7 +349,7 @@ func (e *Engine) processBlockProposal(proposal *messages.BlockProposal) error {
e.follower.SubmitProposal(header, parent.View)

// check for any descendants of the block to process
err = e.processPendingChildren(header)
err = e.processPendingChildren(ctx, header)
if err != nil {
return fmt.Errorf("could not process pending children: %w", err)
}
Expand All @@ -349,7 +360,11 @@ func (e *Engine) processBlockProposal(proposal *messages.BlockProposal) error {
// processPendingChildren checks if there are proposals connected to the given
// parent block that was just processed; if this is the case, they should now
// all be validly connected to the finalized state and we should process them.
func (e *Engine) processPendingChildren(header *flow.Header) error {
func (e *Engine) processPendingChildren(ctx context.Context, header *flow.Header) error {

span, ctx := e.tracer.StartSpanFromContext(ctx, trace.FollowerProcessPendingChildren)
defer span.Finish()

blockID := header.ID()

// check if there are any children for this parent in the cache
Expand All @@ -365,7 +380,7 @@ func (e *Engine) processPendingChildren(header *flow.Header) error {
Header: child.Header,
Payload: child.Payload,
}
err := e.processBlockProposal(proposal)
err := e.processBlockProposal(ctx, proposal)
if err != nil {
result = multierror.Append(result, err)
}
Expand Down

0 comments on commit a783037

Please sign in to comment.