Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an experimental flag to block samples with timestamp too far in the future #6195

Merged
merged 12 commits into from
Apr 10, 2023
4 changes: 2 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,8 +872,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

rc.tsdbTooFarInFutureTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.too-far-in-future.time-window",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe print an error and exit if the flag < 0.

Copy link
Contributor Author

@jnyi jnyi Mar 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value of the flag should be a duration string like "0s", "5m", "1h", etc

fpetkovski marked this conversation as resolved.
Show resolved Hide resolved
"[EXPERIMENTAL] Configures the allowed time window for ingesting samples too far in the future. Disabled (0s) by default"+
"Please note enable this flag will reject samples in the future of receive local NTP time + configured duration.",
).Default("0s").Hidden())
"Please note enable this flag will reject samples in the future of receive local NTP time + configured duration due to clock skew in remote write clients.",
).Default("0s"))

rc.tsdbOutOfOrderTimeWindow = extkingpin.ModelDuration(cmd.Flag("tsdb.out-of-order.time-window",
"[EXPERIMENTAL] Configures the allowed time window for ingestion of out-of-order samples. Disabled (0s) by default"+
Expand Down
35 changes: 27 additions & 8 deletions pkg/receive/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"

Expand All @@ -28,6 +29,26 @@ type TenantStorage interface {
TenantAppendable(string) (Appendable, error)
}

// Wraps storage.Appender to add validation and logging.
type ReceiveAppender struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if my comment wasn't clear, I meant that this appender should always reject samples that are too far in the future.

If the flag is set to a value other than zero, we would wrap the default appender with this one. Otherwise we would just use the storage appender as before.

Copy link
Contributor Author

@jnyi jnyi Apr 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ic, we can do that, but I feel this pattern is more extensible for wrapping prometheus' default appender implementation going forward:

app := multiTSDB.appender(tenantID)
if tooFarInFuture != 0 {
  app = ReceiveAppender{ toofarInFuture, app}
} else if <another condition> {
 app = AnotherAppenderWrapper{ flag, app}
} else if <third condition> {
  app = thridAppenderWrapper...
}

vs

app :=  ReceiveAppender{
  tooFarInFuture: tooFarInFutureFlag,
  // add other flags
  app: multiTSDB.appender(tenantID)
}

// add behavior insider `ReceiveAppender`

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The appender wrapper would only be responsible for rejecting samples in the future and other appender wrappers will apply different limits. So it would rather be:

app := multiTSDB.appender(tenantID)
if tooFarInFuture != 0 {
  app = AppenderWithFutureLimit(toofarInFuture, app}
} 
if <another condition> {
 app = AnotherAppender{flag, app}
} 
if <third condition> {
  app = ThirdAppender(flag, appender)
}

In any case, this is definitely not blocker but more of a suggestion. We would anyway have one appender for now and we can break it up into multiple if it the logic gets more complicated in the future.

tLogger log.Logger
tooFarInFuture int64 // Unit: nanoseconds
storage.Appender
}

func (a *ReceiveAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if a.tooFarInFuture > 0 {
tooFar := model.Now().Add(time.Duration(a.tooFarInFuture))
if tooFar.Before(model.Time(t)) {
level.Warn(a.tLogger).Log("msg", "block metric too far in the future", "lset", lset,
"timestamp", t, "bound", tooFar)
// now + tooFarInFutureTimeWindow < sample timestamp
return 0, storage.ErrOutOfBounds
}
}
return a.Appender.Append(ref, lset, t, v)
}

type WriterOptions struct {
Intern bool
TooFarInFutureTimeWindow int64 // Unit: nanoseconds
Expand Down Expand Up @@ -81,11 +102,15 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR
return errors.Wrap(err, "get appender")
}
getRef := app.(storage.GetRef)
tooFarInFuture := model.Now().Add(time.Duration(r.opts.TooFarInFutureTimeWindow))
var (
ref storage.SeriesRef
errs writeErrors
)
app = &ReceiveAppender{
tLogger: tLogger,
tooFarInFuture: r.opts.TooFarInFutureTimeWindow,
Appender: app,
}
for _, t := range wreq.Timeseries {
// Check if time series labels are valid. If not, skip the time series
// and report the error.
Expand Down Expand Up @@ -121,13 +146,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR

// Append as many valid samples as possible, but keep track of the errors.
for _, s := range t.Samples {
if r.opts.TooFarInFutureTimeWindow != 0 && tooFarInFuture.Before(model.Time(s.Timestamp)) {
// now + tooFarInFutureTimeWindow < sample timestamp
err = storage.ErrOutOfBounds
level.Debug(tLogger).Log("msg", "block metric too far in the future", "lset", lset, "timestamp", s.Timestamp, "bound", tooFarInFuture)
} else {
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
}
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
switch err {
case storage.ErrOutOfOrderSample:
numSamplesOutOfOrder++
Expand Down