Skip to content

Commit

Permalink
Add derivative function
Browse files Browse the repository at this point in the history
Calculates the derivative of consecutive points and normalizes the
value to a given interval.  It supports simple derivatives over
fields as well as nested derivatives over another aggregate function.

Fixes #1822
  • Loading branch information
jwilder committed May 13, 2015
1 parent 7fd9a0a commit a0a4600
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 7 deletions.
25 changes: 23 additions & 2 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,9 @@ type SelectStatement struct {
FillValue interface{}
}

func (s *SelectStatement) IsNonNestedDerivative() bool {
// HasDerivative returns true if one of the fields in the statement is a
// derivative aggregate.
func (s *SelectStatement) HasDerivative() bool {
for _, f := range s.Fields {
if f.Name() == "derivative" {
return true
Expand All @@ -651,6 +653,25 @@ func (s *SelectStatement) IsNonNestedDerivative() bool {
return false
}

// IsSimpleDerivative returns true if the first field named "derivative" is a
// call whose first argument is a plain variable reference, e.g.
// derivative(value, 1h), as opposed to a call wrapping another call such as
// derivative(mean(value), 1d).
//
// Only the first derivative field is inspected. It returns false when the
// statement has no derivative field, when that field's expression is not a
// *Call, or when the call has no arguments.
func (s *SelectStatement) IsSimpleDerivative() bool {
	for _, f := range s.Fields {
		if f.Name() != "derivative" {
			continue
		}

		// The derivative field must be a call with at least one argument;
		// without the length check, indexing Args[0] would panic on an
		// argument-less derivative() call.
		d, ok := f.Expr.(*Call)
		if !ok || len(d.Args) == 0 {
			return false
		}

		// It is simple when the first argument is a variable reference
		// rather than, for example, a nested aggregate call.
		_, isVarRef := d.Args[0].(*VarRef)
		return isVarRef
	}
	return false
}

// Clone returns a deep copy of the statement.
func (s *SelectStatement) Clone() *SelectStatement {
clone := &SelectStatement{
Expand Down Expand Up @@ -893,7 +914,7 @@ func (s *SelectStatement) Validate(tr targetRequirement) error {
}

func (s *SelectStatement) validateDerivative() error {
if !s.IsNonNestedDerivative() {
if !s.HasDerivative() {
return nil
}

Expand Down
114 changes: 112 additions & 2 deletions influxql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
}
defer m.Close()

// if it's a raw query we handle processing differently
if m.stmt.IsRawQuery {
// if it's a raw query or a non-nested derivative we handle processing differently
if m.stmt.IsRawQuery || m.stmt.IsSimpleDerivative() {
m.processRawQuery(out, filterEmptyResults)
return
}
Expand Down Expand Up @@ -196,6 +196,9 @@ func (m *MapReduceJob) Execute(out chan *Row, filterEmptyResults bool) {
// handle any fill options
resultValues = m.processFill(resultValues)

// process derivatives
resultValues = m.processDerivative(resultValues)

row := &Row{
Name: m.MeasurementName,
Tags: m.TagSet.Tags,
Expand Down Expand Up @@ -228,6 +231,7 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
valuesOffset := 0
valuesToReturn := make([]*rawQueryMapOutput, 0)

var lastValueFromPreviousChunk *rawQueryMapOutput
// loop until we've emptied out all the mappers and sent everything out
for {
// collect up to the limit for each mapper
Expand Down Expand Up @@ -324,6 +328,10 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
// hit the chunk size? Send out what has been accumulated, but keep
// processing.
if len(valuesToReturn) >= m.chunkSize {
lastValueFromPreviousChunk = valuesToReturn[len(valuesToReturn)-1]

valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn)

row := m.processRawResults(valuesToReturn)
// perform post-processing, such as math.
row.Values = m.processResults(row.Values)
Expand All @@ -342,13 +350,115 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
out <- m.processRawResults(nil)
}
} else {
valuesToReturn = m.processRawQueryDerivative(lastValueFromPreviousChunk, valuesToReturn)

row := m.processRawResults(valuesToReturn)
// perform post-processing, such as math.
row.Values = m.processResults(row.Values)
out <- row
}
}

// derivativeInterval returns the normalization interval of the derivative
// call, read from the second argument of the statement's first function call.
//
// It panics if the statement has no function calls, if the first call has
// fewer than two arguments, or if the second argument is not a
// *DurationLiteral.
func (m *MapReduceJob) derivativeInterval() time.Duration {
	call := m.stmt.FunctionCalls()[0]
	interval := call.Args[1].(*DurationLiteral)
	return interval.Val
}

// processRawQueryDerivative converts a chunk of raw values into the
// derivatives between successive points, normalized to the interval given to
// the derivative call so results can be expressed "per second", "per hour",
// and so on.
//
// If the statement has no derivative field, or the chunk is empty, the input
// is returned unchanged. A chunk holding exactly one value yields a single
// point with the value 0.0. Otherwise the result holds at most one point per
// input value after the first, each stamped with that value's time.
//
// lastValueFromPreviousChunk, when non-nil, is the reference point for the
// first computed derivative; when nil, valuesToReturn[0] is used instead.
//
// NOTE(review): iteration starts at the second element even when a previous
// chunk value is supplied, so valuesToReturn[0] never receives a derivative of
// its own. Confirm against the caller whether that is intended.
func (m *MapReduceJob) processRawQueryDerivative(lastValueFromPreviousChunk *rawQueryMapOutput, valuesToReturn []*rawQueryMapOutput) []*rawQueryMapOutput {
	// If we're called and do not have a derivative aggregate function, then return what was passed in
	if !m.stmt.HasDerivative() {
		return valuesToReturn
	}

	if len(valuesToReturn) == 0 {
		return valuesToReturn
	}

	// If we only have 1 value, then the value did not change, so return
	// a single row w/ 0.0
	if len(valuesToReturn) == 1 {
		return []*rawQueryMapOutput{
			&rawQueryMapOutput{
				Time:   valuesToReturn[0].Time,
				Values: 0.0,
			},
		}
	}

	// toFloat widens the supported numeric value types to float64. A bare
	// .(float64) assertion panics on anything else.
	// NOTE(review): presumably integer fields arrive here as int64 - confirm
	// against the mapper output.
	toFloat := func(v interface{}) (float64, bool) {
		switch n := v.(type) {
		case float64:
			return n, true
		case int64:
			return float64(n), true
		}
		return 0, false
	}

	prev := lastValueFromPreviousChunk
	if prev == nil {
		prev = valuesToReturn[0]
	}

	// The duration to normalize the derivative by. This is so the derivative values
	// can be expressed as "per second", etc.. within each time segment
	interval := m.derivativeInterval()

	derivativeValues := make([]*rawQueryMapOutput, 0, len(valuesToReturn)-1)
	for _, v := range valuesToReturn[1:] {
		cur, ok := toFloat(v.Values)
		if !ok {
			// No derivative can be computed for a non-numeric value; leave the
			// point out and keep the previous reference point.
			continue
		}

		last, ok := toFloat(prev.Values)
		if !ok {
			// The reference point is not numeric, so this point can only
			// become the reference for the next one.
			prev = v
			continue
		}

		// Calculate the derivative of successive points by dividing the
		// difference of each value by the elapsed time normalized to the
		// interval. Points sharing a timestamp would divide by zero and
		// produce Inf/NaN, so they report 0.0 instead.
		elapsed := v.Time - prev.Time
		value := 0.0
		if elapsed != 0 {
			value = (cur - last) / (float64(elapsed) / float64(interval))
		}

		derivativeValues = append(derivativeValues, &rawQueryMapOutput{
			Time:   v.Time,
			Values: value,
		})
		prev = v
	}

	return derivativeValues
}

// processDerivative returns the derivatives of the results.
//
// Each row of results is read as [time, value]: element 0 must be a time.Time
// and element 1 the aggregate value. If the statement has no derivative field,
// or results is empty, results is returned unchanged. A single row yields one
// row of [time, 0.0]. Otherwise the output has len(results)-1 rows, one for
// every row after the first, holding the difference to the preceding row
// divided by the elapsed time and normalized to the derivative interval.
//
// Rows where either the current or the preceding value is nil are passed
// through unchanged. Values may be float64 or int64; a row whose value (or
// whose predecessor's value) is of any other type gets a nil derivative.
func (m *MapReduceJob) processDerivative(results [][]interface{}) [][]interface{} {

	// Return early if we're not supposed to process the derivatives
	if !m.stmt.HasDerivative() {
		return results
	}

	// Return early if we can't calculate derivatives
	if len(results) == 0 {
		return results
	}

	// If we only have 1 value, then the value did not change, so return
	// a single row w/ 0.0
	if len(results) == 1 {
		return [][]interface{}{
			[]interface{}{results[0][0], 0.0},
		}
	}

	// toFloat widens the supported numeric value types to float64. A bare
	// .(float64) assertion panics on anything else.
	// NOTE(review): presumably some aggregates produce int64 - confirm against
	// the reducers.
	toFloat := func(v interface{}) (float64, bool) {
		switch n := v.(type) {
		case float64:
			return n, true
		case int64:
			return float64(n), true
		}
		return 0, false
	}

	// The interval does not change between rows, so look it up once.
	interval := m.derivativeInterval()

	// Otherwise calculate the derivatives as the difference between consecutive
	// points divided by the elapsed time. Then normalize to the requested
	// interval.
	derivatives := make([][]interface{}, len(results)-1)
	for i := 1; i < len(results); i++ {
		prev := results[i-1]
		cur := results[i]

		if cur[1] == nil || prev[1] == nil {
			derivatives[i-1] = cur
			continue
		}

		curVal, curOK := toFloat(cur[1])
		prevVal, prevOK := toFloat(prev[1])
		if !curOK || !prevOK {
			// A derivative is undefined for non-numeric values.
			derivatives[i-1] = []interface{}{cur[0], nil}
			continue
		}

		// Rows sharing a timestamp would divide by zero and produce Inf/NaN,
		// so they report 0.0 instead.
		elapsed := cur[0].(time.Time).Sub(prev[0].(time.Time))
		value := 0.0
		if elapsed != 0 {
			value = (curVal - prevVal) / (float64(elapsed) / float64(interval))
		}

		derivatives[i-1] = []interface{}{cur[0], value}
	}

	return derivatives
}

// processResults will apply any math that was specified in the select statement against the passed in results
func (m *MapReduceJob) processResults(results [][]interface{}) [][]interface{} {
hasMath := false
Expand Down
12 changes: 12 additions & 0 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ func TestParser_ParseStatement(t *testing.T) {
},
},

// SELECT statement with a derivative call
{
s: `SELECT derivative(field1, 1h) FROM myseries;`,
stmt: &influxql.SelectStatement{
IsRawQuery: false,
Fields: []*influxql.Field{
{Expr: &influxql.Call{Name: "derivative", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.DurationLiteral{Val: time.Hour}}}},
},
Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}},
},
},

// SELECT statement (lowercase)
{
s: `select my_field from myseries`,
Expand Down
14 changes: 11 additions & 3 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,20 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
}
}

// If a numerical aggregate is requested, ensure it is only performed on numeric data.
// If a numerical aggregate is requested, ensure it is only performed on numeric data or on a
// nested aggregate on numeric data.
for _, a := range stmt.FunctionCalls() {
lit, ok := a.Args[0].(*influxql.VarRef)
// Check for fields like `derivative(mean(value), 1d)`
var nested *influxql.Call = a
if fn, ok := nested.Args[0].(*influxql.Call); ok {
nested = fn
}

lit, ok := nested.Args[0].(*influxql.VarRef)
if !ok {
return nil, fmt.Errorf("aggregate call didn't contain a field %s", a.String())
}
if influxql.IsNumeric(a) {
if influxql.IsNumeric(nested) {
f := m.FieldByName(lit.Val)
if f.Type != influxql.Float && f.Type != influxql.Integer {
return nil, fmt.Errorf("aggregate '%s' requires numerical field values. Field '%s' is of type %s",
Expand Down Expand Up @@ -348,6 +355,7 @@ func (l *LocalMapper) Begin(c *influxql.Call, startingTime int64, chunkSize int)
l.limit = math.MaxUint64
}
} else {
// Check for calls like `derivative(mean(value), 1d)`
var nested *influxql.Call = c
if fn, ok := c.Args[0].(*influxql.Call); ok {
nested = fn
Expand Down

0 comments on commit a0a4600

Please sign in to comment.