Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Consensus and Collection] Refactors guarantee dissemination #1406

Merged
merged 8 commits into from
Oct 2, 2021
26 changes: 10 additions & 16 deletions engine/collection/pusher/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/onflow/flow-go/utils/logging"
)

const DefaultRecipientCount uint = 3

// Engine is the collection pusher engine, which provides access to resources
// held by the collection node.
type Engine struct {
Expand All @@ -34,21 +32,18 @@ type Engine struct {
state protocol.State
collections storage.Collections
transactions storage.Transactions

recipientCount uint // number of consensus nodes to push to
}

func New(log zerolog.Logger, net module.Network, state protocol.State, engMetrics module.EngineMetrics, colMetrics module.CollectionMetrics, me module.Local, collections storage.Collections, transactions storage.Transactions) (*Engine, error) {
e := &Engine{
unit: engine.NewUnit(),
log: log.With().Str("engine", "pusher").Logger(),
engMetrics: engMetrics,
colMetrics: colMetrics,
me: me,
state: state,
collections: collections,
transactions: transactions,
recipientCount: DefaultRecipientCount,
unit: engine.NewUnit(),
log: log.With().Str("engine", "pusher").Logger(),
engMetrics: engMetrics,
colMetrics: colMetrics,
me: me,
state: state,
collections: collections,
transactions: transactions,
}

conduit, err := net.Register(engine.PushGuarantees, e)
Expand Down Expand Up @@ -130,8 +125,7 @@ func (e *Engine) onSubmitCollectionGuarantee(originID flow.Identifier, req *mess
return e.SubmitCollectionGuarantee(&req.Guarantee)
}

// SubmitCollectionGuarantee submits the collection guarantee to all
// consensus nodes.
// SubmitCollectionGuarantee submits the collection guarantee to all consensus nodes.
func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee) error {

consensusNodes, err := e.state.Final().Identities(filter.HasRole(flow.RoleConsensus))
Expand All @@ -144,7 +138,7 @@ func (e *Engine) SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee)
// network usage significantly by implementing a simple retry mechanism here and
// only sending to a single consensus node.
// => https://github.com/dapperlabs/flow-go/issues/4358
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to update this comment to avoid confusing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

err = e.conduit.Multicast(guarantee, e.recipientCount, consensusNodes.NodeIDs()...)
err = e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...)
if err != nil {
return fmt.Errorf("could not submit collection guarantee: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions engine/collection/pusher/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/model/messages"
metrics "github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/metrics"
module "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/network/mocknetwork"
protocol "github.com/onflow/flow-go/state/protocol/mock"
Expand Down Expand Up @@ -87,7 +87,7 @@ func (suite *Suite) TestSubmitCollectionGuarantee() {

// should submit the collection to consensus nodes
consensus := suite.identities.Filter(filter.HasRole(flow.RoleConsensus))
suite.conduit.On("Multicast", guarantee, pusher.DefaultRecipientCount, consensus[0].NodeID).Return(nil)
suite.conduit.On("Publish", guarantee, consensus[0].NodeID).Return(nil)

msg := &messages.SubmitCollectionGuarantee{
Guarantee: *guarantee,
Expand Down
25 changes: 4 additions & 21 deletions engine/consensus/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ func New(
return nil, fmt.Errorf("could not register engine: %w", err)
}
e.con = con

return e, nil
}

Expand Down Expand Up @@ -245,26 +244,10 @@ func (e *Engine) onGuarantee(originID flow.Identifier, guarantee *flow.Collectio
// NOTE: there are two ways to go about this:
// - expect the collection nodes to propagate the guarantee to all consensus nodes;
// - ensure that we take care of propagating guarantees to other consensus nodes.
// It's probably a better idea to make sure the consensus nodes take care of this.
// The consensus committee is the backbone of the network and should rely as little
// as possible on correct behaviour from other node roles. At the same time, there
// are likely to be significantly more consensus nodes on the network, which means
// it's a better usage of resources to distribute the load for propagation over
// consensus node committee than over the collection clusters.

// select all the consensus nodes on the network as our targets
committee, err := final.Identities(filter.HasRole(flow.RoleConsensus))
if err != nil {
return fmt.Errorf("could not get committee: %w", err)
}

// send the collection guarantee to all consensus committee
err = e.con.Publish(guarantee, committee.NodeIDs()...)
if err != nil {
return fmt.Errorf("could not send guarantee: %w", err)
}

log.Info().Msg("collection guarantee broadcast to committee")
// Currently, we go with first option as each collection node broadcasts a guarantee to
// all consensus nodes. So we expect all collections of a cluster to broadcast a guarantee to
// all consensus nodes. Even on an unhappy path, as long as only one collection node does it
// the guarantee must be delivered to all consensus nodes.

return nil
}
Expand Down
80 changes: 33 additions & 47 deletions engine/consensus/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,17 +168,16 @@ func (suite *IngestionSuite) TestOnGuaranteeNewFromCollection() {
suite.pool.On("Has", guarantee.ID()).Return(false)
suite.pool.On("Add", guarantee).Return(true)

suite.expectGuaranteePublished(guarantee)

// submit the guarantee as if it was sent by a collection node
err := suite.ingest.onGuarantee(suite.collID, guarantee)
suite.Assert().NoError(err, "should not error on new guarantee from collection node")

// check that the guarantee has been added to the mempool
suite.pool.AssertCalled(suite.T(), "Add", guarantee)

// check that the submit call was called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeUnstaked() {
Expand All @@ -200,7 +199,8 @@ func (suite *IngestionSuite) TestOnGuaranteeUnstaked() {
suite.pool.AssertCalled(suite.T(), "Add", guarantee)

// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeNewFromConsensus() {
Expand All @@ -218,8 +218,9 @@ func (suite *IngestionSuite) TestOnGuaranteeNewFromConsensus() {
// check that the guarantee has been added to the mempool
suite.pool.AssertCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeOld() {
Expand All @@ -237,8 +238,9 @@ func (suite *IngestionSuite) TestOnGuaranteeOld() {
// check that the guarantee has been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeNotAdded() {
Expand All @@ -256,8 +258,9 @@ func (suite *IngestionSuite) TestOnGuaranteeNotAdded() {
// check that the guarantee has been added to the mempool
suite.pool.AssertCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeNoGuarantor() {
Expand All @@ -279,8 +282,9 @@ func (suite *IngestionSuite) TestOnGuaranteeNoGuarantor() {
// check that the guarantee has been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeInvalidRole() {
Expand All @@ -302,8 +306,9 @@ func (suite *IngestionSuite) TestOnGuaranteeInvalidRole() {
// check that the guarantee has been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeExpired() {
Expand All @@ -327,11 +332,9 @@ func (suite *IngestionSuite) TestOnGuaranteeExpired() {
suite.Assert().Error(err, "should error with expired collection")
suite.Assert().True(engine.IsOutdatedInputError(err))

// check that the guarantee has been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeInvalidGuarantor() {
Expand All @@ -350,11 +353,9 @@ func (suite *IngestionSuite) TestOnGuaranteeInvalidGuarantor() {
suite.Assert().Error(err, "should error with invalid guarantor")
suite.Assert().True(engine.IsInvalidInputError(err))

// check that the guarantee has not been added to the mempool
suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// check that the submit call was not called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

// test that just after an epoch boundary we still accept guarantees from collectors
Expand All @@ -375,8 +376,6 @@ func (suite *IngestionSuite) TestOnGuaranteeEpochEnd() {
suite.pool.On("Has", guarantee.ID()).Return(false)
suite.pool.On("Add", guarantee).Return(true)

suite.expectGuaranteePublished(guarantee)

// submit the guarantee as if it was sent by the collection node which
// is leaving at the current epoch boundary
err := suite.ingest.onGuarantee(suite.collID, guarantee)
Expand All @@ -385,8 +384,9 @@ func (suite *IngestionSuite) TestOnGuaranteeEpochEnd() {
// check that the guarantee has been added to the mempool
suite.pool.AssertExpectations(suite.T())

// check that the Publish call was called
suite.conduit.AssertExpectations(suite.T())
// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

func (suite *IngestionSuite) TestOnGuaranteeUnknownOrigin() {
Expand All @@ -403,6 +403,10 @@ func (suite *IngestionSuite) TestOnGuaranteeUnknownOrigin() {
suite.Assert().True(engine.IsInvalidInputError(err))

suite.pool.AssertNotCalled(suite.T(), "Add", guarantee)

// we should not propagate the guarantee
suite.conduit.AssertNotCalled(suite.T(), "Multicast", guarantee, mock.Anything, mock.Anything)
suite.conduit.AssertNotCalled(suite.T(), "Publish", guarantee, mock.Anything)
}

// validGuarantee returns a valid collection guarantee based on the suite state.
Expand All @@ -412,21 +416,3 @@ func (suite *IngestionSuite) validGuarantee() *flow.CollectionGuarantee {
guarantee.ReferenceBlockID = suite.head.ID()
return guarantee
}

// expectGuaranteePublished creates an expectation on the Conduit mock that the
// guarantee should be published to the consensus nodes
func (suite *IngestionSuite) expectGuaranteePublished(guarantee *flow.CollectionGuarantee) {

// check that we call the submit with the correct consensus node IDs
suite.conduit.On("Publish", guarantee, mock.Anything, mock.Anything, mock.Anything).Run(
func(args mock.Arguments) {
nodeID1 := args.Get(1).(flow.Identifier)
nodeID2 := args.Get(2).(flow.Identifier)
nodeID3 := args.Get(3).(flow.Identifier)
suite.Assert().ElementsMatch(
[]flow.Identifier{nodeID1, nodeID2, nodeID3},
[]flow.Identifier{suite.con1ID, suite.con2ID, suite.con3ID},
)
},
).Return(nil).Once()
}