Skip to content

Commit

Permalink
Add fill(linear) to query language
Browse files Browse the repository at this point in the history
Clean up template for fill average

Change fill(average) to fill(linear)

Update average to linear in infuxql spec

Add Integer Tests and associated fixes

Update CHANGELOG for fill(linear)
  • Loading branch information
desa committed Oct 4, 2016
1 parent 8e35dd3 commit 966e550
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Features

- [#7403](https://github.com/influxdata/influxdb/pull/7403): Add `fill(linear)` to query language
- [#7120](https://github.com/influxdata/influxdb/issues/7120): Add additional statistics to query executor.
- [#7135](https://github.com/influxdata/influxdb/pull/7135): Support enable HTTP service over unix domain socket. Thanks @oiooj
- [#3634](https://github.com/influxdata/influxdb/issues/3634): Support mixed duration units.
Expand Down
2 changes: 1 addition & 1 deletion influxql/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ field = expr [ alias ] .
fields = field { "," field } .
fill_option = "null" | "none" | "previous" | int_lit | float_lit .
fill_option = "null" | "none" | "previous" | "linear" | int_lit | float_lit .
host = string_lit .
Expand Down
4 changes: 4 additions & 0 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,8 @@ const (
NumberFill
// PreviousFill means that empty aggregate windows will be filled with whatever the previous aggregate window had
PreviousFill
// LinearFill means that empty aggregate windows will be filled with whatever a linear value between non null windows
LinearFill
)

// SelectStatement represents a command for extracting data from the database.
Expand Down Expand Up @@ -1356,6 +1358,8 @@ func (s *SelectStatement) String() string {
_, _ = buf.WriteString(" fill(none)")
case NumberFill:
_, _ = buf.WriteString(fmt.Sprintf(" fill(%v)", s.FillValue))
case LinearFill:
_, _ = buf.WriteString(" fill(linear)")
case PreviousFill:
_, _ = buf.WriteString(" fill(previous)")
}
Expand Down
36 changes: 36 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,22 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) {
}

switch itr.opt.Fill {
case LinearFill:
if !itr.prev.Nil {
next, err := itr.input.peek()
if err != nil {
return nil, err
}
if next != nil {
interval := int64(itr.opt.Interval.Duration)
start := itr.window.time / interval
p.Value = linearFloat(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
} else {
p.Nil = true
}
} else {
p.Nil = true
}
case NullFill:
p.Nil = true
case NumberFill:
Expand Down Expand Up @@ -2742,6 +2758,22 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) {
}

switch itr.opt.Fill {
case LinearFill:
if !itr.prev.Nil {
next, err := itr.input.peek()
if err != nil {
return nil, err
}
if next != nil {
interval := int64(itr.opt.Interval.Duration)
start := itr.window.time / interval
p.Value = linearInteger(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
} else {
p.Nil = true
}
} else {
p.Nil = true
}
case NullFill:
p.Nil = true
case NumberFill:
Expand Down Expand Up @@ -4808,6 +4840,8 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) {
}

switch itr.opt.Fill {
case LinearFill:
fallthrough
case NullFill:
p.Nil = true
case NumberFill:
Expand Down Expand Up @@ -6874,6 +6908,8 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) {
}

switch itr.opt.Fill {
case LinearFill:
fallthrough
case NullFill:
p.Nil = true
case NumberFill:
Expand Down
20 changes: 20 additions & 0 deletions influxql/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,26 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) {
}

switch itr.opt.Fill {
case LinearFill:
{{- if or (eq $k.Name "Float") (eq $k.Name "Integer")}}
if !itr.prev.Nil {
next, err := itr.input.peek()
if err != nil {
return nil, err
}
if next != nil {
interval := int64(itr.opt.Interval.Duration)
start := itr.window.time / interval
p.Value = linear{{$k.Name}}(start, itr.prev.Time/interval, next.Time/interval, itr.prev.Value, next.Value)
} else {
p.Nil = true
}
} else {
p.Nil = true
}
{{else}}
fallthrough
{{- end}}
case NullFill:
p.Nil = true
case NumberFill:
Expand Down
21 changes: 21 additions & 0 deletions influxql/linear.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package influxql

// linearFloat computes the the slope of the line between the points (previousTime, previousValue) and (nextTime, nextValue)
// and returns the value of the point on the line with time windowTime
// y = mx + b
func linearFloat(windowTime, previousTime, nextTime int64, previousValue, nextValue float64) float64 {
m := (nextValue - previousValue) / float64(nextTime-previousTime) // the slope of the line
x := float64(windowTime - previousTime) // how far into the interval we are
b := previousValue
return m*x + b
}

// linearInteger computes the the slope of the line between the points (previousTime, previousValue) and (nextTime, nextValue)
// and returns the value of the point on the line with time windowTime
// y = mx + b
func linearInteger(windowTime, previousTime, nextTime int64, previousValue, nextValue int64) int64 {
m := float64(nextValue-previousValue) / float64(nextTime-previousTime) // the slope of the line
x := float64(windowTime - previousTime) // how far into the interval we are
b := float64(previousValue)
return int64(m*x + b)
}
4 changes: 3 additions & 1 deletion influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2144,7 +2144,7 @@ func (p *Parser) parseFill() (FillOption, interface{}, error) {
if !ok {
return NullFill, nil, errors.New("fill must be a function call")
} else if len(fill.Args) != 1 {
return NullFill, nil, errors.New("fill requires an argument, e.g.: 0, null, none, previous")
return NullFill, nil, errors.New("fill requires an argument, e.g.: 0, null, none, previous, linear")
}
switch fill.Args[0].String() {
case "null":
Expand All @@ -2153,6 +2153,8 @@ func (p *Parser) parseFill() (FillOption, interface{}, error) {
return NoFill, nil, nil
case "previous":
return PreviousFill, nil, nil
case "linear":
return LinearFill, nil, nil
default:
switch num := fill.Args[0].(type) {
case *IntegerLiteral:
Expand Down
19 changes: 19 additions & 0 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,25 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// SELECT statement with average fill
{
s: fmt.Sprintf(`SELECT mean(value) FROM cpu where time < '%s' GROUP BY time(5m) FILL(linear)`, now.UTC().Format(time.RFC3339Nano)),
stmt: &influxql.SelectStatement{
Fields: []*influxql.Field{{
Expr: &influxql.Call{
Name: "mean",
Args: []influxql.Expr{&influxql.VarRef{Val: "value"}}}}},
Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}},
Condition: &influxql.BinaryExpr{
Op: influxql.LT,
LHS: &influxql.VarRef{Val: "time"},
RHS: &influxql.StringLiteral{Val: now.UTC().Format(time.RFC3339Nano)},
},
Dimensions: []*influxql.Dimension{{Expr: &influxql.Call{Name: "time", Args: []influxql.Expr{&influxql.DurationLiteral{Val: 5 * time.Minute}}}}},
Fill: influxql.LinearFill,
},
},

// SELECT casts
{
s: `SELECT field1::float, field2::integer, field3::string, field4::boolean, field5::field, tag1::tag FROM cpu`,
Expand Down
113 changes: 113 additions & 0 deletions influxql/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,119 @@ func TestSelect_Fill_Previous_Float(t *testing.T) {
}
}

// Ensure a SELECT query with a fill(linear) statement can be executed.
func TestSelect_Fill_Linear_Float_One(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 32 * Second, Value: 4},
}}, opt)
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Value: 3}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 4, Aggregated: 1}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Fill_Linear_Float_Many(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return influxql.NewCallIterator(&FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 2},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 62 * Second, Value: 7},
}}, opt)
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT mean(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 2, Aggregated: 1}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Value: 3}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 4}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Value: 5}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Value: 6}},
{&influxql.FloatPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 60 * Second, Value: 7, Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

// Ensure a SELECT query with a fill(linear) statement can be executed for integers.
func TestSelect_Fill_Linear_Integer_One(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return influxql.NewCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 1},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 32 * Second, Value: 4},
}}, opt)
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT max(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:00Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 1, Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Value: 2}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 4, Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Nil: true}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Fill_Linear_Integer_Many(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return influxql.NewCallIterator(&IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Tags: ParseTags("host=A"), Time: 12 * Second, Value: 1},
{Name: "cpu", Tags: ParseTags("host=A"), Time: 72 * Second, Value: 10},
}}, opt)
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT max(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:01:20Z' GROUP BY host, time(10s) fill(linear)`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 0 * Second, Nil: true}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 10 * Second, Value: 1, Aggregated: 1}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 20 * Second, Value: 2}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 30 * Second, Value: 4}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 40 * Second, Value: 5}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 50 * Second, Value: 7}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 60 * Second, Value: 8}},
{&influxql.IntegerPoint{Name: "cpu", Tags: ParseTags("host=A"), Time: 70 * Second, Value: 10, Aggregated: 1}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

// Ensure a SELECT stddev() query can be executed.
func TestSelect_Stddev_Float(t *testing.T) {
var ic IteratorCreator
Expand Down

0 comments on commit 966e550

Please sign in to comment.