Skip to content

Commit

Permalink
Merge pull request #81182 from MikeSpreitzer/sharding-redux
Browse files Browse the repository at this point in the history
[FEATURE BRANCH] More work on shuffle sharding utils
  • Loading branch information
k8s-ci-robot committed Aug 21, 2019
2 parents 4eb996e + e5c9f50 commit 57e217a
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 101 deletions.
Expand Up @@ -16,6 +16,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apiserver/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/shufflesharding:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
Expand Down
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"k8s.io/apiserver/pkg/util/clock"
"k8s.io/apiserver/pkg/util/shufflesharding"
"k8s.io/klog"
)

Expand Down Expand Up @@ -222,30 +223,6 @@ func (qs *queueSetImpl) GetRequestsExecuting() int {
return total
}

func shuffleDealAndPick(v, nq uint64,
lengthOfQueue func(int) int,
mr func(int /*in [0, nq-1]*/) int, /*in [0, numQueues-1] and excluding previously determined members of I*/
nRem, minLen, bestIdx int) int {
if nRem < 1 {
return bestIdx
}
vNext := v / nq
ai := int(v - nq*vNext)
ii := mr(ai)
mrNext := func(a int /*in [0, nq-2]*/) int /*in [0, numQueues-1] and excluding I[0], I[1], ... ii*/ {
if a < ai {
return mr(a)
}
return mr(a + 1)
}
lenI := lengthOfQueue(ii)
if lenI < minLen {
minLen = lenI
bestIdx = ii
}
return shuffleDealAndPick(vNext, nq-1, lengthOfQueue, mrNext, nRem-1, minLen, bestIdx)
}

// ChooseQueueIdx uses shuffle sharding to select an queue index
// using a 'hashValue'. The 'hashValue' derives a hand from a set range of
// indexes (range 'desiredNumQueues') and returns the queue with the least queued packets
Expand All @@ -254,10 +231,16 @@ func (qs *queueSetImpl) ChooseQueueIdx(hashValue uint64, handSize int) int {
// TODO(aaron-prindle) currently a lock is held for this in a larger anonymous function
// verify that makes sense...

bestQueueIdx := -1
bestQueueLen := int(math.MaxInt32)
// desiredNumQueues is used here instead of numQueues to omit quiesce queues
return shuffleDealAndPick(hashValue, uint64(qs.desiredNumQueues),
func(idx int) int { return len(qs.queues[idx].Requests) },
func(i int) int { return i }, handSize, math.MaxInt32, -1)
shufflesharding.ShuffleAndDeal(hashValue, qs.desiredNumQueues, handSize, func(queueIdx int) {
thisLen := len(qs.queues[queueIdx].Requests)
if thisLen < bestQueueLen {
bestQueueIdx, bestQueueLen = queueIdx, thisLen
}
})
return bestQueueIdx
}

// rejectOrEnqueue rejects or enqueues the newly arrived request if
Expand Down
Expand Up @@ -18,37 +18,51 @@ package shufflesharding

import (
"errors"
"fmt"
"math"
"strings"
)

const maxHashBits = 60

