-
Notifications
You must be signed in to change notification settings - Fork 3
/
adjusters.go
118 lines (102 loc) · 2.82 KB
/
adjusters.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"fmt"
"math"
"time"
)
type nullAdjuster struct {
}
func (adjuster *nullAdjuster) adjust(response *Response) {
}
type boundAdjuster struct {
boundTo *int
boundBy *int
state *OPState
}
func (a *boundAdjuster) adjust(response *Response) {
a.state.concurrency = *a.boundTo * *a.boundBy
if a.state.concurrency > config.maxChannels {
a.state.concurrency = config.maxChannels
}
return
}
const arrowUp = `↗`
const arrowDown = `↘`
type latencyAdjuster struct {
movingCount int
movingTotalTime time.Duration
movingAvgTime time.Duration
state *OPState
barrier int
errorTolerationPercent float64
errorRequestCount int
errorCount int
}
func newLatencyAdjuster(state *OPState) *latencyAdjuster {
a := latencyAdjuster{state: state, errorTolerationPercent: 3}
return &a
}
var logbase = 1 / math.Log(1.3)
const thresholdPercent = 5
func (a *latencyAdjuster) decrease() {
if a.state.concurrency > 1 {
p(a.state.colored(arrowDown))
a.state.concurrency--
a.movingCount = 0
a.movingTotalTime = 0
}
}
func (a *latencyAdjuster) increase() {
p(a.state.colored(arrowUp))
a.state.concurrency++
a.movingCount = 0
a.movingTotalTime = 0
}
func (a *latencyAdjuster) setBarrier(barrier int) {
a.barrier = barrier
p(fmt.Sprintf("[%d]", a.barrier))
}
func (a *latencyAdjuster) adjust(response *Response) {
a.movingCount++
a.errorRequestCount++
if a.errorRequestCount > int(a.errorTolerationPercent)*100 { // resetting window
a.errorRequestCount = int(a.errorTolerationPercent) * 33 // keeping correct moving window is more expensive, raw estimate is good enough
a.errorCount = a.errorCount / 3
}
barrierPenalty := (a.state.concurrency + 1) - a.barrier
sample := int(math.Log(float64(a.state.concurrency)) * logbase)
if barrierPenalty > 0 && a.barrier > 0 {
sample += a.state.concurrency * barrierPenalty
}
if config.adjustOnErrors && response.err != nil {
a.errorCount++
if float64(a.errorCount)/float64(a.errorRequestCount)*100 > a.errorTolerationPercent {
a.errorRequestCount = 0
a.errorCount = 0
if a.barrier > a.state.concurrency || a.barrier == 0 {
a.setBarrier(a.state.concurrency)
}
a.decrease()
return
} else {
a.movingTotalTime += config.maxLatency // Not downscaling, but treating error as max latency
}
} else {
a.movingTotalTime += response.latency
}
if a.movingCount >= sample {
a.movingAvgTime = a.movingTotalTime / time.Duration(a.movingCount)
if a.movingAvgTime >= config.maxLatency/100*(100+thresholdPercent) {
a.decrease()
} else if a.movingAvgTime <= config.maxLatency/100*(100-thresholdPercent) {
if a.state.concurrency < config.maxChannels {
a.increase()
if a.barrier != 0 {
if a.state.concurrency-a.barrier > 4 {
a.setBarrier(0)
}
}
}
}
}
}