Skip to content

Commit

Permalink
Merge 06c9e33 into 83affb6
Browse files Browse the repository at this point in the history
  • Loading branch information
Nixolay committed Aug 3, 2019
2 parents 83affb6 + 06c9e33 commit 3469af1
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 31 deletions.
25 changes: 25 additions & 0 deletions index/actualizer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package index

import (
"errors"
"testing"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -48,6 +49,21 @@ func TestIndex_actualize(t *testing.T) {
So(docCount, ShouldEqual, int64(18))
})

Convey("Test verification of error handling when receiving: FetchTriggersToReindex", func() {
expected := errors.New("err fetch trigers to reindex")
dataBase.EXPECT().FetchTriggersToReindex(fakeTS).Return(nil, expected)

err := index.actualizeIndex()
So(err, ShouldResemble, expected)
})

Convey("Test handling nil len: triggerToReindexIDs", func() {
dataBase.EXPECT().FetchTriggersToReindex(fakeTS).Return(nil, nil)

err := index.actualizeIndex()
So(err, ShouldBeNil)
})

Convey("Test addition", func() {
dataBase.EXPECT().FetchTriggersToReindex(fakeTS).Return(triggerIDs[18:20], nil)
dataBase.EXPECT().GetTriggerChecks(triggerIDs[18:20]).Return(triggerChecksPointers[18:20], nil)
Expand All @@ -67,5 +83,14 @@ func TestIndex_actualize(t *testing.T) {
docCount, _ := index.triggerIndex.GetCount()
So(docCount, ShouldEqual, int64(20))
})

Convey("Test verification of error handling when receiving: GetTriggerChecks", func() {
expected := errors.New("test error GetTriggerChecks")
dataBase.EXPECT().FetchTriggersToReindex(fakeTS).Return(triggerIDs[10:12], nil)
dataBase.EXPECT().GetTriggerChecks(triggerIDs[10:12]).Return(nil, expected)

err := index.actualizeIndex()
So(err, ShouldNotBeNil)
})
})
}
60 changes: 31 additions & 29 deletions index/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func (index *Index) getTriggerChecksBatches(triggerIDsBatches [][]string) (trigg
go func(batch []string) {
defer wg.Done()
newBatch, err := index.database.GetTriggerChecks(batch)
if err != nil {
index.logger.Errorf("Cannot get trigger checks from DB: %s", err.Error())
newBatch, err = index.database.GetTriggerChecks(batch)
}
if err != nil {
errors <- err
return
Expand All @@ -41,38 +45,36 @@ func (index *Index) getTriggerChecksBatches(triggerIDsBatches [][]string) (trigg
func (index *Index) handleTriggerBatches(triggerChecksChan chan []*moira.TriggerCheck, getTriggersErrors chan error, toIndex int) error {
indexErrors := make(chan error)
wg := &sync.WaitGroup{}
defer wg.Wait()
var count int64
func() {
for {
select {
case batch, ok := <-triggerChecksChan:
if !ok {

for {
select {
case batch, ok := <-triggerChecksChan:
if !ok {
return nil
}
wg.Add(1)
go func(b []*moira.TriggerCheck) {
defer wg.Done()
err2 := index.triggerIndex.Write(b)
atomic.AddInt64(&count, int64(len(b)))
if err2 != nil {
indexErrors <- err2
return
}
wg.Add(1)
go func(b []*moira.TriggerCheck) {
defer wg.Done()
err2 := index.triggerIndex.Write(b)
atomic.AddInt64(&count, int64(len(b)))
if err2 != nil {
indexErrors <- err2
return
}
index.logger.Debugf("[%d triggers of %d] added to index", count, toIndex)
}(batch)
case err, ok := <-getTriggersErrors:
if ok {
index.logger.Errorf("Cannot get trigger checks from DB: %s", err.Error())
}
return
case err, ok := <-indexErrors:
if ok {
index.logger.Errorf("Cannot index trigger checks: %s", err.Error())
}
return
index.logger.Debugf("[%d triggers of %d] added to index", count, toIndex)
}(batch)
case err, ok := <-getTriggersErrors:
if ok {
index.logger.Errorf("Cannot get trigger checks from DB: %s", err.Error())
}
return err
case err, ok := <-indexErrors:
if ok {
index.logger.Errorf("Cannot index trigger checks: %s", err.Error())
}
return err
}
}()
wg.Wait()
return nil
}
}
2 changes: 1 addition & 1 deletion index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (index *Index) Start() error {
index.tomb.Go(index.checkIndexActualizationLag)
index.tomb.Go(index.checkIndexedTriggersCount)

return err
return nil
}

// IsReady returns boolean value which determines if index is ready to use
Expand Down
10 changes: 10 additions & 0 deletions index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ func TestIndex_CreateAndFill(t *testing.T) {
So(docCount, ShouldEqual, int64(32))
})

Convey("Test check error handling in the handleTriggerBatches", t, func() {
index := NewSearchIndex(logger, dataBase)

dataBase.EXPECT().GetTriggerChecks(triggerIDs[:20]).Return(triggerChecksPointers[:20], nil)
dataBase.EXPECT().GetTriggerChecks(triggerIDs[20:]).Return(triggerChecksPointers[20:], fmt.Errorf("test"))
dataBase.EXPECT().GetTriggerChecks(triggerIDs[20:]).Return(triggerChecksPointers[20:], fmt.Errorf("test"))
err := index.writeByBatches(triggerIDs, 20)
So(err, ShouldNotBeNil)
})

Convey("Test add Triggers to index where triggers are already presented", t, func() {
index := NewSearchIndex(logger, dataBase)

Expand Down
2 changes: 1 addition & 1 deletion vendor/vendor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1605,4 +1605,4 @@
}
],
"rootPath": "github.com/moira-alert/moira"
}
}

0 comments on commit 3469af1

Please sign in to comment.