-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
aggregate_resultset.go
159 lines (137 loc) · 4.88 KB
/
aggregate_resultset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package reads
import (
"context"
"math"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2/kit/errors"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
type windowAggregateResultSet struct {
ctx context.Context
req *datatypes.ReadWindowAggregateRequest
seriesCursor SeriesCursor
seriesRow SeriesRow
arrayCursors multiShardCursors
cursor cursors.Cursor
err error
}
// IsLastDescendingAggregateOptimization checks two things: If the request passed in
// is using the `last` aggregate type, and if it doesn't have a window. If both
// conditions are met, it returns false, otherwise, it returns true.
func IsLastDescendingAggregateOptimization(req *datatypes.ReadWindowAggregateRequest) bool {
if len(req.Aggregate) != 1 {
// Descending optimization for last only applies when it is the only aggregate.
return false
}
// The following is an optimization where in the case of a single window,
// the selector `last` is implemented as a descending array cursor followed
// by a limit array cursor that selects only the first point, i.e the point
// with the largest timestamp, from the descending array cursor.
if req.Aggregate[0].Type == datatypes.Aggregate_AggregateTypeLast {
if req.Window == nil {
if req.WindowEvery == 0 || req.WindowEvery == math.MaxInt64 {
return true
}
} else if (req.Window.Every.Nsecs == 0 && req.Window.Every.Months == 0) || req.Window.Every.Nsecs == math.MaxInt64 {
return true
}
}
return false
}
func NewWindowAggregateResultSet(ctx context.Context, req *datatypes.ReadWindowAggregateRequest, cursor SeriesCursor) (ResultSet, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
span.LogKV("aggregate_window_every", req.WindowEvery)
for _, aggregate := range req.Aggregate {
span.LogKV("aggregate_type", aggregate.String())
}
if nAggs := len(req.Aggregate); nAggs != 1 {
return nil, errors.Errorf(errors.InternalError, "attempt to create a windowAggregateResultSet with %v aggregate functions", nAggs)
}
ascending := !IsLastDescendingAggregateOptimization(req)
results := &windowAggregateResultSet{
ctx: ctx,
req: req,
seriesCursor: cursor,
arrayCursors: newMultiShardArrayCursors(ctx, req.Range.GetStart(), req.Range.GetEnd(), ascending),
}
return results, nil
}
func (r *windowAggregateResultSet) Next() bool {
if r == nil || r.err != nil {
return false
}
seriesRow := r.seriesCursor.Next()
if seriesRow == nil {
return false
}
r.seriesRow = *seriesRow
r.cursor, r.err = r.createCursor(r.seriesRow)
return r.err == nil
}
func convertNsecs(nsecs int64) values.Duration {
negative := false
if nsecs < 0 {
negative, nsecs = true, -nsecs
}
return values.MakeDuration(nsecs, 0, negative)
}
func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cursor, error) {
agg := r.req.Aggregate[0]
every := r.req.WindowEvery
offset := r.req.Offset
cursor := r.arrayCursors.createCursor(seriesRow)
var everyDur values.Duration
var offsetDur values.Duration
var periodDur values.Duration
if r.req.Window != nil {
// assume window was passed in and translate protobuf window to execute.Window
everyDur = values.MakeDuration(r.req.Window.Every.Nsecs, r.req.Window.Every.Months, r.req.Window.Every.Negative)
periodDur = values.MakeDuration(r.req.Window.Every.Nsecs, r.req.Window.Every.Months, r.req.Window.Every.Negative)
if r.req.Window.Offset != nil {
offsetDur = values.MakeDuration(r.req.Window.Offset.Nsecs, r.req.Window.Offset.Months, r.req.Window.Offset.Negative)
} else {
offsetDur = values.MakeDuration(0, 0, false)
}
} else {
// nanosecond values were passed in and need to be converted to windows
everyDur = convertNsecs(every)
periodDur = convertNsecs(every)
offsetDur = convertNsecs(offset)
}
window, err := interval.NewWindow(everyDur, periodDur, offsetDur)
if err != nil {
return nil, err
}
if everyDur.Nanoseconds() == math.MaxInt64 {
// This means to aggregate over whole series for the query's time range
return newAggregateArrayCursor(r.ctx, agg, cursor)
} else {
return newWindowAggregateArrayCursor(r.ctx, agg, window, cursor)
}
}
func (r *windowAggregateResultSet) Cursor() cursors.Cursor {
return r.cursor
}
func (r *windowAggregateResultSet) Close() {
if r == nil {
return
}
r.seriesRow.Query = nil
r.seriesCursor.Close()
}
func (r *windowAggregateResultSet) Err() error { return r.err }
func (r *windowAggregateResultSet) Stats() cursors.CursorStats {
if r.seriesRow.Query == nil {
return cursors.CursorStats{}
}
// See the equivalent method in *resultSet.Stats.
return r.seriesRow.Query.Stats()
}
func (r *windowAggregateResultSet) Tags() models.Tags {
return r.seriesRow.Tags
}