forked from argoproj/argo-workflows
/
wait.go
136 lines (114 loc) · 3.55 KB
/
wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package commands
import (
"fmt"
"os"
"sync"
"time"
wfclient "github.com/argoproj/argo/workflow/client"
goversion "github.com/hashicorp/go-version"
"github.com/jpillora/backoff"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
apierr "k8s.io/apimachinery/pkg/api/errors"
)
// init attaches waitCmd to RootCmd and declares the flags the wait command
// accepts, binding them to waitArgs.
func init() {
	RootCmd.AddCommand(waitCmd)

	flags := waitCmd.Flags()
	flags.BoolVar(&waitArgs.ignoreNotFound, "ignore-not-found", false, "Ignore the wait if the workflow is not found")
}
// waitFlags groups the command-line flag values accepted by the wait command.
type waitFlags struct {
// ignoreNotFound is bound to the --ignore-not-found flag in init.
ignoreNotFound bool
}
// waitArgs is the package-level waitFlags instance that the wait command's
// flags are parsed into.
var waitArgs waitFlags
// waitCmd is the cobra definition of the "wait" subcommand. It accepts one or
// more workflow names as positional arguments and delegates to
// WaitWorkflowsRun.
var waitCmd = &cobra.Command{
	// The trailing "..." marks the argument list as variadic in the usage line.
	Use:   "wait WORKFLOW1 WORKFLOW2...",
	Short: "waits for all workflows specified on command line to complete",
	Run:   WaitWorkflowsRun,
}
// VersionChecker checks the Kubernetes server version and, for now, only prints
// a message when the server is recent enough that wait could be implemented
// with a watch instead of polling.
type VersionChecker struct{}
// run asks the Kubernetes clientset for the server version and prints a
// reminder when that version is 1.9 or newer.
//
// Watch APIs on CRDs using fieldSelectors are only supported in Kubernetes
// v1.9.0 onwards (https://github.com/kubernetes/kubernetes/issues/51046), which
// is why 1.9 is the threshold. Any failure to fetch or parse a version is
// reported through log.Fatalf.
func (vc *VersionChecker) run() {
	info, err := clientset.ServerVersion()
	if err != nil {
		log.Fatalf("Failed to get Kubernetes version: %v", err)
	}

	current, err := goversion.NewVersion(info.String())
	if err != nil {
		log.Fatalf("Failed to create version: %v", err)
	}

	threshold, err := goversion.NewVersion("1.9")
	if err != nil {
		log.Fatalf("Failed to create minimum version: %v", err)
	}

	// Nothing to report for servers older than the threshold.
	if current.Compare(threshold) < 0 {
		return
	}
	fmt.Printf("This should be changed to use a \"watch\" based approach.\n")
}
// WorkflowStatusPoller exports methods to wait on workflows by periodically
// querying their status.
type WorkflowStatusPoller struct {
// wfc is the workflow client used to look up each workflow's current state.
wfc *wfclient.WorkflowClient
// ignoreNotFound makes a "not found" lookup error end the wait for that
// workflow instead of aborting.
ignoreNotFound bool
// noOutput suppresses the per-workflow messages printed by waitOnOne.
noOutput bool
}
// NewWorkflowStatusPoller returns a WorkflowStatusPoller that looks workflows up
// through wfc. ignoreNotFound and noOutput are stored as given and consulted
// while waiting.
func NewWorkflowStatusPoller(wfc *wfclient.WorkflowClient, ignoreNotFound bool, noOutput bool) *WorkflowStatusPoller {
	poller := WorkflowStatusPoller{
		wfc:            wfc,
		ignoreNotFound: ignoreNotFound,
		noOutput:       noOutput,
	}
	return &poller
}
// waitOnOne blocks until the named workflow reports a non-zero FinishedAt
// timestamp. It polls GetWorkflow in a loop, sleeping between attempts for a
// backoff duration configured with Min 1s, Max 1m and Factor 2.
//
// When the lookup fails with a not-found error and ignoreNotFound is set, the
// wait ends immediately (printing a notice unless noOutput is set). Any other
// lookup error is reported through log.Fatalf, the same way version-check
// failures are handled in this file, so the user sees a clean error message
// rather than a panic stack trace from a background goroutine.
func (wsp *WorkflowStatusPoller) waitOnOne(workflowName string) {
	b := &backoff.Backoff{
		Min:    1 * time.Second,
		Max:    1 * time.Minute,
		Factor: 2,
	}
	for {
		wf, err := wsp.wfc.GetWorkflow(workflowName)
		if err != nil {
			if wsp.ignoreNotFound && apierr.IsNotFound(err) {
				if !wsp.noOutput {
					fmt.Printf("%s not found. Ignoring...\n", workflowName)
				}
				return
			}
			// An API failure is an expected runtime condition, not a programmer
			// error, so report it and exit instead of panicking.
			log.Fatalf("Failed to get workflow %s: %v", workflowName, err)
		}
		// A non-zero FinishedAt is the signal that the workflow has completed.
		if !wf.Status.FinishedAt.IsZero() {
			if !wsp.noOutput {
				fmt.Printf("%s completed at %v\n", workflowName, wf.Status.FinishedAt)
			}
			return
		}
		// Not finished yet: wait for the next backoff interval and poll again.
		time.Sleep(b.Duration())
	}
}
// waitUpdateWaitGroup waits for a single workflow through waitOnOne and marks
// one unit of wg as done once that wait returns. WaitWorkflows starts it as a
// goroutine, one per workflow name.
func (wsp *WorkflowStatusPoller) waitUpdateWaitGroup(workflowName string, wg *sync.WaitGroup) {
	// Deferred so the group is released on every return path of waitOnOne.
	defer wg.Done()

	wsp.waitOnOne(workflowName)
}
// WaitWorkflows waits for the given workflowNames. It first runs the
// Kubernetes version check, then starts one goroutine per workflow name and
// returns only after all of them have finished waiting.
func (wsp *WorkflowStatusPoller) WaitWorkflows(workflowNames []string) {
	// TODO(shri): When Kubernetes 1.9 support is added, this block should be executed
	// only for versions 1.8 and for 1.9, a new "watch" based implementation should be
	// used.
	checker := &VersionChecker{}
	checker.run()

	// Account for every waiter up front, then launch them.
	var pending sync.WaitGroup
	pending.Add(len(workflowNames))
	for i := range workflowNames {
		go wsp.waitUpdateWaitGroup(workflowNames[i], &pending)
	}
	pending.Wait()
}
// WaitWorkflowsRun is the handler for the wait command. With no workflow names
// on the command line it shows the command help and exits with status 1;
// otherwise it waits for every named workflow, honouring the
// --ignore-not-found flag and printing progress output.
func WaitWorkflowsRun(cmd *cobra.Command, args []string) {
	if len(args) < 1 {
		showHelp := cmd.HelpFunc()
		showHelp(cmd, args)
		os.Exit(1)
	}

	poller := NewWorkflowStatusPoller(InitWorkflowClient(), waitArgs.ignoreNotFound, false)
	poller.WaitWorkflows(args)
}