Skip to content

Commit

Permalink
[Consensus and Collection] Refactors guarantee dissemination (#1406)
Browse files Browse the repository at this point in the history
* refactors pusher engine multicast

* refactors ingestion engine publish mechanism on consensus node side

* fixes lint

* fixes a comment
  • Loading branch information
yhassanzadeh13 committed Oct 2, 2021
1 parent 5b3a98a commit 3e27bfb
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 92 deletions.
34 changes: 12 additions & 22 deletions engine/collection/pusher/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/onflow/flow-go/utils/logging"
)

const DefaultRecipientCount uint = 3

// Engine is the collection pusher engine, which provides access to resources
// held by the collection node.
type Engine struct {
Expand All @@ -34,21 +32,18 @@ type Engine struct {
state protocol.State
collections storage.Collections
transactions storage.Transactions

recipientCount uint // number of consensus nodes to push to
}

func New(log zerolog.Logger, net module.Network, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) {
e := &Engine{
unit: engine.NewUnit(),
log: log.With().Str("engine", "pusher").Logger(),
engMetrics: engMetrics,
colMetrics: colMetrics,
me: me,
state: state,
collections: collections,
transactions: transactions,
recipientCount: DefaultRecipientCount,
unit: engine.NewUnit(),
log: log.With().Str("engine", "pusher").Logger(),
engMetrics: engMetrics,
colMetrics: colMetrics,
me: me,
state: state,
collections: collections,
transactions: transactions,
}

conduit, err := net.Register(engine.PushGuarantees, e)
Expand Down Expand Up @@ -130,21 +125,16 @@ func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *mess
return e.SubmitCollectionGuarantee(&req.Guarantee)
}

// SubmitCollectionGuarantee submits the collection guarantee to all
// consensus nodes.
// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes.
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {

consensusNodes, err := e.state.Final().Identities(filter.HasRole(flow.RoleConsensus))
if err != nil {
return fmt.Errorf("could not get consensus nodes: %w", err)
}

// TODO: We actually only need to send to a small subset of consensus engines, as
// they propagate the guarantee within the consensus committee. We can reduce
// network usage significantly by implementing a simple retry mechanism here and
// only sending to a single consensus node.
// => https://github.com/dapperlabs/flow-go/issues/4358
err = e.conduit.Multicast(guarantee, e.recipientCount, consensusNodes.NodeIDs()...)
// NOTE: Consensus nodes do not broadcast guarantees among themselves, so it needs that
// at least one collection node make a publish to all of them.
err = e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...)
if err != nil {
return fmt.Errorf("could not submit collection guarantee: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions engine/collection/pusher/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/messages"
metrics "github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/metrics"
module "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/network/mocknetwork"
protocol "github.com/onflow/flow-go/state/protocol/mock"
Expand Down Expand Up @@ -87,7 +87,7 @@ func (suite *Suite) TestSubmitCollectionGuarantee() {

// should submit the collection to consensus nodes
consensus := suite.identities.Filter(filter.HasRole(flow.RoleConsensus))
suite.conduit.On("Multicast", guarantee, pusher.DefaultRecipientCount, consensus[0].NodeID).Return(nil)
suite.conduit.On("Publish", guarantee, consensus[0].NodeID).Return(nil)

msg := &messages.SubmitCollectionGuarantee{
Guarantee: *guarantee,
Expand Down
25 changes: 4 additions & 21 deletions engine/consensus/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func New(
return nil, fmt.Errorf("could not register engine: %w", err)
}
e.con = con

return e, nil
}

Expand Down Expand Up @@ -245,26 +244,10 @@ func (e *Engine) onGuarantee(originID flow.Identifier, guarantee *flow.Collectio
// NOTE: there are two ways to go about this:
// - expect the collection nodes to propagate the guarantee to all consensus nodes;
// - ensure that we take care of propagating guarantees to other consensus nodes.
// It's probably a better idea to make sure the consensus nodes take care of this.
// The consensus committee is the backbone of the network and should rely as little
// as possible on correct behaviour from other node roles. At the same time, there
// are likely to be significantly more consensus nodes on the network, which means
// it's a better usage of resources to distribute the load for propagation over
// consensus node committee than over the collection clusters.

// select all the consensus nodes on the network as our targets
committee, err := final.Identities(filter.HasRole(flow.RoleConsensus))
if err != nil {
return fmt.Errorf("could not get committee: %w", err)
}

// send the collection guarantee to all consensus committee
err = e.con.Publish(guarantee, committee.NodeIDs()...)
if err != nil {
return fmt.Errorf("could not send guarantee: %w", err)
}

log.Info().Msg("collection guarantee broadcast to committee")
// Currently, we go with first option as each collection node broadcasts a guarantee to
// all consensus nodes. So we expect all collections of a cluster to broadcast a guarantee to
// all consensus nodes. Even on an unhappy path, as long as only one collection node does it
// the guarantee must be delivered to all consensus nodes.

return nil
}
Expand Down
80 changes: 33 additions & 47 deletions engine/consensus/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,16 @@ func (suite *IngestionSuite) TestOnGuaranteeNewFromCollection() {
suite.pool.On("Has", guarantee.ID()).Return(false)
suite.pool.On("Add", guarantee).Return(true)

suite.expectGuaranteePublished(guarantee)

// submit the guarantee as if it was sent by a collection node
err := suite.ingest.onGuarantee(suite.collID, guarantee)
suite.Assert().NoError(err, "should not error on new guarantee from collection node")

// check that the guarantee has been added to the mempool
suite.pool.AssertCalled(suite.T(), "Add", guarantee)

// check that the submit call was called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeUnstaked() {
Expand All @@ -200,7 +199,8 @@ func (suite *IngestionSuite) TestOnGuaranteeUnstaked() {
suite.pool.AssertCalled(suite.T(), "Add", guarantee)

// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeNewFromConsensus() {
Expand All @@ -218,8 +218,9 @@ func (suite *IngestionSuite) TestOnGuaranteeNewFromConsensus() {
// check that the guarantee has been added to the mempool
suite.pool.AssertCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeOld() {
Expand All @@ -237,8 +238,9 @@ func (suite *IngestionSuite) TestOnGuaranteeOld() {
// check that the guarantee has been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeNotAdded() {
Expand All @@ -256,8 +258,9 @@ func (suite *IngestionSuite) TestOnGuaranteeNotAdded() {
// check that the guarantee has been added to the mempool
suite.pool.AssertCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeNoGuarantor() {
Expand All @@ -279,8 +282,9 @@ func (suite *IngestionSuite) TestOnGuaranteeNoGuarantor() {
// check that the guarantee has been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeInvalidRole() {
Expand All @@ -302,8 +306,9 @@ func (suite *IngestionSuite) TestOnGuaranteeInvalidRole() {
// check that the guarantee has been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeExpired() {
Expand All @@ -327,11 +332,9 @@ func (suite *IngestionSuite) TestOnGuaranteeExpired() {
suite.Assert().Error(err, "should error with expired collection")
suite.Assert().True(engine.IsOutdatedInputError(err))

// check that the guarantee has been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeInvalidGuarantor() {
Expand All @@ -350,11 +353,9 @@ func (suite *IngestionSuite) TestOnGuaranteeInvalidGuarantor() {
suite.Assert().Error(err, "should error with invalid guarantor")
suite.Assert().True(engine.IsInvalidInputError(err))

// check that the guarantee has not been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

// test that just after an epoch boundary we still accept guarantees from collectors
Expand All @@ -375,8 +376,6 @@ func (suite *IngestionSuite) TestOnGuaranteeEpochEnd() {
suite.pool.On("Has", guarantee.ID()).Return(false)
suite.pool.On("Add", guarantee).Return(true)

suite.expectGuaranteePublished(guarantee)

// submit the guarantee as if it was sent by the collection node which
// is leaving at the current epoch boundary
err := suite.ingest.onGuarantee(suite.collID, guarantee)
Expand All @@ -385,8 +384,9 @@ func (suite *IngestionSuite) TestOnGuaranteeEpochEnd() {
// check that the guarantee has been added to the mempool
suite.pool.AssertExpectations(suite.T())

// check that the Publish call was called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeUnknownOrigin() {
Expand All @@ -403,6 +403,10 @@ func (suite *IngestionSuite) TestOnGuaranteeUnknownOrigin() {
suite.Assert().True(engine.IsInvalidInputError(err))

suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

// validGuarantee returns a valid collection guarantee based on the suite state.
Expand All @@ -412,21 +416,3 @@ func (suite *IngestionSuite) validGuarantee() *flow.CollectionGuarantee {
guarantee.ReferenceBlockID = suite.head.ID()
return guarantee
}

// expectGuaranteePublished creates an expectation on the Conduit mock that the
// guarantee should be published to the consensus nodes
func (suite *IngestionSuite) expectGuaranteePublished(guarantee *flow.CollectionGuarantee) {

// check that we call the submit with the correct consensus node IDs
suite.conduit.On("Publish", guarantee, mock.Anything, mock.Anything, mock.Anything).Run(
func(args mock.Arguments) {
nodeID1 := args.Get(1).(flow.Identifier)
nodeID2 := args.Get(2).(flow.Identifier)
nodeID3 := args.Get(3).(flow.Identifier)
suite.Assert().ElementsMatch(
[]flow.Identifier{nodeID1, nodeID2, nodeID3},
[]flow.Identifier{suite.con1ID, suite.con2ID, suite.con3ID},
)
},
).Return(nil).Once()
}

0 comments on commit 3e27bfb

Please sign in to comment.