-
Notifications
You must be signed in to change notification settings - Fork 104
/
pod_watcher.go
86 lines (71 loc) · 1.6 KB
/
pod_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// Copyright 2024 The Carvel Authors.
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
// PodWatcher lists and watches Pods through podsClient, filtered by listOpts.
type PodWatcher struct {
	// podsClient is the Pod client used for the List and Watch calls.
	podsClient typedcorev1.PodInterface
	// listOpts is passed unchanged to both the List and the Watch call.
	listOpts metav1.ListOptions
}
// NewPodWatcher builds a PodWatcher that stores podsClient and listOpts for
// use by its Watch method.
func NewPodWatcher(podsClient typedcorev1.PodInterface, listOpts metav1.ListOptions) PodWatcher {
	return PodWatcher{
		podsClient: podsClient,
		listOpts:   listOpts,
	}
}
// Watch sends Pods matching the watcher's list options to podsToWatchCh until
// cancelCh is signalled or an error occurs.
//
// It first lists the currently matching Pods and sends each of them, then
// repeatedly calls watch to forward Pods reported as added, opening a new
// watch for as long as watch asks for a retry.
//
// Every send also waits on cancelCh, so Watch cannot block forever on a
// consumer that stopped reading after cancellation. Watch never closes
// podsToWatchCh.
//
// It returns nil once cancelled (or when watch reports that no retry is
// needed) and a non-nil error when the initial list or a watch call fails.
func (w PodWatcher) Watch(podsToWatchCh chan corev1.Pod, cancelCh chan struct{}) error {
	podsList, err := w.podsClient.List(context.TODO(), w.listOpts)
	if err != nil {
		return err
	}

	for _, pod := range podsList.Items {
		// A bare send would hang if the receiver is gone; give up on cancel.
		select {
		case podsToWatchCh <- pod:
		case <-cancelCh:
			return nil
		}
	}

	// Return before potentially getting any events
	select {
	case <-cancelCh:
		return nil
	default:
	}

	for {
		retry, err := w.watch(podsToWatchCh, cancelCh)
		if err != nil {
			return err
		}
		if !retry {
			return nil
		}
	}
}
// watch opens a single Pod watch using the watcher's list options and
// forwards the Pod of every Added event to podsToWatchCh.
//
// The returned bool tells the caller whether to open a new watch: it is true
// when the result channel was closed or delivered an event without an object,
// and false when cancelCh was signalled. When the watch cannot be created it
// returns false together with a wrapped error.
//
// The watch is stopped before returning. Sends to podsToWatchCh also wait on
// cancelCh so a consumer that stopped reading cannot keep this call (and the
// underlying watch) alive indefinitely.
func (w PodWatcher) watch(podsToWatchCh chan corev1.Pod, cancelCh chan struct{}) (bool, error) {
	watcher, err := w.podsClient.Watch(context.TODO(), w.listOpts)
	if err != nil {
		return false, fmt.Errorf("Creating Pod watcher: %w", err)
	}
	defer watcher.Stop()

	for {
		select {
		case e, ok := <-watcher.ResultChan():
			if !ok || e.Object == nil {
				// Watcher may expire, hence try to retry
				return true, nil
			}

			pod, ok := e.Object.(*corev1.Pod)
			if !ok {
				// Events carrying something other than a Pod are ignored.
				continue
			}
			if e.Type != watch.Added {
				// Only newly added Pods are forwarded.
				continue
			}

			// A bare send would hang if the receiver is gone; give up on cancel.
			select {
			case podsToWatchCh <- *pod:
			case <-cancelCh:
				return false, nil
			}

		case <-cancelCh:
			return false, nil
		}
	}
}