Skip to content

Commit

Permalink
Merge 00b9824 into a19b756
Browse files Browse the repository at this point in the history
  • Loading branch information
A1bemuth committed Mar 16, 2020
2 parents a19b756 + 00b9824 commit 9d73e31
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 10 deletions.
6 changes: 4 additions & 2 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type RedisConfig struct {
// Redis node port
Port string `yaml:"port"`
// Redis database
DB int `yaml:"dbid"`
ConnectionLimit int `yaml:"connection_limit"`
DB int `yaml:"dbid"`
ConnectionLimit int `yaml:"connection_limit"`
AllowSlaveReads bool `yaml:"allow_slave_reads"`
}

// GetSettings returns redis config parsed from moira config files
Expand All @@ -41,6 +42,7 @@ func (config *RedisConfig) GetSettings() redis.Config {
Port: config.Port,
DB: config.DB,
ConnectionLimit: config.ConnectionLimit,
AllowSlaveReads: config.AllowSlaveReads,
}
}

Expand Down
1 change: 1 addition & 0 deletions database/redis/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ type Config struct {
Port string
DB int
ConnectionLimit int
AllowSlaveReads bool
}
54 changes: 49 additions & 5 deletions database/redis/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ type DbConnector struct {
metricsCache *cache.Cache
sync *redsync.Redsync
source DBSource

slaveConnector *DbConnector
}

