Skip to content

Commit

Permalink
Tweak Badger GC and expose several Badger config options (#94)
Browse files Browse the repository at this point in the history
* Add value log GC

* Add various badger options and tweak our GC

* Add more metrics and PR comments

* PR Comments

* Fix metrics

* Fix latency metrics

* Minor comment tweaks
  • Loading branch information
thomashargrove committed Jan 7, 2020
1 parent 6ec9eb4 commit 4b0a27e
Show file tree
Hide file tree
Showing 10 changed files with 341 additions and 144 deletions.
21 changes: 19 additions & 2 deletions pkg/sloop/server/internal/config/config.go
Expand Up @@ -51,6 +51,13 @@ type SloopConfig struct {
ApiServerHost string `json:"apiServerHost"`
WatchCrds bool `json:"watchCrds"`
RestoreDatabaseFile string `json:"restoreDatabaseFile"`
BadgerDiscardRatio float64 `json:"badgerDiscardRatio"`
BadgerVLogGCFreq time.Duration `json:"badgerVLogGCFreq"`
BadgerMaxTableSize int64 `json:"badgerMaxTableSize"`
BadgerKeepL0InMemory bool `json:"badgerKeepL0InMemory"`
BadgerVLogFileSize int64 `json:"badgerVLogFileSize"`
BadgerVLogMaxEntries uint `json:"badgerVLogMaxEntries"`
BadgerUseLSMOnlyOptions bool `json:"badgerUseLSMOnlyOptions"`
}

func registerFlags(fs *flag.FlagSet, config *SloopConfig) {
Expand All @@ -67,8 +74,7 @@ func registerFlags(fs *flag.FlagSet, config *SloopConfig) {
fs.StringVar(&config.DebugRecordFile, "record-file", "", "Record watch data to a playback file")
fs.BoolVar(&config.UseMockBadger, "use-mock-badger", false, "Use a fake in-memory mock of badger")
fs.BoolVar(&config.DisableStoreManager, "disable-store-manager", false, "Turn off store manager which is to clean up database")
fs.DurationVar(&config.CleanupFrequency, "cleanup-frequency", time.Minute,
"OPTIONAL: Frequency between subsequent runs for the database cleanup")
fs.DurationVar(&config.CleanupFrequency, "cleanup-frequency", time.Minute*30, "Frequency between subsequent runs for the database cleanup")
fs.BoolVar(&config.KeepMinorNodeUpdates, "keep-minor-node-updates", false, "Keep all node updates even if change is only condition timestamps")
fs.StringVar(&config.DefaultLookback, "default-lookback", "1h", "Default UX filter lookback")
fs.StringVar(&config.DefaultKind, "default-kind", "_all", "Default UX filter kind")
Expand All @@ -78,6 +84,13 @@ func registerFlags(fs *flag.FlagSet, config *SloopConfig) {
fs.StringVar(&config.ApiServerHost, "apiserver-host", "", "Kubernetes API server endpoint")
fs.BoolVar(&config.WatchCrds, "watch-crds", true, "Watch for activity for CRDs")
fs.StringVar(&config.RestoreDatabaseFile, "restore-database-file", "", "Restore database from backup file into current context.")
fs.Float64Var(&config.BadgerDiscardRatio, "badger-discard-ratio", 0.1, "Badger value log GC uses this value to decide if it wants to compact a vlog file. Smaller values free more disk space but use more computing resources")
fs.DurationVar(&config.BadgerVLogGCFreq, "badger-vlog-gc-freq", time.Minute*1, "Frequency of running badger's ValueLogGC")
fs.Int64Var(&config.BadgerMaxTableSize, "badger-max-table-size", 0, "Max LSM table size in bytes. 0 = use badger default")
fs.BoolVar(&config.BadgerKeepL0InMemory, "badger-keep-l0-in-memory", true, "Keeps all level 0 tables in memory for faster writes and compactions")
fs.Int64Var(&config.BadgerVLogFileSize, "badger-vlog-file-size", 0, "Max size in bytes per value log file. 0 = use badger default")
fs.UintVar(&config.BadgerVLogMaxEntries, "badger-vlog-max-entries", 0, "Max number of entries per value log files. 0 = use badger default")
fs.BoolVar(&config.BadgerUseLSMOnlyOptions, "badger-use-lsm-only-options", true, "Sets a higher valueThreshold so values would be collocated with LSM tree reducing vlog disk usage")
}

// This will first check if a config file is specified on cmd line using a temporary flagSet
Expand Down Expand Up @@ -125,6 +138,10 @@ func (c *SloopConfig) Validate() error {
if err != nil {
return errors.Wrapf(err, "DefaultLookback is an invalid duration: %v", c.DefaultLookback)
}
if c.CleanupFrequency < time.Minute*15 {
return fmt.Errorf("CleanupFrequency can not be less than 15 minutes. Badger is lazy about freeing space " +
"on disk so we need to give it time to avoid over-correction")
}
return nil
}

Expand Down
21 changes: 19 additions & 2 deletions pkg/sloop/server/server.go
Expand Up @@ -57,7 +57,16 @@ func RealMain() error {
factory := &badgerwrap.BadgerFactory{}

storeRootWithKubeContext := path.Join(conf.StoreRoot, kubeContext)
db, err := untyped.OpenStore(factory, storeRootWithKubeContext, time.Duration(1)*time.Hour)
storeConfig := &untyped.Config{
RootPath: storeRootWithKubeContext,
ConfigPartitionDuration: time.Duration(1) * time.Hour,
BadgerMaxTableSize: conf.BadgerMaxTableSize,
BadgerKeepL0InMemory: conf.BadgerKeepL0InMemory,
BadgerVLogFileSize: conf.BadgerVLogFileSize,
BadgerVLogMaxEntries: conf.BadgerVLogMaxEntries,
BadgerUseLSMOnlyOptions: conf.BadgerUseLSMOnlyOptions,
}
db, err := untyped.OpenStore(factory, storeConfig)
if err != nil {
return errors.Wrap(err, "failed to init untyped store")
}
Expand Down Expand Up @@ -107,7 +116,15 @@ func RealMain() error {
var storemgr *storemanager.StoreManager
if !conf.DisableStoreManager {
fs := &afero.Afero{Fs: afero.NewOsFs()}
storemgr = storemanager.NewStoreManager(tables, conf.StoreRoot, conf.CleanupFrequency, conf.MaxLookback, conf.MaxDiskMb, fs)
storeCfg := &storemanager.Config{
StoreRoot: conf.StoreRoot,
Freq: conf.CleanupFrequency,
TimeLimit: conf.MaxLookback,
SizeLimitBytes: conf.MaxDiskMb * 1024 * 1024,
BadgerDiscardRatio: conf.BadgerDiscardRatio,
BadgerVLogGCFreq: conf.BadgerVLogGCFreq,
}
storemgr = storemanager.NewStoreManager(tables, storeCfg, fs)
storemgr.Start()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sloop/store/untyped/badgerwrap/api.go
Expand Up @@ -44,7 +44,7 @@ type DB interface {
// NewTransactionAt(readTs uint64, update bool) *Txn
// NewWriteBatch() *WriteBatch
// PrintHistogram(keyPrefix []byte)
// RunValueLogGC(discardRatio float64) error
RunValueLogGC(discardRatio float64) error
// SetDiscardTs(ts uint64)
// Subscribe(ctx context.Context, cb func(kv *KVList), prefixes ...[]byte) error
// VerifyChecksum() error
Expand Down
4 changes: 4 additions & 0 deletions pkg/sloop/store/untyped/badgerwrap/badger.go
Expand Up @@ -84,6 +84,10 @@ func (b *BadgerDb) Load(r io.Reader, maxPendingWrites int) error {
return b.db.Load(r, maxPendingWrites)
}

// RunValueLogGC forwards to the wrapped badger database's RunValueLogGC,
// passing discardRatio through unchanged and returning its error as-is.
func (b *BadgerDb) RunValueLogGC(discardRatio float64) error {
return b.db.RunValueLogGC(discardRatio)
}

// Transaction

func (t *BadgerTxn) Get(key []byte) (Item, error) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sloop/store/untyped/badgerwrap/mock.go
Expand Up @@ -118,6 +118,10 @@ func (b *MockDb) Load(r io.Reader, maxPendingWrites int) error {
return nil
}

// RunValueLogGC is a no-op on the mock: discardRatio is ignored and nil is
// always returned.
func (b *MockDb) RunValueLogGC(discardRatio float64) error {
return nil
}

// Transaction

func (t *MockTxn) Get(key []byte) (Item, error) {
Expand Down
45 changes: 37 additions & 8 deletions pkg/sloop/store/untyped/store.go
Expand Up @@ -16,24 +16,53 @@ import (
"time"
)

func OpenStore(factory badgerwrap.Factory, rootPath string, configPartitionDuration time.Duration) (badgerwrap.DB, error) {
err := os.MkdirAll(rootPath, 0755)
// Config carries the settings OpenStore uses to open the badger-backed store.
type Config struct {
// RootPath is the directory OpenStore creates (if needed) and hands to badger.
RootPath string
// ConfigPartitionDuration is the partition width; OpenStore only accepts
// time.Hour or 24*time.Hour.
ConfigPartitionDuration time.Duration
// BadgerMaxTableSize is applied with WithMaxTableSize only when non-zero.
BadgerMaxTableSize int64
// BadgerKeepL0InMemory is copied to the badger option KeepL0InMemory.
BadgerKeepL0InMemory bool
// BadgerVLogFileSize is applied with WithValueLogFileSize only when non-zero.
BadgerVLogFileSize int64
// BadgerVLogMaxEntries is applied with WithValueLogMaxEntries (converted to
// uint32) only when non-zero.
BadgerVLogMaxEntries uint
// BadgerUseLSMOnlyOptions selects badger.LSMOnlyOptions as the base options
// instead of badger.DefaultOptions.
BadgerUseLSMOnlyOptions bool
}

// OpenStore validates config, prepares the store directory, builds the badger
// options from config and opens the database through factory.
//
// The partition duration must be exactly one hour or one day; anything else is
// rejected before any directory is created or database opened. On success the
// package-level partitionDuration is set to config.ConfigPartitionDuration.
//
// Returns the opened database, or an error if validation or factory.Open fails.
func OpenStore(factory badgerwrap.Factory, config *Config) (badgerwrap.DB, error) {
	if config.ConfigPartitionDuration != time.Hour && config.ConfigPartitionDuration != 24*time.Hour {
		return nil, fmt.Errorf("Only hour and day partitionDurations are supported")
	}

	// A mkdir failure is only logged, not returned; the open below still runs.
	if err := os.MkdirAll(config.RootPath, 0755); err != nil {
		glog.Infof("mkdir failed with %v", err)
	}
	// For now using a temp name because this all needs to be replaced when we add real table/partition support

	var opts badger.Options
	if config.BadgerUseLSMOnlyOptions {
		// LSMOnlyOptions uses less disk space for vlog files. See the comments on the LSMOnlyOptions() func for details
		opts = badger.LSMOnlyOptions(config.RootPath)
	} else {
		opts = badger.DefaultOptions(config.RootPath)
	}

	// Zero means "not set": leave whatever the base options chose.
	if config.BadgerMaxTableSize != 0 {
		opts = opts.WithMaxTableSize(config.BadgerMaxTableSize)
	}
	opts.KeepL0InMemory = config.BadgerKeepL0InMemory
	if config.BadgerVLogFileSize != 0 {
		opts = opts.WithValueLogFileSize(config.BadgerVLogFileSize)
	}
	if config.BadgerVLogMaxEntries != 0 {
		opts = opts.WithValueLogMaxEntries(uint32(config.BadgerVLogMaxEntries))
	}

	db, err := factory.Open(opts)
	if err != nil {
		return nil, fmt.Errorf("badger.OpenStore failed with: %v", err)
	}

	glog.Infof("BadgerDB Options: %+v", opts)

	partitionDuration = config.ConfigPartitionDuration
	return db, nil
}

Expand Down
101 changes: 101 additions & 0 deletions pkg/sloop/storemanager/stats.go
@@ -0,0 +1,101 @@
package storemanager

import (
"fmt"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/salesforce/sloop/pkg/sloop/store/untyped/badgerwrap"
"github.com/spf13/afero"
"os"
"path/filepath"
"time"
)

// File extensions used to bucket Badger's on-disk files when sizing the store.
const (
	vlogExt = ".vlog" // value log data
	sstExt  = ".sst"  // LSM data
)

// Prometheus gauges describing the on-disk footprint and table layout of the
// store. Each one carries a Help string so the series are self-describing on
// the metrics endpoint.
var (
	metricStoreSizeOnDiskMb = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "sloop_store_sizeondiskmb",
		Help: "Total size in MB of all files under the store root",
	})
	metricBadgerKeys = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "sloop_badger_keys",
		Help: "Number of keys held in Badger tables, by table level",
	}, []string{"level"})
	metricBadgerTables = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Name: "sloop_badger_tables",
		Help: "Number of Badger tables, by table level",
	}, []string{"level"})
	metricBadgerLsmFileCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "sloop_badger_lsmfilecount",
		Help: "Number of .sst (LSM) files under the store root",
	})
	metricBadgerLsmSizeMb = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "sloop_badger_lsmsizemb",
		Help: "Total size in MB of .sst (LSM) files under the store root",
	})
	metricBadgerVLogFileCount = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "sloop_badger_vlogfilecount",
		Help: "Number of .vlog (value log) files under the store root",
	})
	metricBadgerVLogSizeMb = promauto.NewGauge(prometheus.GaugeOpts{
		Name: "sloop_badger_vlogsizemb",
		Help: "Total size in MB of .vlog (value log) files under the store root",
	})
)

