Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Required option for remote_read #5667

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/config.go
Expand Up @@ -614,6 +614,9 @@ type RemoteReadConfig struct {
// RequiredMatchers is an optional list of equality matchers which have to
// be present in a selector to query the remote read endpoint.
RequiredMatchers model.LabelSet `yaml:"required_matchers,omitempty"`

// Required makes reads from this remote-read endpoint required for querying
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Required makes reads from this remote-read endpoint required for querying
// Required makes reads from this remote-read endpoint required for querying.

Required bool `yaml:"required,omitempty"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface.
Expand Down
41 changes: 33 additions & 8 deletions storage/fanout.go
Expand Up @@ -188,6 +188,35 @@ func (f *fanoutAppender) Rollback() (err error) {
return nil
}

// WarningQuerierFunc wraps a querierFunc with a warningQuerier
func WarningQueryable(q Queryable) QueryableFunc {
return func(ctx context.Context, mint, maxt int64) (Querier, error) {
v, err := q.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}
return NewWarningQuerier(v), nil
}
}

func NewWarningQuerier(q Querier) Querier {
return &warningQuerier{q}
}

// warningQuerier implements Querier and will change all errors to warnings
type warningQuerier struct {
Querier
}

// Select returns a set of series that matches the given label matchers.
func (q *warningQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
v, w, err := q.Querier.Select(params, matchers...)
if err != nil {
w = append(w, err)
}
return v, w, nil
}

// mergeQuerier implements Querier.
type mergeQuerier struct {
primaryQuerier Querier
Expand Down Expand Up @@ -239,15 +268,11 @@ func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher)
}
if err != nil {
q.failedQueriers[querier] = struct{}{}
// If the error source isn't the primary querier, return the error as a warning and continue.
if querier != q.primaryQuerier {
warnings = append(warnings, err)
continue
} else {
return nil, nil, err
}
return nil, nil, err
}
if set != nil {
seriesSets = append(seriesSets, set)
}
seriesSets = append(seriesSets, set)
}
return NewMergeSeriesSet(seriesSets, q), warnings, nil
}
Expand Down
3 changes: 3 additions & 0 deletions storage/remote/storage.go
Expand Up @@ -106,6 +106,9 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
if !rrConf.ReadRecent {
q = PreferLocalStorageFilter(q, s.localStartTimeCallback)
}
if !rrConf.Required {
q = storage.WarningQueryable(q)
}
queryables = append(queryables, q)
}
s.queryables = queryables
Expand Down