Skip to content

Commit

Permalink
Merge pull request #736 from onflow/yurii/5417-approval-processing-en…
Browse files Browse the repository at this point in the history
…gine

[Consensus] Approval Processing Core -> Sealing Core
  • Loading branch information
durkmurder committed Jun 4, 2021
2 parents 4682be4 + 70cd038 commit f1bc65c
Show file tree
Hide file tree
Showing 60 changed files with 3,465 additions and 4,577 deletions.
1 change: 1 addition & 0 deletions Makefile
Expand Up @@ -112,6 +112,7 @@ generate-mocks:
GO111MODULE=on mockery -name '.*' -dir="state/protocol/events" -case=underscore -output="./state/protocol/events/mock" -outpkg="mock"
GO111MODULE=on mockery -name '.*' -dir=engine/execution/computation/computer -case=underscore -output="./engine/execution/computation/computer/mock" -outpkg="mock"
GO111MODULE=on mockery -name '.*' -dir=engine/execution/state -case=underscore -output="./engine/execution/state/mock" -outpkg="mock"
GO111MODULE=on mockery -name '.*' -dir=engine/consensus -case=underscore -output="./engine/consensus/mock" -outpkg="mock"
GO111MODULE=on mockery -name '.*' -dir=fvm -case=underscore -output="./fvm/mock" -outpkg="mock"
GO111MODULE=on mockery -name '.*' -dir=fvm/state -case=underscore -output="./fvm/mock/state" -outpkg="mock"
GO111MODULE=on mockery -name '.*' -dir=ledger -case=underscore -output="./ledger/mock" -outpkg="mock"
Expand Down
129 changes: 80 additions & 49 deletions cmd/consensus/main.go
Expand Up @@ -15,15 +15,18 @@ import (
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/blockproducer"
"github.com/onflow/flow-go/consensus/hotstuff/committees"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/pacemaker/timeout"
"github.com/onflow/flow-go/consensus/hotstuff/persister"
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/common/requester"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/consensus/approvals"
"github.com/onflow/flow-go/engine/consensus/compliance"
"github.com/onflow/flow-go/engine/consensus/ingestion"
"github.com/onflow/flow-go/engine/consensus/matching"
"github.com/onflow/flow-go/engine/consensus/provider"
"github.com/onflow/flow-go/engine/consensus/sealing"
"github.com/onflow/flow-go/model/bootstrap"
Expand Down Expand Up @@ -73,24 +76,22 @@ func main() {
requiredApprovalsForSealConstruction uint
emergencySealing bool

err error
mutableState protocol.MutableState
privateDKGData *bootstrap.DKGParticipantPriv
guarantees mempool.Guarantees
results mempool.IncorporatedResults
receipts mempool.ExecutionTree
approvals mempool.Approvals
seals mempool.IncorporatedResultSeals
pendingReceipts mempool.PendingReceipts
prov *provider.Engine
receiptRequester *requester.Engine
syncCore *synchronization.Core
comp *compliance.Engine
conMetrics module.ConsensusMetrics
mainMetrics module.HotstuffMetrics
receiptValidator module.ReceiptValidator
approvalValidator module.ApprovalValidator
chunkAssigner *chmodule.ChunkAssigner
err error
mutableState protocol.MutableState
privateDKGData *bootstrap.DKGParticipantPriv
guarantees mempool.Guarantees
receipts mempool.ExecutionTree
seals mempool.IncorporatedResultSeals
pendingReceipts mempool.PendingReceipts
prov *provider.Engine
receiptRequester *requester.Engine
syncCore *synchronization.Core
comp *compliance.Engine
conMetrics module.ConsensusMetrics
mainMetrics module.HotstuffMetrics
receiptValidator module.ReceiptValidator
chunkAssigner *chmodule.ChunkAssigner
finalizationDistributor *pubsub.FinalizationDistributor
)

cmd.FlowNode(flow.RoleConsensus.String()).
Expand Down Expand Up @@ -150,10 +151,6 @@ func main() {

resultApprovalSigVerifier := signature.NewAggregationVerifier(encoding.ResultApprovalTag)

approvalValidator = validation.NewApprovalValidator(
node.State,
resultApprovalSigVerifier)

sealValidator := validation.NewSealValidator(
node.State,
node.Storage.Headers,
Expand Down Expand Up @@ -183,10 +180,6 @@ func main() {
guarantees, err = stdmap.NewGuarantees(guaranteeLimit)
return err
}).
Module("execution results mempool", func(node *cmd.FlowNodeBuilder) error {
results, err = stdmap.NewIncorporatedResults(resultLimit)
return err
}).
Module("execution receipts mempool", func(node *cmd.FlowNodeBuilder) error {
receipts = consensusMempools.NewExecutionTree()
// registers size method of backend for metrics
Expand All @@ -196,10 +189,6 @@ func main() {
}
return nil
}).
Module("result approvals mempool", func(node *cmd.FlowNodeBuilder) error {
approvals, err = stdmap.NewApprovals(approvalLimit)
return err
}).
Module("block seals mempool", func(node *cmd.FlowNodeBuilder) error {
// use a custom ejector so we don't eject seals that would break
// the chain of seals
Expand All @@ -223,8 +212,43 @@ func main() {
syncCore, err = synchronization.New(node.Logger, synchronization.DefaultConfig())
return err
}).
Module("finalization distributor", func(node *cmd.FlowNodeBuilder) error {
finalizationDistributor = pubsub.NewFinalizationDistributor()
return nil
}).
Component("sealing engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {

resultApprovalSigVerifier := signature.NewAggregationVerifier(encoding.ResultApprovalTag)

config := sealing.DefaultConfig()
config.EmergencySealingActive = emergencySealing
config.RequiredApprovalsForSealConstruction = requiredApprovalsForSealConstruction

e, err := sealing.NewEngine(
node.Logger,
node.Tracer,
conMetrics,
node.Metrics.Engine,
node.Metrics.Mempool,
node.Network,
node.Me,
node.Storage.Headers,
node.Storage.Payloads,
node.State,
node.Storage.Seals,
chunkAssigner,
resultApprovalSigVerifier,
seals,
config,
)

// subscribe for finalization events from hotstuff
finalizationDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)
finalizationDistributor.AddOnBlockIncorporatedConsumer(e.OnBlockIncorporated)

return e, err
}).
Component("matching engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {
receiptRequester, err = requester.New(
node.Logger,
node.Metrics.Engine,
Expand All @@ -241,34 +265,39 @@ func main() {
return nil, err
}

match, err := sealing.NewEngine(
core := matching.NewCore(
node.Logger,
node.Metrics.Engine,
node.Tracer,
node.Metrics.Mempool,
conMetrics,
node.Network,
node.Metrics.Mempool,
node.State,
node.Me,
receiptRequester,
node.Storage.Receipts,
node.Storage.Headers,
node.Storage.Index,
results,
node.Storage.Receipts,
receipts,
approvals,
seals,
pendingReceipts,
chunkAssigner,
seals,
receiptValidator,
approvalValidator,
requiredApprovalsForSealConstruction,
emergencySealing,
receiptRequester,
matching.DefaultConfig(),
)

receiptRequester.WithHandle(match.HandleReceipt)
e, err := matching.NewEngine(
node.Logger,
node.Network,
node.Me,
node.Metrics.Engine,
node.Metrics.Mempool,
core,
)
if err != nil {
return nil, err
}

// subscribe engine to inputs from other node-internal components
receiptRequester.WithHandle(e.HandleReceipt)
finalizationDistributor.AddOnBlockFinalizedConsumer(e.OnFinalizedBlock)

return match, err
return e, err
}).
Component("provider engine", func(node *cmd.FlowNodeBuilder) (module.ReadyDoneAware, error) {
prov, err = provider.New(
Expand Down Expand Up @@ -340,7 +369,7 @@ func main() {
node.Storage.Results,
node.Storage.Receipts,
guarantees,
seals,
approvals.NewIncorporatedResultSeals(seals, node.Storage.Receipts),
receipts,
node.Tracer,
builder.WithMinInterval(minInterval),
Expand Down Expand Up @@ -404,7 +433,9 @@ func main() {
node.Storage.Index,
node.RootChainID,
)
// make compliance engine as a FinalizationConsumer

notifier.AddConsumer(finalizationDistributor)

// initialize the persister
persist := persister.New(node.DB, node.RootChainID)

Expand Down
3 changes: 1 addition & 2 deletions cmd/consensus/notifier.go
Expand Up @@ -3,7 +3,6 @@ package main
import (
"github.com/rs/zerolog"

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/notifications"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/model/flow"
Expand All @@ -13,7 +12,7 @@ import (
)

func createNotifier(log zerolog.Logger, metrics module.HotstuffMetrics, tracer module.Tracer, index storage.Index, chain flow.ChainID,
) hotstuff.Consumer {
) *pubsub.Distributor {
telemetryConsumer := notifications.NewTelemetryConsumer(log, chain)
tracingConsumer := notifications.NewConsensusTracingConsumer(log, tracer, index)
metricsConsumer := metricsconsumer.NewMetricsConsumer(metrics)
Expand Down
@@ -0,0 +1,83 @@
package pubsub

import (
"sync"

"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/model/flow"
)

type OnBlockFinalizedConsumer = func(finalizedBlockID flow.Identifier)
type OnBlockIncorporatedConsumer = func(incorporatedBlockID flow.Identifier)

// FinalizationDistributor subscribes for finalization events from hotstuff and distributes it to subscribers
type FinalizationDistributor struct {
blockFinalizedConsumers []OnBlockFinalizedConsumer
blockIncorporatedConsumers []OnBlockIncorporatedConsumer
lock sync.RWMutex
}

func NewFinalizationDistributor() *FinalizationDistributor {
return &FinalizationDistributor{
blockFinalizedConsumers: make([]OnBlockFinalizedConsumer, 0),
blockIncorporatedConsumers: make([]OnBlockIncorporatedConsumer, 0),
lock: sync.RWMutex{},
}
}

func (p *FinalizationDistributor) AddOnBlockFinalizedConsumer(consumer OnBlockFinalizedConsumer) {
p.lock.Lock()
defer p.lock.Unlock()
p.blockFinalizedConsumers = append(p.blockFinalizedConsumers, consumer)
}
func (p *FinalizationDistributor) AddOnBlockIncorporatedConsumer(consumer OnBlockIncorporatedConsumer) {
p.lock.Lock()
defer p.lock.Unlock()
p.blockIncorporatedConsumers = append(p.blockIncorporatedConsumers, consumer)
}

func (p *FinalizationDistributor) OnEventProcessed() {}

func (p *FinalizationDistributor) OnBlockIncorporated(block *model.Block) {
p.lock.RLock()
defer p.lock.RUnlock()
for _, consumer := range p.blockIncorporatedConsumers {
consumer(block.BlockID)
}
}

func (p *FinalizationDistributor) OnFinalizedBlock(block *model.Block) {
p.lock.RLock()
defer p.lock.RUnlock()
for _, consumer := range p.blockFinalizedConsumers {
consumer(block.BlockID)
}
}

func (p *FinalizationDistributor) OnDoubleProposeDetected(*model.Block, *model.Block) {}

func (p *FinalizationDistributor) OnReceiveVote(uint64, *model.Vote) {}

func (p *FinalizationDistributor) OnReceiveProposal(uint64, *model.Proposal) {}

func (p *FinalizationDistributor) OnEnteringView(uint64, flow.Identifier) {}

func (p *FinalizationDistributor) OnQcTriggeredViewChange(*flow.QuorumCertificate, uint64) {}

func (p *FinalizationDistributor) OnProposingBlock(*model.Proposal) {}

func (p *FinalizationDistributor) OnVoting(*model.Vote) {}

func (p *FinalizationDistributor) OnQcConstructedFromVotes(*flow.QuorumCertificate) {}

func (p *FinalizationDistributor) OnStartingTimeout(*model.TimerInfo) {}

func (p *FinalizationDistributor) OnReachedTimeout(*model.TimerInfo) {}

func (p *FinalizationDistributor) OnQcIncorporated(*flow.QuorumCertificate) {}

func (p *FinalizationDistributor) OnForkChoiceGenerated(uint64, *flow.QuorumCertificate) {}

func (p *FinalizationDistributor) OnDoubleVotingDetected(*model.Vote, *model.Vote) {}

func (p *FinalizationDistributor) OnInvalidVoteDetected(*model.Vote) {}
3 changes: 2 additions & 1 deletion consensus/integration/nodes_test.go
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/consensus/hotstuff/persister"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/engine/consensus/approvals"
"github.com/onflow/flow-go/engine/consensus/compliance"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
Expand Down Expand Up @@ -185,7 +186,7 @@ func createNode(

// initialize the block builder
build, err := builder.NewBuilder(metrics, db, fullState, headersDB, sealsDB, indexDB, blocksDB, resultsDB, receiptsDB,
guarantees, seals, receipts, tracer)
guarantees, approvals.NewIncorporatedResultSeals(seals, receiptsDB), receipts, tracer)
require.NoError(t, err)

signer := &Signer{identity.ID()}
Expand Down
8 changes: 4 additions & 4 deletions engine/consensus/approvals/aggregated_signatures.go
Expand Up @@ -24,13 +24,13 @@ func NewAggregatedSignatures(chunks uint64) *AggregatedSignatures {

// PutSignature adds the AggregatedSignature from the collector to `aggregatedSignatures`.
// The returned int is the resulting number of approved chunks.
func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) int {
func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) uint64 {
as.lock.Lock()
defer as.lock.Unlock()
if _, found := as.signatures[chunkIndex]; !found {
as.signatures[chunkIndex] = aggregatedSignature
}
return len(as.signatures)
return uint64(len(as.signatures))
}

// HasSignature returns boolean depending if we have signature for particular chunk
Expand All @@ -54,8 +54,8 @@ func (as *AggregatedSignatures) Collect() []flow.AggregatedSignature {
return aggregatedSigs
}

// CollectChunksWithMissingApprovals returns indexes of chunks that don't have an aggregated signature
func (as *AggregatedSignatures) CollectChunksWithMissingApprovals() []uint64 {
// ChunksWithoutAggregatedSignature returns indexes of chunks that don't have an aggregated signature
func (as *AggregatedSignatures) ChunksWithoutAggregatedSignature() []uint64 {
// provide enough capacity to avoid allocations while we hold the lock
missingChunks := make([]uint64, 0, as.numberOfChunks)
as.lock.RLock()
Expand Down

0 comments on commit f1bc65c

Please sign in to comment.