Skip to content

Commit

Permalink
bound mempool memory usage
Browse files Browse the repository at this point in the history
Closes #3079
  • Loading branch information
melekes committed Feb 5, 2019
1 parent 1809efa commit 8dfa464
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Special thanks to external contributors on this release:
* P2P Protocol

### FEATURES:
- [mempool] \#3079 bound mempool memory usage (`mempool.max_bytes` is set to 1GB by default; see config.toml)

### IMPROVEMENTS:
- [tools] add go-deadlock tool to help detect deadlocks
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ type MempoolConfig struct {
WalPath string `mapstructure:"wal_dir"`
Size int `mapstructure:"size"`
CacheSize int `mapstructure:"cache_size"`
// Maximum size of all txs in the mempool in bytes
MaxBytes int `mapstructure:"max_bytes"`
}

// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
Expand All @@ -548,6 +550,7 @@ func DefaultMempoolConfig() *MempoolConfig {
// ABCI Recheck
Size: 5000,
CacheSize: 10000,
MaxBytes: 1000000000, // 1GB
}
}

Expand Down Expand Up @@ -577,6 +580,9 @@ func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.CacheSize < 0 {
return errors.New("cache_size can't be negative")
}
if cfg.MaxBytes <= 0 {
return errors.New("max_bytes must be a positive number")
}
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ size = {{ .Mempool.Size }}
# size of the cache (used to filter transactions we saw earlier)
cache_size = {{ .Mempool.CacheSize }}
# maximum size of all txs in the mempool in bytes
max_bytes = {{ .Mempool.MaxBytes }}
##### consensus configuration options #####
[consensus]
Expand Down
3 changes: 3 additions & 0 deletions docs/tendermint-core/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ size = 5000
# size of the cache (used to filter transactions we saw earlier)
cache_size = 10000
# maximum size of all txs in the mempool in bytes
max_bytes = 1000000000
##### consensus configuration options #####
[consensus]
Expand Down
17 changes: 15 additions & 2 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ type Mempool struct {
preCheck PreCheckFunc
postCheck PostCheckFunc

sizeBytes int64 // size of all txs in the mempool in bytes

// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
cache txCache
Expand Down Expand Up @@ -265,8 +267,13 @@ func (mem *Mempool) Size() int {
return mem.txs.Len()
}

// Flushes the mempool connection to ensure async resCb calls are done e.g.
// from CheckTx.
// SizeBytes returns the size of all txs in the mempool in bytes.
func (mem *Mempool) SizeBytes() int64 {
return atomic.LoadInt64(&mem.sizeBytes)
}

// FlushAppConn flushes the mempool connection to ensure async resCb calls are
// done e.g. from CheckTx.
func (mem *Mempool) FlushAppConn() error {
return mem.proxyAppConn.FlushSync()
}
Expand All @@ -280,6 +287,7 @@ func (mem *Mempool) Flush() {

for e := mem.txs.Front(); e != nil; e = e.Next() {
mem.txs.Remove(e)
_ = atomic.SwapInt64(&mem.sizeBytes, 0)
e.DetachPrev()
}
}
Expand Down Expand Up @@ -310,6 +318,8 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {

if mem.Size() >= mem.config.Size {
return ErrMempoolIsFull
} else if int64(len(tx))+mem.SizeBytes() > int64(mem.config.MaxBytes) {
return ErrMempoolIsFull
}

// The size of the corresponding amino-encoded TxMessage
Expand Down Expand Up @@ -383,6 +393,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) {
tx: tx,
}
mem.txs.PushBack(memTx)
atomic.AddInt64(&mem.sizeBytes, int64(len(tx)))
mem.logger.Info("Added good transaction",
"tx", TxID(tx),
"res", r,
Expand Down Expand Up @@ -424,6 +435,7 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) {
// Tx became invalidated due to newly committed block.
mem.logger.Info("Tx is no longer valid", "tx", TxID(tx), "res", r, "err", postCheckErr)
mem.txs.Remove(mem.recheckCursor)
atomic.AddInt64(&mem.sizeBytes, int64(-len(tx)))
mem.recheckCursor.DetachPrev()

// remove from cache (it might be good later)
Expand Down Expand Up @@ -597,6 +609,7 @@ func (mem *Mempool) removeTxs(txs types.Txs) []types.Tx {
if _, ok := txsMap[string(memTx.tx)]; ok {
// remove from clist
mem.txs.Remove(e)
atomic.AddInt64(&mem.sizeBytes, int64(-len(memTx.tx)))
e.DetachPrev()

// NOTE: we don't remove committed txs from the cache.
Expand Down
22 changes: 22 additions & 0 deletions mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,28 @@ func TestMempoolMaxMsgSize(t *testing.T) {

}

func TestMempoolSizeBytes(t *testing.T) {
app := kvstore.NewKVStoreApplication()
cc := proxy.NewLocalClientCreator(app)
mempool := newMempoolWithApp(cc)

assert.EqualValues(t, 0, mempool.SizeBytes())

err := mempool.CheckTx([]byte{0x01}, nil)
require.NoError(t, err)
assert.EqualValues(t, 1, mempool.SizeBytes())

mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil)
assert.EqualValues(t, 0, mempool.SizeBytes())

err = mempool.CheckTx([]byte{0x02, 0x03}, nil)
require.NoError(t, err)
assert.EqualValues(t, 2, mempool.SizeBytes())

mempool.Flush()
assert.EqualValues(t, 0, mempool.SizeBytes())
}

func checksumIt(data []byte) string {
h := md5.New()
h.Write(data)
Expand Down

0 comments on commit 8dfa464

Please sign in to comment.