From 0b94f5dc1ada986515c304ee76b05d43aad77d77 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Tue, 13 Sep 2016 16:57:34 -0500 Subject: [PATCH] Skip past points at the same time in derivative call within a merged series The derivative() call would panic if it received two points at the same time because it tried to divide by zero. The derivative call now skips past these points. To avoid skipping past these points, use `GROUP BY *` so that each series is kept separated into their own series. The difference() call has also been modified to skip past these points. Even though difference doesn't divide by the time, difference is supposed to perform the same as derivative, but without dividing by the time. --- CHANGELOG.md | 1 + influxql/functions.go | 40 +++++++++++++++++ influxql/select_test.go | 96 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 137 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a88f31e0526..c78990ac3da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars. - [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement. - [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client. +- [#7110](https://github.com/influxdata/influxdb/issues/7110): Skip past points at the same time in derivative call within a merged series. ## v1.0.0 [2016-09-07] diff --git a/influxql/functions.go b/influxql/functions.go index cf0379fb8fa..3a6a268a766 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -91,6 +91,13 @@ func NewFloatDerivativeReducer(interval Interval, isNonNegative, ascending bool) // AggregateFloat aggregates a point into the reducer and updates the current window. func (r *FloatDerivativeReducer) AggregateFloat(p *FloatPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + r.prev = r.curr r.curr = *p } @@ -107,6 +114,9 @@ func (r *FloatDerivativeReducer) Emit() []FloatPoint { } value := diff / (float64(elapsed) / float64(r.interval.Duration)) + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true + // Drop negative values for non-negative derivatives. if r.isNonNegative && diff < 0 { return nil @@ -138,6 +148,13 @@ func NewIntegerDerivativeReducer(interval Interval, isNonNegative, ascending boo // AggregateInteger aggregates a point into the reducer and updates the current window. func (r *IntegerDerivativeReducer) AggregateInteger(p *IntegerPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + r.prev = r.curr r.curr = *p } @@ -154,6 +171,9 @@ func (r *IntegerDerivativeReducer) Emit() []FloatPoint { } value := diff / (float64(elapsed) / float64(r.interval.Duration)) + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true + // Drop negative values for non-negative derivatives. if r.isNonNegative && diff < 0 { return nil @@ -179,6 +199,13 @@ func NewFloatDifferenceReducer() *FloatDifferenceReducer { // AggregateFloat aggregates a point into the reducer and updates the current window. func (r *FloatDifferenceReducer) AggregateFloat(p *FloatPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + r.prev = r.curr r.curr = *p } @@ -188,6 +215,9 @@ func (r *FloatDifferenceReducer) Emit() []FloatPoint { if !r.prev.Nil { // Calculate the difference of successive points. value := r.curr.Value - r.prev.Value + + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true return []FloatPoint{{Time: r.curr.Time, Value: value}} } return nil @@ -209,6 +239,13 @@ func NewIntegerDifferenceReducer() *IntegerDifferenceReducer { // AggregateInteger aggregates a point into the reducer and updates the current window. func (r *IntegerDifferenceReducer) AggregateInteger(p *IntegerPoint) { + // Skip past a point when it does not advance the stream. A joined series + // may have multiple points at the same time so we will discard anything + // except the first point we encounter. + if !r.curr.Nil && r.curr.Time == p.Time { + return + } + r.prev = r.curr r.curr = *p } @@ -218,6 +255,9 @@ func (r *IntegerDifferenceReducer) Emit() []IntegerPoint { if !r.prev.Nil { // Calculate the difference of successive points. value := r.curr.Value - r.prev.Value + + // Mark this point as read by changing the previous point to nil. + r.prev.Nil = true return []IntegerPoint{{Time: r.curr.Time, Value: value}} } return nil diff --git a/influxql/select_test.go b/influxql/select_test.go index 8c7ff82f3b3..fd4abd1a0d7 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -2205,6 +2205,54 @@ func TestSelect_Derivative_Desc_Integer(t *testing.T) { } } +func TestSelect_Derivative_Duplicate_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_Derivative_Duplicate_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + func TestSelect_Difference_Float(t *testing.T) { var ic IteratorCreator ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { @@ -2257,6 +2305,54 @@ func TestSelect_Difference_Integer(t *testing.T) { } } +func TestSelect_Difference_Duplicate_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -10}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_Difference_Duplicate_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: -10}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + func TestSelect_Elapsed_Float(t *testing.T) { var ic IteratorCreator ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {