diff --git a/CHANGELOG.md b/CHANGELOG.md index 28fe4f41893..f670a7c9356 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased - [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before. +- [#3289](https://github.com/thanos-io/thanos/pull/3289) Thanos Object Store: Adding `prefix` to a bucket config ## [v0.16.0](https://github.com/thanos-io/thanos/releases) - Release in progress diff --git a/docs/storage.md b/docs/storage.md index a45352f2cb3..4910d14e276 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -421,12 +421,12 @@ prefix: "" ## Prefix Prefix field allows adding an optional prefix to block (`/`) and block files which are uploaded by any block "producer" (e.g sidecar, ruler, receiver). -The sample config below ensures that all the bucket operations e.g: upload, delete, list, etc are performed on `/tenant-0` path pf the object store only. +The sample config below ensures that all the bucket operations e.g: upload, delete, list, etc are performed on `/tenant-0/prefix0/` path of the object store only. The prefix shall start and end with '/'. Sample object store config: ```yaml type: S3 config: -prefix: tenant-0 +prefix: /tenant-0/prefix0/ ``` It is worth mentioning that this feature can be used to store data of different tenants in different paths of the same bucket. However, for such use-cases, putting data on different paths WILL REQUIRE totally separate Store Gateway / Compactor by design. diff --git a/pkg/objstore/objstore.go b/pkg/objstore/objstore.go index 91ad4b90c0e..7b6e9ab5988 100644 --- a/pkg/objstore/objstore.go +++ b/pkg/objstore/objstore.go @@ -8,6 +8,7 @@ import ( "context" "io" "os" + "path" "path/filepath" "strings" "time" @@ -450,6 +451,10 @@ func NewPrefixedBucket(bkt Bucket, prefix string) *prefixedBucket { } } +func (pbkt *prefixedBucket) nameWithPrefix(name string) string { + return path.Join(pbkt.prefix, name) +} + func (pbkt *prefixedBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket { if ib, ok := pbkt.bkt.(InstrumentedBucket); ok { return &prefixedBucket{ @@ -469,13 +474,13 @@ func (pbkt *prefixedBucket) Name() string { } func (pbkt *prefixedBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) { - pname := filepath.Join(pbkt.prefix, name) + pname := pbkt.nameWithPrefix(name) err = pbkt.bkt.Upload(ctx, pname, r) return } func (pbkt *prefixedBucket) Delete(ctx context.Context, name string) (err error) { - pname := filepath.Join(pbkt.prefix, name) + pname := pbkt.nameWithPrefix(name) err = pbkt.bkt.Delete(ctx, pname) return } @@ -485,7 +490,7 @@ func (pbkt *prefixedBucket) Close() error { } func (pbkt *prefixedBucket) Iter(ctx context.Context, dir string, f func(string) error) error { - pdir := filepath.Join(pbkt.prefix, dir) + pdir := pbkt.nameWithPrefix(dir) if pbkt.prefix != "" { return pbkt.bkt.Iter(ctx, pdir, func(s string) error { return f(strings.Join(strings.Split(s, pbkt.prefix)[1:], "/")) @@ -495,17 +500,17 @@ func (pbkt *prefixedBucket) Iter(ctx context.Context, dir string, f func(string) } func (pbkt *prefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - pname := filepath.Join(pbkt.prefix, name) + pname := pbkt.nameWithPrefix(name) return pbkt.bkt.Get(ctx, pname) } func (pbkt *prefixedBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - pname := filepath.Join(pbkt.prefix, name) + pname := pbkt.nameWithPrefix(name) return pbkt.bkt.GetRange(ctx, pname, off, length) } func (pbkt *prefixedBucket) Exists(ctx context.Context, name string) (bool, error) { - pname := filepath.Join(pbkt.prefix, name) + pname := pbkt.nameWithPrefix(name) return pbkt.bkt.Exists(ctx, pname) } @@ -514,7 +519,7 @@ func (pbkt *prefixedBucket) IsObjNotFoundErr(err error) bool { } func (pbkt *prefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { - pname := filepath.Join(pbkt.prefix, name) + pname := pbkt.nameWithPrefix(name) return pbkt.bkt.Attributes(ctx, pname) } diff --git a/pkg/objstore/objstore_test.go b/pkg/objstore/objstore_test.go index 73ba1671357..4db74b4376b 100644 --- a/pkg/objstore/objstore_test.go +++ b/pkg/objstore/objstore_test.go @@ -4,6 +4,10 @@ package objstore import ( + "context" + "io/ioutil" + "sort" + "strings" "testing" promtest "github.com/prometheus/client_golang/prometheus/testutil" @@ -63,3 +67,60 @@ func TestMetricBucket_Close(t *testing.T) { testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration)) testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload) } + +func TestPrefixedBucket(t *testing.T) { + prefix := "/abc/def/" + ctx := context.Background() + bkt := NewPrefixedBucket(NewInMemBucket(), prefix) + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@"))) + // Double check we can immediately read it. + rc1, err := bkt.Get(ctx, "id1/obj_1.some") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, rc1.Close()) }() + content, err := ioutil.ReadAll(rc1) + testutil.Ok(t, err) + testutil.Equals(t, "@test-data@", string(content)) + // Upload other objects. + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@"))) + // Upload should be idempotent. + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@"))) + testutil.Ok(t, bkt.Upload(ctx, "id1/obj_3.some", strings.NewReader("@test-data3@"))) + testutil.Ok(t, bkt.Upload(ctx, "id2/obj_4.some", strings.NewReader("@test-data4@"))) + testutil.Ok(t, bkt.Upload(ctx, "obj_5.some", strings.NewReader("@test-data5@"))) + + // Can we iter over items from top dir? + var seen []string + testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error { + seen = append(seen, fn) + return nil + })) + expected := []string{"obj_5.some", "id1/", "id2/"} + sort.Strings(expected) + sort.Strings(seen) + // fmt.Printf("%v\n", inmembkt.Objects()) + testutil.Equals(t, expected, seen) + + // Can we iter over items from id1/ dir? + seen = []string{} + testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error { + seen = append(seen, fn) + return nil + })) + testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen) + + // Can we iter over items from id1 dir? + seen = []string{} + testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error { + seen = append(seen, fn) + return nil + })) + testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen) + + // Can we iter over items from not existing dir? + testutil.Ok(t, bkt.Iter(ctx, "id0", func(fn string) error { + t.Error("Not expected to loop through not existing directory") + t.FailNow() + + return nil + })) +}