Skip to content

Commit

Permalink
Skip past points at the same time in derivative call within a merged …
Browse files Browse the repository at this point in the history
…series

The derivative() call would panic if it received two points at the same
time because it tried to divide by zero. The derivative call now skips
past these points. To avoid skipping past these points, use `GROUP BY *`
so that each series is kept separated into their own series.

The difference() call has also been modified to skip past these points.
Even though difference doesn't divide by the time, difference is
supposed to perform the same as derivative, but without dividing by the
time.
  • Loading branch information
jsternberg committed Sep 13, 2016
1 parent 796f35b commit 0b94f5d
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,7 @@
- [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars.
- [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement.
- [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client.
- [#7110](https://github.com/influxdata/influxdb/issues/7110): Skip past points at the same time in derivative call within a merged series.

## v1.0.0 [2016-09-07]

Expand Down
40 changes: 40 additions & 0 deletions influxql/functions.go
Expand Up @@ -91,6 +91,13 @@ func NewFloatDerivativeReducer(interval Interval, isNonNegative, ascending bool)

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatDerivativeReducer) AggregateFloat(p *FloatPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -107,6 +114,9 @@ func (r *FloatDerivativeReducer) Emit() []FloatPoint {
}
value := diff / (float64(elapsed) / float64(r.interval.Duration))

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true

// Drop negative values for non-negative derivatives.
if r.isNonNegative && diff < 0 {
return nil
Expand Down Expand Up @@ -138,6 +148,13 @@ func NewIntegerDerivativeReducer(interval Interval, isNonNegative, ascending boo

// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerDerivativeReducer) AggregateInteger(p *IntegerPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -154,6 +171,9 @@ func (r *IntegerDerivativeReducer) Emit() []FloatPoint {
}
value := diff / (float64(elapsed) / float64(r.interval.Duration))

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true

// Drop negative values for non-negative derivatives.
if r.isNonNegative && diff < 0 {
return nil
Expand All @@ -179,6 +199,13 @@ func NewFloatDifferenceReducer() *FloatDifferenceReducer {

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatDifferenceReducer) AggregateFloat(p *FloatPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -188,6 +215,9 @@ func (r *FloatDifferenceReducer) Emit() []FloatPoint {
if !r.prev.Nil {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true
return []FloatPoint{{Time: r.curr.Time, Value: value}}
}
return nil
Expand All @@ -209,6 +239,13 @@ func NewIntegerDifferenceReducer() *IntegerDifferenceReducer {

// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerDifferenceReducer) AggregateInteger(p *IntegerPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -218,6 +255,9 @@ func (r *IntegerDifferenceReducer) Emit() []IntegerPoint {
if !r.prev.Nil {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true
return []IntegerPoint{{Time: r.curr.Time, Value: value}}
}
return nil
Expand Down
96 changes: 96 additions & 0 deletions influxql/select_test.go
Expand Up @@ -2205,6 +2205,54 @@ func TestSelect_Derivative_Desc_Integer(t *testing.T) {
}
}

func TestSelect_Derivative_Duplicate_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Derivative_Duplicate_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Difference_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
Expand Down Expand Up @@ -2257,6 +2305,54 @@ func TestSelect_Difference_Integer(t *testing.T) {
}
}

func TestSelect_Difference_Duplicate_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -10}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Difference_Duplicate_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: -10}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Elapsed_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
Expand Down

0 comments on commit 0b94f5d

Please sign in to comment.