Skip to content

Commit

Permalink
objstore: Download and Upload block files in parallel (#5475)
Browse files Browse the repository at this point in the history
* Parallel Chunks

Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* test

Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* Changelog

Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* making ApplyDownloadOptions private

Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* upload concurrency

Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* Upload Test

Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* update change log

Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* Change comments

Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* Address comments

Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* Remove duplicate entries on changelog

Signed-off-by: Alan Protasio <approtas@amazon.com>
Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* Addressing Comments

Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* update golang.org/x/sync

Signed-off-by: alanprot <alanprot@gmail.com>
Signed-off-by: Alan Protasio <approtas@amazon.com>

* Adding Commentts

Signed-off-by: Alan Protasio <approtas@amazon.com>
  • Loading branch information
alanprot committed Jul 12, 2022
1 parent 5034d71 commit 79ab7c6
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5424](https://github.com/thanos-io/thanos/pull/5424) Receive: Export metrics regarding size of remote write requests.
- [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants.
- [#5472](https://github.com/thanos-io/thanos/pull/5472) Receive: add new tenant metrics to example dashboard.
- [#5475](https://github.com/thanos-io/thanos/pull/5475) Compact/Store: Added `--block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction.

### Changed

Expand Down
4 changes: 4 additions & 0 deletions cmd/thanos/compact.go
Expand Up @@ -349,6 +349,7 @@ func runCompact(
compactMetrics.garbageCollectedBlocks,
compactMetrics.blocksMarked.WithLabelValues(metadata.NoCompactMarkFilename, metadata.OutOfOrderChunksNoCompactReason),
metadata.HashFunc(conf.hashFunc),
conf.blockFilesConcurrency,
)
tsdbPlanner := compact.NewPlanner(logger, levels, noCompactMarkerFilter)
planner := compact.WithLargeTotalIndexSizeFilter(
Expand Down Expand Up @@ -630,6 +631,7 @@ type compactConfig struct {
waitInterval time.Duration
disableDownsampling bool
blockMetaFetchConcurrency int
blockFilesConcurrency int
blockViewerSyncBlockInterval time.Duration
blockViewerSyncBlockTimeout time.Duration
cleanupBlocksInterval time.Duration
Expand Down Expand Up @@ -688,6 +690,8 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&cc.blockMetaFetchConcurrency)
cmd.Flag("block-files-concurrency", "Number of goroutines to use when fetching/uploading block files from object storage.").
Default("1").IntVar(&cc.blockFilesConcurrency)
cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
Default("1m").DurationVar(&cc.blockViewerSyncBlockInterval)
cmd.Flag("block-viewer.global.sync-block-timeout", "Maximum time for syncing the blocks between local and remote view for /global Block Viewer UI.").
Expand Down
4 changes: 4 additions & 0 deletions docs/components/compact.md
Expand Up @@ -279,6 +279,10 @@ usage: thanos compact [<flags>]
Continuously compacts blocks in an object store bucket.
Flags:
--block-files-concurrency=1
Number of goroutines to use when
fetching/uploading block files from object
storage.
--block-meta-fetch-concurrency=32
Number of goroutines to use when fetching block
metadata from object storage.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -86,7 +86,7 @@ require (
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
golang.org/x/text v0.3.7
google.golang.org/api v0.78.0
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Expand Up @@ -2262,8 +2262,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200930132711-30421366ff76/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
16 changes: 8 additions & 8 deletions pkg/block/block.go
Expand Up @@ -45,7 +45,7 @@ const (
// Download downloads directory that is mean to be block directory. If any of the files
// have a hash calculated in the meta file and it matches with what is in the destination path then
// we do not download it. We always re-download the meta file.
func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string) error {
func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id ulid.ULID, dst string, options ...objstore.DownloadOption) error {
if err := os.MkdirAll(dst, 0750); err != nil {
return errors.Wrap(err, "create dir")
}
Expand Down Expand Up @@ -74,7 +74,7 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id
}
}

if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), id.String(), dst, ignoredPaths...); err != nil {
if err := objstore.DownloadDir(ctx, logger, bucket, id.String(), id.String(), dst, append(options, objstore.WithDownloadIgnoredPaths(ignoredPaths...))...); err != nil {
return err
}

Expand All @@ -94,21 +94,21 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id

// Upload uploads a TSDB block to the object storage. It verifies basic
// features of Thanos block.
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
return upload(ctx, logger, bkt, bdir, hf, true)
func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error {
return upload(ctx, logger, bkt, bdir, hf, true, options...)
}

// UploadPromBlock uploads a TSDB block to the object storage. It assumes
// the block is used in Prometheus so it doesn't check Thanos external labels.
func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc) error {
return upload(ctx, logger, bkt, bdir, hf, false)
func UploadPromBlock(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error {
return upload(ctx, logger, bkt, bdir, hf, false, options...)
}

// upload uploads block from given block dir that ends with block id.
// It makes sure cleanup is done on error to avoid partial block uploads.
// TODO(bplotka): Ensure bucket operations have reasonable backoff retries.
// NOTE: Upload updates `meta.Thanos.File` section.
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool) error {
func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, checkExternalLabels bool, options ...objstore.UploadOption) error {
df, err := os.Stat(bdir)
if err != nil {
return err
Expand Down Expand Up @@ -145,7 +145,7 @@ func upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir st
return errors.Wrap(err, "encode meta file")
}

if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname)); err != nil {
if err := objstore.UploadDir(ctx, logger, bkt, filepath.Join(bdir, ChunksDirname), path.Join(id.String(), ChunksDirname), options...); err != nil {
return cleanUp(logger, bkt, id, errors.Wrap(err, "upload chunks"))
}

Expand Down
16 changes: 14 additions & 2 deletions pkg/compact/compact.go
Expand Up @@ -232,6 +232,7 @@ type DefaultGrouper struct {
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
blockFilesConcurrency int
}

// NewDefaultGrouper makes a new DefaultGrouper.
Expand All @@ -245,6 +246,7 @@ func NewDefaultGrouper(
garbageCollectedBlocks prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
hashFunc metadata.HashFunc,
blockFilesConcurrency int,
) *DefaultGrouper {
return &DefaultGrouper{
bkt: bkt,
Expand Down Expand Up @@ -275,6 +277,7 @@ func NewDefaultGrouper(
garbageCollectedBlocks: garbageCollectedBlocks,
blocksMarkedForDeletion: blocksMarkedForDeletion,
hashFunc: hashFunc,
blockFilesConcurrency: blockFilesConcurrency,
}
}

Expand Down Expand Up @@ -304,6 +307,7 @@ func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Gro
g.blocksMarkedForDeletion,
g.blocksMarkedForNoCompact,
g.hashFunc,
g.blockFilesConcurrency,
)
if err != nil {
return nil, errors.Wrap(err, "create compaction group")
Expand Down Expand Up @@ -342,6 +346,7 @@ type Group struct {
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
hashFunc metadata.HashFunc
blockFilesConcurrency int
}

// NewGroup returns a new compaction group.
Expand All @@ -362,10 +367,16 @@ func NewGroup(
blocksMarkedForDeletion prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
hashFunc metadata.HashFunc,
blockFilesConcurrency int,
) (*Group, error) {
if logger == nil {
logger = log.NewNopLogger()
}

if blockFilesConcurrency <= 0 {
return nil, errors.Errorf("invalid concurrency level (%d), blockFilesConcurrency level must be > 0", blockFilesConcurrency)
}

g := &Group{
logger: logger,
bkt: bkt,
Expand All @@ -383,6 +394,7 @@ func NewGroup(
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
hashFunc: hashFunc,
blockFilesConcurrency: blockFilesConcurrency,
}
return g, nil
}
Expand Down Expand Up @@ -1007,7 +1019,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
}

tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error {
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir)
err = block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency))
return err
}, opentracing.Tags{"block.id": meta.ULID})
if err != nil {
Expand Down Expand Up @@ -1109,7 +1121,7 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
begin = time.Now()

tracing.DoInSpanWithErr(ctx, "compaction_block_upload", func(ctx context.Context) error {
err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc)
err = block.Upload(ctx, cg.logger, cg.bkt, bdir, cg.hashFunc, objstore.WithUploadConcurrency(cg.blockFilesConcurrency))
return err
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/compact_e2e_test.go
Expand Up @@ -139,7 +139,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
testutil.Ok(t, sy.GarbageCollect(ctx))

// Only the level 3 block, the last source block in both resolutions should be left.
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc)
grouper := NewDefaultGrouper(nil, bkt, false, false, nil, blocksMarkedForDeletion, garbageCollectedBlocks, blockMarkedForNoCompact, metadata.NoneFunc, 1)
groups, err := grouper.Groups(sy.Metas())
testutil.Ok(t, err)

Expand Down Expand Up @@ -214,7 +214,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
testutil.Ok(t, err)

planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc)
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks, blocksMaredForNoCompact, metadata.NoneFunc, 1)
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true)
testutil.Ok(t, err)

Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact_test.go
Expand Up @@ -210,7 +210,7 @@ func TestRetentionProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "")
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1)

type groupedResult map[string]float64

Expand Down Expand Up @@ -376,7 +376,7 @@ func TestCompactProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "")
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1)

for _, tcase := range []struct {
testName string
Expand Down Expand Up @@ -498,7 +498,7 @@ func TestDownsampleProgressCalculate(t *testing.T) {

var bkt objstore.Bucket
temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for downsample progress tests"})
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "")
grouper := NewDefaultGrouper(logger, bkt, false, false, reg, temp, temp, temp, "", 1)

for _, tcase := range []struct {
testName string
Expand Down

0 comments on commit 79ab7c6

Please sign in to comment.