Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove bucket operator metrics #599

Merged
merged 1 commit into from Oct 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/objstore/client/factory.go
Expand Up @@ -50,9 +50,9 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg *prometheus.Regist
var bucket objstore.Bucket
switch strings.ToUpper(string(bucketConf.Type)) {
case string(GCS):
bucket, err = gcs.NewBucket(context.Background(), logger, config, reg, component)
bucket, err = gcs.NewBucket(context.Background(), logger, config, component)
case string(S3):
bucket, err = s3.NewBucket(logger, config, reg, component)
bucket, err = s3.NewBucket(logger, config, component)
default:
return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type)
}
Expand Down
41 changes: 5 additions & 36 deletions pkg/objstore/gcs/gcs.go
Expand Up @@ -15,25 +15,12 @@ import (
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
yaml "gopkg.in/yaml.v2"
)

const (
// Class A operations.
opObjectsList = "objects.list"
opObjectInsert = "object.insert"

// Class B operation.
opObjectGet = "object.get"

// Free operations.
opObjectDelete = "object.delete"
)

// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

Expand All @@ -44,16 +31,15 @@ type gcsConfig struct {

// Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS.
type Bucket struct {
logger log.Logger
bkt *storage.BucketHandle
opsTotal *prometheus.CounterVec
name string
logger log.Logger
bkt *storage.BucketHandle
name string

closer io.Closer
}

// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, reg prometheus.Registerer, component string) (*Bucket, error) {
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) {
var gc gcsConfig
if err := yaml.Unmarshal(conf, &gc); err != nil {
return nil, err
Expand All @@ -69,17 +55,9 @@ func NewBucket(ctx context.Context, logger log.Logger, conf []byte, reg promethe
bkt := &Bucket{
logger: logger,
bkt: gcsClient.Bucket(gc.Bucket),
opsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "thanos_objstore_gcs_bucket_operations_total",
Help: "Total number of operations that were executed against a Google Compute Storage bucket.",
ConstLabels: prometheus.Labels{"bucket": gc.Bucket},
}, []string{"operation"}),
closer: gcsClient,
name: gc.Bucket,
}
if reg != nil {
reg.MustRegister()
}
return bkt, nil
}

Expand All @@ -91,7 +69,6 @@ func (b *Bucket) Name() string {
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
b.opsTotal.WithLabelValues(opObjectsList).Inc()
// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
// object itself as one prefix item.
if dir != "" {
Expand Down Expand Up @@ -122,13 +99,11 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err

// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
b.opsTotal.WithLabelValues(opObjectGet).Inc()
return b.bkt.Object(name).NewReader(ctx)
}

// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
b.opsTotal.WithLabelValues(opObjectGet).Inc()
return b.bkt.Object(name).NewRangeReader(ctx, off, length)
}

