-
Notifications
You must be signed in to change notification settings - Fork 1
/
grouped_query.go
163 lines (146 loc) · 4.62 KB
/
grouped_query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package query
import (
"github.com/xichen2020/eventdb/calculation"
"github.com/xichen2020/eventdb/document/field"
"github.com/xichen2020/eventdb/x/hash"
)
// ParsedGroupedQuery represents a validated, sanitized group query.
// The leading fields are copied from the source ParsedQuery by
// newParsedGroupedQuery; the derived fields are filled in by computeDerived.
type ParsedGroupedQuery struct {
Namespace string
// StartNanosInclusive and EndNanosExclusive are populated from the parsed
// query's StartTimeNanos and EndTimeNanos respectively.
StartNanosInclusive int64
EndNanosExclusive int64
Filters []FilterList
// GroupBy holds one field path per group-by field.
GroupBy [][]string
Calculations []Calculation
OrderBy []OrderBy
Limit int
// Derived fields.
// NewCalculationResultArrayFn is handed to the GroupedResults created by
// NewGroupedResults.
NewCalculationResultArrayFn calculation.NewResultArrayFromValueTypesFn
FieldConstraints map[hash.Hash]FieldMeta // Field constraints inferred from query
}
// newParsedGroupedQuery builds a ParsedGroupedQuery from a parsed query by
// copying over the query fields and then computing the derived fields with the
// parse options carried by q. If the derived fields cannot be computed, the
// zero ParsedGroupedQuery is returned together with the error.
func newParsedGroupedQuery(q *ParsedQuery) (ParsedGroupedQuery, error) {
	var grouped ParsedGroupedQuery
	grouped.Namespace = q.Namespace
	grouped.StartNanosInclusive = q.StartTimeNanos
	grouped.EndNanosExclusive = q.EndTimeNanos
	grouped.Filters = q.Filters
	grouped.GroupBy = q.GroupBy
	grouped.Calculations = q.Calculations
	grouped.OrderBy = q.OrderBy
	grouped.Limit = q.Limit

	err := grouped.computeDerived(q.opts)
	if err != nil {
		// Never hand back a partially populated query.
		return ParsedGroupedQuery{}, err
	}
	return grouped, nil
}
// NewGroupedResults allocates a GroupedResults seeded with this query's
// group-by fields, calculations, ordering, limit, and calculation result
// array constructor.
func (q *ParsedGroupedQuery) NewGroupedResults() *GroupedResults {
	res := new(GroupedResults)
	res.GroupBy = q.GroupBy
	res.Calculations = q.Calculations
	res.OrderBy = q.OrderBy
	res.Limit = q.Limit
	res.NewCalculationResultArrayFn = q.NewCalculationResultArrayFn
	return res
}
// TimestampFieldIndex reports the index assigned to the timestamp field,
// which is always 0.
func (q *ParsedGroupedQuery) TimestampFieldIndex() int {
	const timestampFieldIdx = 0
	return timestampFieldIdx
}
// FilterStartIndex reports the index at which fields referenced by query
// filters (if any) begin, which is always 1.
func (q *ParsedGroupedQuery) FilterStartIndex() int {
	const filterStartIdx = 1
	return filterStartIdx
}
// NumFieldsForQuery reports how many fields take part in executing the query:
// the timestamp field, one per filter, one per group-by field, and one per
// calculation that has a field path.
// NB: `OrderBy` fields are not counted separately because they are covered by
// either `GroupBy` or `Calculations`.
func (q *ParsedGroupedQuery) NumFieldsForQuery() int {
	// The leading 1 accounts for the timestamp field.
	total := 1 + q.NumFilters() + len(q.GroupBy)
	for i := range q.Calculations {
		// Calculations with a nil field path are skipped.
		if q.Calculations[i].FieldPath != nil {
			total++
		}
	}
	return total
}
// NumFilters reports the total number of individual filters across all of the
// query's filter lists.
func (q *ParsedGroupedQuery) NumFilters() int {
	var total int
	for i := range q.Filters {
		total += len(q.Filters[i].Filters)
	}
	return total
}
// computeDerived fills in the derived fields of the query: the field
// constraints first, then the calculation result array constructor. If the
// field constraints cannot be computed, the error is returned and neither
// derived field is modified.
func (q *ParsedGroupedQuery) computeDerived(opts ParseOptions) error {
	constraints, constraintsErr := q.computeFieldConstraints(opts)
	if constraintsErr != nil {
		return constraintsErr
	}

	q.FieldConstraints = constraints
	q.NewCalculationResultArrayFn = q.computeNewCalculationResultArrayFn()
	return nil
}
// computeFieldConstraints collects every field needed to execute the query,
// together with the value types allowed for it, into a map keyed by field hash.
// Fields are visited in a fixed order (timestamp, filters, group-by fields,
// calculations with a field path) and each visit is tagged with the next
// consecutive source index, starting at 0 for the timestamp field.
//
// An error is returned if the allowed types of a filter or of a calculation
// operator cannot be determined.
func (q *ParsedGroupedQuery) computeFieldConstraints(
	opts ParseOptions,
) (map[hash.Hash]FieldMeta, error) {
	// Fields are collected into a map for deduplication; size it by the total
	// number of fields involved in executing the query.
	constraints := make(map[hash.Hash]FieldMeta, q.NumFieldsForQuery())

	// register records one field under the next source index and then advances
	// the index, so indices stay consecutive across all field categories.
	nextIdx := 0
	register := func(path []string, allowed field.ValueTypeSet) {
		addQueryFieldToMap(constraints, opts.FieldHashFn, FieldMeta{
			FieldPath: path,
			AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
				nextIdx: allowed,
			},
		})
		nextIdx++
	}

	// The timestamp field comes first and only admits the time type.
	register(opts.TimestampFieldPath, field.ValueTypeSet{
		field.TimeType: struct{}{},
	})

	// Filter fields follow, in filter-list order.
	for _, list := range q.Filters {
		for _, filter := range list.Filters {
			allowed, err := filter.AllowedFieldTypes()
			if err != nil {
				return nil, err
			}
			register(filter.FieldPath, allowed)
		}
	}

	// Group-by fields each get their own copy of the groupable type set.
	for _, path := range q.GroupBy {
		register(path, field.GroupableTypes.Clone())
	}

	// Calculation fields come last; calculations with a nil field path are
	// skipped and do not consume an index.
	for _, calc := range q.Calculations {
		if calc.FieldPath == nil {
			continue
		}
		allowed, err := calc.Op.AllowedTypes()
		if err != nil {
			return nil, err
		}
		// NB(xichen): Restrict the calculation field types for now, but in the
		// future can relax this constraint.
		register(calc.FieldPath, allowed)
	}

	return constraints, nil
}