Skip to content

Commit

Permalink
Change slasher cache to LRU cache (#5037)
Browse files Browse the repository at this point in the history
* Change cache to LRU cache

* fixes

* Reduce db usage

* Fix function name

* Merge issues

* Save on eviction

* Fixes

* Fix

Co-authored-by: terence tsao <terence@prysmaticlabs.com>
  • Loading branch information
0xKiwi and terencechain committed Mar 8, 2020
1 parent 962fe85 commit d4cd51f
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 129 deletions.
4 changes: 2 additions & 2 deletions shared/debug/debug.go
Expand Up @@ -134,7 +134,7 @@ func (h *HandlerT) StartCPUProfile(file string) error {
}
h.cpuW = f
h.cpuFile = file
log.Info("CPU profiling started", "dump", h.cpuFile)
log.Info("CPU profiling started", " dump ", h.cpuFile)
return nil
}

Expand All @@ -146,7 +146,7 @@ func (h *HandlerT) StopCPUProfile() error {
if h.cpuW == nil {
return errors.New("CPU profiling not in progress")
}
log.Info("Done writing CPU profile", "dump", h.cpuFile)
log.Info("Done writing CPU profile", " dump ", h.cpuFile)
if err := h.cpuW.Close(); err != nil {
return err
}
Expand Down
14 changes: 14 additions & 0 deletions slasher/cache/BUILD.bazel
@@ -0,0 +1,14 @@
# Bazel build definition for the slasher/cache package.
load("@io_bazel_rules_go//go:def.bzl", "go_library")

# Go library built from span_cache.go. Visibility is restricted to packages
# under //slasher. Dependencies: the slasher attestation span types, the
# hashicorp golang-lru cache, and the Prometheus client (metrics + promauto).
go_library(
name = "go_default_library",
srcs = ["span_cache.go"],
importpath = "github.com/prysmaticlabs/prysm/slasher/cache",
visibility = ["//slasher:__subpackages__"],
deps = [
"//slasher/detection/attestations/types:go_default_library",
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
],
)
69 changes: 69 additions & 0 deletions slasher/cache/span_cache.go
@@ -0,0 +1,69 @@
package cache

import (
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prysmaticlabs/prysm/slasher/detection/attestations/types"
)

// epochSpansCacheSize is the maximum number of per-epoch span maps the LRU
// cache holds before the least recently used entry is evicted.
var epochSpansCacheSize = 256

// Prometheus counters recording the outcome of lookups on the epoch spans cache.
var (
	// epochSpansCacheHit counts lookups that found a span map for the epoch.
	epochSpansCacheHit = promauto.NewCounter(prometheus.CounterOpts{
		Name: "epoch_spans_cache_hit",
		Help: "The total number of cache hits on the epoch spans cache.",
	})
	// epochSpansCacheMiss counts lookups that found nothing for the epoch.
	epochSpansCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
		Name: "epoch_spans_cache_miss",
		Help: "The total number of cache misses on the epoch spans cache.",
	})
)

// EpochSpansCache is used to store the spans needed on a per-epoch basis for slashing detection.
// It wraps an LRU cache whose keys are epochs (uint64) and whose values are
// map[uint64]types.Span span maps, as used by Get and Set.
type EpochSpansCache struct {
// cache is the underlying LRU cache, created in NewEpochSpansCache.
cache *lru.Cache
}

// NewEpochSpansCache builds an EpochSpansCache backed by an LRU cache that
// holds at most epochSpansCacheSize entries.
//
// onEvicted is handed to the LRU cache as its eviction callback; it receives
// the key and value of each entry the cache drops. Any error from constructing
// the LRU cache is returned unchanged together with a nil cache.
func NewEpochSpansCache(onEvicted func(key interface{}, value interface{})) (*EpochSpansCache, error) {
	lruCache, err := lru.NewWithEvict(epochSpansCacheSize, onEvicted)
	if err != nil {
		return nil, err
	}
	spansCache := &EpochSpansCache{cache: lruCache}
	return spansCache, nil
}

// Get looks up the span map cached for the requested epoch.
//
// On a hit it returns the cached map and true, and increments the cache-hit
// counter. On a miss it returns a fresh, empty (non-nil) map and false, and
// increments the cache-miss counter.
//
// An entry that is nil or is not a map[uint64]types.Span is treated as a miss
// rather than panicking on the type assertion.
func (c *EpochSpansCache) Get(epoch uint64) (map[uint64]types.Span, bool) {
	if item, exists := c.cache.Get(epoch); exists && item != nil {
		// The underlying cache stores interface{} values, so use a checked
		// assertion: a value of an unexpected type must not crash the caller.
		if spans, ok := item.(map[uint64]types.Span); ok {
			epochSpansCacheHit.Inc()
			return spans, true
		}
	}

	epochSpansCacheMiss.Inc()
	return make(map[uint64]types.Span), false
}

