Skip to content

Commit

Permalink
Merge 7690bba into 71a37db
Browse files Browse the repository at this point in the history
  • Loading branch information
errx committed Jan 23, 2018
2 parents 71a37db + 7690bba commit 151c127
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 6 deletions.
2 changes: 2 additions & 0 deletions cmd/filter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type config struct {
type filterConfig struct {
Listen string `yaml:"listen"`
RetentionConfig string `yaml:"retention-config"`
MetricQueueSize int `yaml:"metric-queue-size"`
}

func getDefault() config {
Expand All @@ -31,6 +32,7 @@ func getDefault() config {
Filter: filterConfig{
Listen: ":2003",
RetentionConfig: "/etc/moira/storage-schemas.conf",
MetricQueueSize: 100,
},
Graphite: cmd.GraphiteConfig{
URI: "localhost:2003",
Expand Down
2 changes: 1 addition & 1 deletion cmd/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func main() {
if err != nil {
logger.Fatalf("Failed to start listen: %s", err.Error())
}
metricsChan := listener.Listen()
metricsChan := listener.Listen(config.Filter.MetricQueueSize)

// Start metrics matcher
metricsMatcher := matchedmetrics.NewMetricsMatcher(cacheMetrics, logger, database, cacheStorage)
Expand Down
5 changes: 2 additions & 3 deletions database/redis/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ func (connector *DbConnector) GetMetricsValues(metrics []string, from int64, unt
c := connector.pool.Get()
defer c.Close()

c.Send("MULTI")
for _, metric := range metrics {
c.Send("ZRANGEBYSCORE", metricDataKey(metric), from, until, "WITHSCORES")
}
resultByMetrics, err := redis.Values(c.Do("EXEC"))
resultByMetrics, err := redis.Values(c.Do(""))
if err != nil {
return nil, fmt.Errorf("Failed to EXEC: %v", err)
return nil, fmt.Errorf("Failed to get metric values: %v", err)
}

res := make(map[string][]*moira.MetricValue, len(resultByMetrics))
Expand Down
4 changes: 2 additions & 2 deletions filter/connection/listening.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func NewListener(port string, logger moira.Logger, patternStorage *filter.Patter

// Listen waits for new data in connection and handles it in ConnectionHandler
// All handled data sets to metricsChan
func (listener *MetricsListener) Listen() chan *moira.MatchedMetric {
metricsChan := make(chan *moira.MatchedMetric, 10)
func (listener *MetricsListener) Listen(queueSize int) chan *moira.MatchedMetric {
metricsChan := make(chan *moira.MatchedMetric, queueSize)
listener.tomb.Go(func() error {
for {
select {
Expand Down

0 comments on commit 151c127

Please sign in to comment.