Commit

bucket: Added verify struct to explicitly verify all known/potential issues. (#257)

* bucket: Added verify struct to explicitly verify all known/potential issues.

Added overlapping blocks check.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Addressed comment and refactored overlapped blocks.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Grouped by group.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Addressed comments.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Addressed comments.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Fixed comment.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>

* Addressed comment.

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka committed Mar 27, 2018
1 parent 70600f3 commit ca9614e
Showing 11 changed files with 586 additions and 231 deletions.
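
For context before the per-file diffs: a minimal sketch of the verifier API this commit builds on, inferred from its call sites in cmd/thanos/bucket.go below. Only the Issue type name, the issue IDs, and the New/NewWithRepair/Verify calls are visible in this diff, so the Issue function signature and the struct layout here are assumptions, not the actual pkg/verifier code.

package verifier

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/objstore"
)

// Issue checks for (and, when repair is true, tries to fix) one class of
// problem in the bucket's blocks. This signature is an assumption inferred
// from how issues are registered and run in bucket.go.
type Issue func(ctx context.Context, logger log.Logger, bkt objstore.Bucket, repair bool) error

// Verifier runs a configured set of issue checks against a bucket.
type Verifier struct {
	logger log.Logger
	bkt    objstore.Bucket
	issues []Issue
	repair bool
}

// New returns a read-only verifier, matching the call site in bucket.go.
func New(logger log.Logger, bkt objstore.Bucket, issues []Issue) *Verifier {
	return &Verifier{logger: logger, bkt: bkt, issues: issues}
}

// NewWithRepair returns a verifier whose issues may attempt repairs.
func NewWithRepair(logger log.Logger, bkt objstore.Bucket, issues []Issue) *Verifier {
	return &Verifier{logger: logger, bkt: bkt, issues: issues, repair: true}
}

// Verify runs every configured issue in order, stopping at the first error.
func (v *Verifier) Verify(ctx context.Context) error {
	for _, issue := range v.issues {
		if err := issue(ctx, v.logger, v.bkt, v.repair); err != nil {
			return err
		}
	}
	return nil
}

Splitting New and NewWithRepair keeps the default read-only, so a plain verify run can never mutate the bucket.
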
248 changes: 82 additions & 166 deletions cmd/thanos/bucket.go
@@ -4,29 +4,38 @@ import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"text/template"
"time"

"cloud.google.com/go/storage"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/gcs"
"github.com/improbable-eng/thanos/pkg/objstore/s3"
"github.com/improbable-eng/thanos/pkg/verifier"
"github.com/oklog/run"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/alecthomas/kingpin.v2"
)

var (
issuesMap = map[string]verifier.Issue{
verifier.IndexIssueID: verifier.IndexIssue,
verifier.OverlappedBlocksIssueID: verifier.OverlappedBlocksIssue,
}
allIssues = func() (s []string) {
for id := range issuesMap {
s = append(s, id)
}
return s
}
)

func registerBucket(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "inspect metric data in an object storage bucket")

@@ -35,12 +44,13 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name string

s3Config := s3.RegisterS3Params(cmd)

check := cmd.Command("check", "verify all blocks in the bucket")

checkRepair := check.Flag("repair", "attempt to repair blocks for which issues were detected").
// Verify command.
verify := cmd.Command("verify", "verify all blocks in the bucket against specified issues")
verifyRepair := verify.Flag("repair", "attempt to repair blocks for which issues were detected").
Short('r').Default("false").Bool()

m[name+" check"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer) error {
verifyIssues := verify.Flag("issues", "issues to verify (and optionally repair)").
Short('i').Default(allIssues()...).Strings()
m[name+" verify"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer) error {
bkt, closeFn, err := getBucketClient(gcsBucket, *s3Config, reg)
if err != nil {
return err
@@ -51,14 +61,28 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name string

defer closeFn()

return runBucketCheck(logger, bkt, *checkRepair)
var (
ctx = context.Background()
v *verifier.Verifier
issues []verifier.Issue
)

for _, i := range *verifyIssues {
issues = append(issues, issuesMap[i])
}

if *verifyRepair {
v = verifier.NewWithRepair(logger, bkt, issues)
} else {
v = verifier.New(logger, bkt, issues)
}

return v.Verify(ctx)
}

ls := cmd.Command("ls", "list all blocks in the bucket")

lsOutput := ls.Flag("ouput", "format in which to print each block's information; may be 'json' or custom template").
Short('o').Default("").String()

m[name+" ls"] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, _ opentracing.Tracer) error {
bkt, closeFn, err := getBucketClient(gcsBucket, *s3Config, reg)
if err != nil {
@@ -70,166 +94,58 @@ func registerBucket(m map[string]setupFunc, app *kingpin.Application, name string

defer closeFn()

return runBucketList(bkt, *lsOutput)
}
}

func runBucketCheck(logger log.Logger, bkt objstore.Bucket, repair bool) error {
var all []ulid.ULID

ctx := context.Background()

err := bkt.Iter(ctx, "", func(name string) error {
if id, err := ulid.Parse(name[:len(name)-1]); err == nil {
all = append(all, id)
}
return nil
})
if err != nil {
return errors.Wrap(err, "iter bucket")
}
level.Info(logger).Log("msg", "start verifying blocks", "count", len(all))

for _, id := range all {
level.Info(logger).Log("msg", "verify block", "id", id)

if err = verifyBlock(ctx, bkt, id); err != nil {
level.Warn(logger).Log("msg", "detected issue", "id", id, "err", err)
}
if err == nil || !repair {
continue
}
repid, err := repairBlock(ctx, bkt, id)
if err != nil {
level.Warn(logger).Log("msg", "repairing block failed", "id", id, "err", err)
continue
}
level.Info(logger).Log("msg", "repaired block", "id", id, "repl", repid)
}
return nil
}

// verifyBlock checks whether the block in the bucket has inconsistencies.
func verifyBlock(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID) error {
tmpdir, err := ioutil.TempDir("", fmt.Sprintf("verify-block-%s", id))
if err != nil {
return err
}
defer os.RemoveAll(tmpdir)

err = objstore.DownloadFile(ctx, bkt,
path.Join(id.String(), "index"), filepath.Join(tmpdir, "index"))
if err != nil {
return errors.Wrap(err, "download index file")
}

if err := block.VerifyIndex(filepath.Join(tmpdir, "index")); err != nil {
return errors.Wrap(err, "verify index")
}
return nil
}

// repairBlock rewrites the given block while fixing repairable inconsistencies.
// If the replacement was created successfully it is uploaded to the bucket and the input
// block is deleted.
func repairBlock(ctx context.Context, bkt objstore.Bucket, id ulid.ULID) (resid ulid.ULID, err error) {
tmpdir, err := ioutil.TempDir("", fmt.Sprintf("repair-block-%s", id))
if err != nil {
return resid, err
}
defer os.RemoveAll(tmpdir)

bdir := filepath.Join(tmpdir, id.String())

if err := compact.DownloadBlockDir(ctx, bkt, id.String(), bdir); err != nil {
return resid, errors.Wrap(err, "download block")
}
meta, err := block.ReadMetaFile(bdir)
if err != nil {
return resid, errors.Wrap(err, "read meta file")
}

if meta.Thanos.Downsample.Resolution > 0 {
return resid, errors.New("cannot repair downsampled blocks")
}

resid, err = block.Repair(tmpdir, meta.ULID)
if err != nil {
return resid, errors.Wrap(err, "repair failed")
}
// Verify repaired block before uploading it.
if err := block.VerifyIndex(filepath.Join(tmpdir, resid.String(), "index")); err != nil {
return resid, errors.Wrap(err, "repaired block invalid")
}

err = objstore.UploadDir(ctx, bkt, filepath.Join(tmpdir, resid.String()), resid.String())
if err != nil {
return resid, errors.Wrapf(err, "upload of %s failed", resid)
}
if err := objstore.DeleteDir(ctx, bkt, id.String()); err != nil {
return resid, errors.Wrapf(err, "deleting old block %s failed", id)
}
return resid, nil
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

func parseMeta(ctx context.Context, bkt objstore.Bucket, name string) (block.Meta, error) {
rc, err := bkt.Get(ctx, path.Join(name, "meta.json"))
if err != nil {
return block.Meta{}, errors.Wrap(err, "get reader for meta.json")
}
defer rc.Close()

// Do a full decode/encode cycle to ensure we only print valid JSON.
var m block.Meta

if err := json.NewDecoder(rc).Decode(&m); err != nil {
return block.Meta{}, errors.Wrap(err, "deocde meta.json")
}
return m, nil
}

func runBucketList(bkt objstore.Bucket, format string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

var printBlock func(name string) error

switch format {
case "":
printBlock = func(name string) error {
fmt.Fprintln(os.Stdout, name[:len(name)-1])
return nil
}
case "json":
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", "\t")
var (
format = *lsOutput
printBlock func(id ulid.ULID) error
)

printBlock = func(name string) error {
m, err := parseMeta(ctx, bkt, name)
if err != nil {
return err
switch format {
case "":
printBlock = func(id ulid.ULID) error {
fmt.Fprintln(os.Stdout, id.String())
return nil
}
return enc.Encode(&m)
}
default:
tmpl, err := template.New("").Parse(format)
if err != nil {
return errors.Wrap(err, "invalid template")
}
printBlock = func(name string) error {
m, err := parseMeta(ctx, bkt, name)
case "json":
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", "\t")

printBlock = func(id ulid.ULID) error {
m, err := block.DownloadMeta(ctx, bkt, id)
if err != nil {
return err
}
return enc.Encode(&m)
}
default:
tmpl, err := template.New("").Parse(format)
if err != nil {
return err
return errors.Wrap(err, "invalid template")
}

if err := tmpl.Execute(os.Stdout, &m); err != nil {
return errors.Wrap(err, "execute template")
printBlock = func(id ulid.ULID) error {
m, err := block.DownloadMeta(ctx, bkt, id)
if err != nil {
return err
}

if err := tmpl.Execute(os.Stdout, &m); err != nil {
return errors.Wrap(err, "execute template")
}
fmt.Fprintln(os.Stdout, "")
return nil
}
fmt.Fprintln(os.Stdout, "")
return nil
}

return bkt.Iter(ctx, "", func(name string) error {
id, ok := block.IsBlockDir(name)
if !ok {
return nil
}
return printBlock(id)
})
}
return bkt.Iter(ctx, "", printBlock)
}

func getBucketClient(gcsBucket *string, s3Config s3.Config, reg *prometheus.Registry) (objstore.Bucket, func() error, error) {
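
A usage note on the ls command above: the -o/--output flag takes either 'json' or a Go text/template executed against each block's meta (see the default/json/template switch in runBucketList). Below is a self-contained sketch of how such a template renders; the meta struct, its field names, and the sample values are hypothetical stand-ins mirroring what block.Meta exposes.

package main

import (
	"os"
	"text/template"
)

// meta is a hypothetical stand-in for block.Meta, carrying only the fields
// this example renders.
type meta struct {
	ULID    string
	MinTime int64
	MaxTime int64
}

func main() {
	// Roughly what: thanos bucket ls -o '{{.ULID}} spans {{.MinTime}}..{{.MaxTime}}'
	// would print for each block.
	tmpl := template.Must(template.New("").Parse("{{.ULID}} spans {{.MinTime}}..{{.MaxTime}}\n"))
	m := meta{ULID: "01C8Q6XGVJ5Z2Y0Q5W9N0XQ4RD", MinTime: 1522108800000, MaxTime: 1522115900000}
	if err := tmpl.Execute(os.Stdout, &m); err != nil {
		panic(err)
	}
}
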
6 changes: 3 additions & 3 deletions cmd/thanos/compact.go
@@ -46,7 +46,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application, name string
syncDelay := cmd.Flag("sync-delay", "Minimum age of blocks before they are being processed.").
Default("2h").Duration()

wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait wait for now work.").
wait := cmd.Flag("wait", "Do not exit after all compactions have been processed and wait for new work.").
Short('w').Bool()

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer) error {
@@ -119,8 +119,8 @@ func runCompact(
comp, err := tsdb.NewLeveledCompactor(reg, logger, []int64{
int64(2 * time.Hour / time.Millisecond),
int64(8 * time.Hour / time.Millisecond),
int64(2 * 24 * time.Hour / time.Millisecond),
int64(14 * 24 * time.Hour / time.Millisecond),
int64(2 * 24 * time.Hour / time.Millisecond), // two days
int64(14 * 24 * time.Hour / time.Millisecond), // 2 weeks
}, nil)
if err != nil {
return errors.Wrap(err, "create compactor")
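
A quick sanity check of the compaction level ranges above (a standalone sketch, not part of the commit): each level is a duration expressed in milliseconds, so the four levels are 2h, 8h, two days, and two weeks.

package main

import (
	"fmt"
	"time"
)

func main() {
	// The compactor's level ranges from compact.go, converted to milliseconds.
	for _, d := range []time.Duration{
		2 * time.Hour,
		8 * time.Hour,
		2 * 24 * time.Hour,
		14 * 24 * time.Hour,
	} {
		fmt.Printf("%v = %d ms\n", d, int64(d/time.Millisecond))
	}
	// Prints 7200000, 28800000, 172800000, and 1209600000 ms respectively.
}
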
3 changes: 1 addition & 2 deletions cmd/thanos/downsample.go
@@ -17,7 +17,6 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/compact"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/objstore/gcs"
@@ -224,7 +223,7 @@ func processDownsampling(ctx context.Context, logger log.Logger, bkt objstore.Bucket
begin := time.Now()
bdir := filepath.Join(dir, m.ULID.String())

err := compact.DownloadBlockDir(ctx, bkt, m.ULID.String(), bdir)
err := block.Download(ctx, bkt, m.ULID, bdir)
if err != nil {
return errors.Wrapf(err, "download block %s", m.ULID)
}
