Skip to content

Commit

Permalink
Merge branch 'master' into danu/5373/add-metric-epoch-failure
Browse files Browse the repository at this point in the history
  • Loading branch information
danuio committed May 14, 2021
2 parents 63e8f00 + 4c6b6f6 commit 861c833
Show file tree
Hide file tree
Showing 26 changed files with 2,740 additions and 61 deletions.
11 changes: 2 additions & 9 deletions consensus/hotstuff/forks/finalizer/finalizer.go
Expand Up @@ -40,19 +40,12 @@ func New(trustedRoot *forks.BlockQC, finalizationCallback module.Finalizer, noti
fnlzr := Finalizer{
notifier: notifier,
finalizationCallback: finalizationCallback,
forest: *forest.NewLevelledForest(),
forest: *forest.NewLevelledForest(trustedRoot.Block.View),
lastLocked: trustedRoot,
lastFinalized: trustedRoot,
}

// We can already pre-prune the levelled forest to the view below it.
// Thereby, the levelled forest won't event store older (unnecessary) blocks
err := fnlzr.forest.PruneUpToLevel(trustedRoot.Block.View)
if err != nil {
return nil, fmt.Errorf("internal levelled forest error: %w", err)
}
// verify and add root block to levelled forest
err = fnlzr.VerifyBlock(trustedRoot.Block)
err := fnlzr.VerifyBlock(trustedRoot.Block)
if err != nil {
return nil, fmt.Errorf("invalid root block: %w", err)
}
Expand Down
19 changes: 19 additions & 0 deletions engine/consensus/approval_processor.go
@@ -0,0 +1,19 @@
package consensus

import "github.com/onflow/flow-go/model/flow"

// ResultApprovalProcessor performs processing of execution results and result approvals.
// Accepts `flow.IncorporatedResult` to start processing approvals for particular result.
// Whenever enough approvals are collected produces a candidate seal and adds it to the mempool.
type ResultApprovalProcessor interface {
// ProcessApproval processes approval in blocking way. Concurrency safe.
// Returns:
// * exception in case of unexpected error
// * nil - successfully processed result approval
ProcessApproval(approval *flow.ResultApproval) error
// ProcessIncorporatedResult processes incorporated result in blocking way. Concurrency safe.
// Returns:
// * exception in case of unexpected error
// * nil - successfully processed incorporated result
ProcessIncorporatedResult(result *flow.IncorporatedResult) error
}
72 changes: 72 additions & 0 deletions engine/consensus/approvals/aggregated_signatures.go
@@ -0,0 +1,72 @@
package approvals

import (
"sync"

"github.com/onflow/flow-go/model/flow"
)

// AggregatedSignatures is an utility struct that provides concurrency safe access
// to map of aggregated signatures indexed by chunk index
type AggregatedSignatures struct {
signatures map[uint64]flow.AggregatedSignature // aggregated signature for each chunk
lock sync.RWMutex // lock for modifying aggregatedSignatures
numberOfChunks uint64
}

func NewAggregatedSignatures(chunks uint64) *AggregatedSignatures {
return &AggregatedSignatures{
signatures: make(map[uint64]flow.AggregatedSignature, chunks),
lock: sync.RWMutex{},
numberOfChunks: chunks,
}
}

// PutSignature adds the AggregatedSignature from the collector to `aggregatedSignatures`.
// The returned int is the resulting number of approved chunks.
func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) int {
as.lock.Lock()
defer as.lock.Unlock()
if _, found := as.signatures[chunkIndex]; !found {
as.signatures[chunkIndex] = aggregatedSignature
}
return len(as.signatures)
}

// HasSignature returns boolean depending if we have signature for particular chunk
func (as *AggregatedSignatures) HasSignature(chunkIndex uint64) bool {
as.lock.RLock()
defer as.lock.RUnlock()
_, found := as.signatures[chunkIndex]
return found
}

