Skip to content

Commit

Permalink
encoding: API cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
James DeFelice committed Sep 11, 2017
1 parent c5d4a73 commit 93a0891
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 105 deletions.
3 changes: 2 additions & 1 deletion api/v1/cmd/example-executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/backoff"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/encoding/codecs"
"github.com/mesos/mesos-go/api/v1/lib/executor"
"github.com/mesos/mesos-go/api/v1/lib/executor/calls"
"github.com/mesos/mesos-go/api/v1/lib/executor/config"
Expand Down Expand Up @@ -54,7 +55,7 @@ func run(cfg config.Config) {
state = &internalState{
cli: httpcli.New(
httpcli.Endpoint(apiURL.String()),
httpcli.Codec(encoding.MediaTypeProtobuf.Codec()),
httpcli.Codec(codecs.ByMediaType[codecs.MediaTypeProtobuf]),
httpcli.Do(httpcli.With(httpcli.Timeout(httpTimeout))),
),
callOptions: executor.CallOptions{
Expand Down
4 changes: 2 additions & 2 deletions api/v1/cmd/example-scheduler/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

"github.com/mesos/mesos-go/api/v1/cmd"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/encoding/codecs"
)

type Config struct {
Expand Down Expand Up @@ -86,7 +86,7 @@ func NewConfig() Config {
user: env("FRAMEWORK_USER", "root"),
name: env("FRAMEWORK_NAME", "example"),
url: env("MESOS_MASTER_HTTP", "http://:5050/api/v1/scheduler"),
codec: codec{Codec: encoding.MediaTypeProtobuf.Codec()},
codec: codec{Codec: codecs.ByMediaType[codecs.MediaTypeProtobuf]},
timeout: envDuration("MESOS_CONNECT_TIMEOUT", "20s"),
failoverTimeout: envDuration("SCHEDULER_FAILOVER_TIMEOUT", "1000h"),
checkpoint: true,
Expand Down
3 changes: 2 additions & 1 deletion api/v1/cmd/example-scheduler/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/encoding/codecs"
)

var (
Expand All @@ -29,7 +30,7 @@ type codec struct{ encoding.Codec }

func (c *codec) Set(value string) error {
v := strings.ToLower(value)
for _, codec := range encoding.DefaultCodecs {
for _, codec := range codecs.ByMediaType {
if v == codec.Name {
c.Codec = codec
return nil
Expand Down
33 changes: 33 additions & 0 deletions api/v1/lib/encoding/codecs/codecs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package codecs

import (
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/encoding/json"
"github.com/mesos/mesos-go/api/v1/lib/encoding/proto"
)

const (
// MediaTypeProtobuf is the Protobuf serialization format media type.
MediaTypeProtobuf = encoding.MediaType("application/x-protobuf")
// MediaTypeJSON is the JSON serialiation format media type.
MediaTypeJSON = encoding.MediaType("application/json")

NameProtobuf = "protobuf"
NameJSON = "json"
)

// ByMediaType are pre-configured default Codecs, ready to use OOTB
var ByMediaType = map[encoding.MediaType]encoding.Codec{
MediaTypeProtobuf: encoding.Codec{
Name: NameProtobuf,
Type: MediaTypeProtobuf,
NewEncoder: proto.NewEncoder,
NewDecoder: proto.NewDecoder,
},
MediaTypeJSON: encoding.Codec{
Name: NameJSON,
Type: MediaTypeJSON,
NewEncoder: json.NewEncoder,
NewDecoder: json.NewDecoder,
},
}
28 changes: 28 additions & 0 deletions api/v1/lib/encoding/json/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package json

import (
"encoding/json"

"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/encoding/framing"
)

// NewEncoder returns a new Encoder of Calls to JSON messages written to
// the given io.Writer.
func NewEncoder(s encoding.Sink) encoding.Encoder {
w := s()
return encoding.EncoderFunc(func(m encoding.Marshaler) error {
b, err := json.Marshal(m)
if err != nil {
return err
}
return w.WriteFrame(b)
})
}

// NewDecoder returns a new Decoder of JSON messages read from the given source.
func NewDecoder(s encoding.Source) encoding.Decoder {
r := s()
dec := framing.NewDecoder(r, json.Unmarshal)
return encoding.DecoderFunc(func(u encoding.Unmarshaler) error { return dec.Decode(u) })
}
37 changes: 21 additions & 16 deletions api/v1/lib/encoding/proto/encoding.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
package proto

import (
"io"

"github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/encoding/framing"
)

// NewEncoder returns a new Encoder that writes to the given io.Writer.
func NewEncoder(w io.Writer) *Encoder {
return &Encoder{w: w}
// NewEncoder returns a new Encoder of Calls to Protobuf messages written to
// the given io.Writer.
func NewEncoder(s encoding.Sink) encoding.Encoder {
w := s()
return encoding.EncoderFunc(func(m encoding.Marshaler) error {
b, err := proto.Marshal(m.(proto.Message))
if err != nil {
return err
}
return w.WriteFrame(b)
})
}

// An Encoder encodes and writes Protobuf messages to an io.Writer.
type Encoder struct{ w io.Writer }

// Encode writes the given Protobuf-encoded message m to its io.Writer. If m
// isn't a proto.Message, Encode will panic.
func (e *Encoder) Encode(m interface{}) error {
bs, err := proto.Marshal(m.(proto.Message))
if err == nil {
_, err = e.w.Write(bs)
}
return err
// NewDecoder returns a new Decoder of Protobuf messages read from the given Source.
func NewDecoder(s encoding.Source) encoding.Decoder {
r := s()
var (
uf = func(b []byte, m interface{}) error { return proto.Unmarshal(b, m.(proto.Message)) }
dec = framing.NewDecoder(r, uf)
)
return encoding.DecoderFunc(func(u encoding.Unmarshaler) error { return dec.Decode(u) })
}
18 changes: 9 additions & 9 deletions api/v1/lib/encoding/proto/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@ import (
"bytes"
"testing"

"github.com/mesos/mesos-go/api/v1/lib"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
. "github.com/mesos/mesos-go/api/v1/lib/encoding/proto"
)

type FakeMessage struct {
Hello string `protobuf:"bytes,1,req,name=hello"`
}
type FakeMessage string

func (f *FakeMessage) Reset() {}
func (f *FakeMessage) ProtoMessage() {}
func (f *FakeMessage) String() string { return f.Hello }
func (f *FakeMessage) Marshal() ([]byte, error) { return ([]byte)(*f), nil }
func (f *FakeMessage) MarshalJSON() ([]byte, error) { return nil, nil }

func TestEncoder(t *testing.T) {
// write a proto message, validate that we're actually marshaling proto
buf := bytes.Buffer{}
enc := NewEncoder(&buf)
err := enc.Encode(&FakeMessage{"hello"})
enc := NewEncoder(encoding.SinkWriter(&buf))
err := enc.Encode(&mesos.FrameworkID{Value: "hello"})

if err != nil {
t.Fatal(err)
Expand All @@ -40,7 +39,8 @@ func TestEncoder(t *testing.T) {
caughtPanic = true
}
}()
enc.Encode("hello")
m := FakeMessage("hello")
enc.Encode(&m)
t.Fatal("expected panic, but Encode completed normally")
}()
if !caughtPanic {
Expand Down
86 changes: 11 additions & 75 deletions api/v1/lib/encoding/codec.go → api/v1/lib/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,6 @@ import (

type MediaType string

const (
// MediaTypeProtobuf is the Protobuf serialization format media type.
MediaTypeProtobuf = MediaType("application/x-protobuf")
// MediaTypeJSON is the JSON serialiation format media type.
MediaTypeJSON = MediaType("application/json")
)

// DefaultCodecs are pre-configured default Codecs, ready to use OOTB
var DefaultCodecs = map[MediaType]Codec{
MediaTypeProtobuf: Codec{
Name: "protobuf",
Type: MediaTypeProtobuf,
NewEncoder: NewProtobufEncoder,
NewDecoder: NewProtobufDecoder,
},
MediaTypeJSON: Codec{
Name: "json",
Type: MediaTypeJSON,
NewEncoder: NewJSONEncoder,
NewDecoder: NewJSONDecoder,
},
}

// Codec returns the configured Codec for the media type, or nil if no such Codec has been configured.
func (m MediaType) Codec() Codec { return DefaultCodecs[m] }

// ContentType returns the HTTP Content-Type associated with the MediaType
func (m MediaType) ContentType() string { return string(m) }

Expand All @@ -61,15 +35,16 @@ type (
NewSink(w io.Writer) Sink
}
SinkFactoryFunc func(w io.Writer) Sink

Stream interface {
SourceFactory
SinkFactory
}
)

func (f SourceFactoryFunc) NewSource(r io.Reader) Source { return f(r) }
func (f SinkFactoryFunc) NewSink(w io.Writer) Sink { return f(w) }

func (f SinkFactoryFunc) NewSink(w io.Writer) Sink { return f(w) }

var (
_ = SourceFactory(SourceFactoryFunc(nil))
_ = SinkFactory(SinkFactoryFunc(nil))
)

// SourceReader returns a Source that buffers all input from the given io.Reader
// and returns the contents in a single frame.
Expand Down Expand Up @@ -145,46 +120,7 @@ func (f DecoderFunc) Decode(u Unmarshaler) error { return f(u) }
// Encode implements the Encoder interface
func (f EncoderFunc) Encode(m Marshaler) error { return f(m) }

// NewProtobufEncoder returns a new Encoder of Calls to Protobuf messages written to
// the given io.Writer.
func NewProtobufEncoder(s Sink) Encoder {
w := s()
return EncoderFunc(func(m Marshaler) error {
b, err := pb.Marshal(m.(pb.Message))
if err != nil {
return err
}
return w.WriteFrame(b)
})
}

// NewJSONEncoder returns a new Encoder of Calls to JSON messages written to
// the given io.Writer.
func NewJSONEncoder(s Sink) Encoder {
w := s()
return EncoderFunc(func(m Marshaler) error {
b, err := json.Marshal(m)
if err != nil {
return err
}
return w.WriteFrame(b)
})
}

// NewProtobufDecoder returns a new Decoder of Protobuf messages read from the given Source.
func NewProtobufDecoder(s Source) Decoder {
r := s()
var (
uf = func(b []byte, m interface{}) error { return pb.Unmarshal(b, m.(pb.Message)) }
dec = framing.NewDecoder(r, uf)
)
return DecoderFunc(func(u Unmarshaler) error { return dec.Decode(u) })

}

// NewJSONDecoder returns a new Decoder of JSON messages read from the given source.
func NewJSONDecoder(s Source) Decoder {
r := s()
dec := framing.NewDecoder(r, json.Unmarshal)
return DecoderFunc(func(u Unmarshaler) error { return dec.Decode(u) })
}
var (
_ = Encoder(EncoderFunc(nil))
_ = Decoder(DecoderFunc(nil))
)
3 changes: 2 additions & 1 deletion api/v1/lib/httpcli/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/mesos/mesos-go/api/v1/lib/client"
logger "github.com/mesos/mesos-go/api/v1/lib/debug"
"github.com/mesos/mesos-go/api/v1/lib/encoding"
"github.com/mesos/mesos-go/api/v1/lib/encoding/codecs"
"github.com/mesos/mesos-go/api/v1/lib/encoding/framing"
"github.com/mesos/mesos-go/api/v1/lib/httpcli/apierrors"
"github.com/mesos/mesos-go/api/v1/lib/recordio"
Expand Down Expand Up @@ -79,7 +80,7 @@ type Client struct {
}

var (
DefaultCodec = encoding.MediaTypeProtobuf.Codec()
DefaultCodec = codecs.ByMediaType[codecs.MediaTypeProtobuf]
DefaultHeaders = http.Header{}

// DefaultConfigOpt represents the default client config options.
Expand Down

0 comments on commit 93a0891

Please sign in to comment.