sender-pageBlobFromURL.go (forked from Azure/azure-storage-azcopy)
// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package ste

import (
	"context"
	"net/url"
	"strings"

	"github.com/Azure/azure-pipeline-go/pipeline"
	"github.com/Azure/azure-storage-blob-go/azblob"

	"github.com/nitin-deamon/azure-storage-azcopy/v10/common"
)
type urlToPageBlobCopier struct {
	pageBlobSenderBase

	srcURL                   url.URL
	sourcePageRangeOptimizer *pageRangeOptimizer // nil if src is not a page blob
}
func newURLToPageBlobCopier(jptm IJobPartTransferMgr, destination string, p pipeline.Pipeline, pacer pacer, srcInfoProvider IRemoteSourceInfoProvider) (s2sCopier, error) {
	srcURL, err := srcInfoProvider.PreSignedSourceURL()
	if err != nil {
		return nil, err
	}

	destBlobTier := azblob.AccessTierNone
	var pageRangeOptimizer *pageRangeOptimizer
	if blobSrcInfoProvider, ok := srcInfoProvider.(IBlobSourceInfoProvider); ok {
		if blobSrcInfoProvider.BlobType() == azblob.BlobPageBlob {
			// if the source is a page blob, preserve the source's blob tier.
			destBlobTier = blobSrcInfoProvider.BlobTier()

			// capture the necessary info so that we can perform optimizations later
			pageRangeOptimizer = newPageRangeOptimizer(azblob.NewPageBlobURL(*srcURL, p),
				context.WithValue(jptm.Context(), ServiceAPIVersionOverride, azblob.ServiceVersion))
		}
	}

	senderBase, err := newPageBlobSenderBase(jptm, destination, p, pacer, srcInfoProvider, destBlobTier)
	if err != nil {
		return nil, err
	}

	return &urlToPageBlobCopier{
		pageBlobSenderBase:       *senderBase,
		srcURL:                   *srcURL,
		sourcePageRangeOptimizer: pageRangeOptimizer}, nil
}
func (c *urlToPageBlobCopier) Prologue(ps common.PrologueState) (destinationModified bool) {
	destinationModified = c.pageBlobSenderBase.Prologue(ps)

	if c.sourcePageRangeOptimizer != nil {
		c.sourcePageRangeOptimizer.fetchPages()
	}

	return
}
// Returns a chunk-func for blob copies
func (c *urlToPageBlobCopier) GenerateCopyFunc(id common.ChunkID, blockIndex int32, adjustedChunkSize int64, chunkIsWholeFile bool) chunkFunc {
	return createSendToRemoteChunkFunc(c.jptm, id, func() {
		if c.jptm.Info().SourceSize == 0 {
			// nothing to do, since this is a dummy chunk in a zero-size file, and the prologue will have done all the real work
			return
		}

		// if there's no data at the source (and, for managed disks, no data at the destination either), skip this chunk
		pageRange := azblob.PageRange{Start: id.OffsetInFile(), End: id.OffsetInFile() + adjustedChunkSize - 1}
		if c.sourcePageRangeOptimizer != nil && !c.sourcePageRangeOptimizer.doesRangeContainData(pageRange) {
			var destContainsData bool

			if c.destPageRangeOptimizer != nil {
				destContainsData = c.destPageRangeOptimizer.doesRangeContainData(pageRange)
			}

			if !destContainsData {
				return
			}
		}

		// control the rate of sending (since page blobs can effectively have per-blob throughput limits)
		// Note that this level of control is specific to the individual page blob, and is additional
		// to the application-wide pacing that we do with c.pacer
		c.jptm.LogChunkStatus(id, common.EWaitReason.FilePacer())
		if err := c.filePacer.RequestTrafficAllocation(c.jptm.Context(), adjustedChunkSize); err != nil {
			c.jptm.FailActiveUpload("Pacing block (file level)", err)
		}

		// set the latest service version from the SDK as the service version in the context, so that we can use the UploadPagesFromURL API,
		// AND enrich the context for 503 (ServerBusy) detection
		enrichedContext := withRetryNotification(
			context.WithValue(c.jptm.Context(), ServiceAPIVersionOverride, azblob.ServiceVersion),
			c.filePacer)

		// upload the page (including application of global pacing; we don't have a separate wait reason
		// for global pacing, so we just do it inside the S2SCopyOnWire state)
		c.jptm.LogChunkStatus(id, common.EWaitReason.S2SCopyOnWire())
		if err := c.pacer.RequestTrafficAllocation(c.jptm.Context(), adjustedChunkSize); err != nil {
			c.jptm.FailActiveUpload("Pacing block (global level)", err)
		}
		_, err := c.destPageBlobURL.UploadPagesFromURL(
			enrichedContext, c.srcURL, id.OffsetInFile(), id.OffsetInFile(), adjustedChunkSize, nil,
			azblob.PageBlobAccessConditions{}, azblob.ModifiedAccessConditions{}, c.cpkToApply)
		if err != nil {
			c.jptm.FailActiveS2SCopy("Uploading page from URL", err)
			return
		}
	})
}
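
// Illustrative sketch (not part of the original file; the helper name is
// hypothetical): how a chunk maps onto the inclusive PageRange used above.
// A chunk of size bytes starting at offset off covers pages [off, off+size-1].
func exampleChunkToPageRange(off, size int64) azblob.PageRange {
	// End is inclusive, hence the -1 (mirrors the pageRange construction in GenerateCopyFunc)
	return azblob.PageRange{Start: off, End: off + size - 1}
}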
// GetDestinationLength gets the destination length.
func (c *urlToPageBlobCopier) GetDestinationLength() (int64, error) {
	properties, err := c.destPageBlobURL.GetProperties(c.jptm.Context(), azblob.BlobAccessConditions{}, c.cpkToApply)
	if err != nil {
		return -1, err
	}

	return properties.ContentLength(), nil
}
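
// Hedged sketch (hypothetical helper, not in the original file): a caller could
// use GetDestinationLength after the transfer to confirm that the destination
// matches the expected source size.
func exampleVerifyDestinationLength(c *urlToPageBlobCopier, expected int64) bool {
	actual, err := c.GetDestinationLength()
	return err == nil && actual == expected
}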
// isolate the logic to fetch page ranges for a page blob, and check whether a given range has data,
// for two purposes:
//	1. capture the necessary info to do so, so that fetchPages can be invoked anywhere
//	2. open to extending the logic, which could be re-used for both download and s2s scenarios
type pageRangeOptimizer struct {
	srcPageBlobURL azblob.PageBlobURL
	ctx            context.Context
	srcPageList    *azblob.PageList // nil if src is not a page blob, or it was not possible to get a response
}

func newPageRangeOptimizer(srcPageBlobURL azblob.PageBlobURL, ctx context.Context) *pageRangeOptimizer {
	return &pageRangeOptimizer{srcPageBlobURL: srcPageBlobURL, ctx: ctx}
}
func (p *pageRangeOptimizer) fetchPages() {
	// don't fetch the page list if optimizations are not desired;
	// the lack of a page list indicates that there's data everywhere
	if !strings.EqualFold(common.GetLifecycleMgr().GetEnvironmentVariable(
		common.EEnvironmentVariable.OptimizeSparsePageBlobTransfers()), "true") {
		return
	}

	// according to the REST API documentation:
	// in a highly fragmented page blob with a large number of writes,
	// a Get Page Ranges request can fail due to an internal server timeout.
	// thus, if the page blob is not sparse, it's OK for this call to fail
	// TODO follow up with the service folks to confirm the scale at which the timeouts occur
	// TODO perhaps we need to add more logic here to optimize for more cases
	limitedContext := withNoRetryForBlob(p.ctx) // we don't want retries here; if it doesn't work the first time, we don't want to chew up (lots of) time retrying
	pageList, err := p.srcPageBlobURL.GetPageRanges(limitedContext, 0, 0, azblob.BlobAccessConditions{})
	if err == nil {
		p.srcPageList = pageList
	}
}
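
// Hedged sketch (hypothetical helper, not in the original file): fetchPages is
// best-effort, so a nil srcPageList afterwards simply means "assume data
// everywhere", which is always safe, just not optimal for sparse blobs.
func exampleIsOptimizationActive(p *pageRangeOptimizer) bool {
	p.fetchPages()
	return p.srcPageList != nil // nil => pessimistic but correct behavior
}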
// check whether a particular given range is worth transferring, i.e. whether there's data at the source
func (p *pageRangeOptimizer) doesRangeContainData(givenRange azblob.PageRange) bool {
	// if we have no page list stored, then assume there's data everywhere
	// (this is particularly important when we are using this code not just for performance, but also
	// for correctness - as we do when using it on the destination of a managed disk upload)
	if p.srcPageList == nil {
		return true
	}

	// note that the page list is ordered in increasing order (in terms of position)
	for _, srcRange := range p.srcPageList.PageRange {
		if givenRange.End < srcRange.Start {
			// case 1: because the list is sorted, once we've reached such a srcRange
			// we've already checked all the relevant srcRanges and found no overlap
			// given range:  |   |
			// source range:        |   |
			return false
		} else if srcRange.End < givenRange.Start {
			// case 2: the givenRange comes after srcRange, continue checking
			// given range:         |   |
			// source range: |   |
			continue
		} else {
			// case 3: srcRange and givenRange overlap somehow
			// we don't particularly care how they overlap
			return true
		}
	}

	// went through all srcRanges, but nothing overlapped
	return false
}
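
// Illustrative sketch (hypothetical helper, not part of the original file):
// given a source page list of [0, 511] and [2048, 4095], the three cases in
// doesRangeContainData play out as follows.
func examplePageRangeChecks() {
	opt := &pageRangeOptimizer{
		srcPageList: &azblob.PageList{PageRange: []azblob.PageRange{
			{Start: 0, End: 511},     // first range with data
			{Start: 2048, End: 4095}, // second range with data
		}},
	}

	_ = opt.doesRangeContainData(azblob.PageRange{Start: 1024, End: 1535}) // false: falls in the gap (case 2, then case 1)
	_ = opt.doesRangeContainData(azblob.PageRange{Start: 256, End: 767})   // true: overlaps the first range (case 3)
	_ = opt.doesRangeContainData(azblob.PageRange{Start: 8192, End: 8703}) // false: past every source range
}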