// Collect returns array with aggregated signature for each chunk
func (as *AggregatedSignatures) Collect() []flow.AggregatedSignature {
aggregatedSigs := make([]flow.AggregatedSignature, len(as.signatures))

as.lock.RLock()
defer as.lock.RUnlock()
for chunkIndex, sig := range as.signatures {
aggregatedSigs[chunkIndex] = sig
}

return aggregatedSigs
}

// CollectChunksWithMissingApprovals returns indexes of chunks that don't have an aggregated signature
func (as *AggregatedSignatures) CollectChunksWithMissingApprovals() []uint64 {
// provide enough capacity to avoid allocations while we hold the lock
missingChunks := make([]uint64, 0, as.numberOfChunks)
as.lock.RLock()
defer as.lock.RUnlock()
for i := uint64(0); i < as.numberOfChunks; i++ {
chunkIndex := uint64(i)
if _, found := as.signatures[chunkIndex]; found {
// skip if we already have enough valid approvals for this chunk
continue
}
missingChunks = append(missingChunks, chunkIndex)
}
return missingChunks
}
127 changes: 127 additions & 0 deletions engine/consensus/approvals/approval_collector.go
@@ -0,0 +1,127 @@
package approvals

import (
"fmt"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/chunks"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/mempool"
)

// ApprovalCollector is responsible for distributing work to chunk collectorTree,
// collecting aggregated signatures for chunks that reached seal construction threshold,
// creating and submitting seal candidates once signatures for every chunk are aggregated.
type ApprovalCollector struct {
incorporatedBlock *flow.Header // block that incorporates execution result
incorporatedResult *flow.IncorporatedResult // incorporated result that is being sealed
chunkCollectors []*ChunkApprovalCollector // slice of chunk collectorTree that is created on construction and doesn't change
aggregatedSignatures *AggregatedSignatures // aggregated signature for each chunk
seals mempool.IncorporatedResultSeals // holds candidate seals for incorporated results that have acquired sufficient approvals; candidate seals are constructed without consideration of the sealability of parent results
numberOfChunks int // number of chunks for execution result, remains constant
requiredApprovalsForSealConstruction uint // min number of approvals required for constructing a candidate seal
}

func NewApprovalCollector(result *flow.IncorporatedResult, incorporatedBlock *flow.Header, assignment *chunks.Assignment, seals mempool.IncorporatedResultSeals, requiredApprovalsForSealConstruction uint) *ApprovalCollector {
chunkCollectors := make([]*ChunkApprovalCollector, 0, result.Result.Chunks.Len())
for _, chunk := range result.Result.Chunks {
chunkAssignment := assignment.Verifiers(chunk).Lookup()
collector := NewChunkApprovalCollector(chunkAssignment, requiredApprovalsForSealConstruction)
chunkCollectors = append(chunkCollectors, collector)
}

numberOfChunks := result.Result.Chunks.Len()
return &ApprovalCollector{
incorporatedResult: result,
incorporatedBlock: incorporatedBlock,
numberOfChunks: numberOfChunks,
chunkCollectors: chunkCollectors,
requiredApprovalsForSealConstruction: requiredApprovalsForSealConstruction,
aggregatedSignatures: NewAggregatedSignatures(uint64(numberOfChunks)),
seals: seals,
}
}

// IncorporatedBlockID returns the ID of block which incorporates execution result
func (c *ApprovalCollector) IncorporatedBlockID() flow.Identifier {
return c.incorporatedResult.IncorporatedBlockID
}

// IncorporatedBlock returns the block which incorporates execution result
func (c *ApprovalCollector) IncorporatedBlock() *flow.Header {
return c.incorporatedBlock
}

