-
Notifications
You must be signed in to change notification settings - Fork 38.7k
/
watchlist.go
125 lines (107 loc) · 4.67 KB
/
watchlist.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"context"
"fmt"
"os"
"sort"
"time"
"github.com/google/go-cmp/cmp"
"github.com/onsi/ginkgo/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/test/e2e/feature"
"k8s.io/kubernetes/test/e2e/framework"
)
// This spec checks that a shared informer can be fully populated and kept up
// to date through its WatchFunc alone: the ListFunc handed to the informer
// always returns an error, so any sync that succeeds cannot have come from a
// successful list call.
//
// The spec sets ENABLE_CLIENT_GO_WATCH_LIST_ALPHA for the duration of the test
// and restores the previous state afterwards.
// NOTE(review): the suite is marked Serial, presumably because the environment
// variable is process-wide — confirm.
var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), feature.WatchList, func() {
	f := framework.NewDefaultFramework("watchlist")
	ginkgo.It("should be requested when ENABLE_CLIENT_GO_WATCH_LIST_ALPHA is set", func(ctx context.Context) {
		const watchListEnvVar = "ENABLE_CLIENT_GO_WATCH_LIST_ALPHA"

		// Remember whether the variable was set (and to what) so the deferred
		// cleanup can put the environment back exactly as it was found.
		prevWatchListEnvValue, wasWatchListEnvSet := os.LookupEnv(watchListEnvVar)
		os.Setenv(watchListEnvVar, "true")
		defer func() {
			if !wasWatchListEnvSet {
				os.Unsetenv(watchListEnvVar)
				return
			}
			os.Setenv(watchListEnvVar, prevWatchListEnvValue)
		}()

		// Closing stopCh on return stops the informer goroutine started below.
		stopCh := make(chan struct{})
		defer close(stopCh)

		secretInformer := cache.NewSharedIndexInformer(
			&cache.ListWatch{
				// A list call must never be needed; fail loudly if one is made.
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					return nil, fmt.Errorf("unexpected list call")
				},
				// Use the spec's context so that watch requests are cancelled
				// together with the spec instead of outliving it.
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					return f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Watch(ctx, options)
				},
			},
			&v1.Secret{},
			time.Duration(0),
			nil,
		)

		// The secrets are created before the informer starts, so they have to
		// arrive as part of the informer's initial synchronisation.
		ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
		for i := 1; i <= 5; i++ {
			_, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
			framework.ExpectNoError(err)
		}

		ginkgo.By("Starting the secret informer")
		go secretInformer.Run(stopCh)

		ginkgo.By("Waiting until the secret informer is fully synchronised")
		err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 30*time.Second, false, func(context.Context) (done bool, err error) {
			return secretInformer.HasSynced(), nil
		})
		// Report the namespace by its Name, as everywhere else in this file.
		framework.ExpectNoError(err, "Failed waiting for the secret informer in %s namespace to be synced", f.Namespace.Name)

		ginkgo.By("Verifying if the secret informer was properly synchronised")
		verifyStore(ctx, f, secretInformer.GetStore())

		ginkgo.By("Modifying a secret and checking if the update was picked up by the secret informer")
		secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Get(ctx, "secret-1", metav1.GetOptions{})
		framework.ExpectNoError(err)
		secret.StringData = map[string]string{"foo": "bar"}
		_, err = f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Update(ctx, secret, metav1.UpdateOptions{})
		framework.ExpectNoError(err)
		verifyStore(ctx, f, secretInformer.GetStore())
	})
})
// verifyStore checks that store eventually contains exactly the secrets the
// API server returns for the framework's namespace.
//
// The server is listed once up front. The store is then polled every 100ms
// (starting immediately) for up to 30s, because the store is filled
// asynchronously and may lag behind the server. Both sides are sorted by name
// before comparison so that ordering differences do not matter. The test is
// failed through framework.ExpectNoError if the list call fails or the store
// does not match before the poll gives up.
func verifyStore(ctx context.Context, f *framework.Framework, store cache.Store) {
	ginkgo.By(fmt.Sprintf("Listing secrets directly from the server from %s namespace", f.Namespace.Name))
	expectedSecretsList, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).List(ctx, metav1.ListOptions{})
	framework.ExpectNoError(err)
	expectedSecrets := expectedSecretsList.Items
	// The expected side never changes while polling, so sort it only once.
	sort.Sort(byName(expectedSecrets))

	// Announce the step once rather than on every poll iteration.
	ginkgo.By("Comparing secrets retrieved directly from the server with the ones that have been streamed to the secret informer")
	err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (bool, error) {
		rawStreamedSecrets := store.List()
		streamedSecrets := make([]v1.Secret, 0, len(rawStreamedSecrets))
		for _, rawSecret := range rawStreamedSecrets {
			streamedSecrets = append(streamedSecrets, *rawSecret.(*v1.Secret))
		}
		sort.Sort(byName(streamedSecrets))
		return cmp.Equal(expectedSecrets, streamedSecrets), nil
	})
	framework.ExpectNoError(err, "Failed waiting for the secret informer store to match the secrets listed from %s namespace", f.Namespace.Name)
}
// byName adapts a slice of secrets to sort.Interface, ordering the elements
// by ascending Name.
type byName []v1.Secret

// Len reports how many secrets the slice holds.
func (s byName) Len() int {
	return len(s)
}

// Less reports whether the secret at index i has a name that sorts before
// the name of the secret at index j.
func (s byName) Less(i, j int) bool {
	return s[j].Name > s[i].Name
}

// Swap exchanges the secrets at indexes i and j.
func (s byName) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}
// newSecret returns a pointer to a Secret whose only populated field is its
// object name; every other field is left at its zero value.
func newSecret(name string) *v1.Secret {
	secret := new(v1.Secret)
	secret.ObjectMeta.Name = name
	return secret
}