query: add strict mode flag #2252

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Added

- [#2252](https://github.com/thanos-io/thanos/pull/2252) Query: add new `--store.strict-mode` flag. More information available [here](/docs/proposals/202001_thanos_query_health_handling.md).
- [#2265](https://github.com/thanos-io/thanos/pull/2265) Compactor: Add `--wait-interval` to specify compaction wait interval between consecutive compact runs when `--wait` enabled.
- [#2250](https://github.com/thanos-io/thanos/pull/2250) Compactor: Enable vertical compaction for offline deduplication (Experimental). Uses `--deduplication.replica-label` flag to specify the replica label to deduplicate on (Hidden). Please note that this uses a NAIVE algorithm for merging (no smart replica deduplication, just chaining samples together). This works well for deduplication of blocks with **precisely the same samples** like produced by Receiver replication. We plan to add a smarter algorithm in the following weeks.
- [#1714](https://github.com/thanos-io/thanos/pull/1714) Run the bucket web UI in the compact component when it is run as a long-lived process.
27 changes: 21 additions & 6 deletions cmd/thanos/query.go
@@ -70,6 +70,9 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
replicaLabels := cmd.Flag("query.replica-label", "Labels to treat as a replica indicator along which data is deduplicated. Still you will be able to query without deduplication using 'dedup=false' parameter.").
Strings()

strictMode := cmd.Flag("store.strict-mode", "Enable strict mode which makes Thanos Query always keep statically specified StoreAPIs even if their health checks fail. "+
"This is useful when you have a caching layer on top of Thanos Query and want to get partial responses when a node that you know should be up goes down.").Default("false").Bool()

instantDefaultMaxSourceResolution := modelDuration(cmd.Flag("query.instant.default.max_source_resolution", "default value for max_source_resolution for instant queries. If not set, defaults to 0s only taking raw resolution into account. 1h can be a good value if you use instant queries over time ranges that incorporate times outside of your raw-retention.").Default("0s").Hidden())

selectorLabels := cmd.Flag("selector-label", "Query selector labels that will be exposed in info endpoint (repeated).").
@@ -162,6 +165,7 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) {
*dnsSDResolver,
time.Duration(*unhealthyStoreTimeout),
time.Duration(*instantDefaultMaxSourceResolution),
*strictMode,
component.Query,
)
}
@@ -202,6 +206,7 @@ func runQuery(
dnsSDResolver string,
unhealthyStoreTimeout time.Duration,
instantDefaultMaxSourceResolution time.Duration,
strictMode bool,
comp component.Component,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
@@ -222,14 +227,24 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

staticStores, dynamicStores := dns.FilterNodes(storeAddrs...)
evictionPolicy := query.NormalMode
if strictMode {
evictionPolicy = query.StrictMode
}

var (
stores = query.NewStoreSet(
logger,
reg,
func() (specs []query.StoreSpec) {
// Add DNS resolved addresses from static flags and file SD.
// Add DNS resolved addresses.
for _, addr := range dnsProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
specs = append(specs, query.NewGRPCStoreSpec(addr, false))
}
// Add static nodes.
for _, addr := range staticStores {
specs = append(specs, query.NewGRPCStoreSpec(addr, true))
}

specs = removeDuplicateStoreSpecs(logger, duplicatedStores, specs)
@@ -257,7 +272,7 @@ func runQuery(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(5*time.Second, ctx.Done(), func() error {
stores.Update(ctx)
stores.Update(ctx, evictionPolicy)
return nil
})
}, func(error) {
@@ -289,8 +304,8 @@ func runQuery(
continue
}
fileSDCache.Update(update)
stores.Update(ctxUpdate)
dnsProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), storeAddrs...))
stores.Update(ctxUpdate, evictionPolicy)
dnsProvider.Resolve(ctxUpdate, append(fileSDCache.Addresses(), dynamicStores...))
case <-ctxUpdate.Done():
return nil
}
@@ -305,7 +320,7 @@ func runQuery(
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(dnsSDInterval, ctx.Done(), func() error {
dnsProvider.Resolve(ctx, append(fileSDCache.Addresses(), storeAddrs...))
dnsProvider.Resolve(ctx, append(fileSDCache.Addresses(), dynamicStores...))
return nil
})
}, func(error) {
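For orientation, here is a minimal, self-contained sketch (not part of the diff) of how the new flag is wired up in `runQuery`, using the identifiers this PR introduces (`dns.FilterNodes`, `query.NewGRPCStoreSpec`, `query.NormalMode`/`query.StrictMode`). It assumes a checkout of this branch so those symbols exist; the addresses are made up.

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/discovery/dns"
	"github.com/thanos-io/thanos/pkg/query"
)

func main() {
	// Hypothetical flag values; in Thanos these come from --store and --store.strict-mode.
	storeAddrs := []string{"sidecar-0:10901", "dns+stores.example.org:10901"}
	strictMode := true

	// Statically defined addresses are split from the ones that need DNS service discovery.
	staticStores, dynamicStores := dns.FilterNodes(storeAddrs...)

	// The flag selects the eviction policy that is later passed to StoreSet.Update.
	policy := query.NormalMode
	if strictMode {
		policy = query.StrictMode
	}

	// Static nodes get a StoreSpec marked as static, so StrictMode can keep them even
	// when their health check fails. In runQuery, the dynamic addresses are first
	// resolved via the DNS provider and their specs are created with static=false.
	var specs []query.StoreSpec
	for _, addr := range staticStores {
		specs = append(specs, query.NewGRPCStoreSpec(addr, true))
	}

	fmt.Println("policy:", policy, "static specs:", len(specs), "dynamic addrs:", dynamicStores)
}
```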
7 changes: 7 additions & 0 deletions docs/components/query.md
@@ -319,6 +319,13 @@ Flags:
which data is deduplicated. Still you will be
able to query without deduplication using
'dedup=false' parameter.
--store.strict-mode Enable strict mode which makes Thanos Query
always keep statically specified StoreAPIs even
if their health checks fail. This is useful when
you have a caching layer on top of Thanos Query
and want to get partial responses when a node
that you know should be up goes down.
--selector-label=<name>="<value>" ...
Query selector labels that will be exposed in
info endpoint (repeated).
2 changes: 1 addition & 1 deletion docs/proposals/202001_thanos_query_health_handling.md
@@ -2,7 +2,7 @@
title: Thanos Query store nodes healthiness handling
type: proposal
menu: proposals
status: accepted
status: complete
owner: GiedriusS
---

36 changes: 30 additions & 6 deletions pkg/discovery/dns/provider.go
@@ -87,6 +87,32 @@ func (p *Provider) Clone() *Provider {
}
}

// FilterNodes walks through the whole list of addresses and returns
// two lists of statically and dynamically defined nodes.
func FilterNodes(addrs ...string) (static []string, dynamic []string) {
static, dynamic = []string{}, []string{}

for _, addr := range addrs {
qtype, _ := GetQTypeName(addr)
if qtype != "" {
dynamic = append(dynamic, addr)
} else {
static = append(static, addr)
}
}
return
}

// GetQTypeName splits the provided addr into two parts: the QType (if any)
// and the name.
func GetQTypeName(addr string) (qtype string, name string) {
qtypeAndName := strings.SplitN(addr, "+", 2)
if len(qtypeAndName) != 2 {
return "", addr
}
return qtypeAndName[0], qtypeAndName[1]
}

// Resolve stores a list of provided addresses or their DNS records if requested.
// Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV).
// defaultPort is used for non-SRV records when a port is not supplied.
@@ -100,14 +126,12 @@ func (p *Provider) Resolve(ctx context.Context, addrs []string) {
resolvedAddrs := map[string][]string{}
for _, addr := range addrs {
var resolved []string
qtypeAndName := strings.SplitN(addr, "+", 2)
if len(qtypeAndName) != 2 {
// No lookup specified. Add to results and continue to the next address.
resolvedAddrs[addr] = []string{addr}
p.resolverAddrs.WithLabelValues(addr).Set(1.0)
qtype, name := GetQTypeName(addr)
if qtype == "" {
resolvedAddrs[name] = []string{name}
p.resolverAddrs.WithLabelValues(name).Set(1.0)
continue
}
qtype, name := qtypeAndName[0], qtypeAndName[1]

resolved, err := p.resolver.Resolve(ctx, name, QType(qtype))
p.resolverLookupsCount.Inc()
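A small usage sketch of the two new helpers on typical `--store` values follows. It assumes this branch is checked out so that `dns.GetQTypeName` and `dns.FilterNodes` are importable; the addresses are made up for illustration.

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/discovery/dns"
)

func main() {
	addrs := []string{
		"1.2.3.4:10901",                        // plain address: no lookup, treated as static
		"dns+stores.example.org:10901",         // A/AAAA lookup
		"dnssrv+_grpc._tcp.stores.example.org", // SRV lookup
	}

	// GetQTypeName splits an address into its lookup scheme (if any) and the name.
	for _, addr := range addrs {
		qtype, name := dns.GetQTypeName(addr)
		fmt.Printf("addr=%q qtype=%q name=%q\n", addr, qtype, name)
	}

	// FilterNodes separates statically defined nodes from ones needing DNS SD.
	static, dynamic := dns.FilterNodes(addrs...)
	fmt.Println("static:", static)   // [1.2.3.4:10901]
	fmt.Println("dynamic:", dynamic) // the dns+ and dnssrv+ entries
}
```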
33 changes: 33 additions & 0 deletions pkg/discovery/dns/provider_test.go
@@ -102,3 +102,36 @@ func (d *mockResolver) Resolve(_ context.Context, name string, _ QType) ([]strin
}
return d.res[name], nil
}

// TestFilterNodes tests whether the provided nodes are separated correctly
// into static and dynamic ones.
func TestFilterNodes(t *testing.T) {
for _, tcase := range []struct {
nodes []string
expectedStatic []string
expectedDynamic []string
}{
// All valid cases.
{
nodes: []string{"1.2.3.4", "dns+1.2.3.4", "dnssrv+13.3.3.3", "dnssrvnoa+1.1.1.1"},
expectedStatic: []string{"1.2.3.4"},
expectedDynamic: []string{"dns+1.2.3.4", "dnssrv+13.3.3.3", "dnssrvnoa+1.1.1.1"},
},
// Negative test: the invalid QType is treated as dynamic here and caught later during resolution.
{
nodes: []string{"gibberish+1.1.1.1+noa"},
expectedStatic: []string{},
expectedDynamic: []string{"gibberish+1.1.1.1+noa"},
},
// Negative test with no nodes.
{
nodes: []string{},
expectedStatic: []string{},
expectedDynamic: []string{},
},
} {
gotStatic, gotDynamic := FilterNodes(tcase.nodes...)
testutil.Equals(t, tcase.expectedStatic, gotStatic, "mismatch between static nodes")
testutil.Equals(t, tcase.expectedDynamic, gotDynamic, "mismatch between dynamic nodes")
}
}
74 changes: 54 additions & 20 deletions pkg/query/storeset.go
@@ -36,8 +36,25 @@ type StoreSpec interface {
// NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage
// given store connection.
Metadata(ctx context.Context, client storepb.StoreClient) (labelSets []storepb.LabelSet, mint int64, maxt int64, storeType component.StoreAPI, err error)
// Static returns true if the StoreAPI has been statically defined.
Static() bool
}

// QuerierEvictionPolicy defines the logic used to decide which store nodes
// should be removed when they no longer pass the health check.
type QuerierEvictionPolicy int

const (
// NormalMode is the eviction policy under which a store node is removed
// immediately once it fails the health check.
NormalMode QuerierEvictionPolicy = iota
// StrictMode is the eviction policy under which statically defined store
// nodes, i.e. those specified without any service discovery mechanism
// (file or DNS SD), are kept even if they fail the health check.
StrictMode
)

type StoreStatus struct {
Name string
LastCheck time.Time
@@ -49,13 +66,19 @@ type StoreStatus struct {
}

type grpcStoreSpec struct {
addr string
addr string
static bool
}

// NewGRPCStoreSpec creates store pure gRPC spec.
// It uses Info gRPC call to get Metadata.
func NewGRPCStoreSpec(addr string) StoreSpec {
return &grpcStoreSpec{addr: addr}
func NewGRPCStoreSpec(addr string, static bool) StoreSpec {
return &grpcStoreSpec{addr: addr, static: static}
}

// Static returns true if the StoreAPI node has been statically defined.
func (s *grpcStoreSpec) Static() bool {
return s.static
}

func (s *grpcStoreSpec) Addr() string {
@@ -320,8 +343,8 @@ func newStoreAPIStats() map[component.StoreAPI]map[string]int {
}

// Update updates the store set. It fetches current list of store specs from function and updates the fresh metadata
// from all stores.
func (s *StoreSet) Update(ctx context.Context) {
// from all stores. The policy defines how we treat nodes which do not pass the health check.
func (s *StoreSet) Update(ctx context.Context, policy QuerierEvictionPolicy) {
s.updateMtx.Lock()
defer s.updateMtx.Unlock()

@@ -334,14 +357,14 @@ func (s *StoreSet) Update(ctx context.Context) {

level.Debug(s.logger).Log("msg", "starting updating storeAPIs", "cachedStores", len(stores))

healthyStores := s.getHealthyStores(ctx, stores)
level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "healthyStores", len(healthyStores), "cachedStores", len(stores))
activeStores := s.getActiveStores(ctx, stores, policy)
level.Debug(s.logger).Log("msg", "checked requested storeAPIs", "activeStores", len(activeStores), "cachedStores", len(stores))

stats := newStoreAPIStats()

// Close stores that where not healthy this time (are not in healthy stores map).
// Close stores that were not active this time (are not in the active stores map).
for addr, st := range stores {
if _, ok := healthyStores[addr]; ok {
if _, ok := activeStores[addr]; ok {
stats[st.StoreType()][st.LabelSetsString()]++
continue
}
@@ -353,7 +376,7 @@ func (s *StoreSet) Update(ctx context.Context) {
}

// Add stores that are not yet in stores.
for addr, st := range healthyStores {
for addr, st := range activeStores {
if _, ok := stores[addr]; ok {
continue
}
@@ -384,15 +407,15 @@ func (s *StoreSet) Update(ctx context.Context) {
s.cleanUpStoreStatuses(stores)
}

func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*storeRef) map[string]*storeRef {
func (s *StoreSet) getActiveStores(ctx context.Context, stores map[string]*storeRef, policy QuerierEvictionPolicy) map[string]*storeRef {
var (
unique = make(map[string]struct{})
healthyStores = make(map[string]*storeRef, len(stores))
mtx sync.Mutex
wg sync.WaitGroup
unique = make(map[string]struct{})
activeStores = make(map[string]*storeRef, len(stores))
mtx sync.Mutex
wg sync.WaitGroup
)

// Gather healthy stores map concurrently. Build new store if does not exist already.
// Gather the active stores map concurrently. Build a new store if it does not exist already.
for _, storeSpec := range s.storeSpecs() {
if _, ok := unique[storeSpec.Addr()]; ok {
level.Warn(s.logger).Log("msg", "duplicated address in store nodes", "address", storeSpec.Addr())
@@ -411,7 +434,7 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor

st, seenAlready := stores[addr]
if !seenAlready {
// New store or was unhealthy and was removed in the past - create new one.
// New store, or it was inactive and removed in the past - create a new one.
conn, err := grpc.DialContext(ctx, addr, s.dialOpts...)
if err != nil {
s.updateStoreStatus(&storeRef{addr: addr}, err)
@@ -425,25 +448,36 @@ func (s *StoreSet) getHealthyStores(ctx context.Context, stores map[string]*stor
labelSets, minTime, maxTime, storeType, err := spec.Metadata(ctx, st.StoreClient)
if err != nil {
if !seenAlready {
// Close only if new. Unhealthy `s.stores` will be closed later on.
// Close only if new. Inactive `s.stores` will be closed later on.
st.Close()
}
s.updateStoreStatus(st, err)
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr)

if !(policy == StrictMode && spec.Static()) {
return
}

// Still keep it around if static & strict mode enabled.
mtx.Lock()
defer mtx.Unlock()

activeStores[addr] = st
return
}

s.updateStoreStatus(st, nil)
st.Update(labelSets, minTime, maxTime, storeType)

mtx.Lock()
defer mtx.Unlock()

healthyStores[addr] = st
activeStores[addr] = st
}(storeSpec)
}
wg.Wait()

return healthyStores
return activeStores
}

func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
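To illustrate the eviction decision that getActiveStores now makes, here is a minimal sketch using only the exported pieces touched by this PR (`NewGRPCStoreSpec`, `Static`, `Addr`, `StrictMode`). The address and the simulated health-check failure are made up; this is not the production code path, only the same boolean check.

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/query"
)

func main() {
	// A statically specified StoreAPI node (the second argument marks it as static).
	spec := query.NewGRPCStoreSpec("1.2.3.4:10901", true)

	policy := query.StrictMode
	healthCheckFailed := true // pretend Metadata() returned an error

	// Mirrors the check in getActiveStores: on a failed health check the node is
	// dropped unless it is static *and* strict mode is enabled.
	if healthCheckFailed && !(policy == query.StrictMode && spec.Static()) {
		fmt.Println("evicting", spec.Addr())
		return
	}
	fmt.Println("keeping", spec.Addr(), "despite the failed health check")
}
```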