Skip to content

Commit

Permalink
Address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jan 31, 2019
1 parent 0761181 commit 9f50f64
Show file tree
Hide file tree
Showing 6 changed files with 1,102 additions and 1,070 deletions.
161 changes: 161 additions & 0 deletions query/grouped_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package query

import (
"github.com/xichen2020/eventdb/calculation"
"github.com/xichen2020/eventdb/document/field"
"github.com/xichen2020/eventdb/x/hash"
)

// ParsedGroupedQuery represents a validated, sanitized group query.
type ParsedGroupedQuery struct {
Namespace string
StartNanosInclusive int64
EndNanosExclusive int64
Filters []FilterList
GroupBy [][]string
Calculations []Calculation
OrderBy []OrderBy
Limit int

// Derived fields.
NewCalculationResultArrayFn calculation.NewResultArrayFromValueTypesFn
FieldConstraints map[hash.Hash]FieldMeta // Field constraints inferred from query
}

func newParsedGroupedQuery(q *ParsedQuery) (ParsedGroupedQuery, error) {
gq := ParsedGroupedQuery{
Namespace: q.Namespace,
StartNanosInclusive: q.StartTimeNanos,
EndNanosExclusive: q.EndTimeNanos,
Filters: q.Filters,
GroupBy: q.GroupBy,
Calculations: q.Calculations,
OrderBy: q.OrderBy,
Limit: q.Limit,
}
if err := gq.computeDerived(q.opts); err != nil {
return ParsedGroupedQuery{}, err
}
return gq, nil
}

// NewGroupedResults creates a new grouped results from the parsed grouped query.
func (q *ParsedGroupedQuery) NewGroupedResults() *GroupedResults {
return &GroupedResults{
GroupBy: q.GroupBy,
Calculations: q.Calculations,
OrderBy: q.OrderBy,
Limit: q.Limit,
NewCalculationResultArrayFn: q.NewCalculationResultArrayFn,
}
}

// NumFieldsForQuery returns the total number of fields involved in executing the query.
// NB: `OrderBy` fields are covered by either `GroupBy` or `Calculations`
func (q *ParsedGroupedQuery) NumFieldsForQuery() int {
numFieldsForQuery := 1 // Timestamp field
numFieldsForQuery += q.NumFilters()
numFieldsForQuery += len(q.GroupBy)
for _, calc := range q.Calculations {
if calc.FieldPath == nil {
continue
}
numFieldsForQuery++
}
return numFieldsForQuery
}

// NumFilters returns the number of filters in the query.
func (q *ParsedGroupedQuery) NumFilters() int {
numFilters := 0
for _, f := range q.Filters {
numFilters += len(f.Filters)
}
return numFilters
}

func (q *ParsedGroupedQuery) computeDerived(opts ParseOptions) error {
fieldConstraints, err := q.computeFieldConstraints(opts)
if err != nil {
return err
}
q.FieldConstraints = fieldConstraints
q.NewCalculationResultArrayFn = q.computeNewCalculationResultArrayFn()
return nil
}

func (q *ParsedGroupedQuery) computeFieldConstraints(
opts ParseOptions,
) (map[hash.Hash]FieldMeta, error) {
// Compute total number of fields involved in executing the query.
numFieldsForQuery := q.NumFieldsForQuery()

// Collect fields needed for query execution into a map for deduplciation.
fieldMap := make(map[hash.Hash]FieldMeta, numFieldsForQuery)

// Insert timestamp field.
currIndex := 0
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: opts.TimestampFieldPath,
IsRequired: true,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: field.ValueTypeSet{
field.TimeType: struct{}{},
},
},
})

// Insert filter fields.
currIndex++
for _, fl := range q.Filters {
for _, f := range fl.Filters {
allowedFieldTypes, err := f.AllowedFieldTypes()
if err != nil {
return nil, err
}
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: f.FieldPath,
IsRequired: false,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: allowedFieldTypes,
},
})
currIndex++
}
}

// Insert group by fields.
for _, gb := range q.GroupBy {
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: gb,
IsRequired: true,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: field.GroupableTypes.Clone(),
},
})
currIndex++
}

// Insert calculation fields.
for _, calc := range q.Calculations {
if calc.FieldPath == nil {
continue
}
allowedFieldTypes, err := calc.Op.AllowedTypes()
if err != nil {
return nil, err
}
// NB(xichen): Restrict the calculation field types for now, but in the future
// can relax this constraint.
addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
FieldPath: calc.FieldPath,
IsRequired: true,
AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
currIndex: allowedFieldTypes,
},
})
currIndex++
}

return fieldMap, nil
}
198 changes: 198 additions & 0 deletions query/parsed_query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package query

import (
"fmt"
"time"

"github.com/xichen2020/eventdb/calculation"
"github.com/xichen2020/eventdb/document/field"
"github.com/xichen2020/eventdb/x/hash"
)

// Type is the type of a query.
type Type int

// A list of supported query types.
const (
// RawQuery is a query that retrieves raw documents without grouping,
// returning a list of raw documents with optional ordering applied.
RawQuery Type = iota

// GroupedQuery is a query that groups documents by fields and applies
// calculations within each group, returning the grouped calculation results.
GroupedQuery

// TimeBucketQuery is a query that bucketizes the query time range into
// time buckets and counts the number of documents falling into each bucket,
// returning a list of counts ordered by time in ascending order.
TimeBucketQuery
)

