/
wait.go
85 lines (73 loc) · 2.56 KB
/
wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package kubex
import (
"context"
"errors"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
)
// WaitForPod watches a given Pod until the exitCondition is true.
func WaitForPod(ctx context.Context, clientset *kubernetes.Clientset, namespace string, name string, exitCondition watchtools.ConditionFunc) error {
selector := labels.SelectorFromSet(map[string]string{"app": name}).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.LabelSelector = selector
return clientset.CoreV1().Pods(namespace).List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.LabelSelector = selector
return clientset.CoreV1().Pods(namespace).Watch(ctx, options)
},
}
_, err := watchtools.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, exitCondition)
return err
}
var (
errPodRestartedWithError = errors.New("pod restarted with non zero exit code")
)
// PodReady returns true if the Pod is ready.
func PodReady(podScheduledIndicator chan string, since time.Time) func(event watch.Event) (bool, error) {
informed := false
sinceK8sTime := metav1.NewTime(since)
return func(event watch.Event) (bool, error) {
switch t := event.Type; t {
case watch.Added, watch.Modified:
switch pod := event.Object.(type) {
case *corev1.Pod:
createdAt := pod.GetObjectMeta().GetCreationTimestamp()
// we don't care about previously created pods, for example when user do some upgrades, we watch for a new Pod instance only.
if createdAt.Before(&sinceK8sTime) {
return false, nil
}
if pod.Status.Phase == corev1.PodRunning && !informed {
informed = true
podScheduledIndicator <- pod.Name
close(podScheduledIndicator)
}
for _, cond := range pod.Status.ContainerStatuses {
if !cond.Ready && cond.RestartCount > 0 {
// pod was already restarted because of the problem, we restart botkube on permanent errors mostly, so let's stop watching
return true, errPodRestartedWithError
}
}
return isPodReady(pod), nil
}
}
return false, nil
}
}
// isPodReady returns true if a pod is ready; false otherwise.
func isPodReady(pod *corev1.Pod) bool {
for _, c := range pod.Status.Conditions {
if c.Type == corev1.PodReady && c.Status == corev1.ConditionTrue {
return true
}
}
return false
}