Skip to content

Commit

Permalink
Merge pull request #2018 from ElrondNetwork/pool-resize-crosstx
Browse files Browse the repository at this point in the history
Resize cross-tx caches
  • Loading branch information
LucianMincu committed Jun 27, 2020
2 parents cbcf485 + b83afbd commit 2f65e78
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 17 deletions.
4 changes: 2 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -563,8 +563,8 @@

[Health]
IntervalVerifyMemoryInSeconds = 1
IntervalDiagnoseComponentsInSeconds = 10
IntervalDiagnoseComponentsDeeplyInSeconds = 30
IntervalDiagnoseComponentsInSeconds = 30
IntervalDiagnoseComponentsDeeplyInSeconds = 120
MemoryUsageToCreateProfiles = 2147483648 # 2GB
NumMemoryUsageRecordsToKeep = 100
FolderPath = "health-records"
Expand Down
6 changes: 4 additions & 2 deletions dataRetriever/txpool/shardedTxPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/counting"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/dataRetriever"
Expand Down Expand Up @@ -59,8 +60,9 @@ func NewShardedTxPool(args ArgShardedTxPool) (*shardedTxPool, error) {
MinGasPriceNanoErd: uint32(args.MinGasPrice / oneBillion),
}

// NumberOfShards - 1 (for self shard) + 1 (for metachain)
numCrossTxCaches := args.NumberOfShards
// We do not reserve cross tx cache capacity for [metachain] -> [me] (no transactions), [me] -> me (already reserved above).
// Note: in the case of a 1-shard network, a reservation is made - though not actually needed.
numCrossTxCaches := core.MaxUint32(1, args.NumberOfShards-1)

configPrototypeDestinationMe := txcache.ConfigDestinationMe{
NumChunks: args.Config.Shards,
Expand Down
4 changes: 2 additions & 2 deletions dataRetriever/txpool/shardedTxPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func Test_NewShardedTxPool_ComputesCacheConfig(t *testing.T) {
require.Equal(t, 200, int(pool.configPrototypeSourceMe.MinGasPriceNanoErd))
require.Equal(t, 300000, int(pool.configPrototypeSourceMe.CountThreshold))

require.Equal(t, 150000, int(pool.configPrototypeDestinationMe.MaxNumItems))
require.Equal(t, 104857600, int(pool.configPrototypeDestinationMe.MaxNumBytes))
require.Equal(t, 300000, int(pool.configPrototypeDestinationMe.MaxNumItems))
require.Equal(t, 209715200, int(pool.configPrototypeDestinationMe.MaxNumBytes))
}

func Test_ShardDataStore_Or_GetTxCache(t *testing.T) {
Expand Down
33 changes: 30 additions & 3 deletions storage/immunitycache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,22 @@ import (
"sync"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/core/atomic"
"github.com/ElrondNetwork/elrond-go/storage"
)

var _ storage.Cacher = (*ImmunityCache)(nil)

var log = logger.GetOrCreate("storage/immunitycache")

const hospitalityWarnThreshold = -10000

// ImmunityCache is a cache-like structure
type ImmunityCache struct {
config CacheConfig
chunks []*immunityChunk
mutex sync.RWMutex
config CacheConfig
chunks []*immunityChunk
hospitality atomic.Counter
mutex sync.RWMutex
}

// NewImmunityCache creates a new cache
Expand Down Expand Up @@ -137,6 +141,14 @@ func (ic *ImmunityCache) HasOrAdd(key []byte, value interface{}, sizeInBytes int
cacheItem := newCacheItem(value, string(key), sizeInBytes)
chunk := ic.getChunkByKeyWithLock(string(key))
has, added = chunk.AddItem(cacheItem)
if !has {
if added {
ic.hospitality.Increment()
} else {
ic.hospitality.Decrement()
}
}

return has, added
}

Expand Down Expand Up @@ -247,12 +259,27 @@ func (ic *ImmunityCache) Diagnose(_ bool) {
count := ic.Count()
countImmune := ic.CountImmune()
numBytes := ic.NumBytes()
hospitality := ic.hospitality.Get()

log.Debug("ImmunityCache.Diagnose()",
"name", ic.config.Name,
"count", count,
"countImmune", countImmune,
"numBytes", numBytes,
"hospitality", hospitality,
)

if hospitality <= hospitalityWarnThreshold {
// After emitting a Warn, we reset the hospitality indicator
log.Warn("ImmunityCache.Diagnose(): cache is not hospitable",
"name", ic.config.Name,
"count", count,
"countImmune", countImmune,
"numBytes", numBytes,
"hospitality", hospitality,
)
ic.hospitality.Reset()
}
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
26 changes: 26 additions & 0 deletions storage/immunitycache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestImmunityCache_AddThenRemove(t *testing.T) {
_, _ = cache.HasOrAdd([]byte("c"), "foo-c", 0)
_ = cache.Put([]byte("d"), "foo-d", 0) // Same as HasOrAdd()
require.Equal(t, 4, cache.Len())
require.Equal(t, 4, int(cache.hospitality.Get()))
require.True(t, cache.Has([]byte("a")))
require.True(t, cache.Has([]byte("c")))

Expand Down Expand Up @@ -190,10 +191,12 @@ func TestImmunityCache_AddDoesNotWork_WhenFullWithImmune(t *testing.T) {
numNow, numFuture := cache.ImmunizeKeys(keysAsBytes([]string{"a", "b", "c", "d"}))
require.Equal(t, 4, numNow)
require.Equal(t, 0, numFuture)
require.Equal(t, 4, int(cache.hospitality.Get()))

_, added := cache.HasOrAdd([]byte("x"), "foo-x", 1)
require.False(t, added)
require.False(t, cache.Has([]byte("x")))
require.Equal(t, 3, int(cache.hospitality.Get()))
}

func TestImmunityCache_ForEachItem(t *testing.T) {
Expand Down Expand Up @@ -223,6 +226,29 @@ func TestImmunityCache_Fnv32Hash(t *testing.T) {
require.Equal(t, 3, int(fnv32Hash("d")%4))
}

func TestImmunityCache_DiagnoseResetsHospitalityAfterWarn(t *testing.T) {
cache := newCacheToTest(1, 4, 1000)
cache.addTestItems("a", "b", "c", "d")
_, _ = cache.ImmunizeKeys(keysAsBytes([]string{"a", "b", "c", "d"}))
require.Equal(t, 4, int(cache.hospitality.Get()))

cache.addTestItems("e", "f", "g", "h")
require.Equal(t, 0, int(cache.hospitality.Get()))

for i := -1; i > hospitalityWarnThreshold; i-- {
cache.addTestItems("foo")
require.Equal(t, i, int(cache.hospitality.Get()))
}

require.Equal(t, hospitalityWarnThreshold+1, int(cache.hospitality.Get()))
cache.Diagnose(false)
require.Equal(t, hospitalityWarnThreshold+1, int(cache.hospitality.Get()))
cache.addTestItems("foo")
require.Equal(t, hospitalityWarnThreshold, int(cache.hospitality.Get()))
cache.Diagnose(false)
require.Equal(t, 0, int(cache.hospitality.Get()))
}

func TestImmunityCache_ClearConcurrentWithRangeOverChunks(t *testing.T) {
cache := newCacheToTest(16, 4, 1000)
require.Equal(t, 16, len(cache.chunks))
Expand Down
9 changes: 1 addition & 8 deletions storage/immunitycache/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,8 @@ func (chunk *immunityChunk) removeNoLock(element *list.Element) {
}

func (chunk *immunityChunk) monitorEvictionNoLock(numRemoved int, err error) {
cacheName := chunk.config.cacheName

if err != nil {
log.Debug("immunityChunk.monitorEviction()", "name", cacheName, "numRemoved", numRemoved, "err", err)
return
}

if numRemoved > 0 {
log.Trace("immunityChunk.monitorEviction()", "name", cacheName, "numRemoved", numRemoved)
log.Trace("immunityChunk.monitorEviction()", "name", chunk.config.cacheName, "numRemoved", numRemoved, "err", err)
}
}

Expand Down

0 comments on commit 2f65e78

Please sign in to comment.