Skip to content

Commit

Permalink
Corrected minor race condition in Get()
Browse files Browse the repository at this point in the history
Occasionally, while one process executing on one CPU was calling
`Complete()` and another was calling `Get()`, the one calling `Get()`
would observe the value of the `state` as `PENDING`.

During this time, another process on another CPU would invoke
`Complete()` which would change the state and update the waitgroup as
`Done()`. After this occurred, the earlier goroutine would continue
and invoke `Wait()` on the waitgroup — whose state would thereafter
never change.

This has been corrected, by a two phase approach. First, all access to
the state has been moved to the `atomic` package so it is delegated to
the `CMPXCHG` and `CMOVE` instructions — ensuring that the status of
the state was stable during the examination. Second, the waitgroup has
been replaced with a condition variable which uses the mutex central
to the promise. Callers of the `Get()` method now lock the promise
before entering the wait state (which unlocks the promise), ensuring
the promise MUST be in a stable state in order to wait on it.

Resolves: quantcast#3
  • Loading branch information
ssmccoy committed Feb 15, 2017
1 parent 2bad279 commit 2cc6c85
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 55 deletions.
115 changes: 67 additions & 48 deletions completable.go
Expand Up @@ -3,22 +3,22 @@ package promise
import (
"fmt"
"sync"
"sync/atomic"
)

type State uint8

// Unfortunately there are no atomic operations smaller values than 32
const (
PENDING State = iota
PENDING uint32 = iota
FULFILLED
REJECTED
)

