Skip to content

Commit

Permalink
build(flux): update Flux to v0.64.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jsternberg committed Mar 11, 2020
1 parent b1ea8ef commit 636a27e
Show file tree
Hide file tree
Showing 44 changed files with 3,729 additions and 5,716 deletions.
67 changes: 67 additions & 0 deletions flux/stdlib/influxdata/influxdb/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func init() {
PushDownGroupRule{},
PushDownReadTagKeysRule{},
PushDownReadTagValuesRule{},
SortedPivotRule{},
)
}

Expand Down Expand Up @@ -100,6 +101,11 @@ func (PushDownFilterRule) Rewrite(pn plan.Node) (plan.Node, bool, error) {
fromNode := pn.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*ReadRangePhysSpec)

// Cannot push down when keeping empty tables.
if filterSpec.KeepEmptyTables {
return pn, false, nil
}

bodyExpr, ok := filterSpec.Fn.Fn.Block.Body.(semantic.Expression)
if !ok {
return pn, false, nil
Expand Down Expand Up @@ -460,6 +466,15 @@ func rewritePushableExpr(e semantic.Expression) (semantic.Expression, bool) {
e.Left, e.Right = left, right
return e, true
}

case *semantic.LogicalExpression:
left, lok := rewritePushableExpr(e.Left)
right, rok := rewritePushableExpr(e.Right)
if lok || rok {
e = e.Copy().(*semantic.LogicalExpression)
e.Left, e.Right = left, right
return e, true
}
}
return e, false
}
Expand Down Expand Up @@ -551,3 +566,55 @@ func isPushableFieldOperator(kind ast.OperatorKind) bool {

return false
}

// SortedPivotRule is a rule that optimizes a pivot when it is directly
// after an influxdb from.
type SortedPivotRule struct{}

func (SortedPivotRule) Name() string {
return "SortedPivotRule"
}

func (SortedPivotRule) Pattern() plan.Pattern {
return plan.Pat(universe.PivotKind, plan.Pat(ReadRangePhysKind))
}

func (SortedPivotRule) Rewrite(pn plan.Node) (plan.Node, bool, error) {
pivotSpec := pn.ProcedureSpec().Copy().(*universe.PivotProcedureSpec)
pivotSpec.IsSortedByFunc = func(cols []string, desc bool) bool {
if desc {
return false
}

// The only thing that disqualifies this from being
// sorted is if the _value column is mentioned or if
// the tag does not exist.
for _, label := range cols {
if label == execute.DefaultTimeColLabel {
continue
} else if label == execute.DefaultValueColLabel {
return false
}

// Everything else is a tag. Even if the tag does not exist,
// this is still considered sorted since sorting doesn't depend
// on a tag existing.
}

// We are already sorted.
return true
}
pivotSpec.IsKeyColumnFunc = func(label string) bool {
if label == execute.DefaultTimeColLabel || label == execute.DefaultValueColLabel {
return false
}
// Everything else would be a tag if it existed.
// The transformation itself will catch if the column does not exist.
return true
}

if err := pn.ReplaceSpec(pivotSpec); err != nil {
return nil, false, err
}
return pn, false, nil
}
67 changes: 67 additions & 0 deletions flux/stdlib/influxdata/influxdb/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,73 @@ func TestPushDownFilterRule(t *testing.T) {
},
NoChange: true,
},
{
Name: `r._measurement == "cpu" and exists r.host`,
Rules: []plan.Rule{influxdb.PushDownFilterRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
}),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{
Fn: makeResolvedFilterFn(&semantic.LogicalExpression{
Operator: ast.AndOperator,
Left: &semantic.BinaryExpression{
Operator: ast.EqualOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{Name: "r"},
Property: "host",
},
Right: &semantic.StringLiteral{
Value: "cpu",
},
},
Right: &semantic.UnaryExpression{
Operator: ast.ExistsOperator,
Argument: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{Name: "r"},
Property: "host",
},
},
}),
}),
},
Edges: [][2]int{
{0, 1},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeFilterFn(&semantic.LogicalExpression{
Operator: ast.AndOperator,
Left: &semantic.BinaryExpression{
Operator: ast.EqualOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{Name: "r"},
Property: "host",
},
Right: &semantic.StringLiteral{
Value: "cpu",
},
},
Right: &semantic.BinaryExpression{
Operator: ast.NotEqualOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{Name: "r"},
Property: "host",
},
Right: &semantic.StringLiteral{
Value: "",
},
},
}),
}),
},
},
},
}

for _, tc := range tests {
Expand Down
4 changes: 4 additions & 0 deletions flux/stdlib/influxdata/influxdb/to.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package influxdb

// TODO(jsternberg): Implement the to method in influxdb 1.x.
// This file is kept around so it shows up in the patch.
1 change: 1 addition & 0 deletions flux/stdlib/influxdata/influxdb/to_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package influxdb_test
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ require (
collectd.org v0.3.0
github.com/BurntSushi/toml v0.3.1
github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db
github.com/aws/aws-sdk-go v1.25.16 // indirect
github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40
github.com/boltdb/bolt v1.3.1
github.com/cespare/xxhash v1.1.0
Expand All @@ -19,7 +18,7 @@ require (
github.com/gogo/protobuf v1.1.1
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
github.com/google/go-cmp v0.4.0
github.com/influxdata/flux v0.50.2
github.com/influxdata/flux v0.64.0
github.com/influxdata/influxql v1.0.1
github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
Expand All @@ -30,7 +29,6 @@ require (
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 // indirect
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada
github.com/mattn/go-isatty v0.0.4
github.com/mattn/go-zglob v0.0.1 // indirect
github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae // indirect
github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947
github.com/paulbellamy/ratecounter v0.2.0
Expand Down
Loading

0 comments on commit 636a27e

Please sign in to comment.