Skip to content

Commit

Permalink
Add impl for int encoder/decoder. Adds protos. Adds generics.
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Du committed Dec 7, 2018
1 parent f49aae4 commit c86852a
Show file tree
Hide file tree
Showing 30 changed files with 1,614 additions and 75 deletions.
4 changes: 3 additions & 1 deletion encoding/bool_decode.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package encoding

import "github.com/xichen2020/eventdb/x/io"

// BoolDecoder decodes bool values.
type BoolDecoder interface {
// Decode decodes bools from reader.
Decode(reader Reader) (ForwardBoolIterator, error)
Decode(reader io.Reader) (ForwardBoolIterator, error)

// Reset resets the decoder.
Reset()
Expand Down
78 changes: 78 additions & 0 deletions encoding/delta_int_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package encoding

import (
"io"

bitstream "github.com/dgryski/go-bitstream"
)

const (
negativeSign = 1
)

// DeltaIntIterator iterates over a stream of delta encoded data.
type DeltaIntIterator struct {
bitReader *bitstream.BitReader
bitsPerEncodedValue int64
negativeBit uint64
curr int
err error
closed bool
}

// NewDeltaIntIterator returns a new delta encoded int iterator.
func NewDeltaIntIterator(
reader io.Reader,
bitsPerEncodedValue int64,
deltaStart int64,
) *DeltaIntIterator {
return &DeltaIntIterator{
bitReader: bitstream.NewReader(reader),
bitsPerEncodedValue: bitsPerEncodedValue,
negativeBit: 1 << uint(bitsPerEncodedValue),
curr: int(deltaStart),
}
}

// Next iteration.
func (d *DeltaIntIterator) Next() bool {
if d.closed || d.err != nil {
return false
}

// Read in an extra bit for the sign.
var (
delta uint64
)
delta, d.err = d.bitReader.ReadBits(int(d.bitsPerEncodedValue) + 1)
if d.err != nil {
return false
}
// Check if negative bit is set.
isNegative := delta&d.negativeBit == d.negativeBit
if isNegative {
// Zero out the negative bit.
delta &^= d.negativeBit
d.curr -= int(delta)
} else {
d.curr += int(delta)
}

return true
}

// Current returns the current int.
func (d *DeltaIntIterator) Current() int {
return d.curr
}

// Err returns any error recorded while iterating.
func (d *DeltaIntIterator) Err() error {
return d.err
}

// Close the iterator.
func (d *DeltaIntIterator) Close() error {
d.closed = true
return nil
}
79 changes: 79 additions & 0 deletions encoding/dictionary_based_int_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package encoding

import (
"github.com/xichen2020/eventdb/generated/proto/encodingpb"
"github.com/xichen2020/eventdb/x/io"
"github.com/xichen2020/eventdb/x/proto"

bitstream "github.com/dgryski/go-bitstream"
)

// DictionaryBasedIntIterator iterates through a dict encoded stream of ints.
type DictionaryBasedIntIterator struct {
bitReader *bitstream.BitReader
bytesPerDictionaryValue int64
bitsPerEncodedValue int64
extBuf *[]byte
dict []byte
curr int
err error
closed bool
}

// NewDictionaryBasedIntIterator returns a new dictionary based int iterator.
func NewDictionaryBasedIntIterator(
reader io.Reader,
extProto *encodingpb.IntDictionary, // extProto is an external proto for memory re-use.
extBuf *[]byte, // extBuf is an external byte buffer for memory re-use.
bytesPerDictionaryValue int64,
bitsPerEncodedValue int64,
) (*DictionaryBasedIntIterator, error) {
if err := proto.DecodeIntDictionary(extProto, extBuf, reader); err != nil {
return nil, err
}
// Zero out extBuf so we can re-use it during iteration.
endianness.PutUint64(*extBuf, uint64(0))
return &DictionaryBasedIntIterator{
bitReader: bitstream.NewReader(reader),
bytesPerDictionaryValue: bytesPerDictionaryValue,
bitsPerEncodedValue: bitsPerEncodedValue,
extBuf: extBuf,
dict: extProto.Data,
}, nil
}

// Next iteration.
func (d *DictionaryBasedIntIterator) Next() bool {
if d.closed || d.err != nil {
return false
}

// Read the idx into the dict first.
var dictIdx uint64
dictIdx, d.err = d.bitReader.ReadBits(int(d.bitsPerEncodedValue))
if d.err != nil {
return false
}

// Use idx to fetch value.
start := int64(dictIdx) * d.bytesPerDictionaryValue
copy((*d.extBuf)[:d.bytesPerDictionaryValue], d.dict[start:start+d.bytesPerDictionaryValue])
d.curr = int(endianness.Uint64(*d.extBuf))
return true
}

// Current returns the current int.
func (d *DictionaryBasedIntIterator) Current() int {
return d.curr
}

// Err returns any error recorded while iterating.
func (d *DictionaryBasedIntIterator) Err() error {
return d.err
}

// Close the iterator.
func (d *DictionaryBasedIntIterator) Close() error {
d.closed = true
return nil
}
18 changes: 5 additions & 13 deletions encoding/dictionary_based_string_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package encoding

import (
"encoding/binary"
"io"

"github.com/xichen2020/eventdb/generated/proto/encodingpb"
"github.com/xichen2020/eventdb/x/bytes"
"github.com/xichen2020/eventdb/x/io"
"github.com/xichen2020/eventdb/x/proto"
)

// DictionaryBasedStringIterator iterates over a
// dict encoded stream of string data.
type DictionaryBasedStringIterator struct {
reader Reader
reader io.Reader
// dictionary is passed externally from the string encoder
// and should not be mutated during iteration.
dictionary []string
Expand All @@ -22,19 +22,11 @@ type DictionaryBasedStringIterator struct {

// NewDictionaryBasedStringIterator returns a new dictionary based string iterator.
func NewDictionaryBasedStringIterator(
reader Reader,
reader io.Reader,
extProto *encodingpb.StringArray, // extProto is an external proto for memory re-use.
extBuf *[]byte, // extBuf is an external byte buffer for memory re-use.
) (*DictionaryBasedStringIterator, error) {
protoSizeBytes, err := binary.ReadVarint(reader)
if err != nil {
return nil, err
}
*extBuf = bytes.EnsureBufferSize(*extBuf, int(protoSizeBytes), bytes.DontCopyData)
if _, err := io.ReadFull(reader, (*extBuf)[:protoSizeBytes]); err != nil {
return nil, err
}
if err := extProto.Unmarshal((*extBuf)[:protoSizeBytes]); err != nil {
if err := proto.DecodeStringArray(extProto, extBuf, reader); err != nil {
return nil, err
}
return &DictionaryBasedStringIterator{
Expand Down
4 changes: 3 additions & 1 deletion encoding/double_decode.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package encoding

import "github.com/xichen2020/eventdb/x/io"

// DoubleDecoder decodes double values.
type DoubleDecoder interface {
// Decode decodes doubles from reader.
Decode(reader Reader) (ForwardDoubleIterator, error)
Decode(reader io.Reader) (ForwardDoubleIterator, error)

// Reset resets the decoder.
Reset()
Expand Down
16 changes: 10 additions & 6 deletions encoding/encoding.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package encoding

import (
"io"
"encoding/binary"
)

// Reader is both an io.Reader and an io.ByteReader.
type Reader interface {
io.Reader
io.ByteReader
}
// ByteOrder for data serialization.
var (
endianness = binary.LittleEndian
)

// For allocating a buffer large enough to hold uint64 values.
const (
uint64SizeBytes = 8
)
67 changes: 66 additions & 1 deletion encoding/int_decode.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,75 @@
package encoding

import (
"fmt"

"github.com/xichen2020/eventdb/generated/proto/encodingpb"
"github.com/xichen2020/eventdb/x/io"
"github.com/xichen2020/eventdb/x/proto"
)

// IntDecoder decodes int values.
type IntDecoder interface {
// Decode decodes ints from reader.
Decode(reader Reader) (ForwardIntIterator, error)
Decode(reader io.Reader) (ForwardIntIterator, error)

// Reset resets the decoder.
Reset()
}

// IntDec is a int Decoder.
type IntDec struct {
dictionaryProto encodingpb.IntDictionary
metaProto encodingpb.IntMeta
buf []byte
}

// NewIntDecoder creates a new int Decoder.
func NewIntDecoder() *IntDec {
return &IntDec{
// Make buf at least big enough to hold Uint64 values.
buf: make([]byte, uint64SizeBytes),
}
}

// Decode encoded int data in a streaming fashion.
func (dec *IntDec) Decode(reader io.Reader) (ForwardIntIterator, error) {
// Decode metadata first.
if err := proto.DecodeIntMeta(&dec.metaProto, &dec.buf, reader); err != nil {
return nil, err
}

var (
iter ForwardIntIterator
err error
)
switch dec.metaProto.Encoding {
case encodingpb.EncodingType_DELTA:
iter = dec.decodeDelta(reader, dec.metaProto.BitsPerEncodedValue, dec.metaProto.DeltaStart)
case encodingpb.EncodingType_DICTIONARY:
iter, err = dec.decodeDictionary(reader, dec.metaProto.BytesPerDictionaryValue, dec.metaProto.BitsPerEncodedValue)
default:
return nil, fmt.Errorf("Invalid encoding type: %v", dec.metaProto.Encoding)
}

return iter, err
}

// Reset the int decoder.
func (dec *IntDec) Reset() {}

func (dec *IntDec) decodeDelta(
reader io.Reader,
bitsPerEncodedValue int64,
deltaStart int64,
) *DeltaIntIterator {
return NewDeltaIntIterator(reader, bitsPerEncodedValue, deltaStart)
}

func (dec *IntDec) decodeDictionary(
reader io.Reader,
bytesPerDictionaryValue int64,
bitsPerEncodedValue int64,
) (*DictionaryBasedIntIterator, error) {
return NewDictionaryBasedIntIterator(reader, &dec.dictionaryProto, &dec.buf, bytesPerDictionaryValue, bitsPerEncodedValue)
}
Loading

0 comments on commit c86852a

Please sign in to comment.