Skip to content

Commit

Permalink
Perform housekeeping on zstd compressed reader/writers.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Feb 7, 2019
1 parent 738e9f6 commit 1bba57d
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 7 deletions.
2 changes: 1 addition & 1 deletion storage/immutable_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type immutableSegment interface {
var (
errImmutableSegmentAlreadyClosed = errors.New("immutable segment is already closed")
errFlushingNotInMemoryOnlySegment = errors.New("flushing a segment that is not in memory only")
errDataNotAvailableInInMemoryOnlySegment = errors.New("data unavaible for in-memory only segment")
errDataNotAvailableInInMemoryOnlySegment = errors.New("data unavailable for in-memory only segment")
errNoTimeValuesInTimestampField = errors.New("no time values in timestamp field")
errNoTimestampField = errors.New("no timestamp field in the segment")
errNoRawDocSourceField = errors.New("no raw doc source field in the segment")
Expand Down
7 changes: 7 additions & 0 deletions values/decoding/compressed_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,10 @@ func (cr *compressedReader) ReadByte() (byte, error) {
}
return cr.buf[0], nil
}

// Close releases the underlying resources of the compressed reader.
func (cr *compressedReader) Close() error {
// Release does not return an error.
cr.Release()
return nil
}
8 changes: 8 additions & 0 deletions values/decoding/dictionary_based_string_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,13 @@ func (it *dictionaryBasedStringIterator) Err() error {
func (it *dictionaryBasedStringIterator) Close() {
it.extDict = nil
it.err = nil
// Close the underlying reader if it satisifies the `io.ReadCloser` iface.
rc, ok := it.reader.(io.ReadCloser)
if ok {
// NB(bodu): We don't need to propagate `Close` errors back up because there aren't any.
// We have two types of string readers. A bytes reader and a compress reader. The bytes reader
// doesn't implement the `io.Closer` iface and the compress reader has no errors when calling `Close`.
rc.Close()
}
it.reader = nil
}
9 changes: 9 additions & 0 deletions values/decoding/raw_size_string_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ func (it *rawSizeStringIterator) Err() error {
func (it *rawSizeStringIterator) Close() {
it.buf = nil
it.err = nil
// Close the underlying reader if it satisifies the `io.ReadCloser` iface.
// Since `it.reader` and `it.byteReader` reference the same reader, attempt close one of them.
rc, ok := it.reader.(io.ReadCloser)
if ok {
// NB(bodu): We don't need to propagate `Close` errors back up because there aren't any.
// We have two types of string readers. A bytes reader and a compress reader. The bytes reader
// doesn't implement the `io.Closer` iface and the compress reader has no errors when calling `Close`.
rc.Close()
}
it.reader = nil
it.byteReader = nil
}
8 changes: 8 additions & 0 deletions values/decoding/string_decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ func tryDecodeStringDictionary(
if err != nil {
return 0, err
}
// Close the underlying reader if it satisifies the `io.ReadCloser` iface.
rc, ok := reader.(io.ReadCloser)
if ok {
// NB(bodu): We don't need to propagate `Close` errors back up because there aren't any.
// We have two types of string readers. A bytes reader and a compress reader. The bytes reader
// doesn't implement the `io.Closer` iface and the compress reader has no errors when calling `Close`.
defer rc.Close()
}
bytesRead, err := xproto.DecodeStringArray(reader, stringArrayProto, decodeBuf)
if err != nil {
return 0, err
Expand Down
8 changes: 8 additions & 0 deletions values/decoding/varint_int_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,13 @@ func (it *varintIntIterator) Err() error {
func (it *varintIntIterator) Close() {
it.closed = true
it.err = nil
// Close the underlying reader if it satisifies the `io.ReadCloser` iface.
rc, ok := it.reader.(io.ReadCloser)
if ok {
// NB(bodu): We don't need to propagate `Close` errors back up because there aren't any.
// We have two types of string readers. A bytes reader and a compress reader. The bytes reader
// doesn't implement the `io.Closer` iface and the compress reader has no errors when calling `Close`.
rc.Close()
}
it.reader = nil
}
23 changes: 18 additions & 5 deletions values/encoding/string_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,8 @@ func (enc *stringEncoder) Encode(strVals values.StringValues, writer io.Writer)
case encodingpb.CompressionType_ZSTD:
// TODO(bodu): Figure out a cleaner way to do this.
compressWriter := gozstd.NewWriter(writer)
// NB(xichen): Close flushes and closes the compressed writer but doesn't
// close the writer wrapped by the compressed writer.
defer compressWriter.Close()
// Release all resources occupied by compressWriter.
defer compressWriter.Release()
writer = compressWriter
default:
return fmt.Errorf("invalid compression type: %v", metaProto.Compression)
Expand All @@ -112,12 +111,26 @@ func (enc *stringEncoder) Encode(strVals values.StringValues, writer io.Writer)

switch metaProto.Encoding {
case encodingpb.EncodingType_RAW_SIZE:
return enc.rawSizeEncode(valuesIt, writer)
if err := enc.rawSizeEncode(valuesIt, writer); err != nil {
return err
}
case encodingpb.EncodingType_DICTIONARY:
return enc.dictionaryEncode(valuesIt, dictionary, writer)
if err := enc.dictionaryEncode(valuesIt, dictionary, writer); err != nil {
return err
}
default:
return fmt.Errorf("invalid encoding type: %v", metaProto.Encoding)
}

// Close the writer if it satisifies the `io.WriteCloser` iface.
wc, ok := writer.(io.WriteCloser)
if ok {
// NB(xichen): Close flushes and closes the compressed writer but doesn't
// close the writer wrapped by the compressed writer.
return wc.Close()
}

return nil
}

func (enc *stringEncoder) reset() {
Expand Down
2 changes: 1 addition & 1 deletion x/io/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io

import "io"

// Reader embeds both `io.Reader and `io.ByteReader`.
// Reader embeds both `io.Reader` and `io.ByteReader`.
type Reader interface {
io.Reader
io.ByteReader
Expand Down

0 comments on commit 1bba57d

Please sign in to comment.