Rate limit by payer account #67

Merged
merged 13 commits on Oct 19, 2020
15 changes: 15 additions & 0 deletions cmd/collection/main.go
@@ -46,6 +46,8 @@ func main() {
txLimit uint
maxCollectionSize uint
builderExpiryBuffer uint
builderPayerRateLimit float64
builderUnlimitedPayers []string
hotstuffTimeout time.Duration
hotstuffMinTimeout time.Duration
hotstuffTimeoutIncreaseFactor float64
@@ -85,6 +87,10 @@ func main() {
"how many additional cluster members we propagate transactions to")
flags.UintVar(&builderExpiryBuffer, "builder-expiry-buffer", 25,
"expiry buffer for transactions in proposed collections")
flags.Float64Var(&builderPayerRateLimit, "builder-rate-limit", 0, // no rate limiting
"rate limit for each payer (transactions/collection)")
flags.StringSliceVar(&builderUnlimitedPayers, "builder-unlimited-payers", []string{}, // no unlimited payers
"set of payer addresses which are omitted from rate limiting")
flags.UintVar(&maxCollectionSize, "builder-max-collection-size", 200,
"maximum number of transactions in proposed collections")
flags.DurationVar(&hotstuffTimeout, "hotstuff-timeout", 60*time.Second,
@@ -265,6 +271,13 @@ func main() {
return nil, err
}

// convert hex string flag values to addresses
unlimitedPayers := make([]flow.Address, 0, len(builderUnlimitedPayers))
for _, payerStr := range builderUnlimitedPayers {
payerAddr := flow.HexToAddress(payerStr)
unlimitedPayers = append(unlimitedPayers, payerAddr)
}

builderFactory, err := factories.NewBuilderFactory(
node.DB,
node.Storage.Headers,
@@ -273,6 +286,8 @@
push,
builder.WithMaxCollectionSize(maxCollectionSize),
builder.WithExpiryBuffer(builderExpiryBuffer),
builder.WithMaxPayerTransactionRate(builderPayerRateLimit),
builder.WithUnlimitedPayers(unlimitedPayers...),
)
if err != nil {
return nil, err
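Note on the two new builder options used above: builder.WithMaxPayerTransactionRate and builder.WithUnlimitedPayers are defined in the collection builder package, outside the hunks shown in this diff. The following is only a sketch of the shape they plausibly take, assuming the new builder Config carries matching fields; the field names, the set representation for unlimited payers, and the import path are assumptions for illustration, not taken from the PR.

package collection

import (
	"github.com/onflow/flow-go/model/flow" // import path assumed
)

// Config collects the builder settings (sketch; field names are guesses based
// on how the options are called in cmd/collection/main.go).
type Config struct {
	MaxCollectionSize       uint
	ExpiryBuffer            uint
	MaxPayerTransactionRate float64
	UnlimitedPayers         map[flow.Address]struct{}
}

// DefaultConfig mirrors the hard-coded defaults that this diff removes from
// NewBuilder (max collection size 100, expiry buffer 15), with rate limiting off.
func DefaultConfig() Config {
	return Config{
		MaxCollectionSize:       100,
		ExpiryBuffer:            15,
		MaxPayerTransactionRate: 0, // 0 disables rate limiting
		UnlimitedPayers:         nil,
	}
}

// Opt mutates the configuration, matching apply(&b.config) in builder.go.
type Opt func(*Config)

// WithMaxPayerTransactionRate limits how many transactions with a common payer
// may appear per collection (values in (0,1) mean one transaction every 1/rate
// sequential collections).
func WithMaxPayerTransactionRate(rate float64) Opt {
	return func(c *Config) {
		c.MaxPayerTransactionRate = rate
	}
}

// WithUnlimitedPayers exempts the given payer addresses from rate limiting.
func WithUnlimitedPayers(payers ...flow.Address) Opt {
	return func(c *Config) {
		lookup := make(map[flow.Address]struct{}, len(payers))
		for _, payer := range payers {
			lookup[payer] = struct{}{}
		}
		c.UnlimitedPayers = lookup
	}
}

Applying options to a Config value rather than to the Builder itself keeps new knobs like these from growing the Builder's field list, which is consistent with the builder.go changes below.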
2 changes: 1 addition & 1 deletion integration/localnet/Makefile
@@ -1,4 +1,4 @@
COLLECTION = 1
COLLECTION = 3
CONSENSUS = 3
EXECUTION = 1
VERIFICATION = 1
2 changes: 1 addition & 1 deletion integration/localnet/bootstrap.go
@@ -24,7 +24,7 @@ const (
DockerComposeFile = "./docker-compose.nodes.yml"
DockerComposeFileVersion = "3.7"
PrometheusTargetsFile = "./targets.nodes.json"
DefaultCollectionCount = 1
DefaultCollectionCount = 3
DefaultConsensusCount = 3
DefaultExecutionCount = 1
DefaultVerificationCount = 1
84 changes: 43 additions & 41 deletions module/builder/collection/builder.go
@@ -25,28 +25,13 @@ import (
// HotStuff event loop is the only consumer of this interface and is single
// threaded, this is OK.
type Builder struct {
db *badger.DB
mainHeaders storage.Headers
clusterHeaders storage.Headers
payloads storage.ClusterPayloads
transactions mempool.Transactions
tracer module.Tracer
maxCollectionSize uint
expiryBuffer uint
}

type Opt func(*Builder)

func WithMaxCollectionSize(size uint) Opt {
return func(b *Builder) {
b.maxCollectionSize = size
}
}

func WithExpiryBuffer(buf uint) Opt {
return func(b *Builder) {
b.expiryBuffer = buf
}
db *badger.DB
mainHeaders storage.Headers
clusterHeaders storage.Headers
payloads storage.ClusterPayloads
transactions mempool.Transactions
tracer module.Tracer
config Config
}

func NewBuilder(
@@ -60,19 +45,19 @@ func NewBuilder(
) *Builder {

b := Builder{
db: db,
tracer: tracer,
mainHeaders: mainHeaders,
clusterHeaders: clusterHeaders,
payloads: payloads,
transactions: transactions,
maxCollectionSize: 100,
expiryBuffer: 15,
db: db,
tracer: tracer,
mainHeaders: mainHeaders,
clusterHeaders: clusterHeaders,
payloads: payloads,
transactions: transactions,
config: DefaultConfig(),
}

for _, apply := range opts {
apply(&b)
apply(&b.config)
}

return &b
}

@@ -127,9 +112,19 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
b.tracer.StartSpan(parentID, trace.COLBuildOnUnfinalizedLookup)
defer b.tracer.FinishSpan(parentID, trace.COLBuildOnUnfinalizedLookup)

// RATE LIMITING: the builder module can be configured to limit the
// rate at which transactions with a common payer are included in
// blocks. Depending on the configured limit, we either allow 1
// transaction every N sequential collections, or we allow K transactions
// per collection.

// keep track of transactions in the ancestry to avoid duplicates
lookup := newTransactionLookup()
// keep track of transactions to enforce rate limiting
limiter := newRateLimiter(b.config, parent.Height+1)

// look up previously included transactions in UN-FINALIZED ancestors
ancestorID := parentID
unfinalizedLookup := make(map[flow.Identifier]struct{})
clusterFinalID := clusterFinal.ID()
for ancestorID != clusterFinalID {
ancestor, err := b.clusterHeaders.ByBlockID(ancestorID)
@@ -148,7 +143,8 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er

collection := payload.Collection
for _, tx := range collection.Transactions {
unfinalizedLookup[tx.ID()] = struct{}{}
lookup.addUnfinalizedAncestor(tx.ID())
limiter.addAncestor(ancestor.Height, tx)
}
ancestorID = ancestor.ParentID
}
@@ -166,7 +162,6 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
}

// look up previously included transactions in FINALIZED ancestors
finalizedLookup := make(map[flow.Identifier]struct{})
ancestorID = clusterFinal.ID()
ancestorHeight := clusterFinal.Height
for ancestorHeight > limit {
@@ -181,7 +176,8 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er

collection := payload.Collection
for _, tx := range collection.Transactions {
finalizedLookup[tx.ID()] = struct{}{}
lookup.addFinalizedAncestor(tx.ID())
limiter.addAncestor(ancestor.Height, tx)
}

ancestorID = ancestor.ParentID
@@ -202,7 +198,7 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er
for _, tx := range b.transactions.All() {

// if we have reached maximum number of transactions, stop
if uint(len(transactions)) >= b.maxCollectionSize {
if uint(len(transactions)) >= b.config.MaxCollectionSize {
break
}

@@ -222,32 +218,38 @@ func (b *Builder) BuildOn(parentID flow.Identifier, setter func(*flow.Header) er

// ensure the reference block is not too old
txID := tx.ID()
if refChainFinalizedHeight-refHeader.Height > uint64(flow.DefaultTransactionExpiry-b.expiryBuffer) {
if refChainFinalizedHeight-refHeader.Height > uint64(flow.DefaultTransactionExpiry-b.config.ExpiryBuffer) {
// the transaction is expired, it will never be valid
b.transactions.Rem(txID)
continue
}

// check that the transaction was not already used in un-finalized history
_, duplicated := unfinalizedLookup[txID]
if duplicated {
if lookup.isUnfinalizedAncestor(txID) {
continue
}

// check that the transaction was not already included in finalized history.
_, duplicated = finalizedLookup[txID]
if duplicated {
if lookup.isFinalizedAncestor(txID) {
// remove from mempool, conflicts with finalized block will never be valid
b.transactions.Rem(txID)
continue
}

// enforce rate limiting rules
if limiter.shouldRateLimit(tx) {
Contributor:

I am guessing that rate limiting a tx would be the exception rather than the norm. If that is the case, should we log.debug the tx and payer we rate limited, so we can figure out from the logs who is abusing the system?

Contributor:

Alternatively, we could add a metric later, e.g. "rate_limited_tx".

Member Author:

Good point. I won't have time before the spork we're trying to get this in for, but I'll add a note to the other issue.

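For illustration only: the kind of debug log suggested in this thread might look like the sketch below. It is not part of this PR; the logger, field names, and helper function are hypothetical, and the Builder in this diff does not carry a logger at all.

package main

import (
	"os"

	"github.com/rs/zerolog"
)

// logRateLimited sketches the reviewer's suggestion: record which payer and
// transaction were rate limited so abuse can be traced from the logs.
func logRateLimited(log zerolog.Logger, payer, txID []byte) {
	log.Debug().
		Hex("payer", payer).
		Hex("tx_id", txID).
		Msg("transaction rate limited by payer")
}

func main() {
	log := zerolog.New(os.Stderr).With().Timestamp().Logger()
	logRateLimited(log, []byte{0x01, 0x02}, []byte{0xaa, 0xbb})
}

A counter metric such as the suggested "rate_limited_tx" would serve the same purpose in aggregate, without per-transaction log volume.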
continue
}

// ensure we find the lowest reference block height
if refHeader.Height < minRefHeight {
minRefHeight = refHeader.Height
minRefID = tx.ReferenceBlockID
}

// update per-payer transaction count
limiter.transactionIncluded(tx)

transactions = append(transactions, tx)
}

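The transactionLookup and rateLimiter helpers introduced above live in files that are not part of the hunks shown here. Judging only from the call sites in BuildOn and from the rate-limiting comment (one transaction every N sequential collections when the configured rate is below 1, K transactions per collection when it is K >= 1), a limiter of roughly the following shape would fit. It reuses the Config sketched after the cmd/collection/main.go diff; all of the internal bookkeeping is an assumption, not the PR's actual implementation.

// rateLimiter tracks per-payer inclusions, matching its use in BuildOn (sketch).
type rateLimiter struct {
	rate      float64                   // transactions per collection, from Config
	unlimited map[flow.Address]struct{} // payers exempt from limiting
	height    uint64                    // height of the collection being built

	// latestIncludedHeight records, per payer, the highest ancestor collection
	// height at which one of their transactions appeared.
	latestIncludedHeight map[flow.Address]uint64
	// includedCount counts, per payer, transactions already added to the
	// collection under construction.
	includedCount map[flow.Address]uint
}

func newRateLimiter(conf Config, height uint64) *rateLimiter {
	return &rateLimiter{
		rate:                 conf.MaxPayerTransactionRate,
		unlimited:            conf.UnlimitedPayers,
		height:               height,
		latestIncludedHeight: make(map[flow.Address]uint64),
		includedCount:        make(map[flow.Address]uint),
	}
}

// addAncestor notes a transaction found in an ancestor collection at the given height.
func (r *rateLimiter) addAncestor(height uint64, tx *flow.TransactionBody) {
	if height > r.latestIncludedHeight[tx.Payer] {
		r.latestIncludedHeight[tx.Payer] = height
	}
}

// transactionIncluded notes a transaction added to the collection being built.
func (r *rateLimiter) transactionIncluded(tx *flow.TransactionBody) {
	r.includedCount[tx.Payer]++
}

// shouldRateLimit reports whether including tx would exceed its payer's rate.
func (r *rateLimiter) shouldRateLimit(tx *flow.TransactionBody) bool {
	if r.rate <= 0 {
		return false // rate limiting disabled
	}
	if _, exempt := r.unlimited[tx.Payer]; exempt {
		return false
	}
	if r.rate < 1 {
		// one transaction every 1/rate collections: allow only if the payer has
		// nothing in this collection and their last inclusion is far enough back
		if r.includedCount[tx.Payer] > 0 {
			return true
		}
		last, seen := r.latestIncludedHeight[tx.Payer]
		return seen && float64(r.height-last) < 1/r.rate
	}
	// rate >= 1: at most rate transactions per collection
	return float64(r.includedCount[tx.Payer]+1) > r.rate
}

The transactionLookup helper, for its part, reads like a thin wrapper around the two map[flow.Identifier]struct{} sets that this diff removes from BuildOn, one for un-finalized and one for finalized ancestry.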