Skip to content

Commit

Permalink
mirror feature changes from ISSUE-159 on upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
platinummonkey committed Aug 25, 2020
1 parent 735d888 commit b4d3985
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 1 deletion.
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -75,6 +76,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
Expand All @@ -101,6 +103,7 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
23 changes: 22 additions & 1 deletion limit/vegas.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (
// For traditional TCP Vegas alpha is typically 2-3 and beta is typically 4-6. To allow for better growth and stability
// at higher limits we set alpha=Max(3, 10% of the current limit) and beta=Max(6, 20% of the current limit).
type VegasLimit struct {
initialLimit float64
estimatedLimit float64
maxLimit int
rttNoLoad core.MeasurementInterface
smoothing float64
bufferFactor float64
pauseUpdateUntil int64
alphaFunc func(estimatedLimit int) int
betaFunc func(estimatedLimit int) int
thresholdFunc func(estimatedLimit int) int
Expand Down Expand Up @@ -54,6 +57,7 @@ func NewDefaultVegasLimit(
nil,
-1,
-1,
0,
nil,
nil,
nil,
Expand All @@ -80,6 +84,7 @@ func NewDefaultVegasLimitWithLimit(
nil,
-1,
-1,
0,
nil,
nil,
nil,
Expand All @@ -99,6 +104,7 @@ func NewVegasLimitWithRegistry(
rttNoLoad core.MeasurementInterface,
maxConcurrency int,
smoothing float64,
bufferFactor float64,
alphaFunc func(estimatedLimit int) int,
betaFunc func(estimatedLimit int) int,
thresholdFunc func(estimatedLimit int) int,
Expand All @@ -125,6 +131,10 @@ func NewVegasLimitWithRegistry(
smoothing = 1.0
}

if bufferFactor < 0 {
bufferFactor = 0
}

if probeMultiplier <= 0 {
probeMultiplier = 30
}
Expand Down Expand Up @@ -166,6 +176,7 @@ func NewVegasLimitWithRegistry(
}

l := &VegasLimit{
initialLimit: float64(initialLimit),
estimatedLimit: float64(initialLimit),
maxLimit: maxConcurrency,
alphaFunc: alphaFunc,
Expand All @@ -174,6 +185,7 @@ func NewVegasLimitWithRegistry(
increaseFunc: increaseFunc,
decreaseFunc: decreaseFunc,
smoothing: smoothing,
bufferFactor: bufferFactor,
probeMultipler: probeMultiplier,
probeJitter: newProbeJitter(),
probeCount: 0,
Expand Down Expand Up @@ -229,6 +241,10 @@ func (l *VegasLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop
l.probeJitter = newProbeJitter()
l.probeCount = 0
l.rttNoLoad.Add(float64(rtt))
if l.bufferFactor > 0 {
l.pauseUpdateUntil = startTime + rtt + int64(float64(rtt)*(l.bufferFactor/(l.bufferFactor+1)))
l.estimatedLimit = math.Max(l.initialLimit, math.Ceil(l.estimatedLimit/(l.bufferFactor+1)))
}
return
}

Expand All @@ -239,6 +255,11 @@ func (l *VegasLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop
}

l.rttSampleListener.AddSample(l.rttNoLoad.Get())

if l.pauseUpdateUntil != 0 && l.pauseUpdateUntil > startTime {
return
}

l.updateEstimatedLimit(startTime, rtt, inFlight, didDrop)
}

Expand All @@ -247,7 +268,7 @@ func (l *VegasLimit) shouldProbe() bool {
}

func (l *VegasLimit) updateEstimatedLimit(startTime int64, rtt int64, inFlight int, didDrop bool) {
queueSize := int(math.Ceil(l.estimatedLimit * (1 - l.rttNoLoad.Get()/float64(rtt))))
queueSize := int(math.Ceil(l.estimatedLimit * (1 - (l.rttNoLoad.Get()*(l.bufferFactor+1))/float64(rtt))))

var newLimit float64
// Treat any drop (i.e timeout) as needing to reduce the limit
Expand Down
96 changes: 96 additions & 0 deletions limit/vegas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func createVegasLimit() *VegasLimit {
nil,
20,
1.0,
0,
functions.FixedQueueSizeFunc(3),
functions.FixedQueueSizeFunc(6),
nil,
Expand Down Expand Up @@ -92,6 +93,7 @@ func TestVegasLimit(t *testing.T) {
nil,
200,
0.5,
0,
nil,
nil,
nil,
Expand Down Expand Up @@ -125,6 +127,7 @@ func TestVegasLimit(t *testing.T) {
nil,
200,
-1,
0,
nil,
nil,
nil,
Expand All @@ -148,4 +151,97 @@ func TestVegasLimit(t *testing.T) {
l.OnSample(20, (time.Millisecond * 20).Nanoseconds(), 100, false)
asrt.Equal(25, l.EstimatedLimit())
})

t.Run("DecreaseLimitWithBufferFactor", func(t2 *testing.T) {
t2.Parallel()
asrt := assert.New(t2)
l := NewVegasLimitWithRegistry(
"test",
10,
nil,
200,
-1,
1.0,
nil,
nil,
nil,
nil,
func(estimatedLimit float64) float64 {
return estimatedLimit / 2.0
},
0,
NoopLimitLogger{},
core.EmptyMetricRegistryInstance)

// Pick up first min-rtt
l.OnSample(0, (time.Millisecond * 10).Nanoseconds(), 10, false)
asrt.Equal(10, l.EstimatedLimit())

// First decrease
l.OnSample(10, (time.Millisecond * 50).Nanoseconds(), 11, false)
asrt.Equal(10, l.EstimatedLimit())
})

t.Run("NoChangeIfWithinThresholdsWithBuffer", func(t2 *testing.T) {
t2.Parallel()
asrt := assert.New(t2)
l := NewVegasLimitWithRegistry(
"test",
10,
nil,
200,
-1,
1.0,
nil,
nil,
nil,
nil,
func(estimatedLimit float64) float64 {
return estimatedLimit / 2.0
},
0,
NoopLimitLogger{},
core.EmptyMetricRegistryInstance)

// Pick up first min-rtt
l.OnSample(0, (time.Millisecond * 10).Nanoseconds(), 10, false)
asrt.Equal(10, l.EstimatedLimit())

// First decrease
l.OnSample(10, (time.Millisecond * 5).Nanoseconds(), 14, false)
asrt.Equal(10, l.EstimatedLimit())
})

t.Run("PauseUpdateWhenProbeWithBuffer", func(t2 *testing.T) {
t2.Parallel()
asrt := assert.New(t2)
listener := testNotifyListener{}
l := NewVegasLimitWithRegistry(
"test",
10,
nil,
200,
-1,
1.0,
nil,
nil,
nil,
nil,
func(estimatedLimit float64) float64 {
return estimatedLimit / 2.0
},
0,
NoopLimitLogger{},
core.EmptyMetricRegistryInstance)

l.NotifyOnChange(listener.updater())

for i := 1; i < 600; i++ {
l.OnSample(0, (time.Millisecond * 10).Nanoseconds(), 100, false)
}
asrt.Equal(16, listener.changes[0])
asrt.Equal(22, listener.changes[1])
asrt.Equal(28, listener.changes[2])
})

}
1 change: 1 addition & 0 deletions patterns/pool/fixed_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (p *FixedPool) Limit() int {
return p.limit
}

// Ordering returns the Ordering for the given pool
func (p *FixedPool) Ordering() Ordering {
return p.ordering
}
Expand Down

0 comments on commit b4d3985

Please sign in to comment.