exec_control.go
package controller

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/argoproj/argo/errors"
	wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
	"github.com/argoproj/argo/workflow/common"
)
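
// For reference, a hedged sketch of the execution control annotation this file
// manages (the exact annotation key and JSON field name are defined in the
// common package; the value below is illustrative only):
//
//	workflows.argoproj.io/execution: |
//	  {"deadline": "2020-05-01T00:00:00Z"}
//
// applyExecutionControl unmarshals this value into a common.ExecutionControl
// and reconciles its deadline with the workflow's deadline.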
// applyExecutionControl ensures a pod's execution control annotation is up-to-date,
// and kills any pending pods when the workflow has reached its deadline
func (woc *wfOperationCtx) applyExecutionControl(pod *apiv1.Pod, wfNodesLock *sync.RWMutex) error {
	if pod == nil {
		return nil
	}
	switch pod.Status.Phase {
	case apiv1.PodSucceeded, apiv1.PodFailed:
		// Skip pods which have already completed
		return nil
	case apiv1.PodPending:
		// Check if we are past the workflow deadline. If we are, and the pod is still pending,
		// then we should simply delete it and mark its node as Failed
		if woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline) {
			woc.log.Infof("Deleting Pending pod %s/%s which has exceeded workflow deadline %s", pod.Namespace, pod.Name, woc.workflowDeadline)
			err := woc.controller.kubeclientset.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
			if err == nil {
				wfNodesLock.Lock()
				defer wfNodesLock.Unlock()
				node := woc.wf.Status.Nodes[pod.Name]
				var message string
				if woc.workflowDeadline.IsZero() {
					message = "terminated"
				} else {
					message = fmt.Sprintf("step exceeded workflow deadline %s", *woc.workflowDeadline)
				}
				woc.markNodePhase(node.Name, wfv1.NodeFailed, message)
				return nil
			}
			// If we fail to delete the pod, fall back to setting the annotation
			woc.log.Warnf("Failed to delete %s/%s: %v", pod.Namespace, pod.Name, err)
		}
	}

	var podExecCtl common.ExecutionControl
	if execCtlStr, ok := pod.Annotations[common.AnnotationKeyExecutionControl]; ok && execCtlStr != "" {
		err := json.Unmarshal([]byte(execCtlStr), &podExecCtl)
		if err != nil {
			woc.log.Warnf("Failed to unmarshal execution control from pod %s", pod.Name)
		}
	}

	if podExecCtl.Deadline == nil && woc.workflowDeadline == nil {
		// Neither side has a deadline; nothing to sync
		return nil
	} else if podExecCtl.Deadline != nil && woc.workflowDeadline != nil {
		if podExecCtl.Deadline.Equal(*woc.workflowDeadline) {
			// Deadlines already agree; nothing to sync
			return nil
		}
	}
	if podExecCtl.Deadline != nil && podExecCtl.Deadline.IsZero() {
		// If the pod has already been explicitly signaled to terminate, then do nothing.
		// This can happen when daemon steps are terminated.
		woc.log.Infof("Skipping sync of execution control of pod %s. pod has been signaled to terminate", pod.Name)
		return nil
	}
	// Log the divergence before overwriting podExecCtl.Deadline, so that the
	// "actual" value reflects what the pod currently has
	woc.log.Infof("Execution control for pod %s out-of-sync desired: %v, actual: %v", pod.Name, woc.workflowDeadline, podExecCtl.Deadline)
	podExecCtl.Deadline = woc.workflowDeadline
	return woc.updateExecutionControl(pod.Name, podExecCtl)
}
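
// For reference, the three deadline states reconciled above, as derived from
// the logic in applyExecutionControl and killDaemonedChildren below:
//
//	Deadline == nil        no deadline; the pod may run indefinitely
//	Deadline == zero time  sentinel meaning "terminate immediately"
//	Deadline == time t     the pod must complete before t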

// killDaemonedChildren kills any daemoned pods of a step or DAG template node
func (woc *wfOperationCtx) killDaemonedChildren(nodeID string) error {
	woc.log.Infof("Checking daemoned children of %s", nodeID)
	var firstErr error
	// A zero-value deadline signals the executor to terminate immediately
	execCtl := common.ExecutionControl{
		Deadline: &time.Time{},
	}
	for _, childNode := range woc.wf.Status.Nodes {
		if childNode.BoundaryID != nodeID {
			continue
		}
		if childNode.Daemoned == nil || !*childNode.Daemoned {
			continue
		}
		err := woc.updateExecutionControl(childNode.ID, execCtl)
		if err != nil {
			woc.log.Errorf("Failed to update execution control of node %s: %+v", childNode.ID, err)
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}
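
// A hypothetical call site, to illustrate usage (the real call sites live
// elsewhere in the controller): once the non-daemoned children of a step or
// DAG boundary node have completed, its daemons can be reaped like so:
//
//	if err := woc.killDaemonedChildren(boundaryNode.ID); err != nil {
//		woc.log.Warnf("Failed to kill daemoned children of %s: %v", boundaryNode.ID, err)
//	}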

// updateExecutionControl updates the execution control annotation of a pod
// and signals its executor to re-evaluate it
func (woc *wfOperationCtx) updateExecutionControl(podName string, execCtl common.ExecutionControl) error {
	execCtlBytes, err := json.Marshal(execCtl)
	if err != nil {
		return errors.InternalWrapError(err)
	}

	woc.log.Infof("Updating execution control of %s: %s", podName, execCtlBytes)
	err = common.AddPodAnnotation(
		woc.controller.kubeclientset,
		podName,
		woc.wf.ObjectMeta.Namespace,
		common.AnnotationKeyExecutionControl,
		string(execCtlBytes),
	)
	if err != nil {
		return err
	}

	// Ideally we would simply annotate the pod with the updates and be done with it, allowing
	// the executor to notice the updates naturally via the Downward API annotations volume
	// mounted file. However, updates to Downward API volumes take a very long time to
	// propagate (minutes). The following code fast-tracks this by signaling the executor
	// using SIGUSR2 that something changed.
	woc.log.Infof("Signalling %s of updates", podName)
	exec, err := common.ExecPodContainer(
		woc.controller.restConfig, woc.wf.ObjectMeta.Namespace, podName,
		common.WaitContainerName, true, true, "sh", "-c", "kill -s USR2 $(pidof argoexec)",
	)
	if err != nil {
		return err
	}
	go func() {
		// This call is necessary to actually send the exec. Since signalling is best effort,
		// it is launched as a goroutine and any error is merely logged
		_, _, err := common.GetExecutorOutput(exec)
		if err != nil {
			woc.log.Warnf("Signal command failed: %v", err)
			return
		}
		woc.log.Infof("Signal of %s (%s) successfully issued", podName, common.WaitContainerName)
	}()
	return nil
}
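
// For context, a rough sketch of the executor's side of the SIGUSR2 handshake
// described above (hypothetical names; the real handler lives in the executor
// codebase): the argoexec wait container would register a handler and re-read
// the Downward API annotations file whenever the signal arrives, e.g.:
//
//	sigs := make(chan os.Signal, 1)
//	signal.Notify(sigs, syscall.SIGUSR2)
//	go func() {
//		for range sigs {
//			reloadExecutionControl() // hypothetical: re-parse annotations for a new deadline
//		}
//	}()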