diff --git a/cmd/collection/main.go b/cmd/collection/main.go index 3c74ba48fc5..49897427722 100644 --- a/cmd/collection/main.go +++ b/cmd/collection/main.go @@ -47,6 +47,8 @@ func main() { txLimit uint maxCollectionSize uint builderExpiryBuffer uint + builderPayerRateLimit float64 + builderUnlimitedPayers []string hotstuffTimeout time.Duration hotstuffMinTimeout time.Duration hotstuffTimeoutIncreaseFactor float64 @@ -86,6 +88,10 @@ func main() { "how many additional cluster members we propagate transactions to") flags.UintVar(&builderExpiryBuffer, "builder-expiry-buffer", 25, "expiry buffer for transactions in proposed collections") + flags.Float64Var(&builderPayerRateLimit, "builder-rate-limit", 0, // no rate limiting + "rate limit for each payer (transactions/collection)") + flags.StringSliceVar(&builderUnlimitedPayers, "builder-unlimited-payers", []string{}, // no unlimited payers + "set of payer addresses which are omitted from rate limiting") flags.UintVar(&maxCollectionSize, "builder-max-collection-size", 200, "maximum number of transactions in proposed collections") flags.DurationVar(&hotstuffTimeout, "hotstuff-timeout", 60*time.Second, @@ -267,6 +273,13 @@ func main() { return nil, err } + // convert hex string flag values to addresses + unlimitedPayers := make([]flow.Address, 0, len(builderUnlimitedPayers)) + for _, payerStr := range builderUnlimitedPayers { + payerAddr := flow.HexToAddress(payerStr) + unlimitedPayers = append(unlimitedPayers, payerAddr) + } + builderFactory, err := factories.NewBuilderFactory( node.DB, node.Storage.Headers, @@ -275,6 +288,8 @@ func main() { push, builder.WithMaxCollectionSize(maxCollectionSize), builder.WithExpiryBuffer(builderExpiryBuffer), + builder.WithMaxPayerTransactionRate(builderPayerRateLimit), + builder.WithUnlimitedPayers(unlimitedPayers...), ) if err != nil { return nil, err diff --git a/integration/localnet/Makefile b/integration/localnet/Makefile index 815fd690d67..9c2503669fa 100644 --- 
a/integration/localnet/Makefile +++ b/integration/localnet/Makefile @@ -1,4 +1,4 @@ -COLLECTION = 1 +COLLECTION = 3 CONSENSUS = 3 EXECUTION = 1 VERIFICATION = 1 diff --git a/integration/localnet/bootstrap.go b/integration/localnet/bootstrap.go index 286278922c2..204cbf5b0f7 100644 --- a/integration/localnet/bootstrap.go +++ b/integration/localnet/bootstrap.go @@ -24,7 +24,7 @@ const ( DockerComposeFile = "./docker-compose.nodes.yml" DockerComposeFileVersion = "3.7" PrometheusTargetsFile = "./targets.nodes.json" - DefaultCollectionCount = 1 + DefaultCollectionCount = 3 DefaultConsensusCount = 3 DefaultExecutionCount = 1 DefaultVerificationCount = 1 diff --git a/module/builder/collection/builder.go b/module/builder/collection/builder.go index e9124fd4f7c..4b4eca4e919 100644 --- a/module/builder/collection/builder.go +++ b/module/builder/collection/builder.go @@ -25,28 +25,13 @@ import ( // HotStuff event loop is the only consumer of this interface and is single // threaded, this is OK. type Builder struct { - db *badger.DB - mainHeaders storage.Headers - clusterHeaders storage.Headers - payloads storage.ClusterPayloads - transactions mempool.Transactions - tracer module.Tracer - maxCollectionSize uint - expiryBuffer uint -} - -type Opt func(*Builder) - -func WithMaxCollectionSize(size uint) Opt { - return func(b *Builder) { - b.maxCollectionSize = size - } -} - -func WithExpiryBuffer(buf uint) Opt { - return func(b *Builder) { - b.expiryBuffer = buf - } + db *badger.DB + mainHeaders storage.Headers + clusterHeaders storage.Headers + payloads storage.ClusterPayloads + transactions mempool.Transactions + tracer module.Tracer + config Config } func NewBuilder( @@ -60,19 +45,19 @@ func NewBuilder( ) *Builder { b := Builder{ - db: db, - tracer: tracer, - mainHeaders: mainHeaders, - clusterHeaders: clusterHeaders, - payloads: payloads, - transactions: transactions, - maxCollectionSize: 100, - expiryBuffer: 15, + db: db, + tracer: tracer, + mainHeaders: mainHeaders, + 
clusterHeaders: clusterHeaders, + payloads: payloads, + transactions: transactions, + config: DefaultConfig(), } for _, apply := range opts { - apply(&b) + apply(&b.config) } + return &b } @@ -127,9 +112,19 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er b.tracer.StartSpan(parentID, trace.COLBuildOnUnfinalizedLookup) defer b.tracer.FinishSpan(parentID, trace.COLBuildOnUnfinalizedLookup) + // RATE LIMITING: the builder module can be configured to limit the + // rate at which transactions with a common payer are included in + // blocks. Depending on the configured limit, we either allow 1 + // transaction every N sequential collections, or we allow K transactions + // per collection. + + // keep track of transactions in the ancestry to avoid duplicates + lookup := newTransactionLookup() + // keep track of transactions to enforce rate limiting + limiter := newRateLimiter(b.config, parent.Height+1) + // look up previously included transactions in UN-FINALIZED ancestors ancestorID := parentID - unfinalizedLookup := make(map[flow.Identifier]struct{}) clusterFinalID := clusterFinal.ID() for ancestorID != clusterFinalID { ancestor, err := b.clusterHeaders.ByBlockID(ancestorID) @@ -148,7 +143,8 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er collection := payload.Collection for _, tx := range collection.Transactions { - unfinalizedLookup[tx.ID()] = struct{}{} + lookup.addUnfinalizedAncestor(tx.ID()) + limiter.addAncestor(ancestor.Height, tx) } ancestorID = ancestor.ParentID } @@ -166,7 +162,6 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er } // look up previously included transactions in FINALIZED ancestors - finalizedLookup := make(map[flow.Identifier]struct{}) ancestorID = clusterFinal.ID() ancestorHeight := clusterFinal.Height for ancestorHeight > limit { @@ -181,7 +176,8 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er collection 
:= payload.Collection for _, tx := range collection.Transactions { - finalizedLookup[tx.ID()] = struct{}{} + lookup.addFinalizedAncestor(tx.ID()) + limiter.addAncestor(ancestor.Height, tx) } ancestorID = ancestor.ParentID @@ -202,7 +198,7 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er for _, tx := range b.transactions.All() { // if we have reached maximum number of transactions, stop - if uint(len(transactions)) >= b.maxCollectionSize { + if uint(len(transactions)) >= b.config.MaxCollectionSize { break } @@ -222,32 +218,38 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er // ensure the reference block is not too old txID := tx.ID() - if refChainFinalizedHeight-refHeader.Height > uint64(flow.DefaultTransactionExpiry-b.expiryBuffer) { + if refChainFinalizedHeight-refHeader.Height > uint64(flow.DefaultTransactionExpiry-b.config.ExpiryBuffer) { // the transaction is expired, it will never be valid b.transactions.Rem(txID) continue } // check that the transaction was not already used in un-finalized history - _, duplicated := unfinalizedLookup[txID] - if duplicated { + if lookup.isUnfinalizedAncestor(txID) { continue } // check that the transaction was not already included in finalized history. 
- _, duplicated = finalizedLookup[txID] - if duplicated { + if lookup.isFinalizedAncestor(txID) { // remove from mempool, conflicts with finalized block will never be valid b.transactions.Rem(txID) continue } + // enforce rate limiting rules + if limiter.shouldRateLimit(tx) { + continue + } + // ensure we find the lowest reference block height if refHeader.Height < minRefHeight { minRefHeight = refHeader.Height minRefID = tx.ReferenceBlockID } + // update per-payer transaction count + limiter.transactionIncluded(tx) + transactions = append(transactions, tx) } diff --git a/module/builder/collection/builder_test.go b/module/builder/collection/builder_test.go index d2dd15a20c1..6841a7bf720 100644 --- a/module/builder/collection/builder_test.go +++ b/module/builder/collection/builder_test.go @@ -135,6 +135,29 @@ func (suite *BuilderSuite) Payload(transactions ...*flow.TransactionBody) model. return model.PayloadFromTransactions(final.ID(), transactions...) } +// ProtoStateRoot returns the root block of the protocol state. +func (suite *BuilderSuite) ProtoStateRoot() *flow.Header { + root, err := suite.protoState.Params().Root() + suite.Require().Nil(err) + return root +} + +// ClearPool removes all items from the pool +func (suite *BuilderSuite) ClearPool() { + // TODO use Clear() + for _, tx := range suite.pool.All() { + suite.pool.Rem(tx.ID()) + } +} + +// FillPool adds n transactions to the pool, using the given generator function. +func (suite *BuilderSuite) FillPool(n int, create func() *flow.TransactionBody) { + for i := 0; i < n; i++ { + tx := create() + suite.pool.Add(tx) + } +} + func TestBuilder(t *testing.T) { suite.Run(t, new(BuilderSuite)) } @@ -518,6 +541,210 @@ func (suite *BuilderSuite) TestBuildOn_EmptyMempool() { suite.Assert().Equal(0, built.Payload.Collection.Len()) } +// With rate limiting turned off, we should fill collections as fast as we can +// regardless of how many transactions with the same payer we include. 
+func (suite *BuilderSuite) TestBuildOn_NoRateLimiting() { + + // start with an empty mempool + suite.ClearPool() + + // create builder with no rate limit and max 10 tx/collection + suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + builder.WithMaxCollectionSize(10), + builder.WithMaxPayerTransactionRate(0), + ) + + // fill the pool with 100 transactions from the same payer + payer := unittest.RandomAddressFixture() + create := func() *flow.TransactionBody { + tx := unittest.TransactionBodyFixture() + tx.ReferenceBlockID = suite.ProtoStateRoot().ID() + tx.Payer = payer + return &tx + } + suite.FillPool(100, create) + + // since we have no rate limiting we should fill all collections and in 10 blocks + parentID := suite.genesis.ID() + for i := 0; i < 10; i++ { + header, err := suite.builder.BuildOn(parentID, noopSetter) + suite.Require().Nil(err) + parentID = header.ID() + + // each collection should be full with 10 transactions + var built model.Block + err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) + suite.Assert().Nil(err) + suite.Assert().Len(built.Payload.Collection.Transactions, 10) + } +} + +// With rate limiting turned on, we should be able to fill transactions as fast +// as possible so long as per-payer limits are not reached. This test generates +// transactions such that the number of transactions with a given proposer exceeds +// the rate limit -- since it's the proposer not the payer, it shouldn't limit +// our collections. 
+func (suite *BuilderSuite) TestBuildOn_RateLimitNonPayer() { + + // start with an empty mempool + suite.ClearPool() + + // create builder with 5 tx/payer and max 10 tx/collection + suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + builder.WithMaxCollectionSize(10), + builder.WithMaxPayerTransactionRate(5), + ) + + // fill the pool with 100 transactions with the same proposer + // since it's not the same payer, rate limit does not apply + proposer := unittest.RandomAddressFixture() + create := func() *flow.TransactionBody { + tx := unittest.TransactionBodyFixture() + tx.ReferenceBlockID = suite.ProtoStateRoot().ID() + tx.Payer = unittest.RandomAddressFixture() + tx.ProposalKey = flow.ProposalKey{ + Address: proposer, + KeyID: rand.Uint64(), + SequenceNumber: rand.Uint64(), + } + return &tx + } + suite.FillPool(100, create) + + // since rate limiting does not apply to non-payer keys, we should fill all collections in 10 blocks + parentID := suite.genesis.ID() + for i := 0; i < 10; i++ { + header, err := suite.builder.BuildOn(parentID, noopSetter) + suite.Require().Nil(err) + parentID = header.ID() + + // each collection should be full with 10 transactions + var built model.Block + err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) + suite.Assert().Nil(err) + suite.Assert().Len(built.Payload.Collection.Transactions, 10) + } +} + +// When configured with a rate limit of k>1, we should be able to include up to +// k transactions with a given payer per collection +func (suite *BuilderSuite) TestBuildOn_HighRateLimit() { + + // start with an empty mempool + suite.ClearPool() + + // create builder with 5 tx/payer and max 10 tx/collection + suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + builder.WithMaxCollectionSize(10), + builder.WithMaxPayerTransactionRate(5), + ) + + // fill the pool with 50 
transactions from the same payer + payer := unittest.RandomAddressFixture() + create := func() *flow.TransactionBody { + tx := unittest.TransactionBodyFixture() + tx.ReferenceBlockID = suite.ProtoStateRoot().ID() + tx.Payer = payer + return &tx + } + suite.FillPool(50, create) + + // rate-limiting should be applied, resulting in half-full collections (5/10) + parentID := suite.genesis.ID() + for i := 0; i < 10; i++ { + header, err := suite.builder.BuildOn(parentID, noopSetter) + suite.Require().Nil(err) + parentID = header.ID() + + // each collection should be half-full with 5 transactions + var built model.Block + err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) + suite.Assert().Nil(err) + suite.Assert().Len(built.Payload.Collection.Transactions, 5) + } +} + +// When configured with a rate limit of k<1, we should be able to include 1 +// transactions with a given payer every ceil(1/k) collections +func (suite *BuilderSuite) TestBuildOn_LowRateLimit() { + + // start with an empty mempool + suite.ClearPool() + + // create builder with .5 tx/payer and max 10 tx/collection + suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + builder.WithMaxCollectionSize(10), + builder.WithMaxPayerTransactionRate(.5), + ) + + // fill the pool with 5 transactions from the same payer + payer := unittest.RandomAddressFixture() + create := func() *flow.TransactionBody { + tx := unittest.TransactionBodyFixture() + tx.ReferenceBlockID = suite.ProtoStateRoot().ID() + tx.Payer = payer + return &tx + } + suite.FillPool(5, create) + + // rate-limiting should be applied, resulting in every ceil(1/k) collections + // having one transaction and empty collections otherwise + parentID := suite.genesis.ID() + for i := 0; i < 10; i++ { + header, err := suite.builder.BuildOn(parentID, noopSetter) + suite.Require().Nil(err) + parentID = header.ID() + + // collections should either be empty or have 1 
transaction + var built model.Block + err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) + suite.Assert().Nil(err) + if i%2 == 0 { + suite.Assert().Len(built.Payload.Collection.Transactions, 1) + } else { + suite.Assert().Len(built.Payload.Collection.Transactions, 0) + } + } +} +func (suite *BuilderSuite) TestBuildOn_UnlimitedPayer() { + + // start with an empty mempool + suite.ClearPool() + + // create builder with 5 tx/payer and max 10 tx/collection + // configure an unlimited payer + payer := unittest.RandomAddressFixture() + suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, + builder.WithMaxCollectionSize(10), + builder.WithMaxPayerTransactionRate(5), + builder.WithUnlimitedPayers(payer), + ) + + // fill the pool with 100 transactions from the same payer + create := func() *flow.TransactionBody { + tx := unittest.TransactionBodyFixture() + tx.ReferenceBlockID = suite.ProtoStateRoot().ID() + tx.Payer = payer + return &tx + } + suite.FillPool(100, create) + + // rate-limiting should not be applied, since the payer is marked as unlimited + parentID := suite.genesis.ID() + for i := 0; i < 10; i++ { + header, err := suite.builder.BuildOn(parentID, noopSetter) + suite.Require().Nil(err) + parentID = header.ID() + + // each collection should be full with 10 transactions + var built model.Block + err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built)) + suite.Assert().Nil(err) + suite.Assert().Len(built.Payload.Collection.Transactions, 10) + + } +} + // helper to check whether a collection contains each of the given transactions. 
func collectionContains(collection flow.Collection, txIDs ...flow.Identifier) bool { diff --git a/module/builder/collection/config.go b/module/builder/collection/config.go new file mode 100644 index 00000000000..89c3c84c12d --- /dev/null +++ b/module/builder/collection/config.go @@ -0,0 +1,73 @@ +package collection + +import ( + "github.com/onflow/flow-go/model/flow" +) + +// Config is the configurable options for the collection builder. +type Config struct { + + // MaxCollectionSize is the maximum size of collections. + MaxCollectionSize uint + + // ExpiryBuffer is how much buffer we add when considering transaction + // expiry. If the buffer is set to 10, and a transaction actually expires + // in 15 blocks, we consider it expired in 5 (15-10) blocks. This accounts + // for the time between the collection being built and being included in + // block. + ExpiryBuffer uint + + // MaxPayerTransactionRate is the maximum number of transactions per payer + // per collection. Fractional values greater than 1 are rounded down. + // Fractional values 0= 1 || limiter.rate <= 0 { + return + } + + latest := limiter.latestCollectionHeight[tx.Payer] + if height >= latest { + limiter.latestCollectionHeight[tx.Payer] = height + } +} + +// note that we have added a transaction to the collection under construction. +func (limiter *rateLimiter) transactionIncluded(tx *flow.TransactionBody) { + limiter.txIncludedCount[tx.Payer]++ +} + +// applies the rate limiting rules, returning whether the transaction should be +// omitted from the collection under construction. 
+func (limiter *rateLimiter) shouldRateLimit(tx *flow.TransactionBody) bool { + + payer := tx.Payer + + // skip rate limiting if it is turned off or the payer is unlimited + _, isUnlimited := limiter.unlimited[payer] + if limiter.rate == 0 || isUnlimited { + return false + } + + // if rate >=1, we only consider the current collection and rate limit once + // the number of transactions for the payer exceeds rate + if limiter.rate >= 1 { + if limiter.txIncludedCount[payer] >= uint(math.Floor(limiter.rate)) { + return true + } + } + + // if rate < 1, we need to look back to see when a transaction by this payer + // was most recently included - we rate limit if the # of collections since + // the payer's last transaction is less than ceil(1/rate) + if limiter.rate < 1 { + + // rate limit if we've already included a transaction for this payer, we allow + // AT MOST one transaction per payer in a given collection + if limiter.txIncludedCount[payer] > 0 { + return true + } + + // otherwise, check whether sufficiently many empty collections + // have been built since the last transaction from the payer + + latestHeight, hasLatest := limiter.latestCollectionHeight[payer] + // if there is no recent transaction, don't rate limit + if !hasLatest { + return false + } + + if limiter.height-latestHeight < uint64(math.Ceil(1/limiter.rate)) { + return true + } + } + + return false +} diff --git a/module/builder/collection/tx_lookup.go b/module/builder/collection/tx_lookup.go new file mode 100644 index 00000000000..b99073acf98 --- /dev/null +++ b/module/builder/collection/tx_lookup.go @@ -0,0 +1,42 @@ +package collection + +import "github.com/onflow/flow-go/model/flow" + +// transactionLookup encapsulates state and logic for checking chain history +// to avoid transaction duplication while building collections. 
+type transactionLookup struct { + // set of transaction IDs in finalized ancestry + finalized map[flow.Identifier]struct{} + // set of transaction IDs in unfinalized ancestry + unfinalized map[flow.Identifier]struct{} +} + +func newTransactionLookup() *transactionLookup { + lookup := &transactionLookup{ + finalized: make(map[flow.Identifier]struct{}), + unfinalized: make(map[flow.Identifier]struct{}), + } + return lookup +} + +// note the existence of a transaction in a finalized ancestor collection +func (lookup *transactionLookup) addFinalizedAncestor(txID flow.Identifier) { + lookup.finalized[txID] = struct{}{} +} + +// note the existence of a transaction in an unfinalized ancestor collection +func (lookup *transactionLookup) addUnfinalizedAncestor(txID flow.Identifier) { + lookup.unfinalized[txID] = struct{}{} +} + +// checks whether the given transaction ID is in a finalized ancestor collection
 +func (lookup *transactionLookup) isFinalizedAncestor(txID flow.Identifier) bool { + _, exists := lookup.finalized[txID] + return exists +} + +// checks whether the given transaction ID is in an unfinalized ancestor collection +func (lookup *transactionLookup) isUnfinalizedAncestor(txID flow.Identifier) bool { + _, exists := lookup.unfinalized[txID] + return exists +} diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index 54ec70727b7..259e1aede73 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -27,11 +27,21 @@ func AddressFixture() flow.Address { return flow.Testnet.Chain().ServiceAddress() } +func RandomAddressFixture() flow.Address { + // we use a 32-bit index - since the linear address generator uses 45 bits, + // this won't error + addr, err := flow.Testnet.Chain().AddressAtIndex(uint64(rand.Uint32())) + if err != nil { + panic(err) + } + return addr +} + func TransactionSignatureFixture() flow.TransactionSignature { return flow.TransactionSignature{ Address: AddressFixture(), SignerIndex: 0, - 
Signature: []byte{1, 2, 3, 4}, + Signature: SeedFixture(64), KeyID: 1, } }