forked from Azure/azure-storage-azcopy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
concurrency.go
286 lines (236 loc) · 12.2 KB
/
concurrency.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
// Copyright Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package ste
import (
"fmt"
"log"
"runtime"
"strconv"
"github.com/nitin-deamon/azure-storage-azcopy/v10/common"
)
// ConfiguredInt is an integer setting whose value may optionally be supplied by the user
// through an environment variable.
type ConfiguredInt struct {
	// Value is the effective value of the setting.
	Value int
	// IsUserSpecified is true when Value came from the environment variable rather than a default.
	IsUserSpecified bool
	// EnvVarName is the name of the environment variable that can be used to set the value.
	EnvVarName string
	// DefaultSourceDesc says where the default came from. It is only used in the description
	// of values that were not user-specified.
	DefaultSourceDesc string
}

// GetDescription returns a human-readable explanation of where the value came from:
// either the environment variable, or the default source plus a hint on how to override it.
func (i *ConfiguredInt) GetDescription() string {
	if !i.IsUserSpecified {
		return fmt.Sprintf("Based on %s. Set %s environment variable to override", i.DefaultSourceDesc, i.EnvVarName)
	}
	return fmt.Sprintf("Based on %s environment variable", i.EnvVarName)
}
// tryNewConfiguredInt populates a ConfiguredInt from an environment variable, or returns nil
// if the env var is not set (i.e. it reads back as the empty string).
//
// The value must be a base-10 integer that fits in the platform's int. A value that cannot be
// parsed, or that is out of range, terminates the process via log.Fatalf.
func tryNewConfiguredInt(envVar common.EnvironmentVariable) *ConfiguredInt {
	override := common.GetLifecycleMgr().GetEnvironmentVariable(envVar)
	if override == "" {
		return nil
	}

	// Parse at the native int width, so that an oversized value is reported as a range error
	// instead of being silently truncated by the int conversion on 32-bit platforms.
	val, err := strconv.ParseInt(override, 10, strconv.IntSize)
	if err != nil {
		log.Fatalf("error parsing the env %s %q failed with error %v",
			envVar.Name, override, err)
	}
	return &ConfiguredInt{int(val), true, envVar.Name, ""}
}
// ConfiguredBool is a boolean setting whose value may optionally be supplied by the user
// through an environment variable.
type ConfiguredBool struct {
	// Value is the effective value of the setting.
	Value bool
	// IsUserSpecified is true when Value came from the environment variable rather than a default.
	IsUserSpecified bool
	// EnvVarName is the name of the environment variable that can be used to set the value.
	EnvVarName string
	// DefaultSourceDesc says where the default came from. It is only used in the description
	// of values that were not user-specified.
	DefaultSourceDesc string
}

