From d4cd51f23ee40930e518d903f6174df77e90d96a Mon Sep 17 00:00:00 2001 From: Ivan Martinez Date: Sun, 8 Mar 2020 17:11:59 -0400 Subject: [PATCH] Change slasher cache to LRU cache (#5037) * Change cache to LRU cache * fixes * REduce db usage * Fix function name * Merge issues * Save on eviction * Fixes * Fix Co-authored-by: terence tsao --- shared/debug/debug.go | 4 +- slasher/cache/BUILD.bazel | 14 +++++ slasher/cache/span_cache.go | 69 ++++++++++++++++++++++++ slasher/db/kv/BUILD.bazel | 2 +- slasher/db/kv/kv.go | 23 ++------ slasher/db/kv/kv_test.go | 4 +- slasher/db/kv/spanner.go | 97 ++++++++++------------------------ slasher/db/kv/spanner_test.go | 49 +++++------------ slasher/db/testing/setup_db.go | 4 +- 9 files changed, 137 insertions(+), 129 deletions(-) create mode 100644 slasher/cache/BUILD.bazel create mode 100644 slasher/cache/span_cache.go diff --git a/shared/debug/debug.go b/shared/debug/debug.go index 68462a8f91c..ca099a9d3ec 100644 --- a/shared/debug/debug.go +++ b/shared/debug/debug.go @@ -134,7 +134,7 @@ func (h *HandlerT) StartCPUProfile(file string) error { } h.cpuW = f h.cpuFile = file - log.Info("CPU profiling started", "dump", h.cpuFile) + log.Info("CPU profiling started", " dump ", h.cpuFile) return nil } @@ -146,7 +146,7 @@ func (h *HandlerT) StopCPUProfile() error { if h.cpuW == nil { return errors.New("CPU profiling not in progress") } - log.Info("Done writing CPU profile", "dump", h.cpuFile) + log.Info("Done writing CPU profile", " dump ", h.cpuFile) if err := h.cpuW.Close(); err != nil { return err } diff --git a/slasher/cache/BUILD.bazel b/slasher/cache/BUILD.bazel new file mode 100644 index 00000000000..120f50e7f5b --- /dev/null +++ b/slasher/cache/BUILD.bazel @@ -0,0 +1,14 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["span_cache.go"], + importpath = "github.com/prysmaticlabs/prysm/slasher/cache", + visibility = ["//slasher:__subpackages__"], + deps = [ + "//slasher/detection/attestations/types:go_default_library", + "@com_github_hashicorp_golang_lru//:go_default_library", + "@com_github_prometheus_client_golang//prometheus:go_default_library", + "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", + ], +) diff --git a/slasher/cache/span_cache.go b/slasher/cache/span_cache.go new file mode 100644 index 00000000000..65955fcdeed --- /dev/null +++ b/slasher/cache/span_cache.go @@ -0,0 +1,69 @@ +package cache + +import ( + lru "github.com/hashicorp/golang-lru" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prysmaticlabs/prysm/slasher/detection/attestations/types" +) + +var ( + // epochSpansCacheSize defines the max number of epoch spans the cache can hold. + epochSpansCacheSize = 256 + // Metrics + epochSpansCacheHit = promauto.NewCounter(prometheus.CounterOpts{ + Name: "epoch_spans_cache_hit", + Help: "The total number of cache hits on the epoch spans cache.", + }) + epochSpansCacheMiss = promauto.NewCounter(prometheus.CounterOpts{ + Name: "epoch_spans_cache_miss", + Help: "The total number of cache misses on the epoch spans cache.", + }) +) + +// EpochSpansCache is used to store the spans needed on a per-epoch basis for slashing detection. +type EpochSpansCache struct { + cache *lru.Cache +} + +// NewEpochSpansCache initializes the map and underlying cache. +func NewEpochSpansCache(onEvicted func(key interface{}, value interface {})) (*EpochSpansCache, error) { + cache, err := lru.NewWithEvict(epochSpansCacheSize, onEvicted) + if err != nil { + return nil, err + } + return &EpochSpansCache{cache: cache}, nil +} + +// Get returns an ok bool and the cached value for the requested epoch key, if any. +func (c *EpochSpansCache) Get(epoch uint64) (map[uint64]types.Span, bool) { + item, exists := c.cache.Get(epoch) + if exists && item != nil { + epochSpansCacheHit.Inc() + return item.(map[uint64]types.Span), true + } + + epochSpansCacheMiss.Inc() + return make(map[uint64]types.Span), false +} + +// Set the response in the cache. +func (c *EpochSpansCache) Set(epoch uint64, epochSpans map[uint64]types.Span) { + _ = c.cache.Add(epoch, epochSpans) +} + +// Delete removes an epoch from the cache and returns if it existed or not. +// Performs the onEviction function before removal. +func (c *EpochSpansCache) Delete(epoch uint64) bool { + return c.cache.Remove(epoch) +} + +// Has returns true if the key exists in the cache. +func (c *EpochSpansCache) Has(epoch uint64) bool { + return c.cache.Contains(epoch) +} + +// Clear removes all keys from the SpanCache. +func (c *EpochSpansCache) Clear() { + c.cache.Purge() +} diff --git a/slasher/db/kv/BUILD.bazel b/slasher/db/kv/BUILD.bazel index c7f6b9285e6..71e9db91b01 100644 --- a/slasher/db/kv/BUILD.bazel +++ b/slasher/db/kv/BUILD.bazel @@ -19,10 +19,10 @@ go_library( "//shared/bytesutil:go_default_library", "//shared/hashutil:go_default_library", "//shared/params:go_default_library", + "//slasher/cache:go_default_library", "//slasher/db/types:go_default_library", "//slasher/detection/attestations/types:go_default_library", "@com_github_boltdb_bolt//:go_default_library", - "@com_github_dgraph_io_ristretto//:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", diff --git a/slasher/db/kv/kv.go b/slasher/db/kv/kv.go index df62d4f680a..c315fb3096e 100644 --- a/slasher/db/kv/kv.go +++ b/slasher/db/kv/kv.go @@ -6,8 +6,8 @@ import ( "time" "github.com/boltdb/bolt" - "github.com/dgraph-io/ristretto" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/slasher/cache" ) var databaseFileName = "slasher.db" @@ -17,7 +17,7 @@ var databaseFileName = "slasher.db" type Store struct { db *bolt.DB databasePath string - spanCache *ristretto.Cache + spanCache *cache.EpochSpansCache spanCacheEnabled bool } @@ -25,8 +25,6 @@ type Store struct { type Config struct { // SpanCacheEnabled uses span cache to detect surround slashing. SpanCacheEnabled bool - CacheItems int64 - MaxCacheSize int64 } // Close closes the underlying boltdb database. @@ -34,7 +32,7 @@ func (db *Store) Close() error { return db.db.Close() } -// ClearSpanCache clears the MinMaxSpans cache. +// ClearSpanCache clears the spans cache. func (db *Store) ClearSpanCache() { db.spanCache.Clear() } @@ -86,21 +84,10 @@ func NewKVStore(dirPath string, cfg *Config) (*Store, error) { } return nil, err } - if cfg.CacheItems == 0 { - cfg.CacheItems = 10 * cachedSpanerEpochs - } - if cfg.MaxCacheSize == 0 { - cfg.MaxCacheSize = cachedSpanerEpochs - } kv := &Store{db: boltDB, databasePath: datafile, spanCacheEnabled: cfg.SpanCacheEnabled} - spanCache, err := ristretto.NewCache(&ristretto.Config{ - NumCounters: cfg.CacheItems, // number of keys to track frequency of (10M). - MaxCost: cfg.MaxCacheSize, // maximum cost of cache. - BufferItems: 64, // number of keys per Get buffer. - OnEvict: persistSpanMapsOnEviction(kv), - }) + spanCache, err := cache.NewEpochSpansCache(persistSpanMapsOnEviction(kv)) if err != nil { - return nil, errors.Wrap(err, "failed to start span cache") + return nil, errors.Wrap(err, "could not create new cache") } kv.spanCache = spanCache diff --git a/slasher/db/kv/kv_test.go b/slasher/db/kv/kv_test.go index 7c716802b5e..f0d559edcd6 100644 --- a/slasher/db/kv/kv_test.go +++ b/slasher/db/kv/kv_test.go @@ -23,7 +23,7 @@ func setupDB(t testing.TB, ctx *cli.Context) *Store { if err := os.RemoveAll(p); err != nil { t.Fatalf("Failed to remove directory: %v", err) } - cfg := &Config{CacheItems: 0, MaxCacheSize: 0, SpanCacheEnabled: ctx.GlobalBool(flags.UseSpanCacheFlag.Name)} + cfg := &Config{SpanCacheEnabled: ctx.GlobalBool(flags.UseSpanCacheFlag.Name)} db, err := NewKVStore(p, cfg) if err != nil { t.Fatalf("Failed to instantiate DB: %v", err) @@ -40,7 +40,7 @@ func setupDBDiffCacheSize(t testing.TB, cacheItems int64, maxCacheSize int64) *S if err := os.RemoveAll(p); err != nil { t.Fatalf("Failed to remove directory: %v", err) } - cfg := &Config{CacheItems: cacheItems, MaxCacheSize: maxCacheSize, SpanCacheEnabled: true} + cfg := &Config{SpanCacheEnabled: true} newDB, err := NewKVStore(p, cfg) if err != nil { t.Fatalf("Failed to instantiate DB: %v", err) diff --git a/slasher/db/kv/spanner.go b/slasher/db/kv/spanner.go index 5024db74b44..8774aec351c 100644 --- a/slasher/db/kv/spanner.go +++ b/slasher/db/kv/spanner.go @@ -2,8 +2,6 @@ package kv import ( "context" - "fmt" - "reflect" "github.com/boltdb/bolt" "github.com/pkg/errors" @@ -20,31 +18,28 @@ import ( var highestObservedEpoch uint64 var lowestObservedEpoch = params.BeaconConfig().FarFutureEpoch -func cacheTypeMismatchError(value interface{}) error { - return fmt.Errorf("cache contains a value of type: %v "+ - "while expected to contain only values of type : map[uint64]types.Span", reflect.TypeOf(value)) -} - // This function defines a function which triggers upon a span map being // evicted from the cache. It allows us to persist the span map by the epoch value // to the database itself in the validatorsMinMaxSpanBucket. -func persistSpanMapsOnEviction(db *Store) func(uint64, uint64, interface{}, int64) { +func persistSpanMapsOnEviction(db *Store) func(key interface{}, value interface{}) { // We use a closure here so we can access the database itself // on the eviction of a span map from the cache. The function has the signature // required by the ristretto cache OnEvict method. // See https://godoc.org/github.com/dgraph-io/ristretto#Config. - return func(epoch uint64, _ uint64, value interface{}, cost int64) { - log.Tracef("Evicting span map for epoch: %d", epoch) + return func(key interface{}, value interface{}) { + log.Tracef("Evicting span map for epoch: %d", key) err := db.update(func(tx *bolt.Tx) error { + epoch, keyOK := key.(uint64) + spanMap, valueOK := value.(map[uint64]types.Span) + if !keyOK || !valueOK { + return errors.New("could not cast key and value into needed types") + } + bucket := tx.Bucket(validatorsMinMaxSpanBucket) epochBucket, err := bucket.CreateBucketIfNotExists(bytesutil.Bytes8(epoch)) if err != nil { return err } - spanMap, ok := value.(map[uint64]types.Span) - if !ok { - return cacheTypeMismatchError(value) - } for k, v := range spanMap { if err = epochBucket.Put(bytesutil.Bytes8(k), marshalSpan(v)); err != nil { return err @@ -93,16 +88,12 @@ func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]ty ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpansMap") defer span.End() if db.spanCacheEnabled { - v, ok := db.spanCache.Get(epoch) - spanMap := make(map[uint64]types.Span) + spanMap, ok := db.spanCache.Get(epoch) if ok { - spanMap, ok = v.(map[uint64]types.Span) - if !ok { - return nil, cacheTypeMismatchError(v) - } return spanMap, nil } } + var err error var spanMap map[uint64]types.Span err = db.view(func(tx *bolt.Tx) error { @@ -186,19 +177,12 @@ func (db *Store) SaveValidatorEpochSpan( defer traceSpan.End() if db.spanCacheEnabled { setObservedEpochs(epoch) - v, ok := db.spanCache.Get(epoch) - spanMap := make(map[uint64]types.Span) - if ok { - spanMap, ok = v.(map[uint64]types.Span) - if !ok { - return cacheTypeMismatchError(v) - } + spanMap, err := db.findOrLoadEpochInCache(ctx, epoch) + if err != nil { + return err } spanMap[validatorIdx] = span - saved := db.spanCache.Set(epoch, spanMap, 1) - if !saved { - return fmt.Errorf("failed to save span map to cache") - } + db.spanCache.Set(epoch, spanMap) return nil } @@ -215,19 +199,17 @@ func (db *Store) SaveValidatorEpochSpan( } // SaveEpochSpansMap accepts a epoch and span map epoch=>spans and writes it to disk. -// saves the spans to cache if caching is enabled. The key in the cache is the highest -// epoch seen by slasher and the value is the span map itself. +// saves the spans to cache if caching is enabled. The key in the cache is the +// epoch and the value is the span map itself. func (db *Store) SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap map[uint64]types.Span) error { ctx, span := trace.StartSpan(ctx, "slasherDB.SaveEpochSpansMap") defer span.End() if db.spanCacheEnabled { setObservedEpochs(epoch) - saved := db.spanCache.Set(epoch, spanMap, 1) - if !saved { - return fmt.Errorf("failed to save span map to cache") - } + db.spanCache.Set(epoch, spanMap) return nil } + return db.update(func(tx *bolt.Tx) error { bucket := tx.Bucket(validatorsMinMaxSpanBucket) valBucket, err := bucket.CreateBucketIfNotExists(bytesutil.Bytes8(epoch)) @@ -257,12 +239,8 @@ func (db *Store) SaveCachedSpansMaps(ctx context.Context) error { db.enableSpanCache(false) defer db.enableSpanCache(true) for epoch := lowestObservedEpoch; epoch <= highestObservedEpoch; epoch++ { - v, ok := db.spanCache.Get(epoch) + spanMap, ok := db.spanCache.Get(epoch) if ok { - spanMap, ok := v.(map[uint64]types.Span) - if !ok { - return cacheTypeMismatchError(v) - } if err := db.SaveEpochSpansMap(ctx, epoch, spanMap); err != nil { return errors.Wrap(err, "failed to save span maps from cache") } @@ -281,11 +259,8 @@ func (db *Store) DeleteEpochSpans(ctx context.Context, epoch uint64) error { ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteEpochSpans") defer span.End() if db.spanCacheEnabled { - _, ok := db.spanCache.Get(epoch) - if ok { - db.spanCache.Del(epoch) - return nil - } + _ = db.spanCache.Delete(epoch) + return nil } return db.update(func(tx *bolt.Tx) error { bucket := tx.Bucket(validatorsMinMaxSpanBucket) @@ -301,20 +276,12 @@ func (db *Store) DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx ui ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteValidatorSpanByEpoch") defer span.End() if db.spanCacheEnabled { - v, ok := db.spanCache.Get(epoch) - spanMap := make(map[uint64][2]uint16) + spanMap, ok := db.spanCache.Get(epoch) if ok { - spanMap, ok = v.(map[uint64][2]uint16) - if !ok { - return cacheTypeMismatchError(v) - } - } - delete(spanMap, validatorIdx) - saved := db.spanCache.Set(epoch, spanMap, 1) - if !saved { - return errors.New("failed to save span map to cache") + delete(spanMap, validatorIdx) + db.spanCache.Set(epoch, spanMap) + return nil } - return nil } return db.update(func(tx *bolt.Tx) error { @@ -330,14 +297,11 @@ func (db *Store) DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx ui func (db *Store) findOrLoadEpochInCache(ctx context.Context, epoch uint64) (map[uint64]types.Span, error) { ctx, span := trace.StartSpan(ctx, "slasherDB.findOrLoadEpochInCache") defer span.End() - v, epochFound := db.spanCache.Get(epoch) + spanMap, epochFound := db.spanCache.Get(epoch) if epochFound { - spanMap, ok := v.(map[uint64]types.Span) - if !ok { - return make(map[uint64]types.Span), cacheTypeMismatchError(v) - } return spanMap, nil } + db.enableSpanCache(false) defer db.enableSpanCache(true) // If the epoch we want isn't in the cache, load it in. @@ -345,10 +309,7 @@ func (db *Store) findOrLoadEpochInCache(ctx context.Context, epoch uint64) (map[ if err != nil { return make(map[uint64]types.Span), errors.Wrap(err, "failed to get span map for epoch") } - saved := db.spanCache.Set(epoch, spanForEpoch, 1) - if !saved { - return make(map[uint64]types.Span), fmt.Errorf("failed to save span map to cache") - } + db.spanCache.Set(epoch, spanForEpoch) return spanForEpoch, nil } diff --git a/slasher/db/kv/spanner_test.go b/slasher/db/kv/spanner_test.go index 7bd68cfbac7..a87e7dc867e 100644 --- a/slasher/db/kv/spanner_test.go +++ b/slasher/db/kv/spanner_test.go @@ -4,7 +4,6 @@ import ( "context" "flag" "reflect" - "strings" "testing" "time" @@ -96,30 +95,6 @@ func TestStore_SaveSpans(t *testing.T) { } } -func TestStore_WrongTypeInCache(t *testing.T) { - app := cli.NewApp() - set := flag.NewFlagSet("test", 0) - set.Bool(flags.UseSpanCacheFlag.Name, true, "enable span map cache") - db := setupDB(t, cli.NewContext(app, set, nil)) - defer teardownDB(t, db) - ctx := context.Background() - for _, tt := range spanTests { - - db.spanCache.Set(tt.epoch, []byte{0, 0}, 1) - // wait for value to pass through cache buffers - time.Sleep(time.Millisecond * 10) - _, err := db.EpochSpansMap(ctx, tt.epoch) - if err == nil || !strings.Contains(err.Error(), "cache contains a value of type") { - t.Fatalf("expected error type in cache : %v", err) - } - - _, err = db.EpochSpanByValidatorIndex(ctx, 1, tt.epoch) - if err == nil || !strings.Contains(err.Error(), "cache contains a value of type") { - t.Fatalf("expected error type in cache : %v", err) - } - } -} - func TestStore_SaveCachedSpans(t *testing.T) { app := cli.NewApp() set := flag.NewFlagSet("test", 0) @@ -189,7 +164,7 @@ func TestStore_DeleteEpochSpans(t *testing.T) { } } -func TestValidatorSpanMap_DeleteWithCache(t *testing.T) { +func TestValidatorSpanMap_DeletesOnCacheSavesToDB(t *testing.T) { app := cli.NewApp() set := flag.NewFlagSet("test", 0) set.Bool(flags.UseSpanCacheFlag.Name, true, "enable span map cache") @@ -203,28 +178,30 @@ func TestValidatorSpanMap_DeleteWithCache(t *testing.T) { t.Fatalf("Save validator span map failed: %v", err) } } - // wait for value to pass through cache buffers + // Wait for value to pass through cache buffers. time.Sleep(time.Millisecond * 10) for _, tt := range spanTests { - sm, err := db.EpochSpansMap(ctx, tt.epoch) + spanMap, err := db.EpochSpansMap(ctx, tt.epoch) if err != nil { t.Fatalf("Failed to get validator span map: %v", err) } - if sm == nil || !reflect.DeepEqual(sm, tt.spanMap) { - t.Fatalf("Get should return validator span map: %v got: %v", tt.spanMap, sm) + if spanMap == nil || !reflect.DeepEqual(spanMap, tt.spanMap) { + t.Fatalf("Get should return validator span map: %v got: %v", tt.spanMap, spanMap) } - err = db.DeleteEpochSpans(ctx, tt.epoch) - if err != nil { + + if err = db.DeleteEpochSpans(ctx, tt.epoch); err != nil { t.Fatalf("Delete validator span map error: %v", err) } - // wait for value to pass through cache buffers + // Wait for value to pass through cache buffers. + db.enableSpanCache(false) time.Sleep(time.Millisecond * 10) - sm, err = db.EpochSpansMap(ctx, tt.epoch) + spanMap, err = db.EpochSpansMap(ctx, tt.epoch) if err != nil { t.Fatal(err) } - if !reflect.DeepEqual(sm, map[uint64]types.Span{}) { - t.Errorf("Expected validator span map to be deleted, received: %v", sm) + db.enableSpanCache(true) + if !reflect.DeepEqual(spanMap, tt.spanMap) { + t.Errorf("Expected validator span map to be deleted, received: %v", spanMap) } } } diff --git a/slasher/db/testing/setup_db.go b/slasher/db/testing/setup_db.go index dcf46f81db6..88ff2308d02 100644 --- a/slasher/db/testing/setup_db.go +++ b/slasher/db/testing/setup_db.go @@ -23,7 +23,7 @@ func SetupSlasherDB(t testing.TB, spanCacheEnabled bool) *kv.Store { if err := os.RemoveAll(p); err != nil { t.Fatalf("Failed to remove directory: %v", err) } - cfg := &kv.Config{CacheItems: 0, MaxCacheSize: 0, SpanCacheEnabled: spanCacheEnabled} + cfg := &kv.Config{SpanCacheEnabled: spanCacheEnabled} db, err := slasherDB.NewDB(p, cfg) if err != nil { t.Fatalf("Failed to instantiate DB: %v", err) @@ -41,7 +41,7 @@ func SetupSlasherDBDiffCacheSize(t testing.TB, cacheItems int64, maxCacheSize in if err := os.RemoveAll(p); err != nil { t.Fatalf("Failed to remove directory: %v", err) } - cfg := &kv.Config{CacheItems: cacheItems, MaxCacheSize: maxCacheSize, SpanCacheEnabled: true} + cfg := &kv.Config{SpanCacheEnabled: true} newDB, err := slasherDB.NewDB(p, cfg) if err != nil { t.Fatalf("Failed to instantiate DB: %v", err)