Uses container/heap for DelayingQueue #45070
Hi @alindeman. Thanks for your PR. I'm waiting for a kubernetes member to verify that this patch is reasonable to test. If it is, they should reply with Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here.
@k8s-bot ok to test
@k8s-bot gce etcd3 e2e test this
/assign @deads2k
	return item
}

func (pq waitForPriorityQueue) Peek() interface{} {
	return pq[0]
Can you write up an example with the index field filled in? My cursory look at push, pop, and peek seems to suggest that we peek at pq[0], pop from pq[n-1], and push onto the end.
I like the change, but I'm confused by what appear to be inconsistent push, pop, and peek operations.
Thanks for the review @deads2k. The structure of the heap is a little confusing at first, I agree. But it is documented that the minimum element is always at index 0:
How would you suggest I best clarify this code? By example, do you mean an example test? I'd be happy to work one up if so.
readyEntries := 0
for _, entry := range q.waitingForAdd {
for waitingForQueue.Len() > 0 {
	entry := waitingForQueue.Peek().(*waitFor)
So you peek at the [0] element here (which I think is the "earliest") and then you add the popped [n-1] element which I think is the latest.
@deads2k Gotcha, I agree this initially looks confusing but I think it's correct. I'll try to explain it as best I can here, but let me know if you think I can clarify or make the code more clear.
- The container/heap docs state that "The minimum element in the tree is the root, at index 0."
- But the container/heap#Interface docs state that the Pop method should "remove and return element Len() - 1." and that the Push method should "add x as element Len()"
Basically, the 0th element of the heap is the least element (in this case, the element with the timestamp that will occur next in time). Therefore that's the element we want to Peek. In the Pop case, though, container/heap will have moved the minimum element to be temporarily at Len() - 1, so we pop off from that end. In the Push case, we add the new element to the 'end' of the heap and the element is percolated up to its correct position by container/heap.
I've created a simple integer-based heap on the playground that might demonstrate this more clearly.
I'd like this comment attached to the peek, push, pop method area.
@deads2k Sounds good. I've added a commit with some documentation. How do you think it reads?
this lgtm @liggitt want a look?
	pq[i].index = i
	pq[j].index = j
}

func (pq *waitForPriorityQueue) Push(x interface{}) {
Document that this is for the heap package to call, and that heap.Push should be called instead?
	item.index = n
	*pq = append(*pq, item)
}

func (pq *waitForPriorityQueue) Pop() interface{} {
Document that this is for the heap package to call, and that heap.Pop should be called instead (especially since it depends on the heap.Pop swap to make sense).
Thanks for the review @liggitt. I've now added some documentation to the functions as well. What do you think?
	for n := 0; n < b.N; n++ {
		data := fmt.Sprintf("%d", n)
		q.AddAfter(data, time.Duration(r.Int63n(int64(10*time.Minute))))
I'd like to see removal (add N items, pop N items, etc) exercised as well... the new impl must sweep every pending item on every Pop() to update indices...
	for q.Len() > 0 {
		_, _ = q.Get()
	}
}
thanks, curious what the effect on the benchmark was (mind updating the description)?
@liggitt 👍 I'm glad you brought it up. I've updated the original description with the new benchmark. It appears to be pretty negligible.
great, thanks
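One way to see why the removal cost can stay small: container/heap documents heap.Pop as O(log n), so index updates happen only for the entries swapped along one root-to-leaf path rather than for every pending item. The sketch below counts Swap calls per heap.Pop. It is not the PR's benchmark; the entry and countingQueue types and the maxSwapsPerPop helper are hypothetical names introduced here.

```go
package main

import (
	"container/heap"
	"fmt"
	"math/rand"
)

// entry is a hypothetical heap element that tracks its own position.
type entry struct {
	priority int64
	index    int
}

// countingQueue is a min-heap that counts how often Swap is called.
type countingQueue struct {
	items []*entry
	swaps int
}

func (q *countingQueue) Len() int { return len(q.items) }
func (q *countingQueue) Less(i, j int) bool {
	return q.items[i].priority < q.items[j].priority
}
func (q *countingQueue) Swap(i, j int) {
	q.items[i], q.items[j] = q.items[j], q.items[i]
	q.items[i].index = i
	q.items[j].index = j
	q.swaps++
}
func (q *countingQueue) Push(x interface{}) {
	e := x.(*entry)
	e.index = len(q.items)
	q.items = append(q.items, e)
}
func (q *countingQueue) Pop() interface{} {
	n := len(q.items)
	e := q.items[n-1]
	q.items = q.items[:n-1]
	return e
}

// maxSwapsPerPop fills a queue with n random priorities, drains it, and
// returns the largest number of Swap calls seen during any one heap.Pop.
func maxSwapsPerPop(n int) int {
	r := rand.New(rand.NewSource(1))
	q := &countingQueue{}
	for i := 0; i < n; i++ {
		heap.Push(q, &entry{priority: r.Int63()})
	}
	worst := 0
	for q.Len() > 0 {
		q.swaps = 0
		heap.Pop(q)
		if q.swaps > worst {
			worst = q.swaps
		}
	}
	return worst
}

func main() {
	for _, n := range []int{1000, 100000} {
		fmt.Printf("n=%d, most swaps in a single Pop: %d\n", n, maxSwapsPerPop(n))
	}
}
```

The worst case grows with the height of the heap, roughly log2(n), which is consistent with the negligible benchmark difference reported above.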
f5e632a to 51dbb74
@k8s-bot pull-kubernetes-federation-e2e-gce test this
/lgtm
@k8s-bot pull-kubernetes-federation-e2e-gce test this
@deads2k Thanks, looks like we're fully green now. Let me know if you have any other feedback.
/approve
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: alindeman, deads2k, liggitt
Needs approval from an approver in each of these OWNERS Files:
You can indicate your approval by writing
Automatic merge from submit-queue
The current implementation of DelayingQueue doesn't perform very well when a large number of items (at random delays) are inserted. The original authors seemed to be aware of this and noted it in a TODO comment. This is my attempt at switching the implementation to use a priority queue based on container/heap.
Benchmarks from before the change:
After:
Comparison:
I also find the container/heap-based code a bit easier to understand. The implementation of the PriorityQueue is based on the documentation for container/heap.
Feedback definitely welcomed. This is one of my first contributions.