diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 6465f16927e..a469080b38c 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -997,51 +997,62 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp begin := time.Now() toCompactDirs := make([]string, 0, len(toCompact)) - for _, meta := range toCompact { - bdir := filepath.Join(dir, meta.ULID.String()) - for _, s := range meta.Compaction.Sources { + g, ctx := errgroup.WithContext(ctx) + for _, m := range toCompact { + bdir := filepath.Join(dir, m.ULID.String()) + for _, s := range m.Compaction.Sources { if _, ok := uniqueSources[s]; ok { return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact)) } uniqueSources[s] = struct{}{} } - tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { - err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir) - return err - }, opentracing.Tags{"block.id": meta.ULID}) - if err != nil { - return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID)) - } + func(meta *metadata.Meta) { + g.Go(func() error { + tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { + err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir) + return err + }, opentracing.Tags{"block.id": meta.ULID}) + if err != nil { + return retry(errors.Wrapf(err, "download block %s", meta.ULID)) + } - // Ensure all input blocks are valid. - var stats block.HealthStats - tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error { - stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) - return err - }, opentracing.Tags{"block.id": meta.ULID}) - if err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir) - } + // Ensure all input blocks are valid. + var stats block.HealthStats + tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error { + stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + return err + }, opentracing.Tags{"block.id": meta.ULID}) + if err != nil { + return errors.Wrapf(err, "gather index issues for block %s", bdir) + } - if err := stats.CriticalErr(); err != nil { - return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels)) - } + if err := stats.CriticalErr(); err != nil { + return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels)) + } - if err := stats.OutOfOrderChunksErr(); err != nil { - return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID) - } + if err := stats.OutOfOrderChunksErr(); err != nil { + return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID) + } - if err := stats.Issue347OutsideChunksErr(); err != nil { - return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) - } + if err := stats.Issue347OutsideChunksErr(); err != nil { + return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) + } - if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, - "block id %s, try running with --debug.accept-malformed-index", meta.ULID) - } + if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { + return errors.Wrapf(err, "block id %s, try running with --debug.accept-malformed-index", meta.ULID) + } + + return nil + }) + }(m) toCompactDirs = append(toCompactDirs, bdir) } + + if err := g.Wait(); err != nil { + return false, ulid.ULID{}, err + } + level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) begin = time.Now() diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index c2f3c126438..40f71b324e4 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -19,6 +19,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "golang.org/x/sync/errgroup" "github.com/thanos-io/thanos/pkg/runutil" ) @@ -259,29 +260,36 @@ func DownloadDir(ctx context.Context, logger log.Logger, bkt BucketReader, origi return errors.Wrap(err, "create dir") } + g, ctx := errgroup.WithContext(ctx) + var downloadedFiles []string - if err := bkt.Iter(ctx, src, func(name string) error { - dst := filepath.Join(dst, filepath.Base(name)) - if strings.HasSuffix(name, DirDelim) { - if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, ignoredPaths...); err != nil { - return err - } - downloadedFiles = append(downloadedFiles, dst) - return nil - } - for _, ignoredPath := range ignoredPaths { - if ignoredPath == strings.TrimPrefix(name, string(originalSrc)+DirDelim) { - level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name) + err := bkt.Iter(ctx, src, func(name string) error { + g.Go(func() error { + dst := filepath.Join(dst, filepath.Base(name)) + if strings.HasSuffix(name, DirDelim) { + if err := DownloadDir(ctx, logger, bkt, originalSrc, name, dst, ignoredPaths...); err != nil { + return err + } + downloadedFiles = append(downloadedFiles, dst) return nil } - } - if err := DownloadFile(ctx, logger, bkt, name, dst); err != nil { - return err - } - + for _, ignoredPath := range ignoredPaths { + if ignoredPath == strings.TrimPrefix(name, string(originalSrc)+DirDelim) { + level.Debug(logger).Log("msg", "not downloading again because a provided path matches this one", "file", name) + return nil + } + } + return DownloadFile(ctx, logger, bkt, name, dst) + }) downloadedFiles = append(downloadedFiles, dst) return nil - }); err != nil { + }) + + if err == nil { + err = g.Wait() + } + + if err != nil { downloadedFiles = append(downloadedFiles, dst) // Last, clean up the root dst directory. // Best-effort cleanup if the download failed. for _, f := range downloadedFiles {