Skip to content

Commit

Permalink
store: azure: allow passing an endpoint parameter for specific regions (
Browse files Browse the repository at this point in the history
#980)

Fix #968

Signed-off-by: Adrien Fillon <adrien.fillon@cdiscount.com>
  • Loading branch information
adrien-f authored and bwplotka committed Apr 18, 2019
1 parent 620f5be commit 54f0883
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 28 deletions.
1 change: 1 addition & 0 deletions docs/storage.md
Expand Up @@ -245,6 +245,7 @@ config:
storage_account: ""
storage_account_key: ""
container: ""
endpoint: ""
```

### OpenStack Swift Configuration
Expand Down
31 changes: 20 additions & 11 deletions pkg/objstore/azure/azure.go
Expand Up @@ -18,18 +18,15 @@ import (
)

const (
opObjectsList = "ListBucket"
opObjectInsert = "PutObject"
opObjectGet = "GetObject"
opObjectHead = "HeadObject"
opObjectDelete = "DeleteObject"
azureDefaultEndpoint = "blob.core.windows.net"
)

// Config Azure storage configuration.
type Config struct {
StorageAccountName string `yaml:"storage_account"`
StorageAccountKey string `yaml:"storage_account_key"`
ContainerName string `yaml:"container"`
Endpoint string `yaml:"endpoint"`
}

// Bucket implements the store.Bucket interface against Azure APIs.
Expand All @@ -45,6 +42,18 @@ func (conf *Config) validate() error {
conf.StorageAccountKey == "" {
return errors.New("invalid Azure storage configuration")
}
if conf.StorageAccountName == "" && conf.StorageAccountKey != "" {
return errors.New("no Azure storage_account specified while storage_account_key is present in config file; both should be present.")
}
if conf.StorageAccountName != "" && conf.StorageAccountKey == "" {
return errors.New("no Azure storage_account_key specified while storage_account is present in config file; both should be present.")
}
if conf.ContainerName == "" {
return errors.New("no Azure container specified")
}
if conf.Endpoint == "" {
conf.Endpoint = azureDefaultEndpoint
}
return nil
}

Expand All @@ -62,15 +71,15 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket
}

ctx := context.Background()
container, err := createContainer(ctx, conf.StorageAccountName, conf.StorageAccountKey, conf.ContainerName)
container, err := createContainer(ctx, conf)
if err != nil {
ret, ok := err.(blob.StorageError)
if !ok {
return nil, errors.Wrapf(err, "Azure API return unexpected error: %T\n", err)
}
if ret.ServiceCode() == "ContainerAlreadyExists" {
level.Debug(logger).Log("msg", "Getting connection to existing Azure blob container", "container", conf.ContainerName)
container, err = getContainer(ctx, conf.StorageAccountName, conf.StorageAccountKey, conf.ContainerName)
container, err = getContainer(ctx, conf)
if err != nil {
return nil, errors.Wrapf(err, "cannot get existing Azure blob container: %s", container)
}
Expand Down Expand Up @@ -166,7 +175,7 @@ func (b *Bucket) getBlobReader(ctx context.Context, name string, offset, length
return nil, errors.New("X-Ms-Error-Code: [BlobNotFound]")
}

blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name)
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return nil, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
}
Expand Down Expand Up @@ -211,7 +220,7 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
level.Debug(b.logger).Log("msg", "check if blob exists", "blob", name)
blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name)
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return false, errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
}
Expand All @@ -229,7 +238,7 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
level.Debug(b.logger).Log("msg", "Uploading blob", "blob", name)
blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name)
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
}
Expand All @@ -247,7 +256,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
level.Debug(b.logger).Log("msg", "Deleting blob", "blob", name)
blobURL, err := getBlobURL(ctx, b.config.StorageAccountName, b.config.StorageAccountKey, b.config.ContainerName, name)
blobURL, err := getBlobURL(ctx, *b.config, name)
if err != nil {
return errors.Wrapf(err, "cannot get Azure blob URL, address: %s", name)
}
Expand Down
87 changes: 87 additions & 0 deletions pkg/objstore/azure/azure_test.go
@@ -0,0 +1,87 @@
package azure

import (
"testing"

"github.com/improbable-eng/thanos/pkg/testutil"
)

func TestConfig_validate(t *testing.T) {
type fields struct {
StorageAccountName string
StorageAccountKey string
ContainerName string
Endpoint string
}
tests := []struct {
name string
fields fields
wantErr bool
wantEndpoint string
}{
{
name: "valid global configuration",
fields: fields{
StorageAccountName: "foo",
StorageAccountKey: "bar",
ContainerName: "roo",
},
wantErr: false,
wantEndpoint: azureDefaultEndpoint,
},
{
name: "valid custom endpoint",
fields: fields{
StorageAccountName: "foo",
StorageAccountKey: "bar",
ContainerName: "roo",
Endpoint: "blob.core.chinacloudapi.cn",
},
wantErr: false,
wantEndpoint: "blob.core.chinacloudapi.cn",
},
{
name: "no account key but account name",
fields: fields{
StorageAccountName: "foo",
StorageAccountKey: "",
ContainerName: "roo",
},
wantErr: true,
},
{
name: "no account name but account key",
fields: fields{
StorageAccountName: "",
StorageAccountKey: "bar",
ContainerName: "roo",
},
wantErr: true,
},
{
name: "no container name",
fields: fields{
StorageAccountName: "foo",
StorageAccountKey: "bar",
ContainerName: "",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := &Config{
StorageAccountName: tt.fields.StorageAccountName,
StorageAccountKey: tt.fields.StorageAccountKey,
ContainerName: tt.fields.ContainerName,
Endpoint: tt.fields.Endpoint,
}
err := conf.validate()
if (err != nil) != tt.wantErr {
t.Errorf("Config.validate() error = %v, wantErr %v", err, tt.wantErr)
} else {
testutil.Equals(t, tt.wantEndpoint, conf.Endpoint)
}
})
}
}
39 changes: 22 additions & 17 deletions pkg/objstore/azure/helpers.go
Expand Up @@ -5,36 +5,42 @@ import (
"fmt"
"net/url"
"regexp"
"time"

blob "github.com/Azure/azure-storage-blob-go/azblob"
)

var (
blobFormatString = `https://%s.blob.core.windows.net`
)

// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

func getContainerURL(ctx context.Context, accountName, accountKey, containerName string) (blob.ContainerURL, error) {
c, err := blob.NewSharedKeyCredential(accountName, accountKey)
var errorCodeRegex = regexp.MustCompile(`X-Ms-Error-Code:\D*\[(\w+)\]`)

func getContainerURL(ctx context.Context, conf Config) (blob.ContainerURL, error) {
c, err := blob.NewSharedKeyCredential(conf.StorageAccountName, conf.StorageAccountKey)
if err != nil {
return blob.ContainerURL{}, err
}

retryOptions := blob.RetryOptions{}
if deadline, ok := ctx.Deadline(); ok {
retryOptions.TryTimeout = deadline.Sub(time.Now())
}

p := blob.NewPipeline(c, blob.PipelineOptions{
Retry: retryOptions,
Telemetry: blob.TelemetryOptions{Value: "Thanos"},
})
u, err := url.Parse(fmt.Sprintf(blobFormatString, accountName))
u, err := url.Parse(fmt.Sprintf("https://%s.%s", conf.StorageAccountName, conf.Endpoint))
if err != nil {
return blob.ContainerURL{}, err
}
service := blob.NewServiceURL(*u, p)

return service.NewContainerURL(containerName), nil
return service.NewContainerURL(conf.ContainerName), nil
}

func getContainer(ctx context.Context, accountName, accountKey, containerName string) (blob.ContainerURL, error) {
c, err := getContainerURL(ctx, accountName, accountKey, containerName)
func getContainer(ctx context.Context, conf Config) (blob.ContainerURL, error) {
c, err := getContainerURL(ctx, conf)
if err != nil {
return blob.ContainerURL{}, err
}
Expand All @@ -43,29 +49,28 @@ func getContainer(ctx context.Context, accountName, accountKey, containerName st
return c, err
}

func createContainer(ctx context.Context, accountName, accountKey, containerName string) (blob.ContainerURL, error) {
c, err := getContainerURL(ctx, accountName, accountKey, containerName)
func createContainer(ctx context.Context, conf Config) (blob.ContainerURL, error) {
c, err := getContainerURL(ctx, conf)
if err != nil {
return blob.ContainerURL{}, err
}
_, err = c.Create(
context.Background(),
ctx,
blob.Metadata{},
blob.PublicAccessNone)
return c, err
}

func getBlobURL(ctx context.Context, accountName, accountKey, containerName, blobName string) (blob.BlockBlobURL, error) {
c, err := getContainerURL(ctx, accountName, accountKey, containerName)
func getBlobURL(ctx context.Context, conf Config, blobName string) (blob.BlockBlobURL, error) {
c, err := getContainerURL(ctx, conf)
if err != nil {
return blob.BlockBlobURL{}, err
}
return c.NewBlockBlobURL(blobName), nil
}

func parseError(errorCode string) string {
re, _ := regexp.Compile(`X-Ms-Error-Code:\D*\[(\w+)\]`)
match := re.FindStringSubmatch(errorCode)
match := errorCodeRegex.FindStringSubmatch(errorCode)
if match != nil && len(match) == 2 {
return match[1]
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/objstore/azure/helpers_test.go
@@ -0,0 +1,58 @@
package azure

import (
"context"
"testing"

"github.com/improbable-eng/thanos/pkg/testutil"
)

func Test_getContainerURL(t *testing.T) {
type args struct {
conf Config
}
tests := []struct {
name string
args args
want string
wantErr bool
}{
{
name: "default",
args: args{
conf: Config{
StorageAccountName: "foo",
StorageAccountKey: "Zm9vCg==",
ContainerName: "roo",
Endpoint: azureDefaultEndpoint,
},
},
want: "https://foo.blob.core.windows.net/roo",
wantErr: false,
},
{
name: "azure china",
args: args{
conf: Config{
StorageAccountName: "foo",
StorageAccountKey: "Zm9vCg==",
ContainerName: "roo",
Endpoint: "blob.core.chinacloudapi.cn",
},
},
want: "https://foo.blob.core.chinacloudapi.cn/roo",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
got, err := getContainerURL(ctx, tt.args.conf)
if (err != nil) != tt.wantErr {
t.Errorf("getContainerURL() error = %v, wantErr %v", err, tt.wantErr)
return
}
testutil.Equals(t, tt.want, got.String())
})
}
}

0 comments on commit 54f0883

Please sign in to comment.