Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added tools bucket marker for no-downsample.json #5945

Merged
merged 1 commit into from Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#5945](https://github.com/thanos-io/thanos/pull/5945) Tools: Added new `no-downsample` marker to skip blocks when downsampling via `thanos tools bucket mark --marker=no-downsample-mark.json`. This will skip downsampling for blocks with the new marker.
- [#5814](https://github.com/thanos-io/thanos/pull/5814) Store: Add metric `thanos_bucket_store_postings_size_bytes` that shows the distribution of how many postings (in bytes) were needed for each Series() call in Thanos Store. Useful for determining limits.
- [#5801](https://github.com/thanos-io/thanos/pull/5801) Store: add a new limiter `--store.grpc.downloaded-bytes-limit` that limits the number of bytes downloaded in each Series/LabelNames/LabelValues call. Use `thanos_bucket_store_postings_size_bytes` for determining the limits.
- [#5839](https://github.com/thanos-io/thanos/pull/5839) Receive: Add parameter `--tsdb.out-of-order.time-window` to set time window for experimental out-of-order samples ingestion. Disabled by default (set to 0s). Please note if you enable this option and you use compactor, make sure you set the `--enable-vertical-compaction` flag, otherwise you might risk compactor halt.
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/downsample.go
Expand Up @@ -85,8 +85,10 @@ func RunDownsample(
return err
}

// While fetching blocks, filter out blocks that were marked for no downsample.
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, bkt),
})
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down
6 changes: 5 additions & 1 deletion cmd/thanos/tools_bucket.go
Expand Up @@ -238,7 +238,7 @@ func (tbc *bucketDownsampleConfig) registerBucketDownsampleFlag(cmd extkingpin.F

func (tbc *bucketMarkBlockConfig) registerBucketMarkBlockFlag(cmd extkingpin.FlagClause) *bucketMarkBlockConfig {
cmd.Flag("id", "ID (ULID) of the blocks to be marked for deletion (repeated flag)").Required().StringsVar(&tbc.blockIDs)
cmd.Flag("marker", "Marker to be put.").Required().EnumVar(&tbc.marker, metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename)
cmd.Flag("marker", "Marker to be put.").Required().EnumVar(&tbc.marker, metadata.DeletionMarkFilename, metadata.NoCompactMarkFilename, metadata.NoDownsampleMarkFilename)
cmd.Flag("details", "Human readable details to be put into marker.").Required().StringVar(&tbc.details)

return tbc
Expand Down Expand Up @@ -1059,6 +1059,10 @@ func registerBucketMarkBlock(app extkingpin.AppClause, objStoreConfig *extflag.P
if err := block.MarkForNoCompact(ctx, logger, bkt, id, metadata.ManualNoCompactReason, tbc.details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for %v", id, tbc.marker)
}
case metadata.NoDownsampleMarkFilename:
if err := block.MarkForNoDownsample(ctx, logger, bkt, id, metadata.ManualNoDownsampleReason, tbc.details, promauto.With(nil).NewCounter(prometheus.CounterOpts{})); err != nil {
return errors.Wrapf(err, "mark %v for %v", id, tbc.marker)
}
default:
return errors.Errorf("not supported marker %v", tbc.marker)
}
Expand Down
31 changes: 31 additions & 0 deletions pkg/block/block.go
Expand Up @@ -402,3 +402,34 @@ func MarkForNoCompact(ctx context.Context, logger log.Logger, bkt objstore.Bucke
level.Info(logger).Log("msg", "block has been marked for no compaction", "block", id)
return nil
}

// MarkForNoDownsample creates a file which marks block to be not downsampled.
func MarkForNoDownsample(ctx context.Context, logger log.Logger, bkt objstore.Bucket, id ulid.ULID, reason metadata.NoDownsampleReason, details string, markedForNoDownsample prometheus.Counter) error {
m := path.Join(id.String(), metadata.NoDownsampleMarkFilename)
noDownsampleMarkExists, err := bkt.Exists(ctx, m)
if err != nil {
return errors.Wrapf(err, "check exists %s in bucket", m)
}
if noDownsampleMarkExists {
level.Warn(logger).Log("msg", "requested to mark for no deletion, but file already exists; this should not happen; investigate", "err", errors.Errorf("file %s already exists in bucket", m))
return nil
}
noDownsampleMark, err := json.Marshal(metadata.NoDownsampleMark{
ID: id,
Version: metadata.NoDownsampleMarkVersion1,

NoDownsampleTime: time.Now().Unix(),
Reason: reason,
Details: details,
})
if err != nil {
return errors.Wrap(err, "json encode no downsample mark")
}

if err := bkt.Upload(ctx, m, bytes.NewBuffer(noDownsampleMark)); err != nil {
return errors.Wrapf(err, "upload file %s to bucket", m)
}
markedForNoDownsample.Inc()
level.Info(logger).Log("msg", "block has been marked for no downsample", "block", id)
return nil
}
55 changes: 55 additions & 0 deletions pkg/block/block_test.go
Expand Up @@ -380,6 +380,61 @@ func TestMarkForNoCompact(t *testing.T) {
}
}

func TestMarkForNoDownsample(t *testing.T) {

defer testutil.TolerantVerifyLeak(t)
ctx := context.Background()

tmpDir := t.TempDir()

for _, tcase := range []struct {
name string
preUpload func(t testing.TB, id ulid.ULID, bkt objstore.Bucket)

blocksMarked int
}{
{
name: "block marked",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {},
blocksMarked: 1,
},
{
name: "block with no-downsample mark already, expected log and no metric increment",
preUpload: func(t testing.TB, id ulid.ULID, bkt objstore.Bucket) {
m, err := json.Marshal(metadata.NoDownsampleMark{
ID: id,
NoDownsampleTime: time.Now().Unix(),
Version: metadata.NoDownsampleMarkVersion1,
})
testutil.Ok(t, err)
testutil.Ok(t, bkt.Upload(ctx, path.Join(id.String(), metadata.NoDownsampleMarkFilename), bytes.NewReader(m)))
},
blocksMarked: 0,
},
} {
t.Run(tcase.name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
id, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
{{Name: "a", Value: "3"}},
{{Name: "a", Value: "4"}},
{{Name: "b", Value: "1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "val1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)

tcase.preUpload(t, id, bkt)

testutil.Ok(t, Upload(ctx, log.NewNopLogger(), bkt, path.Join(tmpDir, id.String()), metadata.NoneFunc))

c := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
err = MarkForNoDownsample(ctx, log.NewNopLogger(), bkt, id, metadata.ManualNoDownsampleReason, "", c)
testutil.Ok(t, err)
testutil.Equals(t, float64(tcase.blocksMarked), promtest.ToFloat64(c))
})
}
}

// TestHashDownload uploads an empty block to in-memory storage
// and tries to download it to the same dir. It should not try
// to download twice.
Expand Down
3 changes: 3 additions & 0 deletions pkg/block/fetcher.go
Expand Up @@ -80,6 +80,9 @@ const (
// MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"

// MarkedForNoDownsampleMeta is label for blocks which are loaded but also marked for no downsample. This label is also counted in `loaded` label metric.
MarkedForNoDownsampleMeta = "marked-for-no-downsample"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
)
Expand Down
31 changes: 30 additions & 1 deletion pkg/block/metadata/markers.go
Expand Up @@ -24,11 +24,15 @@ const (
// NoCompactMarkFilename is the known json filename for optional file storing details about why block has to be excluded from compaction.
// If such file is present in block dir, it means the block has to excluded from compaction (both vertical and horizontal) or rewrite (e.g deletions).
NoCompactMarkFilename = "no-compact-mark.json"

// NoDownsampleMarkFilename is the known json filenanme for optional file storing details about why block has to be excluded from downsampling.
// If such file is present in block dir, it means the block has to be excluded from downsampling.
NoDownsampleMarkFilename = "no-downsample-mark.json"
// DeletionMarkVersion1 is the version of deletion-mark file supported by Thanos.
DeletionMarkVersion1 = 1
// NoCompactMarkVersion1 is the version of no-compact-mark file supported by Thanos.
NoCompactMarkVersion1 = 1
// NoDownsampleVersion1 is the version of no-downsample-mark file supported by Thanos.
NoDownsampleMarkVersion1 = 1
)

var (
Expand Down Expand Up @@ -62,9 +66,14 @@ func (m *DeletionMark) markerFilename() string { return DeletionMarkFilename }
// NoCompactReason is a reason for a block to be excluded from compaction.
type NoCompactReason string

// NoDownsampleReason is a reason for a block to be excluded from downsample.
type NoDownsampleReason string

const (
// ManualNoCompactReason is a custom reason of excluding from compaction that should be added when no-compact mark is added for unknown/user specified reason.
ManualNoCompactReason NoCompactReason = "manual"
// ManualNoDownsampleReason is a custom reason of excluding from downsample that should be added when no-downsample mark is added for unknown/user specified reason.
ManualNoDownsampleReason NoDownsampleReason = "manual"
// IndexSizeExceedingNoCompactReason is a reason of index being too big (for example exceeding 64GB limit: https://github.com/thanos-io/thanos/issues/1424)
// This reason can be ignored when vertical block sharding will be implemented.
IndexSizeExceedingNoCompactReason = "index-size-exceeding"
Expand All @@ -88,6 +97,22 @@ type NoCompactMark struct {

func (n *NoCompactMark) markerFilename() string { return NoCompactMarkFilename }

// NoDownsampleMark marker stores reason of block being excluded from downsample if needed.
type NoDownsampleMark struct {
// ID of the tsdb block.
ID ulid.ULID `json:"id"`
// Version of the file.
Version int `json:"version"`
// Details is a human readable string giving details of reason.
Details string `json:"details,omitempty"`

// NoDownsampleTime is a unix timestamp of when the block was marked for no downsample.
NoDownsampleTime int64 `json:"no_downsample_time"`
Reason NoDownsampleReason `json:"reason"`
}

func (n *NoDownsampleMark) markerFilename() string { return NoDownsampleMarkFilename }

// ReadMarker reads the given mark file from <dir>/<marker filename>.json in bucket.
func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, marker Marker) error {
markerFile := path.Join(dir, marker.markerFilename())
Expand All @@ -113,6 +138,10 @@ func ReadMarker(ctx context.Context, logger log.Logger, bkt objstore.Instrumente
if version := marker.(*NoCompactMark).Version; version != NoCompactMarkVersion1 {
return errors.Errorf("unexpected no-compact-mark file version %d, expected %d", version, NoCompactMarkVersion1)
}
case NoDownsampleMarkFilename:
if version := marker.(*NoDownsampleMark).Version; version != NoDownsampleMarkVersion1 {
return errors.Errorf("unexpected no-downsample-mark file version %d, expected %d", version, NoDownsampleMarkVersion1)
}
case DeletionMarkFilename:
if version := marker.(*DeletionMark).Version; version != DeletionMarkVersion1 {
return errors.Errorf("unexpected deletion-mark file version %d, expected %d", version, DeletionMarkVersion1)
Expand Down
108 changes: 108 additions & 0 deletions pkg/compact/downsample/downsample.go
Expand Up @@ -4,11 +4,13 @@
package downsample

import (
"context"
"fmt"
"math"
"math/rand"
"os"
"path/filepath"
"sync"
"time"

"github.com/go-kit/log"
Expand All @@ -22,7 +24,10 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down Expand Up @@ -734,3 +739,106 @@ func SamplesFromTSDBSamples(samples []tsdbutil.Sample) []sample {
}
return res
}

// GatherNoDownsampleMarkFilter is a block.Fetcher filter that passes all metas.
// While doing it, it gathers all no-downsample-mark.json markers.
type GatherNoDownsampleMarkFilter struct {
logger log.Logger
bkt objstore.InstrumentedBucketReader
noDownsampleMarkedMap map[ulid.ULID]*metadata.NoDownsampleMark
concurrency int
mtx sync.Mutex
}

// NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter.
func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter {
return &GatherNoDownsampleMarkFilter{
logger: logger,
bkt: bkt,
concurrency: 1,
}
}

// NoDownsampleMarkedBlocks returns block ids that were marked for no downsample.
func (f *GatherNoDownsampleMarkFilter) NoDownsampleMarkedBlocks() map[ulid.ULID]*metadata.NoDownsampleMark {
f.mtx.Lock()
copiedNoDownsampleMarked := make(map[ulid.ULID]*metadata.NoDownsampleMark, len(f.noDownsampleMarkedMap))
for k, v := range f.noDownsampleMarkedMap {
copiedNoDownsampleMarked[k] = v
}
f.mtx.Unlock()

return copiedNoDownsampleMarked
}

// TODO (@rohitkochhar): reduce code duplication here by combining
// this code with that of GatherNoCompactionMarkFilter
// Filter passes all metas, while gathering no downsample markers.
func (f *GatherNoDownsampleMarkFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
RohitKochhar marked this conversation as resolved.
Show resolved Hide resolved
f.mtx.Lock()
f.noDownsampleMarkedMap = make(map[ulid.ULID]*metadata.NoDownsampleMark)
f.mtx.Unlock()

// Make a copy of block IDs to check, in order to avoid concurrency issues
// between the scheduler and workers.
blockIDs := make([]ulid.ULID, 0, len(metas))
for id := range metas {
blockIDs = append(blockIDs, id)
}

var (
eg errgroup.Group
ch = make(chan ulid.ULID, f.concurrency)
)

for i := 0; i < f.concurrency; i++ {
eg.Go(func() error {
var lastErr error
for id := range ch {
m := &metadata.NoDownsampleMark{}

if err := metadata.ReadMarker(ctx, f.logger, f.bkt, id.String(), m); err != nil {
if errors.Cause(err) == metadata.ErrorMarkerNotFound {
continue
}
if errors.Cause(err) == metadata.ErrorUnmarshalMarker {
level.Warn(f.logger).Log("msg", "found partial no-downsample-mark.json; if we will see it happening often for the same block, consider manually deleting no-downsample-mark.json from the object storage", "block", id, "err", err)
continue
}
// Remember the last error and continue draining the channel.
lastErr = err
continue
}

f.mtx.Lock()
f.noDownsampleMarkedMap[id] = m
f.mtx.Unlock()
synced.WithLabelValues(block.MarkedForNoDownsampleMeta).Inc()
}

return lastErr
})
}

// Workers scheduled, distribute blocks.
eg.Go(func() error {
defer close(ch)

for _, id := range blockIDs {
select {
case ch <- id:
// Nothing to do.
case <-ctx.Done():
return ctx.Err()
}
}

return nil
})

if err := eg.Wait(); err != nil {
return errors.Wrap(err, "filter blocks marked for no downsample")
}

return nil
}