Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Collection] Compliance engine message queues #1270

Merged
merged 14 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 12 additions & 3 deletions consensus/hotstuff/event_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/lifecycle"
"github.com/onflow/flow-go/module/metrics"
)

Expand All @@ -20,6 +21,7 @@ type EventLoop struct {
proposals chan *model.Proposal
votes chan *model.Vote

lm *lifecycle.LifecycleManager
unit *engine.Unit // lock for preventing concurrent state transitions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like LifecycleManager and Unit supply largely redundant functionality here (?)
The only component from unit that we utilize here seems to be the WaitGroup:

wg sync.WaitGroup // tracks in-progress functions

Is there any reason not to replace unit with a plain WaitGroup? It would remove some of the clutter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really hope to see LifecycleManager fused with Unit one day, so I would leave it as it is for now and just remove unit completely then.

}

Expand All @@ -30,6 +32,7 @@ func NewEventLoop(log zerolog.Logger, metrics module.HotstuffMetrics, eventHandl

el := &EventLoop{
log: log,
lm: lifecycle.NewLifecycleManager(),
eventHandler: eventHandler,
metrics: metrics,
proposals: proposals,
Expand Down Expand Up @@ -195,11 +198,17 @@ func (el *EventLoop) SubmitVote(originID flow.Identifier, blockID flow.Identifie
// Ready implements interface module.ReadyDoneAware.
// Multiple calls are handled gracefully and the event loop will only start
// once.
func (el *EventLoop) Ready() <-chan struct{} {
	// OnStart executes the startup function at most once, so repeated calls
	// to Ready() will not launch the loop a second time
	el.lm.OnStart(func() {
		el.unit.Launch(el.loop)
	})
	return el.lm.Started()
}

// Done implements interface module.ReadyDoneAware.
// The returned channel is closed once the event loop has fully exited.
func (el *EventLoop) Done() <-chan struct{} {
	el.lm.OnStop(func() {
		// wait for event loop to exit
		<-el.unit.Done()
	})
	return el.lm.Stopped()
}
311 changes: 311 additions & 0 deletions engine/collection/compliance/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
// (c) 2019 Dapper Labs - ALL RIGHTS RESERVED

package compliance

import (
"errors"
"fmt"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/cluster"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/state"
clusterkv "github.com/onflow/flow-go/state/cluster"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
)

// Core contains the central business logic for the collector clusters' compliance engine.
// It is responsible for handling communication for the embedded consensus algorithm.
// NOTE: Core is designed to be non-thread safe and cannot be used in concurrent environment
// user of this object needs to ensure single thread access.
type Core struct {
	log               zerolog.Logger // used to log relevant actions with context
	metrics           module.EngineMetrics
	mempoolMetrics    module.MempoolMetrics // reports the size of the pending block cache
	collectionMetrics module.CollectionMetrics
	headers           storage.Headers        // persisted headers; used to detect already-processed proposals
	state             clusterkv.MutableState // cluster chain state; proposals are validated via state.Extend
	pending           module.PendingClusterBlockBuffer // pending block cache
	sync              module.BlockRequester // nil after construction; set via `WithSync` (see NewCore)
	hotstuff          module.HotStuff       // nil after construction; set via `WithConsensus` (see NewCore)
}

// NewCore instantiates the business logic for the collector clusters' compliance engine.
// The returned Core has no sync engine or consensus instance attached yet;
// they are injected after construction (`sync` via WithSync, `hotstuff` via
// WithConsensus).
func NewCore(
	log zerolog.Logger,
	collector module.EngineMetrics,
	mempool module.MempoolMetrics,
	collectionMetrics module.CollectionMetrics,
	headers storage.Headers,
	state clusterkv.MutableState,
	pending module.PendingClusterBlockBuffer,
) (*Core, error) {

	core := Core{
		log:               log.With().Str("cluster_compliance", "core").Logger(),
		metrics:           collector,
		mempoolMetrics:    mempool,
		collectionMetrics: collectionMetrics,
		headers:           headers,
		state:             state,
		pending:           pending,
		sync:              nil, // use `WithSync`
		hotstuff:          nil, // use `WithConsensus`
	}

	// report the initial size of the pending block cache
	core.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, core.pending.Size())

	return &core, nil
}

// OnBlockProposal handles incoming block proposals.
// It drops duplicates, caches proposals whose ancestry is incomplete while
// requesting the missing blocks via the sync engine, and processes proposals
// that connect to the finalized state (see processBlockAndDescendants).
// NOTE(review): per the Core doc, this must be called from a single thread.
func (c *Core) OnBlockProposal(originID flow.Identifier, proposal *messages.ClusterBlockProposal) error {
	header := proposal.Header
	// attach the full block context to all subsequent log messages
	log := c.log.With().
		Hex("origin_id", originID[:]).
		Str("chain_id", header.ChainID.String()).
		Uint64("block_height", header.Height).
		Uint64("block_view", header.View).
		Hex("block_id", logging.Entity(header)).
		Hex("parent_id", header.ParentID[:]).
		Hex("payload_hash", header.PayloadHash[:]).
		Time("timestamp", header.Timestamp).
		Hex("proposer", header.ProposerID[:]).
		Int("num_signers", len(header.ParentVoterIDs)).
		Logger()
	log.Info().Msg("block proposal received")

	// drop cached proposals at or below the finalized height before anything
	// else; this also refreshes the mempool size metric
	c.prunePendingCache()

	// first, we reject all blocks that we don't need to process:
	// 1) blocks already in the cache; they will already be processed later
	// 2) blocks already on disk; they were processed and await finalization

	// ignore proposals that are already cached
	_, cached := c.pending.ByID(header.ID())
	if cached {
		log.Debug().Msg("skipping already cached proposal")
		return nil
	}

	// ignore proposals that were already processed
	_, err := c.headers.ByBlockID(header.ID())
	if err == nil {
		log.Debug().Msg("skipping already processed proposal")
		return nil
	}
	if !errors.Is(err, storage.ErrNotFound) {
		// only "not found" is expected here; anything else is escalated
		return fmt.Errorf("could not check proposal: %w", err)
	}

	// there are two possibilities if the proposal is neither already pending
	// processing in the cache, nor has already been processed:
	// 1) the proposal is unverifiable because parent or ancestor is unknown
	//    => we cache the proposal and request the missing link
	// 2) the proposal is connected to finalized state through an unbroken chain
	//    => we verify the proposal and forward it to hotstuff if valid

	// if we can connect the proposal to an ancestor in the cache, it means
	// there is a missing link; we cache it and request the missing link
	ancestor, found := c.pending.ByID(header.ParentID)
	if found {

		// add the block to the cache
		_ = c.pending.Add(originID, proposal)
		c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())

		// go to the first missing ancestor
		// (walk up the chain of cached ancestors until the link breaks)
		ancestorID := ancestor.Header.ParentID
		ancestorHeight := ancestor.Header.Height - 1
		for {
			ancestor, found := c.pending.ByID(ancestorID)
			if !found {
				break
			}
			ancestorID = ancestor.Header.ParentID
			ancestorHeight = ancestor.Header.Height - 1
		}

		log.Debug().
			Uint64("ancestor_height", ancestorHeight).
			Hex("ancestor_id", ancestorID[:]).
			Msg("requesting missing ancestor for proposal")

		c.sync.RequestBlock(ancestorID)

		return nil
	}

	// if the proposal is connected to a block that is neither in the cache, nor
	// in persistent storage, its direct parent is missing; cache the proposal
	// and request the parent
	_, err = c.headers.ByBlockID(header.ParentID)
	if errors.Is(err, storage.ErrNotFound) {

		_ = c.pending.Add(originID, proposal)

		c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())

		log.Debug().Msg("requesting missing parent for proposal")

		c.sync.RequestBlock(header.ParentID)

		return nil
	}
	if err != nil {
		return fmt.Errorf("could not check parent: %w", err)
	}

	// At this point, we should be able to connect the proposal to the finalized
	// state and should process it to see whether to forward to hotstuff or not.
	// processBlockAndDescendants is a recursive function. Here we trace the
	// execution of the entire recursion, which might include processing the
	// proposal's pending children. There is another span within
	// processBlockProposal that measures the time spent for a single proposal.
	err = c.processBlockAndDescendants(proposal)
	c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
	if err != nil {
		return fmt.Errorf("could not process block proposal: %w", err)
	}

	return nil
}

