From c0f120073e0478a26194f1c5e9f37764f38c0e31 Mon Sep 17 00:00:00 2001 From: Xi Chen Date: Mon, 14 Jan 2019 18:00:01 -0500 Subject: [PATCH] A bit cleanup and refactoring --- query/raw_result.go | 1 + storage/immutable_segment.go | 436 +++++++++++++++++++++-------------- storage/raw_result.go | 172 -------------- 3 files changed, 268 insertions(+), 341 deletions(-) delete mode 100644 storage/raw_result.go diff --git a/query/raw_result.go b/query/raw_result.go index 664342c..cc3d607 100644 --- a/query/raw_result.go +++ b/query/raw_result.go @@ -125,6 +125,7 @@ func NewRawResultHeap( } } +// Data returns the underlying array backing the heap. func (h RawResultHeap) Data() []RawResult { return h.dv } // Min returns the "smallest" heap element according to the `lessThan` function. diff --git a/storage/immutable_segment.go b/storage/immutable_segment.go index c986634..7f71c01 100644 --- a/storage/immutable_segment.go +++ b/storage/immutable_segment.go @@ -42,6 +42,10 @@ type immutableSegment interface { Flush(persistFns persist.Fns) error } +const ( + defaultRawResultsCapacity = 4096 +) + var ( errImmutableSegmentAlreadyClosed = errors.New("immutable segment is already closed") errFlushingNotInMemoryOnlySegment = errors.New("flushing a segment that is not in memory only") @@ -175,16 +179,12 @@ func (s *immutableSeg) QueryRaw( } // Validate that the fields to order results by have one and only one field type. - orderByStart := len(allowedFieldTypes) - len(orderBy) - for i := orderByStart; i < len(allowedFieldTypes); i++ { - if len(allowedFieldTypes[i]) == 0 { - // The field to order results by does not exist, as such we return an empty result early here. - return nil, nil - } - if len(allowedFieldTypes[i]) > 1 { - // The field to order results by has more than one type. This is currently not supported. 
- return nil, fmt.Errorf("orderBy field %v has multiple types %v", orderBy[i-orderByStart], allowedFieldTypes[i]) - } + hasEmptyResult, err := validateOrderByClauses(allowedFieldTypes, orderBy) + if err != nil { + return nil, err + } + if hasEmptyResult { + return nil, nil } // Retrieve all fields (possibly from disk) identified above. @@ -197,11 +197,6 @@ func (s *immutableSeg) QueryRaw( if err != nil { return nil, err } - rawDocSourceField, ok := queryFields[1].StringField() - if !ok { - return nil, errNoStringValuesInRawDocSourceField - } - defer func() { for i := range queryFields { if queryFields[i] != nil { @@ -211,6 +206,11 @@ func (s *immutableSeg) QueryRaw( } }() + rawDocSourceField, ok := queryFields[1].StringField() + if !ok { + return nil, errNoStringValuesInRawDocSourceField + } + // Apply filters to determine the doc ID set matching the filters. filteredDocIDIter, err := applyFilters( startNanosInclusive, endNanosExclusive, filters, @@ -221,162 +221,18 @@ func (s *immutableSeg) QueryRaw( } if len(orderBy) == 0 { - // Return unordered results - rawDocSourceIter, err := rawDocSourceField.Fetch(filteredDocIDIter) - if err != nil { - filteredDocIDIter.Close() - return nil, fmt.Errorf("error fetching raw doc source data: %v", err) - } - defer rawDocSourceIter.Close() - - resultsCapacity := defaultRawResultsCapacity - if limit != nil && *limit >= 0 { - resultsCapacity = *limit - } - // TODO(xichen): Pool the results array. - rawResults := make([]query.RawResult, 0, resultsCapacity) - for rawDocSourceIter.Next() { - rawResults = append(rawResults, query.RawResult{Data: rawDocSourceIter.Value()}) - if limit != nil && len(rawResults) >= *limit { - break - } - } - if err := rawDocSourceIter.Err(); err != nil { - return nil, fmt.Errorf("error iterating over raw doc source data: %v", err) - } - return rawResults, nil - } - - // Precondition: Each orderBy field has one and only one field type. 
- orderByIters := make([]indexfield.BaseFieldIterator, 0, len(orderBy)) - for i := orderByStart; i < len(allowedFieldTypes); i++ { - var t field.ValueType - for key := range allowedFieldTypes[i] { - t = key - break - } - queryFieldIdx := fieldIndexMap[i] - queryField := queryFields[queryFieldIdx] - fu, found := queryField.FieldForType(t) - if !found { - err = fmt.Errorf("orderBy field %v does not have values of type %v", orderBy[i-orderByStart], t) - break - } - var it indexfield.BaseFieldIterator - it, err = fu.Iter() - if err != nil { - err = fmt.Errorf("error getting iterator for orderBy field %v type %v", orderBy[i-orderByStart], t) - break - } - orderByIters = append(orderByIters, it) + return collectUnorderedRawDocSourceData(rawDocSourceField, filteredDocIDIter, limit) } - if err != nil { - // Clean up. - var multiErr xerrors.MultiError - for i := range orderByIters { - if itErr := orderByIters[i].Err(); itErr != nil { - multiErr = multiErr.Add(itErr) - } - orderByIters[i].Close() - orderByIters[i] = nil - } - - // TODO(xichen): Add filteredDocIDIter.Err() here. - filteredDocIDIter.Close() - filteredDocIDIter = nil - - return nil, err - } - - // NB(xichen): Worth optimizing for the single-field-orderBy case? - - orderByMultiIter := indexfield.NewMultiFieldIterator(orderByIters) - filteredOrderByIter := indexfield.NewDocIDMultiFieldIntersectIterator(filteredDocIDIter, orderByMultiIter) - defer filteredOrderByIter.Close() - - // TODO(xichen): This algorithm runs in O(Nlogk) time. Should investigate whethere this is - // in practice faster than first selecting top N values via a selection sort algorithm that - // runs in O(N) time, then sorting the results in O(klogk) time. - compareFns := make([]field.ValueCompareFn, 0, len(orderBy)) - for _, ob := range orderBy { - // NB(xichen): Reverse compare function is needed to keep the top N values. 
For example, - // if we need to keep the top 10 values sorted in ascending order, it means we need the - // 10 smallest values, and as such we keep a max heap and only add values to the heap - // if the current value is smaller than the max heap value. - compareFn, err := ob.SortOrder.ReverseCompareFn() - if err != nil { - return nil, fmt.Errorf("error determinging the value compare fn for sort order %v: %v", ob.SortOrder, err) - } - compareFns = append(compareFns, compareFn) - } - lessThanFn := query.NewLessThanFn(compareFns) - - // Inserting values into the heap to select the top results based on the query ordering. - heapCapacity := -1 - if limit != nil { - heapCapacity = *limit - } - results := query.NewRawResultHeap(heapCapacity, lessThanFn) - for filteredOrderByIter.Next() { - docID := filteredOrderByIter.DocID() - values := filteredOrderByIter.Values() // values here is only valid till the next iteration. - dv := query.RawResult{DocID: docID, OrderByValues: values} - if heapCapacity == -1 || results.Len() <= heapCapacity { - // TODO(xichen): Should pool and reuse the value array here. - valuesClone := make([]field.ValueUnion, len(values)) - copy(valuesClone, values) - dv.OrderByValues = valuesClone - results.Push(dv) - continue - } - if min := results.Min(); !lessThanFn(min, dv) { - continue - } - removed := results.Pop() - // Reuse values array. - copy(removed.OrderByValues, values) - dv.OrderByValues = removed.OrderByValues - results.Push(dv) - } - if err = filteredOrderByIter.Err(); err != nil { - return nil, fmt.Errorf("error iterating over filtered order by items: %v", err) - } - - // Sort the result heap in place, and when done the items are sorted from left to - // in the right order based on the query sorting critiera (i.e., if the sort order - // is ascending, the leftmost item is the smallest item). 
- for results.Len() > 0 { - results.Pop() - } - orderedRawResults := results.Data() - for i := 0; i < len(orderedRawResults); i++ { - orderedRawResults[i].OrderIdx = i - } - sort.Sort(query.RawResultsByDocIDAsc(orderedRawResults)) - docIDValuesIt := query.NewRawResultIterator(orderedRawResults) - rawDocSourceIter, err := rawDocSourceField.Fetch(docIDValuesIt) - if err != nil { - return nil, fmt.Errorf("error fetching raw doc source data: %v", err) - } - numItemsWithRawDocSource := 0 - for rawDocSourceIter.Next() { - maskingPos := rawDocSourceIter.MaskingPosition() - orderedRawResults[maskingPos].HasData = true - orderedRawResults[maskingPos].Data = rawDocSourceIter.Value() - numItemsWithRawDocSource++ - } - if err := rawDocSourceIter.Err(); err != nil { - return nil, fmt.Errorf("error iterating over raw doc source field: %v", err) - } - sort.Sort(query.RawResultsByOrderIdxAsc(orderedRawResults)) - orderedRawResults = orderedRawResults[:numItemsWithRawDocSource] - return orderedRawResults, nil -} - -type indexValues struct { - index int - values []field.ValueUnion + return collectOrderedRawDocSourceData( + allowedFieldTypes, + fieldIndexMap, + queryFields, + rawDocSourceField, + filteredDocIDIter, + orderBy, + limit, + ) } func (s *immutableSeg) LoadedStatus() segmentLoadedStatus { @@ -605,6 +461,26 @@ func (s *immutableSeg) intersectWithAvailableTypes( } } +// validateOrderByClauses validates the fields and types specified in the query +// orderBy clauses are valid. +func validateOrderByClauses( + allowedFieldTypes []field.ValueTypeSet, + orderBy []query.OrderBy, +) (hasEmptyResult bool, err error) { + orderByStart := len(allowedFieldTypes) - len(orderBy) + for i := orderByStart; i < len(allowedFieldTypes); i++ { + if len(allowedFieldTypes[i]) == 0 { + // The field to order results by does not exist, as such we return an empty result early here. + return true, nil + } + if len(allowedFieldTypes[i]) > 1 { + // The field to order results by has more than one type. 
This is currently not supported. + return false, fmt.Errorf("orderBy field %v has multiple types %v", orderBy[i-orderByStart], allowedFieldTypes[i]) + } + } + return false, nil +} + // retrieveFields returns the set of field for a list of field retrieving options. // If no error is returned, the result array contains the same number of slots // as the number of fields to retrieve. Fields that don't exist in the segment @@ -960,6 +836,228 @@ func applyFilter( return toFilter.Filter(flt.Op, flt.Value, numTotalDocs) } +// NB: This method owns `maskingDocIDSetIt` and handles closing regardless of success or failure. +func collectUnorderedRawDocSourceData( + rawDocSourceField indexfield.StringField, + maskingDocIDSetIt index.DocIDSetIterator, + limit *int, +) ([]query.RawResult, error) { + // Return unordered results + rawDocSourceIter, err := rawDocSourceField.Fetch(maskingDocIDSetIt) + if err != nil { + maskingDocIDSetIt.Close() + return nil, fmt.Errorf("error fetching raw doc source data: %v", err) + } + defer rawDocSourceIter.Close() + + resultsCapacity := defaultRawResultsCapacity + if limit != nil && *limit >= 0 { + resultsCapacity = *limit + } + // TODO(xichen): Pool the results array. + rawResults := make([]query.RawResult, 0, resultsCapacity) + for rawDocSourceIter.Next() { + rawResults = append(rawResults, query.RawResult{Data: rawDocSourceIter.Value()}) + if limit != nil && len(rawResults) >= *limit { + break + } + } + if err := rawDocSourceIter.Err(); err != nil { + return nil, fmt.Errorf("error iterating over raw doc source data: %v", err) + } + return rawResults, nil +} + +// NB: This method owns `maskingDocIDSetIt` and handles closing regardless of success or failure. 
+func collectOrderedRawDocSourceData( + allowedFieldTypes []field.ValueTypeSet, + fieldIndexMap []int, + queryFields []indexfield.DocsField, + rawDocSourceField indexfield.StringField, + maskingDocIDSetIter index.DocIDSetIterator, + orderBy []query.OrderBy, + limit *int, +) ([]query.RawResult, error) { + filteredOrderByIter, err := createFilteredOrderByIterator( + allowedFieldTypes, + fieldIndexMap, + queryFields, + maskingDocIDSetIter, + orderBy, + ) + if err != nil { + // TODO(xichen): Add filteredDocIDIter.Err() here. + maskingDocIDSetIter.Close() + maskingDocIDSetIter = nil + return nil, err + } + defer filteredOrderByIter.Close() + + orderedRawResults, err := collectTopNRawResultDocIDOrderByValues(filteredOrderByIter, orderBy, limit) + if err != nil { + return nil, err + } + + return collectTopNRawResults(rawDocSourceField, orderedRawResults) +} + +// NB: If an error is encountered, `maskingDocIDSetIter` should be closed at the callsite. +func createFilteredOrderByIterator( + allowedFieldTypes []field.ValueTypeSet, + fieldIndexMap []int, + queryFields []indexfield.DocsField, + maskingDocIDSetIter index.DocIDSetIterator, + orderBy []query.OrderBy, +) (*indexfield.DocIDMultiFieldIntersectIterator, error) { + // Precondition: Each orderBy field has one and only one field type. 
+ var ( + orderByStart = len(allowedFieldTypes) - len(orderBy) + orderByIters = make([]indexfield.BaseFieldIterator, 0, len(orderBy)) + err error + ) + for i := orderByStart; i < len(allowedFieldTypes); i++ { + var t field.ValueType + for key := range allowedFieldTypes[i] { + t = key + break + } + queryFieldIdx := fieldIndexMap[i] + queryField := queryFields[queryFieldIdx] + fu, found := queryField.FieldForType(t) + if !found { + err = fmt.Errorf("orderBy field %v does not have values of type %v", orderBy[i-orderByStart], t) + break + } + var it indexfield.BaseFieldIterator + it, err = fu.Iter() + if err != nil { + err = fmt.Errorf("error getting iterator for orderBy field %v type %v", orderBy[i-orderByStart], t) + break + } + orderByIters = append(orderByIters, it) + } + + if err != nil { + // Clean up. + var multiErr xerrors.MultiError + for i := range orderByIters { + if itErr := orderByIters[i].Err(); itErr != nil { + multiErr = multiErr.Add(itErr) + } + orderByIters[i].Close() + orderByIters[i] = nil + } + return nil, err + } + + // NB(xichen): Worth optimizing for the single-field-orderBy case? + + orderByMultiIter := indexfield.NewMultiFieldIterator(orderByIters) + filteredOrderByIter := indexfield.NewDocIDMultiFieldIntersectIterator(maskingDocIDSetIter, orderByMultiIter) + return filteredOrderByIter, nil +} + +// collectTopNRawResultDocIDOrderByValues returns the top N doc IDs and the field values to order +// raw results by based on the ordering criteria defined by `orderBy` as well as the query limit. +// The result array returned contains raw results ordered in the same order as that dictated by the +// `orderBy` clauses (e.g., if `orderBy` requires results be sorted by timestamp in descending order, +// the result array will also be sorted by timestamp in descending order). Note that the result array +// does not contain the actual raw result data, only the doc IDs and the orderBy field values. 
+func collectTopNRawResultDocIDOrderByValues( + filteredOrderByIter *indexfield.DocIDMultiFieldIntersectIterator, + orderBy []query.OrderBy, + limit *int, +) ([]query.RawResult, error) { + // TODO(xichen): This algorithm runs in O(Nlogk) time. Should investigate whether this is + // in practice faster than first selecting top N values via a selection sort algorithm that + // runs in O(N) time, then sorting the results in O(klogk) time. + compareFns := make([]field.ValueCompareFn, 0, len(orderBy)) + for _, ob := range orderBy { + // NB(xichen): Reverse compare function is needed to keep the top N values. For example, + // if we need to keep the top 10 values sorted in ascending order, it means we need the + // 10 smallest values, and as such we keep a max heap and only add values to the heap + // if the current value is smaller than the max heap value. + compareFn, err := ob.SortOrder.ReverseCompareFn() + if err != nil { + return nil, fmt.Errorf("error determining the value compare fn for sort order %v: %v", ob.SortOrder, err) + } + compareFns = append(compareFns, compareFn) + } + lessThanFn := query.NewLessThanFn(compareFns) + + // Inserting values into the heap to select the top results based on the query ordering. + heapCapacity := -1 + if limit != nil { + heapCapacity = *limit + } + results := query.NewRawResultHeap(heapCapacity, lessThanFn) + for filteredOrderByIter.Next() { + docID := filteredOrderByIter.DocID() + values := filteredOrderByIter.Values() // values here is only valid till the next iteration. + dv := query.RawResult{DocID: docID, OrderByValues: values} + if heapCapacity == -1 || results.Len() <= heapCapacity { + // TODO(xichen): Should pool and reuse the value array here. + valuesClone := make([]field.ValueUnion, len(values)) + copy(valuesClone, values) + dv.OrderByValues = valuesClone + results.Push(dv) + continue + } + if min := results.Min(); !lessThanFn(min, dv) { + continue + } + removed := results.Pop() + // Reuse values array. 
+ copy(removed.OrderByValues, values) + dv.OrderByValues = removed.OrderByValues + results.Push(dv) + } + if err := filteredOrderByIter.Err(); err != nil { + return nil, fmt.Errorf("error iterating over filtered order by items: %v", err) + } + + // Sort the result heap in place, and when done the items are sorted from left to right + // in the right order based on the query sorting criteria (i.e., if the sort order + // is ascending, the leftmost item is the smallest item). + for results.Len() > 0 { + results.Pop() + } + return results.Data(), nil +} + +// collectTopNRawResults collects the top N raw results from the raw doc source field +// based on the doc IDs and the ordering specified in `orderedRawResults`. The result +// array returned contains raw results ordered in the same order as that dictated by the +// `orderBy` clauses (e.g., if `orderBy` requires results be sorted by timestamp in descending order, +// the result array will also be sorted by timestamp in descending order). +func collectTopNRawResults( + rawDocSourceField indexfield.StringField, + orderedRawResults []query.RawResult, +) ([]query.RawResult, error) { + for i := 0; i < len(orderedRawResults); i++ { + orderedRawResults[i].OrderIdx = i + } + sort.Sort(query.RawResultsByDocIDAsc(orderedRawResults)) + docIDValuesIt := query.NewRawResultIterator(orderedRawResults) + rawDocSourceIter, err := rawDocSourceField.Fetch(docIDValuesIt) + if err != nil { + return nil, fmt.Errorf("error fetching raw doc source data: %v", err) + } + numItemsWithRawDocSource := 0 + for rawDocSourceIter.Next() { + maskingPos := rawDocSourceIter.MaskingPosition() + orderedRawResults[maskingPos].HasData = true + orderedRawResults[maskingPos].Data = rawDocSourceIter.Value() + numItemsWithRawDocSource++ + } + if err := rawDocSourceIter.Err(); err != nil { + return nil, fmt.Errorf("error iterating over raw doc source field: %v", err) + } + sort.Sort(query.RawResultsByOrderIdxAsc(orderedRawResults)) + orderedRawResults = 
orderedRawResults[:numItemsWithRawDocSource] + return orderedRawResults, nil +} + func intersectFieldTypes( first []field.ValueType, second field.ValueTypeSet, diff --git a/storage/raw_result.go b/storage/raw_result.go deleted file mode 100644 index eb8a655..0000000 --- a/storage/raw_result.go +++ /dev/null @@ -1,172 +0,0 @@ -package storage - -import ( - "github.com/xichen2020/eventdb/document/field" -) - -const ( - defaultRawResultsCapacity = 4096 -) - -type rawResult struct { - data string - - // For joining and sorting purposes. These fields are empty for unsorted raw results. - docID int32 - orderByValues []field.ValueUnion - orderIdx int - hasData bool -} - -// rawResultsByDocIDAsc sorts a list of raw results by their doc IDs in ascending order. -type rawResultsByDocIDAsc []rawResult - -func (a rawResultsByDocIDAsc) Len() int { return len(a) } -func (a rawResultsByDocIDAsc) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a rawResultsByDocIDAsc) Less(i, j int) bool { return a[i].docID < a[j].docID } - -// rawResultsByOrderIndices sorts a list of doc ID values by their order indices in ascending order. -// NB(xichen): If an item does not have `hasData` set, it'll be at the end of the array after sorting. -// The ordering between two items neither of which has `hasData` set is non deterministic. 
-type rawResultsByOrderIdxAsc []rawResult - -func (a rawResultsByOrderIdxAsc) Len() int { return len(a) } -func (a rawResultsByOrderIdxAsc) Swap(i, j int) { a[i], a[j] = a[j], a[i] } - -func (a rawResultsByOrderIdxAsc) Less(i, j int) bool { - if !a[i].hasData { - return false - } - if !a[j].hasData { - return true - } - return a[i].orderIdx < a[j].orderIdx -} - -type rawResultIterator struct { - resultsByDocIDAsc []rawResult - - currIdx int -} - -func newRawResultIterator(resultsByDocIDAsc []rawResult) *rawResultIterator { - return &rawResultIterator{ - resultsByDocIDAsc: resultsByDocIDAsc, - currIdx: -1, - } -} - -func (it *rawResultIterator) Next() bool { - if it.currIdx >= len(it.resultsByDocIDAsc) { - return false - } - it.currIdx++ - return it.currIdx < len(it.resultsByDocIDAsc) -} - -func (it *rawResultIterator) DocID() int32 { return it.resultsByDocIDAsc[it.currIdx].docID } - -func (it *rawResultIterator) Values() []field.ValueUnion { - return it.resultsByDocIDAsc[it.currIdx].orderByValues -} - -func (it *rawResultIterator) Err() error { return nil } - -func (it *rawResultIterator) Close() { it.resultsByDocIDAsc = nil } - -// rawResultHeap is a heap storing a list of raw results. -// The ordering of such items are determined by `compareFns`. -// The smallest item will be at the top of the heap. 
-type rawResultHeap struct { - dv []rawResult - lessThanFn rawResultLessThanFn -} - -type rawResultLessThanFn func(v1, v2 rawResult) bool - -func newLessThanFn(compareFns []field.ValueCompareFn) rawResultLessThanFn { - return func(v1, v2 rawResult) bool { - for idx, fn := range compareFns { - res := fn(v1.orderByValues[idx], v2.orderByValues[idx]) - if res < 0 { - return false - } - if res > 0 { - return true - } - } - return true - } -} - -func newRawResultHeap( - capacity int, - lessThanFn rawResultLessThanFn, -) rawResultHeap { - initCapacity := defaultRawResultsCapacity - if capacity >= 0 { - initCapacity = capacity - } - return rawResultHeap{ - dv: make([]rawResult, 0, initCapacity), - lessThanFn: lessThanFn, - } -} - -// Min returns the "smallest" heap element according to the `lessThan` function. -func (h rawResultHeap) Min() rawResult { return h.dv[0] } - -func (h rawResultHeap) Len() int { return len(h.dv) } - -func (h rawResultHeap) Less(i, j int) bool { - return h.lessThanFn(h.dv[i], h.dv[j]) -} - -func (h rawResultHeap) Swap(i, j int) { h.dv[i], h.dv[j] = h.dv[j], h.dv[i] } - -func (h *rawResultHeap) Push(value rawResult) { - h.dv = append(h.dv, value) - h.shiftUp(h.Len() - 1) -} - -func (h *rawResultHeap) Pop() rawResult { - var ( - n = h.Len() - val = h.dv[0] - ) - - h.dv[0], h.dv[n-1] = h.dv[n-1], h.dv[0] - h.heapify(0, n-1) - h.dv = h.dv[0 : n-1] - return val -} - -func (h rawResultHeap) shiftUp(i int) { - for { - parent := (i - 1) / 2 - if parent == i || !h.Less(i, parent) { - break - } - h.dv[parent], h.dv[i] = h.dv[i], h.dv[parent] - i = parent - } -} - -func (h rawResultHeap) heapify(i, n int) { - for { - left := i*2 + 1 - right := left + 1 - smallest := i - if left < n && h.Less(left, smallest) { - smallest = left - } - if right < n && h.Less(right, smallest) { - smallest = right - } - if smallest == i { - return - } - h.dv[i], h.dv[smallest] = h.dv[smallest], h.dv[i] - i = smallest - } -}