Skip to content

Commit

Permalink
Fix block based encoding/decoding and create a seekable string iterator.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Dec 25, 2018
1 parent 3146c25 commit 3049264
Show file tree
Hide file tree
Showing 10 changed files with 312 additions and 163 deletions.
43 changes: 22 additions & 21 deletions encoding/block_reader.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
package encoding

import (
"errors"
"os"

"github.com/xichen2020/eventdb/generated/proto/encodingpb"
"github.com/xichen2020/eventdb/x/io"
"github.com/xichen2020/eventdb/x/proto"
)

var (
errEndOfBlock = errors.New("reached end of block, call Skip to move on to the next block")
)

type blockReader struct {
// Two readers, one for reading data. And another for seeking through data.
r io.SeekableReader
extBuf *[]byte
metaProto *encodingpb.BlockMeta
Expand All @@ -30,11 +36,9 @@ func newBlockReader(
}

func (br *blockReader) Read(p []byte) (int, error) {
// Skip to the next block if we've read this entire block of data.
// Return an end of block error if the reader has reached the end of a block.
if br.metaProto.NumBytes == br.numBytesRead {
if err := br.skip(); err != nil {
return 0, err
}
return 0, errEndOfBlock
}

// Calculate # of bytes to read so we don't overshoot.
Expand All @@ -59,36 +63,33 @@ func (br *blockReader) ReadByte() (byte, error) {
return 0, err
}
br.numBytesRead += int64(n)

// Skip to the next block if we've read this entire block of data.
if br.metaProto.NumBytes == br.numBytesRead {
if err := br.skip(); err != nil {
return 0, err
}
}
return (*br.extBuf)[0], nil
}

// Skip to the next block. Also called on `reset` to load the first block metadata.
// skip to the next block.
// NB(bodu): it is the caller's responsibility to call skip when they reach the end of a block.
func (br *blockReader) skip() error {
br.metaProto.Reset()

// Seek to the next block in the underlying reader.
if _, err := br.r.Seek(br.metaProto.NumBytes-br.numBytesRead, os.SEEK_CUR); err != nil {
return err
if br.metaProto.NumBytes > 0 {
if _, err := br.r.Seek(br.metaProto.NumBytes-br.numBytesRead, os.SEEK_CUR); err != nil {
return err
}
}

br.metaProto.Reset()
if err := proto.DecodeBlockMeta(br.metaProto, br.extBuf, br.r); err != nil {
return err
}
br.numBytesRead = 0
return nil
}

// reset clears the current block metadata and points the block reader at the
// given reader.
func (br *blockReader) reset(reader io.SeekableReader) {
br.metaProto.Reset()
br.r = reader
}

// numOfEvents reports the event count recorded in the current block's
// metadata; callers use it to decide whether or not to skip.
func (br *blockReader) numOfEvents() int {
	count := br.metaProto.NumOfEvents
	return int(count)
}

// reset points the block reader at a new underlying reader and clears the
// current block metadata.
func (br *blockReader) reset(reader io.SeekableReader) {
	br.r = reader
	br.metaProto.Reset()
}
10 changes: 7 additions & 3 deletions encoding/block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (

// blockWriter encapsulates
type blockWriter struct {
w io.Writer
buf *bytes.Buffer
extBuf *[]byte
w io.Writer
metaProto *encodingpb.BlockMeta
}

Expand All @@ -30,8 +30,12 @@ func newBlockWriter(
}
}

func (bw *blockWriter) Write(p []byte) (int, error) {
return bw.buf.Write(p)
// Close releases every reference held by the block writer by setting all of
// its fields to nil. It always returns nil.
func (bw *blockWriter) Close() error {
	bw.metaProto = nil
	bw.w = nil
	bw.extBuf = nil
	bw.buf = nil
	return nil
}

func (bw *blockWriter) reset(writer io.Writer) {
Expand Down
33 changes: 9 additions & 24 deletions encoding/compress_reader.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
package encoding

import (
"errors"
"io"

"github.com/valyala/gozstd"
)

// Errors.
var (
ErrCompressReaderDoesNotImplementSkip = errors.New("compress reader does not implement Skip")
)

// CompressReader embeds gozstd's Reader
// and implements the `io.ByteReader` iface.
type CompressReader struct {
Expand All @@ -20,8 +14,7 @@ type CompressReader struct {
*gozstd.Reader
}

// NewCompressReader creates a new `CompressReader` instance.
func NewCompressReader(reader io.Reader) *CompressReader {
func newCompressReader(reader io.Reader) *CompressReader {
return &CompressReader{
Reader: gozstd.NewReader(reader),
// Create a byte buffer of a single byte so
Expand All @@ -39,33 +32,25 @@ func (cr *CompressReader) ReadByte() (byte, error) {
return cr.buf[0], nil
}

// Skip is not implemented for compress reader.
func (cr *CompressReader) Skip() error {
return ErrCompressReaderDoesNotImplementSkip
}

// SkippableCompressReader embeds CompressReader and implements a Skip function to skip underlying
// blocks of data.
type SkippableCompressReader struct {
// skippableCompressReader wraps both a block reader and a compress reader.
// newSkippableCompressReader builds the embedded CompressReader on top of br,
// so reads go through the compress reader while block-level operations
// (skip, numOfEvents) are delegated to br.
type skippableCompressReader struct {
// br is the block reader that skip and numOfEvents delegate to.
br *blockReader

// CompressReader is embedded, so its methods are promoted onto this type.
*CompressReader
}

func newSkippableCompressReader(
br *blockReader,
) *SkippableCompressReader {
return &SkippableCompressReader{
// newSkippableCompressReader creates a skippableCompressReader whose embedded
// compress reader reads from the given block reader.
func newSkippableCompressReader(br *blockReader) *skippableCompressReader {
	return &skippableCompressReader{
		br:             br,
		CompressReader: newCompressReader(br),
	}
}

// Skip to the next underlying block of data.
func (scr *SkippableCompressReader) Skip() error {
// skip moves the wrapped block reader on to the next underlying block of
// data and returns whatever error the block reader reports.
func (scr *skippableCompressReader) skip() error {
	err := scr.br.skip()
	return err
}

func (scr *SkippableCompressReader) numOfEvents() int {
// numOfEvents returns the event count reported by the wrapped block reader.
func (scr *skippableCompressReader) numOfEvents() int {
	count := scr.br.numOfEvents()
	return count
}
8 changes: 8 additions & 0 deletions encoding/dictionary_based_string_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package encoding

import (
"encoding/binary"
"errors"

"github.com/xichen2020/eventdb/generated/proto/encodingpb"
"github.com/xichen2020/eventdb/x/io"
"github.com/xichen2020/eventdb/x/proto"
)

var (
errDoesNotImplementSeek = errors.New("iterator does not implement `Seek`")
)

// DictionaryBasedStringIterator iterates over a
// dict encoded stream of string data.
type DictionaryBasedStringIterator struct {
Expand Down Expand Up @@ -50,6 +55,9 @@ func (it *DictionaryBasedStringIterator) Next() bool {
return true
}

// Seek is unsupported on `DictionaryBasedStringIterator`; it ignores offset
// and always returns errDoesNotImplementSeek.
func (it *DictionaryBasedStringIterator) Seek(offset int) error {
	return errDoesNotImplementSeek
}

// Current returns the string the iterator is currently positioned on.
func (it *DictionaryBasedStringIterator) Current() string {
	return it.curr
}

Expand Down
114 changes: 102 additions & 12 deletions encoding/rawsize_string_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,46 @@ package encoding

import (
"encoding/binary"
"errors"

"github.com/xichen2020/eventdb/x/bytes"
"github.com/xichen2020/eventdb/x/io"
"github.com/xichen2020/eventdb/x/unsafe"
)

// Errors.
var (
errCannotSeekWithoutBlockStorage = errors.New("cannot seek without underlying block storage")
errFailedToCastReaderToSkippableCompressReader = errors.New("failed to cast reader to skippable compress reader")
)

// RawSizeStringIteratorOptions informs how to decode string data.
type RawSizeStringIteratorOptions struct {
// UseBlocks must be true for the iterator to seek or to skip between blocks;
// when it is false those operations fail with errCannotSeekWithoutBlockStorage.
UseBlocks bool
}

// RawSizeStringIterator iterates over a stream of
// raw size encoded string data.
type RawSizeStringIterator struct {
	reader io.Reader
	extBuf *[]byte
	opts   RawSizeStringIteratorOptions
	curr   string
	// numOfEventsRead is the # of events read so far in a block.
	numOfEventsRead int
	err             error
	closed          bool
}

// newRawSizeStringIterator builds an iterator that decodes strings from
// reader according to opts. extBuf is an external byte buffer for memory
// re-use.
func newRawSizeStringIterator(
	reader io.Reader,
	extBuf *[]byte,
	opts RawSizeStringIteratorOptions,
) *RawSizeStringIterator {
	it := new(RawSizeStringIterator)
	it.reader = reader
	it.extBuf = extBuf
	it.opts = opts
	return it
}

Expand All @@ -34,21 +51,43 @@ func (it *RawSizeStringIterator) Next() bool {
return false
}

var rawSizeBytes int64
rawSizeBytes, it.err = binary.ReadVarint(it.reader)
if it.err != nil {
return false
return it.tryRead()
}

// Seek to an offset (in # of events) relative to either the current event.
func (it *RawSizeStringIterator) Seek(offset int) error {
// Seek requires the underlying reader to be a `skippableCompressReader`.
if !it.opts.UseBlocks {
return errCannotSeekWithoutBlockStorage
}
reader, ok := it.reader.(*skippableCompressReader)
if !ok {
return errFailedToCastReaderToSkippableCompressReader
}

*it.extBuf = bytes.EnsureBufferSize(*it.extBuf, int(rawSizeBytes), bytes.DontCopyData)
// Calculate if we need to skip blocks or not.
remaining := reader.numOfEvents() - it.numOfEventsRead
// Skip until we can seek to an event w/in a block.
for offset > remaining {
// Move the offset along.
offset -= remaining
if err := reader.skip(); err != nil {
return err
}
// Since we just skipped a block, remaining is just the total # of events in this block.
remaining = reader.numOfEvents()
}

_, it.err = it.reader.Read((*it.extBuf)[:rawSizeBytes])
if it.err != nil {
return false
// Now the offset is w/in the current block.
// Iterate until we reach the event.
for offset >= 0 {
if !it.Next() {
return it.err
}
offset--
}

it.curr = unsafe.ToString((*it.extBuf)[:rawSizeBytes])
return true
return nil
}

// Current returns the current string.
Expand All @@ -66,3 +105,54 @@ func (it *RawSizeStringIterator) Close() error {
it.reader = nil
return nil
}

// tryRead reads one raw size encoded string: a varint length followed by that
// many bytes. If the reader reports errEndOfBlock it moves to the next block
// via skip, which retries the read.
//
// On success it stores the string in it.curr, increments numOfEventsRead and
// returns true. On failure it returns false with the cause left in it.err.
func (it *RawSizeStringIterator) tryRead() bool {
	var rawSizeBytes int64
	rawSizeBytes, it.err = binary.ReadVarint(it.reader)
	// Skip to the next block if we reach the end of one and retry read.
	if it.err == errEndOfBlock {
		return it.skip()
	}
	if it.err != nil {
		return false
	}
	// ReadVarint decodes a signed value; a negative length can only come from
	// corrupt input and would otherwise panic when slicing the buffer below.
	if rawSizeBytes < 0 {
		it.err = errors.New("decoded a negative raw size for string data")
		return false
	}

	*it.extBuf = bytes.EnsureBufferSize(*it.extBuf, int(rawSizeBytes), bytes.DontCopyData)
	payload := (*it.extBuf)[:rawSizeBytes]

	// A single Read is allowed to return fewer bytes than requested, so keep
	// reading until the payload is complete or the reader reports an error.
	// Zero-length strings need no read at all.
	filled := 0
	for filled < len(payload) && it.err == nil {
		var n int
		n, it.err = it.reader.Read(payload[filled:])
		filled += n
	}
	if filled < len(payload) {
		// Skip to the next block if we reach the end of one and retry read.
		if it.err == errEndOfBlock {
			return it.skip()
		}
		return false
	}
	// The payload is complete; an error delivered together with the final
	// bytes will be reported again by the next read.
	it.err = nil

	it.curr = unsafe.ToString(payload)
	it.numOfEventsRead++
	return true
}

// skip moves the iterator on to the next block, resets the per-block event
// counter and then retries the read via tryRead. It returns false, with the
// cause stored in it.err, when the iterator is not block based, when the
// underlying reader is not a `skippableCompressReader`, or when skipping fails.
func (it *RawSizeStringIterator) skip() bool {
	// Skipping is only possible when the data is stored in blocks.
	if !it.opts.UseBlocks {
		it.err = errCannotSeekWithoutBlockStorage
		return false
	}
	scr, isSkippable := it.reader.(*skippableCompressReader)
	if !isSkippable {
		it.err = errFailedToCastReaderToSkippableCompressReader
		return false
	}

	if it.err = scr.skip(); it.err != nil {
		return false
	}
	// it.err is nil at this point; start counting events for the new block.
	it.numOfEventsRead = 0
	return it.tryRead()
}
Loading

0 comments on commit 3049264

Please sign in to comment.