/
deploywatcher.go
118 lines (101 loc) · 2.94 KB
/
deploywatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package k8
import (
"errors"
"fmt"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
type deployStatus int
const (
statRunning deployStatus = iota
statFailed
statDone
)
var acceptableWaitingReasons = [...]string{
"ContainerCreating",
}
var ErrPodsFailedToStart = errors.New("The new Kubernetes pods failed to start.")
type podSet map[string]bool
type k8DeployWatcher struct {
running podSet // Pod IDs that have started up but have not completed yet.
done podSet // Pod IDs that have finished successfully.
dead podSet // Pod IDs that have failed to start.
}
func newK8DeployWatcher() *k8DeployWatcher {
return &k8DeployWatcher{
running: make(podSet, 5),
done: make(podSet, 5),
dead: make(podSet, 5),
}
}
// watchIt watches events on a K8 pods with the appropriate `name` and
// `version` labels and returns an error if at least `expectedReplicas` are
// not deployed successfully.
func (kdw *k8DeployWatcher) watchIt(client *kubernetes.Clientset, name, version string, expectedReplicas int32, gen int64) error {
podWatcher, err := client.CoreV1().
Pods(k8Namespace).
Watch(metav1.ListOptions{
LabelSelector: fmt.Sprintf("app=%v,version=%v", name, version),
})
if err != nil {
fmt.Println(err)
return err
}
for event := range podWatcher.ResultChan() {
pod, ok := event.Object.(*v1.Pod)
if !ok {
continue
}
switch kdw.inspectPodStatus(pod) {
case statRunning:
kdw.running[pod.ObjectMeta.Name] = true
case statFailed:
delete(kdw.running, pod.ObjectMeta.Name)
kdw.dead[pod.ObjectMeta.Name] = true
// This might be a little naive, but it should suffice.
if int32(len(kdw.dead)) >= expectedReplicas {
return ErrPodsFailedToStart
}
case statDone:
delete(kdw.running, pod.ObjectMeta.Name)
kdw.done[pod.ObjectMeta.Name] = true
if int32(len(kdw.done)) >= expectedReplicas {
return nil
}
}
}
return nil
}
// inspectPodStatus evaluates the pod's status and container condition's to
// determine if it has successfully started or not.
func (kdw *k8DeployWatcher) inspectPodStatus(pod *v1.Pod) deployStatus {
for _, cond := range pod.Status.Conditions {
if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
return statDone
}
}
// If this pod just started it may not have "ContainerStatuses" set yet.
// If so, considering it to still be "running".
if len(pod.Status.ContainerStatuses) < 1 {
return statRunning
}
for _, stat := range pod.Status.ContainerStatuses {
if stat.State.Waiting != nil && kdw.isAcceptableWaitingState(stat.State.Waiting) {
return statRunning
} else if stat.State.Running != nil {
return statDone
} else if stat.State.Terminated != nil {
return statFailed
}
}
return statFailed
}
func (kdw *k8DeployWatcher) isAcceptableWaitingState(state *v1.ContainerStateWaiting) bool {
for _, r := range acceptableWaitingReasons {
if state.Reason == r {
return true
}
}
return false
}