diff --git a/cmd/thanos/bucket.go b/cmd/thanos/bucket.go index 37c2c7025a..ab4639af2c 100644 --- a/cmd/thanos/bucket.go +++ b/cmd/thanos/bucket.go @@ -54,6 +54,8 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin PlaceHolder("").String() verifyIssues := verify.Flag("issues", fmt.Sprintf("Issues to verify (and optionally repair). Possible values: %v", allIssues())). Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings() + verifyIDWhitelist := verify.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+ + "If none is specified, all blocks will be verified. Repeated field").Strings() m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error { bkt, closeFn, err := client.NewBucket(gcsBucket, *s3Config, reg, name) if err != nil { @@ -100,7 +102,26 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin v = verifier.New(logger, bkt, issues) } - return v.Verify(ctx) + idMatcher := func(ulid.ULID) bool { return true } + if len(*verifyIDWhitelist) > 0 { + whilelistIDs := map[string]struct{}{} + for _, bid := range *verifyIDWhitelist { + id, err := ulid.Parse(bid) + if err != nil { + return errors.Wrap(err, "invalid ULID found in --id-whitelist flag") + } + whilelistIDs[id.String()] = struct{}{} + } + + idMatcher = func(id ulid.ULID) bool { + if _, ok := whilelistIDs[id.String()]; !ok { + return false + } + return true + } + } + + return v.Verify(ctx, idMatcher) } ls := cmd.Command("ls", "list all blocks in the bucket") diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 53601346b8..d70fd9f6d7 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -142,14 +142,23 @@ func runCompact( done := true for _, g := range groups { id, err := g.Compact(ctx, compactDir, comp) - if err != nil { - return errors.Wrap(err, "compaction") + if err == nil { + // If the returned ID has a zero value, the group had no blocks to be compacted. + // We keep going through the outer loop until no group has any work left. + if id != (ulid.ULID{}) { + done = false + } + continue } - // If the returned ID has a zero value, the group had no blocks to be compacted. - // We keep going through the outer loop until no group has any work left. - if id != (ulid.ULID{}) { - done = false + + if compact.IsIssue347Error(err) { + err = compact.RepairIssue347(ctx, logger, bkt, err) + if err == nil { + done = false + continue + } } + return errors.Wrap(err, "compaction") } if done { break diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index a37b186271..f72a99e28d 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -22,6 +22,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/alert" + "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/objstore/s3" @@ -360,7 +361,7 @@ func runRule( } }() - s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }) + s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource) ctx, cancel := context.WithCancel(context.Background()) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 495fdf0a40..f09242d089 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/cluster" "github.com/improbable-eng/thanos/pkg/objstore/client" "github.com/improbable-eng/thanos/pkg/objstore/s3" @@ -241,7 +242,7 @@ func runSidecar( } }() - s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels) + s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { diff --git a/pkg/block/block.go b/pkg/block/block.go index 9517c0d9c7..f5a54ec24a 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -19,6 +19,30 @@ import ( "github.com/prometheus/tsdb/fileutil" ) +const ( + // MetaFilename is the known JSON filename for meta information. + MetaFilename = "meta.json" + // IndexFilename is the known index file for block index. + IndexFilename = "index" + // ChunksDirname is the known dir name for chunks with compressed samples. + ChunksDirname = "chunks" + + // DebugMetas is a directory for debug meta files that happen in the past. Useful for debugging. + DebugMetas = "debug/metas" +) + +type SourceType string + +const ( + UnknownSource SourceType = "" + SidecarSource SourceType = "sidecar" + CompactorSource SourceType = "compactor" + CompactorRepairSource SourceType = "compactor.repair" + RulerSource SourceType = "ruler" + BucketRepairSource SourceType = "bucket.repair" + TestSource SourceType = "test" +) + // Meta describes the a block's meta. It wraps the known TSDB meta structure and // extends it by Thanos-specific fields. type Meta struct { @@ -31,23 +55,16 @@ type Meta struct { // ThanosMeta holds block meta information specific to Thanos. type ThanosMeta struct { - Labels map[string]string `json:"labels"` - Downsample struct { - Resolution int64 `json:"resolution"` - } `json:"downsample"` -} + Labels map[string]string `json:"labels"` + Downsample ThanosDownsampleMeta `json:"downsample"` -const ( - // MetaFilename is the known JSON filename for meta information. - MetaFilename = "meta.json" - // IndexFilename is the known index file for block index. - IndexFilename = "index" - // ChunksDirname is the known dir name for chunks with compressed samples. - ChunksDirname = "chunks" + // Source is a real upload source of the block. + Source SourceType `json:"source"` +} - // DebugMetas is a directory for debug meta files that happen in the past. Useful for debugging. - DebugMetas = "debug/metas" -) +type ThanosDownsampleMeta struct { + Resolution int64 `json:"resolution"` +} // WriteMetaFile writes the given meta into /meta.json. func WriteMetaFile(dir string, meta *Meta) error { @@ -216,16 +233,14 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) { return id, err == nil } -// Finalize sets Thanos meta to the block meta JSON and saves it to the disk. It also removes tombstones which are not -// useful for Thanos. +// InjectThanosMeta sets Thanos meta to the block meta JSON and saves it to the disk. // NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata. -func Finalize(bdir string, extLset map[string]string, resolution int64, downsampledMeta *tsdb.BlockMeta) (*Meta, error) { +func InjectThanosMeta(bdir string, meta ThanosMeta, downsampledMeta *tsdb.BlockMeta) (*Meta, error) { newMeta, err := ReadMetaFile(bdir) if err != nil { return nil, errors.Wrap(err, "read new meta") } - newMeta.Thanos.Labels = extLset - newMeta.Thanos.Downsample.Resolution = resolution + newMeta.Thanos = meta // While downsampling we need to copy original compaction. if downsampledMeta != nil { @@ -236,9 +251,5 @@ func Finalize(bdir string, extLset map[string]string, resolution int64, downsamp return nil, errors.Wrap(err, "write new meta") } - if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil { - return nil, errors.Wrap(err, "remove tombstones") - } - return newMeta, nil } diff --git a/pkg/block/index.go b/pkg/block/index.go index 9d02b6c4d7..20f00fe516 100644 --- a/pkg/block/index.go +++ b/pkg/block/index.go @@ -2,6 +2,7 @@ package block import ( "encoding/json" + "fmt" "hash/crc32" "math/rand" "os" @@ -157,45 +158,94 @@ func VerifyIndex(fn string, minTime int64, maxTime int64) error { return err } - err = stats.ErrSummary() - if err != nil { - return err - } + return stats.AnyErr() +} +type Stats struct { + // TotalSeries represents total number of series in block. + TotalSeries int + // OutOfOrderSeries represents number of series that have out of order chunks. + OutOfOrderSeries int + + // OutOfOrderChunks represents number of chunks that are out of order (older time range is after younger one) + OutOfOrderChunks int + // DuplicatedChunks represents number of exactly the same chunks within same series. + DuplicatedChunks int + // OutsideChunks represents number of all chunks that are before or after time range specified in block meta. + OutsideChunks int + // CompleteOutsideChunks is subset of OutsideChunks that will be never accessed. They are completely out of time range specified in block meta. + CompleteOutsideChunks int + // Issue347OutsideChunks represents subset of OutsideChunks that are outsiders caused by https://github.com/prometheus/tsdb/issues/347 + // and is something that Thanos handle. + // + // Specifically we mean here chunks with minTime == block.maxTime and maxTime > block.MaxTime. These are + // are segregated into separate counters. These chunks are safe to be deleted, since they are duplicated across 2 blocks. + Issue347OutsideChunks int +} + +// Issue347OutsideChunksErr returns error if stats indicates issue347 block issue, that is repaired explicitly before compaction (on plan block). +func (i Stats) Issue347OutsideChunksErr() error { + if i.Issue347OutsideChunks > 0 { + return errors.Errorf("found %d chunks outside the block time range introduced by https://github.com/prometheus/tsdb/issues/347", i.Issue347OutsideChunks) + } return nil } -type IndexIssueStats struct { - Total int +// CriticalErr returns error if stats indicates critical block issue, that might solved only by manual repair procedure. +func (i Stats) CriticalErr() error { + var errMsg []string - OutOfOrderCount int - OutOfOrderSum int - ExactSum int + if i.OutOfOrderSeries > 0 { + errMsg = append(errMsg, fmt.Sprintf( + "%d/%d series have an average of %.3f out-of-order chunks: "+ + "%.3f of these are exact duplicates (in terms of data and time range)", + i.OutOfOrderSeries, + i.TotalSeries, + float64(i.OutOfOrderChunks)/float64(i.OutOfOrderSeries), + float64(i.DuplicatedChunks)/float64(i.OutOfOrderChunks), + )) + } - // Chunks that are before or after time range in meta. - Outsiders int - // Outsiders that will be never accessed. They are completely out of time range specified in block meta. - CompleteOutsiders int + n := i.OutsideChunks - (i.CompleteOutsideChunks + i.Issue347OutsideChunks) + if n > 0 { + errMsg = append(errMsg, fmt.Sprintf("found %d chunks non-completely outside the block time range", n)) + } + + if i.CompleteOutsideChunks > 0 { + errMsg = append(errMsg, fmt.Sprintf("found %d chunks completely outside the block time range", i.CompleteOutsideChunks)) + } + + if len(errMsg) > 0 { + return errors.New(strings.Join(errMsg, ", ")) + } + + return nil } -func (i IndexIssueStats) ErrSummary() error { - if i.OutOfOrderCount > 0 { - return errors.Errorf("%d/%d series have an average of %.3f out-of-order chunks. "+ - "%.3f of these are exact duplicates (in terms of data and time range). Outsiders: %d, complete outsiders: %d", - i.OutOfOrderCount, i.Total, float64(i.OutOfOrderSum)/float64(i.OutOfOrderCount), - float64(i.ExactSum)/float64(i.OutOfOrderSum), i.Outsiders, i.CompleteOutsiders) +// AnyErr returns error if stats indicates any block issue. +func (i Stats) AnyErr() error { + var errMsg []string + + if err := i.CriticalErr(); err != nil { + errMsg = append(errMsg, err.Error()) + } + + if err := i.Issue347OutsideChunksErr(); err != nil { + errMsg = append(errMsg, err.Error()) } - if i.Outsiders > 0 { - return errors.Errorf("No chunks are out of order, but found some outsider blocks. (Blocks that outside of block time range): %d. Complete: %d", - i.Outsiders, i.CompleteOutsiders) + if len(errMsg) > 0 { + return errors.New(strings.Join(errMsg, ", ")) } return nil } -// GatherIndexIssueStats returns useful counters as well as outsider chunks (chunks outside of block time range) that helps to assess index health. -func GatherIndexIssueStats(fn string, minTime int64, maxTime int64) (stats IndexIssueStats, err error) { +// GatherIndexIssueStats returns useful counters as well as outsider chunks (chunks outside of block time range) that +// helps to assess index health. +// It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle. +// See Stats.Issue347OutsideChunks for details. +func GatherIndexIssueStats(fn string, minTime int64, maxTime int64) (stats Stats, err error) { r, err := index.NewFileReader(fn) if err != nil { return stats, errors.Wrap(err, "open index file") @@ -212,11 +262,12 @@ func GatherIndexIssueStats(fn string, minTime int64, maxTime int64) (stats Index chks []chunks.Meta ) + // Per series. for p.Next() { lastLset = append(lastLset[:0], lset...) id := p.At() - stats.Total++ + stats.TotalSeries++ if err := r.Series(id, &lset, &chks); err != nil { return stats, errors.Wrap(err, "read series") @@ -239,22 +290,22 @@ func GatherIndexIssueStats(fn string, minTime int64, maxTime int64) (stats Index } ooo := 0 - if chks[0].MinTime < minTime || chks[0].MaxTime > maxTime { - stats.Outsiders++ - if chks[0].MinTime > maxTime || chks[0].MaxTime < minTime { - stats.CompleteOutsiders++ - } - } - for i, c := range chks[1:] { - c0 := chks[i] - + // Per chunk in series. + for i, c := range chks { if c.MinTime < minTime || c.MaxTime > maxTime { - stats.Outsiders++ + stats.OutsideChunks++ if c.MinTime > maxTime || c.MaxTime < minTime { - stats.CompleteOutsiders++ + stats.CompleteOutsideChunks++ + } else if c.MinTime == maxTime { + stats.Issue347OutsideChunks++ } } + if i == 0 { + continue + } + + c0 := chks[i-1] if c.MinTime > c0.MaxTime { continue } @@ -265,14 +316,14 @@ func GatherIndexIssueStats(fn string, minTime int64, maxTime int64) (stats Index cb := crc32.Checksum(c.Chunk.Bytes(), castagnoli) if ca == cb { // Duplicate. - stats.ExactSum++ + stats.DuplicatedChunks++ } ooo++ } } if ooo > 0 { - stats.OutOfOrderCount++ - stats.OutOfOrderSum += ooo + stats.OutOfOrderSeries++ + stats.OutOfOrderChunks += ooo } } if p.Err() != nil { @@ -282,12 +333,20 @@ func GatherIndexIssueStats(fn string, minTime int64, maxTime int64) (stats Index return stats, nil } -// Repair open the block with given id in dir and creates a new one with the same data. +type ignoreFnType func(mint, maxt int64, prev *chunks.Meta, curr *chunks.Meta) (bool, error) + +// Repair open the block with given id in dir and creates a new one with fixed data. // It: // - removes out of order duplicates // - all "complete" outsiders (they will not accessed anyway) +// - removes all near "complete" outside chunks introduced by https://github.com/prometheus/tsdb/issues/347. // Fixable inconsistencies are resolved in the new block. -func Repair(dir string, id ulid.ULID) (resid ulid.ULID, err error) { +// TODO(bplotka): https://github.com/improbable-eng/thanos/issues/378 +func Repair(dir string, id ulid.ULID, source SourceType, ignoreChkFns ...ignoreFnType) (resid ulid.ULID, err error) { + if len(ignoreChkFns) == 0 { + return resid, errors.New("no ignore chunk function specified") + } + bdir := filepath.Join(dir, id.String()) entropy := rand.New(rand.NewSource(time.Now().UnixNano())) resid = ulid.MustNew(ulid.Now(), entropy) @@ -310,11 +369,13 @@ func Repair(dir string, id ulid.ULID) (resid ulid.ULID, err error) { if err != nil { return resid, errors.Wrap(err, "open index") } + defer indexr.Close() chunkr, err := b.Chunks() if err != nil { return resid, errors.Wrap(err, "open chunks") } + defer chunkr.Close() resdir := filepath.Join(dir, resid.String()) @@ -335,8 +396,9 @@ func Repair(dir string, id ulid.ULID) (resid ulid.ULID, err error) { resmeta := *meta resmeta.ULID = resid resmeta.Stats = tsdb.BlockStats{} // reset stats + resmeta.Thanos.Source = source // update source - if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta); err != nil { + if err := rewrite(indexr, chunkr, indexw, chunkw, &resmeta, ignoreChkFns); err != nil { return resid, errors.Wrap(err, "rewrite block") } if err := WriteMetaFile(resdir, &resmeta); err != nil { @@ -347,9 +409,50 @@ func Repair(dir string, id ulid.ULID) (resid ulid.ULID, err error) { var castagnoli = crc32.MakeTable(crc32.Castagnoli) +func IgnoreCompleteOutsideChunk(mint int64, maxt int64, _ *chunks.Meta, curr *chunks.Meta) (bool, error) { + if curr.MinTime > maxt || curr.MaxTime < mint { + // "Complete" outsider. Ignore. + return true, nil + } + return false, nil +} + +func IgnoreIssue347OutsideChunk(_ int64, maxt int64, _ *chunks.Meta, curr *chunks.Meta) (bool, error) { + if curr.MinTime == maxt { + // "Near" outsider from issue https://github.com/prometheus/tsdb/issues/347. Ignore. + return true, nil + } + return false, nil +} + +func IgnoreDuplicateOutsideChunk(_ int64, _ int64, last *chunks.Meta, curr *chunks.Meta) (bool, error) { + if last == nil { + return false, nil + } + + if curr.MinTime > last.MaxTime { + return false, nil + } + + // Verify that the overlapping chunks are exact copies so we can safely discard + // the current one. + if curr.MinTime != last.MinTime || curr.MaxTime != last.MaxTime { + return false, errors.Errorf("non-sequential chunks not equal: [%d, %d] and [%d, %d]", + last.MaxTime, last.MaxTime, curr.MinTime, curr.MaxTime) + } + ca := crc32.Checksum(last.Chunk.Bytes(), castagnoli) + cb := crc32.Checksum(curr.Chunk.Bytes(), castagnoli) + + if ca != cb { + return false, errors.Errorf("non-sequential chunks not equal: %x and %x", ca, cb) + } + + return true, nil +} + // sanitizeChunkSequence ensures order of the input chunks and drops any duplicates. // It errors if the sequence contains non-dedupable overlaps. -func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64) ([]chunks.Meta, error) { +func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64, ignoreChkFns []ignoreFnType) ([]chunks.Meta, error) { if len(chks) == 0 { return nil, nil } @@ -358,38 +461,25 @@ func sanitizeChunkSequence(chks []chunks.Meta, mint int64, maxt int64) ([]chunks return chks[i].MinTime < chks[j].MinTime }) - // Remove duplicates and complete outsiders. + // Remove duplicates, complete outsiders and near outsiders. repl := make([]chunks.Meta, 0, len(chks)) - for i, c := range chks { - if c.MinTime > maxt || c.MaxTime < mint { - // "Complete" outsider. Ignore. - continue - } - - if i == 0 { - repl = append(repl, c) - continue - } - - last := repl[i-1] + var last *chunks.Meta - if c.MinTime > last.MaxTime { - repl = append(repl, c) - continue - } +OUTER: + for _, c := range chks { + for _, ignoreChkFn := range ignoreChkFns { + ignore, err := ignoreChkFn(mint, maxt, last, &c) + if err != nil { + return nil, errors.Wrap(err, "ignore function") + } - // Verify that the overlapping chunks are exact copies so we can safely discard - // the current one. - if c.MinTime != last.MinTime || c.MaxTime != last.MaxTime { - return nil, errors.Errorf("non-sequential chunks not equal: [%d, %d] and [%d, %d]", - last.MaxTime, last.MaxTime, c.MinTime, c.MaxTime) + if ignore { + continue OUTER + } } - ca := crc32.Checksum(last.Chunk.Bytes(), castagnoli) - cb := crc32.Checksum(c.Chunk.Bytes(), castagnoli) - if ca != cb { - return nil, errors.Errorf("non-sequential chunks not equal: %x and %x", ca, cb) - } + last = &c + repl = append(repl, c) } return repl, nil @@ -401,6 +491,7 @@ func rewrite( indexr tsdb.IndexReader, chunkr tsdb.ChunkReader, indexw tsdb.IndexWriter, chunkw tsdb.ChunkWriter, meta *Meta, + ignoreChkFns []ignoreFnType, ) error { symbols, err := indexr.Symbols() if err != nil { @@ -438,7 +529,8 @@ func rewrite( return err } } - chks, err := sanitizeChunkSequence(chks, meta.MinTime, meta.MaxTime) + + chks, err := sanitizeChunkSequence(chks, meta.MinTime, meta.MaxTime, ignoreChkFns) if err != nil { return err } diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 894a148945..dacc89f11b 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" - "path" "path/filepath" "sort" "sync" "time" + "io/ioutil" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" @@ -170,9 +171,17 @@ func (c *Syncer) syncMetas(ctx context.Context) error { } // ULIDs contain a millisecond timestamp. We do not consider blocks that have been created too recently to - // avoid races when a block is only partially uploaded. This relates only to level 1 blocks. - // NOTE: It is not safe to miss compacted block in sync step. Compactor needs to aware of ALL old blocks. - if meta.Compaction.Level == 1 && ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) { + // avoid races when a block is only partially uploaded. This relates to all blocks, excluding: + // - repair created blocks + // - compactor created blocks + // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks. + // TODO(bplotka): https://github.com/improbable-eng/thanos/issues/377 + if ulid.Now()-id.Time() < uint64(c.syncDelay/time.Millisecond) && + meta.Thanos.Source != block.BucketRepairSource && + meta.Thanos.Source != block.CompactorSource && + meta.Thanos.Source != block.CompactorRepairSource { + + level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) return nil } @@ -441,7 +450,7 @@ func (cg *Group) Resolution() int64 { // Compact plans and runs a single compaction against the group. The compacted result // is uploaded into the bucket the blocks were retrieved from. func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (ulid.ULID, error) { - subDir := path.Join(dir, cg.Key()) + subDir := filepath.Join(dir, cg.Key()) if err := os.RemoveAll(subDir); err != nil { return ulid.ULID{}, errors.Wrap(err, "clean compaction group dir") @@ -450,13 +459,34 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) ( return ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } - id, err := cg.compact(ctx, subDir, comp) + compID, err := cg.compact(ctx, subDir, comp) if err != nil { cg.compactionFailures.Inc() } cg.compactions.Inc() - return id, err + return compID, err +} + +// Issue347Error is a type wrapper for errors that should invoke repair process for broken block. +type Issue347Error struct { + err error + + id ulid.ULID +} + +func issue347Error(err error, brokenBlock ulid.ULID) Issue347Error { + return Issue347Error{err: err, id: brokenBlock} +} + +func (e Issue347Error) Error() string { + return e.err.Error() +} + +// Issue347Error returns true if the base error is a Issue347Error. +func IsIssue347Error(err error) bool { + _, ok := errors.Cause(err).(Issue347Error) + return ok } // HaltError is a type wrapper for errors that should halt any further progress on compactions. @@ -532,6 +562,60 @@ func (cg *Group) areBlocksOverlapping(include *block.Meta, excludeDirs ...string return nil } +// RepairIssue347 repairs the https://github.com/prometheus/tsdb/issues/347 issue when having issue347Error. +func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, issue347Err error) error { + ie, ok := errors.Cause(issue347Err).(Issue347Error) + if !ok { + return errors.Errorf("Given error is not an issue347 error: %v", issue347Err) + } + + level.Info(logger).Log("msg", "Repairing block broken by https://github.com/prometheus/tsdb/issues/347", "id", ie.id, "err", issue347Err) + + tmpdir, err := ioutil.TempDir("", fmt.Sprintf("repair-issue-347-id-%s-", ie.id)) + if err != nil { + return err + } + defer os.RemoveAll(tmpdir) + + bdir := filepath.Join(tmpdir, ie.id.String()) + if err := block.Download(ctx, bkt, ie.id, bdir); err != nil { + return retry(errors.Wrapf(err, "download block %s", ie.id)) + } + + meta, err := block.ReadMetaFile(bdir) + if err != nil { + return errors.Wrapf(err, "read meta from %s", bdir) + } + + resid, err := block.Repair(tmpdir, ie.id, block.CompactorRepairSource, block.IgnoreIssue347OutsideChunk) + if err != nil { + return errors.Wrapf(err, "repair failed for block %s", ie.id) + } + + // Verify repaired id before uploading it. + if err := block.VerifyIndex(filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { + return errors.Wrapf(err, "repaired block is invalid %s", resid) + } + + level.Info(logger).Log("msg", "uploading repaired block", "newID", resid) + if err = block.Upload(ctx, bkt, filepath.Join(tmpdir, resid.String())); err != nil { + return retry(errors.Wrapf(err, "upload of %s failed", resid)) + } + + level.Info(logger).Log("msg", "deleting broken block", "id", ie.id) + + // Spawn a new context so we always delete a block in full on shutdown. + delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // TODO(bplotka): Issue with this will introduce overlap that will halt compactor. Automate that (fix duplicate overlaps caused by this). + if err := block.Delete(delCtx, bkt, ie.id); err != nil { + return errors.Wrapf(err, "deleting old block %s failed. You need to delete this block manually", ie.id) + } + + return nil +} + func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (compID ulid.ULID, err error) { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -592,13 +676,26 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( return compID, errors.Wrapf(err, "plan dir %s", pdir) } + if meta.ULID.Compare(id) != 0 { + return compID, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id) + } + if err := block.Download(ctx, cg.bkt, id, pdir); err != nil { return compID, retry(errors.Wrapf(err, "download block %s", id)) } // Ensure all input blocks are valid. - if err := block.VerifyIndex(filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { - return compID, halt(errors.Wrapf(err, "invalid plan block %s", pdir)) + stats, err := block.GatherIndexIssueStats(filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + if err != nil { + return compID, errors.Wrapf(err, "gather index issues for block %s", pdir) + } + + if err := stats.CriticalErr(); err != nil { + return compID, halt(errors.Wrapf(err, "invalid plan id %s", pdir)) + } + + if err := stats.Issue347OutsideChunksErr(); err != nil { + return compID, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID) } } level.Debug(cg.logger).Log("msg", "downloaded and verified blocks", @@ -615,11 +712,19 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( bdir := filepath.Join(dir, compID.String()) - newMeta, err := block.Finalize(bdir, cg.labels.Map(), cg.resolution, nil) + newMeta, err := block.InjectThanosMeta(bdir, block.ThanosMeta{ + Labels: cg.labels.Map(), + Downsample: block.ThanosDownsampleMeta{Resolution: cg.resolution}, + Source: block.CompactorSource, + }, nil) if err != nil { return compID, errors.Wrapf(err, "failed to finalize the block %s", bdir) } + if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil { + return compID, errors.Wrap(err, "remove tombstones") + } + // Ensure the output block is valid. if err := block.VerifyIndex(filepath.Join(bdir, block.IndexFilename), newMeta.MinTime, newMeta.MaxTime); err != nil { return compID, halt(errors.Wrapf(err, "invalid result block %s", bdir)) diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 2b818fb8d7..9738874cfb 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -9,6 +9,8 @@ import ( "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/tsdb/chunkenc" + "os" + "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -127,10 +129,19 @@ func Downsample( } bdir := filepath.Join(dir, id.String()) - _, err = block.Finalize(bdir, origMeta.Thanos.Labels, resolution, &origMeta.BlockMeta) + var tmeta block.ThanosMeta + tmeta = origMeta.Thanos + tmeta.Source = block.CompactorSource + tmeta.Downsample.Resolution = resolution + + _, err = block.InjectThanosMeta(bdir, tmeta, &origMeta.BlockMeta) if err != nil { return id, errors.Wrapf(err, "failed to finalize the block %s", bdir) } + + if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil { + return id, errors.Wrap(err, "remove tombstones") + } return id, nil } @@ -394,7 +405,7 @@ func downsampleRaw(data []sample, resolution int64) []chunks.Meta { lastT := downsampleBatch(batch, resolution, ab.add) - // Finalize the chunk's counter aggregate with the last true sample. + // InjectThanosMeta the chunk's counter aggregate with the last true sample. ab.finalizeChunk(lastT, batch[len(batch)-1].v) chks = append(chks, ab.encode()) diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 6f7113ad91..9313a8fa91 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -68,6 +68,7 @@ type Shipper struct { metrics *metrics bucket objstore.Bucket labels func() labels.Labels + source block.SourceType } // New creates a new shipper that detects new TSDB blocks in dir and uploads them @@ -78,6 +79,7 @@ func New( dir string, bucket objstore.Bucket, lbls func() labels.Labels, + source block.SourceType, ) *Shipper { if logger == nil { logger = log.NewNopLogger() @@ -91,6 +93,7 @@ func New( bucket: bucket, labels: lbls, metrics: newMetrics(r), + source: source, } } @@ -215,6 +218,7 @@ func (s *Shipper) sync(ctx context.Context, meta *block.Meta) (err error) { if lset := s.labels(); lset != nil { meta.Thanos.Labels = lset.Map() } + meta.Thanos.Source = s.source if err := block.WriteMetaFile(updir, meta); err != nil { return errors.Wrap(err, "write meta file") } diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index a31ea95547..90aeedd0d7 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -30,7 +30,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) { defer os.RemoveAll(dir) extLset := labels.FromStrings("prometheus", "prom-1") - shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }) + shipper := New(log.NewLogfmtLogger(os.Stderr), nil, dir, bkt, func() labels.Labels { return extLset }, block.TestSource) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -60,6 +60,7 @@ func TestShipper_UploadBlocks_e2e(t *testing.T) { } meta.Version = 1 meta.ULID = id + meta.Thanos.Source = block.TestSource metab, err := json.Marshal(&meta) testutil.Ok(t, err) diff --git a/pkg/shipper/shipper_test.go b/pkg/shipper/shipper_test.go index 5b1e497f5f..0dd7fb0f0e 100644 --- a/pkg/shipper/shipper_test.go +++ b/pkg/shipper/shipper_test.go @@ -20,7 +20,7 @@ func TestShipperTimestamps(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - s := New(nil, nil, dir, nil, nil) + s := New(nil, nil, dir, nil, nil, block.TestSource) // Missing thanos meta file. _, _, err = s.Timestamps() diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go index 34c0455bb3..f331f2ff86 100644 --- a/pkg/testutil/prometheus.go +++ b/pkg/testutil/prometheus.go @@ -12,8 +12,6 @@ import ( "syscall" "time" - "path" - "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/block" "github.com/oklog/ulid" @@ -196,9 +194,17 @@ func CreateBlock( return id, errors.Wrap(err, "write block") } - if _, err = block.Finalize(path.Join(dir, id.String()), extLset.Map(), resolution, nil); err != nil { + if _, err = block.InjectThanosMeta(filepath.Join(dir, id.String()), block.ThanosMeta{ + Labels: extLset.Map(), + Downsample: block.ThanosDownsampleMeta{Resolution: resolution}, + Source: block.TestSource, + }, nil); err != nil { return id, errors.Wrap(err, "finalize block") } + if err = os.Remove(filepath.Join(dir, id.String(), "tombstones")); err != nil { + return id, errors.Wrap(err, "remove tombstones") + } + return id, nil } diff --git a/pkg/verifier/duplicated_compaction.go b/pkg/verifier/duplicated_compaction.go index c417bcf58a..16ddedda5f 100644 --- a/pkg/verifier/duplicated_compaction.go +++ b/pkg/verifier/duplicated_compaction.go @@ -23,7 +23,11 @@ const DuplicatedCompactionIssueID = "duplicated_compaction" // until sync-delay passes. // The expected print of this are same overlapped blocks with exactly the same sources, time ranges and stats. // If repair is enabled, all but one duplicates are safely deleted. -func DuplicatedCompactionIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool) error { +func DuplicatedCompactionIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool) error { + if idMatcher != nil { + return errors.Errorf("id matching is not supported by issue %s verifier", DuplicatedCompactionIssueID) + } + level.Info(logger).Log("msg", "started verifying issue", "with-repair", repair, "issue", DuplicatedCompactionIssueID) overlaps, err := fetchOverlaps(ctx, bkt) diff --git a/pkg/verifier/index_issue.go b/pkg/verifier/index_issue.go index 6821eef3c4..9b31f2046b 100644 --- a/pkg/verifier/index_issue.go +++ b/pkg/verifier/index_issue.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/objstore" + "github.com/oklog/ulid" "github.com/pkg/errors" ) @@ -22,7 +23,7 @@ const IndexIssueID = "index_issue" // If the replacement was created successfully it is uploaded to the bucket and the input // block is deleted. // NOTE: This also verifies all indexes against chunks mismatches and duplicates. -func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool) error { +func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool) error { level.Info(logger).Log("msg", "started verifying issue", "with-repair", repair, "issue", IndexIssueID) err := bkt.Iter(ctx, "", func(name string) error { @@ -31,14 +32,17 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac return nil } + if idMatcher != nil && !idMatcher(id) { + return nil + } + tmpdir, err := ioutil.TempDir("", fmt.Sprintf("index-issue-block-%s-", id)) if err != nil { return err } defer os.RemoveAll(tmpdir) - err = objstore.DownloadFile(ctx, bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(tmpdir, block.IndexFilename)) - if err != nil { + if err = objstore.DownloadFile(ctx, bkt, path.Join(id.String(), block.IndexFilename), filepath.Join(tmpdir, block.IndexFilename)); err != nil { return errors.Wrapf(err, "download index file %s", path.Join(id.String(), block.IndexFilename)) } @@ -52,8 +56,7 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac return errors.Wrapf(err, "gather index issues %s", id) } - err = stats.ErrSummary() - if err == nil { + if err = stats.AnyErr(); err == nil { return nil } @@ -64,34 +67,40 @@ func IndexIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bac return nil } - if stats.OutOfOrderSum > stats.ExactSum { + if stats.OutOfOrderChunks > stats.DuplicatedChunks { level.Warn(logger).Log("msg", "detected overlaps are not entirely by duplicated chunks. We are able to repair only duplicates", "id", id, "issue", IndexIssueID) } - if stats.Outsiders > stats.CompleteOutsiders { - level.Warn(logger).Log("msg", "detected outsiders are not all 'complete' outsiders. We can safely delete only complete outsiders", "id", id, "issue", IndexIssueID) + if stats.OutsideChunks > (stats.CompleteOutsideChunks + stats.Issue347OutsideChunks) { + level.Warn(logger).Log("msg", "detected outsiders are not all 'complete' outsiders or outsiders from https://github.com/prometheus/tsdb/issues/347. We can safely delete only these outsiders", "id", id, "issue", IndexIssueID) } - level.Info(logger).Log("msg", "repairing block", "id", id, "issue", IndexIssueID) - if meta.Thanos.Downsample.Resolution > 0 { return errors.New("cannot repair downsampled blocks") } - err = block.Download(ctx, bkt, id, path.Join(tmpdir, id.String())) - if err != nil { + level.Info(logger).Log("msg", "downloading block for repair", "id", id, "issue", IndexIssueID) + if err = block.Download(ctx, bkt, id, path.Join(tmpdir, id.String())); err != nil { return errors.Wrapf(err, "download block %s", id) } level.Info(logger).Log("msg", "downloaded block to be repaired", "id", id, "issue", IndexIssueID) - resid, err := block.Repair(tmpdir, id) + level.Info(logger).Log("msg", "repairing block", "id", id, "issue", IndexIssueID) + resid, err := block.Repair( + tmpdir, + id, + block.BucketRepairSource, + block.IgnoreCompleteOutsideChunk, + block.IgnoreDuplicateOutsideChunk, + block.IgnoreIssue347OutsideChunk, + ) if err != nil { return errors.Wrapf(err, "repair failed for block %s", id) } + level.Info(logger).Log("msg", "verifying repaired block", "id", id, "newID", resid, "issue", IndexIssueID) // Verify repaired block before uploading it. - err = block.VerifyIndex(filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime) - if err != nil { + if err := block.VerifyIndex(filepath.Join(tmpdir, resid.String(), block.IndexFilename), meta.MinTime, meta.MaxTime); err != nil { return errors.Wrapf(err, "repaired block is invalid %s", resid) } diff --git a/pkg/verifier/overlapped_blocks.go b/pkg/verifier/overlapped_blocks.go index cf71c1e63f..0bf9b8fb08 100644 --- a/pkg/verifier/overlapped_blocks.go +++ b/pkg/verifier/overlapped_blocks.go @@ -8,6 +8,7 @@ import ( "github.com/improbable-eng/thanos/pkg/block" "github.com/improbable-eng/thanos/pkg/compact" "github.com/improbable-eng/thanos/pkg/objstore" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb" ) @@ -16,7 +17,11 @@ const OverlappedBlocksIssueID = "overlapped_blocks" // OverlappedBlocksIssue checks bucket for blocks with overlapped time ranges. // No repair is available for this issue. -func OverlappedBlocksIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, _ objstore.Bucket, repair bool) error { +func OverlappedBlocksIssue(ctx context.Context, logger log.Logger, bkt objstore.Bucket, _ objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool) error { + if idMatcher != nil { + return errors.Errorf("id matching is not supported by issue %s verifier", DuplicatedCompactionIssueID) + } + level.Info(logger).Log("msg", "started verifying issue", "with-repair", repair, "issue", OverlappedBlocksIssueID) overlaps, err := fetchOverlaps(ctx, bkt) diff --git a/pkg/verifier/verify.go b/pkg/verifier/verify.go index 860e8807fd..d50f23904e 100644 --- a/pkg/verifier/verify.go +++ b/pkg/verifier/verify.go @@ -6,12 +6,13 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/objstore" + "github.com/oklog/ulid" "github.com/pkg/errors" ) // Issue is an function that does verification and repair only if repair arg is true. // It should log affected blocks using warn level logs. It should be safe for issue to run on healthy bucket. -type Issue func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool) error +type Issue func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bucket, repair bool, idMatcher func(ulid.ULID) bool) error // Verifier runs given issues to verify if bucket is healthy. type Verifier struct { @@ -44,7 +45,7 @@ func NewWithRepair(logger log.Logger, bkt objstore.Bucket, backupBkt objstore.Bu } // Verify verifies registered issues. -func (v *Verifier) Verify(ctx context.Context) error { +func (v *Verifier) Verify(ctx context.Context, idMatcher func(ulid.ULID) bool) error { level.Warn(v.logger).Log( "msg", "GLOBAL COMPACTOR SHOULD __NOT__ BE RUNNING ON THE SAME BUCKET", "issues", len(v.issues), @@ -58,7 +59,7 @@ func (v *Verifier) Verify(ctx context.Context) error { // TODO(blotka): Wrap bucket with BucketWithMetrics and print metrics after each issue (e.g how many blocks where touched). // TODO(bplotka): Implement disk "bucket" to allow this verify to work on local disk space as well. for _, issueFn := range v.issues { - err := issueFn(ctx, v.logger, v.bkt, v.backupBkt, v.repair) + err := issueFn(ctx, v.logger, v.bkt, v.backupBkt, v.repair, idMatcher) if err != nil { return errors.Wrap(err, "verify") }