Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

encoding: refactor Decoder and Encoder as single-function interfaces w/ functional adapters #299

Merged
merged 1 commit into from
May 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/v1/cmd/example-executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func run(cfg config.Config) {
}
if err == nil {
// we're officially connected, start decoding events
err = eventLoop(state, resp.Decoder(), handler)
err = eventLoop(state, resp, handler)
disconnected = time.Now()
}
if err != nil && err != io.EOF {
Expand Down Expand Up @@ -144,7 +144,7 @@ func eventLoop(state *internalState, decoder encoding.Decoder, h events.Handler)
sendFailedTasks(state)

var e executor.Event
if err = decoder.Invoke(&e); err == nil {
if err = decoder.Decode(&e); err == nil {
err = h.HandleEvent(&e)
}
}
Expand Down
42 changes: 30 additions & 12 deletions api/v1/lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Client interface {
Do(encoding.Marshaler) (Response, error)
}

// ClientFunc is a functional variant (and implementation) of the Client interface
// ClientFunc is a functional adapter of the Client interface
type ClientFunc func(encoding.Marshaler) (Response, error)

// Do implements Client
Expand All @@ -23,29 +23,47 @@ func (cf ClientFunc) Do(m encoding.Marshaler) (Response, error) { return cf(m) }
// Close when they're finished processing the response otherwise there may be connection leaks.
type Response interface {
io.Closer
Decoder() encoding.Decoder
encoding.Decoder
}

// ResponseWrapper delegates to optional handler funcs for invocations of Response methods.
// ResponseDecorator optionally modifies the behavior of a Response
type ResponseDecorator interface {
Decorate(Response) Response
}

// ResponseDecoratorFunc is the functional adapter for ResponseDecorator
type ResponseDecoratorFunc func(Response) Response

func (f ResponseDecoratorFunc) Decorate(r Response) Response { return f(r) }

// CloseFunc is the functional adapter for io.Closer
type CloseFunc func() error

// Close implements io.Closer
func (f CloseFunc) Close() error { return f() }

// ResponseWrapper delegates to optional overrides for invocations of Response methods.
type ResponseWrapper struct {
Response Response
CloseFunc func() error
DecoderFunc func() encoding.Decoder
Response Response
Closer io.Closer
Decoder encoding.Decoder
}

func (wrapper *ResponseWrapper) Close() error {
if wrapper.CloseFunc != nil {
return wrapper.CloseFunc()
if wrapper.Closer != nil {
return wrapper.Closer.Close()
}
if wrapper.Response != nil {
return wrapper.Response.Close()
}
return nil
}

func (wrapper *ResponseWrapper) Decoder() encoding.Decoder {
if wrapper.DecoderFunc != nil {
return wrapper.DecoderFunc()
func (wrapper *ResponseWrapper) Decode(u encoding.Unmarshaler) error {
if wrapper.Decoder != nil {
return wrapper.Decoder.Decode(u)
}
return wrapper.Response.Decoder()
return wrapper.Response.Decode(u)
}

var _ = Response(&ResponseWrapper{})
31 changes: 21 additions & 10 deletions api/v1/lib/encoding/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,29 +62,40 @@ type (
json.Unmarshaler
}
// An Encoder encodes a given Marshaler or returns an error in case of failure.
Encoder func(Marshaler) error
Encoder interface {
Encode(Marshaler) error
}

// EncoderFunc is the functional adapter for Encoder
EncoderFunc func(Marshaler) error

// A Decoder decodes a given Unmarshaler or returns an error in case of failure.
Decoder func(Unmarshaler) error
Decoder interface {
Decode(Unmarshaler) error
}

// DecoderFunc is the functional adapter for Decoder
DecoderFunc func(Unmarshaler) error
)

// Encode is an utility method that calls the Encoder itself.
func (e Encoder) Invoke(m Marshaler) error { return e(m) }
// Decode implements the Decoder interface
func (f DecoderFunc) Decode(u Unmarshaler) error { return f(u) }

// Decode is an utility method that calls the Decoder itself.
func (d Decoder) Invoke(u Unmarshaler) error { return d(u) }
// Encode implements the Encoder interface
func (f EncoderFunc) Encode(m Marshaler) error { return f(m) }

// NewProtobufEncoder returns a new Encoder of Calls to Protobuf messages written to
// the given io.Writer.
func NewProtobufEncoder(w io.Writer) Encoder {
enc := proto.NewEncoder(w)
return func(m Marshaler) error { return enc.Encode(m) }
return EncoderFunc(func(m Marshaler) error { return enc.Encode(m) })
}

// NewJSONEncoder returns a new Encoder of Calls to JSON messages written to
// the given io.Writer.
func NewJSONEncoder(w io.Writer) Encoder {
enc := json.NewEncoder(w)
return func(m Marshaler) error { return enc.Encode(m) }
return EncoderFunc(func(m Marshaler) error { return enc.Encode(m) })
}

// NewProtobufDecoder returns a new Decoder of Protobuf messages read from the
Expand All @@ -94,12 +105,12 @@ func NewProtobufDecoder(r framing.Reader) Decoder {
return pb.Unmarshal(b, m.(pb.Message))
}
dec := framing.NewDecoder(r, uf)
return func(u Unmarshaler) error { return dec.Decode(u) }
return DecoderFunc(func(u Unmarshaler) error { return dec.Decode(u) })
}

// NewJSONDecoder returns a new Decoder of JSON messages read from the
// given io.Reader to Events.
func NewJSONDecoder(r framing.Reader) Decoder {
dec := framing.NewDecoder(r, json.Unmarshal)
return func(u Unmarshaler) error { return dec.Decode(u) }
return DecoderFunc(func(u Unmarshaler) error { return dec.Decode(u) })
}
4 changes: 2 additions & 2 deletions api/v1/lib/extras/scheduler/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func processSubscription(config Config, resp mesos.Response, err error) error {
defer resp.Close()
}
if err == nil {
err = eventLoop(config, resp.Decoder())
err = eventLoop(config, resp)
}
return err
}
Expand All @@ -105,7 +105,7 @@ func eventLoop(config Config, eventDecoder encoding.Decoder) (err error) {
}
for err == nil && !config.Context.Done() {
var e scheduler.Event
if err = eventDecoder.Invoke(&e); err == nil {
if err = eventDecoder.Decode(&e); err == nil {
err = h.HandleEvent(&e)
}
}
Expand Down
10 changes: 3 additions & 7 deletions api/v1/lib/httpcli/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,10 @@ type DoFunc func(*http.Request) (*http.Response, error)
// Close when they're finished processing the response otherwise there may be connection leaks.
type Response struct {
io.Closer
encoding.Decoder
Header http.Header

decoder encoding.Decoder
}

// implements mesos.Response
func (r *Response) Decoder() encoding.Decoder { return r.decoder }

// ErrorMapperFunc generates an error for the given response.
type ErrorMapperFunc func(*http.Response) error

Expand Down Expand Up @@ -145,7 +141,7 @@ func (c *Client) Mesos(opts ...RequestOpt) mesos.Client {
// given marshaler and request options.
func (c *Client) BuildRequest(m encoding.Marshaler, opt ...RequestOpt) (*http.Request, error) {
var body bytes.Buffer //TODO(jdef): use a pool to allocate these (and reduce garbage)?
if err := c.codec.NewEncoder(&body).Invoke(m); err != nil {
if err := c.codec.NewEncoder(&body).Encode(m); err != nil {
return nil, err
}

Expand Down Expand Up @@ -191,7 +187,7 @@ func (c *Client) HandleResponse(res *http.Response, err error) (mesos.Response,
res.Body.Close()
return nil, ProtocolError(fmt.Sprintf("unexpected content type: %q", ct))
}
result.decoder = c.codec.NewDecoder(recordio.NewFrameReader(res.Body))
result.Decoder = c.codec.NewDecoder(recordio.NewFrameReader(res.Body))

case http.StatusAccepted:
if debug {
Expand Down
57 changes: 28 additions & 29 deletions api/v1/lib/httpcli/httpsched/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,39 +74,38 @@ func maybeLogged(f httpcli.DoFunc) httpcli.DoFunc {
// Consumers of this package may choose to override default behavior by overwriting the default
// value of this var, but should exercise caution: failure to properly transition to a disconnected
// state may cause subsequent Call operations to fail (without recourse).
var DisconnectionDetector = func(resp mesos.Response, disconnect func()) mesos.Response {
return &mesos.ResponseWrapper{
Response: resp,
DecoderFunc: disconnectionDecoder(resp.Decoder, disconnect),
}
var DisconnectionDetector = func(disconnect func()) mesos.ResponseDecorator {
return mesos.ResponseDecoratorFunc(func(resp mesos.Response) mesos.Response {
return &mesos.ResponseWrapper{
Response: resp,
Decoder: disconnectionDecoder(resp, disconnect),
}
})
}

func disconnectionDecoder(f func() encoding.Decoder, disconnect func()) func() encoding.Decoder {
return func() encoding.Decoder {
decoder := f()
return func(u encoding.Unmarshaler) (err error) {
err = decoder(u)
if err != nil {
disconnect()
return
}
switch e := u.(type) {
case (*scheduler.Event):
if e.GetType() == scheduler.Event_ERROR {
// the mesos scheduler API recommends that scheduler implementations
// resubscribe in this case. we initiate the disconnection here because
// it is assumed to be convenient for most framework implementations.
disconnect()
}
default:
// sanity check: this should never happen in practice.
err = httpcli.ProtocolError(
fmt.Sprintf("unexpected object on subscription event stream: %v", e))
func disconnectionDecoder(decoder encoding.Decoder, disconnect func()) encoding.Decoder {
return encoding.DecoderFunc(func(u encoding.Unmarshaler) (err error) {
err = decoder.Decode(u)
if err != nil {
disconnect()
return
}
switch e := u.(type) {
case (*scheduler.Event):
if e.GetType() == scheduler.Event_ERROR {
// the mesos scheduler API recommends that scheduler implementations
// resubscribe in this case. we initiate the disconnection here because
// it is assumed to be convenient for most framework implementations.
disconnect()
}
return
default:
// sanity check: this should never happen in practice.
err = httpcli.ProtocolError(
fmt.Sprintf("unexpected object on subscription event stream: %v", e))
disconnect()
}
}
return
})
}

func disconnectedFn(state *state) stateFn {
Expand Down Expand Up @@ -166,7 +165,7 @@ func disconnectedFn(state *state) stateFn {

// wrap the response: any errors processing the subscription stream should result in a
// transition to a disconnected state ASAP.
state.resp = DisconnectionDetector(stateResp, transitionToDisconnected)
state.resp = DisconnectionDetector(transitionToDisconnected).Decorate(stateResp)

// (e) else prepare callerTemporary w/ special header, return connectedFn since we're now subscribed
state.caller = &callerTemporary{
Expand Down
16 changes: 8 additions & 8 deletions api/v1/lib/httpcli/httpsched/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ func TestDisconnectionDecoder(t *testing.T) {

// invoke disconnect upon decoder errors
expected := errors.New("unmarshaler error")
decoder := encoding.Decoder(func(_ encoding.Unmarshaler) error { return expected })
f := func() encoding.Decoder { return decoder }
decoder := encoding.DecoderFunc(func(_ encoding.Unmarshaler) error { return expected })
latch := newLatch()

d := disconnectionDecoder(f, latch.Close)
err := d().Invoke(nil)
d := disconnectionDecoder(decoder, latch.Close)
err := d.Decode(nil)
if err != expected {
t.Errorf("expected %v instead of %v", expected, err)
}
Expand All @@ -43,24 +42,25 @@ func TestDisconnectionDecoder(t *testing.T) {
latch.Reset()
errtype := scheduler.Event_ERROR
event := &scheduler.Event{Type: &errtype}
decoder = encoding.Decoder(func(um encoding.Unmarshaler) error { return nil })
_ = d().Invoke(event)
decoder = encoding.DecoderFunc(func(um encoding.Unmarshaler) error { return nil })
d = disconnectionDecoder(decoder, latch.Close)
_ = d.Decode(event)
if !latch.Closed() {
t.Error("disconnect func was not called")
}

// sanity: non-ERROR event does not trigger disconnect
latch.Reset()
errtype = scheduler.Event_SUBSCRIBED
_ = d().Invoke(event)
_ = d.Decode(event)
if latch.Closed() {
t.Error("disconnect func was unexpectedly called")
}

// non scheduler.Event objects trigger disconnect
latch.Reset()
nonEvent := &scheduler.Call{}
_ = d().Invoke(nonEvent)
_ = d.Decode(nonEvent)
if !latch.Closed() {
t.Error("disconnect func was not called")
}
Expand Down