Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MESOS: fix race condition in contrib/mesos/pkg/queue/delay #24916

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 26 additions & 2 deletions contrib/mesos/pkg/queue/delay.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package queue

import (
"container/heap"
"runtime"
"sync"
"time"

Expand Down Expand Up @@ -138,6 +139,29 @@ func (q *DelayQueue) Pop() interface{} {
}, nil)
}

func finishWaiting(cond *sync.Cond, waitFinished <-chan struct{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is pretty ugly. But I don't know a better solution right now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree. I'd really like to have something cleaner, but this should stop the
bleeding.

On Fri, Apr 29, 2016 at 8:46 AM, Dr. Stefan Schimanski <
notifications@github.com> wrote:

In contrib/mesos/pkg/queue/delay.go
#24916 (comment)
:

@@ -138,6 +139,29 @@ func (q *DelayQueue) Pop() interface{} {
}, nil)
}

+func finishWaiting(cond *sync.Cond, waitFinished <-chan struct{}) {

this is pretty ugly. But I don't know a better solution right now.


You are receiving this because you authored the thread.
Reply to this email directly or view it on GitHub
https://github.com/kubernetes/kubernetes/pull/24916/files/a74573277f81565b3aeff8c5d4bfe2a3110c1ccd#r61570968

runtime.Gosched()
select {
// avoid creating a timer if we can help it...
case <-waitFinished:
return
default:
const spinTimeout = 100 * time.Millisecond
t := time.NewTimer(spinTimeout)
defer t.Stop()
for {
runtime.Gosched()
cond.Broadcast()
select {
case <-waitFinished:
return
case <-t.C:
t.Reset(spinTimeout)
}
}
}
}

// returns a non-nil value from the queue, or else nil if/when cancelled; if cancel
// is nil then cancellation is disabled and this func must return a non-nil value.
func (q *DelayQueue) pop(next func() *qitem, cancel <-chan struct{}) interface{} {
Expand All @@ -164,6 +188,7 @@ func (q *DelayQueue) pop(next func() *qitem, cancel <-chan struct{}) interface{}
select {
case <-cancel:
item.readd(item)
finishWaiting(&q.cond, ch)
return nil
case <-ch:
// we may no longer have the earliest deadline, re-try
Expand Down Expand Up @@ -353,8 +378,7 @@ func (q *DelayFIFO) pop(cancel <-chan struct{}) interface{} {
// we may not have the lock yet, so
// broadcast to abort Wait, then
// return after lock re-acquisition
q.cond().Broadcast()
<-signal
finishWaiting(q.cond(), signal)
return nil
case <-signal:
// we have the lock, re-check
Expand Down