// storeStats is a point-in-time snapshot of the store's on-disk footprint and
// Badger table layout, as produced by generateStats.
type storeStats struct {
// timestamp is when the snapshot was started.
timestamp time.Time
// DiskSizeBytes is the combined size of every non-directory file under the store root.
DiskSizeBytes uint64
// DiskLsmBytes is the combined size of files with the .sst extension.
DiskLsmBytes uint64
// DiskLsmFileCount is the number of files with the .sst extension.
DiskLsmFileCount int
// DiskVlogBytes is the combined size of files with the .vlog extension.
DiskVlogBytes uint64
// DiskVlogFileCount is the number of files with the .vlog extension.
DiskVlogFileCount int
// LevelToKeyCount maps a table level to the sum of KeyCount over its tables.
LevelToKeyCount map[int]uint64
// LevelToTableCount maps a table level to the number of tables at that level.
LevelToTableCount map[int]int
}

// generateStats builds a storeStats snapshot for the store rooted at storeRoot.
//
// Disk usage comes from walking storeRoot on fs; per-level key and table counts
// come from db.Tables. A failure while sizing the directory is logged and the
// remaining stats are still collected.
func generateStats(storeRoot string, db badgerwrap.DB, fs *afero.Afero) *storeStats {
	stats := &storeStats{
		timestamp:         time.Now(),
		LevelToKeyCount:   make(map[int]uint64),
		LevelToTableCount: make(map[int]int),
	}

	diskBytes, filesByExt, bytesByExt, walkErr := getDirSizeRecursive(storeRoot, fs)
	if walkErr != nil {
		// Swallowing on purpose as we still want the other stats
		glog.Errorf("Failed to check storage size on disk: %v", walkErr)
	}
	stats.DiskSizeBytes = diskBytes
	stats.DiskLsmFileCount, stats.DiskLsmBytes = filesByExt[sstExt], bytesByExt[sstExt]
	stats.DiskVlogFileCount, stats.DiskVlogBytes = filesByExt[vlogExt], bytesByExt[vlogExt]

	// Aggregate Badger's table list into per-level totals.
	for _, tbl := range db.Tables(true) {
		glog.V(2).Infof("BadgerDB TABLE id=%v keycount=%v level=%v left=%q right=%q", tbl.ID, tbl.KeyCount, tbl.Level, string(tbl.Left), string(tbl.Right))
		stats.LevelToTableCount[tbl.Level]++
		stats.LevelToKeyCount[tbl.Level] += tbl.KeyCount
	}

	glog.Infof("Finished updating store stats: %+v", stats)
	return stats
}

