Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

objstore: Added WithExpectedErrs, which allows controlling instrumentation (e.g., not incrementing failure metrics for expected not-found errors) #2383

Merged
merged 2 commits into from Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/thanos/main_test.go
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)
Expand All @@ -36,7 +36,7 @@ func TestCleanupIndexCacheFolder(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bkt := inmem.NewBucket()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())

// Upload one compaction lvl = 2 block, one compaction lvl = 1.
// We generate index cache files only for lvl > 1 blocks.
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

bkt := inmem.NewBucket()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
var id ulid.ULID
{
id, err = e2eutil.CreateBlock(
Expand Down
8 changes: 4 additions & 4 deletions pkg/block/block_test.go
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"

Expand Down Expand Up @@ -81,7 +81,7 @@ func TestUpload(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
bkt := objstore.NewInMemBucket()
b1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
{{Name: "a", Value: "2"}},
Expand Down Expand Up @@ -185,7 +185,7 @@ func TestDelete(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
bkt := objstore.NewInMemBucket()
{
b1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestMarkForDeletion(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
bkt := objstore.NewInMemBucket()
{
blockWithoutDeletionMark, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{
{{Name: "a", Value: "1"}},
Expand Down
12 changes: 6 additions & 6 deletions pkg/block/fetcher.go
Expand Up @@ -142,7 +142,7 @@ type MetadataModifier interface {
type BaseFetcher struct {
logger log.Logger
concurrency int
bkt objstore.BucketReader
bkt objstore.InstrumentedBucketReader

// Optional local directory to cache meta.json files.
cacheDir string
Expand All @@ -152,7 +152,7 @@ type BaseFetcher struct {
}

// NewBaseFetcher constructs BaseFetcher.
func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer) (*BaseFetcher, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func NewBaseFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReade
}

// NewMetaFetcher returns meta fetcher.
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.BucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) {
func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.InstrumentedBucketReader, dir string, reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier) (*MetaFetcher, error) {
b, err := NewBaseFetcher(logger, concurrency, bkt, dir, reg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -236,7 +236,7 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
}
}

r, err := f.bkt.Get(ctx, metaFile)
r, err := f.bkt.ReaderWithExpectedErrs(f.bkt.IsObjNotFoundErr).Get(ctx, metaFile)
if f.bkt.IsObjNotFoundErr(err) {
// Meta.json was deleted between bkt.Exists and here.
return nil, errors.Wrapf(ErrorSyncMetaNotFound, "%v", err)
Expand Down Expand Up @@ -740,12 +740,12 @@ func (f *ConsistencyDelayMetaFilter) Filter(_ context.Context, metas map[ulid.UL
type IgnoreDeletionMarkFilter struct {
logger log.Logger
delay time.Duration
bkt objstore.BucketReader
bkt objstore.InstrumentedBucketReader
deletionMarkMap map[ulid.ULID]*metadata.DeletionMark
}

// NewIgnoreDeletionMarkFilter creates IgnoreDeletionMarkFilter.
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.BucketReader, delay time.Duration) *IgnoreDeletionMarkFilter {
func NewIgnoreDeletionMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, delay time.Duration) *IgnoreDeletionMarkFilter {
return &IgnoreDeletionMarkFilter{
logger: logger,
bkt: bkt,
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/fetcher_test.go
Expand Up @@ -75,7 +75,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {

var ulidToDelete ulid.ULID
r := prometheus.NewRegistry()
baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, bkt, dir, r)
baseFetcher, err := NewBaseFetcher(log.NewNopLogger(), 20, objstore.WithNoopInstr(bkt), dir, r)
testutil.Ok(t, err)

fetcher := baseFetcher.NewMetaFetcher(r, []MetadataFilter{
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
now := time.Now()
f := &IgnoreDeletionMarkFilter{
logger: log.NewNopLogger(),
bkt: bkt,
bkt: objstore.WithNoopInstr(bkt),
delay: 48 * time.Hour,
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/block/indexheader/json_reader.go
Expand Up @@ -199,7 +199,7 @@ type JSONReader struct {
}

// NewJSONReader loads or builds new index-cache.json if not present on disk or object storage.
func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID) (*JSONReader, error) {
func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.InstrumentedBucketReader, dir string, id ulid.ULID) (*JSONReader, error) {
cachefn := filepath.Join(dir, id.String(), block.IndexCacheFilename)
jr, err := newFileJSONReader(logger, cachefn)
if err == nil {
Expand All @@ -216,7 +216,7 @@ func NewJSONReader(ctx context.Context, logger log.Logger, bkt objstore.BucketRe
}

// Try to download index cache file from object store.
if err = objstore.DownloadFile(ctx, logger, bkt, filepath.Join(id.String(), block.IndexCacheFilename), cachefn); err == nil {
if err = objstore.DownloadFile(ctx, logger, bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr), filepath.Join(id.String(), block.IndexCacheFilename), cachefn); err == nil {
return newFileJSONReader(logger, cachefn)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/block/metadata/deletionmark.go
Expand Up @@ -45,10 +45,10 @@ type DeletionMark struct {
}

// ReadDeletionMark reads the given deletion mark file from <dir>/deletion-mark.json in bucket.
func ReadDeletionMark(ctx context.Context, bkt objstore.BucketReader, logger log.Logger, dir string) (*DeletionMark, error) {
func ReadDeletionMark(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, dir string) (*DeletionMark, error) {
deletionMarkFile := path.Join(dir, DeletionMarkFilename)

r, err := bkt.Get(ctx, deletionMarkFile)
r, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, deletionMarkFile)
if err != nil {
if bkt.IsObjNotFoundErr(err) {
return nil, ErrorDeletionMarkNotFound
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/metadata/deletionmark_test.go
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/fortytw2/leaktest"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand All @@ -29,7 +29,7 @@ func TestReadDeletionMark(t *testing.T) {
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()

bkt := inmem.NewBucket()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
{
blockWithoutDeletionMark := ulid.MustNew(uint64(1), nil)
_, err := ReadDeletionMark(ctx, bkt, nil, path.Join(tmpDir, blockWithoutDeletionMark.String()))
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/clean_test.go
Expand Up @@ -18,15 +18,15 @@ import (
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/testutil"
)

func TestBestEffortCleanAbortedPartialUploads(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

bkt := inmem.NewBucket()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
logger := log.NewNopLogger()

metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, nil, nil)
Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact_e2e_test.go
Expand Up @@ -90,7 +90,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
}

duplicateBlocksFilter := block.NewDeduplicateFilter()
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
duplicateBlocksFilter,
}, nil)
testutil.Ok(t, err)
Expand Down Expand Up @@ -176,9 +176,9 @@ func TestGroup_Compact_e2e(t *testing.T) {

reg := prometheus.NewRegistry()

ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, 48*time.Hour)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, objstore.WithNoopInstr(bkt), 48*time.Hour)
duplicateBlocksFilter := block.NewDeduplicateFilter()
metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil, []block.MetadataFilter{
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
}, nil)
Expand Down
3 changes: 1 addition & 2 deletions pkg/compact/retention_test.go
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/objstore/inmem"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -240,7 +239,7 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) {
},
} {
t.Run(tt.name, func(t *testing.T) {
bkt := inmem.NewBucket()
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
for _, b := range tt.blocks {
uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/client/factory.go
Expand Up @@ -42,7 +42,7 @@ type BucketConfig struct {

// NewBucket initializes and returns new object storage clients.
// NOTE: confContentYaml can contain secrets.
func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string) (objstore.Bucket, error) {
func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string) (objstore.InstrumentedBucket, error) {
level.Info(logger).Log("msg", "loading bucket configuration")
bucketConf := &BucketConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/objstore/cos/cos.go
Expand Up @@ -52,6 +52,7 @@ func (conf *Config) validate() error {
return nil
}

// NewBucket returns a new Bucket using the provided cos configuration.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
if logger == nil {
logger = log.NewNopLogger()
Expand Down