Skip to content

Commit

Permalink
Merge 6401a7c into 20c1b5a
Browse files Browse the repository at this point in the history
  • Loading branch information
Pliner committed Feb 4, 2019
2 parents 20c1b5a + 6401a7c commit 2c3295c
Show file tree
Hide file tree
Showing 22 changed files with 614 additions and 559 deletions.
60 changes: 12 additions & 48 deletions checker/worker/nodata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,20 @@ package worker

import (
"time"

w "github.com/moira-alert/moira/worker"
)

const nodataCheckerLockName = "moira-nodata-checker"
const nodataCheckerLockTTL = time.Second * 15

func (worker *Checker) noDataChecker(stop <-chan struct{}) error {
checkTicker := time.NewTicker(worker.Config.NoDataCheckInterval)
defer checkTicker.Stop()
worker.Logger.Info("NODATA checker started")
for {
select {
case <-stop:
checkTicker.Stop()
worker.Logger.Info("NODATA checker stopped")
return nil
case <-checkTicker.C:
Expand Down Expand Up @@ -39,53 +44,12 @@ func (worker *Checker) checkNoData() error {
// runNodataChecker starts NODATA checker and manages its subscription in Redis
// to make sure there is always only one working checker
func (worker *Checker) runNodataChecker() error {
var databaseMutexExpiry, singleCheckerStateExpiry time.Duration

databaseMutexExpiry = time.Second * 15
w.NewWorker(
"NOData checker",
worker.Logger,
worker.Database.NewLock(nodataCheckerLockName, nodataCheckerLockTTL),
worker.noDataChecker,
).Run(worker.tomb.Dying())

if worker.Config.NoDataCheckInterval > time.Minute {
singleCheckerStateExpiry = time.Second * 30
} else {
singleCheckerStateExpiry = worker.Config.NoDataCheckInterval / 2
}

stop := make(chan struct{})

firstCheck := true
go func() {
for {
if worker.Database.RegisterNodataCheckerIfAlreadyNot(databaseMutexExpiry) {
worker.Logger.Infof("Registered new NODATA checker, start checking triggers for NODATA")
go worker.noDataChecker(stop)
worker.renewRegistration(databaseMutexExpiry, stop)
continue
}
if firstCheck {
worker.Logger.Infof("NODATA checker already registered, trying for register every %v in loop", singleCheckerStateExpiry)
firstCheck = false
}
<-time.After(singleCheckerStateExpiry)
}
}()
return nil
}

// renewRegistration tries to renew NODATA-checker subscription
// and gracefully stops NODATA checker on fail to prevent multiple checkers running
func (worker *Checker) renewRegistration(ttl time.Duration, stop chan struct{}) {
renewTicker := time.NewTicker(ttl / 3)
for {
select {
case <-renewTicker.C:
if !worker.Database.RenewNodataCheckerRegistration() {
worker.Logger.Warningf("Could not renew registration for NODATA checker")
stop <- struct{}{}
return
}
case <-worker.tomb.Dying():
renewTicker.Stop()
stop <- struct{}{}
return
}
}
}
1 change: 0 additions & 1 deletion checker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func (worker *Checker) checkMetricEventsChannelLen(ch <-chan *moira.MetricEvent)

// Stop stops checks triggers
func (worker *Checker) Stop() error {
worker.Database.DeregisterNodataChecker()
worker.tomb.Kill(nil)
return worker.tomb.Wait()
}
1 change: 0 additions & 1 deletion cmd/notifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func main() {
if err := sender.RegisterSenders(database); err != nil {
logger.Fatalf("Can not configure senders: %s", err.Error())
}
defer database.DeregisterBots()

// Start moira self state checker
selfState := &selfstate.SelfCheckWorker{
Expand Down
9 changes: 9 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,12 @@ import "fmt"

// ErrNil return from database data storing methods if no object in DB
var ErrNil = fmt.Errorf("nil returned")

var (
// ErrLockAlreadyHeld is returned if we attempt to double acquire
ErrLockAlreadyHeld = fmt.Errorf("lock was already held")
// ErrLockAcquireInterrupted is returned if we cancel the acquire
ErrLockAcquireInterrupted = fmt.Errorf("lock's request was interrupted")
// ErrLockNotAcquired if we cannot acquire
ErrLockNotAcquired = fmt.Errorf("lock was not acquired")
)
49 changes: 1 addition & 48 deletions database/redis/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,9 @@ package redis

import (
"fmt"
"strings"
"time"

"github.com/garyburd/redigo/redis"
"gopkg.in/redsync.v1"

"github.com/moira-alert/moira/database"
"github.com/patrickmn/go-cache"
)

const (
botUsername = "moira-bot-host"
"strings"
)

// GetIDByUsername read ID of user by messenger username
Expand Down Expand Up @@ -50,44 +41,6 @@ func (connector *DbConnector) RemoveUser(messenger, username string) error {
return nil
}

// RegisterBotIfAlreadyNot creates registration of bot instance in redis
func (connector *DbConnector) RegisterBotIfAlreadyNot(messenger string, ttl time.Duration) bool {
mutex := connector.sync.NewMutex(usernameKey(messenger, botUsername), redsync.SetExpiry(ttl), redsync.SetTries(1))
if err := mutex.Lock(); err != nil {
return false
}
connector.messengersCache.Set(messenger, mutex, cache.NoExpiration)
return true
}

// RenewBotRegistration extends bot lock registrations for given ttl
func (connector *DbConnector) RenewBotRegistration(messenger string) bool {
mutexInterface, ok := connector.messengersCache.Get(messenger)
if !ok {
return false
}
mutex := mutexInterface.(*redsync.Mutex)
return mutex.Extend()
}

// DeregisterBots cancels registration for all registered messengers
func (connector *DbConnector) DeregisterBots() {
messengers := connector.messengersCache.Items()
for messenger := range messengers {
connector.DeregisterBot(messenger)
}
}

// DeregisterBot removes registration of bot instance in redis
func (connector *DbConnector) DeregisterBot(messenger string) bool {
mutexInterface, ok := connector.messengersCache.Get(messenger)
if !ok {
return false
}
mutex := mutexInterface.(*redsync.Mutex)
return mutex.Unlock()
}

func usernameKey(messenger, username string) string {
return fmt.Sprintf("moira-%s-users:%s", messenger, username)
}
170 changes: 1 addition & 169 deletions database/redis/bot_test.go
Original file line number Diff line number Diff line change
@@ -1,185 +1,20 @@
package redis

import (
"fmt"
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"
"testing"

"github.com/moira-alert/moira/database"
"github.com/moira-alert/moira/logging/go-logging"
)

func TestRenewBotRegistration(t *testing.T) {
logger, _ := logging.ConfigureLog("stdout", "debug", "test")
dataBase := NewDatabase(logger, config, Notifier)
dataBase.flush()
defer dataBase.flush()

lockTTLMilliseconds := 3000
lockTTLDuration := time.Duration(lockTTLMilliseconds) * time.Millisecond

var firstLockString string
var secondLockString string
var testLockString string
var err error

Convey("Manage bot registrations", t, func() {
Convey("No registrations to renew", func() {
renewed := dataBase.RenewBotRegistration(messenger3)
So(renewed, ShouldBeFalse)
})
Convey("Just register, should be registered", func() {
registered := dataBase.RegisterBotIfAlreadyNot(messenger3, lockTTLDuration)
So(registered, ShouldBeTrue)
})
Convey("This messenger should be a temp user, with auto generated string", func() {
firstLockString, err = dataBase.GetIDByUsername(messenger3, botUsername)
So(err, ShouldBeNil)
So(firstLockString, ShouldNotBeEmpty)
fmt.Println(firstLockString)
})
Convey("Accidentally called Renew bot registration, should not be renewed", func() {
lockResults := testLockWithTTLExpireErrorExpected(logger, lockTTLMilliseconds, true, 2, func() bool {
return dataBase.RenewBotRegistration(messenger3)
})
So(lockResults[len(lockResults)-1], ShouldBeFalse)
})
Convey("Register second messenger, should be as temp user, with new string", func() {
lockResults := testLockWithTTLExpireErrorExpected(logger, lockTTLMilliseconds, false, 3, func() bool {
return dataBase.RegisterBotIfAlreadyNot(messenger3, lockTTLDuration)
})
So(lockResults, ShouldContain, true)

secondLockString, err = dataBase.GetIDByUsername(messenger3, botUsername)
So(err, ShouldBeNil)
So(secondLockString, ShouldNotBeEmpty)
So(firstLockString, ShouldNotResemble, secondLockString)
fmt.Println(secondLockString)
})
Convey("Renew bot registration, should be renewed", func() {
testLockString, err = dataBase.GetIDByUsername(messenger3, botUsername)
So(err, ShouldBeNil)
So(firstLockString, ShouldNotBeEmpty)
So(secondLockString, ShouldResemble, testLockString)

renewed := dataBase.RenewBotRegistration(messenger3)
So(renewed, ShouldBeTrue)
})
Convey("Accidentally called Renew bot registration again, should not be renewed", func() {
lockResults := testLockWithTTLExpireErrorExpected(logger, lockTTLMilliseconds, true, 2, func() bool {
return dataBase.RenewBotRegistration(messenger3)
})
So(lockResults[len(lockResults)-1], ShouldBeFalse)
})
})
}

func TestBotDataStoring(t *testing.T) {
logger, _ := logging.ConfigureLog("stdout", "info", "test")
dataBase := newTestDatabase(logger, config)
dataBase.flush()
defer dataBase.flush()

Convey("Messengers manipulation", t, func() {
Convey("Register-deregister messenger", func() {
Convey("Nothing to deregister", func() {
unlocked := dataBase.DeregisterBot(messenger2)
So(unlocked, ShouldBeFalse)
})

Convey("Just register, should be registered", func() {
actual := dataBase.RegisterBotIfAlreadyNot(messenger2, time.Second*30)
So(actual, ShouldBeTrue)
})

var firstLockString string
Convey("This messenger should be a temp user, with auto generated string", func() {
firstLockString, _ = dataBase.GetIDByUsername(messenger2, botUsername)
fmt.Println(firstLockString)
So(firstLockString, ShouldNotBeEmpty)
})

Convey("Register same messenger, should be not registered", func() {
actual := dataBase.RegisterBotIfAlreadyNot(messenger2, time.Second*30)
So(actual, ShouldBeFalse)
})

Convey("DeregisterBot should deregister it", func() {
unlocked := dataBase.DeregisterBot(messenger2)
So(unlocked, ShouldBeTrue)
})

Convey("And Register it again, should be as temp user, with new string", func() {
actual := dataBase.RegisterBotIfAlreadyNot(messenger2, time.Second*30)
So(actual, ShouldBeTrue)

secondLockString, err := dataBase.GetIDByUsername(messenger2, botUsername)
fmt.Println(secondLockString)
So(err, ShouldBeNil)
So(secondLockString, ShouldNotBeEmpty)
So(firstLockString, ShouldNotResemble, secondLockString)
})

Convey("Now deregister it via DeregisterBots and check for nil returned", func() {
dataBase.DeregisterBots()
actual, err := dataBase.GetIDByUsername(messenger2, botUsername)
So(err, ShouldResemble, database.ErrNil)
So(actual, ShouldBeEmpty)
})
})

Convey("Register-deregister several messengers", func() {
dataBase.flush()

actual := dataBase.RegisterBotIfAlreadyNot(messenger1, time.Second*30)
So(actual, ShouldBeTrue)
actual = dataBase.RegisterBotIfAlreadyNot(messenger2, time.Second*30)
So(actual, ShouldBeTrue)
actual = dataBase.RegisterBotIfAlreadyNot(messenger3, time.Second*30)
So(actual, ShouldBeTrue)

Convey("All messengers should have temp user, with host name", func() {
actual, err := dataBase.GetIDByUsername(messenger1, botUsername)
So(err, ShouldBeNil)
So(actual, ShouldNotBeEmpty)

actual, err = dataBase.GetIDByUsername(messenger2, botUsername)
So(err, ShouldBeNil)
So(actual, ShouldNotBeEmpty)

actual, err = dataBase.GetIDByUsername(messenger3, botUsername)
So(err, ShouldBeNil)
So(actual, ShouldNotBeEmpty)
})

Convey("Now deregister one of messenges via DeregisterBot and check for deregistered flag and hostname in another", func() {
dataBase.DeregisterBot(messenger3)
actual, err := dataBase.GetIDByUsername(messenger3, botUsername)
So(err, ShouldResemble, database.ErrNil)
So(actual, ShouldBeEmpty)

actual, err = dataBase.GetIDByUsername(messenger1, botUsername)
So(err, ShouldBeNil)
So(actual, ShouldNotBeEmpty)

actual, err = dataBase.GetIDByUsername(messenger2, botUsername)
So(err, ShouldBeNil)
So(actual, ShouldNotBeEmpty)
})

Convey("Now call DeregisterBots and check two another for deregistered flag", func() {
dataBase.DeregisterBots()
actual, err := dataBase.GetIDByUsername(messenger1, botUsername)
So(err, ShouldResemble, database.ErrNil)
So(actual, ShouldBeEmpty)
actual, err = dataBase.GetIDByUsername(messenger2, botUsername)
So(err, ShouldResemble, database.ErrNil)
So(actual, ShouldBeEmpty)
})
})

Convey("Get-set usernames", func() {
Convey("Just set username to one of messengers", func() {
err := dataBase.SetUsernameID(messenger1, user1, "id1")
Expand Down Expand Up @@ -238,9 +73,6 @@ func TestBotDataStoringErrorConnection(t *testing.T) {

err = dataBase.RemoveUser(messenger2, user1)
So(err, ShouldNotBeNil)

actual2 := dataBase.RegisterBotIfAlreadyNot(messenger3, 30)
So(actual2, ShouldBeFalse)
})
}

Expand Down

0 comments on commit 2c3295c

Please sign in to comment.