forked from Azure/azure-storage-azcopy
/
lifecyleMgr.go
702 lines (594 loc) · 23.8 KB
/
lifecyleMgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
package common
import (
"bufio"
"encoding/json"
"fmt"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
"time"
"github.com/Azure/azure-pipeline-go/pipeline"
)
// only one instance of the formatter should exist
var lcm = func() (lcmgr *lifecycleMgr) {
	lcmgr = &lifecycleMgr{
		msgQueue:      make(chan outputMessage, 1000),
		progressCache: "",
		cancelChannel: make(chan os.Signal, 1),
		// doneChannel was previously left nil, so ReportAllJobPartsDone
		// would block forever on its send; buffer of 1 lets the signal be
		// posted even before the progress loop starts receiving.
		doneChannel:          make(chan bool, 1),
		e2eContinueChannel:   make(chan struct{}),
		e2eAllowOpenChannel:  make(chan struct{}),
		outputFormat:         EOutputFormat.Text(), // output text by default
		logSanitizer:         NewAzCopyLogSanitizer(),
		inputQueue:           make(chan userInput, 1000),
		allowCancelFromStdIn: false,
		allowWatchInput:      false,
		closeFunc:            func() {}, // noop since we have nothing to do by default
		waitForUserResponse:  make(chan bool),
		msgHandlerChannel:    make(chan *LCMMsg),
	}
	// kick off the single routine that processes output
	go lcmgr.processOutputMessage()
	// and process input
	go lcmgr.watchInputs()
	// Check if need to do CPU profiling, and do CPU profiling accordingly when azcopy life start.
	lcmgr.checkAndStartCPUProfiling()
	return
}()
// LifecycleMgr is the public interface to the package-level lifecycle manager
// (obtained via GetLifecycleMgr). Consumers outside this package can refer to
// it, but they cannot instantiate their own copy.
type LifecycleMgr interface {
	Init(OutputBuilder)                                 // let the user know the job has started and initial information like log location
	Progress(OutputBuilder)                             // print on the same line over and over again, not allowed to float up
	Exit(OutputBuilder, ExitCode)                       // indicates successful execution exit after printing, allow user to specify exit code
	Info(string)                                        // simple print, allowed to float up
	Dryrun(OutputBuilder)                               // print files for dry run mode
	Error(string)                                       // indicates fatal error, exit after printing, exit code is always Failed (1)
	Prompt(message string, details PromptDetails) ResponseOption // ask the user a question(after erasing the progress), then return the response
	SurrenderControl()                                  // give up control, this should never return
	InitiateProgressReporting(WorkController)           // start writing progress with another routine
	AllowReinitiateProgressReporting()                  // allow re-initiation of progress reporting for followup job
	GetEnvironmentVariable(EnvironmentVariable) string  // get the environment variable or its default value
	ClearEnvironmentVariable(EnvironmentVariable)       // clears the environment variable
	SetOutputFormat(OutputFormat)                       // change the output format of the entire application
	EnableInputWatcher()                                // depending on the command, we may allow user to give input through Stdin
	EnableCancelFromStdIn()                             // allow user to send in `cancel` to stop the job
	AddUserAgentPrefix(string) string                   // append the global user agent prefix, if applicable
	E2EAwaitContinue()                                  // used by E2E tests
	E2EAwaitAllowOpenFiles()                            // used by E2E tests
	E2EEnableAwaitAllowOpenFiles(enable bool)           // used by E2E tests
	RegisterCloseFunc(func())                           // register a callback run before the process exits (e.g. to close logs)
	SetForceLogging()                                   // cache the AZCOPY_DISABLE_SYSLOG setting on the manager
	IsForceLoggingDisabled() bool                       // report the cached AZCOPY_DISABLE_SYSLOG setting
	DownloadToTempPath() bool                           // whether downloads should go via a temp path (env-controlled, default true)
	MsgHandlerChannel() <-chan *LCMMsg                  // receive-only channel of parsed stdin LCM requests
	ReportAllJobPartsDone()                             // signal the progress loop that all job parts completed
	SetOutputVerbosity(mode OutputVerbosity)            // control which message types are quieted
}
// GetLifecycleMgr returns the package's singleton lifecycle manager,
// exposed only through the LifecycleMgr interface.
func GetLifecycleMgr() LifecycleMgr {
	return lcm
}
// single point of control for all outputs
type lifecycleMgr struct {
	msgQueue             chan outputMessage // queued output, consumed by the single processOutputMessage goroutine
	progressCache        string             // useful for keeping job progress on the last line
	cancelChannel        chan os.Signal     // receives OS interrupt signals and stdin-driven cancel requests
	doneChannel          chan bool          // signalled when all job parts are done (see ReportAllJobPartsDone)
	e2eContinueChannel   chan struct{}      // closed when the E2E "continue" gate opens
	e2eAllowOpenChannel  chan struct{}      // closed when the E2E "open first file" gate opens
	waitEverCalled       int32              // accessed atomically; makes InitiateProgressReporting one-shot
	outputFormat         OutputFormat       // text / json / none
	logSanitizer         pipeline.LogSanitizer
	inputQueue           chan userInput // msgs from the user
	allowWatchInput      bool           // accept user inputs and place then in the inputQueue
	allowCancelFromStdIn bool           // allow user to send in 'cancel' from the stdin to stop the current job
	e2eAllowAwaitContinue bool          // allow the user to send 'continue' from stdin to start the current job
	e2eAllowAwaitOpen    bool           // allow the user to send 'open' from stdin to allow the opening of the first file
	closeFunc            func()         // used to close logs before exiting
	disableSyslog        bool           // cached AZCOPY_DISABLE_SYSLOG value (see SetForceLogging)
	waitForUserResponse  chan bool      // Prompt signals watchInputs that it is waiting for an answer
	msgHandlerChannel    chan *LCMMsg   // delivers parsed LCM requests from stdin to the job handler
	OutputVerbosityType  OutputVerbosity // controls which messages are quieted (see shouldQuietMessage)
}
// userInput is one line typed on stdin, stamped with its arrival time so
// answers can be matched to the question that was on screen (see getInputAfterTime).
type userInput struct {
	timeReceived time.Time // when the line was read from stdin
	content      string    // the line, with surrounding whitespace trimmed
}
// watchInputs reads stdin line-by-line and routes each line to the right
// consumer: a pending Prompt, the control keywords (cancel/continue/open),
// or a JSON-encoded LCM request. It should be started in a single go routine.
func (lcm *lifecycleMgr) watchInputs() {
	consoleReader := bufio.NewReader(os.Stdin)
	for {
		// the option might be enabled later, so poll until it is.
		// (Was time.Microsecond * 500, which busy-spun ~2000 times per
		// second; a 500ms poll is ample for this user-facing flag.)
		if !lcm.allowWatchInput {
			time.Sleep(time.Millisecond * 500)
			continue
		}
		// reads input until the first occurrence of \n in the input,
		input, err := consoleReader.ReadString('\n')
		if err != nil {
			continue
		}
		// remove spaces before/after the content
		msg := strings.TrimSpace(input)
		timeReceived := time.Now()
		// if a Prompt is currently waiting for an answer, hand the line to it
		select {
		case <-lcm.waitForUserResponse:
			lcm.inputQueue <- userInput{timeReceived: timeReceived, content: msg}
			continue
		default:
		}
		var req LCMMsgReq
		if lcm.allowCancelFromStdIn && strings.EqualFold(msg, "cancel") {
			lcm.cancelChannel <- os.Interrupt
		} else if lcm.e2eAllowAwaitContinue && strings.EqualFold(msg, "continue") {
			close(lcm.e2eContinueChannel)
		} else if lcm.e2eAllowAwaitOpen && strings.EqualFold(msg, "open") {
			close(lcm.e2eAllowOpenChannel)
		} else if err := json.Unmarshal([]byte(msg), &req); err == nil { //json string
			lcm.Info(fmt.Sprintf("Received request for %s with timeStamp %s", req.MsgType, req.TimeStamp.String()))
			var msgType LCMMsgType
			if err := msgType.Parse(req.MsgType); err != nil {
				lcm.Info(fmt.Sprintf("Discarding incorrect message: %s.", req.MsgType))
				continue
			}
			switch msgType {
			case ELCMMsgType.CancelJob():
				lcm.cancelChannel <- os.Interrupt
			default:
				m := NewLCMMsg()
				m.Req = &req
				lcm.msgHandlerChannel <- m
				//wait till the message is completed
				<-m.respChan
				lcm.Response(*m.Resp)
			}
		} else {
			lcm.Info("Discarding incorrectly formatted input message")
		}
	}
}
// getInputAfterTime returns the first user input that arrived after the given
// instant, discarding anything typed earlier, so the answer we return belongs
// to the question that was actually asked.
// NOTE: to ask a question, go through Prompt, to guarantee that only 1 question is asked at a time.
// (The parameter was previously named "time", which shadowed the time package.)
func (lcm *lifecycleMgr) getInputAfterTime(after time.Time) string {
	for {
		msg := <-lcm.inputQueue
		// keep reading until we find an input that came in after the caller's cutoff
		if msg.timeReceived.After(after) {
			return msg.content
		}
		// otherwise keep waiting as it's possible that the user has not typed it in yet
	}
}
// EnableInputWatcher lets watchInputs start consuming stdin lines into the
// input queue. Not goroutine-safe by itself; intended to be called during setup.
func (lcm *lifecycleMgr) EnableInputWatcher() {
	lcm.allowWatchInput = true
}
// EnableCancelFromStdIn allows a literal "cancel" typed on stdin to stop the
// current job (watchInputs translates it into an os.Interrupt on cancelChannel).
func (lcm *lifecycleMgr) EnableCancelFromStdIn() {
	lcm.allowCancelFromStdIn = true
}
// ClearEnvironmentVariable "clears" the variable by setting it to the empty
// string. GetEnvironmentVariable treats empty as unset, so for this package's
// own lookups this is equivalent to removal.
// NOTE(review): os.Unsetenv would remove the variable entirely — confirm no
// external reader distinguishes empty from absent before switching.
func (lcm *lifecycleMgr) ClearEnvironmentVariable(variable EnvironmentVariable) {
	_ = os.Setenv(variable.Name, "")
}
// SetOutputFormat changes the output format of the entire application.
// NOTE(review): the field is read by the output goroutine without synchronization;
// presumably this is only called during startup — confirm with callers.
func (lcm *lifecycleMgr) SetOutputFormat(format OutputFormat) {
	lcm.outputFormat = format
}
// checkAndStartCPUProfiling starts CPU profiling when AZCOPY_PROFILE_CPU is
// set; the value is the path the profile is written to,
// e.g. export AZCOPY_PROFILE_CPU="cpu.prof".
// For more details, please refer to https://golang.org/pkg/runtime/pprof/
func (lcm *lifecycleMgr) checkAndStartCPUProfiling() {
	cpuProfilePath := lcm.GetEnvironmentVariable(EEnvironmentVariable.ProfileCPU())
	if cpuProfilePath == "" {
		return
	}
	lcm.Info(fmt.Sprintf("pprof start CPU profiling, and saving profiling data to: %q", cpuProfilePath))
	f, err := os.Create(cpuProfilePath)
	if err != nil {
		lcm.Error(fmt.Sprintf("Fail to create file for CPU profiling, %v", err))
		// Error never returns (it exits the process), but return defensively
		// so StartCPUProfile can never be reached with a nil file.
		return
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		lcm.Error(fmt.Sprintf("Fail to start CPU profiling, %v", err))
	}
}
// checkAndStopCPUProfiling stops CPU profiling if any is ongoing and flushes
// the profile to the file opened by checkAndStartCPUProfiling.
// pprof.StopCPUProfile is a no-op when profiling was never started.
func (lcm *lifecycleMgr) checkAndStopCPUProfiling() {
	// Stop CPU profiling if there is ongoing CPU profiling.
	pprof.StopCPUProfile()
}
// checkAndTriggerMemoryProfiling writes a heap profile when AZCOPY_PROFILE_MEM
// is set; the value is the path the profile is written to,
// e.g. export AZCOPY_PROFILE_MEM="mem.prof".
// For more details, please refer to https://golang.org/pkg/runtime/pprof/
func (lcm *lifecycleMgr) checkAndTriggerMemoryProfiling() {
	memProfilePath := lcm.GetEnvironmentVariable(EEnvironmentVariable.ProfileMemory())
	if memProfilePath == "" {
		return
	}
	lcm.Info(fmt.Sprintf("pprof start memory profiling, and saving profiling data to: %q", memProfilePath))
	f, err := os.Create(memProfilePath)
	if err != nil {
		lcm.Error(fmt.Sprintf("Fail to create file for memory profiling, %v", err))
		// Error never returns (it exits the process), but return defensively
		// so WriteHeapProfile can never be reached with a nil file.
		return
	}
	// force a GC first so the heap profile reflects live objects only
	runtime.GC()
	if err := pprof.WriteHeapProfile(f); err != nil {
		lcm.Error(fmt.Sprintf("Fail to start memory profiling, %v", err))
	}
	if err := f.Close(); err != nil {
		lcm.Info(fmt.Sprintf("Fail to close memory profiling file, %v", err))
	}
}
// Init tells the user the job has started, along with initial information
// such as the log location, by queueing an Init-type message.
func (lcm *lifecycleMgr) Init(o OutputBuilder) {
	initMsg := outputMessage{
		msgContent: o(lcm.outputFormat),
		msgType:    eOutputMessageType.Init(),
	}
	lcm.msgQueue <- initMsg
}
// Progress queues a progress update; in text mode it is re-printed on the
// same console line rather than floating up. A nil builder yields an empty update.
func (lcm *lifecycleMgr) Progress(o OutputBuilder) {
	var content string
	if o != nil {
		content = o(lcm.outputFormat)
	}
	lcm.msgQueue <- outputMessage{
		msgContent: content,
		msgType:    eOutputMessageType.Progress(),
	}
}
// Info queues a simple informational line (allowed to float up the console).
// Sanitization happens here as well as in Error, because error-like text can
// arrive through Info before the final failure signal reaches Error.
func (lcm *lifecycleMgr) Info(msg string) {
	sanitized := lcm.logSanitizer.SanitizeLogMessage(msg)
	lcm.msgQueue <- outputMessage{
		msgContent: "INFO: " + sanitized,
		msgType:    eOutputMessageType.Info(),
	}
}
// Prompt asks the user a question (after erasing the in-progress line), blocks
// until an answer arrives on stdin, and returns the matching ResponseOption.
// Unrecognized answers return EResponseOption.Default(), letting the caller
// decide the fallback behavior. Only one prompt may be outstanding at a time.
func (lcm *lifecycleMgr) Prompt(message string, details PromptDetails) ResponseOption {
	// buffered so the output goroutine can deliver the answer without blocking
	expectedInputChannel := make(chan string, 1)
	lcm.msgQueue <- outputMessage{
		msgContent:    message,
		msgType:       eOutputMessageType.Prompt(),
		inputChannel:  expectedInputChannel,
		promptDetails: details,
	}
	// Request watchInputs() to wait for response from user
	lcm.waitForUserResponse <- true
	// block until input comes from the user
	rawResponse := <-expectedInputChannel
	// match the given response against one of the options we gave
	for _, option := range details.ResponseOptions {
		// in case the user misunderstood and typed full response type instead, we still tolerate it
		// e.g. instead of "y", user typed "Yes"
		if strings.EqualFold(option.ResponseString, rawResponse) ||
			strings.EqualFold(option.UserFriendlyResponseType, rawResponse) {
			return option
		}
	}
	// nothing matched our options, assume default behavior (up to whoever that called Prompt)
	// we don't re-prompt the user since this makes the integration with Stg Exp more complex
	return EResponseOption.Default()
}
// Dryrun queues a dry-run line describing a file that would be transferred.
// A nil builder produces an empty message.
func (lcm *lifecycleMgr) Dryrun(o OutputBuilder) {
	var content string
	if o != nil {
		content = o(lcm.outputFormat)
	}
	lcm.msgQueue <- outputMessage{
		msgContent: content,
		msgType:    eOutputMessageType.Dryrun(),
	}
}
// TODO minor: consider merging with Exit
// Error reports a fatal error (sanitized) and terminates the process with exit
// code EExitCode.Error(); it never returns. The actual os.Exit happens in the
// output goroutine once the message has been printed.
func (lcm *lifecycleMgr) Error(msg string) {
	msg = lcm.logSanitizer.SanitizeLogMessage(msg)
	// Check if need to do memory profiling, and do memory profiling accordingly before azcopy exits.
	lcm.checkAndTriggerMemoryProfiling()
	// Check if there is ongoing CPU profiling, and stop CPU profiling.
	lcm.checkAndStopCPUProfiling()
	lcm.msgQueue <- outputMessage{
		msgContent: msg,
		msgType:    eOutputMessageType.Error(),
		exitCode:   EExitCode.Error(),
	}
	// stall forever until the success message is printed and program exits
	lcm.SurrenderControl()
}
// Exit reports end-of-job and terminates the process with the given exit code,
// unless the code is EExitCode.NoExit(), in which case the message is printed
// and Exit returns. As with Error, the actual os.Exit happens in the output
// goroutine after printing.
func (lcm *lifecycleMgr) Exit(o OutputBuilder, applicationExitCode ExitCode) {
	if applicationExitCode != EExitCode.NoExit() {
		// Check if need to do memory profiling, and do memory profiling accordingly before azcopy exits.
		lcm.checkAndTriggerMemoryProfiling()
		// Check if there is ongoing CPU profiling, and stop CPU profiling.
		lcm.checkAndStopCPUProfiling()
	}
	messageContent := ""
	if o != nil {
		messageContent = o(lcm.outputFormat)
	}
	lcm.msgQueue <- outputMessage{
		msgContent: messageContent,
		msgType:    eOutputMessageType.EndOfJob(),
		exitCode:   applicationExitCode,
	}
	if applicationExitCode != EExitCode.NoExit() {
		// stall forever until the success message is printed and program exits
		lcm.SurrenderControl()
	}
}
// Response emits the reply to a stdin LCM request: JSON-encoded when the
// output format is JSON, otherwise an INFO text line. Marshalling failure is
// a programmer bug and panics.
func (lcm *lifecycleMgr) Response(resp LCMMsgResp) {
	var respMsg string
	if lcm.outputFormat == EOutputFormat.Json() {
		m, err := json.Marshal(resp)
		// check the error before using the marshalled bytes
		// (previously string(m) was taken before the error check)
		PanicIfErr(err)
		respMsg = string(m)
	} else {
		respMsg = fmt.Sprintf("INFO: %v", resp.Value.String())
	}
	respMsg = lcm.logSanitizer.SanitizeLogMessage(respMsg)
	lcm.msgQueue <- outputMessage{
		msgContent: respMsg,
		msgType:    eOutputMessageType.Response(),
	}
}
// this is used by commands that wish to stall forever to wait for the operations to complete
// SurrenderControl never returns: a select with no cases blocks forever,
// leaving the output goroutine to perform the eventual os.Exit.
func (lcm *lifecycleMgr) SurrenderControl() {
	// stall forever
	select {}
}
// RegisterCloseFunc installs a callback that is invoked just before the
// process exits (e.g. to flush and close logs); it replaces any previous one.
func (lcm *lifecycleMgr) RegisterCloseFunc(closeFunc func()) {
	lcm.closeFunc = closeFunc
}
// processOutputMessage is the single consumer of msgQueue: it pulls each
// queued message and hands it to the handler for the active output format.
// Quieted messages still go through processNoneOutput so exit semantics hold.
func (lcm *lifecycleMgr) processOutputMessage() {
	// the queue is never closed, so this ranges forever
	for msg := range lcm.msgQueue {
		if shouldQuietMessage(msg, lcm.OutputVerbosityType) {
			lcm.processNoneOutput(msg)
			continue
		}
		switch lcm.outputFormat {
		case EOutputFormat.Json():
			lcm.processJSONOutput(msg)
		case EOutputFormat.Text():
			lcm.processTextOutput(msg)
		case EOutputFormat.None():
			lcm.processNoneOutput(msg)
		default:
			panic("unimplemented output format")
		}
	}
}
// processNoneOutput handles a message when output is suppressed: nothing is
// printed, but exit-bearing messages still terminate the process.
func (lcm *lifecycleMgr) processNoneOutput(msg outputMessage) {
	switch {
	case msg.msgType == eOutputMessageType.Error():
		lcm.closeFunc()
		os.Exit(int(EExitCode.Error()))
	case msg.shouldExitProcess():
		lcm.closeFunc()
		os.Exit(int(msg.exitCode))
	}
	// all other message types are silently dropped in "none" mode
}
// processJSONOutput prints one message wrapped in the JSON output template,
// exits the process if the message carries an exit code, and for prompts
// blocks reading the user's answer and relays it to the asker.
func (lcm *lifecycleMgr) processJSONOutput(msgToOutput outputMessage) {
	msgType := msgToOutput.msgType
	// capture the time before printing, so only answers typed after the
	// question appeared are accepted
	questionTime := time.Now()
	// simply output the json message
	// we assume the msgContent is already formatted correctly
	fmt.Println(GetJsonStringFromTemplate(newJsonOutputTemplate(msgType, msgToOutput.msgContent,
		msgToOutput.promptDetails)))
	// exit if needed
	if msgToOutput.shouldExitProcess() {
		lcm.closeFunc()
		os.Exit(int(msgToOutput.exitCode))
	} else if msgType == eOutputMessageType.Prompt() {
		// read the response to the prompt and send it back through the channel
		msgToOutput.inputChannel <- lcm.getInputAfterTime(questionTime)
	}
}
// processTextOutput renders one message on the plain-text console, keeping the
// progress line as the last line on screen: informational messages overwrite
// the progress line and the progress line is then re-printed below them.
func (lcm *lifecycleMgr) processTextOutput(msgToOutput outputMessage) {
	// when a new line needs to overwrite the current line completely
	// we need to make sure that if the new line is shorter, we properly erase everything from the current line
	var matchLengthWithSpaces = func(curLineLength, newLineLength int) {
		if dirtyLeftover := curLineLength - newLineLength; dirtyLeftover > 0 {
			for i := 0; i < dirtyLeftover; i++ {
				fmt.Print(" ")
			}
		}
	}
	switch msgToOutput.msgType {
	case eOutputMessageType.Error(), eOutputMessageType.EndOfJob():
		// simply print and quit
		// if no message is intended, avoid adding new lines
		if msgToOutput.msgContent != "" {
			fmt.Println("\n" + msgToOutput.msgContent)
		}
		if msgToOutput.shouldExitProcess() {
			lcm.closeFunc()
			os.Exit(int(msgToOutput.exitCode))
		}
	case eOutputMessageType.Progress():
		fmt.Print("\r")                   // return carriage back to start
		fmt.Print(msgToOutput.msgContent) // print new progress
		// it is possible that the new progress status is somehow shorter than the previous one
		// in this case we must erase the left over characters from the previous progress
		matchLengthWithSpaces(len(lcm.progressCache), len(msgToOutput.msgContent))
		lcm.progressCache = msgToOutput.msgContent
	case eOutputMessageType.Init(), eOutputMessageType.Info(), eOutputMessageType.Dryrun(), eOutputMessageType.Response():
		if lcm.progressCache != "" { // a progress status is already on the last line
			// print the info from the beginning on current line
			fmt.Print("\r")
			fmt.Print(msgToOutput.msgContent)
			// it is possible that the info is shorter than the progress status
			// in this case we must erase the left over characters from the progress status
			matchLengthWithSpaces(len(lcm.progressCache), len(msgToOutput.msgContent))
			// print the previous progress status again, so that it's on the last line
			fmt.Print("\n")
			fmt.Print(lcm.progressCache)
		} else {
			fmt.Println(msgToOutput.msgContent)
		}
	case eOutputMessageType.Prompt():
		// capture the time before printing, so only answers typed after the
		// question appeared are accepted
		questionTime := time.Now()
		if lcm.progressCache != "" { // a progress status is already on the last line
			// print the prompt from the beginning on current line
			fmt.Print("\r")
			fmt.Print(msgToOutput.msgContent)
			// it is possible that the prompt is shorter than the progress status
			// in this case we must erase the left over characters from the progress status
			matchLengthWithSpaces(len(lcm.progressCache), len(msgToOutput.msgContent))
		} else {
			fmt.Print(msgToOutput.msgContent)
		}
		// example output: Please confirm with: [Y] Yes [N] No [A] Yes for all [L] No for all
		fmt.Print(" Please confirm with:")
		for _, option := range msgToOutput.promptDetails.ResponseOptions {
			fmt.Printf(" [%s] %s ", strings.ToUpper(option.ResponseString), option.UserFriendlyResponseType)
		}
		// read the response to the prompt and send it back through the channel
		msgToOutput.inputChannel <- lcm.getInputAfterTime(questionTime)
	}
}
// for the lifecycleMgr to babysit a job, it must be given a controller to get information about the job
type WorkController interface {
	Cancel(mgr LifecycleMgr)                                        // handle to cancel the work
	ReportProgressOrExit(mgr LifecycleMgr) (totalKnownCount uint32) // print the progress status, optionally exit the application if work is done
}
// AllowReinitiateProgressReporting must be called before running an cleanup job, to allow the initiation of that job's
// progress reporting to begin. It re-arms the one-shot guard used by
// InitiateProgressReporting.
func (lcm *lifecycleMgr) AllowReinitiateProgressReporting() {
	atomic.StoreInt32(&lcm.waitEverCalled, 0)
}
// InitiateProgressReporting starts a goroutine that periodically asks the
// given WorkController to report progress (or exit the application when the
// work is done), and that handles cancellation from the OS and from stdin.
// Only the first call has any effect; see AllowReinitiateProgressReporting
// to re-arm it for a follow-up job.
func (lcm *lifecycleMgr) InitiateProgressReporting(jc WorkController) {
	if !atomic.CompareAndSwapInt32(&lcm.waitEverCalled, 0, 1) {
		return
	}
	// this go routine never returns
	// it will terminate the whole process eventually when the work is complete
	go func() {
		const progressFrequencyThreshold = 1000000
		var oldCount, newCount uint32
		wait := 2 * time.Second
		lastFetchTime := time.Now().Add(-wait) // So that we start fetching time immediately
		// cancelChannel will be notified when os receives os.Interrupt.
		// (os.Kill removed: SIGKILL cannot be trapped, so listing it was a
		// no-op that staticcheck flags as SA1016.)
		signal.Notify(lcm.cancelChannel, os.Interrupt)
		cancelCalled := false
		doCancel := func() {
			cancelCalled = true
			lcm.Info("Cancellation requested. Beginning clean shutdown...")
			jc.Cancel(lcm)
		}
		for {
			select {
			case <-lcm.cancelChannel:
				doCancel()
				continue // to exit on next pass through loop
			case <-lcm.doneChannel:
				newCount = jc.ReportProgressOrExit(lcm)
				lastFetchTime = time.Now()
			case <-time.After(wait):
				if time.Since(lastFetchTime) >= wait {
					newCount = jc.ReportProgressOrExit(lcm)
					lastFetchTime = time.Now()
				}
			}
			if newCount >= progressFrequencyThreshold && !cancelCalled {
				// report less on progress - to save on the CPU costs of doing so and because, if there are this many files,
				// its going to be a long job anyway, so no need to report so often.
				// (Previously this re-assigned the same 2-second wait, so the
				// "Reducing" message below described a change that never happened.)
				wait = 2 * time.Minute
				if oldCount < progressFrequencyThreshold {
					lcm.Info(fmt.Sprintf("Reducing progress output frequency to %v, because there are over %d files", wait, progressFrequencyThreshold))
				}
			}
			oldCount = newCount
		}
	}()
}
// GetEnvironmentVariable returns the variable's value, or its declared default
// when the variable is unset or empty (the two are deliberately not distinguished).
func (lcm *lifecycleMgr) GetEnvironmentVariable(env EnvironmentVariable) string {
	if v := os.Getenv(env.Name); v != "" {
		return v
	}
	return env.DefaultValue
}
// AddUserAgentPrefix prepends the globally configured user-agent prefix
// (from the environment), if any, to the given user agent string.
func (lcm *lifecycleMgr) AddUserAgentPrefix(userAgent string) string {
	if prefix := lcm.GetEnvironmentVariable(EEnvironmentVariable.UserAgentPrefix()); prefix != "" {
		return prefix + " " + userAgent
	}
	return userAgent
}
// awaitChannel blocks until ch is ready to receive (typically because it was
// closed) or the timeout elapses, whichever comes first.
func (_ *lifecycleMgr) awaitChannel(ch chan struct{}, timeout time.Duration) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-ch:
	case <-timer.C:
	}
}
// E2EAwaitContinue is used in case where a developer want's to debug AzCopy by attaching to the running process,
// before it starts doing any actual work. It blocks until "continue" arrives
// on stdin or one minute elapses.
func (lcm *lifecycleMgr) E2EAwaitContinue() {
	lcm.e2eAllowAwaitContinue = true // not technically gorountine safe (since its shared state) but its consistent with EnableInputWatcher
	lcm.EnableInputWatcher()
	lcm.awaitChannel(lcm.e2eContinueChannel, time.Minute)
}
// E2EAwaitAllowOpenFiles is used in cases where we want to artificially produce a pause between enumeration and sending
// of the first file, for test purposes. (It only achieves that effect when the total file count is <= size of one job part).
// Does not pause at all, unless the feature has been enabled with a command-line flag
// (when disabled, E2EEnableAwaitAllowOpenFiles closes the channel, so this returns instantly).
func (lcm *lifecycleMgr) E2EAwaitAllowOpenFiles() {
	lcm.awaitChannel(lcm.e2eAllowOpenChannel, 5*time.Minute)
}
// E2EEnableAwaitAllowOpenFiles either arms the "open" stdin gate (enable=true)
// or closes the gate channel so E2EAwaitAllowOpenFiles returns instantly.
// NOTE(review): calling this twice with enable=false would close an
// already-closed channel and panic — confirm callers invoke it at most once.
func (lcm *lifecycleMgr) E2EEnableAwaitAllowOpenFiles(enable bool) {
	if enable {
		lcm.e2eAllowAwaitOpen = true // not technically gorountine safe (since its shared state) but its consistent with EnableInputWatcher
		lcm.EnableInputWatcher()
	} else {
		close(lcm.e2eAllowOpenChannel) // so that E2EAwaitAllowOpenFiles will instantly return every time
	}
}
// SetForceLogging fetches `AZCOPY_DISABLE_SYSLOG` once and caches the result
// in the manager's `disableSyslog` flag, avoiding redundant env-var lookups.
// An unset or unparseable value retains the current behaviour, i.e. logging to
// Syslog / Windows Event Log stays enabled.
func (lcm *lifecycleMgr) SetForceLogging() {
	v, err := strconv.ParseBool(lcm.GetEnvironmentVariable(EEnvironmentVariable.DisableSyslog()))
	lcm.disableSyslog = err == nil && v
}
// IsForceLoggingDisabled reports the cached AZCOPY_DISABLE_SYSLOG setting
// captured by SetForceLogging.
func (lcm *lifecycleMgr) IsForceLoggingDisabled() bool {
	return lcm.disableSyslog
}
// DownloadToTempPath reports whether downloads should go via a temp path.
// It is controlled by an environment variable; unset or unparseable values
// default to true.
func (lcm *lifecycleMgr) DownloadToTempPath() bool {
	if v, err := strconv.ParseBool(lcm.GetEnvironmentVariable(EEnvironmentVariable.DownloadToTempPath())); err == nil {
		return v
	}
	// By default we'll download to temp path
	return true
}
// MsgHandlerChannel exposes the stream of parsed stdin LCM requests as a
// receive-only channel for the job handler.
func (lcm *lifecycleMgr) MsgHandlerChannel() <-chan *LCMMsg {
	return lcm.msgHandlerChannel
}
// ReportAllJobPartsDone signals the progress-reporting loop that every job
// part has completed, prompting an immediate progress fetch.
// NOTE(review): the package-level constructor above does not initialize
// doneChannel; sending on a nil channel blocks forever — confirm the channel
// is created somewhere before this is called.
func (lcm *lifecycleMgr) ReportAllJobPartsDone() {
	lcm.doneChannel <- true
}
// SetOutputVerbosity selects which message types are quieted
// (see shouldQuietMessage). Intended to be called during setup.
func (lcm *lifecycleMgr) SetOutputVerbosity(mode OutputVerbosity) {
	lcm.OutputVerbosityType = mode
}
// PanicIfErr panics when err is non-nil. It captures the common pattern of
// treating an unexpected error as a programmer bug rather than a runtime condition.
func PanicIfErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// shouldQuietMessage reports whether a message should be suppressed under the
// given verbosity: Quiet suppresses everything, Essential suppresses progress,
// info and prompts, and Default (or anything unrecognized) suppresses nothing.
func shouldQuietMessage(msgToOutput outputMessage, quietMode OutputVerbosity) bool {
	switch quietMode {
	case EOutputVerbosity.Quiet():
		return true
	case EOutputVerbosity.Essential():
		mt := msgToOutput.msgType
		return mt == eOutputMessageType.Progress() ||
			mt == eOutputMessageType.Info() ||
			mt == eOutputMessageType.Prompt()
	default:
		// EOutputVerbosity.Default() and any unknown mode: print everything
		return false
	}
}