From 7d3bbe134f80259ff5bf8f6bb09fc5e6e4683976 Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Tue, 15 Jan 2019 12:53:26 -0500 Subject: [PATCH] Use existing results for early query termination --- .../doc_id_multi_field_intersect_iterator.go | 4 +- ..._id_multi_field_intersect_iterator_test.go | 2 +- index/field/field_union.go | 34 +++ index/field/multi_field_iterator.go | 20 +- index/field/multi_field_iterator_test.go | 4 +- query/query.go | 81 +++--- query/raw_result.go | 25 +- query/sort_order.go | 12 - storage/immutable_segment.go | 251 +++++++++++------- storage/namespace.go | 4 +- storage/sealed_segment.go | 10 +- storage/shard.go | 17 +- storage/shard_test.go | 8 +- values/meta.go | 60 +++++ 14 files changed, 346 insertions(+), 186 deletions(-) create mode 100644 values/meta.go diff --git a/index/field/doc_id_multi_field_intersect_iterator.go b/index/field/doc_id_multi_field_intersect_iterator.go index 4d8c11d..d2554f9 100644 --- a/index/field/doc_id_multi_field_intersect_iterator.go +++ b/index/field/doc_id_multi_field_intersect_iterator.go @@ -10,7 +10,7 @@ import ( // iterator and the multi field iterator. type DocIDMultiFieldIntersectIterator struct { docIt index.DocIDSetIterator - multiFieldIt *MultiFieldIterator + multiFieldIt *MultiFieldIntersectIterator done bool err error @@ -21,7 +21,7 @@ type DocIDMultiFieldIntersectIterator struct { // NewDocIDMultiFieldIntersectIterator creates a new DocIDMultiFieldIntersectIterator. 
func NewDocIDMultiFieldIntersectIterator( docIt index.DocIDSetIterator, - multiFieldIt *MultiFieldIterator, + multiFieldIt *MultiFieldIntersectIterator, ) *DocIDMultiFieldIntersectIterator { return &DocIDMultiFieldIntersectIterator{ docIt: docIt, diff --git a/index/field/doc_id_multi_field_intersect_iterator_test.go b/index/field/doc_id_multi_field_intersect_iterator_test.go index 11ff153..b483190 100644 --- a/index/field/doc_id_multi_field_intersect_iterator_test.go +++ b/index/field/doc_id_multi_field_intersect_iterator_test.go @@ -58,7 +58,7 @@ func TestDocIDMultiFieldIntersectIterator(t *testing.T) { it2.EXPECT().Err().Return(nil), it2.EXPECT().Close(), ) - multiFieldIt := NewMultiFieldIterator([]BaseFieldIterator{it1, it2}) + multiFieldIt := NewMultiFieldIntersectIterator([]BaseFieldIterator{it1, it2}) intersectIt := NewDocIDMultiFieldIntersectIterator(docIt, multiFieldIt) defer intersectIt.Close() diff --git a/index/field/field_union.go b/index/field/field_union.go index 793928f..fcc37e3 100644 --- a/index/field/field_union.go +++ b/index/field/field_union.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/xichen2020/eventdb/document/field" + "github.com/xichen2020/eventdb/values" ) // Union is a union of different typed fields. @@ -36,3 +37,36 @@ func (u *Union) Iter() (BaseFieldIterator, error) { } return nil, fmt.Errorf("unknown field type in union: %v", u.Type) } + +// ValuesMeta returns the corresponding values metadata for the field represented +// by the union, or an error otherwise. 
+func (u *Union) ValuesMeta() (values.MetaUnion, error) { + mu := values.MetaUnion{Type: u.Type} + switch u.Type { + case field.NullType: + break + case field.BoolType: + mu.BoolMeta = u.BoolField.Values().Metadata() + case field.IntType: + mu.IntMeta = u.IntField.Values().Metadata() + case field.DoubleType: + mu.DoubleMeta = u.DoubleField.Values().Metadata() + case field.StringType: + mu.StringMeta = u.StringField.Values().Metadata() + case field.TimeType: + mu.TimeMeta = u.TimeField.Values().Metadata() + default: + return values.MetaUnion{}, fmt.Errorf("unknown field type in union: %v", u.Type) + } + return mu, nil +} + +// MustValuesMeta returns the values metadata for the field represented by the union, +// and panics if an error is encountered. +func (u *Union) MustValuesMeta() values.MetaUnion { + mu, err := u.ValuesMeta() + if err != nil { + panic(err) + } + return mu +} diff --git a/index/field/multi_field_iterator.go b/index/field/multi_field_iterator.go index 4d111ad..de2e696 100644 --- a/index/field/multi_field_iterator.go +++ b/index/field/multi_field_iterator.go @@ -4,11 +4,11 @@ import ( "github.com/xichen2020/eventdb/document/field" ) -// MultiFieldIterator is an iterator that iterates over multiple fields, +// MultiFieldIntersectIterator is an iterator that iterates over multiple fields, // which are joined on their doc IDs. As a result, a document needs to // contain all fields associated with the iterator in order for it to // be included in the output of the multi-field iterator. -type MultiFieldIterator struct { +type MultiFieldIntersectIterator struct { iters []BaseFieldIterator done bool @@ -17,9 +17,9 @@ type MultiFieldIterator struct { currVals []field.ValueUnion } -// NewMultiFieldIterator creates a new multi-field iterator union. -func NewMultiFieldIterator(iters []BaseFieldIterator) *MultiFieldIterator { - return &MultiFieldIterator{ +// NewMultiFieldIntersectIterator creates a new multi-field intersecting iterator. 
+func NewMultiFieldIntersectIterator(iters []BaseFieldIterator) *MultiFieldIntersectIterator { + return &MultiFieldIntersectIterator{ iters: iters, done: len(iters) == 0, currDocIDs: make([]int32, len(iters)), @@ -28,7 +28,7 @@ func NewMultiFieldIterator(iters []BaseFieldIterator) *MultiFieldIterator { } // Next returns true if there are more items to be iterated over. -func (it *MultiFieldIterator) Next() bool { +func (it *MultiFieldIntersectIterator) Next() bool { if it.done || it.err != nil { return false } @@ -70,18 +70,18 @@ func (it *MultiFieldIterator) Next() bool { } // DocID returns the current doc ID, which remains valid until the next iteration. -func (it *MultiFieldIterator) DocID() int32 { return it.currDocIDs[0] } +func (it *MultiFieldIntersectIterator) DocID() int32 { return it.currDocIDs[0] } // Values returns the current list of field values, which remains valid until the // next iteration. If the caller needs to retain a valid reference to the value array // after `Next` is called again, the caller needs to make a copy of the value array. -func (it *MultiFieldIterator) Values() []field.ValueUnion { return it.currVals } +func (it *MultiFieldIntersectIterator) Values() []field.ValueUnion { return it.currVals } // Err returns errors if any. -func (it *MultiFieldIterator) Err() error { return it.err } +func (it *MultiFieldIntersectIterator) Err() error { return it.err } // Close closes the iterator. 
-func (it *MultiFieldIterator) Close() { +func (it *MultiFieldIntersectIterator) Close() { for i := range it.iters { it.iters[i].Close() it.iters[i] = nil diff --git a/index/field/multi_field_iterator_test.go b/index/field/multi_field_iterator_test.go index f7604a0..aadc0be 100644 --- a/index/field/multi_field_iterator_test.go +++ b/index/field/multi_field_iterator_test.go @@ -10,7 +10,7 @@ import ( "github.com/golang/mock/gomock" ) -func TestMultiFieldIterator(t *testing.T) { +func TestMultiFieldIntersectIterator(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -63,7 +63,7 @@ func TestMultiFieldIterator(t *testing.T) { it3.EXPECT().Close(), ) - it := NewMultiFieldIterator([]BaseFieldIterator{it1, it2, it3}) + it := NewMultiFieldIntersectIterator([]BaseFieldIterator{it1, it2, it3}) defer it.Close() expectedDocIDs := []int32{20, 24} diff --git a/query/query.go b/query/query.go index 0bee617..06038d5 100644 --- a/query/query.go +++ b/query/query.go @@ -18,7 +18,8 @@ const ( defaultTimeGranularity = time.Second defaultTimeUnit = TimeUnit(xtime.Second) defaultFilterCombinator = filter.And - defaultRawDocumentQueryLimit = 500 + defaultRawQuerySizeLimit = 100 + defaultGroupedQuerySizeLimit = 10 defaultOrderBySortOrder = Ascending ) @@ -367,30 +368,18 @@ func (q *RawQuery) parseOrderBy(rob RawOrderBy, opts ParseOptions) (OrderBy, err return ob, fmt.Errorf("invalid order by clause: %v", rob) } -func (q *RawQuery) parseLimit() (*int, error) { +// TODO(xichen): Protect against overly aggressive limits. +func (q *RawQuery) parseLimit() (int, error) { if err := q.validateLimit(); err != nil { - return nil, err + return 0, err } - var limit int - if len(q.GroupBy) == 0 { - // This is a raw document query, for which the limit is the upper - // limit on the log document returned. If the limit is nil, a default - // limit is applied. 
- limit = defaultRawDocumentQueryLimit - if q.Limit != nil { - limit = *q.Limit - } - return &limit, nil + if q.Limit != nil && *q.Limit >= 0 { + return *q.Limit, nil } - - // This is a group by query, for which the limit is the upper limit - // on the maximum number of groups returned. If the limit is nil, - // no limit is applied. - if q.Limit == nil { - return nil, nil + if len(q.GroupBy) == 0 { + return defaultRawQuerySizeLimit, nil } - limit = *q.Limit - return &limit, nil + return defaultGroupedQuerySizeLimit, nil } func (q RawQuery) validateLimit() error { @@ -417,11 +406,12 @@ type ParsedQuery struct { GroupBy []string Calculations []Calculation OrderBy []OrderBy - Limit *int + Limit int // Derived fields for raw query. - AllowedFieldTypes map[hash.Hash]FieldMeta - RawResultLessThanFn RawResultLessThanFn + AllowedFieldTypes map[hash.Hash]FieldMeta + RawResultLessThanFn RawResultLessThanFn + RawResultReverseLessThanFn RawResultLessThanFn } // IsRaw returns true if the query is querying raw results (i.e., not grouped), and false otherwise. @@ -433,14 +423,15 @@ func (q *ParsedQuery) IsGrouped() bool { return !q.IsRaw() } // RawQuery returns the parsed raw query for raw results. 
func (q *ParsedQuery) RawQuery() ParsedRawQuery { return ParsedRawQuery{ - Namespace: q.Namespace, - StartNanosInclusive: q.StartTimeNanos, - EndNanosExclusive: q.EndTimeNanos, - Filters: q.Filters, - OrderBy: q.OrderBy, - Limit: q.Limit, - AllowedFieldTypes: q.AllowedFieldTypes, - RawResultLessThanFn: q.RawResultLessThanFn, + Namespace: q.Namespace, + StartNanosInclusive: q.StartTimeNanos, + EndNanosExclusive: q.EndTimeNanos, + Filters: q.Filters, + OrderBy: q.OrderBy, + Limit: q.Limit, + AllowedFieldTypes: q.AllowedFieldTypes, + RawResultLessThanFn: q.RawResultLessThanFn, + RawResultReverseLessThanFn: q.RawResultReverseLessThanFn, } } @@ -464,7 +455,7 @@ func (q *ParsedQuery) computeRawDerived(opts ParseOptions) error { if err := q.computeAllowedFieldTypes(opts); err != nil { return err } - return q.computeRawResultLessThan() + return q.computeRawResultCompareFns() } func (q *ParsedQuery) computeAllowedFieldTypes(opts ParseOptions) error { @@ -550,20 +541,19 @@ func addQueryFieldToMap( fm[fieldHash] = meta } -func (q *ParsedQuery) computeRawResultLessThan() error { +func (q *ParsedQuery) computeRawResultCompareFns() error { compareFns := make([]field.ValueCompareFn, 0, len(q.OrderBy)) for _, ob := range q.OrderBy { - // NB(xichen): Reverse compare function is needed to keep the top N values. For example, - // if we need to keep the top 10 values sorted in ascending order, it means we need the - // 10 smallest values, and as such we keep a max heap and only add values to the heap - // if the current value is smaller than the max heap value. 
- compareFn, err := ob.SortOrder.ReverseCompareFn() + compareFn, err := ob.SortOrder.CompareFn() if err != nil { return fmt.Errorf("error determining the value compare fn for sort order %v: %v", ob.SortOrder, err) } compareFns = append(compareFns, compareFn) } q.RawResultLessThanFn = NewLessThanFn(compareFns) + q.RawResultReverseLessThanFn = func(v1, v2 RawResult) bool { + return !q.RawResultLessThanFn(v1, v2) + } return nil } @@ -594,11 +584,12 @@ type ParsedRawQuery struct { EndNanosExclusive int64 Filters []FilterList OrderBy []OrderBy - Limit *int + Limit int // Derived fields. - AllowedFieldTypes map[hash.Hash]FieldMeta - RawResultLessThanFn RawResultLessThanFn + AllowedFieldTypes map[hash.Hash]FieldMeta + RawResultLessThanFn RawResultLessThanFn + RawResultReverseLessThanFn RawResultLessThanFn } // NumFieldsForQuery returns the total number of fields for query. @@ -614,8 +605,10 @@ func (q *ParsedRawQuery) NumFieldsForQuery() int { // NewRawResults creates a new raw results from the parsed raw query. func (q *ParsedRawQuery) NewRawResults() RawResults { return RawResults{ - IsOrdered: len(q.OrderBy) > 0, - Limit: q.Limit, + OrderBy: q.OrderBy, + Limit: q.Limit, + LessThanFn: q.RawResultLessThanFn, + ReverseLessThanFn: q.RawResultReverseLessThanFn, } } diff --git a/query/raw_result.go b/query/raw_result.go index 48f14de..d086815 100644 --- a/query/raw_result.go +++ b/query/raw_result.go @@ -198,18 +198,33 @@ func (h RawResultHeap) heapify(i, n int) { // RawResults is a collection of raw results. type RawResults struct { - IsOrdered bool - Limit *int + OrderBy []OrderBy + Limit int + LessThanFn RawResultLessThanFn + ReverseLessThanFn RawResultLessThanFn Data []RawResult `json:"data"` } -// Add adds a list of raw results to the collection. -func (r *RawResults) Add(rs []RawResult) { - panic("not implemented") +// IsOrdered returns true if the raw results are kept in order. 
+func (r *RawResults) IsOrdered() bool { + return len(r.OrderBy) > 0 } // LimitReached returns true if we have collected enough raw results. +// TODO(xichen): Implement this. func (r *RawResults) LimitReached() bool { panic("not implemented") } + +// Add adds a raw result to the collection. +// TODO(xichen): Implement this. +func (r *RawResults) Add(rr RawResult) { + panic("not implemented") +} + +// AddBatch adds a batch of raw results to the collection. +// TODO(xichen): Implement this. +func (r *RawResults) AddBatch(rr []RawResult) { + panic("not implemented") +} diff --git a/query/sort_order.go b/query/sort_order.go index 4dd2512..7b09654 100644 --- a/query/sort_order.go +++ b/query/sort_order.go @@ -36,18 +36,6 @@ func (f SortOrder) CompareFn() (field.ValueCompareFn, error) { } } -// ReverseCompareFn returns the function to reverse compare two values. -func (f SortOrder) ReverseCompareFn() (field.ValueCompareFn, error) { - switch f { - case Ascending: - return field.MustReverseCompareUnion, nil - case Descending: - return field.MustCompareUnion, nil - default: - return nil, fmt.Errorf("unknown sort order %v", f) - } -} - // String returns the string representation of the sort order. func (f SortOrder) String() string { if s, exists := sortOrderStrings[f]; exists { diff --git a/storage/immutable_segment.go b/storage/immutable_segment.go index 1b8a81f..5378f11 100644 --- a/storage/immutable_segment.go +++ b/storage/immutable_segment.go @@ -12,6 +12,7 @@ import ( indexfield "github.com/xichen2020/eventdb/index/field" "github.com/xichen2020/eventdb/persist" "github.com/xichen2020/eventdb/query" + "github.com/xichen2020/eventdb/values" iterimpl "github.com/xichen2020/eventdb/values/iterator/impl" "github.com/xichen2020/eventdb/x/hash" @@ -23,11 +24,16 @@ import ( type immutableSegment interface { immutableSegmentBase - // QueryRaw returns results for a given raw query. + // QueryRaw queries results for a given raw query. 
+ // Existing results if any (e.g., from querying other segments) are passed in `res` + // to help facilitate fast elimination of segments that do not have eligible records + // for the given query. The results from the current segment if any are merged with + // those in `res` upon completion. QueryRaw( ctx context.Context, q query.ParsedRawQuery, - ) ([]query.RawResult, error) + res *query.RawResults, + ) error // LoadedStatus returns the segment loaded status. LoadedStatus() segmentLoadedStatus @@ -39,10 +45,6 @@ type immutableSegment interface { Flush(persistFns persist.Fns) error } -const ( - defaultRawResultsCapacity = 4096 -) - var ( errImmutableSegmentAlreadyClosed = errors.New("immutable segment is already closed") errFlushingNotInMemoryOnlySegment = errors.New("flushing a segment that is not in memory only") @@ -102,8 +104,10 @@ type immutableSeg struct { } type fieldEntry struct { - fieldMeta indexfield.DocsFieldMetadata - field indexfield.DocsField + fieldPath []string + fieldTypes []field.ValueType + valuesMeta []values.MetaUnion + field indexfield.DocsField } // nolint: unparam @@ -125,9 +129,17 @@ func newImmutableSegment( } entries := make(map[hash.Hash]*fieldEntry, len(fields)) for k, f := range fields { + fm := f.Metadata() + valuesMeta := make([]values.MetaUnion, 0, len(fm.FieldTypes)) + for _, t := range fm.FieldTypes { + tf, _ := f.FieldForType(t) + valuesMeta = append(valuesMeta, tf.MustValuesMeta()) + } entries[k] = &fieldEntry{ - fieldMeta: f.Metadata(), - field: f, + fieldPath: fm.FieldPath, + fieldTypes: fm.FieldTypes, + valuesMeta: valuesMeta, + field: f, } } return &immutableSeg{ @@ -160,25 +172,20 @@ func newImmutableSegment( func (s *immutableSeg) QueryRaw( ctx context.Context, q query.ParsedRawQuery, -) ([]query.RawResult, error) { - // Fast path if the limit indicates no results are needed. 
- if q.Limit != nil && *q.Limit <= 0 { - return nil, nil + res *query.RawResults, +) error { + shouldExit, err := s.checkForFastExit(q, res) + if err != nil { + return err + } + if shouldExit { + return nil } // Identify the set of fields needed for query execution. allowedFieldTypes, fieldIndexMap, fieldsToRetrieve, err := s.collectFieldsForRawQuery(q) if err != nil { - return nil, err - } - - // Validate that the fields to order results by have one and only one field type. - hasEmptyResult, err := validateOrderByClauses(allowedFieldTypes, q.OrderBy) - if err != nil { - return nil, err - } - if hasEmptyResult { - return nil, nil + return err } // Retrieve all fields (possibly from disk) identified above. @@ -189,7 +196,7 @@ func (s *immutableSeg) QueryRaw( // - Fields in `orderBy` if applicable queryFields, err := s.retrieveFields(fieldsToRetrieve) if err != nil { - return nil, err + return err } defer func() { for i := range queryFields { @@ -202,7 +209,7 @@ func (s *immutableSeg) QueryRaw( rawDocSourceField, ok := queryFields[1].StringField() if !ok { - return nil, errNoStringValuesInRawDocSourceField + return errNoStringValuesInRawDocSourceField } // Apply filters to determine the doc ID set matching the filters. 
@@ -211,11 +218,11 @@ func (s *immutableSeg) QueryRaw( allowedFieldTypes, fieldIndexMap, queryFields, s.NumDocuments(), ) if err != nil { - return nil, err + return err } if len(q.OrderBy) == 0 { - return collectUnorderedRawDocSourceData(rawDocSourceField, filteredDocIDIter, q.Limit) + return collectUnorderedRawDocSourceData(rawDocSourceField, filteredDocIDIter, res) } return collectOrderedRawDocSourceData( @@ -224,9 +231,8 @@ func (s *immutableSeg) QueryRaw( queryFields, rawDocSourceField, filteredDocIDIter, - q.OrderBy, - q.RawResultLessThanFn, - q.Limit, + q, + res, ) } @@ -248,7 +254,7 @@ func (s *immutableSeg) Unload() error { return nil } s.loadedStatus = segmentUnloaded - // Nil out the field values but keep the field metadata so the segment + // Nil out the field values but keep the metadata so the segment // can easily determine whether a field needs to be loaded from disk. for _, entry := range s.entries { if entry.field != nil { @@ -317,6 +323,97 @@ func (s *immutableSeg) Close() { s.entries = nil } +// checkForFastExit checks if the current segment should be queried, +// returns true if not to reduce computation work, and false otherwise. +func (s *immutableSeg) checkForFastExit( + q query.ParsedRawQuery, + res *query.RawResults, +) (bool, error) { + // Fast path if the limit indicates no results are needed. + if q.Limit <= 0 { + return true, nil + } + + // If this is an unordered query, we can fast exit iff we've gathered enough results. 
+ if !res.IsOrdered() { + if res.LimitReached() { + return true, nil + } + return false, nil + } + + var ( + hasExistingResults = len(res.Data) > 0 + minOrderByValues []field.ValueUnion + maxOrderByValues []field.ValueUnion + ) + for i, ob := range res.OrderBy { + orderByFieldHash := s.fieldHashFn(ob.FieldPath) + entry, exists := s.entries[orderByFieldHash] + if !exists { + // This segment does not have one of the fields to order results by, + // as such we bail early as we require the orderBy field be present in + // the result documents. + return true, nil + } + availableTypes := entry.fieldTypes + if len(availableTypes) > 1 { + // We do not allow the orderBy field to have more than one type. + return false, fmt.Errorf("order by field %v has multiple types %v", ob.FieldPath, availableTypes) + } + if !hasExistingResults { + continue + } + if res.Data[0].OrderByValues[i].Type != availableTypes[0] { + // We expect the orderBy fields to have consistent types. + return false, fmt.Errorf("order by field have type %v in the results and type %v in the segment", res.Data[0].OrderByValues[i].Type, availableTypes[0]) + } + if !res.LimitReached() { + continue + } + valuesMeta := entry.valuesMeta[0] + minUnion, maxUnion, err := valuesMeta.ToMinMaxValueUnion() + if err != nil { + return false, err + } + if minOrderByValues == nil { + minOrderByValues = make([]field.ValueUnion, 0, len(res.OrderBy)) + maxOrderByValues = make([]field.ValueUnion, 0, len(res.OrderBy)) + } + if ob.SortOrder == query.Ascending { + minOrderByValues = append(minOrderByValues, minUnion) + maxOrderByValues = append(maxOrderByValues, maxUnion) + } else { + minOrderByValues = append(minOrderByValues, maxUnion) + maxOrderByValues = append(maxOrderByValues, minUnion) + } + } + + if !hasExistingResults || !res.LimitReached() { + return false, nil + } + + var ( + minExistingRes = res.Data[0] + maxExistingRes = res.Data[len(res.Data)-1] + ) + + // Assert the values of all orderBy fields are within the bounds of 
existing results. + minRawResult := query.RawResult{OrderByValues: minOrderByValues} + if res.LessThanFn(maxExistingRes, minRawResult) { + // If the maximum existing result is less than the minimum raw result in this segment, + // there is no need to query this segment as we've gathered enough raw results. + return true, nil + } + maxRawResult := query.RawResult{OrderByValues: maxOrderByValues} + if res.LessThanFn(maxRawResult, minExistingRes) { + // If the maximum raw result in this segment is less than the minimum existing result, + // there is no need to query this segment as we've gathered enough raw results. + return true, nil + } + return false, nil +} + func (s *immutableSeg) collectFieldsForRawQuery( q query.ParsedRawQuery, ) ( @@ -374,7 +471,7 @@ func (s *immutableSeg) intersectWithAvailableTypes( var availableTypes []field.ValueType entry, exists := s.entries[k] if exists { - availableTypes = entry.fieldMeta.FieldTypes + availableTypes = entry.fieldTypes } for srcIdx, allowedTypes := range meta.AllowedTypesBySourceIdx { intersectedTypes := intersectFieldTypes(availableTypes, allowedTypes) @@ -385,26 +482,6 @@ func (s *immutableSeg) intersectWithAvailableTypes( return clonedFieldMap } -// validateOrderByClauses validates the fields and types specified in the query -// orderBy clauses are valid. -func validateOrderByClauses( - allowedFieldTypes []field.ValueTypeSet, - orderBy []query.OrderBy, -) (hasEmptyResult bool, err error) { - orderByStart := len(allowedFieldTypes) - len(orderBy) - for i := orderByStart; i < len(allowedFieldTypes); i++ { - if len(allowedFieldTypes[i]) == 0 { - // The field to order results by does not exist, as such we return an empty result early here. - return true, nil - } - if len(allowedFieldTypes[i]) > 1 { - // The field to order results by has more than one type. This is currently not supported. 
- return false, fmt.Errorf("orderBy field %v has multiple types %v", orderBy[i-orderByStart], allowedFieldTypes[i]) - } - } - return false, nil -} - // retrieveFields returns the set of field for a list of field retrieving options. // If no error is returned, the result array contains the same number of slots // as the number of fields to retrieve. Fields that don't exist in the segment @@ -616,6 +693,7 @@ func (s *immutableSeg) insertFields( fields[i] = nil continue } + if entry.field == nil { entry.field = fields[i].ShallowCopy() continue @@ -764,32 +842,26 @@ func applyFilter( func collectUnorderedRawDocSourceData( rawDocSourceField indexfield.StringField, maskingDocIDSetIt index.DocIDSetIterator, - limit *int, -) ([]query.RawResult, error) { + res *query.RawResults, +) error { // Return unordered results rawDocSourceIter, err := rawDocSourceField.Fetch(maskingDocIDSetIt) if err != nil { maskingDocIDSetIt.Close() - return nil, fmt.Errorf("error fetching raw doc source data: %v", err) + return fmt.Errorf("error fetching raw doc source data: %v", err) } defer rawDocSourceIter.Close() - resultsCapacity := defaultRawResultsCapacity - if limit != nil && *limit >= 0 { - resultsCapacity = *limit - } - // TODO(xichen): Pool the results array. - rawResults := make([]query.RawResult, 0, resultsCapacity) for rawDocSourceIter.Next() { - rawResults = append(rawResults, query.RawResult{Data: rawDocSourceIter.Value()}) - if limit != nil && len(rawResults) >= *limit { - break + res.Add(query.RawResult{Data: rawDocSourceIter.Value()}) + if res.LimitReached() { + return nil } } if err := rawDocSourceIter.Err(); err != nil { - return nil, fmt.Errorf("error iterating over raw doc source data: %v", err) + return fmt.Errorf("error iterating over raw doc source data: %v", err) } - return rawResults, nil + return nil } // NB: This method owns `maskingDocIDSetIt` and handles closing regardless of success or failure. 
@@ -799,35 +871,34 @@ func collectOrderedRawDocSourceData( queryFields []indexfield.DocsField, rawDocSourceField indexfield.StringField, maskingDocIDSetIter index.DocIDSetIterator, - orderBy []query.OrderBy, - lessThanFn query.RawResultLessThanFn, - limit *int, -) ([]query.RawResult, error) { + q query.ParsedRawQuery, + res *query.RawResults, +) error { filteredOrderByIter, err := createFilteredOrderByIterator( allowedFieldTypes, fieldIndexMap, queryFields, maskingDocIDSetIter, - orderBy, + q.OrderBy, ) if err != nil { // TODO(xichen): Add filteredDocIDIter.Err() here. maskingDocIDSetIter.Close() maskingDocIDSetIter = nil - return nil, err + return err } defer filteredOrderByIter.Close() orderedRawResults, err := collectTopNRawResultDocIDOrderByValues( filteredOrderByIter, - lessThanFn, - limit, + q.RawResultReverseLessThanFn, + q.Limit, ) if err != nil { - return nil, err + return err } - return collectTopNRawResults(rawDocSourceField, orderedRawResults) + return collectTopNRawResults(rawDocSourceField, orderedRawResults, res) } // NB: If an error is encountered, `maskingDocIDSetIter` should be closed at the callsite. @@ -882,7 +953,7 @@ func createFilteredOrderByIterator( // NB(xichen): Worth optimizing for the single-field-orderBy case? - orderByMultiIter := indexfield.NewMultiFieldIterator(orderByIters) + orderByMultiIter := indexfield.NewMultiFieldIntersectIterator(orderByIters) filteredOrderByIter := indexfield.NewDocIDMultiFieldIntersectIterator(maskingDocIDSetIter, orderByMultiIter) return filteredOrderByIter, nil } @@ -896,23 +967,23 @@ func createFilteredOrderByIterator( func collectTopNRawResultDocIDOrderByValues( filteredOrderByIter *indexfield.DocIDMultiFieldIntersectIterator, lessThanFn query.RawResultLessThanFn, - limit *int, + limit int, ) ([]query.RawResult, error) { // TODO(xichen): This algorithm runs in O(Nlogk) time. 
Should investigate whether this is // in practice faster than first selecting top N values via a selection sort algorithm that // runs in O(N) time, then sorting the results in O(klogk) time. // Inserting values into the heap to select the top results based on the query ordering. - heapCapacity := -1 - if limit != nil { - heapCapacity = *limit - } - results := query.NewRawResultHeap(heapCapacity, lessThanFn) + // NB(xichen): The compare function has been reversed to keep the top N values. For example, + // if we need to keep the top 10 values sorted in ascending order, it means we need the + // 10 smallest values, and as such we keep a max heap and only add values to the heap + // if the current value is smaller than the max heap value. + results := query.NewRawResultHeap(limit, lessThanFn) for filteredOrderByIter.Next() { docID := filteredOrderByIter.DocID() values := filteredOrderByIter.Values() // values here is only valid till the next iteration. dv := query.RawResult{DocID: docID, OrderByValues: values} - if heapCapacity == -1 || results.Len() <= heapCapacity { + if results.Len() <= limit { // TODO(xichen): Should pool and reuse the value array here. 
valuesClone := make([]field.ValueUnion, len(values)) copy(valuesClone, values) @@ -950,7 +1021,8 @@ func collectTopNRawResultDocIDOrderByValues( func collectTopNRawResults( rawDocSourceField indexfield.StringField, orderedRawResults []query.RawResult, -) ([]query.RawResult, error) { + res *query.RawResults, +) error { for i := 0; i < len(orderedRawResults); i++ { orderedRawResults[i].OrderIdx = i } @@ -958,7 +1030,7 @@ func collectTopNRawResults( docIDValuesIt := query.NewRawResultIterator(orderedRawResults) rawDocSourceIter, err := rawDocSourceField.Fetch(docIDValuesIt) if err != nil { - return nil, fmt.Errorf("error fetching raw doc source data: %v", err) + return fmt.Errorf("error fetching raw doc source data: %v", err) } numItemsWithRawDocSource := 0 for rawDocSourceIter.Next() { @@ -968,11 +1040,12 @@ func collectTopNRawResults( numItemsWithRawDocSource++ } if err := rawDocSourceIter.Err(); err != nil { - return nil, fmt.Errorf("error iterating over raw doc source field: %v", err) + return fmt.Errorf("error iterating over raw doc source field: %v", err) } sort.Sort(query.RawResultsByOrderIdxAsc(orderedRawResults)) orderedRawResults = orderedRawResults[:numItemsWithRawDocSource] - return orderedRawResults, nil + res.AddBatch(orderedRawResults) + return nil } func intersectFieldTypes( diff --git a/storage/namespace.go b/storage/namespace.go index 50f2d9c..8030c6a 100644 --- a/storage/namespace.go +++ b/storage/namespace.go @@ -121,8 +121,8 @@ func (n *dbNamespace) QueryRaw( if err != nil { return query.RawResults{}, err } - res.Add(shardRes.Data) - if res.LimitReached() { + res.AddBatch(shardRes.Data) + if !res.IsOrdered() && res.LimitReached() { // We've got enough data, bail early. 
break } diff --git a/storage/sealed_segment.go b/storage/sealed_segment.go index 1c3d67a..76f6c85 100644 --- a/storage/sealed_segment.go +++ b/storage/sealed_segment.go @@ -32,7 +32,8 @@ type sealedSegment interface { QueryRaw( ctx context.Context, q query.ParsedRawQuery, - ) ([]query.RawResult, error) + res *query.RawResults, + ) error // ShouldUnload returns true if the segment is eligible for unloading. ShouldUnload() bool @@ -98,10 +99,11 @@ func newSealedFlushingSegment( func (s *sealedFlushingSeg) QueryRaw( ctx context.Context, q query.ParsedRawQuery, -) ([]query.RawResult, error) { - res, err := s.immutableSegment.QueryRaw(ctx, q) + res *query.RawResults, +) error { + err := s.immutableSegment.QueryRaw(ctx, q, res) atomic.StoreInt64(&s.lastReadAtNanos, s.nowFn().UnixNano()) - return res, err + return err } func (s *sealedFlushingSeg) ShouldUnload() bool { diff --git a/storage/shard.go b/storage/shard.go index 816d10d..22962a4 100644 --- a/storage/shard.go +++ b/storage/shard.go @@ -150,8 +150,6 @@ func (s *dbShard) QueryRaw( active := s.active active.IncAccessor() - // TODO(xichen): Find the first element whose max time is greater than or equal - // to start time nanos. This is currently not implemented by the skiplist API. var sealed []sealedFlushingSegment geElem := s.sealedByMaxTimeAsc.GetGreaterThanOrEqualTo(float64(q.StartNanosInclusive)) for elem := geElem; elem != nil; elem = elem.Next() { @@ -173,10 +171,6 @@ func (s *dbShard) QueryRaw( defer cleanup() // Querying active segment and adds to result set. 
- var ( - res = q.NewRawResults() - err error - ) activeRes, err := active.QueryRaw(ctx, q) if err == errMutableSegmentAlreadySealed { // The active segment has become sealed before a read can be performed @@ -186,19 +180,18 @@ func (s *dbShard) QueryRaw( if err != nil { return query.RawResults{}, err } - res.Add(activeRes) - if res.LimitReached() { + res := q.NewRawResults() + res.AddBatch(activeRes) + if !res.IsOrdered() && res.LimitReached() { return res, nil } // Querying sealed segments and adds to result set. for _, ss := range sealed { - sealedRes, err := ss.QueryRaw(ctx, q) - if err != nil { + if err := ss.QueryRaw(ctx, q, &res); err != nil { return query.RawResults{}, err } - res.Add(sealedRes) - if res.LimitReached() { + if !res.IsOrdered() && res.LimitReached() { return res, nil } } diff --git a/storage/shard_test.go b/storage/shard_test.go index 35983f7..9ffd4f1 100644 --- a/storage/shard_test.go +++ b/storage/shard_test.go @@ -74,7 +74,8 @@ type mockImmutableSegment struct { queryRawFn func( ctx context.Context, q query.ParsedRawQuery, - ) ([]query.RawResult, error) + res *query.RawResults, + ) error loadedStatusFn func() segmentLoadedStatus unloadFn func() error @@ -84,8 +85,9 @@ type mockImmutableSegment struct { func (m *mockImmutableSegment) QueryRaw( ctx context.Context, q query.ParsedRawQuery, -) ([]query.RawResult, error) { - return m.queryRawFn(ctx, q) + res *query.RawResults, +) error { + return m.queryRawFn(ctx, q, res) } func (m *mockImmutableSegment) LoadedStatus() segmentLoadedStatus { diff --git a/values/meta.go b/values/meta.go new file mode 100644 index 0000000..11e5cd2 --- /dev/null +++ b/values/meta.go @@ -0,0 +1,60 @@ +package values + +import ( + "fmt" + + "github.com/xichen2020/eventdb/document/field" +) + +// MetaUnion is a union of values meta. 
+type MetaUnion struct { + Type field.ValueType + BoolMeta BoolValuesMetadata + IntMeta IntValuesMetadata + DoubleMeta DoubleValuesMetadata + StringMeta StringValuesMetadata + TimeMeta TimeValuesMetadata +} + +// ToMinMaxValueUnion extracts the min value union and max value union from the meta union. +func (u *MetaUnion) ToMinMaxValueUnion() (minUnion, maxUnion field.ValueUnion, err error) { + minUnion.Type = u.Type + maxUnion.Type = u.Type + switch u.Type { + case field.NullType: + break + case field.BoolType: + if u.BoolMeta.NumTrues > 0 { + maxUnion.BoolVal = true + minUnion.BoolVal = true + } + if u.BoolMeta.NumFalses > 0 { + minUnion.BoolVal = false + } + case field.IntType: + minUnion.IntVal = u.IntMeta.Min + maxUnion.IntVal = u.IntMeta.Max + case field.DoubleType: + minUnion.DoubleVal = u.DoubleMeta.Min + maxUnion.DoubleVal = u.DoubleMeta.Max + case field.StringType: + minUnion.StringVal = u.StringMeta.Min + maxUnion.StringVal = u.StringMeta.Max + case field.TimeType: + minUnion.TimeNanosVal = u.TimeMeta.Min + maxUnion.TimeNanosVal = u.TimeMeta.Max + default: + return field.ValueUnion{}, field.ValueUnion{}, fmt.Errorf("unknown type %v in meta union", u.Type) + } + return minUnion, maxUnion, nil +} + +// MustToMinMaxValueUnion extracts the min value union and max value union from the meta union, +// and panics if an error is encountered. +func (u *MetaUnion) MustToMinMaxValueUnion() (minUnion, maxUnion field.ValueUnion) { + minUnion, maxUnion, err := u.ToMinMaxValueUnion() + if err != nil { + panic(err) + } + return minUnion, maxUnion +}