Skip to content

Commit

Permalink
Download blocks to compact in parallel
Browse files — browse the repository at this point in the history
  • Loading branch information
alanprot committed Jul 5, 2022
1 parent 5f6ee25 commit 12e2896
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 50 deletions.
75 changes: 43 additions & 32 deletions pkg/compact/compact.go
Expand Up @@ -997,51 +997,62 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
begin := time.Now()

toCompactDirs := make([]string, 0, len(toCompact))
for _, meta := range toCompact {
bdir := filepath.Join(dir, meta.ULID.String())
for _, s := range meta.Compaction.Sources {
g, ctx := errgroup.WithContext(ctx)
for _, m := range toCompact {
bdir := filepath.Join(dir, m.ULID.String())
for _, s := range m.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact))
}
uniqueSources[s] = struct{}{}
}

tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID))
}
func(meta *metadata.Meta) {
g.Go(func() error {
tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return retry(errors.Wrapf(err, "download block %s", meta.ULID))
}

// Ensure all input blocks are valid.
var stats block.HealthStats
tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error {
stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir)
}
// Ensure all input blocks are valid.
var stats block.HealthStats
tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error {
stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return errors.Wrapf(err, "gather index issues for block %s", bdir)
}

if err := stats.CriticalErr(); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}
if err := stats.CriticalErr(); err != nil {
return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}

if err := stats.OutOfOrderChunksErr(); err != nil {
return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
}
if err := stats.OutOfOrderChunksErr(); err != nil {
return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}
if err := stats.Issue347OutsideChunksErr(); err != nil {
return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}

if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}
if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
return errors.Wrapf(err, "block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}

return nil
})
}(m)
toCompactDirs = append(toCompactDirs, bdir)
}

if err := g.Wait(); err != nil {
return false, ulid.ULID{}, err
}

level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

begin = time.Now()
Expand Down
44 changes: 26 additions & 18 deletions pkg/objstore/objstore.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/sync/errgroup"

"github.com/thanos-io/thanos/pkg/runutil"
)
Expand Down Expand Up @@ -259,29 +260,36 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi
return errors.Wrap(err, "create dir")
}

g, ctx := errgroup.WithContext(ctx)

var downloadedFiles []string
if err := bkt.Iter(ctx, src, func(name string) error {
dst := filepath.Join(dst, filepath.Base(name))
if strings.HasSuffix(name, DirDelim) {
if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, ignoredPaths...); err != nil {
return err
}
downloadedFiles = append(downloadedFiles, dst)
return nil
}
for _, ignoredPath := range ignoredPaths {
if ignoredPath == strings.TrimPrefix(name, string(originalSrc)+DirDelim) {
level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name)
err := bkt.Iter(ctx, src, func(name string) error {
g.Go(func() error {
dst := filepath.Join(dst, filepath.Base(name))
if strings.HasSuffix(name, DirDelim) {
if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, ignoredPaths...); err != nil {
return err
}
downloadedFiles = append(downloadedFiles, dst)
return nil
}
}
if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil {
return err
}

for _, ignoredPath := range ignoredPaths {
if ignoredPath == strings.TrimPrefix(name, string(originalSrc)+DirDelim) {
level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name)
return nil
}
}
return DownloadFile(ctx, logger, bkt, name, dst)
})
downloadedFiles = append(downloadedFiles, dst)
return nil
}); err != nil {
})

if err == nil {
err = g.Wait()
}

if err != nil {
downloadedFiles = append(downloadedFiles, dst) // Last, clean up the root dst directory.
// Best-effort cleanup if the download failed.
for _, f := range downloadedFiles {
Expand Down

0 comments on commit 12e2896

Please sign in to comment.