This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 60
/
node.go
116 lines (97 loc) · 4.39 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package executors
import (
"context"
"fmt"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)
//go:generate mockery -all -case=underscore
// p of the node
type NodePhase int
const (
// Indicates that the node is not yet ready to be executed and is pending any previous nodes completion
NodePhasePending NodePhase = iota
// Indicates that the node was queued and will start running soon
NodePhaseQueued
// Indicates that the payload associated with this node is being executed and is not yet done
NodePhaseRunning
// Indicates that the nodes payload has been successfully completed, but any downstream nodes from this node may not yet have completed
// We could make Success = running, but this enables more granular control
NodePhaseSuccess
// Complete indicates successful completion of a node. For singular nodes (nodes that have only one execution) success = complete, but, the executor
// will always signal completion
NodePhaseComplete
// Node failed in execution, either this node or anything in the downstream chain
NodePhaseFailed
// Internal error observed. This state should always be accompanied with an `error`. if not the behavior is undefined
NodePhaseUndefined
// Finalize node failing due to timeout
NodePhaseTimingOut
// Node failed because execution timed out
NodePhaseTimedOut
)
func (p NodePhase) String() string {
switch p {
case NodePhaseRunning:
return "Running"
case NodePhaseQueued:
return "Queued"
case NodePhasePending:
return "Pending"
case NodePhaseFailed:
return "Failed"
case NodePhaseSuccess:
return "Success"
case NodePhaseComplete:
return "Complete"
case NodePhaseUndefined:
return "Undefined"
case NodePhaseTimedOut:
return "NodePhaseTimedOut"
}
return fmt.Sprintf("Unknown - %d", p)
}
// Core Node Executor that is used to execute a node. This is a recursive node executor and understands node dependencies
type Node interface {
// This method is used specifically to set inputs for start node. This is because start node does not retrieve inputs
// from predecessors, but the inputs are inputs to the workflow or inputs to the parent container (workflow) node.
SetInputsForStartNode(ctx context.Context, execContext ExecutionContext, dag DAGStructureWithStartNode, nl NodeLookup, inputs *core.LiteralMap) (NodeStatus, error)
// This is the main entrypoint to execute a node. It recursively depth-first goes through all ready nodes and starts their execution
// This returns either
// - 1. It finds a blocking node (not ready, or running)
// - 2. A node fails and hence the workflow will fail
// - 3. The final/end node has completed and the workflow should be stopped
RecursiveNodeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)
// This aborts the given node. If the given node is complete then it recursively finds the running nodes and aborts them
AbortHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode, reason string) error
FinalizeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode) error
// This method should be used to initialize Node executor
Initialize(ctx context.Context) error
}
// Helper struct to allow passing of status between functions
type NodeStatus struct {
NodePhase NodePhase
Err *core.ExecutionError
}
func (n *NodeStatus) IsComplete() bool {
return n.NodePhase == NodePhaseComplete
}
func (n *NodeStatus) HasFailed() bool {
return n.NodePhase == NodePhaseFailed
}
func (n *NodeStatus) HasTimedOut() bool {
return n.NodePhase == NodePhaseTimedOut
}
func (n *NodeStatus) PartiallyComplete() bool {
return n.NodePhase == NodePhaseSuccess
}
var NodeStatusPending = NodeStatus{NodePhase: NodePhasePending}
var NodeStatusQueued = NodeStatus{NodePhase: NodePhaseQueued}
var NodeStatusRunning = NodeStatus{NodePhase: NodePhaseRunning}
var NodeStatusSuccess = NodeStatus{NodePhase: NodePhaseSuccess}
var NodeStatusComplete = NodeStatus{NodePhase: NodePhaseComplete}
var NodeStatusUndefined = NodeStatus{NodePhase: NodePhaseUndefined}
var NodeStatusTimedOut = NodeStatus{NodePhase: NodePhaseTimedOut}
func NodeStatusFailed(err *core.ExecutionError) NodeStatus {
return NodeStatus{NodePhase: NodePhaseFailed, Err: err}
}