func (c *ApprovalCollector) SealResult() error {
// get final state of execution result
finalState, err := c.incorporatedResult.Result.FinalStateCommitment()
if err != nil {
// message correctness should have been checked before: failure here is an internal implementation bug
return fmt.Errorf("failed to get final state commitment from Execution Result: %w", err)
}

// TODO: Check SPoCK proofs

// generate & store seal
seal := &flow.Seal{
BlockID: c.incorporatedResult.Result.BlockID,
ResultID: c.incorporatedResult.Result.ID(),
FinalState: finalState,
AggregatedApprovalSigs: c.aggregatedSignatures.Collect(),
}

// we don't care if the seal is already in the mempool
_, err = c.seals.Add(&flow.IncorporatedResultSeal{
IncorporatedResult: c.incorporatedResult,
Seal: seal,
})
if err != nil {
return fmt.Errorf("failed to store IncorporatedResultSeal in mempool: %w", err)
}

return nil
}

// ProcessApproval performs processing of result approvals and bookkeeping of aggregated signatures
// for every chunk. Triggers sealing of execution result when processed last result approval needed for sealing.
// Returns:
// - engine.InvalidInputError - result approval is invalid
// - exception in case of any other error, usually this is not expected
// - nil on success
func (c *ApprovalCollector) ProcessApproval(approval *flow.ResultApproval) error {
chunkIndex := approval.Body.ChunkIndex
if chunkIndex >= uint64(len(c.chunkCollectors)) {
return engine.NewInvalidInputErrorf("approval collector chunk index out of range: %v", chunkIndex)
}
// there is no need to process approval if we have already enough info for sealing
if c.aggregatedSignatures.HasSignature(chunkIndex) {
return nil
}

collector := c.chunkCollectors[chunkIndex]
aggregatedSignature, collected := collector.ProcessApproval(approval)
if !collected {
return nil
}

approvedChunks := c.aggregatedSignatures.PutSignature(chunkIndex, aggregatedSignature)
if approvedChunks < c.numberOfChunks {
return nil // still missing approvals for some chunks
}

return c.SealResult()
}

// CollectMissingVerifiers collects ids of verifiers who haven't provided an approval for particular chunk
// Returns: map { ChunkIndex -> []VerifierId }
func (c *ApprovalCollector) CollectMissingVerifiers() map[uint64]flow.IdentifierList {
targetIDs := make(map[uint64]flow.IdentifierList)
for _, chunkIndex := range c.aggregatedSignatures.CollectChunksWithMissingApprovals() {
missingSigners := c.chunkCollectors[chunkIndex].GetMissingSigners()
if missingSigners.Len() > 0 {
targetIDs[chunkIndex] = missingSigners
}
}

return targetIDs
}
131 changes: 131 additions & 0 deletions engine/consensus/approvals/approval_collector_test.go
@@ -0,0 +1,131 @@
package approvals

import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
mempool "github.com/onflow/flow-go/module/mempool/mock"
"github.com/onflow/flow-go/utils/unittest"
)

// TestApprovalCollector performs isolated testing of ApprovalCollector
// ApprovalCollector is responsible for delegating approval processing to ChunkApprovalCollector
// ApprovalCollector stores aggregated signatures for every chunk, once there is a signature for each chunk it is responsible
// for creating IncorporatedResultSeal and submitting it to the mempool.
// ApprovalCollector should reject approvals with invalid chunk index.
func TestApprovalCollector(t *testing.T) {
suite.Run(t, new(ApprovalCollectorTestSuite))
}

type ApprovalCollectorTestSuite struct {
BaseApprovalsTestSuite

sealsPL *mempool.IncorporatedResultSeals
collector *ApprovalCollector
}

func (s *ApprovalCollectorTestSuite) SetupTest() {
s.BaseApprovalsTestSuite.SetupTest()

s.sealsPL = &mempool.IncorporatedResultSeals{}
s.collector = NewApprovalCollector(s.IncorporatedResult, &s.IncorporatedBlock, s.ChunksAssignment, s.sealsPL, uint(len(s.AuthorizedVerifiers)))
}

