Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jan 11, 2019
1 parent 4708b6e commit 1fa1ca7
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 10 deletions.
1 change: 1 addition & 0 deletions .excludemetalint
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
vendor/
generated/
index/template/
values/template/
_mock.go
_gen.go
41 changes: 35 additions & 6 deletions index/at_position_doc_id_set_iterator.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package index

import (
"errors"

"github.com/xichen2020/eventdb/values/iterator"
)

var (
errPositionIterDocIDIterCountMismatch = errors.New("doc ID iterator and the position iterator iterator count mismatch")
)

// AtPositionDocIDSetIterator outputs the doc IDs from the doc ID set iterator at the
// given positions from the position iterator.
// TODO(xichen): Fix doc ID set iterator to return an error.
// TODO(xichen): Doc ID set iterator must be advanced first.
type AtPositionDocIDSetIterator struct {
docIt DocIDSetIterator
seekableDocIt SeekableDocIDSetIterator
positionIt iterator.PositionIterator

done bool
err error
firstTime bool
currDocID int32
currPos int
done bool
}

// NewAtPositionDocIDSetIterator creates a new at position iterator.
Expand All @@ -24,16 +30,20 @@ func NewAtPositionDocIDSetIterator(
positionIt iterator.PositionIterator,
) *AtPositionDocIDSetIterator {
seekableDocIt, _ := docIt.(SeekableDocIDSetIterator)
if seekableDocIt != nil {
docIt = nil
}
return &AtPositionDocIDSetIterator{
docIt: docIt,
seekableDocIt: seekableDocIt,
positionIt: positionIt,
firstTime: true,
}
}

