Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resize cross-tx caches #2018

Merged
merged 12 commits into from
Jun 27, 2020
6 changes: 4 additions & 2 deletions dataRetriever/txpool/shardedTxPool.go
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/counting"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/dataRetriever"
Expand Down Expand Up @@ -59,8 +60,9 @@ func NewShardedTxPool(args ArgShardedTxPool) (*shardedTxPool, error) {
MinGasPriceNanoErd: uint32(args.MinGasPrice / oneBillion),
}

// NumberOfShards - 1 (for self shard) + 1 (for metachain)
numCrossTxCaches := args.NumberOfShards
// We do not reserve cross tx cache capacity for [metachain] -> [me] (no transactions), nor for [me] -> [me] (already reserved above).
// Note: in the case of a 1-shard network, a reservation is made - though not actually needed.
numCrossTxCaches := core.MaxUint32(1, args.NumberOfShards-1)

configPrototypeDestinationMe := txcache.ConfigDestinationMe{
NumChunks: args.Config.Shards,
Expand Down
4 changes: 2 additions & 2 deletions dataRetriever/txpool/shardedTxPool_test.go
Expand Up @@ -101,8 +101,8 @@ func Test_NewShardedTxPool_ComputesCacheConfig(t *testing.T) {
require.Equal(t, 200, int(pool.configPrototypeSourceMe.MinGasPriceNanoErd))
require.Equal(t, 300000, int(pool.configPrototypeSourceMe.CountThreshold))

require.Equal(t, 150000, int(pool.configPrototypeDestinationMe.MaxNumItems))
require.Equal(t, 104857600, int(pool.configPrototypeDestinationMe.MaxNumBytes))
require.Equal(t, 300000, int(pool.configPrototypeDestinationMe.MaxNumItems))
require.Equal(t, 209715200, int(pool.configPrototypeDestinationMe.MaxNumBytes))
}

func Test_ShardDataStore_Or_GetTxCache(t *testing.T) {
Expand Down
27 changes: 24 additions & 3 deletions storage/immunitycache/cache.go
Expand Up @@ -4,18 +4,22 @@ import (
"sync"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/core/atomic"
"github.com/ElrondNetwork/elrond-go/storage"
)

var _ storage.Cacher = (*ImmunityCache)(nil)

var log = logger.GetOrCreate("storage/immunitycache")

const hospitalityWarnThreshold = -10000

// ImmunityCache is a cache-like structure
type ImmunityCache struct {
config CacheConfig
chunks []*immunityChunk
mutex sync.RWMutex
config CacheConfig
chunks []*immunityChunk
hospitality atomic.Counter
mutex sync.RWMutex
}

// NewImmunityCache creates a new cache
Expand Down Expand Up @@ -137,6 +141,14 @@ func (ic *ImmunityCache) HasOrAdd(key []byte, value interface{}, sizeInBytes int
cacheItem := newCacheItem(value, string(key), sizeInBytes)
chunk := ic.getChunkByKeyWithLock(string(key))
has, added = chunk.AddItem(cacheItem)
if !has {
if added {
ic.hospitality.Increment()
} else {
ic.hospitality.Decrement()
}
}

return has, added
}

Expand Down Expand Up @@ -247,12 +259,21 @@ func (ic *ImmunityCache) Diagnose(_ bool) {
count := ic.Count()
countImmune := ic.CountImmune()
numBytes := ic.NumBytes()
hospitality := ic.hospitality.Get()

log.Debug("ImmunityCache.Diagnose()",
"name", ic.config.Name,
"count", count,
"countImmune", countImmune,
"numBytes", numBytes,
"hospitality", hospitality,
)

if hospitality <= hospitalityWarnThreshold {
// After emitting a Warn, we reset the hospitality indicator
log.Warn("ImmunityCache.Diagnose()", "cache is not hospitable", "hospitality", hospitality)
ic.hospitality.Reset()
}
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
26 changes: 26 additions & 0 deletions storage/immunitycache/cache_test.go
Expand Up @@ -84,6 +84,7 @@ func TestImmunityCache_AddThenRemove(t *testing.T) {
_, _ = cache.HasOrAdd([]byte("c"), "foo-c", 0)
_ = cache.Put([]byte("d"), "foo-d", 0) // Same as HasOrAdd()
require.Equal(t, 4, cache.Len())
require.Equal(t, 4, int(cache.hospitality.Get()))
require.True(t, cache.Has([]byte("a")))
require.True(t, cache.Has([]byte("c")))

Expand Down Expand Up @@ -190,10 +191,12 @@ func TestImmunityCache_AddDoesNotWork_WhenFullWithImmune(t *testing.T) {
numNow, numFuture := cache.ImmunizeKeys(keysAsBytes([]string{"a", "b", "c", "d"}))
require.Equal(t, 4, numNow)
require.Equal(t, 0, numFuture)
require.Equal(t, 4, int(cache.hospitality.Get()))

_, added := cache.HasOrAdd([]byte("x"), "foo-x", 1)
require.False(t, added)
require.False(t, cache.Has([]byte("x")))
require.Equal(t, 3, int(cache.hospitality.Get()))
}

func TestImmunityCache_ForEachItem(t *testing.T) {
Expand Down Expand Up @@ -223,6 +226,29 @@ func TestImmunityCache_Fnv32Hash(t *testing.T) {
require.Equal(t, 3, int(fnv32Hash("d")%4))
}

// TestImmunityCache_DiagnoseResetsHospitalityAfterWarn checks that Diagnose() leaves the
// hospitality counter untouched while it is above the warn threshold, and resets it to
// zero once the threshold is reached.
func TestImmunityCache_DiagnoseResetsHospitalityAfterWarn(t *testing.T) {
	cache := newCacheToTest(1, 4, 1000)
	cache.addTestItems("a", "b", "c", "d")
	_, _ = cache.ImmunizeKeys(keysAsBytes([]string{"a", "b", "c", "d"}))
	require.Equal(t, 4, int(cache.hospitality.Get()))

	// The cache is now full of immune items, so these four adds are rejected
	// and hospitality drops back to zero.
	cache.addTestItems("e", "f", "g", "h")
	require.Equal(t, 0, int(cache.hospitality.Get()))

	// Drive hospitality down to exactly one unit above the warn threshold.
	for expected := -1; expected > hospitalityWarnThreshold; expected-- {
		cache.addTestItems("foo")
		require.Equal(t, expected, int(cache.hospitality.Get()))
	}

	// Above the threshold: Diagnose() must not reset the counter.
	require.Equal(t, hospitalityWarnThreshold+1, int(cache.hospitality.Get()))
	cache.Diagnose(false)
	require.Equal(t, hospitalityWarnThreshold+1, int(cache.hospitality.Get()))

	// One more rejected add hits the threshold; Diagnose() now warns and resets to zero.
	cache.addTestItems("foo")
	require.Equal(t, hospitalityWarnThreshold, int(cache.hospitality.Get()))
	cache.Diagnose(false)
	require.Equal(t, 0, int(cache.hospitality.Get()))
}

func TestImmunityCache_ClearConcurrentWithRangeOverChunks(t *testing.T) {
cache := newCacheToTest(16, 4, 1000)
require.Equal(t, 16, len(cache.chunks))
Expand Down
9 changes: 1 addition & 8 deletions storage/immunitycache/chunk.go
Expand Up @@ -154,15 +154,8 @@ func (chunk *immunityChunk) removeNoLock(element *list.Element) {
}

// monitorEvictionNoLock logs the outcome of an eviction pass on the chunk.
// Any eviction error is carried in the same trace entry (no separate Debug branch).
// Note: the span previously contained both the pre- and post-change bodies of the diff
// (a leftover "cacheName" variable, an err != nil early-return, and two consecutive
// log.Trace calls); this is the resolved, post-change implementation.
func (chunk *immunityChunk) monitorEvictionNoLock(numRemoved int, err error) {
	if numRemoved > 0 {
		log.Trace("immunityChunk.monitorEviction()", "name", chunk.config.cacheName, "numRemoved", numRemoved, "err", err)
	}
}

Expand Down