Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fetcher: sanity check metas for common corruptions #7282

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#7123](https://github.com/thanos-io/thanos/pull/7123) Rule: Change default Alertmanager API version to v2.
- [#7223](https://github.com/thanos-io/thanos/pull/7223) Automatic detection of memory limits and configure GOMEMLIMIT to match.
- [#7282](https://github.com/thanos-io/thanos/pull/7282) Fetcher: mark metas with incomplete files as partial if configured.
- [#7282](https://github.com/thanos-io/thanos/pull/7282) Compactor: add flag to disable cleanup of partial uploads.

### Removed

Expand Down
6 changes: 6 additions & 0 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,10 @@ func runCompact(
var cleanMtx sync.Mutex
// TODO(GiedriusS): we could also apply retention policies here but the logic would be a bit more complex.
cleanPartialMarked := func() error {
if conf.disableCleanupPartialUploads {
level.Info(logger).Log("msg", "cleanup of partial uploads is disabled, skipping")
return nil
}
cleanMtx.Lock()
defer cleanMtx.Unlock()

Expand Down Expand Up @@ -724,6 +728,7 @@ type compactConfig struct {
progressCalculateInterval time.Duration
filterConf *store.FilterConfig
disableAdminOperations bool
disableCleanupPartialUploads bool
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -837,4 +842,5 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&cc.label)

cmd.Flag("disable-admin-operations", "Disable UI/API admin operations like marking blocks for deletion and no compaction.").Default("false").BoolVar(&cc.disableAdminOperations)
cmd.Flag("compact.disable-cleanup-partial-uploads", "Disable cleanup of partial uploads.").Default("false").BoolVar(&cc.disableCleanupPartialUploads)
}
168 changes: 134 additions & 34 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -24,10 +25,10 @@
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -77,10 +78,10 @@
FailedMeta = "failed"

// Synced label values.
labelExcludedMeta = "label-excluded"
timeExcludedMeta = "time-excluded"
tooFreshMeta = "too-fresh"
duplicateMeta = "duplicate"
LabelExcludedMeta = "label-excluded"
TimeExcludedMeta = "time-excluded"
TooFreshMeta = "too-fresh"
DuplicateMeta = "duplicate"
// Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted,
// but don't have a replacement block yet.
MarkedForDeletionMeta = "marked-for-deletion"
Expand All @@ -92,7 +93,10 @@
MarkedForNoDownsampleMeta = "marked-for-no-downsample"

// Modified label values.
replicaRemovedMeta = "replica-label-removed"
ReplicaRemovedMeta = "replica-label-removed"

// Mysterious incomplete block, meta was uploaded indicating that its uploaded just fine, but at the same time it indicates that the block is incomplete

Check failure on line 98 in pkg/block/fetcher.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

Comment should end in a period (godot)
MetaHasIncompleteFiles = "meta-has-incomplete-files"
)

func NewBaseFetcherMetrics(reg prometheus.Registerer) *BaseFetcherMetrics {
Expand Down Expand Up @@ -154,27 +158,28 @@
{CorruptedMeta},
{NoMeta},
{LoadedMeta},
{tooFreshMeta},
{TooFreshMeta},
{FailedMeta},
{labelExcludedMeta},
{timeExcludedMeta},
{duplicateMeta},
{LabelExcludedMeta},
{TimeExcludedMeta},
{DuplicateMeta},
{MarkedForDeletionMeta},
{MarkedForNoCompactionMeta},
{MetaHasIncompleteFiles},
}
}

func DefaultModifiedLabelValues() [][]string {
return [][]string{
{replicaRemovedMeta},
{ReplicaRemovedMeta},
}
}

// Lister lists block IDs from a bucket.
type Lister interface {
	// GetActiveAndPartialBlockIDs streams the IDs of active blocks to ch and
	// returns the set of partial blocks. Active blocks contain meta.json,
	// while partial blocks do not; the map value records why each block is
	// considered partial (nil means the block is active).
	GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]error, err error)
}

// RecursiveLister lists block IDs by recursively iterating through a bucket.
Expand All @@ -190,8 +195,8 @@
}
}

