-
Notifications
You must be signed in to change notification settings - Fork 0
/
pipeline.go
54 lines (49 loc) · 1.31 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package dagproc
import (
"errors"
"fmt"
"github.com/yourbasic/graph"
)
// pipeline is the result of newPipeline: the actions built from the input
// nodes together with the dependency graph that connects them.
type pipeline struct {
	// v holds one action per input node, stored at the same index the node
	// had in the slice passed to newPipeline.
	v []action
	// g is the dependency graph over indices into v; newPipeline adds an edge
	// from a parent's index to each of its children's indices.
	g *graph.Mutable
}
// newPipeline builds a pipeline from nodes, modelling it as a directed
// acyclic graph: every node becomes one action (a vertex), and every
// parent/child dependency becomes an edge from the parent's position to the
// child's position in nodes.
//
// It returns an error if two nodes share an ID, if a node lists a parent ID
// that no node has, or if the dependencies form a cycle.
func newPipeline(nodes []Node) (pipeline, error) {
	total := len(nodes)

	// Map each node ID to its position in nodes and wrap every node in an
	// action stored at that same position.
	posByID := make(map[string]int, total)
	acts := make([]action, total)
	for pos := range nodes {
		// IDs are the lookup key for parent references, so they must be unique.
		_, dup := posByID[nodes[pos].ID()]
		if dup {
			return pipeline{}, fmt.Errorf("non-unique Node ID=%s", nodes[pos].ID())
		}
		posByID[nodes[pos].ID()] = pos
		// The slot is still the zero action, so only the node needs setting.
		acts[pos].n = nodes[pos]
	}

	// Wire up dependencies: register each child with its parents and record
	// the matching parent -> child edge in the graph.
	dag := graph.New(total)
	for child := range nodes {
		for _, parentID := range nodes[child].ParentIDs() {
			// Every referenced parent has to be one of the supplied nodes.
			parent, known := posByID[parentID]
			if !known {
				return pipeline{}, fmt.Errorf("node ID=%s references non-existing node ID=%s", nodes[child].ID(), parentID)
			}
			acts[parent].AddChild(&acts[child])
			dag.Add(parent, child)
		}
		// Tell the child how many parents it listed.
		acts[child].IncParentCount(len(nodes[child].ParentIDs()))
	}

	// Reject circular dependencies.
	if acyclic := graph.Acyclic(dag); !acyclic {
		return pipeline{}, errors.New("graph is not acyclic")
	}

	built := pipeline{v: acts, g: dag}
	return built, nil
}