Skip to content

Commit

Permalink
Use reader embedded in a no-op closer.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Feb 11, 2019
1 parent 1bba57d commit 0bce519
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 41 deletions.
3 changes: 1 addition & 2 deletions values/decoding/compressed_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ func (cr *compressedReader) ReadByte() (byte, error) {
}

// Close releases the underlying resources of the compressed reader.
func (cr *compressedReader) Close() error {
func (cr *compressedReader) Close() {
// Release does not return an error.
cr.Release()
return nil
}
15 changes: 5 additions & 10 deletions values/decoding/dictionary_based_string_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"encoding/binary"
"fmt"
"io"

xio "github.com/xichen2020/eventdb/x/io"
)

// dictionaryBasedStringIterator iterates over a
// dict encoded stream of string data.
type dictionaryBasedStringIterator struct {
reader io.ByteReader
reader xio.SimpleReadCloser
// extDict is passed externally from the string decoder
// and should not be mutated during iteration.
extDict []string
Expand All @@ -19,7 +21,7 @@ type dictionaryBasedStringIterator struct {
}

func newDictionaryBasedStringIterator(
reader io.ByteReader,
reader xio.SimpleReadCloser,
extDict []string,
) *dictionaryBasedStringIterator {
return &dictionaryBasedStringIterator{
Expand Down Expand Up @@ -65,13 +67,6 @@ func (it *dictionaryBasedStringIterator) Err() error {
func (it *dictionaryBasedStringIterator) Close() {
it.extDict = nil
it.err = nil
// Close the underlying reader if it satisifies the `io.ReadCloser` iface.
rc, ok := it.reader.(io.ReadCloser)
if ok {
// NB(bodu): We don't need to propagate `Close` errors back up because there aren't any.
// We have two types of string readers. A bytes reader and a compress reader. The bytes reader
// doesn't implement the `io.Closer` iface and the compress reader has no errors when calling `Close`.
rc.Close()
}
it.reader.Close()
it.reader = nil
}
4 changes: 2 additions & 2 deletions values/decoding/int_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func newIntIteratorFromMeta(
extDict []int,
encodedDictBytes int,
) (iterator.ForwardIntIterator, error) {
reader := bytes.NewReader(encodedBytes)
reader := xio.NewReaderNoopCloser(bytes.NewReader(encodedBytes))
switch metaProto.Encoding {
case encodingpb.EncodingType_VARINT:
return newVarintIntIterator(reader), nil
Expand Down Expand Up @@ -121,7 +121,7 @@ func newIntDictionaryIndexIterator(
encodedBytes []byte,
encodedDictBytes int,
) (iterator.ForwardIntIterator, error) {
reader := bytes.NewReader(encodedBytes)
reader := xio.NewReaderNoopCloser(bytes.NewReader(encodedBytes))
// Simply discard the bytes used to encode the dictionary since we've already
// received the dictionary parameter.
_, err := io.CopyN(ioutil.Discard, reader, int64(encodedDictBytes))
Expand Down
23 changes: 6 additions & 17 deletions values/decoding/raw_size_string_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,19 @@ const (
// raw size encoded string data.
// TODO(xichen): Get the buffer from bytes pool.
type rawSizeStringIterator struct {
reader xio.Reader
byteReader io.ByteReader // Same as `reader` but has the proper type to save interface conversions in `Next`
reader xio.SimpleReadCloser

curr string
err error
buf []byte
}

func newRawSizeStringIterator(
reader xio.Reader,
reader xio.SimpleReadCloser,
) *rawSizeStringIterator {
return &rawSizeStringIterator{
reader: reader,
byteReader: reader,
buf: make([]byte, defaultInitialStringBufferCapacity),
reader: reader,
buf: make([]byte, defaultInitialStringBufferCapacity),
}
}

Expand All @@ -42,7 +40,7 @@ func (it *rawSizeStringIterator) Next() bool {
}

var rawSizeBytes int64
rawSizeBytes, it.err = binary.ReadVarint(it.byteReader)
rawSizeBytes, it.err = binary.ReadVarint(it.reader)
if it.err != nil {
return false
}
Expand Down Expand Up @@ -74,15 +72,6 @@ func (it *rawSizeStringIterator) Err() error {
func (it *rawSizeStringIterator) Close() {
it.buf = nil
it.err = nil
// Close the underlying reader if it satisifies the `io.ReadCloser` iface.
// Since `it.reader` and `it.byteReader` reference the same reader, attempt close one of them.
rc, ok := it.reader.(io.ReadCloser)
if ok {
// NB(bodu): We don't need to propagate `Close` errors back up because there aren't any.
// We have two types of string readers. A bytes reader and a compress reader. The bytes reader
// doesn't implement the `io.Closer` iface and the compress reader has no errors when calling `Close`.
rc.Close()
}
it.reader.Close()
it.reader = nil
it.byteReader = nil
}
2 changes: 1 addition & 1 deletion values/decoding/string_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func tryDecodeStringDictionary(
func newStringReaderFromMeta(
metaProto encodingpb.StringMeta,
data []byte,
) (xio.Reader, error) {
) (xio.SimpleReadCloser, error) {
var reader xio.Reader = bytes.NewReader(data)
switch metaProto.Compression {
case encodingpb.CompressionType_ZSTD:
Expand Down
6 changes: 4 additions & 2 deletions values/decoding/varint_int_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@ package decoding
import (
"encoding/binary"
"io"

xio "github.com/xichen2020/eventdb/x/io"
)

// varintIntIterator iterates over a stream of
// varint encoded int data.
type varintIntIterator struct {
reader io.ByteReader
reader xio.SimpleReadCloser

closed bool
curr int
err error
}

func newVarintIntIterator(reader io.ByteReader) *varintIntIterator {
func newVarintIntIterator(reader xio.SimpleReadCloser) *varintIntIterator {
return &varintIntIterator{reader: reader}
}

Expand Down
11 changes: 5 additions & 6 deletions values/encoding/string_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ func (enc *stringEncoder) Encode(strVals values.StringValues, writer io.Writer)
}

// Compress the bytes.
var compressWriter *gozstd.Writer
switch metaProto.Compression {
case encodingpb.CompressionType_ZSTD:
// TODO(bodu): Figure out a cleaner way to do this.
compressWriter := gozstd.NewWriter(writer)
compressWriter = gozstd.NewWriter(writer)
// Release all resources occupied by compressWriter.
defer compressWriter.Release()
writer = compressWriter
Expand All @@ -122,12 +122,11 @@ func (enc *stringEncoder) Encode(strVals values.StringValues, writer io.Writer)
return fmt.Errorf("invalid encoding type: %v", metaProto.Encoding)
}

// Close the writer if it satisifies the `io.WriteCloser` iface.
wc, ok := writer.(io.WriteCloser)
if ok {
// Close the compressWriter if its present.
if compressWriter != nil {
// NB(xichen): Close flushes and closes the compressed writer but doesn't
// close the writer wrapped by the compressed writer.
return wc.Close()
return compressWriter.Close()
}

return nil
Expand Down
25 changes: 24 additions & 1 deletion x/io/reader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
package io

import "io"
import (
"io"

"github.com/m3db/m3x/close"
)

// Reader embeds both `io.Reader` and `io.ByteReader`.
type Reader interface {
io.Reader
io.ByteReader
}

// SimpleReadCloser embeds both `Reader` and `close.SimpleCloser`.
type SimpleReadCloser interface {
Reader
close.SimpleCloser
}

// ReaderNoopCloser is a reader that has a no-op Close method.
type ReaderNoopCloser struct {
Reader
}

// NewReaderNoopCloser returns a new reader that implements a no-op Close.
func NewReaderNoopCloser(r Reader) SimpleReadCloser {
return ReaderNoopCloser{Reader: r}
}

// Close is a no-op.
func (r ReaderNoopCloser) Close() {}

0 comments on commit 0bce519

Please sign in to comment.