// processBlockAndDescendants is a recursive function that processes a block and
// its pending proposals for its children. By induction, any children connected
// to a valid proposal are validly connected to the finalized state and can be
// processed as well.
func (c *Core) processBlockAndDescendants(proposal *messages.ClusterBlockProposal) error {
	blockID := proposal.Header.ID()

	// process the block itself first, triaging the possible outcomes
	err := c.processBlockProposal(proposal)
	switch {
	case engine.IsOutdatedInputError(err):
		// block is outdated by the time we started processing it
		// => node was probably behind and is catching up. Log as info
		c.log.Info().Msg("dropped processing of abandoned fork; this might be an indicator that the node is slightly behind")
		return nil
	case engine.IsInvalidInputError(err):
		// the block is invalid; log as we desire honest participation
		// ToDo: potential slashing
		c.log.Warn().Err(err).Msg("received invalid block from other node (potential slashing evidence?)")
		return nil
	case err != nil:
		// unexpected error: potentially corrupted internal state => abort processing and escalate error
		return fmt.Errorf("failed to process block %x: %w", blockID, err)
	}

	// recurse into all cached children of this block; invalid or outdated
	// children are absorbed above and do not block their valid siblings
	children, ok := c.pending.ByParentID(blockID)
	if !ok {
		return nil
	}
	for _, child := range children {
		err := c.processBlockAndDescendants(&messages.ClusterBlockProposal{
			Header:  child.Header,
			Payload: child.Payload,
		})
		if err != nil {
			// unexpected error: potentially corrupted internal state => abort processing and escalate error
			return err
		}
	}

	// all children have now been handled; evict them from the cache
	c.pending.DropForParent(blockID)

	return nil
}

