diff --git a/values/decoding/dictionary_based_string_iterator.go b/values/decoding/dictionary_based_string_iterator.go index 52a40a9..784690b 100644 --- a/values/decoding/dictionary_based_string_iterator.go +++ b/values/decoding/dictionary_based_string_iterator.go @@ -11,7 +11,8 @@ import ( // dictionaryBasedStringIterator iterates over a // dict encoded stream of string data. type dictionaryBasedStringIterator struct { - reader xio.SimpleReadCloser + reader xio.SimpleReadCloser + byteReader io.ByteReader // Same as `reader` but has the proper type to save interface conversions in `Next` // extDict is passed externally from the string decoder // and should not be mutated during iteration. extDict []string @@ -25,8 +26,9 @@ func newDictionaryBasedStringIterator( extDict []string, ) *dictionaryBasedStringIterator { return &dictionaryBasedStringIterator{ - reader: reader, - extDict: extDict, + reader: reader, + byteReader: reader, + extDict: extDict, } } @@ -38,7 +40,7 @@ func (it *dictionaryBasedStringIterator) Next() bool { } var idx int64 - idx, it.err = binary.ReadVarint(it.reader) + idx, it.err = binary.ReadVarint(it.byteReader) if it.err != nil { return false } @@ -69,4 +71,5 @@ func (it *dictionaryBasedStringIterator) Close() { it.err = nil it.reader.Close() it.reader = nil + it.byteReader = nil } diff --git a/values/decoding/raw_size_string_iterator.go b/values/decoding/raw_size_string_iterator.go index ac05101..83314b8 100644 --- a/values/decoding/raw_size_string_iterator.go +++ b/values/decoding/raw_size_string_iterator.go @@ -17,7 +17,8 @@ const ( // raw size encoded string data. // TODO(xichen): Get the buffer from bytes pool. type rawSizeStringIterator struct { - reader xio.SimpleReadCloser + reader xio.SimpleReadCloser + byteReader io.ByteReader // Same as `reader` but has the proper type to save interface conversions in `Next` curr string err error @@ -28,8 +29,9 @@ func newRawSizeStringIterator( reader xio.SimpleReadCloser, ) *rawSizeStringIterator { return &rawSizeStringIterator{ - reader: reader, - buf: make([]byte, defaultInitialStringBufferCapacity), + reader: reader, + byteReader: reader, + buf: make([]byte, defaultInitialStringBufferCapacity), } } @@ -40,7 +42,7 @@ func (it *rawSizeStringIterator) Next() bool { } var rawSizeBytes int64 - rawSizeBytes, it.err = binary.ReadVarint(it.reader) + rawSizeBytes, it.err = binary.ReadVarint(it.byteReader) if it.err != nil { return false } @@ -74,4 +76,5 @@ func (it *rawSizeStringIterator) Close() { it.err = nil it.reader.Close() it.reader = nil + it.byteReader = nil } diff --git a/values/decoding/varint_int_iterator.go b/values/decoding/varint_int_iterator.go index 8da2eb1..32502d2 100644 --- a/values/decoding/varint_int_iterator.go +++ b/values/decoding/varint_int_iterator.go @@ -10,7 +10,8 @@ import ( // varintIntIterator iterates over a stream of // varint encoded int data. type varintIntIterator struct { - reader xio.SimpleReadCloser + reader xio.SimpleReadCloser + byteReader io.ByteReader // Same as `reader` but has the proper type to save interface conversions in `Next` closed bool curr int @@ -18,7 +19,10 @@ type varintIntIterator struct { } func newVarintIntIterator(reader xio.SimpleReadCloser) *varintIntIterator { - return &varintIntIterator{reader: reader} + return &varintIntIterator{ + reader: reader, + byteReader: reader, + } } // Next iteration. @@ -28,7 +32,7 @@ func (it *varintIntIterator) Next() bool { } var curr int64 - curr, it.err = binary.ReadVarint(it.reader) + curr, it.err = binary.ReadVarint(it.byteReader) if it.err != nil { return false } @@ -55,4 +59,5 @@ func (it *varintIntIterator) Close() { it.err = nil it.reader.Close() it.reader = nil + it.byteReader = nil } diff --git a/values/encoding/string_encode.go b/values/encoding/string_encode.go index 3c05b2f..dcc717c 100644 --- a/values/encoding/string_encode.go +++ b/values/encoding/string_encode.go @@ -12,6 +12,7 @@ import ( "github.com/xichen2020/eventdb/x/proto" "github.com/xichen2020/eventdb/x/unsafe" + xerrors "github.com/m3db/m3x/errors" "github.com/valyala/gozstd" ) @@ -109,27 +110,24 @@ func (enc *stringEncoder) Encode(strVals values.StringValues, writer io.Writer) } defer valuesIt.Close() + var multiErr xerrors.MultiError switch metaProto.Encoding { case encodingpb.EncodingType_RAW_SIZE: - if err := enc.rawSizeEncode(valuesIt, writer); err != nil { - return err - } + multiErr.Add(enc.rawSizeEncode(valuesIt, writer)) case encodingpb.EncodingType_DICTIONARY: - if err := enc.dictionaryEncode(valuesIt, dictionary, writer); err != nil { - return err - } + multiErr.Add(enc.dictionaryEncode(valuesIt, dictionary, writer)) default: - return fmt.Errorf("invalid encoding type: %v", metaProto.Encoding) + multiErr.Add(fmt.Errorf("invalid encoding type: %v", metaProto.Encoding)) } // Close the compressWriter if its present. if compressWriter != nil { // NB(xichen): Close flushes and closes the compressed writer but doesn't // close the writer wrapped by the compressed writer. - return compressWriter.Close() + multiErr.Add(compressWriter.Close()) } - return nil + return multiErr.FinalError() } func (enc *stringEncoder) reset() {