// getDirSizeRecursive walks root on fs and sizes every non-directory file.
//
// Returns, in order: the total size in bytes, the number of files per file
// extension, the number of bytes per file extension, and any walk error.
// Files without an extension are counted under the "" key. On error the total
// is reported as 0 and the maps hold whatever was gathered before the failure.
func getDirSizeRecursive(root string, fs *afero.Afero) (uint64, map[string]int, map[string]uint64, error) {
	var totalSize uint64
	extFileCount := make(map[string]int)
	extByteCount := make(map[string]uint64)

	err := fs.Walk(root, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// The walker passes a non-nil err (and possibly a nil info) when it
			// could not stat or read this path, e.g. a missing root. Propagate it
			// instead of dereferencing info.
			return err
		}
		if info.IsDir() {
			return nil
		}
		size := uint64(info.Size())
		ext := filepath.Ext(path)
		totalSize += size
		extFileCount[ext]++
		extByteCount[ext] += size
		return nil
	})
	if err != nil {
		return 0, extFileCount, extByteCount, err
	}

	return totalSize, extFileCount, extByteCount, nil
}

// emitMetrics publishes a storeStats snapshot to the Prometheus gauges.
//
// Sizes are reported in MB as fractional values. The per-level gauge vectors
// are reset before being repopulated so that a level absent from this snapshot
// stops reporting its previous value.
func emitMetrics(stats *storeStats) {
	const bytesPerMb = 1024 * 1024

	// Convert to float before dividing so sub-MB sizes are not truncated to 0.
	metricStoreSizeOnDiskMb.Set(float64(stats.DiskSizeBytes) / bytesPerMb)

	// Drop series for levels that existed in an earlier snapshot but not this one.
	metricBadgerKeys.Reset()
	for level, keyCount := range stats.LevelToKeyCount {
		metricBadgerKeys.WithLabelValues(fmt.Sprintf("%v", level)).Set(float64(keyCount))
	}
	metricBadgerTables.Reset()
	for level, tableCount := range stats.LevelToTableCount {
		metricBadgerTables.WithLabelValues(fmt.Sprintf("%v", level)).Set(float64(tableCount))
	}

	metricBadgerLsmFileCount.Set(float64(stats.DiskLsmFileCount))
	metricBadgerLsmSizeMb.Set(float64(stats.DiskLsmBytes) / bytesPerMb)
	metricBadgerVLogFileCount.Set(float64(stats.DiskVlogFileCount))
	metricBadgerVLogSizeMb.Set(float64(stats.DiskVlogBytes) / bytesPerMb)
}
35 changes: 35 additions & 0 deletions pkg/sloop/storemanager/stats_test.go
@@ -0,0 +1,35 @@
package storemanager

