Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: azure: allow passing an endpoint parameter for specific regions #980

Merged
merged 2 commits into from Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/storage.md
Expand Up @@ -245,6 +245,7 @@ config:
storage_account: ""
storage_account_key: ""
container: ""
endpoint: ""
```

### OpenStack Swift Configuration
Expand Down
31 changes: 20 additions & 11 deletions pkg/objstore/azure/azure.go
Expand Up @@ -18,18 +18,15 @@ import (
)

const (
opObjectsList = "ListBucket"
opObjectInsert = "PutObject"
opObjectGet = "GetObject"
opObjectHead = "HeadObject"
opObjectDelete = "DeleteObject"
azureDefaultEndpoint = "blob.core.windows.net"
)

// Config Azure storage configuration.
type Config struct {
StorageAccountName string `yaml:"storage_account"`
StorageAccountKey string `yaml:"storage_account_key"`
ContainerName string `yaml:"container"`
Endpoint string `yaml:"endpoint"`
}

// Bucket implements the store.Bucket interface against Azure APIs.
Expand All @@ -45,6 +42,18 @@ func (conf *Config) validate() error {
conf.StorageAccountKey == "" {
return errors.New("invalid Azure storage configuration")
}
if conf.StorageAccountName == "" && conf.StorageAccountKey != "" {
return errors.New("no Azure storage_account specified while storage_account_key is present in config file; both should be present.")
}
if conf.StorageAccountName != "" && conf.StorageAccountKey == "" {
return errors.New("no Azure storage_account_key specified while storage_account is present in config file; both should be present.")
}
if conf.ContainerName == "" {
return errors.New("no Azure container specified")
}
if conf.Endpoint == "" {
conf.Endpoint = azureDefaultEndpoint
}
return nil
}

Expand All @@ -62,15 +71,15 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket
}

ctx := context.Background()
container, err := createContainer(ctx, conf.StorageAccountName, conf.StorageAccountKey, conf.ContainerName)
container, err := createContainer(ctx, conf)
if err != nil {
ret, ok := err.(blob.StorageError)
if !ok {
return nil, errors.Wrapf(err, "Azure API return unexpected error: %T\n", err)
}
if ret.ServiceCode() == "ContainerAlreadyExists" {
level.Debug(logger).Log("msg", "Getting connection to existing Azure blob container", "container", conf.ContainerName)
container, err = getContainer(ctx, conf.StorageAccountName, conf.StorageAccountKey, conf.ContainerName)
container, err = getContainer(ctx, conf)
if err != nil {
return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container)
}
Expand Down Expand Up @@ -166,7 +175,7 @@ func (b *Bucket) getBlobReader(ctx context.Context, name string, offset, length
return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]")
}

blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name)
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
}
Expand Down Expand Up @@ -211,7 +220,7 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
level.Debug(b.logger).Log("msg", "check if blob exists", "blob", name)
blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name)
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return false, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
}
Expand All @@ -229,7 +238,7 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
level.Debug(b.logger).Log("msg", "Uploading blob", "blob", name)
blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name)
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
}
Expand All @@ -247,7 +256,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
level.Debug(b.logger).Log("msg", "Deleting blob", "blob", name)
blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name)
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
}
Expand Down
87 changes: 87 additions & 0 deletions pkg/objstore/azure/azure_test.go
@@ -0,0 +1,87 @@
package azure

import (
"testing"

"github.com/improbable-eng/thanos/pkg/testutil"
)

func TestConfig_validate(t *testing.T) {
type fields struct {
StorageAccountName string
StorageAccountKey string
ContainerName string
Endpoint string
}
tests := []struct {
name string
fields fields
wantErr bool
wantEndpoint string
}{
{
name: "valid global configuration",
fields: fields{
StorageAccountName: "foo",
StorageAccountKey: "bar",
ContainerName: "roo",
},
wantErr: false,
wantEndpoint: azureDefaultEndpoint,
},
{
name: "valid custom endpoint",
fields: fields{
StorageAccountName: "foo",
StorageAccountKey: "bar",
ContainerName: "roo",
Endpoint: "blob.core.chinacloudapi.cn",
},
wantErr: false,
wantEndpoint: "blob.core.chinacloudapi.cn",
},
{
name: "no account key but account name",
fields: fields{
StorageAccountName: "foo",
StorageAccountKey: "",
ContainerName: "roo",
},
wantErr: true,
},
{
name: "no account name but account key",
fields: fields{
StorageAccountName: "",
StorageAccountKey: "bar",
ContainerName: "roo",
},
wantErr: true,
},
{
name: "no container name",
fields: fields{
StorageAccountName: "foo",
StorageAccountKey: "bar",
ContainerName: "",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := &Config{
StorageAccountName: tt.fields.StorageAccountName,
StorageAccountKey: tt.fields.StorageAccountKey,
ContainerName: tt.fields.ContainerName,
Endpoint: tt.fields.Endpoint,
}
err := conf.validate()
if (err != nil) != tt.wantErr {
t.Errorf("Config.validate() error = %v, wantErr %v", err, tt.wantErr)
} else {
testutil.Equals(t, tt.wantEndpoint, conf.Endpoint)
}
})
}
}
39 changes: 22 additions & 17 deletions pkg/objstore/azure/helpers.go
Expand Up @@ -5,36 +5,42 @@ import (
"fmt"
"net/url"
"regexp"
"time"

blob "github.com/Azure/azure-storage-blob-go/azblob"
)

var (
blobFormatString = `https://%s.blob.core.windows.net`
)

// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

func getContainerURL(ctx context.Context, accountName, accountKey, containerName string) (blob.ContainerURL, error) {
c, err := blob.NewSharedKeyCredential(accountName, accountKey)
var errorCodeRegex = regexp.MustCompile(`X-Ms-Error-Code:\D*\[(\w+)\]`)

func getContainerURL(ctx context.Context, conf Config) (blob.ContainerURL, error) {
c, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
if err != nil {
return blob.ContainerURL{}, err
}

retryOptions := blob.RetryOptions{}
if deadline, ok := ctx.Deadline(); ok {
retryOptions.TryTimeout = deadline.Sub(time.Now())
}

p := blob.NewPipeline(c, blob.PipelineOptions{
Retry: retryOptions,
Telemetry: blob.TelemetryOptions{Value: "Thanos"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be another PR backed by an issue ? Not sure what values we should put in there. You already have defaults here: https://github.com/Azure/azure-storage-blob-go/blob/HEAD/azblob/zc_policy_retry.go#L70

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really as you removed context args everywhere.

ctx.Deadline() gives you time.Time & ok flag

You could just put that into you could put that into time.Now()- deadline into TryTimeout and IMO this would be good enough,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@povilasv Done ! Thanks for the feedback 😄 I learned something.

})
u, err := url.Parse(fmt.Sprintf(blobFormatString, accountName))
u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
if err != nil {
return blob.ContainerURL{}, err
}
service := blob.NewServiceURL(*u, p)

return service.NewContainerURL(containerName), nil
return service.NewContainerURL(conf.ContainerName), nil
}

func getContainer(ctx context.Context, accountName, accountKey, containerName string) (blob.ContainerURL, error) {
c, err := getContainerURL(ctx, accountName, accountKey, containerName)
func getContainer(ctx context.Context, conf Config) (blob.ContainerURL, error) {
c, err := getContainerURL(ctx, conf)
if err != nil {
return blob.ContainerURL{}, err
}
Expand All @@ -43,29 +49,28 @@ func getContainer(ctx context.Context, accountName, accountKey, containerName st
return c, err
}

func createContainer(ctx context.Context, accountName, accountKey, containerName string) (blob.ContainerURL, error) {
c, err := getContainerURL(ctx, accountName, accountKey, containerName)
func createContainer(ctx context.Context, conf Config) (blob.ContainerURL, error) {
c, err := getContainerURL(ctx, conf)
if err != nil {
return blob.ContainerURL{}, err
}
_, err = c.Create(
context.Background(),
ctx,
blob.Metadata{},
blob.PublicAccessNone)
return c, err
}

func getBlobURL(ctx context.Context, accountName, accountKey, containerName, blobName string) (blob.BlockBlobURL, error) {
c, err := getContainerURL(ctx, accountName, accountKey, containerName)
func getBlobURL(ctx context.Context, conf Config, blobName string) (blob.BlockBlobURL, error) {
c, err := getContainerURL(ctx, conf)
if err != nil {
return blob.BlockBlobURL{}, err
}
return c.NewBlockBlobURL(blobName), nil
}

func parseError(errorCode string) string {
re, _ := regexp.Compile(`X-Ms-Error-Code:\D*\[(\w+)\]`)
match := re.FindStringSubmatch(errorCode)
match := errorCodeRegex.FindStringSubmatch(errorCode)
if match != nil && len(match) == 2 {
return match[1]
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/objstore/azure/helpers_test.go
@@ -0,0 +1,58 @@
package azure

import (
"context"
"testing"

"github.com/improbable-eng/thanos/pkg/testutil"
)

func Test_getContainerURL(t *testing.T) {
type args struct {
conf Config
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "default",
args: args{
conf: Config{
StorageAccountName: "foo",
StorageAccountKey: "Zm9vCg==",
ContainerName: "roo",
Endpoint: azureDefaultEndpoint,
},
},
want: "https://foo.blob.core.windows.net/roo",
wantErr: false,
},
{
name: "azure china",
args: args{
conf: Config{
StorageAccountName: "foo",
StorageAccountKey: "Zm9vCg==",
ContainerName: "roo",
Endpoint: "blob.core.chinacloudapi.cn",
},
},
want: "https://foo.blob.core.chinacloudapi.cn/roo",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
got, err := getContainerURL(ctx, tt.args.conf)
if (err != nil) != tt.wantErr {
t.Errorf("getContainerURL() error = %v, wantErr %v", err, tt.wantErr)
return
}
testutil.Equals(t, tt.want, got.String())
})
}
}