-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgcs.go
234 lines (198 loc) · 6.5 KB
/
gcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
// Package gcs implements common object storage abstractions against Google Cloud Storage.
package gcs
import (
"context"
"fmt"
"io"
"runtime"
"strings"
"testing"
"cloud.google.com/go/storage"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/version"
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"gopkg.in/yaml.v2"
"github.com/swgupta98/objstore"
)
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"
// Config stores the configuration for gcs bucket.
type Config struct {
Bucket string `yaml:"bucket"`
ServiceAccount string `yaml:"service_account"`
}
// Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS.
type Bucket struct {
logger log.Logger
bkt *storage.BucketHandle
name string
closer io.Closer
}
// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) {
var gc Config
if err := yaml.Unmarshal(conf, &gc); err != nil {
return nil, err
}
return NewBucketWithConfig(ctx, logger, gc, component)
}
// NewBucketWithConfig returns a new Bucket with gcs Config struct.
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string) (*Bucket, error) {
if gc.Bucket == "" {
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
}
var opts []option.ClientOption
// If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
if gc.ServiceAccount != "" {
credentials, err := google.CredentialsFromJSON(ctx, []byte(gc.ServiceAccount), storage.ScopeFullControl)
if err != nil {
return nil, errors.Wrap(err, "failed to create credentials from JSON")
}
opts = append(opts, option.WithCredentials(credentials))
}
opts = append(opts,
option.WithUserAgent(fmt.Sprintf("thanos-%s/%s (%s)", component, version.Version, runtime.Version())),
)
gcsClient, err := storage.NewClient(ctx, opts...)
if err != nil {
return nil, err
}
bkt := &Bucket{
logger: logger,
bkt: gcsClient.Bucket(gc.Bucket),
closer: gcsClient,
name: gc.Bucket,
}
return bkt, nil
}
// Name returns the bucket name for gcs.
func (b *Bucket) Name() string {
return b.name
}
// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
// Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the
// object itself as one prefix item.
if dir != "" {
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}
// If recursive iteration is enabled we should pass an empty delimiter.
delimiter := DirDelim
if objstore.ApplyIterOptions(options...).Recursive {
delimiter = ""
}
it := b.bkt.Objects(ctx, &storage.Query{
Prefix: dir,
Delimiter: delimiter,
})
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
attrs, err := it.Next()
if err == iterator.Done {
return nil
}
if err != nil {
return err
}
if err := f(attrs.Prefix + attrs.Name); err != nil {
return err
}
}
}
// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bkt.Object(name).NewReader(ctx)
}
// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.bkt.Object(name).NewRangeReader(ctx, off, length)
}
// Attributes returns information about the specified object.
func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
attrs, err := b.bkt.Object(name).Attrs(ctx)
if err != nil {
return objstore.ObjectAttributes{}, err
}
return objstore.ObjectAttributes{
Size: attrs.Size,
LastModified: attrs.Updated,
}, nil
}
// Handle returns the underlying GCS bucket handle.
// Used for testing purposes (we return handle, so it is not instrumented).
func (b *Bucket) Handle() *storage.BucketHandle {
return b.bkt
}
// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
if _, err := b.bkt.Object(name).Attrs(ctx); err == nil {
return true, nil
} else if err != storage.ErrObjectNotExist {
return false, err
}
return false, nil
}
// Upload writes the file specified in src to remote GCS location specified as target.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
w := b.bkt.Object(name).NewWriter(ctx)
if _, err := io.Copy(w, r); err != nil {
return err
}
return w.Close()
}
// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.bkt.Object(name).Delete(ctx)
}
// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
return errors.Is(err, storage.ErrObjectNotExist)
}
// IsCustomerManagedKeyError returns true if the permissions for key used to encrypt the object was revoked.
func (b *Bucket) IsCustomerManagedKeyError(_ error) bool {
return false
}
func (b *Bucket) Close() error {
return b.closer.Close()
}
// NewTestBucket creates test bkt client that before returning creates temporary bucket.
// In a close function it empties and deletes the bucket.
func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
gTestConfig := Config{
Bucket: objstore.CreateTemporaryTestBucketName(t),
}
bc, err := yaml.Marshal(gTestConfig)
if err != nil {
return nil, nil, err
}
b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test")
if err != nil {
return nil, nil, err
}
if err = b.bkt.Create(ctx, project, nil); err != nil {
_ = b.Close()
return nil, nil, err
}
t.Log("created temporary GCS bucket for GCS tests with name", b.name, "in project", project)
return b, func() {
objstore.EmptyBucket(t, ctx, b)
if err := b.bkt.Delete(ctx); err != nil {
t.Logf("deleting bucket failed: %s", err)
}
if err := b.Close(); err != nil {
t.Logf("closing bucket failed: %s", err)
}
}, nil
}