-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
graph.go
175 lines (151 loc) · 4.03 KB
/
graph.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package pipeline
import (
"time"
"github.com/pkg/errors"
"gonum.org/v1/gonum/graph"
"gonum.org/v1/gonum/graph/encoding"
"gonum.org/v1/gonum/graph/encoding/dot"
"gonum.org/v1/gonum/graph/simple"
"gonum.org/v1/gonum/graph/topo"
)
// TaskDAG is a directed graph of pipeline tasks. It embeds
// *simple.DirectedGraph and overrides NewNode, which makes it possible for us
// to `dot.Unmarshal(...)` a DOT string directly into it (see UnmarshalText).
// Once unmarshalled, calling `TaskDAG#TasksInDependencyOrder()` will return
// the unmarshaled tasks.
type TaskDAG struct {
// Embedded gonum graph that stores the nodes and edges. UnmarshalText
// allocates it on demand when it is nil.
*simple.DirectedGraph
// DOTSource is the raw text most recently handed to UnmarshalText, stored
// before it is wrapped in a `digraph { ... }` envelope.
DOTSource string
}
// NewTaskDAG returns an empty TaskDAG whose embedded directed graph has
// already been allocated. DOTSource is left as the empty string.
func NewTaskDAG() *TaskDAG {
	dag := new(TaskDAG)
	dag.DirectedGraph = simple.NewDirectedGraph()
	return dag
}
// NewNode asks the embedded graph for a fresh node and wraps it in a
// taskDAGNode that keeps a pointer back to g. The wrapper is what later
// receives the DOT ID and attributes through SetDOTID and SetAttribute.
func (g *TaskDAG) NewNode() graph.Node {
	wrapper := &taskDAGNode{g: g}
	wrapper.Node = g.DirectedGraph.NewNode()
	return wrapper
}
// UnmarshalText loads the graph described by bs into g.
//
// bs is expected to hold only the body of a DOT digraph: the method records
// it verbatim in g.DOTSource, surrounds it with "digraph {\n" and "\n}", and
// passes the result to dot.Unmarshal. The embedded graph is allocated first
// if g was created without one. The caller's slice is never modified.
//
// Any error from dot.Unmarshal is returned wrapped with additional context.
func (g *TaskDAG) UnmarshalText(bs []byte) (err error) {
	if g.DirectedGraph == nil {
		g.DirectedGraph = simple.NewDirectedGraph()
	}
	g.DOTSource = string(bs)

	const (
		header = "digraph {\n"
		footer = "\n}"
	)
	// Assemble the full document in a fresh buffer sized for all three parts.
	wrapped := make([]byte, 0, len(header)+len(bs)+len(footer))
	wrapped = append(wrapped, header...)
	wrapped = append(wrapped, bs...)
	wrapped = append(wrapped, footer...)

	if err = dot.Unmarshal(wrapped, g); err != nil {
		return errors.Wrap(err, "could not unmarshal DOT into a pipeline.TaskDAG")
	}
	return nil
}
// HasCycles reports whether the graph contains at least one directed cycle.
//
// The check is done with topo.Sort, which returns an error exactly when no
// topological ordering exists because some strongly connected component has
// more than one node. This answers the yes/no question without enumerating
// every elementary cycle (as topo.DirectedCyclesIn does), whose number can
// grow exponentially with the size of the graph.
//
// NOTE(review): topo.Sort does not flag a single node with an edge to itself.
// simple.DirectedGraph is understood to reject self-edges when they are
// added, so none should exist here - confirm against the pinned gonum version.
func (g *TaskDAG) HasCycles() bool {
	_, err := topo.Sort(g)
	return err != nil
}
// TasksInDependencyOrder builds one Task per graph node and returns them in
// the order they are reached by a breadth-first walk that starts at the
// output nodes (those with no outgoing edges) and follows edges backwards
// towards the inputs. As you iterate through the returned slice, you can
// expect that any individual Task's output task appears before it.
//
// Each task is created with UnmarshalTaskFromMap from the node's DOT
// attributes (the "type" attribute selects the TaskType), then given its
// defaults via SetDefaults, then linked to its single downstream task, if any.
//
// An error is returned if a task cannot be built, if SetDefaults fails, or if
// a node has more than one outgoing edge.
func (g TaskDAG) TasksInDependencyOrder() ([]Task, error) {
	var (
		ordered []Task
		built   = map[int64]Task{}
		seen    = make(map[int64]bool)
	)

	// queue is consumed first-in-first-out, which yields the breadth-first order.
	for queue := g.outputs(); len(queue) > 0; {
		current := queue[0]
		id := current.ID()

		// Upstream nodes are enqueued before the seen check, so a node that is
		// popped again still re-enqueues its inputs; they are skipped later.
		queue = append(queue[1:], unwrapGraphNodes(g.To(id))...)
		if seen[id] {
			continue
		}

		task, err := UnmarshalTaskFromMap(TaskType(current.attrs["type"]), current.attrs, current.dotID, nil, nil, nil)
		if err != nil {
			return nil, err
		}
		if err := task.SetDefaults(current.attrs, g, *current); err != nil {
			return nil, err
		}

		// A task may feed at most one downstream task; that task is looked up
		// among the ones built in earlier iterations.
		downstream := current.outputs()
		switch len(downstream) {
		case 0:
			// Terminal node: nothing to link.
		case 1:
			task.SetOutputTask(built[downstream[0].ID()])
		default:
			return nil, errors.New("task has > 1 output task")
		}

		ordered = append(ordered, task)
		built[id] = task
		seen[id] = true
	}
	return ordered, nil
}
// MinTimeout returns the smallest timeout among the tasks that report one.
//
// The boolean result is true only when some task reported a timeout that is
// strictly smaller than the largest representable time.Duration; when it is
// false the returned duration is that largest value. If the tasks cannot be
// built, the error from TasksInDependencyOrder is returned together with the
// largest duration and false.
func (g TaskDAG) MinTimeout() (time.Duration, bool, error) {
	// Largest representable duration, used as the starting point for the minimum.
	const longest time.Duration = 1<<63 - 1

	shortest, found := longest, false

	tasks, err := g.TasksInDependencyOrder()
	if err != nil {
		return shortest, found, err
	}
	for _, task := range tasks {
		timeout, set := task.TaskTimeout()
		if !set || timeout >= shortest {
			continue
		}
		shortest, found = timeout, true
	}
	return shortest, found, nil
}
// outputs collects every node of the graph for which From returns
// graph.Empty, i.e. the nodes without outgoing edges. It panics if the graph
// holds a node that is not a *taskDAGNode.
func (g TaskDAG) outputs() []*taskDAGNode {
	var sinks []*taskDAGNode
	for it := g.Nodes(); it.Next(); {
		candidate, ok := it.Node().(*taskDAGNode)
		if !ok {
			panic("this is impossible but we must appease go staticcheck")
		}
		// Skip anything that still has a successor.
		if g.From(candidate.ID()) != graph.Empty {
			continue
		}
		sinks = append(sinks, candidate)
	}
	return sinks
}
// taskDAGNode is the node type stored in a TaskDAG. It wraps the node handed
// out by the embedded gonum graph and additionally remembers the DOT ID and
// DOT attributes assigned to it through SetDOTID and SetAttribute.
type taskDAGNode struct {
// Underlying graph node; supplies ID().
graph.Node
// g is the graph that created this node (set in TaskDAG.NewNode); it is
// used by inputs() and outputs() to look up neighbours.
g *TaskDAG
// dotID is the identifier recorded by SetDOTID.
dotID string
// attrs holds the key/value pairs recorded by SetAttribute; it stays nil
// until the first attribute is set.
attrs map[string]string
}
// DOTID returns the identifier previously stored with SetDOTID, or the empty
// string if none has been set.
func (n *taskDAGNode) DOTID() string { return n.dotID }
// SetDOTID records id as this node's DOT identifier, replacing any earlier
// value.
func (n *taskDAGNode) SetDOTID(id string) { n.dotID = id }
// String renders the node as its DOT identifier.
func (n *taskDAGNode) String() string { return n.dotID }
// SetAttribute stores attr on the node under attr.Key, overwriting any value
// already recorded for that key. The attribute map is created on first use.
// It always returns nil.
func (n *taskDAGNode) SetAttribute(attr encoding.Attribute) error {
	key, value := attr.Key, attr.Value
	if n.attrs != nil {
		n.attrs[key] = value
		return nil
	}
	// First attribute for this node: start the map with it.
	n.attrs = map[string]string{key: value}
	return nil
}
// inputs returns the nodes that have an edge pointing to n, as reported by
// the owning graph's To method. The result is nil when there are none. It
// panics if any neighbour is not a *taskDAGNode.
func (n *taskDAGNode) inputs() []*taskDAGNode {
	predecessors := n.g.To(n.ID())
	return unwrapGraphNodes(predecessors)
}
// outputs returns the nodes that n has an edge pointing to, as reported by
// the owning graph's From method. The result is nil when there are none. It
// panics if any neighbour is not a *taskDAGNode.
func (n *taskDAGNode) outputs() []*taskDAGNode {
	successors := n.g.From(n.ID())
	return unwrapGraphNodes(successors)
}
// unwrapGraphNodes drains the iterator and returns its remaining nodes as
// *taskDAGNode values, in iteration order. It returns nil when the iterator
// yields nothing and panics if a node has any other concrete type.
func unwrapGraphNodes(nodes graph.Nodes) []*taskDAGNode {
	var unwrapped []*taskDAGNode
	for {
		if !nodes.Next() {
			return unwrapped
		}
		unwrapped = append(unwrapped, nodes.Node().(*taskDAGNode))
	}
}