
Commit 2f23347
Merge pull request #1716 from ElrondNetwork/pool++limits
Add limits for number of transactions by sender
LucianMincu committed May 15, 2020
2 parents b2ebad4 + 4dcb960 commit 2f23347
Showing 41 changed files with 955 additions and 455 deletions.
2 changes: 2 additions & 0 deletions cmd/node/config/config.toml
@@ -220,7 +220,9 @@

[TxDataPool]
Size = 900000
SizePerSender = 1000
SizeInBytes = 524288000
SizeInBytesPerSender = 614400
Type = "TxCache"
Shards = 16

10 changes: 6 additions & 4 deletions config/config.go
@@ -2,10 +2,12 @@ package config

// CacheConfig will map the json cache configuration
type CacheConfig struct {
Type string `json:"type"`
Size uint32 `json:"size"`
SizeInBytes uint32 `json:"sizeInBytes"`
Shards uint32 `json:"shards"`
Type string `json:"type"`
Size uint32 `json:"size"`
SizePerSender uint32 `json:"sizePerSender"`
SizeInBytes uint32 `json:"sizeInBytes"`
SizeInBytesPerSender uint32 `json:"sizeInBytesPerSender"`
Shards uint32 `json:"shards"`
}

//HeadersPoolConfig will map the headers cache configuration
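The two new struct fields mirror the new [TxDataPool] keys above: SizePerSender caps how many transactions a single sender may keep in the pool, and SizeInBytesPerSender caps that sender's total payload. As a rough illustration only (not code from this commit), the default config.toml values correspond to a struct literal like the one below; the import path follows the module path used elsewhere in this diff.

package main

import (
	"fmt"

	"github.com/ElrondNetwork/elrond-go/config"
)

func main() {
	// Values taken from the [TxDataPool] section of cmd/node/config/config.toml.
	txDataPool := config.CacheConfig{
		Type:                 "TxCache",
		Size:                 900000,    // pool-wide transaction count limit
		SizePerSender:        1000,      // per-sender transaction count limit
		SizeInBytes:          524288000, // pool-wide byte limit (500 MB)
		SizeInBytesPerSender: 614400,    // per-sender byte limit (600 KB)
		Shards:               16,        // number of internal map chunks
	}
	fmt.Printf("%+v\n", txDataPool)
}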
11 changes: 0 additions & 11 deletions dataRetriever/constants.go
@@ -2,14 +2,3 @@ package dataRetriever

// TxPoolNumSendersToEvictInOneStep instructs tx pool eviction algorithm to remove this many senders when eviction takes place
const TxPoolNumSendersToEvictInOneStep = uint32(100)

// TxPoolLargeNumOfTxsForASender instructs tx pool eviction algorithm to tag a sender with more transactions than this value
// as a "sender with a large number of transactions"
const TxPoolLargeNumOfTxsForASender = uint32(500)

// TxPoolNumTxsToEvictFromASender instructs tx pool eviction algorithm to remove this many transactions
// for "a sender with a large number of transactions" when eviction takes place
const TxPoolNumTxsToEvictFromASender = uint32(100)

// TxPoolMinSizeInBytes is the lower limit of the tx cache / eviction parameter "sizeInBytes"
const TxPoolMinSizeInBytes = uint32(40960)
8 changes: 4 additions & 4 deletions dataRetriever/factory/txpool/txPoolFactory_test.go
@@ -10,22 +10,22 @@ import (

func Test_CreateNewTxPool_ShardedData(t *testing.T) {
config := storageUnit.CacheConfig{Type: storageUnit.FIFOShardedCache, Size: 100, SizeInBytes: 40960, Shards: 1}
args := txpool.ArgShardedTxPool{Config: config, MinGasPrice: 100000000000000, NumberOfShards: 1}
args := txpool.ArgShardedTxPool{Config: config, MinGasPrice: 200000000000, NumberOfShards: 1}

txPool, err := CreateTxPool(args)
require.Nil(t, err)
require.NotNil(t, txPool)

config = storageUnit.CacheConfig{Type: storageUnit.LRUCache, Size: 100, SizeInBytes: 40960, Shards: 1}
args = txpool.ArgShardedTxPool{Config: config, MinGasPrice: 100000000000000, NumberOfShards: 1}
args = txpool.ArgShardedTxPool{Config: config, MinGasPrice: 200000000000, NumberOfShards: 1}
txPool, err = CreateTxPool(args)
require.Nil(t, err)
require.NotNil(t, txPool)
}

func Test_CreateNewTxPool_ShardedTxPool(t *testing.T) {
config := storageUnit.CacheConfig{Size: 100, SizeInBytes: 40960, Shards: 1}
args := txpool.ArgShardedTxPool{Config: config, MinGasPrice: 100000000000000, NumberOfShards: 1}
config := storageUnit.CacheConfig{Size: 100, SizePerSender: 1, SizeInBytes: 40960, SizeInBytesPerSender: 40960, Shards: 1}
args := txpool.ArgShardedTxPool{Config: config, MinGasPrice: 200000000000, NumberOfShards: 1}

txPool, err := CreateTxPool(args)
require.Nil(t, err)
26 changes: 16 additions & 10 deletions dataRetriever/txpool/argShardedTxPool.go
@@ -18,20 +18,26 @@ type ArgShardedTxPool struct {
func (args *ArgShardedTxPool) verify() error {
config := args.Config

if config.SizeInBytes < dataRetriever.TxPoolMinSizeInBytes {
return fmt.Errorf("%w: config.SizeInBytes is less than [dataRetriever.TxPoolMinSizeInBytes]", dataRetriever.ErrCacheConfigInvalidSizeInBytes)
if config.SizeInBytes == 0 {
return fmt.Errorf("%w: config.SizeInBytes is not valid", dataRetriever.ErrCacheConfigInvalidSizeInBytes)
}
if config.Size < 1 {
return fmt.Errorf("%w: config.Size is less than 1", dataRetriever.ErrCacheConfigInvalidSize)
if config.SizeInBytesPerSender == 0 {
return fmt.Errorf("%w: config.SizeInBytesPerSender is not valid", dataRetriever.ErrCacheConfigInvalidSizeInBytes)
}
if config.Shards < 1 {
return fmt.Errorf("%w: config.Shards (map chunks) is less than 1", dataRetriever.ErrCacheConfigInvalidShards)
if config.Size == 0 {
return fmt.Errorf("%w: config.Size is not valid", dataRetriever.ErrCacheConfigInvalidSize)
}
if args.MinGasPrice < 1 {
return fmt.Errorf("%w: MinGasPrice is less than 1", dataRetriever.ErrCacheConfigInvalidEconomics)
if config.SizePerSender == 0 {
return fmt.Errorf("%w: config.SizePerSender is not valid", dataRetriever.ErrCacheConfigInvalidSize)
}
if args.NumberOfShards < 1 {
return fmt.Errorf("%w: NumberOfShards is less than 1", dataRetriever.ErrCacheConfigInvalidSharding)
if config.Shards == 0 {
return fmt.Errorf("%w: config.Shards (map chunks) is not valid", dataRetriever.ErrCacheConfigInvalidShards)
}
if args.MinGasPrice == 0 {
return fmt.Errorf("%w: MinGasPrice is not valid", dataRetriever.ErrCacheConfigInvalidEconomics)
}
if args.NumberOfShards == 0 {
return fmt.Errorf("%w: NumberOfShards is not valid", dataRetriever.ErrCacheConfigInvalidSharding)
}

return nil
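verify() now rejects zero values for every limit and keeps wrapping a package-level sentinel with %w, so callers can still match the failure category via errors.Is. A minimal self-contained sketch of that pattern; the sentinel and field names here are illustrative, not the dataRetriever ones.

package main

import (
	"errors"
	"fmt"
)

var errInvalidSize = errors.New("invalid cache size")

// verifySketch mirrors the shape of one check in verify(): wrap the sentinel
// with %w and describe which field failed.
func verifySketch(sizePerSender uint32) error {
	if sizePerSender == 0 {
		return fmt.Errorf("%w: config.SizePerSender is not valid", errInvalidSize)
	}
	return nil
}

func main() {
	err := verifySketch(0)
	fmt.Println(err)
	fmt.Println(errors.Is(err, errInvalidSize)) // true: the sentinel survives wrapping
}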
16 changes: 16 additions & 0 deletions dataRetriever/txpool/interface.go
@@ -0,0 +1,16 @@
package txpool

import (
"github.com/ElrondNetwork/elrond-go/storage"
"github.com/ElrondNetwork/elrond-go/storage/txcache"
)

type txCache interface {
storage.Cacher

AddTx(tx *txcache.WrappedTransaction) (ok bool, added bool)
GetByTxHash(txHash []byte) (*txcache.WrappedTransaction, bool)
RemoveTxByHash(txHash []byte) error
CountTx() int64
ForEachTransaction(function txcache.ForEachTransaction)
}
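The new unexported txCache interface narrows what shardedTxPool needs from a cache, so a full txcache.TxCache and a disabled stand-in can be swapped freely. A self-contained sketch of that idea, with illustrative names rather than the repository's types:

package main

import "fmt"

// wrappedTx stands in for txcache.WrappedTransaction.
type wrappedTx struct {
	Hash []byte
}

// cacheSketch plays the role of the txCache interface above.
type cacheSketch interface {
	AddTx(tx *wrappedTx) (ok bool, added bool)
	CountTx() int64
}

// disabledCache mirrors the role of a disabled cache: every method is a no-op.
type disabledCache struct{}

func (d *disabledCache) AddTx(_ *wrappedTx) (bool, bool) { return false, false }
func (d *disabledCache) CountTx() int64                  { return 0 }

func main() {
	var cache cacheSketch = &disabledCache{}
	ok, added := cache.AddTx(&wrappedTx{Hash: []byte("aa")})
	fmt.Println(ok, added, cache.CountTx()) // false false 0
}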
31 changes: 21 additions & 10 deletions dataRetriever/txpool/shardedTxPool.go
@@ -4,7 +4,7 @@ import (
"strconv"
"sync"

"github.com/ElrondNetwork/elrond-go-logger"
logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/core/counting"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/dataRetriever"
@@ -31,31 +31,31 @@ type shardedTxPool struct {

type txPoolShard struct {
CacheID string
Cache *txcache.TxCache
Cache txCache
}

// NewShardedTxPool creates a new sharded tx pool
// Implements "dataRetriever.TxPool"
func NewShardedTxPool(args ArgShardedTxPool) (dataRetriever.ShardedDataCacherNotifier, error) {
log.Trace("NewShardedTxPool", "args", args)
log.Info("NewShardedTxPool", "args", args)

err := args.verify()
if err != nil {
return nil, err
}

const oneTrilion = 1000000 * 1000000
const oneBillion = 1000000 * 1000
numCaches := 2*args.NumberOfShards - 1

cacheConfigPrototype := txcache.CacheConfig{
NumChunksHint: args.Config.Shards,
EvictionEnabled: true,
NumBytesThreshold: args.Config.SizeInBytes / numCaches,
NumBytesPerSenderThreshold: args.Config.SizeInBytesPerSender,
CountThreshold: args.Config.Size / numCaches,
CountPerSenderThreshold: args.Config.SizePerSender,
NumSendersToEvictInOneStep: dataRetriever.TxPoolNumSendersToEvictInOneStep,
LargeNumOfTxsForASender: dataRetriever.TxPoolLargeNumOfTxsForASender,
NumTxsToEvictFromASender: dataRetriever.TxPoolNumTxsToEvictFromASender,
MinGasPriceMicroErd: uint32(args.MinGasPrice / oneTrilion),
MinGasPriceNanoErd: uint32(args.MinGasPrice / oneBillion),
}

cacheConfigPrototypeForSelfShard := cacheConfigPrototype
@@ -82,7 +82,7 @@ func (txPool *shardedTxPool) ShardDataStore(cacheID string) storage.Cacher {
}

// getTxCache returns the requested cache
func (txPool *shardedTxPool) getTxCache(cacheID string) *txcache.TxCache {
func (txPool *shardedTxPool) getTxCache(cacheID string) txCache {
shard := txPool.getOrCreateShard(cacheID)
return shard.Cache
}
@@ -108,8 +108,7 @@ func (txPool *shardedTxPool) createShard(cacheID string) *txPoolShard {

shard, ok := txPool.backingMap[cacheID]
if !ok {
cacheConfig := txPool.getCacheConfig(cacheID)
cache := txcache.NewTxCache(cacheConfig)
cache := txPool.createTxCache(cacheID)
shard = &txPoolShard{
CacheID: cacheID,
Cache: cache,
@@ -121,6 +120,17 @@ func (txPool *shardedTxPool) createShard(cacheID string) *txPoolShard {
return shard
}

func (txPool *shardedTxPool) createTxCache(cacheID string) txCache {
cacheConfig := txPool.getCacheConfig(cacheID)
cache, err := txcache.NewTxCache(cacheConfig)
if err != nil {
log.Error("shardedTxPool.createTxCache()", "err", err)
return txcache.NewDisabledCache()
}

return cache
}

func (txPool *shardedTxPool) getCacheConfig(cacheID string) txcache.CacheConfig {
var cacheConfig txcache.CacheConfig

@@ -144,6 +154,7 @@ func (txPool *shardedTxPool) AddData(key []byte, value interface{}, cacheID stri

sourceShardID, destinationShardID, err := process.ParseShardCacherIdentifier(cacheID)
if err != nil {
log.Error("shardedTxPool.AddData()", "err", err)
return
}

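createTxCache() pairs the constructor-that-can-fail txcache.NewTxCache with a graceful fallback: on error it logs and returns a disabled cache, so the pool still ends up with a usable (if inert) cache instead of failing construction. A self-contained sketch of that fallback under assumed names:

package main

import (
	"errors"
	"log"
)

// txCountingCache is an illustrative stand-in for the txCache interface.
type txCountingCache interface {
	CountTx() int64
}

type realCache struct{}

func (r *realCache) CountTx() int64 { return 42 }

type noopCache struct{}

func (n *noopCache) CountTx() int64 { return 0 }

// newRealCache stands in for a constructor that can reject a bad config.
func newRealCache(countPerSender uint32) (*realCache, error) {
	if countPerSender == 0 {
		return nil, errors.New("invalid config: per-sender count threshold is 0")
	}
	return &realCache{}, nil
}

// newCacheOrDisabled mirrors the createTxCache() fallback: log the error and
// degrade to a no-op cache rather than propagating the failure.
func newCacheOrDisabled(countPerSender uint32) txCountingCache {
	cache, err := newRealCache(countPerSender)
	if err != nil {
		log.Println("createTxCache failed, using disabled cache:", err)
		return &noopCache{}
	}
	return cache
}

func main() {
	log.Println(newCacheOrDisabled(0).CountTx())    // 0 (disabled fallback)
	log.Println(newCacheOrDisabled(1000).CountTx()) // 42 (real cache)
}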
48 changes: 31 additions & 17 deletions dataRetriever/txpool/shardedTxPool_test.go
@@ -24,29 +24,43 @@ func Test_NewShardedTxPool(t *testing.T) {
}

func Test_NewShardedTxPool_WhenBadConfig(t *testing.T) {
goodArgs := ArgShardedTxPool{Config: storageUnit.CacheConfig{Size: 100, SizeInBytes: 40960, Shards: 16}, MinGasPrice: 100000000000000, NumberOfShards: 1}
goodArgs := ArgShardedTxPool{Config: storageUnit.CacheConfig{Size: 100, SizePerSender: 10, SizeInBytes: 409600, SizeInBytesPerSender: 40960, Shards: 16}, MinGasPrice: 200000000000, NumberOfShards: 1}

args := goodArgs
args.Config = storageUnit.CacheConfig{SizeInBytes: 1}
args.Config.SizeInBytes = 0
pool, err := NewShardedTxPool(args)
require.Nil(t, pool)
require.NotNil(t, err)
require.Errorf(t, err, dataRetriever.ErrCacheConfigInvalidSizeInBytes.Error())

args = goodArgs
args.Config = storageUnit.CacheConfig{SizeInBytes: 40960, Size: 1}
args.Config.SizeInBytesPerSender = 0
pool, err = NewShardedTxPool(args)
require.Nil(t, pool)
require.NotNil(t, err)
require.Errorf(t, err, dataRetriever.ErrCacheConfigInvalidShards.Error())
require.Errorf(t, err, dataRetriever.ErrCacheConfigInvalidSizeInBytes.Error())

args = goodArgs
args.Config.Size = 0
pool, err = NewShardedTxPool(args)
require.Nil(t, pool)
require.NotNil(t, err)
require.Errorf(t, err, dataRetriever.ErrCacheConfigInvalidSize.Error())

args = goodArgs
args.Config = storageUnit.CacheConfig{SizeInBytes: 40960, Shards: 1}
args.Config.SizePerSender = 0
pool, err = NewShardedTxPool(args)
require.Nil(t, pool)
require.NotNil(t, err)
require.Errorf(t, err, dataRetriever.ErrCacheConfigInvalidSize.Error())

args = goodArgs
args.Config.Shards = 0
pool, err = NewShardedTxPool(args)
require.Nil(t, pool)
require.NotNil(t, err)
require.Errorf(t, err, dataRetriever.ErrCacheConfigInvalidShards.Error())

args = goodArgs
args.MinGasPrice = 0
pool, err = NewShardedTxPool(args)
@@ -63,8 +77,8 @@ func Test_NewShardedTxPool_WhenBadConfig(t *testing.T) {
}

func Test_NewShardedTxPool_ComputesCacheConfig(t *testing.T) {
config := storageUnit.CacheConfig{SizeInBytes: 524288000, Size: 900000, Shards: 1}
args := ArgShardedTxPool{Config: config, MinGasPrice: 100000000000000, NumberOfShards: 5}
config := storageUnit.CacheConfig{SizeInBytes: 524288000, SizeInBytesPerSender: 614400, Size: 900000, SizePerSender: 1000, Shards: 1}
args := ArgShardedTxPool{Config: config, MinGasPrice: 200000000000, NumberOfShards: 5}

poolAsInterface, err := NewShardedTxPool(args)
require.Nil(t, err)
@@ -73,11 +87,11 @@ func Test_NewShardedTxPool_ComputesCacheConfig(t *testing.T) {

require.Equal(t, true, pool.cacheConfigPrototype.EvictionEnabled)
require.Equal(t, uint32(58254222), pool.cacheConfigPrototype.NumBytesThreshold)
require.Equal(t, uint32(614400), pool.cacheConfigPrototype.NumBytesPerSenderThreshold)
require.Equal(t, uint32(100000), pool.cacheConfigPrototype.CountThreshold)
require.Equal(t, uint32(1000), pool.cacheConfigPrototype.CountPerSenderThreshold)
require.Equal(t, uint32(100), pool.cacheConfigPrototype.NumSendersToEvictInOneStep)
require.Equal(t, uint32(500), pool.cacheConfigPrototype.LargeNumOfTxsForASender)
require.Equal(t, uint32(100), pool.cacheConfigPrototype.NumTxsToEvictFromASender)
require.Equal(t, uint32(100), pool.cacheConfigPrototype.MinGasPriceMicroErd)
require.Equal(t, uint32(200), pool.cacheConfigPrototype.MinGasPriceNanoErd)
require.Equal(t, uint32(291271110), pool.cacheConfigPrototypeForSelfShard.NumBytesThreshold)
require.Equal(t, uint32(500000), pool.cacheConfigPrototypeForSelfShard.CountThreshold)
}
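The asserted thresholds follow directly from the prototype computation in NewShardedTxPool: the pool keeps numCaches = 2*NumberOfShards - 1 caches (one intra-shard cache plus the cross-shard pairs) and splits the global limits across them, while the per-sender limits and the gas price (converted to nano-ERD) pass through untouched. A small sketch reproducing the asserted numbers; the self-shard scaling by NumberOfShards is inferred from the asserted values, since that part of the code is not shown in this hunk.

package main

import "fmt"

func main() {
	var (
		sizeInBytes    uint32 = 524288000
		size           uint32 = 900000
		minGasPrice    uint64 = 200000000000
		numberOfShards uint32 = 5
	)

	numCaches := 2*numberOfShards - 1 // 9 caches for 5 shards

	fmt.Println(sizeInBytes / numCaches)                  // 58254222 (NumBytesThreshold)
	fmt.Println(size / numCaches)                         // 100000 (CountThreshold)
	fmt.Println(uint32(minGasPrice / 1000000000))         // 200 (MinGasPriceNanoErd)
	fmt.Println(sizeInBytes / numCaches * numberOfShards) // 291271110 (self-shard bytes)
	fmt.Println(size / numCaches * numberOfShards)        // 500000 (self-shard count)
}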
@@ -161,7 +175,7 @@ func Test_AddData_CallsOnAddedHandlers(t *testing.T) {

// Second addition is ignored (txhash-based deduplication)
pool.AddData([]byte("hash-1"), createTx("alice", 42), "1")
pool.AddData([]byte("hash-1"), createTx("whatever", 43), "1")
pool.AddData([]byte("hash-1"), createTx("alice", 42), "1")

waitABit()
require.Equal(t, uint32(1), atomic.LoadUint32(&numAdded))
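The reworked test keeps both calls identical so only the transaction hash matters: a second AddData with the same hash must not notify the added-handlers again. A tiny self-contained sketch of hash-based deduplication, with illustrative names rather than the pool's API:

package main

import "fmt"

type dedupPool struct {
	seen map[string]struct{}
}

// add reports whether the hash was newly admitted; duplicates are ignored and
// would not trigger any "added" handlers.
func (p *dedupPool) add(txHash []byte) bool {
	key := string(txHash)
	if _, ok := p.seen[key]; ok {
		return false
	}
	p.seen[key] = struct{}{}
	return true
}

func main() {
	pool := &dedupPool{seen: map[string]struct{}{}}
	fmt.Println(pool.add([]byte("hash-1"))) // true
	fmt.Println(pool.add([]byte("hash-1"))) // false
}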
@@ -304,8 +318,8 @@ func Test_NotImplementedFunctions(t *testing.T) {
}

func Test_routeToCacheUnions(t *testing.T) {
config := storageUnit.CacheConfig{Size: 100, SizeInBytes: 40960, Shards: 16}
args := ArgShardedTxPool{Config: config, MinGasPrice: 100000000000000, NumberOfShards: 4, SelfShardID: 42}
config := storageUnit.CacheConfig{Size: 100, SizePerSender: 10, SizeInBytes: 409600, SizeInBytesPerSender: 40960, Shards: 16}
args := ArgShardedTxPool{Config: config, MinGasPrice: 200000000000, NumberOfShards: 4, SelfShardID: 42}
poolAsInterface, _ := NewShardedTxPool(args)
pool := poolAsInterface.(*shardedTxPool)

@@ -319,8 +333,8 @@ func Test_routeToCacheUnions(t *testing.T) {
}

func Test_getCacheConfig(t *testing.T) {
config := storageUnit.CacheConfig{Size: 150, SizeInBytes: 61440, Shards: 16}
args := ArgShardedTxPool{Config: config, MinGasPrice: 100000000000000, NumberOfShards: 8, SelfShardID: 4}
config := storageUnit.CacheConfig{Size: 150, SizePerSender: 1, SizeInBytes: 61440, SizeInBytesPerSender: 40960, Shards: 16}
args := ArgShardedTxPool{Config: config, MinGasPrice: 200000000000, NumberOfShards: 8, SelfShardID: 4}
poolAsInterface, _ := NewShardedTxPool(args)
pool := poolAsInterface.(*shardedTxPool)

@@ -353,7 +367,7 @@ type thisIsNotATransaction struct {
}

func newTxPoolToTest() (dataRetriever.ShardedDataCacherNotifier, error) {
config := storageUnit.CacheConfig{Size: 100, SizeInBytes: 40960, Shards: 16}
args := ArgShardedTxPool{Config: config, MinGasPrice: 100000000000000, NumberOfShards: 4}
config := storageUnit.CacheConfig{Size: 100, SizePerSender: 10, SizeInBytes: 409600, SizeInBytesPerSender: 40960, Shards: 16}
args := ArgShardedTxPool{Config: config, MinGasPrice: 200000000000, NumberOfShards: 4}
return NewShardedTxPool(args)
}
10 changes: 6 additions & 4 deletions epochStart/metachain/epochStartData_test.go
@@ -102,11 +102,13 @@ func createTxPool(selfShardID uint32) (dataRetriever.ShardedDataCacherNotifier,
return txpool.NewShardedTxPool(
txpool.ArgShardedTxPool{
Config: storageUnit.CacheConfig{
Size: 100000,
SizeInBytes: 1000000000,
Shards: 16,
Size: 100000,
SizePerSender: 1000,
SizeInBytes: 1000000000,
SizeInBytesPerSender: 10000000,
Shards: 16,
},
MinGasPrice: 100000000000000,
MinGasPrice: 200000000000,
NumberOfShards: 1,
SelfShardID: selfShardID,
},
10 changes: 6 additions & 4 deletions genesis/mock/poolsHolderMock.go
@@ -30,11 +30,13 @@ func NewPoolsHolderMock() *PoolsHolderMock {
phf.transactions, _ = txpool.NewShardedTxPool(
txpool.ArgShardedTxPool{
Config: storageUnit.CacheConfig{
Size: 10000,
SizeInBytes: 1000000000,
Shards: 16,
Size: 100000,
SizePerSender: 1000,
SizeInBytes: 1000000000,
SizeInBytesPerSender: 10000000,
Shards: 16,
},
MinGasPrice: 100000000000000,
MinGasPrice: 200000000000,
NumberOfShards: 1,
},
)
10 changes: 6 additions & 4 deletions integrationTests/consensus/testInitializer.go
@@ -163,11 +163,13 @@ func createTestShardDataPool() dataRetriever.PoolsHolder {
txPool, _ := txpool.NewShardedTxPool(
txpool.ArgShardedTxPool{
Config: storageUnit.CacheConfig{
Size: 100000,
SizeInBytes: 1000000000,
Shards: 1,
Size: 100000,
SizePerSender: 1000,
SizeInBytes: 1000000000,
SizeInBytesPerSender: 10000000,
Shards: 16,
},
MinGasPrice: 100000000000000,
MinGasPrice: 200000000000,
NumberOfShards: 1,
},
)
