Skip to content

Commit

Permalink
Merge pull request #179 from onflow/martin/5092-mutator-seals
Browse files Browse the repository at this point in the history
Martin/5092 mutator seals
  • Loading branch information
arrivets committed Dec 3, 2020
2 parents 72b51bf + 4762d56 commit e92bbbd
Show file tree
Hide file tree
Showing 5 changed files with 800 additions and 343 deletions.
207 changes: 122 additions & 85 deletions state/protocol/badger/mutator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package badger
import (
"errors"
"fmt"

"github.com/dgraph-io/badger/v2"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/module"
Expand Down Expand Up @@ -223,15 +224,28 @@ func (m *Mutator) Extend(candidate *flow.Block) error {
return fmt.Errorf("header does not compliance the chain state: %w", err)
}

// Get the latest seal in the fork that ends with the candidate's parent.
// The protocol state saves this information for each block that has been
// successfully added to the chain tree (even when the added block does not
// itself contain a seal). We just called `headerExtend` to check that the
// candidate block's header is a valid extension of the chain, which implies
// that the parent must already be part of the chain tree. Therefore, _not_
// finding the latest sealed block in the fork up to the parent constitutes
// a fatal internal error.
lastSealUpToParent, err := m.state.seals.ByBlockID(candidate.Header.ParentID)
if err != nil {
return fmt.Errorf("could not retrieve parent seal (%x): %w", candidate.Header.ParentID, err)
}

// check if the guarantees in the payload is a valid extension of the finalized state
err = m.guaranteeExtend(candidate)
if err != nil {
return fmt.Errorf("guarantee does not compliance the chain state: %w", err)
}

// check if the seals in the payload is a valid extension of the finalized state
// return the last seal at the parent block
last, err := m.sealExtend(candidate)
// check if the seals in the payload is a valid extension of the finalized
// state, return the last seal at the candidate block
last, err := m.sealExtend(candidate, lastSealUpToParent)
if err != nil {
return fmt.Errorf("seal in parent block does not compliance the chain state: %w", err)
}
Expand Down Expand Up @@ -401,14 +415,21 @@ func (m *Mutator) guaranteeExtend(candidate *flow.Block) error {
return nil
}

// The compliance of the seal payload.
// we need them to create a valid chain of seals on our branch of the chain,
// starting at the block directly after the last sealed block all the way to
// at most the parent.
// We use deterministic lookup by height for the finalized part of the chain and
// then make a list of unfinalized blocks for the remainder, if any seals
// remain.
func (m *Mutator) sealExtend(candidate *flow.Block) (*flow.Seal, error) {
// sealExtend checks the compliance of the payload seals and returns the last
// valid seal on the fork up to and including `candidate`. Payload seals should
// form a valid chain on top of the last seal as of the parent of `candidate`,
// and should only correspond to blocks and execution results incorporated on
// the current fork.
//
// Note that we don't explicitly check that sealed results satisfy the sub-graph
// check. Nevertheless, correctness in this regard is guaranteed because:
// * We only allow seals that correspond to ExecutionReceipts that were
// incorporated in this fork.
// * We only include ExecutionReceipts whose results pass the sub-graph check
// (as part of ReceiptValidator).
// => Therefore, only seals whose results pass the sub-graph check will be
// allowed.
func (m *Mutator) sealExtend(candidate *flow.Block, lastSealUpToParent *flow.Seal) (*flow.Seal, error) {

blockID := candidate.ID()
m.state.tracer.StartSpan(blockID, trace.ProtoStateMutatorExtendCheckSeals)
Expand All @@ -417,6 +438,14 @@ func (m *Mutator) sealExtend(candidate *flow.Block) (*flow.Seal, error) {
header := candidate.Header
payload := candidate.Payload

last := lastSealUpToParent

// if there is no seal in the block payload, use the last sealed block of
// the parent block as the last sealed block of the given block.
if len(payload.Seals) == 0 {
return last, nil
}

// map each seal to the block it is sealing for easy lookup; we will need to
// successfully connect _all_ of these seals to the last sealed block for
// the payload to be valid
Expand All @@ -428,103 +457,111 @@ func (m *Mutator) sealExtend(candidate *flow.Block) (*flow.Seal, error) {
return nil, state.NewInvalidExtensionErrorf("multiple seals for the same block")
}

// get the parent's block seal, which constitutes the beginning of the
// sealing chain; if no seals are part of the payload, it will also be used
// for the candidate block, which remains at the same sealed state
last, err := m.state.seals.ByBlockID(header.ParentID)
if err != nil {
return nil, fmt.Errorf("could not retrieve parent seal (%x): %w", header.ParentID, err)
}

// if there is no seal in the block payload, use the last sealed block of the parent
// block as the last sealed block of the given block.
if len(payload.Seals) == 0 {
return last, nil
}
// incorporatedResults collects the _first_ appearance of unsealed execution
// results on the fork, along with the ID of the block in which they are
// incorporated.
incorporatedResults := make(map[flow.Identifier]*flow.IncorporatedResult)

// get the last sealed block; we use its height to iterate forwards through
// the finalized blocks which still need sealing
sealed, err := m.state.headers.ByBlockID(last.BlockID)
if err != nil {
return nil, fmt.Errorf("could not retrieve sealed block (%x): %w", last.BlockID, err)
}
// collect IDs of blocks on the fork (from parent to last sealed)
var blockIDs []flow.Identifier

var finalizedHeight uint64
err = m.state.db.View(operation.RetrieveFinalizedHeight(&finalizedHeight))
if err != nil {
return nil, fmt.Errorf("could not retrieve finalized height: %w", err)
}
var finalID flow.Identifier
err = m.state.db.View(operation.LookupBlockHeight(finalizedHeight, &finalID))
if err != nil {
return nil, fmt.Errorf("could not lookup finalized block: %w", err)
}
// loop through the fork backwards up to last sealed block and collect
// IncorporatedResults as well as the IDs of blocks visited
sealedID := last.BlockID
ancestorID := header.ParentID
for ancestorID != sealedID {

// we now go from last sealed height plus one to finalized height and check
// if we have the seal for each of them step by step; often we will not even
// enter this loop, because last sealed height is higher than finalized
for height := sealed.Height + 1; height <= finalizedHeight; height++ {
// as we are iterating the finalized blocks, if there is all the seals
// have been used to seal the finalized blocks, and there is no more seal left,
// we could exit earlier with the last seal
if len(byBlock) == 0 {
return last, nil
}
header, err := m.state.headers.ByHeight(height)
ancestor, err := m.state.headers.ByBlockID(ancestorID)
if err != nil {
return nil, fmt.Errorf("could not get block for sealed height (%d): %w", height, err)
}
blockID := header.ID()
next, found := byBlock[blockID]
if !found {
return nil, state.NewInvalidExtensionErrorf("chain of seals broken for finalized (missing: %x)", blockID)
return nil, fmt.Errorf("could not retrieve ancestor header (%x): %w", ancestorID, err)
}
delete(byBlock, blockID)
last = next
}
// In case no seals are left, we skip the remaining part:
if len(byBlock) == 0 {
return last, nil
}
// Once we have filled in seals for all finalized blocks we need to check
// the non-finalized blocks backwards; collect all of them, from direct
// parent to just before finalized, and see if we can use up the rest of the
// seals. We need to stop collecting ancestors either when reaching the
// finalized state, or when reaching the last sealed block.
ancestorID := header.ParentID
var pendingIDs []flow.Identifier
for ancestorID != finalID && ancestorID != last.BlockID {
pendingIDs = append(pendingIDs, ancestorID)
ancestor, err := m.state.headers.ByBlockID(ancestorID)

// keep track of blocks on the fork
blockIDs = append(blockIDs, ancestorID)

payload, err := m.state.payloads.ByBlockID(ancestorID)
if err != nil {
return nil, fmt.Errorf("could not get sealable ancestor (%x): %w", ancestorID, err)
return nil, fmt.Errorf("could not get block payload %x: %w", ancestorID, err)
}

// Collect execution results from receipts.
for _, receipt := range payload.Receipts {
resultID := receipt.ExecutionResult.ID()
// incorporatedResults should contain the _first_ appearance of the
// ExecutionResult. We are traversing the fork backwards, so we can
// overwrite any previously recorded result.

// ATTENTION:
// Here, IncorporatedBlockID (the first argument) should be set
// to ancestorID, because that is the block that contains the
// ExecutionResult. However, in phase 2 of the sealing roadmap,
// we are still using a temporary sealing logic where the
// IncorporatedBlockID is expected to be the result's block ID.
incorporatedResults[resultID] = flow.NewIncorporatedResult(
receipt.ExecutionResult.BlockID,
&receipt.ExecutionResult,
)

}

ancestorID = ancestor.ParentID
}

for i := len(pendingIDs) - 1; i >= 0; i-- {
// as we are iterating the pendings blocks, if there is no more seal left,
// we exit earlier with the last seal
// Loop forward across the fork and try to create a valid chain of seals.
// blockIDs, as populated by the previous loop, is in reverse order.
for i := len(blockIDs) - 1; i >= 0; i-- {
// if there are no more seals left, we can exit earlier
if len(byBlock) == 0 {
return last, nil
}
pendingID := pendingIDs[i]
next, found := byBlock[pendingID]

// return an error if there are still seals to consider, but they break
// the chain
blockID := blockIDs[i]
seal, found := byBlock[blockID]
if !found {
return nil, state.NewInvalidExtensionErrorf("chain of seals broken for pending (missing: %x)", pendingID)
return nil, state.NewInvalidExtensionErrorf("chain of seals broken (missing: %x)", blockID)
}

delete(byBlock, blockID)

// check if we have an incorporatedResult for this seal
incorporatedResult, ok := incorporatedResults[seal.ResultID]
if !ok {
return nil, state.NewInvalidExtensionErrorf("seal %x does not correspond to a result on this fork", seal.ID())
}
delete(byBlock, pendingID)
last = next

// check the integrity of the seal
valid, err := IsValidSeal(seal, incorporatedResult)
if err != nil {
return nil, fmt.Errorf("could not check validity of Seal %v: %w", seal.ID(), err)
}
if !valid {
return nil, state.NewInvalidExtensionErrorf("payload includes invalid seal (%x)", seal.ID())
}

last = seal
}

// This is just a sanity check; at this point, no seals should be left.
// at this point no seals should be left
if len(byBlock) > 0 {
return nil, fmt.Errorf("not all seals connected to state (left: %d)", len(byBlock))
return nil, state.NewInvalidExtensionErrorf("not all seals connected to state (left: %d)", len(byBlock))
}

return last, nil
}

// IsValidSeal checks the crytographic integrity of the seal and checks that
// the seal has collected enough approval signatures based on the chunk
// assignment of the corresponding incorporated result.
func IsValidSeal(seal *flow.Seal, incorporatedResult *flow.IncorporatedResult) (bool, error) {
// TODO
// Check cryptographic integrity
// Check the aggregated signatures against set of assigned verifiers
// How do we get the ChunkAssigner?
return true, nil
}

// receiptExtend checks the compliance of the receipt payload.
// * Receipts should pertain to blocks on the fork
// * Receipts should not appear more than once on a fork
Expand Down
Loading

0 comments on commit e92bbbd

Please sign in to comment.