import (
"github.com/spf13/afero"
"github.com/stretchr/testify/assert"
"path"
"testing"
)

// Test_GetDirSizeRecursive builds a small in-memory tree with vlog, sst and
// extension-less files (one of them in a sub-directory) and checks the total
// size plus the per-extension file and byte counts.
func Test_GetDirSizeRecursive(t *testing.T) {
	fs := afero.Afero{Fs: afero.NewMemMapFs()}
	subDir := path.Join(someDir, "subDir")
	// Creating the sub-directory also creates someDir itself.
	assert.Nil(t, fs.MkdirAll(subDir, 0700))

	fixtures := []struct {
		dir, name, content string
	}{
		// 3 vlog files
		{someDir, "000010.vlog", "a"},
		{someDir, "000011.vlog", "aa"},
		{someDir, "000012.vlog", "aaaaa"},
		// 4 sst files
		{someDir, "000070.sst", "zzzzzz"},
		{someDir, "000071.sst", "zzzzzzz"},
		{someDir, "000072.sst", "zzzzzzzz"},
		{someDir, "000073.sst", "zzzzzzzzz"},
		// Other
		{someDir, "KEYREGISTRY", "u"},
		{someDir, "MANIFEST", "u"},
		{subDir, "randomFile", "abc"},
	}
	for _, f := range fixtures {
		// Fail on setup errors so a broken fixture is not mistaken for a wrong count.
		assert.Nil(t, fs.WriteFile(path.Join(f.dir, f.name), []byte(f.content), 0700))
	}

	fileSize, extFileCount, extByteCount, err := getDirSizeRecursive(someDir, &fs)
	assert.Nil(t, err)
	assert.Equal(t, uint64(43), fileSize)
	assert.Equal(t, map[string]int{"": 3, ".sst": 4, ".vlog": 3}, extFileCount)
	assert.Equal(t, map[string]uint64{"": 5, ".sst": 30, ".vlog": 8}, extByteCount)
}

0 comments on commit 4b0a27e

Please sign in to comment.