block: Gracefully handle the extra duplicated chunk produced by a TSDB issue.
TSDB issue: prometheus-junkyard/tsdb#347

How do we handle it?
- Segregate this issue into a special stat entry in the verifier.
- Auto-fix broken blocks in the compaction plan before Thanos compaction.
- Add a repair job that runs offline batch repairs for individual blocks.

NOTE: At this point we have no way of fixing the bug when someone uses local compaction ):

Fixes #354

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka committed Jun 14, 2018
1 parent 35d7e10 commit f35be41
Showing 16 changed files with 409 additions and 138 deletions.
23 changes: 22 additions & 1 deletion cmd/thanos/bucket.go
@@ -54,6 +54,8 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
PlaceHolder("<bucket>").String()
verifyIssues := verify.Flag("issues", fmt.Sprintf("Issues to verify (and optionally repair). Possible values: %v", allIssues())).
Short('i').Default(verifier.IndexIssueID, verifier.OverlappedBlocksIssueID).Strings()
verifyIDWhitelist := verify.Flag("id-whitelist", "Block IDs to verify (and optionally repair) only. "+
"If none is specified, all blocks will be verified. Repeated field").Strings()
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer, _ bool) error {
bkt, closeFn, err := client.NewBucket(gcsBucket, *s3Config, reg, name)
if err != nil {
@@ -100,7 +102,26 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name strin
v = verifier.New(logger, bkt, issues)
}

return v.Verify(ctx)
idMatcher := func(ulid.ULID) bool { return true }
if len(*verifyIDWhitelist) > 0 {
whilelistIDs := map[string]struct{}{}
for _, bid := range *verifyIDWhitelist {
id, err := ulid.Parse(bid)
if err != nil {
return errors.Wrap(err, "invalid ULID found in --id-whitelist flag")
}
whilelistIDs[id.String()] = struct{}{}
}

idMatcher = func(id ulid.ULID) bool {
if _, ok := whilelistIDs[id.String()]; !ok {
return false
}
return true
}
}

return v.Verify(ctx, idMatcher)
}

ls := cmd.Command("ls", "list all blocks in the bucket")
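To illustrate the new `--id-whitelist` flag, here is a minimal standalone sketch of the matcher logic added above, assuming the github.com/oklog/ulid package already used in this diff; the block ID below is hypothetical, and the inner check is collapsed to `return ok`:

```go
// Standalone sketch (not the actual Thanos code): build an ID matcher from a
// whitelist of ULID strings; an empty whitelist matches every block.
package main

import (
	"fmt"

	"github.com/oklog/ulid"
)

func newIDMatcher(whitelist []string) (func(ulid.ULID) bool, error) {
	if len(whitelist) == 0 {
		// No whitelist given: verify every block, mirroring the flag's default.
		return func(ulid.ULID) bool { return true }, nil
	}
	ids := map[string]struct{}{}
	for _, s := range whitelist {
		id, err := ulid.Parse(s)
		if err != nil {
			return nil, fmt.Errorf("invalid ULID %q: %v", s, err)
		}
		ids[id.String()] = struct{}{}
	}
	return func(id ulid.ULID) bool {
		_, ok := ids[id.String()]
		return ok
	}, nil
}

func main() {
	// Hypothetical block ID, only for demonstration.
	matcher, err := newIDMatcher([]string{"01C8320GCGQWM6NXDRSWXT2TQ5"})
	if err != nil {
		panic(err)
	}
	fmt.Println(matcher(ulid.MustParse("01C8320GCGQWM6NXDRSWXT2TQ5"))) // true
}
```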
21 changes: 15 additions & 6 deletions cmd/thanos/compact.go
@@ -142,14 +142,23 @@ func runCompact(
done := true
for _, g := range groups {
id, err := g.Compact(ctx, compactDir, comp)
if err != nil {
return errors.Wrap(err, "compaction")
if err == nil {
// If the returned ID has a zero value, the group had no blocks to be compacted.
// We keep going through the outer loop until no group has any work left.
if id != (ulid.ULID{}) {
done = false
}
continue
}
// If the returned ID has a zero value, the group had no blocks to be compacted.
// We keep going through the outer loop until no group has any work left.
if id != (ulid.ULID{}) {
done = false

if compact.IsIssue347Error(err) {
err = compact.RepairIssue347(ctx, logger, bkt, err)
if err == nil {
done = false
continue
}
}
return errors.Wrap(err, "compaction")
}
if done {
break
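The retry loop above depends on compact.IsIssue347Error and compact.RepairIssue347, which live in pkg/compact and are not part of this excerpt. Below is a minimal sketch, with hypothetical names and fields (not the actual Thanos implementation), of a typed-error pattern that lets a halting compaction error carry the offending block ID so it can be detected and repaired out of band:

```go
// Hypothetical sketch of an "issue 347" error marker; the type name and its
// fields are assumptions, not the real pkg/compact types.
package main

import (
	"errors"
	"fmt"

	"github.com/oklog/ulid"
)

type issue347Error struct {
	err error     // underlying TSDB error (e.g. the duplicated chunk)
	id  ulid.ULID // block that needs the offline repair
}

func (e issue347Error) Error() string {
	return fmt.Sprintf("issue 347 on block %s: %v", e.id, e.err)
}

// isIssue347Error reports whether err, or any error it wraps, is an issue347Error.
func isIssue347Error(err error) bool {
	var ie issue347Error
	return errors.As(err, &ie)
}

func main() {
	id := ulid.MustParse("01C8320GCGQWM6NXDRSWXT2TQ5") // hypothetical block ID
	err := fmt.Errorf("compact group: %w", issue347Error{err: errors.New("duplicated chunk"), id: id})
	fmt.Println(isIssue347Error(err)) // true
}
```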
3 changes: 2 additions & 1 deletion cmd/thanos/rule.go
@@ -22,6 +22,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/alert"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/objstore/s3"
@@ -360,7 +361,7 @@ func runRule(
}
}()

s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset })
s := shipper.New(logger, nil, dataDir, bkt, func() labels.Labels { return lset }, block.RulerSource)

ctx, cancel := context.WithCancel(context.Background())

3 changes: 2 additions & 1 deletion cmd/thanos/sidecar.go
@@ -13,6 +13,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/objstore/s3"
@@ -241,7 +242,7 @@ func runSidecar(
}
}()

s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels)
s := shipper.New(logger, nil, dataDir, bkt, metadata.Labels, block.SidecarSource)
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
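Both the ruler and the sidecar now pass a block.SourceType into shipper.New, so every uploaded block records which component produced it. A small sketch, mirroring the ThanosMeta struct added in pkg/block/block.go below, of how that source surfaces in the thanos section of meta.json (the label set is hypothetical):

```go
// Local mirror of the new meta fields, only to show the resulting JSON shape.
package main

import (
	"encoding/json"
	"fmt"
)

type SourceType string

type ThanosDownsampleMeta struct {
	Resolution int64 `json:"resolution"`
}

type ThanosMeta struct {
	Labels     map[string]string    `json:"labels"`
	Downsample ThanosDownsampleMeta `json:"downsample"`
	Source     SourceType           `json:"source"`
}

func main() {
	m := ThanosMeta{
		Labels: map[string]string{"replica": "a"}, // hypothetical external labels
		Source: "sidecar",                         // block.SidecarSource
	}
	out, _ := json.MarshalIndent(m, "", "  ")
	fmt.Println(string(out))
	// Prints (condensed):
	// {"labels": {"replica": "a"}, "downsample": {"resolution": 0}, "source": "sidecar"}
}
```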
59 changes: 35 additions & 24 deletions pkg/block/block.go
@@ -19,6 +19,30 @@ import (
"github.com/prometheus/tsdb/fileutil"
)

const (
// MetaFilename is the known JSON filename for meta information.
MetaFilename = "meta.json"
// IndexFilename is the known index file for block index.
IndexFilename = "index"
// ChunksDirname is the known dir name for chunks with compressed samples.
ChunksDirname = "chunks"

// DebugMetas is a directory for debug meta files that happen in the past. Useful for debugging.
DebugMetas = "debug/metas"
)

type SourceType string

const (
UnknownSource SourceType = ""
SidecarSource SourceType = "sidecar"
CompactorSource SourceType = "compactor"
CompactorRepairSource SourceType = "compactor.repair"
RulerSource SourceType = "ruler"
BucketRepairSource SourceType = "bucket.repair"
TestSource SourceType = "test"
)

// Meta describes a block's meta. It wraps the known TSDB meta structure and
// extends it by Thanos-specific fields.
type Meta struct {
@@ -31,23 +55,16 @@ type Meta struct {

// ThanosMeta holds block meta information specific to Thanos.
type ThanosMeta struct {
Labels map[string]string `json:"labels"`
Downsample struct {
Resolution int64 `json:"resolution"`
} `json:"downsample"`
}
Labels map[string]string `json:"labels"`
Downsample ThanosDownsampleMeta `json:"downsample"`

const (
// MetaFilename is the known JSON filename for meta information.
MetaFilename = "meta.json"
// IndexFilename is the known index file for block index.
IndexFilename = "index"
// ChunksDirname is the known dir name for chunks with compressed samples.
ChunksDirname = "chunks"
// Real upload source of the block.
Source SourceType `json:"source"`
}

// DebugMetas is a directory for debug meta files that happen in the past. Useful for debugging.
DebugMetas = "debug/metas"
)
type ThanosDownsampleMeta struct {
Resolution int64 `json:"resolution"`
}

// WriteMetaFile writes the given meta into <dir>/meta.json.
func WriteMetaFile(dir string, meta *Meta) error {
@@ -216,16 +233,14 @@ func IsBlockDir(path string) (id ulid.ULID, ok bool) {
return id, err == nil
}

// Finalize sets Thanos meta to the block meta JSON and saves it to the disk. It also removes tombstones which are not
// useful for Thanos.
// InjectThanosMeta sets Thanos meta to the block meta JSON and saves it to the disk.
// NOTE: It should be used after writing any block by any Thanos component, otherwise we will miss crucial metadata.
func Finalize(bdir string, extLset map[string]string, resolution int64, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
func InjectThanosMeta(bdir string, meta ThanosMeta, downsampledMeta *tsdb.BlockMeta) (*Meta, error) {
newMeta, err := ReadMetaFile(bdir)
if err != nil {
return nil, errors.Wrap(err, "read new meta")
}
newMeta.Thanos.Labels = extLset
newMeta.Thanos.Downsample.Resolution = resolution
newMeta.Thanos = meta

// While downsampling we need to copy original compaction.
if downsampledMeta != nil {
@@ -236,9 +251,5 @@ func Finalize(bdir string, extLset map[string]string, resolution int64, downsamp
return nil, errors.Wrap(err, "write new meta")
}

if err = os.Remove(filepath.Join(bdir, "tombstones")); err != nil {
return nil, errors.Wrap(err, "remove tombstones")
}

return newMeta, nil
}
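Finalize is renamed to InjectThanosMeta, it no longer removes tombstones, and callers now pass a whole ThanosMeta (including the new Source field) instead of separate label and resolution arguments. A hedged usage sketch based only on the signature shown above; the block directory and labels are hypothetical:

```go
// Usage sketch, not taken from the Thanos codebase.
package main

import (
	"log"

	"github.com/improbable-eng/thanos/pkg/block"
)

func main() {
	bdir := "/tmp/data/01C8320GCGQWM6NXDRSWXT2TQ5" // hypothetical freshly written block dir

	// Stamp the block with its external labels, resolution and upload source
	// before shipping it to object storage.
	if _, err := block.InjectThanosMeta(bdir, block.ThanosMeta{
		Labels:     map[string]string{"cluster": "eu1"}, // hypothetical external labels
		Downsample: block.ThanosDownsampleMeta{Resolution: 0},
		Source:     block.TestSource,
	}, nil); err != nil { // nil: block was not downsampled, so no compaction meta to copy
		log.Fatal(err)
	}
}
```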
