From a3568caf46f634c83e4d47dafe87bb18bf588412 Mon Sep 17 00:00:00 2001 From: Rohit Singh Date: Wed, 7 Dec 2022 14:09:47 -0400 Subject: [PATCH] Added tools bucket marker flag to mark block for no-downsample Signed-off-by: Rohit Singh --- CHANGELOG.md | 1 + cmd/thanos/downsample.go | 2 + cmd/thanos/tools_bucket.go | 6 +- pkg/block/block.go | 31 ++++++++ pkg/block/block_test.go | 55 ++++++++++++++ pkg/block/fetcher.go | 3 + pkg/block/metadata/markers.go | 31 +++++++- pkg/compact/downsample/downsample.go | 108 +++++++++++++++++++++++++++ 8 files changed, 235 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65051ae9456..ab04839dad2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added +- [#5945](https://github.com/thanos-io/thanos/pull/5945) Tools: Added new `no-downsample` marker to skip blocks when downsampling via `thanos tools bucket mark --marker=no-downsample-mark.json`. - [#5814](https://github.com/thanos-io/thanos/pull/5814) Store: Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits. - [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits. - [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set time window for experimental out-of-order samples ingestion. Disabled by default (set to 0s). Please note if you enable this option and you use compactor, make sure you set the `--enable-vertical-compaction` flag, otherwise you might risk compactor halt. diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 693823afc29..5b2dbfeca7b 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -85,8 +85,10 @@ func RunDownsample( return err } + // While fetching blocks, filter out blocks that were marked for no downsample. metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewDeduplicateFilter(block.FetcherConcurrency), + downsample.NewGatherNoDownsampleMarkFilter(logger, bkt), }) if err != nil { return errors.Wrap(err, "create meta fetcher") diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index f3e0e44fec0..bc48a56ff76 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -238,7 +238,7 @@ func (tbc *bucketDownsampleConfig) registerBucketDownsampleFlag(cmd extkingpin.F func (tbc *bucketMarkBlockConfig) registerBucketMarkBlockFlag(cmd extkingpin.FlagClause) *bucketMarkBlockConfig { cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().StringsVar(&tbc.blockIDs) - cmd.Flag("marker", "Marker to be put.").Required().EnumVar(&tbc.marker, metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename) + cmd.Flag("marker", "Marker to be put.").Required().EnumVar(&tbc.marker, metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename, metadata.NoDownsampleMarkFilename) cmd.Flag("details", "Human readable details to be put into marker.").Required().StringVar(&tbc.details) return tbc @@ -1059,6 +1059,10 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P if err := block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, tbc.details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil { return errors.Wrapf(err, "mark %v for %v", id, tbc.marker) } + case metadata.NoDownsampleMarkFilename: + if err := block.MarkForNoDownsample(ctx, logger, bkt, id, metadata.ManualNoDownsampleReason, tbc.details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil { + return errors.Wrapf(err, "mark %v for %v", id, tbc.marker) + } default: return errors.Errorf("not supported marker %v", tbc.marker) } diff --git a/pkg/block/block.go b/pkg/block/block.go index 7886e5d8693..5edf27f2423 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -402,3 +402,34 @@ func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucke level.Info(logger).Log("msg", "block has been marked for no compaction", "block", id) return nil } + +// MarkForNoDownsample creates a file which marks block to be not downsampled. +func MarkForNoDownsample(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoDownsampleReason, details string, markedForNoDownsample prometheus.Counter) error { + m := path.Join(id.String(), metadata.NoDownsampleMarkFilename) + noDownsampleMarkExists, err := bkt.Exists(ctx, m) + if err != nil { + return errors.Wrapf(err, "check exists %s in bucket", m) + } + if noDownsampleMarkExists { + level.Warn(logger).Log("msg", "requested to mark for no deletion, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", m)) + return nil + } + noDownsampleMark, err := json.Marshal(metadata.NoDownsampleMark{ + ID: id, + Version: metadata.NoDownsampleMarkVersion1, + + NoDownsampleTime: time.Now().Unix(), + Reason: reason, + Details: details, + }) + if err != nil { + return errors.Wrap(err, "json encode no downsample mark") + } + + if err := bkt.Upload(ctx, m, bytes.NewBuffer(noDownsampleMark)); err != nil { + return errors.Wrapf(err, "upload file %s to bucket", m) + } + markedForNoDownsample.Inc() + level.Info(logger).Log("msg", "block has been marked for no downsample", "block", id) + return nil +} diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index e6c6c0aa8bc..f157fedd7dc 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -380,6 +380,61 @@ func TestMarkForNoCompact(t *testing.T) { } } +func TestMarkForNoDownsample(t *testing.T) { + + defer testutil.TolerantVerifyLeak(t) + ctx := context.Background() + + tmpDir := t.TempDir() + + for _, tcase := range []struct { + name string + preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) + + blocksMarked int + }{ + { + name: "block marked", + preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {}, + blocksMarked: 1, + }, + { + name: "block with no-downsample mark already, expected log and no metric increment", + preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) { + m, err := json.Marshal(metadata.NoDownsampleMark{ + ID: id, + NoDownsampleTime: time.Now().Unix(), + Version: metadata.NoDownsampleMarkVersion1, + }) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoDownsampleMarkFilename), bytes.NewReader(m))) + }, + blocksMarked: 0, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ + {{Name: "a", Value: "1"}}, + {{Name: "a", Value: "2"}}, + {{Name: "a", Value: "3"}}, + {{Name: "a", Value: "4"}}, + {{Name: "b", Value: "1"}}, + }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc) + testutil.Ok(t, err) + + tcase.preUpload(t, id, bkt) + + testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()), metadata.NoneFunc)) + + c := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) + err = MarkForNoDownsample(ctx, log.NewNopLogger(), bkt, id, metadata.ManualNoDownsampleReason, "", c) + testutil.Ok(t, err) + testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c)) + }) + } +} + // TestHashDownload uploads an empty block to in-memory storage // and tries to download it to the same dir. It should not try // to download twice. diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 3a228f994f2..a87f74a57a6 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -80,6 +80,9 @@ const ( // MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric. MarkedForNoCompactionMeta = "marked-for-no-compact" + // MarkedForNoDownsampleMeta is label for blocks which are loaded but also marked for no downsample. This label is also counted in `loaded` label metric. + MarkedForNoDownsampleMeta = "marked-for-no-downsample" + // Modified label values. replicaRemovedMeta = "replica-label-removed" ) diff --git a/pkg/block/metadata/markers.go b/pkg/block/metadata/markers.go index 8af9a5a1f87..83273eb3430 100644 --- a/pkg/block/metadata/markers.go +++ b/pkg/block/metadata/markers.go @@ -24,11 +24,15 @@ const ( // NoCompactMarkFilename is the known json filename for optional file storing details about why block has to be excluded from compaction. // If such file is present in block dir, it means the block has to excluded from compaction (both vertical and horizontal) or rewrite (e.g deletions). NoCompactMarkFilename = "no-compact-mark.json" - + // NoDownsampleMarkFilename is the known json filenanme for optional file storing details about why block has to be excluded from downsampling. + // If such file is present in block dir, it means the block has to be excluded from downsampling. + NoDownsampleMarkFilename = "no-downsample-mark.json" // DeletionMarkVersion1 is the version of deletion-mark file supported by Thanos. DeletionMarkVersion1 = 1 // NoCompactMarkVersion1 is the version of no-compact-mark file supported by Thanos. NoCompactMarkVersion1 = 1 + // NoDownsampleVersion1 is the version of no-downsample-mark file supported by Thanos. + NoDownsampleMarkVersion1 = 1 ) var ( @@ -62,9 +66,14 @@ func (m *DeletionMark) markerFilename() string { return DeletionMarkFilename } // NoCompactReason is a reason for a block to be excluded from compaction. type NoCompactReason string +// NoDownsampleReason is a reason for a block to be excluded from downsample. +type NoDownsampleReason string + const ( // ManualNoCompactReason is a custom reason of excluding from compaction that should be added when no-compact mark is added for unknown/user specified reason. ManualNoCompactReason NoCompactReason = "manual" + // ManualNoDownsampleReason is a custom reason of excluding from downsample that should be added when no-downsample mark is added for unknown/user specified reason. + ManualNoDownsampleReason NoDownsampleReason = "manual" // IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424) // This reason can be ignored when vertical block sharding will be implemented. IndexSizeExceedingNoCompactReason = "index-size-exceeding" @@ -88,6 +97,22 @@ type NoCompactMark struct { func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename } +// NoDownsampleMark marker stores reason of block being excluded from downsample if needed. +type NoDownsampleMark struct { + // ID of the tsdb block. + ID ulid.ULID `json:"id"` + // Version of the file. + Version int `json:"version"` + // Details is a human readable string giving details of reason. + Details string `json:"details,omitempty"` + + // NoDownsampleTime is a unix timestamp of when the block was marked for no downsample. + NoDownsampleTime int64 `json:"no_downsample_time"` + Reason NoDownsampleReason `json:"reason"` +} + +func (n *NoDownsampleMark) markerFilename() string { return NoDownsampleMarkFilename } + // ReadMarker reads the given mark file from /.json in bucket. func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, marker Marker) error { markerFile := path.Join(dir, marker.markerFilename()) @@ -113,6 +138,10 @@ func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.Instrumente if version := marker.(*NoCompactMark).Version; version != NoCompactMarkVersion1 { return errors.Errorf("unexpected no-compact-mark file version %d, expected %d", version, NoCompactMarkVersion1) } + case NoDownsampleMarkFilename: + if version := marker.(*NoDownsampleMark).Version; version != NoDownsampleMarkVersion1 { + return errors.Errorf("unexpected no-downsample-mark file version %d, expected %d", version, NoDownsampleMarkVersion1) + } case DeletionMarkFilename: if version := marker.(*DeletionMark).Version; version != DeletionMarkVersion1 { return errors.Errorf("unexpected deletion-mark file version %d, expected %d", version, DeletionMarkVersion1) diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 7731eab877d..0ded84cf4e1 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -4,11 +4,13 @@ package downsample import ( + "context" "fmt" "math" "math/rand" "os" "path/filepath" + "sync" "time" "github.com/go-kit/log" @@ -22,7 +24,10 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/index" "github.com/prometheus/prometheus/tsdb/tsdbutil" + "golang.org/x/sync/errgroup" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/runutil" @@ -734,3 +739,106 @@ func SamplesFromTSDBSamples(samples []tsdbutil.Sample) []sample { } return res } + +// GatherNoDownsampleMarkFilter is a block.Fetcher filter that passes all metas. +// While doing it, it gathers all no-downsample-mark.json markers. +type GatherNoDownsampleMarkFilter struct { + logger log.Logger + bkt objstore.InstrumentedBucketReader + noDownsampleMarkedMap map[ulid.ULID]*metadata.NoDownsampleMark + concurrency int + mtx sync.Mutex +} + +// NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter. +func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter { + return &GatherNoDownsampleMarkFilter{ + logger: logger, + bkt: bkt, + concurrency: 1, + } +} + +// NoDownsampleMarkedBlocks returns block ids that were marked for no downsample. +func (f *GatherNoDownsampleMarkFilter) NoDownsampleMarkedBlocks() map[ulid.ULID]*metadata.NoDownsampleMark { + f.mtx.Lock() + copiedNoDownsampleMarked := make(map[ulid.ULID]*metadata.NoDownsampleMark, len(f.noDownsampleMarkedMap)) + for k, v := range f.noDownsampleMarkedMap { + copiedNoDownsampleMarked[k] = v + } + f.mtx.Unlock() + + return copiedNoDownsampleMarked +} + +// TODO (@rohitkochhar): reduce code duplication here by combining +// this code with that of GatherNoCompactMarkFilter +// Filter passes all metas, while gathering no downsample markers. +func (f *GatherNoDownsampleMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error { + f.mtx.Lock() + f.noDownsampleMarkedMap = make(map[ulid.ULID]*metadata.NoDownsampleMark) + f.mtx.Unlock() + + // Make a copy of block IDs to check, in order to avoid concurrency issues + // between the scheduler and workers. + blockIDs := make([]ulid.ULID, 0, len(metas)) + for id := range metas { + blockIDs = append(blockIDs, id) + } + + var ( + eg errgroup.Group + ch = make(chan ulid.ULID, f.concurrency) + ) + + for i := 0; i < f.concurrency; i++ { + eg.Go(func() error { + var lastErr error + for id := range ch { + m := &metadata.NoDownsampleMark{} + + if err := metadata.ReadMarker(ctx, f.logger, f.bkt, id.String(), m); err != nil { + if errors.Cause(err) == metadata.ErrorMarkerNotFound { + continue + } + if errors.Cause(err) == metadata.ErrorUnmarshalMarker { + level.Warn(f.logger).Log("msg", "found partial no-downsample-mark.json; if we will see it happening often for the same block, consider manually deleting no-downsample-mark.json from the object storage", "block", id, "err", err) + continue + } + // Remember the last error and continue draining the channel. + lastErr = err + continue + } + + f.mtx.Lock() + f.noDownsampleMarkedMap[id] = m + f.mtx.Unlock() + synced.WithLabelValues(block.MarkedForNoDownsampleMeta).Inc() + } + + return lastErr + }) + } + + // Workers scheduled, distribute blocks. + eg.Go(func() error { + defer close(ch) + + for _, id := range blockIDs { + select { + case ch <- id: + // Nothing to do. + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil + }) + + if err := eg.Wait(); err != nil { + return errors.Wrap(err, "filter blocks marked for no downsample") + } + + return nil +}