Skip to content

Commit

Permalink
codec: refactor API, Codec is an interface, introduce MediaType
Browse files Browse the repository at this point in the history
  • Loading branch information
James DeFelice committed Jun 21, 2017
1 parent e7ac2fc commit ab70592
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 66 deletions.
2 changes: 1 addition & 1 deletion api/v1/cmd/example-executor/main.go
Expand Up @@ -54,7 +54,7 @@ func run(cfg config.Config) {
state = &internalState{
cli: httpcli.New(
httpcli.Endpoint(apiURL.String()),
httpcli.Codec(&encoding.ProtobufCodec),
httpcli.Codec(encoding.MediaTypeProtobuf.Codec()),
httpcli.Do(httpcli.With(httpcli.Timeout(httpTimeout))),
),
callOptions: executor.CallOptions{
Expand Down
2 changes: 1 addition & 1 deletion api/v1/cmd/example-scheduler/app/config.go
Expand Up @@ -86,7 +86,7 @@ func NewConfig() Config {
user: env("FRAMEWORK_USER", "root"),
name: env("FRAMEWORK_NAME", "example"),
url: env("MESOS_MASTER_HTTP", "http://:5050/api/v1/scheduler"),
codec: codec{Codec: &encoding.ProtobufCodec},
codec: codec{Codec: encoding.MediaTypeProtobuf.Codec()},
timeout: envDuration("MESOS_CONNECT_TIMEOUT", "20s"),
failoverTimeout: envDuration("SCHEDULER_FAILOVER_TIMEOUT", "1000h"),
checkpoint: true,
Expand Down
19 changes: 9 additions & 10 deletions api/v1/cmd/example-scheduler/app/flags.go
Expand Up @@ -25,18 +25,17 @@ func (u *URL) Set(value string) error {
return nil
}

type codec struct{ *encoding.Codec }
type codec struct{ encoding.Codec }

func (c *codec) Set(value string) (err error) {
switch strings.ToLower(value) {
case "protobuf":
c.Codec = &encoding.ProtobufCodec
case "json":
c.Codec = &encoding.JSONCodec
default:
err = fmt.Errorf("bad codec %q", value)
func (c *codec) Set(value string) error {
v := strings.ToLower(value)
for _, codec := range encoding.DefaultCodecs {
if v == codec.Name() {
c.Codec = codec
return nil
}
}
return
return fmt.Errorf("bad codec %q", value)
}

type Labels []mesos.Label
Expand Down
122 changes: 74 additions & 48 deletions api/v1/lib/encoding/codec.go
Expand Up @@ -2,6 +2,7 @@ package encoding

import (
"encoding/json"
"fmt"
"io"

"github.com/mesos/mesos-go/api/v1/lib/encoding/framing"
Expand All @@ -10,53 +11,78 @@ import (
pb "github.com/gogo/protobuf/proto"
)

type MediaType string

const (
// ProtobufMediaType is the Protobuf serialization format media type.
ProtobufMediaType = "application/x-protobuf"
// JSONMediaType is the JSON serialiation format media type.
JSONMediaType = "application/json"
// MediaTypeProtobuf is the Protobuf serialization format media type.
MediaTypeProtobuf = MediaType("application/x-protobuf")
// MediaTypeJSON is the JSON serialiation format media type.
MediaTypeJSON = MediaType("application/json")
)

var (
// ProtobufCodec is the Mesos scheduler API Protobufs codec.
ProtobufCodec = Codec{
Name: "protobuf",
MediaTypes: [2]string{ProtobufMediaType, ProtobufMediaType},
NewEncoder: NewProtobufEncoder,
NewDecoder: NewProtobufDecoder,
var DefaultCodecs = func() map[MediaType]Codec {
m := make(map[MediaType]Codec)
m[MediaTypeProtobuf] = Codec(&codec{
name: "protobuf",
mediaTypes: [2]MediaType{MediaTypeProtobuf, MediaTypeProtobuf},
newEncoder: NewProtobufEncoder,
newDecoder: NewProtobufDecoder,
})
m[MediaTypeJSON] = Codec(&codec{
name: "json",
mediaTypes: [2]MediaType{MediaTypeJSON, MediaTypeJSON},
newEncoder: NewJSONEncoder,
newDecoder: NewJSONDecoder,
})

return m
}()

// Codec returns the configured Codec for the media type, or nil if no such Codec has been configured.
func (m MediaType) Codec() Codec { return DefaultCodecs[m] }

// ContentType returns the HTTP Content-Type associated with the MediaType
func (m MediaType) ContentType() string { return string(m) }

type (
// A Codec composes encoding and decoding of a serialization format.
Codec interface {
fmt.Stringer
Name() string
RequestType() MediaType
ResponseType() MediaType
// NewEncoder returns a new encoder for the defined media type.
NewEncoder(io.Writer) Encoder
// NewDecoder returns a new decoder for the defined media type.
NewDecoder(framing.Reader) Decoder
}
// JSONCodec is the Mesos scheduler API JSON codec.
JSONCodec = Codec{
Name: "json",
MediaTypes: [2]string{JSONMediaType, JSONMediaType},
NewEncoder: NewJSONEncoder,
NewDecoder: NewJSONDecoder,

codec struct {
// Name holds the codec name.
name string
// MediaTypes holds the media types of the codec encoding and decoding
// formats, respectively.
mediaTypes [2]MediaType
// NewEncoder returns a new encoder for the defined media type.
newEncoder func(io.Writer) EncoderFunc
// NewDecoder returns a new decoder for the defined media type.
newDecoder func(framing.Reader) DecoderFunc
}
)

// A Codec composes encoding and decoding of a serialization format.
type Codec struct {
// Name holds the codec name.
Name string
// MediaTypes holds the media types of the codec encoding and decoding
// formats, respectively.
MediaTypes [2]string
// NewEncoder returns a new encoder for the defined media type.
NewEncoder func(io.Writer) Encoder
// NewDecoder returns a new decoder for the defined media type.
NewDecoder func(framing.Reader) Decoder
}

// String implements the fmt.Stringer interface.
func (c *Codec) String() string {
func (c *codec) String() string {
if c == nil {
return ""
}
return c.Name
return c.name
}

func (c *Codec) RequestContentType() string { return c.MediaTypes[0] }
func (c *Codec) ResponseContentType() string { return c.MediaTypes[1] }
func (c *codec) Name() string { return c.name }
func (c *codec) RequestType() MediaType { return c.mediaTypes[0] }
func (c *codec) ResponseType() MediaType { return c.mediaTypes[1] }
func (c *codec) NewEncoder(w io.Writer) Encoder { return c.newEncoder(w) }
func (c *codec) NewDecoder(r framing.Reader) Decoder { return c.newDecoder(r) }

type (
// Marshaler composes the supported marshaling formats.
Expand Down Expand Up @@ -94,31 +120,31 @@ func (f EncoderFunc) Encode(m Marshaler) error { return f(m) }

// NewProtobufEncoder returns a new Encoder of Calls to Protobuf messages written to
// the given io.Writer.
func NewProtobufEncoder(w io.Writer) Encoder {
func NewProtobufEncoder(w io.Writer) EncoderFunc {
enc := proto.NewEncoder(w)
return EncoderFunc(func(m Marshaler) error { return enc.Encode(m) })
return func(m Marshaler) error { return enc.Encode(m) }
}

// NewJSONEncoder returns a new Encoder of Calls to JSON messages written to
// the given io.Writer.
func NewJSONEncoder(w io.Writer) Encoder {
func NewJSONEncoder(w io.Writer) EncoderFunc {
enc := json.NewEncoder(w)
return EncoderFunc(func(m Marshaler) error { return enc.Encode(m) })
return func(m Marshaler) error { return enc.Encode(m) }
}

// NewProtobufDecoder returns a new Decoder of Protobuf messages read from the
// given io.Reader to Events.
func NewProtobufDecoder(r framing.Reader) Decoder {
uf := func(b []byte, m interface{}) error {
return pb.Unmarshal(b, m.(pb.Message))
}
dec := framing.NewDecoder(r, uf)
return DecoderFunc(func(u Unmarshaler) error { return dec.Decode(u) })
// given framing.Reader to Events.
func NewProtobufDecoder(r framing.Reader) DecoderFunc {
var (
uf = func(b []byte, m interface{}) error { return pb.Unmarshal(b, m.(pb.Message)) }
dec = framing.NewDecoder(r, uf)
)
return func(u Unmarshaler) error { return dec.Decode(u) }
}

// NewJSONDecoder returns a new Decoder of JSON messages read from the
// given io.Reader to Events.
func NewJSONDecoder(r framing.Reader) Decoder {
// given framing.Reader to Events.
func NewJSONDecoder(r framing.Reader) DecoderFunc {
dec := framing.NewDecoder(r, json.Unmarshal)
return DecoderFunc(func(u Unmarshaler) error { return dec.Decode(u) })
return func(u Unmarshaler) error { return dec.Decode(u) }
}
12 changes: 6 additions & 6 deletions api/v1/lib/httpcli/http.go
Expand Up @@ -66,15 +66,15 @@ type Client struct {
url string
do DoFunc
header http.Header
codec *encoding.Codec
codec encoding.Codec
errorMapper ErrorMapperFunc
requestOpts []RequestOpt
buildRequest func(encoding.Marshaler, ...RequestOpt) (*http.Request, error)
handleResponse ResponseHandler
}

var (
DefaultCodec = &encoding.ProtobufCodec
DefaultCodec = encoding.MediaTypeProtobuf.Codec()
DefaultHeaders = http.Header{}

// DefaultConfigOpt represents the default client config options.
Expand Down Expand Up @@ -166,8 +166,8 @@ func (c *Client) BuildRequest(m encoding.Marshaler, opt ...RequestOpt) (*http.Re
return helper.
withOptions(c.requestOpts, opt).
withHeaders(c.header).
withHeader("Content-Type", c.codec.RequestContentType()).
withHeader("Accept", c.codec.ResponseContentType()).
withHeader("Content-Type", c.codec.RequestType().ContentType()).
withHeader("Accept", c.codec.ResponseType().ContentType()).
Request, nil
}

Expand Down Expand Up @@ -195,7 +195,7 @@ func (c *Client) HandleResponse(res *http.Response, err error) (mesos.Response,
log.Println("request OK, decoding response")
}
ct := res.Header.Get("Content-Type")
if ct != c.codec.ResponseContentType() {
if ct != c.codec.ResponseType().ContentType() {
res.Body.Close()
return nil, ProtocolError(fmt.Sprintf("unexpected content type: %q", ct))
}
Expand Down Expand Up @@ -265,7 +265,7 @@ func Do(do DoFunc) Opt {
}

// Codec returns an Opt that sets a Client's Codec.
func Codec(codec *encoding.Codec) Opt {
func Codec(codec encoding.Codec) Opt {
return func(c *Client) Opt {
old := c.codec
c.codec = codec
Expand Down

0 comments on commit ab70592

Please sign in to comment.