Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding gas and byte size limits to collections #91

Merged
merged 14 commits into from Oct 28, 2020
8 changes: 8 additions & 0 deletions cmd/collection/main.go
Expand Up @@ -46,6 +46,8 @@ func main() {
var (
txLimit uint
maxCollectionSize uint
maxCollectionByteSize uint64
maxCollectionTotalGas uint64
builderExpiryBuffer uint
builderPayerRateLimit float64
builderUnlimitedPayers []string
Expand Down Expand Up @@ -94,6 +96,10 @@ func main() {
"set of payer addresses which are omitted from rate limiting")
flags.UintVar(&maxCollectionSize, "builder-max-collection-size", 200,
"maximum number of transactions in proposed collections")
flags.Uint64Var(&maxCollectionByteSize, "builder-max-collection-byte-size", 1000000,
"maximum byte size of the proposed collection")
flags.Uint64Var(&maxCollectionTotalGas, "builder-max-collection-total-gas", 1000000,
"maximum total amount of maxgas of transactions in proposed collections")
flags.DurationVar(&hotstuffTimeout, "hotstuff-timeout", 60*time.Second,
"the initial timeout for the hotstuff pacemaker")
flags.DurationVar(&hotstuffMinTimeout, "hotstuff-min-timeout", 2500*time.Millisecond,
Expand Down Expand Up @@ -287,6 +293,8 @@ func main() {
colMetrics,
push,
builder.WithMaxCollectionSize(maxCollectionSize),
builder.WithMaxCollectionByteSize(maxCollectionByteSize),
builder.WithMaxCollectionTotalGas(maxCollectionTotalGas),
builder.WithExpiryBuffer(builderExpiryBuffer),
builder.WithMaxPayerTransactionRate(builderPayerRateLimit),
builder.WithUnlimitedPayers(unlimitedPayers...),
Expand Down
14 changes: 9 additions & 5 deletions engine/collection/ingest/engine.go
Expand Up @@ -143,9 +143,11 @@ func (e *Engine) process(originID flow.Identifier, event interface{}) error {
// from outside the system or routed from another collection node.
func (e *Engine) onTransaction(originID flow.Identifier, tx *flow.TransactionBody) error {

txID := tx.ID()

log := e.log.With().
Hex("origin_id", originID[:]).
Hex("tx_id", logging.Entity(tx)).
Hex("tx_id", txID[:]).
Hex("ref_block_id", tx.ReferenceBlockID[:]).
Logger()

Expand Down Expand Up @@ -180,7 +182,6 @@ func (e *Engine) onTransaction(originID flow.Identifier, tx *flow.TransactionBod
pool := e.pools.ForEpoch(counter)

// short-circuit if we have already stored the transaction
txID := tx.ID()
if pool.Has(txID) {
e.log.Debug().Msg("received dupe transaction")
return nil
Expand All @@ -203,13 +204,16 @@ func (e *Engine) onTransaction(originID flow.Identifier, tx *flow.TransactionBod
return fmt.Errorf("node is not assigned to any cluster in this epoch: %d", counter)
}

localClusterFingerPrint := localCluster.Fingerprint()
txClusterFingerPrint := txCluster.Fingerprint()

log = log.With().
Hex("local_cluster", logging.ID(localCluster.Fingerprint())).
Hex("tx_cluster", logging.ID(txCluster.Fingerprint())).
Hex("local_cluster", logging.ID(localClusterFingerPrint)).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is fingerprint an ID? I understand that this function, AFAIR just hexprints it, but seems semantically wrong to use ID() on fingerprint

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think fingerprint is different than ID here as it ignores some data parts I guess, but since I don't know that much about this part of the code, I won't change it right now.

Copy link
Member Author

@ramtinms ramtinms Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point on semantics. the issue is we don't have a fingerprint type, and it still returns an Identifier type

Hex("tx_cluster", logging.ID(txClusterFingerPrint)).
Logger()

// if our cluster is responsible for the transaction, add it to the mempool
if localCluster.Fingerprint() == txCluster.Fingerprint() {
if localClusterFingerPrint == txClusterFingerPrint {
_ = pool.Add(tx)
e.colMetrics.TransactionIngested(txID)
log.Debug().Msg("added transaction to pool")
Expand Down
4 changes: 2 additions & 2 deletions model/flow/transaction.go
Expand Up @@ -64,7 +64,7 @@ func (tb TransactionBody) Fingerprint() []byte {
})
}

func (tb TransactionBody) ByteSize() int {
func (tb TransactionBody) ByteSize() uint {
size := 0
size += len(tb.ReferenceBlockID)
size += len(tb.Script)
Expand All @@ -81,7 +81,7 @@ func (tb TransactionBody) ByteSize() int {
for _, s := range tb.EnvelopeSignatures {
size += s.ByteSize()
}
return size
return uint(size)
}

func (tb TransactionBody) ID() Identifier {
Expand Down
30 changes: 30 additions & 0 deletions module/builder/collection/builder.go
Expand Up @@ -195,13 +195,41 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
minRefID := refChainFinalizedID

var transactions []*flow.TransactionBody
var totalByteSize uint64
var totalGas uint64
for _, tx := range b.transactions.All() {

// if we have reached maximum number of transactions, stop
if uint(len(transactions)) >= b.config.MaxCollectionSize {
break
}

txByteSize := uint64(tx.ByteSize())
// ignore transactions with tx byte size bigger than the max amount per collection
// this case shouldn't happen ever since we keep a limit on tx byte size but in case
// we keep this condition
if txByteSize > b.config.MaxCollectionByteSize {
continue
}

// because the max byte size per tx is way smaller than the max collection byte size, we can stop here and not continue.
// to make it more effective in the future we can continue adding smaller ones
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned by this comment:

In the worst case, if the byte size of the first transaction returned by transactions.All() is bigger than the collection's max byte size, then no transaction will be added. The leader will keep proposing empty collections.

Could we add a check to say if the single transaction's byte size is bigger than the max colleciton byte size, we will continue instead of break:


			if tx.ByteSize() > b.config.MaxCollectionByteSize {
				continue
			}
			if totalByteSize+tx.ByteSize() > b.config.MaxCollectionByteSize {
				break
			}

This could act as a "filter", or we could simply remove that over-sized transaction from the mempool

Same comment to the GasLimit

Copy link
Member Author

@ramtinms ramtinms Oct 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we covered this case by setting the tx max limit to 64K, but you're right there is no harm covering this edge case in case one day we removed tx size limits

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finally we get to coin-selection problem!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

applied the suggested changes.

if totalByteSize+txByteSize > b.config.MaxCollectionByteSize {
break
}

// ignore transactions with max gas bigger than the max total gas per collection
// this case shouldn't happen ever but in case we keep this condition
if tx.GasLimit > b.config.MaxCollectionTotalGas {
continue
}

// because the max gas limit per tx is way smaller than the total max gas per collection, we can stop here and not continue.
// to make it more effective in the future we can continue adding smaller ones
if totalGas+tx.GasLimit > b.config.MaxCollectionTotalGas {
break
}

// retrieve the main chain header that was used as reference
refHeader, err := b.mainHeaders.ByBlockID(tx.ReferenceBlockID)
if errors.Is(err, storage.ErrNotFound) {
Expand Down Expand Up @@ -251,6 +279,8 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
limiter.transactionIncluded(tx)

transactions = append(transactions, tx)
totalByteSize += txByteSize
totalGas += tx.GasLimit
}

// STEP FOUR: we have a set of transactions that are valid to include
Expand Down
37 changes: 37 additions & 0 deletions module/builder/collection/builder_test.go
Expand Up @@ -111,6 +111,7 @@ func (suite *BuilderSuite) Bootstrap() {
transaction := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) {
tx.ReferenceBlockID = root.ID()
tx.ProposalKey.SequenceNumber = uint64(i)
tx.GasLimit = uint64(9999)
})
added := suite.pool.Add(&transaction)
suite.Assert().True(added)
Expand Down Expand Up @@ -459,6 +460,42 @@ func (suite *BuilderSuite) TestBuildOn_MaxCollectionSize() {
suite.Assert().Equal(builtCollection.Len(), 1)
}

// TestBuildOn_MaxCollectionByteSize verifies that the builder stops adding
// transactions to a proposed collection once the configured maximum
// collection byte size would be exceeded.
func (suite *BuilderSuite) TestBuildOn_MaxCollectionByteSize() {
	// set the max collection byte size to 600 (each tx is about 273 bytes)
	suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionByteSize(600))

	// build a block
	header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter)
	suite.Require().NoError(err)

	// retrieve the built block from storage
	var built model.Block
	err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built))
	suite.Require().NoError(err)
	builtCollection := built.Payload.Collection

	// should be only 2 transactions in the collection, since each tx is ~273 bytes and the limit is 600 bytes
	// (testify's Equal takes the expected value first, then the actual value)
	suite.Assert().Equal(2, builtCollection.Len())
}

// TestBuildOn_MaxCollectionTotalGas verifies that the builder stops adding
// transactions to a proposed collection once the configured maximum total
// gas (sum of transaction GasLimit values) would be exceeded.
func (suite *BuilderSuite) TestBuildOn_MaxCollectionTotalGas() {
	// set the max gas to 20,000
	suite.builder = builder.NewBuilder(suite.db, trace.NewNoopTracer(), suite.headers, suite.headers, suite.payloads, suite.pool, builder.WithMaxCollectionTotalGas(20000))

	// build a block
	header, err := suite.builder.BuildOn(suite.genesis.ID(), noopSetter)
	suite.Require().NoError(err)

	// retrieve the built block from storage
	var built model.Block
	err = suite.db.View(procedure.RetrieveClusterBlock(header.ID(), &built))
	suite.Require().NoError(err)
	builtCollection := built.Payload.Collection

	// should be only 2 transactions in collection, since each transaction has gas limit of 9,999 and collection limit is set to 20,000
	// (testify's Equal takes the expected value first, then the actual value)
	suite.Assert().Equal(2, builtCollection.Len())
}

func (suite *BuilderSuite) TestBuildOn_ExpiredTransaction() {

// create enough main-chain blocks that an expired transaction is possible
Expand Down
20 changes: 20 additions & 0 deletions module/builder/collection/config.go
Expand Up @@ -28,6 +28,12 @@ type Config struct {
// UnlimitedPayer is a set of addresses which are not affected by per-payer
// rate limiting.
UnlimitedPayers map[flow.Address]struct{}

// MaxCollectionByteSize is the maximum byte size of a collection.
MaxCollectionByteSize uint64

// MaxCollectionTotalGas is the maximum total gas per collection (sum of GasLimit over all transactions)
MaxCollectionTotalGas uint64
}

func DefaultConfig() Config {
Expand All @@ -36,6 +42,8 @@ func DefaultConfig() Config {
ExpiryBuffer: 15, // 15 blocks for collections to be included
MaxPayerTransactionRate: 0, // no rate limiting
UnlimitedPayers: make(map[flow.Address]struct{}), // no unlimited payers
MaxCollectionByteSize: uint64(1000000), // ~1MB
MaxCollectionTotalGas: uint64(1000000), // 1M
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The limit has been defined there, why defining again here?

Could we let me refer to the same number?

I'm afraid one day we change one place and forgot to change the other.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just followed the convention in the code to have a DefaultConfig and command line support to change it if needed. I won't change it as it needs to change all other values as well. maybe something for the future to change, cc: @jordanschalm

}
}

Expand Down Expand Up @@ -71,3 +79,15 @@ func WithUnlimitedPayers(payers ...flow.Address) Opt {
c.UnlimitedPayers = lookup
}
}

// WithMaxCollectionByteSize returns a functional option that caps the total
// byte size of a proposed collection at the given limit.
func WithMaxCollectionByteSize(limit uint64) Opt {
	return func(cfg *Config) {
		cfg.MaxCollectionByteSize = limit
	}
}

// WithMaxCollectionTotalGas returns a functional option that caps the total
// gas (sum of transaction gas limits) of a proposed collection at the given limit.
func WithMaxCollectionTotalGas(limit uint64) Opt {
	return func(cfg *Config) {
		cfg.MaxCollectionTotalGas = limit
	}
}