Skip to content

Commit

Permalink
addressing the review comments, and adding a CHANGELOG entry
Browse files Browse the repository at this point in the history
Signed-off-by: sayan <sayan@infracloud.io>
  • Loading branch information
dsayan154 committed Dec 14, 2020
1 parent a5ee9ef commit 05104b2
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -12,6 +12,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
## Unreleased

- [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before.
- [#3289](https://github.com/thanos-io/thanos/pull/3289) Thanos Object Store: Adding `prefix` to a bucket config

## [v0.16.0](https://github.com/thanos-io/thanos/releases) - Release in progress

Expand Down
4 changes: 2 additions & 2 deletions docs/storage.md
Expand Up @@ -421,12 +421,12 @@ prefix: ""
## Prefix

Prefix field allows adding an optional prefix to block (`/<ulid>`) and block files which are uploaded by any block "producer" (e.g sidecar, ruler, receiver).
The sample config below ensures that all the bucket operations e.g: upload, delete, list, etc are performed on `/tenant-0` path pf the object store only.
The sample config below ensures that all the bucket operations e.g: upload, delete, list, etc are performed on `/tenant-0/prefix0/` path of the object store only. The prefix shall start and end with '/'.
Sample object store config:
```yaml
type: S3
config:
<provider specific config/s>
prefix: tenant-0
prefix: /tenant-0/prefix0/
```
It is worth mentioning that this feature can be used to store data of different tenants in different paths of the same bucket. However, for such use-cases, putting data on different paths WILL REQUIRE totally separate Store Gateway / Compactor by design.
19 changes: 12 additions & 7 deletions pkg/objstore/objstore.go
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"io"
"os"
"path"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -450,6 +451,10 @@ func NewPrefixedBucket(bkt Bucket, prefix string) *prefixedBucket {
}
}

func (pbkt *prefixedBucket) nameWithPrefix(name string) string {
return path.Join(pbkt.prefix, name)
}

func (pbkt *prefixedBucket) WithExpectedErrs(expectedFunc IsOpFailureExpectedFunc) Bucket {
if ib, ok := pbkt.bkt.(InstrumentedBucket); ok {
return &prefixedBucket{
Expand All @@ -469,13 +474,13 @@ func (pbkt *prefixedBucket) Name() string {
}

func (pbkt *prefixedBucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
pname := filepath.Join(pbkt.prefix, name)
pname := pbkt.nameWithPrefix(name)
err = pbkt.bkt.Upload(ctx, pname, r)
return
}

func (pbkt *prefixedBucket) Delete(ctx context.Context, name string) (err error) {
pname := filepath.Join(pbkt.prefix, name)
pname := pbkt.nameWithPrefix(name)
err = pbkt.bkt.Delete(ctx, pname)
return
}
Expand All @@ -485,7 +490,7 @@ func (pbkt *prefixedBucket) Close() error {
}

func (pbkt *prefixedBucket) Iter(ctx context.Context, dir string, f func(string) error) error {
pdir := filepath.Join(pbkt.prefix, dir)
pdir := pbkt.nameWithPrefix(dir)
if pbkt.prefix != "" {
return pbkt.bkt.Iter(ctx, pdir, func(s string) error {
return f(strings.Join(strings.Split(s, pbkt.prefix)[1:], "/"))
Expand All @@ -495,17 +500,17 @@ func (pbkt *prefixedBucket) Iter(ctx context.Context, dir string, f func(string)
}

func (pbkt *prefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
pname := filepath.Join(pbkt.prefix, name)
pname := pbkt.nameWithPrefix(name)
return pbkt.bkt.Get(ctx, pname)
}

func (pbkt *prefixedBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
pname := filepath.Join(pbkt.prefix, name)
pname := pbkt.nameWithPrefix(name)
return pbkt.bkt.GetRange(ctx, pname, off, length)
}

func (pbkt *prefixedBucket) Exists(ctx context.Context, name string) (bool, error) {
pname := filepath.Join(pbkt.prefix, name)
pname := pbkt.nameWithPrefix(name)
return pbkt.bkt.Exists(ctx, pname)
}

Expand All @@ -514,7 +519,7 @@ func (pbkt *prefixedBucket) IsObjNotFoundErr(err error) bool {
}

func (pbkt *prefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
pname := filepath.Join(pbkt.prefix, name)
pname := pbkt.nameWithPrefix(name)
return pbkt.bkt.Attributes(ctx, pname)
}

Expand Down
61 changes: 61 additions & 0 deletions pkg/objstore/objstore_test.go
Expand Up @@ -4,6 +4,10 @@
package objstore

import (
"context"
"io/ioutil"
"sort"
"strings"
"testing"

promtest "github.com/prometheus/client_golang/prometheus/testutil"
Expand Down Expand Up @@ -63,3 +67,60 @@ func TestMetricBucket_Close(t *testing.T) {
testutil.Equals(t, 7, promtest.CollectAndCount(bkt.opsDuration))
testutil.Assert(t, promtest.ToFloat64(bkt.lastSuccessfulUploadTime) > lastUpload)
}

func TestPrefixedBucket(t *testing.T) {
prefix := "/abc/def/"
ctx := context.Background()
bkt := NewPrefixedBucket(NewInMemBucket(), prefix)
testutil.Ok(t, bkt.Upload(ctx, "id1/obj_1.some", strings.NewReader("@test-data@")))
// Double check we can immediately read it.
rc1, err := bkt.Get(ctx, "id1/obj_1.some")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rc1.Close()) }()
content, err := ioutil.ReadAll(rc1)
testutil.Ok(t, err)
testutil.Equals(t, "@test-data@", string(content))
// Upload other objects.
testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@")))
// Upload should be idempotent.
testutil.Ok(t, bkt.Upload(ctx, "id1/obj_2.some", strings.NewReader("@test-data2@")))
testutil.Ok(t, bkt.Upload(ctx, "id1/obj_3.some", strings.NewReader("@test-data3@")))
testutil.Ok(t, bkt.Upload(ctx, "id2/obj_4.some", strings.NewReader("@test-data4@")))
testutil.Ok(t, bkt.Upload(ctx, "obj_5.some", strings.NewReader("@test-data5@")))

// Can we iter over items from top dir?
var seen []string
testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error {
seen = append(seen, fn)
return nil
}))
expected := []string{"obj_5.some", "id1/", "id2/"}
sort.Strings(expected)
sort.Strings(seen)
// fmt.Printf("%v\n", inmembkt.Objects())
testutil.Equals(t, expected, seen)

// Can we iter over items from id1/ dir?
seen = []string{}
testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error {
seen = append(seen, fn)
return nil
}))
testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen)

// Can we iter over items from id1 dir?
seen = []string{}
testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error {
seen = append(seen, fn)
return nil
}))
testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some"}, seen)

// Can we iter over items from not existing dir?
testutil.Ok(t, bkt.Iter(ctx, "id0", func(fn string) error {
t.Error("Not expected to loop through not existing directory")
t.FailNow()

return nil
}))
}

0 comments on commit 05104b2

Please sign in to comment.