downsample: fix deadlock if error occurs #4962

Merged: 1 commit, Dec 17, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -45,6 +45,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#4918](https://github.com/thanos-io/thanos/pull/4918) Tracing: Fixing force tracing with Jaeger.
 - [#4928](https://github.com/thanos-io/thanos/pull/4928) Azure: Only create an http client once, to conserve memory.
 - [#4879](https://github.com/thanos-io/thanos/pull/4879) Bucket verify: Fixed bug causing wrong number of blocks to be checked.
+- [#4962](https://github.com/thanos-io/thanos/pull/4962) Compact/downsample: fix a deadlock when an error occurs while there is still a backlog of blocks to downsample; fixes a regression introduced by [this pull request](https://github.com/thanos-io/thanos/pull/4430). Affected versions: 0.22.0 - 0.23.1.
 
 ### Changed
 
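For context on the changelog entry above: in the pre-fix code shown below, a downsampling worker that hits an error returns from its errgroup goroutine and stops draining the block channel, while the goroutine distributing blocks keeps sending and eventually blocks forever once the small channel buffer fills. The following is a minimal, self-contained sketch of that failure mode; the names, numbers, and the `fakeDownsample` helper are illustrative only and are not taken from the Thanos codebase:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fakeDownsample stands in for processDownsampling-style work and always fails.
func fakeDownsample(blk int) error {
	return fmt.Errorf("block %d: simulated downsampling error", blk)
}

func main() {
	var (
		eg errgroup.Group
		ch = make(chan int, 2) // small buffer, like downsampleConcurrency
	)

	// Workers: returning an error ends the goroutine, so nothing drains ch anymore.
	for i := 0; i < 2; i++ {
		eg.Go(func() error {
			for blk := range ch {
				if err := fakeDownsample(blk); err != nil {
					return err // worker exits on the first error
				}
			}
			return nil
		})
	}

	// Producer: once the workers have exited, this send blocks forever as soon as
	// the buffer fills, so eg.Wait() never returns and the backlog is never drained.
	eg.Go(func() error {
		defer close(ch)
		for blk := 0; blk < 100; blk++ {
			ch <- blk
		}
		return nil
	})

	fmt.Println(eg.Wait())
}
```

Run on its own, this sketch is killed by the Go runtime with `fatal error: all goroutines are asleep - deadlock!`; inside a long-running compactor, where other goroutines keep the process alive, the downsampling step simply hangs, which is the behaviour this PR addresses.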
131 changes: 73 additions & 58 deletions cmd/thanos/downsample.go
@@ -8,6 +8,7 @@ import (
 	"os"
 	"path/filepath"
 	"sort"
+	"sync"
 	"time"
 
 	extflag "github.com/efficientgo/tools/extkingpin"
@@ -20,13 +21,13 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
-	"golang.org/x/sync/errgroup"
 
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/compact"
 	"github.com/thanos-io/thanos/pkg/compact/downsample"
 	"github.com/thanos-io/thanos/pkg/component"
+	"github.com/thanos-io/thanos/pkg/errutil"
 	"github.com/thanos-io/thanos/pkg/extprom"
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/objstore/client"
@@ -229,90 +230,104 @@ func downsampleBucket(
 	})
 
 	var (
-		eg errgroup.Group
-		ch = make(chan *metadata.Meta, downsampleConcurrency)
+		wg                      sync.WaitGroup
+		metaCh                  = make(chan *metadata.Meta)
+		downsampleErrs          errutil.MultiError
+		errCh                   = make(chan error, downsampleConcurrency)
+		workerCtx, workerCancel = context.WithCancel(ctx)
 	)
 
+	defer workerCancel()
+
 	level.Debug(logger).Log("msg", "downsampling bucket", "concurrency", downsampleConcurrency)
 	for i := 0; i < downsampleConcurrency; i++ {
-		eg.Go(func() error {
-			for m := range ch {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for m := range metaCh {
 				resolution := downsample.ResLevel1
 				errMsg := "downsampling to 5 min"
 				if m.Thanos.Downsample.Resolution == downsample.ResLevel1 {
 					resolution = downsample.ResLevel2
 					errMsg = "downsampling to 60 min"
 				}
-				if err := processDownsampling(ctx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
+				if err := processDownsampling(workerCtx, logger, bkt, m, dir, resolution, hashFunc, metrics); err != nil {
 					metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
-					return errors.Wrap(err, errMsg)
+					errCh <- errors.Wrap(err, errMsg)
 				}
 				metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
 			}
-			return nil
-		})
+		}()
 	}
 
 	// Workers scheduled, distribute blocks.
-	eg.Go(func() error {
-		defer close(ch)
-		for _, mk := range metasULIDS {
-			m := metas[mk]
+metaSendLoop:
+	for _, mk := range metasULIDS {
+		m := metas[mk]
 
-			switch m.Thanos.Downsample.Resolution {
-			case downsample.ResLevel2:
-				continue
+		switch m.Thanos.Downsample.Resolution {
+		case downsample.ResLevel2:
+			continue
 
-			case downsample.ResLevel0:
-				missing := false
-				for _, id := range m.Compaction.Sources {
-					if _, ok := sources5m[id]; !ok {
-						missing = true
-						break
-					}
-				}
-				if !missing {
-					continue
-				}
-				// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
-				// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
-				// blocks. Otherwise we may never downsample some data.
-				if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
-					continue
-				}
+		case downsample.ResLevel0:
+			missing := false
+			for _, id := range m.Compaction.Sources {
+				if _, ok := sources5m[id]; !ok {
+					missing = true
+					break
+				}
+			}
+			if !missing {
+				continue
+			}
+			// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
+			// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
+			// blocks. Otherwise we may never downsample some data.
+			if m.MaxTime-m.MinTime < downsample.DownsampleRange0 {
+				continue
+			}
 
-			case downsample.ResLevel1:
-				missing := false
-				for _, id := range m.Compaction.Sources {
-					if _, ok := sources1h[id]; !ok {
-						missing = true
-						break
-					}
-				}
-				if !missing {
-					continue
-				}
-				// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
-				// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
-				// blocks. Otherwise we may never downsample some data.
-				if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
-					continue
-				}
-			}
+		case downsample.ResLevel1:
+			missing := false
+			for _, id := range m.Compaction.Sources {
+				if _, ok := sources1h[id]; !ok {
+					missing = true
+					break
+				}
+			}
+			if !missing {
+				continue
+			}
+			// Only downsample blocks once we are sure to get roughly 2 chunks out of it.
+			// NOTE(fabxc): this must match with at which block size the compactor creates downsampled
+			// blocks. Otherwise we may never downsample some data.
+			if m.MaxTime-m.MinTime < downsample.DownsampleRange1 {
+				continue
+			}
+		}
 
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case ch <- m:
-			}
-		}
-		return nil
-	})
+		select {
+		case <-workerCtx.Done():
+			downsampleErrs.Add(workerCtx.Err())
+			break metaSendLoop
+		case metaCh <- m:
+		case downsampleErr := <-errCh:
+			downsampleErrs.Add(downsampleErr)
+			break metaSendLoop
+		}
+	}
 
-	if err := eg.Wait(); err != nil {
-		return errors.Wrap(err, "downsample bucket")
-	}
-	return nil
+	close(metaCh)
+	wg.Wait()
+	workerCancel()
+	close(errCh)
+
+	// Collect any other error reported by the workers.
+	for downsampleErr := range errCh {
+		downsampleErrs.Add(downsampleErr)
+	}
+
+	return downsampleErrs.Err()
 }
 
 func processDownsampling(
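The fix above replaces the errgroup with an explicit worker pool: workers report failures on an error channel buffered to the worker count, the distributing loop selects between handing out a block, context cancellation, and the first reported error, and shutdown is an orderly close, wait, and drain into an errutil.MultiError. Below is a stripped-down sketch of the same shape, not the Thanos code itself: the helper names are hypothetical, errors.Join (Go 1.20+) stands in for errutil.MultiError, and, unlike the Thanos workers, each worker here stops after its first error so the buffered channel provably never blocks a sender:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// doWork stands in for a processDownsampling-style call; it fails on every third item.
func doWork(ctx context.Context, item int) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if item%3 == 0 {
		return fmt.Errorf("item %d: simulated failure", item)
	}
	return nil
}

func run(ctx context.Context, items []int, concurrency int) error {
	var (
		wg     sync.WaitGroup
		workCh = make(chan int)
		// Buffered to the worker count: each worker reports at most one error,
		// so the sends below never block, even after the producer stops reading.
		errCh = make(chan error, concurrency)
	)
	workerCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for item := range workCh {
				if err := doWork(workerCtx, item); err != nil {
					errCh <- err
					return // report the first failure and stop this worker
				}
			}
		}()
	}

	var errs []error
sendLoop:
	for _, item := range items {
		select {
		case <-workerCtx.Done():
			errs = append(errs, workerCtx.Err())
			break sendLoop
		case workCh <- item:
		case err := <-errCh: // a worker failed: stop handing out new work
			errs = append(errs, err)
			break sendLoop
		}
	}

	close(workCh) // remaining workers drain what is left and exit
	wg.Wait()
	cancel()
	close(errCh)

	// Collect errors reported while we were shutting down.
	for err := range errCh {
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}

func main() {
	items := make([]int, 50)
	for i := range items {
		items[i] = i + 1
	}
	fmt.Println(run(context.Background(), items, 4))
}
```

The key property is that no goroutine is ever left waiting on a channel that nobody reads: workers either drain workCh or have guaranteed space in errCh, and the producer always observes either an error or cancellation and then shuts the pool down.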
141 changes: 141 additions & 0 deletions cmd/thanos/main_test.go
@@ -5,9 +5,12 @@ package main
 
 import (
 	"context"
+	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"path"
+	"strings"
 	"testing"
 	"time"
 
@@ -26,6 +29,144 @@ import (
 	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
 )
 
+type erroringBucket struct {
+	bkt objstore.InstrumentedBucket
+}
+
+func (b *erroringBucket) Close() error {
+	return b.bkt.Close()
+}
+
+// WithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
+// thanos_objstore_bucket_operation_failures_total metric.
+func (b *erroringBucket) WithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.Bucket {
+	return b.bkt.WithExpectedErrs(f)
+}
+
+// ReaderWithExpectedErrs allows to specify a filter that marks certain errors as expected, so it will not increment
+// thanos_objstore_bucket_operation_failures_total metric.
+func (b *erroringBucket) ReaderWithExpectedErrs(f objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
+	return b.bkt.ReaderWithExpectedErrs(f)
+}
+
+func (b *erroringBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
+	return b.bkt.Iter(ctx, dir, f, options...)
+}
+
+// Get returns a reader for the given object name.
+func (b *erroringBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
+	if strings.Contains(name, "chunk") {
+		return nil, fmt.Errorf("some random error has occurred")
+	}
+	return b.bkt.Get(ctx, name)
+}
+
+// GetRange returns a new range reader for the given object name and range.
+func (b *erroringBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
+	if strings.Contains(name, "chunk") {
+		return nil, fmt.Errorf("some random error has occurred")
+	}
+	return b.bkt.GetRange(ctx, name, off, length)
+}
+
+// Exists checks if the given object exists in the bucket.
+func (b *erroringBucket) Exists(ctx context.Context, name string) (bool, error) {
+	return b.bkt.Exists(ctx, name)
+}
+
+// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
+func (b *erroringBucket) IsObjNotFoundErr(err error) bool {
+	return b.bkt.IsObjNotFoundErr(err)
+}
+
+// Attributes returns information about the specified object.
+func (b *erroringBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
+	return b.bkt.Attributes(ctx, name)
+}
+
+// Upload the contents of the reader as an object into the bucket.
+// Upload should be idempotent.
+func (b *erroringBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+	return b.bkt.Upload(ctx, name, r)
+}
+
+// Delete removes the object with the given name.
+// If object does not exists in the moment of deletion, Delete should throw error.
+func (b *erroringBucket) Delete(ctx context.Context, name string) error {
+	return b.bkt.Delete(ctx, name)
+}
+
+// Name returns the bucket name for the provider.
+func (b *erroringBucket) Name() string {
+	return b.bkt.Name()
+}
+
+// Ensures that downsampleBucket() stops its work properly
+// after an error occurs with some blocks in the backlog.
+// Testing for https://github.com/thanos-io/thanos/issues/4960.
+func TestRegression4960_Deadlock(t *testing.T) {
+	logger := log.NewLogfmtLogger(os.Stderr)
+	dir, err := ioutil.TempDir("", "test-compact-cleanup")
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
+	bkt = &erroringBucket{bkt: bkt}
+	var id, id2, id3 ulid.ULID
+	{
+		id, err = e2eutil.CreateBlock(
+			ctx,
+			dir,
+			[]labels.Labels{{{Name: "a", Value: "1"}}},
+			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
+			labels.Labels{{Name: "e1", Value: "1"}},
+			downsample.ResLevel0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id.String()), metadata.NoneFunc))
+	}
+	{
+		id2, err = e2eutil.CreateBlock(
+			ctx,
+			dir,
+			[]labels.Labels{{{Name: "a", Value: "2"}}},
+			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
+			labels.Labels{{Name: "e1", Value: "2"}},
+			downsample.ResLevel0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id2.String()), metadata.NoneFunc))
+	}
+	{
+		id3, err = e2eutil.CreateBlock(
+			ctx,
+			dir,
+			[]labels.Labels{{{Name: "a", Value: "2"}}},
+			1, 0, downsample.DownsampleRange0+1, // Pass the minimum DownsampleRange0 check.
+			labels.Labels{{Name: "e1", Value: "2"}},
+			downsample.ResLevel0, metadata.NoneFunc)
+		testutil.Ok(t, err)
+		testutil.Ok(t, block.Upload(ctx, logger, bkt, path.Join(dir, id3.String()), metadata.NoneFunc))
+	}
+
+	meta, err := block.DownloadMeta(ctx, logger, bkt, id)
+	testutil.Ok(t, err)
+
+	metrics := newDownsampleMetrics(prometheus.NewRegistry())
+	testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(meta.Thanos))))
+	metaFetcher, err := block.NewMetaFetcher(nil, block.FetcherConcurrency, bkt, "", nil, nil, nil)
+	testutil.Ok(t, err)
+
+	metas, _, err := metaFetcher.Fetch(ctx)
+	testutil.Ok(t, err)
+	err = downsampleBucket(ctx, logger, metrics, bkt, metas, dir, 1, metadata.NoneFunc)
+	testutil.NotOk(t, err)
+
+	testutil.Assert(t, strings.Contains(err.Error(), "some random error has occurred"))
+
+}
+
 func TestCleanupDownsampleCacheFolder(t *testing.T) {
 	logger := log.NewLogfmtLogger(os.Stderr)
 	dir, err := ioutil.TempDir("", "test-compact-cleanup")
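A note on how the regression test drives the bug: the wrapping bucket fails every read of an object whose name contains "chunk", so processDownsampling fails for each of the three uploaded blocks, and with the downsample concurrency set to 1 in the call above there is always a backlog behind the failing block. Before the fix, downsampleBucket would stay blocked sending on its block channel until its context was cancelled (indefinitely in a real compactor run, where that context is not time-limited); with the fix it returns promptly with the injected "some random error has occurred" error, which the test asserts. The test should be runnable on its own with something like `go test -run TestRegression4960_Deadlock ./cmd/thanos/` (the exact invocation depends on the checkout), and go test's default 10-minute timeout would flag a reintroduced hang.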