Skip to content

Commit

Permalink
Merge pull request #95986 from yue9944882/max-min-fairness
Browse files Browse the repository at this point in the history
Mitigate wind-up problem in AP&F: prevent queue virtualStart lag
  • Loading branch information
k8s-ci-robot committed Nov 5, 2020
2 parents f3fbd17 + fd889ec commit c0e88a3
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 67 deletions.
Expand Up @@ -645,6 +645,7 @@ func (qs *queueSet) selectQueueLocked() *queue {
qs.robinIndex = (qs.robinIndex + 1) % nq
queue := qs.queues[qs.robinIndex]
if len(queue.requests) != 0 {

currentVirtualFinish := queue.GetVirtualFinish(0, qs.estimatedServiceTime)
if currentVirtualFinish < minVirtualFinish {
minVirtualFinish = currentVirtualFinish
Expand All @@ -657,6 +658,23 @@ func (qs *queueSet) selectQueueLocked() *queue {
// for the next round. This way the non-selected queues
// win in the case that the virtual finish times are the same
qs.robinIndex = minIndex
// according to the original FQ formula:
//
// Si = MAX(R(t), Fi-1)
//
// the virtual start (excluding the estimated cost) of the chose
// queue should always be greater or equal to the global virtual
// time.
//
// hence we're refreshing the per-queue virtual time for the chosen
// queue here. if the last virtual start time (excluded estimated cost)
// falls behind the global virtual time, we update the latest virtual
// start by: <latest global virtual time> + <previously estimated cost>
previouslyEstimatedServiceTime := float64(minQueue.requestsExecuting) * qs.estimatedServiceTime
if qs.virtualTime > minQueue.virtualStart-previouslyEstimatedServiceTime {
// per-queue virtual time should not fall behind the global
minQueue.virtualStart = qs.virtualTime + previouslyEstimatedServiceTime
}
return minQueue
}

Expand Down
Expand Up @@ -125,7 +125,8 @@ type uniformScenario struct {
clients []uniformClient
concurrencyLimit int
evalDuration time.Duration
expectFair []bool
expectedFair []bool
expectedFairnessMargin []float64
expectAllRequests bool
evalInqueueMetrics, evalExecutingMetrics bool
rejectReason string
Expand Down Expand Up @@ -182,9 +183,9 @@ func (uss *uniformScenarioState) exercise() {
}
}
if uss.doSplit {
uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectFair[0])
uss.evalTo(uss.startTime.Add(uss.evalDuration/2), false, uss.expectedFair[0], uss.expectedFairnessMargin[0])
}
uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectFair[len(uss.expectFair)-1])
uss.evalTo(uss.startTime.Add(uss.evalDuration), true, uss.expectedFair[len(uss.expectedFair)-1], uss.expectedFairnessMargin[len(uss.expectedFairnessMargin)-1])
uss.clk.Run(nil)
uss.finalReview()
}
Expand Down Expand Up @@ -252,7 +253,7 @@ func (ust *uniformScenarioThread) callK(k int) {
}
}

