-
Notifications
You must be signed in to change notification settings - Fork 0
/
phase_node.go
69 lines (59 loc) · 2.25 KB
/
phase_node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package controller
import (
"fmt"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)
// A phaseNode is a node in a BFS of all nodes for the purposes of determining overall DAG phase. nodeId is the ID of
// the corresponding node and phase is the current branchPhase associated with the node.
type phaseNode struct {
// nodeId is the ID of the node this entry stands for.
nodeId string
// phase is the branchPhase associated with the node.
phase wfv1.NodePhase
}
// generatePhaseNodes wraps every id in children in a phaseNode tagged with branchPhase. The result has one entry per
// child, in the same order as children, and is never nil (it is an empty slice when children is empty).
func generatePhaseNodes(children []string, branchPhase wfv1.NodePhase) []phaseNode {
	nodes := make([]phaseNode, 0, len(children))
	for _, id := range children {
		nodes = append(nodes, phaseNode{nodeId: id, phase: branchPhase})
	}
	return nodes
}
// uniquePhaseNodeQueue is a first-in, first-out queue of phaseNodes that also remembers every nodeId-phase
// combination it has accepted, so that add can reject repeats.
type uniquePhaseNodeQueue struct {
// seen holds the "<nodeId>-<phase>" key of every phaseNode accepted by add. Nothing in this file removes entries,
// so a key stays recorded after its node has been popped.
seen map[string]bool
// queue holds the pending phaseNodes in insertion order: add appends to the back, pop takes from the front.
queue []phaseNode
}
// newUniquePhaseNodeQueue returns a uniquePhaseNodeQueue seeded with nodes. The seed nodes are enqueued through add
// and are therefore de-duplicated like any later addition.
//
// A uniquePhaseNodeQueue is a queue that accepts a given phaseNode only once during its life. If a phaseNode is added
// after an identical one has already been added, the add will not succeed. Even if a phaseNode is added, popped, and
// re-added, the re-add will not succeed. Failed adds fail silently. Note that two phaseNodes with the same nodeId but
// different phases may be added, but only once per nodeId-phase combination. This is to ensure that branches with
// different branchPhases can still be processed: if an Omitted node is reached first from a step that succeeded, we
// consider the omitted node succeeded. However, it may be subsequently reached from another step that did not
// succeed. In that case we want to update the deduced status of the omitted node, and we may only do so by adding it
// to the queue again.
func newUniquePhaseNodeQueue(nodes ...phaseNode) *uniquePhaseNodeQueue {
	uq := new(uniquePhaseNodeQueue)
	uq.seen = map[string]bool{}
	uq.queue = make([]phaseNode, 0)
	uq.add(nodes...)
	return uq
}
// add appends each of nodes to the back of the queue, in order, unless a phaseNode with the same nodeId-phase
// combination has been added before. Such repeats are skipped silently, even when the earlier node has already been
// popped.
func (uq *uniquePhaseNodeQueue) add(nodes ...phaseNode) {
	for _, candidate := range nodes {
		// The nodeId and phase together form the identity used for de-duplication.
		dedupKey := fmt.Sprintf("%s-%s", candidate.nodeId, candidate.phase)
		if _, dup := uq.seen[dedupKey]; dup {
			continue
		}
		uq.seen[dedupKey] = true
		uq.queue = append(uq.queue, candidate)
	}
}
// pop removes the phaseNode at the front of the queue and returns it. It panics if the queue is empty, so callers
// should check empty() first. The popped node stays recorded in seen.
func (uq *uniquePhaseNodeQueue) pop() phaseNode {
	front := uq.queue[0]
	uq.queue = uq.queue[1:]
	return front
}
// empty reports whether no phaseNodes are currently waiting in the queue.
func (uq *uniquePhaseNodeQueue) empty() bool {
	return len(uq.queue) == 0
}
// len returns the number of phaseNodes currently waiting in the queue. Nodes that have been popped are not counted.
func (uq *uniquePhaseNodeQueue) len() int {
return len(uq.queue)
}