From 5ca5615ce8dc5c148c72333ca1810a5d56fdd605 Mon Sep 17 00:00:00 2001 From: Peter Waller Date: Fri, 11 Nov 2016 17:26:02 +0000 Subject: [PATCH 1/6] Add (*Reader).CopyNext(w) (int64, error) It is useful to be able to efficiently copy objects without decoding them. My use case is filtering when I already know the indices of the objects I want to keep, and for rewriting a dictionary of objects as a column of objects. --- msgp/errors.go | 16 +++++++++ msgp/read.go | 62 ++++++++++++++++++++++++++++++++ msgp/read_test.go | 90 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+) diff --git a/msgp/errors.go b/msgp/errors.go index 5c24f271..bde8ebfd 100644 --- a/msgp/errors.go +++ b/msgp/errors.go @@ -140,3 +140,19 @@ func (e *ErrUnsupportedType) Error() string { return fmt.Sprintf("msgp: type %q // Resumable returns 'true' for ErrUnsupportedType func (e *ErrUnsupportedType) Resumable() bool { return true } + +// ReadNextError is returned the buffer passed to ReadNext is not large enough. +type ReadNextError struct { + Got uintptr + Wanted uintptr +} + +// Error implements the error interface +func (err ReadNextError) Error() string { + return fmt.Sprintf("msgp: slice not big enough (%d < %d)", + err.Got, err.Wanted) +} + +// Resumable is always 'false' for ReadNextError, since we may be inside a +// recursive call which has already partially consumed the stream. +func (err ReadNextError) Resumable() bool { return false } diff --git a/msgp/read.go b/msgp/read.go index a493f941..4717a876 100644 --- a/msgp/read.go +++ b/msgp/read.go @@ -1,6 +1,7 @@ package msgp import ( + "fmt" "io" "math" "sync" @@ -146,6 +147,65 @@ func (m *Reader) Read(p []byte) (int, error) { return m.R.Read(p) } +// ReadNext reads the raw bytes for the next object on the wire into p. +// If p is not large enough, an error is returned. See GetNextSize. +func (m *Reader) ReadNext(p []byte) (int, error) { + sz, o, err := getNextSize(m.R) + if err != nil { + return 0, err + } + if uintptr(cap(p)) < sz { + return 0, ReadNextError{uintptr(len(p)), uintptr(sz)} + } + n, err := m.R.ReadFull(p[:sz]) + if err != nil { + return 0, err + } + if uintptr(n) != sz { + return 0, fmt.Errorf("wrong # bytes read (%d != %d)", n, int64(sz)) + } + + p = p[n:n:cap(p)] + tot := n + + // for maps and slices, read elements + for x := uintptr(0); x < o; x++ { + n, err = m.ReadNext(p) + if err != nil { + return 0, err + } + p = p[n:n:cap(p)] + tot += n + } + return tot, nil +} + +// CopyNext reads the next object from m without decoding it and writes it to w. +// It avoids unnecessary copies internally. +func (m *Reader) CopyNext(w io.Writer) (int64, error) { + sz, o, err := getNextSize(m.R) + if err != nil { + return 0, err + } + + // avoids allocating because m.R implements WriteTo. + n, err := io.CopyN(w, m.R, int64(sz)) + if err != nil { + return 0, err + } + + // for maps and slices, read elements + for x := uintptr(0); x < o; x++ { + var n2 int64 + n2, err = m.CopyNext(w) + if err != nil { + return n, err + } + n += n2 + } + return n, nil +} + // ReadFull implements `io.ReadFull` func (m *Reader) ReadFull(p []byte) (int, error) { return m.R.ReadFull(p) @@ -194,8 +254,10 @@ func (m *Reader) IsNil() bool { return err == nil && p[0] == mnil } +// getNextSize returns the size of the next object on the wire. // returns (obj size, obj elements, error) // only maps and arrays have non-zero obj elements +// for maps and arrays, obj size does not include elements // // use uintptr b/c it's guaranteed to be large enough // to hold whatever we can fit in memory. diff --git a/msgp/read_test.go b/msgp/read_test.go index aa191439..2efa16ab 100644 --- a/msgp/read_test.go +++ b/msgp/read_test.go @@ -722,3 +722,93 @@ func BenchmarkSkip(b *testing.B) { } } } + +func TestReadNext(t *testing.T) { + var buf bytes.Buffer + en := NewWriter(&buf) + + en.WriteMapHeader(6) + + en.WriteString("thing_one") + en.WriteString("value_one") + + en.WriteString("thing_two") + en.WriteFloat64(3.14159) + + en.WriteString("some_bytes") + en.WriteBytes([]byte("nkl4321rqw908vxzpojnlk2314rqew098-s09123rdscasd")) + + en.WriteString("the_time") + en.WriteTime(time.Now()) + + en.WriteString("what?") + en.WriteBool(true) + + en.WriteString("ext") + en.WriteExtension(&RawExtension{Type: 55, Data: []byte("raw data!!!")}) + + en.Flush() + + // Read from a copy of the original buf. + de := NewReader(bytes.NewReader(buf.Bytes())) + p := make([]byte, 0, len(buf.Bytes())) + n, err := de.ReadNext(p) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(buf.Bytes(), p[:n]) { + t.Fatalf("not equal! %v, %v", buf.Bytes(), p[:n]) + } +} + +func TestCopyNext(t *testing.T) { + var buf bytes.Buffer + en := NewWriter(&buf) + + en.WriteMapHeader(6) + + en.WriteString("thing_one") + en.WriteString("value_one") + + en.WriteString("thing_two") + en.WriteFloat64(3.14159) + + en.WriteString("some_bytes") + en.WriteBytes([]byte("nkl4321rqw908vxzpojnlk2314rqew098-s09123rdscasd")) + + en.WriteString("the_time") + en.WriteTime(time.Now()) + + en.WriteString("what?") + en.WriteBool(true) + + en.WriteString("ext") + en.WriteExtension(&RawExtension{Type: 55, Data: []byte("raw data!!!")}) + + en.Flush() + + // Read from a copy of the original buf. + de := NewReader(bytes.NewReader(buf.Bytes())) + + w := new(bytes.Buffer) + + n, err := de.CopyNext(w) + if err != nil { + t.Fatal(err) + } + if n != int64(buf.Len()) { + t.Fatalf("CopyNext returned the wrong value (%d != %d)", + n, buf.Len()) + } + + // p := make([]byte, 0, len(buf.Bytes())) + // n, err := de.ReadNext(p) + // if err != nil { + // t.Fatal(err) + // } + + if !bytes.Equal(buf.Bytes(), w.Bytes()) { + t.Fatalf("not equal! %v, %v", buf.Bytes(), w.Bytes()) + } +} From 2684df0a4483b04718e85a5d5558f77db543d870 Mon Sep 17 00:00:00 2001 From: Peter Waller Date: Fri, 11 Nov 2016 19:31:25 +0000 Subject: [PATCH 2/6] Remove ReadNext --- msgp/read.go | 34 ---------------------------------- msgp/read_test.go | 39 --------------------------------------- 2 files changed, 73 deletions(-) diff --git a/msgp/read.go b/msgp/read.go index 4717a876..2518bc41 100644 --- a/msgp/read.go +++ b/msgp/read.go @@ -1,7 +1,6 @@ package msgp import ( - "fmt" "io" "math" "sync" @@ -147,39 +146,6 @@ func (m *Reader) Read(p []byte) (int, error) { return m.R.Read(p) } -// ReadNext reads the raw bytes for the next object on the wire into p. -// If p is not large enough, an error is returned. See GetNextSize. -func (m *Reader) ReadNext(p []byte) (int, error) { - sz, o, err := getNextSize(m.R) - if err != nil { - return 0, err - } - if uintptr(cap(p)) < sz { - return 0, ReadNextError{uintptr(len(p)), uintptr(sz)} - } - n, err := m.R.ReadFull(p[:sz]) - if err != nil { - return 0, err - } - if uintptr(n) != sz { - return 0, fmt.Errorf("wrong # bytes read (%d != %d)", n, int64(sz)) - } - - p = p[n:n:cap(p)] - tot := n - - // for maps and slices, read elements - for x := uintptr(0); x < o; x++ { - n, err = m.ReadNext(p) - if err != nil { - return 0, err - } - p = p[n:n:cap(p)] - tot += n - } - return tot, nil -} - // CopyNext reads the next object from m without decoding it and writes it to w. // It avoids unnecessary copies internally. func (m *Reader) CopyNext(w io.Writer) (int64, error) { diff --git a/msgp/read_test.go b/msgp/read_test.go index 2efa16ab..cb92dfac 100644 --- a/msgp/read_test.go +++ b/msgp/read_test.go @@ -723,45 +723,6 @@ func BenchmarkSkip(b *testing.B) { } } -func TestReadNext(t *testing.T) { - var buf bytes.Buffer - en := NewWriter(&buf) - - en.WriteMapHeader(6) - - en.WriteString("thing_one") - en.WriteString("value_one") - - en.WriteString("thing_two") - en.WriteFloat64(3.14159) - - en.WriteString("some_bytes") - en.WriteBytes([]byte("nkl4321rqw908vxzpojnlk2314rqew098-s09123rdscasd")) - - en.WriteString("the_time") - en.WriteTime(time.Now()) - - en.WriteString("what?") - en.WriteBool(true) - - en.WriteString("ext") - en.WriteExtension(&RawExtension{Type: 55, Data: []byte("raw data!!!")}) - - en.Flush() - - // Read from a copy of the original buf. - de := NewReader(bytes.NewReader(buf.Bytes())) - p := make([]byte, 0, len(buf.Bytes())) - n, err := de.ReadNext(p) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(buf.Bytes(), p[:n]) { - t.Fatalf("not equal! %v, %v", buf.Bytes(), p[:n]) - } -} - func TestCopyNext(t *testing.T) { var buf bytes.Buffer en := NewWriter(&buf) From e5aa9059a5bd76535a794bce9f20628cc92a8653 Mon Sep 17 00:00:00 2001 From: Peter Waller Date: Fri, 11 Nov 2016 19:39:12 +0000 Subject: [PATCH 3/6] Add opportunistic optimization with m.R.Next --- msgp/read.go | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/msgp/read.go b/msgp/read.go index 2518bc41..39195ef6 100644 --- a/msgp/read.go +++ b/msgp/read.go @@ -154,10 +154,31 @@ func (m *Reader) CopyNext(w io.Writer) (int64, error) { return 0, err } - // avoids allocating because m.R implements WriteTo. - n, err := io.CopyN(w, m.R, int64(sz)) + // Opportunistic optimization: if we can fit the whole thing in the m.R + // buffer, then just get a pointer to that, and pass it to w.Write, + // avoiding an allocation. + buf, err := m.R.Next(int(sz)) + n := int64(len(buf)) if err != nil { - return 0, err + // Fall back to io.CopyN. + // May avoid allocating if w is a ReaderFrom (e.g. bytes.Buffer) + n, err = io.CopyN(w, m.R, int64(sz)) + if err != nil { + return 0, err + } + // Fallthrough + } else { + // Intentional "else no error" branch due to fallthrough above. + // Otherwise, + var nInt int + nInt, err = w.Write(buf) + n = int64(nInt) + if err != nil { + return 0, err + } else if n != int64(sz) { + return n, io.ErrShortWrite + } + } // for maps and slices, read elements From 37ba0318da9173fb6a250dcaa60e32c6fe3ca704 Mon Sep 17 00:00:00 2001 From: Peter Waller Date: Fri, 11 Nov 2016 19:39:19 +0000 Subject: [PATCH 4/6] Remove unused ReadNextError --- msgp/errors.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/msgp/errors.go b/msgp/errors.go index bde8ebfd..5c24f271 100644 --- a/msgp/errors.go +++ b/msgp/errors.go @@ -140,19 +140,3 @@ func (e *ErrUnsupportedType) Error() string { return fmt.Sprintf("msgp: type %q // Resumable returns 'true' for ErrUnsupportedType func (e *ErrUnsupportedType) Resumable() bool { return true } - -// ReadNextError is returned the buffer passed to ReadNext is not large enough. -type ReadNextError struct { - Got uintptr - Wanted uintptr -} - -// Error implements the error interface -func (err ReadNextError) Error() string { - return fmt.Sprintf("msgp: slice not big enough (%d < %d)", - err.Got, err.Wanted) -} - -// Resumable is always 'false' for ReadNextError, since we may be inside a -// recursive call which has already partially consumed the stream. -func (err ReadNextError) Resumable() bool { return false } From eef5f705fb0352b88d7268e8121a93c349072f0b Mon Sep 17 00:00:00 2001 From: Peter Waller Date: Fri, 11 Nov 2016 19:39:45 +0000 Subject: [PATCH 5/6] Remove commented code --- msgp/read.go | 3 +-- msgp/read_test.go | 6 ------ 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/msgp/read.go b/msgp/read.go index 39195ef6..1cacd07e 100644 --- a/msgp/read.go +++ b/msgp/read.go @@ -169,7 +169,7 @@ func (m *Reader) CopyNext(w io.Writer) (int64, error) { // Fallthrough } else { // Intentional "else no error" branch due to fallthrough above. - // Otherwise, + // Just write the buffer. var nInt int nInt, err = w.Write(buf) n = int64(nInt) @@ -178,7 +178,6 @@ func (m *Reader) CopyNext(w io.Writer) (int64, error) { } else if n != int64(sz) { return n, io.ErrShortWrite } - } // for maps and slices, read elements diff --git a/msgp/read_test.go b/msgp/read_test.go index cb92dfac..711f6856 100644 --- a/msgp/read_test.go +++ b/msgp/read_test.go @@ -763,12 +763,6 @@ func TestCopyNext(t *testing.T) { n, buf.Len()) } - // p := make([]byte, 0, len(buf.Bytes())) - // n, err := de.ReadNext(p) - // if err != nil { - // t.Fatal(err) - // } - if !bytes.Equal(buf.Bytes(), w.Bytes()) { t.Fatalf("not equal! %v, %v", buf.Bytes(), w.Bytes()) } From bd807cafe6d3ed0c7d3654448c0c426e66e10fce Mon Sep 17 00:00:00 2001 From: Philip Hofer Date: Tue, 13 Dec 2016 11:37:10 -0800 Subject: [PATCH 6/6] small fixup - only call (*Reader).Next() when we're sure it won't realloc its buffer - promote io.ErrUnexpectedEOF to msgp.ErrShortBytes --- msgp/read.go | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/msgp/read.go b/msgp/read.go index 1cacd07e..20cd1ef8 100644 --- a/msgp/read.go +++ b/msgp/read.go @@ -154,31 +154,35 @@ func (m *Reader) CopyNext(w io.Writer) (int64, error) { return 0, err } + var n int64 // Opportunistic optimization: if we can fit the whole thing in the m.R // buffer, then just get a pointer to that, and pass it to w.Write, // avoiding an allocation. - buf, err := m.R.Next(int(sz)) - n := int64(len(buf)) - if err != nil { - // Fall back to io.CopyN. - // May avoid allocating if w is a ReaderFrom (e.g. bytes.Buffer) - n, err = io.CopyN(w, m.R, int64(sz)) + if int(sz) <= m.R.BufferSize() { + var nn int + var buf []byte + buf, err = m.R.Next(int(sz)) if err != nil { + if err == io.ErrUnexpectedEOF { + err = ErrShortBytes + } return 0, err } - // Fallthrough + nn, err = w.Write(buf) + n += int64(nn) } else { - // Intentional "else no error" branch due to fallthrough above. - // Just write the buffer. - var nInt int - nInt, err = w.Write(buf) - n = int64(nInt) - if err != nil { - return 0, err - } else if n != int64(sz) { - return n, io.ErrShortWrite + // Fall back to io.CopyN. + // May avoid allocating if w is a ReaderFrom (e.g. bytes.Buffer) + n, err = io.CopyN(w, m.R, int64(sz)) + if err == io.ErrUnexpectedEOF { + err = ErrShortBytes } } + if err != nil { + return n, err + } else if n < int64(sz) { + return n, io.ErrShortWrite + } // for maps and slices, read elements for x := uintptr(0); x < o; x++ {