Skip to content

Commit

Permalink
encoding: refactor Decoder and Encoder as single-function interfaces …
Browse files Browse the repository at this point in the history
…w/ functional adapters
  • Loading branch information
James DeFelice committed Apr 28, 2017
1 parent d0dc16a commit 1999317
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 70 deletions.
4 changes: 2 additions & 2 deletions api/v1/cmd/example-executor/main.go
Expand Up @@ -81,7 +81,7 @@ func run(cfg config.Config) {
}
if err == nil {
// we're officially connected, start decoding events
err = eventLoop(state, resp.Decoder(), handler)
err = eventLoop(state, resp, handler)
disconnected = time.Now()
}
if err != nil && err != io.EOF {
Expand Down Expand Up @@ -144,7 +144,7 @@ func eventLoop(state *internalState, decoder encoding.Decoder, h events.Handler)
sendFailedTasks(state)

var e executor.Event
if err = decoder.Invoke(&e); err == nil {
if err = decoder.Decode(&e); err == nil {
err = h.HandleEvent(&e)
}
}
Expand Down
42 changes: 30 additions & 12 deletions api/v1/lib/client.go
Expand Up @@ -13,7 +13,7 @@ type Client interface {
Do(encoding.Marshaler) (Response, error)
}

// ClientFunc is a functional variant (and implementation) of the Client interface
// ClientFunc is a functional adapter of the Client interface
type ClientFunc func(encoding.Marshaler) (Response, error)

// Do implements Client
Expand All @@ -23,29 +23,47 @@ func (cf ClientFunc) Do(m encoding.Marshaler) (Response, error) { return cf(m) }
// Close when they're finished processing the response otherwise there may be connection leaks.
type Response interface {
io.Closer
Decoder() encoding.Decoder
encoding.Decoder
}

// ResponseWrapper delegates to optional handler funcs for invocations of Response methods.
// ResponseDecorator optionally modifies the behavior of a Response
type ResponseDecorator interface {
Decorate(Response) Response
}

// ResponseDecoratorFunc is the functional adapter for ResponseDecorator
type ResponseDecoratorFunc func(Response) Response

func (f ResponseDecoratorFunc) Decorate(r Response) Response { return f(r) }

// CloseFunc is the functional adapter for io.Closer
type CloseFunc func() error

// Close implements io.Closer
func (f CloseFunc) Close() error { return f() }

// ResponseWrapper delegates to optional overrides for invocations of Response methods.
type ResponseWrapper struct {
Response Response
CloseFunc func() error
DecoderFunc func() encoding.Decoder
Response Response
Closer io.Closer
Decoder encoding.Decoder
}

func (wrapper *ResponseWrapper) Close() error {
if wrapper.CloseFunc != nil {
return wrapper.CloseFunc()
if wrapper.Closer != nil {
return wrapper.Closer.Close()
}
if wrapper.Response != nil {
return wrapper.Response.Close()
}
return nil
}

func (wrapper *ResponseWrapper) Decoder() encoding.Decoder {
if wrapper.DecoderFunc != nil {
return wrapper.DecoderFunc()
func (wrapper *ResponseWrapper) Decode(u encoding.Unmarshaler) error {
if wrapper.Decoder != nil {
return wrapper.Decoder.Decode(u)
}
return wrapper.Response.Decoder()
return wrapper.Response.Decode(u)
}

var _ = Response(&ResponseWrapper{})
31 changes: 21 additions & 10 deletions api/v1/lib/encoding/codec.go
Expand Up @@ -62,29 +62,40 @@ type (
json.Unmarshaler
}
// An Encoder encodes a given Marshaler or returns an error in case of failure.
Encoder func(Marshaler) error
Encoder interface {
Encode(Marshaler) error
}

// EncoderFunc is the functional adapter for Encoder
EncoderFunc func(Marshaler) error

// A Decoder decodes a given Unmarshaler or returns an error in case of failure.
Decoder func(Unmarshaler) error
Decoder interface {
Decode(Unmarshaler) error
}

// DecoderFunc is the functional adapter for Decoder
DecoderFunc func(Unmarshaler) error
)

// Encode is an utility method that calls the Encoder itself.
func (e Encoder) Invoke(m Marshaler) error { return e(m) }
// Decode implements the Decoder interface
func (f DecoderFunc) Decode(u Unmarshaler) error { return f(u) }

// Decode is an utility method that calls the Decoder itself.
func (d Decoder) Invoke(u Unmarshaler) error { return d(u) }
// Encode implements the Encoder interface
func (f EncoderFunc) Encode(m Marshaler) error { return f(m) }

// NewProtobufEncoder returns a new Encoder of Calls to Protobuf messages written to
// the given io.Writer.
func NewProtobufEncoder(w io.Writer) Encoder {
enc := proto.NewEncoder(w)
return func(m Marshaler) error { return enc.Encode(m) }
return EncoderFunc(func(m Marshaler) error { return enc.Encode(m) })
}

// NewJSONEncoder returns a new Encoder of Calls to JSON messages written to
// the given io.Writer.
func NewJSONEncoder(w io.Writer) Encoder {
enc := json.NewEncoder(w)
return func(m Marshaler) error { return enc.Encode(m) }
return EncoderFunc(func(m Marshaler) error { return enc.Encode(m) })
}

// NewProtobufDecoder returns a new Decoder of Protobuf messages read from the
Expand All @@ -94,12 +105,12 @@ func NewProtobufDecoder(r framing.Reader) Decoder {
return pb.Unmarshal(b, m.(pb.Message))
}
dec := framing.NewDecoder(r, uf)
return func(u Unmarshaler) error { return dec.Decode(u) }
return DecoderFunc(func(u Unmarshaler) error { return dec.Decode(u) })
}

// NewJSONDecoder returns a new Decoder of JSON messages read from the
// given io.Reader to Events.
func NewJSONDecoder(r framing.Reader) Decoder {
dec := framing.NewDecoder(r, json.Unmarshal)
return func(u Unmarshaler) error { return dec.Decode(u) }
return DecoderFunc(func(u Unmarshaler) error { return dec.Decode(u) })
}
4 changes: 2 additions & 2 deletions api/v1/lib/extras/scheduler/controller/controller.go
Expand Up @@ -91,7 +91,7 @@ func processSubscription(config Config, resp mesos.Response, err error) error {
defer resp.Close()
}
if err == nil {
err = eventLoop(config, resp.Decoder())
err = eventLoop(config, resp)
}
return err
}
Expand All @@ -105,7 +105,7 @@ func eventLoop(config Config, eventDecoder encoding.Decoder) (err error) {
}
for err == nil && !config.Context.Done() {
var e scheduler.Event
if err = eventDecoder.Invoke(&e); err == nil {
if err = eventDecoder.Decode(&e); err == nil {
err = h.HandleEvent(&e)
}
}
Expand Down
10 changes: 3 additions & 7 deletions api/v1/lib/httpcli/http.go
Expand Up @@ -55,14 +55,10 @@ type DoFunc func(*http.Request) (*http.Response, error)
// Close when they're finished processing the response otherwise there may be connection leaks.
type Response struct {
io.Closer
encoding.Decoder
Header http.Header

decoder encoding.Decoder
}

// implements mesos.Response
func (r *Response) Decoder() encoding.Decoder { return r.decoder }

// ErrorMapperFunc generates an error for the given response.
type ErrorMapperFunc func(*http.Response) error

Expand Down Expand Up @@ -145,7 +141,7 @@ func (c *Client) Mesos(opts ...RequestOpt) mesos.Client {
// given marshaler and request options.
func (c *Client) BuildRequest(m encoding.Marshaler, opt ...RequestOpt) (*http.Request, error) {
var body bytes.Buffer //TODO(jdef): use a pool to allocate these (and reduce garbage)?
if err := c.codec.NewEncoder(&body).Invoke(m); err != nil {
if err := c.codec.NewEncoder(&body).Encode(m); err != nil {
return nil, err
}

Expand Down Expand Up @@ -191,7 +187,7 @@ func (c *Client) HandleResponse(res *http.Response, err error) (mesos.Response,
res.Body.Close()
return nil, ProtocolError(fmt.Sprintf("unexpected content type: %q", ct))
}
result.decoder = c.codec.NewDecoder(recordio.NewFrameReader(res.Body))
result.Decoder = c.codec.NewDecoder(recordio.NewFrameReader(res.Body))

case http.StatusAccepted:
if debug {
Expand Down
57 changes: 28 additions & 29 deletions api/v1/lib/httpcli/httpsched/state.go
Expand Up @@ -74,39 +74,38 @@ func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc {
// Consumers of this package may choose to override default behavior by overwriting the default
// value of this var, but should exercise caution: failure to properly transition to a disconnected
// state may cause subsequent Call operations to fail (without recourse).
var DisconnectionDetector = func(resp mesos.Response, disconnect func()) mesos.Response {
return &mesos.ResponseWrapper{
Response: resp,
DecoderFunc: disconnectionDecoder(resp.Decoder, disconnect),
}
var DisconnectionDetector = func(disconnect func()) mesos.ResponseDecorator {
return mesos.ResponseDecoratorFunc(func(resp mesos.Response) mesos.Response {
return &mesos.ResponseWrapper{
Response: resp,
Decoder: disconnectionDecoder(resp, disconnect),
}
})
}

func disconnectionDecoder(f func() encoding.Decoder, disconnect func()) func() encoding.Decoder {
return func() encoding.Decoder {
decoder := f()
return func(u encoding.Unmarshaler) (err error) {
err = decoder(u)
if err != nil {
disconnect()
return
}
switch e := u.(type) {
case (*scheduler.Event):
if e.GetType() == scheduler.Event_ERROR {
// the mesos scheduler API recommends that scheduler implementations
// resubscribe in this case. we initiate the disconnection here because
// it is assumed to be convenient for most framework implementations.
disconnect()
}
default:
// sanity check: this should never happen in practice.
err = httpcli.ProtocolError(
fmt.Sprintf("unexpected object on subscription event stream: %v", e))
func disconnectionDecoder(decoder encoding.Decoder, disconnect func()) encoding.Decoder {
return encoding.DecoderFunc(func(u encoding.Unmarshaler) (err error) {
err = decoder.Decode(u)
if err != nil {
disconnect()
return
}
switch e := u.(type) {
case (*scheduler.Event):
if e.GetType() == scheduler.Event_ERROR {
// the mesos scheduler API recommends that scheduler implementations
// resubscribe in this case. we initiate the disconnection here because
// it is assumed to be convenient for most framework implementations.
disconnect()
}
return
default:
// sanity check: this should never happen in practice.
err = httpcli.ProtocolError(
fmt.Sprintf("unexpected object on subscription event stream: %v", e))
disconnect()
}
}
return
})
}

func disconnectedFn(state *state) stateFn {
Expand Down Expand Up @@ -166,7 +165,7 @@ func disconnectedFn(state *state) stateFn {

// wrap the response: any errors processing the subscription stream should result in a
// transition to a disconnected state ASAP.
state.resp = DisconnectionDetector(stateResp, transitionToDisconnected)
state.resp = DisconnectionDetector(transitionToDisconnected).Decorate(stateResp)

// (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed
state.caller = &callerTemporary{
Expand Down
16 changes: 8 additions & 8 deletions api/v1/lib/httpcli/httpsched/state_test.go
Expand Up @@ -26,12 +26,11 @@ func TestDisconnectionDecoder(t *testing.T) {

// invoke disconnect upon decoder errors
expected := errors.New("unmarshaler error")
decoder := encoding.Decoder(func(_ encoding.Unmarshaler) error { return expected })
f := func() encoding.Decoder { return decoder }
decoder := encoding.DecoderFunc(func(_ encoding.Unmarshaler) error { return expected })
latch := newLatch()

d := disconnectionDecoder(f, latch.Close)
err := d().Invoke(nil)
d := disconnectionDecoder(decoder, latch.Close)
err := d.Decode(nil)
if err != expected {
t.Errorf("expected %v instead of %v", expected, err)
}
Expand All @@ -43,24 +42,25 @@ func TestDisconnectionDecoder(t *testing.T) {
latch.Reset()
errtype := scheduler.Event_ERROR
event := &scheduler.Event{Type: &errtype}
decoder = encoding.Decoder(func(um encoding.Unmarshaler) error { return nil })
_ = d().Invoke(event)
decoder = encoding.DecoderFunc(func(um encoding.Unmarshaler) error { return nil })
d = disconnectionDecoder(decoder, latch.Close)
_ = d.Decode(event)
if !latch.Closed() {
t.Error("disconnect func was not called")
}

// sanity: non-ERROR event does not trigger disconnect
latch.Reset()
errtype = scheduler.Event_SUBSCRIBED
_ = d().Invoke(event)
_ = d.Decode(event)
if latch.Closed() {
t.Error("disconnect func was unexpectedly called")
}

// non scheduler.Event objects trigger disconnect
latch.Reset()
nonEvent := &scheduler.Call{}
_ = d().Invoke(nonEvent)
_ = d.Decode(nonEvent)
if !latch.Closed() {
t.Error("disconnect func was not called")
}
Expand Down

0 comments on commit 1999317

Please sign in to comment.