// Next returns true if there are more doc IDs to be iterated over.
func (it *AtPositionDocIDSetIterator) Next() bool {
if it.done {
if it.done || it.err != nil {
return false
}
if !it.positionIt.Next() {
Expand All @@ -42,13 +52,28 @@ func (it *AtPositionDocIDSetIterator) Next() bool {
}
nextPos := it.positionIt.Position()
distance := nextPos - it.currPos

// We have a next position, now advance the doc ID set iterator for the first time.
if it.firstTime {
it.firstTime = false
if hasNoValues :=
(it.seekableDocIt != nil && !it.seekableDocIt.Next()) ||
(it.docIt != nil && !it.docIt.Next()); hasNoValues {
it.err = errPositionIterDocIDIterCountMismatch
return false
}
}

if it.seekableDocIt != nil {
it.seekableDocIt.SeekForward(distance)
if it.err = it.seekableDocIt.SeekForward(distance); it.err != nil {
return false
}
it.currDocID = it.seekableDocIt.DocID()
} else {
for i := 0; i < distance; i++ {
if !it.docIt.Next() {
panic("doc ID iterator and the position iterator iterator count mismatch")
it.err = errPositionIterDocIDIterCountMismatch
return false
}
}
it.currDocID = it.docIt.DocID()
Expand All @@ -60,6 +85,9 @@ func (it *AtPositionDocIDSetIterator) Next() bool {
// DocID returns the current doc ID.
func (it *AtPositionDocIDSetIterator) DocID() int32 { return it.currDocID }

// Err returns any error encountered.
func (it *AtPositionDocIDSetIterator) Err() error { return it.err }

// Close closes the iterator.
func (it *AtPositionDocIDSetIterator) Close() {
if it.docIt != nil {
Expand All @@ -70,4 +98,5 @@ func (it *AtPositionDocIDSetIterator) Close() {
it.seekableDocIt = nil
}
it.positionIt = nil
it.err = nil
}
41 changes: 40 additions & 1 deletion index/at_position_doc_id_set_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestAtPositionDocIDSetIterator(t *testing.T) {
func TestAtPositionDocIDSetIteratorForwardOnly(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand All @@ -36,6 +36,7 @@ func TestAtPositionDocIDSetIterator(t *testing.T) {
mockPositionIt.EXPECT().Next().Return(false),
)
atPositionIt := NewAtPositionDocIDSetIterator(docIDSetIter, mockPositionIt)
defer atPositionIt.Close()

expected := []int32{7, 54, 107}
var actual []int32
Expand All @@ -44,3 +45,41 @@ func TestAtPositionDocIDSetIterator(t *testing.T) {
}
require.Equal(t, expected, actual)
}

func TestAtPositionDocIDSetIteratorSeekable(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

docIDSetIter := NewMockSeekableDocIDSetIterator(ctrl)
gomock.InOrder(
docIDSetIter.EXPECT().Next().Return(true),
docIDSetIter.EXPECT().SeekForward(2).Return(nil),
docIDSetIter.EXPECT().DocID().Return(int32(7)),
docIDSetIter.EXPECT().SeekForward(2).Return(nil),
docIDSetIter.EXPECT().DocID().Return(int32(54)),
docIDSetIter.EXPECT().SeekForward(3).Return(nil),
docIDSetIter.EXPECT().DocID().Return(int32(107)),
docIDSetIter.EXPECT().Close(),
)

mockPositionIt := iterator.NewMockPositionIterator(ctrl)
gomock.InOrder(
mockPositionIt.EXPECT().Next().Return(true),
mockPositionIt.EXPECT().Position().Return(2),
mockPositionIt.EXPECT().Next().Return(true),
mockPositionIt.EXPECT().Position().Return(4),
mockPositionIt.EXPECT().Next().Return(true),
mockPositionIt.EXPECT().Position().Return(7),
mockPositionIt.EXPECT().Next().Return(false),
)
atPositionIt := NewAtPositionDocIDSetIterator(docIDSetIter, mockPositionIt)
defer atPositionIt.Close()

expected := []int32{7, 54, 107}
var actual []int32
for atPositionIt.Next() {
actual = append(actual, atPositionIt.DocID())
}
require.NoError(t, atPositionIt.Err())
require.Equal(t, expected, actual)
}
1 change: 1 addition & 0 deletions index/bitmap_based_doc_id_position_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type bitmapBasedDocIDPositionIterator struct {
currPosition int
}

// nolint: deadcode
func newBitmapBasedDocIDPositionIterator(
bm *roaring.Bitmap,
maskingIt DocIDSetIterator,
Expand Down
1 change: 1 addition & 0 deletions index/bitmap_based_doc_id_position_iterator_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func BenchmarkCustomBitmapDocIDPositionIterator(b *testing.B) {
}
}

// nolint: unparam
func initBenchBitmap(n int, everyN int) *roaring.Bitmap {
bm := roaring.NewBitmap()
for j := 0; j < n; j++ {
Expand Down
2 changes: 1 addition & 1 deletion index/doc_id_set_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type SeekableDocIDSetIterator interface {
DocIDSetIterator

// SeekForward moves the iterator forward n positions.
SeekForward(n int)
SeekForward(n int) error
}

// DocIDSetIteratorFn transforms an input doc ID set iterator into a new doc ID set iterator.
Expand Down
14 changes: 14 additions & 0 deletions index/field/bool_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type BoolField interface {
filterValue *field.ValueUnion,
numTotalDocs int32,
) (index.DocIDSetIterator, error)

// Fetch fetches the field values from the set of documents given by
// the doc ID set iterator passed in. If the field doesn't exist in
// a document from the doc ID set iterator output, it is ignored.
Fetch(it index.DocIDSetIterator) (BoolFieldIterator, error)
}

// CloseableBoolField is a bool field that can be closed.
Expand Down Expand Up @@ -119,6 +124,15 @@ func (f *boolField) Filter(
return index.NewAtPositionDocIDSetIterator(docIDSetIter, positionIt), nil
}

func (f *boolField) Fetch(it index.DocIDSetIterator) (BoolFieldIterator, error) {
valsIt, err := f.values.Iter()
if err != nil {
return nil, err
}
docIDPosIt := f.docIDSet.Fetch(it)
return newAtPositionBoolFieldIterator(docIDPosIt, valsIt), nil
}

func (f *boolField) ShallowCopy() CloseableBoolField {
f.IncRef()
shallowCopy := *f
Expand Down
14 changes: 14 additions & 0 deletions index/field/double_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type DoubleField interface {
filterValue *field.ValueUnion,
numTotalDocs int32,
) (index.DocIDSetIterator, error)

// Fetch fetches the field values from the set of documents given by
// the doc ID set iterator passed in. If the field doesn't exist in
// a document from the doc ID set iterator output, it is ignored.
Fetch(it index.DocIDSetIterator) (DoubleFieldIterator, error)
}

// CloseableDoubleField is a double field that can be closed.
Expand Down Expand Up @@ -119,6 +124,15 @@ func (f *doubleField) Filter(
return index.NewAtPositionDocIDSetIterator(docIDSetIter, positionIt), nil
}

func (f *doubleField) Fetch(it index.DocIDSetIterator) (DoubleFieldIterator, error) {
valsIt, err := f.values.Iter()
if err != nil {
return nil, err
}
docIDPosIt := f.docIDSet.Fetch(it)
return newAtPositionDoubleFieldIterator(docIDPosIt, valsIt), nil
}

func (f *doubleField) ShallowCopy() CloseableDoubleField {
f.IncRef()
shallowCopy := *f
Expand Down
14 changes: 14 additions & 0 deletions index/field/int_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type IntField interface {
filterValue *field.ValueUnion,
numTotalDocs int32,
) (index.DocIDSetIterator, error)

// Fetch fetches the field values from the set of documents given by
// the doc ID set iterator passed in. If the field doesn't exist in
// a document from the doc ID set iterator output, it is ignored.
Fetch(it index.DocIDSetIterator) (IntFieldIterator, error)
}

// CloseableIntField is a int field that can be closed.
Expand Down Expand Up @@ -119,6 +124,15 @@ func (f *intField) Filter(
return index.NewAtPositionDocIDSetIterator(docIDSetIter, positionIt), nil
}

func (f *intField) Fetch(it index.DocIDSetIterator) (IntFieldIterator, error) {
valsIt, err := f.values.Iter()
if err != nil {
return nil, err
}
docIDPosIt := f.docIDSet.Fetch(it)
return newAtPositionIntFieldIterator(docIDPosIt, valsIt), nil
}

func (f *intField) ShallowCopy() CloseableIntField {
f.IncRef()
shallowCopy := *f
Expand Down
9 changes: 9 additions & 0 deletions index/field/null_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type NullField interface {
filterValue *field.ValueUnion,
numTotalDocs int32,
) (index.DocIDSetIterator, error)

// Fetch fetches the field doc IDs from the set of documents given by
// the doc ID set iterator passed in. If the field doesn't exist in
// a document from the doc ID set iterator output, it is ignored.
Fetch(it index.DocIDSetIterator) (index.DocIDSetIterator, error)
}

// CloseableNullField is a null field that can be closed.
Expand Down Expand Up @@ -105,6 +110,10 @@ func (f *nullField) Filter(
return docIDSetIter, nil
}

func (f *nullField) Fetch(it index.DocIDSetIterator) (index.DocIDSetIterator, error) {
return f.docIDSet.Fetch(it), nil
}

func (f *nullField) ShallowCopy() CloseableNullField {
f.IncRef()
shallowCopy := *f
Expand Down
14 changes: 14 additions & 0 deletions index/field/time_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type TimeField interface {
filterValue *field.ValueUnion,
numTotalDocs int32,
) (index.DocIDSetIterator, error)

// Fetch fetches the field values from the set of documents given by
// the doc ID set iterator passed in. If the field doesn't exist in
// a document from the doc ID set iterator output, it is ignored.
Fetch(it index.DocIDSetIterator) (TimeFieldIterator, error)
}

// CloseableTimeField is a time field that can be closed.
Expand Down Expand Up @@ -119,6 +124,15 @@ func (f *timeField) Filter(
return index.NewAtPositionDocIDSetIterator(docIDSetIter, positionIt), nil
}

func (f *timeField) Fetch(it index.DocIDSetIterator) (TimeFieldIterator, error) {
valsIt, err := f.values.Iter()
if err != nil {
return nil, err
}
docIDPosIt := f.docIDSet.Fetch(it)
return newAtPositionTimeFieldIterator(docIDPosIt, valsIt), nil
}

func (f *timeField) ShallowCopy() CloseableTimeField {
f.IncRef()
shallowCopy := *f
Expand Down
6 changes: 4 additions & 2 deletions index/index_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ func (_mr *_MockSeekableDocIDSetIteratorRecorder) Next() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Next")
}

func (_m *MockSeekableDocIDSetIterator) SeekForward(_param0 int) {
_m.ctrl.Call(_m, "SeekForward", _param0)
func (_m *MockSeekableDocIDSetIterator) SeekForward(_param0 int) error {
ret := _m.ctrl.Call(_m, "SeekForward", _param0)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockSeekableDocIDSetIteratorRecorder) SeekForward(arg0 interface{}) *gomock.Call {
Expand Down

0 comments on commit 1fa1ca7

Please sign in to comment.