Skip to content

Commit

Permalink
Merge 887c351 into 166f565
Browse files Browse the repository at this point in the history
  • Loading branch information
Pliner committed Feb 18, 2019
2 parents 166f565 + 887c351 commit 2e8fdb0
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 19 deletions.
4 changes: 4 additions & 0 deletions database/redis/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (connector *DbConnector) getMetricRetention(metric string) (int64, error) {

// SaveMetrics saves new metrics
func (connector *DbConnector) SaveMetrics(metrics map[string]*moira.MatchedMetric) error {
if len(metrics) == 0 {
return nil
}

c := connector.pool.Get()
defer c.Close()
for _, metric := range metrics {
Expand Down
9 changes: 5 additions & 4 deletions filter/cache_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package filter

import (
"bufio"
"github.com/moira-alert/moira"
"github.com/moira-alert/moira/metrics/graphite"
"io"
"regexp"
"strconv"
"strings"

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/metrics/graphite"
)

var defaultRetention = 60
Expand Down Expand Up @@ -47,14 +48,14 @@ func NewCacheStorage(logger moira.Logger, metrics *graphite.FilterMetrics, reade
}

// EnrichMatchedMetric calculate retention and filter cached values
func (storage *Storage) EnrichMatchedMetric(buffer map[string]*moira.MatchedMetric, m *moira.MatchedMetric) {
func (storage *Storage) EnrichMatchedMetric(batch map[string]*moira.MatchedMetric, m *moira.MatchedMetric) {
m.Retention = storage.getRetention(m)
m.RetentionTimestamp = roundToNearestRetention(m.Timestamp, int64(m.Retention))
if ex, ok := storage.metricsCache[m.Metric]; ok && ex.RetentionTimestamp == m.RetentionTimestamp && ex.Value == m.Value {
return
}
storage.metricsCache[m.Metric] = m
buffer[m.Metric] = m
batch[m.Metric] = m
}

// getRetention returns first matched retention for metric
Expand Down
47 changes: 32 additions & 15 deletions filter/matched_metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,51 @@ func NewMetricsMatcher(metrics *graphite.FilterMetrics, logger moira.Logger, dat

// Start process matched metrics from channel and save it in cache storage
func (matcher *MetricsMatcher) Start(matchedMetricsChan chan *moira.MatchedMetric) {
flushInterval := time.Second
matcher.waitGroup.Add(1)
go func() {
defer matcher.waitGroup.Done()
buffer := make(map[string]*moira.MatchedMetric)

for batch := range matcher.receiveBatch(matchedMetricsChan) {
timer := time.Now()
matcher.save(batch)
matcher.metrics.SavingTimer.UpdateSince(timer)
}
}()
matcher.logger.Infof("Moira Filter Metrics Matcher started to save %d cached metrics every %s", matcher.cacheCapacity, time.Second.Seconds())
}

func (matcher *MetricsMatcher) receiveBatch(metrics <-chan *moira.MatchedMetric) <-chan map[string]*moira.MatchedMetric {
batchedMetrics := make(chan map[string]*moira.MatchedMetric, 1)

go func() {
defer close(batchedMetrics)

batchTicker := time.NewTicker(time.Second)
defer batchTicker.Stop()

for {
batch := make(map[string]*moira.MatchedMetric)

retry:
select {
case metric, ok := <-matchedMetricsChan:
case metric, ok := <-metrics:
if !ok {
batchedMetrics <- batch
matcher.logger.Info("Moira Filter Metrics Matcher stopped")
return
}
matcher.cacheStorage.EnrichMatchedMetric(buffer, metric)
if len(buffer) < matcher.cacheCapacity {
continue
matcher.cacheStorage.EnrichMatchedMetric(batch, metric)
if len(batch) < matcher.cacheCapacity {
goto retry
}
case <-time.After(flushInterval):
batchedMetrics <- batch
case <-batchTicker.C:
batchedMetrics <- batch
}
if len(buffer) == 0 {
continue
}
timer := time.Now()
matcher.save(buffer)
matcher.metrics.SavingTimer.UpdateSince(timer)
buffer = make(map[string]*moira.MatchedMetric)
}
}()
matcher.logger.Infof("Moira Filter Metrics Matcher started to save %d cached metrics every %s", matcher.cacheCapacity, flushInterval.String())

return batchedMetrics
}

// Wait waits for metric matcher instance will stop
Expand Down

0 comments on commit 2e8fdb0

Please sign in to comment.