Skip to content

Commit

Permalink
Adding gas and byte size limits to collections (#91)
Browse files Browse the repository at this point in the history
Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
  • Loading branch information
ramtinms and jordanschalm committed Oct 28, 2020
1 parent e3bebd6 commit 45905b8
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 7 deletions.
8 changes: 8 additions & 0 deletions cmd/collection/main.go
Expand Up @@ -46,6 +46,8 @@ func main() {
var (
txLimit uint
maxCollectionSize uint
maxCollectionByteSize uint64
maxCollectionTotalGas uint64
builderExpiryBuffer uint
builderPayerRateLimit float64
builderUnlimitedPayers []string
Expand Down Expand Up @@ -94,6 +96,10 @@ func main() {
"set of payer addresses which are omitted from rate limiting")
flags.UintVar(&maxCollectionSize, "builder-max-collection-size", 200,
"maximum number of transactions in proposed collections")
flags.Uint64Var(&maxCollectionByteSize, "builder-max-collection-byte-size", 1000000,
"maximum byte size of the proposed collection")
flags.Uint64Var(&maxCollectionTotalGas, "builder-max-collection-total-gas", 1000000,
"maximum total amount of maxgas of transactions in proposed collections")
flags.DurationVar(&hotstuffTimeout, "hotstuff-timeout", 60*time.Second,
"the initial timeout for the hotstuff pacemaker")
flags.DurationVar(&hotstuffMinTimeout, "hotstuff-min-timeout", 2500*time.Millisecond,
Expand Down Expand Up @@ -287,6 +293,8 @@ func main() {
colMetrics,
push,
builder.WithMaxCollectionSize(maxCollectionSize),
builder.WithMaxCollectionByteSize(maxCollectionByteSize),
builder.WithMaxCollectionTotalGas(maxCollectionTotalGas),
builder.WithExpiryBuffer(builderExpiryBuffer),
builder.WithMaxPayerTransactionRate(builderPayerRateLimit),
builder.WithUnlimitedPayers(unlimitedPayers...),
Expand Down
14 changes: 9 additions & 5 deletions engine/collection/ingest/engine.go
Expand Up @@ -143,9 +143,11 @@ func (e *Engine) process(originID flow.Identifier, event interface{}) error {
// from outside the system or routed from another collection node.
func (e *Engine) onTransaction(originID flow.Identifier, tx *flow.TransactionBody) error {

txID := tx.ID()

log := e.log.With().
Hex("origin_id", originID[:]).
Hex("tx_id", logging.Entity(tx)).
Hex("tx_id", txID[:]).
Hex("ref_block_id", tx.ReferenceBlockID[:]).
Logger()

Expand Down Expand Up @@ -180,7 +182,6 @@ func (e *Engine) onTransaction(originID flow.Identifier, tx *flow.TransactionBod
pool := e.pools.ForEpoch(counter)

// short-circuit if we have already stored the transaction
txID := tx.ID()
if pool.Has(txID) {
e.log.Debug().Msg("received dupe transaction")
return nil
Expand All @@ -203,13 +204,16 @@ func (e *Engine) onTransaction(originID flow.Identifier, tx *flow.TransactionBod
return fmt.Errorf("node is not assigned to any cluster in this epoch: %d", counter)
}

localClusterFingerPrint := localCluster.Fingerprint()
txClusterFingerPrint := txCluster.Fingerprint()

log = log.With().
Hex("local_cluster", logging.ID(localCluster.Fingerprint())).
Hex("tx_cluster", logging.ID(txCluster.Fingerprint())).
Hex("local_cluster", logging.ID(localClusterFingerPrint)).
Hex("tx_cluster", logging.ID(txClusterFingerPrint)).
Logger()

// if our cluster is responsible for the transaction, add it to the mempool
if localCluster.Fingerprint() == txCluster.Fingerprint() {
if localClusterFingerPrint == txClusterFingerPrint {
_ = pool.Add(tx)
e.colMetrics.TransactionIngested(txID)
log.Debug().Msg("added transaction to pool")
Expand Down
4 changes: 2 additions & 2 deletions model/flow/transaction.go
Expand Up @@ -64,7 +64,7 @@ func (tb TransactionBody) Fingerprint() []byte {
})
}

func (tb TransactionBody) ByteSize() int {
func (tb TransactionBody) ByteSize() uint {
size := 0
size += len(tb.ReferenceBlockID)
size += len(tb.Script)
Expand All @@ -81,7 +81,7 @@ func (tb TransactionBody) ByteSize() int {
for _, s := range tb.EnvelopeSignatures {
size += s.ByteSize()
}
return size
return uint(size)
}

func (tb TransactionBody) ID() Identifier {
Expand Down
30 changes: 30 additions & 0 deletions module/builder/collection/builder.go
Expand Up @@ -195,13 +195,41 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
minRefID := refChainFinalizedID

var transactions []*flow.TransactionBody
var totalByteSize uint64
var totalGas uint64
for _, tx := range b.transactions.All() {

// if we have reached maximum number of transactions, stop
if uint(len(transactions)) >= b.config.MaxCollectionSize {
break
}

txByteSize := uint64(tx.ByteSize())
// ignore transactions with tx byte size bigger that the max amount per collection
// this case shouldn't happen ever since we keep a limit on tx byte size but in case
// we keep this condition
if txByteSize > b.config.MaxCollectionByteSize {
continue
}

// because the max byte size per tx is way smaller than the max collection byte size, we can stop here and not continue.
// to make it more effective in the future we can continue adding smaller ones
if totalByteSize+txByteSize > b.config.MaxCollectionByteSize {
break
}

// ignore transactions with max gas bigger that the max total gas per collection
// this case shouldn't happen ever but in case we keep this condition
if tx.GasLimit > b.config.MaxCollectionTotalGas {
continue
}

// cause the max gas limit per tx is way smaller than the total max gas per collection, we can stop here and not continue.
// to make it more effective in the future we can continue adding smaller ones
if totalGas+tx.GasLimit > b.config.MaxCollectionTotalGas {
break
}

// retrieve the main chain header that was used as reference
refHeader, err := b.mainHeaders.ByBlockID(tx.ReferenceBlockID)
if errors.Is(err, storage.ErrNotFound) {
Expand Down Expand Up @@ -251,6 +279,8 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
limiter.transactionIncluded(tx)

transactions = append(transactions, tx)
totalByteSize += txByteSize
totalGas += tx.GasLimit
}

// STEP FOUR: we have a set of transactions that are valid to include
Expand Down
37 changes: 37 additions & 0 deletions module/builder/collection/builder_test.go
Expand Up @@ -111,6 +111,7 @@ func (suite *BuilderSuite) Bootstrap() {
transaction := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) {
tx.ReferenceBlockID = root.ID()
tx.ProposalKey.SequenceNumber = uint64(i)
tx.GasLimit = uint64(9999)
})
added := suite.pool.Add(&transaction)
suite.Assert().True(added)
Expand Down Expand Up @@ -459,6 +460,42 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSize() {
suite.Assert().Equal(builtCollection.Len(), 1)
}

func (suite *BuilderSuite) TestBuildOn_MaxCollectionByteSize() {
// set the max collection byte size to 600 (each tx is about 273 bytes)
suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionByteSize(600))

// build a block
header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter)
suite.Require().Nil(err)

// retrieve the built block from storage
var built model.Block
err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built))
suite.Require().Nil(err)
builtCollection := built.Payload.Collection

// should be only 2 transactions in the collection, since each tx is ~273 bytes and the limit is 600 bytes
suite.Assert().Equal(builtCollection.Len(), 2)
}

func (suite *BuilderSuite) TestBuildOn_MaxCollectionTotalGas() {
// set the max gas to 20,000
suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionTotalGas(20000))

// build a block
header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter)
suite.Require().Nil(err)

// retrieve the built block from storage
var built model.Block
err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built))
suite.Require().Nil(err)
builtCollection := built.Payload.Collection

// should be only 2 transactions in collection, since each transaction has gas limit of 9,999 and collection limit is set to 20,000
suite.Assert().Equal(builtCollection.Len(), 2)
}

func (suite *BuilderSuite) TestBuildOn_ExpiredTransaction() {

// create enough main-chain blocks that an expired transaction is possible
Expand Down
20 changes: 20 additions & 0 deletions module/builder/collection/config.go
Expand Up @@ -28,6 +28,12 @@ type Config struct {
// UnlimitedPayer is a set of addresses which are not affected by per-payer
// rate limiting.
UnlimitedPayers map[flow.Address]struct{}

// MaxCollectionByteSize is the maximum byte size of a collection.
MaxCollectionByteSize uint64

// MaxCollectionTotalGas is the maximum of total of gas per collection (sum of maxGasLimit over transactions)
MaxCollectionTotalGas uint64
}

func DefaultConfig() Config {
Expand All @@ -36,6 +42,8 @@ func DefaultConfig() Config {
ExpiryBuffer: 15, // 15 blocks for collections to be included
MaxPayerTransactionRate: 0, // no rate limiting
UnlimitedPayers: make(map[flow.Address]struct{}), // no unlimited payers
MaxCollectionByteSize: uint64(1000000), // ~1MB
MaxCollectionTotalGas: uint64(1000000), // 1M
}
}

Expand Down Expand Up @@ -71,3 +79,15 @@ func WithUnlimitedPayers(payers ...flow.Address) Opt {
c.UnlimitedPayers = lookup
}
}

func WithMaxCollectionByteSize(limit uint64) Opt {
return func(c *Config) {
c.MaxCollectionByteSize = limit
}
}

func WithMaxCollectionTotalGas(limit uint64) Opt {
return func(c *Config) {
c.MaxCollectionTotalGas = limit
}
}

0 comments on commit 45905b8

Please sign in to comment.