Expand All @@ -140,8 +115,6 @@ func (b *Bucket) Handle() *storage.BucketHandle {

// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
b.opsTotal.WithLabelValues(opObjectGet).Inc()

if _, err := b.bkt.Object(name).Attrs(ctx); err == nil {
return true, nil
} else if err != storage.ErrObjectNotExist {
Expand All @@ -152,8 +125,6 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {

// Upload writes the file specified in src to remote GCS location specified as target.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
b.opsTotal.WithLabelValues(opObjectInsert).Inc()

w := b.bkt.Object(name).NewWriter(ctx)

if _, err := io.Copy(w, r); err != nil {
Expand All @@ -164,8 +135,6 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {

// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
b.opsTotal.WithLabelValues(opObjectDelete).Inc()

return b.bkt.Object(name).Delete(ctx)
}

Expand All @@ -192,7 +161,7 @@ func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error
return nil, nil, err
}

b, err := NewBucket(ctx, log.NewNopLogger(), bc, nil, "thanos-e2e-test")
b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test")
if err != nil {
cancel()
return nil, nil, err
Expand Down
40 changes: 8 additions & 32 deletions pkg/objstore/s3/s3.go
Expand Up @@ -22,19 +22,10 @@ import (
"github.com/minio/minio-go/pkg/credentials"
"github.com/minio/minio-go/pkg/encrypt"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
yaml "gopkg.in/yaml.v2"
)

const (
opObjectsList = "ListBucket"
opObjectInsert = "PutObject"
opObjectGet = "GetObject"
opObjectHead = "HEADObject"
opObjectDelete = "DeleteObject"
)

// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

Expand All @@ -51,25 +42,24 @@ type Config struct {

// Bucket implements the store.Bucket interface against s3-compatible APIs.
type Bucket struct {
logger log.Logger
name string
client *minio.Client
sse encrypt.ServerSide
opsTotal *prometheus.CounterVec
logger log.Logger
name string
client *minio.Client
sse encrypt.ServerSide
}

// NewBucket returns a new Bucket using the provided s3 config values.
func NewBucket(logger log.Logger, conf []byte, reg prometheus.Registerer, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
var config Config
if err := yaml.Unmarshal(conf, &config); err != nil {
return nil, err
}

return NewBucketWithConfig(logger, config, reg, component)
return NewBucketWithConfig(logger, config, component)
}

// NewBucket returns a new Bucket using the provided s3 config values.
func NewBucketWithConfig(logger log.Logger, config Config, reg prometheus.Registerer, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
var chain []credentials.Provider

if err := Validate(config); err != nil {
Expand Down Expand Up @@ -139,14 +129,6 @@ func NewBucketWithConfig(logger log.Logger, config Config, reg prometheus.Regist
name: config.Bucket,
client: client,
sse: sse,
opsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "thanos_objstore_s3_bucket_operations_total",
Help: "Total number of operations that were executed against an s3 bucket.",
ConstLabels: prometheus.Labels{"bucket": config.Bucket},
}, []string{"operation"}),
}
if reg != nil {
reg.MustRegister(bkt.opsTotal)
}
return bkt, nil
}
Expand Down Expand Up @@ -179,7 +161,6 @@ func ValidateForTests(conf Config) error {
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) error {
b.opsTotal.WithLabelValues(opObjectsList).Inc()
// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
// object itself as one prefix item.
if dir != "" {
Expand All @@ -204,7 +185,6 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err
}

func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
b.opsTotal.WithLabelValues(opObjectGet).Inc()
opts := &minio.GetObjectOptions{ServerSideEncryption: b.sse}
if length != -1 {
if err := opts.SetRange(off, off+length-1); err != nil {
Expand Down Expand Up @@ -240,7 +220,6 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (

// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
b.opsTotal.WithLabelValues(opObjectHead).Inc()
_, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{})
if err != nil {
if b.IsObjNotFoundErr(err) {
Expand All @@ -254,8 +233,6 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
b.opsTotal.WithLabelValues(opObjectInsert).Inc()

_, err := b.client.PutObjectWithContext(ctx, b.name, name, r, -1,
minio.PutObjectOptions{ServerSideEncryption: b.sse},
)
Expand All @@ -265,7 +242,6 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {

// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
b.opsTotal.WithLabelValues(opObjectDelete).Inc()
return b.client.RemoveObject(b.name, name)
}

Expand Down Expand Up @@ -313,7 +289,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke
if err != nil {
return nil, nil, err
}
b, err := NewBucket(log.NewNopLogger(), bc, nil, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/store_gateway_test.go
Expand Up @@ -77,7 +77,7 @@ func TestStoreGatewayQuery(t *testing.T) {

l := log.NewLogfmtLogger(os.Stdout)

bkt, err := s3.NewBucketWithConfig(l, s3Config, nil, "test-feed")
bkt, err := s3.NewBucketWithConfig(l, s3Config, "test-feed")
testutil.Ok(t, err)

testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String()))
Expand Down