forked from influxdata/influxdb-comparisons
/
query.go
115 lines (96 loc) · 4.37 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main
import (
"fmt"
"time"
)
// HLQuery is a high-level query, usually read from stdin after being
// generated by a bulk query generator program.
//
// The primary use of an HLQuery is to combine it with a ClientSideIndex to
// construct a QueryPlan.
type HLQuery struct {
HumanLabel []byte
HumanDescription []byte
ID int64
MeasurementName []byte // e.g. "cpu"
FieldName []byte // e.g. "usage_user"
AggregationType []byte // e.g. "avg" or "sum". used literally in the cassandra query.
TimeStart time.Time
TimeEnd time.Time
GroupByDuration time.Duration
TagsCondition []byte
}
// String produces a debug-ready description of a Query.
func (q *HLQuery) String() string {
return fmt.Sprintf("ID: %d, HumanLabel: %s, HumanDescription: %s, MeasurementName: %s, FieldName: %s, AggregationType: %s, TimeStart: %s, TimeEnd: %s, GroupByDuration: %s, TagSets: %s", q.ID, q.HumanLabel, q.HumanDescription, q.MeasurementName, q.FieldName, q.AggregationType, q.TimeStart, q.TimeEnd, q.GroupByDuration, q.TagsCondition)
}
// ForceUTC rewrites timestamps in UTC, which is helpful for pretty-printing.
func (q *HLQuery) ForceUTC() {
q.TimeStart = q.TimeStart.UTC()
q.TimeEnd = q.TimeEnd.UTC()
}
// ToQueryPlanWithServerAggregation combines an HLQuery with a
// ClientSideIndex to make a QueryPlanWithServerAggregation.
func (q *HLQuery) ToQueryPlanWithServerAggregation() (qp *QueryPlanWithServerAggregation, err error) {
// Build the time buckets used for 'group by time'-type queries.
//
// It is important to populate these even if they end up being empty,
// so that we get correct results for empty 'time buckets'.
tis := bucketTimeIntervals(q.TimeStart, q.TimeEnd, q.GroupByDuration)
// For each group-by time bucket, convert its series into CQLQueries:
cqlBuckets := make(map[TimeInterval]CQLQuery, len(tis))
for _, ti := range tis {
start := ti.Start
end := ti.End
// the following two special cases ensure equivalency with rounded time boundaries as seen in influxdb:
// https://docs.influxdata.com/influxdb/v0.13/query_language/data_exploration/#rounded-group-by-time-boundaries
if start.Before(q.TimeStart) {
start = q.TimeStart
}
if end.After(q.TimeEnd) {
end = q.TimeEnd
}
cqlBuckets[ti] = NewCQLQuery(string(q.AggregationType), string(q.MeasurementName), string(q.FieldName), string(q.TagsCondition), start.UnixNano(), end.UnixNano())
}
qp, err = NewQueryPlanWithServerAggregation(string(q.AggregationType), cqlBuckets)
return
}
// ToQueryPlanWithoutServerAggregation combines an HLQuery with a
// ClientSideIndex to make a QueryPlanWithoutServerAggregation.
//
// It executes at most one CQLQuery per series.
func (q *HLQuery) ToQueryPlanWithoutServerAggregation() (qp *QueryPlanWithoutServerAggregation, err error) {
// Build the time buckets used for 'group by time'-type queries.
//
// It is important to populate these even if they end up being empty,
// so that we get correct results for empty 'time buckets'.
timeBuckets := bucketTimeIntervals(q.TimeStart, q.TimeEnd, q.GroupByDuration)
// Build CQLQuery objects that will be used to fulfill this HLQuery:
cqlQueries := []CQLQuery{}
cq := NewCQLQuery("", string(q.MeasurementName), string(q.FieldName), string(q.TagsCondition), q.TimeStart.UnixNano(), q.TimeEnd.UnixNano())
cqlQueries = append(cqlQueries, cq)
qp, err = NewQueryPlanWithoutServerAggregation(string(q.AggregationType), q.GroupByDuration, timeBuckets, cqlQueries)
return
}
// Type CQLQuery wraps data needed to execute a gocql.Query.
type CQLQuery struct {
PreparableQueryString string
Args []interface{}
}
// NewCQLQuery builds a CQLQuery, using prepared CQL statements.
func NewCQLQuery(aggrLabel, tableName, fieldName string, tagCondition string, timeStartNanos, timeEndNanos int64) CQLQuery {
var preparableQueryString string
if aggrLabel == "" {
preparableQueryString = fmt.Sprintf("SELECT time, %s FROM %s WHERE %s AND time >= ? AND time < ?", fieldName, tableName, tagCondition)
} else {
preparableQueryString = fmt.Sprintf("SELECT %s(%s) FROM %s WHERE %s AND time >= ? AND time < ?", aggrLabel, fieldName, tableName, tagCondition)
}
args := []interface{}{timeStartNanos, timeEndNanos}
return CQLQuery{preparableQueryString, args}
}
// Type CQLResult holds a result from a set of CQL aggregation queries.
// Used for debug printing.
type CQLResult struct {
TimeInterval
Value float64
}