-
Notifications
You must be signed in to change notification settings - Fork 153
/
logical.go
273 lines (225 loc) · 6.88 KB
/
logical.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
package plan
import (
"fmt"
"time"
"github.com/influxdata/flux"
)
// LogicalPlanner converts a flux.Spec into a plan.Spec and then rewrites that
// plan using whichever logical rules it has been configured with.
//
// The rewrites performed during logical planning must not depend on the
// physical algorithms that will eventually execute each operation, nor on the
// data that will flow through the plan.
type LogicalPlanner interface {
	// CreateInitialPlan builds the naive, unoptimized plan for spec.
	CreateInitialPlan(spec *flux.Spec) (*Spec, error)
	// Plan applies the configured logical rules to the given plan and
	// returns the resulting plan.
	Plan(initial *Spec) (*Spec, error)
}
// NewLogicalPlanner constructs a LogicalPlanner preloaded with every logical
// rule found in the rule registry, then applies the given options in order.
func NewLogicalPlanner(options ...LogicalOption) LogicalPlanner {
	lp := &logicalPlanner{heuristicPlanner: newHeuristicPlanner()}

	registered := make([]Rule, 0, len(ruleNameToLogicalRule))
	for _, rule := range ruleNameToLogicalRule {
		registered = append(registered, rule)
	}
	lp.addRules(registered...)

	// Options are allowed to add or clear rules, so they must run only after
	// the registered rules are already in place.
	for _, option := range options {
		option.apply(lp)
	}
	return lp
}
// LogicalOption customizes a logical planner; options are passed to
// NewLogicalPlanner.
type LogicalOption interface {
	// apply mutates the planner being configured.
	apply(*logicalPlanner)
}
// logicalOption adapts a plain function into a LogicalOption.
type logicalOption func(*logicalPlanner)

// apply satisfies LogicalOption by invoking the wrapped function on lp.
func (fn logicalOption) apply(lp *logicalPlanner) { fn(lp) }
// logicalPlanner is the LogicalPlanner implementation returned by
// NewLogicalPlanner. The rule-rewrite pass itself is delegated to the
// embedded heuristicPlanner.
type logicalPlanner struct {
	*heuristicPlanner
	// disableIntegrityChecks, when true, makes Plan skip the CheckIntegrity
	// call on the rewritten plan. It is set by the DisableIntegrityChecks option.
	disableIntegrityChecks bool
}
// OnlyLogicalRules returns a LogicalOption that clears the rules the planner
// holds at the time the option is applied and installs exactly the given
// rules in their place.
func OnlyLogicalRules(rules ...Rule) LogicalOption {
	replace := func(lp *logicalPlanner) {
		lp.clearRules()
		lp.addRules(rules...)
	}
	return logicalOption(replace)
}
// AddLogicalRules returns a LogicalOption that adds the given rules to the
// planner alongside the rules it already has.
func AddLogicalRules(rules ...Rule) LogicalOption {
	var add logicalOption = func(lp *logicalPlanner) {
		lp.addRules(rules...)
	}
	return add
}
// DisableIntegrityChecks returns a LogicalOption that makes the logical
// planner skip the plan integrity check it otherwise runs after planning.
func DisableIntegrityChecks() LogicalOption {
	var disable logicalOption = func(planner *logicalPlanner) {
		planner.disableIntegrityChecks = true
	}
	return disable
}
// CreateInitialPlan converts spec into a naive logical plan. No rules are
// applied at this stage; that happens in Plan.
func (l *logicalPlanner) CreateInitialPlan(spec *flux.Spec) (*Spec, error) {
	initial, err := createLogicalPlan(spec)
	return initial, err
}
// Plan rewrites logicalPlan by applying the planner's rules. Unless integrity
// checks were disabled, the rewritten plan is validated with CheckIntegrity
// before being returned. On any error the returned plan is nil.
func (l *logicalPlanner) Plan(logicalPlan *Spec) (*Spec, error) {
	rewritten, err := l.heuristicPlanner.Plan(logicalPlan)
	if err != nil {
		return nil, err
	}
	if l.disableIntegrityChecks {
		return rewritten, nil
	}
	// The check runs once, after every rule has been applied.
	if err := rewritten.CheckIntegrity(); err != nil {
		return nil, err
	}
	return rewritten, nil
}
// administration is the Administration value handed to procedure-spec create
// functions while building the initial plan; it only exposes a fixed "now".
type administration struct {
	now time.Time
}

