Skip to content

Commit

Permalink
Merge pull request #9858 from influxdata/js-9197-top-return-correct-a…
Browse files Browse the repository at this point in the history
…uxiliary-values

Return the correct auxiliary values for top/bottom
  • Loading branch information
jsternberg committed May 17, 2018
2 parents 3fc40dd + 8a2bc63 commit d37dc75
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 40 deletions.
42 changes: 30 additions & 12 deletions query/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1882,11 +1882,14 @@ func (r *FloatTopReducer) AggregateFloat(p *FloatPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)

var clone FloatPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}

func (r *FloatTopReducer) Emit() []FloatPoint {
Expand Down Expand Up @@ -1925,11 +1928,14 @@ func (r *IntegerTopReducer) AggregateInteger(p *IntegerPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)

var clone IntegerPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}

func (r *IntegerTopReducer) Emit() []IntegerPoint {
Expand Down Expand Up @@ -1968,11 +1974,14 @@ func (r *UnsignedTopReducer) AggregateUnsigned(p *UnsignedPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)

var clone UnsignedPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}

func (r *UnsignedTopReducer) Emit() []UnsignedPoint {
Expand Down Expand Up @@ -2011,11 +2020,14 @@ func (r *FloatBottomReducer) AggregateFloat(p *FloatPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)

var clone FloatPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}

func (r *FloatBottomReducer) Emit() []FloatPoint {
Expand Down Expand Up @@ -2054,11 +2066,14 @@ func (r *IntegerBottomReducer) AggregateInteger(p *IntegerPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)

var clone IntegerPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}

func (r *IntegerBottomReducer) Emit() []IntegerPoint {
Expand Down Expand Up @@ -2097,11 +2112,14 @@ func (r *UnsignedBottomReducer) AggregateUnsigned(p *UnsignedPoint) {
if !r.h.cmp(&r.h.points[0], p) {
return
}
r.h.points[0] = *p
p.CopyTo(&r.h.points[0])
heap.Fix(r.h, 0)
return
}
heap.Push(r.h, *p)

var clone UnsignedPoint
p.CopyTo(&clone)
heap.Push(r.h, clone)
}

func (r *UnsignedBottomReducer) Emit() []UnsignedPoint {
Expand Down
80 changes: 75 additions & 5 deletions query/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,7 @@ type FloatIterator struct {
Closed bool
Delay time.Duration
stats query.IteratorStats
point query.FloatPoint
}

func (itr *FloatIterator) Stats() query.IteratorStats { return itr.stats }
Expand Down Expand Up @@ -1549,7 +1550,20 @@ func (itr *FloatIterator) Next() (*query.FloatPoint, error) {
}
v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil

// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}

func FloatIterators(inputs []*FloatIterator) []query.Iterator {
Expand All @@ -1565,6 +1579,7 @@ type IntegerIterator struct {
Points []query.IntegerPoint
Closed bool
stats query.IteratorStats
point query.IntegerPoint
}

func (itr *IntegerIterator) Stats() query.IteratorStats { return itr.stats }
Expand All @@ -1578,7 +1593,20 @@ func (itr *IntegerIterator) Next() (*query.IntegerPoint, error) {

v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil

// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}

func IntegerIterators(inputs []*IntegerIterator) []query.Iterator {
Expand All @@ -1594,6 +1622,7 @@ type UnsignedIterator struct {
Points []query.UnsignedPoint
Closed bool
stats query.IteratorStats
point query.UnsignedPoint
}

func (itr *UnsignedIterator) Stats() query.IteratorStats { return itr.stats }
Expand All @@ -1607,7 +1636,20 @@ func (itr *UnsignedIterator) Next() (*query.UnsignedPoint, error) {

v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil

// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}

func UnsignedIterators(inputs []*UnsignedIterator) []query.Iterator {
Expand All @@ -1623,6 +1665,7 @@ type StringIterator struct {
Points []query.StringPoint
Closed bool
stats query.IteratorStats
point query.StringPoint
}

func (itr *StringIterator) Stats() query.IteratorStats { return itr.stats }
Expand All @@ -1636,7 +1679,20 @@ func (itr *StringIterator) Next() (*query.StringPoint, error) {

v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil

// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}

func StringIterators(inputs []*StringIterator) []query.Iterator {
Expand All @@ -1652,6 +1708,7 @@ type BooleanIterator struct {
Points []query.BooleanPoint
Closed bool
stats query.IteratorStats
point query.BooleanPoint
}

func (itr *BooleanIterator) Stats() query.IteratorStats { return itr.stats }
Expand All @@ -1665,7 +1722,20 @@ func (itr *BooleanIterator) Next() (*query.BooleanPoint, error) {

v := &itr.Points[0]
itr.Points = itr.Points[1:]
return v, nil

// Copy the returned point into a static point that we return.
// This actual storage engine returns a point from the same memory location
// so we need to test that the query engine does not misuse this memory.
itr.point.Name = v.Name
itr.point.Tags = v.Tags
itr.point.Time = v.Time
itr.point.Value = v.Value
itr.point.Nil = v.Nil
if len(itr.point.Aux) != len(v.Aux) {
itr.point.Aux = make([]interface{}, len(v.Aux))
}
copy(itr.point.Aux, v.Aux)
return &itr.point, nil
}

func BooleanIterators(inputs []*BooleanIterator) []query.Iterator {
Expand Down
40 changes: 30 additions & 10 deletions query/point.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,13 @@ func (v *FloatPoint) Clone() *FloatPoint {

// CopyTo makes a deep copy into the point.
func (v *FloatPoint) CopyTo(other *FloatPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
Expand Down Expand Up @@ -282,9 +286,13 @@ func (v *IntegerPoint) Clone() *IntegerPoint {

// CopyTo makes a deep copy into the point.
func (v *IntegerPoint) CopyTo(other *IntegerPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
Expand Down Expand Up @@ -503,9 +511,13 @@ func (v *UnsignedPoint) Clone() *UnsignedPoint {

// CopyTo makes a deep copy into the point.
func (v *UnsignedPoint) CopyTo(other *UnsignedPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
Expand Down Expand Up @@ -722,9 +734,13 @@ func (v *StringPoint) Clone() *StringPoint {

// CopyTo makes a deep copy into the point.
func (v *StringPoint) CopyTo(other *StringPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
Expand Down Expand Up @@ -943,9 +959,13 @@ func (v *BooleanPoint) Clone() *BooleanPoint {

// CopyTo makes a deep copy into the point.
func (v *BooleanPoint) CopyTo(other *BooleanPoint) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
Expand Down
8 changes: 6 additions & 2 deletions query/point.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ func (v *{{.Name}}Point) Clone() *{{.Name}}Point {

// CopyTo makes a deep copy into the point.
func (v *{{.Name}}Point) CopyTo(other *{{.Name}}Point) {
*other = *v
other.Name, other.Tags = v.Name, v.Tags
other.Time = v.Time
other.Value, other.Nil = v.Value, v.Nil
if v.Aux != nil {
other.Aux = make([]interface{}, len(v.Aux))
if len(other.Aux) != len(v.Aux) {
other.Aux = make([]interface{}, len(v.Aux))
}
copy(other.Aux, v.Aux)
}
}
Expand Down
Loading

0 comments on commit d37dc75

Please sign in to comment.