
chunk objects when uploading via PutObject #30

Merged · 4 commits · Mar 17, 2020
7 changes: 7 additions & 0 deletions backupstoragelocation.md
@@ -40,4 +40,11 @@ spec:
 #
 # Optional.
 subscriptionId: my-subscription
+
+# The block size, in bytes, to use when uploading objects to Azure blob storage.
+# See https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs#about-block-blobs
+# for more information on block blobs.
+#
+# Optional (defaults to 104857600, i.e. 100MB).
+blockSizeInBytes: "104857600"
 ```
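
For context, here's a minimal sketch of a complete BackupStorageLocation that sets this option. The metadata, bucket, and account values are placeholders, and the overall resource shape is assumed from Velero's conventions rather than taken from this PR:

```yaml
apiVersion: velero.io/v1
kind: BackupStorageLocation
metadata:
  name: default
  namespace: velero
spec:
  provider: azure
  objectStorage:
    bucket: velero-backups
  config:
    resourceGroup: my-resource-group
    storageAccount: mystorageaccount
    subscriptionId: my-subscription
    # Upload in 50MB blocks instead of the default 100MB.
    blockSizeInBytes: "52428800"
```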
1 change: 0 additions & 1 deletion changelogs/unreleased/00001-carlisia.md

This file was deleted.

1 change: 1 addition & 0 deletions changelogs/unreleased/30-skriss
@@ -0,0 +1 @@
+chunk objects when uploading to object storage in order to support objects larger than 256MB
2 changes: 2 additions & 0 deletions velero-plugin-for-microsoft-azure/main.go
@@ -18,11 +18,13 @@ package main

 import (
 	"github.com/sirupsen/logrus"
+	"github.com/spf13/pflag"
 	veleroplugin "github.com/vmware-tanzu/velero/pkg/plugin/framework"
 )

 func main() {
 	veleroplugin.NewServer().
+		BindFlags(pflag.CommandLine).
 		RegisterObjectStore("velero.io/azure", newAzureObjectStore).
 		RegisterVolumeSnapshotter("velero.io/azure", newAzureVolumeSnapshotter).
 		Serve()
89 changes: 84 additions & 5 deletions velero-plugin-for-microsoft-azure/object_store.go
@@ -18,8 +18,10 @@ package main

 import (
 	"context"
+	"fmt"
 	"io"
 	"os"
+	"strconv"
 	"strings"
 	"time"

@@ -36,6 +38,11 @@ import (
 const (
 	storageAccountConfigKey = "storageAccount"
 	subscriptionIdConfigKey = "subscriptionId"
+	blockSizeConfigKey      = "blockSizeInBytes"
+
+	// blocks must be less than/equal to 100MB in size
+	// ref. https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#uri-parameters
+	defaultBlockSize = 100 * 1024 * 1024
 )
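
One consequence of the 100MB block ceiling worth noting: Azure also caps a block blob at 50,000 committed blocks, so the block size bounds the maximum object size. A quick back-of-the-envelope sketch (the 50,000-block figure comes from Azure's block blob documentation, not from this diff):

```go
package main

import "fmt"

func main() {
	const (
		maxBlocksPerBlob = 50000             // Azure's committed-block limit per block blob
		blockSize        = 100 * 1024 * 1024 // the plugin's default (and maximum) block size
	)
	// 50,000 blocks x 100MB ~= 4.75TB, far beyond the 256MB single-put limit.
	fmt.Printf("max object size: %d bytes\n", maxBlocksPerBlob*blockSize)
}
```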

 type containerGetter interface {
@@ -94,7 +101,8 @@ func (bg *azureBlobGetter) getBlob(bucket, key string) (blob, error) {
 }

 type blob interface {
-	CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error
+	PutBlock(blockID string, chunk []byte, options *storage.PutBlockOptions) error
+	PutBlockList(blocks []storage.Block, options *storage.PutBlockListOptions) error
 	Exists() (bool, error)
 	Get(options *storage.GetBlobOptions) (io.ReadCloser, error)
 	Delete(options *storage.DeleteBlobOptions) error
@@ -105,8 +113,11 @@ type azureBlob struct {
 	blob *storage.Blob
 }

-func (b *azureBlob) CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error {
-	return b.blob.CreateBlockBlobFromReader(blob, options)
+func (b *azureBlob) PutBlock(blockID string, chunk []byte, options *storage.PutBlockOptions) error {
+	return b.blob.PutBlock(blockID, chunk, options)
+}
+func (b *azureBlob) PutBlockList(blocks []storage.Block, options *storage.PutBlockListOptions) error {
+	return b.blob.PutBlockList(blocks, options)
 }

 func (b *azureBlob) Exists() (bool, error) {
@@ -126,9 +137,10 @@ func (b *azureBlob) GetSASURI(options *storage.BlobSASOptions) (string, error) {
 }

 type ObjectStore struct {
-	log             logrus.FieldLogger
 	containerGetter containerGetter
 	blobGetter      blobGetter
+	log             logrus.FieldLogger
+	blockSize       int
 }

 func newObjectStore(logger logrus.FieldLogger) *ObjectStore {
@@ -212,6 +224,7 @@ func (o *ObjectStore) Init(config map[string]string) error {
 		resourceGroupConfigKey,
 		storageAccountConfigKey,
 		subscriptionIdConfigKey,
+		blockSizeConfigKey,
 	); err != nil {
 		return err
 	}
@@ -235,16 +248,82 @@
 		blobService: &blobClient,
 	}

+	o.blockSize = getBlockSize(o.log, config)
+
 	return nil
 }

+func getBlockSize(log logrus.FieldLogger, config map[string]string) int {
+	val, ok := config[blockSizeConfigKey]
+	if !ok {
+		// no alternate block size specified in config, so return with the default
+		return defaultBlockSize
+	}
+
+	blockSize, err := strconv.Atoi(val)
+	if err != nil {
+		log.WithError(err).Warnf("Error parsing config.blockSizeInBytes value %v, using default block size of %d", val, defaultBlockSize)
+		return defaultBlockSize
+	}
+
+	if blockSize <= 0 || blockSize > defaultBlockSize {
+		// note: no WithError here -- err is always nil on this path.
+		log.Warnf("Value provided for config.blockSizeInBytes (%d) is outside the allowed range of 1 to %d, using default block size of %d", blockSize, defaultBlockSize, defaultBlockSize)
+		return defaultBlockSize
+	}
+
+	return blockSize
+}
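
To make the fallback behavior above concrete, here is an illustrative sketch of what getBlockSize resolves for a few inputs. It assumes it lives alongside the function in the same package; the values are invented for the example:

```go
package main

import "github.com/sirupsen/logrus"

func main() {
	log := logrus.New()

	// Each call uses getBlockSize as defined in object_store.go.
	_ = getBlockSize(log, map[string]string{})                                   // 104857600 (default)
	_ = getBlockSize(log, map[string]string{"blockSizeInBytes": "52428800"})     // 52428800 (50MB)
	_ = getBlockSize(log, map[string]string{"blockSizeInBytes": "not-a-number"}) // default; logs a parse warning
	_ = getBlockSize(log, map[string]string{"blockSizeInBytes": "209715200"})    // default; logs an out-of-range warning (200MB > 100MB)
}
```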

 func (o *ObjectStore) PutObject(bucket, key string, body io.Reader) error {
 	blob, err := o.blobGetter.getBlob(bucket, key)
 	if err != nil {
 		return err
 	}

-	return errors.WithStack(blob.CreateBlockBlobFromReader(body, nil))
+	// Azure requires a blob/object to be chunked if it's larger than 256MB. Since we
+	// don't know ahead of time if the body is over this limit or not, and it would
+	// require reading the entire object into memory to determine the size, we use the
+	// chunking approach for all objects.
+
+	var (
+		block    = make([]byte, o.blockSize)
+		blockIDs []storage.Block
+	)
+
+	for {
+		n, err := body.Read(block)
+		if n > 0 {
+			// blockID needs to be the same length for all blocks, so use a fixed width.
+			// ref. https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#uri-parameters
+			blockID := fmt.Sprintf("%08d", len(blockIDs))

+			o.log.Debugf("Putting block (id=%s) of length %d", blockID, n)
+			if putErr := blob.PutBlock(blockID, block[0:n], nil); putErr != nil {
+				return errors.Wrapf(putErr, "error putting block %s", blockID)
+			}
+
+			blockIDs = append(blockIDs, storage.Block{
+				ID:     blockID,
+				Status: storage.BlockStatusLatest,
+			})
+		}
+
+		// got an io.EOF: we're done reading chunks from the body
+		if err == io.EOF {
+			break
+		}
+		// any other error: bubble it up
+		if err != nil {
+			return errors.Wrap(err, "error reading block from body")
+		}
+	}
+
+	o.log.Debugf("Putting block list %v", blockIDs)
+	if err := blob.PutBlockList(blockIDs, nil); err != nil {
+		return errors.Wrap(err, "error putting block list")
+	}
+
+	return nil
 }
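
Tracing the loop above with concrete numbers may help: a 250MB body and the default 100MB block size yield three PutBlock calls and one PutBlockList commit. A hedged in-package sketch (the bucket/key names are invented; assumes a "bytes" import):

```go
// Illustrative only; assumes o is an *ObjectStore already initialized via
// Init, so o.blockSize is the default 104857600 (100MB).
func uploadExample(o *ObjectStore) error {
	// A 250MB in-memory body; bytes.Reader returns full reads, so PutObject
	// sees 100MB, 100MB, then 50MB, uploading blocks "00000000", "00000001",
	// and "00000002" before committing them with a single PutBlockList.
	body := bytes.NewReader(make([]byte, 250*1024*1024))
	return o.PutObject("velero-backups", "backups/example/example.tar.gz", body)
}
```

Note that correctness doesn't depend on full reads: a reader that returns short reads simply produces more, smaller blocks, and each block ID still gets the next fixed-width index.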

 func (o *ObjectStore) ObjectExists(bucket, key string) (bool, error) {
8 changes: 6 additions & 2 deletions velero-plugin-for-microsoft-azure/object_store_test.go
@@ -108,8 +108,12 @@ type mockBlob struct {
 	mock.Mock
 }

-func (m *mockBlob) CreateBlockBlobFromReader(blob io.Reader, options *storage.PutBlobOptions) error {
-	args := m.Called(blob, options)
+func (m *mockBlob) PutBlock(blockID string, chunk []byte, options *storage.PutBlockOptions) error {
+	args := m.Called(blockID, chunk, options)
 	return args.Error(0)
 }
+func (m *mockBlob) PutBlockList(blocks []storage.Block, options *storage.PutBlockListOptions) error {
+	args := m.Called(blocks, options)
+	return args.Error(0)
+}

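
Finally, a sketch of how these mocks might exercise the new chunking path end to end. It assumes testify's mock/require packages and a mockBlobGetter analogous to mockBlob (returning its first argument as the blob interface); the test name and sizes are illustrative, not from this PR:

```go
func TestPutObjectChunksBody(t *testing.T) {
	blob := new(mockBlob)
	blob.On("PutBlock", mock.Anything, mock.Anything, mock.Anything).Return(nil)
	blob.On("PutBlockList", mock.Anything, mock.Anything).Return(nil)

	blobGetter := new(mockBlobGetter)
	blobGetter.On("getBlob", "bucket", "key").Return(blob, nil)

	o := &ObjectStore{
		blobGetter: blobGetter,
		log:        logrus.New(),
		blockSize:  1024, // tiny block size so a small body still chunks
	}

	// 2.5KB body => three PutBlock calls (1024, 1024, 512 bytes) and one
	// PutBlockList commit.
	require.NoError(t, o.PutObject("bucket", "key", bytes.NewReader(make([]byte, 2560))))

	blob.AssertNumberOfCalls(t, "PutBlock", 3)
	blob.AssertNumberOfCalls(t, "PutBlockList", 1)
}
```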