Commit
mempool: Keep cache hashmap and linked list in sync (#2188)
* mempool: Keep cache hashmap and linked list in sync

This removes bugs where the linked list could be full while the hashmap was empty.

* address PR comments

* switch clist back to list
ValarDragon authored and melekes committed Aug 13, 2018
1 parent 9c6fdad commit 8a1a792
Showing 3 changed files with 67 additions and 8 deletions.
CHANGELOG_PENDING.md: 2 additions, 0 deletions
@@ -18,3 +18,5 @@ IMPROVEMENTS:
   corrupted WAL files and compose test WAL files (@bradyjoestar)
 
 BUG FIXES:
+- [mempool] No longer possible to fill up linked list without getting caching
+  benefits [#2180](https://github.com/tendermint/tendermint/issues/2180)
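
To make the changelog entry concrete: before this commit, Remove deleted a tx only from the hashmap, so the backing list silently accumulated stale elements. Below is a minimal, standalone sketch of that pre-fix failure mode, reconstructed from the removed lines in the diff that follows (no mutex, no eviction; `oldCache` and `main` are illustrative names, not code from the repository):

```go
package main

import (
	"container/list"
	"fmt"
)

// oldCache mirrors the pre-fix shape: the map and the list are updated
// independently, so they can drift apart.
type oldCache struct {
	map_ map[string]struct{}
	list *list.List
}

func (c *oldCache) Push(tx []byte) {
	c.map_[string(tx)] = struct{}{}
	c.list.PushBack(tx)
}

func (c *oldCache) Remove(tx []byte) {
	// Only the map entry is deleted; the list element is left behind.
	delete(c.map_, string(tx))
}

func main() {
	c := &oldCache{map_: make(map[string]struct{}), list: list.New()}
	for i := 0; i < 5; i++ {
		tx := []byte{byte(i)}
		c.Push(tx)
		c.Remove(tx)
	}
	// The map is empty, but the list has filled with stale entries --
	// the desync described in #2180.
	fmt.Println(len(c.map_), c.list.Len()) // prints: 0 5
}
```
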
mempool/mempool.go: 14 additions, 8 deletions
@@ -488,7 +488,7 @@ type txCache interface {
 type mapTxCache struct {
 	mtx  sync.Mutex
 	size int
-	map_ map[string]struct{}
+	map_ map[string]*list.Element
 	list *list.List // to remove oldest tx when cache gets too big
 }

@@ -498,15 +498,15 @@ var _ txCache = (*mapTxCache)(nil)
 func newMapTxCache(cacheSize int) *mapTxCache {
 	return &mapTxCache{
 		size: cacheSize,
-		map_: make(map[string]struct{}, cacheSize),
+		map_: make(map[string]*list.Element, cacheSize),
 		list: list.New(),
 	}
 }
 
 // Reset resets the cache to an empty state.
 func (cache *mapTxCache) Reset() {
 	cache.mtx.Lock()
-	cache.map_ = make(map[string]struct{}, cache.size)
+	cache.map_ = make(map[string]*list.Element, cache.size)
 	cache.list.Init()
 	cache.mtx.Unlock()
 }
@@ -524,20 +524,26 @@ func (cache *mapTxCache) Push(tx types.Tx) bool {
 	if cache.list.Len() >= cache.size {
 		popped := cache.list.Front()
 		poppedTx := popped.Value.(types.Tx)
-		// NOTE: the tx may have already been removed from the map
-		// but deleting a non-existent element is fine
 		delete(cache.map_, string(poppedTx))
-		cache.list.Remove(popped)
+		if popped != nil {
+			cache.list.Remove(popped)
+		}
 	}
-	cache.map_[string(tx)] = struct{}{}
 	cache.list.PushBack(tx)
+	cache.map_[string(tx)] = cache.list.Back()
 	return true
 }
 
 // Remove removes the given tx from the cache.
 func (cache *mapTxCache) Remove(tx types.Tx) {
 	cache.mtx.Lock()
-	delete(cache.map_, string(tx))
+	stx := string(tx)
+	popped := cache.map_[stx]
+	delete(cache.map_, stx)
+	if popped != nil {
+		cache.list.Remove(popped)
+	}
+
 	cache.mtx.Unlock()
 }

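A note on the data-structure choice above: container/list can only drop an element given its *list.Element handle (List.Remove takes the element, not a value), which is why the map now stores that handle for every cached tx. Here is a minimal sketch of the resulting invariant, simplified from the mapTxCache diff above (no mutex, no size bound; `syncedCache` is an illustrative name, not code from the repository):

```go
package main

import (
	"container/list"
	"fmt"
)

// syncedCache keeps the two structures in lockstep: every map entry holds
// the *list.Element created when the tx was pushed.
type syncedCache struct {
	map_ map[string]*list.Element
	list *list.List
}

func (c *syncedCache) Push(tx []byte) {
	c.list.PushBack(tx)
	c.map_[string(tx)] = c.list.Back() // the element PushBack just created
}

func (c *syncedCache) Remove(tx []byte) {
	if e, ok := c.map_[string(tx)]; ok {
		c.list.Remove(e) // O(1) removal via the stored handle
		delete(c.map_, string(tx))
	}
}

func main() {
	c := &syncedCache{map_: make(map[string]*list.Element), list: list.New()}
	for i := 0; i < 5; i++ {
		tx := []byte{byte(i)}
		c.Push(tx)
		c.Remove(tx)
	}
	// Both structures stay in sync after every Remove.
	fmt.Println(len(c.map_), c.list.Len()) // prints: 0 0
}
```

Push could equally keep the *list.Element returned by PushBack directly; the diff reads it back via list.Back(), which under the cache mutex refers to the same element.
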
mempool/mempool_test.go: 51 additions, 0 deletions
@@ -223,6 +223,28 @@ func TestSerialReap(t *testing.T) {
 	reapCheck(600)
 }
 
+func TestCacheRemove(t *testing.T) {
+	cache := newMapTxCache(100)
+	numTxs := 10
+	txs := make([][]byte, numTxs)
+	for i := 0; i < numTxs; i++ {
+		// probability of collision is 2**-256
+		txBytes := make([]byte, 32)
+		rand.Read(txBytes)
+		txs[i] = txBytes
+		cache.Push(txBytes)
+		// make sure its added to both the linked list and the map
+		require.Equal(t, i+1, len(cache.map_))
+		require.Equal(t, i+1, cache.list.Len())
+	}
+	for i := 0; i < numTxs; i++ {
+		cache.Remove(txs[i])
+		// make sure its removed from both the map and the linked list
+		require.Equal(t, numTxs-(i+1), len(cache.map_))
+		require.Equal(t, numTxs-(i+1), cache.list.Len())
+	}
+}
+
 func TestMempoolCloseWAL(t *testing.T) {
 	// 1. Create the temporary directory for mempool and WAL testing.
 	rootDir, err := ioutil.TempDir("", "mempool-test")
@@ -272,6 +294,35 @@ func TestMempoolCloseWAL(t *testing.T) {
 	require.Equal(t, 1, len(m3), "expecting the wal match in")
 }
 
+func BenchmarkCacheInsertTime(b *testing.B) {
+	cache := newMapTxCache(b.N)
+	txs := make([][]byte, b.N)
+	for i := 0; i < b.N; i++ {
+		txs[i] = make([]byte, 8)
+		binary.BigEndian.PutUint64(txs[i], uint64(i))
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		cache.Push(txs[i])
+	}
+}
+
+// This benchmark is probably skewed, since we actually will be removing
+// txs in parallel, which may cause some overhead due to mutex locking.
+func BenchmarkCacheRemoveTime(b *testing.B) {
+	cache := newMapTxCache(b.N)
+	txs := make([][]byte, b.N)
+	for i := 0; i < b.N; i++ {
+		txs[i] = make([]byte, 8)
+		binary.BigEndian.PutUint64(txs[i], uint64(i))
+		cache.Push(txs[i])
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		cache.Remove(txs[i])
+	}
+}

 func checksumIt(data []byte) string {
 	h := md5.New()
 	h.Write(data)
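
The comment on BenchmarkCacheRemoveTime notes that removals really happen concurrently, so the serial loop likely understates mutex contention. Below is a hedged sketch of a parallel variant, assuming the same test package and helpers as the diff above (BenchmarkCacheRemoveTimeParallel is a hypothetical addition, not part of this commit):

```go
// Hypothetical parallel variant; needs "sync/atomic" alongside the file's
// existing imports.
func BenchmarkCacheRemoveTimeParallel(b *testing.B) {
	cache := newMapTxCache(b.N)
	txs := make([][]byte, b.N)
	for i := 0; i < b.N; i++ {
		txs[i] = make([]byte, 8)
		binary.BigEndian.PutUint64(txs[i], uint64(i))
		cache.Push(txs[i])
	}
	var next int64 = -1
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			// Each iteration removes a distinct, previously pushed tx;
			// the atomic counter hands out indices across goroutines.
			i := atomic.AddInt64(&next, 1)
			cache.Remove(txs[i])
		}
	})
}
```

Running `go test -bench Cache -run '^$'` in the mempool package would compare the serial and parallel numbers.
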
