forked from influxdata/influxdb-comparisons
/
query_plan.go
185 lines (162 loc) · 5.74 KB
/
query_plan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package main
import (
"fmt"
"sort"
"time"
"github.com/gocql/gocql"
)
// QueryPlan is the contract for a strategy that fulfills an HLQuery. A plan
// can be run against a gocql session and can print diagnostics about the
// CQL queries it holds.
type QueryPlan interface {
	// Execute runs the plan using the given session and returns the
	// collected results, or an error.
	Execute(session *gocql.Session) ([]CQLResult, error)

	// DebugQueries prints debugging information about the plan; level
	// selects how much is printed.
	DebugQueries(level int)
}
// A QueryPlanWithServerAggregation fulfills an HLQuery by performing
// aggregation on both the server and the client. This results in more
// round-trip requests, but uses the server to aggregate over large datasets.
//
// It has 1) an AggregatorLabel, which Execute passes to GetAggregator to
// obtain a client-side aggregator for each bucket, and 2) a map of time
// interval buckets to CQL queries, which are used to retrieve data relevant
// to each bucket.
type QueryPlanWithServerAggregation struct {
// AggregatorLabel is the label handed to GetAggregator during Execute.
AggregatorLabel string
// BucketedCQLQueries maps each time interval bucket to the CQL queries
// whose scanned values are aggregated into that bucket's single result.
BucketedCQLQueries map[TimeInterval][]CQLQuery
}
// NewQueryPlanWithServerAggregation constructs a
// QueryPlanWithServerAggregation from an aggregator label and a map of time
// interval buckets to the CQL queries serving each bucket. The arguments are
// stored as given (the map is not copied) and the returned error is always
// nil.
//
// It is typically called via (*HLQuery).ToQueryPlanWithServerAggregation.
func NewQueryPlanWithServerAggregation(aggrLabel string, bucketedCQLQueries map[TimeInterval][]CQLQuery) (*QueryPlanWithServerAggregation, error) {
	plan := new(QueryPlanWithServerAggregation)
	plan.AggregatorLabel = aggrLabel
	plan.BucketedCQLQueries = bucketedCQLQueries
	return plan, nil
}
// Execute runs every CQLQuery held by the plan, bucket by bucket, and
// returns one CQLResult per time interval bucket, ordered as sort.Sort
// orders TimeIntervals.
//
// Each bucket gets its own aggregator, obtained from GetAggregator using the
// plan's AggregatorLabel. The first error from GetAggregator or from closing
// a query iterator aborts execution and is returned with a nil result slice.
//
// TODO(rw): support parallel execution.
func (qp *QueryPlanWithServerAggregation) Execute(session *gocql.Session) ([]CQLResult, error) {
	// Map iteration order is random, so gather the bucket keys and sort
	// them to get a deterministic result order.
	buckets := make([]TimeInterval, 0, len(qp.BucketedCQLQueries))
	for bucket := range qp.BucketedCQLQueries {
		buckets = append(buckets, bucket)
	}
	sort.Sort(TimeIntervals(buckets))

	// One result per bucket: only the aggregator state is retained while a
	// bucket's rows are streamed, never the rows themselves.
	out := make([]CQLResult, 0, len(buckets))
	for i := range buckets {
		bucket := buckets[i]

		aggregator, err := GetAggregator(qp.AggregatorLabel)
		if err != nil {
			return nil, err
		}

		queries := qp.BucketedCQLQueries[bucket]
		for j := range queries {
			cql := queries[j]

			// A query may yield a single row or a sequence of rows;
			// every scanned value is fed to the bucket's aggregator.
			rows := session.Query(cql.PreparableQueryString, cql.Args...).Iter()
			var v float64
			for rows.Scan(&v) {
				aggregator.Put(v)
			}
			if closeErr := rows.Close(); closeErr != nil {
				return nil, closeErr
			}
		}

		out = append(out, CQLResult{TimeInterval: bucket, Value: aggregator.Get()})
	}
	return out, nil
}
// DebugQueries writes diagnostics about the plan to standard output.
//
// At level 1 and above it prints the total number of CQLQuery objects across
// all buckets. At level 2 and above it additionally prints every query along
// with its bucket and its index within that bucket; buckets are visited in
// map iteration order, which is not deterministic.
func (qp *QueryPlanWithServerAggregation) DebugQueries(level int) {
	if level < 1 {
		return
	}

	total := 0
	for _, queries := range qp.BucketedCQLQueries {
		total += len(queries)
	}
	fmt.Printf("[qpsa] query with server aggregation plan has %d CQLQuery objects\n", total)

	if level < 2 {
		return
	}

	for bucket, queries := range qp.BucketedCQLQueries {
		for idx, cql := range queries {
			fmt.Printf("[qpsa] CQL: %s, %d, %s\n", bucket, idx, cql)
		}
	}
}
// A QueryPlanWithoutServerAggregation fulfills an HLQuery by performing
// table scans on the server and aggregating all data on the client. This
// results in higher bandwidth usage but fewer round-trip requests.
//
// It has 1) a map of Aggregators (one for each time bucket) which merge data
// on the client, 2) a GroupByDuration, which is used to reconstruct time
// buckets from a server response, 3) a list of TimeBuckets, which fixes the
// set and order of buckets reported in the results, and 4) a set of
// CQLQueries used to fulfill this plan.
type QueryPlanWithoutServerAggregation struct {
// Aggregators holds one client-side aggregator per time bucket.
Aggregators map[TimeInterval]Aggregator
// GroupByDuration is the bucket width: Execute truncates each row's
// timestamp to a multiple of it to find the row's bucket.
GroupByDuration time.Duration
// TimeBuckets lists the buckets for which Execute emits a result, in
// output order.
TimeBuckets []TimeInterval
// CQLQueries are the queries Execute runs; each row is scanned as a
// (timestamp, value) pair.
CQLQueries []CQLQuery
}
// NewQueryPlanWithoutServerAggregation constructs a
// QueryPlanWithoutServerAggregation. It asks GetAggregator for one
// aggregator per entry of timeBuckets, using aggrLabel, and stores the
// remaining arguments as given. If GetAggregator fails, that error is
// returned together with a nil plan.
//
// It is typically called via (*HLQuery).ToQueryPlanWithoutServerAggregation.
func NewQueryPlanWithoutServerAggregation(aggrLabel string, groupByDuration time.Duration, timeBuckets []TimeInterval, cqlQueries []CQLQuery) (*QueryPlanWithoutServerAggregation, error) {
	perBucket := make(map[TimeInterval]Aggregator, len(timeBuckets))
	for i := range timeBuckets {
		a, err := GetAggregator(aggrLabel)
		if err != nil {
			return nil, err
		}
		perBucket[timeBuckets[i]] = a
	}

	return &QueryPlanWithoutServerAggregation{
		Aggregators:     perBucket,
		GroupByDuration: groupByDuration,
		TimeBuckets:     timeBuckets,
		CQLQueries:      cqlQueries,
	}, nil
}
// Execute runs all CQLQueries in the QueryPlan and collects the results.
//
// TODO(rw): support parallel execution.
func (qp *QueryPlanWithoutServerAggregation) Execute(session *gocql.Session) ([]CQLResult, error) {
// for each query, execute it, then put each result row into the
// client-side aggregator that matches its time bucket:
for _, q := range qp.CQLQueries {
iter := session.Query(q.PreparableQueryString, q.Args...).Iter()
var timestamp_ns int64
var value float64
for iter.Scan(×tamp_ns, &value) {
ts := time.Unix(0, timestamp_ns).UTC()
tsTruncated := ts.Truncate(qp.GroupByDuration)
bucketKey := TimeInterval{
Start: tsTruncated,
End: tsTruncated.Add(qp.GroupByDuration),
}
qp.Aggregators[bucketKey].Put(value)
}
if err := iter.Close(); err != nil {
return nil, err
}
}
// perform client-side aggregation across all buckets:
results := make([]CQLResult, 0, len(qp.TimeBuckets))
for _, ti := range qp.TimeBuckets {
acc := qp.Aggregators[ti].Get()
results = append(results, CQLResult{TimeInterval: ti, Value: acc})
}
return results, nil
}
// DebugQueries writes diagnostics about the plan to standard output.
//
// At level 1 and above it prints how many CQLQuery objects the plan holds.
// At level 2 and above it additionally prints each query with its index.
func (qp *QueryPlanWithoutServerAggregation) DebugQueries(level int) {
	if level < 1 {
		return
	}
	fmt.Printf("[qpca] query with client aggregation plan has %d CQLQuery objects\n", len(qp.CQLQueries))

	if level < 2 {
		return
	}
	for idx, cql := range qp.CQLQueries {
		fmt.Printf("[qpca] CQL: %d, %s\n", idx, cql)
	}
}