// Set stores epochSpans in the cache under the given epoch key. The boolean
// returned by the underlying Add call is intentionally discarded.
func (c *EpochSpansCache) Set(epoch uint64, epochSpans map[uint64]types.Span) {
	c.cache.Add(epoch, epochSpans)
}

// Delete drops the entry for epoch from the cache and reports whether an
// entry was present. The onEviction function runs for the entry before it is
// removed.
func (c *EpochSpansCache) Delete(epoch uint64) bool {
	wasPresent := c.cache.Remove(epoch)
	return wasPresent
}

// Has reports whether the cache currently holds an entry for epoch.
func (c *EpochSpansCache) Has(epoch uint64) bool {
	found := c.cache.Contains(epoch)
	return found
}

// Clear removes all keys from the EpochSpansCache by purging the underlying
// LRU cache.
// NOTE(review): golang-lru's Purge is expected to invoke the eviction callback
// for every purged entry — confirm against the vendored version.
func (c *EpochSpansCache) Clear() {
c.cache.Purge()
}
2 changes: 1 addition & 1 deletion slasher/db/kv/BUILD.bazel
Expand Up @@ -19,10 +19,10 @@ go_library(
"//shared/bytesutil:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//slasher/cache:go_default_library",
"//slasher/db/types:go_default_library",
"//slasher/detection/attestations/types:go_default_library",
"@com_github_boltdb_bolt//:go_default_library",
"@com_github_dgraph_io_ristretto//:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
Expand Down
23 changes: 5 additions & 18 deletions slasher/db/kv/kv.go
Expand Up @@ -6,8 +6,8 @@ import (
"time"

"github.com/boltdb/bolt"
"github.com/dgraph-io/ristretto"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/slasher/cache"
)

var databaseFileName = "slasher.db"
Expand All @@ -17,24 +17,22 @@ var databaseFileName = "slasher.db"
type Store struct {
db *bolt.DB
databasePath string
spanCache *ristretto.Cache
spanCache *cache.EpochSpansCache
spanCacheEnabled bool
}

// Config options for the slasher db.
type Config struct {
// SpanCacheEnabled uses span cache to detect surround slashing.
SpanCacheEnabled bool
CacheItems int64
MaxCacheSize int64
}

// Close closes the underlying boltdb database.
func (db *Store) Close() error {
return db.db.Close()
}

// ClearSpanCache clears the MinMaxSpans cache.
// ClearSpanCache clears the spans cache.
func (db *Store) ClearSpanCache() {
db.spanCache.Clear()
}
Expand Down Expand Up @@ -86,21 +84,10 @@ func NewKVStore(dirPath string, cfg *Config) (*Store, error) {
}
return nil, err
}
if cfg.CacheItems == 0 {
cfg.CacheItems = 10 * cachedSpanerEpochs
}
if cfg.MaxCacheSize == 0 {
cfg.MaxCacheSize = cachedSpanerEpochs
}
kv := &Store{db: boltDB, databasePath: datafile, spanCacheEnabled: cfg.SpanCacheEnabled}
spanCache, err := ristretto.NewCache(&ristretto.Config{
NumCounters: cfg.CacheItems, // number of keys to track frequency of (10M).
MaxCost: cfg.MaxCacheSize, // maximum cost of cache.
BufferItems: 64, // number of keys per Get buffer.
OnEvict: persistSpanMapsOnEviction(kv),
})
spanCache, err := cache.NewEpochSpansCache(persistSpanMapsOnEviction(kv))
if err != nil {
return nil, errors.Wrap(err, "failed to start span cache")
return nil, errors.Wrap(err, "could not create new cache")
}
kv.spanCache = spanCache

