Skip to content

Commit

Permalink
Merge 4416a57 into befca3f
Browse files Browse the repository at this point in the history
  • Loading branch information
jdef committed Oct 23, 2017
2 parents befca3f + 4416a57 commit fef2ca0
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 99 deletions.
12 changes: 3 additions & 9 deletions api/v1/lib/encoding/framing/decoder.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package framing

import (
"io"
)

type (
// UnmarshalFunc translates bytes to objects
UnmarshalFunc func([]byte, interface{}) error
Expand All @@ -30,11 +26,9 @@ func NewDecoder(r Reader, uf UnmarshalFunc) DecoderFunc {
// and then those sub-slices retained. Examination of generated proto code seems to indicate
// that byte buffers are copied vs. referenced by sub-slice (gogo protoc).
frame, err := r.ReadFrame()
if err == nil || err == io.EOF {
if err2 := uf(frame, m); err2 != nil {
err = err2
}
if err != nil {
return err
}
return err
return uf(frame, m)
}
}
107 changes: 81 additions & 26 deletions api/v1/lib/encoding/framing/decoder_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package framing
package framing_test

import (
"errors"
"fmt"
"io"
"reflect"
"testing"

. "github.com/mesos/mesos-go/api/v1/lib/encoding/framing"
)

func TestNewDecoder(t *testing.T) {
Expand All @@ -28,40 +30,93 @@ func TestNewDecoder(t *testing.T) {
errorUnmarshaler = UnmarshalFunc(func(_ []byte, _ interface{}) error {
return fakeError
})
singletonReader = func(b []byte) ReaderFunc {
eof := false
return func() ([]byte, error) {
if eof {
panic("reader should only be called once")
}
eof = true
return b, io.EOF
}
}
errorReader = func(err error) ReaderFunc {
return func() ([]byte, error) { return nil, err }
}
)
for ti, tc := range []struct {
r Reader
uf UnmarshalFunc
wants []byte
wants [][]byte
wantsErr error
}{
{errorReader(ErrorBadSize), byteCopy, nil, ErrorBadSize},
{singletonReader(([]byte)("james")), byteCopy, ([]byte)("james"), io.EOF},
{singletonReader(([]byte)("james")), errorUnmarshaler, nil, fakeError},
{tokenReader("james"), byteCopy, frames("james"), nil},
{tokenReader("james", "foo"), byteCopy, frames("james", "foo"), nil},
{tokenReader("", "foo"), byteCopy, frames("", "foo"), nil},
{tokenReader("foo", ""), byteCopy, frames("foo", ""), nil},
{tokenReader(""), byteCopy, frames(""), nil},
{tokenReader(), byteCopy, frames(), io.EOF},
{tokenReader("james"), errorUnmarshaler, nil, fakeError},
} {
var (
buf []byte
d = NewDecoder(tc.r, tc.uf)
err = d.Decode(&buf)
)
if err != tc.wantsErr {
t.Errorf("test case %d failed: expected error %q instead of %q", ti, tc.wantsErr, err)
t.Run(fmt.Sprintf("test case %d", ti), func(t *testing.T) {
if (tc.wants == nil) != (tc.wantsErr != nil) {
t.Fatalf("invalid test case: cannot expect both data and an error")
}
var (
f [][]byte
d = NewDecoder(tc.r, tc.uf)
err error
)
for err == nil {
var buf []byte
err = d.Decode(&buf)
if err == io.EOF {
break
}
if err == nil {
f = append(f, buf)
}
if err != tc.wantsErr {
t.Errorf("expected error %q instead of %q", tc.wantsErr, err)
}
}
if !reflect.DeepEqual(f, tc.wants) {
t.Errorf("expected %#v instead of %#v", tc.wants, f)
}
if (tc.wants == nil) != (len(f) == 0) {
if len(f) == 0 {
t.Errorf("expected a decoded object but got none")
} else {
t.Errorf("expected no decoded object but got one")
}
}
})
}
}

func tokenReader(s ...string) ReaderFunc {
if len(s) == 0 {
return EOFReaderFunc
}
ch := make(chan []byte, len(s))
for i := range s {
ch <- ([]byte)(s[i])
}
return func() ([]byte, error) {
select {
case b := <-ch:
return b, nil
default:
return nil, io.EOF
}
if !reflect.DeepEqual(buf, tc.wants) {
t.Errorf("test case %d failed: expected %#v instead of %#v", ti, tc.wants, buf)
}
}

func errorReader(err error) ReaderFunc {
return func() ([]byte, error) { return nil, err }
}

func frames(s ...string) (f [][]byte) {
if len(s) == 0 {
return nil
}
f = make([][]byte, 0, len(s))
for i := range s {
// converting to/from []byte and string for empty string isn't a perfectly symmetrical
// operation. fix it up here with a quick length check.
if len(s[i]) == 0 {
f = append(f, nil)
continue
}
f = append(f, ([]byte)(s[i]))
}
return
}
37 changes: 36 additions & 1 deletion api/v1/lib/encoding/framing/framing.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package framing

import (
"io"
"io/ioutil"
)

type Error string

func (err Error) Error() string { return string(err) }
Expand All @@ -11,7 +16,8 @@ const (
)

type (
// Reader generates data frames from some source, returning io.EOF with the final frame.
// Reader generates data frames from some source, returning io.EOF when the end of the input stream is
// detected.
Reader interface {
ReadFrame() (frame []byte, err error)
}
Expand All @@ -33,3 +39,32 @@ func (f WriterFunc) WriteFrame(b []byte) error { return f(b) }

var _ = Reader(ReaderFunc(nil))
var _ = Writer(WriterFunc(nil))

// EOFReaderFunc always returns nil, io.EOF; it implements the ReaderFunc API.
func EOFReaderFunc() ([]byte, error) { return nil, io.EOF }

var _ = ReaderFunc(EOFReaderFunc) // sanity check

// ReadAll returns a reader func that returns the complete contents of `r` in a single frame.
// A zero length frame is treated as an "end of stream" condition, returning io.EOF.
func ReadAll(r io.Reader) ReaderFunc {
return func() (b []byte, err error) {
b, err = ioutil.ReadAll(r)
if len(b) == 0 && err == nil {
err = io.EOF
}
return
}
}

// WriterFor adapts an io.Writer to the Writer interface. All buffers are written to `w` without decoration or
// modification.
func WriterFor(w io.Writer) WriterFunc {
return func(b []byte) error {
n, err := w.Write(b)
if err == nil && n != len(b) {
return io.ErrShortWrite
}
return err
}
}
31 changes: 31 additions & 0 deletions api/v1/lib/encoding/framing/framing_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package framing

import (
"bytes"
"io"
"testing"
)

Expand All @@ -13,3 +15,32 @@ func TestError(t *testing.T) {
t.Errorf("expected 'a' instead of %q", a.Error())
}
}

func TestReadAll(t *testing.T) {
r := ReadAll(bytes.NewBufferString(""))
buf, err := r.ReadFrame()
if len(buf) != 0 {
t.Errorf("expected zero length frame instead of %+v", buf)
}
if err != io.EOF {
t.Errorf("expected EOF instead of %+v", err)
}

r = ReadAll(bytes.NewBufferString("foo"))
buf, err = r.ReadFrame()
if err != nil {
t.Fatalf("unexpected error %+v", err)
}
if string(buf) != "foo" {
t.Errorf("expected 'foo' instead of %q", string(buf))
}

// read again, now that there's no more data
buf, err = r.ReadFrame()
if len(buf) != 0 {
t.Errorf("expected zero length frame instead of %+v", buf)
}
if err != io.EOF {
t.Errorf("expected EOF instead of %+v", err)
}
}
33 changes: 9 additions & 24 deletions api/v1/lib/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package encoding
import (
"encoding/json"
"io"
"io/ioutil"

pb "github.com/gogo/protobuf/proto"
"github.com/mesos/mesos-go/api/v1/lib/encoding/framing"
Expand Down Expand Up @@ -49,34 +48,20 @@ var (
// SourceReader returns a Source that buffers all input from the given io.Reader
// and returns the contents in a single frame.
func SourceReader(r io.Reader) Source {
ch := make(chan framing.ReaderFunc, 1)
ch <- framing.ReadAll(r)
return func() framing.Reader {
b, err := ioutil.ReadAll(r)
return framing.ReaderFunc(func() (f []byte, e error) {
// only return a non-nil frame ONCE
f = b
b = nil
e = err

if e == nil {
e = io.EOF
}
return
})
select {
case f := <-ch:
return f
default:
return framing.ReaderFunc(framing.EOFReaderFunc)
}
}
}

// SinkWriter returns a Sink that sends a frame to an io.Writer with no decoration.
func SinkWriter(w io.Writer) Sink {
return func() framing.Writer {
return framing.WriterFunc(func(b []byte) error {
n, err := w.Write(b)
if err == nil && n != len(b) {
return io.ErrShortWrite
}
return err
})
}
}
func SinkWriter(w io.Writer) Sink { return func() framing.Writer { return framing.WriterFor(w) } }

// String implements the fmt.Stringer interface.
func (c *Codec) String() string {
Expand Down
Loading

0 comments on commit fef2ca0

Please sign in to comment.