/
compile.go
1233 lines (1099 loc) · 39.7 KB
/
compile.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package query
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxql"
)
// CompileOptions are the customization options for the compiler.
type CompileOptions struct {
Now time.Time
}
// Statement is a compiled query statement.
type Statement interface {
// Prepare prepares the statement by mapping shards and finishing the creation
// of the query plan.
Prepare(ctx context.Context, shardMapper ShardMapper, opt SelectOptions) (PreparedStatement, error)
}
// compiledStatement represents a select statement that has undergone some initial processing to
// determine if it is valid and to have some initial modifications done on the AST.
type compiledStatement struct {
// Condition is the condition used for accessing data.
Condition influxql.Expr
// TimeRange is the TimeRange for selecting data.
TimeRange influxql.TimeRange
// Interval holds the time grouping interval.
Interval Interval
// InheritedInterval marks if the interval was inherited by a parent.
// If this is set, then an interval that was inherited will not cause
// a query that shouldn't have an interval to fail.
InheritedInterval bool
// ExtraIntervals is the number of extra intervals that will be read in addition
// to the TimeRange. It is a multiple of Interval and only applies to queries that
// have an Interval. It is used to extend the TimeRange of the mapped shards to
// include additional non-emitted intervals used by derivative and other functions.
// It will be set to the highest number of extra intervals that need to be read even
// if it doesn't apply to all functions. The number will always be positive.
// This value may be set to a non-zero value even if there is no interval for the
// compiled query.
ExtraIntervals int
// Ascending is true if the time ordering is ascending.
Ascending bool
// FunctionCalls holds a reference to the call expression of every function
// call that has been encountered.
FunctionCalls []*influxql.Call
// OnlySelectors is set to true when there are no aggregate functions.
OnlySelectors bool
// HasDistinct is set when the distinct() function is encountered.
HasDistinct bool
// FillOption contains the fill option for aggregates.
FillOption influxql.FillOption
// TopBottomFunction is set to top or bottom when one of those functions are
// used in the statement.
TopBottomFunction string
// HasAuxiliaryFields is true when the function requires auxiliary fields.
HasAuxiliaryFields bool
// Fields holds all of the fields that will be used.
Fields []*compiledField
// TimeFieldName stores the name of the time field's column.
// The column names generated by the compiler will not conflict with
// this name.
TimeFieldName string
// Limit is the number of rows per series this query should be limited to.
Limit int
// HasTarget is true if this query is being written into a target.
HasTarget bool
// Options holds the configured compiler options.
Options CompileOptions
stmt *influxql.SelectStatement
}
func newCompiler(opt CompileOptions) *compiledStatement {
if opt.Now.IsZero() {
opt.Now = time.Now().UTC()
}
return &compiledStatement{
OnlySelectors: true,
TimeFieldName: "time",
Options: opt,
}
}
func Compile(stmt *influxql.SelectStatement, opt CompileOptions) (_ Statement, err error) {
c := newCompiler(opt)
c.stmt = stmt.Clone()
if err := c.preprocess(c.stmt); err != nil {
return nil, err
}
if err := c.compile(c.stmt); err != nil {
return nil, err
}
c.stmt.TimeAlias = c.TimeFieldName
c.stmt.Condition = c.Condition
defer func() {
if e := recover(); e != nil && err == nil {
var ok bool
err, ok = e.(error)
if !ok {
err = fmt.Errorf("panic: %v", e)
}
err = fmt.Errorf("likely malformed statement, unable to rewrite: %w", err)
}
}()
// Convert DISTINCT into a call.
c.stmt.RewriteDistinct()
// Remove "time" from fields list.
c.stmt.RewriteTimeFields()
// Rewrite any regex conditions that could make use of the index.
c.stmt.RewriteRegexConditions()
return c, nil
}
// preprocess retrieves and records the global attributes of the current statement.
func (c *compiledStatement) preprocess(stmt *influxql.SelectStatement) error {
c.Ascending = stmt.TimeAscending()
c.Limit = stmt.Limit
c.HasTarget = stmt.Target != nil
valuer := influxql.NowValuer{Now: c.Options.Now, Location: stmt.Location}
cond, t, err := influxql.ConditionExpr(stmt.Condition, &valuer)
if err != nil {
return err
}
// Verify that the condition is actually ok to use.
if err := c.validateCondition(cond); err != nil {
return err
}
c.Condition = cond
c.TimeRange = t
// Read the dimensions of the query, validate them, and retrieve the interval
// if it exists.
if err := c.compileDimensions(stmt); err != nil {
return err
}
// Retrieve the fill option for the statement.
c.FillOption = stmt.Fill
// Resolve the min and max times now that we know if there is an interval or not.
if c.TimeRange.Min.IsZero() {
c.TimeRange.Min = time.Unix(0, influxql.MinTime).UTC()
}
if c.TimeRange.Max.IsZero() {
// If the interval is non-zero, then we have an aggregate query and
// need to limit the maximum time to now() for backwards compatibility
// and usability.
if !c.Interval.IsZero() {
c.TimeRange.Max = c.Options.Now
} else {
c.TimeRange.Max = time.Unix(0, influxql.MaxTime).UTC()
}
}
return nil
}
func (c *compiledStatement) compile(stmt *influxql.SelectStatement) error {
if err := c.compileFields(stmt); err != nil {
return err
}
if err := c.validateFields(); err != nil {
return err
}
// Look through the sources and compile each of the subqueries (if they exist).
// We do this after compiling the outside because subqueries may require
// inherited state.
for _, source := range stmt.Sources {
switch source := source.(type) {
case *influxql.SubQuery:
source.Statement.OmitTime = true
if err := c.subquery(source.Statement); err != nil {
return err
}
source.Statement.RewriteRegexConditions()
}
}
return nil
}
func (c *compiledStatement) compileFields(stmt *influxql.SelectStatement) error {
valuer := MathValuer{}
c.Fields = make([]*compiledField, 0, len(stmt.Fields))
for _, f := range stmt.Fields {
// Remove any time selection (it is automatically selected by default)
// and set the time column name to the alias of the time field if it exists.
// Such as SELECT time, max(value) FROM cpu will be SELECT max(value) FROM cpu
// and SELECT time AS timestamp, max(value) FROM cpu will return "timestamp"
// as the column name for the time.
if ref, ok := f.Expr.(*influxql.VarRef); ok && ref.Val == "time" {
if f.Alias != "" {
c.TimeFieldName = f.Alias
}
continue
}
// Append this field to the list of processed fields and compile it.
f.Expr = influxql.Reduce(f.Expr, &valuer)
field := &compiledField{
global: c,
Field: f,
AllowWildcard: true,
}
c.Fields = append(c.Fields, field)
if err := field.compileExpr(field.Field.Expr); err != nil {
return err
}
}
return nil
}
type compiledField struct {
// This holds the global state from the compiled statement.
global *compiledStatement
// Field is the top level field that is being compiled.
Field *influxql.Field
// AllowWildcard is set to true if a wildcard or regular expression is allowed.
AllowWildcard bool
}
// compileExpr creates the node that executes the expression and connects that
// node to the WriteEdge as the output.
func (c *compiledField) compileExpr(expr influxql.Expr) error {
switch expr := expr.(type) {
case *influxql.VarRef:
// A bare variable reference will require auxiliary fields.
c.global.HasAuxiliaryFields = true
return nil
case *influxql.Wildcard:
// Wildcards use auxiliary fields. We assume there will be at least one
// expansion.
c.global.HasAuxiliaryFields = true
if !c.AllowWildcard {
return errors.New("unable to use wildcard in a binary expression")
}
return nil
case *influxql.RegexLiteral:
if !c.AllowWildcard {
return errors.New("unable to use regex in a binary expression")
}
c.global.HasAuxiliaryFields = true
return nil
case *influxql.Call:
if isMathFunction(expr) {
return c.compileMathFunction(expr)
}
// Register the function call in the list of function calls.
c.global.FunctionCalls = append(c.global.FunctionCalls, expr)
switch expr.Name {
case "percentile":
return c.compilePercentile(expr.Args)
case "sample":
return c.compileSample(expr.Args)
case "distinct":
return c.compileDistinct(expr.Args, false)
case "top", "bottom":
return c.compileTopBottom(expr)
case "derivative", "non_negative_derivative":
isNonNegative := expr.Name == "non_negative_derivative"
return c.compileDerivative(expr.Args, isNonNegative)
case "difference", "non_negative_difference":
isNonNegative := expr.Name == "non_negative_difference"
return c.compileDifference(expr.Args, isNonNegative)
case "cumulative_sum":
return c.compileCumulativeSum(expr.Args)
case "moving_average":
return c.compileMovingAverage(expr.Args)
case "exponential_moving_average", "double_exponential_moving_average", "triple_exponential_moving_average", "relative_strength_index", "triple_exponential_derivative":
return c.compileExponentialMovingAverage(expr.Name, expr.Args)
case "kaufmans_efficiency_ratio", "kaufmans_adaptive_moving_average":
return c.compileKaufmans(expr.Name, expr.Args)
case "chande_momentum_oscillator":
return c.compileChandeMomentumOscillator(expr.Args)
case "elapsed":
return c.compileElapsed(expr.Args)
case "integral":
return c.compileIntegral(expr.Args)
case "count_hll":
return c.compileCountHll(expr.Args)
case "holt_winters", "holt_winters_with_fit":
withFit := expr.Name == "holt_winters_with_fit"
return c.compileHoltWinters(expr.Args, withFit)
default:
return c.compileFunction(expr)
}
case *influxql.Distinct:
call := expr.NewCall()
c.global.FunctionCalls = append(c.global.FunctionCalls, call)
return c.compileDistinct(call.Args, false)
case *influxql.BinaryExpr:
// Disallow wildcards in binary expressions. RewriteFields, which expands
// wildcards, is too complicated if we allow wildcards inside of expressions.
c.AllowWildcard = false
// Check if either side is a literal so we only compile one side if it is.
if _, ok := expr.LHS.(influxql.Literal); ok {
if _, ok := expr.RHS.(influxql.Literal); ok {
return errors.New("cannot perform a binary expression on two literals")
}
return c.compileExpr(expr.RHS)
} else if _, ok := expr.RHS.(influxql.Literal); ok {
return c.compileExpr(expr.LHS)
} else {
// Validate both sides of the expression.
if err := c.compileExpr(expr.LHS); err != nil {
return err
}
if err := c.compileExpr(expr.RHS); err != nil {
return err
}
return nil
}
case *influxql.ParenExpr:
return c.compileExpr(expr.Expr)
case influxql.Literal:
return errors.New("field must contain at least one variable")
}
return errors.New("unimplemented")
}
// compileNestedExpr ensures that the expression is compiled as if it were
// a nested expression.
func (c *compiledField) compileNestedExpr(expr influxql.Expr) error {
// Intercept the distinct call so we can pass nested as true.
switch expr := expr.(type) {
case *influxql.Call:
if expr.Name == "distinct" {
return c.compileDistinct(expr.Args, true)
}
case *influxql.Distinct:
call := expr.NewCall()
return c.compileDistinct(call.Args, true)
}
return c.compileExpr(expr)
}
func (c *compiledField) compileSymbol(name string, field influxql.Expr) error {
// Must be a variable reference, wildcard, or regexp.
switch field.(type) {
case *influxql.VarRef:
return nil
case *influxql.Wildcard:
if !c.AllowWildcard {
return fmt.Errorf("unsupported expression with wildcard: %s()", name)
}
c.global.OnlySelectors = false
return nil
case *influxql.RegexLiteral:
if !c.AllowWildcard {
return fmt.Errorf("unsupported expression with regex field: %s()", name)
}
c.global.OnlySelectors = false
return nil
default:
return fmt.Errorf("expected field argument in %s()", name)
}
}
func (c *compiledField) compileFunction(expr *influxql.Call) error {
// Validate the function call and mark down some meta properties
// related to the function for query validation.
switch expr.Name {
case "max", "min", "first", "last":
// top/bottom are not included here since they are not typical functions.
case "count", "sum", "mean", "median", "mode", "stddev", "spread", "sum_hll":
// These functions are not considered selectors.
c.global.OnlySelectors = false
default:
return fmt.Errorf("undefined function %s()", expr.Name)
}
if exp, got := 1, len(expr.Args); exp != got {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, exp, got)
}
// If this is a call to count(), allow distinct() to be used as the function argument.
if expr.Name == "count" {
// If we have count(), the argument may be a distinct() call.
if arg0, ok := expr.Args[0].(*influxql.Call); ok && arg0.Name == "distinct" {
return c.compileDistinct(arg0.Args, true)
} else if arg0, ok := expr.Args[0].(*influxql.Distinct); ok {
call := arg0.NewCall()
return c.compileDistinct(call.Args, true)
}
}
return c.compileSymbol(expr.Name, expr.Args[0])
}
func (c *compiledField) compilePercentile(args []influxql.Expr) error {
if exp, got := 2, len(args); got != exp {
return fmt.Errorf("invalid number of arguments for percentile, expected %d, got %d", exp, got)
}
switch args[1].(type) {
case *influxql.IntegerLiteral:
case *influxql.NumberLiteral:
default:
return fmt.Errorf("expected float argument in percentile()")
}
return c.compileSymbol("percentile", args[0])
}
func (c *compiledField) compileSample(args []influxql.Expr) error {
if exp, got := 2, len(args); got != exp {
return fmt.Errorf("invalid number of arguments for sample, expected %d, got %d", exp, got)
}
switch arg1 := args[1].(type) {
case *influxql.IntegerLiteral:
if arg1.Val <= 0 {
return fmt.Errorf("sample window must be greater than 1, got %d", arg1.Val)
}
default:
return fmt.Errorf("expected integer argument in sample()")
}
return c.compileSymbol("sample", args[0])
}
func (c *compiledField) compileDerivative(args []influxql.Expr, isNonNegative bool) error {
name := "derivative"
if isNonNegative {
name = "non_negative_derivative"
}
if min, max, got := 1, 2, len(args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d but no more than %d, got %d", name, min, max, got)
}
// Retrieve the duration from the derivative() call, if specified.
if len(args) == 2 {
switch arg1 := args[1].(type) {
case *influxql.DurationLiteral:
if arg1.Val <= 0 {
return fmt.Errorf("duration argument must be positive, got %s", influxql.FormatDuration(arg1.Val))
}
default:
return fmt.Errorf("second argument to %s must be a duration, got %T", name, args[1])
}
}
c.global.OnlySelectors = false
if c.global.ExtraIntervals < 1 {
c.global.ExtraIntervals = 1
}
// Must be a variable reference, function, wildcard, or regexp.
switch arg0 := args[0].(type) {
case *influxql.Call:
if c.global.Interval.IsZero() {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", name)
}
return c.compileNestedExpr(arg0)
default:
if !c.global.Interval.IsZero() && !c.global.InheritedInterval {
return fmt.Errorf("aggregate function required inside the call to %s", name)
}
return c.compileSymbol(name, arg0)
}
}
func (c *compiledField) compileElapsed(args []influxql.Expr) error {
if min, max, got := 1, 2, len(args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for elapsed, expected at least %d but no more than %d, got %d", min, max, got)
}
// Retrieve the duration from the elapsed() call, if specified.
if len(args) == 2 {
switch arg1 := args[1].(type) {
case *influxql.DurationLiteral:
if arg1.Val <= 0 {
return fmt.Errorf("duration argument must be positive, got %s", influxql.FormatDuration(arg1.Val))
}
default:
return fmt.Errorf("second argument to elapsed must be a duration, got %T", args[1])
}
}
c.global.OnlySelectors = false
if c.global.ExtraIntervals < 1 {
c.global.ExtraIntervals = 1
}
// Must be a variable reference, function, wildcard, or regexp.
switch arg0 := args[0].(type) {
case *influxql.Call:
if c.global.Interval.IsZero() {
return fmt.Errorf("elapsed aggregate requires a GROUP BY interval")
}
return c.compileNestedExpr(arg0)
default:
if !c.global.Interval.IsZero() && !c.global.InheritedInterval {
return fmt.Errorf("aggregate function required inside the call to elapsed")
}
return c.compileSymbol("elapsed", arg0)
}
}
func (c *compiledField) compileDifference(args []influxql.Expr, isNonNegative bool) error {
name := "difference"
if isNonNegative {
name = "non_negative_difference"
}
if got := len(args); got != 1 {
return fmt.Errorf("invalid number of arguments for %s, expected 1, got %d", name, got)
}
c.global.OnlySelectors = false
if c.global.ExtraIntervals < 1 {
c.global.ExtraIntervals = 1
}
// Must be a variable reference, function, wildcard, or regexp.
switch arg0 := args[0].(type) {
case *influxql.Call:
if c.global.Interval.IsZero() {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", name)
}
return c.compileNestedExpr(arg0)
default:
if !c.global.Interval.IsZero() && !c.global.InheritedInterval {
return fmt.Errorf("aggregate function required inside the call to %s", name)
}
return c.compileSymbol(name, arg0)
}
}
func (c *compiledField) compileCumulativeSum(args []influxql.Expr) error {
if got := len(args); got != 1 {
return fmt.Errorf("invalid number of arguments for cumulative_sum, expected 1, got %d", got)
}
c.global.OnlySelectors = false
if c.global.ExtraIntervals < 1 {
c.global.ExtraIntervals = 1
}
// Must be a variable reference, function, wildcard, or regexp.
switch arg0 := args[0].(type) {
case *influxql.Call:
if c.global.Interval.IsZero() {
return fmt.Errorf("cumulative_sum aggregate requires a GROUP BY interval")
}
return c.compileNestedExpr(arg0)
default:
if !c.global.Interval.IsZero() && !c.global.InheritedInterval {
return fmt.Errorf("aggregate function required inside the call to cumulative_sum")
}
return c.compileSymbol("cumulative_sum", arg0)
}
}
func (c *compiledField) compileMovingAverage(args []influxql.Expr) error {
if got := len(args); got != 2 {
return fmt.Errorf("invalid number of arguments for moving_average, expected 2, got %d", got)
}
arg1, ok := args[1].(*influxql.IntegerLiteral)
if !ok {
return fmt.Errorf("second argument for moving_average must be an integer, got %T", args[1])
} else if arg1.Val <= 1 {
return fmt.Errorf("moving_average window must be greater than 1, got %d", arg1.Val)
}
c.global.OnlySelectors = false
if c.global.ExtraIntervals < int(arg1.Val) {
c.global.ExtraIntervals = int(arg1.Val)
}
// Must be a variable reference, function, wildcard, or regexp.
switch arg0 := args[0].(type) {
case *influxql.Call:
if c.global.Interval.IsZero() {
return fmt.Errorf("moving_average aggregate requires a GROUP BY interval")
}
return c.compileNestedExpr(arg0)
default:
if !c.global.Interval.IsZero() && !c.global.InheritedInterval {
return fmt.Errorf("aggregate function required inside the call to moving_average")
}
return c.compileSymbol("moving_average", arg0)
}
}
func (c *compiledField) compileExponentialMovingAverage(name string, args []influxql.Expr) error {
if got := len(args); got < 2 || got > 4 {
return fmt.Errorf("invalid number of arguments for %s, expected at least 2 but no more than 4, got %d", name, got)
}
arg1, ok := args[1].(*influxql.IntegerLiteral)
if !ok {
return fmt.Errorf("%s period must be an integer", name)
} else if arg1.Val < 1 {
return fmt.Errorf("%s period must be greater than or equal to 1", name)
}
if len(args) >= 3 {
switch arg2 := args[2].(type) {
case *influxql.IntegerLiteral:
if name == "triple_exponential_derivative" && arg2.Val < 1 && arg2.Val != -1 {
return fmt.Errorf("%s hold period must be greater than or equal to 1", name)
}
if arg2.Val < 0 && arg2.Val != -1 {
return fmt.Errorf("%s hold period must be greater than or equal to 0", name)
}
default:
return fmt.Errorf("%s hold period must be an integer", name)
}
}
if len(args) >= 4 {
switch arg3 := args[3].(type) {
case *influxql.StringLiteral:
switch arg3.Val {
case "exponential", "simple":
default:
return fmt.Errorf("%s warmup type must be one of: 'exponential' 'simple'", name)
}
default:
return fmt.Errorf("%s warmup type must be a string", name)
}
}
c.global.OnlySelectors = false
if c.global.ExtraIntervals < int(arg1.Val) {
c.global.ExtraIntervals = int(arg1.Val)
}
switch arg0 := args[0].(type) {
case *influxql.Call:
if c.global.Interval.IsZero() {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", name)
}
return c.compileExpr(arg0)
default:
if !c.global.Interval.IsZero() && !c.global.InheritedInterval {
return fmt.Errorf("aggregate function required inside the call to %s", name)
}
return c.compileSymbol(name, arg0)
}
}
func (c *compiledField) compileKaufmans(name string, args []influxql.Expr) error {
if got := len(args); got < 2 || got > 3 {
return fmt.Errorf("invalid number of arguments for %s, expected at least 2 but no more than 3, got %d", name, got)
}
arg1, ok := args[1].(*influxql.IntegerLiteral)
if !ok {
return fmt.Errorf("%s period must be an integer", name)
} else if arg1.Val < 1 {
return fmt.Errorf("%s period must be greater than or equal to 1", name)
}
if len(args) >= 3 {
switch arg2 := args[2].(type) {
case *influxql.IntegerLiteral:
if arg2.Val < 0 && arg2.Val != -1 {
return fmt.Errorf("%s hold period must be greater than or equal to 0", name)
}
default:
return fmt.Errorf("%s hold period must be an integer", name)
}
}
c.global.OnlySelectors = false
if c.global.ExtraIntervals < int(arg1.Val) {
c.global.ExtraIntervals = int(arg1.Val)
}
switch arg0 := args[0].(type) {
case *influxql.Call:
if c.global.Interval.IsZero() {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", name)
}
return c.compileExpr(arg0)
default:
if !c.global.Interval.IsZero() && !c.global.InheritedInterval {
return fmt.Errorf("aggregate function required inside the call to %s", name)
}
return c.compileSymbol(name, arg0)
}
}
func (c *compiledField) compileChandeMomentumOscillator(args []influxql.Expr) error {
if got := len(args); got < 2 || got > 4 {
return fmt.Errorf("invalid number of arguments for chande_momentum_oscillator, expected at least 2 but no more than 4, got %d", got)
}
arg1, ok := args[1].(*influxql.IntegerLiteral)
if !ok {
return fmt.Errorf("chande_momentum_oscillator period must be an integer")
} else if arg1.Val < 1 {
return fmt.Errorf("chande_momentum_oscillator period must be greater than or equal to 1")
}
if len(args) >= 3 {
switch arg2 := args[2].(type) {
case *influxql.IntegerLiteral:
if arg2.Val < 0 && arg2.Val != -1 {
return fmt.Errorf("chande_momentum_oscillator hold period must be greater than or equal to 0")
}
default:
return fmt.Errorf("chande_momentum_oscillator hold period must be an integer")
}
}
c.global.OnlySelectors = false
if c.global.ExtraIntervals < int(arg1.Val) {
c.global.ExtraIntervals = int(arg1.Val)
}
if len(args) >= 4 {
switch arg3 := args[3].(type) {
case *influxql.StringLiteral:
switch arg3.Val {
case "none", "exponential", "simple":
default:
return fmt.Errorf("chande_momentum_oscillator warmup type must be one of: 'none' 'exponential' 'simple'")
}
default:
return fmt.Errorf("chande_momentum_oscillator warmup type must be a string")
}
}
switch arg0 := args[0].(type) {
case *influxql.Call:
if c.global.Interval.IsZero() {
return fmt.Errorf("chande_momentum_oscillator aggregate requires a GROUP BY interval")
}
return c.compileExpr(arg0)
default:
if !c.global.Interval.IsZero() && !c.global.InheritedInterval {
return fmt.Errorf("aggregate function required inside the call to chande_momentum_oscillator")
}
return c.compileSymbol("chande_momentum_oscillator", arg0)
}
}
func (c *compiledField) compileIntegral(args []influxql.Expr) error {
if min, max, got := 1, 2, len(args); got > max || got < min {
return fmt.Errorf("invalid number of arguments for integral, expected at least %d but no more than %d, got %d", min, max, got)
}
if len(args) == 2 {
switch arg1 := args[1].(type) {
case *influxql.DurationLiteral:
if arg1.Val <= 0 {
return fmt.Errorf("duration argument must be positive, got %s", influxql.FormatDuration(arg1.Val))
}
default:
return errors.New("second argument must be a duration")
}
}
c.global.OnlySelectors = false
// Must be a variable reference, wildcard, or regexp.
return c.compileSymbol("integral", args[0])
}
func (c *compiledField) compileCountHll(args []influxql.Expr) error {
if exp, got := 1, len(args); exp != got {
return fmt.Errorf("invalid number of arguments for count_hll, expected %d, got %d", exp, got)
}
c.global.OnlySelectors = false
switch arg0 := args[0].(type) {
case *influxql.Call:
return c.compileExpr(arg0)
default:
return c.compileSymbol("count_hll", arg0)
}
}
func (c *compiledField) compileHoltWinters(args []influxql.Expr, withFit bool) error {
name := "holt_winters"
if withFit {
name = "holt_winters_with_fit"
}
if exp, got := 3, len(args); got != exp {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", name, exp, got)
}
n, ok := args[1].(*influxql.IntegerLiteral)
if !ok {
return fmt.Errorf("expected integer argument as second arg in %s", name)
} else if n.Val <= 0 {
return fmt.Errorf("second arg to %s must be greater than 0, got %d", name, n.Val)
}
s, ok := args[2].(*influxql.IntegerLiteral)
if !ok {
return fmt.Errorf("expected integer argument as third arg in %s", name)
} else if s.Val < 0 {
return fmt.Errorf("third arg to %s cannot be negative, got %d", name, s.Val)
}
c.global.OnlySelectors = false
call, ok := args[0].(*influxql.Call)
if !ok {
return fmt.Errorf("must use aggregate function with %s", name)
} else if c.global.Interval.IsZero() {
return fmt.Errorf("%s aggregate requires a GROUP BY interval", name)
}
return c.compileNestedExpr(call)
}
func (c *compiledField) compileDistinct(args []influxql.Expr, nested bool) error {
if len(args) == 0 {
return errors.New("distinct function requires at least one argument")
} else if len(args) != 1 {
return errors.New("distinct function can only have one argument")
}
if _, ok := args[0].(*influxql.VarRef); !ok {
return errors.New("expected field argument in distinct()")
}
if !nested {
c.global.HasDistinct = true
}
c.global.OnlySelectors = false
return nil
}
func (c *compiledField) compileTopBottom(call *influxql.Call) error {
if c.global.TopBottomFunction != "" {
return fmt.Errorf("selector function %s() cannot be combined with other functions", c.global.TopBottomFunction)
}
if exp, got := 2, len(call.Args); got < exp {
return fmt.Errorf("invalid number of arguments for %s, expected at least %d, got %d", call.Name, exp, got)
}
limit, ok := call.Args[len(call.Args)-1].(*influxql.IntegerLiteral)
if !ok {
return fmt.Errorf("expected integer as last argument in %s(), found %s", call.Name, call.Args[len(call.Args)-1])
} else if limit.Val <= 0 {
return fmt.Errorf("limit (%d) in %s function must be at least 1", limit.Val, call.Name)
} else if c.global.Limit > 0 && int(limit.Val) > c.global.Limit {
return fmt.Errorf("limit (%d) in %s function can not be larger than the LIMIT (%d) in the select statement", limit.Val, call.Name, c.global.Limit)
}
if _, ok := call.Args[0].(*influxql.VarRef); !ok {
return fmt.Errorf("expected first argument to be a field in %s(), found %s", call.Name, call.Args[0])
}
if len(call.Args) > 2 {
for _, v := range call.Args[1 : len(call.Args)-1] {
ref, ok := v.(*influxql.VarRef)
if !ok {
return fmt.Errorf("only fields or tags are allowed in %s(), found %s", call.Name, v)
}
// Add a field for each of the listed dimensions when not writing the results.
if !c.global.HasTarget {
field := &compiledField{
global: c.global,
Field: &influxql.Field{Expr: ref},
}
c.global.Fields = append(c.global.Fields, field)
if err := field.compileExpr(ref); err != nil {
return err
}
}
}
}
c.global.TopBottomFunction = call.Name
return nil
}
func (c *compiledField) compileMathFunction(expr *influxql.Call) error {
// How many arguments are we expecting?
nargs := 1
switch expr.Name {
case "atan2", "pow", "log":
nargs = 2
}
// Did we get the expected number of args?
if got := len(expr.Args); got != nargs {
return fmt.Errorf("invalid number of arguments for %s, expected %d, got %d", expr.Name, nargs, got)
}
// Compile all the argument expressions that are not just literals.
for _, arg := range expr.Args {
if _, ok := arg.(influxql.Literal); ok {
continue
}
if err := c.compileExpr(arg); err != nil {
return err
}
}
return nil
}
func (c *compiledStatement) compileDimensions(stmt *influxql.SelectStatement) error {
for _, d := range stmt.Dimensions {
// Reduce the expression before attempting anything. Do not evaluate the call.
expr := influxql.Reduce(d.Expr, nil)
switch expr := expr.(type) {
case *influxql.VarRef:
if strings.EqualFold(expr.Val, "time") {
return errors.New("time() is a function and expects at least one argument")
}
case *influxql.Call:
// Ensure the call is time() and it has one or two duration arguments.
// If we already have a duration
if expr.Name != "time" {
return errors.New("only time() calls allowed in dimensions")
} else if got := len(expr.Args); got < 1 || got > 2 {
return errors.New("time dimension expected 1 or 2 arguments")
} else if lit, ok := expr.Args[0].(*influxql.DurationLiteral); !ok {
return errors.New("time dimension must have duration argument")
} else if c.Interval.Duration != 0 {
return errors.New("multiple time dimensions not allowed")
} else {
c.Interval.Duration = lit.Val
if len(expr.Args) == 2 {
switch lit := expr.Args[1].(type) {
case *influxql.DurationLiteral:
c.Interval.Offset = lit.Val % c.Interval.Duration
case *influxql.TimeLiteral:
c.Interval.Offset = lit.Val.Sub(lit.Val.Truncate(c.Interval.Duration))
case *influxql.Call:
if lit.Name != "now" {
return errors.New("time dimension offset function must be now()")
} else if len(lit.Args) != 0 {
return errors.New("time dimension offset now() function requires no arguments")
}
now := c.Options.Now
c.Interval.Offset = now.Sub(now.Truncate(c.Interval.Duration))
// Use the evaluated offset to replace the argument. Ideally, we would
// use the interval assigned above, but the query engine hasn't been changed
// to use the compiler information yet.
expr.Args[1] = &influxql.DurationLiteral{Val: c.Interval.Offset}
case *influxql.StringLiteral:
// If literal looks like a date time then parse it as a time literal.
if lit.IsTimeLiteral() {
t, err := lit.ToTimeLiteral(stmt.Location)
if err != nil {
return err
}
c.Interval.Offset = t.Val.Sub(t.Val.Truncate(c.Interval.Duration))
} else {
return errors.New("time dimension offset must be duration or now()")
}
default:
return errors.New("time dimension offset must be duration or now()")
}
}
}
case *influxql.Wildcard:
case *influxql.RegexLiteral:
default:
return errors.New("only time and tag dimensions allowed")
}
// Assign the reduced/changed expression to the dimension.
d.Expr = expr
}
return nil
}
// validateFields validates that the fields are mutually compatible with each other.
// This runs at the end of compilation but before linking.
func (c *compiledStatement) validateFields() error {
// Validate that at least one field has been selected.
if len(c.Fields) == 0 {
return errors.New("at least 1 non-time field must be queried")
}