// NewDatabase creates Redis pool based on config
func NewDatabase(logger moira.Logger, config Config, source DBSource) *DbConnector {
poolDialer := newPoolDialer(logger, config)
poolDialer, slaveDialer := createPoolDialers(logger, config)

pool := &redis.Pool{
MaxIdle: config.ConnectionLimit,
Expand All @@ -68,8 +70,19 @@ func NewDatabase(logger moira.Logger, config Config, source DBSource) *DbConnect
Dial: poolDialer.Dial,
TestOnBorrow: poolDialer.Test,
}
var slavePool *redis.Pool
if slaveDialer != nil {
slavePool = &redis.Pool{
MaxIdle: config.ConnectionLimit,
MaxActive: config.ConnectionLimit,
Wait: true,
IdleTimeout: 240 * time.Second,
Dial: slaveDialer.Dial,
TestOnBorrow: slaveDialer.Test,
}
}

return &DbConnector{
connector := &DbConnector{
pool: pool,
logger: logger,
retentionCache: cache.New(cacheValueExpirationDuration, cacheCleanupInterval),
Expand All @@ -78,12 +91,35 @@ func NewDatabase(logger moira.Logger, config Config, source DBSource) *DbConnect
sync: redsync.New([]redsync.Pool{syncPool}),
source: source,
}

if slavePool != nil {
slaveConnector := &DbConnector{
pool: slavePool,
retentionCache: connector.retentionCache,
retentionSavingCache: connector.retentionSavingCache,
metricsCache: connector.metricsCache,
sync: connector.sync,
source: connector.source,
}
connector.slaveConnector = slaveConnector
}

return connector
}

// AllowStale returns a database instance, that prioritizes connections to slave nodes
// Should only be used for read accesses when data actuality is not needed
func (connector *DbConnector) AllowStale() moira.Database {
if connector.slaveConnector == nil {
return connector
}
return connector.slaveConnector
}

func newPoolDialer(logger moira.Logger, config Config) PoolDialer {
func createPoolDialers(logger moira.Logger, config Config) (mainDialer, slaveDialer PoolDialer) {
if config.MasterName != "" && len(config.SentinelAddresses) > 0 {
logger.Infof("Redis: Sentinel for name: %v, DB: %v", config.MasterName, config.DB)
return NewSentinelPoolDialer(
sentinelDialer := NewSentinelPoolDialer(
logger,
SentinelPoolDialerConfig{
MasterName: config.MasterName,
Expand All @@ -92,15 +128,23 @@ func newPoolDialer(logger moira.Logger, config Config) PoolDialer {
DialTimeout: dialTimeout,
},
)
if !config.AllowSlaveReads {
return sentinelDialer, nil
}
logger.Info("Redis: Sentinel slaves pooling enabled")
slaveDialer := NewSentinelSlavePoolDialer(sentinelDialer)

return sentinelDialer, slaveDialer
}

serverAddr := net.JoinHostPort(config.Host, config.Port)
logger.Infof("Redis: %v, DB: %v", serverAddr, config.DB)
return &DirectPoolDialer{
mainDialer = &DirectPoolDialer{
serverAddress: serverAddr,
db: config.DB,
dialTimeout: dialTimeout,
}
return
}

func (connector *DbConnector) makePubSubConnection(channel string) (*redis.PubSubConn, error) {
Expand Down
58 changes: 58 additions & 0 deletions database/redis/database_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,64 @@ func (dialer *SentinelPoolDialer) Test(c redis.Conn, t time.Time) error {
return nil
}

//NewSentinelPoolDialer returns new SentinelPoolDialer
func NewSentinelSlavePoolDialer(sentinelDialer *SentinelPoolDialer) *SentinelSlavePoolDialer {
slaveDialer := &SentinelSlavePoolDialer{
SentinelPoolDialer: sentinelDialer,
}
return slaveDialer
}

// SentinelSlavePoolDialer connects to Redis via sentinel prioritizing slave servers
type SentinelSlavePoolDialer struct {
*SentinelPoolDialer
}

// Dial tries connecting to slaves
// If there are no slaves available, a connection to master is returned
func (dialer *SentinelSlavePoolDialer) Dial() (redis.Conn, error) {
slaves, err := dialer.sentinel.SlaveAddrs()
if err != nil {
return nil, err
}
if len(slaves) == 0 {
dialer.logger.Debug("No redis slaves available, connecting to master")
return dialer.SentinelPoolDialer.Dial()
}

var conn redis.Conn
for _, slaveAddr := range slaves {
conn, err = redis.Dial(
"tcp",
slaveAddr,
redis.DialDatabase(dialer.config.DB),
redis.DialConnectTimeout(dialer.config.DialTimeout),
)
if err == nil {
dialer.logger.Debugf("Connected to slave node %s", slaveAddr)
break
} else {
dialer.logger.Warningf("Connecting to slave %s failed, error: %s", slaveAddr, err.Error())
}
}
if err != nil {
return dialer.SentinelPoolDialer.Dial()
}

// required for redis cluster, but will fail for simple replicas
_, err = redis.String(conn.Do("READONLY"))
if err != nil && err.Error() != "ERR This instance has cluster support disabled" {
dialer.logger.Warning("Switching to readonly mode failed, error: %s", err.Error())
}

return conn, nil
}

// Test checks if connection is alive
func (dialer *SentinelSlavePoolDialer) Test(c redis.Conn, t time.Time) error {
return c.Err()
}

func (dialer *SentinelPoolDialer) discoverLoop() {
checkTicker := time.NewTicker(30 * time.Second)
defer checkTicker.Stop()
Expand Down
70 changes: 70 additions & 0 deletions database/redis/database_dialer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package redis

import (
"testing"
"time"

"github.com/moira-alert/moira/logging/go-logging"
. "github.com/smartystreets/goconvey/convey"
)

var logger, _ = logging.ConfigureLog("stdout", "debug", "test")

var sentinelConfig = SentinelPoolDialerConfig{
MasterName: "master01",
DB: 0,
DialTimeout: time.Millisecond,
SentinelAddresses: []string{
"fake-sentinel:26379",
},
}

func TestDirectDialer(t *testing.T) {
Convey("Direct dialer", t, func() {
Convey("Tries dial and fails", func() {
dialer := DirectPoolDialer {
serverAddress: "fake-redis:6379",
db: 0,
dialTimeout: time.Millisecond,
}

_, err := dialer.Dial()

So(err.Error(), ShouldEqual, "dial tcp: i/o timeout")
})

Convey("Test dials successfully", func() {
dialer := DirectPoolDialer {
serverAddress: "localhost:6379",
db: 0,
dialTimeout: 5 * time.Second,
}

conn, err := dialer.Dial()

So(err, ShouldBeNil)

err = dialer.Test(conn, time.Now())
So(err, ShouldBeNil)
})
})
}

func TestSentinelDialer(t *testing.T) {
dialer := NewSentinelPoolDialer(logger, sentinelConfig)

Convey("Tries dial and fails", t, func() {
_, err := dialer.Dial()
So(err.Error(), ShouldEqual, "redigo: no sentinels available; last error: dial tcp: i/o timeout")
})
}

func TestSlaveDialer(t *testing.T) {
dialer := NewSentinelPoolDialer(logger, sentinelConfig)
slaveDialer := NewSentinelSlavePoolDialer(dialer)

Convey("Tries dial and fails", t, func() {
_, err := slaveDialer.Dial()
So(err.Error(), ShouldEqual, "redigo: no sentinels available; last error: dial tcp: i/o timeout")
})
}
33 changes: 33 additions & 0 deletions database/redis/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,36 @@ func TestInitialization(t *testing.T) {
So(err, ShouldNotBeNil)
})
}

func TestAllowStale(t *testing.T) {
logger, _ := logging.ConfigureLog("stdout", "info", "test")

Convey("Allow stale", t, func() {
Convey("When using redis directly, returns same db", func() {
database := newTestDatabase(logger, emptyConfig)

staleDatabase := database.AllowStale()

So(staleDatabase, ShouldPointTo, database)
})

Convey("When using sentinel, returns slave db instance, retains references", func() {
sentinelConfig := Config{
MasterName: "mstr",
SentinelAddresses: []string{"addr.ru"},
DB: 0,
AllowSlaveReads: true,
}
database := newTestDatabase(logger, sentinelConfig)

staleDatabase := database.AllowStale()

So(staleDatabase, ShouldNotPointTo, database)
staleConnector := staleDatabase.(*DbConnector)
So(staleConnector.metricsCache, ShouldPointTo, database.metricsCache)
So(staleConnector.retentionCache, ShouldPointTo, database.retentionCache)
So(staleConnector.retentionSavingCache, ShouldPointTo, database.retentionSavingCache)
So(staleConnector.sync, ShouldPointTo, database.sync)
})
})
}
4 changes: 2 additions & 2 deletions database/redis/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,8 @@ func (connector *DbConnector) getTriggerWithTags(triggerRaw interface{}, tagsRaw
return trigger, nil
}

func (connector *DbConnector) cleanupPatternsOutOfUse(pattern []string) error {
for _, pattern := range pattern {
func (connector *DbConnector) cleanupPatternsOutOfUse(patterns []string) error {
for _, pattern := range patterns {
triggerIDs, err := connector.GetPatternTriggerIDs(pattern)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion filter/patterns_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewPatternStorage(database moira.Database, metrics *metrics.FilterMetrics,

// Refresh builds pattern's indexes from redis data
func (storage *PatternStorage) Refresh() error {
newPatterns, err := storage.database.GetPatterns()
newPatterns, err := storage.database.AllowStale().GetPatterns()
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions filter/patterns_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func TestProcessIncomingMetric(t *testing.T) {

mockCtrl := gomock.NewController(t)
database := mock_moira_alert.NewMockDatabase(mockCtrl)
database.EXPECT().AllowStale().AnyTimes().Return(database)
logger, _ := logging.GetLogger("Scheduler")

Convey("Create new pattern storage, GetPatterns returns error, should error", t, func() {
Expand Down
3 changes: 3 additions & 0 deletions interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (

// Database implements DB functionality
type Database interface {
// Get database instance for requests that do not require realtime data
AllowStale() Database

// SelfState
UpdateMetricsHeartbeat() error
GetMetricsUpdatesCount() (int64, error)
Expand Down
14 changes: 14 additions & 0 deletions mock/moira-alert/database.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions perfomance_tests/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func BenchmarkProcessIncomingMetric(b *testing.B) {

mockCtrl := gomock.NewController(b)
database := mock_moira_alert.NewMockDatabase(mockCtrl)
database.EXPECT().AllowStale().AnyTimes().Return(database)
logger, _ := logging.GetLogger("Benchmark")

database.EXPECT().GetPatterns().Return(patterns, nil)
Expand Down

0 comments on commit 9d73e31

Please sign in to comment.