Skip to content

Commit

Permalink
recordio: refactor frame parsing, bug fixes, better unit testing
Browse files Browse the repository at this point in the history
  • Loading branch information
James DeFelice committed Jun 18, 2017
1 parent 5d21284 commit 75921d5
Show file tree
Hide file tree
Showing 6 changed files with 342 additions and 134 deletions.
36 changes: 9 additions & 27 deletions api/v1/lib/encoding/framing/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package framing

import (
"fmt"
"io"
)

type (
Expand Down Expand Up @@ -34,33 +35,14 @@ var (
// Decode reads the next Protobuf-encoded message from its input and stores it
// in the value pointed to by m. If m isn't a proto.Message, Decode will panic.
//
// NOTE(review): this span comes from a rendered commit diff and appears to
// interleave the removed buffer-growing loop (everything down to
// "buf = buf[nr:]") with the added body that starts at
// "frame, err := d.r.ReadFrame()". Confirm against the repository before
// relying on the exact statement order shown here.
func (d *Decoder) Decode(m interface{}) error {
var (
buf = d.buf
readlen = 0
)
for {
eof, nr, err := d.r.ReadFrame(buf)
if err != nil {
return err
}

readlen += nr
if readlen > MaxSize {
return ErrSize
}

if eof {
return d.uf(d.buf[:readlen], m)
}

if len(buf) == nr {
// readlen and len(d.buf) are the same here
newbuf := make([]byte, readlen+4096)
copy(newbuf, d.buf)
d.buf = newbuf
buf = d.buf[readlen:]
} else {
buf = buf[nr:]
// Note: the buf returned by ReadFrame will change over time, it can't be sub-sliced
// and then those sub-slices retained. Examination of generated proto code seems to indicate
// that byte buffers are copied vs. referenced by sub-slice (gogo protoc).
frame, err := d.r.ReadFrame()
// A frame that arrives together with io.EOF is still handed to d.uf; if d.uf
// fails, its error replaces the read error. Otherwise the read error (nil or
// io.EOF) is what the caller sees.
if err == nil || err == io.EOF {
if err2 := d.uf(frame, m); err2 != nil {
err = err2
}
}
return err
}
12 changes: 11 additions & 1 deletion api/v1/lib/encoding/framing/framing.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package framing

// Error is a framing failure identified by a fixed message. Being a string
// type, its values can be declared as constants and compared with ==.
type Error string

// Error implements the error interface; the message is the value itself.
func (e Error) Error() string {
	return string(e)
}

// Framing errors reported while splitting a stream into frames.
const (
	// ErrorUnderrun means the input ended before a complete frame was read.
	ErrorUnderrun Error = "frame underrun, unexpected EOF"
	// ErrorBadSize means a frame-size header could not be accepted.
	ErrorBadSize Error = "bad frame size"
	// ErrorOversizedFrame means a frame announced a size above the allowed maximum.
	ErrorOversizedFrame Error = "oversized frame, max size exceeded"
)

// Reader is the frame source abstraction of this package; each ReadFrame call
// reads frame data from the underlying input.
//
// NOTE(review): two ReadFrame signatures are listed below. This text comes
// from a rendered commit diff, so the first (buffer-filling) form appears to
// be the removed signature and the second (returns the whole frame) the added
// one; confirm against the repository.
type Reader interface {
ReadFrame(buf []byte) (endOfFrame bool, n int, err error)
ReadFrame() (frame []byte, err error)
}
2 changes: 1 addition & 1 deletion api/v1/lib/httpcli/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *Client) HandleResponse(res *http.Response, err error) (mesos.Response,
res.Body.Close()
return nil, ProtocolError(fmt.Sprintf("unexpected content type: %q", ct))
}
result.Decoder = c.codec.NewDecoder(recordio.NewFrameReader(res.Body))
result.Decoder = c.codec.NewDecoder(recordio.NewReader(res.Body))

case http.StatusAccepted:
if debug {
Expand Down
204 changes: 139 additions & 65 deletions api/v1/lib/recordio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,94 +4,168 @@ import (
"bufio"
"bytes"
"io"
"strconv"
"io/ioutil"
"log"

"github.com/mesos/mesos-go/api/v1/lib/encoding/framing"
)

// NewReader returns an io.Reader that unpacks the data read from r out of
// RecordIO framing before returning it.
func NewReader(r io.Reader) io.Reader {
br, ok := r.(*bufio.Reader)
if !ok {
br = bufio.NewReader(r)
// Debugger is a boolean switch for diagnostic logging: its Log and Logf
// methods print through the standard log package only when the value is true.
type Debugger bool

// Debug is the Debugger used by this package. It is the constant false, so the
// Debug.Log / Debug.Logf calls print nothing.
const Debug = Debugger(false)

// Log passes v to log.Print when d is true and does nothing otherwise.
func (d Debugger) Log(v ...interface{}) {
if d {
log.Print(v...)
}
// NOTE(review): the return below does not fit Log, which has no result; it
// appears to be a removed line of the old NewReader kept by the diff rendering.
return &reader{r: br}
}

func NewFrameReader(r io.Reader) framing.Reader {
br, ok := r.(*bufio.Reader)
if !ok {
br = bufio.NewReader(r)
// Logf passes the format s and arguments v to log.Printf when d is true and
// does nothing otherwise.
func (d Debugger) Logf(s string, v ...interface{}) {
if d {
log.Printf(s, v...)
}
// NOTE(review): the return below does not fit Logf, which has no result; it
// appears to be a removed line of the old NewFrameReader kept by the diff
// rendering.
return &reader{r: br}
}

type reader struct {
r *bufio.Reader
pending uint64
}
// NOTE(review): this type group comes from a rendered commit diff; the lines
// of the old "func (rr *reader) ReadFrame(p []byte)" that appear inside it look
// like removed lines and are not part of the declarations documented here.
type (
// Reader is a framing.Reader that can also be closed.
Reader interface {
framing.Reader
io.Closer
}

func (rr *reader) ReadFrame(p []byte) (endOfFrame bool, n int, err error) {
for err == nil && len(p) > 0 && !endOfFrame {
if rr.pending == 0 {
if n > 0 {
endOfFrame = true
// We've read enough. Don't potentially block reading the next header.
// Opt is a functional option; NewReader calls each non-nil Opt with the
// reader it is constructing.
Opt func(*reader)

// Only send back 1 frame at a time; note if pending==0 here then we basically
// skip over reporting an empty frame because the next time ReadFrame() is invoked
// we'll have no idea if we previously read pending==0 here, or if we're new.
break
}
rr.pending, err = rr.size()
continue
// reader splits its input into frames with a bufio.Scanner whose split
// function alternates between two states (size header, frame payload).
reader struct {
// Closer closes the input handed to NewReader (a no-op closer when that
// input was not an io.ReadCloser).
io.Closer
// Scanner tokenizes the input through the split proxy installed by NewReader.
*bufio.Scanner
// pend is the payload length announced by the most recent size header; it is
// reset to 0 once that payload has been emitted.
pend int
// splitf is the current split state: splitSize while a size header is
// expected, splitFrame while a payload is expected.
splitf func(data []byte, atEOF bool) (int, []byte, error)
maxf int // max frame size; 0 disables the size check in splitSize
}
)

// NewReader returns a reader that parses frames from a recordio stream.
//
// If read is not an io.ReadCloser it is wrapped in a no-op closer, so Close on
// the result is always callable. The Scanner starts with a 16 KiB buffer and a
// 1<<22 byte token limit, and begins in the size-header state (splitSize).
// Options are applied last, in order, and nil options are skipped; an option
// such as MaxMessageSize can therefore replace the default buffer settings.
//
// NOTE(review): the lines after the option loop that reference rr, p, n and
// min, and the bare "return", appear to be removed lines of the old Read
// method kept by the diff rendering; "return r" is the function's result.
func NewReader(read io.Reader, opt ...Opt) Reader {
Debug.Log("new frame reader")
rc, ok := read.(io.ReadCloser)
if !ok {
rc = ioutil.NopCloser(read)
}
r := &reader{Scanner: bufio.NewScanner(rc)}
r.Split(func(data []byte, atEOF bool) (int, []byte, error) {
// Scanner panics if we invoke Split after scanning has started,
// use this proxy func as a work-around.
return r.splitf(data, atEOF)
})
buf := make([]byte, 16*1024)
r.Buffer(buf, 1<<22) // 1<<22 == max protobuf size
// Start by expecting a frame-size header.
r.splitf = r.splitSize
r.Closer = rc
// apply options
for _, f := range opt {
if f != nil {
f(r)
}
read, hi := 0, min(rr.pending, uint64(len(p)))
read, err = rr.r.Read(p[:hi])
n += read
p = p[read:]
rr.pending -= uint64(read)
}
return
return r
}

func (rr *reader) Read(p []byte) (n int, err error) {
for err == nil && len(p) > 0 {
if rr.pending == 0 {
if n > 0 && !rr.more() {
// We've read enough. Don't potentially block reading the next header.
break
}
rr.pending, err = rr.size()
continue
}
read, hi := 0, min(rr.pending, uint64(len(p)))
read, err = rr.r.Read(p[:hi])
n += read
p = p[read:]
rr.pending -= uint64(read)
// MaxMessageSize returns a functional option that configures the internal Scanner's buffer and max token (message)
// length, in bytes.
//
// The Scanner gets a fresh initial buffer of max/2 bytes and a token limit of
// max; max is also stored as the reader's maxf, the limit that splitSize
// checks announced frame sizes against.
func MaxMessageSize(max int) Opt {
return func(r *reader) {
buf := make([]byte, max>>1)
r.Buffer(buf, max)
r.maxf = max
}
// NOTE(review): the return below does not fit this function (n and err are
// not declared here); it appears to be a removed line of the old Read method
// kept by the diff rendering.
return n, err
}

func (rr *reader) more() bool {
peek, err := rr.r.Peek(rr.r.Buffered())
return err != nil && bytes.IndexByte(peek, '\n') >= 0
// splitSize is the split state used while the next bytes of the stream are
// expected to be a frame-size header: a decimal number terminated by '\n'.
//
// On a valid non-zero size it records the size in r.pend, switches r.splitf to
// splitFrame and consumes the header, returning an empty but non-nil token.
// Zero-length frames are skipped: their headers are consumed together with the
// header of the next non-empty frame.
//
// Results:
//   - (0, nil, nil) when data does not yet hold a complete header;
//   - io.EOF at end of input with no data left;
//   - framing.ErrorUnderrun at end of input with fewer than 2 bytes left;
//   - framing.ErrorBadSize when no newline is found within maxTokenLength
//     bytes or the header does not parse as an unsigned integer;
//   - framing.ErrorOversizedFrame when the size exceeds r.maxf (if non-zero)
//     or cannot be represented as an int.
func (r *reader) splitSize(data []byte, atEOF bool) (int, []byte, error) {
	const (
		maxTokenLength = 20                      // textual length of largest uint64 number
		maxInt         = uint64(^uint(0) >> 1) // largest value an int can hold on this platform
	)
	if atEOF {
		x := len(data)
		switch {
		case x == 0:
			Debug.Log("EOF and empty frame, returning io.EOF")
			return 0, nil, io.EOF
		case x < 2: // min frame size
			Debug.Log("remaining data less than min total frame length")
			return 0, nil, framing.ErrorUnderrun
		}
		// otherwise, we may have a valid frame...
	}
	Debug.Log("len(data)=", len(data))
	adv := 0
	for {
		// Find the newline that terminates the size header.
		i := 0
		for ; i < maxTokenLength && i < len(data) && data[i] != '\n'; i++ {
		}
		Debug.Log("i=", i)
		if i == len(data) {
			Debug.Log("need more input")
			return 0, nil, nil // need more input
		}
		if i == maxTokenLength && data[i] != '\n' {
			Debug.Log("frame size: max token length exceeded")
			return 0, nil, framing.ErrorBadSize
		}
		n, err := ParseUintBytes(bytes.TrimSpace(data[:i]), 10, 64)
		if err != nil {
			Debug.Log("failed to parse frame size field:", err)
			return 0, nil, framing.ErrorBadSize
		}
		// Range-check before narrowing to int. A header above the int range would
		// otherwise wrap to a negative value, slip past the maxf check (or meet no
		// check at all when maxf is 0) and later make splitFrame slice with a
		// negative length.
		if uint64(n) > maxInt {
			Debug.Log("frame size exceeds the range of int:", n)
			return 0, nil, framing.ErrorOversizedFrame
		}
		size := int(n)
		if r.maxf != 0 && size > r.maxf {
			Debug.Log("frame size max length exceeded:", n)
			return 0, nil, framing.ErrorOversizedFrame
		}
		if size == 0 {
			// special case... don't invoke splitFrame, just parse the next size header
			adv += i + 1
			data = data[i+1:]
			continue
		}
		r.pend = size
		r.splitf = r.splitFrame
		Debug.Logf("split next frame: %d, %d", n, adv+i+1)
		return adv + i + 1, data[:0], nil // returning a nil token screws up the Scanner, so return empty
	}
}

func (rr *reader) size() (uint64, error) {
header, err := rr.r.ReadSlice('\n')
if err != nil {
return 0, err
// splitFrame is the split state used while a frame payload of r.pend bytes is
// outstanding. It requests more input until at least r.pend bytes are
// buffered, then emits exactly those bytes as the token, clears r.pend and
// switches r.splitf back to splitSize.
//
// At end of input with fewer than r.pend bytes available it reports
// framing.ErrorUnderrun. It panics when called with r.pend == 0, since
// splitSize only selects this state after storing a non-zero size.
func (r *reader) splitFrame(data []byte, atEOF bool) (advance int, token []byte, err error) {
x := len(data)
Debug.Log("splitFrame:x=", x, ",eof=", atEOF)
if atEOF {
if x < r.pend {
return 0, nil, framing.ErrorUnderrun
}
}
if r.pend == 0 {
panic("asked to read frame data, but no data left in frame")
}
if x < int(r.pend) {
// need more data
return 0, nil, nil
}
// NOTE(review): the next two lines (the tsenart note and the
// strconv.ParseUint return) do not fit this function: "header" is not
// declared here and the result count differs. They appear to be removed lines
// of the old size() method kept by the diff rendering.
// NOTE(tsenart): https://github.com/golang/go/issues/2632
return strconv.ParseUint(string(bytes.TrimSpace(header)), 10, 64)
r.splitf = r.splitSize
adv := int(r.pend)
r.pend = 0
return adv, data[:adv], nil
}

func min(a, b uint64) uint64 {
if a < b {
return a
// ReadFrame implements framing.Reader
//
// It scans until the Scanner yields a non-empty token and returns it. Empty
// tokens are skipped (splitSize emits one each time it consumes a size
// header). The returned slice is the Scanner's own token (r.Bytes()), not a
// copy. When scanning stops without a token, the Scanner's error is returned,
// or io.EOF if the Scanner reports none.
func (r *reader) ReadFrame() (tok []byte, err error) {
for r.Scan() {
b := r.Bytes()
if len(b) == 0 {
continue
}
tok = b
Debug.Log("len(tok)", len(tok))
break
}
// NOTE(review): "return b" below does not fit this function (b is out of
// scope here and two results are expected); it appears to be a removed line
// of the old min() helper kept by the diff rendering.
return b
// either scan failed, or it succeeded and we have a token...
err = r.Err()
if err == nil && len(tok) == 0 {
err = io.EOF
}
return
}
Loading

0 comments on commit 75921d5

Please sign in to comment.