Skip to content

Commit

Permalink
Support calculation against a subset of fields during groupBy queries
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Feb 1, 2019
1 parent d99b43a commit c37274a
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 286 deletions.
6 changes: 5 additions & 1 deletion calculation/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ func (arr ResultArray) New() ResultArray {
}
resArray := make(ResultArray, 0, len(arr))
for _, res := range arr {
if res == nil {
resArray = append(resArray, nil)
continue
}
resArray = append(resArray, res.New())
}
return resArray
Expand All @@ -329,7 +333,7 @@ func (arr ResultArray) MergeInPlace(other ResultArray) {
}

// NewResultArrayFromValueTypesFn creates a new result array based on the field value types.
type NewResultArrayFromValueTypesFn func(valueTypes []field.ValueType) (ResultArray, error)
type NewResultArrayFromValueTypesFn func(valueTypes field.OptionalTypeArray) (ResultArray, error)

// NewResultArrayFn creates a new result array.
type NewResultArrayFn func() (ResultArray, error)
9 changes: 6 additions & 3 deletions calculation/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,19 @@ type FieldValueToValueFn func(v *field.ValueUnion) ValueUnion

// AsValueFn returns a function that converts a field value union with the given field
// type to a calculation value union.
func AsValueFn(t field.ValueType) (FieldValueToValueFn, error) {
toValueFn, exists := toValueFnsByFieldType[t]
func AsValueFn(t field.OptionalType) (FieldValueToValueFn, error) {
	if !t.HasType {
		// A "null" optional type needs no conversion; signal that with a nil function.
		return nil, nil
	}
	if toValueFn, exists := toValueFnsByFieldType[t.Type]; exists {
		return toValueFn, nil
	}
	return nil, fmt.Errorf("no function exists to convert %v to a calculation value union", t)
}

// AsValueFns returns a list of value conversion functions for the given list of field types.
func AsValueFns(fieldTypes []field.ValueType) ([]FieldValueToValueFn, error) {
func AsValueFns(fieldTypes field.OptionalTypeArray) ([]FieldValueToValueFn, error) {
if len(fieldTypes) == 0 {
return nil, nil
}
Expand Down
54 changes: 54 additions & 0 deletions document/field/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,3 +390,57 @@ func (v Values) Clone() Values {
}
return cloned
}

// OptionalType represents a value type that may be absent: when `HasType`
// is false the option is "null", otherwise `Type` holds the concrete value
// type. It behaves like a `*ValueType` but avoids the pointer, which keeps
// GC overhead lower.
type OptionalType struct {
	HasType bool
	Type    ValueType
}

// MergeInPlace merges the other optional type into the current optional type.
// The merging is valid if:
// - Neither optional type has a type.
// - Only one of `t` and `other` has a valid type.
// - Both optional types have the same valid type.
func (t *OptionalType) MergeInPlace(other OptionalType) error {
	switch {
	case !other.HasType:
		// Nothing to merge in; keep the receiver unchanged.
		return nil
	case !t.HasType:
		// Receiver is "null": adopt the incoming option wholesale.
		*t = other
		return nil
	case t.Type != other.Type:
		return fmt.Errorf("merging two incompatible optional types %v and %v", *t, other)
	default:
		// Both sides carry the same valid type; nothing to do.
		return nil
	}
}

// OptionalTypeArray is a slice of optional types.
type OptionalTypeArray []OptionalType

// MergeInPlace merges the other type array into the current type array in place.
// The other type array becomes invalid after the merge: when the receiver is
// empty it takes ownership of `other`'s backing storage directly.
// Precondition: One of the following conditions is true:
// - One of or both `v` and `other` are nil.
// - Both type arrays have the same size.
func (v *OptionalTypeArray) MergeInPlace(other OptionalTypeArray) error {
	if len(other) == 0 {
		// Nothing to merge; receiver stays as is.
		return nil
	}
	cur := *v
	if len(cur) == 0 {
		// Receiver is empty: alias the other array rather than copying it.
		*v = other
		return nil
	}
	if len(cur) != len(other) {
		return fmt.Errorf("merging two optional type arrays with different sizes %d and %d", len(cur), len(other))
	}
	for i := range other {
		if err := cur[i].MergeInPlace(other[i]); err != nil {
			return err
		}
	}
	return nil
}
12 changes: 4 additions & 8 deletions query/grouped_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ func (q *ParsedGroupedQuery) computeFieldConstraints(
// Insert timestamp field.
currIndex := 0
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: opts.TimestampFieldPath,
IsRequired: true,
FieldPath: opts.TimestampFieldPath,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: field.ValueTypeSet{
field.TimeType: struct{}{},
Expand All @@ -114,8 +113,7 @@ func (q *ParsedGroupedQuery) computeFieldConstraints(
return nil, err
}
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: f.FieldPath,
IsRequired: false,
FieldPath: f.FieldPath,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: allowedFieldTypes,
},
Expand All @@ -127,8 +125,7 @@ func (q *ParsedGroupedQuery) computeFieldConstraints(
// Insert group by fields.
for _, gb := range q.GroupBy {
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: gb,
IsRequired: true,
FieldPath: gb,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: field.GroupableTypes.Clone(),
},
Expand All @@ -148,8 +145,7 @@ func (q *ParsedGroupedQuery) computeFieldConstraints(
// NB(xichen): Restrict the calculation field types for now, but in the future
// can relax this constraint.
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: calc.FieldPath,
IsRequired: true,
FieldPath: calc.FieldPath,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: allowedFieldTypes,
},
Expand Down
36 changes: 4 additions & 32 deletions query/grouped_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type GroupedResults struct {
// Field types for ensuring single-type fields.
// These are derived from the first result group processed during query execution.
GroupByFieldTypes field.ValueTypeArray
CalcFieldTypes field.ValueTypeArray
CalcFieldTypes field.OptionalTypeArray

SingleKeyGroups *SingleKeyResultGroups
MultiKeyGroups *MultiKeyResultGroups
Expand Down Expand Up @@ -81,16 +81,6 @@ func (r *GroupedResults) IsEmpty() bool { return r.Len() == 0 }
// IsOrdered returns true if the grouped results are kept in order.
func (r *GroupedResults) IsOrdered() bool { return len(r.OrderBy) > 0 }

// HasOrderedFilter returns true if the grouped results supports filtering ordered values.
// This is used to determine whether the result should be used to fast eliminate ineligible
// segments by filtering out those whose range fall outside the current result value range.
//
// NB(xichen): We currently do not keep results in order internally because it's fairly
// expensive to update them during result merging and not useful to order those that are
// aggregations of field values (e.g,. `Count`), which is our primary groupBy use case.
// Can revisit this assumption in the future if needed.
func (r *GroupedResults) HasOrderedFilter() bool { return false }

// LimitReached returns true if we have collected enough grouped results.
func (r *GroupedResults) LimitReached() bool { return r.Len() >= r.Limit }

Expand All @@ -115,25 +105,6 @@ func (r *GroupedResults) NumGroupsLimit() int {
return defaultMaxNumGroupsLimit
}

// MinOrderByValues returns the orderBy field values for the smallest result in
// the result collection if applicable. This is only called if `HasOrderedFilter`
// returns true.
func (r *GroupedResults) MinOrderByValues() field.Values {
panic("not implemented")
}

// MaxOrderByValues returns the orderBy field values for the largest result in
// the result collection if applicable. This is only called if `HasOrderedFilter`
// returns true.
func (r *GroupedResults) MaxOrderByValues() field.Values {
panic("not implemented")
}

// FieldValuesLessThanFn returns the function to compare two set of field values.
func (r *GroupedResults) FieldValuesLessThanFn() field.ValuesLessThanFn {
panic("not implemented")
}

// Clear clears the grouped results.
func (r *GroupedResults) Clear() {
r.GroupBy = nil
Expand Down Expand Up @@ -163,9 +134,10 @@ func (r *GroupedResults) MergeInPlace(other *GroupedResults) error {
if !r.GroupByFieldTypes.Equal(other.GroupByFieldTypes) {
return fmt.Errorf("merging two grouped results with different group by field types %v and %v", r.GroupByFieldTypes, other.GroupByFieldTypes)
}
if !r.CalcFieldTypes.Equal(other.CalcFieldTypes) {
return fmt.Errorf("merging two grouped rsults with different calculation field types %v and %v", r.CalcFieldTypes, other.CalcFieldTypes)
if err := r.CalcFieldTypes.MergeInPlace(other.CalcFieldTypes); err != nil {
return fmt.Errorf("error merging calculation field types %v and %v in two grouped results: %v", r.CalcFieldTypes, other.CalcFieldTypes, err)
}

if r.HasSingleKey() {
if other.SingleKeyGroups == nil {
return nil
Expand Down
30 changes: 8 additions & 22 deletions query/parsed_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ func (q *ParsedQuery) computeValueCompareFns() error {
type FieldMeta struct {
FieldPath []string

// Required is true if the field must be present, otherwise empty result is returned.
// This applies to `GroupBy`, `Calculation`, and `OrderBy` fields.
IsRequired bool

// AllowedTypesBySourceIdx contains the set of field types allowed by the query,
// keyed by the field index in the query clause.
AllowedTypesBySourceIdx map[int]field.ValueTypeSet
Expand All @@ -110,37 +106,27 @@ type FieldMeta struct {
// Precondition: m.fieldPath == other.fieldPath.
// Precondition: The set of source indices in the two metas don't overlap.
func (m *FieldMeta) MergeInPlace(other FieldMeta) {
	// Copy over the per-source-index type constraints; the preconditions
	// guarantee the source indices of the two metas do not overlap.
	for sourceIdx, allowedTypes := range other.AllowedTypesBySourceIdx {
		m.AllowedTypesBySourceIdx[sourceIdx] = allowedTypes
	}
}

func (q *ParsedGroupedQuery) computeNewCalculationResultArrayFn() calculation.NewResultArrayFromValueTypesFn {
// Precondition: `fieldTypes` contains the value type for each field that appear in the
// query calculation clauses, except those that do not require a field (e.g., `Count` calculations).
return func(fieldTypes []field.ValueType) (calculation.ResultArray, error) {
var (
fieldTypeIdx int
results = make(calculation.ResultArray, 0, len(q.Calculations))
)
for _, calc := range q.Calculations {
// Precondition: `fieldTypes` contains the field types for each calculation clause and has
// the same size as `q.Calculations`. For calculation operators that do not require a field,
// the corresponding item is an optional type whose `HasType` field is false.
return func(fieldTypes field.OptionalTypeArray) (calculation.ResultArray, error) {
results := make(calculation.ResultArray, 0, len(q.Calculations))
for i, calc := range q.Calculations {
var (
res calculation.Result
err error
)
if !calc.Op.RequiresField() {
// Pass in an unknown type as the op does not require a field.
res, err = calc.Op.NewResult(field.UnknownType)
} else {
if fieldTypeIdx >= len(fieldTypes) {
return nil, fmt.Errorf("field type index %d is out of range", fieldTypeIdx)
}
res, err = calc.Op.NewResult(fieldTypes[fieldTypeIdx])
fieldTypeIdx++
} else if fieldTypes[i].HasType {
res, err = calc.Op.NewResult(fieldTypes[i].Type)
}
if err != nil {
return nil, err
Expand Down
12 changes: 4 additions & 8 deletions query/raw_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ func (q *ParsedRawQuery) computeFieldConstraints(
// Insert timestamp field.
currIndex := 0
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: opts.TimestampFieldPath,
IsRequired: true,
FieldPath: opts.TimestampFieldPath,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: field.ValueTypeSet{
field.TimeType: struct{}{},
Expand All @@ -108,8 +107,7 @@ func (q *ParsedRawQuery) computeFieldConstraints(
// Insert raw doc source field.
currIndex++
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: opts.RawDocSourceFieldPath,
IsRequired: true,
FieldPath: opts.RawDocSourceFieldPath,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: field.ValueTypeSet{
field.StringType: struct{}{},
Expand All @@ -126,8 +124,7 @@ func (q *ParsedRawQuery) computeFieldConstraints(
return nil, err
}
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: f.FieldPath,
IsRequired: false,
FieldPath: f.FieldPath,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: allowedFieldTypes,
},
Expand All @@ -139,8 +136,7 @@ func (q *ParsedRawQuery) computeFieldConstraints(
// Insert order by fields.
for _, ob := range q.OrderBy {
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: ob.FieldPath,
IsRequired: true,
FieldPath: ob.FieldPath,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: field.OrderableTypes.Clone(),
},
Expand Down
34 changes: 0 additions & 34 deletions query/result.go

This file was deleted.

6 changes: 2 additions & 4 deletions query/time_bucket_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func (q *ParsedTimeBucketQuery) computeFieldConstraints(
// Insert timestamp field.
currIndex := 0
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: opts.TimestampFieldPath,
IsRequired: true,
FieldPath: opts.TimestampFieldPath,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: field.ValueTypeSet{
field.TimeType: struct{}{},
Expand All @@ -105,8 +104,7 @@ func (q *ParsedTimeBucketQuery) computeFieldConstraints(
return nil, err
}
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: f.FieldPath,
IsRequired: false,
FieldPath: f.FieldPath,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: allowedFieldTypes,
},
Expand Down
Loading

0 comments on commit c37274a

Please sign in to comment.