Skip to content

Commit

Permalink
Use existing results for early query termination
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jan 15, 2019
1 parent 402da10 commit 7d3bbe1
Show file tree
Hide file tree
Showing 14 changed files with 346 additions and 186 deletions.
4 changes: 2 additions & 2 deletions index/field/doc_id_multi_field_intersect_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// iterator and the multi field iterator.
type DocIDMultiFieldIntersectIterator struct {
docIt index.DocIDSetIterator
multiFieldIt *MultiFieldIterator
multiFieldIt *MultiFieldIntersectIterator

done bool
err error
Expand All @@ -21,7 +21,7 @@ type DocIDMultiFieldIntersectIterator struct {
// NewDocIDMultiFieldIntersectIterator creates a new DocIDMultiFieldIntersectIterator.
func NewDocIDMultiFieldIntersectIterator(
docIt index.DocIDSetIterator,
multiFieldIt *MultiFieldIterator,
multiFieldIt *MultiFieldIntersectIterator,
) *DocIDMultiFieldIntersectIterator {
return &DocIDMultiFieldIntersectIterator{
docIt: docIt,
Expand Down
2 changes: 1 addition & 1 deletion index/field/doc_id_multi_field_intersect_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestDocIDMultiFieldIntersectIterator(t *testing.T) {
it2.EXPECT().Err().Return(nil),
it2.EXPECT().Close(),
)
multiFieldIt := NewMultiFieldIterator([]BaseFieldIterator{it1, it2})
multiFieldIt := NewMultiFieldIntersectIterator([]BaseFieldIterator{it1, it2})
intersectIt := NewDocIDMultiFieldIntersectIterator(docIt, multiFieldIt)
defer intersectIt.Close()

Expand Down
34 changes: 34 additions & 0 deletions index/field/field_union.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/xichen2020/eventdb/document/field"
"github.com/xichen2020/eventdb/values"
)

// Union is a union of different typed fields.
Expand Down Expand Up @@ -36,3 +37,36 @@ func (u *Union) Iter() (BaseFieldIterator, error) {
}
return nil, fmt.Errorf("unknown field type in union: %v", u.Type)
}

// ValuesMeta returns the values metadata corresponding to the typed field held
// in the union, or an error if the union's field type is not recognized.
func (u *Union) ValuesMeta() (values.MetaUnion, error) {
	meta := values.MetaUnion{Type: u.Type}
	switch u.Type {
	case field.NullType:
		// Null fields carry no value metadata; only the type is recorded.
	case field.BoolType:
		meta.BoolMeta = u.BoolField.Values().Metadata()
	case field.IntType:
		meta.IntMeta = u.IntField.Values().Metadata()
	case field.DoubleType:
		meta.DoubleMeta = u.DoubleField.Values().Metadata()
	case field.StringType:
		meta.StringMeta = u.StringField.Values().Metadata()
	case field.TimeType:
		meta.TimeMeta = u.TimeField.Values().Metadata()
	default:
		return values.MetaUnion{}, fmt.Errorf("unknown field type in union: %v", u.Type)
	}
	return meta, nil
}

// MustValuesMeta is like ValuesMeta, but panics instead of returning an error
// when the union holds an unknown field type.
func (u *Union) MustValuesMeta() values.MetaUnion {
	meta, err := u.ValuesMeta()
	if err != nil {
		panic(err)
	}
	return meta
}
20 changes: 10 additions & 10 deletions index/field/multi_field_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"github.com/xichen2020/eventdb/document/field"
)

// MultiFieldIterator is an iterator that iterates over multiple fields,
// MultiFieldIntersectIterator is an iterator that iterates over multiple fields,
// which are joined on their doc IDs. As a result, a document needs to
// contain all fields associated with the iterator in order for it to
// be included in the output of the multi-field iterator.
type MultiFieldIterator struct {
type MultiFieldIntersectIterator struct {
iters []BaseFieldIterator

done bool
Expand All @@ -17,9 +17,9 @@ type MultiFieldIterator struct {
currVals []field.ValueUnion
}

// NewMultiFieldIterator creates a new multi-field iterator union.
func NewMultiFieldIterator(iters []BaseFieldIterator) *MultiFieldIterator {
return &MultiFieldIterator{
// NewMultiFieldIntersectIterator creates a new multi-field intersecting iterator.
func NewMultiFieldIntersectIterator(iters []BaseFieldIterator) *MultiFieldIntersectIterator {
return &MultiFieldIntersectIterator{
iters: iters,
done: len(iters) == 0,
currDocIDs: make([]int32, len(iters)),
Expand All @@ -28,7 +28,7 @@ func NewMultiFieldIterator(iters []BaseFieldIterator) *MultiFieldIterator {
}

// Next returns true if there are more items to be iterated over.
func (it *MultiFieldIterator) Next() bool {
func (it *MultiFieldIntersectIterator) Next() bool {
if it.done || it.err != nil {
return false
}
Expand Down Expand Up @@ -70,18 +70,18 @@ func (it *MultiFieldIterator) Next() bool {
}

// DocID returns the current doc ID, which remains valid until the next iteration.
func (it *MultiFieldIterator) DocID() int32 { return it.currDocIDs[0] }
func (it *MultiFieldIntersectIterator) DocID() int32 { return it.currDocIDs[0] }

// Values returns the current list of field values, which remains valid until the
// next iteration. If the caller needs to retain a valid reference to the value array
// after `Next` is called again, the caller needs to make a copy of the value array.
func (it *MultiFieldIterator) Values() []field.ValueUnion { return it.currVals }
func (it *MultiFieldIntersectIterator) Values() []field.ValueUnion { return it.currVals }

// Err returns errors if any.
func (it *MultiFieldIterator) Err() error { return it.err }
func (it *MultiFieldIntersectIterator) Err() error { return it.err }

// Close closes the iterator.
func (it *MultiFieldIterator) Close() {
func (it *MultiFieldIntersectIterator) Close() {
for i := range it.iters {
it.iters[i].Close()
it.iters[i] = nil
Expand Down
4 changes: 2 additions & 2 deletions index/field/multi_field_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/golang/mock/gomock"
)

func TestMultiFieldIterator(t *testing.T) {
func TestMultiFieldIntersectIterator(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

Expand Down Expand Up @@ -63,7 +63,7 @@ func TestMultiFieldIterator(t *testing.T) {
it3.EXPECT().Close(),
)

it := NewMultiFieldIterator([]BaseFieldIterator{it1, it2, it3})
it := NewMultiFieldIntersectIterator([]BaseFieldIterator{it1, it2, it3})
defer it.Close()

expectedDocIDs := []int32{20, 24}
Expand Down
81 changes: 37 additions & 44 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const (
defaultTimeGranularity = time.Second
defaultTimeUnit = TimeUnit(xtime.Second)
defaultFilterCombinator = filter.And
defaultRawDocumentQueryLimit = 500
defaultRawQuerySizeLimit = 100
defaultGroupedQuerySizeLimit = 10
defaultOrderBySortOrder = Ascending
)

Expand Down Expand Up @@ -367,30 +368,18 @@ func (q *RawQuery) parseOrderBy(rob RawOrderBy, opts ParseOptions) (OrderBy, err
return ob, fmt.Errorf("invalid order by clause: %v", rob)
}

func (q *RawQuery) parseLimit() (*int, error) {
// TODO(xichen): Protect against overly aggressive limits.
func (q *RawQuery) parseLimit() (int, error) {
if err := q.validateLimit(); err != nil {
return nil, err
return 0, err
}
var limit int
if len(q.GroupBy) == 0 {
// This is a raw document query, for which the limit is the upper
// limit on the log document returned. If the limit is nil, a default
// limit is applied.
limit = defaultRawDocumentQueryLimit
if q.Limit != nil {
limit = *q.Limit
}
return &limit, nil
if q.Limit != nil && *q.Limit >= 0 {
return *q.Limit, nil
}

// This is a group by query, for which the limit is the upper limit
// on the maximum number of groups returned. If the limit is nil,
// no limit is applied.
if q.Limit == nil {
return nil, nil
if len(q.GroupBy) == 0 {
return defaultRawQuerySizeLimit, nil
}
limit = *q.Limit
return &limit, nil
return defaultGroupedQuerySizeLimit, nil
}

func (q RawQuery) validateLimit() error {
Expand All @@ -417,11 +406,12 @@ type ParsedQuery struct {
GroupBy []string
Calculations []Calculation
OrderBy []OrderBy
Limit *int
Limit int

// Derived fields for raw query.
AllowedFieldTypes map[hash.Hash]FieldMeta
RawResultLessThanFn RawResultLessThanFn
AllowedFieldTypes map[hash.Hash]FieldMeta
RawResultLessThanFn RawResultLessThanFn
RawResultReverseLessThanFn RawResultLessThanFn
}

// IsRaw returns true if the query is querying raw results (i.e., not grouped), and false otherwise.
Expand All @@ -433,14 +423,15 @@ func (q *ParsedQuery) IsGrouped() bool { return !q.IsRaw() }
// RawQuery returns the parsed raw query for raw results.
func (q *ParsedQuery) RawQuery() ParsedRawQuery {
return ParsedRawQuery{
Namespace: q.Namespace,
StartNanosInclusive: q.StartTimeNanos,
EndNanosExclusive: q.EndTimeNanos,
Filters: q.Filters,
OrderBy: q.OrderBy,
Limit: q.Limit,
AllowedFieldTypes: q.AllowedFieldTypes,
RawResultLessThanFn: q.RawResultLessThanFn,
Namespace: q.Namespace,
StartNanosInclusive: q.StartTimeNanos,
EndNanosExclusive: q.EndTimeNanos,
Filters: q.Filters,
OrderBy: q.OrderBy,
Limit: q.Limit,
AllowedFieldTypes: q.AllowedFieldTypes,
RawResultLessThanFn: q.RawResultLessThanFn,
RawResultReverseLessThanFn: q.RawResultReverseLessThanFn,
}
}

Expand All @@ -464,7 +455,7 @@ func (q *ParsedQuery) computeRawDerived(opts ParseOptions) error {
if err := q.computeAllowedFieldTypes(opts); err != nil {
return err
}
return q.computeRawResultLessThan()
return q.computeRawResultCompareFns()
}

func (q *ParsedQuery) computeAllowedFieldTypes(opts ParseOptions) error {
Expand Down Expand Up @@ -550,20 +541,19 @@ func addQueryFieldToMap(
fm[fieldHash] = meta
}

func (q *ParsedQuery) computeRawResultLessThan() error {
func (q *ParsedQuery) computeRawResultCompareFns() error {
compareFns := make([]field.ValueCompareFn, 0, len(q.OrderBy))
for _, ob := range q.OrderBy {
// NB(xichen): Reverse compare function is needed to keep the top N values. For example,
// if we need to keep the top 10 values sorted in ascending order, it means we need the
// 10 smallest values, and as such we keep a max heap and only add values to the heap
// if the current value is smaller than the max heap value.
compareFn, err := ob.SortOrder.ReverseCompareFn()
compareFn, err := ob.SortOrder.CompareFn()
if err != nil {
return fmt.Errorf("error determining the value compare fn for sort order %v: %v", ob.SortOrder, err)
}
compareFns = append(compareFns, compareFn)
}
q.RawResultLessThanFn = NewLessThanFn(compareFns)
q.RawResultReverseLessThanFn = func(v1, v2 RawResult) bool {
return !q.RawResultLessThanFn(v1, v2)
}
return nil
}

Expand Down Expand Up @@ -594,11 +584,12 @@ type ParsedRawQuery struct {
EndNanosExclusive int64
Filters []FilterList
OrderBy []OrderBy
Limit *int
Limit int

// Derived fields.
AllowedFieldTypes map[hash.Hash]FieldMeta
RawResultLessThanFn RawResultLessThanFn
AllowedFieldTypes map[hash.Hash]FieldMeta
RawResultLessThanFn RawResultLessThanFn
RawResultReverseLessThanFn RawResultLessThanFn
}

// NumFieldsForQuery returns the total number of fields for query.
Expand All @@ -614,8 +605,10 @@ func (q *ParsedRawQuery) NumFieldsForQuery() int {
// NewRawResults creates a new raw results from the parsed raw query.
func (q *ParsedRawQuery) NewRawResults() RawResults {
return RawResults{
IsOrdered: len(q.OrderBy) > 0,
Limit: q.Limit,
OrderBy: q.OrderBy,
Limit: q.Limit,
LessThanFn: q.RawResultLessThanFn,
ReverseLessThanFn: q.RawResultReverseLessThanFn,
}
}

Expand Down
25 changes: 20 additions & 5 deletions query/raw_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,18 +198,33 @@ func (h RawResultHeap) heapify(i, n int) {

// RawResults is a collection of raw results.
type RawResults struct {
IsOrdered bool
Limit *int
OrderBy []OrderBy
Limit int
LessThanFn RawResultLessThanFn
ReverseLessThanFn RawResultLessThanFn

Data []RawResult `json:"data"`
}

// Add adds a list of raw results to the collection.
func (r *RawResults) Add(rs []RawResult) {
panic("not implemented")
// IsOrdered returns true if the raw results are kept in order, which is the
// case exactly when the query has one or more order-by clauses.
func (r *RawResults) IsOrdered() bool {
	return len(r.OrderBy) != 0
}

// LimitReached returns true if we have collected enough raw results.
// TODO(xichen): Implement this.
// NOTE(review): presumably this should compare the number of collected results
// against r.Limit, but the intended semantics for ordered (top-N) results are
// not visible here — confirm before implementing.
func (r *RawResults) LimitReached() bool {
	panic("not implemented")
}

// Add adds a raw result to the collection.
// TODO(xichen): Implement this.
// NOTE(review): currently a stub that always panics; callers must not invoke
// this until the implementation lands.
func (r *RawResults) Add(rr RawResult) {
	panic("not implemented")
}

// AddBatch adds a batch of raw results to the collection.
// TODO(xichen): Implement this.
// NOTE(review): currently a stub that always panics; likely intended to loop
// over rr calling Add once both are implemented — confirm.
func (r *RawResults) AddBatch(rr []RawResult) {
	panic("not implemented")
}
12 changes: 0 additions & 12 deletions query/sort_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,6 @@ func (f SortOrder) CompareFn() (field.ValueCompareFn, error) {
}
}

// ReverseCompareFn returns the function to reverse compare two values, i.e.,
// the comparison function for the opposite of sort order f, or an error if
// the sort order is unknown.
func (f SortOrder) ReverseCompareFn() (field.ValueCompareFn, error) {
	switch f {
	case Descending:
		return field.MustCompareUnion, nil
	case Ascending:
		return field.MustReverseCompareUnion, nil
	}
	return nil, fmt.Errorf("unknown sort order %v", f)
}

// String returns the string representation of the sort order.
func (f SortOrder) String() string {
if s, exists := sortOrderStrings[f]; exists {
Expand Down
Loading

0 comments on commit 7d3bbe1

Please sign in to comment.