Expand Down
4 changes: 2 additions & 2 deletions slasher/db/kv/kv_test.go
Expand Up @@ -23,7 +23,7 @@ func setupDB(t testing.TB, ctx *cli.Context) *Store {
if err := os.RemoveAll(p); err != nil {
t.Fatalf("Failed to remove directory: %v", err)
}
cfg := &Config{CacheItems: 0, MaxCacheSize: 0, SpanCacheEnabled: ctx.GlobalBool(flags.UseSpanCacheFlag.Name)}
cfg := &Config{SpanCacheEnabled: ctx.GlobalBool(flags.UseSpanCacheFlag.Name)}
db, err := NewKVStore(p, cfg)
if err != nil {
t.Fatalf("Failed to instantiate DB: %v", err)
Expand All @@ -40,7 +40,7 @@ func setupDBDiffCacheSize(t testing.TB, cacheItems int64, maxCacheSize int64) *S
if err := os.RemoveAll(p); err != nil {
t.Fatalf("Failed to remove directory: %v", err)
}
cfg := &Config{CacheItems: cacheItems, MaxCacheSize: maxCacheSize, SpanCacheEnabled: true}
cfg := &Config{SpanCacheEnabled: true}
newDB, err := NewKVStore(p, cfg)
if err != nil {
t.Fatalf("Failed to instantiate DB: %v", err)
Expand Down
97 changes: 29 additions & 68 deletions slasher/db/kv/spanner.go
Expand Up @@ -2,8 +2,6 @@ package kv

import (
"context"
"fmt"
"reflect"

"github.com/boltdb/bolt"
"github.com/pkg/errors"
Expand All @@ -20,31 +18,28 @@ import (
var highestObservedEpoch uint64
var lowestObservedEpoch = params.BeaconConfig().FarFutureEpoch

func cacheTypeMismatchError(value interface{}) error {
return fmt.Errorf("cache contains a value of type: %v "+
"while expected to contain only values of type : map[uint64]types.Span", reflect.TypeOf(value))
}

// This function defines a function which triggers upon a span map being
// evicted from the cache. It allows us to persist the span map by the epoch value
// to the database itself in the validatorsMinMaxSpanBucket.
func persistSpanMapsOnEviction(db *Store) func(uint64, uint64, interface{}, int64) {
func persistSpanMapsOnEviction(db *Store) func(key interface{}, value interface{}) {
// We use a closure here so we can access the database itself
// on the eviction of a span map from the cache. The function has the signature
// required by the eviction callback of the hashicorp/golang-lru cache (lru.NewWithEvict).
// See https://godoc.org/github.com/hashicorp/golang-lru#NewWithEvict.
return func(epoch uint64, _ uint64, value interface{}, cost int64) {
log.Tracef("Evicting span map for epoch: %d", epoch)
return func(key interface{}, value interface{}) {
log.Tracef("Evicting span map for epoch: %d", key)
err := db.update(func(tx *bolt.Tx) error {
epoch, keyOK := key.(uint64)
spanMap, valueOK := value.(map[uint64]types.Span)
if !keyOK || !valueOK {
return errors.New("could not cast key and value into needed types")
}

bucket := tx.Bucket(validatorsMinMaxSpanBucket)
epochBucket, err := bucket.CreateBucketIfNotExists(bytesutil.Bytes8(epoch))
if err != nil {
return err
}
spanMap, ok := value.(map[uint64]types.Span)
if !ok {
return cacheTypeMismatchError(value)
}
for k, v := range spanMap {
if err = epochBucket.Put(bytesutil.Bytes8(k), marshalSpan(v)); err != nil {
return err
Expand Down Expand Up @@ -93,16 +88,12 @@ func (db *Store) EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]ty
ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpansMap")
defer span.End()
if db.spanCacheEnabled {
v, ok := db.spanCache.Get(epoch)
spanMap := make(map[uint64]types.Span)
spanMap, ok := db.spanCache.Get(epoch)
if ok {
spanMap, ok = v.(map[uint64]types.Span)
if !ok {
return nil, cacheTypeMismatchError(v)
}
return spanMap, nil
}
}

var err error
var spanMap map[uint64]types.Span
err = db.view(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -186,19 +177,12 @@ func (db *Store) SaveValidatorEpochSpan(
defer traceSpan.End()
if db.spanCacheEnabled {
setObservedEpochs(epoch)
v, ok := db.spanCache.Get(epoch)
spanMap := make(map[uint64]types.Span)
if ok {
spanMap, ok = v.(map[uint64]types.Span)
if !ok {
return cacheTypeMismatchError(v)
}
spanMap, err := db.findOrLoadEpochInCache(ctx, epoch)
if err != nil {
return err
}
spanMap[validatorIdx] = span
saved := db.spanCache.Set(epoch, spanMap, 1)
if !saved {
return fmt.Errorf("failed to save span map to cache")
}
db.spanCache.Set(epoch, spanMap)
return nil
}

Expand All @@ -215,19 +199,17 @@ func (db *Store) SaveValidatorEpochSpan(
}

// SaveEpochSpansMap accepts an epoch and span map epoch=>spans and writes it to disk.
// saves the spans to cache if caching is enabled. The key in the cache is the highest
// epoch seen by slasher and the value is the span map itself.
// saves the spans to cache if caching is enabled. The key in the cache is the
// epoch and the value is the span map itself.
func (db *Store) SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap map[uint64]types.Span) error {
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveEpochSpansMap")
defer span.End()
if db.spanCacheEnabled {
setObservedEpochs(epoch)
saved := db.spanCache.Set(epoch, spanMap, 1)
if !saved {
return fmt.Errorf("failed to save span map to cache")
}
db.spanCache.Set(epoch, spanMap)
return nil
}

return db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(validatorsMinMaxSpanBucket)
valBucket, err := bucket.CreateBucketIfNotExists(bytesutil.Bytes8(epoch))
Expand Down Expand Up @@ -257,12 +239,8 @@ func (db *Store) SaveCachedSpansMaps(ctx context.Context) error {
db.enableSpanCache(false)
defer db.enableSpanCache(true)
for epoch := lowestObservedEpoch; epoch <= highestObservedEpoch; epoch++ {
v, ok := db.spanCache.Get(epoch)
spanMap, ok := db.spanCache.Get(epoch)
if ok {
spanMap, ok := v.(map[uint64]types.Span)
if !ok {
return cacheTypeMismatchError(v)
}
if err := db.SaveEpochSpansMap(ctx, epoch, spanMap); err != nil {
return errors.Wrap(err, "failed to save span maps from cache")
}
Expand All @@ -281,11 +259,8 @@ func (db *Store) DeleteEpochSpans(ctx context.Context, epoch uint64) error {
ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteEpochSpans")
defer span.End()
if db.spanCacheEnabled {
_, ok := db.spanCache.Get(epoch)
if ok {
db.spanCache.Del(epoch)
return nil
}
_ = db.spanCache.Delete(epoch)
return nil
}
return db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(validatorsMinMaxSpanBucket)
Expand All @@ -301,20 +276,12 @@ func (db *Store) DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx ui
ctx, span := trace.StartSpan(ctx, "slasherDB.DeleteValidatorSpanByEpoch")
defer span.End()
if db.spanCacheEnabled {
v, ok := db.spanCache.Get(epoch)
spanMap := make(map[uint64][2]uint16)
spanMap, ok := db.spanCache.Get(epoch)
if ok {
spanMap, ok = v.(map[uint64][2]uint16)
if !ok {
return cacheTypeMismatchError(v)
}
}
delete(spanMap, validatorIdx)
saved := db.spanCache.Set(epoch, spanMap, 1)
if !saved {
return errors.New("failed to save span map to cache")
delete(spanMap, validatorIdx)
db.spanCache.Set(epoch, spanMap)
return nil
}
return nil
}

return db.update(func(tx *bolt.Tx) error {
Expand All @@ -330,25 +297,19 @@ func (db *Store) DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx ui
func (db *Store) findOrLoadEpochInCache(ctx context.Context, epoch uint64) (map[uint64]types.Span, error) {
ctx, span := trace.StartSpan(ctx, "slasherDB.findOrLoadEpochInCache")
defer span.End()
v, epochFound := db.spanCache.Get(epoch)
spanMap, epochFound := db.spanCache.Get(epoch)
if epochFound {
spanMap, ok := v.(map[uint64]types.Span)
if !ok {
return make(map[uint64]types.Span), cacheTypeMismatchError(v)
}
return spanMap, nil
}

db.enableSpanCache(false)
defer db.enableSpanCache(true)
// If the epoch we want isn't in the cache, load it in.
spanForEpoch, err := db.EpochSpansMap(ctx, epoch)
if err != nil {
return make(map[uint64]types.Span), errors.Wrap(err, "failed to get span map for epoch")
}
saved := db.spanCache.Set(epoch, spanForEpoch, 1)
if !saved {
return make(map[uint64]types.Span), fmt.Errorf("failed to save span map to cache")
}
db.spanCache.Set(epoch, spanForEpoch)
return spanForEpoch, nil
}

Expand Down

0 comments on commit d4cd51f

Please sign in to comment.