Skip to content

Commit

Permalink
Merge branch 'fixfilter'
Browse files Browse the repository at this point in the history
  • Loading branch information
borovskyav committed Mar 27, 2018
2 parents dbb0f91 + fcf7348 commit a8e8001
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cmd/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func main() {
defer stopHeartbeatWorker(heartbeatWorker)

// Start metrics listener
listener, err := connection.NewListener(config.Filter.Listen, logger, patternStorage)
listener, err := connection.NewListener(config.Filter.Listen, logger, cacheMetrics, patternStorage)
if err != nil {
logger.Fatalf("Failed to start listen: %s", err.Error())
}
Expand Down
26 changes: 14 additions & 12 deletions database/redis/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,26 @@ const (

// DbConnector contains redis pool
type DbConnector struct {
pool *redis.Pool
logger moira.Logger
retentionCache *cache.Cache
metricsCache *cache.Cache
messengersCache *cache.Cache
sync *redsync.Redsync
pool *redis.Pool
logger moira.Logger
retentionCache *cache.Cache
retentionSavingCache *cache.Cache
metricsCache *cache.Cache
messengersCache *cache.Cache
sync *redsync.Redsync
}

// NewDatabase creates Redis pool based on config
func NewDatabase(logger moira.Logger, config Config) *DbConnector {
pool := newRedisPool(logger, config)
return &DbConnector{
pool: pool,
logger: logger,
retentionCache: cache.New(cacheValueExpirationDuration, cacheCleanupInterval),
metricsCache: cache.New(cacheValueExpirationDuration, cacheCleanupInterval),
messengersCache: cache.New(cache.NoExpiration, cache.DefaultExpiration),
sync: redsync.New([]redsync.Pool{pool}),
pool: pool,
logger: logger,
retentionCache: cache.New(cacheValueExpirationDuration, cacheCleanupInterval),
retentionSavingCache: cache.New(cache.NoExpiration, cache.DefaultExpiration),
metricsCache: cache.New(cacheValueExpirationDuration, cacheCleanupInterval),
messengersCache: cache.New(cache.NoExpiration, cache.DefaultExpiration),
sync: redsync.New([]redsync.Pool{pool}),
}
}

Expand Down
6 changes: 5 additions & 1 deletion database/redis/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/moira-alert/moira"
"github.com/moira-alert/moira/database"
"github.com/moira-alert/moira/database/redis/reply"
"github.com/patrickmn/go-cache"
)

// GetPatterns gets updated patterns array
Expand Down Expand Up @@ -96,7 +97,10 @@ func (connector *DbConnector) SaveMetrics(metrics map[string]*moira.MatchedMetri
for _, metric := range metrics {
metricValue := fmt.Sprintf("%v %v", metric.Timestamp, metric.Value)
c.Send("ZADD", metricDataKey(metric.Metric), metric.RetentionTimestamp, metricValue)
c.Send("SET", metricRetentionKey(metric.Metric), metric.Retention)

if err := connector.retentionSavingCache.Add(metric.Metric, true, cache.DefaultExpiration); err == nil {
c.Send("SET", metricRetentionKey(metric.Metric), metric.Retention)
}

for _, pattern := range metric.Patterns {
c.Send("SADD", patternMetricsKey(pattern), metric.Metric)
Expand Down
21 changes: 19 additions & 2 deletions filter/connection/listening.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/filter"
"github.com/moira-alert/moira/metrics/graphite"
)

// MetricsListener is facade for standard net.MetricsListener and accept connection for handling it
Expand All @@ -17,10 +18,11 @@ type MetricsListener struct {
handler *Handler
logger moira.Logger
tomb tomb.Tomb
metrics *graphite.FilterMetrics
}

// NewListener creates new listener
func NewListener(port string, logger moira.Logger, patternStorage *filter.PatternStorage) (*MetricsListener, error) {
func NewListener(port string, logger moira.Logger, metrics *graphite.FilterMetrics, patternStorage *filter.PatternStorage) (*MetricsListener, error) {
address, err := net.ResolveTCPAddr("tcp", port)
if nil != err {
return nil, fmt.Errorf("Failed to resolve tcp address [%s]: %s", port, err.Error())
Expand All @@ -33,14 +35,15 @@ func NewListener(port string, logger moira.Logger, patternStorage *filter.Patter
listener: newListener,
logger: logger,
handler: NewConnectionsHandler(logger, patternStorage),
metrics: metrics,
}
return &listener, nil
}

// Listen waits for new data in connection and handles it in ConnectionHandler
// All handled data sets to metricsChan
func (listener *MetricsListener) Listen() chan *moira.MatchedMetric {
metricsChan := make(chan *moira.MatchedMetric, 10000)
metricsChan := make(chan *moira.MatchedMetric, 16384)
listener.tomb.Go(func() error {
for {
select {
Expand Down Expand Up @@ -68,10 +71,24 @@ func (listener *MetricsListener) Listen() chan *moira.MatchedMetric {
listener.handler.HandleConnection(conn, metricsChan)
}
})

listener.tomb.Go(func() error { return listener.checkNewMetricsChannelLen(metricsChan) })
listener.logger.Info("Moira Filter Listener Started")
return metricsChan
}

func (listener *MetricsListener) checkNewMetricsChannelLen(channel <-chan *moira.MatchedMetric) error {
checkTicker := time.NewTicker(time.Millisecond * 100)
for {
select {
case <-listener.tomb.Dying():
return nil
case <-checkTicker.C:
listener.metrics.MetricChannelLen.Update(int64(len(channel)))
}
}
}

// Stop stops listening connection
func (listener *MetricsListener) Stop() error {
listener.tomb.Kill(nil)
Expand Down
2 changes: 1 addition & 1 deletion filter/matched_metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (matcher *MetricsMatcher) Start(channel chan *moira.MatchedMetric) {
return
}
matcher.cacheStorage.EnrichMatchedMetric(buffer, metric)
if len(buffer) < 10 {
if len(buffer) < 100 {
continue
}
case <-time.After(time.Second):
Expand Down
1 change: 1 addition & 0 deletions metrics/graphite/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ type FilterMetrics struct {
MatchingTimer Timer
SavingTimer Timer
BuildTreeTimer Timer
MetricChannelLen Histogram
}
1 change: 1 addition & 0 deletions metrics/graphite/go-metrics/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func ConfigureFilterMetrics(prefix string) *graphite.FilterMetrics {
MatchingTimer: registerTimer(metricNameWithPrefix(prefix, "time.match")),
SavingTimer: registerTimer(metricNameWithPrefix(prefix, "time.save")),
BuildTreeTimer: registerTimer(metricNameWithPrefix(prefix, "time.buildtree")),
MetricChannelLen: registerHistogram(metricNameWithPrefix(prefix, "metricsToSave")),
}
}

Expand Down

0 comments on commit a8e8001

Please sign in to comment.