// TestProcessApproval_ValidApproval tests that valid approval is processed without error
func (s *ApprovalCollectorTestSuite) TestProcessApproval_ValidApproval() {
approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.Chunks[0].Index), unittest.WithApproverID(s.VerID))
err := s.collector.ProcessApproval(approval)
require.NoError(s.T(), err)
}

// TestProcessApproval_SealResult tests that after collecting enough approvals for every chunk ApprovalCollector will
// generate a seal and put it into seals mempool. This logic should be event driven and happen as soon as required threshold is
// met for each chunk.
func (s *ApprovalCollectorTestSuite) TestProcessApproval_SealResult() {
expectedSignatures := make([]flow.AggregatedSignature, s.IncorporatedResult.Result.Chunks.Len())
s.sealsPL.On("Add", mock.Anything).Return(true, nil).Once()

for i, chunk := range s.Chunks {
var err error
sigCollector := flow.NewSignatureCollector()
for verID := range s.AuthorizedVerifiers {
approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index), unittest.WithApproverID(verID))
err = s.collector.ProcessApproval(approval)
require.NoError(s.T(), err)
sigCollector.Add(approval.Body.ApproverID, approval.Body.AttestationSignature)
}
expectedSignatures[i] = sigCollector.ToAggregatedSignature()
}

finalState, _ := s.IncorporatedResult.Result.FinalStateCommitment()
expectedArguments := &flow.IncorporatedResultSeal{
IncorporatedResult: s.IncorporatedResult,
Seal: &flow.Seal{
BlockID: s.IncorporatedResult.Result.BlockID,
ResultID: s.IncorporatedResult.Result.ID(),
FinalState: finalState,
AggregatedApprovalSigs: expectedSignatures,
ServiceEvents: nil,
},
}

s.sealsPL.AssertCalled(s.T(), "Add", expectedArguments)
}

// TestProcessApproval_InvalidChunk tests that approval with invalid chunk index will be rejected without
// processing.
func (s *ApprovalCollectorTestSuite) TestProcessApproval_InvalidChunk() {
approval := unittest.ResultApprovalFixture(unittest.WithChunk(uint64(s.Chunks.Len()+1)),
unittest.WithApproverID(s.VerID))
err := s.collector.ProcessApproval(approval)
require.Error(s.T(), err)
require.True(s.T(), engine.IsInvalidInputError(err))
}

// TestCollectMissingVerifiers tests that approval collector correctly assembles list of verifiers that haven't provided approvals
// for each chunk
func (s *ApprovalCollectorTestSuite) TestCollectMissingVerifiers() {
s.sealsPL.On("Add", mock.Anything).Return(true, nil).Maybe()

assignedVerifiers := make(map[uint64]flow.IdentifierList)
for _, chunk := range s.Chunks {
assignedVerifiers[chunk.Index] = s.ChunksAssignment.Verifiers(chunk)
}

// no approvals processed
for index, ids := range s.collector.CollectMissingVerifiers() {
require.ElementsMatch(s.T(), ids, assignedVerifiers[index])
}

// process one approval for one each chunk
for _, chunk := range s.Chunks {
verID := assignedVerifiers[chunk.Index][0]
approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index),
unittest.WithApproverID(verID))
err := s.collector.ProcessApproval(approval)
require.NoError(s.T(), err)
}

for index, ids := range s.collector.CollectMissingVerifiers() {
// skip first ID since we should have approval for it
require.ElementsMatch(s.T(), ids, assignedVerifiers[index][1:])
}

// process remaining approvals for each chunk
for _, chunk := range s.Chunks {
for _, verID := range assignedVerifiers[chunk.Index] {
approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index),
unittest.WithApproverID(verID))
err := s.collector.ProcessApproval(approval)
require.NoError(s.T(), err)
}
}

// skip first ID since we should have approval for it
require.Empty(s.T(), s.collector.CollectMissingVerifiers())
}

0 comments on commit 861c833

Please sign in to comment.