type CompletablePromise struct {
state State
state uint32
cause error
value interface{}
mutex sync.Mutex
waitGroup sync.WaitGroup
cond *sync.Cond
compute func(interface{}) interface{}
handle func(error)
dependencies []Completable
Expand All @@ -27,11 +27,11 @@ type CompletablePromise struct {
func completable(compute func(interface{}) interface{}, handle func(error)) *CompletablePromise {
completable := new(CompletablePromise)

completable.cond = sync.NewCond(&completable.mutex)
completable.compute = compute
completable.handle = handle
completable.state = PENDING
completable.dependencies = make([]Completable, 0)
completable.waitGroup.Add(1)

return completable
}
Expand All @@ -42,21 +42,33 @@ func Promise() Completable {
return completable(func(x interface{}) interface{} { return x }, nil)
}

func (promise *CompletablePromise) State() uint32 {
return atomic.LoadUint32(&promise.state)
}

// Determine if the promise has been resolved.
func (promise *CompletablePromise) Resolved() bool {
return promise.state == FULFILLED
return promise.State() == FULFILLED
}

func (promise *CompletablePromise) Rejected() bool {
return promise.state == REJECTED
return promise.State() == REJECTED
}

// Return the value of the promise, if it was resolved successfully, or return
// the cause of failure if it was not. Block until the promise is either
// completed or rejected.
func (promise *CompletablePromise) Get() (interface{}, error) {
if promise.state == PENDING {
promise.waitGroup.Wait()
if promise.State() == PENDING {
promise.mutex.Lock()

for promise.State() == PENDING {
// wait unlocks its associated mutex (incase you were wondering)
// so we cannot guarantee that the state has actually changed.
promise.cond.Wait()
}

promise.mutex.Unlock()
}

return promise.value, promise.cause
Expand All @@ -73,7 +85,7 @@ func (promise *CompletablePromise) depend(compute func(interface{}) interface{})
// The private version of this is used for `Combine` to call, so that it won't
// attempt to acquire the mutex twice.
func (promise *CompletablePromise) then(compute func(interface{}) interface{}) Thenable {
switch promise.state {
switch promise.State() {
case PENDING:
return promise.depend(compute)
case REJECTED:
Expand All @@ -88,7 +100,7 @@ func (promise *CompletablePromise) then(compute func(interface{}) interface{}) T
// Compose this promise into one which is complete when the following code has
// executed.
func (promise *CompletablePromise) Then(compute func(interface{}) interface{}) Thenable {
switch promise.state {
switch promise.State() {
case PENDING:
promise.mutex.Lock()

Expand All @@ -107,13 +119,13 @@ func (promise *CompletablePromise) Then(compute func(interface{}) interface{}) T
// Compose this promise into another one which handles an upstream error with
// the given handler.
func (promise *CompletablePromise) Catch(handle func(error)) Thenable {
if promise.state == PENDING {
if promise.State() == PENDING {
promise.mutex.Lock()

defer promise.mutex.Unlock()

// Double check now that we have the lock that this is still true.
if promise.state == PENDING {
if promise.State() == PENDING {
rejectable := completable(nil, handle)

promise.dependencies = append(promise.dependencies, rejectable)
Expand All @@ -122,7 +134,7 @@ func (promise *CompletablePromise) Catch(handle func(error)) Thenable {
}
}

if promise.state == REJECTED {
if promise.State() == REJECTED {
handle(promise.cause)

return Rejected(promise.cause)
Expand All @@ -146,33 +158,30 @@ func panicStateComplete(rejected bool) {
}

func (promise *CompletablePromise) complete(value interface{}) interface{} {
// Composing the value (performing any necessary mutations) before actually
// storing it allows this code to guarantee that even a.Then( func() {
// a.Then( ... ) } ) does not cause a deadlock.
//
// a.Then( func() { a.Get() } ) however, is unstoppable.
composed := value

if promise.compute != nil {
// Because this composition function
composed = promise.compute(value)
}

// This should rarely actually be blocking, there's a separate mutex for
// each completable promise and the mutex is only acquired during assembly
// and completion.
promise.mutex.Lock()

defer promise.mutex.Unlock()

if promise.state != PENDING {
panicStateComplete(promise.state == REJECTED)
composed := value

if promise.compute != nil {
// Because this composition function
composed = promise.compute(value)
}

if promise.State() != PENDING {
panicStateComplete(promise.State() == REJECTED)
}

if composed != nil {
promise.value = composed
}

atomic.StoreUint32(&promise.state, FULFILLED)

return composed
}

Expand All @@ -185,16 +194,14 @@ func (promise *CompletablePromise) Complete(value interface{}) {
// Completed promise, meaning they will be satisfied immediately.
composed := promise.complete(value)

// Which means that now the wait group can be notified and each of the
// subsequent promises can be completed. If this is done while the lock is
// held, a deadlock is possible.
promise.waitGroup.Done()
// So now that the condition has been satisified, broadcast to all waiters
// that thie task is now complete. They should be in the `Get()` wait loop,
// above.
promise.cond.Broadcast()

for _, dependency := range promise.dependencies {
dependency.Complete(composed)
}

promise.state = FULFILLED
}

// Reject this promise and all of its dependencies.
Expand All @@ -207,27 +214,33 @@ func (promise *CompletablePromise) Reject(cause error) {

promise.mutex.Lock()

defer promise.mutex.Unlock()

if promise.state != PENDING {
panicStateComplete(promise.state == REJECTED)
if promise.State() != PENDING {
panicStateComplete(promise.State() == REJECTED)
}

promise.cause = cause

atomic.StoreUint32(&promise.state, REJECTED)

promise.mutex.Unlock()

// Unlike the Complete() routine, which executes the transformation
// *before* actually storing the value or transitioning the state, this
// transitions after. The reason for that is two-fold: The return value of
// the handle() callback is not *stored*, and the second is that we want a
// promise accessed from within the Catch() handler to be in a rejected
// state.
if promise.handle != nil {
promise.handle(cause)
}

promise.waitGroup.Done()
// Now that this is all done, notify all of the handlers that yeah, we're
// done.
promise.cond.Broadcast()

for _, dependency := range promise.dependencies {
dependency.Reject(cause)
}

// Due to the fact that this code is a little racey (specifically,
// completed is used as a guard), the order of these assignments is
// important — specifically, the *completed* flag must be *last*.
promise.cause = cause
promise.state = FULFILLED
}

// Combine this promise with another by applying the combinator `create` to the
Expand All @@ -236,12 +249,12 @@ func (promise *CompletablePromise) Reject(cause error) {
// promise which is completed when the returned promise, and this promise, are
// completed...but no sooner.
func (promise *CompletablePromise) Combine(create func(interface{}) Thenable) Thenable {
if promise.state == PENDING {
if promise.State() == PENDING {
promise.mutex.Lock()

defer promise.mutex.Unlock()

if promise.state == PENDING {
if promise.State() == PENDING {
// So, this may seem a little whacky, but what is happening here is
// that seeing as there is presently no value from which to generate
// the new promise, a callback is registered using Then() which
Expand All @@ -250,6 +263,12 @@ func (promise *CompletablePromise) Combine(create func(interface{}) Thenable) Th
// over to the placeholder thus satisfying the request.
placeholder := Promise()

// So, is it possible that Combine() is called, and the promise is
// completed while it's being combined? Should *not* be.
//
// Perhaps all access to promise.state should be atomic. We are
// using the double lock idiom here, after all...

// It's important that the internal then() is used here, because the
// external one allocates a mutex lock. sync.Mutex is not a reentrant lock
// type, unfortunately.
Expand Down
21 changes: 14 additions & 7 deletions promise_test.go
Expand Up @@ -2,11 +2,16 @@ package promise

import (
"errors"
"runtime"
"sync/atomic"
"testing"
"time"
)

func init() {
runtime.GOMAXPROCS(runtime.NumCPU())
}

// Ensure that the basic properties of a promise holds true if the value is
// already resolved.
func TestCompletedPromise(test *testing.T) {
Expand Down Expand Up @@ -69,7 +74,7 @@ func TestReentrantComplete(test *testing.T) {
sumValue, _ := value.(int)

if sumValue != 5 {
test.Fatalf("Expected sum value to be 3")
test.Fatalf("Expected sumValue(%d) to be 5", sumValue)
}

return nil
Expand All @@ -84,7 +89,7 @@ func TestReentrantComplete(test *testing.T) {
return nil
})

a.Complete(3)
a.Complete(2)
}

// Ensure that the basic functions of the Promise API work for values that are
Expand Down Expand Up @@ -166,11 +171,6 @@ const (
)

func TestWaitgroups(test *testing.T) {
go func() {
time.Sleep(5 * time.Second)
test.Fatalf("TestWaitgroups appears to have deadlocked")
}()

promise := Promise()

var counter uint64 = 0
Expand All @@ -194,10 +194,17 @@ func TestWaitgroups(test *testing.T) {
promise.Complete(true)
})()

// Wait for all of the promises created to be gathered over the channel.
// It's important to only mutate the slice in a single goroutine, so that
// happens here.
for i := 0; i < WAITERS; i++ {
waiterPromises = append(waiterPromises, <-waiterChan)
}

// The promises will now be in varying states of completion, observation
// has shown ¼th of the time the final promise will be being completed
// while this Get() method is running, causing a race condition whereby
// Get() never returned.
_, err := All(waiterPromises...).Get()

if err != nil {
Expand Down

0 comments on commit 2cc6c85

Please sign in to comment.