Skip to content

Commit

Permalink
pkg/infrastructure/azure: parallelize blob uploading
Browse files Browse the repository at this point in the history
  • Loading branch information
jhixson74 committed Feb 22, 2024
1 parent 72a9dd4 commit 2773be1
Showing 1 changed file with 76 additions and 25 deletions.
101 changes: 76 additions & 25 deletions pkg/infrastructure/azure/storage.go
Expand Up @@ -3,6 +3,7 @@ package azure
import (
"context"
"fmt"
"sync"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
Expand Down Expand Up @@ -215,40 +216,90 @@ func CreatePageBlob(ctx context.Context, in *CreatePageBlobInput) (*CreatePageBl
}

logrus.Debugf("Uploading to blob")
err = doUploadPagesFromURL(ctx, pageBlobClient, in.ImageURL, in.ImageLength)
if err != nil {
return nil, fmt.Errorf("failed to upload blob image %s: %w", in.ImageURL, err)
}
return &CreatePageBlobOutput{
PageBlobClient: pageBlobClient,
SharedKeyCredential: sharedKeyCredential,
}, nil
}

// Azure only allows 4MB chunks
func doUploadPagesFromURL(ctx context.Context, pageBlobClient *pageblob.Client, imageURL string, imageLength int64) error {
// Azure only allows 4MB chunks, See
// https://docs.microsoft.com/rest/api/storageservices/put-page-from-url
pageSize := int64(1024 * 1024 * 4)
leftOverBytes := (in.ImageLength % pageSize)
leftOverBytes := int64(imageLength % pageSize)
offset := int64(0)
pages := int64(0)
pagesLeft := pages

page := int64(1)
pages := int64(in.ImageLength)
percent := int64(1)

// XXX: parallelize this
for {
if offset+pageSize >= in.ImageLength && leftOverBytes > 0 {
pageSize = leftOverBytes
leftOverBytes = 0
} else if offset >= in.ImageLength {
break
if imageLength > pageSize {
pages = int64(imageLength / pageSize)
if imageLength%pageSize > 0 {
pages++
}
} else {
pageSize = imageLength
pages = 1
}

threadsPerGroup := int64(64)
if pages < threadsPerGroup {
threadsPerGroup = pages
}

logrus.Debugf("Uploading: [%d%%] offset=%d imageLength=%d pageSize=%d", percent, offset, in.ImageLength, pageSize)
threadGroups := pages / threadsPerGroup
if pages%threadsPerGroup > 0 {
threadGroups++
}

var wg sync.WaitGroup
var threadError error

errors := make(chan error, 1)
defer close(errors)

// We should try to transmit a few times if this fails
_, err = pageBlobClient.UploadPagesFromURL(ctx, in.ImageURL, offset, offset, pageSize, nil)
if err != nil {
return nil, fmt.Errorf("failed to upload blob image %s: wrote %d out of %d bytes: %w", in.ImageURL, offset, in.ImageLength, err)
for threadGroup := int64(0); threadGroup < threadGroups; threadGroup++ {
if pagesLeft < threadsPerGroup {
threadsPerGroup = pagesLeft
}

offset += pageSize
percent = int64(float64(float64(page)/float64(pages)) * 100)
page++
wg.Add(int(threadsPerGroup))
for thread := int64(0); thread < threadsPerGroup; thread++ {
if offset+pageSize >= imageLength && leftOverBytes > 0 {
pageSize = leftOverBytes
leftOverBytes = 0

} else if offset > imageLength {
break
}

//logrus.Debugf("ThreadGroup=%d Thread=%d", threadGroup, thread)
go func(ctx context.Context, source string, sourceOffset, destOffset, count int64) {
defer wg.Done()
_, err := pageBlobClient.UploadPagesFromURL(ctx, imageURL, sourceOffset, destOffset, count, nil)
errors <- err
}(context.Background(), imageURL, offset, offset, pageSize)

select {
case threadError = <-errors:
if threadError != nil {
goto threadExit
}
}

offset += pageSize
}
wg.Wait()
pagesLeft -= threadsPerGroup
}

return &CreatePageBlobOutput{
PageBlobClient: pageBlobClient,
SharedKeyCredential: sharedKeyCredential,
}, nil
threadExit:
if threadError != nil {
return threadError
}

return nil
}

0 comments on commit 2773be1

Please sign in to comment.