// ValidateParameters can validate parameters for shuffle sharding
// in a fast but approximate way, including deckSize and handSize
// Algorithm: maxHashBits >= bits(deckSize^handSize)
func ValidateParameters(deckSize, handSize int32) bool {
if handSize <= 0 || deckSize <= 0 || handSize > deckSize {
return false
// ValidateParameters finds errors in the parameters for shuffle
// sharding. Returns a slice for which `len()` is 0 if and only if
// there are no errors. The entropy requirement is evaluated in a
// fast but approximate way: bits(deckSize^handSize).
func ValidateParameters(deckSize, handSize int) (errs []string) {
if handSize <= 0 {
errs = append(errs, "handSize is not positive")
}

return math.Log2(float64(deckSize))*float64(handSize) <= maxHashBits
if deckSize <= 0 {
errs = append(errs, "deckSize is not positive")
}
if len(errs) > 0 {
return
}
if handSize > deckSize {
return []string{"handSize is greater than deckSize"}
}
if math.Log2(float64(deckSize))*float64(handSize) > maxHashBits {
return []string{fmt.Sprintf("more than %d bits of entropy required", maxHashBits)}
}
return
}

// ShuffleAndDeal can shuffle a hash value to handSize-quantity and non-redundant
// indices of decks, with the pick function, we can get the optimal deck index
// Eg. From deckSize=128, handSize=8, we can get an index array [12 14 73 18 119 51 117 26],
// then pick function will choose the optimal index from these
// Algorithm: https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/20190228-priority-and-fairness.md#queue-assignment-proof-of-concept
func ShuffleAndDeal(hashValue uint64, deckSize, handSize int32, pick func(int32)) {
remainders := make([]int32, handSize)
func ShuffleAndDeal(hashValue uint64, deckSize, handSize int, pick func(int)) {
remainders := make([]int, handSize)

for i := int32(0); i < handSize; i++ {
for i := 0; i < handSize; i++ {
hashValueNext := hashValue / uint64(deckSize-i)
remainders[i] = int32(hashValue - uint64(deckSize-i)*hashValueNext)
remainders[i] = int(hashValue - uint64(deckSize-i)*hashValueNext)
hashValue = hashValueNext
}

for i := int32(0); i < handSize; i++ {
for i := 0; i < handSize; i++ {
candidate := remainders[i]
for j := i; j > 0; j-- {
if candidate >= remainders[j-1] {
Expand All @@ -60,9 +74,9 @@ func ShuffleAndDeal(hashValue uint64, deckSize, handSize int32, pick func(int32)
}

// ShuffleAndDealWithValidation will do validation before ShuffleAndDeal
func ShuffleAndDealWithValidation(hashValue uint64, deckSize, handSize int32, pick func(int32)) error {
if !ValidateParameters(deckSize, handSize) {
return errors.New("bad parameters")
func ShuffleAndDealWithValidation(hashValue uint64, deckSize, handSize int, pick func(int)) error {
if errs := ValidateParameters(deckSize, handSize); len(errs) > 0 {
return errors.New(strings.Join(errs, ";"))
}

ShuffleAndDeal(hashValue, deckSize, handSize, pick)
Expand All @@ -71,14 +85,14 @@ func ShuffleAndDealWithValidation(hashValue uint64, deckSize, handSize int32, pi

// ShuffleAndDealToSlice will use specific pick function to return slices of indices
// after ShuffleAndDeal
func ShuffleAndDealToSlice(hashValue uint64, deckSize, handSize int32) []int32 {
func ShuffleAndDealToSlice(hashValue uint64, deckSize, handSize int) []int {
var (
candidates = make([]int32, handSize)
candidates = make([]int, handSize)
idx = 0
)

pickToSlices := func(can int32) {
candidates[idx] = can
pickToSlices := func(can int) {
candidates[idx] = int(can)
idx++
}

Expand All @@ -87,11 +101,33 @@ func ShuffleAndDealToSlice(hashValue uint64, deckSize, handSize int32) []int32 {
return candidates
}

// ShuffleAndDealIntoHand shuffles a deck of the given size by the
// given hash value and deals cards into the given slice. The virtue
// of this function compared to ShuffleAndDealToSlice is that the
// caller provides the storage for the hand.
func ShuffleAndDealIntoHand(hashValue uint64, deckSize int, hand []int) {
handSize := len(hand)
var idx int
ShuffleAndDeal(hashValue, deckSize, handSize, func(card int) {
hand[idx] = int(card)
idx++
})
}

// ShuffleAndDealToSliceWithValidation will do validation before ShuffleAndDealToSlice
func ShuffleAndDealToSliceWithValidation(hashValue uint64, deckSize, handSize int32) ([]int32, error) {
if !ValidateParameters(deckSize, handSize) {
return nil, errors.New("bad parameters")
func ShuffleAndDealToSliceWithValidation(hashValue uint64, deckSize, handSize int) ([]int, error) {
if errs := ValidateParameters(deckSize, handSize); len(errs) > 0 {
return nil, errors.New(strings.Join(errs, ";"))
}

return ShuffleAndDealToSlice(hashValue, deckSize, handSize), nil
}

// ShuffleAndDealIntoHandWithValidation does validation and then ShuffleAndDealIntoHand
func ShuffleAndDealIntoHandWithValidation(hashValue uint64, deckSize int, hand []int) error {
if errs := ValidateParameters(deckSize, len(hand)); len(errs) > 0 {
return errors.New(strings.Join(errs, ";"))
}
ShuffleAndDealIntoHand(hashValue, deckSize, hand)
return nil
}

0 comments on commit 57e217a

Please sign in to comment.