Commit e6e9933: chore: applying 1.6.1 hotfixes to main branch (#3044)
atzoum authored Feb 28, 2023 (1 parent 4832630)
Showing 3 changed files with 124 additions and 85 deletions.
CHANGELOG.md: 7 additions, 0 deletions

```diff
@@ -1,5 +1,12 @@
 # Changelog
 
+## [1.6.1](https://github.com/rudderlabs/rudder-server/compare/v1.6.0...v1.6.1) (2023-02-28)
+
+
+### Miscellaneous
+
+* debugger's badgerdb cache optimisations ([#3042](https://github.com/rudderlabs/rudder-server/issues/3042)) ([34602c3](https://github.com/rudderlabs/rudder-server/commit/34602c34924f2f007874009f6fdc69bbfb1fae44))
+
 ## [1.6.0](https://github.com/rudderlabs/rudder-server/compare/v1.5.0...v1.6.0) (2023-02-23)
 
 
```
services/debugger/cache/internal/badger/badger.go: 47 additions, 64 deletions

```diff
@@ -2,7 +2,6 @@ package badger
 
 import (
 	"encoding/json"
-	"errors"
 	"fmt"
 	"os"
 	"path"
```
```diff
@@ -14,35 +13,33 @@ import (
 	"github.com/rudderlabs/rudder-server/rruntime"
 	"github.com/rudderlabs/rudder-server/utils/logger"
 	"github.com/rudderlabs/rudder-server/utils/misc"
-	rsync "github.com/rudderlabs/rudder-server/utils/sync"
+	"github.com/samber/lo"
 )
 
 /*
 loadCacheConfig sets the properties of the cache after reading it from the config file.
 This gives a feature of hot reloadability as well.
 */
 func (e *Cache[E]) loadCacheConfig() {
-	config.RegisterDurationConfigVariable(30, &e.cleanupFreq, true, time.Second, "LiveEvent.cache.clearFreq") // default clearFreq is 15 seconds
-	config.RegisterIntConfigVariable(100, &e.limiter, true, 1, "LiveEvent.cache.limiter")
-	config.RegisterDurationConfigVariable(1, &e.ticker, false, time.Minute, "LiveEvent.cache.GCTime")
-	config.RegisterDurationConfigVariable(15, &e.queryTimeout, false, time.Second, "LiveEvent.cache.queryTimeout")
-	config.RegisterIntConfigVariable(3, &e.retries, false, 1, "LiveEvent.cache.retries")
-	config.RegisterFloat64ConfigVariable(0.5, &e.gcDiscardRatio, false, "LiveEvent.cache.gcDiscardRatio")
-	config.RegisterIntConfigVariable(1, &e.numMemtables, false, 1, "LiveEvent.cache.NumMemtables")
-	config.RegisterIntConfigVariable(0, &e.numVersionsToKeep, false, 1, "LiveEvent.cache.NumVersionsToKeep")
-	config.RegisterIntConfigVariable(5, &e.numLevelZeroTables, false, 1, "LiveEvent.cache.NumLevelZeroTables")
-	config.RegisterIntConfigVariable(10, &e.numLevelZeroTablesStall, false, 1, "LiveEvent.cache.NumLevelZeroTablesStall")
-	config.RegisterBoolConfigVariable(false, &e.syncWrites, false, "LiveEvent.cache.SyncWrites")
+	config.RegisterDurationConfigVariable(30, &e.cleanupFreq, true, time.Second, "LiveEvent.cache."+e.origin+".clearFreq", "LiveEvent.cache.clearFreq") // default clearFreq is 30 seconds
+	config.RegisterIntConfigVariable(100, &e.limiter, true, 1, "LiveEvent.cache."+e.origin+".limiter", "LiveEvent.cache.limiter")
+	config.RegisterDurationConfigVariable(1, &e.ticker, false, time.Minute, "LiveEvent.cache."+e.origin+".GCTime", "LiveEvent.cache.GCTime")
+	config.RegisterDurationConfigVariable(15, &e.queryTimeout, false, time.Second, "LiveEvent.cache."+e.origin+".queryTimeout", "LiveEvent.cache.queryTimeout")
+	config.RegisterFloat64ConfigVariable(0.5, &e.gcDiscardRatio, false, "LiveEvent.cache."+e.origin+".gcDiscardRatio", "LiveEvent.cache.gcDiscardRatio")
+	config.RegisterIntConfigVariable(5, &e.numMemtables, false, 1, "LiveEvent.cache."+e.origin+".NumMemtables", "LiveEvent.cache.NumMemtables")
+	config.RegisterIntConfigVariable(5, &e.numLevelZeroTables, false, 1, "LiveEvent.cache."+e.origin+".NumLevelZeroTables", "LiveEvent.cache.NumLevelZeroTables")
+	config.RegisterIntConfigVariable(15, &e.numLevelZeroTablesStall, false, 1, "LiveEvent.cache."+e.origin+".NumLevelZeroTablesStall", "LiveEvent.cache.NumLevelZeroTablesStall")
+	// 512 bytes - prefer using Value Log over LSM tree
+	config.RegisterInt64ConfigVariable(512, &e.valueThreshold, false, 1, "LiveEvent.cache."+e.origin+".ValueThreshold", "LiveEvent.cache.ValueThreshold")
+	config.RegisterBoolConfigVariable(false, &e.syncWrites, false, "LiveEvent.cache."+e.origin+".SyncWrites", "LiveEvent.cache.SyncWrites")
 }
 
 /*
 Cache is an in-memory cache. Each key-value pair stored in this cache has a TTL and one goroutine removes the
 key-value pair from the cache when it is older than the TTL.
 */
 type Cache[E any] struct {
-	plocker                 *rsync.PartitionLocker
 	limiter                 int
-	retries                 int
 	path                    string
 	origin                  string
 	done                    chan struct{}
```
```diff
@@ -52,9 +49,9 @@ type Cache[E any] struct {
 	cleanupFreq             time.Duration // TTL time on badgerDB
 	gcDiscardRatio          float64
 	numMemtables            int
-	numVersionsToKeep       int
 	numLevelZeroTables      int
 	numLevelZeroTablesStall int
+	valueThreshold          int64
 	syncWrites              bool
 	db                      *badger.DB
 	logger                  logger.Logger
```
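
Each setting in loadCacheConfig is now registered under two keys: an origin-scoped key such as LiveEvent.cache.&lt;origin&gt;.limiter first, then the old shared key as a fallback, so a single debugger cache can be tuned without touching the others. Below is a minimal sketch of the resulting lookup behaviour; it assumes the config package resolves keys in the order they are registered, and the "proc" origin and all values are hypothetical:

```go
package main

import (
	"fmt"

	"github.com/rudderlabs/rudder-server/config"
)

func main() {
	// Shared fallback key, used by any origin without an override.
	config.Set("LiveEvent.cache.limiter", 200)
	// Origin-scoped key; for the (hypothetical) origin "proc" it
	// shadows the shared key because it is registered first.
	config.Set("LiveEvent.cache.proc.limiter", 50)

	var limiter int
	// Same registration shape as loadCacheConfig above:
	// default 100, hot-reloadable, value scale 1, keys in priority order.
	config.RegisterIntConfigVariable(100, &limiter, true, 1,
		"LiveEvent.cache.proc.limiter", "LiveEvent.cache.limiter")

	fmt.Println(limiter) // 50; without the override it would print 200
}
```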
```diff
@@ -74,67 +71,53 @@ func (badgerLogger) Warningf(format string, a ...interface{}) {
 
 // Update writes the entries into badger db with a TTL
 func (e *Cache[E]) Update(key string, value E) error {
-	e.plocker.Lock(key)
-	defer e.plocker.Unlock(key)
-	operation := func() error {
-		return e.db.Update(func(txn *badger.Txn) error {
-			res, err := txn.Get([]byte(key))
-			if err != nil && err != badger.ErrKeyNotFound {
-				return err
-			}
-			var values []E
-			if res != nil {
-				if err = res.Value(func(val []byte) error {
-					return json.Unmarshal(val, &values)
-				}); err != nil {
-					return err
-				}
-			}
-			if len(values) >= e.limiter {
-				values = values[len(values)-e.limiter+1:]
-			}
-			values = append(values, value)
-			data, err := json.Marshal(values)
-			if err != nil {
-				return err
-			}
-			entry := badger.NewEntry([]byte(key), data).WithTTL(e.cleanupFreq)
-			return txn.SetEntry(entry)
-		})
-	}
-	var err error
-	for i := 0; i < e.retries; i++ {
-		if err = operation(); !errors.Is(err, badger.ErrConflict) {
-			return err
-		}
-		e.logger.Warnf("Retrying update func because of ErrConflict %d", i+1)
-	}
-	return err
+	return e.db.Update(func(txn *badger.Txn) error {
+		data, err := json.Marshal(value)
+		if err != nil {
+			return err
+		}
+		entry := badger.NewEntry([]byte(key), data).WithTTL(e.cleanupFreq)
+		return txn.SetEntry(entry)
+	})
 }
 
 // Read fetches all the entries for a given key from badgerDB
 func (e *Cache[E]) Read(key string) ([]E, error) {
 	var values []E
 	err := e.db.View(func(txn *badger.Txn) error {
-		item, err := txn.Get([]byte(key))
-		if err != nil {
-			return err
-		}
-		return item.Value(func(val []byte) error {
-			return json.Unmarshal(val, &values)
-		})
+		opts := badger.DefaultIteratorOptions
+		opts.PrefetchSize = e.limiter
+		itr := txn.NewKeyIterator([]byte(key), opts)
+		defer itr.Close()
+		for itr.Rewind(); itr.Valid(); itr.Next() {
+			if itr.Item().IsDeletedOrExpired() {
+				break
+			}
+			var value E
+			if err := itr.Item().Value(func(val []byte) error {
+				return json.Unmarshal(val, &value)
+			}); err == nil { // ignore unmarshal errors (old version of the data)
+				values = append(values, value)
+			}
+			if len(values) >= e.limiter {
+				break
+			}
+		}
+		return nil
 	})
-	if errors.Is(err, badger.ErrKeyNotFound) {
-		return nil, nil
-	}
-	return values, err
+	if err == nil {
+		_ = e.db.Update(func(txn *badger.Txn) error {
+			return txn.Delete([]byte(key))
+		})
+	}
+
+	return lo.Reverse(values), err
 }
 
 func New[E any](origin string, logger logger.Logger, opts ...func(Cache[E])) (*Cache[E], error) {
 	e := Cache[E]{
-		plocker: rsync.NewPartitionLocker(),
-		origin:  origin,
-		logger:  logger,
+		origin: origin,
+		logger: logger,
 	}
 	e.loadCacheConfig()
 	badgerPathName := e.origin + "/cache/badgerdbv3"
```
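
With these changes, Update no longer read-modify-writes a JSON array of values under a partition lock (the old code retried on badger.ErrConflict for exactly that reason); it simply writes a fresh version of the key with a TTL and lets badger's versioning keep the history. Read scans up to limiter live versions of the key newest-first, restores insertion order with lo.Reverse, and deletes the key after a successful scan, which yields read-and-pop semantics. A minimal sketch of that flow, written as package-internal code because the package sits under internal/; the function and its values are illustrative, not part of the commit:

```go
// readAndPopSketch is an illustrative walkthrough of the new semantics,
// not part of the commit. It has to live inside this package, since the
// package is internal; names and values are hypothetical.
func readAndPopSketch() error {
	c, err := New[[]byte]("sketch", logger.NewLogger())
	if err != nil {
		return err
	}
	defer func() { _ = c.Stop() }()

	// Each Update now writes a new badger version of the same key
	// instead of rewriting a JSON array under a lock.
	if err := c.Update("k", []byte("v1")); err != nil {
		return err
	}
	if err := c.Update("k", []byte("v2")); err != nil {
		return err
	}

	// Read returns the versions oldest-first (lo.Reverse) and then
	// deletes the key, so a second Read comes back empty.
	vals, err := c.Read("k")
	if err != nil {
		return err
	}
	fmt.Printf("first read: %q\n", vals)  // ["v1" "v2"]
	vals, _ = c.Read("k")
	fmt.Printf("second read: %q\n", vals) // []
	return nil
}
```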
```diff
@@ -158,12 +141,12 @@ func New[E any](origin string, logger logger.Logger, opts ...func(Cache[E])) (*C
 		WithIndexCacheSize(16 << 20). // 16mb
 		WithNumGoroutines(1).
 		WithNumMemtables(e.numMemtables).
+		WithValueThreshold(e.valueThreshold).
 		WithBlockCacheSize(0).
-		WithNumVersionsToKeep(e.numVersionsToKeep).
+		WithNumVersionsToKeep(e.limiter).
 		WithNumLevelZeroTables(e.numLevelZeroTables).
 		WithNumLevelZeroTablesStall(e.numLevelZeroTablesStall).
 		WithSyncWrites(e.syncWrites)
 
 	e.db, err = badger.Open(badgerOpts)
 	if err != nil {
 		e.logger.Errorf("Error while opening badgerDB: %v", err)
```
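
Two option changes in New back this up: NumVersionsToKeep is now tied to the limiter, so badger's own compaction caps how many versions of a key survive instead of application code trimming a slice, and the new 512-byte ValueThreshold keeps small entries in the LSM tree while larger payloads go to the value log. A standalone sketch of the same option shape against badger v3, with an illustrative path and limiter value:

```go
package main

import (
	badger "github.com/dgraph-io/badger/v3"
)

func main() {
	limiter := 100 // mirrors the default LiveEvent.cache.limiter
	opts := badger.DefaultOptions("/tmp/badgerdbv3-sketch").
		WithNumVersionsToKeep(limiter). // badger caps per-key history itself
		WithValueThreshold(512)         // values larger than 512B go to the value log

	db, err := badger.Open(opts)
	if err != nil {
		panic(err)
	}
	defer func() { _ = db.Close() }()
	// a real caller would now write versioned, TTL'd entries through the cache
}
```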
services/debugger/cache/internal/badger/badger_test.go: 70 additions, 21 deletions

```diff
@@ -1,10 +1,12 @@
 package badger
 
 import (
+	"fmt"
 	"time"
 
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
+	"github.com/rudderlabs/rudder-server/config"
 	"github.com/rudderlabs/rudder-server/utils/logger"
 	"github.com/rudderlabs/rudder-server/utils/misc"
 	"github.com/stretchr/testify/assert"
```
```diff
@@ -15,18 +17,20 @@ var _ = Describe("cache", func() {
 	testKey := "test_key"
 	testValue1 := []byte("test_value1")
 	testValue2 := []byte("test_value2")
+	testValue3 := []byte("test_value3")
+	testValue4 := []byte("test_value4")
 	var e *Cache[[]byte]
 	var err error
 
 	BeforeEach(func() {
 		misc.Init()
-		GinkgoT().Setenv("RSERVER_LIVE_EVENT_CACHE_CLEAR_FREQ", "1")
-		GinkgoT().Setenv("RSERVER_LIVE_EVENT_CACHE_GCTIME", "1s")
+		config.Set("LiveEvent.cache.clearFreq", "1s")
 		e, err = New[[]byte]("test", logger.NewLogger())
 		Expect(err).To(BeNil())
 	})
 
 	AfterEach(func() {
+		config.Reset()
 		_ = e.Stop()
 	})
 
```
```diff
@@ -37,45 +41,90 @@ var _ = Describe("cache", func() {
 
 	It("Cache update", func() {
 		Expect(e.Update(testKey, testValue1)).To(BeNil())
+
 		val, err := e.Read(testKey)
-		Expect(len(val)).To(Equal(1))
 		Expect(err).To(BeNil())
-		Expect(val[0]).To(Equal(testValue1))
-		Eventually(func() int {
-			val, err := e.Read(testKey)
-			Expect(err).To(BeNil())
-			return len(val)
-		}, 6*time.Second).Should(Equal(0))
+		Expect(val).To(Equal([][]byte{testValue1}))
+
+		val, err = e.Read(testKey)
+		Expect(err).To(BeNil())
+		Expect(len(val)).To(Equal(0))
 	})
 
 	It("Cache readAndPopData", func() {
 		Expect(e.Update(testKey, testValue1)).To(BeNil())
 		Expect(e.Update(testKey, testValue2)).To(BeNil())
+		Expect(e.Update(testKey, testValue3)).To(BeNil())
+		Expect(e.Update(testKey, testValue4)).To(BeNil())
 		v, err := e.Read(testKey)
-		Expect(len(v)).To(Equal(2))
 		Expect(err).To(BeNil())
-		assert.ElementsMatch(GinkgoT(), v, [][]byte{testValue1, testValue2})
-		Eventually(func() int {
-			val, err := e.Read(testKey)
-			Expect(err).To(BeNil())
-			return len(val)
-		}, 6*time.Second).Should(Equal(0))
+		Expect(len(v)).To(Equal(4))
+		Expect(v).To(Equal([][]byte{testValue1, testValue2, testValue3, testValue4}))
+
+		v, err = e.Read(testKey)
+		Expect(err).To(BeNil())
+		Expect(len(v)).To(Equal(0))
 	})
 
+	It("Cache read when multiple keys exist", func() {
+		testKey2 := testKey + "_2"
+		Expect(e.Update(testKey2, testValue1)).To(BeNil())
+		Expect(e.Update(testKey, testValue1)).To(BeNil())
+
+		Expect(e.Update(testKey2, testValue2)).To(BeNil())
+		Expect(e.Update(testKey, testValue2)).To(BeNil())
+
+		Expect(e.Update(testKey2, testValue3)).To(BeNil())
+		Expect(e.Update(testKey, testValue3)).To(BeNil())
+
+		Expect(e.Update(testKey2, testValue4)).To(BeNil())
+		Expect(e.Update(testKey, testValue4)).To(BeNil())
+
+		Expect(e.Update(testKey2, testValue1)).To(BeNil())
+
+		v, err := e.Read(testKey)
+		Expect(err).To(BeNil())
+		Expect(len(v)).To(Equal(4))
+		Expect(v).To(Equal([][]byte{testValue1, testValue2, testValue3, testValue4}))
+
+		v, err = e.Read(testKey)
+		Expect(err).To(BeNil())
+		Expect(len(v)).To(Equal(0))
+
+		v, err = e.Read(testKey2)
+		Expect(err).To(BeNil())
+		Expect(len(v)).To(Equal(5))
+		Expect(v).To(Equal([][]byte{testValue1, testValue2, testValue3, testValue4, testValue1}))
+
+		v, err = e.Read(testKey2)
+		Expect(err).To(BeNil())
+		Expect(len(v)).To(Equal(0))
+	})
+
 	It("should only keep the last x values for the same key", func() {
 		var values [][]byte
-		for i := 0; i < e.limiter; i++ {
-			Expect(e.Update(testKey, testValue1)).To(BeNil())
-			values = append(values, testValue1)
+		for i := 0; i <= e.limiter; i++ {
+			value := []byte(fmt.Sprintf("test_value_%d", i))
+			Expect(e.Update(testKey, value)).To(BeNil())
+			values = append(values, value)
 		}
-		Expect(e.Update(testKey, testValue2)).To(BeNil())
-		values = append(values, testValue2)
+
 		v, err := e.Read(testKey)
 		Expect(err).To(BeNil())
 		Expect(len(v)).To(Equal(e.limiter))
-		assert.ElementsMatch(GinkgoT(), v, values[1:])
+
+		Expect(v).To(Equal(values[1:]))
 	})
+
+	It("Cache expiry", func() {
+		Expect(e.Update(testKey, testValue1)).To(BeNil())
+		time.Sleep(e.cleanupFreq)
+		Expect(e.Update(testKey, testValue2)).To(BeNil())
+		v, err := e.Read(testKey)
+		Expect(err).To(BeNil())
+		Expect(len(v)).To(Equal(1))
+		assert.ElementsMatch(GinkgoT(), v, [][]byte{testValue2})
+	})
 })
 })
```
