forked from openshift/installer
/
watch.go
158 lines (138 loc) · 4.58 KB
/
watch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package main
import (
"context"
"errors"
"fmt"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/watch"
watchtools "k8s.io/client-go/tools/watch"
)
// WatcherFunc is from https://github.com/kubernetes/kubernetes/pull/50102.
type WatcherFunc func(sinceResourceVersion string) (watch.Interface, error)
type resourceVersionGetter interface {
GetResourceVersion() string
}
// RetryWatcher is from https://github.com/kubernetes/kubernetes/pull/50102.
type RetryWatcher struct {
lastResourceVersion string
watcherFunc WatcherFunc
resultChan chan watch.Event
stopChan chan struct{}
}
// Until is from https://github.com/kubernetes/kubernetes/pull/50102.
func Until(ctx context.Context, initialResourceVersion string, watcherFunc WatcherFunc, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
return watchtools.UntilWithoutRetry(ctx, NewRetryWatcher(initialResourceVersion, watcherFunc), conditions...)
}
// NewRetryWatcher is from https://github.com/kubernetes/kubernetes/pull/50102.
func NewRetryWatcher(initialResourceVersion string, watcherFunc WatcherFunc) *RetryWatcher {
rw := &RetryWatcher{
lastResourceVersion: initialResourceVersion,
watcherFunc: watcherFunc,
stopChan: make(chan struct{}),
resultChan: make(chan watch.Event, 0),
}
go rw.receive()
return rw
}
func (rw *RetryWatcher) send(event watch.Event) bool {
// Writing to an unbuffered channel is blocking and we need to check if we need to be able to stop while doing so!
select {
case rw.resultChan <- event:
return true
case <-rw.stopChan:
return false
}
}
func (rw *RetryWatcher) doReceive() bool {
watcher, err := rw.watcherFunc(rw.lastResourceVersion)
if err != nil {
status := apierrors.NewInternalError(fmt.Errorf("retry watcher: watcherFunc failed: %v", err)).Status()
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &status,
})
// Stop the watcher
return true
}
ch := watcher.ResultChan()
defer watcher.Stop()
for {
select {
case <-rw.stopChan:
logrus.Debug("Stopping RetryWatcher.")
return true
case event, ok := <-ch:
if !ok {
logrus.Warningf("RetryWatcher - getting event failed! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion)
return false
}
// We need to inspect the event and get ResourceVersion out of it
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted:
metaObject, ok := event.Object.(resourceVersionGetter)
if !ok {
status := apierrors.NewInternalError(errors.New("__internal__: RetryWatcher: doesn't support resourceVersion")).Status()
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &status,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true
}
resourceVersion := metaObject.GetResourceVersion()
if resourceVersion == "" {
status := apierrors.NewInternalError(fmt.Errorf("__internal__: RetryWatcher: object %#v doesn't support resourceVersion", event.Object)).Status()
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &status,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true
}
// All is fine; send the event and update lastResourceVersion
ok = rw.send(event)
if !ok {
return true
}
rw.lastResourceVersion = resourceVersion
continue
case watch.Error:
_ = rw.send(event)
return true
default:
logrus.Errorf("RetryWatcher failed to recognize Event type %q", event.Type)
status := apierrors.NewInternalError(fmt.Errorf("__internal__: RetryWatcher failed to recognize Event type %q", event.Type)).Status()
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &status,
})
// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true
}
}
}
}
func (rw *RetryWatcher) receive() {
defer close(rw.resultChan)
for {
select {
case <-rw.stopChan:
logrus.Debug("Stopping RetryWatcher.")
return
default:
done := rw.doReceive()
if done {
return
}
}
}
}
// ResultChan is from https://github.com/kubernetes/kubernetes/pull/50102.
func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
return rw.resultChan
}
// Stop is from https://github.com/kubernetes/kubernetes/pull/50102.
func (rw *RetryWatcher) Stop() {
close(rw.stopChan)
}