func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
partialBlocks = make(map[ulid.ULID]bool)
func (f *RecursiveLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]error, err error) {
partialBlocks = make(map[ulid.ULID]error)
err = f.bkt.Iter(ctx, "", func(name string) error {
parts := strings.Split(name, "/")
dir, file := parts[0], parts[len(parts)-1]
Expand All @@ -200,12 +205,12 @@
return nil
}
if _, ok := partialBlocks[id]; !ok {
partialBlocks[id] = true
partialBlocks[id] = errors.Wrapf(ErrorSyncMetaNotFound, "block id: %s", id)
}
if !IsBlockMetaFile(file) {
return nil
}
partialBlocks[id] = false
partialBlocks[id] = nil

select {
case <-ctx.Done():
Expand All @@ -217,6 +222,93 @@
return partialBlocks, err
}

// RecursiveBlockValidatingLister lists block IDs by recursively iterating through a bucket and performs several validations

Check failure on line 225 in pkg/block/fetcher.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

Comment should end in a period (godot)
type RecursiveBlockValidatingLister struct {
logger log.Logger
bkt objstore.InstrumentedBucketReader
}

// NewRecursiveBlockValidatingLister returns a lister that walks the given
// bucket recursively and validates each block's file listing before
// reporting it as active.
func NewRecursiveBlockValidatingLister(logger log.Logger, bkt objstore.InstrumentedBucketReader) *RecursiveBlockValidatingLister {
	lister := &RecursiveBlockValidatingLister{
		logger: logger,
		bkt:    bkt,
	}
	return lister
}

// GetActiveAndPartialBlockIDs recursively lists all objects in the bucket,
// groups them per block and sanity-checks each block's file listing. IDs of
// blocks that pass validation are streamed to ch; blocks that fail are
// returned in partialBlocks together with the reason they were rejected.
func (f *RecursiveBlockValidatingLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]error, err error) {
	filesPerBlock := make(map[ulid.ULID][]string)
	err = f.bkt.Iter(ctx, "", func(name string) error {
		parts := strings.Split(name, "/")
		id, ok := IsBlockDir(parts[0])
		if !ok {
			return nil
		}
		// NOTE: strings.TrimLeft would treat "<ulid>/" as a character set and
		// could eat leading characters of the file name itself (e.g. turn
		// "meta.json" into "ta.json"); TrimPrefix removes exactly the
		// "<ulid>/" prefix.
		filesPerBlock[id] = append(filesPerBlock[id], strings.TrimPrefix(name, parts[0]+"/"))
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		return nil
	}, objstore.WithRecursiveIter)
	if err != nil {
		// An incomplete listing must not be classified: blocks could be
		// wrongly reported as partial just because their files were not seen.
		return nil, err
	}

	partialBlocks = make(map[ulid.ULID]error)
	for id, files := range filesPerBlock {
		if checkErr := checkForIncompleteFiles(files); checkErr != nil {
			partialBlocks[id] = checkErr
			continue
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case ch <- id:
		}
	}
	return partialBlocks, nil
}

// checkForIncompleteFiles validates the file listing of a single block (paths
// relative to the block directory). It returns ErrorSyncMetaNotFound when
// meta.json is missing, a wrapped ErrorSyncMetaIncomplete when the index or
// chunk files look missing or corrupted, and nil when the listing looks
// complete. Chunk files are expected to be consecutively numbered starting at
// 000001, so a gap shows up as highestChunkFile != numChunkFiles.
func checkForIncompleteFiles(files []string) error {
	var (
		numChunkFiles    int
		highestChunkFile int
		hasIndex, hasMeta bool
	)

	for _, f := range files {
		if f == "index" {
			hasIndex = true
		}
		if f == "meta.json" {
			hasMeta = true
		}
		dir, name := path.Split(f)
		if dir == "chunks/" {
			numChunkFiles++
			idx, err := strconv.Atoi(name)
			if err != nil {
				// A non-numeric chunk file name is a corruption too; classify
				// it under ErrorSyncMetaIncomplete so callers switching on
				// errors.Cause count it like other incomplete blocks.
				return errors.Wrapf(ErrorSyncMetaIncomplete, "unexpected chunk file name %s", name)
			}
			if idx > highestChunkFile {
				highestChunkFile = idx
			}
		}
	}

	if !hasMeta {
		return ErrorSyncMetaNotFound
	}
	if !hasIndex {
		return errors.Wrap(ErrorSyncMetaIncomplete, "no index file in meta")
	}
	if numChunkFiles == 0 {
		return errors.Wrap(ErrorSyncMetaIncomplete, "no chunk files in meta")
	}
	if numChunkFiles != highestChunkFile {
		return errors.Wrap(ErrorSyncMetaIncomplete, "incomplete chunk files in meta")
	}
	return nil
}

// ConcurrentLister lists block IDs by doing a top level iteration of the bucket
// followed by one Exists call for each discovered block to detect partial blocks.
type ConcurrentLister struct {
Expand All @@ -231,10 +323,10 @@
}
}

