Skip to content

Commit

Permalink
removing duplicate query addrs in queryFn
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivan Valkov committed Oct 12, 2018
1 parent a01d10c commit 7180ccc
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
4 changes: 2 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func runQuery(
specs = append(specs, query.NewGRPCStoreSpec(addr))
}

specs = removeDuplicates(logger, duplicatedStores, specs)
specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs)

return specs
},
Expand Down Expand Up @@ -410,7 +410,7 @@ func runQuery(
return nil
}

func removeDuplicates(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.StoreSpec) []query.StoreSpec {
func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Counter, specs []query.StoreSpec) []query.StoreSpec {
set := make(map[string]query.StoreSpec)
for _, spec := range specs {
addr := spec.Addr()
Expand Down
29 changes: 26 additions & 3 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/improbable-eng/thanos/pkg/alert"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
Expand All @@ -47,7 +48,6 @@ import (
"github.com/prometheus/tsdb/labels"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/improbable-eng/thanos/pkg/discovery"
)

// registerRule registers a rule command.
Expand Down Expand Up @@ -168,9 +168,13 @@ func runRule(
Name: "thanos_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
})

duplicatedQuery := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_rule_duplicated_query_address",
Help: "The number of times a duplicated query addresses is detected from the different configs in rule",
})
reg.MustRegister(configSuccess)
reg.MustRegister(configSuccessTime)
reg.MustRegister(duplicatedQuery)

db, err := tsdb.Open(dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts)
if err != nil {
Expand All @@ -187,7 +191,7 @@ func runRule(
}

// FileSD query addresses
fileSDCache := discovery.NewCache()
fileSDCache := cache.New()

// Hit the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
Expand All @@ -212,6 +216,8 @@ func runRule(
addrs = append(addrs, addr)
}

removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
vec, err := queryPrometheusInstant(ctx, logger, addrs[i], q, t)
if err != nil {
Expand Down Expand Up @@ -719,3 +725,20 @@ func labelsTSDBToProm(lset labels.Labels) (res promlabels.Labels) {
}
return res
}

func removeDuplicateQueryAddrs(logger log.Logger, duplicatedQueriers prometheus.Counter, addrs []string) []string {
set := make(map[string]struct{})
for _, addr := range addrs {
if _, ok := set[addr]; ok {
level.Warn(logger).Log("msg", "Duplicate query address is provided - %v", addr)
duplicatedQueriers.Inc()
}
set[addr] = struct{}{}
}

deduplicated := make([]string, 0, len(set))
for key := range set {
deduplicated = append(deduplicated, key)
}
return deduplicated
}

0 comments on commit 7180ccc

Please sign in to comment.