Skip to content

Commit

Permalink
Azure: parallelize blob uploading
Browse files Browse the repository at this point in the history
  • Loading branch information
jhixson74 committed Feb 22, 2024
1 parent 72a9dd4 commit cacbe08
Showing 1 changed file with 76 additions and 25 deletions.
101 changes: 76 additions & 25 deletions pkg/infrastructure/azure/storage.go
Expand Up @@ -3,6 +3,7 @@ package azure
import (
"context"
"fmt"
"sync"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/arm"
Expand Down Expand Up @@ -215,40 +216,90 @@ func CreatePageBlob(ctx context.Context, in *CreatePageBlobInput) (*CreatePageBl
}

logrus.Debugf("Uploading to blob")
err = doUploadPagesFromURL(ctx, pageBlobClient, in.ImageURL, in.ImageLength)
if err != nil {
return nil, fmt.Errorf("failed to upload blob image %s: %w", in.ImageURL, err)
}
return &CreatePageBlobOutput{
PageBlobClient: pageBlobClient,
SharedKeyCredential: sharedKeyCredential,
}, nil
}

// Azure only allows 4MB chunks
func doUploadPagesFromURL(ctx context.Context, pageBlobClient *pageblob.Client, imageURL string, imageLength int64) error {
// Azure only allows 4MB chunks, See
// https://docs.microsoft.com/rest/api/storageservices/put-page-from-url
pageSize := int64(1024 * 1024 * 4)
leftOverBytes := (in.ImageLength % pageSize)
leftOverBytes := int64(imageLength % pageSize)
offset := int64(0)
pages := int64(0)

page := int64(1)
pages := int64(in.ImageLength)
percent := int64(1)

// XXX: parallelize this
for {
if offset+pageSize >= in.ImageLength && leftOverBytes > 0 {
pageSize = leftOverBytes
leftOverBytes = 0
} else if offset >= in.ImageLength {
break
if imageLength > pageSize {
pages = int64(imageLength / pageSize)
if imageLength%pageSize > 0 {
pages++
}
} else {
pageSize = imageLength
pages = 1
}

threadsPerGroup := int64(64)
if pages < threadsPerGroup {
threadsPerGroup = pages
}

threadGroups := pages / threadsPerGroup
if pages%threadsPerGroup > 0 {
threadGroups++
}

logrus.Debugf("Uploading: [%d%%] offset=%d imageLength=%d pageSize=%d", percent, offset, in.ImageLength, pageSize)
//var mutex sync.Mutex
var wg sync.WaitGroup
var threadError error
var res error = nil

// We should try to transmit a few times if this fails
_, err = pageBlobClient.UploadPagesFromURL(ctx, in.ImageURL, offset, offset, pageSize, nil)
if err != nil {
return nil, fmt.Errorf("failed to upload blob image %s: wrote %d out of %d bytes: %w", in.ImageURL, offset, in.ImageLength, err)
pagesLeft := pages
for threadGroup := int64(0); threadGroup < threadGroups; threadGroup++ {
if pagesLeft < threadsPerGroup {
threadsPerGroup = pagesLeft
}

offset += pageSize
percent = int64(float64(float64(page)/float64(pages)) * 100)
page++
errors := make(chan error, 1)
defer close(errors)

results := make(chan int64, threadsPerGroup)
defer close(results)

for thread := int64(0); thread < threadsPerGroup; thread++ {
if offset+pageSize >= imageLength && leftOverBytes > 0 {
pageSize = leftOverBytes
leftOverBytes = 0

} else if offset > imageLength {
break
}

go func(ctx context.Context, source string, thread, sourceOffset, destOffset, count int64, wg *sync.WaitGroup) {
logrus.Debugf("threadGroup=%d thread=%d sourceOffset=%d destOffset=%d count=%d",
threadGroup, thread, sourceOffset, destOffset, count)
_, err := pageBlobClient.UploadPagesFromURL(ctx, imageURL, sourceOffset, destOffset, count, nil)
errors <- err
results <- thread
}(ctx, imageURL, thread, offset, offset, pageSize, &wg)

offset += pageSize
}
pagesLeft -= threadsPerGroup
for thread := int64(0); thread < threadsPerGroup; thread++ {
threadError = <-errors
if threadError != nil {
res = threadError
}
<-results
}
logrus.Debugf("%d out of %d pages uploaded", pages-pagesLeft, pages)
}

return &CreatePageBlobOutput{
PageBlobClient: pageBlobClient,
SharedKeyCredential: sharedKeyCredential,
}, nil
return res
}

0 comments on commit cacbe08

Please sign in to comment.