func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]bool, err error) {
func (f *ConcurrentLister) GetActiveAndPartialBlockIDs(ctx context.Context, ch chan<- ulid.ULID) (partialBlocks map[ulid.ULID]error, err error) {
const concurrency = 64

partialBlocks = make(map[ulid.ULID]bool)
partialBlocks = make(map[ulid.ULID]error)
var (
metaChan = make(chan ulid.ULID, concurrency)
eg, gCtx = errgroup.WithContext(ctx)
Expand All @@ -249,11 +341,11 @@
metaFile := path.Join(uid.String(), MetaFilename)
ok, err := f.bkt.Exists(gCtx, metaFile)
if err != nil {
return errors.Wrapf(err, "meta.json file exists: %v", uid)
return errors.Wrapf(err, "meta.json file exists call: %v", uid)
}
if !ok {
mu.Lock()
partialBlocks[uid] = true
partialBlocks[uid] = errors.Wrapf(ErrorSyncMetaNotFound, "block id: %s", uid)
mu.Unlock()
continue
}
Expand Down Expand Up @@ -382,8 +474,9 @@
}

var (
	// ErrorSyncMetaNotFound is recorded for blocks that have no meta.json.
	ErrorSyncMetaNotFound = errors.New("meta.json not found")
	// ErrorSyncMetaCorrupted is recorded for blocks whose meta.json cannot be parsed.
	ErrorSyncMetaCorrupted = errors.New("meta.json corrupted")
	// ErrorSyncMetaIncomplete is recorded for blocks whose meta.json exists but
	// whose file listing is missing the index or chunk files it should have.
	ErrorSyncMetaIncomplete = errors.New("meta.json incomplete")
)

// loadMeta returns metadata from object storage or error.
Expand Down Expand Up @@ -457,8 +550,9 @@
// If metaErr > 0 it means incomplete view, so some metas, failed to be loaded.
metaErrs errutil.MultiError

noMetas float64
corruptedMetas float64
noMetas float64
corruptedMetas float64
incompleteMetas float64
}

func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
Expand Down Expand Up @@ -509,7 +603,7 @@
})
}

var partialBlocks map[ulid.ULID]bool
var partialBlocks map[ulid.ULID]error
var err error
// Workers scheduled, distribute blocks.
eg.Go(func() error {
Expand All @@ -523,10 +617,15 @@
}

mtx.Lock()
for blockULID, isPartial := range partialBlocks {
if isPartial {
resp.partial[blockULID] = errors.Errorf("block %s has no meta file", blockULID)
resp.noMetas++
for blockULID, err := range partialBlocks {
if err != nil {
switch errors.Cause(err) {
case ErrorSyncMetaNotFound:
resp.noMetas++
case ErrorSyncMetaIncomplete:
resp.incompleteMetas++
}
resp.partial[blockULID] = err
}
}
mtx.Unlock()
Expand Down Expand Up @@ -608,6 +707,7 @@
metrics.Synced.WithLabelValues(FailedMeta).Set(float64(len(resp.metaErrs)))
metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetas)
metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetas)
metrics.Synced.WithLabelValues(MetaHasIncompleteFiles).Set(resp.incompleteMetas)

for _, filter := range filters {
// NOTE: filter can update synced metric accordingly to the reason of the exclude.
Expand Down Expand Up @@ -685,7 +785,7 @@
if m.MaxTime >= f.minTime.PrometheusTimestamp() && m.MinTime <= f.maxTime.PrometheusTimestamp() {
continue
}
synced.WithLabelValues(timeExcludedMeta).Inc()
synced.WithLabelValues(TimeExcludedMeta).Inc()
delete(metas, id)
}
return nil
Expand Down Expand Up @@ -719,7 +819,7 @@
}

if processedLabels, _ := relabel.Process(b.Labels(), f.relabelConfig...); processedLabels.IsEmpty() {
synced.WithLabelValues(labelExcludedMeta).Inc()
synced.WithLabelValues(LabelExcludedMeta).Inc()
delete(metas, id)
}
}
Expand Down Expand Up @@ -815,7 +915,7 @@
if metas[duplicate] != nil {
f.duplicateIDs = append(f.duplicateIDs, duplicate)
}
synced.WithLabelValues(duplicateMeta).Inc()
synced.WithLabelValues(DuplicateMeta).Inc()
delete(metas, duplicate)
}
f.mu.Unlock()
Expand Down Expand Up @@ -873,7 +973,7 @@
if _, exists := l[replicaLabel]; exists {
delete(l, replicaLabel)
countReplicaLabelRemoved[replicaLabel] += 1
modified.WithLabelValues(replicaRemovedMeta).Inc()
modified.WithLabelValues(ReplicaRemovedMeta).Inc()
}
}
if len(l) == 0 {
Expand Down Expand Up @@ -934,7 +1034,7 @@
meta.Thanos.Source != metadata.CompactorRepairSource {

level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id)
synced.WithLabelValues(tooFreshMeta).Inc()
synced.WithLabelValues(TooFreshMeta).Inc()
delete(metas, id)
}
}
Expand Down
Loading
Loading