Skip to content

Commit

Permalink
Address PR comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Feb 11, 2019
1 parent 300f5d3 commit d568781
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 20 deletions.
11 changes: 7 additions & 4 deletions values/decoding/dictionary_based_string_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
// dictionaryBasedStringIterator iterates over a
// dict encoded stream of string data.
type dictionaryBasedStringIterator struct {
reader xio.SimpleReadCloser
reader xio.SimpleReadCloser
byteReader io.ByteReader // Same as `reader` but has the proper type to save interface conversions in `Next`
// extDict is passed externally from the string decoder
// and should not be mutated during iteration.
extDict []string
Expand All @@ -25,8 +26,9 @@ func newDictionaryBasedStringIterator(
extDict []string,
) *dictionaryBasedStringIterator {
return &dictionaryBasedStringIterator{
reader: reader,
extDict: extDict,
reader: reader,
byteReader: reader,
extDict: extDict,
}
}

Expand All @@ -38,7 +40,7 @@ func (it *dictionaryBasedStringIterator) Next() bool {
}

var idx int64
idx, it.err = binary.ReadVarint(it.reader)
idx, it.err = binary.ReadVarint(it.byteReader)
if it.err != nil {
return false
}
Expand Down Expand Up @@ -69,4 +71,5 @@ func (it *dictionaryBasedStringIterator) Close() {
it.err = nil
it.reader.Close()
it.reader = nil
it.byteReader = nil
}
11 changes: 7 additions & 4 deletions values/decoding/raw_size_string_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ const (
// raw size encoded string data.
// TODO(xichen): Get the buffer from bytes pool.
type rawSizeStringIterator struct {
reader xio.SimpleReadCloser
reader xio.SimpleReadCloser
byteReader io.ByteReader // Same as `reader` but has the proper type to save interface conversions in `Next`

curr string
err error
Expand All @@ -28,8 +29,9 @@ func newRawSizeStringIterator(
reader xio.SimpleReadCloser,
) *rawSizeStringIterator {
return &rawSizeStringIterator{
reader: reader,
buf: make([]byte, defaultInitialStringBufferCapacity),
reader: reader,
byteReader: reader,
buf: make([]byte, defaultInitialStringBufferCapacity),
}
}

Expand All @@ -40,7 +42,7 @@ func (it *rawSizeStringIterator) Next() bool {
}

var rawSizeBytes int64
rawSizeBytes, it.err = binary.ReadVarint(it.reader)
rawSizeBytes, it.err = binary.ReadVarint(it.byteReader)
if it.err != nil {
return false
}
Expand Down Expand Up @@ -74,4 +76,5 @@ func (it *rawSizeStringIterator) Close() {
it.err = nil
it.reader.Close()
it.reader = nil
it.byteReader = nil
}
11 changes: 8 additions & 3 deletions values/decoding/varint_int_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ import (
// varintIntIterator iterates over a stream of
// varint encoded int data.
type varintIntIterator struct {
reader xio.SimpleReadCloser
reader xio.SimpleReadCloser
byteReader io.ByteReader // Same as `reader` but has the proper type to save interface conversions in `Next`

closed bool
curr int
err error
}

func newVarintIntIterator(reader xio.SimpleReadCloser) *varintIntIterator {
return &varintIntIterator{reader: reader}
return &varintIntIterator{
reader: reader,
byteReader: reader,
}
}

// Next iteration.
Expand All @@ -28,7 +32,7 @@ func (it *varintIntIterator) Next() bool {
}

var curr int64
curr, it.err = binary.ReadVarint(it.reader)
curr, it.err = binary.ReadVarint(it.byteReader)
if it.err != nil {
return false
}
Expand All @@ -55,4 +59,5 @@ func (it *varintIntIterator) Close() {
it.err = nil
it.reader.Close()
it.reader = nil
it.byteReader = nil
}
16 changes: 7 additions & 9 deletions values/encoding/string_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/xichen2020/eventdb/x/proto"
"github.com/xichen2020/eventdb/x/unsafe"

xerrors "github.com/m3db/m3x/errors"
"github.com/valyala/gozstd"
)

Expand Down Expand Up @@ -109,27 +110,24 @@ func (enc *stringEncoder) Encode(strVals values.StringValues, writer io.Writer)
}
defer valuesIt.Close()

var multiErr xerrors.MultiError
switch metaProto.Encoding {
case encodingpb.EncodingType_RAW_SIZE:
if err := enc.rawSizeEncode(valuesIt, writer); err != nil {
return err
}
multiErr.Add(enc.rawSizeEncode(valuesIt, writer))
case encodingpb.EncodingType_DICTIONARY:
if err := enc.dictionaryEncode(valuesIt, dictionary, writer); err != nil {
return err
}
multiErr.Add(enc.dictionaryEncode(valuesIt, dictionary, writer))
default:
return fmt.Errorf("invalid encoding type: %v", metaProto.Encoding)
multiErr.Add(fmt.Errorf("invalid encoding type: %v", metaProto.Encoding))
}

// Close the compressWriter if its present.
if compressWriter != nil {
// NB(xichen): Close flushes and closes the compressed writer but doesn't
// close the writer wrapped by the compressed writer.
return compressWriter.Close()
multiErr.Add(compressWriter.Close())
}

return nil
return multiErr.FinalError()
}

func (enc *stringEncoder) reset() {
Expand Down

0 comments on commit d568781

Please sign in to comment.