Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uses container/heap for DelayingQueue #45070

Merged
merged 2 commits into from
May 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
155 changes: 83 additions & 72 deletions staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package workqueue

import (
"sort"
"container/heap"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -43,13 +43,12 @@ func NewNamedDelayingQueue(name string) DelayingInterface {

func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingTimeByEntry: map[t]time.Time{},
waitingForAddCh: make(chan waitFor, 1000),
metrics: newRetryMetrics(name),
Interface: NewNamed(name),
clock: clock,
heartbeat: clock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(name),
}

go ret.waitingLoop()
Expand All @@ -73,12 +72,8 @@ type delayingType struct {
// clock.Tick will leak.
heartbeat <-chan time.Time

// waitingForAdd is an ordered slice of items to be added to the contained work queue
waitingForAdd []waitFor
// waitingTimeByEntry holds wait time by entry, so we can lookup pre-existing indexes
waitingTimeByEntry map[t]time.Time
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddCh chan waitFor
waitingForAddCh chan *waitFor

// metrics counts the number of retries
metrics retryMetrics
Expand All @@ -88,6 +83,55 @@ type delayingType struct {
type waitFor struct {
data t
readyAt time.Time
// index in the priority queue (heap)
index int
}

// waitForPriorityQueue implements a priority queue for waitFor items.
//
// waitForPriorityQueue implements heap.Interface. The item occuring next in
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
// Peek returns this minimum item at index 0. Pop returns the minimum item after
// it has been removed from the queue and placed at index Len()-1 by
// container/heap. Push adds an item at index Len(), and container/heap
// percolates it into the correct location.
type waitForPriorityQueue []*waitFor

func (pq waitForPriorityQueue) Len() int {
return len(pq)
}
func (pq waitForPriorityQueue) Less(i, j int) bool {
return pq[i].readyAt.Before(pq[j].readyAt)
}
func (pq waitForPriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}

// Push adds an item to the queue. Push should not be called directly; instead,
// use `heap.Push`.
func (pq *waitForPriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*waitFor)
item.index = n
*pq = append(*pq, item)
}

// Pop removes an item from the queue. Pop should not be called directly;
// instead, use `heap.Pop`.
func (pq *waitForPriorityQueue) Pop() interface{} {
n := len(*pq)
item := (*pq)[n-1]
item.index = -1
*pq = (*pq)[0:(n - 1)]
return item
}

// Peek returns the item at the beginning of the queue, without removing the
// item or otherwise mutating the queue. It is safe to call directly.
func (pq waitForPriorityQueue) Peek() interface{} {
return pq[0]
}

// ShutDown gives a way to shut off this queue
Expand All @@ -114,7 +158,7 @@ func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
select {
case <-q.stopCh:
// unblock if ShutDown() is called
case q.waitingForAddCh <- waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
}
}

Expand All @@ -130,32 +174,35 @@ func (q *delayingType) waitingLoop() {
// Make a placeholder channel to use when there are no items in our list
never := make(<-chan time.Time)

waitingForQueue := &waitForPriorityQueue{}
heap.Init(waitingForQueue)

waitingEntryByData := map[t]*waitFor{}

for {
if q.Interface.ShuttingDown() {
// discard waiting entries
q.waitingForAdd = nil
q.waitingTimeByEntry = nil
return
}

now := q.clock.Now()

// Add ready entries
readyEntries := 0
for _, entry := range q.waitingForAdd {
for waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you peek at the [0] element here (which I think is the "earliest") and then you add the popped [n-1] element which I think is the latest.

Copy link
Contributor Author

@alindeman alindeman May 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deads2k Gotcha, I agree this initially looks confusing but I think it's correct. I'll try to explain it as best I can here, but let me know if you think I can clarify or make the code more clear.

  • The container/heap docs state that "The minimum element in the tree is the root, at index 0."
  • But the container/heap#Interface docs state that the Pop method should "remove and return element Len() - 1." and that the Push method should "add x as element Len()"

Basically, the 0th element of the heap is the least element (in this case, the element with the timestamp that will occur next in time). Therefore that's the element we want to Peek. In the Pop case, though, the container/heap will have moved the minimum element to be temporarily at Len() - 1, so we pop off from that end. In the Push case, we add the new element to the 'end' of the heap and the element is percolated up to its correct position by container/heap.

I've created a simple integer-based heap on the playground that might demonstrate this more clearly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like this comment attached to the peek, push, pop method area.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@deads2k Sounds good. I've added a commit with some documentation. How do you think it reads?

if entry.readyAt.After(now) {
break
}

entry = heap.Pop(waitingForQueue).(*waitFor)
q.Add(entry.data)
delete(q.waitingTimeByEntry, entry.data)
readyEntries++
delete(waitingEntryByData, entry.data)
}
q.waitingForAdd = q.waitingForAdd[readyEntries:]

// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt := never
if len(q.waitingForAdd) > 0 {
nextReadyAt = q.clock.After(q.waitingForAdd[0].readyAt.Sub(now))
if waitingForQueue.Len() > 0 {
entry := waitingForQueue.Peek().(*waitFor)
nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
}

select {
Expand All @@ -170,7 +217,7 @@ func (q *delayingType) waitingLoop() {

case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
Expand All @@ -180,7 +227,7 @@ func (q *delayingType) waitingLoop() {
select {
case waitEntry := <-q.waitingForAddCh:
if waitEntry.readyAt.After(q.clock.Now()) {
q.waitingForAdd = insert(q.waitingForAdd, q.waitingTimeByEntry, waitEntry)
insert(waitingForQueue, waitingEntryByData, waitEntry)
} else {
q.Add(waitEntry.data)
}
Expand All @@ -192,55 +239,19 @@ func (q *delayingType) waitingLoop() {
}
}

// inserts the given entry into the sorted entries list
// same semantics as append()... the given slice may be modified,
// and the returned value should be used
//
// TODO: This should probably be converted to use container/heap to improve
// running time for a large number of items.
func insert(entries []waitFor, knownEntries map[t]time.Time, entry waitFor) []waitFor {
// if the entry is already in our retry list and the existing time is before the new one, just skip it
existingTime, exists := knownEntries[entry.data]
if exists && existingTime.Before(entry.readyAt) {
return entries
}

// if the entry exists and is scheduled for later, go ahead and remove the entry
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
// if the entry already exists, update the time only if it would cause the item to be queued sooner
existing, exists := knownEntries[entry.data]
if exists {
if existingIndex := findEntryIndex(entries, existingTime, entry.data); existingIndex >= 0 && existingIndex < len(entries) {
entries = append(entries[:existingIndex], entries[existingIndex+1:]...)
if existing.readyAt.After(entry.readyAt) {
existing.readyAt = entry.readyAt
heap.Fix(q, existing.index)
}
}

insertionIndex := sort.Search(len(entries), func(i int) bool {
return entry.readyAt.Before(entries[i].readyAt)
})

// grow by 1
entries = append(entries, waitFor{})
// shift items from the insertion point to the end
copy(entries[insertionIndex+1:], entries[insertionIndex:])
// insert the record
entries[insertionIndex] = entry

knownEntries[entry.data] = entry.readyAt

return entries
}

// findEntryIndex returns the index for an existing entry
func findEntryIndex(entries []waitFor, existingTime time.Time, data t) int {
index := sort.Search(len(entries), func(i int) bool {
return entries[i].readyAt.After(existingTime) || existingTime == entries[i].readyAt
})

// we know this is the earliest possible index, but there could be multiple with the same time
// iterate from here to find the dupe
for ; index < len(entries); index++ {
if entries[index].data == data {
break
}
return
}

return index
heap.Push(q, entry)
knownEntries[entry.data] = entry
}
20 changes: 20 additions & 0 deletions staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package workqueue

import (
"fmt"
"math/rand"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -214,6 +215,25 @@ func TestCopyShifting(t *testing.T) {
}
}

func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
r := rand.New(rand.NewSource(time.Now().Unix()))

fakeClock := clock.NewFakeClock(time.Now())
q := newDelayingQueue(fakeClock, "")

// Add items
for n := 0; n < b.N; n++ {
data := fmt.Sprintf("%d", n)
q.AddAfter(data, time.Duration(r.Int63n(int64(10*time.Minute))))
}

// Exercise item removal as well
fakeClock.Step(11 * time.Minute)
for n := 0; n < b.N; n++ {
_, _ = q.Get()
}
}

func waitForAdded(q DelayingInterface, depth int) error {
return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
if q.Len() == depth {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestRateLimitingQueue(t *testing.T) {
clock: fakeClock,
heartbeat: fakeClock.Tick(maxWait),
stopCh: make(chan struct{}),
waitingForAddCh: make(chan waitFor, 1000),
waitingForAddCh: make(chan *waitFor, 1000),
metrics: newRetryMetrics(""),
}
queue.DelayingInterface = delayingQueue
Expand Down