Skip to content

Commit

Permalink
Parallel BLocks
Browse files Browse the repository at this point in the history
  • Loading branch information
alanprot committed Jul 5, 2022
1 parent 30a146e commit 60ca9ca
Showing 1 changed file with 43 additions and 32 deletions.
75 changes: 43 additions & 32 deletions pkg/compact/compact.go
Expand Up @@ -997,51 +997,62 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
begin := time.Now()

toCompactDirs := make([]string, 0, len(toCompact))
for _, meta := range toCompact {
bdir := filepath.Join(dir, meta.ULID.String())
for _, s := range meta.Compaction.Sources {
g, ctx := errgroup.WithContext(ctx)
for _, m := range toCompact {
bdir := filepath.Join(dir, m.ULID.String())
for _, s := range m.Compaction.Sources {
if _, ok := uniqueSources[s]; ok {
return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact))
}
uniqueSources[s] = struct{}{}
}

tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID))
}
func(meta *metadata.Meta) {
g.Go(func() error {
tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return retry(errors.Wrapf(err, "download block %s", meta.ULID))
}

// Ensure all input blocks are valid.
var stats block.HealthStats
tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error {
stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir)
}
// Ensure all input blocks are valid.
var stats block.HealthStats
tracing.DoInSpanWithErr(ctx, "compaction_block_health_stats", func(ctx context.Context) error {
stats, err = block.GatherIndexHealthStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime)
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
return errors.Wrapf(err, "gather index issues for block %s", bdir)
}

if err := stats.CriticalErr(); err != nil {
return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}
if err := stats.CriticalErr(); err != nil {
return halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels))
}

if err := stats.OutOfOrderChunksErr(); err != nil {
return false, ulid.ULID{}, outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
}
if err := stats.OutOfOrderChunksErr(); err != nil {
return outOfOrderChunkError(errors.Wrapf(err, "blocks with out-of-order chunks are dropped from compaction: %s", bdir), meta.ULID)
}

if err := stats.Issue347OutsideChunksErr(); err != nil {
return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}
if err := stats.Issue347OutsideChunksErr(); err != nil {
return issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID)
}

if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
return false, ulid.ULID{}, errors.Wrapf(err,
"block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}
if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil {
return errors.Wrapf(err, "block id %s, try running with --debug.accept-malformed-index", meta.ULID)
}

return nil
})
}(m)
toCompactDirs = append(toCompactDirs, bdir)
}

if err := g.Wait(); err != nil {
return false, ulid.ULID{}, err
}

level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())

begin = time.Now()
Expand Down

0 comments on commit 60ca9ca

Please sign in to comment.