feat(database): Patterns set iteration
Added a set iterator to use instead of SMEMBERS, so the patterns set is
fetched in batches rather than in a single request.
kodmi committed Mar 11, 2020
1 parent ee9f7b5 commit 891c0ff
Showing 10 changed files with 284 additions and 6 deletions.
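Before the file-by-file diff, here is how the new iterator is meant to be driven. This is an illustrative sketch rather than code from the commit; `readAll`, `pool`, and `process` are placeholder names for a redigo pool and the caller's handling logic, not identifiers from the repository:

```go
// readAll drains a set with the new SetIterator from database/redis/iterator.go.
// pool and process are assumed placeholders, not identifiers from the repository.
func readAll(pool *redis.Pool, setName string, process func(string)) error {
    iter := SetIterator{
        conn:       pool.Get(),
        setName:    setName,
        dbIterator: "0", // SSCAN cursors always start at "0"
    }
    defer iter.Close()

    for {
        val, err := iter.Next()
        if err == ErrFinished {
            return nil // the whole set has been scanned
        }
        if err != nil {
            return err
        }
        process(val)
    }
}
```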
120 changes: 120 additions & 0 deletions database/redis/iterator.go
@@ -0,0 +1,120 @@
package redis

import (
"errors"
"fmt"

"github.com/gomodule/redigo/redis"
)

// ErrFinished is returned by Next once the iterator has yielded all values
var ErrFinished = errors.New("redis iterator reached its end")

const defaultBatchSize = uint64(20)

// SetIterator scans a set, returning all of its values.
// It may return duplicate values.
// A value may be omitted if it was not constantly present in the set during the full iteration.
type SetIterator struct {
conn redis.Conn
setName string
dbIterator string
batchSize uint64

values []string
currIndex int
isFinished bool
}

// Next returns the next iterator value.
// Returns an empty string and ErrFinished when there are no more values.
// Returns an empty string and an error if fetching values from the db fails.
func (i *SetIterator) Next() (string, error) {
val, err := i.nextValue()
if err == nil {
return val, err
}
if i.isFinished {
i.Close()
return "", ErrFinished
}
// SSCAN may return empty batches; keep scanning until we get values or the cursor is exhausted
for {
i.dbIterator, i.values, err = i.receiveBatch()
if err != nil {
return "", fmt.Errorf("scanning the %s set failed, error: %v", i.setName, err)
}

i.currIndex = 0
i.isFinished = i.dbIterator == "0"

if len(i.values) != 0 || i.isFinished {
break
}
}

return i.nextValue()
}

// ReadToEnd iterates over the whole set and returns all collected values
func (i *SetIterator) ReadToEnd() ([]string, error) {
setSize, err := redis.Int(i.conn.Do("SCARD", i.setName))
if err != nil {
return nil, fmt.Errorf("failed to get the size of the %s set, error: %v", i.setName, err)
}
values := make([]string, 0, setSize)

for {
val, err := i.Next()
if err == ErrFinished {
break
}
if err != nil {
return nil, err
}
values = append(values, val)
}

return values, nil
}

// Close terminates the iterator's db connection.
// The iterator closes itself automatically once it has been read to the end.
func (i *SetIterator) Close() error {
err := i.conn.Close()
// the connection may already be closed after a completed iteration; treat that as a no-op
if err == nil || err.Error() == "redigo: closed" {
return nil
}
return err
}

func (i *SetIterator) nextValue() (string, error) {
if i.currIndex >= len(i.values) {
return "", ErrFinished
}
val := i.values[i.currIndex]
i.currIndex++
return val, nil
}

func (i *SetIterator) receiveBatch() (next string, values []string, err error) {
response, err := redis.Values(i.conn.Do("SSCAN", i.setName, i.dbIterator, "COUNT", i.getBatchSize()))
if err != nil {
return
}
next, err = redis.String(response[0], err)
if err != nil {
return
}
values, err = redis.Strings(response[1], err)
if err != nil {
return
}
return
}

func (i *SetIterator) getBatchSize() uint64 {
if i.batchSize == 0 {
return defaultBatchSize
}
return i.batchSize
}
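For context, receiveBatch above wraps the standard Redis SSCAN cursor protocol. Done by hand with redigo, the same scan would look roughly like the sketch below (illustrative only, not code from the commit; it assumes the same imports as iterator.go):

```go
func scanSet(conn redis.Conn, setName string) ([]string, error) {
    var result []string
    cursor := "0"
    for {
        // SSCAN replies with [next-cursor, [values...]]
        reply, err := redis.Values(conn.Do("SSCAN", setName, cursor, "COUNT", 20))
        if err != nil {
            return nil, err
        }
        cursor, err = redis.String(reply[0], nil)
        if err != nil {
            return nil, err
        }
        batch, err := redis.Strings(reply[1], nil)
        if err != nil {
            return nil, err
        }
        result = append(result, batch...)
        if cursor == "0" { // a zero cursor means the scan is complete
            return result, nil
        }
    }
}
```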
126 changes: 126 additions & 0 deletions database/redis/iterator_test.go
@@ -0,0 +1,126 @@
package redis

import (
"testing"

"github.com/gomodule/redigo/redis"
"github.com/op/go-logging"
. "github.com/smartystreets/goconvey/convey"
)

const setName = "test-set"

func TestSetIterator(t *testing.T) {
logger, _ := logging.GetLogger("dataBase")
db := newTestDatabase(logger, config)
db.flush()
defer db.flush()

Convey("Set iteration", t, func() {
db.clearSet()

Convey("Empty set", func() {
iter := db.createIterator()

val, err := iter.Next()

So(val, ShouldBeEmpty)
So(err, ShouldEqual, ErrFinished)
})

Convey("Single item", func() {
item := "foo"
db.addToSet(item)
iter := db.createIterator()

val, err := iter.Next()
So(err, ShouldBeNil)
So(val, ShouldEqual, item)

_, err = iter.Next()
So(err, ShouldEqual, ErrFinished)
})

Convey("Multiple items, iterate via Next", func() {
db.fillSetWithAlphabet()
iter := db.createIterator()
values := []string{}

for {
val, err := iter.Next()
if err != nil {
So(err, ShouldEqual, ErrFinished)
break
}
values = append(values, val)
}

So(values, ShouldHaveLength, 26) // 26 letters of the alphabet
})

Convey("Multiple items, read to end", func() {
db.fillSetWithAlphabet()
iter := db.createIterator()

values, err := iter.ReadToEnd()

So(err, ShouldBeNil)
So(values, ShouldHaveLength, 26) // 26 letters of the alphabet
})

Convey("Multiple items, read to end with preset batch size", func() {
db.fillSetWithAlphabet()
iter := db.createIterator()
iter.batchSize = 60

values, err := iter.ReadToEnd()

So(err, ShouldBeNil)
So(values, ShouldHaveLength, 26) // 26 letters of the alphabet
})

Convey("Close returns nil error, after iterator is closed", func() {
iter := db.createIterator()

_, err := iter.Next()
So(err, ShouldEqual, ErrFinished)

err = iter.Close()
So(err, ShouldBeNil)
})
})
}

func (c *DbConnector) clearSet() {
conn := c.pool.Get()
defer conn.Close()
values, err := redis.Strings(conn.Do("SMEMBERS", setName))
if err != nil {
panic(err)
}
for _, val := range values {
if _, err := conn.Do("SREM", setName, val); err != nil {
panic(err)
}
}
}

func (c *DbConnector) createIterator() SetIterator {
return SetIterator{
conn: c.pool.Get(),
setName: setName,
dbIterator: "0",
}
}

func (c *DbConnector) addToSet(value string) {
conn := c.pool.Get()
defer conn.Close()
_, err := redis.Int64(conn.Do("SADD", setName, value))
if err != nil {
panic(err)
}
}

func (c *DbConnector) fillSetWithAlphabet() {
for i := int32('A'); i <= int32('Z'); i++ {
c.addToSet(string(rune(i)))
}
}
17 changes: 16 additions & 1 deletion database/redis/metric.go
@@ -13,7 +13,22 @@ import (
"github.com/patrickmn/go-cache"
)

// GetPatterns gets updated patterns array
// IteratePatterns incrementally iterates the patterns set and returns the results
// Keep in mind that duplicates are possible
func (connector *DbConnector) IteratePatterns() ([]string, error) {
c := connector.pool.Get()
iter := SetIterator{
conn: c,
dbIterator: "0",
setName: patternsListKey,
batchSize: 500,
}
defer iter.Close()

return iter.ReadToEnd()
}

// GetPatterns fetches the whole patterns set at once
func (connector *DbConnector) GetPatterns() ([]string, error) {
c := connector.pool.Get()
defer c.Close()
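The collapsed remainder of GetPatterns above presumably boils down to a single SMEMBERS call, which is exactly what the commit message says IteratePatterns now replaces for the filter. Its assumed shape is roughly:

```go
// Assumed shape only; the actual body of GetPatterns is collapsed in this diff.
func (connector *DbConnector) GetPatterns() ([]string, error) {
    c := connector.pool.Get()
    defer c.Close()
    return redis.Strings(c.Do("SMEMBERS", patternsListKey))
}
```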
2 changes: 1 addition & 1 deletion filter/matched_metrics/metrics.go
@@ -43,7 +43,7 @@ func (matcher *MetricsMatcher) Start(matchedMetricsChan chan *moira.MatchedMetri
matcher.metrics.SavingTimer.UpdateSince(timer)
}
}()
matcher.logger.Infof("Moira Filter Metrics Matcher started to save %d cached metrics every %s", matcher.cacheCapacity, time.Second.Seconds())
matcher.logger.Infof("Moira Filter Metrics Matcher started to save %d cached metrics every %v seconds", matcher.cacheCapacity, time.Second.Seconds())
}

func (matcher *MetricsMatcher) receiveBatch(metrics <-chan *moira.MatchedMetric) <-chan map[string]*moira.MatchedMetric {
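The one-line change in matched_metrics/metrics.go above fixes the log format verb: time.Second.Seconds() is a float64, and printing a float64 with %s yields a fmt error marker instead of a number. A quick standalone illustration (not from the repository):

```go
package main

import (
    "fmt"
    "time"
)

func main() {
    secs := time.Second.Seconds() // float64(1)
    fmt.Printf("every %s\n", secs)         // every %!s(float64=1)
    fmt.Printf("every %v seconds\n", secs) // every 1 seconds
}
```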
2 changes: 1 addition & 1 deletion filter/patterns_storage.go
@@ -30,7 +30,7 @@ func NewPatternStorage(database moira.Database, metrics *metrics.FilterMetrics,

// Refresh builds the pattern indexes from redis data
func (storage *PatternStorage) Refresh() error {
newPatterns, err := storage.database.GetPatterns()
newPatterns, err := storage.database.IteratePatterns()
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions filter/patterns_storage_test.go
@@ -22,13 +22,13 @@ func TestProcessIncomingMetric(t *testing.T) {
logger, _ := logging.GetLogger("Scheduler")

Convey("Create new pattern storage, GetPatterns returns error, should error", t, func() {
database.EXPECT().GetPatterns().Return(nil, fmt.Errorf("some error here"))
database.EXPECT().IteratePatterns().Return(nil, fmt.Errorf("some error here"))
metrics := metrics.ConfigureFilterMetrics(metrics.NewDummyRegistry())
_, err := NewPatternStorage(database, metrics, logger)
So(err, ShouldBeError, fmt.Errorf("some error here"))
})

database.EXPECT().GetPatterns().Return(testPatterns, nil)
database.EXPECT().IteratePatterns().Return(testPatterns, nil)
patternsStorage, err := NewPatternStorage(database, metrics.ConfigureFilterMetrics(metrics.NewDummyRegistry()), logger)

Convey("Create new pattern storage, should no error", t, func() {
1 change: 1 addition & 0 deletions helpers.go
@@ -192,6 +192,7 @@ func ChunkSlice(original []string, chunkSize int) (divided [][]string) {
return
}

// RoundToNearestRetention rounds a given unix timestamp to the nearest multiple of retention
func RoundToNearestRetention(ts, retention int64) int64 {
return (ts + retention/2) / retention * retention
}
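RoundToNearestRetention relies on integer arithmetic, so the midpoint between two retention multiples rounds up. A small standalone check (the helper is reproduced here only for illustration):

```go
package main

import "fmt"

// roundToNearestRetention mirrors moira.RoundToNearestRetention for illustration.
func roundToNearestRetention(ts, retention int64) int64 {
    return (ts + retention/2) / retention * retention
}

func main() {
    fmt.Println(roundToNearestRetention(125, 60)) // 120: closer to 120 than to 180
    fmt.Println(roundToNearestRetention(150, 60)) // 180: the midpoint rounds up
    fmt.Println(roundToNearestRetention(179, 60)) // 180
}
```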
1 change: 1 addition & 0 deletions interfaces.go
@@ -83,6 +83,7 @@ type Database interface {

// Patterns and metrics storing
GetPatterns() ([]string, error)
IteratePatterns() ([]string, error)
AddPatternMetric(pattern, metric string) error
GetPatternMetrics(pattern string) ([]string, error)
RemovePattern(pattern string) error
15 changes: 15 additions & 0 deletions mock/moira-alert/database.go

Generated mock file; the diff is not rendered by default.

2 changes: 1 addition & 1 deletion perfomance_tests/filter/filter_test.go
@@ -41,7 +41,7 @@ func BenchmarkProcessIncomingMetric(b *testing.B) {
database := mock_moira_alert.NewMockDatabase(mockCtrl)
logger, _ := logging.GetLogger("Benchmark")

database.EXPECT().GetPatterns().Return(patterns, nil)
database.EXPECT().IteratePatterns().Return(patterns, nil)
patternsStorage, err := filter.NewPatternStorage(database, filterMetrics, logger)
if err != nil {
b.Errorf("Can not create new cache storage %s", err)
