Skip to content

Commit

Permalink
Merge branch 'vegas_limit_improvements'
Browse files Browse the repository at this point in the history
  • Loading branch information
Cody Lee committed Oct 4, 2018
2 parents f8afff6 + a796bd6 commit f8fe453
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 27 deletions.
6 changes: 3 additions & 3 deletions limit/gradient.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type GradientLimit struct {
}

func nextProbeCountdown(probeInterval int) int {
if probeInterval == LimitProbeDisabled {
return LimitProbeDisabled
if probeInterval == ProbeDisabled {
return ProbeDisabled
}
return probeInterval + rand.Int()
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (l *GradientLimit) OnSample(startTime int64, rtt int64, inFlight int, didDr
// Reset or probe for a new noload RTT and a new estimatedLimit. It's necessary to cut the limit
// in half to avoid having the limit drift upwards when the RTT is probed during heavy load.
// To avoid decreasing the limit too much we don't allow it to go lower than the queueSize.
if l.probeInterval != LimitProbeDisabled {
if l.probeInterval != ProbeDisabled {
l.resetRTTCounter--
if l.resetRTTCounter <= 0 {
l.resetRTTCounter = nextProbeCountdown(l.probeInterval)
Expand Down
2 changes: 1 addition & 1 deletion limit/gradient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestGradientLimit(t *testing.T) {
t.Run("nextProbeInterval", func(t2 *testing.T) {
t2.Parallel()
asrt := assert.New(t2)
asrt.Equal(LimitProbeDisabled, nextProbeCountdown(LimitProbeDisabled))
asrt.Equal(ProbeDisabled, nextProbeCountdown(ProbeDisabled))
asrt.True(nextProbeCountdown(1) > 0)
})

Expand Down
63 changes: 40 additions & 23 deletions limit/vegas.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/platinummonkey/go-concurrency-limits/core"
"github.com/platinummonkey/go-concurrency-limits/limit/functions"
"github.com/platinummonkey/go-concurrency-limits/measurements"
)

// VegasLimit implements a Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is
Expand All @@ -21,7 +22,7 @@ import (
type VegasLimit struct {
estimatedLimit float64
maxLimit int
rttNoLoad int64
rttNoLoad core.MeasurementInterface
smoothing float64
alphaFunc func(estimatedLimit int) int
betaFunc func(estimatedLimit int) int
Expand All @@ -31,7 +32,8 @@ type VegasLimit struct {
rttSampleListener core.MetricSampleListener
commonSampler *core.CommonMetricSampler
probeMultipler int
probeCountdown int
probeJitter float64
probeCount int64

listeners []core.LimitChangeListener
registry core.MetricRegistry
Expand All @@ -49,6 +51,7 @@ func NewDefaultVegasLimit(
return NewVegasLimitWithRegistry(
name,
-1,
nil,
-1,
-1,
nil,
Expand All @@ -74,6 +77,7 @@ func NewDefaultVegasLimitWithLimit(
return NewVegasLimitWithRegistry(
name,
initialLimit,
nil,
-1,
-1,
nil,
Expand All @@ -92,6 +96,7 @@ func NewDefaultVegasLimitWithLimit(
func NewVegasLimitWithRegistry(
name string,
initialLimit int,
rttNoLoad core.MeasurementInterface,
maxConcurrency int,
smoothing float64,
alphaFunc func(estimatedLimit int) int,
Expand All @@ -107,12 +112,19 @@ func NewVegasLimitWithRegistry(
if initialLimit < 1 {
initialLimit = 20
}

if rttNoLoad == nil {
rttNoLoad = &measurements.MinimumMeasurement{}
}

if maxConcurrency < 0 {
maxConcurrency = 1000
}

if smoothing < 0 || smoothing > 1.0 {
smoothing = 1.0
}

if probeMultiplier <= 0 {
probeMultiplier = 30
}
Expand Down Expand Up @@ -163,7 +175,9 @@ func NewVegasLimitWithRegistry(
decreaseFunc: decreaseFunc,
smoothing: smoothing,
probeMultipler: probeMultiplier,
probeCountdown: nextVegasProbeCountdown(probeMultiplier, float64(initialLimit)),
probeJitter: newProbeJitter(),
probeCount: 0,
rttNoLoad: rttNoLoad,
rttSampleListener: registry.RegisterDistribution(core.PrefixMetricWithName(core.MetricMinRTT, name), tags...),
listeners: make([]core.LimitChangeListener, 0),
registry: registry,
Expand All @@ -174,12 +188,11 @@ func NewVegasLimitWithRegistry(
return l
}

// LimitProbeDisabled represents the disabled value for probing.
const LimitProbeDisabled = -1
// ProbeDisabled represents the disabled value for probing.
const ProbeDisabled = -1

func nextVegasProbeCountdown(probeMultiplier int, estimatedLimit float64) int {
maxRange := int(float64(probeMultiplier)*estimatedLimit) / 2
return rand.Intn(maxRange) + maxRange // return roughly [maxVal / 2, maxVal]
func newProbeJitter() float64 {
return (rand.Float64() / 2.0) + 0.5
}

// EstimatedLimit returns the current estimated limit.
Expand Down Expand Up @@ -209,28 +222,32 @@ func (l *VegasLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop
defer l.mu.Unlock()
l.commonSampler.Sample(rtt, inFlight, didDrop)

if l.probeCountdown != LimitProbeDisabled {
l.probeCountdown--
if l.probeCountdown <= 0 {
l.logger.Debugf("probe MinRTT %d", rtt/1e6)
l.probeCountdown = nextVegasProbeCountdown(l.probeMultipler, l.estimatedLimit)
l.rttNoLoad = rtt
return
}
l.probeCount++
if l.shouldProbe() {
l.logger.Debugf("Probe triggered update to RTT No Load %d ms from %d ms",
rtt/1e6, int64(l.rttNoLoad.Get())/1e6)
l.probeJitter = newProbeJitter()
l.probeCount = 0
l.rttNoLoad.Add(float64(rtt))
return
}

if l.rttNoLoad == 0 || rtt < l.rttNoLoad {
l.logger.Debugf("New MinRTT %d", rtt/1e6)
l.rttNoLoad = rtt
if l.rttNoLoad.Get() == 0 || float64(rtt) < l.rttNoLoad.Get() {
l.logger.Debugf("Update RTT No Load to %d ms from %d ms", rtt/1e6, int64(l.rttNoLoad.Get())/1e6)
l.rttNoLoad.Add(float64(rtt))
return
}

l.rttSampleListener.AddSample(float64(l.rttNoLoad))
l.rttSampleListener.AddSample(l.rttNoLoad.Get())
l.updateEstimatedLimit(startTime, rtt, inFlight, didDrop)
}

func (l *VegasLimit) shouldProbe() bool {
return int64(l.probeJitter*float64(l.probeMultipler)*l.estimatedLimit) <= l.probeCount
}

func (l *VegasLimit) updateEstimatedLimit(startTime int64, rtt int64, inFlight int, didDrop bool) {
queueSize := int(math.Ceil(l.estimatedLimit * (1 - float64(l.rttNoLoad)/float64(rtt))))
queueSize := int(math.Ceil(l.estimatedLimit * (1 - l.rttNoLoad.Get()/float64(rtt))))

var newLimit float64
// Treat any drop (i.e timeout) as needing to reduce the limit
Expand Down Expand Up @@ -264,7 +281,7 @@ func (l *VegasLimit) updateEstimatedLimit(startTime int64, rtt int64, inFlight i

if int(newLimit) != int(l.estimatedLimit) && l.logger.IsDebugEnabled() {
l.logger.Debugf("New limit=%d, minRTT=%d ms, winRTT=%d ms, queueSize=%d",
int(newLimit), l.rttNoLoad/1e6, rtt/1e6, queueSize)
int(newLimit), int64(l.rttNoLoad.Get())/1e6, rtt/1e6, queueSize)
}

l.estimatedLimit = newLimit
Expand All @@ -275,7 +292,7 @@ func (l *VegasLimit) updateEstimatedLimit(startTime int64, rtt int64, inFlight i
func (l *VegasLimit) RTTNoLoad() int64 {
l.mu.RLock()
defer l.mu.RUnlock()
return l.rttNoLoad
return int64(l.rttNoLoad.Get())
}

func (l *VegasLimit) String() string {
Expand Down
3 changes: 3 additions & 0 deletions limit/vegas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func createVegasLimit() *VegasLimit {
return NewVegasLimitWithRegistry(
"test",
10,
nil,
20,
1.0,
functions.FixedQueueSizeFunc(3),
Expand Down Expand Up @@ -88,6 +89,7 @@ func TestVegasLimit(t *testing.T) {
l := NewVegasLimitWithRegistry(
"test",
100,
nil,
200,
0.5,
nil,
Expand Down Expand Up @@ -120,6 +122,7 @@ func TestVegasLimit(t *testing.T) {
l := NewVegasLimitWithRegistry(
"test",
100,
nil,
200,
-1,
nil,
Expand Down

0 comments on commit f8fe453

Please sign in to comment.