// processBlockProposal processes the given block proposal. The proposal must connect to
// the finalized state.
func (c *Core) processBlockProposal(proposal *messages.ClusterBlockProposal) error {
	header := proposal.Header
	log := c.log.With().
		Str("chain_id", header.ChainID.String()).
		Uint64("block_height", header.Height).
		Uint64("block_view", header.View).
		Hex("block_id", logging.Entity(header)).
		Hex("parent_id", header.ParentID[:]).
		Hex("payload_hash", header.PayloadHash[:]).
		Time("timestamp", header.Timestamp).
		Hex("proposer", header.ProposerID[:]).
		Int("num_signers", len(header.ParentVoterIDs)).
		Logger()
	log.Info().Msg("processing block proposal")

	// attempt to extend the protocol state with the proposal and triage the
	// possible failure modes
	clusterBlock := cluster.Block{
		Header:  proposal.Header,
		Payload: proposal.Payload,
	}
	err := c.state.Extend(&clusterBlock)
	switch {
	case state.IsInvalidExtensionError(err):
		// the block proposes an invalid extension of the protocol state => invalid block
		return engine.NewInvalidInputErrorf("invalid extension of protocol state (block: %x, height: %d): %w",
			header.ID(), header.Height, err)
	case state.IsOutdatedExtensionError(err):
		// protocol state aborted processing of block as it is on an abandoned fork: block is outdated
		return engine.NewOutdatedInputErrorf("outdated extension of protocol state: %w", err)
	case err != nil:
		return fmt.Errorf("could not extend protocol state (block: %x, height: %d): %w", header.ID(), header.Height, err)
	}

	// look up the parent header; its view is needed when submitting to hotstuff
	parent, err := c.headers.ByBlockID(header.ParentID)
	if err != nil {
		return fmt.Errorf("could not retrieve proposal parent: %w", err)
	}

	// submit the model to hotstuff for processing
	log.Info().Msg("forwarding block proposal to hotstuff")
	c.hotstuff.SubmitProposal(header, parent.View)

	return nil
}

// OnBlockVote handles votes for blocks by passing them to the core consensus
// algorithm
func (c *Core) OnBlockVote(originID flow.Identifier, vote *messages.ClusterBlockVote) error {
	// log the vote, then forward it; validation happens inside hotstuff
	event := c.log.Debug().
		Hex("origin_id", originID[:]).
		Hex("block_id", vote.BlockID[:]).
		Uint64("view", vote.View)
	event.Msg("received vote")

	c.hotstuff.SubmitVote(originID, vote.BlockID, vote.View, vote.SigData)
	return nil
}

// prunePendingCache prunes the pending block cache by removing any blocks that
// are below the finalized height.
func (c *Core) prunePendingCache() {
	// determine the current finalized head; without it we cannot prune
	final, err := c.state.Final().Head()
	if err != nil {
		c.log.Warn().Err(err).Msg("could not get finalized head to prune pending blocks")
		return
	}

	// drop all pending blocks at or below the finalized height, then record
	// the resulting cache size
	c.pending.PruneByHeight(final.Height)
	c.mempoolMetrics.MempoolEntries(metrics.ResourceClusterProposal, c.pending.Size())
}