Drop old inmemory samples #13002

Merged (27 commits, Jan 5, 2024)
Commits
443511b
Drop old inmemory samples
marctc Oct 19, 2023
5780421
Avoid copying timeseries when the feature is disabled
tpaschalis Oct 24, 2023
d5460dd
Run gofmt
tpaschalis Oct 24, 2023
39edc4b
Clarify docs
marctc Oct 24, 2023
255a6c0
Add more logging info
marctc Oct 24, 2023
3da411a
Remove loggers
marctc Nov 17, 2023
818ac24
optimize function and add tests
marctc Nov 17, 2023
f0c0129
Simplify filter
marctc Nov 20, 2023
19f3275
rename var
marctc Nov 21, 2023
1f7d6f9
Update help info from metrics
marctc Nov 21, 2023
97cc3ce
use metrics to keep track of drop elements during buildWriteRequest
marctc Nov 21, 2023
3620ce5
rename var in tests
marctc Nov 21, 2023
1ffbe1a
pass time.Now as parameter
marctc Nov 21, 2023
ee89afe
Change buildwriterequest during retries
marctc Nov 21, 2023
2836549
Revert "Remove loggers"
marctc Nov 21, 2023
d3775fc
use log level debug for loggers
marctc Nov 21, 2023
a99fc24
Merge branch 'main' into drop-old-inmemory-samples
tpaschalis Dec 14, 2023
047a101
Fix linter
tpaschalis Dec 14, 2023
c0f1ef9
Remove noisy debug-level logs; add 'reason' label to drop metrics
tpaschalis Dec 14, 2023
0b1c94e
Remove accidentally committed files
tpaschalis Dec 14, 2023
0ecfa07
Propagate logger to buildWriteRequest to log dropped data
tpaschalis Dec 14, 2023
488e1ff
Fix docs comment
tpaschalis Dec 18, 2023
25ec6b6
Make drop reason more specific
tpaschalis Dec 18, 2023
7dee9d0
Remove unnecessary pass of logger
tpaschalis Dec 20, 2023
722825d
Use snake_case for reason label
tpaschalis Dec 20, 2023
0ae385c
Merge branch 'main' of ssh://github.com/prometheus/prometheus into dr…
tpaschalis Jan 4, 2024
ec3e740
Fix dropped samples metric
tpaschalis Jan 5, 2024
3 changes: 3 additions & 0 deletions config/config.go
@@ -1124,6 +1124,9 @@ type QueueConfig struct {
MinBackoff model.Duration `yaml:"min_backoff,omitempty"`
MaxBackoff model.Duration `yaml:"max_backoff,omitempty"`
RetryOnRateLimit bool `yaml:"retry_on_http_429,omitempty"`

// Samples older than the limit will be dropped.
SampleAgeLimit model.Duration `yaml:"sample_age_limit,omitempty"`
}

// MetadataConfig is the configuration for sending metadata to remote
4 changes: 4 additions & 0 deletions docs/configuration/configuration.md
@@ -3619,6 +3619,10 @@ queue_config:
# Retry upon receiving a 429 status code from the remote-write storage.
# This is experimental and might change in the future.
[ retry_on_http_429: <boolean> | default = false ]
# If set, any sample that is older than sample_age_limit
# will not be sent to the remote storage. The default value is 0s,
# which means that all samples are sent.
[ sample_age_limit: <duration> | default = 0s ]

# Configures the sending of series metadata to remote storage.
# Metadata configuration is subject to change at any point
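For illustration, here is a minimal, self-contained Go sketch of the cutoff check that sample_age_limit enables; the limit and sample age below are made-up values, and the PR's actual implementation is isSampleOld in storage/remote/queue_manager.go further down.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values, for illustration only.
	limit := 5 * time.Minute                        // sample_age_limit: 5m
	cutoff := time.Now().Add(-limit)                // samples with timestamps before this are dropped
	sampleTime := time.Now().Add(-10 * time.Minute) // e.g. a sample replayed from an old WAL segment

	if limit > 0 && sampleTime.Before(cutoff) {
		fmt.Println("older than sample_age_limit; this sample would not be sent to remote storage")
	}
}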
2 changes: 1 addition & 1 deletion storage/remote/codec_test.go
@@ -517,7 +517,7 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) {
}

func TestDecodeWriteRequest(t *testing.T) {
buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil)
buf, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil)
require.NoError(t, err)

actual, err := DecodeWriteRequest(bytes.NewReader(buf))
190 changes: 163 additions & 27 deletions storage/remote/queue_manager.go
@@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks"
@@ -51,6 +52,10 @@ const (

// Allow 30% too many shards before scaling down.
shardToleranceFraction = 0.3

reasonTooOld = "too_old"
reasonDroppedSeries = "dropped_series"
reasonUnintentionalDroppedSeries = "unintentionally_dropped_series"
)

type queueManagerMetrics struct {
@@ -68,9 +73,9 @@ type queueManagerMetrics struct {
retriedExemplarsTotal prometheus.Counter
retriedHistogramsTotal prometheus.Counter
retriedMetadataTotal prometheus.Counter
droppedSamplesTotal prometheus.Counter
droppedExemplarsTotal prometheus.Counter
droppedHistogramsTotal prometheus.Counter
droppedSamplesTotal *prometheus.CounterVec
droppedExemplarsTotal *prometheus.CounterVec
droppedHistogramsTotal *prometheus.CounterVec
Comment on lines +76 to +78
Contributor

FYI @cstyan, the use of a *prometheus.CounterVec here makes a big difference in allocations.

For the following benchmarks, I'm using this commit from this branch.

To showcase, here's the difference when running BenchmarkSampleSend on the current PR HEAD (7dee9d0) versus the same HEAD with a simple prometheus.Counter instead.

$ benchstat head-7dee9d009-no-counter-vec.txt head-7dee9d009-counter-vec.txt
...
             │ head-7dee9d009-no-counter-vec.txt │      head-7dee9d009-counter-vec.txt      │
             │             allocs/op             │   allocs/op    vs base                   │
SampleSend-8                          12.00 ± 8%   10030.00 ± 0%  +83483.33% (p=0.000 n=20)

There is already a related fix out on client_golang, but it will only land in the upcoming v1.18.0 release, which Prometheus is not using yet.

When I updated client_golang to include the fix, allocations dropped dramatically, putting it on par with the simple counter and making it much faster when it's discarding all samples.

$ go get -u github.com/prometheus/client_golang@51714a5a30c011624485bd78137378ff0d635383
$ ...
$ benchstat head-current-client-golang.txt head-new-client-golang.txt
goos: darwin
goarch: arm64
pkg: github.com/prometheus/prometheus/storage/remote
             │ head-current-client-golang.txt │     head-new-client-golang.txt      │
             │             sec/op             │   sec/op     vs base                │
SampleSend-8                      911.1µ ± 1%   735.2µ ± 1%  -19.31% (p=0.000 n=20)

             │ head-current-client-golang.txt │      head-new-client-golang.txt      │
             │              B/op              │     B/op      vs base                │
SampleSend-8                   164.173Ki ± 0%   6.366Ki ± 2%  -96.12% (p=0.000 n=20)

             │ head-current-client-golang.txt │     head-new-client-golang.txt     │
             │           allocs/op            │ allocs/op   vs base                │
SampleSend-8                    10030.00 ± 0%   24.00 ± 4%  -99.76% (p=0.000 n=20)
$ benchstat main-no-filter.txt head-new-client-golang.txt
goos: darwin
goarch: arm64
pkg: github.com/prometheus/prometheus/storage/remote
             │ main-no-filter.txt │     head-new-client-golang.txt      │
             │       sec/op       │   sec/op     vs base                │
SampleSend-8        1881.6µ ± 11%   735.2µ ± 1%  -60.93% (p=0.000 n=20)

             │ main-no-filter.txt │      head-new-client-golang.txt      │
             │        B/op        │     B/op      vs base                │
SampleSend-8       250.674Ki ± 8%   6.366Ki ± 2%  -97.46% (p=0.000 n=20)

             │ main-no-filter.txt │     head-new-client-golang.txt     │
             │     allocs/op      │ allocs/op   vs base                │
SampleSend-8          405.00 ± 6%   24.00 ± 4%  -94.07% (p=0.000 n=20)
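As an aside, the allocation gap that this benchmark surfaces can be reproduced in isolation with a micro-benchmark along the lines of the sketch below (not part of the PR; the benchmark and metric names are made up). Running each with go test -bench . -count=20 and comparing the outputs with benchstat gives tables in the same format as above.

package remote

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
)

// A plain Counter incremented in a hot loop.
func BenchmarkCounterInc(b *testing.B) {
	c := prometheus.NewCounter(prometheus.CounterOpts{Name: "bench_plain_total"})
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		c.Inc()
	}
}

// A CounterVec whose labelled child is resolved on every increment; this
// per-call WithLabelValues lookup is roughly where the extra allocations
// came from before the client_golang fix discussed in this thread.
func BenchmarkCounterVecInc(b *testing.B) {
	cv := prometheus.NewCounterVec(prometheus.CounterOpts{Name: "bench_labeled_total"}, []string{"reason"})
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		cv.WithLabelValues("too_old").Inc()
	}
}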

Contributor

On the other hand, I'm happy to see that there's

a) No performance regression for people not using the flag
b) No performance regression for people using the flag in normal operation, i.e. when all samples go through
c) A significant throughput gain when discarding unwanted samples

Contributor

Given that this is only impacting cases where we're throwing away samples (either via relabeling, due to their age or unintentionally), do you see this as a blocker?

Contributor

@cstyan a new v1.18.0 release of client_golang is out, which fixes the performance overhead. Do you think it's ok to bump as part of this release, or should we make it a different PR altogether?

https://github.com/prometheus/client_golang/releases/tag/v1.18.0

Member

@tpaschalis Prometheus should be updating that package soon; there's an automated PR open for it already. Let's just double-check the benchmark results in your PR after the package update is merged, and then we can merge here as well.

Contributor

@cstyan I could only find this automated PR, but it bumped the dependency in another subpackage, not in the main go.mod file.

I've opened the following to manually bump it for the main go.mod and will re-run the benchmarks. #13373

Contributor

client_golang is now updated on main, and I've merged main into this branch to run the benchmarks once more.

Contributor

So after updating client_golang, there's no impact on people not using the feature

$ benchstat main.txt head-nofilter.txt
goos: darwin
goarch: arm64
pkg: github.com/prometheus/prometheus/storage/remote
             │   main.txt   │       head-nofilter.txt       │
             │    sec/op    │   sec/op     vs base          │
SampleSend-8   2.021m ± 10%   1.999m ± 3%  ~ (p=0.841 n=20)

             │   main.txt   │       head-nofilter.txt        │
             │     B/op     │     B/op      vs base          │
SampleSend-8   276.4Ki ± 6%   279.6Ki ± 6%  ~ (p=0.862 n=20)

             │  main.txt  │      head-nofilter.txt       │
             │ allocs/op  │ allocs/op   vs base          │
SampleSend-8   438.5 ± 4%   440.5 ± 6%  ~ (p=0.507 n=20)

as well as a speedup when samples are thrown away

$ benchstat head-nofilter.txt head-filter.txt
goos: darwin
goarch: arm64
pkg: github.com/prometheus/prometheus/storage/remote
             │ head-nofilter.txt │           head-filter.txt           │
             │      sec/op       │   sec/op     vs base                │
SampleSend-8        1999.5µ ± 3%   723.4µ ± 5%  -63.82% (p=0.000 n=20)

             │ head-nofilter.txt │           head-filter.txt            │
             │       B/op        │     B/op      vs base                │
SampleSend-8      279.571Ki ± 6%   6.304Ki ± 3%  -97.75% (p=0.000 n=20)

             │ head-nofilter.txt │          head-filter.txt           │
             │     allocs/op     │ allocs/op   vs base                │
SampleSend-8         440.50 ± 6%   24.00 ± 4%  -94.55% (p=0.000 n=20)
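For completeness, had the client_golang bump not been an option, a common workaround (not used in this PR) is to resolve the labelled children once, outside the hot path, and have the per-sample code increment plain Counters; a rough sketch with made-up names:

package remote

import "github.com/prometheus/client_golang/prometheus"

// dropCounters caches the CounterVec children so the per-sample code path
// increments a plain Counter and never calls WithLabelValues itself.
type dropCounters struct {
	tooOld        prometheus.Counter
	droppedSeries prometheus.Counter
}

func newDropCounters(cv *prometheus.CounterVec) dropCounters {
	return dropCounters{
		tooOld:        cv.WithLabelValues("too_old"),
		droppedSeries: cv.WithLabelValues("dropped_series"),
	}
}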

enqueueRetriesTotal prometheus.Counter
sentBatchDuration prometheus.Histogram
highestSentTimestamp *maxTimestamp
@@ -180,27 +185,27 @@ func newQueueManagerMetrics(r prometheus.Registerer, rn, e string) *queueManager
Help: "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable.",
ConstLabels: constLabels,
})
m.droppedSamplesTotal = prometheus.NewCounter(prometheus.CounterOpts{
m.droppedSamplesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "samples_dropped_total",
Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels,
})
m.droppedExemplarsTotal = prometheus.NewCounter(prometheus.CounterOpts{
}, []string{"reason"})
m.droppedExemplarsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "exemplars_dropped_total",
Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
Help: "Total number of exemplars which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels,
})
m.droppedHistogramsTotal = prometheus.NewCounter(prometheus.CounterOpts{
}, []string{"reason"})
m.droppedHistogramsTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "histograms_dropped_total",
Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling or unintentionally because of an unknown reference ID.",
Help: "Total number of histograms which were dropped after being read from the WAL before being sent via remote write, either via relabelling, due to being too old or unintentionally because of an unknown reference ID.",
ConstLabels: constLabels,
})
}, []string{"reason"})
m.enqueueRetriesTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
@@ -391,7 +396,8 @@ type WriteClient interface {
// indicated by the provided WriteClient. Implements writeTo interface
// used by WAL Watcher.
type QueueManager struct {
lastSendTimestamp atomic.Int64
lastSendTimestamp atomic.Int64
buildRequestLimitTimestamp atomic.Int64

logger log.Logger
flushDeadline time.Duration
@@ -529,7 +535,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met

func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error {
// Build the WriteRequest with no samples.
req, _, err := buildWriteRequest(nil, metadata, pBuf, nil)
req, _, _, err := buildWriteRequest(t.logger, nil, metadata, pBuf, nil, nil)
if err != nil {
return err
}
@@ -575,18 +581,65 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
return nil
}

func isSampleOld(baseTime time.Time, sampleAgeLimit time.Duration, ts int64) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
limitTs := baseTime.Add(-sampleAgeLimit)
sampleTs := timestamp.Time(ts)
return sampleTs.Before(limitTs)
}

func isTimeSeriesOldFilter(metrics *queueManagerMetrics, baseTime time.Time, sampleAgeLimit time.Duration) func(ts prompb.TimeSeries) bool {
return func(ts prompb.TimeSeries) bool {
if sampleAgeLimit == 0 {
// If sampleAgeLimit is unset, then we never skip samples due to their age.
return false
}
switch {
// Only the first element should be set in the series, therefore we only check the first element.
case len(ts.Samples) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Samples[0].Timestamp) {
metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Histograms) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Histograms[0].Timestamp) {
metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
case len(ts.Exemplars) > 0:
if isSampleOld(baseTime, sampleAgeLimit, ts.Exemplars[0].Timestamp) {
metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
return true
}
default:
return false
}
return false
}
}

// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
func (t *QueueManager) Append(samples []record.RefSample) bool {
currentTime := time.Now()
outer:
for _, s := range samples {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), s.T) {
t.metrics.droppedSamplesTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[s.Ref]
if !ok {
t.metrics.droppedSamplesTotal.Inc()
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[s.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped sample for series that was not explicitly dropped via relabelling", "ref", s.Ref)
t.metrics.droppedSamplesTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedSamplesTotal.WithLabelValues(reasonDroppedSeries).Inc()
}
t.seriesMtx.Unlock()
continue
@@ -629,17 +682,23 @@ func (t *QueueManager) AppendExemplars(exemplars []record.RefExemplar) bool {
if !t.sendExemplars {
return true
}

currentTime := time.Now()
outer:
for _, e := range exemplars {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), e.T) {
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[e.Ref]
if !ok {
t.metrics.droppedExemplarsTotal.Inc()
// Track dropped exemplars in the same EWMA for sharding calc.
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[e.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped exemplar for series that was not explicitly dropped via relabelling", "ref", e.Ref)
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedExemplarsTotal.WithLabelValues(reasonDroppedSeries).Inc()
}
t.seriesMtx.Unlock()
continue
@@ -678,16 +737,22 @@ func (t *QueueManager) AppendHistograms(histograms []record.RefHistogramSample)
if !t.sendNativeHistograms {
return true
}

currentTime := time.Now()
outer:
for _, h := range histograms {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref]
if !ok {
t.metrics.droppedHistogramsTotal.Inc()
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc()
}
t.seriesMtx.Unlock()
continue
@@ -725,16 +790,22 @@ func (t *QueueManager) AppendFloatHistograms(floatHistograms []record.RefFloatHi
if !t.sendNativeHistograms {
return true
}

currentTime := time.Now()
outer:
for _, h := range floatHistograms {
if isSampleOld(currentTime, time.Duration(t.cfg.SampleAgeLimit), h.T) {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonTooOld).Inc()
continue
}
t.seriesMtx.Lock()
lbls, ok := t.seriesLabels[h.Ref]
if !ok {
t.metrics.droppedHistogramsTotal.Inc()
t.dataDropped.incr(1)
if _, ok := t.droppedSeries[h.Ref]; !ok {
level.Info(t.logger).Log("msg", "Dropped histogram for series that was not explicitly dropped via relabelling", "ref", h.Ref)
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonUnintentionalDroppedSeries).Inc()
} else {
t.metrics.droppedHistogramsTotal.WithLabelValues(reasonDroppedSeries).Inc()
}
t.seriesMtx.Unlock()
continue
@@ -1490,7 +1561,8 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s
// sendSamples to the remote storage with backoff for recoverable errors.
func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) error {
// Build the WriteRequest with no metadata.
req, highest, err := buildWriteRequest(samples, nil, pBuf, *buf)
req, highest, lowest, err := buildWriteRequest(s.qm.logger, samples, nil, pBuf, *buf, nil)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
@@ -1504,6 +1576,25 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
attemptStore := func(try int) error {
currentTime := time.Now()
lowest := s.qm.buildRequestLimitTimestamp.Load()
if isSampleOld(currentTime, time.Duration(s.qm.cfg.SampleAgeLimit), lowest) {
// This will filter out old samples during retries.
req, _, lowest, err := buildWriteRequest(
s.qm.logger,
samples,
nil,
pBuf,
*buf,
isTimeSeriesOldFilter(s.qm.metrics, currentTime, time.Duration(s.qm.cfg.SampleAgeLimit)),
)
s.qm.buildRequestLimitTimestamp.Store(lowest)
if err != nil {
return err
}
*buf = req
}

ctx, span := otel.Tracer("").Start(ctx, "Remote Send Batch")
defer span.End()

@@ -1608,9 +1699,27 @@ func sendWriteRequestWithBackoff(ctx context.Context, cfg config.QueueConfig, l
}
}

func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte) ([]byte, int64, error) {
func buildTimeSeries(timeSeries []prompb.TimeSeries, filter func(prompb.TimeSeries) bool) (int64, int64, []prompb.TimeSeries, int, int, int) {
var highest int64
for _, ts := range samples {
var lowest int64
var droppedSamples, droppedExemplars, droppedHistograms int

keepIdx := 0
lowest = math.MaxInt64
for i, ts := range timeSeries {
if filter != nil && filter(ts) {
if len(ts.Samples) > 0 {
droppedSamples++
}
if len(ts.Exemplars) > 0 {
droppedExemplars++
}
if len(ts.Histograms) > 0 {
droppedHistograms++
}
continue
}

// At the moment we only ever append a TimeSeries with a single sample or exemplar in it.
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest {
highest = ts.Samples[0].Timestamp
@@ -1621,10 +1730,37 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest {
highest = ts.Histograms[0].Timestamp
}

// Get lowest timestamp
if len(ts.Samples) > 0 && ts.Samples[0].Timestamp < lowest {
lowest = ts.Samples[0].Timestamp
}
if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp < lowest {
lowest = ts.Exemplars[0].Timestamp
}
if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp < lowest {
lowest = ts.Histograms[0].Timestamp
}

// Move the current element to the write position and increment the write pointer
timeSeries[keepIdx] = timeSeries[i]
keepIdx++
}

timeSeries = timeSeries[:keepIdx]
return highest, lowest, timeSeries, droppedSamples, droppedExemplars, droppedHistograms
}

func buildWriteRequest(logger log.Logger, timeSeries []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, buf []byte, filter func(prompb.TimeSeries) bool) ([]byte, int64, int64, error) {
highest, lowest, timeSeries,
droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(timeSeries, filter)

if droppedSamples > 0 || droppedExemplars > 0 || droppedHistograms > 0 {
level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
}

req := &prompb.WriteRequest{
Timeseries: samples,
Timeseries: timeSeries,
Metadata: metadata,
}

@@ -1635,7 +1771,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
}
err := pBuf.Marshal(req)
if err != nil {
return nil, highest, err
return nil, highest, lowest, err
}

// snappy uses len() to see if it needs to allocate a new slice. Make the
@@ -1644,5 +1780,5 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
buf = buf[0:cap(buf)]
}
compressed := snappy.Encode(buf, pBuf.Bytes())
return compressed, highest, nil
return compressed, highest, lowest, nil
}
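To make the filtering semantics concrete, a hypothetical test-style sketch of buildTimeSeries combined with the age filter could look like the following (not part of the PR; it assumes newQueueManagerMetrics tolerates a nil registerer):

package remote

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/prompb"
)

func TestBuildTimeSeriesDropsOldSamples(t *testing.T) {
	now := time.Now()
	series := []prompb.TimeSeries{
		{Samples: []prompb.Sample{{Value: 1, Timestamp: timestamp.FromTime(now)}}},
		{Samples: []prompb.Sample{{Value: 2, Timestamp: timestamp.FromTime(now.Add(-2 * time.Hour))}}},
	}

	metrics := newQueueManagerMetrics(nil, "", "")
	filter := isTimeSeriesOldFilter(metrics, now, 1*time.Hour)

	highest, lowest, kept, droppedSamples, droppedExemplars, droppedHistograms := buildTimeSeries(series, filter)

	require.Len(t, kept, 1)             // only the recent series survives
	require.Equal(t, 1, droppedSamples) // the two-hour-old sample is filtered out
	require.Equal(t, 0, droppedExemplars)
	require.Equal(t, 0, droppedHistograms)
	require.Equal(t, highest, lowest) // a single remaining sample bounds both timestamps
}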