Skip to content

Commit

Permalink
Merge 0a76451 into ccb4d6e
Browse files Browse the repository at this point in the history
  • Loading branch information
jdef committed Apr 28, 2017
2 parents ccb4d6e + 0a76451 commit a2dc650
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 23 deletions.
50 changes: 27 additions & 23 deletions api/v1/lib/httpcli/httpsched/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,36 @@ func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc {
// state may cause subsequent Call operations to fail (without recourse).
var DisconnectionDetector = func(resp mesos.Response, disconnect func()) mesos.Response {
return &mesos.ResponseWrapper{
Response: resp,
DecoderFunc: func() encoding.Decoder {
decoder := resp.Decoder()
return func(u encoding.Unmarshaler) (err error) {
err = decoder(u)
if err != nil {
disconnect()
return
}
switch e := u.(type) {
case (*scheduler.Event):
if e.GetType() == scheduler.Event_ERROR {
// the mesos scheduler API recommends that scheduler implementations
// resubscribe in this case. we initiate the disconnection here because
// it is assumed to be convenient for most framework implementations.
disconnect()
}
default:
// sanity check: this should never happen in practice.
err = httpcli.ProtocolError(
fmt.Sprintf("unexpected object on subscription event stream: %v", e))
Response: resp,
DecoderFunc: disconnectionDecoder(resp.Decoder, disconnect),
}
}

func disconnectionDecoder(f func() encoding.Decoder, disconnect func()) func() encoding.Decoder {
return func() encoding.Decoder {
decoder := f()
return func(u encoding.Unmarshaler) (err error) {
err = decoder(u)
if err != nil {
disconnect()
return
}
switch e := u.(type) {
case (*scheduler.Event):
if e.GetType() == scheduler.Event_ERROR {
// the mesos scheduler API recommends that scheduler implementations
// resubscribe in this case. we initiate the disconnection here because
// it is assumed to be convenient for most framework implementations.
disconnect()
}
return
default:
// sanity check: this should never happen in practice.
err = httpcli.ProtocolError(
fmt.Sprintf("unexpected object on subscription event stream: %v", e))
disconnect()
}
},
return
}
}
}

Expand Down
67 changes: 67 additions & 0 deletions api/v1/lib/httpcli/httpsched/state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package httpsched

import (
"errors"
"testing"

"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/scheduler"
)

type latch struct{ line chan struct{} }

func newLatch() *latch { return &latch{make(chan struct{})} }
func (l *latch) Reset() { l.line = make(chan struct{}) }
func (l *latch) Close() { close(l.line) }
func (l *latch) Closed() (result bool) {
select {
case <-l.line:
result = true
default:
}
return
}

func TestDisconnectionDecoder(t *testing.T) {

// invoke disconnect upon decoder errors
expected := errors.New("unmarshaler error")
decoder := encoding.Decoder(func(_ encoding.Unmarshaler) error { return expected })
f := func() encoding.Decoder { return decoder }
latch := newLatch()

d := disconnectionDecoder(f, latch.Close)
err := d().Invoke(nil)
if err != expected {
t.Errorf("expected %v instead of %v", expected, err)
}
if !latch.Closed() {
t.Error("disconnect func was not called")
}

// ERROR event triggers disconnect
latch.Reset()
errtype := scheduler.Event_ERROR
event := &scheduler.Event{Type: &errtype}
decoder = encoding.Decoder(func(um encoding.Unmarshaler) error { return nil })
_ = d().Invoke(event)
if !latch.Closed() {
t.Error("disconnect func was not called")
}

// sanity: non-ERROR event does not trigger disconnect
latch.Reset()
errtype = scheduler.Event_SUBSCRIBED
_ = d().Invoke(event)
if latch.Closed() {
t.Error("disconnect func was unexpectedly called")
}

// non scheduler.Event objects trigger disconnect
latch.Reset()
nonEvent := &scheduler.Call{}
_ = d().Invoke(nonEvent)
if !latch.Closed() {
t.Error("disconnect func was not called")
}
}

0 comments on commit a2dc650

Please sign in to comment.