-
Notifications
You must be signed in to change notification settings - Fork 2k
/
packet_size.go
210 lines (169 loc) · 6.04 KB
/
packet_size.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
/*
Copyright 2021 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vstreamer
import (
"flag"
"time"
"vitess.io/vitess/go/mathstats"
)
var (
	// defaultPacketSize is the suggested packet size for the VReplication
	// streamer, set through the vstream_packet_size flag.
	defaultPacketSize = flag.Int("vstream_packet_size", 250000, "Suggested packet size for VReplication streamer. This is used only as a recommendation. The actual packet size may be more or less than this amount.")

	// useDynamicPacketSize controls whether dynamic packet size adjustments
	// are used to increase performance while streaming. It is set through the
	// vstream_dynamic_packet_size flag.
	useDynamicPacketSize = flag.Bool("vstream_dynamic_packet_size", true, "Enable dynamic packet sizing for VReplication. This will adjust the packet size during replication to improve performance.")
)
// PacketSizer is a controller that adjusts the size of the packets being sent
// by the vstreamer at runtime.
type PacketSizer interface {
	// Limit returns the packet size the sizer is currently targeting.
	Limit() int
	// ShouldSend reports whether byteCount is large enough to be sent as a
	// packet while streaming.
	ShouldSend(byteCount int) bool
	// Record reports the total duration it took to send byteCount bytes, so
	// that implementations can adapt their packet size.
	Record(byteCount int, duration time.Duration)
}
// DefaultPacketSizer creates a new PacketSizer from the flag-configured
// defaults. It returns a fixed-size sizer when dynamic packet sizing is
// disabled and a dynamicPacketSizer otherwise; in both cases the sizer starts
// from the configured default packet size.
func DefaultPacketSizer() PacketSizer {
	size := *defaultPacketSize
	if !*useDynamicPacketSize {
		return newFixedPacketSize(size)
	}
	return newDynamicPacketSizer(size)
}
// AdjustPacketSize temporarily overrides the default packet size with the
// given value and switches dynamic packet sizing off.
// The returned cleanup function restores both settings to the values they had
// when AdjustPacketSize was called.
// This function is only used for testing.
func AdjustPacketSize(size int) func() {
	savedSize, savedDynamic := *defaultPacketSize, *useDynamicPacketSize
	restore := func() {
		*defaultPacketSize, *useDynamicPacketSize = savedSize, savedDynamic
	}

	*defaultPacketSize, *useDynamicPacketSize = size, false
	return restore
}
// fixedPacketSizer is a PacketSizer whose packet size never changes.
type fixedPacketSizer struct {
	// baseSize is the constant byte count threshold used for every packet.
	baseSize int
}

// newFixedPacketSize returns a PacketSizer that always uses baseSize.
func newFixedPacketSize(baseSize int) PacketSizer {
	sizer := new(fixedPacketSizer)
	sizer.baseSize = baseSize
	return sizer
}

// Limit returns the fixed packet size.
func (ps *fixedPacketSizer) Limit() int {
	return ps.baseSize
}

// ShouldSend checks whether the given byte count is large enough to be sent
// as a packet while streaming, i.e. whether it has reached the fixed limit.
func (ps *fixedPacketSizer) ShouldSend(byteCount int) bool {
	return byteCount >= ps.Limit()
}

// Record records the total duration it took to send the given byte count
// while streaming. The fixed sizer never adapts, so both values are ignored.
func (ps *fixedPacketSizer) Record(_ int, _ time.Duration) {
}
// dynamicPacketSizer is a PacketSizer that experiments with larger packet
// sizes at runtime, driven by the throughput samples passed to Record.
type dynamicPacketSizer struct {
	// currentSize is the last size for the packet that is safe to use.
	currentSize int
	// candidateSize is the target size for packets being tested. It is the
	// value reported by Limit and compared against in ShouldSend.
	candidateSize int
	// grow is the step by which the packet size is increased.
	grow int

	// currentMetrics are the performance metrics for the current size.
	currentMetrics *mathstats.Sample
	// candidateMetrics are the performance metrics for the candidate size.
	candidateMetrics *mathstats.Sample

	// calls is the number of samples recorded since the last reset.
	calls int
	// elapsed is the time elapsed since the last experiment was settled.
	elapsed time.Duration
	// settled is true when the last experiment has finished and arrived at a
	// new target packet size.
	settled bool
}
// newDynamicPacketSizer returns a PacketSizer that starts at baseSize for both
// the current and the candidate packet size, grows in steps of a quarter of
// baseSize (integer division), and begins with empty sample sets.
func newDynamicPacketSizer(baseSize int) PacketSizer {
	sizer := &dynamicPacketSizer{
		currentMetrics:   new(mathstats.Sample),
		candidateMetrics: new(mathstats.Sample),
		grow:             baseSize / 4,
	}
	sizer.currentSize, sizer.candidateSize = baseSize, baseSize
	return sizer
}
// Limit returns the candidate packet size, i.e. the size that is currently
// being tested rather than currentSize.
func (ps *dynamicPacketSizer) Limit() int {
return ps.candidateSize
}
// ShouldSend checks whether the given byte count is large enough to be sent
// as a packet while streaming, i.e. whether it has reached the candidate
// packet size reported by Limit.
func (ps *dynamicPacketSizer) ShouldSend(byteCount int) bool {
	return byteCount >= ps.Limit()
}
// change describes how the throughput measured for the candidate packet size
// compares to the throughput measured for the current packet size.
type change int8
const (
// notChanging means no significant difference was found between the two
// sample sets; changeInThroughput also returns it when the t-test fails.
notChanging change = iota
// gettingFaster means the samples differ significantly and the candidate
// mean is higher than the current mean.
gettingFaster
// gettingSlower means the samples differ significantly and the candidate
// mean is not higher than the current mean.
gettingSlower
)
// changeInThroughput compares the candidate samples against the current
// samples with a two-sample Welch t-test.
//
// It returns notChanging when the test reports an error or when its p-value
// is not below the margin. Otherwise it returns gettingFaster if the
// candidate mean is higher than the current mean, and gettingSlower if not.
func (ps *dynamicPacketSizer) changeInThroughput() change {
	// pValueMargin is the p-value below which the two sample sets are treated
	// as different.
	const pValueMargin = 0.1

	result, err := mathstats.TwoSampleWelchTTest(ps.currentMetrics, ps.candidateMetrics, mathstats.LocationDiffers)
	if err != nil {
		return notChanging
	}
	// Phrased as "not below the margin" so that a NaN p-value also counts as
	// no change.
	if significant := result.P < pValueMargin; !significant {
		return notChanging
	}
	if ps.candidateMetrics.Mean() > ps.currentMetrics.Mean() {
		return gettingFaster
	}
	return gettingSlower
}
// reset prepares the sizer for a new experiment: it zeroes the call counter
// and the elapsed time, marks the sizer as not settled, and clears both
// sample sets. The current and candidate sizes are left untouched.
func (ps *dynamicPacketSizer) reset() {
	ps.calls, ps.elapsed, ps.settled = 0, 0, false
	ps.candidateMetrics.Clear()
	ps.currentMetrics.Clear()
}
// Record records the total duration it took to send the given byte count
// while streaming, and uses the resulting throughput sample (byteCount
// divided by d) to drive the packet size experiment.
//
// Calls with a non-positive d are ignored, because they cannot produce a
// finite, meaningful throughput value.
//
// While the sizer is settled, the durations are only accumulated; once they
// add up to the experiment delay the sizer is reset and a new experiment
// starts with the current call's sample. Otherwise the sample is added to the
// candidate metrics, and every checkFrequency recorded samples the candidate
// is evaluated:
//   - if there are no current metrics yet, the candidate metrics become the
//     current metrics once enough samples have been collected;
//   - if throughput is getting faster, the candidate metrics become the
//     current metrics and the candidate size grows;
//   - otherwise the experiment settles back on currentSize once enough
//     candidate samples exist, and until then the candidate size keeps
//     growing periodically as long as no change is detected.
func (ps *dynamicPacketSizer) Record(byteCount int, d time.Duration) {
	const (
		// experimentDelay is the recorded send time that must accumulate
		// after an experiment settles before the next one starts.
		experimentDelay = 5 * time.Second
		// checkFrequency is the number of recorded samples between two
		// evaluations of the candidate metrics.
		checkFrequency = 16
		// growthFrequency is the number of recorded samples between two
		// candidate size increases while no change is detected.
		growthFrequency = 32
		// initialCandidateLen is the number of filtered samples required
		// before the first batch is adopted as the current metrics.
		initialCandidateLen = 32
		// settleCandidateLen is the number of filtered candidate samples at
		// which an experiment without improvement is ended.
		settleCandidateLen = 64
	)

	// A zero duration would make the division below yield +Inf (or NaN for a
	// zero byte count) and a negative one a negative throughput; such values
	// would distort the sample statistics, so the call is dropped entirely.
	if d <= 0 {
		return
	}

	if ps.settled {
		ps.elapsed += d
		if ps.elapsed < experimentDelay {
			return
		}
		ps.reset()
	}

	ps.calls++
	ps.candidateMetrics.Xs = append(ps.candidateMetrics.Xs, float64(byteCount)/float64(d))
	if ps.calls%checkFrequency != 0 {
		return
	}

	// Xs was appended to directly, so flag the sample as unsorted before
	// asking it to filter outliers.
	ps.candidateMetrics.Sorted = false
	ps.candidateMetrics.FilterOutliers()

	if len(ps.currentMetrics.Xs) == 0 {
		// No baseline yet: adopt the candidate samples as the baseline once
		// there are enough of them, and keep collecting until then.
		if len(ps.candidateMetrics.Xs) >= initialCandidateLen {
			ps.currentMetrics, ps.candidateMetrics = ps.candidateMetrics, ps.currentMetrics
		}
		return
	}

	switch trend := ps.changeInThroughput(); trend {
	case gettingFaster:
		// The candidate samples become the new baseline and the emptied old
		// baseline is reused to collect samples for the next, larger size.
		ps.candidateMetrics, ps.currentMetrics = ps.currentMetrics, ps.candidateMetrics
		ps.candidateMetrics.Clear()

		// NOTE(review): currentSize is set to the size after growing, which
		// has not been measured yet, rather than to the size that was just
		// found to be faster. Confirm this is intended.
		ps.candidateSize += ps.grow
		ps.currentSize = ps.candidateSize

	case notChanging, gettingSlower:
		switch {
		case len(ps.candidateMetrics.Xs) >= settleCandidateLen:
			// Enough samples without an improvement: fall back to the
			// current size and pause experimenting.
			ps.candidateSize = ps.currentSize
			ps.settled = true
		case trend == notChanging && ps.calls%growthFrequency == 0:
			ps.candidateSize += ps.grow
		}
	}
}