// GetDescription returns a human-readable explanation of where the value came from:
// either the environment variable, or the default source plus a hint on how to override it.
func (b *ConfiguredBool) GetDescription() string {
	if b.IsUserSpecified {
		return fmt.Sprintf("Based on %s environment variable", b.EnvVarName)
	}
	// The hint previously read "to true or false override", which was missing a "to".
	return fmt.Sprintf("Based on %s. Set %s environment variable to true or false to override", b.DefaultSourceDesc, b.EnvVarName)
}
// tryNewConfiguredBool populates a ConfiguredBool from an environment variable, or returns nil
// if the env var is not set (i.e. it reads back as the empty string).
//
// The value is parsed with strconv.ParseBool; a value that cannot be parsed terminates the
// process via log.Fatalf.
func tryNewConfiguredBool(envVar common.EnvironmentVariable) *ConfiguredBool {
	override := common.GetLifecycleMgr().GetEnvironmentVariable(envVar)
	if override == "" {
		return nil
	}

	parsed, err := strconv.ParseBool(override)
	if err != nil {
		log.Fatalf("error parsing the env %s %q failed with error %v",
			envVar.Name, override, err)
	}
	return &ConfiguredBool{parsed, true, envVar.Name, ""}
}
// ConcurrencySettings stores the set of related numbers that govern concurrency levels in the STE
type ConcurrencySettings struct {
// InitialMainPoolSize is the initial size of the main goroutine pool that transfers the data
// (i.e. executes chunkfuncs)
InitialMainPoolSize int
// MaxMainPoolSize is a number >= InitialMainPoolSize, representing the max size we will grow the main pool to
MaxMainPoolSize *ConfiguredInt
// TransferInitiationPoolSize is the size of the auxiliary goroutine pool that initiates transfers
// (i.e. creates chunkfuncs)
TransferInitiationPoolSize *ConfiguredInt
// EnumerationPoolSize is the size of the auxiliary goroutine pool used in enumerators (only some of which are in fact parallelized)
EnumerationPoolSize *ConfiguredInt
// ParallelStatFiles says whether file.Stat calls should be parallelized during enumeration. May help enumeration performance
// on Linux, but is not necessary and should not be activated on Windows.
ParallelStatFiles *ConfiguredBool
// MaxIdleConnections is the max number of idle TCP connections to keep open
MaxIdleConnections int
// MaxOpenDownloadFiles is the max number of file handles that we should have open at any time.
// Currently (July 2019) this is only used for downloads, which is where we wouldn't
// otherwise have strict control of the number of open files.
// For uploads, the number of open files is effectively controlled by
// TransferInitiationPoolSize, since all the file IO (except retries) happens in
// transfer initiation.
MaxOpenDownloadFiles int
// TODO: consider whether we should also use this (renamed to MaxOpenFiles) for uploads, somehow (see comment above). Is there any actual value in that? Maybe only highly handle-constrained Linux environments?
// CheckCpuWhenTuning determines whether CPU usage should be taken into account when auto-tuning
CheckCpuWhenTuning *ConfiguredBool
}
// AutoTuneMainPool says whether the main pool size should be dynamically tuned.
// That is the case only when the pool has room to grow, i.e. when the configured
// maximum is strictly greater than the initial size.
func (c ConcurrencySettings) AutoTuneMainPool() bool {
	hasRoomToGrow := c.InitialMainPoolSize < c.MaxMainPoolSize.Value
	return hasRoomToGrow
}
const (
	// defaultTransferInitiationPoolSize is the transfer initiation pool size used when
	// the corresponding environment variable is not set.
	defaultTransferInitiationPoolSize = 64

	// defaultEnumerationPoolSize is the enumeration pool size used when the corresponding
	// environment variable is not set.
	defaultEnumerationPoolSize = 16

	// concurrentFilesFloor is the lowest value that getMaxOpenPayloadFiles will return.
	concurrentFilesFloor = 32
)
// NewConcurrencySettings builds the concurrency settings by consulting the relevant
// environment variables (where set) and the properties of the machine where we are running.
//
// maxFileAndSocketHandles is the total number of handles available to the process; it is used
// to work out how many download files may be open at once. requestAutoTuneGRs asks for the
// main pool size to be auto-tuned (a fixed value from the environment takes precedence).
func NewConcurrencySettings(maxFileAndSocketHandles int, requestAutoTuneGRs bool) ConcurrencySettings {
	// Gather the individual settings first, in a fixed order.
	initialMainPoolSize, maxMainPoolSize := getMainPoolSize(runtime.NumCPU(), requestAutoTuneGRs)
	transferInitiationPoolSize := getTransferInitiationPoolSize()
	enumerationPoolSize := GetEnumerationPoolSize()
	parallelStatFiles := GetParallelStatFiles()
	checkCpuWhenTuning := getCheckCpuUsageWhenTuning()

	// The sum of all three pools is passed on as the count of concurrent connections.
	totalPoolSize := maxMainPoolSize.Value + transferInitiationPoolSize.Value + enumerationPoolSize.Value

	return ConcurrencySettings{
		InitialMainPoolSize:        initialMainPoolSize,
		MaxMainPoolSize:            maxMainPoolSize,
		TransferInitiationPoolSize: transferInitiationPoolSize,
		EnumerationPoolSize:        enumerationPoolSize,
		ParallelStatFiles:          parallelStatFiles,
		CheckCpuWhenTuning:         checkCpuWhenTuning,
		MaxOpenDownloadFiles:       getMaxOpenPayloadFiles(maxFileAndSocketHandles, totalPoolSize),

		// Max idle connections that we allow. Idle connections beyond this number get closed,
		// which then forces creation of new connections later, when they are needed again.
		// In AzCopy they almost always are needed again soon after, so it is better to keep
		// them open: hence this number is set high.
		// (Previously, when using Dial instead of DialContext, there was an added benefit of
		// keeping this value high. Without it, all the extra dials needed to compensate for
		// the closures caused a pathological situation where lots and lots of OS threads got
		// created during the creation of new connections (presumably due to some blocking OS
		// call in dial), the app hit Go's default limit of 10,000 OS threads, panicked and
		// shut down. This was observed on Windows when this value was set to 500 but there
		// were 1000 to 2000 goroutines in the main pool. Using DialContext appears to mitigate
		// that issue, so the value here is really just to reduce unneeded make and break of
		// connections.)
		MaxIdleConnections: maxMainPoolSize.Value,
	}
}
// getMainPoolSize works out the initial and maximum sizes of the main goroutine pool.
//
// If the concurrency environment variable holds a number, that number is used for both the
// initial and the max size (so no auto-tuning can happen). If it holds "AUTO", auto-tuning is
// forced on. Otherwise the sizes are derived from requestAutoTune and numOfCPUs.
func getMainPoolSize(numOfCPUs int, requestAutoTune bool) (initial int, max *ConfiguredInt) {
	envVar := common.EEnvironmentVariable.ConcurrencyValue()

	if common.GetLifecycleMgr().GetEnvironmentVariable(envVar) != "AUTO" {
		if fixed := tryNewConfiguredInt(envVar); fixed != nil {
			if requestAutoTune {
				// Tell user that we can't actually auto tune, because configured value takes precedence
				// This case happens when benchmarking with a fixed value from the env var
				common.GetLifecycleMgr().Info(fmt.Sprintf("Cannot auto-tune concurrency because it is fixed by environment variable %s", envVar.Name))
			}
			return fixed.Value, fixed // initial and max are same, fixed to the env var
		}
	} else {
		// Allow user to force auto-tuning from the env var, even when not in benchmark mode
		// Might be handy in some S2S cases, where we know that release 10.2.1 was using too few goroutines
		// This feature will probably remain undocumented for at least one release cycle, while we consider
		// whether to do more in this regard (e.g. make it the default behaviour)
		requestAutoTune = true
	}

	if requestAutoTune {
		// Deliberately start with a small initial value when auto-tuning. If it's not small enough, the
		// auto tuning becomes sluggish since, every time it needs to tune downwards, it needs to let a lot
		// of data (num connections * block size) get transmitted, and that is slow over very small links,
		// e.g. 10 Mbps, and produces noticeable time lag when downsizing the connection count.
		// So we start small. (The alternatives, of using small chunk sizes or small file sizes just for the
		// first 200 MB or so, were too hard to orchestrate within the existing app architecture)
		const autoTuneInitial = 4
		// TODO: what should this be? Testing indicates that this value is all we're ever likely to need, even in small-files cases
		const autoTuneMax = 3000
		return autoTuneInitial, &ConfiguredInt{autoTuneMax, false, envVar.Name, "auto-tuning limit"}
	}

	// Not auto-tuning: pick one value from the CPU count, and use it as both initial and max.
	var fixedSize int
	switch {
	case numOfCPUs <= 4:
		// fix the concurrency value for smaller machines
		fixedSize = 32
	case 16*numOfCPUs > 300:
		// for machines that are extremely powerful, fix to 300 (previously this was to avoid running
		// out of file descriptors, but we have another solution to that now)
		fixedSize = 300
	default:
		// for moderately powerful machines, compute a reasonable number
		fixedSize = 16 * numOfCPUs
	}
	return fixedSize, &ConfiguredInt{fixedSize, false, envVar.Name, "number of CPUs"}
}
// getTransferInitiationPoolSize returns the size of the transfer initiation pool: the value
// from the environment variable if set, else defaultTransferInitiationPoolSize.
func getTransferInitiationPoolSize() *ConfiguredInt {
	envVar := common.EEnvironmentVariable.TransferInitiationPoolSize()

	configured := tryNewConfiguredInt(envVar)
	if configured == nil {
		configured = &ConfiguredInt{
			Value:             defaultTransferInitiationPoolSize,
			IsUserSpecified:   false,
			EnvVarName:        envVar.Name,
			DefaultSourceDesc: "hard-coded default",
		}
	}
	return configured
}
// GetEnumerationPoolSize returns the size of the enumeration pool: the value from the
// environment variable if set, else defaultEnumerationPoolSize.
func GetEnumerationPoolSize() *ConfiguredInt {
	envVar := common.EEnvironmentVariable.EnumerationPoolSize()

	configured := tryNewConfiguredInt(envVar)
	if configured == nil {
		configured = &ConfiguredInt{
			Value:             defaultEnumerationPoolSize,
			IsUserSpecified:   false,
			EnvVarName:        envVar.Name,
			DefaultSourceDesc: "hard-coded default",
		}
	}
	return configured
}
// GetParallelStatFiles returns the parallel-stat setting: the value from the environment
// variable if set, else false.
func GetParallelStatFiles() *ConfiguredBool {
	envVar := common.EEnvironmentVariable.ParallelStatFiles()

	configured := tryNewConfiguredBool(envVar)
	if configured == nil {
		configured = &ConfiguredBool{
			Value:             false,
			IsUserSpecified:   false,
			EnvVarName:        envVar.Name,
			DefaultSourceDesc: "hard-coded default",
		}
	}
	return configured
}
// getCheckCpuUsageWhenTuning returns the setting that says whether CPU usage is taken into
// account when auto-tuning: the value from the environment variable if set, else true.
func getCheckCpuUsageWhenTuning() *ConfiguredBool {
	envVar := common.EEnvironmentVariable.AutoTuneToCpu()

	configured := tryNewConfiguredBool(envVar)
	if configured == nil {
		configured = &ConfiguredBool{
			Value:             true,
			IsUserSpecified:   false,
			EnvVarName:        envVar.Name,
			DefaultSourceDesc: "hard-coded default",
		}
	}
	return configured
}
// getMaxOpenPayloadFiles finds a number of concurrently-openable payload files
// such that we'll have enough handles left, after using some as network handles.
// This is important on Unix, where total handles can be constrained.
// The result is never lower than concurrentFilesFloor.
func getMaxOpenPayloadFiles(maxFileAndSocketHandles int, concurrentConnections int) int {
	// The value returned here only governs payload files. It does not govern the plan files
	// that azcopy opens as part of its own operations, so a reasonable allowance is made for
	// how many of those may be open.
	const fileHandleAllowanceForPlanFiles = 300 // 300 plan files = 300 * common.NumOfFilesPerDispatchJobPart = 3million in total

	// Conservative estimate of the network handles: the connection count plus 10%.
	networkHandles := int(float32(concurrentConnections) * 1.1)

	// Whatever is left after the network and plan-file handles is available for payload files.
	remaining := maxFileAndSocketHandles - networkHandles - fileHandleAllowanceForPlanFiles

	if remaining >= concurrentFilesFloor {
		return remaining
	}

	// A negative or ridiculously low value is brought up to a sensible floor (taking our
	// chances of running out of total handles - which is effectively a bet that the
	// estimates above were too conservative).
	return concurrentFilesFloor
}