func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool) {
func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool, margin float64) {
uss.clk.Run(&lim)
uss.clk.SetTime(lim)
if uss.doSplit && !last {
Expand All @@ -275,10 +276,11 @@ func (uss *uniformScenarioState) evalTo(lim time.Time, last, expectFair bool) {
var gotFair bool
if fairAverages[i] > 0 {
relDiff := (averages[i] - fairAverages[i]) / fairAverages[i]
gotFair = math.Abs(relDiff) <= 0.1
gotFair = math.Abs(relDiff) <= margin
} else {
gotFair = math.Abs(averages[i]) <= 0.1
gotFair = math.Abs(averages[i]) <= margin
}

if gotFair != expectFair {
uss.t.Errorf("%s client %d last=%v got an Average of %v but the fair average was %v", uss.name, i, last, averages[i], fairAverages[i])
} else {
Expand Down Expand Up @@ -371,12 +373,13 @@ func TestNoRestraint(t *testing.T) {
{1001001001, 5, 10, time.Second, time.Second, false},
{2002002002, 2, 10, time.Second, time.Second / 2, false},
},
concurrencyLimit: 10,
evalDuration: time.Second * 15,
expectFair: []bool{true},
expectAllRequests: true,
clk: clk,
counter: counter,
concurrencyLimit: 10,
evalDuration: time.Second * 15,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectAllRequests: true,
clk: clk,
counter: counter,
}.exercise(t)
}

Expand Down Expand Up @@ -405,14 +408,15 @@ func TestUniformFlowsHandSize1(t *testing.T) {
{1001001001, 8, 20, time.Second, time.Second - 1, false},
{2002002002, 8, 20, time.Second, time.Second - 1, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 50,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 4,
evalDuration: time.Second * 50,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}

Expand Down Expand Up @@ -440,14 +444,15 @@ func TestUniformFlowsHandSize3(t *testing.T) {
{1001001001, 8, 30, time.Second, time.Second - 1, false},
{2002002002, 8, 30, time.Second, time.Second - 1, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 60,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 4,
evalDuration: time.Second * 60,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}

Expand Down Expand Up @@ -476,14 +481,15 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
{1001001001, 8, 20, time.Second, time.Second, false},
{2002002002, 7, 30, time.Second, time.Second / 2, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 40,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 4,
evalDuration: time.Second * 40,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}

Expand Down Expand Up @@ -512,14 +518,15 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
{1001001001, 4, 20, time.Second, time.Second - 1, false},
{2002002002, 2, 20, time.Second, time.Second - 1, false},
},
concurrencyLimit: 3,
evalDuration: time.Second * 20,
expectFair: []bool{true},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 3,
evalDuration: time.Second * 20,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}

Expand Down Expand Up @@ -547,14 +554,15 @@ func TestWindup(t *testing.T) {
{1001001001, 2, 40, time.Second, -1, false},
{2002002002, 2, 40, time.Second, -1, true},
},
concurrencyLimit: 3,
evalDuration: time.Second * 40,
expectFair: []bool{true, false},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
concurrencyLimit: 3,
evalDuration: time.Second * 40,
expectedFair: []bool{true, true},
expectedFairnessMargin: []float64{0.1, 0.26},
expectAllRequests: true,
evalInqueueMetrics: true,
evalExecutingMetrics: true,
clk: clk,
counter: counter,
}.exercise(t)
}

Expand All @@ -580,13 +588,14 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
{1001001001, 6, 10, time.Second, 57 * time.Millisecond, false},
{2002002002, 4, 15, time.Second, 750 * time.Millisecond, false},
},
concurrencyLimit: 4,
evalDuration: time.Second * 13,
expectFair: []bool{false},
evalExecutingMetrics: true,
rejectReason: "concurrency-limit",
clk: clk,
counter: counter,
concurrencyLimit: 4,
evalDuration: time.Second * 13,
expectedFair: []bool{false},
expectedFairnessMargin: []float64{0.1},
evalExecutingMetrics: true,
rejectReason: "concurrency-limit",
clk: clk,
counter: counter,
}.exercise(t)
}

Expand Down Expand Up @@ -614,14 +623,15 @@ func TestTimeout(t *testing.T) {
clients: []uniformClient{
{1001001001, 5, 100, time.Second, time.Second, false},
},
concurrencyLimit: 1,
evalDuration: time.Second * 10,
expectFair: []bool{true},
evalInqueueMetrics: true,
evalExecutingMetrics: true,
rejectReason: "time-out",
clk: clk,
counter: counter,
concurrencyLimit: 1,
evalDuration: time.Second * 10,
expectedFair: []bool{true},
expectedFairnessMargin: []float64{0.1},
evalInqueueMetrics: true,
evalExecutingMetrics: true,
rejectReason: "time-out",
clk: clk,
counter: counter,
}.exercise(t)
}

Expand Down

0 comments on commit c0e88a3

Please sign in to comment.