Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip past points at the same time in derivative call within a merged series #7293

Merged
merged 1 commit into from Sep 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -7,6 +7,7 @@
- [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars.
- [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement.
- [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client.
- [#7110](https://github.com/influxdata/influxdb/issues/7110): Skip past points at the same time in derivative call within a merged series.

## v1.0.0 [2016-09-07]

Expand Down
40 changes: 40 additions & 0 deletions influxql/functions.go
Expand Up @@ -91,6 +91,13 @@ func NewFloatDerivativeReducer(interval Interval, isNonNegative, ascending bool)

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatDerivativeReducer) AggregateFloat(p *FloatPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -107,6 +114,9 @@ func (r *FloatDerivativeReducer) Emit() []FloatPoint {
}
value := diff / (float64(elapsed) / float64(r.interval.Duration))

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true

// Drop negative values for non-negative derivatives.
if r.isNonNegative && diff < 0 {
return nil
Expand Down Expand Up @@ -138,6 +148,13 @@ func NewIntegerDerivativeReducer(interval Interval, isNonNegative, ascending boo

// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerDerivativeReducer) AggregateInteger(p *IntegerPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -154,6 +171,9 @@ func (r *IntegerDerivativeReducer) Emit() []FloatPoint {
}
value := diff / (float64(elapsed) / float64(r.interval.Duration))

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true

// Drop negative values for non-negative derivatives.
if r.isNonNegative && diff < 0 {
return nil
Expand All @@ -179,6 +199,13 @@ func NewFloatDifferenceReducer() *FloatDifferenceReducer {

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatDifferenceReducer) AggregateFloat(p *FloatPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -188,6 +215,9 @@ func (r *FloatDifferenceReducer) Emit() []FloatPoint {
if !r.prev.Nil {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true
return []FloatPoint{{Time: r.curr.Time, Value: value}}
}
return nil
Expand All @@ -209,6 +239,13 @@ func NewIntegerDifferenceReducer() *IntegerDifferenceReducer {

// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerDifferenceReducer) AggregateInteger(p *IntegerPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -218,6 +255,9 @@ func (r *IntegerDifferenceReducer) Emit() []IntegerPoint {
if !r.prev.Nil {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true
return []IntegerPoint{{Time: r.curr.Time, Value: value}}
}
return nil
Expand Down
96 changes: 96 additions & 0 deletions influxql/select_test.go
Expand Up @@ -2205,6 +2205,54 @@ func TestSelect_Derivative_Desc_Integer(t *testing.T) {
}
}

func TestSelect_Derivative_Duplicate_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Derivative_Duplicate_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Difference_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
Expand Down Expand Up @@ -2257,6 +2305,54 @@ func TestSelect_Difference_Integer(t *testing.T) {
}
}

func TestSelect_Difference_Duplicate_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -10}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Difference_Duplicate_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: -10}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Elapsed_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
Expand Down