// Now reports the time stored when the administration was constructed.
func (adm administration) Now() time.Time { return adm.now }
// LogicalNode is a node of a logical plan. It consists of the input and
// output edges, embedded bounds, an identifier, and a procedure spec that
// describes what the node does.
type LogicalNode struct {
	// edges holds the node's predecessor and successor links.
	edges
	// bounds is embedded as-is; note that ShallowCopy does not copy it.
	bounds
	// id is returned by ID.
	id NodeID
	// Spec describes the operation this node performs.
	Spec ProcedureSpec
}
// ID returns the human-readable identifier this node was created with.
func (lpn *LogicalNode) ID() NodeID { return lpn.id }
// Kind reports the procedure kind of this node, as given by its Spec.
func (lpn *LogicalNode) Kind() ProcedureKind { return lpn.Spec.Kind() }
// ProcedureSpec returns the spec that describes what this node does.
func (lpn *LogicalNode) ProcedureSpec() ProcedureSpec { return lpn.Spec }
// ReplaceSpec installs newSpec as this node's procedure spec, discarding the
// previous one. The returned error is always nil.
func (lpn *LogicalNode) ReplaceSpec(newSpec ProcedureSpec) error {
	lpn.Spec = newSpec
	return nil
}
// ShallowCopy returns a new LogicalNode built as follows:
//   - its edges come from edges.shallowCopy;
//   - its ID is this node's ID with "_copy" appended;
//   - its Spec is the result of Spec.Copy.
//
// The embedded bounds are not carried over and stay at their zero value.
func (lpn *LogicalNode) ShallowCopy() Node {
	return &LogicalNode{
		edges: lpn.edges.shallowCopy(),
		id:    lpn.id + "_copy",
		Spec:  lpn.Spec.Copy(),
	}
}
// createLogicalPlan builds the initial logical plan for spec. It walks the
// spec's operations, turning each one into a LogicalNode, and copies the
// spec's Resources and Now onto the resulting plan.
func createLogicalPlan(spec *flux.Spec) (*Spec, error) {
	plan := NewPlanSpec()
	plan.Resources = spec.Resources
	plan.Now = spec.Now

	visitor := &fluxSpecVisitor{
		a:          administration{now: spec.Now},
		spec:       spec,
		plan:       plan,
		nodes:      make(map[flux.OperationID]Node, len(spec.Operations)),
		yieldNames: make(map[string]struct{}),
	}
	if err := spec.Walk(visitor.visitOperation); err != nil {
		return nil, err
	}
	return visitor.plan, nil
}
// fluxSpecVisitor visits a flux spec and constructs from it a logical plan DAG.
type fluxSpecVisitor struct {
	// a is passed to every procedure-spec create function.
	a Administration
	// spec is the flux spec being walked; used to look up parents and children.
	spec *flux.Spec
	// plan is the logical plan being populated; terminal nodes go into its Roots.
	plan *Spec
	// nodes maps each visited operation ID to the logical node built for it,
	// so later operations can be linked to their parents.
	nodes map[flux.OperationID]Node
	// yieldNames records every yield name seen so far, to reject duplicates.
	yieldNames map[string]struct{}
}
// addYieldName records the yield name of pn and returns an error if that name
// was already recorded for another node. pn's procedure spec must implement
// YieldProcedureSpec; the unchecked type assertion panics otherwise.
func (v *fluxSpecVisitor) addYieldName(pn Node) error {
	name := pn.ProcedureSpec().(YieldProcedureSpec).YieldName()
	if _, seen := v.yieldNames[name]; seen {
		return fmt.Errorf("duplicate yield name \"%v\" found on plan node: %v", name, pn.ID())
	}
	v.yieldNames[name] = struct{}{}
	return nil
}
// generateYieldNode creates a node with ID "generated_yield" whose spec is a
// GeneratedYieldProcedureSpec named DefaultYieldName. It links the node as a
// successor of pred (and pred as its predecessor) and returns it.
func generateYieldNode(pred Node) Node {
	node := CreateLogicalNode(
		NodeID("generated_yield"),
		&GeneratedYieldProcedureSpec{Name: DefaultYieldName},
	)
	pred.AddSuccessors(node)
	node.AddPredecessors(pred)
	return node
}
// visitOperation converts a single flux operation into a LogicalNode and
// links it into the plan under construction.
//
// It builds the procedure spec with the first create function registered for
// the operation's kind. It then connects the new node to the nodes already
// built for its parent operations, and records its yield name if the spec is
// a yield.
//
// An operation with no children becomes a plan root:
//   - directly, if it is a yield or has side effects;
//   - otherwise through a generated yield node appended after it.
//
// It returns an error in any of these cases:
//   - no create function is registered for the kind;
//   - the create function fails;
//   - a parent operation has not been visited yet;
//   - a yield name is duplicated.
func (v *fluxSpecVisitor) visitOperation(o *flux.Operation) error {
	// Retrieve the create functions for this query operation. A registration
	// with an empty slice is reported like a missing one instead of panicking
	// on the index below.
	createFns, ok := createProcedureFnsFromKind(o.Spec.Kind())
	if !ok || len(createFns) == 0 {
		return fmt.Errorf("no ProcedureSpec available for %s", o.Spec.Kind())
	}

	// TODO: differentiate between logical and physical procedures.
	// There should be just one logical procedure for each operation, but could be
	// several physical procedures.
	create := createFns[0]

	// Create a ProcedureSpec from the query operation procedureSpec.
	procedureSpec, err := create(o.Spec, v.a)
	if err != nil {
		return err
	}

	// Create a LogicalNode using the ProcedureSpec.
	logicalNode := CreateLogicalNode(NodeID(o.ID), procedureSpec)
	v.nodes[o.ID] = logicalNode

	// Connect this node to its parents. The walk is expected to visit parents
	// before children. If that does not hold, report it rather than linking a
	// nil Node and panicking on the method call.
	for _, parent := range v.spec.Parents(o.ID) {
		logicalParent, visited := v.nodes[parent.ID]
		if !visited {
			return fmt.Errorf("parent operation %q of operation %q has not been visited", parent.ID, o.ID)
		}
		logicalNode.AddPredecessors(logicalParent)
		logicalParent.AddSuccessors(logicalNode)
	}

	_, isYield := procedureSpec.(YieldProcedureSpec)
	if isYield {
		if err := v.addYieldName(logicalNode); err != nil {
			return err
		}
	}

	// Only nodes without children (and therefore without successors) are roots.
	if len(v.spec.Children(o.ID)) > 0 {
		return nil
	}
	if isYield || HasSideEffect(procedureSpec) {
		v.plan.Roots[logicalNode] = struct{}{}
		return nil
	}

	// Otherwise terminate the branch with a generated yield node, which
	// becomes the root in place of logicalNode.
	yieldNode := generateYieldNode(logicalNode)
	if err := v.addYieldName(yieldNode); err != nil {
		return err
	}
	v.plan.Roots[yieldNode] = struct{}{}
	return nil
}
// CreateLogicalNode returns a fresh LogicalNode carrying the given id and
// procedure spec. The node starts out with no predecessors or successors.
func CreateLogicalNode(id NodeID, spec ProcedureSpec) *LogicalNode {
	node := new(LogicalNode)
	node.id = id
	node.Spec = spec
	return node
}