// ParsedQuery represents a validated, sanitized query object produced from a raw query.
type ParsedQuery struct {
Namespace string
StartTimeNanos int64
EndTimeNanos int64
TimeGranularity *time.Duration
Filters []FilterList
GroupBy [][]string
Calculations []Calculation
OrderBy []OrderBy
Limit int

// Derived fields for raw query.
// AllowedFieldTypes map[hash.Hash]FieldMeta
opts ParseOptions
valuesLessThanFn field.ValuesLessThanFn
valuesReverseLessThanFn field.ValuesLessThanFn
}

// Type returns the query type.
func (q *ParsedQuery) Type() Type {
if q.TimeGranularity != nil {
return TimeBucketQuery
}
if len(q.GroupBy) == 0 {
return RawQuery
}
return GroupedQuery
}

// RawQuery returns the parsed raw query.
func (q *ParsedQuery) RawQuery() (ParsedRawQuery, error) {
return newParsedRawQuery(q)
}

// GroupedQuery returns the parsed grouped query.
func (q *ParsedQuery) GroupedQuery() (ParsedGroupedQuery, error) {
return newParsedGroupedQuery(q)
}

// TimeBucketQuery returns the parsed time bucket query.
func (q *ParsedQuery) TimeBucketQuery() (ParsedTimeBucketQuery, error) {
return newParsedTimeBucketQuery(q)
}

func (q *ParsedQuery) computeDerived() error {
return q.computeValueCompareFns()
}

func (q *ParsedQuery) computeValueCompareFns() error {
compareFns := make([]field.ValueCompareFn, 0, len(q.OrderBy))
for _, ob := range q.OrderBy {
compareFn, err := ob.SortOrder.CompareFieldValueFn()
if err != nil {
return fmt.Errorf("error determining the value compare fn for sort order %v: %v", ob.SortOrder, err)
}
compareFns = append(compareFns, compareFn)
}
q.valuesLessThanFn = field.NewValuesLessThanFn(compareFns)
q.valuesReverseLessThanFn = func(v1, v2 field.Values) bool {
return q.valuesLessThanFn(v2, v1)
}
return nil
}

// FieldMeta contains field metadata.
type FieldMeta struct {
FieldPath []string

// Required is true if the field must be present, otherwise empty result is returned.
// This applies to `GroupBy`, `Calculation`, and `OrderBy` fields.
IsRequired bool

// AllowedTypesBySourceIdx contains the set of field types allowed by the query,
// keyed by the field index in the query clause.
AllowedTypesBySourceIdx map[int]field.ValueTypeSet
}

// MergeInPlace merges the other field meta into the current field meta.
// Precondition: m.fieldPath == other.fieldPath.
// Precondition: The set of source indices in the two metas don't overlap.
func (m *FieldMeta) MergeInPlace(other FieldMeta) {
if other.IsRequired {
// If one of the field metas dictates the field is required, mark the field as so.
m.IsRequired = true
}
for idx, types := range other.AllowedTypesBySourceIdx {
m.AllowedTypesBySourceIdx[idx] = types
}
}

func (q *ParsedGroupedQuery) computeNewCalculationResultArrayFn() calculation.NewResultArrayFromValueTypesFn {
// Precondition: `fieldTypes` contains the value type for each field that appear in the
// query calculation clauses, except those that do not require a field (e.g., `Count` calculations).
return func(fieldTypes []field.ValueType) (calculation.ResultArray, error) {
var (
fieldTypeIdx int
results = make(calculation.ResultArray, 0, len(q.Calculations))
)
for _, calc := range q.Calculations {
var (
res calculation.Result
err error
)
if !calc.Op.RequiresField() {
// Pass in an unknown type as the op does not require a field.
res, err = calc.Op.NewResult(field.UnknownType)
} else {
if fieldTypeIdx >= len(fieldTypes) {
return nil, fmt.Errorf("field type index %d is out of range", fieldTypeIdx)
}
res, err = calc.Op.NewResult(fieldTypes[fieldTypeIdx])
fieldTypeIdx++
}
if err != nil {
return nil, err
}
results = append(results, res)
}
return results, nil
}
}

// addQueryFieldToMap adds a new query field meta to the existing
// field meta map.
func addQueryFieldToMap(
fm map[hash.Hash]FieldMeta,
fieldHashFn hash.StringArrayHashFn,
newFieldMeta FieldMeta,
) {
// Do not insert empty fields.
if len(newFieldMeta.FieldPath) == 0 {
return
}
fieldHash := fieldHashFn(newFieldMeta.FieldPath)
meta, exists := fm[fieldHash]
if !exists {
fm[fieldHash] = newFieldMeta
return
}
meta.MergeInPlace(newFieldMeta)
fm[fieldHash] = meta
}

// Calculation represents a calculation object.
type Calculation struct {
FieldPath []string
Op calculation.Op
}

// OrderBy is a field used for ordering results.
type OrderBy struct {
FieldType OrderByFieldType
FieldIndex int
FieldPath []string
SortOrder SortOrder
}

// OrderByFieldType is the field type used for ordering.
type OrderByFieldType int

// A list of supported order-by field types.
const (
UnknownOrderByFieldType OrderByFieldType = iota
RawField
GroupByField
CalculationField
)
Loading

0 comments on commit 9f50f64

Please sign in to comment.