Skip to content

Commit

Permalink
Merge bbac2cc into e476540
Browse files Browse the repository at this point in the history
  • Loading branch information
Pliner committed Feb 16, 2019
2 parents e476540 + bbac2cc commit d4db077
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 28 deletions.
4 changes: 2 additions & 2 deletions checker/worker/lazy_triggers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (worker *Checker) fillLazyTriggerIDs() error {
for _, triggerID := range triggerIDs {
newLazyTriggerIDs[triggerID] = true
}
worker.lazyTriggerIDs = newLazyTriggerIDs
worker.Metrics.UnusedTriggersCount.Update(int64(len(worker.lazyTriggerIDs)))
worker.lazyTriggerIDs.Store(newLazyTriggerIDs)
worker.Metrics.UnusedTriggersCount.Update(int64(len(newLazyTriggerIDs)))
return nil
}

Expand Down
35 changes: 15 additions & 20 deletions checker/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,37 +46,32 @@ func (worker *Checker) handleMetricEvent(pattern string) error {
}

func (worker *Checker) addTriggerIDsIfNeeded(triggerIDs []string) {
needToCheckTriggerIDs := make([]string, len(triggerIDs))
for _, triggerID := range triggerIDs {
if worker.needHandleTrigger(triggerID) {
needToCheckTriggerIDs = append(needToCheckTriggerIDs, triggerID)
}
}
needToCheckTriggerIDs := worker.getTriggerIDsToCheck(triggerIDs)
if len(needToCheckTriggerIDs) > 0 {
worker.Database.AddLocalTriggersToCheck(needToCheckTriggerIDs)
}
}

func (worker *Checker) addRemoteTriggerIDsIfNeeded(triggerIDs []string) {
needToCheckRemoteTriggerIDs := make([]string, len(triggerIDs))
for _, triggerID := range triggerIDs {
if worker.needHandleTrigger(triggerID) {
needToCheckRemoteTriggerIDs = append(needToCheckRemoteTriggerIDs, triggerID)
}
}
needToCheckRemoteTriggerIDs := worker.getTriggerIDsToCheck(triggerIDs)
if len(needToCheckRemoteTriggerIDs) > 0 {
worker.Database.AddRemoteTriggersToCheck(needToCheckRemoteTriggerIDs)
}
}

func (worker *Checker) needHandleTrigger(triggerID string) bool {
if _, ok := worker.lazyTriggerIDs[triggerID]; ok {
randomDuration := worker.getRandomLazyCacheDuration()
err := worker.LazyTriggersCache.Add(triggerID, true, randomDuration)
if err != nil {
return false
func (worker *Checker) getTriggerIDsToCheck(triggerIDs []string) []string {
lazyTriggerIDs := worker.lazyTriggerIDs.Load().(map[string]bool)
triggerIDsToCheck := make([]string, len(triggerIDs))
for _, triggerID := range triggerIDs {
if _, ok := lazyTriggerIDs[triggerID]; ok {
randomDuration := worker.getRandomLazyCacheDuration()
if err := worker.LazyTriggersCache.Add(triggerID, true, randomDuration); err != nil {
continue
}
}
if err := worker.TriggerCache.Add(triggerID, true, cache.DefaultExpiration); err == nil {
triggerIDsToCheck = append(triggerIDsToCheck, triggerID)
}
}
err := worker.TriggerCache.Add(triggerID, true, cache.DefaultExpiration)
return err == nil
return triggerIDsToCheck
}
7 changes: 4 additions & 3 deletions checker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package worker

import (
"runtime"
"sync/atomic"
"time"

"github.com/moira-alert/moira/metric_source"
metricSource "github.com/moira-alert/moira/metric_source"
"github.com/moira-alert/moira/metric_source/remote"
"github.com/patrickmn/go-cache"
"gopkg.in/tomb.v2"
Expand All @@ -25,7 +26,7 @@ type Checker struct {
TriggerCache *cache.Cache
LazyTriggersCache *cache.Cache
PatternCache *cache.Cache
lazyTriggerIDs map[string]bool
lazyTriggerIDs atomic.Value
lastData int64
tomb tomb.Tomb
remoteEnabled bool
Expand All @@ -45,7 +46,7 @@ func (worker *Checker) Start() error {
return err
}

worker.lazyTriggerIDs = make(map[string]bool)
worker.lazyTriggerIDs.Store(make(map[string]bool))
worker.tomb.Go(worker.lazyTriggersWorker)

worker.tomb.Go(worker.runNodataChecker)
Expand Down
7 changes: 4 additions & 3 deletions filter/patterns_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"path"
"strconv"
"strings"
"sync/atomic"
"time"
"unicode"

Expand All @@ -20,7 +21,7 @@ type PatternStorage struct {
database moira.Database
metrics *graphite.FilterMetrics
logger moira.Logger
PatternTree *patternNode
PatternTree atomic.Value
}

// patternNode contains pattern node
Expand Down Expand Up @@ -86,7 +87,7 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte) *moira.Ma

// matchPattern returns array of matched patterns
func (storage *PatternStorage) matchPattern(metric []byte) []string {
currentLevel := []*patternNode{storage.PatternTree}
currentLevel := []*patternNode{storage.PatternTree.Load().(*patternNode)}
var found, index int
for i, c := range metric {
if c == '.' {
Expand Down Expand Up @@ -216,7 +217,7 @@ func (storage *PatternStorage) buildTree(patterns []string) error {
}
}

storage.PatternTree = newTree
storage.PatternTree.Store(newTree)
return nil
}

Expand Down

0 comments on commit d4db077

Please sign in to comment.