Skip to content

Commit

Permalink
Merge pull request #67 from onflow/jordan/4909-acct-rate-limiting
Browse files Browse the repository at this point in the history
Rate limit by payer account
  • Loading branch information
jordanschalm committed Oct 19, 2020
2 parents 660c45c + bdf8458 commit ace6bdc
Show file tree
Hide file tree
Showing 9 changed files with 516 additions and 44 deletions.
15 changes: 15 additions & 0 deletions cmd/collection/main.go
Expand Up @@ -47,6 +47,8 @@ func main() {
txLimit uint
maxCollectionSize uint
builderExpiryBuffer uint
builderPayerRateLimit float64
builderUnlimitedPayers []string
hotstuffTimeout time.Duration
hotstuffMinTimeout time.Duration
hotstuffTimeoutIncreaseFactor float64
Expand Down Expand Up @@ -86,6 +88,10 @@ func main() {
"how many additional cluster members we propagate transactions to")
flags.UintVar(&builderExpiryBuffer, "builder-expiry-buffer", 25,
"expiry buffer for transactions in proposed collections")
flags.Float64Var(&builderPayerRateLimit, "builder-rate-limit", 0, // no rate limiting
"rate limit for each payer (transactions/collection)")
flags.StringSliceVar(&builderUnlimitedPayers, "builder-unlimited-payers", []string{}, // no unlimited payers
"set of payer addresses which are omitted from rate limiting")
flags.UintVar(&maxCollectionSize, "builder-max-collection-size", 200,
"maximum number of transactions in proposed collections")
flags.DurationVar(&hotstuffTimeout, "hotstuff-timeout", 60*time.Second,
Expand Down Expand Up @@ -267,6 +273,13 @@ func main() {
return nil, err
}

// convert hex string flag values to addresses
unlimitedPayers := make([]flow.Address, 0, len(builderUnlimitedPayers))
for _, payerStr := range builderUnlimitedPayers {
payerAddr := flow.HexToAddress(payerStr)
unlimitedPayers = append(unlimitedPayers, payerAddr)
}

builderFactory, err := factories.NewBuilderFactory(
node.DB,
node.Storage.Headers,
Expand All @@ -275,6 +288,8 @@ func main() {
push,
builder.WithMaxCollectionSize(maxCollectionSize),
builder.WithExpiryBuffer(builderExpiryBuffer),
builder.WithMaxPayerTransactionRate(builderPayerRateLimit),
builder.WithUnlimitedPayers(unlimitedPayers...),
)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion integration/localnet/Makefile
@@ -1,4 +1,4 @@
COLLECTION = 1
COLLECTION = 3
CONSENSUS = 3
EXECUTION = 1
VERIFICATION = 1
Expand Down
2 changes: 1 addition & 1 deletion integration/localnet/bootstrap.go
Expand Up @@ -24,7 +24,7 @@ const (
DockerComposeFile = "./docker-compose.nodes.yml"
DockerComposeFileVersion = "3.7"
PrometheusTargetsFile = "./targets.nodes.json"
DefaultCollectionCount = 1
DefaultCollectionCount = 3
DefaultConsensusCount = 3
DefaultExecutionCount = 1
DefaultVerificationCount = 1
Expand Down
84 changes: 43 additions & 41 deletions module/builder/collection/builder.go
Expand Up @@ -25,28 +25,13 @@ import (
// HotStuff event loop is the only consumer of this interface and is single
// threaded, this is OK.
type Builder struct {
db *badger.DB
mainHeaders storage.Headers
clusterHeaders storage.Headers
payloads storage.ClusterPayloads
transactions mempool.Transactions
tracer module.Tracer
maxCollectionSize uint
expiryBuffer uint
}

type Opt func(*Builder)

func WithMaxCollectionSize(size uint) Opt {
return func(b *Builder) {
b.maxCollectionSize = size
}
}

func WithExpiryBuffer(buf uint) Opt {
return func(b *Builder) {
b.expiryBuffer = buf
}
db *badger.DB
mainHeaders storage.Headers
clusterHeaders storage.Headers
payloads storage.ClusterPayloads
transactions mempool.Transactions
tracer module.Tracer
config Config
}

func NewBuilder(
Expand All @@ -60,19 +45,19 @@ func NewBuilder(
) *Builder {

b := Builder{
db: db,
tracer: tracer,
mainHeaders: mainHeaders,
clusterHeaders: clusterHeaders,
payloads: payloads,
transactions: transactions,
maxCollectionSize: 100,
expiryBuffer: 15,
db: db,
tracer: tracer,
mainHeaders: mainHeaders,
clusterHeaders: clusterHeaders,
payloads: payloads,
transactions: transactions,
config: DefaultConfig(),
}

for _, apply := range opts {
apply(&b)
apply(&b.config)
}

return &b
}

Expand Down Expand Up @@ -127,9 +112,19 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
b.tracer.StartSpan(parentID, trace.COLBuildOnUnfinalizedLookup)
defer b.tracer.FinishSpan(parentID, trace.COLBuildOnUnfinalizedLookup)

// RATE LIMITING: the builder module can be configured to limit the
// rate at which transactions with a common payer are included in
// blocks. Depending on the configured limit, we either allow 1
// transaction every N sequential collections, or we allow K transactions
// per collection.

// keep track of transactions in the ancestry to avoid duplicates
lookup := newTransactionLookup()
// keep track of transactions to enforce rate limiting
limiter := newRateLimiter(b.config, parent.Height+1)

// look up previously included transactions in UN-FINALIZED ancestors
ancestorID := parentID
unfinalizedLookup := make(map[flow.Identifier]struct{})
clusterFinalID := clusterFinal.ID()
for ancestorID != clusterFinalID {
ancestor, err := b.clusterHeaders.ByBlockID(ancestorID)
Expand All @@ -148,7 +143,8 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er

collection := payload.Collection
for _, tx := range collection.Transactions {
unfinalizedLookup[tx.ID()] = struct{}{}
lookup.addUnfinalizedAncestor(tx.ID())
limiter.addAncestor(ancestor.Height, tx)
}
ancestorID = ancestor.ParentID
}
Expand All @@ -166,7 +162,6 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
}

// look up previously included transactions in FINALIZED ancestors
finalizedLookup := make(map[flow.Identifier]struct{})
ancestorID = clusterFinal.ID()
ancestorHeight := clusterFinal.Height
for ancestorHeight > limit {
Expand All @@ -181,7 +176,8 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er

collection := payload.Collection
for _, tx := range collection.Transactions {
finalizedLookup[tx.ID()] = struct{}{}
lookup.addFinalizedAncestor(tx.ID())
limiter.addAncestor(ancestor.Height, tx)
}

ancestorID = ancestor.ParentID
Expand All @@ -202,7 +198,7 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
for _, tx := range b.transactions.All() {

// if we have reached maximum number of transactions, stop
if uint(len(transactions)) >= b.maxCollectionSize {
if uint(len(transactions)) >= b.config.MaxCollectionSize {
break
}

Expand All @@ -222,32 +218,38 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er

// ensure the reference block is not too old
txID := tx.ID()
if refChainFinalizedHeight-refHeader.Height > uint64(flow.DefaultTransactionExpiry-b.expiryBuffer) {
if refChainFinalizedHeight-refHeader.Height > uint64(flow.DefaultTransactionExpiry-b.config.ExpiryBuffer) {
// the transaction is expired, it will never be valid
b.transactions.Rem(txID)
continue
}

// check that the transaction was not already used in un-finalized history
_, duplicated := unfinalizedLookup[txID]
if duplicated {
if lookup.isUnfinalizedAncestor(txID) {
continue
}

// check that the transaction was not already included in finalized history.
_, duplicated = finalizedLookup[txID]
if duplicated {
if lookup.isFinalizedAncestor(txID) {
// remove from mempool, conflicts with finalized block will never be valid
b.transactions.Rem(txID)
continue
}

// enforce rate limiting rules
if limiter.shouldRateLimit(tx) {
continue
}

// ensure we find the lowest reference block height
if refHeader.Height < minRefHeight {
minRefHeight = refHeader.Height
minRefID = tx.ReferenceBlockID
}

// update per-payer transaction count
limiter.transactionIncluded(tx)

transactions = append(transactions, tx)
}

Expand Down

0 comments on commit ace6bdc

Please sign in to comment.