loadtest.go
package loadtester
import (
"context"
"encoding/csv"
"errors"
"fmt"
"os"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
const (
skipInterTaskSchedulingThreshold = 20 * time.Millisecond
)
var (
ErrBadReadTasksImpl = errors.New("bad ReadTasks implementation: returned a value less than zero or larger than the input slice length")
ErrRetriesFailedToFlush = errors.New("failed to flush all retries")
errLoadtestContextDone = errors.New("loadtest parent context errored")
)
// TODO: RetriesDisabled runtime checks could become init-time checks; same with MaxTasks based checks
// I would not dream of doing this before proving it is warranted.
// TaskProvider describes how to read tasks into a
// loadtest and how to control a loadtest's configuration
// over time
type TaskProvider interface {
// ReadTasks fills the provided slice, starting at index 0 and up to its length, and returns how many tasks were inserted
//
// Failing to fill the whole slice will signal the end of the loadtest.
//
// Note that in general you should not use this behavior to signal a loadtest to stop
// if your loadtest needs to be time-bound; for that case, signal a stop
// via the context. This stop-on-failure-to-fill behavior only exists for cases
// where the author wants to exhaustively run a set of tasks, bounding the
// loadtest by completeness of the task space rather than by a timespan.
//
// Note that if you only partially fill the slice, the filled task slots
// will still run before the loadtest terminates.
ReadTasks([]Doer) int
// UpdateConfigChan should return the same channel each time or nil;
// but once nil it must never be non-nil again
UpdateConfigChan() <-chan ConfigUpdate
}
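// A minimal sketch (not part of the original file) of a TaskProvider that serves a
// fixed set of tasks and never changes configuration at runtime. The Doer signature
// assumed here, Do(ctx context.Context, workerID int) error, is taken from how doTask
// invokes tasks below; the type and field names are hypothetical.
//
//	type sliceTaskProvider struct {
//		tasks []Doer
//	}
//
//	// ReadTasks copies as many pending tasks as fit; returning fewer than
//	// len(dst) signals the end of the loadtest once the queue is drained.
//	func (p *sliceTaskProvider) ReadTasks(dst []Doer) int {
//		n := copy(dst, p.tasks)
//		p.tasks = p.tasks[n:]
//		return n
//	}
//
//	// UpdateConfigChan returns nil because this provider never updates the
//	// loadtest configuration at runtime.
//	func (p *sliceTaskProvider) UpdateConfigChan() <-chan ConfigUpdate {
//		return nil
//	}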
type taskResultFlags struct {
Passed uint8
Panicked uint8
RetryQueued uint8
Errored uint8
}
func (trf taskResultFlags) isZero() bool {
return trf == taskResultFlags{}
}
type taskResult struct {
taskResultFlags
QueuedDuration, TaskDuration time.Duration
Meta taskMeta
}
type Loadtest struct {
taskProvider TaskProvider
maxTasks int
maxWorkers int
numWorkers int
workers []chan struct{}
workerWaitGroup sync.WaitGroup
resultWaitGroup sync.WaitGroup
taskChan chan taskWithMeta
resultsChan chan taskResult
// intervalTasksSema will always have its capacity set to the task interval rate + worker count,
// but it is created with the maximum weight to allow configurations up to the max task interval
// rate + max worker count ( accounts for work in progress and work queued )
//
// This prevents enqueuing beyond the desired tasks per interval
// and creates back pressure on the task enqueue routine, since the max channel sizes
// are static and bound to the maximums we could reach in some pacer implementation.
//
// Without this protection we could send far more than we intended for the wall-clock interval,
// which could cause severe bursts in load.
//
// As a result this does slow down each task runner, but only slightly, as it's just acquiring
// a lock, doing some simple math, and then unlocking as the task worker calls release on
// the semaphore. It's worth it to me.
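//
// A reduced sketch of that pacing pattern (illustrative, not code from this
// package): the scheduler acquires one weighted slot per task before
// enqueueing, and each worker releases its slot when its task completes, so
// at most (interval task rate + worker count) tasks can be queued or in
// flight at once.
//
//	// scheduler side: blocks while too much work is still pending
//	if err := sema.Acquire(ctx, int64(numNewTasks)); err != nil {
//		return err
//	}
//	for _, t := range batch {
//		taskChan <- t
//	}
//
//	// worker side, after a task finishes
//	sema.Release(1)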
intervalTasksSema *semaphore.Weighted
numIntervalTasks int
maxIntervalTasks int
interval time.Duration
retryTaskChan chan *retryTask
retryTaskPool sync.Pool
startTime time.Time
csvData csvData
flushRetriesOnShutdown bool
flushRetriesTimeout time.Duration
retriesDisabled bool
logger SugaredLogger
}
func NewLoadtest(taskProvider TaskProvider, options ...LoadtestOption) (*Loadtest, error) {
cfg, err := newLoadtestConfig(options...)
if err != nil {
return nil, err
}
// lag results are reported per interval through the same channel as results
// so it's important to account for them per interval when constructing max
// config buffers
const intervalPossibleLagResultCount = 1
resultsChanSize := (cfg.maxIntervalTasks + intervalPossibleLagResultCount) * cfg.outputBufferingFactor
var csvWriteErr error
if cfg.csvOutputDisabled {
csvWriteErr = errCsvWriterDisabled
}
var retryTaskChan chan *retryTask
var sm *semaphore.Weighted
{
maxNumInProgressOrQueuedTasks := maxPendingTasks(cfg.maxWorkers, cfg.maxIntervalTasks)
sm = semaphore.NewWeighted(int64(maxNumInProgressOrQueuedTasks))
if !sm.TryAcquire(int64(maxNumInProgressOrQueuedTasks)) {
return nil, errors.New("failed to initialize load generation semaphore")
}
if !cfg.retriesDisabled {
retryTaskChan = make(chan *retryTask, maxNumInProgressOrQueuedTasks)
}
}
return &Loadtest{
taskProvider: taskProvider,
maxTasks: cfg.maxTasks,
maxWorkers: cfg.maxWorkers,
numWorkers: cfg.numWorkers,
workers: make([]chan struct{}, 0, cfg.maxWorkers),
taskChan: make(chan taskWithMeta, cfg.maxIntervalTasks),
resultsChan: make(chan taskResult, resultsChanSize),
retryTaskChan: retryTaskChan,
maxIntervalTasks: cfg.maxIntervalTasks,
numIntervalTasks: cfg.numIntervalTasks,
interval: cfg.interval,
retryTaskPool: sync.Pool{
New: func() interface{} {
return &retryTask{}
},
},
csvData: csvData{
outputFilename: cfg.csvOutputFilename,
flushInterval: cfg.csvOutputFlushInterval,
writeErr: csvWriteErr,
},
flushRetriesTimeout: cfg.flushRetriesTimeout,
flushRetriesOnShutdown: cfg.flushRetriesOnShutdown,
retriesDisabled: cfg.retriesDisabled,
logger: cfg.logger,
intervalTasksSema: sm,
}, nil
}
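// A hedged usage sketch (not from the original source): construct a Loadtest
// from a TaskProvider and run it until the context ends or the provider stops
// filling task slices. LoadtestOption constructors live elsewhere in this
// package and are intentionally not shown here; myProvider is hypothetical.
//
//	lt, err := NewLoadtest(myProvider)
//	if err != nil {
//		log.Fatal(err)
//	}
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
//	defer cancel()
//	if err := lt.Run(ctx); err != nil {
//		log.Fatal(err)
//	}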
func (lt *Loadtest) addWorker(ctx context.Context, workerID int) {
pauseChan := make(chan struct{}, 2)
lt.workerWaitGroup.Add(1)
go func() {
defer lt.workerWaitGroup.Done()
lt.workerLoop(ctx, workerID, pauseChan)
}()
lt.workers = append(lt.workers, pauseChan)
}
func (lt *Loadtest) workerLoop(ctx context.Context, workerID int, pauseChan <-chan struct{}) {
for {
var task taskWithMeta
// duplicating short-circuit signal control processing to give it priority over the randomizing nature of the multi-select
// that follows
//
// ref: https://go.dev/ref/spec#Select_statements
select {
case _, ok := <-pauseChan:
if !ok {
// all work is done
return
}
// paused
_, ok = <-pauseChan
if !ok {
// all work is done
return
}
default:
}
select {
case _, ok := <-pauseChan:
if !ok {
// all work is done
return
}
// paused
_, ok = <-pauseChan
if !ok {
// all work is done
return
}
continue
case task = <-lt.taskChan:
}
taskStart := time.Now()
respFlags := lt.doTask(ctx, workerID, task.doer)
taskEnd := time.Now()
lt.resultsChan <- taskResult{
taskResultFlags: respFlags,
QueuedDuration: taskStart.Sub(task.enqueueTime),
TaskDuration: taskEnd.Sub(taskStart),
Meta: task.meta,
}
lt.intervalTasksSema.Release(1)
}
}
func (lt *Loadtest) doTask(ctx context.Context, workerID int, task Doer) (result taskResultFlags) {
var err_resp error
// phase is the name of the step that may have caused a panic
phase := "do"
var rt *retryTask
if !lt.retriesDisabled {
if v, ok := task.(*retryTask); ok {
rt = v
phase = "retry"
defer func() {
*rt = retryTask{}
lt.retryTaskPool.Put(v)
}()
}
}
defer func() {
if err_resp != nil {
lt.logger.Warnw(
"task error",
"worker_id", workerID,
"error", err_resp,
)
}
if r := recover(); r != nil {
result.Panicked = 1
result.Errored = 1
switch v := r.(type) {
case error:
lt.logger.Errorw(
"worker recovered from panic",
"worker_id", workerID,
"phase", phase,
"error", v,
)
case []byte:
lt.logger.Errorw(
"worker recovered from panic",
"worker_id", workerID,
"phase", phase,
"error", string(v),
)
case string:
lt.logger.Errorw(
"worker recovered from panic",
"worker_id", workerID,
"phase", phase,
"error", v,
)
default:
const msg = "unknown cause"
lt.logger.Errorw(
"worker recovered from panic",
"worker_id", workerID,
"phase", phase,
"error", msg,
)
}
}
}()
err := task.Do(ctx, workerID)
phase = "" // done, no panic occurred
if err == nil {
result.Passed = 1
return
}
err_resp = err
result.Errored = 1
if lt.retriesDisabled {
return
}
var dr DoRetryer
if rt != nil {
dr = rt.DoRetryer
} else if v, ok := task.(DoRetryer); ok {
dr = v
} else {
return
}
phase = "can-retry"
if v, ok := dr.(DoRetryChecker); ok && !v.CanRetry(ctx, workerID, err) {
phase = "" // done, no panic occurred
return
}
phase = "" // done, no panic occurred
lt.retryTaskChan <- lt.newRetryTask(dr, err)
result.RetryQueued = 1
return
}
func (lt *Loadtest) newRetryTask(task DoRetryer, err error) *retryTask {
result := lt.retryTaskPool.Get().(*retryTask)
*result = retryTask{task, err}
return result
}
func (lt *Loadtest) readRetries(p []Doer) int {
// make sure we only fill up to len(p)
var i int
for i < len(p) {
select {
case task := <-lt.retryTaskChan:
p[i] = task
default:
return i
}
i++
}
return i
}
func (lt *Loadtest) getLoadtestConfigAsJson() interface{} {
type Config struct {
StartTime string `json:"start_time"`
Interval string `json:"interval"`
MaxIntervalTasks int `json:"max_interval_tasks"`
MaxTasks int `json:"max_tasks"`
MaxWorkers int `json:"max_workers"`
NumIntervalTasks int `json:"num_interval_tasks"`
NumWorkers int `json:"num_workers"`
MetricsFlushInterval string `json:"metrics_flush_interval"`
FlushRetriesOnShutdown bool `json:"flush_retries_on_shutdown"`
FlushRetriesTimeout string `json:"flush_retries_timeout"`
RetriesDisabled bool `json:"retries_disabled"`
}
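// When logged, this config marshals to JSON along the lines of the following
// (the keys come from the struct tags above; the values are illustrative only):
//
//	{"start_time":"...","interval":"1s","max_interval_tasks":50,"max_tasks":0,
//	 "max_workers":10,"num_interval_tasks":25,"num_workers":5,
//	 "metrics_flush_interval":"5s","flush_retries_on_shutdown":false,
//	 "flush_retries_timeout":"2m0s","retries_disabled":false}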
return Config{
StartTime: timeToString(lt.startTime),
Interval: lt.interval.String(),
MaxIntervalTasks: lt.maxIntervalTasks,
MaxTasks: lt.maxTasks,
MaxWorkers: lt.maxWorkers,
NumIntervalTasks: lt.numIntervalTasks,
NumWorkers: lt.numWorkers,
MetricsFlushInterval: lt.csvData.flushInterval.String(),
FlushRetriesOnShutdown: lt.flushRetriesOnShutdown,
FlushRetriesTimeout: lt.flushRetriesTimeout.String(),
RetriesDisabled: lt.retriesDisabled,
}
}
func (lt *Loadtest) Run(ctx context.Context) (err_result error) {
// all that should ever be present in this function is logic to aggregate async errors
// into one error response
//
// and then a call to the internal lt.run(ctx, &shutdownErr) func
// this defer ensures that any async csv writing error bubbles up to err_result
// if it would otherwise be nil
var shutdownErr error
defer func() {
if err_result != nil {
return
}
if shutdownErr != nil {
err_result = shutdownErr
return
}
if err := lt.csvData.writeErr; err != nil && err != errCsvWriterDisabled {
err_result = err
}
}()
return lt.run(ctx, &shutdownErr)
}
func (lt *Loadtest) run(ctx context.Context, shutdownErrResp *error) error {
lt.startTime = time.Now()
cd := &lt.csvData
if cd.writeErr == nil {
csvFile, err := os.Create(cd.outputFilename)
if err != nil {
return fmt.Errorf("failed to open output csv metrics file for writing: %w", err)
}
defer lt.writeOutputCsvFooterAndClose(csvFile)
cd.writeErr = lt.writeOutputCsvConfigComment(csvFile)
if cd.writeErr == nil {
cd.writer = csv.NewWriter(csvFile)
cd.writeErr = lt.writeOutputCsvHeaders()
}
}
lt.logger.Infow(
"starting loadtest",
"config", lt.getLoadtestConfigAsJson(),
)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
lt.resultsHandler()
}()
numWorkers := lt.numWorkers
// numTasks is the total number of tasks
// scheduled to run ( including retries )
var numTasks int
intervalID := time.Now()
maxTasks := lt.maxTasks
interval := lt.interval
numNewTasks := lt.numIntervalTasks
ctxDone := ctx.Done()
updateChan := lt.taskProvider.UpdateConfigChan()
configChanges := make([]interface{}, 0, 12)
meta := taskMeta{
NumIntervalTasks: lt.numIntervalTasks,
}
var interTaskInterval time.Duration
if meta.NumIntervalTasks > 0 {
interTaskInterval = interval / time.Duration(meta.NumIntervalTasks)
}
taskBuf := make([]Doer, 0, lt.maxIntervalTasks)
var delay time.Duration
// the stopping routine runs on return,
// flushing as much as possible
defer func() {
err := func(flushRetries bool) error {
if !flushRetries {
lt.logger.Debugw(
"not waiting on retries to flush on shutdown",
"reason", "retries disabled or flush retries on shutdown disabled",
"num_tasks", numTasks,
)
return nil
}
if err := ctx.Err(); err != nil {
lt.logger.Warnw(
"not waiting on retries to flush on shutdown",
"reason", "user stopped loadtest",
"num_tasks", numTasks,
"error", err,
)
return nil
}
lt.logger.Debugw(
"waiting on retries to flush",
"num_tasks", numTasks,
)
if meta.NumIntervalTasks <= 0 || numWorkers <= 0 {
lt.logger.Errorw(
"retry flushing could not be attempted",
"num_tasks", numTasks,
"num_interval_tasks", meta.NumIntervalTasks,
"num_workers", numWorkers,
)
return ErrRetriesFailedToFlush
}
preflushNumTasks := numTasks
lt.logger.Warnw(
"shutting down: flushing retries",
"num_tasks", numTasks,
"flush_retries_timeout", lt.flushRetriesTimeout.String(),
)
shutdownCtx, cancel := context.WithTimeout(context.Background(), lt.flushRetriesTimeout)
defer cancel()
intervalID = time.Now()
taskBuf = taskBuf[:0]
meta.Lag = 0
for {
if err := shutdownCtx.Err(); err != nil {
lt.logger.Errorw(
"failed to flush all retries",
"preflush_num_tasks", preflushNumTasks,
"num_tasks", numTasks,
"error", err,
)
return ErrRetriesFailedToFlush
}
lt.resultWaitGroup.Wait()
for {
if err := shutdownCtx.Err(); err != nil {
lt.logger.Errorw(
"failed to flush all retries",
"preflush_num_tasks", preflushNumTasks,
"num_tasks", numTasks,
"error", err,
)
return ErrRetriesFailedToFlush
}
if maxTasks > 0 {
if numTasks >= maxTasks {
lt.logger.Errorw(
"failed to flush all retries",
"preflush_num_tasks", preflushNumTasks,
"num_tasks", numTasks,
"reason", "reached max tasks",
)
return ErrRetriesFailedToFlush
}
// 1. the below looks off/odd, why not use?:
//
// ```
// if n := maxTasks - numTasks; n < numNewTasks {
// numNewTasks = n
// }
// ```
//
// 2. And for that matter, why not keep meta.NumIntervalTasks in sync with numNewTasks?
//
// ---
//
// 1. The implementation would be exactly the same, just using another variable
// 2. The meta.NumIntervalTasks value is used in RATE calculations; if we kept it in sync
// with BOUNDS values then the last tasks could run at a lower RATE than intended. It
// is only kept in sync when a user adjusts the RATE via a ConfigUpdate. Don't confuse
// bounds-purpose values with rate-purpose values.
//
numNewTasks = maxTasks - numTasks
if numNewTasks > meta.NumIntervalTasks {
numNewTasks = meta.NumIntervalTasks
}
}
select {
case <-ctxDone:
lt.logger.Warnw(
"user stopped loadtest while attempting to flush retries",
"preflush_num_tasks", preflushNumTasks,
"num_tasks", numTasks,
)
return nil
default:
// continue with load generating retries
}
// acquire load generation opportunity slots ( smooths bursts )
//
// in the shutdown retry flow we always want to acquire before reading retries
// to avoid a deadlock edge case where the retry queue is full and all workers' tasks have failed and need to be retried
if err := lt.intervalTasksSema.Acquire(shutdownCtx, int64(numNewTasks)); err != nil {
lt.logger.Errorw(
"failed to flush all retries",
"preflush_num_tasks", preflushNumTasks,
"num_tasks", numTasks,
"error", err,
"reason", "shutdown timeout likely reached while waiting for semaphore acquisition",
)
return ErrRetriesFailedToFlush
}
// read up to numNewTasks tasks from the retry queue
taskBufSize := lt.readRetries(taskBuf[:numNewTasks:numNewTasks])
if taskBufSize <= 0 {
// wait for any pending tasks to flush and try read again
lt.logger.Debugw(
"verifying all retries have flushed",
"preflush_num_tasks", preflushNumTasks,
"num_tasks", numTasks,
)
lt.resultWaitGroup.Wait()
// read up to numNewTasks tasks from the retry queue again
taskBufSize = lt.readRetries(taskBuf[:numNewTasks:numNewTasks])
if taskBufSize <= 0 {
lt.logger.Infow(
"all retries flushed",
"preflush_num_tasks", preflushNumTasks,
"num_tasks", numTasks,
)
return nil
}
}
taskBuf = taskBuf[:taskBufSize]
// re-release any extra load slots we allocated beyond what really remains to do
if numNewTasks > taskBufSize {
lt.intervalTasksSema.Release(int64(numNewTasks - taskBufSize))
}
lt.resultWaitGroup.Add(taskBufSize)
meta.IntervalID = intervalID
if taskBufSize == 1 || interTaskInterval <= skipInterTaskSchedulingThreshold {
for _, task := range taskBuf {
lt.taskChan <- taskWithMeta{task, intervalID, meta}
}
} else {
lt.taskChan <- taskWithMeta{taskBuf[0], intervalID, meta}
for _, task := range taskBuf[1:] {
time.Sleep(interTaskInterval)
lt.taskChan <- taskWithMeta{task, time.Now(), meta}
}
}
taskBuf = taskBuf[:0]
numTasks += taskBufSize
meta.Lag = 0
// wait for the next interval time to arrive
nextIntervalID := intervalID.Add(interval)
realNow := time.Now()
delay = nextIntervalID.Sub(realNow)
if delay > 0 {
time.Sleep(delay)
intervalID = nextIntervalID
if taskBufSize < numNewTasks {
// just finished this iteration of retry enqueuing
//
// break to loop through retry drain context again
break
}
continue
}
if delay < 0 {
lag := -delay
intervalID = realNow
meta.Lag = lag
lt.resultWaitGroup.Add(1)
lt.resultsChan <- taskResult{
Meta: taskMeta{
IntervalID: intervalID,
Lag: lag,
},
}
}
if taskBufSize < numNewTasks {
// just finished this iteration of retry enqueuing
//
// break to loop through retry drain context again
break
}
}
}
}(!lt.retriesDisabled && lt.flushRetriesOnShutdown)
if err != nil {
*shutdownErrResp = err
}
// wait for all results to come in
lt.logger.Debugw("waiting for loadtest results")
lt.resultWaitGroup.Wait()
lt.logger.Debugw("stopping result handler routine")
// signal for result handler routines to stop
close(lt.resultsChan)
// signal for workers to stop
lt.logger.Debugw("stopping workers")
for i := 0; i < len(lt.workers); i++ {
close(lt.workers[i])
}
// wait for result handler routines to stop
lt.logger.Debugw("waiting for result handler routines to stop")
wg.Wait()
// wait for workers to stop
lt.logger.Debugw("waiting for workers to stop")
lt.workerWaitGroup.Wait()
lt.logger.Infow("loadtest stopped")
}()
// getTaskSlotCount is the task emission back pressure
// throttle: it returns the number of tasks that are
// allowed to be unfinished for the current performance
// interval under normal circumstances
getTaskSlotCount := func() int {
return maxPendingTasks(numWorkers, numNewTasks)
}
// apply initial task buffer limits to the interval semaphore
taskSlotCount := getTaskSlotCount()
lt.intervalTasksSema.Release(int64(taskSlotCount))
configCausesPause := func() bool {
return meta.NumIntervalTasks <= 0 || numWorkers <= 0
}
var paused bool
var pauseStart time.Time
handleConfigUpdateAndPauseState := func(cu ConfigUpdate) error {
for {
var prepSemaErr error
var recomputeInterTaskInterval, recomputeTaskSlots bool
if cu.numWorkers.set {
recomputeTaskSlots = true
n := cu.numWorkers.val
// prevent overcommitting on the maxWorkers count
if n < 0 {
lt.logger.Errorw(
"config update not within loadtest boundary conditions: numWorkers",
"reason", "update tried to set numWorkers too low",
"remediation_taken", "using min value",
"requested", n,
"min", 0,
)
n = 0
} else if n > lt.maxWorkers {
lt.logger.Errorw(
"config update not within loadtest boundary conditions: numWorkers",
"reason", "update tried to set numWorkers too high",
"remediation_hint", "increase the loadtest MaxWorkers setting",
"remediation_taken", "using max value",
"requested", n,
"max", lt.maxWorkers,
)
n = lt.maxWorkers
}
if n > numWorkers {
// unpause workers
for i := numWorkers; i < len(lt.workers); i++ {
lt.workers[i] <- struct{}{}
}
// spawn new workers if needed
for i := len(lt.workers); i < n; i++ {
lt.addWorker(ctx, i)
}
} else if n < numWorkers {
// pause workers if needed
for i := numWorkers - 1; i >= n; i-- {
lt.workers[i] <- struct{}{}
}
}
configChanges = append(configChanges,
"old_num_workers", numWorkers,
"new_num_workers", n,
)
numWorkers = n
}
if cu.numIntervalTasks.set {
recomputeInterTaskInterval = true
recomputeTaskSlots = true
n := cu.numIntervalTasks.val
// prevent overcommitting on the maxIntervalTasks count
if n < 0 {
lt.logger.Errorw(
"config update not within loadtest boundary conditions: numIntervalTasks",
"reason", "update tried to set numIntervalTasks too low",
"remediation_taken", "using min value",
"requested", n,
"min", 0,
)
n = 0
} else if n > lt.maxIntervalTasks {
lt.logger.Errorw(
"config update not within loadtest boundary conditions: numIntervalTasks",
"reason", "update tried to set numIntervalTasks too high",
"remediation_hint", "increase the loadtest MaxIntervalTasks setting",
"remediation_taken", "using max value",
"requested", n,
"max", lt.maxIntervalTasks,
)
n = lt.maxIntervalTasks
}
configChanges = append(configChanges,
"old_num_interval_tasks", meta.NumIntervalTasks,
"new_num_interval_tasks", n,
)
numNewTasks = n
meta.NumIntervalTasks = n
}
if cu.interval.set {
recomputeInterTaskInterval = true
n := cu.interval.val
if n < 0 {
lt.logger.Errorw(
"config update not within loadtest boundary conditions: interval",
"reason", "update tried to set interval too low",
"remediation_taken", "using min value",
"requested", n,
"min", 0,
)
n = 0
}
configChanges = append(configChanges,
"old_interval", interval,
"new_interval", n,
)
interval = n
}
// && clause: protects against divide by zero
if recomputeInterTaskInterval && meta.NumIntervalTasks > 0 {
interTaskInterval = interval / time.Duration(meta.NumIntervalTasks)
}
if recomputeTaskSlots {
if newTaskSlotCount := getTaskSlotCount(); newTaskSlotCount != taskSlotCount {
if newTaskSlotCount > taskSlotCount {
lt.intervalTasksSema.Release(int64(newTaskSlotCount - taskSlotCount))
} else {
prepSemaErr = lt.intervalTasksSema.Acquire(ctx, int64(taskSlotCount-newTaskSlotCount))
if prepSemaErr != nil {
lt.logger.Errorw(
"loadtest config update: failed to pre-acquire load generation slots",
"error", prepSemaErr,
)
// not returning an error... yet
// going to let the config update log statement occur and then report the error present in prepSemaErr
}
}
taskSlotCount = newTaskSlotCount
}
}
if !cu.onStartup {
lt.logger.Warnw(
"loadtest config updated",
configChanges...,
)
}
configChanges = configChanges[:0]
if prepSemaErr != nil {
return prepSemaErr
}
// pause load generation if unable to schedule anything
if configCausesPause() {
if !paused {
paused = true
pauseStart = time.Now().UTC()
lt.logger.Warnw(
"pausing load generation",
"num_interval_tasks", meta.NumIntervalTasks,
"num_workers", numWorkers,
"paused_at", pauseStart,
)
}
// duplicating short-circuit signal control processing to give it priority over the randomizing nature of the multi-select
// that follows
//
// ref: https://go.dev/ref/spec#Select_statements
select {
case <-ctxDone:
return errLoadtestContextDone
default:
}
select {
case <-ctxDone:
return errLoadtestContextDone
case cu = <-updateChan:
continue
}
}
if paused {
paused = false
intervalID = time.Now()
lt.logger.Warnw(
"resuming load generation",
"num_interval_tasks", meta.NumIntervalTasks,
"num_workers", numWorkers,
"paused_at", pauseStart,
"resumed_at", intervalID.UTC(),
)
}
return nil
}
}
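// Note for readers: a TaskProvider drives the handler above at runtime by
// exposing a channel from UpdateConfigChan and sending ConfigUpdate values on
// it. A rough sketch follows; the ConfigUpdate fields read above are
// unexported, so a real provider would populate them through this package's
// setter helpers, whose names are not shown here rather than guessed at.
//
//	updates := make(chan ConfigUpdate)
//	// return `updates` from the provider's UpdateConfigChan method, then:
//	go func() {
//		var cu ConfigUpdate
//		// ... populate cu via the package's ConfigUpdate setters ...
//		updates <- cu
//	}()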
if configCausesPause() {
if err := handleConfigUpdateAndPauseState(ConfigUpdate{onStartup: true}); err != nil {
if err == errLoadtestContextDone {
return nil
}
return err
}