diff --git a/consensus/hotstuff/forks/finalizer/finalizer.go b/consensus/hotstuff/forks/finalizer/finalizer.go
index 912ec041f69..0f36e5f847d 100644
--- a/consensus/hotstuff/forks/finalizer/finalizer.go
+++ b/consensus/hotstuff/forks/finalizer/finalizer.go
@@ -40,19 +40,12 @@ func New(trustedRoot *forks.BlockQC, finalizationCallback module.Finalizer, noti
 	fnlzr := Finalizer{
 		notifier:             notifier,
 		finalizationCallback: finalizationCallback,
-		forest:               *forest.NewLevelledForest(),
+		forest:               *forest.NewLevelledForest(trustedRoot.Block.View),
 		lastLocked:           trustedRoot,
 		lastFinalized:        trustedRoot,
 	}
-
-	// We can already pre-prune the levelled forest to the view below it.
-	// Thereby, the levelled forest won't event store older (unnecessary) blocks
-	err := fnlzr.forest.PruneUpToLevel(trustedRoot.Block.View)
-	if err != nil {
-		return nil, fmt.Errorf("internal levelled forest error: %w", err)
-	}
 
 	// verify and add root block to levelled forest
-	err = fnlzr.VerifyBlock(trustedRoot.Block)
+	err := fnlzr.VerifyBlock(trustedRoot.Block)
 	if err != nil {
 		return nil, fmt.Errorf("invalid root block: %w", err)
 	}
diff --git a/engine/consensus/approval_processor.go b/engine/consensus/approval_processor.go
new file mode 100644
index 00000000000..df171fe7faf
--- /dev/null
+++ b/engine/consensus/approval_processor.go
@@ -0,0 +1,19 @@
+package consensus
+
+import "github.com/onflow/flow-go/model/flow"
+
+// ResultApprovalProcessor performs processing of execution results and result approvals.
+// It accepts a `flow.IncorporatedResult` to start processing approvals for a particular result.
+// Whenever enough approvals are collected, it produces a candidate seal and adds it to the mempool.
+type ResultApprovalProcessor interface {
+	// ProcessApproval processes an approval in a blocking way. Concurrency safe.
+	// Returns:
+	// * exception in case of unexpected error
+	// * nil - successfully processed result approval
+	ProcessApproval(approval *flow.ResultApproval) error
+	// ProcessIncorporatedResult processes an incorporated result in a blocking way. Concurrency safe.
+	// Returns:
+	// * exception in case of unexpected error
+	// * nil - successfully processed incorporated result
+	ProcessIncorporatedResult(result *flow.IncorporatedResult) error
+}
diff --git a/engine/consensus/approvals/aggregated_signatures.go b/engine/consensus/approvals/aggregated_signatures.go
new file mode 100644
index 00000000000..7939c4bb35b
--- /dev/null
+++ b/engine/consensus/approvals/aggregated_signatures.go
@@ -0,0 +1,72 @@
+package approvals
+
+import (
+	"sync"
+
+	"github.com/onflow/flow-go/model/flow"
+)
+
+// AggregatedSignatures is a utility struct that provides concurrency-safe access
+// to a map of aggregated signatures indexed by chunk index.
+type AggregatedSignatures struct {
+	signatures     map[uint64]flow.AggregatedSignature // aggregated signature for each chunk
+	lock           sync.RWMutex                        // lock for modifying signatures
+	numberOfChunks uint64
+}
+
+func NewAggregatedSignatures(chunks uint64) *AggregatedSignatures {
+	return &AggregatedSignatures{
+		signatures:     make(map[uint64]flow.AggregatedSignature, chunks),
+		lock:           sync.RWMutex{},
+		numberOfChunks: chunks,
+	}
+}
+
+// PutSignature adds the AggregatedSignature from the collector to `signatures`.
+// The returned int is the resulting number of approved chunks.
+func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) int {
+	as.lock.Lock()
+	defer as.lock.Unlock()
+	if _, found := as.signatures[chunkIndex]; !found {
+		as.signatures[chunkIndex] = aggregatedSignature
+	}
+	return len(as.signatures)
+}
+
+// HasSignature returns true if we already have an aggregated signature for the given chunk
+func (as *AggregatedSignatures) HasSignature(chunkIndex uint64) bool {
+	as.lock.RLock()
+	defer as.lock.RUnlock()
+	_, found := as.signatures[chunkIndex]
+	return found
+}
+
+// Collect returns an array with the aggregated signature for each chunk
+func (as *AggregatedSignatures) Collect() []flow.AggregatedSignature {
+	aggregatedSigs := make([]flow.AggregatedSignature, len(as.signatures))
+
+	as.lock.RLock()
+	defer as.lock.RUnlock()
+	for chunkIndex, sig := range as.signatures {
+		aggregatedSigs[chunkIndex] = sig
+	}
+
+	return aggregatedSigs
+}
+
+// CollectChunksWithMissingApprovals returns the indexes of chunks that don't have an aggregated signature
+func (as *AggregatedSignatures) CollectChunksWithMissingApprovals() []uint64 {
+	// provide enough capacity to avoid allocations while we hold the lock
+	missingChunks := make([]uint64, 0, as.numberOfChunks)
+	as.lock.RLock()
+	defer as.lock.RUnlock()
+	for chunkIndex := uint64(0); chunkIndex < as.numberOfChunks; chunkIndex++ {
+		if _, found := as.signatures[chunkIndex]; found {
+			// skip if we already have enough valid approvals for this chunk
+			continue
+		}
+		missingChunks = append(missingChunks, chunkIndex)
+	}
+	return missingChunks
+}
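For orientation, a minimal usage sketch of AggregatedSignatures (illustrative only, not part of the change itself; `numberOfChunks` and the per-chunk `aggregatedSig` values are assumed to come from the caller):

	as := NewAggregatedSignatures(uint64(numberOfChunks))
	for chunkIndex := uint64(0); chunkIndex < uint64(numberOfChunks); chunkIndex++ {
		// PutSignature is a no-op for a chunk that already has a signature;
		// the return value is the total number of approved chunks so far
		if as.PutSignature(chunkIndex, aggregatedSig) == numberOfChunks {
			sigs := as.Collect() // ordered []flow.AggregatedSignature, one entry per chunk
			_ = sigs             // e.g. embed into a candidate seal
		}
	}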
diff --git a/engine/consensus/approvals/approval_collector.go b/engine/consensus/approvals/approval_collector.go
new file mode 100644
index 00000000000..9b52cea1361
--- /dev/null
+++ b/engine/consensus/approvals/approval_collector.go
@@ -0,0 +1,127 @@
+package approvals
+
+import (
+	"fmt"
+
+	"github.com/onflow/flow-go/engine"
+	"github.com/onflow/flow-go/model/chunks"
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/module/mempool"
+)
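As a rough sketch of the intended call pattern for the collector defined below (illustrative only; `incorporatedResult`, `incorporatedBlock`, `assignment`, `sealsMempool`, `requiredApprovals` and the `approvals` source are assumed):

	collector := NewApprovalCollector(incorporatedResult, incorporatedBlock, assignment, sealsMempool, requiredApprovals)
	for _, approval := range approvals {
		err := collector.ProcessApproval(approval)
		if engine.IsInvalidInputError(err) {
			continue // malformed approval; drop it
		}
		if err != nil {
			return err // exception: escalate
		}
	}
	// once every chunk has reached the approval threshold, ProcessApproval
	// internally calls SealResult, which adds a candidate seal to sealsMempool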
+
+// ApprovalCollector is responsible for distributing work to chunk collectors,
+// collecting aggregated signatures for chunks that reached the seal-construction threshold,
+// and creating and submitting seal candidates once signatures for every chunk are aggregated.
+type ApprovalCollector struct {
+	incorporatedBlock                    *flow.Header                    // block that incorporates execution result
+	incorporatedResult                   *flow.IncorporatedResult        // incorporated result that is being sealed
+	chunkCollectors                      []*ChunkApprovalCollector       // slice of chunk collectors; created on construction and never changes
+	aggregatedSignatures                 *AggregatedSignatures           // aggregated signature for each chunk
+	seals                                mempool.IncorporatedResultSeals // holds candidate seals for incorporated results that have acquired sufficient approvals; candidate seals are constructed without consideration of the sealability of parent results
+	numberOfChunks                       int                             // number of chunks for execution result, remains constant
+	requiredApprovalsForSealConstruction uint                            // min number of approvals required for constructing a candidate seal
+}
+
+func NewApprovalCollector(result *flow.IncorporatedResult, incorporatedBlock *flow.Header, assignment *chunks.Assignment, seals mempool.IncorporatedResultSeals, requiredApprovalsForSealConstruction uint) *ApprovalCollector {
+	chunkCollectors := make([]*ChunkApprovalCollector, 0, result.Result.Chunks.Len())
+	for _, chunk := range result.Result.Chunks {
+		chunkAssignment := assignment.Verifiers(chunk).Lookup()
+		collector := NewChunkApprovalCollector(chunkAssignment, requiredApprovalsForSealConstruction)
+		chunkCollectors = append(chunkCollectors, collector)
+	}
+
+	numberOfChunks := result.Result.Chunks.Len()
+	return &ApprovalCollector{
+		incorporatedResult:                   result,
+		incorporatedBlock:                    incorporatedBlock,
+		numberOfChunks:                       numberOfChunks,
+		chunkCollectors:                      chunkCollectors,
+		requiredApprovalsForSealConstruction: requiredApprovalsForSealConstruction,
+		aggregatedSignatures:                 NewAggregatedSignatures(uint64(numberOfChunks)),
+		seals:                                seals,
+	}
+}
+
+// IncorporatedBlockID returns the ID of the block which incorporates the execution result
+func (c *ApprovalCollector) IncorporatedBlockID() flow.Identifier {
+	return c.incorporatedResult.IncorporatedBlockID
+}
+
+// IncorporatedBlock returns the block which incorporates the execution result
+func (c *ApprovalCollector) IncorporatedBlock() *flow.Header {
+	return c.incorporatedBlock
+}
+
+// SealResult builds a candidate seal from the accumulated aggregated signatures and adds it to the seals mempool.
+func (c *ApprovalCollector) SealResult() error {
+	// get final state of execution result
+	finalState, err := c.incorporatedResult.Result.FinalStateCommitment()
+	if err != nil {
+		// message correctness should have been checked before: failure here is an internal implementation bug
+		return fmt.Errorf("failed to get final state commitment from Execution Result: %w", err)
+	}
+
+	// TODO: Check SPoCK proofs
+
+	// generate & store seal
+	seal := &flow.Seal{
+		BlockID:                c.incorporatedResult.Result.BlockID,
+		ResultID:               c.incorporatedResult.Result.ID(),
+		FinalState:             finalState,
+		AggregatedApprovalSigs: c.aggregatedSignatures.Collect(),
+	}
+
+	// we don't care if the seal is already in the mempool
+	_, err = c.seals.Add(&flow.IncorporatedResultSeal{
+		IncorporatedResult: c.incorporatedResult,
+		Seal:               seal,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to store IncorporatedResultSeal in mempool: %w", err)
+	}
+
+	return nil
+}
+
+// ProcessApproval performs processing of result approvals and bookkeeping of aggregated signatures
+// for every chunk. It triggers sealing of the execution result once the last approval required for sealing is processed.
+// Returns:
+// - engine.InvalidInputError - result approval is invalid
+// - exception in case of any other error (generally not expected)
+// - nil on success
+func (c *ApprovalCollector) ProcessApproval(approval *flow.ResultApproval) error {
+	chunkIndex := approval.Body.ChunkIndex
+	if chunkIndex >= uint64(len(c.chunkCollectors)) {
+		return engine.NewInvalidInputErrorf("approval collector chunk index out of range: %v", chunkIndex)
+	}
+	// there is no need to process the approval if we already have enough info for sealing this chunk
+	if c.aggregatedSignatures.HasSignature(chunkIndex) {
+		return nil
+	}
+
+	collector := c.chunkCollectors[chunkIndex]
+	aggregatedSignature, collected := collector.ProcessApproval(approval)
+	if !collected {
+		return nil
+	}
+
+	approvedChunks := c.aggregatedSignatures.PutSignature(chunkIndex, aggregatedSignature)
+	if approvedChunks < c.numberOfChunks {
+		return nil // still missing approvals for some chunks
+	}
+
+	return c.SealResult()
+}
+
+// CollectMissingVerifiers collects the ids of verifiers who haven't provided an approval for a particular chunk.
+// Returns: map { ChunkIndex -> []VerifierId }
+func (c *ApprovalCollector) CollectMissingVerifiers() map[uint64]flow.IdentifierList {
+	targetIDs := make(map[uint64]flow.IdentifierList)
+	for _, chunkIndex := range c.aggregatedSignatures.CollectChunksWithMissingApprovals() {
+		missingSigners := c.chunkCollectors[chunkIndex].GetMissingSigners()
+		if missingSigners.Len() > 0 {
+			targetIDs[chunkIndex] = missingSigners
+		}
+	}
+
+	return targetIDs
+}
diff --git a/engine/consensus/approvals/approval_collector_test.go b/engine/consensus/approvals/approval_collector_test.go
new file mode 100644
index 00000000000..da8faeae6a6
--- /dev/null
+++ b/engine/consensus/approvals/approval_collector_test.go
@@ -0,0 +1,131 @@
+package approvals
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/onflow/flow-go/engine"
+	"github.com/onflow/flow-go/model/flow"
+	mempool "github.com/onflow/flow-go/module/mempool/mock"
+	"github.com/onflow/flow-go/utils/unittest"
+)
+
+// TestApprovalCollector performs isolated testing of ApprovalCollector.
+// ApprovalCollector is responsible for delegating approval processing to ChunkApprovalCollector.
+// ApprovalCollector stores aggregated signatures for every chunk; once there is a signature for each chunk, it is responsible
+// for creating an IncorporatedResultSeal and submitting it to the mempool.
+// ApprovalCollector should reject approvals with an invalid chunk index.
+func TestApprovalCollector(t *testing.T) { + suite.Run(t, new(ApprovalCollectorTestSuite)) +} + +type ApprovalCollectorTestSuite struct { + BaseApprovalsTestSuite + + sealsPL *mempool.IncorporatedResultSeals + collector *ApprovalCollector +} + +func (s *ApprovalCollectorTestSuite) SetupTest() { + s.BaseApprovalsTestSuite.SetupTest() + + s.sealsPL = &mempool.IncorporatedResultSeals{} + s.collector = NewApprovalCollector(s.IncorporatedResult, &s.IncorporatedBlock, s.ChunksAssignment, s.sealsPL, uint(len(s.AuthorizedVerifiers))) +} + +// TestProcessApproval_ValidApproval tests that valid approval is processed without error +func (s *ApprovalCollectorTestSuite) TestProcessApproval_ValidApproval() { + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.Chunks[0].Index), unittest.WithApproverID(s.VerID)) + err := s.collector.ProcessApproval(approval) + require.NoError(s.T(), err) +} + +// TestProcessApproval_SealResult tests that after collecting enough approvals for every chunk ApprovalCollector will +// generate a seal and put it into seals mempool. This logic should be event driven and happen as soon as required threshold is +// met for each chunk. +func (s *ApprovalCollectorTestSuite) TestProcessApproval_SealResult() { + expectedSignatures := make([]flow.AggregatedSignature, s.IncorporatedResult.Result.Chunks.Len()) + s.sealsPL.On("Add", mock.Anything).Return(true, nil).Once() + + for i, chunk := range s.Chunks { + var err error + sigCollector := flow.NewSignatureCollector() + for verID := range s.AuthorizedVerifiers { + approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index), unittest.WithApproverID(verID)) + err = s.collector.ProcessApproval(approval) + require.NoError(s.T(), err) + sigCollector.Add(approval.Body.ApproverID, approval.Body.AttestationSignature) + } + expectedSignatures[i] = sigCollector.ToAggregatedSignature() + } + + finalState, _ := s.IncorporatedResult.Result.FinalStateCommitment() + expectedArguments := &flow.IncorporatedResultSeal{ + IncorporatedResult: s.IncorporatedResult, + Seal: &flow.Seal{ + BlockID: s.IncorporatedResult.Result.BlockID, + ResultID: s.IncorporatedResult.Result.ID(), + FinalState: finalState, + AggregatedApprovalSigs: expectedSignatures, + ServiceEvents: nil, + }, + } + + s.sealsPL.AssertCalled(s.T(), "Add", expectedArguments) +} + +// TestProcessApproval_InvalidChunk tests that approval with invalid chunk index will be rejected without +// processing. 
+func (s *ApprovalCollectorTestSuite) TestProcessApproval_InvalidChunk() {
+	approval := unittest.ResultApprovalFixture(unittest.WithChunk(uint64(s.Chunks.Len()+1)),
+		unittest.WithApproverID(s.VerID))
+	err := s.collector.ProcessApproval(approval)
+	require.Error(s.T(), err)
+	require.True(s.T(), engine.IsInvalidInputError(err))
+}
+
+// TestCollectMissingVerifiers tests that the approval collector correctly assembles the list of verifiers that haven't
+// provided approvals for each chunk
+func (s *ApprovalCollectorTestSuite) TestCollectMissingVerifiers() {
+	s.sealsPL.On("Add", mock.Anything).Return(true, nil).Maybe()
+
+	assignedVerifiers := make(map[uint64]flow.IdentifierList)
+	for _, chunk := range s.Chunks {
+		assignedVerifiers[chunk.Index] = s.ChunksAssignment.Verifiers(chunk)
+	}
+
+	// no approvals processed
+	for index, ids := range s.collector.CollectMissingVerifiers() {
+		require.ElementsMatch(s.T(), ids, assignedVerifiers[index])
+	}
+
+	// process one approval for each chunk
+	for _, chunk := range s.Chunks {
+		verID := assignedVerifiers[chunk.Index][0]
+		approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index),
+			unittest.WithApproverID(verID))
+		err := s.collector.ProcessApproval(approval)
+		require.NoError(s.T(), err)
+	}
+
+	for index, ids := range s.collector.CollectMissingVerifiers() {
+		// skip the first ID since we already processed an approval for it
+		require.ElementsMatch(s.T(), ids, assignedVerifiers[index][1:])
+	}
+
+	// process remaining approvals for each chunk
+	for _, chunk := range s.Chunks {
+		for _, verID := range assignedVerifiers[chunk.Index] {
+			approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index),
+				unittest.WithApproverID(verID))
+			err := s.collector.ProcessApproval(approval)
+			require.NoError(s.T(), err)
+		}
+	}
+
+	// every chunk now has full approval coverage, so no verifiers should be missing
+	require.Empty(s.T(), s.collector.CollectMissingVerifiers())
+}
diff --git a/engine/consensus/approvals/approvals_cache.go b/engine/consensus/approvals/approvals_cache.go
new file mode 100644
index 00000000000..87fdedf612d
--- /dev/null
+++ b/engine/consensus/approvals/approvals_cache.go
@@ -0,0 +1,46 @@
+package approvals
+
+import (
+	"sync"
+
+	"github.com/onflow/flow-go/model/flow"
+)
+
+// Cache is a utility structure that encapsulates a map storing result approvals
+// and provides concurrent access to it.
+type Cache struct {
+	cache map[flow.Identifier]*flow.ResultApproval
+	lock  sync.RWMutex
+}
+
+func NewApprovalsCache(capacity uint) *Cache {
+	return &Cache{
+		cache: make(map[flow.Identifier]*flow.ResultApproval, capacity),
+		lock:  sync.RWMutex{},
+	}
+}
+
+// Put saves an approval into the cache
+func (c *Cache) Put(approval *flow.ResultApproval) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.cache[approval.Body.PartialID()] = approval
+}
+
+// Get returns the approval stored in the cache, or nil if none is present
+func (c *Cache) Get(approvalID flow.Identifier) *flow.ResultApproval {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	return c.cache[approvalID]
+}
+
+// All returns all stored approvals
+func (c *Cache) All() []*flow.ResultApproval {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	all := make([]*flow.ResultApproval, 0, len(c.cache))
+	for _, approval := range c.cache {
+		all = append(all, approval)
+	}
+	return all
+}
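A small usage sketch of this cache (illustrative only; `approvals` is an assumed slice of already-verified approvals):

	cache := NewApprovalsCache(uint(len(approvals)))
	for _, approval := range approvals {
		cache.Put(approval)
	}
	// PartialID keys the cache, so re-submissions of the same approval are deduplicated
	if cached := cache.Get(approvals[0].Body.PartialID()); cached != nil {
		_ = cached.Body.ChunkIndex
	}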
diff --git a/engine/consensus/approvals/approvals_lru_cache.go b/engine/consensus/approvals/approvals_lru_cache.go
new file mode 100644
index 00000000000..e878380b6bd
--- /dev/null
+++ b/engine/consensus/approvals/approvals_lru_cache.go
@@ -0,0 +1,94 @@
+package approvals
+
+import (
+	"sync"
+
+	"github.com/hashicorp/golang-lru/simplelru"
+
+	"github.com/onflow/flow-go/model/flow"
+)
+
+// LruCache is a wrapper over `simplelru.LRUCache` that provides the API needed for processing result approvals.
+// It extends `simplelru.LRUCache` with an additional index for faster lookup of all approvals for a given result.
+type LruCache struct {
+	lru  simplelru.LRUCache
+	lock sync.RWMutex
+	// secondary index by result ID, since multiple approvals could
+	// reference the same result
+	byResultID map[flow.Identifier]map[flow.Identifier]struct{}
+}
+
+func NewApprovalsLRUCache(limit uint) *LruCache {
+	byResultID := make(map[flow.Identifier]map[flow.Identifier]struct{})
+	// the eviction callback is invoked while we are holding the lock,
+	// so it can safely clean up the secondary index
+	lru, _ := simplelru.NewLRU(int(limit), func(key interface{}, value interface{}) {
+		approval := value.(*flow.ResultApproval)
+		delete(byResultID[approval.Body.ExecutionResultID], approval.Body.PartialID())
+	})
+	return &LruCache{
+		lru:        lru,
+		byResultID: byResultID,
+	}
+}
+
+func (c *LruCache) Peek(approvalID flow.Identifier) *flow.ResultApproval {
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+	// check if we have it in the cache
+	resource, cached := c.lru.Peek(approvalID)
+	if cached {
+		return resource.(*flow.ResultApproval)
+	}
+
+	return nil
+}
+
+func (c *LruCache) Get(approvalID flow.Identifier) *flow.ResultApproval {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	// check if we have it in the cache
+	resource, cached := c.lru.Get(approvalID)
+	if cached {
+		return resource.(*flow.ResultApproval)
+	}
+
+	return nil
+}
+
+func (c *LruCache) TakeByResultID(resultID flow.Identifier) []*flow.ResultApproval {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	ids, ok := c.byResultID[resultID]
+	if !ok {
+		return nil
+	}
+
+	approvals := make([]*flow.ResultApproval, 0, len(ids))
+	for approvalID := range ids {
+		// check if we have it in the cache
+		if resource, ok := c.lru.Peek(approvalID); ok {
+			// no need to clean up the secondary index here since that happens
+			// in the eviction callback
+			_ = c.lru.Remove(approvalID)
+			approvals = append(approvals, resource.(*flow.ResultApproval))
+		}
+	}
+
+	return approvals
+}
+
+func (c *LruCache) Put(approval *flow.ResultApproval) {
+	approvalID := approval.Body.PartialID()
+	resultID := approval.Body.ExecutionResultID
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	// cache the resource and evict the least recently used entry if we reached the limit
+	_ = c.lru.Add(approvalID, approval)
+	_, ok := c.byResultID[resultID]
+	if !ok {
+		c.byResultID[resultID] = map[flow.Identifier]struct{}{approvalID: {}}
+	} else {
+		c.byResultID[resultID][approvalID] = struct{}{}
+	}
+}
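A sketch of the intended access pattern for the secondary index (illustrative only; `approvals` referencing a common execution result are assumed):

	cache := NewApprovalsLRUCache(1000)
	for _, approval := range approvals {
		cache.Put(approval)
	}
	// atomically remove and return every cached approval for one execution result
	pending := cache.TakeByResultID(approvals[0].Body.ExecutionResultID)
	for _, approval := range pending {
		_ = approval // e.g. replay into the matching AssignmentCollector
	}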
diff --git a/engine/consensus/approvals/assignment_collector.go b/engine/consensus/approvals/assignment_collector.go
new file mode 100644
index 00000000000..251f9c27573
--- /dev/null
+++ b/engine/consensus/approvals/assignment_collector.go
@@ -0,0 +1,354 @@
+package approvals
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+
+	"github.com/rs/zerolog/log"
+
+	"github.com/onflow/flow-go/engine"
+	"github.com/onflow/flow-go/engine/consensus/sealing"
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/model/flow/filter"
+	"github.com/onflow/flow-go/model/messages"
+	"github.com/onflow/flow-go/module"
+	"github.com/onflow/flow-go/module/mempool"
+	"github.com/onflow/flow-go/network"
+	"github.com/onflow/flow-go/state/protocol"
+	"github.com/onflow/flow-go/storage"
+)
+
+// GetCachedBlockHeight is a helper functor that can be used to retrieve a cached block height
+type GetCachedBlockHeight = func(blockID flow.Identifier) (uint64, error)
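The strict processing order documented below, as a sketch (illustrative only; `assignmentCollector`, `incorporatedResult` and `approval` are assumed inputs):

	// 1. an incorporated result must arrive first; it creates the per-fork ApprovalCollector
	if err := assignmentCollector.ProcessIncorporatedResult(incorporatedResult); err != nil {
		return err
	}
	// 2. only then can approvals be matched against the chunk assignment;
	//    verified approvals are cached and replayed for later incorporated results
	if err := assignmentCollector.ProcessApproval(approval); err != nil {
		if engine.IsInvalidInputError(err) {
			// invalid approval: log and drop
		}
	}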
+
+// AssignmentCollector is responsible for collecting approvals that satisfy one assignment. We will
+// have multiple collectors for one execution result, since the same result can be incorporated in multiple forks.
+// AssignmentCollector has a strict processing order: at least one incorporated result has to be processed
+// before approvals can be processed.
+// AssignmentCollector takes advantage of internal caching to speed up processing approvals for different assignments.
+// AssignmentCollector is responsible for validating approvals on the result level (checking signature and identity).
+// TODO: currently AssignmentCollector doesn't clean up collectors when blocks that incorporate results get orphaned.
+// For the BFT milestone we need to ensure that this cleanup is properly implemented and all orphaned collectors are
+// pruned by height when a fork gets orphaned.
+type AssignmentCollector struct {
+	ResultID                             flow.Identifier                        // ID of execution result
+	result                               *flow.ExecutionResult                  // execution result that we are collecting approvals for
+	BlockHeight                          uint64                                 // height of block targeted by execution result
+	collectors                           map[flow.Identifier]*ApprovalCollector // collectors is a mapping IncorporatedBlockID -> ApprovalCollector
+	authorizedApprovers                  map[flow.Identifier]*flow.Identity     // map of approvers pre-selected at the block that is being sealed
+	lock                                 sync.RWMutex                           // lock for protecting collectors map
+	verifiedApprovalsCache               *Cache                                 // in-memory cache of approvals (already verified)
+	requiredApprovalsForSealConstruction uint                                   // number of approvals that are required for each chunk to be sealed
+	assigner                             module.ChunkAssigner                   // used to build assignment
+	headers                              storage.Headers                        // used to query headers from storage
+	state                                protocol.State                         // used to access the protocol state
+	verifier                             module.Verifier                        // used to validate result approvals
+	seals                                mempool.IncorporatedResultSeals        // holds candidate seals for incorporated results that have acquired sufficient approvals; candidate seals are constructed without consideration of the sealability of parent results
+	approvalConduit                      network.Conduit                        // used to request missing approvals from verification nodes
+	requestTracker                       *sealing.RequestTracker                // used to keep track of the number of approval requests, and blackout periods, by chunk
+}
+
+func NewAssignmentCollector(result *flow.ExecutionResult, state protocol.State, headers storage.Headers, assigner module.ChunkAssigner, seals mempool.IncorporatedResultSeals,
+	sigVerifier module.Verifier, approvalConduit network.Conduit, requestTracker *sealing.RequestTracker, requiredApprovalsForSealConstruction uint,
+) (*AssignmentCollector, error) {
+	block, err := headers.ByBlockID(result.BlockID)
+	if err != nil {
+		return nil, err
+	}
+
+	collector := &AssignmentCollector{
+		ResultID:                             result.ID(),
+		result:                               result,
+		BlockHeight:                          block.Height,
+		collectors:                           make(map[flow.Identifier]*ApprovalCollector),
+		state:                                state,
+		assigner:                             assigner,
+		seals:                                seals,
+		verifier:                             sigVerifier,
+		requestTracker:                       requestTracker,
+		approvalConduit:                      approvalConduit,
+		headers:                              headers,
+		requiredApprovalsForSealConstruction: requiredApprovalsForSealConstruction,
+	}
+
+	// pre-select all authorized verifiers at the block that is being sealed
+	collector.authorizedApprovers, err = collector.authorizedVerifiersAtBlock(result.BlockID)
+	if err != nil {
+		return nil, engine.NewInvalidInputErrorf("could not determine authorized verifiers for sealing candidate: %w", err)
+	}
+
+	collector.verifiedApprovalsCache = NewApprovalsCache(uint(result.Chunks.Len() * len(collector.authorizedApprovers)))
+
+	return collector, nil
+}
+
+// BlockID returns the ID of the executed block
+func (ac *AssignmentCollector) BlockID() flow.Identifier {
+	return ac.result.BlockID
+}
+
+func (ac *AssignmentCollector) collectorByBlockID(incorporatedBlockID flow.Identifier) *ApprovalCollector {
+	ac.lock.RLock()
+	defer ac.lock.RUnlock()
+	return ac.collectors[incorporatedBlockID]
+}
+
+// authorizedVerifiersAtBlock pre-selects all authorized Verifiers at the block that incorporates the result.
+// The method returns the set of all node IDs that:
+//   * are authorized members of the network at the given block and
+//   * have the Verification role and
+//   * have _positive_ weight and
+//   * are not ejected
+func (ac *AssignmentCollector) authorizedVerifiersAtBlock(blockID flow.Identifier) (map[flow.Identifier]*flow.Identity, error) {
+	authorizedVerifierList, err := ac.state.AtBlockID(blockID).Identities(
+		filter.And(
+			filter.HasRole(flow.RoleVerification),
+			filter.HasStake(true),
+			filter.Not(filter.Ejected),
+		))
+	if err != nil {
+		return nil, fmt.Errorf("failed to retrieve Identities for block %v: %w", blockID, err)
+	}
+	if len(authorizedVerifierList) == 0 {
+		return nil, fmt.Errorf("no authorized verifiers found for block %v", blockID)
+	}
+	identities := make(map[flow.Identifier]*flow.Identity)
+	for _, identity := range authorizedVerifierList {
+		identities[identity.NodeID] = identity
+	}
+	return identities, nil
+}
+
+// emergencySealable determines whether an incorporated Result qualifies for "emergency sealing".
+// ATTENTION: this is a temporary solution, which is NOT BFT compatible. When the approval process
+// hangs far enough behind finalization (measured in finalized but unsealed blocks), emergency
+// sealing kicks in. This will be removed when the implementation of seals & verification is finished.
+func (ac *AssignmentCollector) emergencySealable(collector *ApprovalCollector, finalizedBlockHeight uint64) bool {
+	// Criterion for emergency sealing:
+	// there must be at least DefaultEmergencySealingThreshold blocks between
+	// the block that _incorporates_ the result and the latest finalized block
+	return collector.IncorporatedBlock().Height+sealing.DefaultEmergencySealingThreshold <= finalizedBlockHeight
+}
+
+func (ac *AssignmentCollector) CheckEmergencySealing(finalizedBlockHeight uint64) error {
+	for _, collector := range ac.allCollectors() {
+		sealable := ac.emergencySealable(collector, finalizedBlockHeight)
+		if sealable {
+			err := collector.SealResult()
+			if err != nil {
+				return fmt.Errorf("could not create emergency seal for result %x incorporated at %x: %w",
+					ac.ResultID, collector.IncorporatedBlockID(), err)
+			}
+		}
+	}
+
+	return nil
+}
+
+func (ac *AssignmentCollector) ProcessIncorporatedResult(incorporatedResult *flow.IncorporatedResult) error {
+	// check that the result is the one that this AssignmentCollector manages
+	if irID := incorporatedResult.Result.ID(); irID != ac.ResultID {
+		return fmt.Errorf("this AssignmentCollector manages result %x but got %x", ac.ResultID, irID)
+	}
+
+	incorporatedBlockID := incorporatedResult.IncorporatedBlockID
+	if collector := ac.collectorByBlockID(incorporatedBlockID); collector != nil {
+		return nil
+	}
+
+	// This function is not exactly thread safe: it can perform double computation of the assignment and authorized verifiers.
+	// It is safe in the sense that only one collector will be stored in the cache.
+	// Regarding locking time, it is better to perform extra computation in edge cases than to guard this logic with a mutex,
+	// since it is quite unlikely that the same incorporated result will be processed by multiple goroutines simultaneously.
+
+	// chunk assignment is based on the first block in the fork that incorporates the result
+	assignment, err := ac.assigner.Assign(incorporatedResult.Result, incorporatedBlockID)
+	if err != nil {
+		return fmt.Errorf("could not determine chunk assignment: %w", err)
+	}
+
+	incorporatedBlock, err := ac.headers.ByBlockID(incorporatedBlockID)
+	if err != nil {
+		return fmt.Errorf("failed to retrieve header of incorporated block %s: %w",
+			incorporatedBlockID, err)
+	}
fmt.Errorf("failed to retrieve header of incorporated block %s: %w", + incorporatedBlockID, err) + } + + collector := NewApprovalCollector(incorporatedResult, incorporatedBlock, assignment, ac.seals, ac.requiredApprovalsForSealConstruction) + + isDuplicate := ac.putCollector(incorporatedBlockID, collector) + if isDuplicate { + return nil + } + + // process approvals that have passed needed checks and are ready to be processed + for _, approval := range ac.verifiedApprovalsCache.All() { + // those approvals are verified already and shouldn't yield any errors + _ = collector.ProcessApproval(approval) + + } + + return nil +} + +func (ac *AssignmentCollector) putCollector(incorporatedBlockID flow.Identifier, collector *ApprovalCollector) bool { + ac.lock.Lock() + defer ac.lock.Unlock() + if _, ok := ac.collectors[incorporatedBlockID]; ok { + return true + } + ac.collectors[incorporatedBlockID] = collector + return false +} + +func (ac *AssignmentCollector) allCollectors() []*ApprovalCollector { + ac.lock.RLock() + defer ac.lock.RUnlock() + collectors := make([]*ApprovalCollector, 0, len(ac.collectors)) + for _, collector := range ac.collectors { + collectors = append(collectors, collector) + } + return collectors +} + +func (ac *AssignmentCollector) verifyAttestationSignature(approval *flow.ResultApprovalBody, nodeIdentity *flow.Identity) error { + id := approval.Attestation.ID() + valid, err := ac.verifier.Verify(id[:], approval.AttestationSignature, nodeIdentity.StakingPubKey) + if err != nil { + return fmt.Errorf("failed to verify attestation signature: %w", err) + } + + if !valid { + return engine.NewInvalidInputErrorf("invalid attestation signature for (%x)", nodeIdentity.NodeID) + } + + return nil +} + +func (ac *AssignmentCollector) verifySignature(approval *flow.ResultApproval, nodeIdentity *flow.Identity) error { + id := approval.Body.ID() + valid, err := ac.verifier.Verify(id[:], approval.VerifierSignature, nodeIdentity.StakingPubKey) + if err != nil { + return fmt.Errorf("failed to verify signature: %w", err) + } + + if !valid { + return engine.NewInvalidInputErrorf("invalid signature for (%x)", nodeIdentity.NodeID) + } + + return nil +} + +// validateApproval performs result level checks of flow.ResultApproval +// checks: +// - verification node identity +// - attestation signature +// - signature of verification node +// - chunk index sanity check +// - block ID sanity check +// Returns: +// - engine.InvalidInputError - result approval is invalid +// - exception in case of any other error, usually this is not expected +// - nil on successful check +func (ac *AssignmentCollector) validateApproval(approval *flow.ResultApproval) error { + // check that approval is for the expected result to reject incompatible inputs + if approval.Body.ExecutionResultID != ac.ResultID { + return fmt.Errorf("this AssignmentCollector processes only approvals for result (%x) but got an approval for (%x)", ac.ResultID, approval.Body.ExecutionResultID) + } + + // approval has to refer same block as execution result + if approval.Body.BlockID != ac.BlockID() { + return engine.NewInvalidInputErrorf("result approval for invalid block, expected (%x) vs (%x)", + ac.BlockID(), approval.Body.BlockID) + } + + chunkIndex := approval.Body.ChunkIndex + if chunkIndex >= uint64(ac.result.Chunks.Len()) { + return engine.NewInvalidInputErrorf("chunk index out of range: %v", chunkIndex) + } + + identity, found := ac.authorizedApprovers[approval.Body.ApproverID] + if !found { + return 
engine.NewInvalidInputErrorf("approval not from authorized verifier") + } + + err := ac.verifyAttestationSignature(&approval.Body, identity) + if err != nil { + return fmt.Errorf("validating attestation signature failed: %w", err) + } + + err = ac.verifySignature(approval, identity) + if err != nil { + return fmt.Errorf("validating approval signature failed: %w", err) + } + + return nil +} + +func (ac *AssignmentCollector) ProcessApproval(approval *flow.ResultApproval) error { + err := ac.validateApproval(approval) + if err != nil { + return fmt.Errorf("could not validate approval: %w", err) + } + + if cached := ac.verifiedApprovalsCache.Get(approval.Body.PartialID()); cached != nil { + // we have this approval cached already, no need to process it again + return nil + } + + ac.verifiedApprovalsCache.Put(approval) + + for _, collector := range ac.allCollectors() { + err := collector.ProcessApproval(approval) + if err != nil { + return fmt.Errorf("could not process approval: %w", err) + } + } + + return nil +} + +func (ac *AssignmentCollector) RequestMissingApprovals(maxHeightForRequesting uint64) error { + for _, collector := range ac.allCollectors() { + if collector.IncorporatedBlock().Height > maxHeightForRequesting { + continue + } + + for chunkIndex, verifiers := range collector.CollectMissingVerifiers() { + // Retrieve information about requests made for this chunk. Skip + // requesting if the blackout period hasn't expired. Otherwise, + // update request count and reset blackout period. + requestTrackerItem := ac.requestTracker.Get(ac.ResultID, collector.IncorporatedBlockID(), chunkIndex) + if requestTrackerItem.IsBlackout() { + continue + } + requestTrackerItem.Update() + ac.requestTracker.Set(ac.ResultID, collector.IncorporatedBlockID(), chunkIndex, requestTrackerItem) + + // for monitoring/debugging purposes, log requests if we start + // making more than 10 + if requestTrackerItem.Requests >= 10 { + log.Debug().Msgf("requesting approvals for result %v, incorporatedBlockID %v chunk %d: %d requests", + ac.ResultID, + collector.IncorporatedBlockID(), + chunkIndex, + requestTrackerItem.Requests, + ) + } + + // prepare the request + req := &messages.ApprovalRequest{ + Nonce: rand.Uint64(), + ResultID: ac.ResultID, + ChunkIndex: chunkIndex, + } + + err := ac.approvalConduit.Publish(req, verifiers...) + if err != nil { + log.Error().Err(err). 
+				Msgf("could not publish approval request for chunk %d", chunkIndex)
+			}
+		}
+	}
+	return nil
+}
diff --git a/engine/consensus/approvals/assignment_collector_test.go b/engine/consensus/approvals/assignment_collector_test.go
new file mode 100644
index 00000000000..071d9ab2253
--- /dev/null
+++ b/engine/consensus/approvals/assignment_collector_test.go
@@ -0,0 +1,427 @@
+package approvals
+
+import (
+	"fmt"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/onflow/flow-go/engine"
+	"github.com/onflow/flow-go/engine/consensus/sealing"
+	"github.com/onflow/flow-go/model/chunks"
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/model/messages"
+	mempool "github.com/onflow/flow-go/module/mempool/mock"
+	module "github.com/onflow/flow-go/module/mock"
+	"github.com/onflow/flow-go/network/mocknetwork"
+	realproto "github.com/onflow/flow-go/state/protocol"
+	protocol "github.com/onflow/flow-go/state/protocol/mock"
+	realstorage "github.com/onflow/flow-go/storage"
+	storage "github.com/onflow/flow-go/storage/mock"
+	"github.com/onflow/flow-go/utils/unittest"
+)
+
+// TestAssignmentCollector tests the behavior of AssignmentCollector in different scenarios.
+// AssignmentCollector is responsible for collecting approvals that satisfy one assignment; we will
+// have multiple collectors for one execution result, since the same result can be incorporated in multiple forks.
+// AssignmentCollector has a strict processing order: at least one incorporated result has to be processed
+// before approvals can be processed.
+// AssignmentCollector takes advantage of internal caching to speed up processing approvals for different assignments.
+// AssignmentCollector is responsible for validating approvals on the result level (checking signature and identity).
+func TestAssignmentCollector(t *testing.T) { + suite.Run(t, new(AssignmentCollectorTestSuite)) +} + +type AssignmentCollectorTestSuite struct { + BaseApprovalsTestSuite + + blocks map[flow.Identifier]*flow.Header + state *protocol.State + headers *storage.Headers + assigner *module.ChunkAssigner + sealsPL *mempool.IncorporatedResultSeals + sigVerifier *module.Verifier + conduit *mocknetwork.Conduit + identitiesCache map[flow.Identifier]map[flow.Identifier]*flow.Identity // helper map to store identities for given block + requestTracker *sealing.RequestTracker + + collector *AssignmentCollector +} + +func (s *AssignmentCollectorTestSuite) SetupTest() { + s.BaseApprovalsTestSuite.SetupTest() + + s.sealsPL = &mempool.IncorporatedResultSeals{} + s.state = &protocol.State{} + s.assigner = &module.ChunkAssigner{} + s.sigVerifier = &module.Verifier{} + s.conduit = &mocknetwork.Conduit{} + s.headers = &storage.Headers{} + + s.requestTracker = sealing.NewRequestTracker(1, 3) + + // setup blocks cache for protocol state + s.blocks = make(map[flow.Identifier]*flow.Header) + s.blocks[s.Block.ID()] = &s.Block + s.blocks[s.IncorporatedBlock.ID()] = &s.IncorporatedBlock + + // setup identities for each block + s.identitiesCache = make(map[flow.Identifier]map[flow.Identifier]*flow.Identity) + s.identitiesCache[s.IncorporatedResult.Result.BlockID] = s.AuthorizedVerifiers + + s.assigner.On("Assign", mock.Anything, mock.Anything).Return(func(result *flow.ExecutionResult, blockID flow.Identifier) *chunks.Assignment { + return s.ChunksAssignment + }, func(result *flow.ExecutionResult, blockID flow.Identifier) error { return nil }) + + s.headers.On("ByBlockID", mock.Anything).Return(func(blockID flow.Identifier) *flow.Header { + return s.blocks[blockID] + }, func(blockID flow.Identifier) error { + _, found := s.blocks[blockID] + if found { + return nil + } else { + return realstorage.ErrNotFound + } + }) + + s.state.On("AtBlockID", mock.Anything).Return( + func(blockID flow.Identifier) realproto.Snapshot { + if block, found := s.blocks[blockID]; found { + return unittest.StateSnapshotForKnownBlock(block, s.identitiesCache[blockID]) + } else { + return unittest.StateSnapshotForUnknownBlock() + } + }, + ) + + var err error + s.collector, err = NewAssignmentCollector(s.IncorporatedResult.Result, s.state, s.headers, s.assigner, s.sealsPL, + s.sigVerifier, s.conduit, s.requestTracker, uint(len(s.AuthorizedVerifiers))) + require.NoError(s.T(), err) +} + +// TestProcessApproval_ApprovalsAfterResult tests a scenario when first we have discovered execution result +// and after that we started receiving approvals. In this scenario we should be able to create a seal right +// after processing last needed approval to meet `requiredApprovalsForSealConstruction` threshold. 
+func (s *AssignmentCollectorTestSuite) TestProcessApproval_ApprovalsAfterResult() { + err := s.collector.ProcessIncorporatedResult(s.IncorporatedResult) + require.NoError(s.T(), err) + + s.sealsPL.On("Add", mock.Anything).Return(true, nil).Once() + s.sigVerifier.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + + blockID := s.Block.ID() + resultID := s.IncorporatedResult.Result.ID() + for _, chunk := range s.Chunks { + for verID := range s.AuthorizedVerifiers { + approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index), + unittest.WithApproverID(verID), + unittest.WithBlockID(blockID), + unittest.WithExecutionResultID(resultID)) + err = s.collector.ProcessApproval(approval) + require.NoError(s.T(), err) + } + } + + s.sealsPL.AssertCalled(s.T(), "Add", mock.Anything) +} + +// TestProcessIncorporatedResult_ReusingCachedApprovals tests a scenario where we successfully processed approvals for one incorporated result +// and we are able to reuse those approvals for another incorporated result of same execution result +func (s *AssignmentCollectorTestSuite) TestProcessIncorporatedResult_ReusingCachedApprovals() { + err := s.collector.ProcessIncorporatedResult(s.IncorporatedResult) + require.NoError(s.T(), err) + + s.sealsPL.On("Add", mock.Anything).Return(true, nil).Twice() + s.sigVerifier.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + + blockID := s.Block.ID() + resultID := s.IncorporatedResult.Result.ID() + for _, chunk := range s.Chunks { + for verID := range s.AuthorizedVerifiers { + approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index), + unittest.WithApproverID(verID), + unittest.WithBlockID(blockID), + unittest.WithExecutionResultID(resultID)) + err = s.collector.ProcessApproval(approval) + require.NoError(s.T(), err) + } + } + + incorporatedBlock := unittest.BlockHeaderWithParentFixture(&s.Block) + s.blocks[incorporatedBlock.ID()] = &incorporatedBlock + + // at this point we have proposed a seal, let's construct new incorporated result with same assignment + // but different incorporated block ID resulting in new seal. 
+ incorporatedResult := unittest.IncorporatedResult.Fixture( + unittest.IncorporatedResult.WithIncorporatedBlockID(incorporatedBlock.ID()), + unittest.IncorporatedResult.WithResult(s.IncorporatedResult.Result), + ) + + err = s.collector.ProcessIncorporatedResult(incorporatedResult) + require.NoError(s.T(), err) + s.sealsPL.AssertCalled(s.T(), "Add", mock.Anything) + +} + +// TestProcessApproval_InvalidSignature tests a scenario processing approval with invalid signature +func (s *AssignmentCollectorTestSuite) TestProcessApproval_InvalidSignature() { + + err := s.collector.ProcessIncorporatedResult(s.IncorporatedResult) + require.NoError(s.T(), err) + + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.Chunks[0].Index), + unittest.WithApproverID(s.VerID), + unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID())) + + // attestation signature is valid + s.sigVerifier.On("Verify", mock.Anything, approval.Body.AttestationSignature, mock.Anything).Return(true, nil).Once() + // approval signature is invalid + s.sigVerifier.On("Verify", mock.Anything, approval.VerifierSignature, mock.Anything).Return(false, nil).Once() + + err = s.collector.ProcessApproval(approval) + require.Error(s.T(), err) + require.True(s.T(), engine.IsInvalidInputError(err)) +} + +// TestProcessApproval_InvalidBlockID tests a scenario processing approval with invalid block ID +func (s *AssignmentCollectorTestSuite) TestProcessApproval_InvalidBlockID() { + + err := s.collector.ProcessIncorporatedResult(s.IncorporatedResult) + require.NoError(s.T(), err) + + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.Chunks[0].Index), + unittest.WithApproverID(s.VerID), + unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID())) + + err = s.collector.ProcessApproval(approval) + require.Error(s.T(), err) + require.True(s.T(), engine.IsInvalidInputError(err)) +} + +// TestProcessApproval_InvalidBlockChunkIndex tests a scenario processing approval with invalid chunk index +func (s *AssignmentCollectorTestSuite) TestProcessApproval_InvalidBlockChunkIndex() { + + err := s.collector.ProcessIncorporatedResult(s.IncorporatedResult) + require.NoError(s.T(), err) + + approval := unittest.ResultApprovalFixture(unittest.WithChunk(uint64(s.Chunks.Len())), + unittest.WithApproverID(s.VerID), + unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID())) + + err = s.collector.ProcessApproval(approval) + require.Error(s.T(), err) + require.True(s.T(), engine.IsInvalidInputError(err)) +} + +// TestProcessIncorporatedResult tests different scenarios for processing incorporated result +// Expected to process valid incorporated result without error and reject invalid incorporated results +// with engine.InvalidInputError +func (s *AssignmentCollectorTestSuite) TestProcessIncorporatedResult() { + s.Run("valid-incorporated-result", func() { + err := s.collector.ProcessIncorporatedResult(s.IncorporatedResult) + require.NoError(s.T(), err) + }) + + s.Run("invalid-assignment", func() { + assigner := &module.ChunkAssigner{} + assigner.On("Assign", mock.Anything, mock.Anything).Return(nil, fmt.Errorf("")) + + collector, err := NewAssignmentCollector(s.IncorporatedResult.Result, s.state, s.headers, assigner, s.sealsPL, + s.sigVerifier, s.conduit, s.requestTracker, 1) + require.NoError(s.T(), err) + + err = collector.ProcessIncorporatedResult(s.IncorporatedResult) + require.Error(s.T(), err) + }) + + s.Run("invalid-verifier-identities", func() { + // delete identities for Result.BlockID + delete(s.identitiesCache, 
s.IncorporatedResult.Result.BlockID) + collector, err := NewAssignmentCollector(s.IncorporatedResult.Result, s.state, s.headers, s.assigner, s.sealsPL, + s.sigVerifier, s.conduit, s.requestTracker, 1) + require.Error(s.T(), err) + require.Nil(s.T(), collector) + require.True(s.T(), engine.IsInvalidInputError(err)) + }) +} + +// TestProcessIncorporatedResult_InvalidIdentity tests a few scenarios where verifier identity is not correct +// by one or another reason +func (s *AssignmentCollectorTestSuite) TestProcessIncorporatedResult_InvalidIdentity() { + + s.Run("verifier-not-staked", func() { + identity := unittest.IdentityFixture(unittest.WithRole(flow.RoleVerification)) + identity.Stake = 0 // invalid stake + + state := &protocol.State{} + state.On("AtBlockID", mock.Anything).Return( + func(blockID flow.Identifier) realproto.Snapshot { + return unittest.StateSnapshotForKnownBlock( + &s.Block, + map[flow.Identifier]*flow.Identity{identity.NodeID: identity}, + ) + }, + ) + + collector, err := NewAssignmentCollector(s.IncorporatedResult.Result, state, s.headers, s.assigner, s.sealsPL, + s.sigVerifier, s.conduit, s.requestTracker, 1) + require.Error(s.T(), err) + require.Nil(s.T(), collector) + require.True(s.T(), engine.IsInvalidInputError(err)) + }) + + s.Run("verifier-ejected", func() { + identity := unittest.IdentityFixture(unittest.WithRole(flow.RoleVerification)) + identity.Ejected = true // node ejected + + state := &protocol.State{} + state.On("AtBlockID", mock.Anything).Return( + func(blockID flow.Identifier) realproto.Snapshot { + return unittest.StateSnapshotForKnownBlock( + &s.Block, + map[flow.Identifier]*flow.Identity{identity.NodeID: identity}, + ) + }, + ) + + collector, err := NewAssignmentCollector(s.IncorporatedResult.Result, state, s.headers, s.assigner, s.sealsPL, + s.sigVerifier, s.conduit, s.requestTracker, 1) + require.Nil(s.T(), collector) + require.Error(s.T(), err) + require.True(s.T(), engine.IsInvalidInputError(err)) + }) + s.Run("verifier-invalid-role", func() { + // invalid role + identity := unittest.IdentityFixture(unittest.WithRole(flow.RoleAccess)) + + state := &protocol.State{} + state.On("AtBlockID", mock.Anything).Return( + func(blockID flow.Identifier) realproto.Snapshot { + return unittest.StateSnapshotForKnownBlock( + &s.Block, + map[flow.Identifier]*flow.Identity{identity.NodeID: identity}, + ) + }, + ) + + collector, err := NewAssignmentCollector(s.IncorporatedResult.Result, state, s.headers, s.assigner, s.sealsPL, + s.sigVerifier, s.conduit, s.requestTracker, 1) + require.Nil(s.T(), collector) + require.Error(s.T(), err) + require.True(s.T(), engine.IsInvalidInputError(err)) + }) +} + +// TestProcessApproval_BeforeIncorporatedResult tests scenario when approval is submitted before execution result +// is discovered, without execution result we are missing information for verification. 
Calling `ProcessApproval` before `ProcessIncorporatedResult`
+// should result in an error
+func (s *AssignmentCollectorTestSuite) TestProcessApproval_BeforeIncorporatedResult() {
+	approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.Chunks[0].Index),
+		unittest.WithApproverID(s.VerID),
+		unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID()))
+	err := s.collector.ProcessApproval(approval)
+	require.Error(s.T(), err)
+	require.True(s.T(), engine.IsInvalidInputError(err))
+}
+
+// TestRequestMissingApprovals checks that requests are sent only for chunks
+// that have not collected enough approvals yet, and are sent only to the
+// verifiers assigned to those chunks. It also checks that the threshold and
+// rate limiting are respected.
+func (s *AssignmentCollectorTestSuite) TestRequestMissingApprovals() {
+	// build a new assignment with 2 verifiers
+	assignment := chunks.NewAssignment()
+	for _, chunk := range s.Chunks {
+		verifiers := s.ChunksAssignment.Verifiers(chunk)
+		assignment.Add(chunk, verifiers[:2])
+	}
+	// replace the old one
+	s.ChunksAssignment = assignment
+
+	incorporatedBlocks := make([]*flow.Header, 0)
+
+	lastHeight := uint64(rand.Uint32())
+	for i := 0; i < 2; i++ {
+		incorporatedBlock := unittest.BlockHeaderFixture()
+		incorporatedBlock.Height = lastHeight
+		lastHeight++
+
+		s.blocks[incorporatedBlock.ID()] = &incorporatedBlock
+		incorporatedBlocks = append(incorporatedBlocks, &incorporatedBlock)
+	}
+
+	incorporatedResults := make([]*flow.IncorporatedResult, 0, len(incorporatedBlocks))
+	for _, block := range incorporatedBlocks {
+		incorporatedResult := unittest.IncorporatedResult.Fixture(
+			unittest.IncorporatedResult.WithResult(s.IncorporatedResult.Result),
+			unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()))
+		incorporatedResults = append(incorporatedResults, incorporatedResult)
+
+		err := s.collector.ProcessIncorporatedResult(incorporatedResult)
+		require.NoError(s.T(), err)
+	}
+
+	requests := make([]*messages.ApprovalRequest, 0)
+	// mock the Publish method when requests are sent to 2 verifiers
+	s.conduit.On("Publish", mock.Anything, mock.Anything, mock.Anything).
+		Return(nil).
+		Run(func(args mock.Arguments) {
+			// collect the request
+			ar, ok := args[0].(*messages.ApprovalRequest)
+			s.Assert().True(ok)
+			requests = append(requests, ar)
+		})
+
+	err := s.collector.RequestMissingApprovals(lastHeight)
+	require.NoError(s.T(), err)
+
+	// the first time through, no requests should be made because of the
+	// blackout period
+	require.Len(s.T(), requests, 0)
+
+	// wait for the max blackout period to elapse and retry
+	time.Sleep(3 * time.Second)
+
+	// requesting with an immature height will be ignored
+	err = s.collector.RequestMissingApprovals(lastHeight - uint64(len(incorporatedBlocks)) - 1)
+	s.Require().NoError(err)
+	require.Len(s.T(), requests, 0)
+
+	err = s.collector.RequestMissingApprovals(lastHeight)
+	s.Require().NoError(err)
+
+	require.Len(s.T(), requests, s.Chunks.Len()*len(s.collector.collectors))
+
+	resultID := s.IncorporatedResult.Result.ID()
+	for _, chunk := range s.Chunks {
+		for _, incorporatedResult := range incorporatedResults {
+			requestItem := s.requestTracker.Get(resultID, incorporatedResult.IncorporatedBlockID, chunk.Index)
+			require.Equal(s.T(), uint(1), requestItem.Requests)
+		}
+	}
+}
+
+// TestCheckEmergencySealing tests that currently tracked incorporated results can be emergency sealed
+// when the height difference reaches the emergency sealing threshold.
+func (s *AssignmentCollectorTestSuite) TestCheckEmergencySealing() {
+	err := s.collector.ProcessIncorporatedResult(s.IncorporatedResult)
+	require.NoError(s.T(), err)
+
+	// checking emergency sealing with the current height
+	// should early exit without creating any seals
+	err = s.collector.CheckEmergencySealing(s.IncorporatedBlock.Height)
+	require.NoError(s.T(), err)
+
+	s.sealsPL.On("Add", mock.Anything).Return(true, nil).Once()
+
+	err = s.collector.CheckEmergencySealing(sealing.DefaultEmergencySealingThreshold + s.IncorporatedBlock.Height)
+	require.NoError(s.T(), err)
+
+	s.sealsPL.AssertExpectations(s.T())
+}
diff --git a/engine/consensus/approvals/assignment_collector_tree.go b/engine/consensus/approvals/assignment_collector_tree.go
new file mode 100644
index 00000000000..43ed95d890c
--- /dev/null
+++ b/engine/consensus/approvals/assignment_collector_tree.go
@@ -0,0 +1,220 @@
+package approvals
+
+import (
+	"fmt"
+	"sync"
+
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/module/forest"
+	"github.com/onflow/flow-go/storage"
+)
+
+// assignmentCollectorVertex is a helper structure that implements the LevelledForest Vertex interface and encapsulates
+// an AssignmentCollector together with a flag indicating whether the collector is processable
+type assignmentCollectorVertex struct {
+	collector   *AssignmentCollector
+	processable bool
+}
+
+/* Methods implementing LevelledForest's Vertex interface */
+
+func (v *assignmentCollectorVertex) VertexID() flow.Identifier { return v.collector.ResultID }
+func (v *assignmentCollectorVertex) Level() uint64             { return v.collector.BlockHeight }
+func (v *assignmentCollectorVertex) Parent() (flow.Identifier, uint64) {
+	return v.collector.result.PreviousResultID, v.collector.BlockHeight - 1
+}
+
+// NewCollectorFactoryMethod is a factory method to generate an AssignmentCollector for an execution result
+type NewCollectorFactoryMethod = func(result *flow.ExecutionResult) (*AssignmentCollector, error)
+
+// AssignmentCollectorTree is a mempool holding assignment collectors, which is aware of the tree structure
+// formed by the execution results. The mempool supports pruning by height: only collectors
+// descending from the latest finalized block are relevant.
+// Safe for concurrent access. Internally, the mempool utilizes the LevelledForest.
+type AssignmentCollectorTree struct {
+	forest              *forest.LevelledForest
+	lock                sync.RWMutex
+	onCreateCollector   NewCollectorFactoryMethod
+	size                uint64
+	lastSealedID        flow.Identifier
+	lastFinalizedHeight uint64
+	headers             storage.Headers
+}
+
+func NewAssignmentCollectorTree(lastSealed *flow.Header, headers storage.Headers, onCreateCollector NewCollectorFactoryMethod) *AssignmentCollectorTree {
+	return &AssignmentCollectorTree{
+		forest:            forest.NewLevelledForest(lastSealed.Height),
+		lock:              sync.RWMutex{},
+		onCreateCollector: onCreateCollector,
+		size:              0,
+		lastSealedID:      lastSealed.ID(),
+		headers:           headers,
+	}
+}
+
+// GetCollector returns the collector for the given result ID and whether it is processable
+func (t *AssignmentCollectorTree) GetCollector(resultID flow.Identifier) (*AssignmentCollector, bool) {
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+	vertex, found := t.forest.GetVertex(resultID)
+	if !found {
+		return nil, false
+	}
+
+	v := vertex.(*assignmentCollectorVertex)
+	return v.collector, v.processable
+}
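How the tree is expected to be driven on block finalization, as a sketch (illustrative only; `tree`, `finalized` and `sealed` headers are assumed to come from the caller's follower logic):

	// orphan collectors on forks that conflict with the finalized block
	tree.FinalizeForkAtLevel(finalized, sealed)
	// drop everything below the last sealed height; the pruned result IDs can
	// be used to clean up auxiliary state
	prunedResultIDs, err := tree.PruneUpToHeight(sealed.Height)
	if err != nil {
		return err
	}
	_ = prunedResultIDs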
+// FinalizeForkAtLevel performs finalization of the fork stored in the levelled forest. When a block is finalized, we
+// can mark conflicting forks as orphaned and stop processing approvals for them. Eventually all orphaned forks will be
+// cleaned up by height.
+func (t *AssignmentCollectorTree) FinalizeForkAtLevel(finalized *flow.Header, sealed *flow.Header) {
+	finalizedBlockID := finalized.ID()
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	if t.lastFinalizedHeight >= finalized.Height {
+		return
+	}
+
+	t.lastFinalizedHeight = finalized.Height
+	t.lastSealedID = sealed.ID()
+	iter := t.forest.GetVerticesAtLevel(finalized.Height)
+	for iter.HasNext() {
+		vertex := iter.NextVertex().(*assignmentCollectorVertex)
+		if finalizedBlockID != vertex.collector.BlockID() {
+			t.markForkProcessable(vertex, false)
+		}
+	}
+}
+
+// markForkProcessable takes the starting vertex of some fork and recursively marks it as processable or not
+func (t *AssignmentCollectorTree) markForkProcessable(vertex *assignmentCollectorVertex, processable bool) {
+	vertex.processable = processable
+	iter := t.forest.GetChildren(vertex.VertexID())
+	for iter.HasNext() {
+		t.markForkProcessable(iter.NextVertex().(*assignmentCollectorVertex), processable)
+	}
+}
+
+// GetCollectorsByInterval returns processable collectors that satisfy the interval [from; to)
+func (t *AssignmentCollectorTree) GetCollectorsByInterval(from, to uint64) []*AssignmentCollector {
+	var vertices []*AssignmentCollector
+	t.lock.RLock()
+	defer t.lock.RUnlock()
+
+	if from < t.forest.LowestLevel {
+		from = t.forest.LowestLevel
+	}
+
+	for l := from; l < to; l++ {
+		iter := t.forest.GetVerticesAtLevel(l)
+		for iter.HasNext() {
+			vertex := iter.NextVertex().(*assignmentCollectorVertex)
+			if vertex.processable {
+				vertices = append(vertices, vertex.collector)
+			}
+		}
+	}
+
+	return vertices
+}
+
+// LazyInitCollector is a helper structure that is used to return a lazily initialized collector
+type LazyInitCollector struct {
+	Collector   *AssignmentCollector
+	Processable bool // whether the collector is processable
+	Created     bool // whether the collector was created or retrieved from the cache
+}
+
+// GetOrCreateCollector performs lazy initialization of an AssignmentCollector using double-checked locking.
+// Returns a LazyInitCollector (whose Created flag indicates whether the collector was newly created) or an error.
+func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionResult) (*LazyInitCollector, error) {
+	resultID := result.ID()
+	// first, let's check if we have a collector already
+	cachedCollector, processable := t.GetCollector(resultID)
+	if cachedCollector != nil {
+		return &LazyInitCollector{
+			Collector:   cachedCollector,
+			Processable: processable,
+			Created:     false,
+		}, nil
+	}
+
+	collector, err := t.onCreateCollector(result)
+	if err != nil {
+		return nil, fmt.Errorf("could not create assignment collector for %v: %w", resultID, err)
+	}
+	vertex := &assignmentCollectorVertex{
+		collector:   collector,
+		processable: false,
+	}
+
+	executedBlock, err := t.headers.ByBlockID(result.BlockID)
+	if err != nil {
+		return nil, fmt.Errorf("could not fetch executed block %v: %w", result.BlockID, err)
+	}
+
+	// the fast check above showed that there is no collector, so we need to create one
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	// we need to check again, since it is possible that a concurrent goroutine created a new collector
+	// after our check for an existing collector but before we took the lock
+	v, found := t.forest.GetVertex(resultID)
+	if found {
+		return &LazyInitCollector{
+			Collector:   v.(*assignmentCollectorVertex).collector,
+			Processable: v.(*assignmentCollectorVertex).processable,
+			Created:     false,
+		}, nil
+	}
+	parent, parentFound := t.forest.GetVertex(result.PreviousResultID)
parentFound { + vertex.processable = parent.(*assignmentCollectorVertex).processable + } else if executedBlock.ParentID == t.lastSealedID { + vertex.processable = true + } + + err = t.forest.VerifyVertex(vertex) + if err != nil { + return nil, fmt.Errorf("failed to store assignment collector into the tree: %w", err) + } + + t.forest.AddVertex(vertex) + t.size += 1 + t.markForkProcessable(vertex, vertex.processable) + return &LazyInitCollector{ + Collector: vertex.collector, + Processable: vertex.processable, + Created: true, + }, nil +} + +// PruneUpToHeight prunes all results for all assignment collectors with height up to but +// NOT INCLUDING `limit`. Errors if limit is lower than +// the previous value (as we cannot recover previously pruned results). +// Returns list of resultIDs that were pruned +func (t *AssignmentCollectorTree) PruneUpToHeight(limit uint64) ([]flow.Identifier, error) { + var pruned []flow.Identifier + t.lock.Lock() + defer t.lock.Unlock() + + if t.size > 0 { + // collect IDs of vertices that were pruned + for l := t.forest.LowestLevel; l < limit; l++ { + iterator := t.forest.GetVerticesAtLevel(l) + for iterator.HasNext() { + vertex := iterator.NextVertex() + pruned = append(pruned, vertex.VertexID()) + } + } + } + + // remove vertices and adjust size + err := t.forest.PruneUpToLevel(limit) + if err != nil { + return nil, fmt.Errorf("pruning Levelled Forest up to height (aka level) %d failed: %w", limit, err) + } + t.size -= uint64(len(pruned)) + return pruned, nil +} diff --git a/engine/consensus/approvals/chunk_collector.go b/engine/consensus/approvals/chunk_collector.go new file mode 100644 index 00000000000..011947e6546 --- /dev/null +++ b/engine/consensus/approvals/chunk_collector.go @@ -0,0 +1,55 @@ +package approvals + +import ( + "sync" + + "github.com/onflow/flow-go/model/flow" +) + +// ChunkApprovalCollector implements logic for checking chunks against assignments as +// well as accumulating signatures of already checked approvals. 
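+//
+// Illustrative usage (a sketch; `assignment`, `requiredApprovals` and `approval` are assumed
+// fixtures, not defined in this file):
+//
+//	collector := NewChunkApprovalCollector(assignment, requiredApprovals)
+//	aggSig, collected := collector.ProcessApproval(approval)
+//	if collected {
+//		// enough approvals were accumulated; aggSig is the chunk's aggregated signature
+//	}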
+type ChunkApprovalCollector struct { + assignment map[flow.Identifier]struct{} // set of verifiers that were assigned to current chunk + chunkApprovals flow.SignatureCollector // accumulator of signatures for current collector + lock sync.Mutex // lock to protect `chunkApprovals` + requiredApprovalsForSealConstruction uint // number of approvals that are required for each chunk to be sealed +} + +func NewChunkApprovalCollector(assignment map[flow.Identifier]struct{}, requiredApprovalsForSealConstruction uint) *ChunkApprovalCollector { + return &ChunkApprovalCollector{ + assignment: assignment, + chunkApprovals: flow.NewSignatureCollector(), + lock: sync.Mutex{}, + requiredApprovalsForSealConstruction: requiredApprovalsForSealConstruction, + } +} + +// ProcessApproval performs processing and bookkeeping of single approval +func (c *ChunkApprovalCollector) ProcessApproval(approval *flow.ResultApproval) (flow.AggregatedSignature, bool) { + approverID := approval.Body.ApproverID + if _, ok := c.assignment[approverID]; ok { + c.lock.Lock() + defer c.lock.Unlock() + c.chunkApprovals.Add(approverID, approval.Body.AttestationSignature) + if c.chunkApprovals.NumberSignatures() >= c.requiredApprovalsForSealConstruction { + return c.chunkApprovals.ToAggregatedSignature(), true + } + } + + return flow.AggregatedSignature{}, false +} + +// GetMissingSigners returns ids of approvers that are present in assignment but didn't provide approvals +func (c *ChunkApprovalCollector) GetMissingSigners() flow.IdentifierList { + // provide capacity for worst-case + result := make(flow.IdentifierList, 0, len(c.assignment)) + c.lock.Lock() + for id := range c.assignment { + if hasSigned := c.chunkApprovals.HasSigned(id); !hasSigned { + result = append(result, id) + } + } + c.lock.Unlock() + + return result +} diff --git a/engine/consensus/approvals/chunk_collector_test.go b/engine/consensus/approvals/chunk_collector_test.go new file mode 100644 index 00000000000..3fb1c5721dd --- /dev/null +++ b/engine/consensus/approvals/chunk_collector_test.go @@ -0,0 +1,95 @@ +package approvals + +import ( + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" +) + +// TestChunkApprovalCollector performs isolated testing of ChunkApprovalCollector. +// ChunkApprovalCollector has to process and cache signatures for result approvals that satisfy assignment. +// ChunkApprovalCollector has to reject approvals with invalid assignment. +// ChunkApprovalCollector is responsible for properly accumulating signatures and creating aggregated signature when requested. +func TestChunkApprovalCollector(t *testing.T) { + suite.Run(t, new(ChunkApprovalCollectorTestSuite)) +} + +type ChunkApprovalCollectorTestSuite struct { + BaseApprovalsTestSuite + + chunk *flow.Chunk + chunkAssignment map[flow.Identifier]struct{} + collector *ChunkApprovalCollector +} + +func (s *ChunkApprovalCollectorTestSuite) SetupTest() { + s.BaseApprovalsTestSuite.SetupTest() + s.chunk = s.Chunks[0] + s.chunkAssignment = make(map[flow.Identifier]struct{}) + for _, verifier := range s.ChunksAssignment.Verifiers(s.chunk) { + s.chunkAssignment[verifier] = struct{}{} + } + s.collector = NewChunkApprovalCollector(s.chunkAssignment, uint(len(s.chunkAssignment))) +} + +// TestProcessApproval_ValidApproval tests processing a valid approval. Expected to process it without error +// and report status to caller. 
+func (s *ChunkApprovalCollectorTestSuite) TestProcessApproval_ValidApproval() { + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.chunk.Index), unittest.WithApproverID(s.VerID)) + _, collected := s.collector.ProcessApproval(approval) + require.False(s.T(), collected) + require.Equal(s.T(), uint(1), s.collector.chunkApprovals.NumberSignatures()) +} + +// TestProcessApproval_InvalidChunkAssignment tests processing approval with invalid chunk assignment. Expected to +// reject this approval, signature cache shouldn't be affected. +func (s *ChunkApprovalCollectorTestSuite) TestProcessApproval_InvalidChunkAssignment() { + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.chunk.Index), unittest.WithApproverID(s.VerID)) + delete(s.chunkAssignment, s.VerID) + _, collected := s.collector.ProcessApproval(approval) + require.False(s.T(), collected) + require.Equal(s.T(), uint(0), s.collector.chunkApprovals.NumberSignatures()) +} + +// TestGetAggregatedSignature_MultipleApprovals tests processing approvals from different verifiers. Expected to provide a valid +// aggregated sig that has `AttestationSignature` for every approval. +func (s *ChunkApprovalCollectorTestSuite) TestGetAggregatedSignature_MultipleApprovals() { + var aggregatedSig flow.AggregatedSignature + var collected bool + sigCollector := flow.NewSignatureCollector() + for verID := range s.AuthorizedVerifiers { + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.chunk.Index), unittest.WithApproverID(verID)) + aggregatedSig, collected = s.collector.ProcessApproval(approval) + sigCollector.Add(approval.Body.ApproverID, approval.Body.AttestationSignature) + } + + require.True(s.T(), collected) + require.NotNil(s.T(), aggregatedSig) + require.Equal(s.T(), uint(len(s.AuthorizedVerifiers)), s.collector.chunkApprovals.NumberSignatures()) + require.Equal(s.T(), sigCollector.ToAggregatedSignature(), aggregatedSig) +} + +// TestGetMissingSigners tests that missing signers returns correct IDs of approvers that haven't provided an approval +func (s *ChunkApprovalCollectorTestSuite) TestGetMissingSigners() { + assignedSigners := make(flow.IdentifierList, 0, len(s.chunkAssignment)) + for id := range s.chunkAssignment { + assignedSigners = append(assignedSigners, id) + } + require.ElementsMatch(s.T(), assignedSigners, s.collector.GetMissingSigners()) + + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.chunk.Index), unittest.WithApproverID(assignedSigners[0])) + s.collector.ProcessApproval(approval) + + require.ElementsMatch(s.T(), assignedSigners[1:], s.collector.GetMissingSigners()) + + for verID := range s.AuthorizedVerifiers { + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.chunk.Index), unittest.WithApproverID(verID)) + s.collector.ProcessApproval(approval) + } + + require.Empty(s.T(), s.collector.GetMissingSigners()) +} diff --git a/engine/consensus/approvals/core.go b/engine/consensus/approvals/core.go new file mode 100644 index 00000000000..60b2c4a1d95 --- /dev/null +++ b/engine/consensus/approvals/core.go @@ -0,0 +1,333 @@ +package approvals + +import ( + "errors" + "fmt" + "sync/atomic" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + + "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/engine/consensus/sealing" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/mempool" + "github.com/onflow/flow-go/network" + "github.com/onflow/flow-go/state/protocol" + 
"github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/utils/logging" +) + +// approvalProcessingCore is an implementation of ResultApprovalProcessor interface +// This struct is responsible for: +// - collecting approvals for execution results +// - processing multiple incorporated results +// - pre-validating approvals (if they are outdated or non-verifiable) +// - pruning already processed collectorTree +type approvalProcessingCore struct { + log zerolog.Logger // used to log relevant actions with context + collectorTree *AssignmentCollectorTree // levelled forest for assignment collectors + approvalsCache *LruCache // in-memory cache of approvals that weren't verified + atomicLastSealedHeight uint64 // atomic variable for last sealed block height + atomicLastFinalizedHeight uint64 // atomic variable for last finalized block height + emergencySealingActive bool // flag which indicates if emergency sealing is active or not. NOTE: this is temporary while sealing & verification is under development + headers storage.Headers // used to access block headers in storage + state protocol.State // used to access protocol state + seals storage.Seals // used to get last sealed block + requestTracker *sealing.RequestTracker // used to keep track of number of approval requests, and blackout periods, by chunk +} + +func NewApprovalProcessingCore(headers storage.Headers, state protocol.State, sealsDB storage.Seals, assigner module.ChunkAssigner, + verifier module.Verifier, sealsMempool mempool.IncorporatedResultSeals, approvalConduit network.Conduit, requiredApprovalsForSealConstruction uint, emergencySealingActive bool) (*approvalProcessingCore, error) { + + lastSealed, err := state.Sealed().Head() + if err != nil { + return nil, fmt.Errorf("could not retrieve last sealed block: %w", err) + } + + core := &approvalProcessingCore{ + log: log.With().Str("engine", "approvals.Core").Logger(), + approvalsCache: NewApprovalsLRUCache(1000), + headers: headers, + state: state, + seals: sealsDB, + emergencySealingActive: emergencySealingActive, + requestTracker: sealing.NewRequestTracker(10, 30), + } + + factoryMethod := func(result *flow.ExecutionResult) (*AssignmentCollector, error) { + return NewAssignmentCollector(result, core.state, core.headers, assigner, sealsMempool, verifier, + approvalConduit, core.requestTracker, requiredApprovalsForSealConstruction) + } + + core.collectorTree = NewAssignmentCollectorTree(lastSealed, headers, factoryMethod) + + return core, nil +} + +func (c *approvalProcessingCore) lastSealedHeight() uint64 { + return atomic.LoadUint64(&c.atomicLastSealedHeight) +} + +func (c *approvalProcessingCore) lastFinalizedHeight() uint64 { + return atomic.LoadUint64(&c.atomicLastFinalizedHeight) +} + +// WARNING: this function is implemented in a way that we expect blocks strictly in parent-child order +// Caller has to ensure that it doesn't feed blocks that were already processed or in wrong order. 
+func (c *approvalProcessingCore) OnFinalizedBlock(finalizedBlockID flow.Identifier) {
+	finalized, err := c.headers.ByBlockID(finalizedBlockID)
+	if err != nil {
+		c.log.Fatal().Err(err).Msgf("could not retrieve header for finalized block %s", finalizedBlockID)
+	}
+
+	// no need to process already finalized blocks
+	if finalized.Height <= c.lastFinalizedHeight() {
+		return
+	}
+
+	// it's important to use an atomic operation to make sure that we have correct ordering
+	atomic.StoreUint64(&c.atomicLastFinalizedHeight, finalized.Height)
+
+	seal, err := c.seals.ByBlockID(finalizedBlockID)
+	if err != nil {
+		c.log.Fatal().Err(err).Msgf("could not retrieve seal for finalized block %s", finalizedBlockID)
+	}
+	lastSealed, err := c.headers.ByBlockID(seal.BlockID)
+	if err != nil {
+		c.log.Fatal().Err(err).Msgf("could not retrieve last sealed block %s", seal.BlockID)
+	}
+
+	// it's important to use an atomic operation to make sure that we have correct ordering
+	atomic.StoreUint64(&c.atomicLastSealedHeight, lastSealed.Height)
+
+	// check if there are stale results qualified for emergency sealing
+	err = c.checkEmergencySealing(lastSealed.Height, finalized.Height)
+	if err != nil {
+		c.log.Fatal().Err(err).Msgf("could not check emergency sealing at block %v", finalizedBlockID)
+	}
+
+	// finalize forks to stop collecting approvals for orphan collectors
+	c.collectorTree.FinalizeForkAtLevel(finalized, lastSealed)
+
+	// as soon as we discover a new sealed height, proceed with pruning collectors
+	pruned, err := c.collectorTree.PruneUpToHeight(lastSealed.Height)
+	if err != nil {
+		c.log.Fatal().Err(err).Msgf("could not prune collector tree at block %v", finalizedBlockID)
+	}
+
+	// remove all pending items that we might have requested
+	c.requestTracker.Remove(pruned...)
+}
+
+// processIncorporatedResult implements the business logic for processing a single incorporated result
+// Returns:
+// * engine.InvalidInputError - incorporated result is invalid
+// * engine.UnverifiableInputError - result is unverifiable since the referenced block cannot be found
+// * engine.OutdatedInputError - result is outdated, e.g. the block was already sealed
+// * exception in case of any other error, usually this is not expected
+// * nil - successfully processed incorporated result
+func (c *approvalProcessingCore) processIncorporatedResult(result *flow.IncorporatedResult) error {
+	err := c.checkBlockOutdated(result.Result.BlockID)
+	if err != nil {
+		return fmt.Errorf("won't process outdated or unverifiable execution result %s: %w", result.Result.BlockID, err)
+	}
+
+	incorporatedBlock, err := c.headers.ByBlockID(result.IncorporatedBlockID)
+	if err != nil {
+		return fmt.Errorf("could not get block height for incorporated block %s: %w",
+			result.IncorporatedBlockID, err)
+	}
+	incorporatedAtHeight := incorporatedBlock.Height
+
+	lastFinalizedBlockHeight := c.lastFinalizedHeight()
+
+	// check if we are dealing with a finalized block or an orphan
+	if incorporatedAtHeight <= lastFinalizedBlockHeight {
+		finalized, err := c.headers.ByHeight(incorporatedAtHeight)
+		if err != nil {
+			return fmt.Errorf("could not retrieve finalized block at height %d: %w", incorporatedAtHeight, err)
+		}
+		if finalized.ID() != result.IncorporatedBlockID {
+			// it means that we got an incorporated result for a block which doesn't extend our chain
+			// and should be discarded from future processing
+			return engine.NewOutdatedInputErrorf("won't process incorporated result from orphan block %s", result.IncorporatedBlockID)
+		}
+	}
+
+	// in case the block is not finalized yet, we will create a collector and start processing approvals;
+	// no checks for orphans can be made at this point
+	// we expect that the assignment collector will clean up orphan IRs whenever a new finalized block is processed
+
+	lazyCollector, err := c.collectorTree.GetOrCreateCollector(result.Result)
+	if err != nil {
+		return fmt.Errorf("could not process incorporated result, cannot create collector: %w", err)
+	}
+
+	if !lazyCollector.Processable {
+		return engine.NewOutdatedInputErrorf("collector for %s is marked as non processable", result.ID())
+	}
+
+	err = lazyCollector.Collector.ProcessIncorporatedResult(result)
+	if err != nil {
+		return fmt.Errorf("could not process incorporated result: %w", err)
+	}
+
+	// process pending approvals only if it's a new collector:
+	// pending approvals are those whose execution result we had not received yet;
+	// once we receive the result and create a new collector, we look up the pending
+	// approvals for this result and process them
+	// lazyCollector.Created is true for exactly one goroutine even if multiple goroutines reach this code at the
+	// same time, ensuring that pending approvals are processed exactly once per assignment
+	if lazyCollector.Created {
+		err = c.processPendingApprovals(lazyCollector.Collector)
+		if err != nil {
+			return fmt.Errorf("could not process cached approvals: %w", err)
+		}
+	}
+
+	return nil
+}
+
+func (c *approvalProcessingCore) ProcessIncorporatedResult(result *flow.IncorporatedResult) error {
+	err := c.processIncorporatedResult(result)
+
+	// only engine.UnverifiableInputError, engine.OutdatedInputError and
+	// engine.InvalidInputError are expected; any other error is an exception
+	if engine.IsUnverifiableInputError(err) || engine.IsOutdatedInputError(err) || engine.IsInvalidInputError(err) {
+		logger := c.log.Info()
+		if engine.IsInvalidInputError(err) {
+			logger = c.log.Error()
+		}
+
+		logger.Err(err).Msgf("could not process incorporated result %v", result.ID())
+		return nil
+	}
+
+	return err
+}
+
+// checkBlockOutdated performs a sanity check whether the block is outdated
+// Returns:
+// * engine.UnverifiableInputError - sentinel error in case we haven't discovered the requested blockID
+// * engine.OutdatedInputError - sentinel error in case the block is outdated
+// * exception in case of an unknown internal error
+// * nil - block isn't sealed
+func (c *approvalProcessingCore) checkBlockOutdated(blockID flow.Identifier) error {
+	block, err := c.headers.ByBlockID(blockID)
+	if err != nil {
+		if !errors.Is(err, storage.ErrNotFound) {
+			return fmt.Errorf("failed to retrieve header for block %x: %w", blockID, err)
+		}
+		return engine.NewUnverifiableInputError("no header for block: %v", blockID)
+	}
+
+	// it's important to use an atomic operation to make sure that we have correct ordering
+	lastSealedHeight := c.lastSealedHeight()
+	// drop the approval if it is for a block whose height is lower than or equal to the already sealed height
+	if lastSealedHeight >= block.Height {
+		return engine.NewOutdatedInputErrorf("requested processing for already sealed block height")
+	}
+
+	return nil
+}
+
+func (c *approvalProcessingCore) ProcessApproval(approval *flow.ResultApproval) error {
+	err := c.processApproval(approval)
+
+	// only engine.UnverifiableInputError, engine.OutdatedInputError and
+	// engine.InvalidInputError are expected; any other error is an exception
+	if engine.IsUnverifiableInputError(err) || engine.IsOutdatedInputError(err) || engine.IsInvalidInputError(err) {
+		logger := c.log.Info()
+		if engine.IsInvalidInputError(err) {
+			logger = c.log.Error()
+		}
+
+		logger.Err(err).
+			Hex("approval_id", logging.Entity(approval)).
+			Msgf("could not process result approval")
+
+		return nil
+	}
+
+	return err
+}
+
+// processApproval implements the business logic for processing a single approval
+// Returns:
+// * engine.InvalidInputError - result approval is invalid
+// * engine.UnverifiableInputError - result approval is unverifiable since the referenced block cannot be found
+// * engine.OutdatedInputError - result approval is outdated, e.g. the block was already sealed
+// * exception in case of any other error, usually this is not expected
+// * nil - successfully processed result approval
+func (c *approvalProcessingCore) processApproval(approval *flow.ResultApproval) error {
+	err := c.checkBlockOutdated(approval.Body.BlockID)
+	if err != nil {
+		return fmt.Errorf("won't process approval for outdated block (%x): %w", approval.Body.BlockID, err)
+	}
+
+	if collector, processable := c.collectorTree.GetCollector(approval.Body.ExecutionResultID); collector != nil {
+		if !processable {
+			return engine.NewOutdatedInputErrorf("collector for %s is marked as non processable", approval.Body.ExecutionResultID)
+		}
+
+		// if there is a collector, it means that we have received the execution result and we are ready
+		// to process approvals
+		err = collector.ProcessApproval(approval)
+		if err != nil {
+			return fmt.Errorf("could not process assignment: %w", err)
+		}
+	} else {
+		// in case we haven't received the execution result yet, cache the approval and process it later
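+		// (cached approvals are replayed by processPendingApprovals once the matching
+		// execution result arrives and its collector is created)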
+		c.approvalsCache.Put(approval)
+	}
+
+	return nil
+}
+
+func (c *approvalProcessingCore) checkEmergencySealing(lastSealedHeight, lastFinalizedHeight uint64) error {
+	if !c.emergencySealingActive {
+		return nil
+	}
+
+	emergencySealingHeight := lastSealedHeight + sealing.DefaultEmergencySealingThreshold
+
+	// we are interested in all collectors that match the condition:
+	// lastSealedBlock + sealing.DefaultEmergencySealingThreshold < lastFinalizedHeight
+	// in other words, we should check for emergency sealing only if the threshold was reached
+	if emergencySealingHeight >= lastFinalizedHeight {
+		return nil
+	}
+
+	delta := lastFinalizedHeight - emergencySealingHeight
+	// whether a block is emergency sealable depends on its incorporated block height
+	// the collectors tree stores collectors by executed block height
+	// we need to select multiple levels to find eligible collectors for emergency sealing
+	for _, collector := range c.collectorTree.GetCollectorsByInterval(lastSealedHeight, lastSealedHeight+delta) {
+		err := collector.CheckEmergencySealing(lastFinalizedHeight)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (c *approvalProcessingCore) processPendingApprovals(collector *AssignmentCollector) error {
+	// filter cached approvals for the concrete execution result
+	for _, approval := range c.approvalsCache.TakeByResultID(collector.ResultID) {
+		err := collector.ProcessApproval(approval)
+		if err != nil {
+			if engine.IsInvalidInputError(err) {
+				c.log.Debug().
+					Hex("result_id", collector.ResultID[:]).
+					Err(err).
+					Msgf("invalid approval with id %s", approval.ID())
+			} else {
+				return fmt.Errorf("could not process assignment: %w", err)
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/engine/consensus/approvals/core_test.go b/engine/consensus/approvals/core_test.go
new file mode 100644
index 00000000000..da8cecebd5e
--- /dev/null
+++ b/engine/consensus/approvals/core_test.go
@@ -0,0 +1,479 @@
+package approvals
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/onflow/flow-go/engine"
+	"github.com/onflow/flow-go/engine/consensus/sealing"
+	"github.com/onflow/flow-go/model/flow"
+	mempool "github.com/onflow/flow-go/module/mempool/mock"
+	module "github.com/onflow/flow-go/module/mock"
+	"github.com/onflow/flow-go/network/mocknetwork"
+	realproto "github.com/onflow/flow-go/state/protocol"
+	protocol "github.com/onflow/flow-go/state/protocol/mock"
+	realstorage "github.com/onflow/flow-go/storage"
+	storage "github.com/onflow/flow-go/storage/mock"
+	"github.com/onflow/flow-go/utils/unittest"
+)
+
+// TestApprovalProcessingCore performs testing of the approval processing core.
+// approvalProcessingCore is responsible for delegating processing to the assignment collector tree for each separate execution result.
+// approvalProcessingCore performs height-based checks and decides if an approval or incorporated result has to be processed at all
+// or rejected as outdated or unverifiable.
+// approvalProcessingCore maintains an LRU cache of known approvals that cannot be verified at the moment.
+func TestApprovalProcessingCore(t *testing.T) {
+	suite.Run(t, new(ApprovalProcessingCoreTestSuite))
+}
+
+type ApprovalProcessingCoreTestSuite struct {
+	BaseApprovalsTestSuite
+
+	blocks          map[flow.Identifier]*flow.Header
+	headers         *storage.Headers
+	state           *protocol.State
+	assigner        *module.ChunkAssigner
+	sealsPL         *mempool.IncorporatedResultSeals
+	sealsDB         *storage.Seals
+	sigVerifier     *module.Verifier
+	conduit         *mocknetwork.Conduit
+	identitiesCache map[flow.Identifier]map[flow.Identifier]*flow.Identity // helper map to store identities for a given block
+	core            *approvalProcessingCore
+}
+
+func (s *ApprovalProcessingCoreTestSuite) SetupTest() {
+	s.BaseApprovalsTestSuite.SetupTest()
+
+	s.sealsPL = &mempool.IncorporatedResultSeals{}
+	s.state = &protocol.State{}
+	s.assigner = &module.ChunkAssigner{}
+	s.sigVerifier = &module.Verifier{}
+	s.conduit = &mocknetwork.Conduit{}
+	s.headers = &storage.Headers{}
+	s.sealsDB = &storage.Seals{}
+
+	// setup blocks cache for protocol state
+	s.blocks = make(map[flow.Identifier]*flow.Header)
+	s.blocks[s.ParentBlock.ID()] = &s.ParentBlock
+	s.blocks[s.Block.ID()] = &s.Block
+	s.blocks[s.IncorporatedBlock.ID()] = &s.IncorporatedBlock
+
+	// setup identities for each block
+	s.identitiesCache = make(map[flow.Identifier]map[flow.Identifier]*flow.Identity)
+	s.identitiesCache[s.IncorporatedResult.Result.BlockID] = s.AuthorizedVerifiers
+
+	s.assigner.On("Assign", mock.Anything, mock.Anything).Return(s.ChunksAssignment, nil)
+
+	s.headers.On("ByBlockID", mock.Anything).Return(func(blockID flow.Identifier) *flow.Header {
+		return s.blocks[blockID]
+	}, func(blockID flow.Identifier) error {
+		_, found := s.blocks[blockID]
+		if found {
+			return nil
+		} else {
+			return realstorage.ErrNotFound
+		}
+	})
+
+	s.state.On("Sealed").Return(unittest.StateSnapshotForKnownBlock(&s.ParentBlock, nil)).Once()
+
+	s.state.On("AtBlockID", mock.Anything).Return(
+		func(blockID flow.Identifier) realproto.Snapshot {
+			if block, found := s.blocks[blockID]; found {
+				return unittest.StateSnapshotForKnownBlock(block, s.identitiesCache[blockID])
+			} else {
+				return unittest.StateSnapshotForUnknownBlock()
+			}
+		},
+	)
+	var err error
+	s.core, err = NewApprovalProcessingCore(s.headers, s.state, s.sealsDB, s.assigner, s.sigVerifier, s.sealsPL, s.conduit,
+		uint(len(s.AuthorizedVerifiers)), false)
+	require.NoError(s.T(), err)
+}
+
+// TestOnBlockFinalized_RejectOutdatedApprovals tests that approvals will be rejected as outdated
+// for a block that is already sealed
+func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_RejectOutdatedApprovals() {
+	approval := unittest.ResultApprovalFixture(unittest.WithApproverID(s.VerID),
+		unittest.WithChunk(s.Chunks[0].Index),
+		unittest.WithBlockID(s.Block.ID()))
+	err := s.core.processApproval(approval)
+	require.NoError(s.T(), err)
+
+	seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(&s.Block))
+	s.sealsDB.On("ByBlockID", mock.Anything).Return(seal, nil).Once()
+
+	s.core.OnFinalizedBlock(s.Block.ID())
+
+	err = s.core.processApproval(approval)
+	require.Error(s.T(), err)
+	require.True(s.T(), engine.IsOutdatedInputError(err))
+}
+
+// TestOnBlockFinalized_RejectOutdatedExecutionResult tests that an incorporated result will be rejected as outdated
+// if the block which is targeted by the execution result is already sealed.
+func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_RejectOutdatedExecutionResult() {
+	seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(&s.Block))
+	s.sealsDB.On("ByBlockID", mock.Anything).Return(seal, nil).Once()
+
+	s.core.OnFinalizedBlock(s.Block.ID())
+
+	err := s.core.processIncorporatedResult(s.IncorporatedResult)
+	require.Error(s.T(), err)
+	require.True(s.T(), engine.IsOutdatedInputError(err))
+}
+
+// TestOnBlockFinalized_RejectUnverifiableEntries tests that the core will reject both execution results
+// and approvals for blocks that we have no information about.
+func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_RejectUnverifiableEntries() {
+	s.IncorporatedResult.Result.BlockID = unittest.IdentifierFixture() // replace blockID with a random one
+	err := s.core.processIncorporatedResult(s.IncorporatedResult)
+	require.Error(s.T(), err)
+	require.True(s.T(), engine.IsUnverifiableInputError(err))
+
+	approval := unittest.ResultApprovalFixture(unittest.WithApproverID(s.VerID),
+		unittest.WithChunk(s.Chunks[0].Index))
+
+	err = s.core.processApproval(approval)
+	require.Error(s.T(), err)
+	require.True(s.T(), engine.IsUnverifiableInputError(err))
+}
+
+// TestOnBlockFinalized_RejectOrphanIncorporatedResults tests that execution results incorporated in orphan blocks
+// are rejected as outdated in the following situation:
+// A <- B_1
+//   <- B_2
+// B_1 is finalized, rendering B_2 an orphan; submitting IR[ER[A], B_1] is a success, submitting IR[ER[A], B_2] is an outdated incorporated result
+func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_RejectOrphanIncorporatedResults() {
+	blockB1 := unittest.BlockHeaderWithParentFixture(&s.Block)
+	blockB2 := unittest.BlockHeaderWithParentFixture(&s.Block)
+
+	s.blocks[blockB1.ID()] = &blockB1
+	s.blocks[blockB2.ID()] = &blockB2
+
+	IR1 := unittest.IncorporatedResult.Fixture(
+		unittest.IncorporatedResult.WithIncorporatedBlockID(blockB1.ID()),
+		unittest.IncorporatedResult.WithResult(s.IncorporatedResult.Result))
+
+	IR2 := unittest.IncorporatedResult.Fixture(
+		unittest.IncorporatedResult.WithIncorporatedBlockID(blockB2.ID()),
+		unittest.IncorporatedResult.WithResult(s.IncorporatedResult.Result))
+
+	s.headers.On("ByHeight", blockB1.Height).Return(&blockB1, nil)
+	seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(&s.ParentBlock))
+	s.sealsDB.On("ByBlockID", mock.Anything).Return(seal, nil).Once()
+
+	// blockB1 becomes finalized
+	s.core.OnFinalizedBlock(blockB1.ID())
+
+	err := s.core.processIncorporatedResult(IR1)
+	require.NoError(s.T(), err)
+
+	err = s.core.processIncorporatedResult(IR2)
+	require.Error(s.T(), err)
+	require.True(s.T(), engine.IsOutdatedInputError(err))
+}
+
+// TestOnFinalizedBlock_CollectorsCleanup tests that stale collectors are cleaned up from the tree for
+// already sealed blocks.
+func (s *ApprovalProcessingCoreTestSuite) TestOnFinalizedBlock_CollectorsCleanup() {
+	blockID := s.Block.ID()
+	numResults := uint(10)
+	for i := uint(0); i < numResults; i++ {
+		// all results incorporated in different blocks
+		incorporatedBlock := unittest.BlockHeaderWithParentFixture(&s.IncorporatedBlock)
+		s.blocks[incorporatedBlock.ID()] = &incorporatedBlock
+		// create different incorporated results for the same block ID
+		result := unittest.ExecutionResultFixture()
+		result.BlockID = blockID
+		result.PreviousResultID = s.IncorporatedResult.Result.ID()
+		incorporatedResult := unittest.IncorporatedResult.Fixture(
+			unittest.IncorporatedResult.WithResult(result),
+			unittest.IncorporatedResult.WithIncorporatedBlockID(incorporatedBlock.ID()))
+		err := s.core.processIncorporatedResult(incorporatedResult)
+		require.NoError(s.T(), err)
+	}
+	require.Equal(s.T(), uint64(numResults), s.core.collectorTree.size)
+
+	candidate := unittest.BlockHeaderWithParentFixture(&s.Block)
+	s.blocks[candidate.ID()] = &candidate
+
+	// candidate becomes the new sealed and finalized block; it means that
+	// we will need to clean up our tree up to the new height, removing all outdated collectors
+	seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(&candidate))
+	s.sealsDB.On("ByBlockID", mock.Anything).Return(seal, nil).Once()
+
+	s.core.OnFinalizedBlock(candidate.ID())
+	require.Equal(s.T(), uint64(0), s.core.collectorTree.size)
+}
+
+// TestProcessIncorporated_ApprovalsBeforeResult tests a scenario where we first receive approvals for an unknown
+// execution result and only later discover the execution result. In this scenario we should be able
+// to create a seal right after discovering the execution result, since all approvals should be cached
+// (if the cache capacity is big enough).
+func (s *ApprovalProcessingCoreTestSuite) TestProcessIncorporated_ApprovalsBeforeResult() {
+	s.sigVerifier.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
+
+	for _, chunk := range s.Chunks {
+		for verID := range s.AuthorizedVerifiers {
+			approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index),
+				unittest.WithApproverID(verID),
+				unittest.WithBlockID(s.Block.ID()),
+				unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID()))
+			err := s.core.processApproval(approval)
+			require.NoError(s.T(), err)
+		}
+	}
+
+	s.sealsPL.On("Add", mock.Anything).Return(true, nil).Once()
+
+	err := s.core.processIncorporatedResult(s.IncorporatedResult)
+	require.NoError(s.T(), err)
+
+	s.sealsPL.AssertCalled(s.T(), "Add", mock.Anything)
+}
+
+// TestProcessIncorporated_ApprovalsAfterResult tests a scenario where we first discover the execution result
+// and only then start receiving approvals. In this scenario we should be able to create a seal right
+// after processing the last approval needed to meet the `requiredApprovalsForSealConstruction` threshold.
+func (s *ApprovalProcessingCoreTestSuite) TestProcessIncorporated_ApprovalsAfterResult() { + s.sigVerifier.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(true, nil) + + s.sealsPL.On("Add", mock.Anything).Return(true, nil).Once() + + err := s.core.processIncorporatedResult(s.IncorporatedResult) + require.NoError(s.T(), err) + + for _, chunk := range s.Chunks { + for verID := range s.AuthorizedVerifiers { + approval := unittest.ResultApprovalFixture(unittest.WithChunk(chunk.Index), + unittest.WithApproverID(verID), + unittest.WithBlockID(s.Block.ID()), + unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID())) + err := s.core.processApproval(approval) + require.NoError(s.T(), err) + } + } + + s.sealsPL.AssertCalled(s.T(), "Add", mock.Anything) +} + +// TestProcessIncorporated_ProcessingInvalidApproval tests that processing invalid approval when result is discovered +// is correctly handled in case of sentinel error +func (s *ApprovalProcessingCoreTestSuite) TestProcessIncorporated_ProcessingInvalidApproval() { + // fail signature verification for first approval + s.sigVerifier.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(false, nil).Once() + + // generate approvals for first chunk + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.Chunks[0].Index), + unittest.WithApproverID(s.VerID), + unittest.WithBlockID(s.Block.ID()), + unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID())) + + // this approval has to be cached since execution result is not known yet + err := s.core.processApproval(approval) + require.NoError(s.T(), err) + + // at this point approval has to be processed, even if it's invalid + // if it's an expected sentinel error, it has to be handled internally + err = s.core.processIncorporatedResult(s.IncorporatedResult) + require.NoError(s.T(), err) +} + +// TestProcessIncorporated_ApprovalVerificationException tests that processing invalid approval when result is discovered +// is correctly handled in case of exception +func (s *ApprovalProcessingCoreTestSuite) TestProcessIncorporated_ApprovalVerificationException() { + // fail signature verification with exception + s.sigVerifier.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(false, fmt.Errorf("exception")).Once() + + // generate approvals for first chunk + approval := unittest.ResultApprovalFixture(unittest.WithChunk(s.Chunks[0].Index), + unittest.WithApproverID(s.VerID), + unittest.WithBlockID(s.Block.ID()), + unittest.WithExecutionResultID(s.IncorporatedResult.Result.ID())) + + // this approval has to be cached since execution result is not known yet + err := s.core.processApproval(approval) + require.NoError(s.T(), err) + + // at this point approval has to be processed, even if it's invalid + // if it's an expected sentinel error, it has to be handled internally + err = s.core.processIncorporatedResult(s.IncorporatedResult) + require.Error(s.T(), err) +} + +// TestOnBlockFinalized_EmergencySealing tests that emergency sealing kicks in to resolve sealing halt +func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_EmergencySealing() { + s.core.emergencySealingActive = true + s.sealsPL.On("Add", mock.Anything).Return(true, nil).Once() + + seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(&s.ParentBlock)) + s.sealsDB.On("ByBlockID", mock.Anything).Return(seal, nil).Times(sealing.DefaultEmergencySealingThreshold) + + err := s.core.ProcessIncorporatedResult(s.IncorporatedResult) + require.NoError(s.T(), err) + + lastFinalizedBlock 
:= &s.IncorporatedBlock + for i := 0; i < sealing.DefaultEmergencySealingThreshold; i++ { + finalizedBlock := unittest.BlockHeaderWithParentFixture(lastFinalizedBlock) + s.blocks[finalizedBlock.ID()] = &finalizedBlock + s.core.OnFinalizedBlock(finalizedBlock.ID()) + lastFinalizedBlock = &finalizedBlock + } + + s.sealsPL.AssertExpectations(s.T()) +} + +// TestOnBlockFinalized_ProcessingOrphanApprovals tests that approvals for orphan forks are rejected as outdated entries without processing +// A <- B_1 <- C_1{ IER[B_1] } +// <- B_2 <- C_2{ IER[B_2] } <- D_2{ IER[C_2] } +// <- B_3 <- C_3{ IER[B_3] } <- D_3{ IER[C_3] } <- E_3{ IER[D_3] } +// B_1 becomes finalized rendering forks starting at B_2 and B_3 as orphans +func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_ProcessingOrphanApprovals() { + forks := make([][]*flow.Block, 3) + forkResults := make([][]*flow.ExecutionResult, len(forks)) + + for forkIndex := range forks { + forks[forkIndex] = unittest.ChainFixtureFrom(forkIndex+2, &s.ParentBlock) + fork := forks[forkIndex] + + previousResult := s.IncorporatedResult.Result + for blockIndex, block := range fork { + s.blocks[block.ID()] = block.Header + s.identitiesCache[block.ID()] = s.AuthorizedVerifiers + + // create and incorporate result for every block in fork except first one + if blockIndex > 0 { + // create a result + result := unittest.ExecutionResultFixture(unittest.WithPreviousResult(*previousResult)) + result.BlockID = block.Header.ParentID + result.Chunks = s.Chunks + forkResults[forkIndex] = append(forkResults[forkIndex], result) + previousResult = result + + // incorporate in fork + IR := unittest.IncorporatedResult.Fixture( + unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()), + unittest.IncorporatedResult.WithResult(result)) + + err := s.core.processIncorporatedResult(IR) + require.NoError(s.T(), err) + } + } + } + + // same block sealed + seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(&s.ParentBlock)) + s.sealsDB.On("ByBlockID", mock.Anything).Return(seal, nil).Once() + + // block B_1 becomes finalized + s.core.OnFinalizedBlock(forks[0][0].ID()) + + // verify will be called twice for every approval in first fork + s.sigVerifier.On("Verify", mock.Anything, mock.Anything, mock.Anything).Return(true, nil).Times(len(forkResults[0]) * 2) + + // try submitting approvals for each result + for forkIndex, results := range forkResults { + for _, result := range results { + executedBlockID := result.BlockID + resultID := result.ID() + + approval := unittest.ResultApprovalFixture(unittest.WithChunk(0), + unittest.WithApproverID(s.VerID), + unittest.WithBlockID(executedBlockID), + unittest.WithExecutionResultID(resultID)) + + err := s.core.processApproval(approval) + + // for first fork all results should be valid, since it's a finalized fork + // all others forks are orphans and approvals for those should be outdated + if forkIndex == 0 { + require.NoError(s.T(), err) + } else { + require.Error(s.T(), err) + require.True(s.T(), engine.IsOutdatedInputError(err)) + } + } + } +} + +// TestOnBlockFinalized_ExtendingUnprocessableFork tests that extending orphan fork results in non processable collectors +// - X <- Y <- Z +// / +// <- A <- B <- C <- D <- E +// | +// finalized +func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_ExtendingUnprocessableFork() { + forks := make([][]*flow.Block, 2) + + for forkIndex := range forks { + forks[forkIndex] = unittest.ChainFixtureFrom(forkIndex+3, &s.Block) + fork := forks[forkIndex] + for _, block := range fork 
{ + s.blocks[block.ID()] = block.Header + s.identitiesCache[block.ID()] = s.AuthorizedVerifiers + } + } + + finalized := forks[1][0].Header + + s.headers.On("ByHeight", finalized.Height).Return(finalized, nil) + seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(&s.ParentBlock)) + s.sealsDB.On("ByBlockID", mock.Anything).Return(seal, nil).Once() + + // finalize block B + s.core.OnFinalizedBlock(finalized.ID()) + + // create incorporated result for each block in main fork + for forkIndex, fork := range forks { + previousResult := s.IncorporatedResult.Result + for _, block := range fork { + result := unittest.ExecutionResultFixture(unittest.WithPreviousResult(*previousResult)) + result.BlockID = block.Header.ParentID + result.Chunks = s.Chunks + previousResult = result + + // incorporate in fork + IR := unittest.IncorporatedResult.Fixture( + unittest.IncorporatedResult.WithIncorporatedBlockID(block.ID()), + unittest.IncorporatedResult.WithResult(result)) + err := s.core.processIncorporatedResult(IR) + if forkIndex > 0 { + require.NoError(s.T(), err) + } else { + require.Error(s.T(), err) + require.True(s.T(), engine.IsOutdatedInputError(err)) + } + } + } +} + +// TestOnBlockFinalized_ExtendingSealedResult tests if assignment collector tree accepts collector which extends sealed result +func (s *ApprovalProcessingCoreTestSuite) TestOnBlockFinalized_ExtendingSealedResult() { + seal := unittest.Seal.Fixture(unittest.Seal.WithBlock(&s.Block)) + s.sealsDB.On("ByBlockID", mock.Anything).Return(seal, nil).Once() + + unsealedBlock := unittest.BlockHeaderWithParentFixture(&s.Block) + s.blocks[unsealedBlock.ID()] = &unsealedBlock + s.identitiesCache[unsealedBlock.ID()] = s.AuthorizedVerifiers + result := unittest.ExecutionResultFixture(unittest.WithPreviousResult(*s.IncorporatedResult.Result)) + result.BlockID = unsealedBlock.ID() + + s.headers.On("ByHeight", unsealedBlock.Height).Return(unsealedBlock, nil) + s.core.OnFinalizedBlock(unsealedBlock.ID()) + + incorporatedBlock := unittest.BlockHeaderWithParentFixture(&unsealedBlock) + s.blocks[incorporatedBlock.ID()] = &incorporatedBlock + s.identitiesCache[incorporatedBlock.ID()] = s.AuthorizedVerifiers + IR := unittest.IncorporatedResult.Fixture( + unittest.IncorporatedResult.WithIncorporatedBlockID(incorporatedBlock.ID()), + unittest.IncorporatedResult.WithResult(result)) + err := s.core.processIncorporatedResult(IR) + require.NoError(s.T(), err) + + s.sealsDB.AssertExpectations(s.T()) +} diff --git a/engine/consensus/approvals/incorporated_result_seals.go b/engine/consensus/approvals/incorporated_result_seals.go new file mode 100644 index 00000000000..2104dcab93c --- /dev/null +++ b/engine/consensus/approvals/incorporated_result_seals.go @@ -0,0 +1,90 @@ +package approvals + +import ( + "github.com/rs/zerolog/log" + + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/mempool" + "github.com/onflow/flow-go/storage" + "github.com/onflow/flow-go/utils/logging" +) + +// IncorporatedResultSeals implements the incorporated result seals memory pool +// of the consensus nodes. +// ATTENTION: this is a temporary wrapper for `mempool.IncorporatedResultSeals` to support +// a condition that there must be at least 2 receipts from _different_ ENs +// committing to the same incorporated result. +// This wrapper should only be used with `approvalProcessingCore`. 
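+//
+// Illustrative wiring (a sketch; `rawSealsMempool` and `receiptsDB` are assumed to be provided
+// by the node builder):
+//
+//	sealsPool := NewIncorporatedResultSeals(rawSealsMempool, receiptsDB)
+//	seal, ok := sealsPool.ByID(sealID) // ok stays false until at least 2 distinct ENs committed to the result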
+type IncorporatedResultSeals struct {
+	seals      mempool.IncorporatedResultSeals // the wrapped seals mempool
+	receiptsDB storage.ExecutionReceipts       // receipts DB to decide if we have multiple receipts for the same result
+}
+
+// NewIncorporatedResultSeals creates a mempool for the incorporated result seals
+func NewIncorporatedResultSeals(mempool mempool.IncorporatedResultSeals, receiptsDB storage.ExecutionReceipts) *IncorporatedResultSeals {
+	return &IncorporatedResultSeals{
+		seals:      mempool,
+		receiptsDB: receiptsDB,
+	}
+}
+
+// Add adds an IncorporatedResultSeal to the mempool
+func (ir *IncorporatedResultSeals) Add(seal *flow.IncorporatedResultSeal) (bool, error) {
+	return ir.seals.Add(seal)
+}
+
+// All returns all the items in the mempool
+func (ir *IncorporatedResultSeals) All() []*flow.IncorporatedResultSeal {
+	return ir.seals.All()
+}
+
+// resultHasMultipleReceipts implements an additional _temporary_ safety measure:
+// only consider an incorporatedResult sealable if there are AT LEAST 2 RECEIPTS
+// from _different_ ENs committing to the result.
+func (ir *IncorporatedResultSeals) resultHasMultipleReceipts(incorporatedResult *flow.IncorporatedResult) bool {
+	blockID := incorporatedResult.Result.BlockID // block that was computed
+	resultID := incorporatedResult.Result.ID()
+
+	// get all receipts that are known for the block
+	receipts, err := ir.receiptsDB.ByBlockID(blockID)
+	if err != nil {
+		log.Error().Err(err).
+			Hex("block_id", logging.ID(blockID)).
+			Msg("could not get receipts by block ID")
+		return false
+	}
+
+	// Index receipts for the given incorporatedResult by their executor. In case
+	// there are multiple receipts from the same executor, we keep the last one.
+	receiptsForIncorporatedResults := receipts.GroupByResultID().GetGroup(resultID)
+	return receiptsForIncorporatedResults.GroupByExecutorID().NumberGroups() >= 2
+}
+
+// ByID gets an IncorporatedResultSeal by IncorporatedResult ID
+func (ir *IncorporatedResultSeals) ByID(id flow.Identifier) (*flow.IncorporatedResultSeal, bool) {
+	seal, ok := ir.seals.ByID(id)
+	if !ok {
+		return nil, false
+	}
+
+	// _temporary_ measure: only return a seal if its result has commitments from at least 2 different ENs
+	if !ir.resultHasMultipleReceipts(seal.IncorporatedResult) {
+		return nil, false
+	}
+
+	return seal, true
+}
+
+// Rem removes an IncorporatedResultSeal from the mempool
+func (ir *IncorporatedResultSeals) Rem(id flow.Identifier) bool {
+	return ir.seals.Rem(id)
+}
+
+// Clear removes all entities from the pool.
+func (ir *IncorporatedResultSeals) Clear() {
+	ir.seals.Clear()
+}
+
+// RegisterEjectionCallbacks adds the provided OnEjection callbacks
+func (ir *IncorporatedResultSeals) RegisterEjectionCallbacks(callbacks ...mempool.OnEjection) {
+	ir.seals.RegisterEjectionCallbacks(callbacks...)
+}
diff --git a/engine/consensus/approvals/testutil.go b/engine/consensus/approvals/testutil.go
new file mode 100644
index 00000000000..04229b28527
--- /dev/null
+++ b/engine/consensus/approvals/testutil.go
@@ -0,0 +1,58 @@
+package approvals
+
+import (
+	"github.com/stretchr/testify/suite"
+
+	"github.com/onflow/flow-go/model/chunks"
+	"github.com/onflow/flow-go/model/flow"
+	"github.com/onflow/flow-go/utils/unittest"
+)
+
+// BaseApprovalsTestSuite is a base suite for testing approvals processing related functionality.
+// In a nutshell, it generates mock data that can be used to create approvals and provides all needed
+// data to validate those approvals for the respective execution result.
+type BaseApprovalsTestSuite struct { + suite.Suite + + ParentBlock flow.Header // parent of sealing candidate + Block flow.Header // candidate for sealing + IncorporatedBlock flow.Header // block that incorporated result + VerID flow.Identifier // for convenience, node id of first verifier + Chunks flow.ChunkList // list of chunks of execution result + ChunksAssignment *chunks.Assignment + AuthorizedVerifiers map[flow.Identifier]*flow.Identity // map of authorized verifier identities for execution result + IncorporatedResult *flow.IncorporatedResult +} + +func (s *BaseApprovalsTestSuite) SetupTest() { + s.ParentBlock = unittest.BlockHeaderFixture() + s.Block = unittest.BlockHeaderWithParentFixture(&s.ParentBlock) + verifiers := make(flow.IdentifierList, 0) + s.AuthorizedVerifiers = make(map[flow.Identifier]*flow.Identity) + s.ChunksAssignment = chunks.NewAssignment() + s.Chunks = unittest.ChunkListFixture(50, s.Block.ID()) + + // setup identities + for j := 0; j < 5; j++ { + identity := unittest.IdentityFixture(unittest.WithRole(flow.RoleVerification)) + verifiers = append(verifiers, identity.NodeID) + s.AuthorizedVerifiers[identity.NodeID] = identity + } + + // create assignment + for _, chunk := range s.Chunks { + s.ChunksAssignment.Add(chunk, verifiers) + } + + s.VerID = verifiers[0] + result := unittest.ExecutionResultFixture() + result.BlockID = s.Block.ID() + result.Chunks = s.Chunks + + s.IncorporatedBlock = unittest.BlockHeaderWithParentFixture(&s.Block) + + // compose incorporated result + s.IncorporatedResult = unittest.IncorporatedResult.Fixture( + unittest.IncorporatedResult.WithResult(result), + unittest.IncorporatedResult.WithIncorporatedBlockID(s.IncorporatedBlock.ID())) +} diff --git a/engine/consensus/sealing/core.go b/engine/consensus/sealing/core.go index b2a651fb9a0..e8d7cfc77f8 100644 --- a/engine/consensus/sealing/core.go +++ b/engine/consensus/sealing/core.go @@ -882,11 +882,13 @@ func (c *Core) clearPools(sealedIDs []flow.Identifier) error { // clear the request tracker of all items corresponding to results that are // no longer in the incorporated-results mempool - for resultID := range c.requestTracker.GetAll() { + var removedResultIDs []flow.Identifier + for _, resultID := range c.requestTracker.GetAllIds() { if _, _, ok := c.incorporatedResults.ByResultID(resultID); !ok { - c.requestTracker.Remove(resultID) + removedResultIDs = append(removedResultIDs, resultID) } } + c.requestTracker.Remove(removedResultIDs...) 
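	// Note: batching the removals above lets the (now concurrency-safe) RequestTracker take
	// its lock once for all stale results, instead of once per result inside the loop.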
// for each missing block that we are tracking, remove it from tracking if // we now know that block or if we have just cleared related resources; then @@ -1067,10 +1069,11 @@ func (c *Core) requestPendingApprovals() (int, error) { requestCount := 0 for _, r := range c.incorporatedResults.All() { resultID := r.Result.ID() + incorporatedBlockID := r.IncorporatedBlockID // not finding the block that the result was incorporated in is a fatal // error at this stage - block, err := c.headersDB.ByBlockID(r.IncorporatedBlockID) + block, err := c.headersDB.ByBlockID(incorporatedBlockID) if err != nil { return 0, fmt.Errorf("could not retrieve block: %w", err) } @@ -1088,7 +1091,7 @@ func (c *Core) requestPendingApprovals() (int, error) { if err != nil { return 0, fmt.Errorf("could not retrieve finalized block for finalized height %d: %w", block.Height, err) } - if finalizedBlockAtHeight.ID() != r.IncorporatedBlockID { + if finalizedBlockAtHeight.ID() != incorporatedBlockID { // block is in an orphaned fork continue } @@ -1109,7 +1112,7 @@ func (c *Core) requestPendingApprovals() (int, error) { // from verifiers that were assigned to the chunk. Note that the // assigner keeps a cache of computed assignments, so this is not // necessarily an expensive operation. - assignment, err := c.assigner.Assign(r.Result, r.IncorporatedBlockID) + assignment, err := c.assigner.Assign(r.Result, incorporatedBlockID) if err != nil { // at this point, we know the block and a valid child block exists. // Not being able to compute the assignment constitutes a fatal @@ -1130,11 +1133,12 @@ func (c *Core) requestPendingApprovals() (int, error) { // Retrieve information about requests made for this chunk. Skip // requesting if the blackout period hasn't expired. Otherwise, // update request count and reset blackout period. 
- requestTrackerItem := c.requestTracker.Get(resultID, chunk.Index) + requestTrackerItem := c.requestTracker.Get(resultID, incorporatedBlockID, chunk.Index) if requestTrackerItem.IsBlackout() { continue } requestTrackerItem.Update() + c.requestTracker.Set(resultID, incorporatedBlockID, chunk.Index, requestTrackerItem) // for monitoring/debugging purposes, log requests if we start // making more than 10 diff --git a/engine/consensus/sealing/core_test.go b/engine/consensus/sealing/core_test.go index 29e3a9f506e..428ace275c7 100644 --- a/engine/consensus/sealing/core_test.go +++ b/engine/consensus/sealing/core_test.go @@ -715,7 +715,7 @@ func (ms *SealingSuite) TestRequestPendingApprovals() { ms.sealing.requiredApprovalsForSealConstruction = 2 // expectedRequests collects the set of ApprovalRequests that should be sent - expectedRequests := []*messages.ApprovalRequest{} + expectedRequests := make(map[flow.Identifier]*messages.ApprovalRequest) // populate the incorporated-results mempool with: // - 50 that have collected two signatures per chunk @@ -768,11 +768,10 @@ func (ms *SealingSuite) TestRequestPendingApprovals() { // expect requests to be sent out if the result's block is below // the threshold if i < n-int(ms.sealing.approvalRequestsThreshold) { - expectedRequests = append(expectedRequests, - &messages.ApprovalRequest{ - ResultID: ir.Result.ID(), - ChunkIndex: chunk.Index, - }) + expectedRequests[ir.IncorporatedBlockID] = &messages.ApprovalRequest{ + ResultID: ir.Result.ID(), + ChunkIndex: chunk.Index, + } } } } @@ -853,9 +852,10 @@ func (ms *SealingSuite) TestRequestPendingApprovals() { // Check the request tracker ms.Assert().Equal(exp, len(ms.sealing.requestTracker.index)) - for _, expectedRequest := range expectedRequests { + for incorporatedBlockID, expectedRequest := range expectedRequests { requestItem := ms.sealing.requestTracker.Get( expectedRequest.ResultID, + incorporatedBlockID, expectedRequest.ChunkIndex, ) ms.Assert().Equal(uint(0), requestItem.Requests) @@ -872,9 +872,10 @@ func (ms *SealingSuite) TestRequestPendingApprovals() { // Check the request tracker ms.Assert().Equal(exp, len(ms.sealing.requestTracker.index)) - for _, expectedRequest := range expectedRequests { + for incorporatedBlockID, expectedRequest := range expectedRequests { requestItem := ms.sealing.requestTracker.Get( expectedRequest.ResultID, + incorporatedBlockID, expectedRequest.ChunkIndex, ) ms.Assert().Equal(uint(1), requestItem.Requests) diff --git a/engine/consensus/sealing/request_tracker.go b/engine/consensus/sealing/request_tracker.go index 76f3f07f74d..42e81e02353 100644 --- a/engine/consensus/sealing/request_tracker.go +++ b/engine/consensus/sealing/request_tracker.go @@ -2,6 +2,7 @@ package sealing import ( "math/rand" + "sync" "time" "github.com/onflow/flow-go/model/flow" @@ -24,8 +25,8 @@ type RequestTrackerItem struct { // NewRequestTrackerItem instantiates a new RequestTrackerItem where the // NextTimeout is evaluated to the current time plus a random blackout period // contained between min and max. 
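+// NOTE: items are now created, stored and returned by value rather than by pointer, so a
+// mutation (e.g. Update) is only persisted once it is written back via RequestTracker.Set.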
-func NewRequestTrackerItem(blackoutPeriodMin, blackoutPeriodMax int) *RequestTrackerItem {
-	item := &RequestTrackerItem{
+func NewRequestTrackerItem(blackoutPeriodMin, blackoutPeriodMax int) RequestTrackerItem {
+	item := RequestTrackerItem{
 		blackoutPeriodMin: blackoutPeriodMin,
 		blackoutPeriodMax: blackoutPeriodMax,
 	}
@@ -54,51 +55,75 @@ RequestTracker
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/
 
 // RequestTracker is an index of RequestTrackerItems indexed by execution result
-// ID and chunk index.
-// It is not concurrency-safe.
+// ID, incorporated block ID and chunk index.
+// It is concurrency-safe.
 type RequestTracker struct {
-	index             map[flow.Identifier]map[uint64]*RequestTrackerItem
+	index             map[flow.Identifier]map[flow.Identifier]map[uint64]RequestTrackerItem
 	blackoutPeriodMin int
 	blackoutPeriodMax int
+	lock              sync.RWMutex
 }
 
 // NewRequestTracker instantiates a new RequestTracker with blackout periods
 // between min and max seconds.
 func NewRequestTracker(blackoutPeriodMin, blackoutPeriodMax int) *RequestTracker {
 	return &RequestTracker{
-		index:             make(map[flow.Identifier]map[uint64]*RequestTrackerItem),
+		index:             make(map[flow.Identifier]map[flow.Identifier]map[uint64]RequestTrackerItem),
 		blackoutPeriodMin: blackoutPeriodMin,
 		blackoutPeriodMax: blackoutPeriodMax,
 	}
 }
 
-// GetAll returns a map of all the items in the tracker indexed by execution
-// result ID and chunk index.
-func (rt *RequestTracker) GetAll() map[flow.Identifier]map[uint64]*RequestTrackerItem {
-	return rt.index
-}
-
 // Get returns the tracker item for a specific chunk, and creates a new one if
 // it doesn't exist.
-func (rt *RequestTracker) Get(resultID flow.Identifier, chunkIndex uint64) *RequestTrackerItem {
-	item, ok := rt.index[resultID][chunkIndex]
+func (rt *RequestTracker) Get(resultID, incorporatedBlockID flow.Identifier, chunkIndex uint64) RequestTrackerItem {
+	rt.lock.RLock()
+	item, ok := rt.index[resultID][incorporatedBlockID][chunkIndex]
+	rt.lock.RUnlock()
+
 	if !ok {
 		item = NewRequestTrackerItem(rt.blackoutPeriodMin, rt.blackoutPeriodMax)
-		rt.Set(resultID, chunkIndex, item)
+		rt.Set(resultID, incorporatedBlockID, chunkIndex, item)
 	}
 	return item
 }
 
 // Set inserts or updates the tracker item for a specific chunk.
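+// Because items are stored by value, the read-modify-write cycle must write the item back
+// explicitly, e.g. (sketch mirroring requestPendingApprovals):
+//
+//	item := rt.Get(resultID, incorporatedBlockID, chunkIndex)
+//	if !item.IsBlackout() {
+//		item.Update()
+//		rt.Set(resultID, incorporatedBlockID, chunkIndex, item)
+//	}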
-func (rt *RequestTracker) Set(resultID flow.Identifier, chunkIndex uint64, item *RequestTrackerItem) {
-	_, ok := rt.index[resultID]
-	if !ok {
-		rt.index[resultID] = make(map[uint64]*RequestTrackerItem)
+func (rt *RequestTracker) Set(resultID, incorporatedBlockID flow.Identifier, chunkIndex uint64, item RequestTrackerItem) {
+	rt.lock.Lock()
+	defer rt.lock.Unlock()
+	level1, level1found := rt.index[resultID]
+	if !level1found {
+		level1 = make(map[flow.Identifier]map[uint64]RequestTrackerItem)
+		rt.index[resultID] = level1
+	}
+	level2, level2found := level1[incorporatedBlockID]
+	if !level2found {
+		level2 = make(map[uint64]RequestTrackerItem)
+		level1[incorporatedBlockID] = level2
 	}
-	rt.index[resultID][chunkIndex] = item
+	level2[chunkIndex] = item
+}
+
+// GetAllIds returns all result IDs that we are indexing
+func (rt *RequestTracker) GetAllIds() []flow.Identifier {
+	rt.lock.RLock()
+	defer rt.lock.RUnlock()
+	ids := make([]flow.Identifier, 0, len(rt.index))
+	for resultID := range rt.index {
+		ids = append(ids, resultID)
+	}
+	return ids
 }
 
-// Remove removes all entries pertaining to an execution result
-func (rt *RequestTracker) Remove(resultID flow.Identifier) {
-	delete(rt.index, resultID)
+// Remove removes all entries pertaining to the given execution results
+func (rt *RequestTracker) Remove(resultIDs ...flow.Identifier) {
+	if len(resultIDs) == 0 {
+		return
+	}
+	rt.lock.Lock()
+	defer rt.lock.Unlock()
+	for _, resultID := range resultIDs {
+		delete(rt.index, resultID)
+	}
 }
diff --git a/model/flow/identifierList.go b/model/flow/identifierList.go
index 044127afc3b..0b091f89d66 100644
--- a/model/flow/identifierList.go
+++ b/model/flow/identifierList.go
@@ -15,6 +15,15 @@ func (il IdentifierList) Len() int {
 	return len(il)
 }
 
+// Lookup converts the Identifiers to a lookup table.
+func (il IdentifierList) Lookup() map[Identifier]struct{} {
+	lookup := make(map[Identifier]struct{}, len(il))
+	for _, id := range il {
+		lookup[id] = struct{}{}
+	}
+	return lookup
+}
+
 // Less returns true if element i in the IdentifierList is less than j based on its identifier.
 // Otherwise it returns false.
 // It satisfies the sort.Interface making the IdentifierList sortable.
@@ -46,6 +55,11 @@ func (il IdentifierList) Strings() []string {
 	return list
 }
 
+func (il IdentifierList) Copy() IdentifierList {
+	cpy := make(IdentifierList, 0, il.Len())
+	return append(cpy, il...)
+}
+
 // Contains returns whether this identifier list contains the target identifier.
 // Contains returns whether this identifier list contains the target identifier.
 func (il IdentifierList) Contains(target Identifier) bool {
 	for _, id := range il {
@@ -53,6 +68,5 @@ func (il IdentifierList) Contains(target Identifier) bool {
 			return true
 		}
 	}
-
 	return false
 }
diff --git a/model/flow/incorporated_result.go b/model/flow/incorporated_result.go
index 8c91434ecc2..29cfb913858 100644
--- a/model/flow/incorporated_result.go
+++ b/model/flow/incorporated_result.go
@@ -80,7 +80,8 @@ func (ir *IncorporatedResult) AddSignature(chunkIndex uint64, signerID Identifie
 
 	as, ok := ir.chunkApprovals[chunkIndex]
 	if !ok {
-		as = NewSignatureCollector()
+		c := NewSignatureCollector()
+		as = &c
 		ir.chunkApprovals[chunkIndex] = as
 	}
 
@@ -137,8 +138,8 @@ type SignatureCollector struct {
 }
 
 // NewSignatureCollector instantiates a new SignatureCollector
-func NewSignatureCollector() *SignatureCollector {
-	return &SignatureCollector{
+func NewSignatureCollector() SignatureCollector {
+	return SignatureCollector{
 		verifierSignatures: nil,
 		signerIDs:          nil,
 		signerIDSet:        make(map[Identifier]int),
@@ -169,6 +170,12 @@ func (c *SignatureCollector) BySigner(signerID Identifier) (*crypto.Signature, b
 	return &c.verifierSignatures[idx], true
 }
 
+// HasSigned checks whether the given signer has already provided a signature.
+func (c *SignatureCollector) HasSigned(signerID Identifier) bool {
+	_, found := c.signerIDSet[signerID]
+	return found
+}
+
 // Add appends a signature. Only the _first_ signature is retained for each signerID.
 func (c *SignatureCollector) Add(signerID Identifier, signature crypto.Signature) {
 	if _, found := c.signerIDSet[signerID]; found {
diff --git a/model/flow/resultApproval.go b/model/flow/resultApproval.go
index 0a5cba48608..3f1b3f5701e 100644
--- a/model/flow/resultApproval.go
+++ b/model/flow/resultApproval.go
@@ -24,6 +24,19 @@ type ResultApprovalBody struct {
 	Spock crypto.Signature // proof of re-computation, one per each chunk
 }
 
+// PartialID generates a unique identifier using Attestation + ApproverID
+func (rab ResultApprovalBody) PartialID() Identifier {
+	data := struct {
+		Attestation Attestation
+		ApproverID  Identifier
+	}{
+		Attestation: rab.Attestation,
+		ApproverID:  rab.ApproverID,
+	}
+
+	return MakeID(data)
+}
+
 // ID generates a unique identifier using ResultApprovalBody
 func (rab ResultApprovalBody) ID() Identifier {
 	return MakeID(rab)
diff --git a/module/chunks/chunk_assigner.go b/module/chunks/chunk_assigner.go
index d69152df660..a67e67ea399 100644
--- a/module/chunks/chunk_assigner.go
+++ b/module/chunks/chunk_assigner.go
@@ -70,15 +70,16 @@ func (p *ChunkAssigner) Assign(result *flow.ExecutionResult, blockID flow.Identi
 		return a, nil
 	}
 
-	// Get a list of verifiers
-	snapshot := p.protocolState.AtBlockID(blockID)
-	verifiers, err := snapshot.Identities(filter.And(filter.HasRole(flow.RoleVerification), filter.HasStake(true)))
+	// Get a list of verifiers at the block that is being sealed
+	verifiers, err := p.protocolState.AtBlockID(result.BlockID).Identities(filter.And(filter.HasRole(flow.RoleVerification),
+		filter.HasStake(true),
+		filter.Not(filter.Ejected)))
 	if err != nil {
 		return nil, fmt.Errorf("could not get verifiers: %w", err)
 	}
 
 	// create RNG for assignment
-	rng, err := p.rngByBlockID(snapshot)
+	rng, err := p.rngByBlockID(p.protocolState.AtBlockID(blockID))
 	if err != nil {
 		return nil, err
 	}
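Stepping back to the resultApproval.go change above: PartialID makes a natural deduplication key, since two approvals with the same Attestation and ApproverID map to the same PartialID even when their signature bytes differ. A toy sketch, assuming the approval carries its ResultApprovalBody under the field name Body (as in model/flow); the seenApprovals cache is hypothetical and not part of this change:

package main

import (
	"fmt"

	"github.com/onflow/flow-go/model/flow"
)

// seenApprovals is a hypothetical dedup set keyed by PartialID.
type seenApprovals map[flow.Identifier]struct{}

// add records an approval and reports whether it was seen for the first time.
func (s seenApprovals) add(approval *flow.ResultApproval) bool {
	key := approval.Body.PartialID()
	if _, ok := s[key]; ok {
		return false
	}
	s[key] = struct{}{}
	return true
}

func main() {
	seen := make(seenApprovals)
	var approval flow.ResultApproval // zero-valued placeholder
	fmt.Println(seen.add(&approval)) // true: first sighting
	fmt.Println(seen.add(&approval)) // false: same Attestation + ApproverID
}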
diff --git a/module/forest/leveled_forrest.go b/module/forest/leveled_forrest.go
index f3d97e4043b..8f141d91ca2 100644
--- a/module/forest/leveled_forrest.go
+++ b/module/forest/leveled_forrest.go
@@ -39,10 +39,11 @@ type vertexContainer struct {
 }
 
 // NewLevelledForest initializes a LevelledForest
-func NewLevelledForest() *LevelledForest {
+func NewLevelledForest(lowestLevel uint64) *LevelledForest {
 	return &LevelledForest{
 		vertices:        make(VertexSet),
 		verticesAtLevel: make(map[uint64]VertexList),
+		LowestLevel:     lowestLevel,
 	}
 }
 
@@ -51,11 +52,32 @@ func (f *LevelledForest) PruneUpToLevel(level uint64) error {
 	if level < f.LowestLevel {
 		return fmt.Errorf("new lowest level %d cannot be smaller than previous last retained level %d", level, f.LowestLevel)
 	}
-	for l := f.LowestLevel; l < level; l++ {
-		for _, v := range f.verticesAtLevel[l] { // nil map behaves like empty map when iterating over it
-			delete(f.vertices, v.id)
-		}
-		delete(f.verticesAtLevel, l)
-	}
+	if len(f.vertices) == 0 {
+		f.LowestLevel = level
+		return nil
+	}
+
+	// to optimize pruning of large level-ranges, we compare:
+	//  * the number of levels for which we have stored vertex containers: len(f.verticesAtLevel)
+	//  * the number of levels that need to be pruned: level-f.LowestLevel
+	// We iterate over the dimension which is smaller.
+	if uint64(len(f.verticesAtLevel)) < level-f.LowestLevel {
+		for l, vertices := range f.verticesAtLevel {
+			if l < level {
+				for _, v := range vertices {
+					delete(f.vertices, v.id)
+				}
+				delete(f.verticesAtLevel, l)
+			}
+		}
+	} else {
+		for l := f.LowestLevel; l < level; l++ {
+			for _, v := range f.verticesAtLevel[l] { // nil map behaves like empty map when iterating over it
+				delete(f.vertices, v.id)
+			}
+			delete(f.verticesAtLevel, l)
+		}
+	}
 	f.LowestLevel = level
 	return nil
diff --git a/module/forest/leveled_forrest_test.go b/module/forest/leveled_forrest_test.go
index 2d021e15836..780dc324519 100644
--- a/module/forest/leveled_forrest_test.go
+++ b/module/forest/leveled_forrest_test.go
@@ -74,7 +74,7 @@ func TestVertexIteratorOnEmpty(t *testing.T) {
 
 // TestLevelledForest_AddVertex tests that Vertex can be added twice without problems
 func TestLevelledForest_AddVertex(t *testing.T) {
-	F := NewLevelledForest()
+	F := NewLevelledForest(0)
 	v := NewVertexMock("A", 3, "Genesis", 0)
 	if err := F.VerifyVertex(v); err != nil {
 		assert.Fail(t, err.Error())
@@ -308,7 +308,7 @@ func TestLevelledForest_PruneAtLevel(t *testing.T) {
 
 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Helper Functions ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ //
 func populateNewForest(t *testing.T) *LevelledForest {
-	F := NewLevelledForest()
+	F := NewLevelledForest(0)
 	for _, v := range TestVertices {
 		if err := F.VerifyVertex(v); err != nil {
 			assert.Fail(t, err.Error())
diff --git a/module/mempool/consensus/execution_tree.go b/module/mempool/consensus/execution_tree.go
index bebf468570a..69fc59f399c 100644
--- a/module/mempool/consensus/execution_tree.go
+++ b/module/mempool/consensus/execution_tree.go
@@ -29,7 +29,7 @@ type ExecutionTree struct {
 func NewExecutionTree() *ExecutionTree {
 	return &ExecutionTree{
 		RWMutex: sync.RWMutex{},
-		forest:  *forest.NewLevelledForest(),
+		forest:  *forest.NewLevelledForest(0),
 		size:    0,
 	}
 }
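To close, a small sketch of the constructor change in module/forest: the forest now starts out pre-pruned at the supplied level, so callers (like the finalizer) no longer need a separate PruneUpToLevel call at construction time. Only API visible in this diff is used; the level values are illustrative:

package main

import (
	"fmt"

	"github.com/onflow/flow-go/module/forest"
)

func main() {
	// start the forest pre-pruned at level 100, e.g. the trusted root view
	f := forest.NewLevelledForest(100)

	// pruning backwards below LowestLevel is rejected
	if err := f.PruneUpToLevel(50); err != nil {
		fmt.Println("expected error:", err)
	}

	// pruning forward on an empty forest is cheap: it just advances LowestLevel
	if err := f.PruneUpToLevel(200); err == nil {
		fmt.Println("new lowest level:", f.LowestLevel) // 200
	}
}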