diff --git a/pkg/deferred/deferred.go b/pkg/deferred/deferred.go index d843c7b7..60f06f79 100644 --- a/pkg/deferred/deferred.go +++ b/pkg/deferred/deferred.go @@ -18,9 +18,17 @@ import ( ) var ( - globalDeferreds = &deferreds{} + globalDeferreds = newDeferreds() ) +func newDeferreds() *deferreds { + d := &deferreds{ + serial: 1, + } + d.serialCond = sync.NewCond(&d.serialMu) + return d +} + // Register schedules an action to be performed later, with the result // sent to `resolver`, using the global deferred scheduler. func Register(p performFunc, r resolver) Serial { @@ -37,13 +45,38 @@ func Wait() { // JavaScript. type Serial uint64 +// To enforce determinism, we resolve deferred in the same order they are +// created. This is done through resolvedSerial that stores what was the last +// deferred resolved and we use a sync.Cond to handle synchronization between +// goroutines servicing the deferred. type deferreds struct { - serialMu sync.Mutex - serial Serial + serialMu sync.Mutex + serial Serial + serialCond *sync.Cond + resolvedSerial Serial outstanding sync.WaitGroup } +func (d *deferreds) waitForSerial(s Serial) { + d.serialMu.Lock() + defer d.serialMu.Unlock() + + for { + if d.resolvedSerial == s { + return + } + d.serialCond.Wait() + } +} + +func (d *deferreds) serialResolved(s Serial) { + d.serialMu.Lock() + d.resolvedSerial = s + d.serialMu.Unlock() + d.serialCond.Broadcast() +} + // responder is the interface for a deferred request to use to send // its response. type resolver interface { @@ -63,8 +96,16 @@ func (d *deferreds) Register(perform performFunc, r resolver) Serial { d.serialMu.Unlock() d.outstanding.Add(1) go func(s Serial) { - defer d.outstanding.Done() + defer func() { + d.serialResolved(s) + d.outstanding.Done() + }() + b, err := perform() + + // Wait for the serial-1 goroutine to be resolved. + d.waitForSerial(s - 1) + if err != nil { r.Error(s, err) return