Skip to content

Commit

Permalink
*: Allow exposing multiple label-sets (#1284)
Browse files Browse the repository at this point in the history
* *: Allow exposing multiple label-sets

* pkg/store: Add more multi-label-sets matching tests

* Address comments

* If LabelSet is empty, and Labels is not, use Labels as LabelSet

This is used for backward compatibility of the querier with store APIs
that don't expose LabelSet yet.

* pkg/store/proxy.go: Correctly handle no label-sets discovered

* pkg/query: Encapsulate LabelSet defaulting in spec.Metadata
  • Loading branch information
brancz authored and bwplotka committed Jul 1, 2019
1 parent 5777be8 commit 05f81a5
Show file tree
Hide file tree
Showing 17 changed files with 1,290 additions and 567 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1248](https://github.com/improbable-eng/thanos/pull/1248) Add a web UI to show the state of remote storage.

### Changed

- [#1284](https://github.com/improbable-eng/thanos/pull/1284) Add support for multiple label-sets in Info gRPC service. This deprecates the single `Labels` slice of the `InfoResponse`, in a future release backward compatible handling for the single set of Labels will be removed. Upgrading to v0.6.0 or higher is advised.

## [v0.5.0](https://github.com/improbable-eng/thanos/releases/tag/v0.5.0) - 2019.06.05

TL;DR: Store LRU cache is no longer leaking, Upgraded Thanos UI to Prometheus 2.9, Fixed auto-downsampling, Moved to Go 1.12.5 and more.
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ require (
github.com/uber/jaeger-client-go v2.16.0+incompatible
github.com/uber/jaeger-lib v2.0.0+incompatible
go.uber.org/atomic v1.4.0 // indirect
golang.org/x/net v0.0.0-20190522155817-f3200d17e092
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2
Expand Down
104 changes: 60 additions & 44 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -32,14 +33,14 @@ type StoreSpec interface {
// If metadata call fails we assume that store is no longer accessible and we should not use it.
// NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage
// given store connection.
Metadata(ctx context.Context, client storepb.StoreClient) (labels []storepb.Label, mint int64, maxt int64, err error)
Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, err error)
}

type StoreStatus struct {
Name string
LastCheck time.Time
LastError error
Labels []storepb.Label
LabelSets []storepb.LabelSet
StoreType component.StoreAPI
MinTime int64
MaxTime int64
Expand All @@ -62,12 +63,16 @@ func (s *grpcStoreSpec) Addr() string {

// Metadata method for gRPC store API tries to reach host Info method until context timeout. If we are unable to get metadata after
// that time, we assume that the host is unhealthy and return error.
func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labels []storepb.Label, mint int64, maxt int64, err error) {
func (s *grpcStoreSpec) Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, err error) {
resp, err := client.Info(ctx, &storepb.InfoRequest{}, grpc.FailFast(false))
if err != nil {
return nil, 0, 0, errors.Wrapf(err, "fetching store info from %s", s.addr)
}
return resp.Labels, resp.MinTime, resp.MaxTime, nil
if len(resp.LabelSets) == 0 && len(resp.Labels) > 0 {
resp.LabelSets = []storepb.LabelSet{{Labels: resp.Labels}}
}

return resp.LabelSets, resp.MinTime, resp.MaxTime, nil
}

// StoreSet maintains a set of active stores. It is backed up by Store Specifications that are dynamically fetched on
Expand All @@ -81,13 +86,13 @@ type StoreSet struct {
dialOpts []grpc.DialOption
gRPCInfoCallTimeout time.Duration

mtx sync.RWMutex
storesStatusesMtx sync.RWMutex
stores map[string]*storeRef
storeNodeConnections prometheus.Gauge
externalLabelStores map[string]int
storeStatuses map[string]*StoreStatus
unhealthyStoreTimeout time.Duration
mtx sync.RWMutex
storesStatusesMtx sync.RWMutex
stores map[string]*storeRef
storeNodeConnections prometheus.Gauge
externalLabelOccurrencesInStores map[string]int
storeStatuses map[string]*StoreStatus
unhealthyStoreTimeout time.Duration
}

type storeSetNodeCollector struct {
Expand Down Expand Up @@ -135,12 +140,12 @@ func NewStoreSet(
}

ss := &StoreSet{
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
dialOpts: dialOpts,
storeNodeConnections: storeNodeConnections,
gRPCInfoCallTimeout: 10 * time.Second,
externalLabelStores: map[string]int{},
logger: log.With(logger, "component", "storeset"),
storeSpecs: storeSpecs,
dialOpts: dialOpts,
storeNodeConnections: storeNodeConnections,
gRPCInfoCallTimeout: 10 * time.Second,
externalLabelOccurrencesInStores: map[string]int{},
stores: make(map[string]*storeRef),
storeStatuses: make(map[string]*StoreStatus),
unhealthyStoreTimeout: unhealthyStoreTimeout,
Expand All @@ -162,27 +167,27 @@ type storeRef struct {
addr string

// Meta (can change during runtime).
labels []storepb.Label
labelSets []storepb.LabelSet
storeType component.StoreAPI
minTime int64
maxTime int64

logger log.Logger
}

func (s *storeRef) Update(labels []storepb.Label, minTime int64, maxTime int64) {
s.mtx.RLock()
defer s.mtx.RUnlock()
func (s *storeRef) Update(labelSets []storepb.LabelSet, minTime int64, maxTime int64) {
s.mtx.Lock()
defer s.mtx.Unlock()

s.labels = labels
s.labelSets = labelSets
s.minTime = minTime
s.maxTime = maxTime
}

func (s *storeRef) Labels() []storepb.Label {
func (s *storeRef) LabelSets() []storepb.LabelSet {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.labels
return s.labelSets
}

func (s *storeRef) TimeRange() (int64, int64) {
Expand All @@ -194,7 +199,7 @@ func (s *storeRef) TimeRange() (int64, int64) {

func (s *storeRef) String() string {
mint, maxt := s.TimeRange()
return fmt.Sprintf("Addr: %s Labels: %v Mint: %d Maxt: %d", s.addr, s.Labels(), mint, maxt)
return fmt.Sprintf("Addr: %s LabelSets: %v Mint: %d Maxt: %d", s.addr, storepb.LabelSetsToString(s.LabelSets()), mint, maxt)
}

func (s *storeRef) Addr() string {
Expand All @@ -211,10 +216,11 @@ func (s *StoreSet) Update(ctx context.Context) {
healthyStores := s.getHealthyStores(ctx)

// Record the number of occurrences of external label combinations for current store slice.
externalLabelStores := map[string]int{}
externalLabelOccurrencesInStores := map[string]int{}
for _, st := range healthyStores {
externalLabelStores[externalLabelsFromStore(st)]++
externalLabelOccurrencesInStores[externalLabelsFromStore(st)]++
}
level.Debug(s.logger).Log("msg", "updating healthy stores", "externalLabelOccurrencesInStores", fmt.Sprintf("%#+v", externalLabelOccurrencesInStores))

s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -238,10 +244,12 @@ func (s *StoreSet) Update(ctx context.Context) {
// No external labels means strictly store gateway or ruler and it is fine to have access to multiple instances of them.
//
// Sidecar will error out if it will be configured with empty external labels.
if len(store.Labels()) > 0 && externalLabelStores[externalLabelsFromStore(store)] != 1 {
externalLabels := externalLabelsFromStore(store)
storesWithExternalLabels := externalLabelOccurrencesInStores[externalLabels]
if len(store.LabelSets()) > 0 && storesWithExternalLabels != 1 {
store.close()
s.updateStoreStatus(store, errors.New(droppingStoreMessage))
level.Warn(s.logger).Log("msg", droppingStoreMessage, "address", addr)
level.Warn(s.logger).Log("msg", droppingStoreMessage, "address", addr, "extLset", externalLabels, "duplicates", storesWithExternalLabels)
continue
}

Expand All @@ -254,7 +262,7 @@ func (s *StoreSet) Update(ctx context.Context) {
s.updateStoreStatus(store, nil)
level.Info(s.logger).Log("msg", "adding new store to query storeset", "address", addr)
}
s.externalLabelStores = externalLabelStores
s.externalLabelOccurrencesInStores = externalLabelOccurrencesInStores
s.storeNodeConnections.Set(float64(len(s.stores)))
s.cleanUpStoreStatuses()
}
Expand Down Expand Up @@ -288,14 +296,14 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
store, ok := s.stores[addr]
if ok {
// Check existing store. Is it healthy? What are current metadata?
labels, minTime, maxTime, err := spec.Metadata(ctx, store.StoreClient)
labelSets, minTime, maxTime, err := spec.Metadata(ctx, store.StoreClient)
if err != nil {
// Peer unhealthy. Do not include in healthy stores.
s.updateStoreStatus(store, err)
level.Warn(s.logger).Log("msg", "update of store node failed", "err", err, "address", addr)
return
}
store.Update(labels, minTime, maxTime)
store.Update(labelSets, minTime, maxTime)
} else {
// New store or was unhealthy and was removed in the past - create new one.
conn, err := grpc.DialContext(ctx, addr, s.dialOpts...)
Expand All @@ -314,8 +322,11 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "initial store client info fetch"), "address", addr)
return
}
if len(resp.LabelSets) == 0 && len(resp.Labels) > 0 {
resp.LabelSets = []storepb.LabelSet{{Labels: resp.Labels}}
}
store.storeType = component.FromProto(resp.StoreType)
store.Update(resp.Labels, resp.MinTime, resp.MaxTime)
store.Update(resp.LabelSets, resp.MinTime, resp.MaxTime)
}

mtx.Lock()
Expand All @@ -331,16 +342,21 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
}

func externalLabelsFromStore(store *storeRef) string {
tsdbLabels := labels.Labels{}
for _, l := range store.labels {
tsdbLabels = append(tsdbLabels, labels.Label{
Name: l.Name,
Value: l.Value,
})
tsdbLabelSetStrings := make([]string, 0, len(store.labelSets))
for _, ls := range store.labelSets {
tsdbLabels := labels.Labels{}
for _, l := range ls.Labels {
tsdbLabels = append(tsdbLabels, labels.Label{
Name: l.Name,
Value: l.Value,
})
}
sort.Sort(tsdbLabels)
tsdbLabelSetStrings = append(tsdbLabelSetStrings, tsdbLabels.String())
}
sort.Sort(tsdbLabels)
sort.Strings(tsdbLabelSetStrings)

return tsdbLabels.String()
return strings.Join(tsdbLabelSetStrings, ",")
}

func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
Expand All @@ -357,7 +373,7 @@ func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
status.LastCheck = time.Now()

if err == nil {
status.Labels = store.labels
status.LabelSets = store.labelSets
status.StoreType = store.storeType
status.MinTime = store.minTime
status.MaxTime = store.maxTime
Expand Down Expand Up @@ -385,8 +401,8 @@ func (s *StoreSet) externalLabelOccurrences() map[string]int {
s.mtx.RLock()
defer s.mtx.RUnlock()

r := make(map[string]int, len(s.externalLabelStores))
for k, v := range s.externalLabelStores {
r := make(map[string]int, len(s.externalLabelOccurrencesInStores))
for k, v := range s.externalLabelOccurrencesInStores {
r[k] = v
}

Expand Down

0 comments on commit 05f81a5

Please sign in to comment.